From fbb307cca075b9a253316434f016a4820783a02d Mon Sep 17 00:00:00 2001 From: Taylor Gerring Date: Thu, 5 Mar 2015 10:58:13 -0600 Subject: Added eth.chain.new_head --- core/chain_manager.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'core') diff --git a/core/chain_manager.go b/core/chain_manager.go index 959bfd398..81f085c47 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -15,6 +15,7 @@ import ( ) var chainlogger = logger.NewLogger("CHAIN") +var jsonlogger = logger.NewJsonLogger() type ChainEvent struct { Block *types.Block @@ -122,7 +123,7 @@ func (self *ChainManager) Status() (td *big.Int, currentBlock []byte, genesisBlo self.mu.RLock() defer self.mu.RUnlock() - return self.td, self.currentBlock.Hash(), self.Genesis().Hash() + return self.td, self.currentBlock.Hash(), self.genesisBlock.Hash() } func (self *ChainManager) SetProcessor(proc types.BlockProcessor) { @@ -395,11 +396,11 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { var canonical, split bool self.mu.Lock() + cblock := self.currentBlock { // Write block to database. Eventually we'll have to improve on this and throw away blocks that are // not in the canonical chain. self.write(block) - cblock := self.currentBlock // Compare the TD of the last known block in the canonical chain to make sure it's greater. // At this point it's possible that a different chain (fork) becomes the new canonical chain. if td.Cmp(self.td) > 0 { @@ -417,6 +418,12 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { self.mu.Unlock() if canonical { + jsonlogger.LogJson(&logger.EthChainNewHead{ + BlockHash: ethutil.Bytes2Hex(block.Hash()), + BlockNumber: block.Number(), + ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()), + BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()), + }) self.setTransState(state.New(block.Root(), self.db)) self.eventMux.Post(ChainEvent{block, td}) } -- cgit v1.2.3 From 88ff13c241faff1d58e47f12bd283c112de7225a Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 5 Mar 2015 19:51:25 +0100 Subject: Spec changes. * All errors during state transition result in an invalid tx --- core/error.go | 16 ++++++++++++++++ core/execution.go | 8 +++++--- core/state_transition.go | 13 ++++++++++--- 3 files changed, 31 insertions(+), 6 deletions(-) (limited to 'core') diff --git a/core/error.go b/core/error.go index 514cd076b..04e40646c 100644 --- a/core/error.go +++ b/core/error.go @@ -146,3 +146,19 @@ func IsKnownBlockErr(e error) bool { _, ok := e.(*KnownBlockError) return ok } + +type ValueTransferError struct { + message string +} + +func ValueTransferErr(str string, v ...interface{}) *ValueTransferError { + return &ValueTransferError{fmt.Sprintf(str, v...)} +} + +func (self *ValueTransferError) Error() string { + return self.message +} +func IsValueTransferErr(e error) bool { + _, ok := e.(*ValueTransferError) + return ok +} diff --git a/core/execution.go b/core/execution.go index f7d5a8945..4a69cce09 100644 --- a/core/execution.go +++ b/core/execution.go @@ -1,7 +1,6 @@ package core import ( - "fmt" "math/big" "time" @@ -26,7 +25,10 @@ func (self *Execution) Addr() []byte { func (self *Execution) Call(codeAddr []byte, caller vm.ContextRef) ([]byte, error) { // Retrieve the executing code - code := self.env.State().GetCode(codeAddr) + var code []byte + if self.env.State().GetStateObject(codeAddr) != nil { + code = self.env.State().GetCode(codeAddr) + } return self.exec(code, codeAddr, caller) } @@ -55,7 +57,7 @@ func (self *Execution) exec(code, contextAddr []byte, caller vm.ContextRef) (ret caller.ReturnGas(self.Gas, self.price) - return nil, fmt.Errorf("insufficient funds to transfer value. Req %v, has %v", self.value, from.Balance()) + return nil, ValueTransferErr("insufficient funds to transfer value. Req %v, has %v", self.value, from.Balance()) } snapshot := env.State().Copy() diff --git a/core/state_transition.go b/core/state_transition.go index 00e383f3f..f54acd6ee 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -3,6 +3,7 @@ package core import ( "fmt" "math/big" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethutil" "github.com/ethereum/go-ethereum/state" @@ -185,7 +186,7 @@ func (self *StateTransition) TransitionState() (ret []byte, err error) { } } if err = self.UseGas(big.NewInt(dgas)); err != nil { - return + return nil, InvalidTxError(err) } //stateCopy := self.env.State().Copy() @@ -231,10 +232,16 @@ func (self *StateTransition) TransitionState() (ret []byte, err error) { */ } - if err != nil { - self.UseGas(self.gas) + if err != nil && IsValueTransferErr(err) { + return nil, InvalidTxError(err) } + /* + if err != nil { + self.UseGas(self.gas) + } + */ + return } -- cgit v1.2.3 From 23ad2f02c0992c212d7d179991560eb44f1b1f78 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 6 Mar 2015 10:22:40 +0100 Subject: debug comments & pow handling --- core/chain_manager.go | 5 ----- 1 file changed, 5 deletions(-) (limited to 'core') diff --git a/core/chain_manager.go b/core/chain_manager.go index 1152e3fa2..75d2f6bad 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -66,7 +66,6 @@ func CalcGasLimit(parent, block *types.Block) *big.Int { } // ((1024-1) * parent.gasLimit + (gasUsed * 6 / 5)) / 1024 - previous := new(big.Int).Mul(big.NewInt(1024-1), parent.GasLimit()) current := new(big.Rat).Mul(new(big.Rat).SetInt(parent.GasUsed()), big.NewRat(6, 5)) curInt := new(big.Int).Div(current.Num(), current.Denom()) @@ -291,7 +290,6 @@ func (self *ChainManager) GetBlockHashesFromHash(hash []byte, max uint64) (chain if block == nil { return } - // XXX Could be optimised by using a different database which only holds hashes (i.e., linked list) for i := uint64(0); i < max; i++ { parentHash := block.Header().ParentHash @@ -396,12 +394,9 @@ func (bc *ChainManager) Stop() { } func (self *ChainManager) InsertChain(chain types.Blocks) error { - println("insert chain start") self.tsmu.Lock() defer self.tsmu.Unlock() - defer println("insert chain end") - for _, block := range chain { // Call in to the block processor and check for errors. It's likely that if one block fails // all others will fail too (unless a known block is returned). -- cgit v1.2.3 From 8d9be18b296afb8302249dcc96651aabb0975e26 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 6 Mar 2015 15:50:44 +0100 Subject: Queued approach to delivering chain events --- core/chain_manager.go | 107 ++++++++++++++++++++++++++++++++++---------------- core/events.go | 10 +++++ 2 files changed, 83 insertions(+), 34 deletions(-) (limited to 'core') diff --git a/core/chain_manager.go b/core/chain_manager.go index 75d2f6bad..20a1737ad 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -19,11 +19,6 @@ var ( jsonlogger = logger.NewJsonLogger() ) -type ChainEvent struct { - Block *types.Block - Td *big.Int -} - type StateQuery interface { GetAccount(addr []byte) *state.StateObject } @@ -93,13 +88,16 @@ type ChainManager struct { transState *state.StateDB txState *state.StateDB + + quit chan struct{} } func NewChainManager(db ethutil.Database, mux *event.TypeMux) *ChainManager { - bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux} + bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux, quit: make(chan struct{})} bc.setLastBlock() bc.transState = bc.State().Copy() bc.txState = bc.State().Copy() + go bc.update() return bc } @@ -388,16 +386,24 @@ func (self *ChainManager) CalcTotalDiff(block *types.Block) (*big.Int, error) { } func (bc *ChainManager) Stop() { - if bc.CurrentBlock != nil { - chainlogger.Infoln("Stopped") - } + close(bc.quit) +} + +type queueEvent struct { + queue []interface{} + canonicalCount int + sideCount int + splitCount int } func (self *ChainManager) InsertChain(chain types.Blocks) error { - self.tsmu.Lock() - defer self.tsmu.Unlock() + //self.tsmu.Lock() + //defer self.tsmu.Unlock() - for _, block := range chain { + // A queued approach to delivering events. This is generally faster than direct delivery and requires much less mutex acquiring. + var queue = make([]interface{}, len(chain)) + var queueEvent = queueEvent{queue: queue} + for i, block := range chain { // Call in to the block processor and check for errors. It's likely that if one block fails // all others will fail too (unless a known block is returned). td, err := self.processor.Process(block) @@ -414,7 +420,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { } block.Td = td - var canonical, split bool self.mu.Lock() cblock := self.currentBlock { @@ -426,41 +431,75 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { if td.Cmp(self.td) > 0 { if block.Header().Number.Cmp(new(big.Int).Add(cblock.Header().Number, ethutil.Big1)) < 0 { chainlogger.Infof("Split detected. New head #%v (%x) TD=%v, was #%v (%x) TD=%v\n", block.Header().Number, block.Hash()[:4], td, cblock.Header().Number, cblock.Hash()[:4], self.td) - split = true + + queue[i] = ChainSplitEvent{block} + queueEvent.splitCount++ } self.setTotalDifficulty(td) self.insert(block) - canonical = true + /* + jsonlogger.LogJson(&logger.EthChainNewHead{ + BlockHash: ethutil.Bytes2Hex(block.Hash()), + BlockNumber: block.Number(), + ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()), + BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()), + }) + */ + + self.setTransState(state.New(block.Root(), self.db)) + queue[i] = ChainEvent{block} + queueEvent.canonicalCount++ + } else { + queue[i] = ChainSideEvent{block} + queueEvent.sideCount++ } } self.mu.Unlock() - if canonical { - /* - jsonlogger.LogJson(&logger.EthChainNewHead{ - BlockHash: ethutil.Bytes2Hex(block.Hash()), - BlockNumber: block.Number(), - ChainHeadHash: ethutil.Bytes2Hex(cblock.Hash()), - BlockPrevHash: ethutil.Bytes2Hex(block.ParentHash()), - }) - */ - self.setTransState(state.New(block.Root(), self.db)) - self.eventMux.Post(ChainEvent{block, td}) - } else { - //self.eventMux. - } - - if split { - self.setTxState(state.New(block.Root(), self.db)) - self.eventMux.Post(ChainSplitEvent{block}) - } } + // XXX put this in a goroutine? + go self.eventMux.Post(queueEvent) + return nil } +func (self *ChainManager) update() { + events := self.eventMux.Subscribe(queueEvent{}) + +out: + for { + select { + case ev := <-events.Chan(): + switch ev := ev.(type) { + case queueEvent: + for i, event := range ev.queue { + switch event := event.(type) { + case ChainEvent: + // We need some control over the mining operation. Acquiring locks and waiting for the miner to create new block takes too long + // and in most cases isn't even necessary. + if i == ev.canonicalCount { + self.eventMux.Post(ChainHeadEvent{event.Block}) + } + case ChainSplitEvent: + // On chain splits we need to reset the transaction state. We can't be sure whether the actual + // state of the accounts are still valid. + if i == ev.splitCount { + self.setTxState(state.New(event.Block.Root(), self.db)) + } + } + + self.eventMux.Post(event) + } + } + case <-self.quit: + break out + } + } +} + // Satisfy state query interface func (self *ChainManager) GetAccount(addr []byte) *state.StateObject { return self.State().GetAccount(addr) diff --git a/core/events.go b/core/events.go index 4cbbc609c..23678ef60 100644 --- a/core/events.go +++ b/core/events.go @@ -16,3 +16,13 @@ type NewMinedBlockEvent struct{ Block *types.Block } // ChainSplit is posted when a new head is detected type ChainSplitEvent struct{ Block *types.Block } + +type ChainEvent struct{ Block *types.Block } + +type ChainSideEvent struct{ Block *types.Block } + +type ChainHeadEvent struct{ Block *types.Block } + +// Mining operation events +type StartMining struct{} +type TopMining struct{} -- cgit v1.2.3 From cd856cb2133d390758bb24b88fa3b538bb7bc306 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 6 Mar 2015 18:26:16 +0100 Subject: Separated block db from state db. Partial fix for #416 --- core/chain_makers.go | 2 +- core/chain_manager.go | 31 ++++++++++++++++--------------- core/manager.go | 3 ++- 3 files changed, 19 insertions(+), 17 deletions(-) (limited to 'core') diff --git a/core/chain_makers.go b/core/chain_makers.go index fad9ac97b..b5c50dc3d 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -108,7 +108,7 @@ func makeChain(bman *BlockProcessor, parent *types.Block, max int, db ethutil.Da // Create a new chain manager starting from given block // Effectively a fork factory func newChainManager(block *types.Block, eventMux *event.TypeMux, db ethutil.Database) *ChainManager { - bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: eventMux} + bc := &ChainManager{blockDb: db, stateDb: db, genesisBlock: GenesisBlock(db), eventMux: eventMux} if block == nil { bc.Reset() } else { diff --git a/core/chain_manager.go b/core/chain_manager.go index 20a1737ad..7ee182734 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -75,7 +75,8 @@ func CalcGasLimit(parent, block *types.Block) *big.Int { type ChainManager struct { //eth EthManager - db ethutil.Database + blockDb ethutil.Database + stateDb ethutil.Database processor types.BlockProcessor eventMux *event.TypeMux genesisBlock *types.Block @@ -92,8 +93,8 @@ type ChainManager struct { quit chan struct{} } -func NewChainManager(db ethutil.Database, mux *event.TypeMux) *ChainManager { - bc := &ChainManager{db: db, genesisBlock: GenesisBlock(db), eventMux: mux, quit: make(chan struct{})} +func NewChainManager(blockDb, stateDb ethutil.Database, mux *event.TypeMux) *ChainManager { + bc := &ChainManager{blockDb: blockDb, stateDb: stateDb, genesisBlock: GenesisBlock(stateDb), eventMux: mux, quit: make(chan struct{})} bc.setLastBlock() bc.transState = bc.State().Copy() bc.txState = bc.State().Copy() @@ -135,7 +136,7 @@ func (self *ChainManager) SetProcessor(proc types.BlockProcessor) { } func (self *ChainManager) State() *state.StateDB { - return state.New(self.CurrentBlock().Root(), self.db) + return state.New(self.CurrentBlock().Root(), self.stateDb) } func (self *ChainManager) TransState() *state.StateDB { @@ -163,7 +164,7 @@ func (self *ChainManager) setTransState(statedb *state.StateDB) { } func (bc *ChainManager) setLastBlock() { - data, _ := bc.db.Get([]byte("LastBlock")) + data, _ := bc.blockDb.Get([]byte("LastBlock")) if len(data) != 0 { var block types.Block rlp.Decode(bytes.NewReader(data), &block) @@ -171,7 +172,7 @@ func (bc *ChainManager) setLastBlock() { bc.lastBlockHash = block.Hash() // Set the last know difficulty (might be 0x0 as initial value, Genesis) - bc.td = ethutil.BigD(bc.db.LastKnownTD()) + bc.td = ethutil.BigD(bc.blockDb.LastKnownTD()) } else { bc.Reset() } @@ -220,7 +221,7 @@ func (bc *ChainManager) Reset() { defer bc.mu.Unlock() for block := bc.currentBlock; block != nil; block = bc.GetBlock(block.Header().ParentHash) { - bc.db.Delete(block.Hash()) + bc.blockDb.Delete(block.Hash()) } // Prepare the genesis block @@ -236,7 +237,7 @@ func (bc *ChainManager) ResetWithGenesisBlock(gb *types.Block) { defer bc.mu.Unlock() for block := bc.currentBlock; block != nil; block = bc.GetBlock(block.Header().ParentHash) { - bc.db.Delete(block.Hash()) + bc.blockDb.Delete(block.Hash()) } // Prepare the genesis block @@ -262,14 +263,14 @@ func (self *ChainManager) Export() []byte { func (bc *ChainManager) insert(block *types.Block) { encodedBlock := ethutil.Encode(block) - bc.db.Put([]byte("LastBlock"), encodedBlock) + bc.blockDb.Put([]byte("LastBlock"), encodedBlock) bc.currentBlock = block bc.lastBlockHash = block.Hash() } func (bc *ChainManager) write(block *types.Block) { encodedBlock := ethutil.Encode(block.RlpDataForStorage()) - bc.db.Put(block.Hash(), encodedBlock) + bc.blockDb.Put(block.Hash(), encodedBlock) } // Accessors @@ -279,7 +280,7 @@ func (bc *ChainManager) Genesis() *types.Block { // Block fetching methods func (bc *ChainManager) HasBlock(hash []byte) bool { - data, _ := bc.db.Get(hash) + data, _ := bc.blockDb.Get(hash) return len(data) != 0 } @@ -307,7 +308,7 @@ func (self *ChainManager) GetBlockHashesFromHash(hash []byte, max uint64) (chain } func (self *ChainManager) GetBlock(hash []byte) *types.Block { - data, _ := self.db.Get(hash) + data, _ := self.blockDb.Get(hash) if len(data) == 0 { return nil } @@ -361,7 +362,7 @@ func (self *ChainManager) GetBlockByNumber(num uint64) *types.Block { } func (bc *ChainManager) setTotalDifficulty(td *big.Int) { - bc.db.Put([]byte("LTD"), td.Bytes()) + bc.blockDb.Put([]byte("LTD"), td.Bytes()) bc.td = td } @@ -448,7 +449,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) error { }) */ - self.setTransState(state.New(block.Root(), self.db)) + self.setTransState(state.New(block.Root(), self.stateDb)) queue[i] = ChainEvent{block} queueEvent.canonicalCount++ } else { @@ -487,7 +488,7 @@ out: // On chain splits we need to reset the transaction state. We can't be sure whether the actual // state of the accounts are still valid. if i == ev.splitCount { - self.setTxState(state.New(event.Block.Root(), self.db)) + self.setTxState(state.New(event.Block.Root(), self.stateDb)) } } diff --git a/core/manager.go b/core/manager.go index bb039d063..803069377 100644 --- a/core/manager.go +++ b/core/manager.go @@ -15,6 +15,7 @@ type Backend interface { IsListening() bool Peers() []*p2p.Peer KeyManager() *crypto.KeyManager - Db() ethutil.Database + BlockDb() ethutil.Database + StateDb() ethutil.Database EventMux() *event.TypeMux } -- cgit v1.2.3 From 22b493a6ff3e9692676c17f8d766ff1117da9485 Mon Sep 17 00:00:00 2001 From: obscuren Date: Fri, 6 Mar 2015 20:07:35 +0100 Subject: Fixed tests --- core/block_processor_test.go | 9 +++++---- core/chain_manager_test.go | 10 +++++----- 2 files changed, 10 insertions(+), 9 deletions(-) (limited to 'core') diff --git a/core/block_processor_test.go b/core/block_processor_test.go index 35aeaa714..a031c2669 100644 --- a/core/block_processor_test.go +++ b/core/block_processor_test.go @@ -6,14 +6,15 @@ import ( "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/pow/ezp" ) func proc() (*BlockProcessor, *ChainManager) { db, _ := ethdb.NewMemDatabase() var mux event.TypeMux - chainMan := NewChainManager(db, &mux) - return NewBlockProcessor(db, nil, chainMan, &mux), chainMan + chainMan := NewChainManager(db, db, &mux) + return NewBlockProcessor(db, ezp.New(), nil, chainMan, &mux), chainMan } func TestNumber(t *testing.T) { @@ -21,13 +22,13 @@ func TestNumber(t *testing.T) { block1 := chain.NewBlock(nil) block1.Header().Number = big.NewInt(3) - err := bp.ValidateBlock(block1, chain.Genesis()) + err := bp.ValidateHeader(block1.Header(), chain.Genesis().Header()) if err != BlockNumberErr { t.Errorf("expected block number error") } block1 = chain.NewBlock(nil) - err = bp.ValidateBlock(block1, chain.Genesis()) + err = bp.ValidateHeader(block1.Header(), chain.Genesis().Header()) if err == BlockNumberErr { t.Errorf("didn't expect block number error") } diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go index b562b677d..e78c2e980 100644 --- a/core/chain_manager_test.go +++ b/core/chain_manager_test.go @@ -257,9 +257,9 @@ func TestChainInsertions(t *testing.T) { } var eventMux event.TypeMux - chainMan := NewChainManager(db, &eventMux) + chainMan := NewChainManager(db, db, &eventMux) txPool := NewTxPool(&eventMux) - blockMan := NewBlockProcessor(db, txPool, chainMan, &eventMux) + blockMan := NewBlockProcessor(db, nil, txPool, chainMan, &eventMux) chainMan.SetProcessor(blockMan) const max = 2 @@ -303,9 +303,9 @@ func TestChainMultipleInsertions(t *testing.T) { } } var eventMux event.TypeMux - chainMan := NewChainManager(db, &eventMux) + chainMan := NewChainManager(db, db, &eventMux) txPool := NewTxPool(&eventMux) - blockMan := NewBlockProcessor(db, txPool, chainMan, &eventMux) + blockMan := NewBlockProcessor(db, nil, txPool, chainMan, &eventMux) chainMan.SetProcessor(blockMan) done := make(chan bool, max) for i, chain := range chains { @@ -332,7 +332,7 @@ func TestGetAncestors(t *testing.T) { db, _ := ethdb.NewMemDatabase() var eventMux event.TypeMux - chainMan := NewChainManager(db, &eventMux) + chainMan := NewChainManager(db, db, &eventMux) chain, err := loadChain("valid1", t) if err != nil { fmt.Println(err) -- cgit v1.2.3