core: Make outbound contact connections
Refactor the connection management on contacts to have a goroutine responsible for tracking the state of a contact's connection, launching and canceling outbound attempts when appropriate, etc.
This commit is contained in:
		
							parent
							
								
									fc0c6b3c95
								
							
						
					
					
						commit
						f56204116c
					
				
							
								
								
									
										203
									
								
								core/contact.go
								
								
								
								
							
							
						
						
									
										203
									
								
								core/contact.go
								
								
								
								
							|  | @ -5,6 +5,7 @@ import ( | |||
| 	protocol "github.com/s-rah/go-ricochet" | ||||
| 	"github.com/special/notricochet/core/utils" | ||||
| 	"github.com/special/notricochet/rpc" | ||||
| 	"golang.org/x/net/context" | ||||
| 	"log" | ||||
| 	"strconv" | ||||
| 	"strings" | ||||
|  | @ -25,15 +26,19 @@ type Contact struct { | |||
| 	mutex  sync.Mutex | ||||
| 	events *utils.Publisher | ||||
| 
 | ||||
| 	connection *protocol.OpenConnection | ||||
| 	connection        *protocol.OpenConnection | ||||
| 	connChannel       chan *protocol.OpenConnection | ||||
| 	connClosedChannel chan struct{} | ||||
| } | ||||
| 
 | ||||
| func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils.Publisher) (*Contact, error) { | ||||
| 	contact := &Contact{ | ||||
| 		core:   core, | ||||
| 		id:     id, | ||||
| 		data:   data, | ||||
| 		events: events, | ||||
| 		core:              core, | ||||
| 		id:                id, | ||||
| 		data:              data, | ||||
| 		events:            events, | ||||
| 		connChannel:       make(chan *protocol.OpenConnection), | ||||
| 		connClosedChannel: make(chan struct{}), | ||||
| 	} | ||||
| 
 | ||||
| 	if id < 0 { | ||||
|  | @ -42,6 +47,10 @@ func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils | |||
| 		return nil, fmt.Errorf("Invalid contact hostname '%s", data.Hostname) | ||||
| 	} | ||||
| 
 | ||||
| 	// XXX Should have some global trigger that starts all contact connections
 | ||||
| 	// at the right time
 | ||||
| 	go contact.contactConnection() | ||||
| 
 | ||||
| 	return contact, nil | ||||
| } | ||||
| 
 | ||||
|  | @ -101,7 +110,128 @@ func (c *Contact) Data() *ricochet.Contact { | |||
| 	return data | ||||
| } | ||||
| 
 | ||||
