From f5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 14 Oct 2014 01:56:24 +0200 Subject: event: new package for event multiplexer --- event/event.go | 162 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) create mode 100644 event/event.go (limited to 'event/event.go') diff --git a/event/event.go b/event/event.go new file mode 100644 index 000000000..74f8043da --- /dev/null +++ b/event/event.go @@ -0,0 +1,162 @@ +// Package event implements an event multiplexer. +package event + +import ( + "errors" + "reflect" + "sync" +) + +type Subscription interface { + Chan() <-chan interface{} + Unsubscribe() +} + +// A TypeMux dispatches events to registered receivers. Receivers can be +// registered to handle events of certain type. Any operation +// called after mux is stopped will return ErrMuxClosed. +type TypeMux struct { + mutex sync.RWMutex + subm map[reflect.Type][]*muxsub + stopped bool +} + +var ErrMuxClosed = errors.New("event: mux closed") + +// NewTypeMux creates a running mux. +func NewTypeMux() *TypeMux { + return &TypeMux{subm: make(map[reflect.Type][]*muxsub)} +} + +// Subscribe creates a subscription for events of the given types. The +// subscription's channel is closed when it is unsubscribed +// or the mux is closed. +func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { + sub := newsub(mux) + mux.mutex.Lock() + if mux.stopped { + mux.mutex.Unlock() + close(sub.postC) + } else { + for _, t := range types { + rtyp := reflect.TypeOf(t) + oldsubs := mux.subm[rtyp] + subs := make([]*muxsub, len(oldsubs)+1) + copy(subs, oldsubs) + subs[len(oldsubs)] = sub + mux.subm[rtyp] = subs + } + mux.mutex.Unlock() + } + return sub +} + +// Post sends an event to all receivers registered for the given type. +// It returns ErrMuxClosed if the mux has been stopped. +func (mux *TypeMux) Post(ev interface{}) error { + rtyp := reflect.TypeOf(ev) + mux.mutex.RLock() + if mux.stopped { + mux.mutex.RUnlock() + return ErrMuxClosed + } + subs := mux.subm[rtyp] + mux.mutex.RUnlock() + for _, sub := range subs { + sub.deliver(ev) + } + return nil +} + +// Stop closes a mux. The mux can no longer be used. +// Future Post calls will fail with ErrMuxClosed. +// Stop blocks until all current deliveries have finished. +func (mux *TypeMux) Stop() { + mux.mutex.Lock() + for _, subs := range mux.subm { + for _, sub := range subs { + sub.closewait() + } + } + mux.subm = nil + mux.stopped = true + mux.mutex.Unlock() +} + +func (mux *TypeMux) del(s *muxsub) { + mux.mutex.Lock() + for typ, subs := range mux.subm { + if pos := find(subs, s); pos >= 0 { + if len(subs) == 1 { + delete(mux.subm, typ) + } else { + mux.subm[typ] = posdelete(subs, pos) + } + } + } + s.mux.mutex.Unlock() +} + +func find(slice []*muxsub, item *muxsub) int { + for i, v := range slice { + if v == item { + return i + } + } + return -1 +} + +func posdelete(slice []*muxsub, pos int) []*muxsub { + news := make([]*muxsub, len(slice)-1) + copy(news[:pos], slice[:pos]) + copy(news[pos:], slice[pos+1:]) + return news +} + +type muxsub struct { + mux *TypeMux + mutex sync.RWMutex + closing chan struct{} + + // these two are the same channel. they are stored separately so + // postC can be set to nil without affecting the return value of + // Chan. + readC <-chan interface{} + postC chan<- interface{} +} + +func newsub(mux *TypeMux) *muxsub { + c := make(chan interface{}) + return &muxsub{ + mux: mux, + readC: c, + postC: c, + closing: make(chan struct{}), + } +} + +func (s *muxsub) Chan() <-chan interface{} { + return s.readC +} + +func (s *muxsub) Unsubscribe() { + s.mux.del(s) + s.closewait() +} + +func (s *muxsub) closewait() { + close(s.closing) + s.mutex.Lock() + close(s.postC) + s.postC = nil + s.mutex.Unlock() +} + +func (s *muxsub) deliver(ev interface{}) { + s.mutex.RLock() + select { + case s.postC <- ev: + case <-s.closing: + } + s.mutex.RUnlock() +} -- cgit v1.2.3 From dac4a8f113b35c67349115115af17c7f1874d939 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:07:27 +0200 Subject: event: add some documentation --- event/event.go | 9 +++++++++ 1 file changed, 9 insertions(+) (limited to 'event/event.go') diff --git a/event/event.go b/event/event.go index 74f8043da..09759ee50 100644 --- a/event/event.go +++ b/event/event.go @@ -7,8 +7,16 @@ import ( "sync" ) +// Subscription is implemented by event subscriptions. type Subscription interface { + // Chan returns a channel that carries events. + // Implementations should return the same channel + // for any subsequent calls to Chan. Chan() <-chan interface{} + + // Unsubscribe stops delivery of events to a subscription. + // The event channel is closed. + // Unsubscribe can be called more than once. Unsubscribe() } @@ -21,6 +29,7 @@ type TypeMux struct { stopped bool } +// ErrMuxClosed is returned when Posting on a closed TypeMux. var ErrMuxClosed = errors.New("event: mux closed") // NewTypeMux creates a running mux. -- cgit v1.2.3 From 10bbf265b2e8f1906602d2604f755241b8eb49e6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:08:48 +0200 Subject: event: make Unsubscribe idempotent --- event/event.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) (limited to 'event/event.go') diff --git a/event/event.go b/event/event.go index 09759ee50..344d1e3f6 100644 --- a/event/event.go +++ b/event/event.go @@ -124,14 +124,16 @@ func posdelete(slice []*muxsub, pos int) []*muxsub { type muxsub struct { mux *TypeMux - mutex sync.RWMutex + closeMu sync.Mutex closing chan struct{} + closed bool // these two are the same channel. they are stored separately so // postC can be set to nil without affecting the return value of // Chan. - readC <-chan interface{} - postC chan<- interface{} + postMu sync.RWMutex + readC <-chan interface{} + postC chan<- interface{} } func newsub(mux *TypeMux) *muxsub { @@ -154,18 +156,25 @@ func (s *muxsub) Unsubscribe() { } func (s *muxsub) closewait() { + s.closeMu.Lock() + defer s.closeMu.Unlock() + if s.closed { + return + } close(s.closing) - s.mutex.Lock() + s.closed = true + + s.postMu.Lock() close(s.postC) s.postC = nil - s.mutex.Unlock() + s.postMu.Unlock() } func (s *muxsub) deliver(ev interface{}) { - s.mutex.RLock() + s.postMu.RLock() select { case s.postC <- ev: case <-s.closing: } - s.mutex.RUnlock() + s.postMu.RUnlock() } -- cgit v1.2.3 From 690690489610352d43f8547744b6c9486ad5affa Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:10:09 +0200 Subject: event: make TypeMux zero value ready to use --- event/event.go | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) (limited to 'event/event.go') diff --git a/event/event.go b/event/event.go index 344d1e3f6..d11a0e9bd 100644 --- a/event/event.go +++ b/event/event.go @@ -23,6 +23,8 @@ type Subscription interface { // A TypeMux dispatches events to registered receivers. Receivers can be // registered to handle events of certain type. Any operation // called after mux is stopped will return ErrMuxClosed. +// +// The zero value is ready to use. type TypeMux struct { mutex sync.RWMutex subm map[reflect.Type][]*muxsub @@ -32,11 +34,6 @@ type TypeMux struct { // ErrMuxClosed is returned when Posting on a closed TypeMux. var ErrMuxClosed = errors.New("event: mux closed") -// NewTypeMux creates a running mux. -func NewTypeMux() *TypeMux { - return &TypeMux{subm: make(map[reflect.Type][]*muxsub)} -} - // Subscribe creates a subscription for events of the given types. The // subscription's channel is closed when it is unsubscribed // or the mux is closed. @@ -44,9 +41,11 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { sub := newsub(mux) mux.mutex.Lock() if mux.stopped { - mux.mutex.Unlock() close(sub.postC) } else { + if mux.subm == nil { + mux.subm = make(map[reflect.Type][]*muxsub) + } for _, t := range types { rtyp := reflect.TypeOf(t) oldsubs := mux.subm[rtyp] @@ -55,8 +54,8 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { subs[len(oldsubs)] = sub mux.subm[rtyp] = subs } - mux.mutex.Unlock() } + mux.mutex.Unlock() return sub } -- cgit v1.2.3 From fa84e50ddb8e64d4cb92d58e235cfed13761f21e Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:59:28 +0200 Subject: event: panic for duplicate type --- event/event.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'event/event.go') diff --git a/event/event.go b/event/event.go index d11a0e9bd..540fbba65 100644 --- a/event/event.go +++ b/event/event.go @@ -3,6 +3,7 @@ package event import ( "errors" + "fmt" "reflect" "sync" ) @@ -40,6 +41,7 @@ var ErrMuxClosed = errors.New("event: mux closed") func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { sub := newsub(mux) mux.mutex.Lock() + defer mux.mutex.Unlock() if mux.stopped { close(sub.postC) } else { @@ -49,13 +51,15 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { for _, t := range types { rtyp := reflect.TypeOf(t) oldsubs := mux.subm[rtyp] + if find(oldsubs, sub) != -1 { + panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp)) + } subs := make([]*muxsub, len(oldsubs)+1) copy(subs, oldsubs) subs[len(oldsubs)] = sub mux.subm[rtyp] = subs } } - mux.mutex.Unlock() return sub } -- cgit v1.2.3