cli: Fix threading issues around event monitors

This commit is contained in:
John Brooks 2016-10-15 21:57:07 -06:00
parent c089cf3e34
commit 99d44e1278
2 changed files with 145 additions and 126 deletions

View File

@ -42,6 +42,8 @@ func main() {
os.Exit(1) os.Exit(1)
} }
go c.Run()
input.SetPrompt("> ") input.SetPrompt("> ")
for { for {
line := input.Line() line := input.Line()

View File

@ -16,15 +16,18 @@ type Client struct {
ServerStatus ricochet.ServerStatusReply ServerStatus ricochet.ServerStatusReply
Identity ricochet.Identity Identity ricochet.Identity
// XXX threadsafety
NetworkStatus ricochet.NetworkStatus NetworkStatus ricochet.NetworkStatus
Contacts *ContactList Contacts *ContactList
CurrentContact *Contact CurrentContact *Contact
monitorsChannel chan interface{}
populatedContacts bool
} }
// XXX need to handle backend connection loss/reconnection.. // XXX need to handle backend connection loss/reconnection..
func (c *Client) Initialize() error { func (c *Client) Initialize() error {
c.Contacts = NewContactList() c.Contacts = NewContactList()
c.monitorsChannel = make(chan interface{}, 10)
// Query server status and version // Query server status and version
status, err := c.Backend.GetServerStatus(context.Background(), &ricochet.ServerStatusRequest{ status, err := c.Backend.GetServerStatus(context.Background(), &ricochet.ServerStatusRequest{
@ -48,12 +51,30 @@ func (c *Client) Initialize() error {
// Spawn routines to query and monitor state changes // Spawn routines to query and monitor state changes
go c.monitorNetwork() go c.monitorNetwork()
go c.monitorContacts() go c.monitorContacts()
go c.monitorConversations() // Conversation monitor isn't started until contacts are populated
// XXX block until populated/initialized? // XXX block until populated/initialized?
return nil return nil
} }
func (c *Client) Run() {
for {
select {
case v := <-c.monitorsChannel:
switch event := v.(type) {
case *ricochet.NetworkStatus:
c.onNetworkStatus(event)
case *ricochet.ContactEvent:
c.onContactEvent(event)
case *ricochet.ConversationEvent:
c.onConversationEvent(event)
default:
log.Panicf("Unknown event type on monitor channel: %v", event)
}
}
}
}
func (c *Client) SetCurrentContact(contact *Contact) { func (c *Client) SetCurrentContact(contact *Contact) {
c.CurrentContact = contact c.CurrentContact = contact
if c.CurrentContact != nil { if c.CurrentContact != nil {
@ -86,8 +107,7 @@ func (c *Client) monitorNetwork() {
break break
} }
log.Printf("Network status changed: %v", status) c.monitorsChannel <- status
c.NetworkStatus = *status
} }
} }
@ -99,52 +119,68 @@ func (c *Client) monitorContacts() {
return return
} }
// Populate initial contacts list
for { for {
event, err := stream.Recv() event, err := stream.Recv()
if err != nil { if err != nil {
log.Printf("Contact populate error: %v", err) log.Printf("Contact monitor error: %v", err)
// XXX handle // XXX handle
break break
} }
if event.Type != ricochet.ContactEvent_POPULATE { c.monitorsChannel <- event
}
}
func (c *Client) monitorConversations() {
stream, err := c.Backend.MonitorConversations(context.Background(), &ricochet.MonitorConversationsRequest{})
if err != nil {
log.Printf("Initializing conversations monitor failed: %v", err)
// XXX handle
return
}
for {
event, err := stream.Recv()
if err != nil {
log.Printf("Conversations monitor error: %v", err)
// XXX handle
break
}
c.monitorsChannel <- event
}
}
func (c *Client) onNetworkStatus(status *ricochet.NetworkStatus) {
log.Printf("Network status changed: %v", status)
c.NetworkStatus = *status
}
func (c *Client) onContactEvent(event *ricochet.ContactEvent) {
if !c.populatedContacts && event.Type != ricochet.ContactEvent_POPULATE {
log.Printf("Ignoring unexpected contact event during populate: %v", event) log.Printf("Ignoring unexpected contact event during populate: %v", event)
continue return
}
// Populate is terminated by a nil subject
if event.Subject == nil {
break
}
if contact := event.GetContact(); contact != nil {
c.Contacts.Populate(contact)
} else if request := event.GetRequest(); request != nil {
// XXX handle requests
log.Printf("XXX contact requests not supported")
} else {
log.Printf("XXX invalid event")
}
}
log.Printf("Loaded %d contacts", len(c.Contacts.Contacts))
for {
event, err := stream.Recv()
if err != nil {
log.Printf("Contact status monitor error: %v", err)
// XXX handle
break
} }
data := event.GetContact() data := event.GetContact()
switch event.Type { switch event.Type {
case ricochet.ContactEvent_POPULATE:
// Populate is terminated by a nil subject
if event.Subject == nil {
c.populatedContacts = true
log.Printf("Loaded %d contacts", len(c.Contacts.Contacts))
go c.monitorConversations()
} else if data != nil {
c.Contacts.Populate(data)
} else {
log.Printf("Invalid contact populate event: %v", event)
}
case ricochet.ContactEvent_ADD: case ricochet.ContactEvent_ADD:
if data == nil { if data == nil {
log.Printf("Ignoring contact add event with null data") log.Printf("Ignoring contact add event with null data")
continue return
} }
c.Contacts.Added(data) c.Contacts.Added(data)
@ -152,7 +188,7 @@ func (c *Client) monitorContacts() {
case ricochet.ContactEvent_UPDATE: case ricochet.ContactEvent_UPDATE:
if data == nil { if data == nil {
log.Printf("Ignoring contact update event with null data") log.Printf("Ignoring contact update event with null data")
continue return
} }
contact := c.Contacts.ByIdAndAddress(data.Id, data.Address) contact := c.Contacts.ByIdAndAddress(data.Id, data.Address)
@ -165,7 +201,7 @@ func (c *Client) monitorContacts() {
case ricochet.ContactEvent_DELETE: case ricochet.ContactEvent_DELETE:
if data == nil { if data == nil {
log.Printf("Ignoring contact delete event with null data") log.Printf("Ignoring contact delete event with null data")
continue return
} }
contact, _ := c.Contacts.Deleted(data) contact, _ := c.Contacts.Deleted(data)
@ -178,36 +214,18 @@ func (c *Client) monitorContacts() {
log.Printf("Ignoring unknown contact event: %v", event) log.Printf("Ignoring unknown contact event: %v", event)
} }
} }
}
func (c *Client) monitorConversations() { func (c *Client) onConversationEvent(event *ricochet.ConversationEvent) {
stream, err := c.Backend.MonitorConversations(context.Background(), &ricochet.MonitorConversationsRequest{})
if err != nil {
log.Printf("Initializing conversations monitor failed: %v", err)
// XXX handle
return
}
log.Printf("Monitoring conversations")
for {
event, err := stream.Recv()
if err != nil {
log.Printf("Conversations monitor error: %v", err)
// XXX handle
break
}
// XXX Should also handle POPULATE
if event.Type != ricochet.ConversationEvent_RECEIVE && if event.Type != ricochet.ConversationEvent_RECEIVE &&
event.Type != ricochet.ConversationEvent_SEND { event.Type != ricochet.ConversationEvent_SEND &&
continue event.Type != ricochet.ConversationEvent_POPULATE {
return
} }
message := event.Msg message := event.Msg
if message == nil || message.Recipient == nil || message.Sender == nil { if message == nil || message.Recipient == nil || message.Sender == nil {
log.Printf("Ignoring invalid conversation event: %v", event) log.Printf("Ignoring invalid conversation event: %v", event)
continue return
} }
var remoteEntity *ricochet.Entity var remoteEntity *ricochet.Entity
@ -220,7 +238,7 @@ func (c *Client) monitorConversations() {
remoteContact := c.Contacts.ByIdAndAddress(remoteEntity.ContactId, remoteEntity.Address) remoteContact := c.Contacts.ByIdAndAddress(remoteEntity.ContactId, remoteEntity.Address)
if remoteContact == nil { if remoteContact == nil {
log.Printf("Ignoring conversation event with unknown contact: %v", event) log.Printf("Ignoring conversation event with unknown contact: %v", event)
continue return
} }
if remoteContact == c.CurrentContact { if remoteContact == c.CurrentContact {
@ -248,4 +266,3 @@ func (c *Client) monitorConversations() {
}() }()
} }
} }
}