core: Early proof-of-concept conversations API

This commit is contained in:
John Brooks 2016-10-05 14:38:18 -07:00
parent e1db24360b
commit 623dccb26d
9 changed files with 319 additions and 92 deletions

View File

@ -149,6 +149,26 @@ func (s *RpcServer) RejectInboundRequest(ctx context.Context, req *rpc.ContactRe
return nil, NotImplementedError return nil, NotImplementedError
} }
func (s *RpcServer) StreamConversations(stream rpc.RicochetCore_StreamConversationsServer) error { func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, stream rpc.RicochetCore_MonitorConversationsServer) error {
return NotImplementedError monitor := ricochet.ConversationEventMonitor().Subscribe(100)
defer ricochet.ConversationEventMonitor().Unsubscribe(monitor)
// XXX should populate
for {
event, ok := (<-monitor).(rpc.ConversationEvent)
if !ok {
break
}
if err := stream.Send(&event); err != nil {
return err
}
}
return nil
}
func (s *RpcServer) SendMessage(ctx context.Context, req *rpc.Message) (*rpc.Message, error) {
return nil, NotImplementedError
} }

View File

@ -29,6 +29,8 @@ type Contact struct {
connection *protocol.OpenConnection connection *protocol.OpenConnection
connChannel chan *protocol.OpenConnection connChannel chan *protocol.OpenConnection
connClosedChannel chan struct{} connClosedChannel chan struct{}
conversation *Conversation
} }
func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils.Publisher) (*Contact, error) { func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils.Publisher) (*Contact, error) {
@ -110,6 +112,19 @@ func (c *Contact) Data() *ricochet.Contact {
return data return data
} }
func (c *Contact) Conversation() *Conversation {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.conversation == nil {
entity := &ricochet.Entity{
ContactId: int32(c.id),
Address: "ricochet:" + c.data.Hostname[0:16],
}
c.conversation = NewConversation(c.core, c, entity)
}
return c.conversation
}
// Goroutine to handle the protocol connection for a contact. // Goroutine to handle the protocol connection for a contact.
// Responsible for making outbound connections and taking over authenticated // Responsible for making outbound connections and taking over authenticated
// inbound connections, running protocol handlers on the active connection, and // inbound connections, running protocol handlers on the active connection, and

113
core/conversation.go Normal file
View File

@ -0,0 +1,113 @@
package core
import (
"github.com/special/notricochet/core/utils"
"github.com/special/notricochet/rpc"
"log"
"sync"
"time"
)
// XXX threading model.. this one isn't great
// XXX Should probably be under core or identity
var conversationStream *utils.Publisher = utils.CreatePublisher()
type Conversation struct {
Core *Ricochet
Contact *Contact
mutex sync.Mutex
localEntity *ricochet.Entity
remoteEntity *ricochet.Entity
messages []*ricochet.Message
}
func NewConversation(core *Ricochet, contact *Contact, remoteEntity *ricochet.Entity) *Conversation {
return &Conversation{
Core: core,
Contact: contact,
localEntity: &ricochet.Entity{IsSelf: true},
remoteEntity: remoteEntity,
messages: make([]*ricochet.Message, 0),
}
}
func ConversationEventMonitor() utils.Subscribable {
return conversationStream
}
func (c *Conversation) Receive(id uint64, timestamp int64, text string) {
message := &ricochet.Message{
Sender: c.remoteEntity,
Recipient: c.localEntity,
Timestamp: timestamp,
Identifier: id,
Status: ricochet.Message_RECEIVED,
Text: text,
}
// XXX container
// XXX limit backlog/etc
c.mutex.Lock()
c.messages = append(c.messages, message)
log.Printf("Conversation received message: %v", message)
c.mutex.Unlock()
// XXX Technically these aren't guaranteed to be in order (because
// the lock has been released) or to all arrive (because of publisher's
// dropping behavior)...
event := ricochet.ConversationEvent{
Type: ricochet.ConversationEvent_RECEIVE,
Msg: message,
}
conversationStream.Publish(event)
}
func (c *Conversation) UpdateSentStatus(id uint64, success bool) {
c.mutex.Lock()
for _, message := range c.messages {
if message.Status != ricochet.Message_SENDING || message.Identifier != id {
continue
}
if success {
message.Status = ricochet.Message_DELIVERED
} else {
message.Status = ricochet.Message_ERROR
}
c.mutex.Unlock()
event := ricochet.ConversationEvent{
Type: ricochet.ConversationEvent_UPDATE,
Msg: message,
}
conversationStream.Publish(event)
return
}
c.mutex.Unlock()
}
func (c *Conversation) Send(text string) {
// XXX protocol
// XXX check that text is ok, get identifier, etc
// XXX decide whether sending or queued based on state
message := &ricochet.Message{
Sender: c.localEntity,
Recipient: c.remoteEntity,
Timestamp: time.Now().Unix(),
Identifier: 0, // XXX
Status: ricochet.Message_SENDING, // XXX
Text: text,
}
c.mutex.Lock()
c.messages = append(c.messages, message)
log.Printf("Conversation sent message: %v", message)
c.mutex.Unlock()
event := ricochet.ConversationEvent{
Type: ricochet.ConversationEvent_SEND,
Msg: message,
}
conversationStream.Publish(event)
}

View File

