From 402fd6e8c6a2e379351e0aae10a833fae6bcae6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Mon, 12 Oct 2015 15:04:38 +0300 Subject: core, eth, event, miner, xeth: fix event post / subscription race --- event/event.go | 37 ++++++++++++++++++++++++++++--------- event/event_test.go | 2 +- event/example_test.go | 2 +- 3 files changed, 30 insertions(+), 11 deletions(-) (limited to 'event') diff --git a/event/event.go b/event/event.go index ce74e5286..57dd52baa 100644 --- a/event/event.go +++ b/event/event.go @@ -22,14 +22,21 @@ import ( "fmt" "reflect" "sync" + "time" ) +// Event is a time-tagged notification pushed to subscribers. +type Event struct { + Time time.Time + Data interface{} +} + // Subscription is implemented by event subscriptions. type Subscription interface { // Chan returns a channel that carries events. // Implementations should return the same channel // for any subsequent calls to Chan. - Chan() <-chan interface{} + Chan() <-chan *Event // Unsubscribe stops delivery of events to a subscription. // The event channel is closed. @@ -82,6 +89,10 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { // Post sends an event to all receivers registered for the given type. // It returns ErrMuxClosed if the mux has been stopped. func (mux *TypeMux) Post(ev interface{}) error { + event := &Event{ + Time: time.Now(), + Data: ev, + } rtyp := reflect.TypeOf(ev) mux.mutex.RLock() if mux.stopped { @@ -91,7 +102,7 @@ func (mux *TypeMux) Post(ev interface{}) error { subs := mux.subm[rtyp] mux.mutex.RUnlock() for _, sub := range subs { - sub.deliver(ev) + sub.deliver(event) } return nil } @@ -143,6 +154,7 @@ func posdelete(slice []*muxsub, pos int) []*muxsub { type muxsub struct { mux *TypeMux + created time.Time closeMu sync.Mutex closing chan struct{} closed bool @@ -151,21 +163,22 @@ type muxsub struct { // postC can be set to nil without affecting the return value of // Chan. postMu sync.RWMutex - readC <-chan interface{} - postC chan<- interface{} + readC <-chan *Event + postC chan<- *Event } func newsub(mux *TypeMux) *muxsub { - c := make(chan interface{}) + c := make(chan *Event) return &muxsub{ mux: mux, + created: time.Now(), readC: c, postC: c, closing: make(chan struct{}), } } -func (s *muxsub) Chan() <-chan interface{} { +func (s *muxsub) Chan() <-chan *Event { return s.readC } @@ -189,11 +202,17 @@ func (s *muxsub) closewait() { s.postMu.Unlock() } -func (s *muxsub) deliver(ev interface{}) { +func (s *muxsub) deliver(event *Event) { + // Short circuit delivery if stale event + if s.created.After(event.Time) { + return + } + // Otherwise deliver the event s.postMu.RLock() + defer s.postMu.RUnlock() + select { - case s.postC <- ev: + case s.postC <- event: case <-s.closing: } - s.postMu.RUnlock() } diff --git a/event/event_test.go b/event/event_test.go index 465af38cd..323cfea49 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -37,7 +37,7 @@ func TestSub(t *testing.T) { }() ev := <-sub.Chan() - if ev.(testEvent) != testEvent(5) { + if ev.Data.(testEvent) != testEvent(5) { t.Errorf("Got %v (%T), expected event %v (%T)", ev, ev, testEvent(5), testEvent(5)) } diff --git a/event/example_test.go b/event/example_test.go index d4642ef2f..29938e853 100644 --- a/event/example_test.go +++ b/event/example_test.go @@ -30,7 +30,7 @@ func ExampleTypeMux() { sub := mux.Subscribe(someEvent{}, otherEvent{}) go func() { for event := range sub.Chan() { - fmt.Printf("Received: %#v\n", event) + fmt.Printf("Received: %#v\n", event.Data) } fmt.Println("done") close(done) -- cgit v1.2.3