diff --git a/backend/rpc.go b/backend/rpc.go index af603ca..635e32f 100644 --- a/backend/rpc.go +++ b/backend/rpc.go @@ -170,5 +170,25 @@ func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, s } func (s *RpcServer) SendMessage(ctx context.Context, req *rpc.Message) (*rpc.Message, error) { - return nil, NotImplementedError + if req.Sender == nil || !req.Sender.IsSelf { + return nil, errors.New("Invalid message sender") + } else if req.Recipient == nil || req.Recipient.IsSelf { + return nil, errors.New("Invalid message recipient") + } + + contact := s.core.Identity.ContactList().ContactByAddress(req.Recipient.Address) + if contact == nil || (req.Recipient.ContactId != 0 && int32(contact.Id()) != req.Recipient.ContactId) { + return nil, errors.New("Unknown recipient") + } + + // XXX timestamp + // XXX validate text + // XXX identifier + + message, err := contact.Conversation().Send(req.Text) + if err != nil { + return nil, err + } + + return message, nil } diff --git a/core/contact.go b/core/contact.go index 371c887..d0a871c 100644 --- a/core/contact.go +++ b/core/contact.go @@ -125,6 +125,13 @@ func (c *Contact) Conversation() *Conversation { return c.conversation } +// XXX Thread safety disaster +func (c *Contact) Connection() *protocol.OpenConnection { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.connection +} + // Goroutine to handle the protocol connection for a contact. // Responsible for making outbound connections and taking over authenticated // inbound connections, running protocol handlers on the active connection, and diff --git a/core/conversation.go b/core/conversation.go index 8fb14fb..b02ec65 100644 --- a/core/conversation.go +++ b/core/conversation.go @@ -88,7 +88,7 @@ func (c *Conversation) UpdateSentStatus(id uint64, success bool) { c.mutex.Unlock() } -func (c *Conversation) Send(text string) { +func (c *Conversation) Send(text string) (*ricochet.Message, error) { // XXX protocol // XXX check that text is ok, get identifier, etc // XXX decide whether sending or queued based on state @@ -96,10 +96,29 @@ func (c *Conversation) Send(text string) { Sender: c.localEntity, Recipient: c.remoteEntity, Timestamp: time.Now().Unix(), - Identifier: 0, // XXX - Status: ricochet.Message_SENDING, // XXX + Identifier: 0, // XXX + Status: ricochet.Message_QUEUED, Text: text, } + + // XXX witness thread disaster + conn := c.Contact.Connection() + if conn != nil { + // XXX hardcoded channel IDs, also channel IDs shouldn't be exposed + channelId := int32(7) + if !conn.Client { + channelId++ + } + // XXX no error handling + if conn.GetChannelType(channelId) != "im.ricochet.chat" { + conn.OpenChatChannel(channelId) + } + + // XXX no message IDs, no acks + conn.SendMessage(channelId, text) + message.Status = ricochet.Message_SENDING + } + c.mutex.Lock() c.messages = append(c.messages, message) log.Printf("Conversation sent message: %v", message) @@ -110,4 +129,6 @@ func (c *Conversation) Send(text string) { Msg: message, } conversationStream.Publish(event) + + return message, nil }