core: Refactor network reconnection to fix onion republication

A race would cause a call to AddOnionPorts that was blocked waiting for
a control connection to try to publish the service twice, because the
onion republication wasn't done until after the connection status
change.

This is fixed by refactoring the control connection setup to do all
setup before signalling the state change, including copying the list of
onions to publish. The code is slightly cleaner now as well.
This commit is contained in:
John Brooks 2016-09-15 18:16:00 -06:00
parent cf903a3b7d
commit 346971368b
1 changed files with 120 additions and 136 deletions

View File

@ -176,6 +176,10 @@ func (n *Network) AddOnionPorts(ports []bulb.OnionPortSpec, key crypto.PrivateKe
PrivateKey: info.PrivateKey,
}
if service.PrivateKey == nil {
service.PrivateKey = key
}
n.controlMutex.Lock()
n.onions = append(n.onions, service)
n.controlMutex.Unlock()
@ -226,58 +230,54 @@ func (n *Network) run(connectChannel chan<- error) {
n.events.Publish(status)
// Attempt connection
conn, err := createConnection(n.controlAddress, n.controlPassword)
retryChannel := make(chan error, 1)
if err == nil {
// Connected successfully; spawn goroutine to poll and handle
// control events. On connection failure (or close as a result of
// stop), signal retryChannel.
// Query ProtocolInfo for tor version
pinfo, err := conn.ProtocolInfo()
if err != nil {
log.Printf("Control protocolinfo failed: %v", err)
retryChannel <- err
} else {
// Status to CONNECTED
n.controlMutex.Lock()
n.conn = conn
n.status.Control = &ricochet.TorControlStatus{
Status: ricochet.TorControlStatus_CONNECTED,
TorVersion: pinfo.TorVersion,
}
n.status.Connection = &ricochet.TorConnectionStatus{}
status := n.status
n.controlMutex.Unlock()
n.events.Publish(status)
// Query initial tor state and subscribe to events
if err := n.updateTorState(); err != nil {
log.Printf("Control state query failed: %v", err)
// Signal error to terminate connection
retryChannel <- err
} else {
// Report result of the first connection attempt
if connectChannel != nil {
connectChannel <- err
close(connectChannel)
connectChannel = nil
}
// Goroutine polls for control events; retryChannel is
// signalled on connection failure. Block on retryChannel
// below.
go n.handleControlEvents(conn, retryChannel)
// Re-publish onion services
n.publishOnions()
}
}
errorChannel := make(chan error, 1)
err := n.connectControl()
if err != nil {
errorChannel <- err
} else {
// Status to ERROR
// The goroutine polls for control events, and signals
// errorChannel on connection failure.
go n.handleControlEvents(n.conn, errorChannel)
}
// Report result of the first connection attempt
if connectChannel != nil {
connectChannel <- err
close(connectChannel)
connectChannel = nil
}
// Wait for network stop or connection errors
select {
case <-stopSignal:
// Close connection, clean up struct, signal status change
n.controlMutex.Lock()
if n.conn != nil {
n.conn.Close()
n.conn = nil
}
n.controlAddress = ""
n.controlPassword = ""
n.stoppedSignal = nil
n.status = ricochet.NetworkStatus{}
n.controlMutex.Unlock()
n.events.Publish(ricochet.NetworkStatus{})
// Signal stopped and exit
close(stoppedSignal)
return
case err := <-errorChannel:
if err == nil {
err = errors.New("Unknown error")
}
// Change status to ERROR
n.controlMutex.Lock()
if n.conn != nil {
n.conn.Close()
n.conn = nil
}
n.status.Control = &ricochet.TorControlStatus{
Status: ricochet.TorControlStatus_ERROR,
ErrorMessage: err.Error(),
@ -287,60 +287,68 @@ func (n *Network) run(connectChannel chan<- error) {
n.controlMutex.Unlock()
n.events.Publish(status)
// signal for retry in 5 seconds
go func() {
time.Sleep(5 * time.Second)
retryChannel <- err
}()
}
// Wait for network stop, connection failure, or retry timeout
select {
case <-stopSignal:
// Clean up struct
n.controlMutex.Lock()
n.controlAddress = ""
n.controlPassword = ""
n.conn = nil
n.stoppedSignal = nil
n.status = ricochet.NetworkStatus{}
n.controlMutex.Unlock()
n.events.Publish(ricochet.NetworkStatus{})
// Close connection
if conn != nil {
conn.Close()
}
// Signal stopped and exit
close(stoppedSignal)
return
case err := <-retryChannel:
if err == nil {
err = errors.New("Unknown error")
}
// Clean up connection if necessary
if conn != nil {
// Status to ERROR
n.controlMutex.Lock()
n.conn = nil
n.status.Control = &ricochet.TorControlStatus{
Status: ricochet.TorControlStatus_ERROR,
ErrorMessage: err.Error(),
}
n.status.Connection = &ricochet.TorConnectionStatus{}
status := n.status
n.controlMutex.Unlock()
n.events.Publish(status)
conn.Close()
}
// Loop to retry connection
// BUG(x): This timeout is static and uninterruptable
time.Sleep(5 * time.Second)
}
}
}
func (n *Network) connectControl() error {
// Attempt connection
conn, err := createConnection(n.controlAddress, n.controlPassword)
if err != nil {
return err
}
// Query ProtocolInfo for tor version
pinfo, err := conn.ProtocolInfo()
if err != nil {
conn.Close()
return err
}
// Subscribe to events
_, err = conn.Request("SETEVENTS STATUS_CLIENT")
if err != nil {
conn.Close()
return err
}
// Query initial tor state
connStatus, err := queryTorState(conn)
if err != nil {
conn.Close()
return err
}
// Copy list of onions to republish. This is done before the status
// change to avoid racing with blocked calls to AddOnionPorts, which
// will add to this list once the connection is available, but the
// publication is done afterwards.
n.controlMutex.Lock()
onions := make([]*OnionService, len(n.onions))
copy(onions, n.onions)
n.controlMutex.Unlock()
// Update network status and set connection
n.controlMutex.Lock()
n.conn = conn
n.status.Control = &ricochet.TorControlStatus{
Status: ricochet.TorControlStatus_CONNECTED,
TorVersion: pinfo.TorVersion,
}
n.status.Connection = &connStatus
status := n.status
n.controlMutex.Unlock()
n.events.Publish(status)
// Re-publish onion services. Errors are not fatal to conn.
publishOnions(conn, onions)
return nil
}
func createConnection(address, password string) (*bulb.Conn, error) {
net, addr, err := bulbutils.ParseControlPortString(address)
if err != nil {
@ -386,52 +394,38 @@ func createConnection(address, password string) (*bulb.Conn, error) {
* reasons for failed outbound connections.
*/
func (n *Network) updateTorState() error {
if _, err := n.conn.Request("SETEVENTS STATUS_CLIENT"); err != nil {
return err
}
func queryTorState(conn *bulb.Conn) (ricochet.TorConnectionStatus, error) {
status := ricochet.TorConnectionStatus{}
response, err := n.conn.Request("GETINFO status/circuit-established status/bootstrap-phase net/listeners/socks")
response, err := conn.Request("GETINFO status/circuit-established status/bootstrap-phase net/listeners/socks")
if err != nil {
return err
return status, err
}
results := make(map[string]string)
for _, rawLine := range response.Data {
line := strings.SplitN(rawLine, "=", 2)
if len(line) != 2 {
return errors.New("Invalid GETINFO response format")
return status, errors.New("Invalid GETINFO response format")
}
results[line[0]] = strings.TrimSpace(line[1])
log.Printf("'%v' = '%v'", line[0], results[line[0]])
}
var connStatus ricochet.TorConnectionStatus_Status
if results["status/circuit-established"] == "0" {
if strings.Contains(results["status/bootstrap-phase"], "TAG=done") {
connStatus = ricochet.TorConnectionStatus_OFFLINE
status.Status = ricochet.TorConnectionStatus_OFFLINE
} else {
connStatus = ricochet.TorConnectionStatus_BOOTSTRAPPING
status.Status = ricochet.TorConnectionStatus_BOOTSTRAPPING
}
} else if results["status/circuit-established"] == "1" {
connStatus = ricochet.TorConnectionStatus_READY
status.Status = ricochet.TorConnectionStatus_READY
} else {
return errors.New("Invalid GETINFO response format")
return status, errors.New("Invalid GETINFO response format")
}
socksAddresses := utils.UnquoteStringSplit(results["net/listeners/socks"], ' ')
n.controlMutex.Lock()
n.status.Connection = &ricochet.TorConnectionStatus{
Status: connStatus,
BootstrapProgress: results["status/bootstrap-phase"],
SocksAddress: socksAddresses,
}
status := n.status
n.controlMutex.Unlock()
n.events.Publish(status)
return nil
status.BootstrapProgress = results["status/bootstrap-phase"]
status.SocksAddress = utils.UnquoteStringSplit(results["net/listeners/socks"], ' ')
return status, nil
}
func (n *Network) handleControlEvents(conn *bulb.Conn, errorChannel chan<- error) {
@ -483,17 +477,7 @@ func (n *Network) handleControlEvents(conn *bulb.Conn, errorChannel chan<- error
}
}
func (n *Network) publishOnions() {
n.controlMutex.Lock()
conn := n.conn
onions := make([]*OnionService, len(n.onions))
copy(onions, n.onions)
n.controlMutex.Unlock()
if conn == nil {
return
}
func publishOnions(conn *bulb.Conn, onions []*OnionService) {
for _, service := range onions {
_, err := conn.AddOnion(service.Ports, service.PrivateKey, false)
if err != nil {