diff options
author | Péter Szilágyi <peterke@gmail.com> | 2017-02-03 19:56:00 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-02-03 19:56:00 +0800 |
commit | 7734ead52030a6c128613331ea4a24afd9d4173a (patch) | |
tree | 7bed2d0085c7a9cac97691ccf561976119f45c78 /event/feed_test.go | |
parent | 8b57c494908637a5c0e74f8f7a13b3218e026757 (diff) | |
parent | 1bed9b3fea9939581b03cae9d6b4984ced456748 (diff) | |
download | dexon-7734ead52030a6c128613331ea4a24afd9d4173a.tar dexon-7734ead52030a6c128613331ea4a24afd9d4173a.tar.gz dexon-7734ead52030a6c128613331ea4a24afd9d4173a.tar.bz2 dexon-7734ead52030a6c128613331ea4a24afd9d4173a.tar.lz dexon-7734ead52030a6c128613331ea4a24afd9d4173a.tar.xz dexon-7734ead52030a6c128613331ea4a24afd9d4173a.tar.zst dexon-7734ead52030a6c128613331ea4a24afd9d4173a.zip |
Merge pull request #3605 from fjl/event-feed
event: add new Subscription type and related utilities
Diffstat (limited to 'event/feed_test.go')
-rw-r--r-- | event/feed_test.go | 294 |
1 files changed, 294 insertions, 0 deletions
diff --git a/event/feed_test.go b/event/feed_test.go new file mode 100644 index 000000000..a82c10303 --- /dev/null +++ b/event/feed_test.go @@ -0,0 +1,294 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package event + +import ( + "fmt" + "reflect" + "sync" + "testing" + "time" +) + +func TestFeedPanics(t *testing.T) { + { + var f Feed + f.Send(int(2)) + want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))} + if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil { + t.Error(err) + } + } + { + var f Feed + ch := make(chan int) + f.Subscribe(ch) + want := feedTypeError{op: "Send", got: reflect.TypeOf(uint64(0)), want: reflect.TypeOf(int(0))} + if err := checkPanic(want, func() { f.Send(uint64(2)) }); err != nil { + t.Error(err) + } + } + { + var f Feed + f.Send(int(2)) + want := feedTypeError{op: "Subscribe", got: reflect.TypeOf(make(chan uint64)), want: reflect.TypeOf(make(chan<- int))} + if err := checkPanic(want, func() { f.Subscribe(make(chan uint64)) }); err != nil { + t.Error(err) + } + } + { + var f Feed + if err := checkPanic(errBadChannel, func() { f.Subscribe(make(<-chan int)) }); err != nil { + t.Error(err) + } + } + { + var f Feed + if err := checkPanic(errBadChannel, func() { f.Subscribe(int(0)) }); err != nil { + t.Error(err) + } + } +} + +func checkPanic(want error, fn func()) (err error) { + defer func() { + panic := recover() + if panic == nil { + err = fmt.Errorf("didn't panic") + } else if !reflect.DeepEqual(panic, want) { + err = fmt.Errorf("panicked with wrong error: got %q, want %q", panic, want) + } + }() + fn() + return nil +} + +func TestFeed(t *testing.T) { + var feed Feed + var done, subscribed sync.WaitGroup + subscriber := func(i int) { + defer done.Done() + + subchan := make(chan int) + sub := feed.Subscribe(subchan) + timeout := time.NewTimer(2 * time.Second) + subscribed.Done() + + select { + case v := <-subchan: + if v != 1 { + t.Errorf("%d: received value %d, want 1", i, v) + } + case <-timeout.C: + t.Errorf("%d: receive timeout", i) + } + + sub.Unsubscribe() + select { + case _, ok := <-sub.Err(): + if ok { + t.Errorf("%d: error channel not closed after unsubscribe", i) + } + case <-timeout.C: + t.Errorf("%d: unsubscribe timeout", i) + } + } + + const n = 1000 + done.Add(n) + subscribed.Add(n) + for i := 0; i < n; i++ { + go subscriber(i) + } + subscribed.Wait() + if nsent := feed.Send(1); nsent != n { + t.Errorf("first send delivered %d times, want %d", nsent, n) + } + if nsent := feed.Send(2); nsent != 0 { + t.Errorf("second send delivered %d times, want 0", nsent) + } + done.Wait() +} + +func TestFeedSubscribeSameChannel(t *testing.T) { + var ( + feed Feed + done sync.WaitGroup + ch = make(chan int) + sub1 = feed.Subscribe(ch) + sub2 = feed.Subscribe(ch) + _ = feed.Subscribe(ch) + ) + expectSends := func(value, n int) { + if nsent := feed.Send(value); nsent != n { + t.Errorf("send delivered %d times, want %d", nsent, n) + } + done.Done() + } + expectRecv := func(wantValue, n int) { + for i := 0; i < n; i++ { + if v := <-ch; v != wantValue { + t.Errorf("received %d, want %d", v, wantValue) + } + } + } + + done.Add(1) + go expectSends(1, 3) + expectRecv(1, 3) + done.Wait() + + sub1.Unsubscribe() + + done.Add(1) + go expectSends(2, 2) + expectRecv(2, 2) + done.Wait() + + sub2.Unsubscribe() + + done.Add(1) + go expectSends(3, 1) + expectRecv(3, 1) + done.Wait() +} + +func TestFeedSubscribeBlockedPost(t *testing.T) { + var ( + feed Feed + nsends = 2000 + ch1 = make(chan int) + ch2 = make(chan int) + wg sync.WaitGroup + ) + defer wg.Wait() + + feed.Subscribe(ch1) + wg.Add(nsends) + for i := 0; i < nsends; i++ { + go func() { + feed.Send(99) + wg.Done() + }() + } + + sub2 := feed.Subscribe(ch2) + defer sub2.Unsubscribe() + + // We're done when ch1 has received N times. + // The number of receives on ch2 depends on scheduling. + for i := 0; i < nsends; { + select { + case <-ch1: + i++ + case <-ch2: + } + } +} + +func TestFeedUnsubscribeBlockedPost(t *testing.T) { + var ( + feed Feed + nsends = 200 + chans = make([]chan int, 2000) + subs = make([]Subscription, len(chans)) + bchan = make(chan int) + bsub = feed.Subscribe(bchan) + wg sync.WaitGroup + ) + for i := range chans { + chans[i] = make(chan int, nsends) + } + + // Queue up some Sends. None of these can make progress while bchan isn't read. + wg.Add(nsends) + for i := 0; i < nsends; i++ { + go func() { + feed.Send(99) + wg.Done() + }() + } + // Subscribe the other channels. + for i, ch := range chans { + subs[i] = feed.Subscribe(ch) + } + // Unsubscribe them again. + for _, sub := range subs { + sub.Unsubscribe() + } + // Unblock the Sends. + bsub.Unsubscribe() + wg.Wait() +} + +func TestFeedUnsubscribeFromInbox(t *testing.T) { + var ( + feed Feed + ch1 = make(chan int) + ch2 = make(chan int) + sub1 = feed.Subscribe(ch1) + sub2 = feed.Subscribe(ch1) + sub3 = feed.Subscribe(ch2) + ) + if len(feed.inbox) != 3 { + t.Errorf("inbox length != 3 after subscribe") + } + if len(feed.sendCases) != 1 { + t.Errorf("sendCases is non-empty after unsubscribe") + } + + sub1.Unsubscribe() + sub2.Unsubscribe() + sub3.Unsubscribe() + if len(feed.inbox) != 0 { + t.Errorf("inbox is non-empty after unsubscribe") + } + if len(feed.sendCases) != 1 { + t.Errorf("sendCases is non-empty after unsubscribe") + } +} + +func BenchmarkFeedSend1000(b *testing.B) { + var ( + done sync.WaitGroup + feed Feed + nsubs = 1000 + ) + subscriber := func(ch <-chan int) { + for i := 0; i < b.N; i++ { + <-ch + } + done.Done() + } + done.Add(nsubs) + for i := 0; i < nsubs; i++ { + ch := make(chan int, 200) + feed.Subscribe(ch) + go subscriber(ch) + } + + // The actual benchmark. + b.ResetTimer() + for i := 0; i < b.N; i++ { + if feed.Send(i) != nsubs { + panic("wrong number of sends") + } + } + + b.StopTimer() + done.Wait() +} |