@ -5,6 +5,7 @@ import (
protocol "github.com/s-rah/go-ricochet" protocol "github.com/s-rah/go-ricochet"
"log" "log"
"net" "net"
"time"
) )
type Protocol struct { type Protocol struct {
@ -150,22 +151,48 @@ func (handler *protocolHandler) OnContactRequestAck(oc *protocol.OpenConnection,
// Managing Channels // Managing Channels
func (handler *protocolHandler) OnOpenChannelRequest(oc *protocol.OpenConnection, channelID int32, channelType string) { func (handler *protocolHandler) OnOpenChannelRequest(oc *protocol.OpenConnection, channelID int32, channelType string) {
log.Printf("open channel request: %v %v", channelID, channelType)
oc.AckOpenChannel(channelID, channelType) oc.AckOpenChannel(channelID, channelType)
} }
func (handler *protocolHandler) OnOpenChannelRequestSuccess(oc *protocol.OpenConnection, channelID int32) { func (handler *protocolHandler) OnOpenChannelRequestSuccess(oc *protocol.OpenConnection, channelID int32) {
log.Printf("open channel request success: %v %v", channelID)
} }
func (handler *protocolHandler) OnChannelClosed(oc *protocol.OpenConnection, channelID int32) { func (handler *protocolHandler) OnChannelClosed(oc *protocol.OpenConnection, channelID int32) {
log.Printf("channel closed: %v", channelID)
} }
// Chat Messages // Chat Messages
// XXX messageID should be (at least) uint32
func (handler *protocolHandler) OnChatMessage(oc *protocol.OpenConnection, channelID int32, messageID int32, message string) { func (handler *protocolHandler) OnChatMessage(oc *protocol.OpenConnection, channelID int32, messageID int32, message string) {
// XXX no time delta?
// XXX sanity checks, message contents, etc
log.Printf("chat message: %d %d %s", channelID, messageID, message)
// XXX ugllly
contact := handler.p.core.Identity.ContactList().ContactByAddress("ricochet:" + oc.OtherHostname)
if contact != nil {
conversation := contact.Conversation()
conversation.Receive(uint64(messageID), time.Now().Unix(), message)
}
oc.AckChatMessage(channelID, messageID)
} }
func (handler *protocolHandler) OnChatMessageAck(oc *protocol.OpenConnection, channelID int32, messageID int32) { func (handler *protocolHandler) OnChatMessageAck(oc *protocol.OpenConnection, channelID int32, messageID int32) {
// XXX no success
log.Printf("chat ack: %d %d", channelID, messageID)
// XXX Also ugly
contact := handler.p.core.Identity.ContactList().ContactByAddress("ricochet:" + oc.OtherHostname)
if contact != nil {
conversation := contact.Conversation()
conversation.UpdateSentStatus(uint64(messageID), true)
}
} }
// Handle Errors // Handle Errors
func (handler *protocolHandler) OnFailedChannelOpen(oc *protocol.OpenConnection, channelID int32, errorType string) { func (handler *protocolHandler) OnFailedChannelOpen(oc *protocol.OpenConnection, channelID int32, errorType string) {
log.Printf("failed channel open: %d %s", channelID, errorType)
oc.UnsetChannel(channelID) oc.UnsetChannel(channelID)
} }
func (handler *protocolHandler) OnGenericError(oc *protocol.OpenConnection, channelID int32) { func (handler *protocolHandler) OnGenericError(oc *protocol.OpenConnection, channelID int32) {

View File

@ -25,6 +25,7 @@ It has these top-level messages:
DeleteContactReply DeleteContactReply
RejectInboundRequestReply RejectInboundRequestReply
ConversationEvent ConversationEvent
MonitorConversationsRequest
Entity Entity
Message Message
ServerStatusRequest ServerStatusRequest

View File

@ -74,7 +74,7 @@ var Message_Status_value = map[string]int32{
func (x Message_Status) String() string { func (x Message_Status) String() string {
return proto.EnumName(Message_Status_name, int32(x)) return proto.EnumName(Message_Status_name, int32(x))
} }
func (Message_Status) EnumDescriptor() ([]byte, []int) { return fileDescriptor2, []int{2, 0} } func (Message_Status) EnumDescriptor() ([]byte, []int) { return fileDescriptor2, []int{3, 0} }
type ConversationEvent struct { type ConversationEvent struct {
Type ConversationEvent_Type `protobuf:"varint,1,opt,name=type,enum=ricochet.ConversationEvent_Type" json:"type,omitempty"` Type ConversationEvent_Type `protobuf:"varint,1,opt,name=type,enum=ricochet.ConversationEvent_Type" json:"type,omitempty"`
@ -93,24 +93,34 @@ func (m *ConversationEvent) GetMsg() *Message {
return nil return nil
} }
type MonitorConversationsRequest struct {
}
func (m *MonitorConversationsRequest) Reset() { *m = MonitorConversationsRequest{} }
func (m *MonitorConversationsRequest) String() string { return proto.CompactTextString(m) }
func (*MonitorConversationsRequest) ProtoMessage() {}
func (*MonitorConversationsRequest) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{1} }
type Entity struct { type Entity struct {
// null is self // contactId and address MAY be unspecified for self
ContactId int32 `protobuf:"varint,1,opt,name=contactId" json:"contactId,omitempty"` ContactId int32 `protobuf:"varint,1,opt,name=contactId" json:"contactId,omitempty"`
Address string `protobuf:"bytes,2,opt,name=address" json:"address,omitempty"` Address string `protobuf:"bytes,2,opt,name=address" json:"address,omitempty"`
IsSelf bool `protobuf:"varint,3,opt,name=isSelf" json:"isSelf,omitempty"`
} }
func (m *Entity) Reset() { *m = Entity{} } func (m *Entity) Reset() { *m = Entity{} }
func (m *Entity) String() string { return proto.CompactTextString(m) } func (m *Entity) String() string { return proto.CompactTextString(m) }
func (*Entity) ProtoMessage() {} func (*Entity) ProtoMessage() {}
func (*Entity) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{1} } func (*Entity) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{2} }
type Message struct { type Message struct {
Sender *Entity `protobuf:"bytes,1,opt,name=sender" json:"sender,omitempty"` Sender *Entity `protobuf:"bytes,1,opt,name=sender" json:"sender,omitempty"`
Recipient *Entity `protobuf:"bytes,2,opt,name=recipient" json:"recipient,omitempty"` Recipient *Entity `protobuf:"bytes,2,opt,name=recipient" json:"recipient,omitempty"`
Timestamp int64 `protobuf:"varint,3,opt,name=timestamp" json:"timestamp,omitempty"` Timestamp int64 `protobuf:"varint,3,opt,name=timestamp" json:"timestamp,omitempty"`
// Identifiers are unique _only_ to a sender/recipient pair in a session // Identifiers are unique for the tuple of (sender, recipient, direction)
// XXX This is a silly thing to perpetuate; should we UUID? // within a single session, and should be randomized between sessions to
Identifier uint32 `protobuf:"varint,4,opt,name=identifier" json:"identifier,omitempty"` // reduce the chance of collision.
Identifier uint64 `protobuf:"varint,4,opt,name=identifier" json:"identifier,omitempty"`
Status Message_Status `protobuf:"varint,5,opt,name=status,enum=ricochet.Message_Status" json:"status,omitempty"` Status Message_Status `protobuf:"varint,5,opt,name=status,enum=ricochet.Message_Status" json:"status,omitempty"`
Text string `protobuf:"bytes,6,opt,name=text" json:"text,omitempty"` Text string `protobuf:"bytes,6,opt,name=text" json:"text,omitempty"`
} }
@ -118,7 +128,7 @@ type Message struct {
func (m *Message) Reset() { *m = Message{} } func (m *Message) Reset() { *m = Message{} }
func (m *Message) String() string { return proto.CompactTextString(m) } func (m *Message) String() string { return proto.CompactTextString(m) }
func (*Message) ProtoMessage() {} func (*Message) ProtoMessage() {}
func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{2} } func (*Message) Descriptor() ([]byte, []int) { return fileDescriptor2, []int{3} }
func (m *Message) GetSender() *Entity { func (m *Message) GetSender() *Entity {
if m != nil { if m != nil {
@ -136,6 +146,7 @@ func (m *Message) GetRecipient() *Entity {
func init() { func init() {
proto.RegisterType((*ConversationEvent)(nil), "ricochet.ConversationEvent") proto.RegisterType((*ConversationEvent)(nil), "ricochet.ConversationEvent")
proto.RegisterType((*MonitorConversationsRequest)(nil), "ricochet.MonitorConversationsRequest")
proto.RegisterType((*Entity)(nil), "ricochet.Entity") proto.RegisterType((*Entity)(nil), "ricochet.Entity")
proto.RegisterType((*Message)(nil), "ricochet.Message") proto.RegisterType((*Message)(nil), "ricochet.Message")
proto.RegisterEnum("ricochet.ConversationEvent_Type", ConversationEvent_Type_name, ConversationEvent_Type_value) proto.RegisterEnum("ricochet.ConversationEvent_Type", ConversationEvent_Type_name, ConversationEvent_Type_value)
@ -145,29 +156,31 @@ func init() {
func init() { proto.RegisterFile("conversation.proto", fileDescriptor2) } func init() { proto.RegisterFile("conversation.proto", fileDescriptor2) }
var fileDescriptor2 = []byte{ var fileDescriptor2 = []byte{
// 378 bytes of a gzipped FileDescriptorProto // 413 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x92, 0x4d, 0xef, 0xd2, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x64, 0x92, 0xcf, 0x6e, 0xd3, 0x40,
0x10, 0xc6, 0xed, 0x2b, 0x74, 0x10, 0x53, 0xe6, 0xd4, 0x83, 0x31, 0xa4, 0x5e, 0x38, 0x35, 0x06, 0x10, 0xc6, 0xf1, 0xdf, 0xd8, 0x13, 0x40, 0xee, 0x1c, 0x90, 0x25, 0xfe, 0xa8, 0x32, 0x97, 0x9e,
0xfd, 0x00, 0x12, 0xba, 0x31, 0x24, 0x15, 0x70, 0xa1, 0xde, 0x6b, 0xbb, 0xe2, 0x1e, 0x68, 0x9b, 0x2c, 0x14, 0x78, 0x81, 0xaa, 0x5e, 0xa1, 0x48, 0x6e, 0x1a, 0xd6, 0x35, 0xe2, 0x6a, 0xec, 0x6d,
0x76, 0x25, 0xf2, 0xb1, 0xbc, 0xfb, 0xe1, 0x9c, 0x2e, 0x25, 0xe5, 0x1f, 0x6e, 0xdb, 0x79, 0x7e, 0x59, 0x89, 0xd8, 0xc6, 0xbb, 0xad, 0xe8, 0x63, 0x71, 0xe7, 0xe1, 0x18, 0x6f, 0x1c, 0x39, 0x28,
0xd3, 0x79, 0xe6, 0xd9, 0x05, 0xcc, 0xab, 0xf2, 0x22, 0x9a, 0x36, 0x53, 0xb2, 0x2a, 0xa3, 0xba, 0xb7, 0xdd, 0xf9, 0x7e, 0xe3, 0xfd, 0x66, 0x3e, 0x03, 0xd6, 0x5d, 0xfb, 0x28, 0x06, 0x55, 0x69,
0xa9, 0x54, 0x85, 0xe3, 0x46, 0xe6, 0x55, 0xfe, 0x4b, 0xa8, 0xf0, 0xaf, 0x01, 0xb3, 0xf5, 0x03, 0xd9, 0xb5, 0x69, 0x3f, 0x74, 0xba, 0xc3, 0x60, 0x90, 0x75, 0x57, 0xff, 0x10, 0x3a, 0xf9, 0x63,
0xc0, 0x2e, 0xa2, 0x54, 0xf8, 0x09, 0x6c, 0x75, 0xad, 0x45, 0x60, 0xcc, 0x8d, 0xc5, 0x9b, 0xe5, 0xc1, 0xd9, 0xd5, 0x11, 0xc0, 0x1e, 0x45, 0xab, 0xf1, 0x13, 0xb8, 0xfa, 0xa9, 0x17, 0xb1, 0x75,
0x3c, 0xba, 0xe3, 0xd1, 0x13, 0x1a, 0x1d, 0x89, 0xe3, 0x9a, 0xc6, 0xf7, 0x60, 0x9d, 0xdb, 0x53, 0x6e, 0x5d, 0xbc, 0x5c, 0x9d, 0xa7, 0x07, 0x3c, 0x3d, 0x41, 0xd3, 0x5b, 0xe2, 0xb8, 0xa1, 0xf1,
0x60, 0x52, 0xd3, 0x64, 0x39, 0x1b, 0x9a, 0xbe, 0x8a, 0xb6, 0xcd, 0x4e, 0x82, 0x77, 0x6a, 0xb8, 0x3d, 0x38, 0x3b, 0x75, 0x1f, 0xdb, 0xd4, 0xb4, 0x5c, 0x9d, 0xcd, 0x4d, 0xd7, 0x42, 0xa9, 0xea,
0x02, 0xbb, 0x6b, 0xc1, 0x31, 0xd8, 0xdb, 0x34, 0x49, 0xfc, 0x57, 0xf8, 0x1a, 0xc6, 0xfb, 0xdd, 0x5e, 0xf0, 0x51, 0x4d, 0x2e, 0xc1, 0x1d, 0x5b, 0x30, 0x00, 0x77, 0x53, 0xe6, 0x79, 0xf4, 0x0c,
0x3e, 0x4d, 0x56, 0x47, 0xe6, 0x1b, 0x38, 0x81, 0x11, 0x67, 0x6b, 0xb6, 0xf9, 0xce, 0x7c, 0xb3, 0x9f, 0x43, 0xb0, 0xbd, 0xd9, 0x96, 0xf9, 0xe5, 0x2d, 0x8b, 0x2c, 0x5c, 0xc2, 0x82, 0xb3, 0x2b,
0x83, 0x0e, 0x6c, 0x1b, 0xfb, 0x16, 0x02, 0xb8, 0xe9, 0x3e, 0xee, 0x10, 0x3b, 0xfc, 0x0c, 0x2e, 0xb6, 0xfe, 0xca, 0x22, 0x7b, 0x84, 0x0a, 0xb6, 0xc9, 0x22, 0x07, 0x01, 0xfc, 0x72, 0x9b, 0x8d,
0x2b, 0x95, 0x54, 0x57, 0x7c, 0x0b, 0x1e, 0x6d, 0xa7, 0xb2, 0x5c, 0x6d, 0x0a, 0x6d, 0xd6, 0xe1, 0x88, 0x9b, 0xbc, 0x85, 0xd7, 0xd7, 0x5d, 0x2b, 0x75, 0x37, 0x1c, 0xdb, 0x51, 0x5c, 0xfc, 0x7a,
0x43, 0x01, 0x03, 0x18, 0x65, 0x45, 0xd1, 0xd0, 0x74, 0xed, 0xc9, 0xe3, 0xf7, 0xcf, 0xf0, 0x9f, 0x10, 0x4a, 0x27, 0xdf, 0xc0, 0x67, 0xad, 0x96, 0xfa, 0x09, 0xdf, 0x40, 0x48, 0xc3, 0xeb, 0xaa,
0x09, 0xa3, 0xde, 0x15, 0x2e, 0xc0, 0x6d, 0x45, 0x59, 0x88, 0x46, 0xff, 0x60, 0xb2, 0xf4, 0x07, 0xd6, 0xeb, 0xc6, 0xcc, 0xe2, 0xf1, 0xb9, 0x80, 0x31, 0x2c, 0xaa, 0xa6, 0x19, 0xc8, 0x9c, 0xb1,
0xe3, 0xb7, 0x29, 0xbc, 0xd7, 0x31, 0x02, 0xaf, 0x11, 0xb9, 0xac, 0x25, 0xed, 0xdd, 0x6f, 0xf9, 0x1c, 0xf2, 0xc3, 0x15, 0x5f, 0x81, 0x2f, 0x55, 0x21, 0x7e, 0xde, 0xc5, 0x0e, 0x09, 0x01, 0x9f,
0x0c, 0x0f, 0x48, 0xe7, 0x4e, 0xc9, 0xb3, 0x68, 0x55, 0x76, 0xae, 0x03, 0x8b, 0x78, 0x8b, 0x0f, 0x6e, 0xc9, 0x5f, 0x1b, 0x16, 0xd3, 0x30, 0x78, 0x01, 0xbe, 0x12, 0x6d, 0x23, 0x06, 0xf3, 0xe1,
0x05, 0x7c, 0x07, 0x20, 0x0b, 0xc2, 0xe4, 0x4f, 0x49, 0xb3, 0x6d, 0x92, 0xa7, 0xfc, 0xa1, 0x82, 0xe5, 0x2a, 0x9a, 0xe7, 0xdd, 0xbf, 0xce, 0x27, 0x1d, 0x53, 0x08, 0x07, 0x51, 0xcb, 0x5e, 0xd2,
0x1f, 0xc8, 0x97, 0xca, 0xd4, 0xef, 0x36, 0x70, 0xf4, 0x2d, 0x04, 0x4f, 0x81, 0x46, 0x07, 0xad, 0xba, 0xa6, 0xe5, 0x9c, 0xc2, 0x33, 0x32, 0xba, 0xd6, 0x72, 0x47, 0x93, 0x54, 0xbb, 0xde, 0x18,
0xf3, 0x9e, 0x43, 0xa4, 0x5b, 0x13, 0x7f, 0x54, 0xe0, 0xea, 0x65, 0xf5, 0x39, 0x3c, 0x80, 0x7b, 0x70, 0xf8, 0x5c, 0xc0, 0x77, 0x00, 0xb2, 0x21, 0x4c, 0xde, 0x49, 0x7a, 0xdb, 0x25, 0xd9, 0xe5,
0xa3, 0x5e, 0x06, 0xde, 0x47, 0x1c, 0x53, 0xe0, 0x94, 0xec, 0xb7, 0x94, 0xa5, 0x74, 0x36, 0xbb, 0x47, 0x15, 0xfc, 0x40, 0xbe, 0x74, 0xa5, 0x1f, 0x54, 0xec, 0x99, 0xf0, 0xe2, 0x93, 0x1c, 0xd2,
0xf0, 0xbb, 0xbc, 0x37, 0xdb, 0x2f, 0x14, 0xf9, 0x14, 0xbc, 0x98, 0x25, 0x44, 0x71, 0xd2, 0x6c, 0xc2, 0xe8, 0x7c, 0xe2, 0x10, 0x29, 0x6c, 0xf1, 0x5b, 0xc7, 0xbe, 0x59, 0x82, 0x39, 0x27, 0x05,
0xf4, 0xc0, 0x61, 0x9c, 0xef, 0xb8, 0xef, 0xfc, 0x70, 0xf5, 0x2b, 0xfa, 0xf8, 0x3f, 0x00, 0x00, 0xf8, 0x7b, 0xea, 0xff, 0x9c, 0xa6, 0x64, 0x32, 0xca, 0x89, 0x02, 0xf9, 0x52, 0xb2, 0x92, 0xce,
0xff, 0xff, 0xe1, 0x5b, 0xde, 0x0a, 0x5b, 0x02, 0x00, 0x00, 0xf6, 0x98, 0xd9, 0x18, 0xd3, 0x7a, 0xf3, 0x99, 0x92, 0x7a, 0x01, 0x61, 0xc6, 0x72, 0xa2, 0x38,
0x69, 0x2e, 0x86, 0xe0, 0x31, 0xce, 0x6f, 0x78, 0xe4, 0x7d, 0xf7, 0xcd, 0xcf, 0xf7, 0xf1, 0x5f,
0x00, 0x00, 0x00, 0xff, 0xff, 0x58, 0x7a, 0x99, 0xbe, 0x92, 0x02, 0x00, 0x00,
} }

View File

@ -14,19 +14,24 @@ message ConversationEvent {
Message msg = 2; Message msg = 2;
} }
message MonitorConversationsRequest {
}
message Entity { message Entity {
// null is self // contactId and address MAY be unspecified for self
int32 contactId = 1; int32 contactId = 1;
string address = 2; string address = 2;
bool isSelf = 3;
} }
message Message { message Message {
Entity sender = 1; Entity sender = 1;
Entity recipient = 2; Entity recipient = 2;
int64 timestamp = 3; int64 timestamp = 3;
// Identifiers are unique _only_ to a sender/recipient pair in a session // Identifiers are unique for the tuple of (sender, recipient, direction)
// XXX This is a silly thing to perpetuate; should we UUID? // within a single session, and should be randomized between sessions to
uint32 identifier = 4; // reduce the chance of collision.
uint64 identifier = 4;
enum Status { enum Status {
NULL = 0; NULL = 0;

View File

@ -82,7 +82,9 @@ type RicochetCoreClient interface {
DeleteContact(ctx context.Context, in *DeleteContactRequest, opts ...grpc.CallOption) (*DeleteContactReply, error) DeleteContact(ctx context.Context, in *DeleteContactRequest, opts ...grpc.CallOption) (*DeleteContactReply, error)
AcceptInboundRequest(ctx context.Context, in *ContactRequest, opts ...grpc.CallOption) (*Contact, error) AcceptInboundRequest(ctx context.Context, in *ContactRequest, opts ...grpc.CallOption) (*Contact, error)
RejectInboundRequest(ctx context.Context, in *ContactRequest, opts ...grpc.CallOption) (*RejectInboundRequestReply, error) RejectInboundRequest(ctx context.Context, in *ContactRequest, opts ...grpc.CallOption) (*RejectInboundRequestReply, error)
StreamConversations(ctx context.Context, opts ...grpc.CallOption) (RicochetCore_StreamConversationsClient, error) // Open a stream to monitor messages in conversations with contacts.
MonitorConversations(ctx context.Context, in *MonitorConversationsRequest, opts ...grpc.CallOption) (RicochetCore_MonitorConversationsClient, error)
SendMessage(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error)
} }
type ricochetCoreClient struct { type ricochetCoreClient struct {
@ -238,30 +240,31 @@ func (c *ricochetCoreClient) RejectInboundRequest(ctx context.Context, in *Conta
return out, nil return out, nil
} }
func (c *ricochetCoreClient) StreamConversations(ctx context.Context, opts ...grpc.CallOption) (RicochetCore_StreamConversationsClient, error) { func (c *ricochetCoreClient) MonitorConversations(ctx context.Context, in *MonitorConversationsRequest, opts ...grpc.CallOption) (RicochetCore_MonitorConversationsClient, error) {
stream, err := grpc.NewClientStream(ctx, &_RicochetCore_serviceDesc.Streams[2], c.cc, "/ricochet.RicochetCore/StreamConversations", opts...) stream, err := grpc.NewClientStream(ctx, &_RicochetCore_serviceDesc.Streams[2], c.cc, "/ricochet.RicochetCore/MonitorConversations", opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
x := &ricochetCoreStreamConversationsClient{stream} x := &ricochetCoreMonitorConversationsClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil return x, nil
} }
type RicochetCore_StreamConversationsClient interface { type RicochetCore_MonitorConversationsClient interface {
Send(*ConversationEvent) error
Recv() (*ConversationEvent, error) Recv() (*ConversationEvent, error)
grpc.ClientStream grpc.ClientStream
} }
type ricochetCoreStreamConversationsClient struct { type ricochetCoreMonitorConversationsClient struct {
grpc.ClientStream grpc.ClientStream
} }
func (x *ricochetCoreStreamConversationsClient) Send(m *ConversationEvent) error { func (x *ricochetCoreMonitorConversationsClient) Recv() (*ConversationEvent, error) {
return x.ClientStream.SendMsg(m)
}
func (x *ricochetCoreStreamConversationsClient) Recv() (*ConversationEvent, error) {
m := new(ConversationEvent) m := new(ConversationEvent)
if err := x.ClientStream.RecvMsg(m); err != nil { if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err return nil, err
@ -269,6 +272,15 @@ func (x *ricochetCoreStreamConversationsClient) Recv() (*ConversationEvent, erro
return m, nil return m, nil
} }
func (c *ricochetCoreClient) SendMessage(ctx context.Context, in *Message, opts ...grpc.CallOption) (*Message, error) {
out := new(Message)
err := grpc.Invoke(ctx, "/ricochet.RicochetCore/SendMessage", in, out, c.cc, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// Server API for RicochetCore service // Server API for RicochetCore service
type RicochetCoreServer interface { type RicochetCoreServer interface {
@ -301,7 +313,9 @@ type RicochetCoreServer interface {
DeleteContact(context.Context, *DeleteContactRequest) (*DeleteContactReply, error) DeleteContact(context.Context, *DeleteContactRequest) (*DeleteContactReply, error)
AcceptInboundRequest(context.Context, *ContactRequest) (*Contact, error) AcceptInboundRequest(context.Context, *ContactRequest) (*Contact, error)
RejectInboundRequest(context.Context, *ContactRequest) (*RejectInboundRequestReply, error) RejectInboundRequest(context.Context, *ContactRequest) (*RejectInboundRequestReply, error)
StreamConversations(RicochetCore_StreamConversationsServer) error // Open a stream to monitor messages in conversations with contacts.
MonitorConversations(*MonitorConversationsRequest, RicochetCore_MonitorConversationsServer) error
SendMessage(context.Context, *Message) (*Message, error)
} }
func RegisterRicochetCoreServer(s *grpc.Server, srv RicochetCoreServer) { func RegisterRicochetCoreServer(s *grpc.Server, srv RicochetCoreServer) {
@ -512,30 +526,43 @@ func _RicochetCore_RejectInboundRequest_Handler(srv interface{}, ctx context.Con
return interceptor(ctx, in, info, handler) return interceptor(ctx, in, info, handler)
} }
func _RicochetCore_StreamConversations_Handler(srv interface{}, stream grpc.ServerStream) error { func _RicochetCore_MonitorConversations_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(RicochetCoreServer).StreamConversations(&ricochetCoreStreamConversationsServer{stream}) m := new(MonitorConversationsRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(RicochetCoreServer).MonitorConversations(m, &ricochetCoreMonitorConversationsServer{stream})
} }
type RicochetCore_StreamConversationsServer interface { type RicochetCore_MonitorConversationsServer interface {
Send(*ConversationEvent) error Send(*ConversationEvent) error
Recv() (*ConversationEvent, error)
grpc.ServerStream grpc.ServerStream
} }
type ricochetCoreStreamConversationsServer struct { type ricochetCoreMonitorConversationsServer struct {
grpc.ServerStream grpc.ServerStream
} }
func (x *ricochetCoreStreamConversationsServer) Send(m *ConversationEvent) error { func (x *ricochetCoreMonitorConversationsServer) Send(m *ConversationEvent) error {
return x.ServerStream.SendMsg(m) return x.ServerStream.SendMsg(m)
} }
func (x *ricochetCoreStreamConversationsServer) Recv() (*ConversationEvent, error) { func _RicochetCore_SendMessage_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
m := new(ConversationEvent) in := new(Message)
if err := x.ServerStream.RecvMsg(m); err != nil { if err := dec(in); err != nil {
return nil, err return nil, err
} }
return m, nil if interceptor == nil {
return srv.(RicochetCoreServer).SendMessage(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/ricochet.RicochetCore/SendMessage",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(RicochetCoreServer).SendMessage(ctx, req.(*Message))
}
return interceptor(ctx, in, info, handler)
} }
var _RicochetCore_serviceDesc = grpc.ServiceDesc{ var _RicochetCore_serviceDesc = grpc.ServiceDesc{
@ -578,6 +605,10 @@ var _RicochetCore_serviceDesc = grpc.ServiceDesc{
MethodName: "RejectInboundRequest", MethodName: "RejectInboundRequest",
Handler: _RicochetCore_RejectInboundRequest_Handler, Handler: _RicochetCore_RejectInboundRequest_Handler,
}, },
{
MethodName: "SendMessage",
Handler: _RicochetCore_SendMessage_Handler,
},
}, },
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {
@ -591,10 +622,9 @@ var _RicochetCore_serviceDesc = grpc.ServiceDesc{
ServerStreams: true, ServerStreams: true,
}, },
{ {
StreamName: "StreamConversations", StreamName: "MonitorConversations",
Handler: _RicochetCore_StreamConversations_Handler, Handler: _RicochetCore_MonitorConversations_Handler,
ServerStreams: true, ServerStreams: true,
ClientStreams: true,
}, },
}, },
Metadata: fileDescriptor3, Metadata: fileDescriptor3,
@ -603,31 +633,32 @@ var _RicochetCore_serviceDesc = grpc.ServiceDesc{
func init() { proto.RegisterFile("core.proto", fileDescriptor3) } func init() { proto.RegisterFile("core.proto", fileDescriptor3) }
var fileDescriptor3 = []byte{ var fileDescriptor3 = []byte{
// 414 bytes of a gzipped FileDescriptorProto // 429 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x54, 0x41, 0xaf, 0xd2, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x54, 0x5d, 0xaf, 0xd2, 0x40,
0x10, 0x4e, 0x4d, 0x24, 0x3a, 0x50, 0x0c, 0x2b, 0x51, 0x2c, 0x88, 0x04, 0x3d, 0x70, 0x22, 0x44, 0x10, 0x0d, 0x26, 0x7e, 0xcd, 0xbd, 0xbd, 0x86, 0x95, 0xe8, 0xb5, 0x22, 0x12, 0xd4, 0xc4, 0x27,
0xc3, 0xcd, 0x83, 0xa4, 0xa8, 0x21, 0x11, 0x0f, 0x6d, 0x30, 0xf1, 0x58, 0xb6, 0x93, 0x58, 0xc5, 0x62, 0x24, 0xbc, 0xf9, 0x20, 0x01, 0x35, 0x24, 0xd6, 0x87, 0x12, 0x4c, 0x4c, 0x7c, 0x29, 0xdb,
0x6e, 0xdd, 0x0e, 0x18, 0x7e, 0xfd, 0x7b, 0x0b, 0xdd, 0xa5, 0x6d, 0x0a, 0x8f, 0x97, 0x77, 0x63, 0x89, 0x56, 0xc9, 0x6e, 0xdd, 0x1d, 0x30, 0xfc, 0x38, 0xff, 0x9b, 0x0b, 0xdd, 0x65, 0xdb, 0xb4,
0xbf, 0xef, 0x9b, 0x6f, 0x67, 0x3e, 0x66, 0x0b, 0xc0, 0x85, 0xc4, 0x71, 0x22, 0x05, 0x09, 0xf6, 0x04, 0xe3, 0x1b, 0x7b, 0xce, 0x99, 0xd3, 0x99, 0xb3, 0xb3, 0x00, 0x70, 0xa9, 0x70, 0x98, 0x2b,
0x44, 0x46, 0x5c, 0xf0, 0x5f, 0x48, 0x8e, 0x1d, 0x23, 0xfd, 0x17, 0xf2, 0x4f, 0x46, 0x38, 0xcd, 0x49, 0x92, 0xdd, 0x51, 0x19, 0x97, 0xfc, 0x3b, 0x52, 0x18, 0x08, 0xa4, 0xdf, 0x52, 0xfd, 0x2c,
0x28, 0xc4, 0x98, 0x22, 0xda, 0xeb, 0xb3, 0xcd, 0x45, 0x4c, 0x01, 0x27, 0x7d, 0x64, 0xea, 0xb8, 0x88, 0xf0, 0x2a, 0x4b, 0x51, 0x50, 0x46, 0x3b, 0x7b, 0x0e, 0xb8, 0x14, 0x94, 0x70, 0xb2, 0x47,
0x43, 0x99, 0x06, 0x14, 0x89, 0x38, 0xc3, 0x86, 0x53, 0x78, 0xee, 0xa3, 0x54, 0xa8, 0x4f, 0x01, 0x66, 0x8e, 0x5b, 0x54, 0x3a, 0xa1, 0x4c, 0x8a, 0x02, 0x1b, 0x8c, 0xe1, 0xfe, 0x02, 0x95, 0x41,
0x6d, 0x53, 0x0f, 0xff, 0x6d, 0x31, 0x25, 0xd6, 0x07, 0x90, 0x09, 0xff, 0xa1, 0xc4, 0x4a, 0xda, 0x17, 0x94, 0xd0, 0x46, 0xc7, 0xf8, 0x6b, 0x83, 0x9a, 0x58, 0x0f, 0x40, 0xe5, 0xfc, 0xb3, 0x11,
0xb1, 0x06, 0xd6, 0xe8, 0xb1, 0x57, 0x40, 0x86, 0x3f, 0xa1, 0x55, 0x2e, 0x4b, 0x36, 0xfb, 0x6b, 0x1b, 0xe9, 0x75, 0xab, 0xdf, 0x7a, 0x79, 0x33, 0x2e, 0x21, 0x83, 0x2f, 0xd0, 0xae, 0x96, 0xe5,
0x45, 0xec, 0x1d, 0xd8, 0xe9, 0xb1, 0xc8, 0x48, 0x1e, 0x29, 0xc9, 0x53, 0xaf, 0x0c, 0xbe, 0xbf, 0xeb, 0xdd, 0xb9, 0x22, 0xf6, 0x1c, 0x02, 0x7d, 0x28, 0x72, 0x92, 0x1b, 0x46, 0x72, 0x37, 0xae,
0xa9, 0x41, 0xc3, 0xd3, 0x03, 0xba, 0x6a, 0x68, 0xb6, 0x84, 0x67, 0x5f, 0x91, 0x8a, 0xd7, 0xb1, 0x82, 0xaf, 0xff, 0xdc, 0x86, 0xcb, 0xd8, 0x0e, 0x38, 0x35, 0x43, 0xb3, 0x08, 0xee, 0x7d, 0x40,
0xd7, 0x63, 0x13, 0xc1, 0xf8, 0x4c, 0xf7, 0x4e, 0xf7, 0x12, 0x7d, 0xe8, 0xf2, 0x1b, 0x34, 0x97, 0x2a, 0x7f, 0x8e, 0x3d, 0x19, 0xba, 0x08, 0x86, 0x0d, 0xdd, 0x87, 0x8f, 0x4f, 0xd1, 0xfb, 0x2e,
0x22, 0x8e, 0x48, 0xc8, 0xef, 0x59, 0x78, 0xec, 0x4d, 0x2e, 0x2f, 0x33, 0xc6, 0xef, 0x65, 0x2e, 0x3f, 0xc2, 0x55, 0x24, 0x45, 0x46, 0x52, 0x7d, 0x2a, 0xc2, 0x63, 0x4f, 0xbd, 0xbc, 0xca, 0x38,
0xd0, 0x4c, 0x66, 0x38, 0xb1, 0xd8, 0x17, 0x68, 0xa8, 0xdf, 0x92, 0x8c, 0x57, 0xb1, 0xb3, 0x02, 0xbf, 0x87, 0x5e, 0x60, 0x99, 0xc2, 0xf0, 0x55, 0x8b, 0xbd, 0x87, 0x4b, 0xf3, 0x5b, 0x91, 0xf3,
0x7e, 0xcd, 0x89, 0xcd, 0xa1, 0xee, 0x93, 0x48, 0x8c, 0x4d, 0xaf, 0x68, 0x73, 0x82, 0xaf, 0xba, 0x2a, 0x77, 0x56, 0xc2, 0xcf, 0x39, 0xb1, 0x19, 0x5c, 0x2c, 0x48, 0xe6, 0xce, 0xa6, 0x5b, 0xb6,
0x7c, 0x84, 0xba, 0x8a, 0x6a, 0xa1, 0xb7, 0x80, 0xbd, 0xca, 0x75, 0x06, 0x33, 0x16, 0xac, 0x4a, 0x39, 0xc2, 0x67, 0x5d, 0xde, 0xc0, 0x85, 0x89, 0x6a, 0x6e, 0xb7, 0x80, 0x3d, 0xf2, 0x3a, 0x87,
0x1d, 0x82, 0xd6, 0xf3, 0xbb, 0xd9, 0xde, 0xa4, 0x6c, 0x50, 0x89, 0xc6, 0x50, 0xc6, 0xe8, 0x45, 0x39, 0x0b, 0x56, 0xa7, 0xf6, 0x41, 0xdb, 0xf9, 0xa7, 0xc5, 0xde, 0x68, 0xd6, 0xaf, 0x45, 0xe3,
0xae, 0xd0, 0xd4, 0xe7, 0x9d, 0xf2, 0x53, 0xd1, 0x7c, 0x82, 0xd6, 0x2c, 0x0c, 0x35, 0x68, 0x16, 0x28, 0x67, 0xf4, 0xc0, 0x2b, 0x2c, 0xf5, 0x6e, 0x6b, 0xfc, 0x4c, 0x34, 0x6f, 0xa1, 0x3d, 0x49,
0xab, 0x53, 0x91, 0x1b, 0xa3, 0x56, 0x85, 0x61, 0x53, 0xb0, 0x57, 0x49, 0x18, 0x10, 0x1a, 0xa0, 0x53, 0x0b, 0xba, 0xc5, 0xba, 0xae, 0xc9, 0x9d, 0x51, 0xbb, 0xc6, 0xb0, 0x31, 0x04, 0xcb, 0x3c,
0xaa, 0x39, 0x57, 0xb6, 0x04, 0x7b, 0x8e, 0x1b, 0xcc, 0xcb, 0xfa, 0xb9, 0xa6, 0x44, 0x98, 0xab, 0x4d, 0x08, 0x1d, 0x50, 0xd7, 0x34, 0x95, 0x45, 0x10, 0xcc, 0x70, 0x8d, 0xbe, 0xac, 0xe7, 0x35,
0x7b, 0x17, 0xf9, 0xc3, 0xc2, 0xb8, 0xd0, 0x9e, 0x71, 0x8e, 0x09, 0x2d, 0xe2, 0xb5, 0xd8, 0xc6, 0x15, 0xc2, 0x7d, 0xba, 0x7b, 0x92, 0xdf, 0x2f, 0xcc, 0x14, 0x3a, 0x13, 0xce, 0x31, 0xa7, 0xb9,
0xe1, 0x83, 0x46, 0x59, 0x41, 0xdb, 0xc3, 0xdf, 0xc8, 0xef, 0x6f, 0xf2, 0x36, 0x67, 0xce, 0x55, 0x58, 0xc9, 0x8d, 0x48, 0xff, 0x6b, 0x94, 0x25, 0x74, 0x62, 0xfc, 0x81, 0xfc, 0xdf, 0x4d, 0x9e,
0x66, 0xbd, 0xf9, 0xea, 0xf9, 0x92, 0xc4, 0xe0, 0xaf, 0x5b, 0x78, 0xda, 0x29, 0xeb, 0x96, 0x5c, 0x79, 0xa6, 0xa9, 0xb2, 0xe8, 0xed, 0x2b, 0x74, 0xfc, 0xbd, 0x1c, 0xdf, 0xb6, 0x66, 0x2f, 0x9a,
0x4f, 0xc4, 0xf1, 0x9f, 0x71, 0xee, 0x22, 0x47, 0xd6, 0xc4, 0x5a, 0xd7, 0x8e, 0x9f, 0x86, 0x0f, 0xee, 0xcd, 0xf3, 0x0d, 0x0f, 0xa5, 0xcc, 0xbb, 0x1b, 0x1c, 0x99, 0xa5, 0x44, 0x91, 0x46, 0xa8,
0xb7, 0x01, 0x00, 0x00, 0xff, 0xff, 0x7f, 0x06, 0xe1, 0xc6, 0x74, 0x04, 0x00, 0x00, 0x75, 0xf2, 0x0d, 0xcb, 0xe9, 0x5b, 0x28, 0xac, 0x43, 0xab, 0x5b, 0x87, 0x3f, 0x96, 0xd1, 0xdf,
0x00, 0x00, 0x00, 0xff, 0xff, 0xa3, 0x96, 0xd5, 0x66, 0xb2, 0x04, 0x00, 0x00,
} }

View File

@ -42,7 +42,9 @@ service RicochetCore {
rpc AcceptInboundRequest (ContactRequest) returns (Contact); rpc AcceptInboundRequest (ContactRequest) returns (Contact);
rpc RejectInboundRequest (ContactRequest) returns (RejectInboundRequestReply); rpc RejectInboundRequest (ContactRequest) returns (RejectInboundRequestReply);
rpc StreamConversations (stream ConversationEvent) returns (stream ConversationEvent); // Open a stream to monitor messages in conversations with contacts.
rpc MonitorConversations (MonitorConversationsRequest) returns (stream ConversationEvent);
rpc SendMessage (Message) returns (Message);
} }
message ServerStatusRequest { message ServerStatusRequest {