core: Clean up network implementation

This commit is contained in:
John Brooks 2016-08-02 23:39:13 -06:00
parent 5e08afa64f
commit d5cb5feff1
2 changed files with 155 additions and 57 deletions

View File

@ -25,24 +25,25 @@ func (core *RicochetCore) GetServerStatus(ctx context.Context, req *rpc.ServerSt
} }
func (core *RicochetCore) MonitorNetwork(req *rpc.MonitorNetworkRequest, stream rpc.RicochetCore_MonitorNetworkServer) error { func (core *RicochetCore) MonitorNetwork(req *rpc.MonitorNetworkRequest, stream rpc.RicochetCore_MonitorNetworkServer) error {
status := &rpc.NetworkStatus{
Control: &rpc.TorControlStatus{
Status: rpc.TorControlStatus_ERROR,
ErrorMessage: "Not implemented",
},
}
events := core.Network.EventMonitor().Subscribe(20) events := core.Network.EventMonitor().Subscribe(20)
defer core.Network.EventMonitor().Unsubscribe(events) defer core.Network.EventMonitor().Unsubscribe(events)
// Send initial status event
{
event := core.Network.GetStatus()
if err := stream.Send(&event); err != nil {
return err
}
}
for { for {
event, ok := (<-events).(bool) event, ok := (<-events).(rpc.NetworkStatus)
if !ok { if !ok {
break break
} }
log.Printf("RPC monitor event: %v", event) log.Printf("RPC monitor event: %v", event)
if err := stream.Send(status); err != nil { if err := stream.Send(&event); err != nil {
return err return err
} }
} }
@ -58,15 +59,12 @@ func (core *RicochetCore) StartNetwork(ctx context.Context, req *rpc.StartNetwor
return nil, err return nil, err
} }
// XXX real status status := core.Network.GetStatus()
return &rpc.NetworkStatus{ return &status, nil
Control: &rpc.TorControlStatus{
Status: rpc.TorControlStatus_CONNECTED,
},
}, nil
} }
func (core *RicochetCore) StopNetwork(ctx context.Context, req *rpc.StopNetworkRequest) (*rpc.NetworkStatus, error) { func (core *RicochetCore) StopNetwork(ctx context.Context, req *rpc.StopNetworkRequest) (*rpc.NetworkStatus, error) {
core.Network.Stop() core.Network.Stop()
return &rpc.NetworkStatus{}, nil status := core.Network.GetStatus()
return &status, nil
} }

View File

