diff options
author | obscuren <geffobscura@gmail.com> | 2015-03-06 22:50:44 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-03-06 22:50:44 +0800 |
commit | 8d9be18b296afb8302249dcc96651aabb0975e26 (patch) | |
tree | 70b8e2ac38edcb8968b946bcfc0772f188d668d7 | |
parent | b72ca57775c7baa6d83cf00093496c9836039d47 (diff) | |
download | go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar.gz go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar.bz2 go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar.lz go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar.xz go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar.zst go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.zip |
Queued approach to delivering chain events
-rw-r--r-- | core/chain_manager.go | 107 | ||||
-rw-r--r-- | core/events.go | 10 | ||||
-rw-r--r-- | miner/miner.go | 1 | ||||
-rw-r--r-- | miner/worker.go | 4 |
4 files changed, 86 insertions, 36 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go index 75d2f6bad..20a1737ad 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -19,11 +19,6 @@ var ( jsonlogger = logger.NewJsonLogger() ) -type ChainEvent struct { - Block *types.Block - Td *big.Int -} - type StateQuery interface { GetAccount(addr []byte) *state.StateObject } @@ -93,13 +88,16 @@ type ChainManager struct { transState *state.StateDB txState *state.StateDB + + quit chan struct{} } func NewChainManager(db ethutil.Database, mux *event.TypeMux) *ChainManager { - bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux} + bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux, quit: make(chan struct{})} bc.setLastBlock() bc.transState = bc.State().Copy() bc.txState = bc.State().Copy() + go bc.update() return bc } @@ -388,16 +386,24 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) { } func (bc *ChainManager) Stop() { - if bc.CurrentBlock != nil { - chainlogger.Infoln("Stopped") - } + close(bc.quit) +} + +type queueEvent struct { + queue []interface{} + canonicalCount int + sideCount int + splitCount int } func (self *ChainManager) InsertChain(chain types.Blocks) error { - self.tsmu.Lock() - defer self.tsmu.Unlock() + //self.tsmu.Lock() + //defer self.tsmu.Unlock() - for _, block := range chain { + // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring. + var queue = make([]interface{}, len(chain)) + var queueEvent = queueEvent{queue: queue} + for i, block := range chain { // Call in to the block processor and check for errors. It's likely that if one block fails // all others will fail too (unless a known block is returned). td, err := self.processor.Process(block) @@ -414,7 +420,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { } block.Td = td - var canonical, split bool self.mu.Lock() cblock := self.currentBlock { @@ -426,41 +431,75 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { if td.Cmp(self.td) > 0 { if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 { chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td) - split = true + + queue[i] = ChainSplitEvent{block} + queueEvent.splitCount++ } self.setTotalDifficulty(td) self.insert(block) - canonical = true + /* + jsonlogger.LogJson(&logger.EthChainNewHead{ + BlockHash: ethutil.Bytes2Hex(block.Hash()), + BlockNumber: block.Number(), + ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()), + BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()), + }) + */ + + self.setTransState(state.New(block.Root(), self.db)) + queue[i] = ChainEvent{block} + queueEvent.canonicalCount++ + } else { + queue[i] = ChainSideEvent{block} + queueEvent.sideCount++ } } self.mu.Unlock() - if canonical { - /* - jsonlogger.LogJson(&logger.EthChainNewHead{ - BlockHash: ethutil.Bytes2Hex(block.Hash()), - BlockNumber: block.Number(), - ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()), - BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()), - }) - */ - self.setTransState(state.New(block.Root(), self.db)) - self.eventMux.Post(ChainEvent{block, td}) - } else { - //self.eventMux. - } - - if split { - self.setTxState(state.New(block.Root(), self.db)) - self.eventMux.Post(ChainSplitEvent{block}) - } } + // XXX put this in a goroutine? + go self.eventMux.Post(queueEvent) + return nil } +func (self *ChainManager) update() { + events := self.eventMux.Subscribe(queueEvent{}) + +out: + for { + select { + case ev := <-events.Chan(): + switch ev := ev.(type) { + case queueEvent: + for i, event := range ev.queue { + switch event := event.(type) { + case ChainEvent: + // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long + // and in most cases isn't even necessary. + if i == ev.canonicalCount { + self.eventMux.Post(ChainHeadEvent{event.Block}) + } + case ChainSplitEvent: + // On chain splits we need to reset the transaction state. We can't be sure whether the actual + // state of the accounts are still valid. + if i == ev.splitCount { + self.setTxState(state.New(event.Block.Root(), self.db)) + } + } + + self.eventMux.Post(event) + } + } + case <-self.quit: + break out + } + } +} + // Satisfy state query interface func (self *ChainManager) GetAccount(addr []byte) *state.StateObject { return self.State().GetAccount(addr) diff --git a/core/events.go b/core/events.go index 4cbbc609c..23678ef60 100644 --- a/core/events.go +++ b/core/events.go @@ -16,3 +16,13 @@ type NewMinedBlockEvent struct{ Block *types.Block } // ChainSplit is posted when a new head is detected type ChainSplitEvent struct{ Block *types.Block } + +type ChainEvent struct{ Block *types.Block } + +type ChainSideEvent struct{ Block *types.Block } + +type ChainHeadEvent struct{ Block *types.Block } + +// Mining operation events +type StartMining struct{} +type TopMining struct{} diff --git a/miner/miner.go b/miner/miner.go index 490296431..d3b1f578a 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -30,6 +30,7 @@ func New(coinbase []byte, eth core.Backend, pow pow.PoW, minerThreads int) *Mine pow: pow, } + minerThreads = 1 for i := 0; i < minerThreads; i++ { miner.worker.register(NewCpuMiner(i, miner.pow)) } diff --git a/miner/worker.go b/miner/worker.go index 29992b327..cd105fa73 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -116,7 +116,7 @@ func (self *worker) register(agent Agent) { } func (self *worker) update() { - events := self.mux.Subscribe(core.ChainEvent{}, core.NewMinedBlockEvent{}) + events := self.mux.Subscribe(core.ChainHeadEvent{}, core.NewMinedBlockEvent{}) timer := time.NewTicker(2 * time.Second) @@ -125,7 +125,7 @@ out: select { case event := <-events.Chan(): switch ev := event.(type) { - case core.ChainEvent: + case core.ChainHeadEvent: if self.current.block != ev.Block { self.commitNewWork() } |