From 9b62facdd4bdabfed5ef98d131686c4d2606083a Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Sat, 10 Dec 2016 19:02:14 +0100 Subject: event: deprecate TypeMux and related types The Subscription type is gone, all uses are replaced by *TypeMuxSubscription. This change is prep-work for the introduction of the new Subscription type in a later commit. gorename -from '"github.com/ethereum/go-ethereum/event"::Event' -to TypeMuxEvent gorename -from '"github.com/ethereum/go-ethereum/event"::muxsub' -to TypeMuxSubscription gofmt -w -r 'Subscription -> *TypeMuxSubscription' ./event/*.go find . -name '*.go' -and -not -regex '\./vendor/.*' \| xargs gofmt -w -r 'event.Subscription -> *event.TypeMuxSubscription' --- core/tx_pool.go | 2 +- eth/filters/filter_system.go | 4 +-- eth/handler.go | 4 +-- event/event.go | 58 ++++++++++++++++++-------------------------- light/txpool.go | 2 +- miner/worker.go | 2 +- 6 files changed, 31 insertions(+), 41 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 58922f12f..ca16c1ba3 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -89,7 +89,7 @@ type TxPool struct { gasLimit func() *big.Int // The current gas limit function callback minGasPrice *big.Int eventMux *event.TypeMux - events event.Subscription + events *event.TypeMuxSubscription localTx *txSet signer types.Signer mu sync.RWMutex diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index e0ee2ff51..3adf8111a 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -74,7 +74,7 @@ type subscription struct { // subscription which match the subscription criteria. type EventSystem struct { mux *event.TypeMux - sub event.Subscription + sub *event.TypeMuxSubscription backend Backend lightMode bool lastHead *types.Header @@ -277,7 +277,7 @@ func (es *EventSystem) SubscribePendingTxEvents(hashes chan common.Hash) *Subscr type filterIndex map[Type]map[rpc.ID]*subscription // broadcast event to filters that match criteria. -func (es *EventSystem) broadcast(filters filterIndex, ev *event.Event) { +func (es *EventSystem) broadcast(filters filterIndex, ev *event.TypeMuxEvent) { if ev == nil { return } diff --git a/eth/handler.go b/eth/handler.go index 63ba0821f..691fc0677 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -78,8 +78,8 @@ type ProtocolManager struct { SubProtocols []p2p.Protocol eventMux *event.TypeMux - txSub event.Subscription - minedBlockSub event.Subscription + txSub *event.TypeMuxSubscription + minedBlockSub *event.TypeMuxSubscription // channels for fetcher, syncer, txsyncLoop newPeerCh chan *peer diff --git a/event/event.go b/event/event.go index fd0bcfbd4..f8a2eb013 100644 --- a/event/event.go +++ b/event/event.go @@ -25,33 +25,22 @@ import ( "time" ) -// Event is a time-tagged notification pushed to subscribers. -type Event struct { +// TypeMuxEvent is a time-tagged notification pushed to subscribers. +type TypeMuxEvent struct { Time time.Time Data interface{} } -// Subscription is implemented by event subscriptions. -type Subscription interface { - // Chan returns a channel that carries events. - // Implementations should return the same channel - // for any subsequent calls to Chan. - Chan() <-chan *Event - - // Unsubscribe stops delivery of events to a subscription. - // The event channel is closed. - // Unsubscribe can be called more than once. - Unsubscribe() -} - // A TypeMux dispatches events to registered receivers. Receivers can be // registered to handle events of certain type. Any operation // called after mux is stopped will return ErrMuxClosed. // // The zero value is ready to use. +// +// Deprecated: use Feed type TypeMux struct { mutex sync.RWMutex - subm map[reflect.Type][]*muxsub + subm map[reflect.Type][]*TypeMuxSubscription stopped bool } @@ -61,7 +50,7 @@ var ErrMuxClosed = errors.New("event: mux closed") // Subscribe creates a subscription for events of the given types. The // subscription's channel is closed when it is unsubscribed // or the mux is closed. -func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { +func (mux *TypeMux) Subscribe(types ...interface{}) *TypeMuxSubscription { sub := newsub(mux) mux.mutex.Lock() defer mux.mutex.Unlock() @@ -72,7 +61,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { close(sub.postC) } else { if mux.subm == nil { - mux.subm = make(map[reflect.Type][]*muxsub) + mux.subm = make(map[reflect.Type][]*TypeMuxSubscription) } for _, t := range types { rtyp := reflect.TypeOf(t) @@ -80,7 +69,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { if find(oldsubs, sub) != -1 { panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp)) } - subs := make([]*muxsub, len(oldsubs)+1) + subs := make([]*TypeMuxSubscription, len(oldsubs)+1) copy(subs, oldsubs) subs[len(oldsubs)] = sub mux.subm[rtyp] = subs @@ -92,7 +81,7 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { // Post sends an event to all receivers registered for the given type. // It returns ErrMuxClosed if the mux has been stopped. func (mux *TypeMux) Post(ev interface{}) error { - event := &Event{ + event := &TypeMuxEvent{ Time: time.Now(), Data: ev, } @@ -125,7 +114,7 @@ func (mux *TypeMux) Stop() { mux.mutex.Unlock() } -func (mux *TypeMux) del(s *muxsub) { +func (mux *TypeMux) del(s *TypeMuxSubscription) { mux.mutex.Lock() for typ, subs := range mux.subm { if pos := find(subs, s); pos >= 0 { @@ -139,7 +128,7 @@ func (mux *TypeMux) del(s *muxsub) { s.mux.mutex.Unlock() } -func find(slice []*muxsub, item *muxsub) int { +func find(slice []*TypeMuxSubscription, item *TypeMuxSubscription) int { for i, v := range slice { if v == item { return i @@ -148,14 +137,15 @@ func find(slice []*muxsub, item *muxsub) int { return -1 } -func posdelete(slice []*muxsub, pos int) []*muxsub { - news := make([]*muxsub, len(slice)-1) +func posdelete(slice []*TypeMuxSubscription, pos int) []*TypeMuxSubscription { + news := make([]*TypeMuxSubscription, len(slice)-1) copy(news[:pos], slice[:pos]) copy(news[pos:], slice[pos+1:]) return news } -type muxsub struct { +// TypeMuxSubscription is a subscription established through TypeMux. +type TypeMuxSubscription struct { mux *TypeMux created time.Time closeMu sync.Mutex @@ -166,13 +156,13 @@ type muxsub struct { // postC can be set to nil without affecting the return value of // Chan. postMu sync.RWMutex - readC <-chan *Event - postC chan<- *Event + readC <-chan *TypeMuxEvent + postC chan<- *TypeMuxEvent } -func newsub(mux *TypeMux) *muxsub { - c := make(chan *Event) - return &muxsub{ +func newsub(mux *TypeMux) *TypeMuxSubscription { + c := make(chan *TypeMuxEvent) + return &TypeMuxSubscription{ mux: mux, created: time.Now(), readC: c, @@ -181,16 +171,16 @@ func newsub(mux *TypeMux) *muxsub { } } -func (s *muxsub) Chan() <-chan *Event { +func (s *TypeMuxSubscription) Chan() <-chan *TypeMuxEvent { return s.readC } -func (s *muxsub) Unsubscribe() { +func (s *TypeMuxSubscription) Unsubscribe() { s.mux.del(s) s.closewait() } -func (s *muxsub) closewait() { +func (s *TypeMuxSubscription) closewait() { s.closeMu.Lock() defer s.closeMu.Unlock() if s.closed { @@ -205,7 +195,7 @@ func (s *muxsub) closewait() { s.postMu.Unlock() } -func (s *muxsub) deliver(event *Event) { +func (s *TypeMuxSubscription) deliver(event *TypeMuxEvent) { // Short circuit delivery if stale event if s.created.After(event.Time) { return diff --git a/light/txpool.go b/light/txpool.go index d0781593b..bcdb6123d 100644 --- a/light/txpool.go +++ b/light/txpool.go @@ -47,7 +47,7 @@ type TxPool struct { signer types.Signer quit chan bool eventMux *event.TypeMux - events event.Subscription + events *event.TypeMuxSubscription mu sync.RWMutex chain *LightChain odr OdrBackend diff --git a/miner/worker.go b/miner/worker.go index 77e4e0205..49ac60253 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -90,7 +90,7 @@ type worker struct { // update loop mux *event.TypeMux - events event.Subscription + events *event.TypeMuxSubscription wg sync.WaitGroup agents map[Agent]struct{} -- cgit v1.2.3 From 6d5e100d0dc6fc0b905610850a75b5d4fa907739 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 19 Jul 2016 01:39:12 +0200 Subject: event: add new Subscription type and related utilities This commit introduces a new Subscription type, which is synonymous with ethereum.Subscription. It also adds a couple of utilities that make working with Subscriptions easier. The mot complex utility is Feed, a synchronisation device that implements broadcast subscriptions. Feed is slightly faster than TypeMux and will replace uses of TypeMux across the go-ethereum codebase in the future. --- event/event.go | 2 +- event/event_test.go | 30 +++- event/example_feed_test.go | 73 ++++++++++ event/example_scope_test.go | 128 +++++++++++++++++ event/example_subscription_test.go | 56 ++++++++ event/feed.go | 240 ++++++++++++++++++++++++++++++++ event/feed_test.go | 226 ++++++++++++++++++++++++++++++ event/subscription.go | 275 +++++++++++++++++++++++++++++++++++++ event/subscription_test.go | 121 ++++++++++++++++ 9 files changed, 1144 insertions(+), 7 deletions(-) create mode 100644 event/example_feed_test.go create mode 100644 event/example_scope_test.go create mode 100644 event/example_subscription_test.go create mode 100644 event/feed.go create mode 100644 event/feed_test.go create mode 100644 event/subscription.go create mode 100644 event/subscription_test.go diff --git a/event/event.go b/event/event.go index f8a2eb013..d3e84f0f7 100644 --- a/event/event.go +++ b/event/event.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Package event implements an event multiplexer. +// Package event deals with subscriptions to real-time events. package event import ( diff --git a/event/event_test.go b/event/event_test.go index 2c56ecf29..a12945a47 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -149,16 +149,34 @@ func emptySubscriber(mux *TypeMux, types ...interface{}) { }() } -func BenchmarkPost3(b *testing.B) { - var mux = new(TypeMux) - defer mux.Stop() - emptySubscriber(mux, testEvent(0)) - emptySubscriber(mux, testEvent(0)) - emptySubscriber(mux, testEvent(0)) +func BenchmarkPost1000(b *testing.B) { + var ( + mux = new(TypeMux) + subscribed, done sync.WaitGroup + nsubs = 1000 + ) + subscribed.Add(nsubs) + done.Add(nsubs) + for i := 0; i < nsubs; i++ { + go func() { + s := mux.Subscribe(testEvent(0)) + subscribed.Done() + for range s.Chan() { + } + done.Done() + }() + } + subscribed.Wait() + // The actual benchmark. + b.ResetTimer() for i := 0; i < b.N; i++ { mux.Post(testEvent(0)) } + + b.StopTimer() + mux.Stop() + done.Wait() } func BenchmarkPostConcurrent(b *testing.B) { diff --git a/event/example_feed_test.go b/event/example_feed_test.go new file mode 100644 index 000000000..63436b226 --- /dev/null +++ b/event/example_feed_test.go @@ -0,0 +1,73 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package event_test + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/event" +) + +func ExampleFeed_acknowledgedEvents() { + // This example shows how the return value of Send can be used for request/reply + // interaction between event consumers and producers. + var feed event.Feed + type ackedEvent struct { + i int + ack chan<- struct{} + } + + // Consumers wait for events on the feed and acknowledge processing. + done := make(chan struct{}) + defer close(done) + for i := 0; i < 3; i++ { + ch := make(chan ackedEvent, 100) + sub := feed.Subscribe(ch) + go func() { + defer sub.Unsubscribe() + for { + select { + case ev := <-ch: + fmt.Println(ev.i) // "process" the event + ev.ack <- struct{}{} + case <-done: + return + } + } + }() + } + + // The producer sends values of type ackedEvent with increasing values of i. + // It waits for all consumers to acknowledge before sending the next event. + for i := 0; i < 3; i++ { + acksignal := make(chan struct{}) + n := feed.Send(ackedEvent{i, acksignal}) + for ack := 0; ack < n; ack++ { + <-acksignal + } + } + // Output: + // 0 + // 0 + // 0 + // 1 + // 1 + // 1 + // 2 + // 2 + // 2 +} diff --git a/event/example_scope_test.go b/event/example_scope_test.go new file mode 100644 index 000000000..c517a8324 --- /dev/null +++ b/event/example_scope_test.go @@ -0,0 +1,128 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package event_test + +import ( + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/event" +) + +// This example demonstrates how SubscriptionScope can be used to control the lifetime of +// subscriptions. +// +// Our example program consists of two servers, each of which performs a calculation when +// requested. The servers also allow subscribing to results of all computations. +type divServer struct{ results event.Feed } +type mulServer struct{ results event.Feed } + +func (s *divServer) do(a, b int) int { + r := a / b + s.results.Send(r) + return r +} + +func (s *mulServer) do(a, b int) int { + r := a * b + s.results.Send(r) + return r +} + +// The servers are contained in an App. The app controls the servers and exposes them +// through its API. +type App struct { + divServer + mulServer + scope event.SubscriptionScope +} + +func (s *App) Calc(op byte, a, b int) int { + switch op { + case '/': + return s.divServer.do(a, b) + case '*': + return s.mulServer.do(a, b) + default: + panic("invalid op") + } +} + +// The app's SubscribeResults method starts sending calculation results to the given +// channel. Subscriptions created through this method are tied to the lifetime of the App +// because they are registered in the scope. +func (s *App) SubscribeResults(op byte, ch chan<- int) event.Subscription { + switch op { + case '/': + return s.scope.Track(s.divServer.results.Subscribe(ch)) + case '*': + return s.scope.Track(s.mulServer.results.Subscribe(ch)) + default: + panic("invalid op") + } +} + +// Stop stops the App, closing all subscriptions created through SubscribeResults. +func (s *App) Stop() { + s.scope.Close() +} + +func ExampleSubscriptionScope() { + // Create the app. + var ( + app App + wg sync.WaitGroup + divs = make(chan int) + muls = make(chan int) + ) + + // Run a subscriber in the background. + divsub := app.SubscribeResults('/', divs) + mulsub := app.SubscribeResults('*', muls) + wg.Add(1) + go func() { + defer wg.Done() + defer fmt.Println("subscriber exited") + defer divsub.Unsubscribe() + defer mulsub.Unsubscribe() + for { + select { + case result := <-divs: + fmt.Println("division happened:", result) + case result := <-muls: + fmt.Println("multiplication happened:", result) + case <-divsub.Err(): + return + case <-mulsub.Err(): + return + } + } + }() + + // Interact with the app. + app.Calc('/', 22, 11) + app.Calc('*', 3, 4) + + // Stop the app. This shuts down the subscriptions, causing the subscriber to exit. + app.Stop() + wg.Wait() + + // Output: + // division happened: 2 + // multiplication happened: 12 + // subscriber exited +} diff --git a/event/example_subscription_test.go b/event/example_subscription_test.go new file mode 100644 index 000000000..de1126689 --- /dev/null +++ b/event/example_subscription_test.go @@ -0,0 +1,56 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package event_test + +import ( + "fmt" + + "github.com/ethereum/go-ethereum/event" +) + +func ExampleNewSubscription() { + // Create a subscription that sends 10 integers on ch. + ch := make(chan int) + sub := event.NewSubscription(func(quit <-chan struct{}) error { + for i := 0; i < 10; i++ { + select { + case ch <- i: + case <-quit: + fmt.Println("unsubscribed") + return nil + } + } + return nil + }) + + // This is the consumer. It reads 5 integers, then aborts the subscription. + // Note that Unsubscribe waits until the producer has shut down. + for i := range ch { + fmt.Println(i) + if i == 4 { + sub.Unsubscribe() + break + } + } + // Output: + // 0 + // 1 + // 2 + // 3 + // 4 + // unsubscribed +} diff --git a/event/feed.go b/event/feed.go new file mode 100644 index 000000000..bd8e26321 --- /dev/null +++ b/event/feed.go @@ -0,0 +1,240 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package event + +import ( + "errors" + "reflect" + "sync" +) + +var errBadChannel = errors.New("event: Subscribe argument does not have sendable channel type") + +// Feed implements one-to-many subscriptions where the carrier of events is a channel. +// Values sent to a Feed are delivered to all subscribed channels simultaneously. +// +// Feeds can only be used with a single type. The type is determined by the first Send or +// Subscribe operation. Subsequent calls to these methods panic if the type does not +// match. +// +// The zero value is ready to use. +type Feed struct { + sendLock chan struct{} // one-element buffer, empty when held + removeSub chan interface{} // interrupts Send + sendCases caseList // the active set of select cases used by Send + + // The inbox holds newly subscribed channels until they are added to sendCases. + mu sync.Mutex + inbox caseList + etype reflect.Type + closed bool +} + +type feedTypeError struct { + got, want reflect.Type + op string +} + +func (e feedTypeError) Error() string { + return "event: wrong type in " + e.op + " got " + e.got.String() + ", want " + e.want.String() +} + +func (f *Feed) init() { + if f.sendLock != nil { + return + } + f.removeSub = make(chan interface{}) + f.sendLock = make(chan struct{}, 1) + f.sendLock <- struct{}{} + f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}} +} + +// Subscribe adds a channel to the feed. Future sends will be delivered on the channel +// until the subscription is canceled. All channels added must have the same element type. +// +// The channel should have ample buffer space to avoid blocking other subscribers. +func (f *Feed) Subscribe(channel interface{}) Subscription { + chanval := reflect.ValueOf(channel) + chantyp := chanval.Type() + if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 { + panic(errBadChannel) + } + sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)} + + f.mu.Lock() + defer f.mu.Unlock() + f.init() + if !f.typecheck(chantyp.Elem()) { + panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)}) + } + // Add the select case to the inbox. + // The next Send will add it to f.sendCases. + cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval} + f.inbox = append(f.inbox, cas) + return sub +} + +// note: callers must hold f.mu +func (f *Feed) typecheck(typ reflect.Type) bool { + if f.etype == nil { + f.etype = typ + return true + } + return f.etype == typ +} + +func (f *Feed) remove(sub *feedSub) { + // Delete from inbox first, which covers channels + // that have not been added to f.sendCases yet. + ch := sub.channel.Interface() + f.mu.Lock() + index := f.inbox.find(ch) + if index != -1 { + f.inbox = f.inbox.delete(index) + f.mu.Unlock() + return + } + f.mu.Unlock() + + select { + case f.removeSub <- ch: + // Send will remove the channel from f.sendCases. + case <-f.sendLock: + // No Send is in progress, delete the channel now that we have the send lock. + f.sendCases = f.sendCases.delete(f.sendCases.find(ch)) + f.sendLock <- struct{}{} + } +} + +// Send delivers to all subscribed channels simultaneously. +// It returns the number of subscribers that the value was sent to. +func (f *Feed) Send(value interface{}) (nsent int) { + f.mu.Lock() + f.init() + <-f.sendLock + // Add new subscriptions from the inbox, then clear it. + f.sendCases = append(f.sendCases, f.inbox...) + for i := range f.inbox { + f.inbox[i] = reflect.SelectCase{} + } + f.inbox = f.inbox[:0] + f.mu.Unlock() + + // Set the sent value on all channels. + rvalue := reflect.ValueOf(value) + if !f.typecheck(rvalue.Type()) { + f.sendLock <- struct{}{} + panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) + } + for i := 1; i < len(f.sendCases); i++ { + f.sendCases[i].Send = rvalue + } + + // Send until all channels except removeSub have been chosen. + cases := f.sendCases + for { + // Fast path: try sending without blocking before adding to the select set. + // This should usually succeed if subscribers are fast enough and have free + // buffer space. + for i := 1; i < len(cases); i++ { + if cases[i].Chan.TrySend(rvalue) { + cases = cases.deactivate(i) + nsent++ + } + } + if len(cases) == 1 { + break + } + // Select on all the receivers, waiting for them to unblock. + chosen, recv, _ := reflect.Select(cases) + if chosen == 0 /* <-f.removeSub */ { + index := f.sendCases.find(recv.Interface()) + f.sendCases = f.sendCases.delete(index) + if index >= 0 && index < len(cases) { + cases = f.sendCases[:len(cases)-1] + } + } else { + cases = cases.deactivate(chosen) + nsent++ + } + } + + // Forget about the sent value and hand off the send lock. + for i := 1; i < len(f.sendCases); i++ { + f.sendCases[i].Send = reflect.Value{} + } + f.sendLock <- struct{}{} + return nsent +} + +type feedSub struct { + feed *Feed + channel reflect.Value + errOnce sync.Once + err chan error +} + +func (sub *feedSub) Unsubscribe() { + sub.errOnce.Do(func() { + sub.feed.remove(sub) + close(sub.err) + }) +} + +func (sub *feedSub) Err() <-chan error { + return sub.err +} + +type caseList []reflect.SelectCase + +// find returns the index of a case containing the given channel. +func (cs caseList) find(channel interface{}) int { + for i, cas := range cs { + if cas.Chan.Interface() == channel { + return i + } + } + return -1 +} + +// delete removes the given case from cs. +func (cs caseList) delete(index int) caseList { + return append(cs[:index], cs[index+1:]...) +} + +// deactivate moves the case at index into the non-accessible portion of the cs slice. +func (cs caseList) deactivate(index int) caseList { + last := len(cs) - 1 + cs[index], cs[last] = cs[last], cs[index] + return cs[:last] +} + +// func (cs caseList) String() string { +// s := "[" +// for i, cas := range cs { +// if i != 0 { +// s += ", " +// } +// switch cas.Dir { +// case reflect.SelectSend: +// s += fmt.Sprintf("%v<-", cas.Chan.Interface()) +// case reflect.SelectRecv: +// s += fmt.Sprintf("<-%v", cas.Chan.Interface()) +// } +// } +// return s + "]" +// } diff --git a/event/feed_test.go b/event/feed_test.go new file mode 100644 index 000000000..4f897c162 --- /dev/null +++ b/event/feed_test.go @@ -0,0 +1,226 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package event + +import ( + "fmt" + "reflect" + "sync" + "testing" + "time" +) + +func TestFeedPanics(t *testing.T) { + { + var f Feed + f.Send(int(2)) + want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))} + if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil { + t.Error(err) + } + } + { + var f Feed + ch := make(chan int) + f.Subscribe(ch) + want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))} + if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil { + t.Error(err) + } + } + { + var f Feed + f.Send(int(2)) + want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))} + if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil { + t.Error(err) + } + } + { + var f Feed + if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil { + t.Error(err) + } + } + { + var f Feed + if err := checkPanic(errBadChannel, func() { f.Subscribe(int(0)) }); err != nil { + t.Error(err) + } + } +} + +func checkPanic(want error, fn func()) (err error) { + defer func() { + panic := recover() + if panic == nil { + err = fmt.Errorf("didn't panic") + } else if !reflect.DeepEqual(panic, want) { + err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want) + } + }() + fn() + return nil +} + +func TestFeed(t *testing.T) { + var feed Feed + var done, subscribed sync.WaitGroup + subscriber := func(i int) { + defer done.Done() + + subchan := make(chan int) + sub := feed.Subscribe(subchan) + timeout := time.NewTimer(2 * time.Second) + subscribed.Done() + + select { + case v := <-subchan: + if v != 1 { + t.Errorf("%d: received value %d, want 1", i, v) + } + case <-timeout.C: + t.Errorf("%d: receive timeout", i) + } + + sub.Unsubscribe() + select { + case _, ok := <-sub.Err(): + if ok { + t.Errorf("%d: error channel not closed after unsubscribe", i) + } + case <-timeout.C: + t.Errorf("%d: unsubscribe timeout", i) + } + } + + const n = 1000 + done.Add(n) + subscribed.Add(n) + for i := 0; i < n; i++ { + go subscriber(i) + } + subscribed.Wait() + if nsent := feed.Send(1); nsent != n { + t.Errorf("first send delivered %d times, want %d", nsent, n) + } + if nsent := feed.Send(2); nsent != 0 { + t.Errorf("second send delivered %d times, want 0", nsent) + } + done.Wait() +} + +func TestFeedSubscribeSameChannel(t *testing.T) { + var ( + feed Feed + done sync.WaitGroup + ch = make(chan int) + sub1 = feed.Subscribe(ch) + sub2 = feed.Subscribe(ch) + _ = feed.Subscribe(ch) + ) + expectSends := func(value, n int) { + if nsent := feed.Send(value); nsent != n { + t.Errorf("send delivered %d times, want %d", nsent, n) + } + done.Done() + } + expectRecv := func(wantValue, n int) { + for i := 0; i < n; i++ { + if v := <-ch; v != wantValue { + t.Errorf("received %d, want %d", v, wantValue) + } + } + } + + done.Add(1) + go expectSends(1, 3) + expectRecv(1, 3) + done.Wait() + + sub1.Unsubscribe() + + done.Add(1) + go expectSends(2, 2) + expectRecv(2, 2) + done.Wait() + + sub2.Unsubscribe() + + done.Add(1) + go expectSends(3, 1) + expectRecv(3, 1) + done.Wait() +} + +func TestFeedUnsubscribeFromInbox(t *testing.T) { + var ( + feed Feed + ch1 = make(chan int) + ch2 = make(chan int) + sub1 = feed.Subscribe(ch1) + sub2 = feed.Subscribe(ch1) + sub3 = feed.Subscribe(ch2) + ) + if len(feed.inbox) != 3 { + t.Errorf("inbox length != 3 after subscribe") + } + if len(feed.sendCases) != 1 { + t.Errorf("sendCases is non-empty after unsubscribe") + } + + sub1.Unsubscribe() + sub2.Unsubscribe() + sub3.Unsubscribe() + if len(feed.inbox) != 0 { + t.Errorf("inbox is non-empty after unsubscribe") + } + if len(feed.sendCases) != 1 { + t.Errorf("sendCases is non-empty after unsubscribe") + } +} + +func BenchmarkFeedSend1000(b *testing.B) { + var ( + done sync.WaitGroup + feed Feed + nsubs = 1000 + ) + subscriber := func(ch <-chan int) { + for i := 0; i < b.N; i++ { + <-ch + } + done.Done() + } + done.Add(nsubs) + for i := 0; i < nsubs; i++ { + ch := make(chan int, 200) + feed.Subscribe(ch) + go subscriber(ch) + } + + // The actual benchmark. + b.ResetTimer() + for i := 0; i < b.N; i++ { + if feed.Send(i) != nsubs { + panic("wrong number of sends") + } + } + + b.StopTimer() + done.Wait() +} diff --git a/event/subscription.go b/event/subscription.go new file mode 100644 index 000000000..7f2619b2d --- /dev/null +++ b/event/subscription.go @@ -0,0 +1,275 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package event + +import ( + "sync" + "time" + + "github.com/ethereum/go-ethereum/common/mclock" + "golang.org/x/net/context" +) + +// Subscription represents a stream of events. The carrier of the events is typically a +// channel, but isn't part of the interface. +// +// Subscriptions can fail while established. Failures are reported through an error +// channel. It receives a value if there is an issue with the subscription (e.g. the +// network connection delivering the events has been closed). Only one value will ever be +// sent. +// +// The error channel is closed when the subscription ends successfully (i.e. when the +// source of events is closed). It is also closed when Unsubscribe is called. +// +// The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all +// cases to ensure that resources related to the subscription are released. It can be +// called any number of times. +type Subscription interface { + Err() <-chan error // returns the error channel + Unsubscribe() // cancels sending of events, closing the error channel +} + +// NewSubscription runs fn as a subscription in a new goroutine. The channel given to fn +// is closed when Unsubscribe is called. If fn returns an error, it is sent on the +// subscription's error channel. +func NewSubscription(fn func(<-chan struct{}) error) Subscription { + s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)} + go func() { + defer close(s.err) + err := fn(s.unsub) + s.mu.Lock() + defer s.mu.Unlock() + if !s.unsubscribed { + if err != nil { + s.err <- err + } + s.unsubscribed = true + } + }() + return s +} + +type funcSub struct { + unsub chan struct{} + err chan error + mu sync.Mutex + unsubscribed bool +} + +func (s *funcSub) Unsubscribe() { + s.mu.Lock() + if s.unsubscribed { + s.mu.Unlock() + return + } + s.unsubscribed = true + close(s.unsub) + s.mu.Unlock() + // Wait for producer shutdown. + <-s.err +} + +func (s *funcSub) Err() <-chan error { + return s.err +} + +// Resubscribe calls fn repeatedly to keep a subscription established. When the +// subscription is established, Resubscribe waits for it to fail and calls fn again. This +// process repeats until Unsubscribe is called or the active subscription ends +// successfully. +// +// Resubscribe applies backoff between calls to fn. The time between calls is adapted +// based on the error rate, but will never exceed backoffMax. +func Resubscribe(backoffMax time.Duration, fn ResubscribeFunc) Subscription { + s := &resubscribeSub{ + waitTime: backoffMax / 10, + backoffMax: backoffMax, + fn: fn, + err: make(chan error), + unsub: make(chan struct{}), + } + go s.loop() + return s +} + +// A ResubscribeFunc attempts to establish a subscription. +type ResubscribeFunc func(context.Context) (Subscription, error) + +type resubscribeSub struct { + fn ResubscribeFunc + err chan error + unsub chan struct{} + unsubOnce sync.Once + lastTry mclock.AbsTime + waitTime, backoffMax time.Duration +} + +func (s *resubscribeSub) Unsubscribe() { + s.unsubOnce.Do(func() { + s.unsub <- struct{}{} + <-s.err + }) +} + +func (s *resubscribeSub) Err() <-chan error { + return s.err +} + +func (s *resubscribeSub) loop() { + defer close(s.err) + var done bool + for !done { + sub := s.subscribe() + if sub == nil { + break + } + done = s.waitForError(sub) + sub.Unsubscribe() + } +} + +func (s *resubscribeSub) subscribe() Subscription { + subscribed := make(chan error) + var sub Subscription +retry: + for { + s.lastTry = mclock.Now() + ctx, cancel := context.WithCancel(context.Background()) + go func() { + rsub, err := s.fn(ctx) + sub = rsub + subscribed <- err + }() + select { + case err := <-subscribed: + cancel() + if err != nil { + // Subscribing failed, wait before launching the next try. + if s.backoffWait() { + return nil + } + continue retry + } + if sub == nil { + panic("event: ResubscribeFunc returned nil subscription and no error") + } + return sub + case <-s.unsub: + cancel() + return nil + } + } +} + +func (s *resubscribeSub) waitForError(sub Subscription) bool { + defer sub.Unsubscribe() + select { + case err := <-sub.Err(): + return err == nil + case <-s.unsub: + return true + } +} + +func (s *resubscribeSub) backoffWait() bool { + if time.Duration(mclock.Now()-s.lastTry) > s.backoffMax { + s.waitTime = s.backoffMax / 10 + } else { + s.waitTime *= 2 + if s.waitTime > s.backoffMax { + s.waitTime = s.backoffMax + } + } + + t := time.NewTimer(s.waitTime) + defer t.Stop() + select { + case <-t.C: + return false + case <-s.unsub: + return true + } +} + +// SubscriptionScope provides a facility to unsubscribe multiple subscriptions at once. +// +// For code that handle more than one subscription, a scope can be used to conveniently +// unsubscribe all of them with a single call. The example demonstrates a typical use in a +// larger program. +// +// The zero value is ready to use. +type SubscriptionScope struct { + mu sync.Mutex + subs map[*scopeSub]struct{} + closed bool +} + +type scopeSub struct { + sc *SubscriptionScope + s Subscription +} + +// Track starts tracking a subscription. If the scope is closed, Track returns nil. The +// returned subscription is a wrapper. Unsubscribing the wrapper removes it from the +// scope. +func (sc *SubscriptionScope) Track(s Subscription) Subscription { + sc.mu.Lock() + defer sc.mu.Unlock() + if sc.closed { + return nil + } + if sc.subs == nil { + sc.subs = make(map[*scopeSub]struct{}) + } + ss := &scopeSub{sc, s} + sc.subs[ss] = struct{}{} + return ss +} + +// Close calls Unsubscribe on all tracked subscriptions and prevents further additions to +// the tracked set. Calls to Track after Close return nil. +func (sc *SubscriptionScope) Close() { + sc.mu.Lock() + defer sc.mu.Unlock() + if sc.closed { + return + } + sc.closed = true + for s := range sc.subs { + s.s.Unsubscribe() + } + sc.subs = nil +} + +// Count returns the number of tracked subscriptions. +// It is meant to be used for debugging. +func (sc *SubscriptionScope) Count() int { + sc.mu.Lock() + defer sc.mu.Unlock() + return len(sc.subs) +} + +func (s *scopeSub) Unsubscribe() { + s.s.Unsubscribe() + s.sc.mu.Lock() + defer s.sc.mu.Unlock() + delete(s.sc.subs, s) +} + +func (s *scopeSub) Err() <-chan error { + return s.s.Err() +} diff --git a/event/subscription_test.go b/event/subscription_test.go new file mode 100644 index 000000000..a4fe30298 --- /dev/null +++ b/event/subscription_test.go @@ -0,0 +1,121 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package event + +import ( + "errors" + "testing" + "time" + + "golang.org/x/net/context" +) + +var errInts = errors.New("error in subscribeInts") + +func subscribeInts(max, fail int, c chan<- int) Subscription { + return NewSubscription(func(quit <-chan struct{}) error { + for i := 0; i < max; i++ { + if i >= fail { + return errInts + } + select { + case c <- i: + case <-quit: + return nil + } + } + return nil + }) +} + +func TestNewSubscriptionError(t *testing.T) { + t.Parallel() + + channel := make(chan int) + sub := subscribeInts(10, 2, channel) +loop: + for want := 0; want < 10; want++ { + select { + case got := <-channel: + if got != want { + t.Fatalf("wrong int %d, want %d", got, want) + } + case err := <-sub.Err(): + if err != errInts { + t.Fatalf("wrong error: got %q, want %q", err, errInts) + } + if want != 2 { + t.Fatalf("got errInts at int %d, should be received at 2", want) + } + break loop + } + } + sub.Unsubscribe() + + err, ok := <-sub.Err() + if err != nil { + t.Fatal("got non-nil error after Unsubscribe") + } + if ok { + t.Fatal("channel still open after Unsubscribe") + } +} + +func TestResubscribe(t *testing.T) { + t.Parallel() + + var i int + nfails := 6 + sub := Resubscribe(100*time.Millisecond, func(ctx context.Context) (Subscription, error) { + // fmt.Printf("call #%d @ %v\n", i, time.Now()) + i++ + if i == 2 { + // Delay the second failure a bit to reset the resubscribe interval. + time.Sleep(200 * time.Millisecond) + } + if i < nfails { + return nil, errors.New("oops") + } + sub := NewSubscription(func(unsubscribed <-chan struct{}) error { return nil }) + return sub, nil + }) + + <-sub.Err() + if i != nfails { + t.Fatalf("resubscribe function called %d times, want %d times", i, nfails) + } +} + +func TestResubscribeAbort(t *testing.T) { + t.Parallel() + + done := make(chan error) + sub := Resubscribe(0, func(ctx context.Context) (Subscription, error) { + select { + case <-ctx.Done(): + done <- nil + case <-time.After(2 * time.Second): + done <- errors.New("context given to resubscribe function not canceled within 2s") + } + return nil, nil + }) + + sub.Unsubscribe() + if err := <-done; err != nil { + t.Fatal(err) + } +} -- cgit v1.2.3 From a2b4abd89adf0404b43fcd19766bac37009032a5 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Fri, 9 Dec 2016 19:51:21 +0100 Subject: rpc: send nil on subscription Err channel when Client is closed This change makes client subscriptions compatible with the new Subscription semantics introduced in the previous commit. --- rpc/client.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/rpc/client.go b/rpc/client.go index 34a3b7831..269eb78c8 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -682,7 +682,7 @@ func newClientSubscription(c *Client, channel reflect.Value) *ClientSubscription // resubscription when the client connection is closed unexpectedly. // // The error channel receives a value when the subscription has ended due -// to an error. The received error is ErrClientQuit if Close has been called +// to an error. The received error is nil if Close has been called // on the underlying client and no other error has occurred. // // The error channel is closed when Unsubscribe is called on the subscription. @@ -707,6 +707,9 @@ func (sub *ClientSubscription) quitWithError(err error, unsubscribeServer bool) sub.requestUnsubscribe() } if err != nil { + if err == ErrClientQuit { + err = nil // Adhere to subscription semantics. + } sub.err <- err } }) -- cgit v1.2.3 From 1bed9b3fea9939581b03cae9d6b4984ced456748 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 26 Jan 2017 11:57:31 +0100 Subject: event: address review issues (multiple commits) event: address Feed review issues event: clarify role of NewSubscription function event: more Feed review fixes * take sendLock after dropping f.mu * add constant for number of special cases event: fix subscribing/unsubscribing while Send is blocked --- event/feed.go | 31 ++++++++++++++--------- event/feed_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++++++++++ event/subscription.go | 10 ++++---- 3 files changed, 93 insertions(+), 16 deletions(-) diff --git a/event/feed.go b/event/feed.go index bd8e26321..4568304df 100644 --- a/event/feed.go +++ b/event/feed.go @@ -33,7 +33,9 @@ var errBadChannel = errors.New("event: Subscribe argument does not have sendable // // The zero value is ready to use. type Feed struct { - sendLock chan struct{} // one-element buffer, empty when held + // sendLock has a one-element buffer and is empty when held. + // It protects sendCases. + sendLock chan struct{} removeSub chan interface{} // interrupts Send sendCases caseList // the active set of select cases used by Send @@ -44,6 +46,10 @@ type Feed struct { closed bool } +// This is the index of the first actual subscription channel in sendCases. +// sendCases[0] is a SelectRecv case for the removeSub channel. +const firstSubSendCase = 1 + type feedTypeError struct { got, want reflect.Type op string @@ -67,6 +73,7 @@ func (f *Feed) init() { // until the subscription is canceled. All channels added must have the same element type. // // The channel should have ample buffer space to avoid blocking other subscribers. +// Slow subscribers are not dropped. func (f *Feed) Subscribe(channel interface{}) Subscription { chanval := reflect.ValueOf(channel) chantyp := chanval.Type() @@ -125,13 +132,14 @@ func (f *Feed) remove(sub *feedSub) { func (f *Feed) Send(value interface{}) (nsent int) { f.mu.Lock() f.init() + f.mu.Unlock() + <-f.sendLock - // Add new subscriptions from the inbox, then clear it. + + // Add new cases from the inbox after taking the send lock. + f.mu.Lock() f.sendCases = append(f.sendCases, f.inbox...) - for i := range f.inbox { - f.inbox[i] = reflect.SelectCase{} - } - f.inbox = f.inbox[:0] + f.inbox = nil f.mu.Unlock() // Set the sent value on all channels. @@ -140,7 +148,7 @@ func (f *Feed) Send(value interface{}) (nsent int) { f.sendLock <- struct{}{} panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype}) } - for i := 1; i < len(f.sendCases); i++ { + for i := firstSubSendCase; i < len(f.sendCases); i++ { f.sendCases[i].Send = rvalue } @@ -150,13 +158,14 @@ func (f *Feed) Send(value interface{}) (nsent int) { // Fast path: try sending without blocking before adding to the select set. // This should usually succeed if subscribers are fast enough and have free // buffer space. - for i := 1; i < len(cases); i++ { + for i := firstSubSendCase; i < len(cases); i++ { if cases[i].Chan.TrySend(rvalue) { - cases = cases.deactivate(i) nsent++ + cases = cases.deactivate(i) + i-- } } - if len(cases) == 1 { + if len(cases) == firstSubSendCase { break } // Select on all the receivers, waiting for them to unblock. @@ -174,7 +183,7 @@ func (f *Feed) Send(value interface{}) (nsent int) { } // Forget about the sent value and hand off the send lock. - for i := 1; i < len(f.sendCases); i++ { + for i := firstSubSendCase; i < len(f.sendCases); i++ { f.sendCases[i].Send = reflect.Value{} } f.sendLock <- struct{}{} diff --git a/event/feed_test.go b/event/feed_test.go index 4f897c162..a82c10303 100644 --- a/event/feed_test.go +++ b/event/feed_test.go @@ -167,6 +167,74 @@ func TestFeedSubscribeSameChannel(t *testing.T) { done.Wait() } +func TestFeedSubscribeBlockedPost(t *testing.T) { + var ( + feed Feed + nsends = 2000 + ch1 = make(chan int) + ch2 = make(chan int) + wg sync.WaitGroup + ) + defer wg.Wait() + + feed.Subscribe(ch1) + wg.Add(nsends) + for i := 0; i < nsends; i++ { + go func() { + feed.Send(99) + wg.Done() + }() + } + + sub2 := feed.Subscribe(ch2) + defer sub2.Unsubscribe() + + // We're done when ch1 has received N times. + // The number of receives on ch2 depends on scheduling. + for i := 0; i < nsends; { + select { + case <-ch1: + i++ + case <-ch2: + } + } +} + +func TestFeedUnsubscribeBlockedPost(t *testing.T) { + var ( + feed Feed + nsends = 200 + chans = make([]chan int, 2000) + subs = make([]Subscription, len(chans)) + bchan = make(chan int) + bsub = feed.Subscribe(bchan) + wg sync.WaitGroup + ) + for i := range chans { + chans[i] = make(chan int, nsends) + } + + // Queue up some Sends. None of these can make progress while bchan isn't read. + wg.Add(nsends) + for i := 0; i < nsends; i++ { + go func() { + feed.Send(99) + wg.Done() + }() + } + // Subscribe the other channels. + for i, ch := range chans { + subs[i] = feed.Subscribe(ch) + } + // Unsubscribe them again. + for _, sub := range subs { + sub.Unsubscribe() + } + // Unblock the Sends. + bsub.Unsubscribe() + wg.Wait() +} + func TestFeedUnsubscribeFromInbox(t *testing.T) { var ( feed Feed diff --git a/event/subscription.go b/event/subscription.go index 7f2619b2d..83bd21213 100644 --- a/event/subscription.go +++ b/event/subscription.go @@ -43,14 +43,14 @@ type Subscription interface { Unsubscribe() // cancels sending of events, closing the error channel } -// NewSubscription runs fn as a subscription in a new goroutine. The channel given to fn -// is closed when Unsubscribe is called. If fn returns an error, it is sent on the -// subscription's error channel. -func NewSubscription(fn func(<-chan struct{}) error) Subscription { +// NewSubscription runs a producer function as a subscription in a new goroutine. The +// channel given to the producer is closed when Unsubscribe is called. If fn returns an +// error, it is sent on the subscription's error channel. +func NewSubscription(producer func(<-chan struct{}) error) Subscription { s := &funcSub{unsub: make(chan struct{}), err: make(chan error, 1)} go func() { defer close(s.err) - err := fn(s.unsub) + err := producer(s.unsub) s.mu.Lock() defer s.mu.Unlock() if !s.unsubscribed { -- cgit v1.2.3