From f5b8775bed8a49136c5d7e93bb0fb991bc2b1a4b Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 14 Oct 2014 01:56:24 +0200 Subject: event: new package for event multiplexer --- event/event.go | 162 ++++++++++++++++++++++++++++++++++++++++++++++++++++ event/event_test.go | 161 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 323 insertions(+) create mode 100644 event/event.go create mode 100644 event/event_test.go diff --git a/event/event.go b/event/event.go new file mode 100644 index 000000000..74f8043da --- /dev/null +++ b/event/event.go @@ -0,0 +1,162 @@ +// Package event implements an event multiplexer. +package event + +import ( + "errors" + "reflect" + "sync" +) + +type Subscription interface { + Chan() <-chan interface{} + Unsubscribe() +} + +// A TypeMux dispatches events to registered receivers. Receivers can be +// registered to handle events of certain type. Any operation +// called after mux is stopped will return ErrMuxClosed. +type TypeMux struct { + mutex sync.RWMutex + subm map[reflect.Type][]*muxsub + stopped bool +} + +var ErrMuxClosed = errors.New("event: mux closed") + +// NewTypeMux creates a running mux. +func NewTypeMux() *TypeMux { + return &TypeMux{subm: make(map[reflect.Type][]*muxsub)} +} + +// Subscribe creates a subscription for events of the given types. The +// subscription's channel is closed when it is unsubscribed +// or the mux is closed. +func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { + sub := newsub(mux) + mux.mutex.Lock() + if mux.stopped { + mux.mutex.Unlock() + close(sub.postC) + } else { + for _, t := range types { + rtyp := reflect.TypeOf(t) + oldsubs := mux.subm[rtyp] + subs := make([]*muxsub, len(oldsubs)+1) + copy(subs, oldsubs) + subs[len(oldsubs)] = sub + mux.subm[rtyp] = subs + } + mux.mutex.Unlock() + } + return sub +} + +// Post sends an event to all receivers registered for the given type. +// It returns ErrMuxClosed if the mux has been stopped. +func (mux *TypeMux) Post(ev interface{}) error { + rtyp := reflect.TypeOf(ev) + mux.mutex.RLock() + if mux.stopped { + mux.mutex.RUnlock() + return ErrMuxClosed + } + subs := mux.subm[rtyp] + mux.mutex.RUnlock() + for _, sub := range subs { + sub.deliver(ev) + } + return nil +} + +// Stop closes a mux. The mux can no longer be used. +// Future Post calls will fail with ErrMuxClosed. +// Stop blocks until all current deliveries have finished. +func (mux *TypeMux) Stop() { + mux.mutex.Lock() + for _, subs := range mux.subm { + for _, sub := range subs { + sub.closewait() + } + } + mux.subm = nil + mux.stopped = true + mux.mutex.Unlock() +} + +func (mux *TypeMux) del(s *muxsub) { + mux.mutex.Lock() + for typ, subs := range mux.subm { + if pos := find(subs, s); pos >= 0 { + if len(subs) == 1 { + delete(mux.subm, typ) + } else { + mux.subm[typ] = posdelete(subs, pos) + } + } + } + s.mux.mutex.Unlock() +} + +func find(slice []*muxsub, item *muxsub) int { + for i, v := range slice { + if v == item { + return i + } + } + return -1 +} + +func posdelete(slice []*muxsub, pos int) []*muxsub { + news := make([]*muxsub, len(slice)-1) + copy(news[:pos], slice[:pos]) + copy(news[pos:], slice[pos+1:]) + return news +} + +type muxsub struct { + mux *TypeMux + mutex sync.RWMutex + closing chan struct{} + + // these two are the same channel. they are stored separately so + // postC can be set to nil without affecting the return value of + // Chan. + readC <-chan interface{} + postC chan<- interface{} +} + +func newsub(mux *TypeMux) *muxsub { + c := make(chan interface{}) + return &muxsub{ + mux: mux, + readC: c, + postC: c, + closing: make(chan struct{}), + } +} + +func (s *muxsub) Chan() <-chan interface{} { + return s.readC +} + +func (s *muxsub) Unsubscribe() { + s.mux.del(s) + s.closewait() +} + +func (s *muxsub) closewait() { + close(s.closing) + s.mutex.Lock() + close(s.postC) + s.postC = nil + s.mutex.Unlock() +} + +func (s *muxsub) deliver(ev interface{}) { + s.mutex.RLock() + select { + case s.postC <- ev: + case <-s.closing: + } + s.mutex.RUnlock() +} diff --git a/event/event_test.go b/event/event_test.go new file mode 100644 index 000000000..385bd70b7 --- /dev/null +++ b/event/event_test.go @@ -0,0 +1,161 @@ +package event + +import ( + "math/rand" + "sync" + "testing" + "time" +) + +type testEvent int + +func TestSub(t *testing.T) { + mux := NewTypeMux() + defer mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + go func() { + if err := mux.Post(testEvent(5)); err != nil { + t.Errorf("Post returned unexpected error: %v", err) + } + }() + ev := <-sub.Chan() + + if ev.(testEvent) != testEvent(5) { + t.Errorf("Got %v (%T), expected event %v (%T)", + ev, ev, testEvent(5), testEvent(5)) + } +} + +func TestMuxErrorAfterStop(t *testing.T) { + mux := NewTypeMux() + mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + if _, isopen := <-sub.Chan(); isopen { + t.Errorf("subscription channel was not closed") + } + if err := mux.Post(testEvent(0)); err != ErrMuxClosed { + t.Errorf("Post error mismatch, got: %s, expected: %s", err, ErrMuxClosed) + } +} + +func TestUnsubscribeUnblockPost(t *testing.T) { + mux := NewTypeMux() + defer mux.Stop() + + sub := mux.Subscribe(testEvent(0)) + unblocked := make(chan bool) + go func() { + mux.Post(testEvent(5)) + unblocked <- true + }() + + select { + case <-unblocked: + t.Errorf("Post returned before Unsubscribe") + default: + sub.Unsubscribe() + <-unblocked + } +} + +func TestMuxConcurrent(t *testing.T) { + rand.Seed(time.Now().Unix()) + mux := NewTypeMux() + defer mux.Stop() + + recv := make(chan int) + poster := func() { + for { + err := mux.Post(testEvent(0)) + if err != nil { + return + } + } + } + sub := func(i int) { + time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond) + sub := mux.Subscribe(testEvent(0)) + <-sub.Chan() + sub.Unsubscribe() + recv <- i + } + + go poster() + go poster() + go poster() + nsubs := 1000 + for i := 0; i < nsubs; i++ { + go sub(i) + } + + // wait until everyone has been served + counts := make(map[int]int, nsubs) + for i := 0; i < nsubs; i++ { + counts[<-recv]++ + } + for i, count := range counts { + if count != 1 { + t.Errorf("receiver %d called %d times, expected only 1 call", i, count) + } + } +} + +func emptySubscriber(mux *TypeMux, types ...interface{}) { + s := mux.Subscribe(testEvent(0)) + go func() { + for _ = range s.Chan() { + } + }() +} + +func BenchmarkPost3(b *testing.B) { + var mux = NewTypeMux() + defer mux.Stop() + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + + for i := 0; i < b.N; i++ { + mux.Post(testEvent(0)) + } +} + +func BenchmarkPostConcurrent(b *testing.B) { + var mux = NewTypeMux() + defer mux.Stop() + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + emptySubscriber(mux, testEvent(0)) + + var wg sync.WaitGroup + poster := func() { + for i := 0; i < b.N; i++ { + mux.Post(testEvent(0)) + } + wg.Done() + } + wg.Add(5) + for i := 0; i < 5; i++ { + go poster() + } + wg.Wait() +} + +// for comparison +func BenchmarkChanSend(b *testing.B) { + c := make(chan interface{}) + closed := make(chan struct{}) + go func() { + for _ = range c { + } + }() + + for i := 0; i < b.N; i++ { + select { + case c <- i: + case <-closed: + } + } +} -- cgit v1.2.3 From dac4a8f113b35c67349115115af17c7f1874d939 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:07:27 +0200 Subject: event: add some documentation --- event/event.go | 9 +++++++++ event/example_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 event/example_test.go diff --git a/event/event.go b/event/event.go index 74f8043da..09759ee50 100644 --- a/event/event.go +++ b/event/event.go @@ -7,8 +7,16 @@ import ( "sync" ) +// Subscription is implemented by event subscriptions. type Subscription interface { + // Chan returns a channel that carries events. + // Implementations should return the same channel + // for any subsequent calls to Chan. Chan() <-chan interface{} + + // Unsubscribe stops delivery of events to a subscription. + // The event channel is closed. + // Unsubscribe can be called more than once. Unsubscribe() } @@ -21,6 +29,7 @@ type TypeMux struct { stopped bool } +// ErrMuxClosed is returned when Posting on a closed TypeMux. var ErrMuxClosed = errors.New("event: mux closed") // NewTypeMux creates a running mux. diff --git a/event/example_test.go b/event/example_test.go new file mode 100644 index 000000000..2f47f6f27 --- /dev/null +++ b/event/example_test.go @@ -0,0 +1,42 @@ +package event + +import "fmt" + +func ExampleTypeMux() { + type someEvent struct{ I int } + type otherEvent struct{ S string } + type yetAnotherEvent struct{ X, Y int } + + var mux TypeMux + + // Start a subscriber. + done := make(chan struct{}) + sub := mux.Subscribe(someEvent{}, otherEvent{}) + go func() { + for event := range sub.Chan() { + fmt.Printf("Received: %#v\n", event) + } + fmt.Println("done") + close(done) + }() + + // Post some events. + mux.Post(someEvent{5}) + mux.Post(yetAnotherEvent{X: 3, Y: 4}) + mux.Post(someEvent{6}) + mux.Post(otherEvent{"whoa"}) + + // Stop closes all subscription channels. + // The subscriber goroutine will print "done" + // and exit. + mux.Stop() + + // Wait for subscriber to return. + <-done + + // Output: + // Received: event.someEvent{I:5} + // Received: event.someEvent{I:6} + // Received: event.otherEvent{S:"whoa"} + // done +} -- cgit v1.2.3 From 10bbf265b2e8f1906602d2604f755241b8eb49e6 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:08:48 +0200 Subject: event: make Unsubscribe idempotent --- event/event.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/event/event.go b/event/event.go index 09759ee50..344d1e3f6 100644 --- a/event/event.go +++ b/event/event.go @@ -124,14 +124,16 @@ func posdelete(slice []*muxsub, pos int) []*muxsub { type muxsub struct { mux *TypeMux - mutex sync.RWMutex + closeMu sync.Mutex closing chan struct{} + closed bool // these two are the same channel. they are stored separately so // postC can be set to nil without affecting the return value of // Chan. - readC <-chan interface{} - postC chan<- interface{} + postMu sync.RWMutex + readC <-chan interface{} + postC chan<- interface{} } func newsub(mux *TypeMux) *muxsub { @@ -154,18 +156,25 @@ func (s *muxsub) Unsubscribe() { } func (s *muxsub) closewait() { + s.closeMu.Lock() + defer s.closeMu.Unlock() + if s.closed { + return + } close(s.closing) - s.mutex.Lock() + s.closed = true + + s.postMu.Lock() close(s.postC) s.postC = nil - s.mutex.Unlock() + s.postMu.Unlock() } func (s *muxsub) deliver(ev interface{}) { - s.mutex.RLock() + s.postMu.RLock() select { case s.postC <- ev: case <-s.closing: } - s.mutex.RUnlock() + s.postMu.RUnlock() } -- cgit v1.2.3 From 690690489610352d43f8547744b6c9486ad5affa Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:10:09 +0200 Subject: event: make TypeMux zero value ready to use --- event/event.go | 13 ++++++------- event/event_test.go | 12 ++++++------ 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/event/event.go b/event/event.go index 344d1e3f6..d11a0e9bd 100644 --- a/event/event.go +++ b/event/event.go @@ -23,6 +23,8 @@ type Subscription interface { // A TypeMux dispatches events to registered receivers. Receivers can be // registered to handle events of certain type. Any operation // called after mux is stopped will return ErrMuxClosed. +// +// The zero value is ready to use. type TypeMux struct { mutex sync.RWMutex subm map[reflect.Type][]*muxsub @@ -32,11 +34,6 @@ type TypeMux struct { // ErrMuxClosed is returned when Posting on a closed TypeMux. var ErrMuxClosed = errors.New("event: mux closed") -// NewTypeMux creates a running mux. -func NewTypeMux() *TypeMux { - return &TypeMux{subm: make(map[reflect.Type][]*muxsub)} -} - // Subscribe creates a subscription for events of the given types. The // subscription's channel is closed when it is unsubscribed // or the mux is closed. @@ -44,9 +41,11 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { sub := newsub(mux) mux.mutex.Lock() if mux.stopped { - mux.mutex.Unlock() close(sub.postC) } else { + if mux.subm == nil { + mux.subm = make(map[reflect.Type][]*muxsub) + } for _, t := range types { rtyp := reflect.TypeOf(t) oldsubs := mux.subm[rtyp] @@ -55,8 +54,8 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { subs[len(oldsubs)] = sub mux.subm[rtyp] = subs } - mux.mutex.Unlock() } + mux.mutex.Unlock() return sub } diff --git a/event/event_test.go b/event/event_test.go index 385bd70b7..f65aaa0a2 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -10,7 +10,7 @@ import ( type testEvent int func TestSub(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -28,7 +28,7 @@ func TestSub(t *testing.T) { } func TestMuxErrorAfterStop(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -41,7 +41,7 @@ func TestMuxErrorAfterStop(t *testing.T) { } func TestUnsubscribeUnblockPost(t *testing.T) { - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() sub := mux.Subscribe(testEvent(0)) @@ -62,7 +62,7 @@ func TestUnsubscribeUnblockPost(t *testing.T) { func TestMuxConcurrent(t *testing.T) { rand.Seed(time.Now().Unix()) - mux := NewTypeMux() + mux := new(TypeMux) defer mux.Stop() recv := make(chan int) @@ -111,7 +111,7 @@ func emptySubscriber(mux *TypeMux, types ...interface{}) { } func BenchmarkPost3(b *testing.B) { - var mux = NewTypeMux() + var mux = new(TypeMux) defer mux.Stop() emptySubscriber(mux, testEvent(0)) emptySubscriber(mux, testEvent(0)) @@ -123,7 +123,7 @@ func BenchmarkPost3(b *testing.B) { } func BenchmarkPostConcurrent(b *testing.B) { - var mux = NewTypeMux() + var mux = new(TypeMux) defer mux.Stop() emptySubscriber(mux, testEvent(0)) emptySubscriber(mux, testEvent(0)) -- cgit v1.2.3 From 36cdab206849c7e363e0b9911553098c3e8ca644 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 14 Oct 2014 01:58:31 +0200 Subject: all: use (blocking) event package instead of ethreact --- ethchain/dagger.go | 7 +-- ethchain/events.go | 10 ++++ ethchain/state_manager.go | 55 +++++++---------- ethchain/transaction_pool.go | 3 +- ethereum.go | 74 +++++++++-------------- ethminer/miner.go | 137 ++++++++++++++++++++++--------------------- events.go | 11 ++++ peer.go | 2 +- 8 files changed, 144 insertions(+), 155 deletions(-) create mode 100644 ethchain/events.go create mode 100644 events.go diff --git a/ethchain/dagger.go b/ethchain/dagger.go index 916d7e9c8..2d2b5720f 100644 --- a/ethchain/dagger.go +++ b/ethchain/dagger.go @@ -8,7 +8,6 @@ import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethutil" "github.com/obscuren/sha3" ) @@ -16,7 +15,7 @@ import ( var powlogger = ethlog.NewLogger("POW") type PoW interface { - Search(block *Block, reactChan chan ethreact.Event) []byte + Search(block *Block, stop <-chan struct{}) []byte Verify(hash []byte, diff *big.Int, nonce []byte) bool GetHashrate() int64 Turbo(bool) @@ -36,7 +35,7 @@ func (pow *EasyPow) Turbo(on bool) { pow.turbo = on } -func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte { +func (pow *EasyPow) Search(block *Block, stop <-chan struct{}) []byte { r := rand.New(rand.NewSource(time.Now().UnixNano())) hash := block.HashNoNonce() diff := block.Difficulty @@ -46,7 +45,7 @@ func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte { for { select { - case <-reactChan: + case <-stop: powlogger.Infoln("Breaking from mining") return nil default: diff --git a/ethchain/events.go b/ethchain/events.go new file mode 100644 index 000000000..05c21edfe --- /dev/null +++ b/ethchain/events.go @@ -0,0 +1,10 @@ +package ethchain + +type TxEvent struct { + Type int // TxPre || TxPost + Tx *Transaction +} + +type NewBlockEvent struct { + Block *Block +} diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go index 589b99ac2..b71cbe8a1 100644 --- a/ethchain/state_manager.go +++ b/ethchain/state_manager.go @@ -11,11 +11,10 @@ import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" - "github.com/ethereum/eth-go/eventer" + "github.com/ethereum/eth-go/event" ) var statelogger = ethlog.NewLogger("STATE") @@ -37,7 +36,6 @@ type EthManager interface { BlockChain() *BlockChain TxPool() *TxPool Broadcast(msgType ethwire.MsgType, data []interface{}) - Reactor() *ethreact.ReactorEngine PeerCount() int IsMining() bool IsListening() bool @@ -45,7 +43,7 @@ type EthManager interface { KeyManager() *ethcrypto.KeyManager ClientIdentity() ethwire.ClientIdentity Db() ethutil.Database - Eventer() *eventer.EventMachine + EventMux() *event.TypeMux } type StateManager struct { @@ -73,17 +71,15 @@ type StateManager struct { // 'Process' & canonical validation. lastAttemptedBlock *Block - // Quit chan - quit chan bool + events event.Subscription } func NewStateManager(ethereum EthManager) *StateManager { sm := &StateManager{ - mem: make(map[string]*big.Int), - Pow: &EasyPow{}, - eth: ethereum, - bc: ethereum.BlockChain(), - quit: make(chan bool), + mem: make(map[string]*big.Int), + Pow: &EasyPow{}, + eth: ethereum, + bc: ethereum.BlockChain(), } sm.transState = ethereum.BlockChain().CurrentBlock.State().Copy() sm.miningState = ethereum.BlockChain().CurrentBlock.State().Copy() @@ -93,36 +89,25 @@ func NewStateManager(ethereum EthManager) *StateManager { func (self *StateManager) Start() { statelogger.Debugln("Starting state manager") - + self.events = self.eth.EventMux().Subscribe(Blocks(nil)) go self.updateThread() } func (self *StateManager) Stop() { statelogger.Debugln("Stopping state manager") - - close(self.quit) + self.events.Unsubscribe() } func (self *StateManager) updateThread() { - blockChan := self.eth.Eventer().Register("blocks") - -out: - for { - select { - case event := <-blockChan: - blocks := event.Data.(Blocks) - for _, block := range blocks { - err := self.Process(block, false) - if err != nil { - statelogger.Infoln(err) - statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) - statelogger.Debugln(block) - break - } + for ev := range self.events.Chan() { + for _, block := range ev.(Blocks) { + err := self.Process(block, false) + if err != nil { + statelogger.Infoln(err) + statelogger.Debugf("Block #%v failed (%x...)\n", block.Number, block.Hash()[0:4]) + statelogger.Debugln(block) + break } - - case <-self.quit: - break out } } } @@ -202,7 +187,7 @@ done: } // Notify all subscribers - self.eth.Reactor().Post("newTx:post", tx) + self.eth.EventMux().Post(TxEvent{TxPost, tx}) receipts = append(receipts, receipt) handled = append(handled, tx) @@ -293,7 +278,7 @@ func (sm *StateManager) Process(block *Block, dontReact bool) (err error) { statelogger.Infof("Imported block #%d (%x...)\n", block.Number, block.Hash()[0:4]) if dontReact == false { - sm.eth.Reactor().Post("newBlock", block) + sm.eth.EventMux().Post(NewBlockEvent{block}) state.Manifest().Reset() } @@ -434,7 +419,7 @@ func (sm *StateManager) createBloomFilter(state *ethstate.State) *BloomFilter { bloomf.Set(msg.From) } - sm.eth.Reactor().Post("messages", state.Manifest().Messages) + sm.eth.EventMux().Post(state.Manifest().Messages) return bloomf } diff --git a/ethchain/transaction_pool.go b/ethchain/transaction_pool.go index da6c3d6ba..0676af3a3 100644 --- a/ethchain/transaction_pool.go +++ b/ethchain/transaction_pool.go @@ -24,6 +24,7 @@ type TxMsgTy byte const ( TxPre = iota TxPost + minGasPrice = 1000000 ) @@ -160,7 +161,7 @@ out: txplogger.Debugf("(t) %x => %x (%v) %x\n", tx.Sender()[:4], tmp, tx.Value, tx.Hash()) // Notify the subscribers - pool.Ethereum.Reactor().Post("newTx:pre", tx) + pool.Ethereum.EventMux().Post(TxEvent{TxPre, tx}) } case <-pool.quit: break out diff --git a/ethereum.go b/ethereum.go index 204f30bec..750ca8f03 100644 --- a/ethereum.go +++ b/ethereum.go @@ -17,12 +17,11 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethrpc" "github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" - "github.com/ethereum/eth-go/eventer" + "github.com/ethereum/eth-go/event" ) const ( @@ -60,7 +59,7 @@ type Ethereum struct { // The block pool blockPool *BlockPool // Eventer - eventer *eventer.EventMachine + eventMux *event.TypeMux // Peers peers *list.List // Nonce @@ -85,8 +84,6 @@ type Ethereum struct { listening bool - reactor *ethreact.ReactorEngine - RpcServer *ethrpc.JsonRpcServer keyManager *ethcrypto.KeyManager @@ -129,8 +126,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager isUpToDate: true, filters: make(map[int]*ethchain.Filter), } - ethereum.reactor = ethreact.New() - ethereum.eventer = eventer.New() + ethereum.eventMux = event.NewTypeMux() ethereum.blockPool = NewBlockPool(ethereum) ethereum.txPool = ethchain.NewTxPool(ethereum) @@ -143,10 +139,6 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager return ethereum, nil } -func (s *Ethereum) Reactor() *ethreact.ReactorEngine { - return s.reactor -} - func (s *Ethereum) KeyManager() *ethcrypto.KeyManager { return s.keyManager } @@ -169,8 +161,8 @@ func (s *Ethereum) TxPool() *ethchain.TxPool { func (s *Ethereum) BlockPool() *BlockPool { return s.blockPool } -func (s *Ethereum) Eventer() *eventer.EventMachine { - return s.eventer +func (s *Ethereum) EventMux() *event.TypeMux { + return s.eventMux } func (self *Ethereum) Db() ethutil.Database { return self.db @@ -376,7 +368,7 @@ func (s *Ethereum) removePeerElement(e *list.Element) { s.peers.Remove(e) - s.reactor.Post("peerList", s.peers) + s.eventMux.Post(PeerListEvent{s.peers}) } func (s *Ethereum) RemovePeer(p *Peer) { @@ -400,7 +392,6 @@ func (s *Ethereum) reapDeadPeerHandler() { // Start the ethereum func (s *Ethereum) Start(seed bool) { - s.reactor.Start() s.blockPool.Start() s.stateManager.Start() @@ -524,8 +515,7 @@ func (s *Ethereum) Stop() { } s.txPool.Stop() s.stateManager.Stop() - s.reactor.Flush() - s.reactor.Stop() + s.eventMux.Stop() s.blockPool.Stop() ethlogger.Infoln("Server stopped") @@ -584,10 +574,10 @@ out: select { case <-upToDateTimer.C: if self.IsUpToDate() && !self.isUpToDate { - self.reactor.Post("chainSync", false) + self.eventMux.Post(ChainSyncEvent{false}) self.isUpToDate = true } else if !self.IsUpToDate() && self.isUpToDate { - self.reactor.Post("chainSync", true) + self.eventMux.Post(ChainSyncEvent{true}) self.isUpToDate = false } case <-self.quit: @@ -623,40 +613,30 @@ func (self *Ethereum) GetFilter(id int) *ethchain.Filter { } func (self *Ethereum) filterLoop() { - blockChan := make(chan ethreact.Event, 5) - messageChan := make(chan ethreact.Event, 5) // Subscribe to events - reactor := self.Reactor() - reactor.Subscribe("newBlock", blockChan) - reactor.Subscribe("messages", messageChan) -out: - for { - select { - case <-self.quit: - break out - case block := <-blockChan: - if block, ok := block.Resource.(*ethchain.Block); ok { - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.BlockCallback != nil { - filter.BlockCallback(block) - } + events := self.eventMux.Subscribe(ethchain.NewBlockEvent{}, ethstate.Messages(nil)) + for event := range events.Chan() { + switch event := event.(type) { + case ethchain.NewBlockEvent: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.BlockCallback != nil { + filter.BlockCallback(event.Block) } - self.filterMu.RUnlock() } - case msg := <-messageChan: - if messages, ok := msg.Resource.(ethstate.Messages); ok { - self.filterMu.RLock() - for _, filter := range self.filters { - if filter.MessageCallback != nil { - msgs := filter.FilterMessages(messages) - if len(msgs) > 0 { - filter.MessageCallback(msgs) - } + self.filterMu.RUnlock() + + case ethstate.Messages: + self.filterMu.RLock() + for _, filter := range self.filters { + if filter.MessageCallback != nil { + msgs := filter.FilterMessages(event) + if len(msgs) > 0 { + filter.MessageCallback(msgs) } } - self.filterMu.RUnlock() } + self.filterMu.RUnlock() } } } diff --git a/ethminer/miner.go b/ethminer/miner.go index 299a5204a..ffc49f096 100644 --- a/ethminer/miner.go +++ b/ethminer/miner.go @@ -6,27 +6,37 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethwire" + "github.com/ethereum/eth-go/event" ) var logger = ethlog.NewLogger("MINER") type Miner struct { - pow ethchain.PoW - ethereum ethchain.EthManager - coinbase []byte - reactChan chan ethreact.Event - txs ethchain.Transactions - uncles []*ethchain.Block - block *ethchain.Block - powChan chan []byte - powQuitChan chan ethreact.Event - quitChan chan chan error + pow ethchain.PoW + ethereum ethchain.EthManager + coinbase []byte + txs ethchain.Transactions + uncles []*ethchain.Block + block *ethchain.Block + + events event.Subscription + powQuitChan chan struct{} + powDone chan struct{} turbo bool } +const ( + Started = iota + Stopped +) + +type Event struct { + Type int // Started || Stopped + Miner *Miner +} + func (self *Miner) GetPow() ethchain.PoW { return self.pow } @@ -48,46 +58,42 @@ func (self *Miner) ToggleTurbo() { } func (miner *Miner) Start() { - miner.reactChan = make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in - miner.powChan = make(chan []byte, 1) // This is the channel that receives valid sha hashes for a given block - miner.powQuitChan = make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread - miner.quitChan = make(chan chan error, 1) // Insert initial TXs in our little miner 'pool' miner.txs = miner.ethereum.TxPool().Flush() miner.block = miner.ethereum.BlockChain().NewBlock(miner.coinbase) + mux := miner.ethereum.EventMux() + miner.events = mux.Subscribe(ethchain.NewBlockEvent{}, ethchain.TxEvent{}) + // Prepare inital block //miner.ethereum.StateManager().Prepare(miner.block.State(), miner.block.State()) go miner.listener() - reactor := miner.ethereum.Reactor() - reactor.Subscribe("newBlock", miner.reactChan) - reactor.Subscribe("newTx:pre", miner.reactChan) - - // We need the quit chan to be a Reactor event. - // The POW search method is actually blocking and if we don't - // listen to the reactor events inside of the pow itself - // The miner overseer will never get the reactor events themselves - // Only after the miner will find the sha - reactor.Subscribe("newBlock", miner.powQuitChan) - reactor.Subscribe("newTx:pre", miner.powQuitChan) - logger.Infoln("Started") + mux.Post(Event{Started, miner}) +} - reactor.Post("miner:start", miner) +func (miner *Miner) Stop() { + logger.Infoln("Stopping...") + miner.events.Unsubscribe() + miner.ethereum.EventMux().Post(Event{Stopped, miner}) } func (miner *Miner) listener() { for { + miner.startMining() + select { - case status := <-miner.quitChan: - logger.Infoln("Stopped") - status <- nil - return - case chanMessage := <-miner.reactChan: + case event, isopen := <-miner.events.Chan(): + miner.stopMining() + if !isopen { + return + } - if block, ok := chanMessage.Resource.(*ethchain.Block); ok { + switch event := event.(type) { + case ethchain.NewBlockEvent: + block := event.Block //logger.Infoln("Got new block via Reactor") if bytes.Compare(miner.ethereum.BlockChain().CurrentBlock.Hash(), block.Hash()) == 0 { // TODO: Perhaps continue mining to get some uncle rewards @@ -117,49 +123,44 @@ func (miner *Miner) listener() { miner.uncles = append(miner.uncles, block) } } - } - if tx, ok := chanMessage.Resource.(*ethchain.Transaction); ok { - found := false - for _, ctx := range miner.txs { - if found = bytes.Compare(ctx.Hash(), tx.Hash()) == 0; found { - break + case ethchain.TxEvent: + if event.Type == ethchain.TxPre { + found := false + for _, ctx := range miner.txs { + if found = bytes.Compare(ctx.Hash(), event.Tx.Hash()) == 0; found { + break + } + } + if found == false { + // Undo all previous commits + miner.block.Undo() + // Apply new transactions + miner.txs = append(miner.txs, event.Tx) } - - } - if found == false { - // Undo all previous commits - miner.block.Undo() - // Apply new transactions - miner.txs = append(miner.txs, tx) } } - default: - miner.mineNewBlock() + + case <-miner.powDone: + // next iteration will start mining again } } } -func (miner *Miner) Stop() { - logger.Infoln("Stopping...") - - miner.powQuitChan <- ethreact.Event{} - - status := make(chan error) - miner.quitChan <- status - <-status - - reactor := miner.ethereum.Reactor() - reactor.Unsubscribe("newBlock", miner.powQuitChan) - reactor.Unsubscribe("newTx:pre", miner.powQuitChan) - reactor.Unsubscribe("newBlock", miner.reactChan) - reactor.Unsubscribe("newTx:pre", miner.reactChan) +func (miner *Miner) startMining() { + if miner.powDone == nil { + miner.powDone = make(chan struct{}) + } + miner.powQuitChan = make(chan struct{}) + go miner.mineNewBlock() +} - reactor.Post("miner:stop", miner) +func (miner *Miner) stopMining() { + close(miner.powQuitChan) + <-miner.powDone } func (self *Miner) mineNewBlock() { - stateManager := self.ethereum.StateManager() self.block = self.ethereum.BlockChain().NewBlock(self.coinbase) @@ -195,8 +196,9 @@ func (self *Miner) mineNewBlock() { logger.Infof("Mining on block. Includes %v transactions", len(self.txs)) // Find a valid nonce - self.block.Nonce = self.pow.Search(self.block, self.powQuitChan) - if self.block.Nonce != nil { + nonce := self.pow.Search(self.block, self.powQuitChan) + if nonce != nil { + self.block.Nonce = nonce err := self.ethereum.StateManager().Process(self.block, false) if err != nil { logger.Infoln(err) @@ -208,4 +210,5 @@ func (self *Miner) mineNewBlock() { self.txs = self.ethereum.TxPool().CurrentTransactions() } } + self.powDone <- struct{}{} } diff --git a/events.go b/events.go new file mode 100644 index 000000000..5fff1d831 --- /dev/null +++ b/events.go @@ -0,0 +1,11 @@ +package eth + +import "container/list" + +type PeerListEvent struct { + Peers *list.List +} + +type ChainSyncEvent struct { + InSync bool +} diff --git a/peer.go b/peer.go index e9551e066..d66d082bb 100644 --- a/peer.go +++ b/peer.go @@ -802,7 +802,7 @@ func (p *Peer) handleHandshake(msg *ethwire.Msg) { p.versionKnown = true p.ethereum.PushPeer(p) - p.ethereum.reactor.Post("peerList", p.ethereum.Peers()) + p.ethereum.eventMux.Post(PeerListEvent{p.ethereum.Peers()}) p.protocolCaps = caps capsIt := caps.NewIterator() -- cgit v1.2.3 From 20cdb73862c6ae5af10dbaceba34c5073148235d Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 14 Oct 2014 02:01:46 +0200 Subject: ethchain: fix tests --- ethchain/filter_test.go | 2 +- ethchain/helper_test.go | 18 +++++++++++------- ethereum.go | 5 ++--- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/ethchain/filter_test.go b/ethchain/filter_test.go index 6dce51b3b..e569b3774 100644 --- a/ethchain/filter_test.go +++ b/ethchain/filter_test.go @@ -3,5 +3,5 @@ package ethchain import "testing" func TestFilter(t *testing.T) { - filter := NewFilter() + NewFilter(NewTestManager()) } diff --git a/ethchain/helper_test.go b/ethchain/helper_test.go index 75d7771fc..2da01d8a6 100644 --- a/ethchain/helper_test.go +++ b/ethchain/helper_test.go @@ -6,16 +6,17 @@ import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethdb" - "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" + "github.com/ethereum/eth-go/event" ) // Implement our EthTest Manager type TestManager struct { stateManager *StateManager - reactor *ethreact.ReactorEngine + eventMux *event.TypeMux + db ethutil.Database txPool *TxPool blockChain *BlockChain Blocks []*Block @@ -49,8 +50,8 @@ func (tm *TestManager) StateManager() *StateManager { return tm.stateManager } -func (tm *TestManager) Reactor() *ethreact.ReactorEngine { - return tm.reactor +func (tm *TestManager) EventMux() *event.TypeMux { + return tm.eventMux } func (tm *TestManager) Broadcast(msgType ethwire.MsgType, data []interface{}) { fmt.Println("Broadcast not implemented") @@ -63,7 +64,10 @@ func (tm *TestManager) KeyManager() *ethcrypto.KeyManager { return nil } -func (tm *TestManager) Db() ethutil.Database { return nil } +func (tm *TestManager) Db() ethutil.Database { + return tm.db +} + func NewTestManager() *TestManager { ethutil.ReadConfig(".ethtest", "/tmp/ethtest", "ETH") @@ -75,8 +79,8 @@ func NewTestManager() *TestManager { ethutil.Config.Db = db testManager := &TestManager{} - testManager.reactor = ethreact.New() - + testManager.eventMux = new(event.TypeMux) + testManager.db = db testManager.txPool = NewTxPool(testManager) testManager.blockChain = NewBlockChain(testManager) testManager.stateManager = NewStateManager(testManager) diff --git a/ethereum.go b/ethereum.go index 750ca8f03..e5f73d507 100644 --- a/ethereum.go +++ b/ethereum.go @@ -59,7 +59,7 @@ type Ethereum struct { // The block pool blockPool *BlockPool // Eventer - eventMux *event.TypeMux + eventMux event.TypeMux // Peers peers *list.List // Nonce @@ -126,7 +126,6 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager isUpToDate: true, filters: make(map[int]*ethchain.Filter), } - ethereum.eventMux = event.NewTypeMux() ethereum.blockPool = NewBlockPool(ethereum) ethereum.txPool = ethchain.NewTxPool(ethereum) @@ -162,7 +161,7 @@ func (s *Ethereum) BlockPool() *BlockPool { return s.blockPool } func (s *Ethereum) EventMux() *event.TypeMux { - return s.eventMux + return &s.eventMux } func (self *Ethereum) Db() ethutil.Database { return self.db -- cgit v1.2.3 From 28570ef109d6a0f66e419165c985509bb01dc6bd Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 14 Oct 2014 02:17:50 +0200 Subject: eventer: remove package --- eventer/eventer.go | 83 ----------------------------------- eventer/eventer_test.go | 113 ------------------------------------------------ 2 files changed, 196 deletions(-) delete mode 100644 eventer/eventer.go delete mode 100644 eventer/eventer_test.go diff --git a/eventer/eventer.go b/eventer/eventer.go deleted file mode 100644 index 6e5ee2ec5..000000000 --- a/eventer/eventer.go +++ /dev/null @@ -1,83 +0,0 @@ -package eventer - -import "sync" - -// Basic receiver interface. -type Receiver interface { - Send(Event) -} - -// Receiver as channel -type Channel chan Event - -func (self Channel) Send(ev Event) { - self <- ev -} - -// Receiver as function -type Function func(ev Event) - -func (self Function) Send(ev Event) { - self(ev) -} - -type Event struct { - Type string - Data interface{} -} - -type Channels map[string][]Receiver - -type EventMachine struct { - mu sync.RWMutex - channels Channels -} - -func New() *EventMachine { - return &EventMachine{channels: make(Channels)} -} - -func (self *EventMachine) add(typ string, r Receiver) { - self.mu.Lock() - self.channels[typ] = append(self.channels[typ], r) - self.mu.Unlock() -} - -// Generalised methods for the known receiver types -// * Channel -// * Function -func (self *EventMachine) On(typ string, r interface{}) { - if eventFunc, ok := r.(func(Event)); ok { - self.RegisterFunc(typ, eventFunc) - } else if eventChan, ok := r.(Channel); ok { - self.RegisterChannel(typ, eventChan) - } else { - panic("Invalid type for EventMachine::On") - } -} - -func (self *EventMachine) RegisterChannel(typ string, c Channel) { - self.add(typ, c) -} - -func (self *EventMachine) RegisterFunc(typ string, f Function) { - self.add(typ, f) -} - -func (self *EventMachine) Register(typ string) Channel { - c := make(Channel, 1) - self.add(typ, c) - return c -} - -func (self *EventMachine) Post(typ string, data interface{}) { - self.mu.RLock() - if self.channels[typ] != nil { - ev := Event{typ, data} - for _, receiver := range self.channels[typ] { - // Blocking is OK. These are internals and need to be handled - receiver.Send(ev) - } - } - self.mu.RUnlock() -} diff --git a/eventer/eventer_test.go b/eventer/eventer_test.go deleted file mode 100644 index a5db6d901..000000000 --- a/eventer/eventer_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package eventer - -import ( - "math/rand" - "testing" - "time" -) - -func TestChannel(t *testing.T) { - eventer := New() - - c := make(Channel, 1) - eventer.RegisterChannel("test", c) - eventer.Post("test", "hello world") - - res := <-c - - if res.Data.(string) != "hello world" { - t.Error("Expected event with data 'hello world'. Got", res.Data) - } -} - -func TestFunction(t *testing.T) { - eventer := New() - - var data string - eventer.RegisterFunc("test", func(ev Event) { - data = ev.Data.(string) - }) - eventer.Post("test", "hello world") - - if data != "hello world" { - t.Error("Expected event with data 'hello world'. Got", data) - } -} - -func TestRegister(t *testing.T) { - eventer := New() - - c := eventer.Register("test") - eventer.Post("test", "hello world") - - res := <-c - - if res.Data.(string) != "hello world" { - t.Error("Expected event with data 'hello world'. Got", res.Data) - } -} - -func TestOn(t *testing.T) { - eventer := New() - - c := make(Channel, 1) - eventer.On("test", c) - - var data string - eventer.On("test", func(ev Event) { - data = ev.Data.(string) - }) - eventer.Post("test", "hello world") - - res := <-c - if res.Data.(string) != "hello world" { - t.Error("Expected channel event with data 'hello world'. Got", res.Data) - } - - if data != "hello world" { - t.Error("Expected function event with data 'hello world'. Got", data) - } -} - -func TestConcurrentUsage(t *testing.T) { - rand.Seed(time.Now().Unix()) - eventer := New() - stop := make(chan struct{}) - recv := make(chan int) - poster := func() { - for { - select { - case <-stop: - return - default: - eventer.Post("test", "hi") - } - } - } - listener := func(i int) { - time.Sleep(time.Duration(rand.Intn(99)) * time.Millisecond) - c := eventer.Register("test") - // wait for the first event - <-c - recv <- i - // keep receiving to prevent deadlock - for { - select { - case <-stop: - return - case <-c: - } - } - } - - nlisteners := 200 - go poster() - for i := 0; i < nlisteners; i++ { - go listener(i) - } - // wait until everyone has been served - for i := 0; i < nlisteners; i++ { - <-recv - } - close(stop) -} -- cgit v1.2.3 From ade980912da4afb7d92f845b8d41955851791dc9 Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Tue, 14 Oct 2014 02:18:12 +0200 Subject: ethreact: remove package --- ethreact/README.md | 28 -------- ethreact/reactor.go | 183 ----------------------------------------------- ethreact/reactor_test.go | 63 ---------------- 3 files changed, 274 deletions(-) delete mode 100644 ethreact/README.md delete mode 100644 ethreact/reactor.go delete mode 100644 ethreact/reactor_test.go diff --git a/ethreact/README.md b/ethreact/README.md deleted file mode 100644 index 61af8a572..000000000 --- a/ethreact/README.md +++ /dev/null @@ -1,28 +0,0 @@ -## Reactor - -Reactor is the internal broadcast engine that allows components to be notified of ethereum stack events such as finding new blocks or change in state. -Event notification is handled via subscription: - - var blockChan = make(chan ethreact.Event, 10) - reactor.Subscribe("newBlock", blockChan) - -ethreact.Event broadcast on the channel are - - type Event struct { - Resource interface{} - Name string - } - -Resource is polimorphic depending on the event type and should be typecast before use, e.g: - - b := <-blockChan: - block := b.Resource.(*ethchain.Block) - -Events are guaranteed to be broadcast in order but the broadcast never blocks or leaks which means while the subscribing event channel is blocked (e.g., full if buffered) further messages will be skipped. - -The engine allows arbitrary events to be posted and subscribed to. - - ethereum.Reactor().Post("newBlock", newBlock) - - - \ No newline at end of file diff --git a/ethreact/reactor.go b/ethreact/reactor.go deleted file mode 100644 index 2edcbbbd9..000000000 --- a/ethreact/reactor.go +++ /dev/null @@ -1,183 +0,0 @@ -package ethreact - -import ( - "sync" - - "github.com/ethereum/eth-go/ethlog" -) - -var logger = ethlog.NewLogger("REACTOR") - -const ( - eventBufferSize int = 10 -) - -type EventHandler struct { - lock sync.RWMutex - name string - chans []chan Event -} - -// Post the Event with the reactor resource on the channels -// currently subscribed to the event -func (e *EventHandler) Post(event Event) { - e.lock.RLock() - defer e.lock.RUnlock() - - // if we want to preserve order pushing to subscibed channels - // dispatching should be syncrounous - // this means if subscribed event channel is blocked - // the reactor dispatch will be blocked, so we need to mitigate by skipping - // rogue blocking subscribers - for i, ch := range e.chans { - select { - case ch <- event: - default: - logger.Debugf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name) - } - } -} - -// Add a subscriber to this event -func (e *EventHandler) Add(ch chan Event) { - e.lock.Lock() - defer e.lock.Unlock() - - e.chans = append(e.chans, ch) -} - -// Remove a subscriber -func (e *EventHandler) Remove(ch chan Event) int { - e.lock.Lock() - defer e.lock.Unlock() - - for i, c := range e.chans { - if c == ch { - e.chans = append(e.chans[:i], e.chans[i+1:]...) - } - } - return len(e.chans) -} - -// Basic reactor event -type Event struct { - Resource interface{} - Name string -} - -// The reactor basic engine. Acts as bridge -// between the events and the subscribers/posters -type ReactorEngine struct { - lock sync.RWMutex - eventChannel chan Event - eventHandlers map[string]*EventHandler - quit chan chan error - running bool - drained chan bool -} - -func New() *ReactorEngine { - return &ReactorEngine{ - eventHandlers: make(map[string]*EventHandler), - eventChannel: make(chan Event, eventBufferSize), - quit: make(chan chan error, 1), - drained: make(chan bool, 1), - } -} - -func (reactor *ReactorEngine) Start() { - reactor.lock.Lock() - defer reactor.lock.Unlock() - if !reactor.running { - go func() { - for { - select { - case status := <-reactor.quit: - reactor.lock.Lock() - defer reactor.lock.Unlock() - reactor.running = false - logger.Infoln("stopped") - status <- nil - return - case event := <-reactor.eventChannel: - // needs to be called syncronously to keep order of events - reactor.dispatch(event) - default: - reactor.drained <- true // blocking till message is coming in - } - } - }() - reactor.running = true - logger.Infoln("started") - } -} - -func (reactor *ReactorEngine) Stop() { - if reactor.running { - status := make(chan error) - reactor.quit <- status - select { - case <-reactor.drained: - default: - } - <-status - } -} - -func (reactor *ReactorEngine) Flush() { - <-reactor.drained -} - -// Subscribe a channel to the specified event -func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) { - reactor.lock.Lock() - defer reactor.lock.Unlock() - - eventHandler := reactor.eventHandlers[event] - // Create a new event handler if one isn't available - if eventHandler == nil { - eventHandler = &EventHandler{name: event} - reactor.eventHandlers[event] = eventHandler - } - // Add the events channel to reactor event handler - eventHandler.Add(eventChannel) - logger.Debugf("added new subscription to %s", event) -} - -func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) { - reactor.lock.Lock() - defer reactor.lock.Unlock() - - eventHandler := reactor.eventHandlers[event] - if eventHandler != nil { - len := eventHandler.Remove(eventChannel) - if len == 0 { - reactor.eventHandlers[event] = nil - } - logger.Debugf("removed subscription to %s", event) - } -} - -func (reactor *ReactorEngine) Post(event string, resource interface{}) { - reactor.lock.Lock() - defer reactor.lock.Unlock() - - if reactor.running { - reactor.eventChannel <- Event{Resource: resource, Name: event} - select { - case <-reactor.drained: - default: - } - } -} - -func (reactor *ReactorEngine) dispatch(event Event) { - name := event.Name - eventHandler := reactor.eventHandlers[name] - // if no subscriptions to this event type - no event handler created - // then noone to notify - if eventHandler != nil { - // needs to be called syncronously - eventHandler.Post(event) - } -} diff --git a/ethreact/reactor_test.go b/ethreact/reactor_test.go deleted file mode 100644 index 801a8abd0..000000000 --- a/ethreact/reactor_test.go +++ /dev/null @@ -1,63 +0,0 @@ -package ethreact - -import ( - "fmt" - "testing" -) - -func TestReactorAdd(t *testing.T) { - reactor := New() - ch := make(chan Event) - reactor.Subscribe("test", ch) - if reactor.eventHandlers["test"] == nil { - t.Error("Expected new eventHandler to be created") - } - reactor.Unsubscribe("test", ch) - if reactor.eventHandlers["test"] != nil { - t.Error("Expected eventHandler to be removed") - } -} - -func TestReactorEvent(t *testing.T) { - var name string - reactor := New() - // Buffer the channel, so it doesn't block for this test - cap := 20 - ch := make(chan Event, cap) - reactor.Subscribe("even", ch) - reactor.Subscribe("odd", ch) - reactor.Post("even", "disappears") // should not broadcast if engine not started - reactor.Start() - for i := 0; i < cap; i++ { - if i%2 == 0 { - name = "even" - } else { - name = "odd" - } - reactor.Post(name, i) - } - reactor.Post("test", cap) // this should not block - i := 0 - reactor.Flush() - close(ch) - for event := range ch { - fmt.Printf("%d: %v", i, event) - if i%2 == 0 { - name = "even" - } else { - name = "odd" - } - if val, ok := event.Resource.(int); ok { - if i != val || event.Name != name { - t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val) - } - } else { - t.Error("Unable to cast") - } - i++ - } - if i != cap { - t.Error("excpected exactly %d events, got ", i) - } - reactor.Stop() -} -- cgit v1.2.3 From fa84e50ddb8e64d4cb92d58e235cfed13761f21e Mon Sep 17 00:00:00 2001 From: Felix Lange Date: Thu, 16 Oct 2014 18:59:28 +0200 Subject: event: panic for duplicate type --- event/event.go | 6 +++++- event/event_test.go | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/event/event.go b/event/event.go index d11a0e9bd..540fbba65 100644 --- a/event/event.go +++ b/event/event.go @@ -3,6 +3,7 @@ package event import ( "errors" + "fmt" "reflect" "sync" ) @@ -40,6 +41,7 @@ var ErrMuxClosed = errors.New("event: mux closed") func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { sub := newsub(mux) mux.mutex.Lock() + defer mux.mutex.Unlock() if mux.stopped { close(sub.postC) } else { @@ -49,13 +51,15 @@ func (mux *TypeMux) Subscribe(types ...interface{}) Subscription { for _, t := range types { rtyp := reflect.TypeOf(t) oldsubs := mux.subm[rtyp] + if find(oldsubs, sub) != -1 { + panic(fmt.Sprintf("event: duplicate type %s in Subscribe", rtyp)) + } subs := make([]*muxsub, len(oldsubs)+1) copy(subs, oldsubs) subs[len(oldsubs)] = sub mux.subm[rtyp] = subs } } - mux.mutex.Unlock() return sub } diff --git a/event/event_test.go b/event/event_test.go index f65aaa0a2..c7c0266c1 100644 --- a/event/event_test.go +++ b/event/event_test.go @@ -60,6 +60,21 @@ func TestUnsubscribeUnblockPost(t *testing.T) { } } +func TestSubscribeDuplicateType(t *testing.T) { + mux := new(TypeMux) + expected := "event: duplicate type event.testEvent in Subscribe" + + defer func() { + err := recover() + if err == nil { + t.Errorf("Subscribe didn't panic for duplicate type") + } else if err != expected { + t.Errorf("panic mismatch: got %#v, expected %#v", err, expected) + } + }() + mux.Subscribe(testEvent(1), testEvent(2)) +} + func TestMuxConcurrent(t *testing.T) { rand.Seed(time.Now().Unix()) mux := new(TypeMux) -- cgit v1.2.3