aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2014-08-01 16:22:25 +0800
committerobscuren <geffobscura@gmail.com>2014-08-01 16:22:25 +0800
commit8bed47a2d4377b7a49f34350ae5c5ea50464b95d (patch)
tree3b1923c6560da3a08bfb88783e49b353f2609ea9
parent5a2d62e4d9e551f16f094216da70b7a6f5d2bf00 (diff)
parent9831619881c5264c2449ce1b906108d892b6e1e1 (diff)
downloadgo-tangerine-8bed47a2d4377b7a49f34350ae5c5ea50464b95d.tar
go-tangerine-8bed47a2d4377b7a49f34350ae5c5ea50464b95d.tar.gz
go-tangerine-8bed47a2d4377b7a49f34350ae5c5ea50464b95d.tar.bz2
go-tangerine-8bed47a2d4377b7a49f34350ae5c5ea50464b95d.tar.lz
go-tangerine-8bed47a2d4377b7a49f34350ae5c5ea50464b95d.tar.xz
go-tangerine-8bed47a2d4377b7a49f34350ae5c5ea50464b95d.tar.zst
go-tangerine-8bed47a2d4377b7a49f34350ae5c5ea50464b95d.zip
Merge branch 'feature/ethutil-refactor' of https://github.com/ethersphere/eth-go into ethersphere-feature/ethutil-refactor
-rw-r--r--ethchain/dagger.go5
-rw-r--r--ethchain/state_manager.go3
-rw-r--r--ethereum.go10
-rw-r--r--ethlog/loggers.go62
-rw-r--r--ethlog/loggers_test.go30
-rw-r--r--ethminer/miner.go85
-rw-r--r--ethreact/README.md28
-rw-r--r--ethreact/reactor.go182
-rw-r--r--ethreact/reactor_test.go63
-rw-r--r--ethutil/reactor.go87
-rw-r--r--ethutil/reactor_test.go30
11 files changed, 395 insertions, 190 deletions
diff --git a/ethchain/dagger.go b/ethchain/dagger.go
index dccd2ff5b..917b3d722 100644
--- a/ethchain/dagger.go
+++ b/ethchain/dagger.go
@@ -3,6 +3,7 @@ package ethchain
import (
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethutil"
"github.com/obscuren/sha3"
"hash"
@@ -14,7 +15,7 @@ import (
var powlogger = ethlog.NewLogger("POW")
type PoW interface {
- Search(block *Block, reactChan chan ethutil.React) []byte
+ Search(block *Block, reactChan chan ethreact.Event) []byte
Verify(hash []byte, diff *big.Int, nonce []byte) bool
GetHashrate() int64
}
@@ -28,7 +29,7 @@ func (pow *EasyPow) GetHashrate() int64 {
return pow.HashRate
}
-func (pow *EasyPow) Search(block *Block, reactChan chan ethutil.React) []byte {
+func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
hash := block.HashNoNonce()
diff := block.Difficulty
diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go
index 9408cf331..226b2fe73 100644
--- a/ethchain/state_manager.go
+++ b/ethchain/state_manager.go
@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethstate"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
@@ -36,7 +37,7 @@ type EthManager interface {
BlockChain() *BlockChain
TxPool() *TxPool
Broadcast(msgType ethwire.MsgType, data []interface{})
- Reactor() *ethutil.ReactorEngine
+ Reactor() *ethreact.ReactorEngine
PeerCount() int
IsMining() bool
IsListening() bool
diff --git a/ethereum.go b/ethereum.go
index 395eba954..69bb93cff 100644
--- a/ethereum.go
+++ b/ethereum.go
@@ -14,6 +14,7 @@ import (
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethrpc"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
@@ -75,7 +76,7 @@ type Ethereum struct {
listening bool
- reactor *ethutil.ReactorEngine
+ reactor *ethreact.ReactorEngine
RpcServer *ethrpc.JsonRpcServer
@@ -113,7 +114,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
clientIdentity: clientIdentity,
isUpToDate: true,
}
- ethereum.reactor = ethutil.NewReactorEngine()
+ ethereum.reactor = ethreact.New()
ethereum.txPool = ethchain.NewTxPool(ethereum)
ethereum.blockChain = ethchain.NewBlockChain(ethereum)
@@ -125,7 +126,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
return ethereum, nil
}
-func (s *Ethereum) Reactor() *ethutil.ReactorEngine {
+func (s *Ethereum) Reactor() *ethreact.ReactorEngine {
return s.reactor
}
@@ -357,6 +358,7 @@ func (s *Ethereum) ReapDeadPeerHandler() {
// Start the ethereum
func (s *Ethereum) Start(seed bool) {
+ s.reactor.Start()
// Bind to addr and port
ln, err := net.Listen("tcp", ":"+s.Port)
if err != nil {
@@ -456,6 +458,8 @@ func (s *Ethereum) Stop() {
}
s.txPool.Stop()
s.stateManager.Stop()
+ s.reactor.Flush()
+ s.reactor.Stop()
ethlogger.Infoln("Server stopped")
close(s.shutdownChan)
diff --git a/ethlog/loggers.go b/ethlog/loggers.go
index 50de213b3..b2760534b 100644
--- a/ethlog/loggers.go
+++ b/ethlog/loggers.go
@@ -39,7 +39,9 @@ func (msg *logMessage) send(logger LogSystem) {
var logMessages chan (*logMessage)
var logSystems []LogSystem
-var quit chan bool
+var quit chan chan error
+var drained chan bool
+var mutex = sync.Mutex{}
type LogLevel uint8
@@ -52,34 +54,55 @@ const (
DebugDetailLevel
)
+func dispatch(msg *logMessage) {
+ for _, logSystem := range logSystems {
+ if logSystem.GetLogLevel() >= msg.LogLevel {
+ msg.send(logSystem)
+ }
+ }
+}
+
// log messages are dispatched to log writers
func start() {
-out:
for {
select {
+ case status := <-quit:
+ status <- nil
+ return
case msg := <-logMessages:
- for _, logSystem := range logSystems {
- if logSystem.GetLogLevel() >= msg.LogLevel {
- msg.send(logSystem)
- }
- }
- case <-quit:
- break out
+ dispatch(msg)
+ default:
+ drained <- true // this blocks until a message is sent to the queue
}
}
}
-// waits until log messages are drained (dispatched to log writers)
-func Flush() {
- quit <- true
+func send(msg *logMessage) {
+ logMessages <- msg
+ select {
+ case <-drained:
+ default:
+ }
+}
-done:
- for {
+func Reset() {
+ mutex.Lock()
+ defer mutex.Unlock()
+ if logSystems != nil {
+ status := make(chan error)
+ quit <- status
select {
- case <-logMessages:
+ case <-drained:
default:
- break done
}
+ <-status
+ }
+}
+
+// waits until log messages are drained (dispatched to log writers)
+func Flush() {
+ if logSystems != nil {
+ <-drained
}
}
@@ -97,7 +120,8 @@ func AddLogSystem(logSystem LogSystem) {
defer mutex.Unlock()
if logSystems == nil {
logMessages = make(chan *logMessage, 10)
- quit = make(chan bool, 1)
+ quit = make(chan chan error, 1)
+ drained = make(chan bool, 1)
go start()
}
logSystems = append(logSystems, logSystem)
@@ -106,14 +130,14 @@ func AddLogSystem(logSystem LogSystem) {
func (logger *Logger) sendln(level LogLevel, v ...interface{}) {
if logMessages != nil {
msg := newPrintlnLogMessage(level, logger.tag, v...)
- logMessages <- msg
+ send(msg)
}
}
func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) {
if logMessages != nil {
msg := newPrintfLogMessage(level, logger.tag, format, v...)
- logMessages <- msg
+ send(msg)
}
}
diff --git a/ethlog/loggers_test.go b/ethlog/loggers_test.go
index 89f416681..a9b1463e7 100644
--- a/ethlog/loggers_test.go
+++ b/ethlog/loggers_test.go
@@ -28,8 +28,19 @@ func (t *TestLogSystem) GetLogLevel() LogLevel {
return t.level
}
-func quote(s string) string {
- return fmt.Sprintf("'%s'", s)
+func TestLoggerFlush(t *testing.T) {
+ logger := NewLogger("TEST")
+ testLogSystem := &TestLogSystem{level: WarnLevel}
+ AddLogSystem(testLogSystem)
+ for i := 0; i < 5; i++ {
+ logger.Errorf(".")
+ }
+ Flush()
+ Reset()
+ output := testLogSystem.Output
+ if output != "[TEST] .[TEST] .[TEST] .[TEST] .[TEST] ." {
+ t.Error("Expected complete logger output '[TEST] .[TEST] .[TEST] .[TEST] .[TEST] .', got ", output)
+ }
}
func TestLoggerPrintln(t *testing.T) {
@@ -41,10 +52,11 @@ func TestLoggerPrintln(t *testing.T) {
logger.Infoln("info")
logger.Debugln("debug")
Flush()
+ Reset()
output := testLogSystem.Output
fmt.Println(quote(output))
if output != "[TEST] error\n[TEST] warn\n" {
- t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem.Output))
+ t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", output)
}
}
@@ -57,10 +69,10 @@ func TestLoggerPrintf(t *testing.T) {
logger.Infof("info")
logger.Debugf("debug")
Flush()
+ Reset()
output := testLogSystem.Output
- fmt.Println(quote(output))
if output != "[TEST] error to { 2}\n[TEST] warn" {
- t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", quote(testLogSystem.Output))
+ t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", output)
}
}
@@ -73,13 +85,14 @@ func TestMultipleLogSystems(t *testing.T) {
logger.Errorln("error")
logger.Warnln("warn")
Flush()
+ Reset()
output0 := testLogSystem0.Output
output1 := testLogSystem1.Output
if output0 != "[TEST] error\n" {
- t.Error("Expected logger 0 output '[TEST] error\\n', got ", quote(testLogSystem0.Output))
+ t.Error("Expected logger 0 output '[TEST] error\\n', got ", output0)
}
if output1 != "[TEST] error\n[TEST] warn\n" {
- t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem1.Output))
+ t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", output1)
}
}
@@ -94,9 +107,8 @@ func TestFileLogSystem(t *testing.T) {
Flush()
contents, _ := ioutil.ReadFile(filename)
output := string(contents)
- fmt.Println(quote(output))
if output != "[TEST] error to test.log\n[TEST] warn\n" {
- t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", quote(output))
+ t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", output)
} else {
os.Remove(filename)
}
diff --git a/ethminer/miner.go b/ethminer/miner.go
index bfea8e580..e51b37e05 100644
--- a/ethminer/miner.go
+++ b/ethminer/miner.go
@@ -4,7 +4,7 @@ import (
"bytes"
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog"
- "github.com/ethereum/eth-go/ethutil"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethwire"
"sort"
)
@@ -15,13 +15,13 @@ type Miner struct {
pow ethchain.PoW
ethereum ethchain.EthManager
coinbase []byte
- reactChan chan ethutil.React
+ reactChan chan ethreact.Event
txs ethchain.Transactions
uncles []*ethchain.Block
block *ethchain.Block
powChan chan []byte
- powQuitChan chan ethutil.React
- quitChan chan bool
+ powQuitChan chan ethreact.Event
+ quitChan chan chan error
}
func (self *Miner) GetPow() ethchain.PoW {
@@ -29,55 +29,53 @@ func (self *Miner) GetPow() ethchain.PoW {
}
func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) *Miner {
- reactChan := make(chan ethutil.React, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
- powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block
- powQuitChan := make(chan ethutil.React, 1) // This is the channel that can exit the miner thread
- quitChan := make(chan bool, 1)
-
- ethereum.Reactor().Subscribe("newBlock", reactChan)
- ethereum.Reactor().Subscribe("newTx:pre", reactChan)
-
- // We need the quit chan to be a Reactor event.
- // The POW search method is actually blocking and if we don't
- // listen to the reactor events inside of the pow itself
- // The miner overseer will never get the reactor events themselves
- // Only after the miner will find the sha
- ethereum.Reactor().Subscribe("newBlock", powQuitChan)
- ethereum.Reactor().Subscribe("newTx:pre", powQuitChan)
-
miner := Miner{
- pow: &ethchain.EasyPow{},
- ethereum: ethereum,
- coinbase: coinbase,
- reactChan: reactChan,
- powChan: powChan,
- powQuitChan: powQuitChan,
- quitChan: quitChan,
+ pow: &ethchain.EasyPow{},
+ ethereum: ethereum,
+ coinbase: coinbase,
}
- // Insert initial TXs in our little miner 'pool'
- miner.txs = ethereum.TxPool().Flush()
- miner.block = ethereum.BlockChain().NewBlock(miner.coinbase)
-
return &miner
}
func (miner *Miner) Start() {
+ miner.reactChan = make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
+ miner.powChan = make(chan []byte, 1) // This is the channel that receives valid sha hashes for a given block
+ miner.powQuitChan = make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread
+ miner.quitChan = make(chan chan error, 1)
+
+ // Insert initial TXs in our little miner 'pool'
+ miner.txs = miner.ethereum.TxPool().Flush()
+ miner.block = miner.ethereum.BlockChain().NewBlock(miner.coinbase)
+
// Prepare inital block
//miner.ethereum.StateManager().Prepare(miner.block.State(), miner.block.State())
go miner.listener()
+
+ reactor := miner.ethereum.Reactor()
+ reactor.Subscribe("newBlock", miner.reactChan)
+ reactor.Subscribe("newTx:pre", miner.reactChan)
+
+ // We need the quit chan to be a Reactor event.
+ // The POW search method is actually blocking and if we don't
+ // listen to the reactor events inside of the pow itself
+ // The miner overseer will never get the reactor events themselves
+ // Only after the miner will find the sha
+ reactor.Subscribe("newBlock", miner.powQuitChan)
+ reactor.Subscribe("newTx:pre", miner.powQuitChan)
+
logger.Infoln("Started")
- miner.ethereum.Reactor().Post("miner:start", miner)
+ reactor.Post("miner:start", miner)
}
func (miner *Miner) listener() {
-out:
for {
select {
- case <-miner.quitChan:
+ case status := <-miner.quitChan:
logger.Infoln("Stopped")
- break out
+ status <- nil
+ return
case chanMessage := <-miner.reactChan:
if block, ok := chanMessage.Resource.(*ethchain.Block); ok {
@@ -133,13 +131,22 @@ out:
}
}
-func (self *Miner) Stop() {
+func (miner *Miner) Stop() {
logger.Infoln("Stopping...")
- self.quitChan <- true
- self.powQuitChan <- ethutil.React{}
+ miner.powQuitChan <- ethreact.Event{}
+
+ status := make(chan error)
+ miner.quitChan <- status
+ <-status
+
+ reactor := miner.ethereum.Reactor()
+ reactor.Unsubscribe("newBlock", miner.powQuitChan)
+ reactor.Unsubscribe("newTx:pre", miner.powQuitChan)
+ reactor.Unsubscribe("newBlock", miner.reactChan)
+ reactor.Unsubscribe("newTx:pre", miner.reactChan)
- self.ethereum.Reactor().Post("miner:stop", self)
+ reactor.Post("miner:stop", miner)
}
func (self *Miner) mineNewBlock() {
diff --git a/ethreact/README.md b/ethreact/README.md
new file mode 100644
index 000000000..61af8a572
--- /dev/null
+++ b/ethreact/README.md
@@ -0,0 +1,28 @@
+## Reactor
+
+Reactor is the internal broadcast engine that allows components to be notified of ethereum stack events such as finding new blocks or change in state.
+Event notification is handled via subscription:
+
+ var blockChan = make(chan ethreact.Event, 10)
+ reactor.Subscribe("newBlock", blockChan)
+
+ethreact.Event broadcast on the channel are
+
+ type Event struct {
+ Resource interface{}
+ Name string
+ }
+
+Resource is polimorphic depending on the event type and should be typecast before use, e.g:
+
+ b := <-blockChan:
+ block := b.Resource.(*ethchain.Block)
+
+Events are guaranteed to be broadcast in order but the broadcast never blocks or leaks which means while the subscribing event channel is blocked (e.g., full if buffered) further messages will be skipped.
+
+The engine allows arbitrary events to be posted and subscribed to.
+
+ ethereum.Reactor().Post("newBlock", newBlock)
+
+
+ \ No newline at end of file
diff --git a/ethreact/reactor.go b/ethreact/reactor.go
new file mode 100644
index 000000000..7fe2356db
--- /dev/null
+++ b/ethreact/reactor.go
@@ -0,0 +1,182 @@
+package ethreact
+
+import (
+ "github.com/ethereum/eth-go/ethlog"
+ "sync"
+)
+
+var logger = ethlog.NewLogger("REACTOR")
+
+const (
+ eventBufferSize int = 10
+)
+
+type EventHandler struct {
+ lock sync.RWMutex
+ name string
+ chans []chan Event
+}
+
+// Post the Event with the reactor resource on the channels
+// currently subscribed to the event
+func (e *EventHandler) Post(event Event) {
+ e.lock.RLock()
+ defer e.lock.RUnlock()
+
+ // if we want to preserve order pushing to subscibed channels
+ // dispatching should be syncrounous
+ // this means if subscribed event channel is blocked
+ // the reactor dispatch will be blocked, so we need to mitigate by skipping
+ // rogue blocking subscribers
+ for i, ch := range e.chans {
+ select {
+ case ch <- event:
+ default:
+ logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
+ }
+ }
+}
+
+// Add a subscriber to this event
+func (e *EventHandler) Add(ch chan Event) {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+
+ e.chans = append(e.chans, ch)
+}
+
+// Remove a subscriber
+func (e *EventHandler) Remove(ch chan Event) int {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+
+ for i, c := range e.chans {
+ if c == ch {
+ e.chans = append(e.chans[:i], e.chans[i+1:]...)
+ }
+ }
+ return len(e.chans)
+}
+
+// Basic reactor event
+type Event struct {
+ Resource interface{}
+ Name string
+}
+
+// The reactor basic engine. Acts as bridge
+// between the events and the subscribers/posters
+type ReactorEngine struct {
+ lock sync.RWMutex
+ eventChannel chan Event
+ eventHandlers map[string]*EventHandler
+ quit chan chan error
+ running bool
+ drained chan bool
+}
+
+func New() *ReactorEngine {
+ return &ReactorEngine{
+ eventHandlers: make(map[string]*EventHandler),
+ eventChannel: make(chan Event, eventBufferSize),
+ quit: make(chan chan error, 1),
+ drained: make(chan bool, 1),
+ }
+}
+
+func (reactor *ReactorEngine) Start() {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+ if !reactor.running {
+ go func() {
+ for {
+ select {
+ case status := <-reactor.quit:
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+ reactor.running = false
+ logger.Infoln("stopped")
+ status <- nil
+ return
+ case event := <-reactor.eventChannel:
+ // needs to be called syncronously to keep order of events
+ reactor.dispatch(event)
+ default:
+ reactor.drained <- true // blocking till message is coming in
+ }
+ }
+ }()
+ reactor.running = true
+ logger.Infoln("started")
+ }
+}
+
+func (reactor *ReactorEngine) Stop() {
+ if reactor.running {
+ status := make(chan error)
+ reactor.quit <- status
+ select {
+ case <-reactor.drained:
+ default:
+ }
+ <-status
+ }
+}
+
+func (reactor *ReactorEngine) Flush() {
+ <-reactor.drained
+}
+
+// Subscribe a channel to the specified event
+func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ eventHandler := reactor.eventHandlers[event]
+ // Create a new event handler if one isn't available
+ if eventHandler == nil {
+ eventHandler = &EventHandler{name: event}
+ reactor.eventHandlers[event] = eventHandler
+ }
+ // Add the events channel to reactor event handler
+ eventHandler.Add(eventChannel)
+ logger.Debugf("added new subscription to %s", event)
+}
+
+func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ eventHandler := reactor.eventHandlers[event]
+ if eventHandler != nil {
+ len := eventHandler.Remove(eventChannel)
+ if len == 0 {
+ reactor.eventHandlers[event] = nil
+ }
+ logger.Debugf("removed subscription to %s", event)
+ }
+}
+
+func (reactor *ReactorEngine) Post(event string, resource interface{}) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ if reactor.running {
+ reactor.eventChannel <- Event{Resource: resource, Name: event}
+ select {
+ case <-reactor.drained:
+ default:
+ }
+ }
+}
+
+func (reactor *ReactorEngine) dispatch(event Event) {
+ name := event.Name
+ eventHandler := reactor.eventHandlers[name]
+ // if no subscriptions to this event type - no event handler created
+ // then noone to notify
+ if eventHandler != nil {
+ // needs to be called syncronously
+ eventHandler.Post(event)
+ }
+}
diff --git a/ethreact/reactor_test.go b/ethreact/reactor_test.go
new file mode 100644
index 000000000..801a8abd0
--- /dev/null
+++ b/ethreact/reactor_test.go
@@ -0,0 +1,63 @@
+package ethreact
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestReactorAdd(t *testing.T) {
+ reactor := New()
+ ch := make(chan Event)
+ reactor.Subscribe("test", ch)
+ if reactor.eventHandlers["test"] == nil {
+ t.Error("Expected new eventHandler to be created")
+ }
+ reactor.Unsubscribe("test", ch)
+ if reactor.eventHandlers["test"] != nil {
+ t.Error("Expected eventHandler to be removed")
+ }
+}
+
+func TestReactorEvent(t *testing.T) {
+ var name string
+ reactor := New()
+ // Buffer the channel, so it doesn't block for this test
+ cap := 20
+ ch := make(chan Event, cap)
+ reactor.Subscribe("even", ch)
+ reactor.Subscribe("odd", ch)
+ reactor.Post("even", "disappears") // should not broadcast if engine not started
+ reactor.Start()
+ for i := 0; i < cap; i++ {
+ if i%2 == 0 {
+ name = "even"
+ } else {
+ name = "odd"
+ }
+ reactor.Post(name, i)
+ }
+ reactor.Post("test", cap) // this should not block
+ i := 0
+ reactor.Flush()
+ close(ch)
+ for event := range ch {
+ fmt.Printf("%d: %v", i, event)
+ if i%2 == 0 {
+ name = "even"
+ } else {
+ name = "odd"
+ }
+ if val, ok := event.Resource.(int); ok {
+ if i != val || event.Name != name {
+ t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val)
+ }
+ } else {
+ t.Error("Unable to cast")
+ }
+ i++
+ }
+ if i != cap {
+ t.Error("excpected exactly %d events, got ", i)
+ }
+ reactor.Stop()
+}
diff --git a/ethutil/reactor.go b/ethutil/reactor.go
deleted file mode 100644
index 7cf145245..000000000
--- a/ethutil/reactor.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package ethutil
-
-import (
- "sync"
-)
-
-type ReactorEvent struct {
- mut sync.Mutex
- event string
- chans []chan React
-}
-
-// Post the specified reactor resource on the channels
-// currently subscribed
-func (e *ReactorEvent) Post(react React) {
- e.mut.Lock()
- defer e.mut.Unlock()
-
- for _, ch := range e.chans {
- go func(ch chan React) {
- ch <- react
- }(ch)
- }
-}
-
-// Add a subscriber to this event
-func (e *ReactorEvent) Add(ch chan React) {
- e.mut.Lock()
- defer e.mut.Unlock()
-
- e.chans = append(e.chans, ch)
-}
-
-// Remove a subscriber
-func (e *ReactorEvent) Remove(ch chan React) {
- e.mut.Lock()
- defer e.mut.Unlock()
-
- for i, c := range e.chans {
- if c == ch {
- e.chans = append(e.chans[:i], e.chans[i+1:]...)
- }
- }
-}
-
-// Basic reactor resource
-type React struct {
- Resource interface{}
- Event string
-}
-
-// The reactor basic engine. Acts as bridge
-// between the events and the subscribers/posters
-type ReactorEngine struct {
- patterns map[string]*ReactorEvent
-}
-
-func NewReactorEngine() *ReactorEngine {
- return &ReactorEngine{patterns: make(map[string]*ReactorEvent)}
-}
-
-// Subscribe a channel to the specified event
-func (reactor *ReactorEngine) Subscribe(event string, ch chan React) {
- ev := reactor.patterns[event]
- // Create a new event if one isn't available
- if ev == nil {
- ev = &ReactorEvent{event: event}
- reactor.patterns[event] = ev
- }
-
- // Add the channel to reactor event handler
- ev.Add(ch)
-}
-
-func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) {
- ev := reactor.patterns[event]
- if ev != nil {
- ev.Remove(ch)
- }
-}
-
-func (reactor *ReactorEngine) Post(event string, resource interface{}) {
- ev := reactor.patterns[event]
- if ev != nil {
- ev.Post(React{Resource: resource, Event: event})
- }
-}
diff --git a/ethutil/reactor_test.go b/ethutil/reactor_test.go
deleted file mode 100644
index 48c2f0df3..000000000
--- a/ethutil/reactor_test.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package ethutil
-
-import "testing"
-
-func TestReactorAdd(t *testing.T) {
- engine := NewReactorEngine()
- ch := make(chan React)
- engine.Subscribe("test", ch)
- if len(engine.patterns) != 1 {
- t.Error("Expected patterns to be 1, got", len(engine.patterns))
- }
-}
-
-func TestReactorEvent(t *testing.T) {
- engine := NewReactorEngine()
-
- // Buffer 1, so it doesn't block for this test
- ch := make(chan React, 1)
- engine.Subscribe("test", ch)
- engine.Post("test", "hello")
-
- value := <-ch
- if val, ok := value.Resource.(string); ok {
- if val != "hello" {
- t.Error("Expected Resource to be 'hello', got", val)
- }
- } else {
- t.Error("Unable to cast")
- }
-}