core: Send contact update events on status change
This commit is contained in:
parent
6cee5d7b45
commit
289d0f4dc2
|
@ -6,7 +6,6 @@ import (
|
||||||
rpc "github.com/special/notricochet/rpc"
|
rpc "github.com/special/notricochet/rpc"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"log"
|
"log"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var NotImplementedError error = errors.New("Not implemented")
|
var NotImplementedError error = errors.New("Not implemented")
|
||||||
|
@ -85,17 +84,10 @@ func (s *RpcServer) MonitorContacts(req *rpc.MonitorContactsRequest, stream rpc.
|
||||||
// Populate
|
// Populate
|
||||||
contacts := s.core.Identity.ContactList().Contacts()
|
contacts := s.core.Identity.ContactList().Contacts()
|
||||||
for _, contact := range contacts {
|
for _, contact := range contacts {
|
||||||
data := &rpc.Contact{
|
|
||||||
Id: int32(contact.Id()),
|
|
||||||
Address: contact.Address(),
|
|
||||||
Nickname: contact.Nickname(),
|
|
||||||
WhenCreated: contact.WhenCreated().Format(time.RFC3339),
|
|
||||||
LastConnected: contact.LastConnected().Format(time.RFC3339),
|
|
||||||
}
|
|
||||||
event := &rpc.ContactEvent{
|
event := &rpc.ContactEvent{
|
||||||
Type: rpc.ContactEvent_POPULATE,
|
Type: rpc.ContactEvent_POPULATE,
|
||||||
Subject: &rpc.ContactEvent_Contact{
|
Subject: &rpc.ContactEvent_Contact{
|
||||||
Contact: data,
|
Contact: contact.Data(),
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := stream.Send(event); err != nil {
|
if err := stream.Send(event); err != nil {
|
||||||
|
|
109
core/contact.go
109
core/contact.go
|
@ -3,8 +3,10 @@ package core
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
protocol "github.com/s-rah/go-ricochet"
|
protocol "github.com/s-rah/go-ricochet"
|
||||||
|
"github.com/special/notricochet/core/utils"
|
||||||
"github.com/special/notricochet/rpc"
|
"github.com/special/notricochet/rpc"
|
||||||
"log"
|
"log"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -14,18 +16,24 @@ import (
|
||||||
// Contact, ConfigContact, and rpc.Contact. This should be reduced somehow.
|
// Contact, ConfigContact, and rpc.Contact. This should be reduced somehow.
|
||||||
|
|
||||||
type Contact struct {
|
type Contact struct {
|
||||||
|
core *Ricochet
|
||||||
|
|
||||||
id int
|
id int
|
||||||
data ConfigContact
|
data ConfigContact
|
||||||
|
status ricochet.Contact_Status
|
||||||
|
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
|
events *utils.Publisher
|
||||||
|
|
||||||
connection *protocol.OpenConnection
|
connection *protocol.OpenConnection
|
||||||
}
|
}
|
||||||
|
|
||||||
func ContactFromConfig(id int, data ConfigContact) (*Contact, error) {
|
func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils.Publisher) (*Contact, error) {
|
||||||
contact := &Contact{
|
contact := &Contact{
|
||||||
|
core: core,
|
||||||
id: id,
|
id: id,
|
||||||
data: data,
|
data: data,
|
||||||
|
events: events,
|
||||||
}
|
}
|
||||||
|
|
||||||
if id < 0 {
|
if id < 0 {
|
||||||
|
@ -76,56 +84,50 @@ func (c *Contact) WhenCreated() time.Time {
|
||||||
func (c *Contact) Status() ricochet.Contact_Status {
|
func (c *Contact) Status() ricochet.Contact_Status {
|
||||||
c.mutex.Lock()
|
c.mutex.Lock()
|
||||||
defer c.mutex.Unlock()
|
defer c.mutex.Unlock()
|
||||||
if c.connection == nil {
|
return c.status
|
||||||
return ricochet.Contact_UNKNOWN
|
}
|
||||||
} else {
|
|
||||||
return ricochet.Contact_ONLINE
|
func (c *Contact) Data() *ricochet.Contact {
|
||||||
|
c.mutex.Lock()
|
||||||
|
defer c.mutex.Unlock()
|
||||||
|
data := &ricochet.Contact{
|
||||||
|
Id: int32(c.id),
|
||||||
|
Address: "ricochet:" + c.data.Hostname[0:16],
|
||||||
|
Nickname: c.data.Nickname,
|
||||||
|
WhenCreated: c.data.WhenCreated,
|
||||||
|
LastConnected: c.data.LastConnected,
|
||||||
|
Status: c.status,
|
||||||
}
|
}
|
||||||
|
return data
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Contact) SetConnection(conn *protocol.OpenConnection) error {
|
func (c *Contact) SetConnection(conn *protocol.OpenConnection) error {
|
||||||
c.mutex.Lock()
|
c.mutex.Lock()
|
||||||
defer c.mutex.Unlock()
|
|
||||||
|
|
||||||
if conn == c.connection {
|
if conn == c.connection {
|
||||||
|
c.mutex.Unlock()
|
||||||
return fmt.Errorf("Duplicate assignment of connection %v to contact %v", conn, c)
|
return fmt.Errorf("Duplicate assignment of connection %v to contact %v", conn, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !conn.IsAuthed || conn.Closed {
|
if !conn.IsAuthed || conn.Closed {
|
||||||
|
c.mutex.Unlock()
|
||||||
|
conn.Close()
|
||||||
return fmt.Errorf("Connection %v is not in a valid state to assign to contact %v", conn, c)
|
return fmt.Errorf("Connection %v is not in a valid state to assign to contact %v", conn, c)
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.data.Hostname[0:16] != conn.OtherHostname {
|
if c.data.Hostname[0:16] != conn.OtherHostname {
|
||||||
|
c.mutex.Unlock()
|
||||||
|
conn.Close()
|
||||||
return fmt.Errorf("Connection hostname %s doesn't match contact hostname %s when assigning connection", conn.OtherHostname, c.data.Hostname[0:16])
|
return fmt.Errorf("Connection hostname %s doesn't match contact hostname %s when assigning connection", conn.OtherHostname, c.data.Hostname[0:16])
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.connection != nil && c.connection.Closed {
|
|
||||||
log.Printf("Replacing dead connection %v for contact %v", c.connection, c)
|
|
||||||
c.connection = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Decide whether to replace an existing connection with this one
|
|
||||||
if c.connection != nil {
|
if c.connection != nil {
|
||||||
// If the existing connection is in the same direction, always use the new one
|
if c.shouldReplaceConnection(conn) {
|
||||||
if c.connection.Client == conn.Client {
|
// XXX Signal state change for connection loss?
|
||||||
log.Printf("Replacing existing same-direction connection %v with new connection %v for contact %v", c.connection, conn, c)
|
|
||||||
c.connection.Close()
|
|
||||||
c.connection = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the existing connection is more than 30 seconds old, use the new one
|
|
||||||
// XXX implement this
|
|
||||||
|
|
||||||
// Fall back to string comparison of hostnames for a stable resolution
|
|
||||||
preferOutbound := conn.MyHostname < conn.OtherHostname
|
|
||||||
if preferOutbound == conn.Client {
|
|
||||||
// New connection wins
|
|
||||||
log.Printf("Replacing existing connection %v with new connection %v for contact %v according to fallback order", c.connection, conn, c)
|
|
||||||
c.connection.Close()
|
c.connection.Close()
|
||||||
c.connection = nil
|
c.connection = nil
|
||||||
} else {
|
} else {
|
||||||
// Old connection wins
|
c.mutex.Unlock()
|
||||||
log.Printf("Keeping existing connection %v instead of new connection %v for contact %v according to fallback order", c.connection, conn, c)
|
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return fmt.Errorf("Using existing connection")
|
return fmt.Errorf("Using existing connection")
|
||||||
}
|
}
|
||||||
|
@ -136,13 +138,56 @@ func (c *Contact) SetConnection(conn *protocol.OpenConnection) error {
|
||||||
// if the outbound connection will lose the fallback comparison above.
|
// if the outbound connection will lose the fallback comparison above.
|
||||||
// XXX implement this
|
// XXX implement this
|
||||||
|
|
||||||
|
// XXX react to connection state changes
|
||||||
c.connection = conn
|
c.connection = conn
|
||||||
|
c.status = ricochet.Contact_ONLINE
|
||||||
log.Printf("Assigned connection %v to contact %v", c.connection, c)
|
log.Printf("Assigned connection %v to contact %v", c.connection, c)
|
||||||
|
|
||||||
// XXX implicit accept contact requests
|
// XXX implicit accept contact requests
|
||||||
// XXX update connected date
|
|
||||||
// XXX signal state and data changes
|
// Update LastConnected time
|
||||||
// XXX react to connection state changes
|
config := c.core.Config.OpenWrite()
|
||||||
|
c.data.LastConnected = time.Now().Format(time.RFC3339)
|
||||||
|
config.Contacts[strconv.Itoa(c.id)] = c.data
|
||||||
|
config.Save()
|
||||||
|
|
||||||
|
c.mutex.Unlock()
|
||||||
|
|
||||||
|
event := ricochet.ContactEvent{
|
||||||
|
Type: ricochet.ContactEvent_UPDATE,
|
||||||
|
Subject: &ricochet.ContactEvent_Contact{
|
||||||
|
Contact: c.Data(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
c.events.Publish(event)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Decide whether to replace the existing connection with conn.
|
||||||
|
// Assumes mutex is held.
|
||||||
|
func (c *Contact) shouldReplaceConnection(conn *protocol.OpenConnection) bool {
|
||||||
|
if c.connection == nil {
|
||||||
|
return true
|
||||||
|
} else if c.connection.Closed {
|
||||||
|
log.Printf("Replacing dead connection %v for contact %v", c.connection, c)
|
||||||
|
return true
|
||||||
|
} else if c.connection.Client == conn.Client {
|
||||||
|
// If the existing connection is in the same direction, always use the new one
|
||||||
|
log.Printf("Replacing existing same-direction connection %v with new connection %v for contact %v", c.connection, conn, c)
|
||||||
|
return true
|
||||||
|
} else if false {
|
||||||
|
// If the existing connection is more than 30 seconds old, use the new one
|
||||||
|
// XXX implement this
|
||||||
|
} else if preferOutbound := conn.MyHostname < conn.OtherHostname; preferOutbound == conn.Client {
|
||||||
|
// Fall back to string comparison of hostnames for a stable resolution
|
||||||
|
// New connection wins
|
||||||
|
log.Printf("Replacing existing connection %v with new connection %v for contact %v according to fallback order", c.connection, conn, c)
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
|
// Old connection wins fallback
|
||||||
|
log.Printf("Keeping existing connection %v instead of new connection %v for contact %v according to fallback order", c.connection, conn, c)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
|
@ -36,7 +36,7 @@ func LoadContactList(core *Ricochet) (*ContactList, error) {
|
||||||
return nil, fmt.Errorf("Duplicate contact id '%d'", id)
|
return nil, fmt.Errorf("Duplicate contact id '%d'", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
contact, err := ContactFromConfig(id, data)
|
contact, err := ContactFromConfig(core, id, data, list.events)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue