From b5301c278285c983c1f69d03b246c72098f73d5c Mon Sep 17 00:00:00 2001 From: John Brooks Date: Sat, 23 Sep 2017 15:56:29 -0600 Subject: [PATCH] Implement inbound contact request protocol This implements a handler for inbound contact request connections, taking advantage of the synchronous Process API. This is largely untested code at the moment. --- core/contactlist.go | 3 +- core/identity.go | 9 +- core/inboundcontactrequest.go | 233 +++++++++++++++++++++++++++++----- 3 files changed, 207 insertions(+), 38 deletions(-) diff --git a/core/contactlist.go b/core/contactlist.go index 5edcbfc..34eb669 100644 --- a/core/contactlist.go +++ b/core/contactlist.go @@ -298,7 +298,8 @@ func (cl *ContactList) RemoveInboundContactRequest(request *InboundContactReques return errors.New("Request is not in contact list") } - request.CloseConnection() + // Close connection asynchronously to avoid potential deadlocking + go request.CloseConnection() // XXX Remove from config delete(cl.inboundRequests, requestData.Address) diff --git a/core/identity.go b/core/identity.go index 58a53b9..9cc263a 100644 --- a/core/identity.go +++ b/core/identity.go @@ -197,9 +197,12 @@ func (me *Identity) handleInboundConnection(conn net.Conn) error { return nil } - // XXX-protocol Unknown contact, should pass to handler for contact requests - log.Printf("Inbound connection is a contact request, but that's not implemented yet") - return errors.New("inbound contact request connections aren't implemented") + err = HandleInboundRequestConnection(rc, me.contactList) + if err == nil { + // Connection now belongs to an accepted contact, don't close it + conn = nil + } + return err } func (me *Identity) Address() string { diff --git a/core/inboundcontactrequest.go b/core/inboundcontactrequest.go index 8fdd022..0c8c740 100644 --- a/core/inboundcontactrequest.go +++ b/core/inboundcontactrequest.go @@ -3,6 +3,7 @@ package core import ( "errors" "github.com/ricochet-im/ricochet-go/rpc" + channels "github.com/s-rah/go-ricochet/channels" connection "github.com/s-rah/go-ricochet/connection" "log" "sync" @@ -10,17 +11,165 @@ import ( ) type InboundContactRequest struct { - core *Ricochet - mutex sync.Mutex - data ricochet.ContactRequest - conn *connection.Connection - channelID int32 - Address string + core *Ricochet + mutex sync.Mutex + data ricochet.ContactRequest + Address string + + // If non-nil, sent the new contact on accept or nil on reject. + // Used to signal back to active connections when request state changes. + contactResultChan chan *Contact // Called when the request state is changed StatusChanged func(request *InboundContactRequest) } +type inboundRequestHandler struct { + Name string + Message string + + RequestReceivedChan chan struct{} + ResponseChan chan string +} + +func (irh *inboundRequestHandler) ContactRequestRejected() {} +func (irh *inboundRequestHandler) ContactRequestAccepted() {} +func (irh *inboundRequestHandler) ContactRequestError() {} +func (irh *inboundRequestHandler) ContactRequest(name, message string) string { + if len(name) > 0 && !IsNicknameAcceptable(name) { + log.Printf("protocol: Stripping unacceptable nickname from inbound request; encoded: %x", []byte(name)) + name = "" + } + irh.Name = name + if len(message) > 0 && !IsMessageAcceptable(message) { + log.Printf("protocol: Stripping unacceptable message from inbound request; len: %d, encoded: %x", len(message), []byte(message)) + message = "" + } + irh.Message = message + + // Signal to the goroutine that a request was received + irh.RequestReceivedChan <- struct{}{} + // Wait for a response + response := <-irh.ResponseChan + return response +} + +// HandleInboundRequestConnection takes an authenticated connection that does not +// associate to any known contact and handles inbound contact request channels. +// If no request is seen after a short timeout, the connection will be closed. +// If a valid request is received, the connection will stay open to wait for the +// user's reply. +// +// This function takes full ownership of the connection. On return, the connection +// will be either closed and a non-nil error returned, or will be assigned to an +// existing Contact along with a nil return value. +func HandleInboundRequestConnection(conn *connection.Connection, contactList *ContactList) error { + log.Printf("Handling inbound contact request connection") + ach := &connection.AutoConnectionHandler{} + ach.Init() + + // There may only ever be one contact request channel on the connection + req := &inboundRequestHandler{ + RequestReceivedChan: make(chan struct{}), + ResponseChan: make(chan string), + } + // XXX should close conn if the channel goes away... + ach.RegisterChannelHandler("im.ricochet.contact.request", func() channels.Handler { + return &channels.ContactRequestChannel{Handler: req} + }) + + processChan := make(chan error) + go func() { + processChan <- conn.Process(ach) + }() + + address, ok := AddressFromPlainHost(conn.RemoteHostname) + if !ok { + conn.Conn.Close() + return <-processChan + } + + // Expecting to receive request data within 15 seconds + select { + case <-req.RequestReceivedChan: + break + case err := <-processChan: + if err == nil { + conn.Conn.Close() + err = errors.New("unexpected break") + } + return err + case <-time.After(15 * time.Second): + // Didn't receive a contact request fast enough + conn.Conn.Close() + return <-processChan + } + + // Function to respond to the request; changed after the initial response + respond := func(status string) { req.ResponseChan <- status } + + request, contact := contactList.AddOrUpdateInboundContactRequest(address, req.Name, req.Message) + if contact == nil && request != nil && !request.IsRejected() { + // Pending request; keep connection open and wait for a user response + respond("Pending") + contactChan := request.getContactResultChannel() + select { + case c, ok := <-contactChan: + if !ok { + // Replaced by a different connection or otherwise cancelled without reply + conn.Conn.Close() + return <-processChan + } + // Set contact and fall out to handle the reply + contact = c + break + case err := <-processChan: + request.clearContactResultChannel(contactChan) + for { + _, ok := <-contactChan + if !ok { + break + } + } + return err + } + + // Change how future responses are sent + respond = func(status string) { + conn.Do(func() error { + channel := conn.Channel("im.ricochet.contact.request", channels.Inbound) + if channel == nil { + return errors.New("no channel") + } + channel.Handler.(*channels.ContactRequestChannel).SendResponse(status) + // Also close the channel; this was a final response + channel.CloseChannel() + return nil + }) + } + } + + // Have a response (either immediately or after pending) + if contact != nil { + // Accepted + respond("Accepted") + if err := conn.Break(); err != nil { + // Connection lost; but request was accepted anyway, so it'll get reconnected later + return err + } + contact.AssignConnection(conn) + return nil + } else { + // Rejected + respond("Rejected") + if request != nil { + contactList.RemoveInboundContactRequest(request) + } + conn.Conn.Close() + return <-processChan + } +} + // CreateInboundContactRequest constructs a new InboundContactRequest, usually from a newly // received request on an open connection. Requests are managed through the ContactList, so // generally you should use ContactList.AddOrUpdateInboundContactRequest instead of calling @@ -41,23 +190,35 @@ func CreateInboundContactRequest(core *Ricochet, address, nickname, message stri return cr } -// XXX-protocol There should be stricter management & a timeout for this connection -func (cr *InboundContactRequest) SetConnection(conn *connection.Connection, channelID int32) { +// getContactResultChannel returns a channel that will be sent a Contact if the request is +// accepted, nil if the request is rejected, or closed if the channel is no longer used. +// This is used to communciate with active connections for pending requests. +func (cr *InboundContactRequest) getContactResultChannel() chan *Contact { cr.mutex.Lock() defer cr.mutex.Unlock() - if cr.conn != nil && cr.conn != conn { - log.Printf("Replacing connection on an inbound contact request") - cr.conn.Conn.Close() + if cr.contactResultChan != nil { + close(cr.contactResultChan) + } + cr.contactResultChan = make(chan *Contact) + return cr.contactResultChan +} + +func (cr *InboundContactRequest) clearContactResultChannel(c chan *Contact) { + cr.mutex.Lock() + defer cr.mutex.Unlock() + if cr.contactResultChan == c { + close(cr.contactResultChan) + cr.contactResultChan = nil } - cr.conn = conn - cr.channelID = channelID } func (cr *InboundContactRequest) CloseConnection() { - if cr.conn != nil { - cr.conn.Conn.Close() - cr.conn = nil + cr.mutex.Lock() + defer cr.mutex.Unlock() + if cr.contactResultChan != nil { + close(cr.contactResultChan) + cr.contactResultChan = nil } } @@ -117,27 +278,32 @@ func (cr *InboundContactRequest) Accept() (*Contact, error) { return nil, err } - if err := cr.AcceptWithContact(contact); err != nil { + if err := cr.acceptWithContact(contact); err != nil { return contact, err } return contact, nil } func (cr *InboundContactRequest) AcceptWithContact(contact *Contact) error { + cr.mutex.Lock() + defer cr.mutex.Unlock() + return cr.acceptWithContact(contact) +} + +// Assumes mutex +func (cr *InboundContactRequest) acceptWithContact(contact *Contact) error { if contact.Address() != cr.data.Address { return errors.New("Contact address does not match request in accept") } - cr.core.Identity.ContactList().RemoveInboundContactRequest(cr) - - // Pass the open connection to the new contact - if cr.conn != nil { - // XXX-protocol cr.conn.AckContactRequest(cr.channelID, "Accepted") - // XXX-protocol cr.conn.CloseChannel(cr.channelID) - contact.AssignConnection(cr.conn) - cr.conn = nil + // Send to active connection if present + if cr.contactResultChan != nil { + cr.contactResultChan <- contact + close(cr.contactResultChan) + cr.contactResultChan = nil } + cr.core.Identity.ContactList().RemoveInboundContactRequest(cr) return nil } @@ -152,20 +318,19 @@ func (cr *InboundContactRequest) Reject() { log.Printf("Rejecting contact request from %s", cr.data.Address) cr.data.Rejected = true + // Signal to the active connection + if cr.contactResultChan != nil { + cr.contactResultChan <- nil + close(cr.contactResultChan) + cr.contactResultChan = nil + } + // Signal update to the callback (probably from ContactList) if cr.StatusChanged != nil { cr.StatusChanged(cr) } - if cr.conn != nil { - // XXX-protocol cr.conn.AckContactRequest(cr.channelID, "Rejected") - // XXX-protocol cr.conn.CloseChannel(cr.channelID) - cr.conn.Conn.Close() - cr.conn = nil - - // The request can be removed once a protocol response is sent - cr.core.Identity.ContactList().RemoveInboundContactRequest(cr) - } + cr.core.Identity.ContactList().RemoveInboundContactRequest(cr) } func (cr *InboundContactRequest) Data() ricochet.ContactRequest {