core: Improve start/stop of contact connections

This commit is contained in:
John Brooks 2016-10-28 16:08:16 -06:00
parent 6f7e030811
commit 1c0c3242eb
3 changed files with 90 additions and 23 deletions

View File

@ -29,9 +29,11 @@ type Contact struct {
mutex sync.Mutex mutex sync.Mutex
events *utils.Publisher events *utils.Publisher
connEnabled bool
connection *protocol.OpenConnection connection *protocol.OpenConnection
connChannel chan *protocol.OpenConnection connChannel chan *protocol.OpenConnection
connClosedChannel chan struct{} connClosedChannel chan struct{}
connStopped chan struct{}
outboundConnAuthKnown bool outboundConnAuthKnown bool
@ -40,12 +42,10 @@ type Contact struct {
func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils.Publisher) (*Contact, error) { func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils.Publisher) (*Contact, error) {
contact := &Contact{ contact := &Contact{
core: core, core: core,
id: id, id: id,
data: data, data: data,
events: events, events: events,
connChannel: make(chan *protocol.OpenConnection),
connClosedChannel: make(chan struct{}),
} }
if id < 0 { if id < 0 {
@ -62,13 +62,6 @@ func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils
} }
} }
// XXX Ugly and fragile way to inhibit connections
if contact.status != ricochet.Contact_REJECTED {
// XXX Should have some global trigger that starts all contact connections
// at the right time
go contact.contactConnection()
}
return contact, nil return contact, nil
} }
@ -157,6 +150,48 @@ func (c *Contact) Connection() *protocol.OpenConnection {
return c.connection return c.connection
} }
func (c *Contact) StartConnection() {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.connEnabled {
return
}
c.connEnabled = true
c.connChannel = make(chan *protocol.OpenConnection)
c.connClosedChannel = make(chan struct{})
c.connStopped = make(chan struct{})
go c.contactConnection()
}
func (c *Contact) StopConnection() {
c.mutex.Lock()
if !c.connEnabled {
c.mutex.Unlock()
return
}
stopped := c.connStopped
close(c.connChannel)
c.connChannel = nil
c.connClosedChannel = nil
c.connStopped = nil
c.mutex.Unlock()
<-stopped
}
func (c *Contact) shouldMakeOutboundConnections() bool {
c.mutex.Lock()
defer c.mutex.Unlock()
// Don't make connections to contacts in the REJECTED state
if c.status == ricochet.Contact_REJECTED {
return false
}
return c.connEnabled
}
// Goroutine to handle the protocol connection for a contact. // Goroutine to handle the protocol connection for a contact.
// Responsible for making outbound connections and taking over authenticated // Responsible for making outbound connections and taking over authenticated
// inbound connections, running protocol handlers on the active connection, and // inbound connections, running protocol handlers on the active connection, and
@ -174,23 +209,28 @@ func (c *Contact) contactConnection() {
// to handler for parsing, which could let it go on any goroutine we want, if it's desirable // to handler for parsing, which could let it go on any goroutine we want, if it's desirable
// to put it on e.g. this routine. Hmm. // to put it on e.g. this routine. Hmm.
connChannel := c.connChannel
connClosedChannel := c.connClosedChannel
stopped := c.connStopped
connectionLoop:
for { for {
// If there is no active connection, spawn an outbound connector. // If there is no active connection, spawn an outbound connector.
// A successful connection is returned via connChannel; otherwise, it will keep trying. // A successful connection is returned via connChannel; otherwise, it will keep trying.
var outboundCtx context.Context var outboundCtx context.Context
outboundCancel := func() {} outboundCancel := func() {}
if c.connection == nil { if c.connection == nil && c.shouldMakeOutboundConnections() {
outboundCtx, outboundCancel = context.WithCancel(context.Background()) outboundCtx, outboundCancel = context.WithCancel(context.Background())
go c.connectOutbound(outboundCtx, c.connChannel) go c.connectOutbound(outboundCtx, connChannel)
} }
select { select {
case conn, ok := <-c.connChannel: case conn, ok := <-connChannel:
outboundCancel() outboundCancel()
if !ok { if !ok {
// Closing connChannel exits this connection routine, for contact // Closing connChannel exits this connection routine, for contact
// deletion, exit, or some other case. // deletion, exit, or some other case.
break break connectionLoop
} else if conn == nil { } else if conn == nil {
// Signal used to restart outbound connection attempts // Signal used to restart outbound connection attempts
continue continue
@ -210,7 +250,7 @@ func (c *Contact) contactConnection() {
continue continue
} }
case <-c.connClosedChannel: case <-connClosedChannel:
outboundCancel() outboundCancel()
c.clearConnection(nil) c.clearConnection(nil)
} }
@ -218,6 +258,7 @@ func (c *Contact) contactConnection() {
log.Printf("Exiting contact connection loop for %s", c.Address()) log.Printf("Exiting contact connection loop for %s", c.Address())
c.clearConnection(nil) c.clearConnection(nil)
close(stopped)
} }
// Attempt an outbound connection to the contact, retrying automatically using OnionConnector. // Attempt an outbound connection to the contact, retrying automatically using OnionConnector.
@ -499,20 +540,27 @@ func (c *Contact) updateContactRequest(status string) bool {
// XXX also will go away during protocol API rework // XXX also will go away during protocol API rework
func (c *Contact) OnConnectionAuthenticated(conn *protocol.OpenConnection, knownContact bool) { func (c *Contact) OnConnectionAuthenticated(conn *protocol.OpenConnection, knownContact bool) {
c.mutex.Lock()
if c.connChannel == nil {
log.Printf("Inbound connection from contact, but connections are not enabled for contact %v", c)
c.mutex.Unlock()
conn.Close()
}
// XXX this is ugly // XXX this is ugly
if conn.Client { if conn.Client {
c.outboundConnAuthKnown = knownContact c.outboundConnAuthKnown = knownContact
} }
c.connChannel <- conn c.connChannel <- conn
c.mutex.Unlock()
} }
// XXX rework connection close to have a proper notification instead of this "find contact" mess. // XXX rework connection close to have a proper notification instead of this "find contact" mess.
func (c *Contact) OnConnectionClosed(conn *protocol.OpenConnection) { func (c *Contact) OnConnectionClosed(conn *protocol.OpenConnection) {
c.mutex.Lock() c.mutex.Lock()
if c.connection != conn { if c.connection != conn || c.connClosedChannel == nil {
c.mutex.Unlock() c.mutex.Unlock()
return return
} }
c.mutex.Unlock()
c.connClosedChannel <- struct{}{} c.connClosedChannel <- struct{}{}
c.mutex.Unlock()
} }

View File

@ -144,6 +144,7 @@ func (this *ContactList) AddContactRequest(address, name, fromName, text string)
} }
this.events.Publish(event) this.events.Publish(event)
contact.StartConnection()
return contact, nil return contact, nil
} }
@ -155,9 +156,14 @@ func (this *ContactList) RemoveContact(contact *Contact) error {
return errors.New("Not in contact list") return errors.New("Not in contact list")
} }
// XXX Not persisting in config contact.StopConnection()
// XXX This will have to do some things to the contact itself
// eventually too, such as killing connections and other resources. config := this.core.Config.OpenWrite()
delete(config.Contacts, strconv.Itoa(contact.Id()))
if err := config.Save(); err != nil {
return err
}
delete(this.contacts, contact.Id()) delete(this.contacts, contact.Id())
event := ricochet.ContactEvent{ event := ricochet.ContactEvent{
@ -173,3 +179,15 @@ func (this *ContactList) RemoveContact(contact *Contact) error {
return nil return nil
} }
func (this *ContactList) StartConnections() {
for _, contact := range this.Contacts() {
contact.StartConnection()
}
}
func (this *ContactList) StopConnections() {
for _, contact := range this.Contacts() {
contact.StopConnection()
}
}

View File

@ -43,6 +43,7 @@ func CreateIdentity(core *Ricochet) (*Identity, error) {
} }
me.contactList = contactList me.contactList = contactList
contactList.StartConnections()
go me.publishService(me.privateKey) go me.publishService(me.privateKey)
return me, nil return me, nil
} }