aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorobscuren <geffobscura@gmail.com>2015-03-06 22:50:44 +0800
committerobscuren <geffobscura@gmail.com>2015-03-06 22:50:44 +0800
commit8d9be18b296afb8302249dcc96651aabb0975e26 (patch)
tree70b8e2ac38edcb8968b946bcfc0772f188d668d7
parentb72ca57775c7baa6d83cf00093496c9836039d47 (diff)
downloadgo-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar
go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar.gz
go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar.bz2
go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar.lz
go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar.xz
go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.tar.zst
go-tangerine-8d9be18b296afb8302249dcc96651aabb0975e26.zip
Queued approach to delivering chain events
-rw-r--r--core/chain_manager.go107
-rw-r--r--core/events.go10
-rw-r--r--miner/miner.go1
-rw-r--r--miner/worker.go4
4 files changed, 86 insertions, 36 deletions
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 75d2f6bad..20a1737ad 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -19,11 +19,6 @@ var (
jsonlogger = logger.NewJsonLogger()
)
-type ChainEvent struct {
- Block *types.Block
- Td *big.Int
-}
-
type StateQuery interface {
GetAccount(addr []byte) *state.StateObject
}
@@ -93,13 +88,16 @@ type ChainManager struct {
transState *state.StateDB
txState *state.StateDB
+
+ quit chan struct{}
}
func NewChainManager(db ethutil.Database, mux *event.TypeMux) *ChainManager {
- bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux}
+ bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux, quit: make(chan struct{})}
bc.setLastBlock()
bc.transState = bc.State().Copy()
bc.txState = bc.State().Copy()
+ go bc.update()
return bc
}
@@ -388,16 +386,24 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) {
}
func (bc *ChainManager) Stop() {
- if bc.CurrentBlock != nil {
- chainlogger.Infoln("Stopped")
- }
+ close(bc.quit)
+}
+
+type queueEvent struct {
+ queue []interface{}
+ canonicalCount int
+ sideCount int
+ splitCount int
}
func (self *ChainManager) InsertChain(chain types.Blocks) error {
- self.tsmu.Lock()
- defer self.tsmu.Unlock()
+ //self.tsmu.Lock()
+ //defer self.tsmu.Unlock()
- for _, block := range chain {
+ // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring.
+ var queue = make([]interface{}, len(chain))
+ var queueEvent = queueEvent{queue: queue}
+ for i, block := range chain {
// Call in to the block processor and check for errors. It's likely that if one block fails
// all others will fail too (unless a known block is returned).
td, err := self.processor.Process(block)
@@ -414,7 +420,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
}
block.Td = td
- var canonical, split bool
self.mu.Lock()
cblock := self.currentBlock
{
@@ -426,41 +431,75 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error {
if td.Cmp(self.td) > 0 {
if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 {
chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td)
- split = true
+
+ queue[i] = ChainSplitEvent{block}
+ queueEvent.splitCount++
}
self.setTotalDifficulty(td)
self.insert(block)
- canonical = true
+ /*
+ jsonlogger.LogJson(&logger.EthChainNewHead{
+ BlockHash: ethutil.Bytes2Hex(block.Hash()),
+ BlockNumber: block.Number(),
+ ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()),
+ BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()),
+ })
+ */
+
+ self.setTransState(state.New(block.Root(), self.db))
+ queue[i] = ChainEvent{block}
+ queueEvent.canonicalCount++
+ } else {
+ queue[i] = ChainSideEvent{block}
+ queueEvent.sideCount++
}
}
self.mu.Unlock()
- if canonical {
- /*
- jsonlogger.LogJson(&logger.EthChainNewHead{
- BlockHash: ethutil.Bytes2Hex(block.Hash()),
- BlockNumber: block.Number(),
- ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()),
- BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()),
- })
- */
- self.setTransState(state.New(block.Root(), self.db))
- self.eventMux.Post(ChainEvent{block, td})
- } else {
- //self.eventMux.
- }
-
- if split {
- self.setTxState(state.New(block.Root(), self.db))
- self.eventMux.Post(ChainSplitEvent{block})
- }
}
+ // XXX put this in a goroutine?
+ go self.eventMux.Post(queueEvent)
+
return nil
}
+func (self *ChainManager) update() {
+ events := self.eventMux.Subscribe(queueEvent{})
+
+out:
+ for {
+ select {
+ case ev := <-events.Chan():
+ switch ev := ev.(type) {
+ case queueEvent:
+ for i, event := range ev.queue {
+ switch event := event.(type) {
+ case ChainEvent:
+ // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long
+ // and in most cases isn't even necessary.
+ if i == ev.canonicalCount {
+ self.eventMux.Post(ChainHeadEvent{event.Block})
+ }
+ case ChainSplitEvent:
+ // On chain splits we need to reset the transaction state. We can't be sure whether the actual
+ // state of the accounts are still valid.
+ if i == ev.splitCount {
+ self.setTxState(state.New(event.Block.Root(), self.db))
+ }
+ }
+
+ self.eventMux.Post(event)
+ }
+ }
+ case <-self.quit:
+ break out
+ }
+ }
+}
+
// Satisfy state query interface
func (self *ChainManager) GetAccount(addr []byte) *state.StateObject {
return self.State().GetAccount(addr)
diff --git a/core/events.go b/core/events.go
index 4cbbc609c..23678ef60 100644
--- a/core/events.go
+++ b/core/events.go
@@ -16,3 +16,13 @@ type NewMinedBlockEvent struct{ Block *types.Block }
// ChainSplit is posted when a new head is detected
type ChainSplitEvent struct{ Block *types.Block }
+
+type ChainEvent struct{ Block *types.Block }
+
+type ChainSideEvent struct{ Block *types.Block }
+
+type ChainHeadEvent struct{ Block *types.Block }
+
+// Mining operation events
+type StartMining struct{}
+type TopMining struct{}
diff --git a/miner/miner.go b/miner/miner.go
index 490296431..d3b1f578a 100644
--- a/miner/miner.go
+++ b/miner/miner.go
@@ -30,6 +30,7 @@ func New(coinbase []byte, eth core.Backend, pow pow.PoW, minerThreads int) *Mine
pow: pow,
}
+ minerThreads = 1
for i := 0; i < minerThreads; i++ {
miner.worker.register(NewCpuMiner(i, miner.pow))
}
diff --git a/miner/worker.go b/miner/worker.go
index 29992b327..cd105fa73 100644
--- a/miner/worker.go
+++ b/miner/worker.go
@@ -116,7 +116,7 @@ func (self *worker) register(agent Agent) {
}
func (self *worker) update() {
- events := self.mux.Subscribe(core.ChainEvent{}, core.NewMinedBlockEvent{})
+ events := self.mux.Subscribe(core.ChainHeadEvent{}, core.NewMinedBlockEvent{})
timer := time.NewTicker(2 * time.Second)
@@ -125,7 +125,7 @@ out:
select {
case event := <-events.Chan():
switch ev := event.(type) {
- case core.ChainEvent:
+ case core.ChainHeadEvent:
if self.current.block != ev.Block {
self.commitNewWork()
}