aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ethchain/dagger.go5
-rw-r--r--ethchain/state_manager.go4
-rw-r--r--ethereum.go10
-rw-r--r--ethlog/loggers.go62
-rw-r--r--ethlog/loggers_test.go30
-rw-r--r--ethminer/miner.go85
-rw-r--r--ethreact/reactor.go182
-rw-r--r--ethreact/reactor_test.go63
-rw-r--r--ethutil/reactor.go87
-rw-r--r--ethutil/reactor_test.go30
10 files changed, 367 insertions, 191 deletions
diff --git a/ethchain/dagger.go b/ethchain/dagger.go
index dccd2ff5b..917b3d722 100644
--- a/ethchain/dagger.go
+++ b/ethchain/dagger.go
@@ -3,6 +3,7 @@ package ethchain
import (
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethutil"
"github.com/obscuren/sha3"
"hash"
@@ -14,7 +15,7 @@ import (
var powlogger = ethlog.NewLogger("POW")
type PoW interface {
- Search(block *Block, reactChan chan ethutil.React) []byte
+ Search(block *Block, reactChan chan ethreact.Event) []byte
Verify(hash []byte, diff *big.Int, nonce []byte) bool
GetHashrate() int64
}
@@ -28,7 +29,7 @@ func (pow *EasyPow) GetHashrate() int64 {
return pow.HashRate
}
-func (pow *EasyPow) Search(block *Block, reactChan chan ethutil.React) []byte {
+func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
hash := block.HashNoNonce()
diff := block.Difficulty
diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go
index a12ce53e5..6bd3edd0d 100644
--- a/ethchain/state_manager.go
+++ b/ethchain/state_manager.go
@@ -6,7 +6,7 @@ import (
"fmt"
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
- _ "github.com/ethereum/eth-go/ethtrie"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
"math/big"
@@ -36,7 +36,7 @@ type EthManager interface {
BlockChain() *BlockChain
TxPool() *TxPool
Broadcast(msgType ethwire.MsgType, data []interface{})
- Reactor() *ethutil.ReactorEngine
+ Reactor() *ethreact.ReactorEngine
PeerCount() int
IsMining() bool
IsListening() bool
diff --git a/ethereum.go b/ethereum.go
index 18c1f8a23..c48ac00e4 100644
--- a/ethereum.go
+++ b/ethereum.go
@@ -6,6 +6,7 @@ import (
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethrpc"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
@@ -73,7 +74,7 @@ type Ethereum struct {
listening bool
- reactor *ethutil.ReactorEngine
+ reactor *ethreact.ReactorEngine
RpcServer *ethrpc.JsonRpcServer
@@ -111,7 +112,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
clientIdentity: clientIdentity,
isUpToDate: true,
}
- ethereum.reactor = ethutil.NewReactorEngine()
+ ethereum.reactor = ethreact.New()
ethereum.txPool = ethchain.NewTxPool(ethereum)
ethereum.blockChain = ethchain.NewBlockChain(ethereum)
@@ -123,7 +124,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
return ethereum, nil
}
-func (s *Ethereum) Reactor() *ethutil.ReactorEngine {
+func (s *Ethereum) Reactor() *ethreact.ReactorEngine {
return s.reactor
}
@@ -355,6 +356,7 @@ func (s *Ethereum) ReapDeadPeerHandler() {
// Start the ethereum
func (s *Ethereum) Start(seed bool) {
+ s.reactor.Start()
// Bind to addr and port
ln, err := net.Listen("tcp", ":"+s.Port)
if err != nil {
@@ -466,6 +468,8 @@ func (s *Ethereum) Stop() {
}
s.txPool.Stop()
s.stateManager.Stop()
+ s.reactor.Flush()
+ s.reactor.Stop()
ethlogger.Infoln("Server stopped")
close(s.shutdownChan)
diff --git a/ethlog/loggers.go b/ethlog/loggers.go
index 50de213b3..b2760534b 100644
--- a/ethlog/loggers.go
+++ b/ethlog/loggers.go
@@ -39,7 +39,9 @@ func (msg *logMessage) send(logger LogSystem) {
var logMessages chan (*logMessage)
var logSystems []LogSystem
-var quit chan bool
+var quit chan chan error
+var drained chan bool
+var mutex = sync.Mutex{}
type LogLevel uint8
@@ -52,34 +54,55 @@ const (
DebugDetailLevel
)
+func dispatch(msg *logMessage) {
+ for _, logSystem := range logSystems {
+ if logSystem.GetLogLevel() >= msg.LogLevel {
+ msg.send(logSystem)
+ }
+ }
+}
+
// log messages are dispatched to log writers
func start() {
-out:
for {
select {
+ case status := <-quit:
+ status <- nil
+ return
case msg := <-logMessages:
- for _, logSystem := range logSystems {
- if logSystem.GetLogLevel() >= msg.LogLevel {
- msg.send(logSystem)
- }
- }
- case <-quit:
- break out
+ dispatch(msg)
+ default:
+ drained <- true // this blocks until a message is sent to the queue
}
}
}
-// waits until log messages are drained (dispatched to log writers)
-func Flush() {
- quit <- true
+func send(msg *logMessage) {
+ logMessages <- msg
+ select {
+ case <-drained:
+ default:
+ }
+}
-done:
- for {
+func Reset() {
+ mutex.Lock()
+ defer mutex.Unlock()
+ if logSystems != nil {
+ status := make(chan error)
+ quit <- status
select {
- case <-logMessages:
+ case <-drained:
default:
- break done
}
+ <-status
+ }
+}
+
+// waits until log messages are drained (dispatched to log writers)
+func Flush() {
+ if logSystems != nil {
+ <-drained
}
}
@@ -97,7 +120,8 @@ func AddLogSystem(logSystem LogSystem) {
defer mutex.Unlock()
if logSystems == nil {
logMessages = make(chan *logMessage, 10)
- quit = make(chan bool, 1)
+ quit = make(chan chan error, 1)
+ drained = make(chan bool, 1)
go start()
}
logSystems = append(logSystems, logSystem)
@@ -106,14 +130,14 @@ func AddLogSystem(logSystem LogSystem) {
func (logger *Logger) sendln(level LogLevel, v ...interface{}) {
if logMessages != nil {
msg := newPrintlnLogMessage(level, logger.tag, v...)
- logMessages <- msg
+ send(msg)
}
}
func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) {
if logMessages != nil {
msg := newPrintfLogMessage(level, logger.tag, format, v...)
- logMessages <- msg
+ send(msg)
}
}
diff --git a/ethlog/loggers_test.go b/ethlog/loggers_test.go
index 89f416681..a9b1463e7 100644
--- a/ethlog/loggers_test.go
+++ b/ethlog/loggers_test.go
@@ -28,8 +28,19 @@ func (t *TestLogSystem) GetLogLevel() LogLevel {
return t.level
}
-func quote(s string) string {
- return fmt.Sprintf("'%s'", s)
+func TestLoggerFlush(t *testing.T) {
+ logger := NewLogger("TEST")
+ testLogSystem := &TestLogSystem{level: WarnLevel}
+ AddLogSystem(testLogSystem)
+ for i := 0; i < 5; i++ {
+ logger.Errorf(".")
+ }
+ Flush()
+ Reset()
+ output := testLogSystem.Output
+ if output != "[TEST] .[TEST] .[TEST] .[TEST] .[TEST] ." {
+ t.Error("Expected complete logger output '[TEST] .[TEST] .[TEST] .[TEST] .[TEST] .', got ", output)
+ }
}
func TestLoggerPrintln(t *testing.T) {
@@ -41,10 +52,11 @@ func TestLoggerPrintln(t *testing.T) {
logger.Infoln("info")
logger.Debugln("debug")
Flush()
+ Reset()
output := testLogSystem.Output
fmt.Println(quote(output))
if output != "[TEST] error\n[TEST] warn\n" {
- t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem.Output))
+ t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", output)
}
}
@@ -57,10 +69,10 @@ func TestLoggerPrintf(t *testing.T) {
logger.Infof("info")
logger.Debugf("debug")
Flush()
+ Reset()
output := testLogSystem.Output
- fmt.Println(quote(output))
if output != "[TEST] error to { 2}\n[TEST] warn" {
- t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", quote(testLogSystem.Output))
+ t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", output)
}
}
@@ -73,13 +85,14 @@ func TestMultipleLogSystems(t *testing.T) {
logger.Errorln("error")
logger.Warnln("warn")
Flush()
+ Reset()
output0 := testLogSystem0.Output
output1 := testLogSystem1.Output
if output0 != "[TEST] error\n" {
- t.Error("Expected logger 0 output '[TEST] error\\n', got ", quote(testLogSystem0.Output))
+ t.Error("Expected logger 0 output '[TEST] error\\n', got ", output0)
}
if output1 != "[TEST] error\n[TEST] warn\n" {
- t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem1.Output))
+ t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", output1)
}
}
@@ -94,9 +107,8 @@ func TestFileLogSystem(t *testing.T) {
Flush()
contents, _ := ioutil.ReadFile(filename)
output := string(contents)
- fmt.Println(quote(output))
if output != "[TEST] error to test.log\n[TEST] warn\n" {
- t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", quote(output))
+ t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", output)
} else {
os.Remove(filename)
}
diff --git a/ethminer/miner.go b/ethminer/miner.go
index a50b3712f..602ab0f35 100644
--- a/ethminer/miner.go
+++ b/ethminer/miner.go
@@ -4,7 +4,7 @@ import (
"bytes"
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog"
- "github.com/ethereum/eth-go/ethutil"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethwire"
"sort"
)
@@ -15,13 +15,13 @@ type Miner struct {
pow ethchain.PoW
ethereum ethchain.EthManager
coinbase []byte
- reactChan chan ethutil.React
+ reactChan chan ethreact.Event
txs ethchain.Transactions
uncles []*ethchain.Block
block *ethchain.Block
powChan chan []byte
- powQuitChan chan ethutil.React
- quitChan chan bool
+ powQuitChan chan ethreact.Event
+ quitChan chan chan error
}
func (self *Miner) GetPow() ethchain.PoW {
@@ -29,55 +29,53 @@ func (self *Miner) GetPow() ethchain.PoW {
}
func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) *Miner {
- reactChan := make(chan ethutil.React, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
- powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block
- powQuitChan := make(chan ethutil.React, 1) // This is the channel that can exit the miner thread
- quitChan := make(chan bool, 1)
-
- ethereum.Reactor().Subscribe("newBlock", reactChan)
- ethereum.Reactor().Subscribe("newTx:pre", reactChan)
-
- // We need the quit chan to be a Reactor event.
- // The POW search method is actually blocking and if we don't
- // listen to the reactor events inside of the pow itself
- // The miner overseer will never get the reactor events themselves
- // Only after the miner will find the sha
- ethereum.Reactor().Subscribe("newBlock", powQuitChan)
- ethereum.Reactor().Subscribe("newTx:pre", powQuitChan)
-
miner := Miner{
- pow: &ethchain.EasyPow{},
- ethereum: ethereum,
- coinbase: coinbase,
- reactChan: reactChan,
- powChan: powChan,
- powQuitChan: powQuitChan,
- quitChan: quitChan,
+ pow: &ethchain.EasyPow{},
+ ethereum: ethereum,
+ coinbase: coinbase,
}
- // Insert initial TXs in our little miner 'pool'
- miner.txs = ethereum.TxPool().Flush()
- miner.block = ethereum.BlockChain().NewBlock(miner.coinbase)
-
return &miner
}
func (miner *Miner) Start() {
+ miner.reactChan = make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
+ miner.powChan = make(chan []byte, 1) // This is the channel that receives valid sha hashes for a given block
+ miner.powQuitChan = make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread
+ miner.quitChan = make(chan chan error, 1)
+
+ // Insert initial TXs in our little miner 'pool'
+ miner.txs = miner.ethereum.TxPool().Flush()
+ miner.block = miner.ethereum.BlockChain().NewBlock(miner.coinbase)
+
// Prepare inital block
//miner.ethereum.StateManager().Prepare(miner.block.State(), miner.block.State())
go miner.listener()
+
+ reactor := miner.ethereum.Reactor()
+ reactor.Subscribe("newBlock", miner.reactChan)
+ reactor.Subscribe("newTx:pre", miner.reactChan)
+
+ // We need the quit chan to be a Reactor event.
+ // The POW search method is actually blocking and if we don't
+ // listen to the reactor events inside of the pow itself
+ // The miner overseer will never get the reactor events themselves
+ // Only after the miner will find the sha
+ reactor.Subscribe("newBlock", miner.powQuitChan)
+ reactor.Subscribe("newTx:pre", miner.powQuitChan)
+
logger.Infoln("Started")
- miner.ethereum.Reactor().Post("miner:start", miner)
+ reactor.Post("miner:start", miner)
}
func (miner *Miner) listener() {
-out:
for {
select {
- case <-miner.quitChan:
+ case status := <-miner.quitChan:
logger.Infoln("Stopped")
- break out
+ status <- nil
+ return
case chanMessage := <-miner.reactChan:
if block, ok := chanMessage.Resource.(*ethchain.Block); ok {
@@ -133,13 +131,22 @@ out:
}
}
-func (self *Miner) Stop() {
+func (miner *Miner) Stop() {
logger.Infoln("Stopping...")
+ status := make(chan error)
+ miner.quitChan <- status
+ <-status
+
+ reactor := miner.ethereum.Reactor()
+ reactor.Unsubscribe("newBlock", miner.powQuitChan)
+ reactor.Unsubscribe("newTx:pre", miner.powQuitChan)
+ reactor.Unsubscribe("newBlock", miner.reactChan)
+ reactor.Unsubscribe("newTx:pre", miner.reactChan)
- self.quitChan <- true
- self.powQuitChan <- ethutil.React{}
+ close(miner.powQuitChan)
+ close(miner.quitChan)
- self.ethereum.Reactor().Post("miner:stop", self)
+ reactor.Post("miner:stop", miner)
}
func (self *Miner) mineNewBlock() {
diff --git a/ethreact/reactor.go b/ethreact/reactor.go
new file mode 100644
index 000000000..a26f82a97
--- /dev/null
+++ b/ethreact/reactor.go
@@ -0,0 +1,182 @@
+package ethreact
+
+import (
+ "github.com/ethereum/eth-go/ethlog"
+ "sync"
+)
+
+var logger = ethlog.NewLogger("REACTOR")
+
+const (
+ eventBufferSize int = 10
+)
+
+type EventHandler struct {
+ lock sync.RWMutex
+ name string
+ chans []chan Event
+}
+
+// Post the Event with the reactor resource on the channels
+// currently subscribed to the event
+func (e *EventHandler) Post(event Event) {
+ e.lock.RLock()
+ defer e.lock.RUnlock()
+
+ // if we want to preserve order pushing to subscibed channels
+ // dispatching should be syncrounous
+ // this means if subscribed event channel is blocked
+ // the reactor dispatch will be blocked, so we need to mitigate by skipping
+ // rogue blocking subscribers
+ for i, ch := range e.chans {
+ select {
+ case ch <- event:
+ default:
+ logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name)
+ }
+ }
+}
+
+// Add a subscriber to this event
+func (e *EventHandler) Add(ch chan Event) {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+
+ e.chans = append(e.chans, ch)
+}
+
+// Remove a subscriber
+func (e *EventHandler) Remove(ch chan Event) int {
+ e.lock.Lock()
+ defer e.lock.Unlock()
+
+ for i, c := range e.chans {
+ if c == ch {
+ e.chans = append(e.chans[:i], e.chans[i+1:]...)
+ }
+ }
+ return len(e.chans)
+}
+
+// Basic reactor resource
+type Event struct {
+ Resource interface{}
+ Name string
+}
+
+// The reactor basic engine. Acts as bridge
+// between the events and the subscribers/posters
+type ReactorEngine struct {
+ lock sync.RWMutex
+ eventChannel chan Event
+ eventHandlers map[string]*EventHandler
+ quit chan chan error
+ running bool
+ drained chan bool
+}
+
+func New() *ReactorEngine {
+ return &ReactorEngine{
+ eventHandlers: make(map[string]*EventHandler),
+ eventChannel: make(chan Event, eventBufferSize),
+ quit: make(chan chan error, 1),
+ drained: make(chan bool, 1),
+ }
+}
+
+func (reactor *ReactorEngine) Start() {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+ if !reactor.running {
+ go func() {
+ for {
+ select {
+ case status := <-reactor.quit:
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+ reactor.running = false
+ logger.Infoln("stopped")
+ status <- nil
+ return
+ case event := <-reactor.eventChannel:
+ // needs to be called syncronously to keep order of events
+ reactor.dispatch(event)
+ default:
+ reactor.drained <- true // blocking till message is coming in
+ }
+ }
+ }()
+ reactor.running = true
+ logger.Infoln("started")
+ }
+}
+
+func (reactor *ReactorEngine) Stop() {
+ if reactor.running {
+ status := make(chan error)
+ reactor.quit <- status
+ select {
+ case <-reactor.drained:
+ default:
+ }
+ <-status
+ }
+}
+
+func (reactor *ReactorEngine) Flush() {
+ <-reactor.drained
+}
+
+// Subscribe a channel to the specified event
+func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ eventHandler := reactor.eventHandlers[event]
+ // Create a new event handler if one isn't available
+ if eventHandler == nil {
+ eventHandler = &EventHandler{name: event}
+ reactor.eventHandlers[event] = eventHandler
+ }
+ // Add the events channel to reactor event handler
+ eventHandler.Add(eventChannel)
+ logger.Debugf("added new subscription to %s", event)
+}
+
+func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ eventHandler := reactor.eventHandlers[event]
+ if eventHandler != nil {
+ len := eventHandler.Remove(eventChannel)
+ if len == 0 {
+ reactor.eventHandlers[event] = nil
+ }
+ logger.Debugf("removed subscription to %s", event)
+ }
+}
+
+func (reactor *ReactorEngine) Post(event string, resource interface{}) {
+ reactor.lock.Lock()
+ defer reactor.lock.Unlock()
+
+ if reactor.running {
+ reactor.eventChannel <- Event{Resource: resource, Name: event}
+ select {
+ case <-reactor.drained:
+ default:
+ }
+ }
+}
+
+func (reactor *ReactorEngine) dispatch(event Event) {
+ name := event.Name
+ eventHandler := reactor.eventHandlers[name]
+ // if no subscriptions to this event type - no event handler created
+ // then noone to notify
+ if eventHandler != nil {
+ // needs to be called syncronously
+ eventHandler.Post(event)
+ }
+}
diff --git a/ethreact/reactor_test.go b/ethreact/reactor_test.go
new file mode 100644
index 000000000..801a8abd0
--- /dev/null
+++ b/ethreact/reactor_test.go
@@ -0,0 +1,63 @@
+package ethreact
+
+import (
+ "fmt"
+ "testing"
+)
+
+func TestReactorAdd(t *testing.T) {
+ reactor := New()
+ ch := make(chan Event)
+ reactor.Subscribe("test", ch)
+ if reactor.eventHandlers["test"] == nil {
+ t.Error("Expected new eventHandler to be created")
+ }
+ reactor.Unsubscribe("test", ch)
+ if reactor.eventHandlers["test"] != nil {
+ t.Error("Expected eventHandler to be removed")
+ }
+}
+
+func TestReactorEvent(t *testing.T) {
+ var name string
+ reactor := New()
+ // Buffer the channel, so it doesn't block for this test
+ cap := 20
+ ch := make(chan Event, cap)
+ reactor.Subscribe("even", ch)
+ reactor.Subscribe("odd", ch)
+ reactor.Post("even", "disappears") // should not broadcast if engine not started
+ reactor.Start()
+ for i := 0; i < cap; i++ {
+ if i%2 == 0 {
+ name = "even"
+ } else {
+ name = "odd"
+ }
+ reactor.Post(name, i)
+ }
+ reactor.Post("test", cap) // this should not block
+ i := 0
+ reactor.Flush()
+ close(ch)
+ for event := range ch {
+ fmt.Printf("%d: %v", i, event)
+ if i%2 == 0 {
+ name = "even"
+ } else {
+ name = "odd"
+ }
+ if val, ok := event.Resource.(int); ok {
+ if i != val || event.Name != name {
+ t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val)
+ }
+ } else {
+ t.Error("Unable to cast")
+ }
+ i++
+ }
+ if i != cap {
+ t.Error("excpected exactly %d events, got ", i)
+ }
+ reactor.Stop()
+}
diff --git a/ethutil/reactor.go b/ethutil/reactor.go
deleted file mode 100644
index 7cf145245..000000000
--- a/ethutil/reactor.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package ethutil
-
-import (
- "sync"
-)
-
-type ReactorEvent struct {
- mut sync.Mutex
- event string
- chans []chan React
-}
-
-// Post the specified reactor resource on the channels
-// currently subscribed
-func (e *ReactorEvent) Post(react React) {
- e.mut.Lock()
- defer e.mut.Unlock()
-
- for _, ch := range e.chans {
- go func(ch chan React) {
- ch <- react
- }(ch)
- }
-}
-
-// Add a subscriber to this event
-func (e *ReactorEvent) Add(ch chan React) {
- e.mut.Lock()
- defer e.mut.Unlock()
-
- e.chans = append(e.chans, ch)
-}
-
-// Remove a subscriber
-func (e *ReactorEvent) Remove(ch chan React) {
- e.mut.Lock()
- defer e.mut.Unlock()
-
- for i, c := range e.chans {
- if c == ch {
- e.chans = append(e.chans[:i], e.chans[i+1:]...)
- }
- }
-}
-
-// Basic reactor resource
-type React struct {
- Resource interface{}
- Event string
-}
-
-// The reactor basic engine. Acts as bridge
-// between the events and the subscribers/posters
-type ReactorEngine struct {
- patterns map[string]*ReactorEvent
-}
-
-func NewReactorEngine() *ReactorEngine {
- return &ReactorEngine{patterns: make(map[string]*ReactorEvent)}
-}
-
-// Subscribe a channel to the specified event
-func (reactor *ReactorEngine) Subscribe(event string, ch chan React) {
- ev := reactor.patterns[event]
- // Create a new event if one isn't available
- if ev == nil {
- ev = &ReactorEvent{event: event}
- reactor.patterns[event] = ev
- }
-
- // Add the channel to reactor event handler
- ev.Add(ch)
-}
-
-func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) {
- ev := reactor.patterns[event]
- if ev != nil {
- ev.Remove(ch)
- }
-}
-
-func (reactor *ReactorEngine) Post(event string, resource interface{}) {
- ev := reactor.patterns[event]
- if ev != nil {
- ev.Post(React{Resource: resource, Event: event})
- }
-}
diff --git a/ethutil/reactor_test.go b/ethutil/reactor_test.go
deleted file mode 100644
index 48c2f0df3..000000000
--- a/ethutil/reactor_test.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package ethutil
-
-import "testing"
-
-func TestReactorAdd(t *testing.T) {
- engine := NewReactorEngine()
- ch := make(chan React)
- engine.Subscribe("test", ch)
- if len(engine.patterns) != 1 {
- t.Error("Expected patterns to be 1, got", len(engine.patterns))
- }
-}
-
-func TestReactorEvent(t *testing.T) {
- engine := NewReactorEngine()
-
- // Buffer 1, so it doesn't block for this test
- ch := make(chan React, 1)
- engine.Subscribe("test", ch)
- engine.Post("test", "hello")
-
- value := <-ch
- if val, ok := value.Resource.(string); ok {
- if val != "hello" {
- t.Error("Expected Resource to be 'hello', got", val)
- }
- } else {
- t.Error("Unable to cast")
- }
-}