diff --git a/core/contact-protocol.go b/core/contact-protocol.go new file mode 100644 index 0000000..fb2a50c --- /dev/null +++ b/core/contact-protocol.go @@ -0,0 +1,49 @@ +package core + +import ( + "github.com/s-rah/go-ricochet/channels" + "github.com/s-rah/go-ricochet/connection" + "log" + "time" +) + +type ContactProtocolHandler struct { + connection.AutoConnectionHandler + conn *connection.Connection + contact *Contact +} + +func NewContactProtocolHandler(contact *Contact, conn *connection.Connection) *ContactProtocolHandler { + handler := &ContactProtocolHandler{ + conn: conn, + contact: contact, + } + handler.Init(nil, conn.RemoteHostname) + + handler.RegisterChannelHandler("im.ricochet.chat", func() channels.Handler { + chat := new(channels.ChatChannel) + chat.Handler = handler + return chat + }) + + // XXX Somebody needs to be calling Process, nobody is yet, need that rework in contact + return handler +} + +// Implement ChatChannelHandler for im.ricochet.chat +func (handler *ContactProtocolHandler) ChatMessage(messageID uint32, when time.Time, message string) bool { + // XXX sanity checks, message contents, etc + log.Printf("chat message: %d %d %v %s", messageID, when, message) + + conversation := handler.contact.Conversation() + conversation.Receive(uint64(messageID), when.Unix(), message) + return true +} + +func (handler *ContactProtocolHandler) ChatMessageAck(messageID uint32) { + // XXX no success field + log.Printf("chat ack: %d", messageID) + + conversation := handler.contact.Conversation() + conversation.UpdateSentStatus(uint64(messageID), true) +} diff --git a/core/contact.go b/core/contact.go index 5e1cc3f..f5233e4 100644 --- a/core/contact.go +++ b/core/contact.go @@ -5,6 +5,7 @@ import ( "github.com/ricochet-im/ricochet-go/core/utils" "github.com/ricochet-im/ricochet-go/rpc" protocol "github.com/s-rah/go-ricochet" + connection "github.com/s-rah/go-ricochet/connection" "golang.org/x/net/context" "log" "strconv" @@ -29,24 +30,24 @@ type Contact struct { events *utils.Publisher connEnabled bool - connection *protocol.OpenConnection - connChannel chan *protocol.OpenConnection - connClosedChannel chan struct{} - connStopped chan struct{} + connection *connection.Connection + connChannel chan *connection.Connection + connEnabledSignal chan bool + connectionOnce sync.Once timeConnected time.Time - outboundConnAuthKnown bool - conversation *Conversation } func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils.Publisher) (*Contact, error) { contact := &Contact{ - core: core, - id: id, - data: data, - events: events, + core: core, + id: id, + data: data, + events: events, + connChannel: make(chan *connection.Connection), + connEnabledSignal: make(chan bool), } if id < 0 { @@ -153,41 +154,31 @@ func (c *Contact) Conversation() *Conversation { return c.conversation } -// XXX Thread safety disaster -func (c *Contact) Connection() *protocol.OpenConnection { +func (c *Contact) Connection() *connection.Connection { c.mutex.Lock() defer c.mutex.Unlock() return c.connection } +// StartConnection enables inbound and outbound connections for this contact, if other +// conditions permit them. This function is safe to call repeatedly. func (c *Contact) StartConnection() { - c.mutex.Lock() - defer c.mutex.Unlock() - - if c.connEnabled { - return - } + c.connectionOnce.Do(func() { + go c.contactConnection() + }) c.connEnabled = true - c.connChannel = make(chan *protocol.OpenConnection) - c.connClosedChannel = make(chan struct{}) - c.connStopped = make(chan struct{}) - go c.contactConnection() + c.connEnabledSignal <- true } func (c *Contact) StopConnection() { - c.mutex.Lock() - if !c.connEnabled { - c.mutex.Unlock() - return - } - stopped := c.connStopped - close(c.connChannel) - c.connChannel = nil - c.connClosedChannel = nil - c.connStopped = nil - c.mutex.Unlock() - <-stopped + // Must be running to consume connEnabledSignal + c.connectionOnce.Do(func() { + go c.contactConnection() + }) + + c.connEnabled = false + c.connEnabledSignal <- false } func (c *Contact) shouldMakeOutboundConnections() bool { @@ -202,85 +193,149 @@ func (c *Contact) shouldMakeOutboundConnections() bool { return c.connEnabled } +// closeUnhandledConnection takes a connection without an active Process routine +// and ensures that it is fully closed and destroyed. It is safe to call on +// a connection that has already been closed and on any connection in any +// state, as long as Process() is not currently running. +func closeUnhandledConnection(conn *connection.Connection) { + conn.Conn.Close() + nullHandler := &connection.AutoConnectionHandler{} + nullHandler.Init() + conn.Process(nullHandler) +} + // Goroutine to handle the protocol connection for a contact. // Responsible for making outbound connections and taking over authenticated // inbound connections, running protocol handlers on the active connection, and // reacting to connection loss. Nothing else may write Contact.connection. +// +// This goroutine is started by the first call to StartConnection or StopConnection +// and persists for the lifetime of the contact. When connections are stopped, it +// consumes connChannel and closes all (presumably inbound) connections. +// XXX Need a hard kill for destroying contacts func (c *Contact) contactConnection() { - // XXX Should the protocol continue handling its own goroutines? - // I'm thinking the protocol API design I want is: - // - "handler" assigned per-connection - // - each inbound listener has its own handler also, assigns for conns - // - so, non-authed connection has a handler that _can't_ do anything other than auth - // - inbound contact req connection gets a special handler for that case - // - authenticated contact conns get handler changed here - // It's probably more sensible to never break the conn read loop, because of buffering etc - // So likely we don't want to change that goroutine. Could still use a channel to pass it - // to handler for parsing, which could let it go on any goroutine we want, if it's desirable - // to put it on e.g. this routine. Hmm. + // Signalled when the active connection is closed + connClosedChannel := make(chan struct{}) + connectionsEnabled := false - connChannel := c.connChannel - connClosedChannel := c.connClosedChannel - stopped := c.connStopped - -connectionLoop: for { - // If there is no active connection, spawn an outbound connector. - // A successful connection is returned via connChannel; otherwise, it will keep trying. + if !connectionsEnabled { + // Reject all connections on connChannel and wait for start signal + select { + case conn := <-c.connChannel: + if conn != nil { + log.Printf("Discarded connection to %s because connections are disabled", c.Address()) + go closeUnhandledConnection(conn) + // XXX-protocol doing this here instead of during auth means they'll keep trying endlessly. Doing it in + // auth means they'll never try again. Both are sometimes wrong. Hmm. + } + case enable := <-c.connEnabledSignal: + if enable { + log.Printf("Contact %s connections are enabled", c.Address()) + connectionsEnabled = true + } + // XXX hard kill + } + continue + } + + // If there is no active connection, spawn an outbound connector. A successful connection + // is returned via connChannel, and otherwise it will keep trying until cancelled via + // the context. var outboundCtx context.Context outboundCancel := func() {} if c.connection == nil && c.shouldMakeOutboundConnections() { outboundCtx, outboundCancel = context.WithCancel(context.Background()) - go c.connectOutbound(outboundCtx, connChannel) + go c.connectOutbound(outboundCtx, c.connChannel) } select { - case conn, ok := <-connChannel: + case conn := <-c.connChannel: outboundCancel() - if !ok { - // Closing connChannel exits this connection routine, for contact - // deletion, exit, or some other case. - break connectionLoop - } else if conn == nil { + if conn == nil { // Signal used to restart outbound connection attempts continue } - // Received a new connection. The connection is already authenticated and ready to use. - // This connection may be inbound or outbound. setConnection will decide whether to - // replace an existing connection. - - // XXX Tweak setConnection logic if needed - // Logic when keeping connection, need to make sure protocol is going, etc... - if err := c.setConnection(conn); err != nil { - log.Printf("Discarded new contact %s connection: %s", c.Address(), err) - if !conn.Closed && conn != c.connection { - conn.Close() - } + c.mutex.Lock() + // Decide whether to keep this connection; if this returns an error, conn is + // already closed. If there was an existing connection and this returns nil, + // the old connection is closed but c.connection has not been reset. + if err := c.considerUsingConnection(conn); err != nil { + log.Printf("Discarded new contact %s connection: %s", c.data.Hostname, err) + go closeUnhandledConnection(conn) + c.mutex.Unlock() continue } + replacingConn := c.connection != nil + c.connection = conn + if replacingConn { + // Wait for old handleConnection to return + c.mutex.Unlock() + <-connClosedChannel + c.mutex.Lock() + } + go c.handleConnection(conn, connClosedChannel) + c.onConnectionStateChanged() + c.mutex.Unlock() case <-connClosedChannel: outboundCancel() - c.clearConnection(nil) + c.mutex.Lock() + c.connection = nil + c.onConnectionStateChanged() + c.mutex.Unlock() + + case enable := <-c.connEnabledSignal: + outboundCancel() + if !enable { + connectionsEnabled = false + log.Printf("Contact %s connections are disabled", c.Address()) + } } } log.Printf("Exiting contact connection loop for %s", c.Address()) - c.clearConnection(nil) - close(stopped) + c.mutex.Lock() + if c.connection != nil { + c.connection.Conn.Close() + c.connection = nil + c.onConnectionStateChanged() + c.mutex.Unlock() + <-connClosedChannel + } else { + c.mutex.Unlock() + } +} + +// Goroutine to maintain an open contact connection, calls Process and reports when closed. +func (c *Contact) handleConnection(conn *connection.Connection, closedChannel chan struct{}) { + // Connection does not outlive this function + defer func() { + conn.Conn.Close() + closedChannel <- struct{}{} + }() + log.Printf("Contact connection for %s ready", conn.RemoteHostname) + handler := NewContactProtocolHandler(c, conn) + err := conn.Process(handler) + if err == nil { + // Somebody called Break? + err = fmt.Errorf("Connection handler interrupted unexpectedly") + } + log.Printf("Contact connection for %s closed: %s", conn.RemoteHostname, err) } // Attempt an outbound connection to the contact, retrying automatically using OnionConnector. // This function _must_ send something to connChannel before returning, unless the context has // been cancelled. -func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *protocol.OpenConnection) { +func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *connection.Connection) { c.mutex.Lock() connector := OnionConnector{ Network: c.core.Network, NeverGiveUp: true, } hostname := c.data.Hostname + isRequest := c.data.Request.Pending c.mutex.Unlock() for { @@ -296,109 +351,130 @@ func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *protoco continue } + // XXX-protocol Ideally this should all take place under ctx also; easy option is a goroutine + // blocked on ctx that kills the connection. log.Printf("Successful outbound connection to contact %s", hostname) - oc, err := protocol.Open(conn, hostname[0:16]) + oc, err := protocol.NegotiateVersionOutbound(conn, hostname[0:16]) if err != nil { - log.Printf("Contact connection protocol failure: %s", err) - if oc != nil { - oc.Close() - } + log.Printf("Outbound connection version negotiation failed: %v", err) + conn.Close() if err := connector.Backoff(ctx); err != nil { return } continue - } else { - log.Printf("Protocol connection open: %v", oc) - // XXX Protocol API needs to be reworked; see notes in - // contactConnection. Ideally we should authenticate here and - // pass the conneciton back, but for now this can do nothing: - // the connection will either succeed and come in via the - // protocol handler, or will be closed and signalled via - // OnConnectionClosed. Alternatively, it will break because this - // is fragile and dumb. - // XXX BUG: This means no backoff for authentication failure - handler := &ProtocolConnection{ - Core: c.core, - Conn: oc, - Contact: c, - MyHostname: c.core.Identity.Address()[9:], - PrivateKey: c.core.Identity.PrivateKey(), - } - go oc.Process(handler) - return } + + log.Printf("Outbound connection negotiated version; authenticating") + privateKey := c.core.Identity.PrivateKey() + known, err := connection.HandleOutboundConnection(oc).ProcessAuthAsClient(&privateKey) + if err != nil { + log.Printf("Outbound connection authentication failed: %v", err) + closeUnhandledConnection(oc) + if err := connector.Backoff(ctx); err != nil { + return + } + continue + } + + // XXX-protocol Also move the "this is an outbound request" logic here to kill the silly flag and + // move those out of a scary mutexed path? XXX + if !known && !isRequest { + log.Printf("Outbound connection to contact says we are not a known contact for %v", c) + // XXX Should move to rejected status, stop attempting connections. + closeUnhandledConnection(oc) + if err := connector.Backoff(ctx); err != nil { + return + } + continue + } else if known && isRequest { + log.Printf("Contact request implicitly accepted for outbound connection by contact %v", c) + c.UpdateContactRequest("Accepted") + } + + log.Printf("Assigning outbound connection to contact") + c.AssignConnection(oc) + break } } -func (c *Contact) setConnection(conn *protocol.OpenConnection) error { - if conn.Client { - log.Printf("Contact %s has a new outbound connection", c.Address()) +// considerUsingConnection takes a newly established connection and decides whether +// the new connection is valid and acceptable, and whether to replace or keep an +// existing connection. To handle race cases when peers are connecting to eachother, +// a particular set of rules is followed for replacing an existing connection. +// +// considerUsingConnection returns nil if the new connection is valid and should be +// used. If this function returns nil, the existing connection has been closed (but +// c.connection is unmodified, and the process routine may still be executing). If +// this function returns an error, conn has been closed. +// +// Assumes that c.mutex is held. +func (c *Contact) considerUsingConnection(conn *connection.Connection) error { + killConn := conn + defer func() { + if killConn != nil { + killConn.Conn.Close() + } + }() + + if conn.IsInbound { + log.Printf("Contact %s has a new inbound connection", c.data.Hostname) } else { - log.Printf("Contact %s has a new inbound connection", c.Address()) + log.Printf("Contact %s has a new outbound connection", c.data.Hostname) } - c.mutex.Lock() - if conn == c.connection { - c.mutex.Unlock() return fmt.Errorf("Duplicate assignment of connection %v to contact %v", conn, c) } - if !conn.IsAuthed || conn.Closed { - c.mutex.Unlock() - conn.Close() - return fmt.Errorf("Connection %v is not in a valid state to assign to contact %v", conn, c) + if !conn.Authentication["im.ricochet.auth.hidden-service"] { + return fmt.Errorf("Connection %v is not authenticated", conn) } - if c.data.Hostname[0:16] != conn.OtherHostname { - c.mutex.Unlock() - conn.Close() - return fmt.Errorf("Connection hostname %s doesn't match contact hostname %s when assigning connection", conn.OtherHostname, c.data.Hostname[0:16]) + if c.data.Hostname[0:16] != conn.RemoteHostname { + return fmt.Errorf("Connection hostname %s doesn't match contact hostname %s when assigning connection", conn.RemoteHostname, c.data.Hostname[0:16]) } - if conn.Client && !c.outboundConnAuthKnown && !c.data.Request.Pending { - log.Printf("Outbound connection to contact says we are not a known contact for %v", c) - // XXX Should move to rejected status, stop attempting connections. - c.mutex.Unlock() - conn.Close() - return fmt.Errorf("Outbound connection says we are not a known contact") - } - - if c.connection != nil { - if c.shouldReplaceConnection(conn) { - // XXX Signal state change for connection loss? - c.connection.Close() - c.connection = nil - } else { - c.mutex.Unlock() - conn.Close() - return fmt.Errorf("Using existing connection") - } + if c.connection != nil && !c.shouldReplaceConnection(conn) { + return fmt.Errorf("Using existing connection") } // If this connection is inbound and there's an outbound attempt, keep this // connection and cancel outbound if we haven't sent authentication yet, or // if the outbound connection will lose the fallback comparison above. - // XXX implement this + // XXX implement this; currently outbound is always cancelled when an inbound + // connection succeeds. - c.connection = conn - log.Printf("Assigned connection %v to contact %v", c.connection, c) + // We will keep conn, close c.connection instead if there was one + killConn = c.connection + return nil +} - if c.data.Request.Pending { - if conn.Client && !c.outboundConnAuthKnown { - // Outbound connection for contact request; send request message - // XXX hardcoded channel ID - log.Printf("Sending outbound contact request to %v", c) - conn.SendContactRequest(5, c.data.Request.MyNickname, c.data.Request.Message) +// onConnectionStateChanged is called by the connection loop when the c.connection +// is changed, which can be a transition to online or offline or a replacement. +// Assumes c.mutex is held. +func (c *Contact) onConnectionStateChanged() { + if c.connection != nil { + if c.data.Request.Pending { + if !c.connection.IsInbound { + // Outbound connection for contact request; send request message + log.Printf("Sending outbound contact request to %v", c) + // XXX-protocol ooooohhhhh no you don't. Cannot interact w/ protocol here, because process may + // not have started yet. Maybe this one needs to go w/ outbound auth in pre-connection also + // honestly, it would not be a bad thing to have outbound unaccepted requests _not_ be + // considered active connections, as long as they get handled properly. + // c.connection.SendContactRequest(5, c.data.Request.MyNickname, c.data.Request.Message) + } else { + // Inbound connection implicitly accepts the contact request and can continue as a contact + log.Printf("Contact request implicitly accepted by contact %v", c) + c.updateContactRequest("Accepted") + } } else { - // Inbound connection or outbound connection with a positive - // 'isKnownContact' response implicitly accepts the contact request - // and can continue as a contact - log.Printf("Contact request implicitly accepted by contact %v", c) - c.updateContactRequest("Accepted") + c.status = ricochet.Contact_ONLINE } } else { - c.status = ricochet.Contact_ONLINE + if c.status == ricochet.Contact_ONLINE { + c.status = ricochet.Contact_OFFLINE + } } // Update LastConnected time @@ -409,8 +485,8 @@ func (c *Contact) setConnection(conn *protocol.OpenConnection) error { config.Contacts[strconv.Itoa(c.id)] = c.data config.Save() + // _really_ assumes c.mutex was held c.mutex.Unlock() - event := ricochet.ContactEvent{ Type: ricochet.ContactEvent_UPDATE, Subject: &ricochet.ContactEvent_Contact{ @@ -419,59 +495,24 @@ func (c *Contact) setConnection(conn *protocol.OpenConnection) error { } c.events.Publish(event) - // Send any queued messages - sent := c.Conversation().SendQueuedMessages() - if sent > 0 { - log.Printf("Sent %d queued messages to contact", sent) + if c.connection != nil { + // Send any queued messages + sent := c.Conversation().SendQueuedMessages() + if sent > 0 { + log.Printf("Sent %d queued messages to contact", sent) + } } - return nil -} - -// Close and clear state related to the active contact connection. -// If ifConn is non-nil, the active connection is only cleared if -// it is the same as ifConn. Returns true if cleared. -func (c *Contact) clearConnection(ifConn *protocol.OpenConnection) bool { c.mutex.Lock() - if c.connection == nil || (ifConn != nil && c.connection != ifConn) { - c.mutex.Unlock() - return false - } - - conn := c.connection - c.connection = nil - c.status = ricochet.Contact_OFFLINE - - // XXX eww, and also potentially deadlockable? - config := c.core.Config.OpenWrite() - c.data.LastConnected = time.Now().Format(time.RFC3339) - config.Contacts[strconv.Itoa(c.id)] = c.data - config.Save() - c.mutex.Unlock() - - if conn != nil && !conn.Closed { - conn.Close() - } - - event := ricochet.ContactEvent{ - Type: ricochet.ContactEvent_UPDATE, - Subject: &ricochet.ContactEvent_Contact{ - Contact: c.Data(), - }, - } - c.events.Publish(event) - return true } // Decide whether to replace the existing connection with conn. // Assumes mutex is held. -func (c *Contact) shouldReplaceConnection(conn *protocol.OpenConnection) bool { +func (c *Contact) shouldReplaceConnection(conn *connection.Connection) bool { + myHostname, _ := PlainHostFromAddress(c.core.Identity.Address()) if c.connection == nil { return true - } else if c.connection.Closed { - log.Printf("Replacing dead connection %v for contact %v", c.connection, c) - return true - } else if c.connection.Client == conn.Client { + } else if c.connection.IsInbound == conn.IsInbound { // If the existing connection is in the same direction, always use the new one log.Printf("Replacing existing same-direction connection %v with new connection %v for contact %v", c.connection, conn, c) return true @@ -479,7 +520,7 @@ func (c *Contact) shouldReplaceConnection(conn *protocol.OpenConnection) bool { // If the existing connection is more than 30 seconds old, use the new one log.Printf("Replacing existing %v old connection %v with new connection %v for contact %v", time.Since(c.timeConnected), c.connection, conn, c) return true - } else if preferOutbound := conn.MyHostname < conn.OtherHostname; preferOutbound == conn.Client { + } else if preferOutbound := myHostname < conn.RemoteHostname; preferOutbound != conn.IsInbound { // Fall back to string comparison of hostnames for a stable resolution // New connection wins log.Printf("Replacing existing connection %v with new connection %v for contact %v according to fallback order", c.connection, conn, c) @@ -552,29 +593,13 @@ func (c *Contact) updateContactRequest(status string) bool { return re } -// XXX also will go away during protocol API rework -func (c *Contact) OnConnectionAuthenticated(conn *protocol.OpenConnection, knownContact bool) { - c.mutex.Lock() - if c.connChannel == nil { - log.Printf("Inbound connection from contact, but connections are not enabled for contact %v", c) - c.mutex.Unlock() - conn.Close() - } - // XXX this is ugly - if conn.Client { - c.outboundConnAuthKnown = knownContact - } - c.connChannel <- conn - c.mutex.Unlock() -} +// AssignConnection takes new connections, inbound or outbound, to this contact, and +// asynchronously decides whether to keep or close them. +func (c *Contact) AssignConnection(conn *connection.Connection) { + c.connectionOnce.Do(func() { + go c.contactConnection() + }) -// XXX rework connection close to have a proper notification instead of this "find contact" mess. -func (c *Contact) OnConnectionClosed(conn *protocol.OpenConnection) { - c.mutex.Lock() - if c.connection != conn || c.connClosedChannel == nil { - c.mutex.Unlock() - return - } - c.connClosedChannel <- struct{}{} - c.mutex.Unlock() + // If connections are disabled, this connection will be closed by contactConnection + c.connChannel <- conn } diff --git a/core/contactlist.go b/core/contactlist.go index 3313b14..5edcbfc 100644 --- a/core/contactlist.go +++ b/core/contactlist.go @@ -208,6 +208,9 @@ func (this *ContactList) RemoveContact(contact *Contact) error { return errors.New("Not in contact list") } + // XXX How do we safely make sure that the contact has stopped everything, and that + // nobody is going to block on it or keep referencing it..? This is insufficient, it + // leaves a goroutine up among other things. contact.StopConnection() config := this.core.Config.OpenWrite() diff --git a/core/conversation.go b/core/conversation.go index 4933e9c..49f92d2 100644 --- a/core/conversation.go +++ b/core/conversation.go @@ -4,7 +4,7 @@ import ( "errors" "github.com/ricochet-im/ricochet-go/core/utils" "github.com/ricochet-im/ricochet-go/rpc" - protocol "github.com/s-rah/go-ricochet" + connection "github.com/s-rah/go-ricochet/connection" "log" "math/rand" "sync" @@ -182,19 +182,23 @@ func (c *Conversation) SendQueuedMessages() int { return sent } -func sendMessageToConnection(conn *protocol.OpenConnection, message *ricochet.Message) { - // XXX hardcoded channel IDs, also channel IDs shouldn't be exposed - channelId := int32(7) - if !conn.Client { - channelId++ - } - // XXX no error handling - if conn.GetChannelType(channelId) != "im.ricochet.chat" { - conn.OpenChatChannel(channelId) - } +func sendMessageToConnection(conn *connection.Connection, message *ricochet.Message) { + // XXX + panic("sendMessageToConnection needs implementing for new protocol API") + /* + // XXX hardcoded channel IDs, also channel IDs shouldn't be exposed + channelId := int32(7) + if conn.IsInbound { + channelId++ + } + // XXX no error handling + if conn.GetChannelType(channelId) != "im.ricochet.chat" { + conn.OpenChatChannel(channelId) + } - // XXX no message IDs - conn.SendMessage(channelId, message.Text) + // XXX no message IDs + conn.SendMessage(channelId, message.Text) + */ } // XXX This is inefficient -- it'll usually only be marking the last message diff --git a/core/identity.go b/core/identity.go index 249b390..58a53b9 100644 --- a/core/identity.go +++ b/core/identity.go @@ -6,8 +6,10 @@ import ( "errors" "github.com/ricochet-im/ricochet-go/core/utils" protocol "github.com/s-rah/go-ricochet" + connection "github.com/s-rah/go-ricochet/connection" "github.com/yawning/bulb/utils/pkcs1" "log" + "net" "sync" ) @@ -103,34 +105,6 @@ func (me *Identity) setPrivateKey(key *rsa.PrivateKey) error { return nil } -type identityService struct { - Identity *Identity - MyHostname string -} - -func (is *identityService) OnNewConnection(oc *protocol.OpenConnection) { - log.Printf("Inbound connection accepted") - oc.MyHostname = is.MyHostname - // XXX Should have pre-auth handling, timeouts - identity := is.Identity - handler := &ProtocolConnection{ - Core: identity.core, - Conn: oc, - GetContactByHostname: func(hostname string) *Contact { - address, ok := AddressFromPlainHost(hostname) - if !ok { - return nil - } - return identity.ContactList().ContactByAddress(address) - }, - } - go oc.Process(handler) -} - -func (is *identityService) OnFailedConnection(err error) { - log.Printf("Inbound connection failed: %v", err) -} - // BUG(special): No error handling for failures under publishService func (me *Identity) publishService(key *rsa.PrivateKey) { // This call will block until a control connection is available and the @@ -160,17 +134,72 @@ func (me *Identity) publishService(key *rsa.PrivateKey) { } log.Printf("Identity service published, accepting connections") - is := &identityService{ - Identity: me, - MyHostname: me.Address()[9:], + for { + conn, err := listener.Accept() + if err != nil { + log.Printf("Identity listener failed: %v", err) + // XXX handle + return + } + + // Handle connection in a separate goroutine and continue listening + go me.handleInboundConnection(conn) + } +} + +func (me *Identity) handleInboundConnection(conn net.Conn) error { + defer func() { + // Close conn on return unless explicitly cleared + if conn != nil { + conn.Close() + } + }() + + contactByHostname := func(hostname string) (*Contact, error) { + address, ok := AddressFromPlainHost(hostname) + if !ok { + // This would be a bug + return nil, errors.New("invalid authenticated hostname") + } + return me.contactList.ContactByAddress(address), nil + } + lookupContactAuth := func(hostname string, publicKey rsa.PublicKey) (bool, bool) { + contact, err := contactByHostname(hostname) + if err != nil { + return false, false + } + // allowed, known + return true, contact != nil } - err = protocol.Serve(listener, is) + rc, err := protocol.NegotiateVersionInbound(conn) if err != nil { - log.Printf("Identity listener failed: %v", err) - // XXX handle - return + log.Printf("Inbound connection failed: %v", err) + return err } + + authHandler := connection.HandleInboundConnection(rc) + err = authHandler.ProcessAuthAsServer(me.privateKey, lookupContactAuth) + if err != nil { + log.Printf("Inbound connection auth failed: %v", err) + return err + } + contact, err := contactByHostname(rc.RemoteHostname) + if err != nil { + log.Printf("Inbound connection lookup failed: %v", err) + return err + } + + if contact != nil { + // Known contact, pass the new connection to Contact + contact.AssignConnection(rc) + conn = nil + return nil + } + + // XXX-protocol Unknown contact, should pass to handler for contact requests + log.Printf("Inbound connection is a contact request, but that's not implemented yet") + return errors.New("inbound contact request connections aren't implemented") } func (me *Identity) Address() string { diff --git a/core/inboundcontactrequest.go b/core/inboundcontactrequest.go index b17c419..8fdd022 100644 --- a/core/inboundcontactrequest.go +++ b/core/inboundcontactrequest.go @@ -3,7 +3,7 @@ package core import ( "errors" "github.com/ricochet-im/ricochet-go/rpc" - protocol "github.com/s-rah/go-ricochet" + connection "github.com/s-rah/go-ricochet/connection" "log" "sync" "time" @@ -13,7 +13,7 @@ type InboundContactRequest struct { core *Ricochet mutex sync.Mutex data ricochet.ContactRequest - conn *protocol.OpenConnection + conn *connection.Connection channelID int32 Address string @@ -41,14 +41,14 @@ func CreateInboundContactRequest(core *Ricochet, address, nickname, message stri return cr } -// XXX There should be stricter management & a timeout for this connection -func (cr *InboundContactRequest) SetConnection(conn *protocol.OpenConnection, channelID int32) { +// XXX-protocol There should be stricter management & a timeout for this connection +func (cr *InboundContactRequest) SetConnection(conn *connection.Connection, channelID int32) { cr.mutex.Lock() defer cr.mutex.Unlock() if cr.conn != nil && cr.conn != conn { log.Printf("Replacing connection on an inbound contact request") - cr.conn.Close() + cr.conn.Conn.Close() } cr.conn = conn cr.channelID = channelID @@ -56,7 +56,7 @@ func (cr *InboundContactRequest) SetConnection(conn *protocol.OpenConnection, ch func (cr *InboundContactRequest) CloseConnection() { if cr.conn != nil { - cr.conn.Close() + cr.conn.Conn.Close() cr.conn = nil } } @@ -131,10 +131,10 @@ func (cr *InboundContactRequest) AcceptWithContact(contact *Contact) error { cr.core.Identity.ContactList().RemoveInboundContactRequest(cr) // Pass the open connection to the new contact - if cr.conn != nil && !cr.conn.Closed { - cr.conn.AckContactRequest(cr.channelID, "Accepted") - cr.conn.CloseChannel(cr.channelID) - contact.OnConnectionAuthenticated(cr.conn, true) + if cr.conn != nil { + // XXX-protocol cr.conn.AckContactRequest(cr.channelID, "Accepted") + // XXX-protocol cr.conn.CloseChannel(cr.channelID) + contact.AssignConnection(cr.conn) cr.conn = nil } @@ -157,10 +157,10 @@ func (cr *InboundContactRequest) Reject() { cr.StatusChanged(cr) } - if cr.conn != nil && !cr.conn.Closed { - cr.conn.AckContactRequest(cr.channelID, "Rejected") - cr.conn.CloseChannel(cr.channelID) - cr.conn.Close() + if cr.conn != nil { + // XXX-protocol cr.conn.AckContactRequest(cr.channelID, "Rejected") + // XXX-protocol cr.conn.CloseChannel(cr.channelID) + cr.conn.Conn.Close() cr.conn = nil // The request can be removed once a protocol response is sent diff --git a/core/protocol.go b/core/protocol.go deleted file mode 100644 index ca8bab7..0000000 --- a/core/protocol.go +++ /dev/null @@ -1,231 +0,0 @@ -package core - -import ( - "crypto/rsa" - "encoding/asn1" - protocol "github.com/s-rah/go-ricochet" - "log" - "time" -) - -type ProtocolConnection struct { - Core *Ricochet - - Conn *protocol.OpenConnection - Contact *Contact - - // Client-side authentication - MyHostname string - PrivateKey rsa.PrivateKey - - // Service-side authentication - GetContactByHostname func(hostname string) *Contact -} - -func (pc *ProtocolConnection) OnReady(oc *protocol.OpenConnection) { - if pc.Conn != nil && pc.Conn != oc { - log.Panicf("ProtocolConnection is already assigned connection %v, but OnReady called for connection %v", pc.Conn, oc) - } - - pc.Conn = oc - - if pc.Conn.Client { - log.Printf("Connected to %s", pc.Conn.OtherHostname) - pc.Conn.MyHostname = pc.MyHostname - pc.Conn.IsAuthed = true // Outbound connections are authenticated - pc.Conn.Authenticate(1) - } -} - -func (pc *ProtocolConnection) OnDisconnect() { - log.Printf("protocol: OnDisconnect: %v", pc) - if pc.Contact != nil { - pc.Contact.OnConnectionClosed(pc.Conn) - } -} - -// Authentication Management -func (pc *ProtocolConnection) OnAuthenticationRequest(channelID int32, clientCookie [16]byte) { - log.Printf("protocol: OnAuthenticationRequest") - pc.Conn.ConfirmAuthChannel(channelID, clientCookie) -} - -func (pc *ProtocolConnection) OnAuthenticationChallenge(channelID int32, serverCookie [16]byte) { - log.Printf("protocol: OnAuthenticationChallenge") - publicKeyBytes, _ := asn1.Marshal(pc.PrivateKey.PublicKey) - pc.Conn.SendProof(1, serverCookie, publicKeyBytes, &pc.PrivateKey) -} - -func (pc *ProtocolConnection) OnAuthenticationProof(channelID int32, publicKey []byte, signature []byte) { - result := pc.Conn.ValidateProof(channelID, publicKey, signature) - - if result { - if len(pc.Conn.OtherHostname) != 16 { - log.Printf("protocol: Invalid format for hostname '%s' in authentication proof", pc.Conn.OtherHostname) - result = false - } else { - pc.Contact = pc.GetContactByHostname(pc.Conn.OtherHostname) - } - } - isKnownContact := (pc.Contact != nil) - - pc.Conn.SendAuthenticationResult(channelID, result, isKnownContact) - pc.Conn.IsAuthed = result - pc.Conn.CloseChannel(channelID) - - log.Printf("protocol: OnAuthenticationProof, result: %v, contact: %v", result, pc.Contact) - if result && pc.Contact != nil { - pc.Contact.OnConnectionAuthenticated(pc.Conn, true) - } -} - -func (pc *ProtocolConnection) OnAuthenticationResult(channelID int32, result bool, isKnownContact bool) { - pc.Conn.IsAuthed = result - pc.Conn.CloseChannel(channelID) - - if !result { - log.Printf("protocol: Outbound connection authentication to %s failed", pc.Conn.OtherHostname) - pc.Conn.Close() - return - } - - log.Printf("protocol: Outbound connection to %s authenticated", pc.Conn.OtherHostname) - if pc.Contact != nil { - pc.Contact.OnConnectionAuthenticated(pc.Conn, isKnownContact) - } -} - -// Contact Management -func (pc *ProtocolConnection) OnContactRequest(channelID int32, nick string, message string) { - if pc.Conn.Client || !pc.Conn.IsAuthed || pc.Contact != nil { - pc.Conn.CloseChannel(channelID) - return - } - - address, ok := AddressFromPlainHost(pc.Conn.OtherHostname) - if !ok { - pc.Conn.CloseChannel(channelID) - return - } - if len(nick) > 0 && !IsNicknameAcceptable(nick) { - log.Printf("protocol: Stripping unacceptable nickname from inbound request; encoded: %x", []byte(nick)) - nick = "" - } - if len(message) > 0 && !IsMessageAcceptable(message) { - log.Printf("protocol: Stripping unacceptable message from inbound request; len: %d, encoded: %x", len(message), []byte(message)) - message = "" - } - - contactList := pc.Core.Identity.ContactList() - request, contact := contactList.AddOrUpdateInboundContactRequest(address, nick, message) - - if contact != nil { - // Accepted immediately - pc.Conn.AckContactRequestOnResponse(channelID, "Accepted") - pc.Conn.CloseChannel(channelID) - contact.OnConnectionAuthenticated(pc.Conn, true) - } else if request != nil && !request.IsRejected() { - // Pending - pc.Conn.AckContactRequestOnResponse(channelID, "Pending") - request.SetConnection(pc.Conn, channelID) - } else { - // Rejected - pc.Conn.AckContactRequestOnResponse(channelID, "Rejected") - pc.Conn.CloseChannel(channelID) - pc.Conn.Close() - if request != nil { - contactList.RemoveInboundContactRequest(request) - } - } -} - -func (pc *ProtocolConnection) OnContactRequestAck(channelID int32, status string) { - if !pc.Conn.Client || pc.Contact == nil { - pc.Conn.CloseChannel(channelID) - return - } - - if !pc.Contact.UpdateContactRequest(status) { - pc.Conn.CloseChannel(channelID) - return - } -} - -func (pc *ProtocolConnection) IsKnownContact(hostname string) bool { - // All uses of this are for authenticated contacts, so it's sufficient to check pc.Contact - if pc.Contact != nil { - contactHostname, _ := PlainHostFromOnion(pc.Contact.Hostname()) - if hostname != contactHostname { - log.Panicf("IsKnownContact called for unexpected hostname '%s'", hostname) - } - return true - } - return false -} - -// Managing Channels -func (pc *ProtocolConnection) OnOpenChannelRequest(channelID int32, channelType string) { - log.Printf("open channel request: %v %v", channelID, channelType) - pc.Conn.AckOpenChannel(channelID, channelType) -} - -func (pc *ProtocolConnection) OnOpenChannelRequestSuccess(channelID int32) { - log.Printf("open channel request success: %v", channelID) -} - -func (pc *ProtocolConnection) OnChannelClosed(channelID int32) { - log.Printf("channel closed: %v", channelID) -} - -// Chat Messages -// XXX messageID should be (at least) uint32 -func (pc *ProtocolConnection) OnChatMessage(channelID int32, messageID int32, message string) { - // XXX no time delta? - // XXX sanity checks, message contents, etc - log.Printf("chat message: %d %d %s", channelID, messageID, message) - - // XXX error case - if pc.Contact == nil { - pc.Conn.Close() - } - - // XXX cache? - conversation := pc.Contact.Conversation() - conversation.Receive(uint64(messageID), time.Now().Unix(), message) - - pc.Conn.AckChatMessage(channelID, messageID) -} - -func (pc *ProtocolConnection) OnChatMessageAck(channelID int32, messageID int32) { - // XXX no success - log.Printf("chat ack: %d %d", channelID, messageID) - - // XXX error case - if pc.Contact == nil { - pc.Conn.Close() - } - - conversation := pc.Contact.Conversation() - conversation.UpdateSentStatus(uint64(messageID), true) -} - -// Handle Errors -func (pc *ProtocolConnection) OnFailedChannelOpen(channelID int32, errorType string) { - log.Printf("failed channel open: %d %s", channelID, errorType) - pc.Conn.UnsetChannel(channelID) -} -func (pc *ProtocolConnection) OnGenericError(channelID int32) { - pc.Conn.RejectOpenChannel(channelID, "GenericError") -} -func (pc *ProtocolConnection) OnUnknownTypeError(channelID int32) { - pc.Conn.RejectOpenChannel(channelID, "UnknownTypeError") -} -func (pc *ProtocolConnection) OnUnauthorizedError(channelID int32) { - pc.Conn.RejectOpenChannel(channelID, "UnauthorizedError") -} -func (pc *ProtocolConnection) OnBadUsageError(channelID int32) { - pc.Conn.RejectOpenChannel(channelID, "BadUsageError") -} -func (pc *ProtocolConnection) OnFailedError(channelID int32) { - pc.Conn.RejectOpenChannel(channelID, "FailedError") -}