diff options
-rw-r--r-- | ethchain/dagger.go | 5 | ||||
-rw-r--r-- | ethchain/state_manager.go | 4 | ||||
-rw-r--r-- | ethereum.go | 10 | ||||
-rw-r--r-- | ethlog/loggers.go | 62 | ||||
-rw-r--r-- | ethlog/loggers_test.go | 30 | ||||
-rw-r--r-- | ethminer/miner.go | 85 | ||||
-rw-r--r-- | ethreact/reactor.go | 182 | ||||
-rw-r--r-- | ethreact/reactor_test.go | 63 | ||||
-rw-r--r-- | ethutil/reactor.go | 87 | ||||
-rw-r--r-- | ethutil/reactor_test.go | 30 |
10 files changed, 367 insertions, 191 deletions
diff --git a/ethchain/dagger.go b/ethchain/dagger.go index dccd2ff5b..917b3d722 100644 --- a/ethchain/dagger.go +++ b/ethchain/dagger.go @@ -3,6 +3,7 @@ package ethchain import ( "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethutil" "github.com/obscuren/sha3" "hash" @@ -14,7 +15,7 @@ import ( var powlogger = ethlog.NewLogger("POW") type PoW interface { - Search(block *Block, reactChan chan ethutil.React) []byte + Search(block *Block, reactChan chan ethreact.Event) []byte Verify(hash []byte, diff *big.Int, nonce []byte) bool GetHashrate() int64 } @@ -28,7 +29,7 @@ func (pow *EasyPow) GetHashrate() int64 { return pow.HashRate } -func (pow *EasyPow) Search(block *Block, reactChan chan ethutil.React) []byte { +func (pow *EasyPow) Search(block *Block, reactChan chan ethreact.Event) []byte { r := rand.New(rand.NewSource(time.Now().UnixNano())) hash := block.HashNoNonce() diff := block.Difficulty diff --git a/ethchain/state_manager.go b/ethchain/state_manager.go index a12ce53e5..6bd3edd0d 100644 --- a/ethchain/state_manager.go +++ b/ethchain/state_manager.go @@ -6,7 +6,7 @@ import ( "fmt" "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" - _ "github.com/ethereum/eth-go/ethtrie" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" "math/big" @@ -36,7 +36,7 @@ type EthManager interface { BlockChain() *BlockChain TxPool() *TxPool Broadcast(msgType ethwire.MsgType, data []interface{}) - Reactor() *ethutil.ReactorEngine + Reactor() *ethreact.ReactorEngine PeerCount() int IsMining() bool IsListening() bool diff --git a/ethereum.go b/ethereum.go index 18c1f8a23..c48ac00e4 100644 --- a/ethereum.go +++ b/ethereum.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethcrypto" "github.com/ethereum/eth-go/ethlog" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethrpc" "github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethwire" @@ -73,7 +74,7 @@ type Ethereum struct { listening bool - reactor *ethutil.ReactorEngine + reactor *ethreact.ReactorEngine RpcServer *ethrpc.JsonRpcServer @@ -111,7 +112,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager clientIdentity: clientIdentity, isUpToDate: true, } - ethereum.reactor = ethutil.NewReactorEngine() + ethereum.reactor = ethreact.New() ethereum.txPool = ethchain.NewTxPool(ethereum) ethereum.blockChain = ethchain.NewBlockChain(ethereum) @@ -123,7 +124,7 @@ func New(db ethutil.Database, clientIdentity ethwire.ClientIdentity, keyManager return ethereum, nil } -func (s *Ethereum) Reactor() *ethutil.ReactorEngine { +func (s *Ethereum) Reactor() *ethreact.ReactorEngine { return s.reactor } @@ -355,6 +356,7 @@ func (s *Ethereum) ReapDeadPeerHandler() { // Start the ethereum func (s *Ethereum) Start(seed bool) { + s.reactor.Start() // Bind to addr and port ln, err := net.Listen("tcp", ":"+s.Port) if err != nil { @@ -466,6 +468,8 @@ func (s *Ethereum) Stop() { } s.txPool.Stop() s.stateManager.Stop() + s.reactor.Flush() + s.reactor.Stop() ethlogger.Infoln("Server stopped") close(s.shutdownChan) diff --git a/ethlog/loggers.go b/ethlog/loggers.go index 50de213b3..b2760534b 100644 --- a/ethlog/loggers.go +++ b/ethlog/loggers.go @@ -39,7 +39,9 @@ func (msg *logMessage) send(logger LogSystem) { var logMessages chan (*logMessage) var logSystems []LogSystem -var quit chan bool +var quit chan chan error +var drained chan bool +var mutex = sync.Mutex{} type LogLevel uint8 @@ -52,34 +54,55 @@ const ( DebugDetailLevel ) +func dispatch(msg *logMessage) { + for _, logSystem := range logSystems { + if logSystem.GetLogLevel() >= msg.LogLevel { + msg.send(logSystem) + } + } +} + // log messages are dispatched to log writers func start() { -out: for { select { + case status := <-quit: + status <- nil + return case msg := <-logMessages: - for _, logSystem := range logSystems { - if logSystem.GetLogLevel() >= msg.LogLevel { - msg.send(logSystem) - } - } - case <-quit: - break out + dispatch(msg) + default: + drained <- true // this blocks until a message is sent to the queue } } } -// waits until log messages are drained (dispatched to log writers) -func Flush() { - quit <- true +func send(msg *logMessage) { + logMessages <- msg + select { + case <-drained: + default: + } +} -done: - for { +func Reset() { + mutex.Lock() + defer mutex.Unlock() + if logSystems != nil { + status := make(chan error) + quit <- status select { - case <-logMessages: + case <-drained: default: - break done } + <-status + } +} + +// waits until log messages are drained (dispatched to log writers) +func Flush() { + if logSystems != nil { + <-drained } } @@ -97,7 +120,8 @@ func AddLogSystem(logSystem LogSystem) { defer mutex.Unlock() if logSystems == nil { logMessages = make(chan *logMessage, 10) - quit = make(chan bool, 1) + quit = make(chan chan error, 1) + drained = make(chan bool, 1) go start() } logSystems = append(logSystems, logSystem) @@ -106,14 +130,14 @@ func AddLogSystem(logSystem LogSystem) { func (logger *Logger) sendln(level LogLevel, v ...interface{}) { if logMessages != nil { msg := newPrintlnLogMessage(level, logger.tag, v...) - logMessages <- msg + send(msg) } } func (logger *Logger) sendf(level LogLevel, format string, v ...interface{}) { if logMessages != nil { msg := newPrintfLogMessage(level, logger.tag, format, v...) - logMessages <- msg + send(msg) } } diff --git a/ethlog/loggers_test.go b/ethlog/loggers_test.go index 89f416681..a9b1463e7 100644 --- a/ethlog/loggers_test.go +++ b/ethlog/loggers_test.go @@ -28,8 +28,19 @@ func (t *TestLogSystem) GetLogLevel() LogLevel { return t.level } -func quote(s string) string { - return fmt.Sprintf("'%s'", s) +func TestLoggerFlush(t *testing.T) { + logger := NewLogger("TEST") + testLogSystem := &TestLogSystem{level: WarnLevel} + AddLogSystem(testLogSystem) + for i := 0; i < 5; i++ { + logger.Errorf(".") + } + Flush() + Reset() + output := testLogSystem.Output + if output != "[TEST] .[TEST] .[TEST] .[TEST] .[TEST] ." { + t.Error("Expected complete logger output '[TEST] .[TEST] .[TEST] .[TEST] .[TEST] .', got ", output) + } } func TestLoggerPrintln(t *testing.T) { @@ -41,10 +52,11 @@ func TestLoggerPrintln(t *testing.T) { logger.Infoln("info") logger.Debugln("debug") Flush() + Reset() output := testLogSystem.Output fmt.Println(quote(output)) if output != "[TEST] error\n[TEST] warn\n" { - t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem.Output)) + t.Error("Expected logger output '[TEST] error\\n[TEST] warn\\n', got ", output) } } @@ -57,10 +69,10 @@ func TestLoggerPrintf(t *testing.T) { logger.Infof("info") logger.Debugf("debug") Flush() + Reset() output := testLogSystem.Output - fmt.Println(quote(output)) if output != "[TEST] error to { 2}\n[TEST] warn" { - t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", quote(testLogSystem.Output)) + t.Error("Expected logger output '[TEST] error to { 2}\\n[TEST] warn', got ", output) } } @@ -73,13 +85,14 @@ func TestMultipleLogSystems(t *testing.T) { logger.Errorln("error") logger.Warnln("warn") Flush() + Reset() output0 := testLogSystem0.Output output1 := testLogSystem1.Output if output0 != "[TEST] error\n" { - t.Error("Expected logger 0 output '[TEST] error\\n', got ", quote(testLogSystem0.Output)) + t.Error("Expected logger 0 output '[TEST] error\\n', got ", output0) } if output1 != "[TEST] error\n[TEST] warn\n" { - t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", quote(testLogSystem1.Output)) + t.Error("Expected logger 1 output '[TEST] error\\n[TEST] warn\\n', got ", output1) } } @@ -94,9 +107,8 @@ func TestFileLogSystem(t *testing.T) { Flush() contents, _ := ioutil.ReadFile(filename) output := string(contents) - fmt.Println(quote(output)) if output != "[TEST] error to test.log\n[TEST] warn\n" { - t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", quote(output)) + t.Error("Expected contents of file 'test.log': '[TEST] error to test.log\\n[TEST] warn\\n', got ", output) } else { os.Remove(filename) } diff --git a/ethminer/miner.go b/ethminer/miner.go index a50b3712f..602ab0f35 100644 --- a/ethminer/miner.go +++ b/ethminer/miner.go @@ -4,7 +4,7 @@ import ( "bytes" "github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethlog" - "github.com/ethereum/eth-go/ethutil" + "github.com/ethereum/eth-go/ethreact" "github.com/ethereum/eth-go/ethwire" "sort" ) @@ -15,13 +15,13 @@ type Miner struct { pow ethchain.PoW ethereum ethchain.EthManager coinbase []byte - reactChan chan ethutil.React + reactChan chan ethreact.Event txs ethchain.Transactions uncles []*ethchain.Block block *ethchain.Block powChan chan []byte - powQuitChan chan ethutil.React - quitChan chan bool + powQuitChan chan ethreact.Event + quitChan chan chan error } func (self *Miner) GetPow() ethchain.PoW { @@ -29,55 +29,53 @@ func (self *Miner) GetPow() ethchain.PoW { } func NewDefaultMiner(coinbase []byte, ethereum ethchain.EthManager) *Miner { - reactChan := make(chan ethutil.React, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in - powChan := make(chan []byte, 1) // This is the channel that receives valid sha hases for a given block - powQuitChan := make(chan ethutil.React, 1) // This is the channel that can exit the miner thread - quitChan := make(chan bool, 1) - - ethereum.Reactor().Subscribe("newBlock", reactChan) - ethereum.Reactor().Subscribe("newTx:pre", reactChan) - - // We need the quit chan to be a Reactor event. - // The POW search method is actually blocking and if we don't - // listen to the reactor events inside of the pow itself - // The miner overseer will never get the reactor events themselves - // Only after the miner will find the sha - ethereum.Reactor().Subscribe("newBlock", powQuitChan) - ethereum.Reactor().Subscribe("newTx:pre", powQuitChan) - miner := Miner{ - pow: ðchain.EasyPow{}, - ethereum: ethereum, - coinbase: coinbase, - reactChan: reactChan, - powChan: powChan, - powQuitChan: powQuitChan, - quitChan: quitChan, + pow: ðchain.EasyPow{}, + ethereum: ethereum, + coinbase: coinbase, } - // Insert initial TXs in our little miner 'pool' - miner.txs = ethereum.TxPool().Flush() - miner.block = ethereum.BlockChain().NewBlock(miner.coinbase) - return &miner } func (miner *Miner) Start() { + miner.reactChan = make(chan ethreact.Event, 1) // This is the channel that receives 'updates' when ever a new transaction or block comes in + miner.powChan = make(chan []byte, 1) // This is the channel that receives valid sha hashes for a given block + miner.powQuitChan = make(chan ethreact.Event, 1) // This is the channel that can exit the miner thread + miner.quitChan = make(chan chan error, 1) + + // Insert initial TXs in our little miner 'pool' + miner.txs = miner.ethereum.TxPool().Flush() + miner.block = miner.ethereum.BlockChain().NewBlock(miner.coinbase) + // Prepare inital block //miner.ethereum.StateManager().Prepare(miner.block.State(), miner.block.State()) go miner.listener() + + reactor := miner.ethereum.Reactor() + reactor.Subscribe("newBlock", miner.reactChan) + reactor.Subscribe("newTx:pre", miner.reactChan) + + // We need the quit chan to be a Reactor event. + // The POW search method is actually blocking and if we don't + // listen to the reactor events inside of the pow itself + // The miner overseer will never get the reactor events themselves + // Only after the miner will find the sha + reactor.Subscribe("newBlock", miner.powQuitChan) + reactor.Subscribe("newTx:pre", miner.powQuitChan) + logger.Infoln("Started") - miner.ethereum.Reactor().Post("miner:start", miner) + reactor.Post("miner:start", miner) } func (miner *Miner) listener() { -out: for { select { - case <-miner.quitChan: + case status := <-miner.quitChan: logger.Infoln("Stopped") - break out + status <- nil + return case chanMessage := <-miner.reactChan: if block, ok := chanMessage.Resource.(*ethchain.Block); ok { @@ -133,13 +131,22 @@ out: } } -func (self *Miner) Stop() { +func (miner *Miner) Stop() { logger.Infoln("Stopping...") + status := make(chan error) + miner.quitChan <- status + <-status + + reactor := miner.ethereum.Reactor() + reactor.Unsubscribe("newBlock", miner.powQuitChan) + reactor.Unsubscribe("newTx:pre", miner.powQuitChan) + reactor.Unsubscribe("newBlock", miner.reactChan) + reactor.Unsubscribe("newTx:pre", miner.reactChan) - self.quitChan <- true - self.powQuitChan <- ethutil.React{} + close(miner.powQuitChan) + close(miner.quitChan) - self.ethereum.Reactor().Post("miner:stop", self) + reactor.Post("miner:stop", miner) } func (self *Miner) mineNewBlock() { diff --git a/ethreact/reactor.go b/ethreact/reactor.go new file mode 100644 index 000000000..a26f82a97 --- /dev/null +++ b/ethreact/reactor.go @@ -0,0 +1,182 @@ +package ethreact + +import ( + "github.com/ethereum/eth-go/ethlog" + "sync" +) + +var logger = ethlog.NewLogger("REACTOR") + +const ( + eventBufferSize int = 10 +) + +type EventHandler struct { + lock sync.RWMutex + name string + chans []chan Event +} + +// Post the Event with the reactor resource on the channels +// currently subscribed to the event +func (e *EventHandler) Post(event Event) { + e.lock.RLock() + defer e.lock.RUnlock() + + // if we want to preserve order pushing to subscibed channels + // dispatching should be syncrounous + // this means if subscribed event channel is blocked + // the reactor dispatch will be blocked, so we need to mitigate by skipping + // rogue blocking subscribers + for i, ch := range e.chans { + select { + case ch <- event: + default: + logger.Warnf("subscribing channel %d to event %s blocked. skipping\n", i, event.Name) + } + } +} + +// Add a subscriber to this event +func (e *EventHandler) Add(ch chan Event) { + e.lock.Lock() + defer e.lock.Unlock() + + e.chans = append(e.chans, ch) +} + +// Remove a subscriber +func (e *EventHandler) Remove(ch chan Event) int { + e.lock.Lock() + defer e.lock.Unlock() + + for i, c := range e.chans { + if c == ch { + e.chans = append(e.chans[:i], e.chans[i+1:]...) + } + } + return len(e.chans) +} + +// Basic reactor resource +type Event struct { + Resource interface{} + Name string +} + +// The reactor basic engine. Acts as bridge +// between the events and the subscribers/posters +type ReactorEngine struct { + lock sync.RWMutex + eventChannel chan Event + eventHandlers map[string]*EventHandler + quit chan chan error + running bool + drained chan bool +} + +func New() *ReactorEngine { + return &ReactorEngine{ + eventHandlers: make(map[string]*EventHandler), + eventChannel: make(chan Event, eventBufferSize), + quit: make(chan chan error, 1), + drained: make(chan bool, 1), + } +} + +func (reactor *ReactorEngine) Start() { + reactor.lock.Lock() + defer reactor.lock.Unlock() + if !reactor.running { + go func() { + for { + select { + case status := <-reactor.quit: + reactor.lock.Lock() + defer reactor.lock.Unlock() + reactor.running = false + logger.Infoln("stopped") + status <- nil + return + case event := <-reactor.eventChannel: + // needs to be called syncronously to keep order of events + reactor.dispatch(event) + default: + reactor.drained <- true // blocking till message is coming in + } + } + }() + reactor.running = true + logger.Infoln("started") + } +} + +func (reactor *ReactorEngine) Stop() { + if reactor.running { + status := make(chan error) + reactor.quit <- status + select { + case <-reactor.drained: + default: + } + <-status + } +} + +func (reactor *ReactorEngine) Flush() { + <-reactor.drained +} + +// Subscribe a channel to the specified event +func (reactor *ReactorEngine) Subscribe(event string, eventChannel chan Event) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + eventHandler := reactor.eventHandlers[event] + // Create a new event handler if one isn't available + if eventHandler == nil { + eventHandler = &EventHandler{name: event} + reactor.eventHandlers[event] = eventHandler + } + // Add the events channel to reactor event handler + eventHandler.Add(eventChannel) + logger.Debugf("added new subscription to %s", event) +} + +func (reactor *ReactorEngine) Unsubscribe(event string, eventChannel chan Event) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + eventHandler := reactor.eventHandlers[event] + if eventHandler != nil { + len := eventHandler.Remove(eventChannel) + if len == 0 { + reactor.eventHandlers[event] = nil + } + logger.Debugf("removed subscription to %s", event) + } +} + +func (reactor *ReactorEngine) Post(event string, resource interface{}) { + reactor.lock.Lock() + defer reactor.lock.Unlock() + + if reactor.running { + reactor.eventChannel <- Event{Resource: resource, Name: event} + select { + case <-reactor.drained: + default: + } + } +} + +func (reactor *ReactorEngine) dispatch(event Event) { + name := event.Name + eventHandler := reactor.eventHandlers[name] + // if no subscriptions to this event type - no event handler created + // then noone to notify + if eventHandler != nil { + // needs to be called syncronously + eventHandler.Post(event) + } +} diff --git a/ethreact/reactor_test.go b/ethreact/reactor_test.go new file mode 100644 index 000000000..801a8abd0 --- /dev/null +++ b/ethreact/reactor_test.go @@ -0,0 +1,63 @@ +package ethreact + +import ( + "fmt" + "testing" +) + +func TestReactorAdd(t *testing.T) { + reactor := New() + ch := make(chan Event) + reactor.Subscribe("test", ch) + if reactor.eventHandlers["test"] == nil { + t.Error("Expected new eventHandler to be created") + } + reactor.Unsubscribe("test", ch) + if reactor.eventHandlers["test"] != nil { + t.Error("Expected eventHandler to be removed") + } +} + +func TestReactorEvent(t *testing.T) { + var name string + reactor := New() + // Buffer the channel, so it doesn't block for this test + cap := 20 + ch := make(chan Event, cap) + reactor.Subscribe("even", ch) + reactor.Subscribe("odd", ch) + reactor.Post("even", "disappears") // should not broadcast if engine not started + reactor.Start() + for i := 0; i < cap; i++ { + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + reactor.Post(name, i) + } + reactor.Post("test", cap) // this should not block + i := 0 + reactor.Flush() + close(ch) + for event := range ch { + fmt.Printf("%d: %v", i, event) + if i%2 == 0 { + name = "even" + } else { + name = "odd" + } + if val, ok := event.Resource.(int); ok { + if i != val || event.Name != name { + t.Error("Expected event %d to be of type %s and resource %d, got ", i, name, i, val) + } + } else { + t.Error("Unable to cast") + } + i++ + } + if i != cap { + t.Error("excpected exactly %d events, got ", i) + } + reactor.Stop() +} diff --git a/ethutil/reactor.go b/ethutil/reactor.go deleted file mode 100644 index 7cf145245..000000000 --- a/ethutil/reactor.go +++ /dev/null @@ -1,87 +0,0 @@ -package ethutil - -import ( - "sync" -) - -type ReactorEvent struct { - mut sync.Mutex - event string - chans []chan React -} - -// Post the specified reactor resource on the channels -// currently subscribed -func (e *ReactorEvent) Post(react React) { - e.mut.Lock() - defer e.mut.Unlock() - - for _, ch := range e.chans { - go func(ch chan React) { - ch <- react - }(ch) - } -} - -// Add a subscriber to this event -func (e *ReactorEvent) Add(ch chan React) { - e.mut.Lock() - defer e.mut.Unlock() - - e.chans = append(e.chans, ch) -} - -// Remove a subscriber -func (e *ReactorEvent) Remove(ch chan React) { - e.mut.Lock() - defer e.mut.Unlock() - - for i, c := range e.chans { - if c == ch { - e.chans = append(e.chans[:i], e.chans[i+1:]...) - } - } -} - -// Basic reactor resource -type React struct { - Resource interface{} - Event string -} - -// The reactor basic engine. Acts as bridge -// between the events and the subscribers/posters -type ReactorEngine struct { - patterns map[string]*ReactorEvent -} - -func NewReactorEngine() *ReactorEngine { - return &ReactorEngine{patterns: make(map[string]*ReactorEvent)} -} - -// Subscribe a channel to the specified event -func (reactor *ReactorEngine) Subscribe(event string, ch chan React) { - ev := reactor.patterns[event] - // Create a new event if one isn't available - if ev == nil { - ev = &ReactorEvent{event: event} - reactor.patterns[event] = ev - } - - // Add the channel to reactor event handler - ev.Add(ch) -} - -func (reactor *ReactorEngine) Unsubscribe(event string, ch chan React) { - ev := reactor.patterns[event] - if ev != nil { - ev.Remove(ch) - } -} - -func (reactor *ReactorEngine) Post(event string, resource interface{}) { - ev := reactor.patterns[event] - if ev != nil { - ev.Post(React{Resource: resource, Event: event}) - } -} diff --git a/ethutil/reactor_test.go b/ethutil/reactor_test.go deleted file mode 100644 index 48c2f0df3..000000000 --- a/ethutil/reactor_test.go +++ /dev/null @@ -1,30 +0,0 @@ -package ethutil - -import "testing" - -func TestReactorAdd(t *testing.T) { - engine := NewReactorEngine() - ch := make(chan React) - engine.Subscribe("test", ch) - if len(engine.patterns) != 1 { - t.Error("Expected patterns to be 1, got", len(engine.patterns)) - } -} - -func TestReactorEvent(t *testing.T) { - engine := NewReactorEngine() - - // Buffer 1, so it doesn't block for this test - ch := make(chan React, 1) - engine.Subscribe("test", ch) - engine.Post("test", "hello") - - value := <-ch - if val, ok := value.Resource.(string); ok { - if val != "hello" { - t.Error("Expected Resource to be 'hello', got", val) - } - } else { - t.Error("Unable to cast") - } -} |