From 289d0f4dc2e292189011f2f2b1cba06f82b3f4d2 Mon Sep 17 00:00:00 2001 From: John Brooks Date: Mon, 19 Sep 2016 19:53:21 -0600 Subject: [PATCH] core: Send contact update events on status change --- backend/rpc.go | 10 +--- core/contact.go | 119 ++++++++++++++++++++++++++++++-------------- core/contactlist.go | 2 +- 3 files changed, 84 insertions(+), 47 deletions(-) diff --git a/backend/rpc.go b/backend/rpc.go index 025bd37..b456553 100644 --- a/backend/rpc.go +++ b/backend/rpc.go @@ -6,7 +6,6 @@ import ( rpc "github.com/special/notricochet/rpc" "golang.org/x/net/context" "log" - "time" ) var NotImplementedError error = errors.New("Not implemented") @@ -85,17 +84,10 @@ func (s *RpcServer) MonitorContacts(req *rpc.MonitorContactsRequest, stream rpc. // Populate contacts := s.core.Identity.ContactList().Contacts() for _, contact := range contacts { - data := &rpc.Contact{ - Id: int32(contact.Id()), - Address: contact.Address(), - Nickname: contact.Nickname(), - WhenCreated: contact.WhenCreated().Format(time.RFC3339), - LastConnected: contact.LastConnected().Format(time.RFC3339), - } event := &rpc.ContactEvent{ Type: rpc.ContactEvent_POPULATE, Subject: &rpc.ContactEvent_Contact{ - Contact: data, + Contact: contact.Data(), }, } if err := stream.Send(event); err != nil { diff --git a/core/contact.go b/core/contact.go index 7f37e62..c06998c 100644 --- a/core/contact.go +++ b/core/contact.go @@ -3,8 +3,10 @@ package core import ( "fmt" protocol "github.com/s-rah/go-ricochet" + "github.com/special/notricochet/core/utils" "github.com/special/notricochet/rpc" "log" + "strconv" "strings" "sync" "time" @@ -14,18 +16,24 @@ import ( // Contact, ConfigContact, and rpc.Contact. This should be reduced somehow. type Contact struct { - id int - data ConfigContact + core *Ricochet - mutex sync.Mutex + id int + data ConfigContact + status ricochet.Contact_Status + + mutex sync.Mutex + events *utils.Publisher connection *protocol.OpenConnection } -func ContactFromConfig(id int, data ConfigContact) (*Contact, error) { +func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils.Publisher) (*Contact, error) { contact := &Contact{ - id: id, - data: data, + core: core, + id: id, + data: data, + events: events, } if id < 0 { @@ -76,56 +84,50 @@ func (c *Contact) WhenCreated() time.Time { func (c *Contact) Status() ricochet.Contact_Status { c.mutex.Lock() defer c.mutex.Unlock() - if c.connection == nil { - return ricochet.Contact_UNKNOWN - } else { - return ricochet.Contact_ONLINE + return c.status +} + +func (c *Contact) Data() *ricochet.Contact { + c.mutex.Lock() + defer c.mutex.Unlock() + data := &ricochet.Contact{ + Id: int32(c.id), + Address: "ricochet:" + c.data.Hostname[0:16], + Nickname: c.data.Nickname, + WhenCreated: c.data.WhenCreated, + LastConnected: c.data.LastConnected, + Status: c.status, } + return data } func (c *Contact) SetConnection(conn *protocol.OpenConnection) error { c.mutex.Lock() - defer c.mutex.Unlock() if conn == c.connection { + c.mutex.Unlock() return fmt.Errorf("Duplicate assignment of connection %v to contact %v", conn, c) } if !conn.IsAuthed || conn.Closed { + c.mutex.Unlock() + conn.Close() return fmt.Errorf("Connection %v is not in a valid state to assign to contact %v", conn, c) } if c.data.Hostname[0:16] != conn.OtherHostname { + c.mutex.Unlock() + conn.Close() return fmt.Errorf("Connection hostname %s doesn't match contact hostname %s when assigning connection", conn.OtherHostname, c.data.Hostname[0:16]) } - if c.connection != nil && c.connection.Closed { - log.Printf("Replacing dead connection %v for contact %v", c.connection, c) - c.connection = nil - } - - // Decide whether to replace an existing connection with this one if c.connection != nil { - // If the existing connection is in the same direction, always use the new one - if c.connection.Client == conn.Client { - log.Printf("Replacing existing same-direction connection %v with new connection %v for contact %v", c.connection, conn, c) - c.connection.Close() - c.connection = nil - } - - // If the existing connection is more than 30 seconds old, use the new one - // XXX implement this - - // Fall back to string comparison of hostnames for a stable resolution - preferOutbound := conn.MyHostname < conn.OtherHostname - if preferOutbound == conn.Client { - // New connection wins - log.Printf("Replacing existing connection %v with new connection %v for contact %v according to fallback order", c.connection, conn, c) + if c.shouldReplaceConnection(conn) { + // XXX Signal state change for connection loss? c.connection.Close() c.connection = nil } else { - // Old connection wins - log.Printf("Keeping existing connection %v instead of new connection %v for contact %v according to fallback order", c.connection, conn, c) + c.mutex.Unlock() conn.Close() return fmt.Errorf("Using existing connection") } @@ -136,13 +138,56 @@ func (c *Contact) SetConnection(conn *protocol.OpenConnection) error { // if the outbound connection will lose the fallback comparison above. // XXX implement this + // XXX react to connection state changes c.connection = conn + c.status = ricochet.Contact_ONLINE log.Printf("Assigned connection %v to contact %v", c.connection, c) // XXX implicit accept contact requests - // XXX update connected date - // XXX signal state and data changes - // XXX react to connection state changes + + // Update LastConnected time + config := c.core.Config.OpenWrite() + c.data.LastConnected = time.Now().Format(time.RFC3339) + config.Contacts[strconv.Itoa(c.id)] = c.data + config.Save() + + c.mutex.Unlock() + + event := ricochet.ContactEvent{ + Type: ricochet.ContactEvent_UPDATE, + Subject: &ricochet.ContactEvent_Contact{ + Contact: c.Data(), + }, + } + c.events.Publish(event) return nil } + +// Decide whether to replace the existing connection with conn. +// Assumes mutex is held. +func (c *Contact) shouldReplaceConnection(conn *protocol.OpenConnection) bool { + if c.connection == nil { + return true + } else if c.connection.Closed { + log.Printf("Replacing dead connection %v for contact %v", c.connection, c) + return true + } else if c.connection.Client == conn.Client { + // If the existing connection is in the same direction, always use the new one + log.Printf("Replacing existing same-direction connection %v with new connection %v for contact %v", c.connection, conn, c) + return true + } else if false { + // If the existing connection is more than 30 seconds old, use the new one + // XXX implement this + } else if preferOutbound := conn.MyHostname < conn.OtherHostname; preferOutbound == conn.Client { + // Fall back to string comparison of hostnames for a stable resolution + // New connection wins + log.Printf("Replacing existing connection %v with new connection %v for contact %v according to fallback order", c.connection, conn, c) + return true + } else { + // Old connection wins fallback + log.Printf("Keeping existing connection %v instead of new connection %v for contact %v according to fallback order", c.connection, conn, c) + return false + } + return false +} diff --git a/core/contactlist.go b/core/contactlist.go index 895783d..e2456ad 100644 --- a/core/contactlist.go +++ b/core/contactlist.go @@ -36,7 +36,7 @@ func LoadContactList(core *Ricochet) (*ContactList, error) { return nil, fmt.Errorf("Duplicate contact id '%d'", id) } - contact, err := ContactFromConfig(id, data) + contact, err := ContactFromConfig(core, id, data, list.events) if err != nil { return nil, err }