aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ethchain/dagger.go5
-rw-r--r--ethchain/state_manager.go3
-rw-r--r--ethereum.go10
-rw-r--r--ethminer/miner.go12
-rw-r--r--ethutil/reactor.go87
-rw-r--r--ethutil/reactor_test.go30
6 files changed, 18 insertions, 129 deletions
diff --git a/ethchain/dagger.go b/ethchain/dagger.go
index 4dda21ff5..adf1c2f05 100644
--- a/ethchain/dagger.go
+++ b/ethchain/dagger.go
@@ -3,6 +3,7 @@ package ethchain
import (
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethutil"
"github.com/obscuren/sha3"
"hash"
@@ -14,7 +15,7 @@ import (
var powlogger = ethlog.NewLogger("POW")
type PoW interface {
- Search(block *Block, reactChan chan ethutil.React) []byte
+ Search(block *Block, reactChan chan ethreact.Event) []byte
Verify(hash []byte, diff *big.Int, nonce []byte) bool
}
@@ -22,7 +23,7 @@ type EasyPow struct {
hash *big.Int
}
-func (pow *EasyPow) Search(block *Block, reactChan chan ethutil.React) []byte {
+func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
hash := block.HashNoNonce()
diff := block.Difficulty
diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go
index 62fcda8a5..3eafd2d6e 100644
--- a/ethchain/state_manager.go
+++ b/ethchain/state_manager.go
@@ -6,6 +6,7 @@ import (
"fmt"
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethtrie"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
@@ -36,7 +37,7 @@ type EthManager interface {
BlockChain() *BlockChain
TxPool() *TxPool
Broadcast(msgType ethwire.MsgType, data []interface{})
- Reactor() *ethutil.ReactorEngine
+ Reactor() *ethreact.ReactorEngine
PeerCount() int
IsMining() bool
IsListening() bool
diff --git a/ethereum.go b/ethereum.go
index 2806dfd9d..c2d2f5241 100644
--- a/ethereum.go
+++ b/ethereum.go
@@ -6,6 +6,7 @@ import (
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethcrypto"
"github.com/ethereum/eth-go/ethlog"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethrpc"
"github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire"
@@ -73,7 +74,7 @@ type Ethereum struct {
listening bool
- reactor *ethutil.ReactorEngine
+ reactor *ethreact.ReactorEngine
RpcServer *ethrpc.JsonRpcServer
@@ -108,7 +109,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
keyManager: keyManager,
clientIdentity: clientIdentity,
}
- ethereum.reactor = ethutil.NewReactorEngine()
+ ethereum.reactor = ethreact.New()
ethereum.txPool = ethchain.NewTxPool(ethereum)
ethereum.blockChain = ethchain.NewBlockChain(ethereum)
@@ -120,7 +121,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager
return ethereum, nil
}
-func (s *Ethereum) Reactor() *ethutil.ReactorEngine {
+func (s *Ethereum) Reactor() *ethreact.ReactorEngine {
return s.reactor
}
@@ -352,6 +353,7 @@ func (s *Ethereum) ReapDeadPeerHandler() {
// Start the ethereum
func (s *Ethereum) Start(seed bool) {
+ s.reactor.Start()
// Bind to addr and port
ln, err := net.Listen("tcp", ":"+s.Port)
if err != nil {
@@ -462,6 +464,8 @@ func (s *Ethereum) Stop() {
}
s.txPool.Stop()
s.stateManager.Stop()
+ s.reactor.Flush()
+ s.reactor.Stop()
ethlogger.Infoln("Server stopped")
close(s.shutdownChan)
diff --git a/ethminer/miner.go b/ethminer/miner.go
index 71d4b2428..8224c5441 100644
--- a/ethminer/miner.go
+++ b/ethminer/miner.go
@@ -4,7 +4,7 @@ import (
"bytes"
"github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog"
- "github.com/ethereum/eth-go/ethutil"
+ "github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethwire"
"sort"
)
@@ -15,19 +15,19 @@ type Miner struct {
pow ethchain.PoW
ethereum ethchain.EthManager
coinbase []byte
- reactChan chan ethutil.React
+ reactChan chan ethreact.Event
txs ethchain.Transactions
uncles []*ethchain.Block
block *ethchain.Block
powChan chan []byte
- powQuitChan chan ethutil.React
+ powQuitChan chan ethreact.Event
quitChan chan bool
}
func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) Miner {
- reactChan := make(chan ethutil.React, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
- powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block
- powQuitChan := make(chan ethutil.React, 1) // This is the channel that can exit the miner thread
+ reactChan := make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in
+ powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block
+ powQuitChan := make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread
quitChan := make(chan bool, 1)
ethereum.Reactor().Subscribe("newBlock", reactChan)
diff --git a/ethutil/reactor.go b/ethutil/reactor.go
deleted file mode 100644
index 7cf145245..000000000
--- a/ethutil/reactor.go
+++ /dev/null
@@ -1,87 +0,0 @@
-package ethutil
-
-import (
- "sync"
-)
-
-type ReactorEvent struct {
- mut sync.Mutex
- event string
- chans []chan React
-}
-
-// Post the specified reactor resource on the channels
-// currently subscribed
-func (e *ReactorEvent) Post(react React) {
- e.mut.Lock()
- defer e.mut.Unlock()
-
- for _, ch := range e.chans {
- go func(ch chan React) {
- ch <- react
- }(ch)
- }
-}
-
-// Add a subscriber to this event
-func (e *ReactorEvent) Add(ch chan React) {
- e.mut.Lock()
- defer e.mut.Unlock()
-
- e.chans = append(e.chans, ch)
-}
-
-// Remove a subscriber
-func (e *ReactorEvent) Remove(ch chan React) {
- e.mut.Lock()
- defer e.mut.Unlock()
-
- for i, c := range e.chans {
- if c == ch {
- e.chans = append(e.chans[:i], e.chans[i+1:]...)
- }
- }
-}
-
-// Basic reactor resource
-type React struct {
- Resource interface{}
- Event string
-}
-
-// The reactor basic engine. Acts as bridge
-// between the events and the subscribers/posters
-type ReactorEngine struct {
- patterns map[string]*ReactorEvent
-}
-
-func NewReactorEngine() *ReactorEngine {
- return &ReactorEngine{patterns: make(map[string]*ReactorEvent)}
-}
-
-// Subscribe a channel to the specified event
-func (reactor *ReactorEngine) Subscribe(event string, ch chan React) {
- ev := reactor.patterns[event]
- // Create a new event if one isn't available
- if ev == nil {
- ev = &ReactorEvent{event: event}
- reactor.patterns[event] = ev
- }
-
- // Add the channel to reactor event handler
- ev.Add(ch)
-}
-
-func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) {
- ev := reactor.patterns[event]
- if ev != nil {
- ev.Remove(ch)
- }
-}
-
-func (reactor *ReactorEngine) Post(event string, resource interface{}) {
- ev := reactor.patterns[event]
- if ev != nil {
- ev.Post(React{Resource: resource, Event: event})
- }
-}
diff --git a/ethutil/reactor_test.go b/ethutil/reactor_test.go
deleted file mode 100644
index 48c2f0df3..000000000
--- a/ethutil/reactor_test.go
+++ /dev/null
@@ -1,30 +0,0 @@
-package ethutil
-
-import "testing"
-
-func TestReactorAdd(t *testing.T) {
- engine := NewReactorEngine()
- ch := make(chan React)
- engine.Subscribe("test", ch)
- if len(engine.patterns) != 1 {
- t.Error("Expected patterns to be 1, got", len(engine.patterns))
- }
-}
-
-func TestReactorEvent(t *testing.T) {
- engine := NewReactorEngine()
-
- // Buffer 1, so it doesn't block for this test
- ch := make(chan React, 1)
- engine.Subscribe("test", ch)
- engine.Post("test", "hello")
-
- value := <-ch
- if val, ok := value.Resource.(string); ok {
- if val != "hello" {
- t.Error("Expected Resource to be 'hello', got", val)
- }
- } else {
- t.Error("Unable to cast")
- }
-}