core: Add utils/publisher
This commit is contained in:
parent
06294898f6
commit
eee2dd703d
|
@ -0,0 +1,96 @@
|
||||||
|
package utils
|
||||||
|
|
||||||
|
type Publisher struct {
|
||||||
|
subscribeChannel chan chan interface{}
|
||||||
|
unsubscribeChannel chan (<-chan interface{})
|
||||||
|
broadcastChannel chan interface{}
|
||||||
|
closeChannel chan struct{}
|
||||||
|
closedChannel chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Subscribable interface {
|
||||||
|
Subscribe(queueSize int) <-chan interface{}
|
||||||
|
Unsubscribe(channel <-chan interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreatePublisher() *Publisher {
|
||||||
|
re := &Publisher{
|
||||||
|
subscribeChannel: make(chan chan interface{}),
|
||||||
|
unsubscribeChannel: make(chan (<-chan interface{})),
|
||||||
|
broadcastChannel: make(chan interface{}),
|
||||||
|
closeChannel: make(chan struct{}),
|
||||||
|
closedChannel: make(chan struct{}),
|
||||||
|
}
|
||||||
|
go re.broadcast()
|
||||||
|
return re
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pub *Publisher) Subscribe(queueSize int) <-chan interface{} {
|
||||||
|
channel := make(chan interface{}, queueSize)
|
||||||
|
pub.subscribeChannel <- channel
|
||||||
|
// Read empty value to block until subscribed
|
||||||
|
<-channel
|
||||||
|
return channel
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pub *Publisher) Unsubscribe(channel <-chan interface{}) {
|
||||||
|
pub.unsubscribeChannel <- channel
|
||||||
|
// Wait for channel close
|
||||||
|
for {
|
||||||
|
_, ok := <-channel
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pub *Publisher) Publish(value interface{}) {
|
||||||
|
pub.broadcastChannel <- value
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pub *Publisher) Close() {
|
||||||
|
pub.closeChannel <- struct{}{}
|
||||||
|
<-pub.closedChannel
|
||||||
|
|
||||||
|
// Close channels, so that future use of the object will panic
|
||||||
|
close(pub.subscribeChannel)
|
||||||
|
close(pub.unsubscribeChannel)
|
||||||
|
close(pub.broadcastChannel)
|
||||||
|
close(pub.closeChannel)
|
||||||
|
close(pub.closedChannel)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pub *Publisher) broadcast() {
|
||||||
|
var channels []chan interface{}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case channel := <-pub.subscribeChannel:
|
||||||
|
channels = append(channels, channel)
|
||||||
|
channel <- struct{}{}
|
||||||
|
case channel := <-pub.unsubscribeChannel:
|
||||||
|
for i, c := range channels {
|
||||||
|
if c != channel {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
channels = append(channels[:i], channels[i+1:]...)
|
||||||
|
close(c)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case value := <-pub.broadcastChannel:
|
||||||
|
for _, c := range channels {
|
||||||
|
select {
|
||||||
|
case c <- value:
|
||||||
|
default:
|
||||||
|
go pub.Unsubscribe(c)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
case <-pub.closeChannel:
|
||||||
|
for _, c := range channels {
|
||||||
|
close(c)
|
||||||
|
}
|
||||||
|
pub.closedChannel <- struct{}{}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue