ricochet-go/core/utils/publisher.go

97 lines
2.1 KiB
Go
Raw Permalink Normal View History

2016-08-02 23:04:00 +00:00
package utils
type Publisher struct {
subscribeChannel chan chan interface{}
unsubscribeChannel chan (<-chan interface{})
broadcastChannel chan interface{}
closeChannel chan struct{}
closedChannel chan struct{}
}
type Subscribable interface {
Subscribe(queueSize int) <-chan interface{}
Unsubscribe(channel <-chan interface{})
}
func CreatePublisher() *Publisher {
re := &Publisher{
subscribeChannel: make(chan chan interface{}),
unsubscribeChannel: make(chan (<-chan interface{})),
broadcastChannel: make(chan interface{}),
closeChannel: make(chan struct{}),
closedChannel: make(chan struct{}),
}
go re.broadcast()
return re
}
func (pub *Publisher) Subscribe(queueSize int) <-chan interface{} {
channel := make(chan interface{}, queueSize)
pub.subscribeChannel <- channel
// Read empty value to block until subscribed
<-channel
return channel
}
func (pub *Publisher) Unsubscribe(channel <-chan interface{}) {
pub.unsubscribeChannel <- channel
// Wait for channel close
for {
_, ok := <-channel
if !ok {
return
}
}
}
func (pub *Publisher) Publish(value interface{}) {
pub.broadcastChannel <- value
}
func (pub *Publisher) Close() {
pub.closeChannel <- struct{}{}
<-pub.closedChannel
// Close channels, so that future use of the object will panic
close(pub.subscribeChannel)
close(pub.unsubscribeChannel)
close(pub.broadcastChannel)
close(pub.closeChannel)
close(pub.closedChannel)
}
func (pub *Publisher) broadcast() {
var channels []chan interface{}
for {
select {
case channel := <-pub.subscribeChannel:
channels = append(channels, channel)
channel <- struct{}{}
case channel := <-pub.unsubscribeChannel:
for i, c := range channels {
if c != channel {
continue
}
channels = append(channels[:i], channels[i+1:]...)
close(c)
break
}
case value := <-pub.broadcastChannel:
for _, c := range channels {
select {
case c <- value:
default:
go pub.Unsubscribe(c)
}
}
case <-pub.closeChannel:
for _, c := range channels {
close(c)
}
pub.closedChannel <- struct{}{}
return
}
}
}