From 20700414fc3b11b8db59032d784ed0f1a0b196e9 Mon Sep 17 00:00:00 2001 From: John Brooks Date: Fri, 2 Sep 2016 11:51:24 -0500 Subject: [PATCH] core: Implement contact list monitor and contact removal --- backend/rpc.go | 29 ++++++++++++++++++++++-- core/contactlist.go | 54 +++++++++++++++++++++++++++++++++++++++++---- core/network.go | 2 +- 3 files changed, 78 insertions(+), 7 deletions(-) diff --git a/backend/rpc.go b/backend/rpc.go index 0d6bf0c..52a4cf4 100644 --- a/backend/rpc.go +++ b/backend/rpc.go @@ -93,6 +93,9 @@ func (core *RicochetCore) GetIdentity(ctx context.Context, req *rpc.IdentityRequ } func (core *RicochetCore) MonitorContacts(req *rpc.MonitorContactsRequest, stream rpc.RicochetCore_MonitorContactsServer) error { + monitor := core.Identity().ContactList().EventMonitor().Subscribe(20) + defer core.Identity().ContactList().EventMonitor().Unsubscribe(monitor) + // Populate contacts := core.Identity().ContactList().Contacts() for _, contact := range contacts { @@ -123,7 +126,19 @@ func (core *RicochetCore) MonitorContacts(req *rpc.MonitorContactsRequest, strea } } - return NotImplementedError + for { + event, ok := (<-monitor).(rpc.ContactEvent) + if !ok { + break + } + + log.Printf("Contact event: %v", event) + if err := stream.Send(&event); err != nil { + return err + } + } + + return nil } func (core *RicochetCore) AddContactRequest(ctx context.Context, req *rpc.ContactRequest) (*rpc.Contact, error) { @@ -135,7 +150,17 @@ func (core *RicochetCore) UpdateContact(ctx context.Context, req *rpc.Contact) ( } func (core *RicochetCore) DeleteContact(ctx context.Context, req *rpc.DeleteContactRequest) (*rpc.DeleteContactReply, error) { - return nil, NotImplementedError + contactList := core.Identity().ContactList() + contact := contactList.ContactByAddress(req.Address) + if contact == nil || (req.Id != 0 && contact.Id() != int(req.Id)) { + return nil, errors.New("Contact not found") + } + + if err := contactList.RemoveContact(contact); err != nil { + return nil, err + } + + return &rpc.DeleteContactReply{}, nil } func (core *RicochetCore) AcceptInboundRequest(ctx context.Context, req *rpc.ContactRequest) (*rpc.Contact, error) { diff --git a/core/contactlist.go b/core/contactlist.go index bf67b0c..7931e0a 100644 --- a/core/contactlist.go +++ b/core/contactlist.go @@ -3,17 +3,26 @@ package core import ( "errors" "fmt" + "github.com/special/notricochet/core/utils" + "github.com/special/notricochet/rpc" "strconv" + "sync" ) type ContactList struct { + mutex sync.RWMutex + events *utils.Publisher + contacts map[int]*Contact outboundRequests map[int]*OutboundContactRequest inboundRequests map[int]*InboundContactRequest } func LoadContactList(core Ricochet) (*ContactList, error) { - list := &ContactList{} + list := &ContactList{ + events: utils.CreatePublisher(), + } + config := core.Config().OpenRead() defer config.Close() @@ -38,7 +47,13 @@ func LoadContactList(core Ricochet) (*ContactList, error) { return list, nil } +func (this *ContactList) EventMonitor() utils.Subscribable { + return this.events +} + func (this *ContactList) Contacts() []*Contact { + this.mutex.RLock() + defer this.mutex.RUnlock() re := make([]*Contact, 0, len(this.contacts)) for _, contact := range this.contacts { re = append(re, contact) @@ -47,6 +62,8 @@ func (this *ContactList) Contacts() []*Contact { } func (this *ContactList) OutboundRequests() []*OutboundContactRequest { + this.mutex.RLock() + defer this.mutex.RUnlock() re := make([]*OutboundContactRequest, 0, len(this.outboundRequests)) for _, request := range this.outboundRequests { re = append(re, request) @@ -55,6 +72,8 @@ func (this *ContactList) OutboundRequests() []*OutboundContactRequest { } func (this *ContactList) InboundRequests() []*InboundContactRequest { + this.mutex.RLock() + defer this.mutex.RUnlock() re := make([]*InboundContactRequest, 0, len(this.inboundRequests)) for _, request := range this.inboundRequests { re = append(re, request) @@ -63,10 +82,14 @@ func (this *ContactList) InboundRequests() []*InboundContactRequest { } func (this *ContactList) ContactById(id int) *Contact { + this.mutex.RLock() + defer this.mutex.RUnlock() return this.contacts[id] } func (this *ContactList) ContactByAddress(address string) *Contact { + this.mutex.RLock() + defer this.mutex.RUnlock() for _, contact := range this.contacts { if contact.Address() == address { return contact @@ -75,10 +98,33 @@ func (this *ContactList) ContactByAddress(address string) *Contact { return nil } -func (contacts *ContactList) AddContact(address string, name string) (*Contact, error) { +func (this *ContactList) AddContact(address string, name string) (*Contact, error) { return nil, errors.New("Not implemented") } -func (contacts *ContactList) RemoveContactById(id int) error { - return errors.New("Not implemented") +func (this *ContactList) RemoveContact(contact *Contact) error { + this.mutex.Lock() + defer this.mutex.Unlock() + + if this.contacts[contact.Id()] != contact { + return errors.New("Not in contact list") + } + + // XXX Not persisting in config + // XXX This will have to do some things to the contact itself + // eventually too, such as killing connections and other resources. + delete(this.contacts, contact.Id()) + + event := ricochet.ContactEvent{ + Type: ricochet.ContactEvent_DELETE, + Subject: &ricochet.ContactEvent_Contact{ + Contact: &ricochet.Contact{ + Id: int32(contact.Id()), + Address: contact.Address(), + }, + }, + } + this.events.Publish(event) + + return nil } diff --git a/core/network.go b/core/network.go index f7f90b3..72b8fbc 100644 --- a/core/network.go +++ b/core/network.go @@ -120,7 +120,7 @@ func (n *Network) GetStatus() ricochet.NetworkStatus { // May return nil on failure, and the returned connection can be closed // or otherwise fail at any time. func (n *Network) getConnection() *bulb.Conn { - // Optimistically try to get a connection before subscribin to events + // Optimistically try to get a connection before subscribing to events n.controlMutex.Lock() conn := n.conn n.controlMutex.Unlock()