aboutsummaryrefslogtreecommitdiffstats
path: root/event/feed_test.go
diff options
context:
space:
mode:
authorFelix Lange <fjl@twurst.com>2016-07-19 07:39:12 +0800
committerFelix Lange <fjl@twurst.com>2017-01-26 01:44:20 +0800
commit6d5e100d0dc6fc0b905610850a75b5d4fa907739 (patch)
treea653dc592af1e7405faeaf9ec9b84f9b32aa3b88 /event/feed_test.go
parent9b62facdd4bdabfed5ef98d131686c4d2606083a (diff)
downloadgo-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar.gz
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar.bz2
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar.lz
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar.xz
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.tar.zst
go-tangerine-6d5e100d0dc6fc0b905610850a75b5d4fa907739.zip
event: add new Subscription type and related utilities
This commit introduces a new Subscription type, which is synonymous with ethereum.Subscription. It also adds a couple of utilities that make working with Subscriptions easier. The mot complex utility is Feed, a synchronisation device that implements broadcast subscriptions. Feed is slightly faster than TypeMux and will replace uses of TypeMux across the go-ethereum codebase in the future.
Diffstat (limited to 'event/feed_test.go')
-rw-r--r--event/feed_test.go226
1 files changed, 226 insertions, 0 deletions
diff --git a/event/feed_test.go b/event/feed_test.go
new file mode 100644
index 000000000..4f897c162
--- /dev/null
+++ b/event/feed_test.go
@@ -0,0 +1,226 @@
+// Copyright 2016 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package event
+
+import (
+ "fmt"
+ "reflect"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestFeedPanics(t *testing.T) {
+ {
+ var f Feed
+ f.Send(int(2))
+ want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
+ if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
+ t.Error(err)
+ }
+ }
+ {
+ var f Feed
+ ch := make(chan int)
+ f.Subscribe(ch)
+ want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))}
+ if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil {
+ t.Error(err)
+ }
+ }
+ {
+ var f Feed
+ f.Send(int(2))
+ want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))}
+ if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil {
+ t.Error(err)
+ }
+ }
+ {
+ var f Feed
+ if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil {
+ t.Error(err)
+ }
+ }
+ {
+ var f Feed
+ if err := checkPanic(errBadChannel, func() { f.Subscribe(int(0)) }); err != nil {
+ t.Error(err)
+ }
+ }
+}
+
+func checkPanic(want error, fn func()) (err error) {
+ defer func() {
+ panic := recover()
+ if panic == nil {
+ err = fmt.Errorf("didn't panic")
+ } else if !reflect.DeepEqual(panic, want) {
+ err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want)
+ }
+ }()
+ fn()
+ return nil
+}
+
+func TestFeed(t *testing.T) {
+ var feed Feed
+ var done, subscribed sync.WaitGroup
+ subscriber := func(i int) {
+ defer done.Done()
+
+ subchan := make(chan int)
+ sub := feed.Subscribe(subchan)
+ timeout := time.NewTimer(2 * time.Second)
+ subscribed.Done()
+
+ select {
+ case v := <-subchan:
+ if v != 1 {
+ t.Errorf("%d: received value %d, want 1", i, v)
+ }
+ case <-timeout.C:
+ t.Errorf("%d: receive timeout", i)
+ }
+
+ sub.Unsubscribe()
+ select {
+ case _, ok := <-sub.Err():
+ if ok {
+ t.Errorf("%d: error channel not closed after unsubscribe", i)
+ }
+ case <-timeout.C:
+ t.Errorf("%d: unsubscribe timeout", i)
+ }
+ }
+
+ const n = 1000
+ done.Add(n)
+ subscribed.Add(n)
+ for i := 0; i < n; i++ {
+ go subscriber(i)
+ }
+ subscribed.Wait()
+ if nsent := feed.Send(1); nsent != n {
+ t.Errorf("first send delivered %d times, want %d", nsent, n)
+ }
+ if nsent := feed.Send(2); nsent != 0 {
+ t.Errorf("second send delivered %d times, want 0", nsent)
+ }
+ done.Wait()
+}
+
+func TestFeedSubscribeSameChannel(t *testing.T) {
+ var (
+ feed Feed
+ done sync.WaitGroup
+ ch = make(chan int)
+ sub1 = feed.Subscribe(ch)
+ sub2 = feed.Subscribe(ch)
+ _ = feed.Subscribe(ch)
+ )
+ expectSends := func(value, n int) {
+ if nsent := feed.Send(value); nsent != n {
+ t.Errorf("send delivered %d times, want %d", nsent, n)
+ }
+ done.Done()
+ }
+ expectRecv := func(wantValue, n int) {
+ for i := 0; i < n; i++ {
+ if v := <-ch; v != wantValue {
+ t.Errorf("received %d, want %d", v, wantValue)
+ }
+ }
+ }
+
+ done.Add(1)
+ go expectSends(1, 3)
+ expectRecv(1, 3)
+ done.Wait()
+
+ sub1.Unsubscribe()
+
+ done.Add(1)
+ go expectSends(2, 2)
+ expectRecv(2, 2)
+ done.Wait()
+
+ sub2.Unsubscribe()
+
+ done.Add(1)
+ go expectSends(3, 1)
+ expectRecv(3, 1)
+ done.Wait()
+}
+
+func TestFeedUnsubscribeFromInbox(t *testing.T) {
+ var (
+ feed Feed
+ ch1 = make(chan int)
+ ch2 = make(chan int)
+ sub1 = feed.Subscribe(ch1)
+ sub2 = feed.Subscribe(ch1)
+ sub3 = feed.Subscribe(ch2)
+ )
+ if len(feed.inbox) != 3 {
+ t.Errorf("inbox length != 3 after subscribe")
+ }
+ if len(feed.sendCases) != 1 {
+ t.Errorf("sendCases is non-empty after unsubscribe")
+ }
+
+ sub1.Unsubscribe()
+ sub2.Unsubscribe()
+ sub3.Unsubscribe()
+ if len(feed.inbox) != 0 {
+ t.Errorf("inbox is non-empty after unsubscribe")
+ }
+ if len(feed.sendCases) != 1 {
+ t.Errorf("sendCases is non-empty after unsubscribe")
+ }
+}
+
+func BenchmarkFeedSend1000(b *testing.B) {
+ var (
+ done sync.WaitGroup
+ feed Feed
+ nsubs = 1000
+ )
+ subscriber := func(ch <-chan int) {
+ for i := 0; i < b.N; i++ {
+ <-ch
+ }
+ done.Done()
+ }
+ done.Add(nsubs)
+ for i := 0; i < nsubs; i++ {
+ ch := make(chan int, 200)
+ feed.Subscribe(ch)
+ go subscriber(ch)
+ }
+
+ // The actual benchmark.
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ if feed.Send(i) != nsubs {
+ panic("wrong number of sends")
+ }
+ }
+
+ b.StopTimer()
+ done.Wait()
+}