@ -3,6 +3,7 @@ package core
import ( import (
"errors" "errors"
"github.com/special/notricochet/core/utils" "github.com/special/notricochet/core/utils"
"github.com/special/notricochet/rpc"
"github.com/yawning/bulb" "github.com/yawning/bulb"
bulbutils "github.com/yawning/bulb/utils" bulbutils "github.com/yawning/bulb/utils"
"log" "log"
@ -11,15 +12,29 @@ import (
) )
type Network struct { type Network struct {
conn *bulb.Conn // Connection settings; can only change while stopped
controlAddress string controlAddress string
controlPassword string controlPassword string
handleLock sync.Mutex // Events
stopChannel chan struct{}
events *utils.Publisher events *utils.Publisher
// nil when stopped, otherwise used to signal stop to active network
stopSignal chan struct{}
stoppedSignal chan struct{}
// Mutex required to access below
controlMutex sync.Mutex
// Do not use while holding controlMutex; instead, copy ptr and unlock
// mutex before use.
conn *bulb.Conn
// Modifications must be done while holding controlMutex and signalled
// to events. Do not directly modify the child elements, as they are
// pointers and may be shared. Instead, construct a new TorControlStatus
// et al for each change.
status ricochet.NetworkStatus
} }
func CreateNetwork() *Network { func CreateNetwork() *Network {
@ -34,20 +49,20 @@ func CreateNetwork() *Network {
// been started; if true, the connection is up even if the first attempt failed. // been started; if true, the connection is up even if the first attempt failed.
// The second return value is the connection attempt error, or nil on success. // The second return value is the connection attempt error, or nil on success.
func (n *Network) Start(address, password string) (bool, error) { func (n *Network) Start(address, password string) (bool, error) {
n.handleLock.Lock() n.controlMutex.Lock()
defer n.handleLock.Unlock() if n.stoppedSignal != nil {
if n.stopChannel != nil {
// This is an error, because address/password might not be the same // This is an error, because address/password might not be the same
n.controlMutex.Unlock()
return false, errors.New("Network is already started") return false, errors.New("Network is already started")
} }
n.stopChannel = make(chan struct{}) n.stopSignal = make(chan struct{})
n.stoppedSignal = make(chan struct{})
n.controlAddress = address n.controlAddress = address
n.controlPassword = password n.controlPassword = password
n.controlMutex.Unlock()
connectChannel := make(chan error) connectChannel := make(chan error)
go n.run(connectChannel) go n.run(connectChannel)
err := <-connectChannel err := <-connectChannel
return true, err return true, err
@ -58,80 +73,165 @@ func (n *Network) Start(address, password string) (bool, error) {
// the client will be offline until Start is called again. This call will // the client will be offline until Start is called again. This call will
// block until the connection is stopped. // block until the connection is stopped.
func (n *Network) Stop() { func (n *Network) Stop() {
n.handleLock.Lock() // Take mutex, copy channels, nil stopSignal to avoid race if Stop()
defer n.handleLock.Unlock() // is called again. Other calls will still use stoppedSignal.
n.controlMutex.Lock()
stop := n.stopSignal
stopped := n.stoppedSignal
n.stopSignal = nil
n.controlMutex.Unlock()
if n.stopChannel == nil { if stop != nil {
// Signal to stop
stop <- struct{}{}
} else if stopped == nil {
// Already stopped
return return
} }
// XXX but if we do block that long, we can hold the mutex a _long_ time. // Wait until stopped; safe for multiple receivers, because the channel
// XXX so the mutex won't be suitable for a "is network started" check // is closed on stop. Sender is responsible for all other cleanup and state.
n.stopChannel <- struct{}{} <-stopped
n.stopChannel = nil
n.conn = nil
n.controlAddress = ""
n.controlPassword = ""
} }
func (n *Network) EventMonitor() utils.Subscribable { func (n *Network) EventMonitor() utils.Subscribable {
return n.events return n.events
} }
func (n *Network) GetStatus() ricochet.NetworkStatus {
n.controlMutex.Lock()
status := n.status
n.controlMutex.Unlock()
return status
}
func (n *Network) run(connectChannel chan<- error) { func (n *Network) run(connectChannel chan<- error) {
var conn *bulb.Conn n.controlMutex.Lock()
var err error stopSignal := n.stopSignal
stoppedSignal := n.stoppedSignal
n.controlMutex.Unlock()
for { for {
conn, err = createConnection(n.controlAddress, n.controlPassword) // Status to CONNECTING
n.controlMutex.Lock()
n.status.Control = &ricochet.TorControlStatus{
Status: ricochet.TorControlStatus_CONNECTING,
}
n.status.Connection = &ricochet.TorConnectionStatus{}
status := n.status
n.controlMutex.Unlock()
n.events.Publish(status)
// Attempt connection
conn, err := createConnection(n.controlAddress, n.controlPassword)
// Report result of the first connection attempt // Report result of the first connection attempt
// XXX too early, because of post-connection work
if connectChannel != nil { if connectChannel != nil {
connectChannel <- err connectChannel <- err
close(connectChannel) close(connectChannel)
connectChannel = nil connectChannel = nil
} }
retryChannel := make(chan struct{}, 1) retryChannel := make(chan error, 1)
if err == nil { if err == nil {
// Success! Set connection, handle events, and retry connection // Connected successfully; spawn goroutine to poll and handle
// if it fails // control events. On connection failure (or close as a result of
// stop), signal retryChannel.
// XXX TODO: post-connect queries
// Status to CONNECTED
n.controlMutex.Lock()
n.conn = conn n.conn = conn
n.events.Publish(true) n.status.Control = &ricochet.TorControlStatus{
Status: ricochet.TorControlStatus_CONNECTED,
TorVersion: "XXX", // XXX
}
// XXX Fake connection status
n.status.Connection = &ricochet.TorConnectionStatus{
Status: ricochet.TorConnectionStatus_READY,
}
status := n.status
n.controlMutex.Unlock()
n.events.Publish(status)
go func() { go func() {
for { for {
event, err := conn.NextEvent() event, err := conn.NextEvent()
if err != nil { if err != nil {
log.Printf("Control connection failed: %v", err) log.Printf("Control connection failed: %v", err)
n.events.Publish(false) retryChannel <- err
retryChannel <- struct{}{}
break break
} }
// XXX handle event
log.Printf("Control event: %v", event) log.Printf("Control event: %v", event)
} }
}() }()
} else { } else {
// Failed; retry in one second // Status to ERROR
n.controlMutex.Lock()
n.status.Control = &ricochet.TorControlStatus{
Status: ricochet.TorControlStatus_ERROR,
ErrorMessage: err.Error(),
}
n.status.Connection = &ricochet.TorConnectionStatus{}
status := n.status
n.controlMutex.Unlock()
n.events.Publish(status)
// signal for retry in 5 seconds
go func() { go func() {
time.Sleep(1 * time.Second) time.Sleep(5 * time.Second)
retryChannel <- struct{}{} retryChannel <- err
}() }()
} }
// Wait for network stop, connection failure, or retry timeout
select { select {
case <-n.stopChannel: case <-stopSignal:
break // Clean up struct
case <-retryChannel: n.controlMutex.Lock()
n.controlAddress = ""
n.controlPassword = ""
n.conn = nil
n.stoppedSignal = nil
n.status = ricochet.NetworkStatus{}
n.controlMutex.Unlock()
n.events.Publish(ricochet.NetworkStatus{})
// Close connection
if conn != nil { if conn != nil {
conn.Close() conn.Close()
} }
}
}
// Stopped // Signal stopped and exit
if conn != nil { close(stoppedSignal)
conn.Close() return
case err := <-retryChannel:
if err == nil {
err = errors.New("Unknown error")
}
// Clean up connection if necessary
if conn != nil {
// Status to ERROR
n.controlMutex.Lock()
n.conn = nil
n.status.Control = &ricochet.TorControlStatus{
Status: ricochet.TorControlStatus_ERROR,
ErrorMessage: err.Error(),
}
n.status.Connection = &ricochet.TorConnectionStatus{}
status := n.status
n.controlMutex.Unlock()
n.events.Publish(status)
conn.Close()
}
// Loop to retry connection
}
} }
} }