diff options
author | obscuren <geffobscura@gmail.com> | 2015-03-06 22:50:44 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-03-06 22:50:44 +0800 |
commit | 8d9be18b296afb8302249dcc96651aabb0975e26 (patch) | |
tree | 70b8e2ac38edcb8968b946bcfc0772f188d668d7 /core | |
parent | b72ca57775c7baa6d83cf00093496c9836039d47 (diff) | |
download | dexon-8d9be18b296afb8302249dcc96651aabb0975e26.tar dexon-8d9be18b296afb8302249dcc96651aabb0975e26.tar.gz dexon-8d9be18b296afb8302249dcc96651aabb0975e26.tar.bz2 dexon-8d9be18b296afb8302249dcc96651aabb0975e26.tar.lz dexon-8d9be18b296afb8302249dcc96651aabb0975e26.tar.xz dexon-8d9be18b296afb8302249dcc96651aabb0975e26.tar.zst dexon-8d9be18b296afb8302249dcc96651aabb0975e26.zip |
Queued approach to delivering chain events
Diffstat (limited to 'core')
-rw-r--r-- | core/chain_manager.go | 107 | ||||
-rw-r--r-- | core/events.go | 10 |
2 files changed, 83 insertions, 34 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go index 75d2f6bad..20a1737ad 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -19,11 +19,6 @@ var ( jsonlogger = logger.NewJsonLogger() ) -type ChainEvent struct { - Block *types.Block - Td *big.Int -} - type StateQuery interface { GetAccount(addr []byte) *state.StateObject } @@ -93,13 +88,16 @@ type ChainManager struct { transState *state.StateDB txState *state.StateDB + + quit chan struct{} } func NewChainManager(db ethutil.Database, mux *event.TypeMux) *ChainManager { - bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux} + bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux, quit: make(chan struct{})} bc.setLastBlock() bc.transState = bc.State().Copy() bc.txState = bc.State().Copy() + go bc.update() return bc } @@ -388,16 +386,24 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) { } func (bc *ChainManager) Stop() { - if bc.CurrentBlock != nil { - chainlogger.Infoln("Stopped") - } + close(bc.quit) +} + +type queueEvent struct { + queue []interface{} + canonicalCount int + sideCount int + splitCount int } func (self *ChainManager) InsertChain(chain types.Blocks) error { - self.tsmu.Lock() - defer self.tsmu.Unlock() + //self.tsmu.Lock() + //defer self.tsmu.Unlock() - for _, block := range chain { + // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring. + var queue = make([]interface{}, len(chain)) + var queueEvent = queueEvent{queue: queue} + for i, block := range chain { // Call in to the block processor and check for errors. It's likely that if one block fails // all others will fail too (unless a known block is returned). td, err := self.processor.Process(block) @@ -414,7 +420,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { } block.Td = td - var canonical, split bool self.mu.Lock() cblock := self.currentBlock { @@ -426,41 +431,75 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { if td.Cmp(self.td) > 0 { if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 { chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td) - split = true + + queue[i] = ChainSplitEvent{block} + queueEvent.splitCount++ } self.setTotalDifficulty(td) self.insert(block) - canonical = true + /* + jsonlogger.LogJson(&logger.EthChainNewHead{ + BlockHash: ethutil.Bytes2Hex(block.Hash()), + BlockNumber: block.Number(), + ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()), + BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()), + }) + */ + + self.setTransState(state.New(block.Root(), self.db)) + queue[i] = ChainEvent{block} + queueEvent.canonicalCount++ + } else { + queue[i] = ChainSideEvent{block} + queueEvent.sideCount++ } } self.mu.Unlock() - if canonical { - /* - jsonlogger.LogJson(&logger.EthChainNewHead{ - BlockHash: ethutil.Bytes2Hex(block.Hash()), - BlockNumber: block.Number(), - ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()), - BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()), - }) - */ - self.setTransState(state.New(block.Root(), self.db)) - self.eventMux.Post(ChainEvent{block, td}) - } else { - //self.eventMux. - } - - if split { - self.setTxState(state.New(block.Root(), self.db)) - self.eventMux.Post(ChainSplitEvent{block}) - } } + // XXX put this in a goroutine? + go self.eventMux.Post(queueEvent) + return nil } +func (self *ChainManager) update() { + events := self.eventMux.Subscribe(queueEvent{}) + +out: + for { + select { + case ev := <-events.Chan(): + switch ev := ev.(type) { + case queueEvent: + for i, event := range ev.queue { + switch event := event.(type) { + case ChainEvent: + // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long + // and in most cases isn't even necessary. + if i == ev.canonicalCount { + self.eventMux.Post(ChainHeadEvent{event.Block}) + } + case ChainSplitEvent: + // On chain splits we need to reset the transaction state. We can't be sure whether the actual + // state of the accounts are still valid. + if i == ev.splitCount { + self.setTxState(state.New(event.Block.Root(), self.db)) + } + } + + self.eventMux.Post(event) + } + } + case <-self.quit: + break out + } + } +} + // Satisfy state query interface func (self *ChainManager) GetAccount(addr []byte) *state.StateObject { return self.State().GetAccount(addr) diff --git a/core/events.go b/core/events.go index 4cbbc609c..23678ef60 100644 --- a/core/events.go +++ b/core/events.go @@ -16,3 +16,13 @@ type NewMinedBlockEvent struct{ Block *types.Block } // ChainSplit is posted when a new head is detected type ChainSplitEvent struct{ Block *types.Block } + +type ChainEvent struct{ Block *types.Block } + +type ChainSideEvent struct{ Block *types.Block } + +type ChainHeadEvent struct{ Block *types.Block } + +// Mining operation events +type StartMining struct{} +type TopMining struct{} |