From 92f2abdf769f52ea8e5e6d02bf326744e926f5b4 Mon Sep 17 00:00:00 2001 From: obscuren Date: Wed, 5 Mar 2014 10:42:51 +0100 Subject: Partially refactored server/txpool/block manager/block chain The Ethereum structure now complies to a EthManager interface which is being used by the tx pool, block manager and block chain in order to gain access to each other. It's become simpeler. TODO: BlockManager => StateManager --- ethchain/block_chain.go | 29 +++++++++++- ethchain/block_manager.go | 102 +++++++++++++++++++++++-------------------- ethchain/state.go | 38 ++++++++++++++++ ethchain/transaction_pool.go | 18 +++----- ethereum.go | 35 ++++++++++----- ethutil/value.go | 29 ++++++++++++ ethutil/value_test.go | 13 ++++++ peer.go | 20 +++++---- 8 files changed, 203 insertions(+), 81 deletions(-) diff --git a/ethchain/block_chain.go b/ethchain/block_chain.go index 026fc1cea..2865e0a21 100644 --- a/ethchain/block_chain.go +++ b/ethchain/block_chain.go @@ -9,6 +9,7 @@ import ( ) type BlockChain struct { + Ethereum EthManager // The famous, the fabulous Mister GENESIIIIIIS (block) genesisBlock *Block // Last known total difficulty @@ -20,7 +21,7 @@ type BlockChain struct { LastBlockHash []byte } -func NewBlockChain() *BlockChain { +func NewBlockChain(ethereum EthManager) *BlockChain { bc := &BlockChain{} bc.genesisBlock = NewBlockFromData(ethutil.Encode(Genesis)) @@ -129,6 +130,21 @@ func (bc *BlockChain) GetChain(hash []byte, amount int) []*Block { return blocks } +func AddTestNetFunds(block *Block) { + for _, addr := range []string{ + "8a40bfaa73256b60764c1bf40675a99083efb075", // Gavin + "e6716f9544a56c530d868e4bfbacb172315bdead", // Jeffrey + "1e12515ce3e0f817a4ddef9ca55788a1d66bd2df", // Vit + "1a26338f0d905e295fccb71fa9ea849ffa12aaf4", // Alex + } { + //log.Println("2^200 Wei to", addr) + codedAddr := ethutil.FromHex(addr) + addr := block.state.GetAccount(codedAddr) + addr.Amount = ethutil.BigPow(2, 200) + block.state.UpdateAccount(codedAddr, addr) + } +} + func (bc *BlockChain) setLastBlock() { data, _ := ethutil.Config.Db.Get([]byte("LastBlock")) if len(data) != 0 { @@ -139,10 +155,21 @@ func (bc *BlockChain) setLastBlock() { bc.LastBlockNumber = info.Number log.Printf("[CHAIN] Last known block height #%d\n", bc.LastBlockNumber) + } else { + AddTestNetFunds(bc.genesisBlock) + + bc.genesisBlock.state.trie.Sync() + // Prepare the genesis block + bc.Add(bc.genesisBlock) + + //log.Printf("root %x\n", bm.bc.genesisBlock.State().Root) + //bm.bc.genesisBlock.PrintHash() } // Set the last know difficulty (might be 0x0 as initial value, Genesis) bc.TD = ethutil.BigD(ethutil.Config.Db.LastKnownTD()) + + log.Printf("Last block: %x\n", bc.CurrentBlock.Hash()) } func (bc *BlockChain) SetTotalDifficulty(td *big.Int) { diff --git a/ethchain/block_manager.go b/ethchain/block_manager.go index b184fa9c9..fa50304ea 100644 --- a/ethchain/block_manager.go +++ b/ethchain/block_manager.go @@ -2,11 +2,9 @@ package ethchain import ( "bytes" - "encoding/hex" "fmt" "github.com/ethereum/eth-go/ethutil" - _ "github.com/ethereum/eth-go/ethwire" - "log" + "github.com/ethereum/eth-go/ethwire" "math/big" "sync" "time" @@ -16,14 +14,20 @@ type BlockProcessor interface { ProcessBlock(block *Block) } +type EthManager interface { + StateManager() *BlockManager + BlockChain() *BlockChain + TxPool() *TxPool + Broadcast(msgType ethwire.MsgType, data []interface{}) +} + // TODO rename to state manager type BlockManager struct { // Mutex for locking the block processor. Blocks can only be handled one at a time mutex sync.Mutex - // The block chain :) + // Canonical block chain bc *BlockChain - // States for addresses. You can watch any address // at any given time addrStateStore *AddrStateStore @@ -33,59 +37,41 @@ type BlockManager struct { // non-persistent key/value memory storage mem map[string]*big.Int - TransactionPool *TxPool - Pow PoW - Speaker PublicSpeaker + Ethereum EthManager SecondaryBlockProcessor BlockProcessor -} -func AddTestNetFunds(block *Block) { - for _, addr := range []string{ - "8a40bfaa73256b60764c1bf40675a99083efb075", // Gavin - "e6716f9544a56c530d868e4bfbacb172315bdead", // Jeffrey - "1e12515ce3e0f817a4ddef9ca55788a1d66bd2df", // Vit - "1a26338f0d905e295fccb71fa9ea849ffa12aaf4", // Alex - } { - //log.Println("2^200 Wei to", addr) - codedAddr, _ := hex.DecodeString(addr) - addr := block.state.GetAccount(codedAddr) - addr.Amount = ethutil.BigPow(2, 200) - block.state.UpdateAccount(codedAddr, addr) - } + // The managed states + // Processor state. Anything processed will be applied to this + // state + procState *State + // Comparative state it used for comparing and validating end + // results + compState *State } -func NewBlockManager(speaker PublicSpeaker) *BlockManager { +func NewBlockManager(ethereum EthManager) *BlockManager { bm := &BlockManager{ - //server: s, - bc: NewBlockChain(), stack: NewStack(), mem: make(map[string]*big.Int), Pow: &EasyPow{}, - Speaker: speaker, + Ethereum: ethereum, addrStateStore: NewAddrStateStore(), + bc: ethereum.BlockChain(), } - if bm.bc.CurrentBlock == nil { - AddTestNetFunds(bm.bc.genesisBlock) - - bm.bc.genesisBlock.state.trie.Sync() - // Prepare the genesis block - bm.bc.Add(bm.bc.genesisBlock) - - //log.Printf("root %x\n", bm.bc.genesisBlock.State().Root) - //bm.bc.genesisBlock.PrintHash() - } - - log.Printf("Last block: %x\n", bm.bc.CurrentBlock.Hash()) - return bm } +func (bm *BlockManager) ProcState() *State { + return bm.procState +} + // Watches any given address and puts it in the address state store func (bm *BlockManager) WatchAddr(addr []byte) *AccountState { + //FIXME account := bm.procState.GetAccount(addr) account := bm.bc.CurrentBlock.state.GetAccount(addr) return bm.addrStateStore.Add(addr, account) @@ -105,17 +91,26 @@ func (bm *BlockManager) BlockChain() *BlockChain { return bm.bc } +func (bm *BlockManager) MakeContract(tx *Transaction) { + contract := MakeContract(tx, bm.procState) + if contract != nil { + bm.procState.states[string(tx.Hash()[12:])] = contract.state + } +} + func (bm *BlockManager) ApplyTransactions(block *Block, txs []*Transaction) { // Process each transaction/contract for _, tx := range txs { // If there's no recipient, it's a contract if tx.IsContract() { + //FIXME bm.MakeContract(tx) block.MakeContract(tx) } else { + //FIXME if contract := procState.GetContract(tx.Recipient); contract != nil { if contract := block.state.GetContract(tx.Recipient); contract != nil { bm.ProcessContract(contract, tx, block) } else { - err := bm.TransactionPool.ProcessTransaction(tx, block) + err := bm.Ethereum.TxPool().ProcessTransaction(tx, block) if err != nil { ethutil.Config.Log.Infoln("[BMGR]", err) } @@ -124,6 +119,18 @@ func (bm *BlockManager) ApplyTransactions(block *Block, txs []*Transaction) { } } +// The prepare function, prepares the state manager for the next +// "ProcessBlock" action. +func (bm *BlockManager) Prepare(processer *State, comparative *State) { + bm.compState = comparative + bm.procState = processer +} + +// Default prepare function +func (bm *BlockManager) PrepareDefault(block *Block) { + bm.Prepare(bm.BlockChain().CurrentBlock.State(), block.State()) +} + // Block processing and validating with a given (temporarily) state func (bm *BlockManager) ProcessBlock(block *Block) error { // Processing a blocks may never happen simultaneously @@ -161,17 +168,20 @@ func (bm *BlockManager) ProcessBlock(block *Block) error { return err } + // if !bm.compState.Cmp(bm.procState) if !block.state.Cmp(bm.bc.CurrentBlock.state) { return fmt.Errorf("Invalid merkle root. Expected %x, got %x", block.State().trie.Root, bm.bc.CurrentBlock.State().trie.Root) + //FIXME return fmt.Errorf("Invalid merkle root. Expected %x, got %x", bm.compState.trie.Root, bm.procState.trie.Root) } // Calculate the new total difficulty and sync back to the db if bm.CalculateTD(block) { // Sync the current block's state to the database and cancelling out the deferred Undo bm.bc.CurrentBlock.Sync() + //FIXME bm.procState.Sync() // Broadcast the valid block back to the wire - //bm.Speaker.Broadcast(ethwire.MsgBlockTy, []interface{}{block.Value().Val}) + //bm.Ethereum.Broadcast(ethwire.MsgBlockTy, []interface{}{block.Value().Val}) // Add the block to the chain bm.bc.Add(block) @@ -207,12 +217,6 @@ func (bm *BlockManager) CalculateTD(block *Block) bool { // Set the new total difficulty back to the block chain bm.bc.SetTotalDifficulty(td) - /* - if ethutil.Config.Debug { - log.Println("[BMGR] TD(block) =", td) - } - */ - return true } @@ -268,16 +272,19 @@ func CalculateUncleReward(block *Block) *big.Int { func (bm *BlockManager) AccumelateRewards(processor *Block, block *Block) error { // Get the coinbase rlp data addr := processor.state.GetAccount(block.Coinbase) + //FIXME addr := proc.GetAccount(block.Coinbase) // Reward amount of ether to the coinbase address addr.AddFee(CalculateBlockReward(block, len(block.Uncles))) processor.state.UpdateAccount(block.Coinbase, addr) + //FIXME proc.UpdateAccount(block.Coinbase, addr) for _, uncle := range block.Uncles { uncleAddr := processor.state.GetAccount(uncle.Coinbase) uncleAddr.AddFee(CalculateUncleReward(uncle)) processor.state.UpdateAccount(uncle.Coinbase, uncleAddr) + //FIXME proc.UpdateAccount(uncle.Coinbase, uncleAddr) } return nil @@ -298,6 +305,7 @@ func (bm *BlockManager) ProcessContract(contract *Contract, tx *Transaction, blo */ vm := &Vm{} + //vm.Process(contract, bm.procState, RuntimeVars{ vm.Process(contract, block.state, RuntimeVars{ address: tx.Hash()[12:], blockNumber: block.BlockInfo().Number, diff --git a/ethchain/state.go b/ethchain/state.go index e6649cf22..be25fe7b4 100644 --- a/ethchain/state.go +++ b/ethchain/state.go @@ -111,3 +111,41 @@ func (s *State) UpdateAccount(addr []byte, account *Account) { func (s *State) Cmp(other *State) bool { return s.trie.Cmp(other.trie) } + +type ObjType byte + +const ( + NilTy ObjType = iota + AccountTy + ContractTy + + UnknownTy +) + +// Returns the object stored at key and the type stored at key +// Returns nil if nothing is stored +func (s *State) Get(key []byte) (*ethutil.Value, ObjType) { + // Fetch data from the trie + data := s.trie.Get(string(key)) + // Returns the nil type, indicating nothing could be retrieved. + // Anything using this function should check for this ret val + if data == "" { + return nil, NilTy + } + + var typ ObjType + val := ethutil.NewValueFromBytes([]byte(data)) + // Check the length of the retrieved value. + // Len 2 = Account + // Len 3 = Contract + // Other = invalid for now. If other types emerge, add them here + if val.Len() == 2 { + typ = AccountTy + } else if val.Len() == 3 { + typ = ContractTy + } else { + typ = UnknownTy + } + + return val, typ +} diff --git a/ethchain/transaction_pool.go b/ethchain/transaction_pool.go index 763560570..2c9a26936 100644 --- a/ethchain/transaction_pool.go +++ b/ethchain/transaction_pool.go @@ -41,10 +41,6 @@ func FindTx(pool *list.List, finder func(*Transaction, *list.Element) bool) *Tra return nil } -type PublicSpeaker interface { - Broadcast(msgType ethwire.MsgType, data []interface{}) -} - type TxProcessor interface { ProcessTransaction(tx *Transaction) } @@ -55,8 +51,7 @@ type TxProcessor interface { // pool is being drained or synced for whatever reason the transactions // will simple queue up and handled when the mutex is freed. type TxPool struct { - //server *Server - Speaker PublicSpeaker + Ethereum EthManager // The mutex for accessing the Tx pool. mutex sync.Mutex // Queueing channel for reading and writing incoming @@ -67,20 +62,19 @@ type TxPool struct { // The actual pool pool *list.List - BlockManager *BlockManager - SecondaryProcessor TxProcessor subscribers []chan TxMsg } -func NewTxPool() *TxPool { +func NewTxPool(ethereum EthManager) *TxPool { return &TxPool{ //server: s, mutex: sync.Mutex{}, pool: list.New(), queueChan: make(chan *Transaction, txPoolQueueSize), quit: make(chan bool), + Ethereum: ethereum, } } @@ -91,7 +85,7 @@ func (pool *TxPool) addTransaction(tx *Transaction) { pool.mutex.Unlock() // Broadcast the transaction to the rest of the peers - pool.Speaker.Broadcast(ethwire.MsgTxTy, []interface{}{tx.RlpData()}) + pool.Ethereum.Broadcast(ethwire.MsgTxTy, []interface{}{tx.RlpData()}) } // Process transaction validates the Tx and processes funds from the @@ -152,14 +146,14 @@ func (pool *TxPool) ProcessTransaction(tx *Transaction, block *Block) (err error func (pool *TxPool) ValidateTransaction(tx *Transaction) error { // Get the last block so we can retrieve the sender and receiver from // the merkle trie - block := pool.BlockManager.BlockChain().CurrentBlock + block := pool.Ethereum.BlockChain().CurrentBlock // Something has gone horribly wrong if this happens if block == nil { return errors.New("No last block on the block chain") } // Get the sender - accountState := pool.BlockManager.GetAddrState(tx.Sender()) + accountState := pool.Ethereum.StateManager().GetAddrState(tx.Sender()) sender := accountState.Account totAmount := new(big.Int).Add(tx.Value, new(big.Int).Mul(TxFee, TxFeeRat)) diff --git a/ethereum.go b/ethereum.go index b4a8cdb4a..2c8b2cceb 100644 --- a/ethereum.go +++ b/ethereum.go @@ -37,10 +37,12 @@ type Ethereum struct { //db *ethdb.LDBDatabase db ethutil.Database // Block manager for processing new blocks and managing the block chain - BlockManager *ethchain.BlockManager + blockManager *ethchain.BlockManager // The transaction pool. Transaction can be pushed on this pool // for later including in the blocks - TxPool *ethchain.TxPool + txPool *ethchain.TxPool + // The canonical chain + blockChain *ethchain.BlockChain // Peers (NYI) peers *list.List // Nonce @@ -87,19 +89,28 @@ func New(caps Caps, usePnp bool) (*Ethereum, error) { serverCaps: caps, nat: nat, } - ethereum.TxPool = ethchain.NewTxPool() - ethereum.TxPool.Speaker = ethereum - ethereum.BlockManager = ethchain.NewBlockManager(ethereum) - - ethereum.TxPool.BlockManager = ethereum.BlockManager - ethereum.BlockManager.TransactionPool = ethereum.TxPool + ethereum.txPool = ethchain.NewTxPool(ethereum) + ethereum.blockChain = ethchain.NewBlockChain(ethereum) + ethereum.blockManager = ethchain.NewBlockManager(ethereum) // Start the tx pool - ethereum.TxPool.Start() + ethereum.txPool.Start() return ethereum, nil } +func (s *Ethereum) BlockChain() *ethchain.BlockChain { + return s.blockChain +} + +func (s *Ethereum) StateManager() *ethchain.BlockManager { + return s.blockManager +} + +func (s *Ethereum) TxPool() *ethchain.TxPool { + return s.txPool +} + func (s *Ethereum) AddPeer(conn net.Conn) { peer := NewPeer(conn, s, true) @@ -253,7 +264,7 @@ func (s *Ethereum) Start() { if ethutil.Config.Seed { ethutil.Config.Log.Debugln("Seeding") // Testnet seed bootstrapping - resp, err := http.Get("http://www.ethereum.org/servers.poc3.txt") + resp, err := http.Get("https://www.ethereum.org/servers.poc3.txt") if err != nil { log.Println("Fetching seed failed:", err) return @@ -292,8 +303,8 @@ func (s *Ethereum) Stop() { close(s.quit) - s.TxPool.Stop() - s.BlockManager.Stop() + s.txPool.Stop() + s.blockManager.Stop() close(s.shutdownChan) } diff --git a/ethutil/value.go b/ethutil/value.go index 3dd84d12d..46681ec2a 100644 --- a/ethutil/value.go +++ b/ethutil/value.go @@ -224,3 +224,32 @@ func (val *Value) Append(v interface{}) *Value { return val } + +type ValueIterator struct { + value *Value + currentValue *Value + idx int +} + +func (val *Value) NewIterator() *ValueIterator { + return &ValueIterator{value: val} +} + +func (it *ValueIterator) Next() bool { + if it.idx >= it.value.Len() { + return false + } + + it.currentValue = it.value.Get(it.idx) + it.idx++ + + return true +} + +func (it *ValueIterator) Value() *Value { + return it.currentValue +} + +func (it *ValueIterator) Idx() int { + return it.idx +} diff --git a/ethutil/value_test.go b/ethutil/value_test.go index 0e2da5328..a100f44bc 100644 --- a/ethutil/value_test.go +++ b/ethutil/value_test.go @@ -50,3 +50,16 @@ func TestValueTypes(t *testing.T) { t.Errorf("expected BigInt to return '%v', got %v", bigExp, bigInt.BigInt()) } } + +func TestIterator(t *testing.T) { + value := NewValue([]interface{}{1, 2, 3}) + it := value.NewIterator() + values := []uint64{1, 2, 3} + i := 0 + for it.Next() { + if values[i] != it.Value().Uint() { + t.Errorf("Expected %d, got %d", values[i], it.Value().Uint()) + } + i++ + } +} diff --git a/peer.go b/peer.go index 271c8708e..22a4c32fd 100644 --- a/peer.go +++ b/peer.go @@ -18,7 +18,7 @@ const ( // The size of the output buffer for writing messages outputBufferSize = 50 // Current protocol version - ProtocolVersion = 7 + ProtocolVersion = 8 ) type DiscReason byte @@ -49,7 +49,7 @@ var discReasonToString = []string{ } func (d DiscReason) String() string { - if len(discReasonToString) > int(d) { + if len(discReasonToString) < int(d) { return "Unknown" } @@ -293,7 +293,8 @@ func (p *Peer) HandleInbound() { var err error for i := msg.Data.Len() - 1; i >= 0; i-- { block = ethchain.NewBlockFromRlpValue(msg.Data.Get(i)) - err = p.ethereum.BlockManager.ProcessBlock(block) + // FIXME p.ethereum.BlockManager.DefaultPrepare(block) + err = p.ethereum.StateManager().ProcessBlock(block) if err != nil { if ethutil.Config.Debug { @@ -332,7 +333,7 @@ func (p *Peer) HandleInbound() { // in the TxPool where it will undergo validation and // processing when a new block is found for i := 0; i < msg.Data.Len(); i++ { - p.ethereum.TxPool.QueueTransaction(ethchain.NewTransactionFromData(msg.Data.Get(i).Encode())) + p.ethereum.TxPool().QueueTransaction(ethchain.NewTransactionFromData(msg.Data.Get(i).Encode())) } case ethwire.MsgGetPeersTy: // Flag this peer as a 'requested of new peers' this to @@ -374,15 +375,16 @@ func (p *Peer) HandleInbound() { // Check each SHA block hash from the message and determine whether // the SHA is in the database for i := 0; i < l; i++ { - if data := msg.Data.Get(i).Bytes(); p.ethereum.BlockManager.BlockChain().HasBlock(data) { - parent = p.ethereum.BlockManager.BlockChain().GetBlock(data) + if data := + msg.Data.Get(i).Bytes(); p.ethereum.StateManager().BlockChain().HasBlock(data) { + parent = p.ethereum.BlockChain().GetBlock(data) break } } // If a parent is found send back a reply if parent != nil { - chain := p.ethereum.BlockManager.BlockChain().GetChainFromHash(parent.Hash(), amountOfBlocks) + chain := p.ethereum.BlockChain().GetChainFromHash(parent.Hash(), amountOfBlocks) p.QueueMessage(ethwire.NewMessage(ethwire.MsgBlockTy, chain)) } else { // If no blocks are found we send back a reply with msg not in chain @@ -554,10 +556,10 @@ func (p *Peer) String() string { func (p *Peer) CatchupWithPeer() { if !p.catchingUp { p.catchingUp = true - msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{p.ethereum.BlockManager.BlockChain().CurrentBlock.Hash(), uint64(50)}) + msg := ethwire.NewMessage(ethwire.MsgGetChainTy, []interface{}{p.ethereum.BlockChain().CurrentBlock.Hash(), uint64(50)}) p.QueueMessage(msg) - ethutil.Config.Log.Debugf("Requesting blockchain %x...\n", p.ethereum.BlockManager.BlockChain().CurrentBlock.Hash()[:4]) + ethutil.Config.Log.Debugf("Requesting blockchain %x...\n", p.ethereum.BlockChain().CurrentBlock.Hash()[:4]) } } -- cgit v1.2.3