diff --git a/backend/rpc.go b/backend/rpc.go index 65e44ca..103487b 100644 --- a/backend/rpc.go +++ b/backend/rpc.go @@ -25,24 +25,25 @@ func (core *RicochetCore) GetServerStatus(ctx context.Context, req *rpc.ServerSt } func (core *RicochetCore) MonitorNetwork(req *rpc.MonitorNetworkRequest, stream rpc.RicochetCore_MonitorNetworkServer) error { - status := &rpc.NetworkStatus{ - Control: &rpc.TorControlStatus{ - Status: rpc.TorControlStatus_ERROR, - ErrorMessage: "Not implemented", - }, - } - events := core.Network.EventMonitor().Subscribe(20) defer core.Network.EventMonitor().Unsubscribe(events) + // Send initial status event + { + event := core.Network.GetStatus() + if err := stream.Send(&event); err != nil { + return err + } + } + for { - event, ok := (<-events).(bool) + event, ok := (<-events).(rpc.NetworkStatus) if !ok { break } log.Printf("RPC monitor event: %v", event) - if err := stream.Send(status); err != nil { + if err := stream.Send(&event); err != nil { return err } } @@ -58,15 +59,12 @@ func (core *RicochetCore) StartNetwork(ctx context.Context, req *rpc.StartNetwor return nil, err } - // XXX real status - return &rpc.NetworkStatus{ - Control: &rpc.TorControlStatus{ - Status: rpc.TorControlStatus_CONNECTED, - }, - }, nil + status := core.Network.GetStatus() + return &status, nil } func (core *RicochetCore) StopNetwork(ctx context.Context, req *rpc.StopNetworkRequest) (*rpc.NetworkStatus, error) { core.Network.Stop() - return &rpc.NetworkStatus{}, nil + status := core.Network.GetStatus() + return &status, nil } diff --git a/core/network.go b/core/network.go index 2966895..6251d5a 100644 --- a/core/network.go +++ b/core/network.go @@ -3,6 +3,7 @@ package core import ( "errors" "github.com/special/notricochet/core/utils" + "github.com/special/notricochet/rpc" "github.com/yawning/bulb" bulbutils "github.com/yawning/bulb/utils" "log" @@ -11,15 +12,29 @@ import ( ) type Network struct { - conn *bulb.Conn - + // Connection settings; can only change while stopped controlAddress string controlPassword string - handleLock sync.Mutex - stopChannel chan struct{} - + // Events events *utils.Publisher + + // nil when stopped, otherwise used to signal stop to active network + stopSignal chan struct{} + stoppedSignal chan struct{} + + // Mutex required to access below + controlMutex sync.Mutex + + // Do not use while holding controlMutex; instead, copy ptr and unlock + // mutex before use. + conn *bulb.Conn + + // Modifications must be done while holding controlMutex and signalled + // to events. Do not directly modify the child elements, as they are + // pointers and may be shared. Instead, construct a new TorControlStatus + // et al for each change. + status ricochet.NetworkStatus } func CreateNetwork() *Network { @@ -34,20 +49,20 @@ func CreateNetwork() *Network { // been started; if true, the connection is up even if the first attempt failed. // The second return value is the connection attempt error, or nil on success. func (n *Network) Start(address, password string) (bool, error) { - n.handleLock.Lock() - defer n.handleLock.Unlock() - - if n.stopChannel != nil { + n.controlMutex.Lock() + if n.stoppedSignal != nil { // This is an error, because address/password might not be the same + n.controlMutex.Unlock() return false, errors.New("Network is already started") } - n.stopChannel = make(chan struct{}) + n.stopSignal = make(chan struct{}) + n.stoppedSignal = make(chan struct{}) n.controlAddress = address n.controlPassword = password + n.controlMutex.Unlock() connectChannel := make(chan error) - go n.run(connectChannel) err := <-connectChannel return true, err @@ -58,80 +73,165 @@ func (n *Network) Start(address, password string) (bool, error) { // the client will be offline until Start is called again. This call will // block until the connection is stopped. func (n *Network) Stop() { - n.handleLock.Lock() - defer n.handleLock.Unlock() + // Take mutex, copy channels, nil stopSignal to avoid race if Stop() + // is called again. Other calls will still use stoppedSignal. + n.controlMutex.Lock() + stop := n.stopSignal + stopped := n.stoppedSignal + n.stopSignal = nil + n.controlMutex.Unlock() - if n.stopChannel == nil { + if stop != nil { + // Signal to stop + stop <- struct{}{} + } else if stopped == nil { + // Already stopped return } - // XXX but if we do block that long, we can hold the mutex a _long_ time. - // XXX so the mutex won't be suitable for a "is network started" check - n.stopChannel <- struct{}{} - n.stopChannel = nil - n.conn = nil - n.controlAddress = "" - n.controlPassword = "" + // Wait until stopped; safe for multiple receivers, because the channel + // is closed on stop. Sender is responsible for all other cleanup and state. + <-stopped } func (n *Network) EventMonitor() utils.Subscribable { return n.events } +func (n *Network) GetStatus() ricochet.NetworkStatus { + n.controlMutex.Lock() + status := n.status + n.controlMutex.Unlock() + return status +} + func (n *Network) run(connectChannel chan<- error) { - var conn *bulb.Conn - var err error + n.controlMutex.Lock() + stopSignal := n.stopSignal + stoppedSignal := n.stoppedSignal + n.controlMutex.Unlock() for { - conn, err = createConnection(n.controlAddress, n.controlPassword) + // Status to CONNECTING + n.controlMutex.Lock() + n.status.Control = &ricochet.TorControlStatus{ + Status: ricochet.TorControlStatus_CONNECTING, + } + n.status.Connection = &ricochet.TorConnectionStatus{} + status := n.status + n.controlMutex.Unlock() + n.events.Publish(status) + + // Attempt connection + conn, err := createConnection(n.controlAddress, n.controlPassword) // Report result of the first connection attempt + // XXX too early, because of post-connection work if connectChannel != nil { connectChannel <- err close(connectChannel) connectChannel = nil } - retryChannel := make(chan struct{}, 1) + retryChannel := make(chan error, 1) if err == nil { - // Success! Set connection, handle events, and retry connection - // if it fails + // Connected successfully; spawn goroutine to poll and handle + // control events. On connection failure (or close as a result of + // stop), signal retryChannel. + + // XXX TODO: post-connect queries + + // Status to CONNECTED + n.controlMutex.Lock() n.conn = conn - n.events.Publish(true) + n.status.Control = &ricochet.TorControlStatus{ + Status: ricochet.TorControlStatus_CONNECTED, + TorVersion: "XXX", // XXX + } + // XXX Fake connection status + n.status.Connection = &ricochet.TorConnectionStatus{ + Status: ricochet.TorConnectionStatus_READY, + } + status := n.status + n.controlMutex.Unlock() + n.events.Publish(status) + go func() { for { event, err := conn.NextEvent() if err != nil { log.Printf("Control connection failed: %v", err) - n.events.Publish(false) - retryChannel <- struct{}{} + retryChannel <- err break } + // XXX handle event log.Printf("Control event: %v", event) } }() } else { - // Failed; retry in one second + // Status to ERROR + n.controlMutex.Lock() + n.status.Control = &ricochet.TorControlStatus{ + Status: ricochet.TorControlStatus_ERROR, + ErrorMessage: err.Error(), + } + n.status.Connection = &ricochet.TorConnectionStatus{} + status := n.status + n.controlMutex.Unlock() + n.events.Publish(status) + + // signal for retry in 5 seconds go func() { - time.Sleep(1 * time.Second) - retryChannel <- struct{}{} + time.Sleep(5 * time.Second) + retryChannel <- err }() } + // Wait for network stop, connection failure, or retry timeout select { - case <-n.stopChannel: - break - case <-retryChannel: + case <-stopSignal: + // Clean up struct + n.controlMutex.Lock() + n.controlAddress = "" + n.controlPassword = "" + n.conn = nil + n.stoppedSignal = nil + n.status = ricochet.NetworkStatus{} + n.controlMutex.Unlock() + n.events.Publish(ricochet.NetworkStatus{}) + + // Close connection if conn != nil { conn.Close() } - } - } - // Stopped - if conn != nil { - conn.Close() + // Signal stopped and exit + close(stoppedSignal) + return + case err := <-retryChannel: + if err == nil { + err = errors.New("Unknown error") + } + + // Clean up connection if necessary + if conn != nil { + // Status to ERROR + n.controlMutex.Lock() + n.conn = nil + n.status.Control = &ricochet.TorControlStatus{ + Status: ricochet.TorControlStatus_ERROR, + ErrorMessage: err.Error(), + } + n.status.Connection = &ricochet.TorConnectionStatus{} + status := n.status + n.controlMutex.Unlock() + n.events.Publish(status) + + conn.Close() + } + // Loop to retry connection + } } }