| func (c *Contact) SetConnection(conn *protocol.OpenConnection) error { | ||||
| // Goroutine to handle the protocol connection for a contact.
 | ||||
| // Responsible for making outbound connections and taking over authenticated
 | ||||
| // inbound connections, running protocol handlers on the active connection, and
 | ||||
| // reacting to connection loss. Nothing else may write Contact.connection.
 | ||||
| func (c *Contact) contactConnection() { | ||||
| 	// XXX Should the protocol continue handling its own goroutines?
 | ||||
| 	// I'm thinking the protocol API design I want is:
 | ||||
| 	//   - "handler" assigned per-connection
 | ||||
| 	//   - each inbound listener has its own handler also, assigns for conns
 | ||||
| 	//   - so, non-authed connection has a handler that _can't_ do anything other than auth
 | ||||
| 	//   - inbound contact req connection gets a special handler for that case
 | ||||
| 	//   - authenticated contact conns get handler changed here
 | ||||
| 	// It's probably more sensible to never break the conn read loop, because of buffering etc
 | ||||
| 	// So likely we don't want to change that goroutine. Could still use a channel to pass it
 | ||||
| 	// to handler for parsing, which could let it go on any goroutine we want, if it's desirable
 | ||||
| 	// to put it on e.g. this routine. Hmm.
 | ||||
| 
 | ||||
| 	for { | ||||
| 		// If there is no active connection, spawn an outbound connector.
 | ||||
| 		// A successful connection is returned via connChannel; otherwise, it will keep trying.
 | ||||
| 		var outboundCtx context.Context | ||||
| 		outboundCancel := func() {} | ||||
| 		if c.connection == nil { | ||||
| 			outboundCtx, outboundCancel = context.WithCancel(context.Background()) | ||||
| 			go c.connectOutbound(outboundCtx, c.connChannel) | ||||
| 		} | ||||
| 
 | ||||
| 		select { | ||||
| 		case conn, ok := <-c.connChannel: | ||||
| 			outboundCancel() | ||||
| 			if !ok { | ||||
| 				// Closing connChannel exits this connection routine, for contact
 | ||||
| 				// deletion, exit, or some other case.
 | ||||
| 				break | ||||
| 			} else if conn == nil { | ||||
| 				// Signal used to restart outbound connection attempts
 | ||||
| 				continue | ||||
| 			} | ||||
| 
 | ||||
| 			// Received a new connection. The connection is already authenticated and ready to use.
 | ||||
| 			// This connection may be inbound or outbound. setConnection will decide whether to
 | ||||
| 			// replace an existing connection.
 | ||||
| 
 | ||||
| 			// XXX Tweak setConnection logic if needed
 | ||||
| 			// Logic when keeping connection, need to make sure protocol is going, etc...
 | ||||
| 			if err := c.setConnection(conn); err != nil { | ||||
| 				log.Printf("Discarded new contact %s connection: %s", c.Address(), err) | ||||
| 				if !conn.Closed && conn != c.connection { | ||||
| 					conn.Close() | ||||
| 				} | ||||
| 				continue | ||||
| 			} | ||||
| 
 | ||||
| 		case <-c.connClosedChannel: | ||||
| 			outboundCancel() | ||||
| 			c.clearConnection(nil) | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	log.Printf("Exiting contact connection loop for %s", c.Address()) | ||||
| 	c.clearConnection(nil) | ||||
| } | ||||
| 
 | ||||
| // Attempt an outbound connection to the contact, retrying automatically using OnionConnector.
 | ||||
| // This function _must_ send something to connChannel before returning, unless the context has
 | ||||
| // been cancelled.
 | ||||
| func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *protocol.OpenConnection) { | ||||
| 	c.mutex.Lock() | ||||
| 	connector := OnionConnector{ | ||||
| 		Network:     c.core.Network, | ||||
| 		NeverGiveUp: true, | ||||
| 	} | ||||
| 	hostname := c.data.Hostname | ||||
| 	c.mutex.Unlock() | ||||
| 
 | ||||
| 	for { | ||||
| 		conn, err := connector.Connect(hostname+":9878", ctx) | ||||
| 		if err != nil { | ||||
| 			// The only failure here should be context, because NeverGiveUp
 | ||||
| 			// is set, but be robust anyway.
 | ||||
| 			if ctx.Err() != nil { | ||||
| 				return | ||||
| 			} | ||||
| 
 | ||||
| 			log.Printf("Contact connection failure: %s", err) | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
| 		log.Printf("Successful outbound connection to contact %s", hostname) | ||||
| 		oc, err := c.core.Protocol.ConnectOpen(conn, hostname[0:16]) | ||||
| 		if err != nil { | ||||
| 			log.Printf("Contact connection protocol failure: %s", err) | ||||
| 			oc.Close() | ||||
| 			// XXX These failures are probably not worth retrying so much,
 | ||||
| 			// but that would need to be investigated. For now, just do the
 | ||||
| 			// same backoff behavior.
 | ||||
| 			if err := connector.Backoff(ctx); err != nil { | ||||
| 				return | ||||
| 			} | ||||
| 			continue | ||||
| 		} else { | ||||
| 			log.Printf("Protocol connection open: %v", oc) | ||||
| 			// XXX Protocol API needs to be reworked; see notes in
 | ||||
| 			// contactConnection. Ideally we should authenticate here and
 | ||||
| 			// pass the conneciton back, but for now this can do nothing:
 | ||||
| 			// the connection will either succeed and come in via the
 | ||||
| 			// protocol handler, or will be closed and signalled via
 | ||||
| 			// OnConnectionClosed. Alternatively, it will break because this
 | ||||
| 			// is fragile and dumb.
 | ||||
| 			// XXX BUG: This means no backoff for authentication failure
 | ||||
| 			return | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (c *Contact) setConnection(conn *protocol.OpenConnection) error { | ||||
| 	if conn.Client { | ||||
| 		log.Printf("Contact %s has a new outbound connection", c.Address()) | ||||
| 	} else { | ||||
| 		log.Printf("Contact %s has a new inbound connection", c.Address()) | ||||
| 	} | ||||
| 
 | ||||
| 	c.mutex.Lock() | ||||
| 
 | ||||
| 	if conn == c.connection { | ||||
|  | @ -138,7 +268,6 @@ func (c *Contact) SetConnection(conn *protocol.OpenConnection) error { | |||
| 	// if the outbound connection will lose the fallback comparison above.
 | ||||
| 	// XXX implement this
 | ||||
| 
 | ||||
| 	// XXX react to connection state changes
 | ||||
| 	c.connection = conn | ||||
| 	c.status = ricochet.Contact_ONLINE | ||||
| 	log.Printf("Assigned connection %v to contact %v", c.connection, c) | ||||
|  | @ -164,6 +293,41 @@ func (c *Contact) SetConnection(conn *protocol.OpenConnection) error { | |||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Close and clear state related to the active contact connection.
 | ||||
| // If ifConn is non-nil, the active connection is only cleared if
 | ||||
| // it is the same as ifConn. Returns true if cleared.
 | ||||
| func (c *Contact) clearConnection(ifConn *protocol.OpenConnection) bool { | ||||
| 	c.mutex.Lock() | ||||
| 	if c.connection == nil || (ifConn != nil && c.connection != ifConn) { | ||||
| 		c.mutex.Unlock() | ||||
| 		return false | ||||
| 	} | ||||
| 
 | ||||
| 	conn := c.connection | ||||
| 	c.connection = nil | ||||
| 	c.status = ricochet.Contact_OFFLINE | ||||
| 
 | ||||
| 	// XXX eww, and also potentially deadlockable?
 | ||||
| 	config := c.core.Config.OpenWrite() | ||||
| 	c.data.LastConnected = time.Now().Format(time.RFC3339) | ||||
| 	config.Contacts[strconv.Itoa(c.id)] = c.data | ||||
| 	config.Save() | ||||
| 	c.mutex.Unlock() | ||||
| 
 | ||||
| 	if conn != nil && !conn.Closed { | ||||
| 		conn.Close() | ||||
| 	} | ||||
| 
 | ||||
| 	event := ricochet.ContactEvent{ | ||||
| 		Type: ricochet.ContactEvent_UPDATE, | ||||
| 		Subject: &ricochet.ContactEvent_Contact{ | ||||
| 			Contact: c.Data(), | ||||
| 		}, | ||||
| 	} | ||||
| 	c.events.Publish(event) | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| // Decide whether to replace the existing connection with conn.
 | ||||
| // Assumes mutex is held.
 | ||||
| func (c *Contact) shouldReplaceConnection(conn *protocol.OpenConnection) bool { | ||||
|  | @ -192,29 +356,18 @@ func (c *Contact) shouldReplaceConnection(conn *protocol.OpenConnection) bool { | |||
| 	return false | ||||
| } | ||||
| 
 | ||||
| // XXX also will go away during protocol API rework
 | ||||
| func (c *Contact) OnConnectionAuthenticated(conn *protocol.OpenConnection) { | ||||
| 	c.connChannel <- conn | ||||
| } | ||||
| 
 | ||||
| // XXX rework connection close to have a proper notification instead of this "find contact" mess.
 | ||||
| func (c *Contact) OnConnectionClosed(conn *protocol.OpenConnection) { | ||||
| 	c.mutex.Lock() | ||||
| 
 | ||||
| 	if c.connection != conn { | ||||
| 		c.mutex.Unlock() | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	c.connection = nil | ||||
| 	c.status = ricochet.Contact_OFFLINE | ||||
| 
 | ||||
| 	config := c.core.Config.OpenWrite() | ||||
| 	c.data.LastConnected = time.Now().Format(time.RFC3339) | ||||
| 	config.Contacts[strconv.Itoa(c.id)] = c.data | ||||
| 	config.Save() | ||||
| 
 | ||||
| 	c.mutex.Unlock() | ||||
| 
 | ||||
| 	event := ricochet.ContactEvent{ | ||||
| 		Type: ricochet.ContactEvent_UPDATE, | ||||
| 		Subject: &ricochet.ContactEvent_Contact{ | ||||
| 			Contact: c.Data(), | ||||
| 		}, | ||||
| 	} | ||||
| 	c.events.Publish(event) | ||||
| 	c.connClosedChannel <- struct{}{} | ||||
| } | ||||
|  |  | |||
|  | @ -16,6 +16,8 @@ import ( | |||
| 	"time" | ||||
| ) | ||||
| 
 | ||||
| // XXX Network disconnect should kill open connections ... somehow
 | ||||
| 
 | ||||
| type Network struct { | ||||
| 	// Connection settings; can only change while stopped
 | ||||
| 	controlAddress  string | ||||
|  |  | |||
|  | @ -36,8 +36,8 @@ func (p *Protocol) ServeListener(listener net.Listener) { | |||
| // Strangely, ServeListener starts a background routine that watches a channel
 | ||||
| // on p.service for new connections and dispatches their events to the handler
 | ||||
| // for the listener. API needs a little work here.
 | ||||
| func (p *Protocol) Connect(address string) (*protocol.OpenConnection, error) { | ||||
| 	oc, err := p.service.Connect(address) | ||||
| func (p *Protocol) ConnectOpen(conn net.Conn, host string) (*protocol.OpenConnection, error) { | ||||
| 	oc, err := p.service.ConnectOpen(conn, host) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 	} | ||||
|  | @ -104,13 +104,36 @@ func (handler *protocolHandler) OnAuthenticationProof(oc *protocol.OpenConnectio | |||
| 
 | ||||
| 	log.Printf("protocol: OnAuthenticationProof, result: %v, contact: %v", result, contact) | ||||
| 	if result && contact != nil { | ||||
| 		contact.SetConnection(oc) | ||||
| 		contact.OnConnectionAuthenticated(oc) | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (handler *protocolHandler) OnAuthenticationResult(oc *protocol.OpenConnection, channelID int32, result bool, isKnownContact bool) { | ||||
| 	log.Printf("protocol: OnAuthenticationResult, result: %v, known: %v", result, isKnownContact) | ||||
| 	oc.IsAuthed = result | ||||
| 	oc.CloseChannel(channelID) | ||||
| 
 | ||||
| 	if !result { | ||||
| 		log.Printf("protocol: Outbound connection authentication to %s failed", oc.OtherHostname) | ||||
| 		oc.Close() | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	// XXX Contact request, removed cases
 | ||||
| 	if !isKnownContact { | ||||
| 		log.Printf("protocol: Outbound connection authentication to %s succeeded, but we are not a known contact", oc.OtherHostname) | ||||
| 		oc.Close() | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	contact := handler.p.core.Identity.ContactList().ContactByAddress("ricochet:" + oc.OtherHostname) | ||||
| 	if contact == nil { | ||||
| 		log.Printf("protocol: Outbound connection authenticated to %s succeeded, but no matching contact found", oc.OtherHostname) | ||||
| 		oc.Close() | ||||
| 		return | ||||
| 	} | ||||
| 
 | ||||
| 	log.Printf("protocol: Outbound connection to %s authenticated", oc.OtherHostname) | ||||
| 	contact.OnConnectionAuthenticated(oc) | ||||
| } | ||||
| 
 | ||||
| // Contact Management
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue