aboutsummaryrefslogtreecommitdiffstats
path: root/event/event.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2014-10-14 07:56:24 +0800
committerFelix Lange <fjl@twurst.com>2014-10-17 00:14:35 +0800
commitf5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b (patch)
tree56ba3907caf45e02decc7d9ee37935cead24d941 /event/event.go
parentbb5038699ef7e08054ef154107e359dce2e3b106 (diff)
downloaddexon-f5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b.tar
dexon-f5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b.tar.gz
dexon-f5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b.tar.bz2
dexon-f5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b.tar.lz
dexon-f5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b.tar.xz
dexon-f5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b.tar.zst
dexon-f5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b.zip
event: new package for event multiplexer
Diffstat (limited to 'event/event.go')
-rw-r--r--event/event.go162
1 files changed, 162 insertions, 0 deletions
diff --git a/event/event.go b/event/event.go
new file mode 100644
index 000000000..74f8043da
--- /dev/null
+++ b/event/event.go
@@ -0,0 +1,162 @@
+// Package event implements an event multiplexer.
+package event
+
+import (
+ "errors"
+ "reflect"
+ "sync"
+)
+
+type Subscription interface {
+ Chan() <-chan interface{}
+ Unsubscribe()
+}
+
+// A TypeMux dispatches events to registered receivers. Receivers can be
+// registered to handle events of certain type. Any operation
+// called after mux is stopped will return ErrMuxClosed.
+type TypeMux struct {
+ mutex sync.RWMutex
+ subm map[reflect.Type][]*muxsub
+ stopped bool
+}
+
+var ErrMuxClosed = errors.New("event: mux closed")
+
+// NewTypeMux creates a running mux.
+func NewTypeMux() *TypeMux {
+ return &TypeMux{subm: make(map[reflect.Type][]*muxsub)}
+}
+
+// Subscribe creates a subscription for events of the given types. The
+// subscription's channel is closed when it is unsubscribed
+// or the mux is closed.
+func (mux *TypeMux) Subscribe(types ...interface{}) Subscription {
+ sub := newsub(mux)
+ mux.mutex.Lock()
+ if mux.stopped {
+ mux.mutex.Unlock()
+ close(sub.postC)
+ } else {
+ for _, t := range types {
+ rtyp := reflect.TypeOf(t)
+ oldsubs := mux.subm[rtyp]
+ subs := make([]*muxsub, len(oldsubs)+1)
+ copy(subs, oldsubs)
+ subs[len(oldsubs)] = sub
+ mux.subm[rtyp] = subs
+ }
+ mux.mutex.Unlock()
+ }
+ return sub
+}
+
+// Post sends an event to all receivers registered for the given type.
+// It returns ErrMuxClosed if the mux has been stopped.
+func (mux *TypeMux) Post(ev interface{}) error {
+ rtyp := reflect.TypeOf(ev)
+ mux.mutex.RLock()
+ if mux.stopped {
+ mux.mutex.RUnlock()
+ return ErrMuxClosed
+ }
+ subs := mux.subm[rtyp]
+ mux.mutex.RUnlock()
+ for _, sub := range subs {
+ sub.deliver(ev)
+ }
+ return nil
+}
+
+// Stop closes a mux. The mux can no longer be used.
+// Future Post calls will fail with ErrMuxClosed.
+// Stop blocks until all current deliveries have finished.
+func (mux *TypeMux) Stop() {
+ mux.mutex.Lock()
+ for _, subs := range mux.subm {
+ for _, sub := range subs {
+ sub.closewait()
+ }
+ }
+ mux.subm = nil
+ mux.stopped = true
+ mux.mutex.Unlock()
+}
+
+func (mux *TypeMux) del(s *muxsub) {
+ mux.mutex.Lock()
+ for typ, subs := range mux.subm {
+ if pos := find(subs, s); pos >= 0 {
+ if len(subs) == 1 {
+ delete(mux.subm, typ)
+ } else {
+ mux.subm[typ] = posdelete(subs, pos)
+ }
+ }
+ }
+ s.mux.mutex.Unlock()
+}
+
+func find(slice []*muxsub, item *muxsub) int {
+ for i, v := range slice {
+ if v == item {
+ return i
+ }
+ }
+ return -1
+}
+
+func posdelete(slice []*muxsub, pos int) []*muxsub {
+ news := make([]*muxsub, len(slice)-1)
+ copy(news[:pos], slice[:pos])
+ copy(news[pos:], slice[pos+1:])
+ return news
+}
+
+type muxsub struct {
+ mux *TypeMux
+ mutex sync.RWMutex
+ closing chan struct{}
+
+ // these two are the same channel. they are stored separately so
+ // postC can be set to nil without affecting the return value of
+ // Chan.
+ readC <-chan interface{}
+ postC chan<- interface{}
+}
+
+func newsub(mux *TypeMux) *muxsub {
+ c := make(chan interface{})
+ return &muxsub{
+ mux: mux,
+ readC: c,
+ postC: c,
+ closing: make(chan struct{}),
+ }
+}
+
+func (s *muxsub) Chan() <-chan interface{} {
+ return s.readC
+}
+
+func (s *muxsub) Unsubscribe() {
+ s.mux.del(s)
+ s.closewait()
+}
+
+func (s *muxsub) closewait() {
+ close(s.closing)
+ s.mutex.Lock()
+ close(s.postC)
+ s.postC = nil
+ s.mutex.Unlock()
+}
+
+func (s *muxsub) deliver(ev interface{}) {
+ s.mutex.RLock()
+ select {
+ case s.postC <- ev:
+ case <-s.closing:
+ }
+ s.mutex.RUnlock()
+}