Skip to content
Snippets Groups Projects

Draft: change API to expose smux.OpenStream()

Closed WofWca requested to merge WofWca/snowflake:expose-smux-openstream into main
4 files
+ 48
57
Compare changes
  • Side-by-side
  • Inline
Files
4
+ 18
14
@@ -172,7 +172,7 @@ func NewSnowflakeClient(config ClientConfig) (*Transport, error) {
// Dial starts the collection of snowflakes and returns a SnowflakeConn that is a
// wrapper around a smux.Stream that will reliably deliver data to a Snowflake
// server through one or more snowflake proxies.
func (t *Transport) Dial() (net.Conn, error) {
func (t *Transport) Dial() (*SnowflakeConn, error) {
// Cleanup functions to run before returning, in case of an error.
var cleanup []func()
defer func() {
@@ -207,17 +207,17 @@ func (t *Transport) Dial() (net.Conn, error) {
})
// On the smux session we overlay a stream.
stream, err := sess.OpenStream()
if err != nil {
return nil, err
}
// Begin exchanging data.
log.Printf("---- SnowflakeConn: begin stream %v ---", stream.ID())
cleanup = append(cleanup, func() { stream.Close() })
// stream, err := sess.OpenStream()
// if err != nil {
// return nil, err
// }
// // Begin exchanging data.
// log.Printf("---- SnowflakeConn: begin stream %v ---", stream.ID())
// cleanup = append(cleanup, func() { stream.Close() })
// All good, clear the cleanup list.
cleanup = nil
return &SnowflakeConn{Stream: stream, sess: sess, pconn: pconn, snowflakes: snowflakes}, nil
return &SnowflakeConn{Sess: sess, pconn: pconn, snowflakes: snowflakes}, nil
}
func (t *Transport) AddSnowflakeEventListener(receiver event.SnowflakeEventReceiver) {
@@ -235,26 +235,30 @@ func (t *Transport) SetRendezvousMethod(r RendezvousMethod) {
// SnowflakeConn is a reliable connection to a snowflake server that implements net.Conn.
type SnowflakeConn struct {
*smux.Stream
sess *smux.Session
// *smux.Stream
Sess *smux.Session
pconn net.PacketConn
snowflakes *Peers
}
func (conn *SnowflakeConn) OpenStream() (*smux.Stream, error) {
return conn.Sess.OpenStream()
}
// Close closes the connection.
//
// The collection of snowflake proxies for this connection is stopped.
func (conn *SnowflakeConn) Close() error {
var err error
log.Printf("---- SnowflakeConn: closed stream %v ---", conn.ID())
err = conn.Stream.Close()
// log.Printf("---- SnowflakeConn: closed stream %v ---", conn.ID())
// err = conn.Stream.Close()
log.Printf("---- SnowflakeConn: end collecting snowflakes ---")
conn.snowflakes.End()
if inerr := conn.pconn.Close(); err == nil {
err = inerr
}
log.Printf("---- SnowflakeConn: discarding finished session ---")
if inerr := conn.sess.Close(); err == nil {
if inerr := conn.Sess.Close(); err == nil {
err = inerr
}
return err
Loading