diff options
37 files changed, 1462 insertions, 482 deletions
diff --git a/cmd/evm/main.go b/cmd/evm/main.go index 7c9d27fac..f6ec8c21e 100644 --- a/cmd/evm/main.go +++ b/cmd/evm/main.go @@ -106,7 +106,7 @@ type VMEnv struct { depth int Gas *big.Int - time int64 + time uint64 logs []vm.StructLog } @@ -115,7 +115,7 @@ func NewEnv(state *state.StateDB, transactor common.Address, value *big.Int) *VM state: state, transactor: &transactor, value: value, - time: time.Now().Unix(), + time: uint64(time.Now().Unix()), } } @@ -123,7 +123,7 @@ func (self *VMEnv) State() *state.StateDB { return self.state } func (self *VMEnv) Origin() common.Address { return *self.transactor } func (self *VMEnv) BlockNumber() *big.Int { return common.Big0 } func (self *VMEnv) Coinbase() common.Address { return *self.transactor } -func (self *VMEnv) Time() int64 { return self.time } +func (self *VMEnv) Time() uint64 { return self.time } func (self *VMEnv) Difficulty() *big.Int { return common.Big1 } func (self *VMEnv) BlockHash() []byte { return make([]byte, 32) } func (self *VMEnv) Value() *big.Int { return self.value } diff --git a/cmd/geth/main.go b/cmd/geth/main.go index c46343a60..be40d5137 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -37,8 +37,12 @@ import ( "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/cmd/utils" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/comms" @@ -68,6 +72,18 @@ func init() { app.Action = run app.HideVersion = true // we have a command to print the version app.Commands = []cli.Command{ + { + Action: blockRecovery, + Name: "recover", + Usage: "attempts to recover a corrupted database by setting a new block by number or hash. See help recover.", + Description: ` +The recover commands will attempt to read out the last +block based on that. + +recover #number recovers by number +recover <hex> recovers by hash +`, + }, blocktestCommand, importCommand, exportCommand, @@ -261,7 +277,6 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso utils.ExecFlag, utils.WhisperEnabledFlag, utils.VMDebugFlag, - utils.ProtocolVersionFlag, utils.NetworkIdFlag, utils.RPCCORSDomainFlag, utils.VerbosityFlag, @@ -439,6 +454,36 @@ func unlockAccount(ctx *cli.Context, am *accounts.Manager, account string) (pass return } +func blockRecovery(ctx *cli.Context) { + arg := ctx.Args().First() + if len(ctx.Args()) < 1 && len(arg) > 0 { + glog.Fatal("recover requires block number or hash") + } + + cfg := utils.MakeEthConfig(ClientIdentifier, nodeNameVersion, ctx) + blockDb, err := ethdb.NewLDBDatabase(filepath.Join(cfg.DataDir, "blockchain")) + if err != nil { + glog.Fatalln("could not open db:", err) + } + + var block *types.Block + if arg[0] == '#' { + block = core.GetBlockByNumber(blockDb, common.String2Big(arg[1:]).Uint64()) + } else { + block = core.GetBlockByHash(blockDb, common.HexToHash(arg)) + } + + if block == nil { + glog.Fatalln("block not found. Recovery failed") + } + + err = core.WriteHead(blockDb, block) + if err != nil { + glog.Fatalln("block write err", err) + } + glog.Infof("Recovery succesful. New HEAD %x\n", block.Hash()) +} + func startEth(ctx *cli.Context, eth *eth.Ethereum) { // Start Ethereum itself @@ -598,7 +643,7 @@ func version(c *cli.Context) { if gitCommit != "" { fmt.Println("Git Commit:", gitCommit) } - fmt.Println("Protocol Version:", c.GlobalInt(utils.ProtocolVersionFlag.Name)) + fmt.Println("Protocol Versions:", eth.ProtocolVersions) fmt.Println("Network Id:", c.GlobalInt(utils.NetworkIdFlag.Name)) fmt.Println("Go Version:", runtime.Version()) fmt.Println("OS:", runtime.GOOS) diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index f9458f346..45164741d 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -82,11 +82,6 @@ var ( Usage: "Data directory to be used", Value: DirectoryString{common.DefaultDataDir()}, } - ProtocolVersionFlag = cli.IntFlag{ - Name: "protocolversion", - Usage: "ETH protocol version (integer)", - Value: eth.ProtocolVersion, - } NetworkIdFlag = cli.IntFlag{ Name: "networkid", Usage: "Network Id (integer)", @@ -359,7 +354,6 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config { return ð.Config{ Name: common.MakeName(clientID, version), DataDir: ctx.GlobalString(DataDirFlag.Name), - ProtocolVersion: ctx.GlobalInt(ProtocolVersionFlag.Name), GenesisNonce: ctx.GlobalInt(GenesisNonceFlag.Name), BlockChainVersion: ctx.GlobalInt(BlockchainVersionFlag.Name), SkipBcVersionCheck: false, diff --git a/core/block_processor.go b/core/block_processor.go index 22d4c7c27..9b77d10eb 100644 --- a/core/block_processor.go +++ b/core/block_processor.go @@ -362,6 +362,13 @@ func ValidateHeader(pow pow.PoW, block *types.Header, parent *types.Block, check return fmt.Errorf("Block extra data too long (%d)", len(block.Extra)) } + if block.Time > uint64(time.Now().Unix()) { + return BlockFutureErr + } + if block.Time <= parent.Time() { + return BlockEqualTSErr + } + expd := CalcDifficulty(int64(block.Time), int64(parent.Time()), parent.Difficulty()) if expd.Cmp(block.Difficulty) != 0 { return fmt.Errorf("Difficulty check failed for block %v, %v", block.Difficulty, expd) @@ -377,20 +384,12 @@ func ValidateHeader(pow pow.PoW, block *types.Header, parent *types.Block, check return fmt.Errorf("GasLimit check failed for block %v (%v > %v)", block.GasLimit, a, b) } - if int64(block.Time) > time.Now().Unix() { - return BlockFutureErr - } - num := parent.Number() num.Sub(block.Number, num) if num.Cmp(big.NewInt(1)) != 0 { return BlockNumberErr } - if block.Time <= uint64(parent.Time()) { - return BlockEqualTSErr //ValidationError("Block timestamp equal or less than previous block (%v - %v)", block.Time, parent.Time) - } - if checkPow { // Verify the nonce of the block. Return an error if it's not valid if !pow.Verify(types.NewBlockWithHeader(block)) { diff --git a/core/canary.go b/core/canary.go index de77c4bba..90b4a2eaf 100644 --- a/core/canary.go +++ b/core/canary.go @@ -8,10 +8,10 @@ import ( ) var ( - jeff = common.HexToAddress("9d38997c624a71b21278389ea2fdc460d000e4b2") - vitalik = common.HexToAddress("b1e570be07eaa673e4fd0c8265b64ef739385709") - christoph = common.HexToAddress("529bc43a5d93789fa28de1961db6a07e752204ae") - gav = common.HexToAddress("e3e942b2aa524293c84ff6c7f87a6635790ad5e4") + jeff = common.HexToAddress("a8edb1ac2c86d3d9d78f96cd18001f60df29e52c") + vitalik = common.HexToAddress("1baf27b88c48dd02b744999cf3522766929d2b2a") + christoph = common.HexToAddress("60d11b58744784dc97f878f7e3749c0f1381a004") + gav = common.HexToAddress("4bb7e8ae99b645c2b7860b8f3a2328aae28bd80a") ) // Canary will check the 0'd address of the 4 contracts above. diff --git a/core/chain_makers.go b/core/chain_makers.go index 72ae7970e..013251d74 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -155,7 +155,7 @@ func makeHeader(parent *types.Block, state *state.StateDB) *types.Header { Root: state.Root(), ParentHash: parent.Hash(), Coinbase: parent.Coinbase(), - Difficulty: CalcDifficulty(time, parent.Time(), parent.Difficulty()), + Difficulty: CalcDifficulty(int64(time), int64(parent.Time()), parent.Difficulty()), GasLimit: CalcGasLimit(parent), GasUsed: new(big.Int), Number: new(big.Int).Add(parent.Number(), common.Big1), diff --git a/core/chain_manager.go b/core/chain_manager.go index 808ccd201..70a8b11c6 100644 --- a/core/chain_manager.go +++ b/core/chain_manager.go @@ -1,7 +1,6 @@ package core import ( - "bytes" "fmt" "io" "math/big" @@ -11,19 +10,15 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/compression/rle" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/pow" "github.com/ethereum/go-ethereum/rlp" "github.com/hashicorp/golang-lru" - "github.com/syndtr/goleveldb/leveldb" ) var ( @@ -40,55 +35,9 @@ const ( blockCacheLimit = 256 maxFutureBlocks = 256 maxTimeFutureBlocks = 30 + checkpointLimit = 200 ) -// CalcDifficulty is the difficulty adjustment algorithm. It returns -// the difficulty that a new block b should have when created at time -// given the parent block's time and difficulty. -func CalcDifficulty(time int64, parentTime int64, parentDiff *big.Int) *big.Int { - diff := new(big.Int) - adjust := new(big.Int).Div(parentDiff, params.DifficultyBoundDivisor) - if big.NewInt(time-parentTime).Cmp(params.DurationLimit) < 0 { - diff.Add(parentDiff, adjust) - } else { - diff.Sub(parentDiff, adjust) - } - if diff.Cmp(params.MinimumDifficulty) < 0 { - return params.MinimumDifficulty - } - return diff -} - -// CalcTD computes the total difficulty of block. -func CalcTD(block, parent *types.Block) *big.Int { - if parent == nil { - return block.Difficulty() - } - d := block.Difficulty() - d.Add(d, parent.Td) - return d -} - -// CalcGasLimit computes the gas limit of the next block after parent. -// The result may be modified by the caller. -func CalcGasLimit(parent *types.Block) *big.Int { - decay := new(big.Int).Div(parent.GasLimit(), params.GasLimitBoundDivisor) - contrib := new(big.Int).Mul(parent.GasUsed(), big.NewInt(3)) - contrib = contrib.Div(contrib, big.NewInt(2)) - contrib = contrib.Div(contrib, params.GasLimitBoundDivisor) - - gl := new(big.Int).Sub(parent.GasLimit(), decay) - gl = gl.Add(gl, contrib) - gl = gl.Add(gl, big.NewInt(1)) - gl.Set(common.BigMax(gl, params.MinGasLimit)) - - if gl.Cmp(params.GenesisGasLimit) < 0 { - gl.Add(parent.GasLimit(), decay) - gl.Set(common.BigMin(gl, params.GenesisGasLimit)) - } - return gl -} - type ChainManager struct { //eth EthManager blockDb common.Database @@ -101,6 +50,7 @@ type ChainManager struct { chainmu sync.RWMutex tsmu sync.RWMutex + checkpoint int // checkpoint counts towards the new checkpoint td *big.Int currentBlock *types.Block lastBlockHash common.Hash @@ -109,9 +59,8 @@ type ChainManager struct { transState *state.StateDB txState *state.ManagedState - cache *lru.Cache // cache is the LRU caching - futureBlocks *lru.Cache // future blocks are blocks added for later processing - pendingBlocks *lru.Cache // pending blocks contain blocks not yet written to the db + cache *lru.Cache // cache is the LRU caching + futureBlocks *lru.Cache // future blocks are blocks added for later processing quit chan struct{} // procInterrupt must be atomically called @@ -240,6 +189,24 @@ func (self *ChainManager) setTransState(statedb *state.StateDB) { self.transState = statedb } +func (bc *ChainManager) recover() bool { + data, _ := bc.blockDb.Get([]byte("checkpoint")) + if len(data) != 0 { + block := bc.GetBlock(common.BytesToHash(data)) + if block != nil { + err := bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes()) + if err != nil { + glog.Fatalln("db write err:", err) + } + + bc.currentBlock = block + bc.lastBlockHash = block.Hash() + return true + } + } + return false +} + func (bc *ChainManager) setLastState() { data, _ := bc.blockDb.Get([]byte("LastBlock")) if len(data) != 0 { @@ -248,7 +215,12 @@ func (bc *ChainManager) setLastState() { bc.currentBlock = block bc.lastBlockHash = block.Hash() } else { - glog.Fatalf("Fatal. LastBlock not found. Please run removedb and resync") + glog.Infof("LastBlock (%x) not found. Recovering...\n", data) + if bc.recover() { + glog.Infof("Recover successful") + } else { + glog.Fatalf("Recover failed. Please report") + } } } else { bc.Reset() @@ -347,14 +319,19 @@ func (self *ChainManager) ExportN(w io.Writer, first uint64, last uint64) error // insert injects a block into the current chain block chain. Note, this function // assumes that the `mu` mutex is held! func (bc *ChainManager) insert(block *types.Block) { - key := append(blockNumPre, block.Number().Bytes()...) - err := bc.blockDb.Put(key, block.Hash().Bytes()) + err := WriteHead(bc.blockDb, block) if err != nil { glog.Fatal("db write fail:", err) } - err = bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes()) - if err != nil { - glog.Fatal("db write fail:", err) + + bc.checkpoint++ + if bc.checkpoint > checkpointLimit { + err = bc.blockDb.Put([]byte("checkpoint"), block.Hash().Bytes()) + if err != nil { + glog.Fatal("db write fail:", err) + } + + bc.checkpoint = 0 } bc.currentBlock = block @@ -387,12 +364,6 @@ func (bc *ChainManager) HasBlock(hash common.Hash) bool { return true } - if bc.pendingBlocks != nil { - if _, exist := bc.pendingBlocks.Get(hash); exist { - return true - } - } - data, _ := bc.blockDb.Get(append(blockHashPre, hash[:]...)) return len(data) != 0 } @@ -423,26 +394,15 @@ func (self *ChainManager) GetBlock(hash common.Hash) *types.Block { return block.(*types.Block) } - if self.pendingBlocks != nil { - if block, _ := self.pendingBlocks.Get(hash); block != nil { - return block.(*types.Block) - } - } - - data, _ := self.blockDb.Get(append(blockHashPre, hash[:]...)) - if len(data) == 0 { - return nil - } - var block types.StorageBlock - if err := rlp.Decode(bytes.NewReader(data), &block); err != nil { - glog.V(logger.Error).Infof("invalid block RLP for hash %x: %v", hash, err) + block := GetBlockByHash(self.blockDb, hash) + if block == nil { return nil } // Add the block to the cache - self.cache.Add(hash, (*types.Block)(&block)) + self.cache.Add(hash, (*types.Block)(block)) - return (*types.Block)(&block) + return (*types.Block)(block) } func (self *ChainManager) GetBlockByNumber(num uint64) *types.Block { @@ -468,12 +428,7 @@ func (self *ChainManager) GetBlocksFromHash(hash common.Hash, n int) (blocks []* // non blocking version func (self *ChainManager) getBlockByNumber(num uint64) *types.Block { - key, _ := self.blockDb.Get(append(blockNumPre, big.NewInt(int64(num)).Bytes()...)) - if len(key) == 0 { - return nil - } - - return self.GetBlock(common.BytesToHash(key)) + return GetBlockByNumber(self.blockDb, num) } func (self *ChainManager) GetUnclesInChain(block *types.Block, length int) (uncles []*types.Header) { @@ -519,31 +474,6 @@ func (self *ChainManager) procFutureBlocks() { } } -func (self *ChainManager) enqueueForWrite(block *types.Block) { - self.pendingBlocks.Add(block.Hash(), block) -} - -func (self *ChainManager) flushQueuedBlocks() { - db, batchWrite := self.blockDb.(*ethdb.LDBDatabase) - batch := new(leveldb.Batch) - for _, key := range self.pendingBlocks.Keys() { - b, _ := self.pendingBlocks.Get(key) - block := b.(*types.Block) - - enc, _ := rlp.EncodeToBytes((*types.StorageBlock)(block)) - key := append(blockHashPre, block.Hash().Bytes()...) - if batchWrite { - batch.Put(key, rle.Compress(enc)) - } else { - self.blockDb.Put(key, enc) - } - } - - if batchWrite { - db.LDB().Write(batch, nil) - } -} - type writeStatus byte const ( @@ -586,15 +516,7 @@ func (self *ChainManager) WriteBlock(block *types.Block, queued bool) (status wr status = sideStatTy } - if queued { - // Write block to database. Eventually we'll have to improve on this and throw away blocks that are - // not in the canonical chain. - self.mu.Lock() - self.enqueueForWrite(block) - self.mu.Unlock() - } else { - self.write(block) - } + self.write(block) // Delete from future blocks self.futureBlocks.Remove(block.Hash()) @@ -610,8 +532,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { self.chainmu.Lock() defer self.chainmu.Unlock() - self.pendingBlocks, _ = lru.New(len(chain)) - // A queued approach to delivering events. This is generally // faster than direct delivery and requires much less mutex // acquiring. @@ -629,7 +549,6 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // Start the parallel nonce verifier. go verifyNonces(self.pow, chain, nonceQuit, nonceDone) defer close(nonceQuit) - defer self.flushQueuedBlocks() txcount := 0 for i, block := range chain { @@ -673,7 +592,7 @@ func (self *ChainManager) InsertChain(chain types.Blocks) (int, error) { // Allow up to MaxFuture second in the future blocks. If this limit // is exceeded the chain is discarded and processed at a later time // if given. - if max := time.Now().Unix() + maxTimeFutureBlocks; block.Time() > max { + if max := time.Now().Unix() + maxTimeFutureBlocks; int64(block.Time()) > max { return i, fmt.Errorf("%v: BlockFutureErr, %v > %v", BlockFutureErr, block.Time(), max) } diff --git a/core/chain_manager_test.go b/core/chain_manager_test.go index 8b3ea9e85..6869bc746 100644 --- a/core/chain_manager_test.go +++ b/core/chain_manager_test.go @@ -109,8 +109,7 @@ func testChain(chainB types.Blocks, bman *BlockProcessor) (*big.Int, error) { bman.bc.mu.Lock() { - bman.bc.enqueueForWrite(block) - //bman.bc.write(block) + bman.bc.write(block) } bman.bc.mu.Unlock() } diff --git a/core/chain_util.go b/core/chain_util.go new file mode 100644 index 000000000..8051cc47a --- /dev/null +++ b/core/chain_util.go @@ -0,0 +1,98 @@ +package core + +import ( + "bytes" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" +) + +// CalcDifficulty is the difficulty adjustment algorithm. It returns +// the difficulty that a new block b should have when created at time +// given the parent block's time and difficulty. +func CalcDifficulty(time int64, parentTime int64, parentDiff *big.Int) *big.Int { + diff := new(big.Int) + adjust := new(big.Int).Div(parentDiff, params.DifficultyBoundDivisor) + if big.NewInt(time-parentTime).Cmp(params.DurationLimit) < 0 { + diff.Add(parentDiff, adjust) + } else { + diff.Sub(parentDiff, adjust) + } + if diff.Cmp(params.MinimumDifficulty) < 0 { + return params.MinimumDifficulty + } + return diff +} + +// CalcTD computes the total difficulty of block. +func CalcTD(block, parent *types.Block) *big.Int { + if parent == nil { + return block.Difficulty() + } + d := block.Difficulty() + d.Add(d, parent.Td) + return d +} + +// CalcGasLimit computes the gas limit of the next block after parent. +// The result may be modified by the caller. +func CalcGasLimit(parent *types.Block) *big.Int { + decay := new(big.Int).Div(parent.GasLimit(), params.GasLimitBoundDivisor) + contrib := new(big.Int).Mul(parent.GasUsed(), big.NewInt(3)) + contrib = contrib.Div(contrib, big.NewInt(2)) + contrib = contrib.Div(contrib, params.GasLimitBoundDivisor) + + gl := new(big.Int).Sub(parent.GasLimit(), decay) + gl = gl.Add(gl, contrib) + gl = gl.Add(gl, big.NewInt(1)) + gl.Set(common.BigMax(gl, params.MinGasLimit)) + + if gl.Cmp(params.GenesisGasLimit) < 0 { + gl.Add(parent.GasLimit(), decay) + gl.Set(common.BigMin(gl, params.GenesisGasLimit)) + } + return gl +} + +// GetBlockByHash returns the block corresponding to the hash or nil if not found +func GetBlockByHash(db common.Database, hash common.Hash) *types.Block { + data, _ := db.Get(append(blockHashPre, hash[:]...)) + if len(data) == 0 { + return nil + } + var block types.StorageBlock + if err := rlp.Decode(bytes.NewReader(data), &block); err != nil { + glog.V(logger.Error).Infof("invalid block RLP for hash %x: %v", hash, err) + return nil + } + return (*types.Block)(&block) +} + +// GetBlockByHash returns the canonical block by number or nil if not found +func GetBlockByNumber(db common.Database, number uint64) *types.Block { + key, _ := db.Get(append(blockNumPre, big.NewInt(int64(number)).Bytes()...)) + if len(key) == 0 { + return nil + } + + return GetBlockByHash(db, common.BytesToHash(key)) +} + +// WriteHead force writes the current head +func WriteHead(db common.Database, block *types.Block) error { + key := append(blockNumPre, block.Number().Bytes()...) + err := db.Put(key, block.Hash().Bytes()) + if err != nil { + return err + } + err = db.Put([]byte("LastBlock"), block.Hash().Bytes()) + if err != nil { + return err + } + return nil +} diff --git a/core/state_transition.go b/core/state_transition.go index e2212dfef..5611ffd0f 100644 --- a/core/state_transition.go +++ b/core/state_transition.go @@ -7,7 +7,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/vm" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/params" @@ -56,11 +55,6 @@ type Message interface { Data() []byte } -func AddressFromMessage(msg Message) common.Address { - from, _ := msg.From() - return crypto.CreateAddress(from, msg.Nonce()) -} - func MessageCreatesContract(msg Message) bool { return msg.To() == nil } diff --git a/core/transaction_pool.go b/core/transaction_pool.go index 6a7012c65..ac9027755 100644 --- a/core/transaction_pool.go +++ b/core/transaction_pool.go @@ -65,7 +65,7 @@ func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn, gasLimitFn func( gasLimit: gasLimitFn, minGasPrice: new(big.Int), pendingState: state.ManageState(currentStateFn()), - events: eventMux.Subscribe(ChainEvent{}, GasPriceChanged{}), + events: eventMux.Subscribe(ChainHeadEvent{}, GasPriceChanged{}), } go pool.eventLoop() @@ -80,7 +80,7 @@ func (pool *TxPool) eventLoop() { pool.mu.Lock() switch ev := ev.(type) { - case ChainEvent: + case ChainHeadEvent: pool.resetState() case GasPriceChanged: pool.minGasPrice = ev.Price diff --git a/core/types/block.go b/core/types/block.go index b7eb700ca..e8919e9a0 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -290,7 +290,7 @@ func (b *Block) MixDigest() common.Hash { return b.header.MixDigest } func (b *Block) Nonce() uint64 { return binary.BigEndian.Uint64(b.header.Nonce[:]) } func (b *Block) Bloom() Bloom { return b.header.Bloom } func (b *Block) Coinbase() common.Address { return b.header.Coinbase } -func (b *Block) Time() int64 { return int64(b.header.Time) } +func (b *Block) Time() uint64 { return b.header.Time } func (b *Block) Root() common.Hash { return b.header.Root } func (b *Block) ParentHash() common.Hash { return b.header.ParentHash } func (b *Block) TxHash() common.Hash { return b.header.TxHash } diff --git a/core/types/block_test.go b/core/types/block_test.go index 03e6881be..e0b98cd26 100644 --- a/core/types/block_test.go +++ b/core/types/block_test.go @@ -31,7 +31,7 @@ func TestBlockEncoding(t *testing.T) { check("Root", block.Root(), common.HexToHash("ef1552a40b7165c3cd773806b9e0c165b75356e0314bf0706f279c729f51e017")) check("Hash", block.Hash(), common.HexToHash("0a5843ac1cb04865017cb35a57b50b07084e5fcee39b5acadade33149f4fff9e")) check("Nonce", block.Nonce(), uint64(0xa13a5a8c8f2bb1c4)) - check("Time", block.Time(), int64(1426516743)) + check("Time", block.Time(), uint64(1426516743)) check("Size", block.Size(), common.StorageSize(len(blockEnc))) tx1 := NewTransaction(0, common.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"), big.NewInt(10), big.NewInt(50000), big.NewInt(10), nil) diff --git a/core/vm/environment.go b/core/vm/environment.go index c103049a2..0a5891f5c 100644 --- a/core/vm/environment.go +++ b/core/vm/environment.go @@ -17,7 +17,7 @@ type Environment interface { BlockNumber() *big.Int GetHash(n uint64) common.Hash Coinbase() common.Address - Time() int64 + Time() uint64 Difficulty() *big.Int GasLimit() *big.Int Transfer(from, to Account, amount *big.Int) error diff --git a/core/vm/vm.go b/core/vm/vm.go index 9e092300d..ba803683b 100644 --- a/core/vm/vm.go +++ b/core/vm/vm.go @@ -444,7 +444,7 @@ func (self *Vm) Run(context *Context, input []byte) (ret []byte, err error) { case TIMESTAMP: time := self.env.Time() - stack.push(big.NewInt(time)) + stack.push(new(big.Int).SetUint64(time)) case NUMBER: number := self.env.BlockNumber() diff --git a/core/vm_env.go b/core/vm_env.go index 6dd83acde..24a29545f 100644 --- a/core/vm_env.go +++ b/core/vm_env.go @@ -33,7 +33,7 @@ func NewEnv(state *state.StateDB, chain *ChainManager, msg Message, header *type func (self *VMEnv) Origin() common.Address { f, _ := self.msg.From(); return f } func (self *VMEnv) BlockNumber() *big.Int { return self.header.Number } func (self *VMEnv) Coinbase() common.Address { return self.header.Coinbase } -func (self *VMEnv) Time() int64 { return int64(self.header.Time) } +func (self *VMEnv) Time() uint64 { return self.header.Time } func (self *VMEnv) Difficulty() *big.Int { return self.header.Difficulty } func (self *VMEnv) GasLimit() *big.Int { return self.header.GasLimit } func (self *VMEnv) Value() *big.Int { return self.msg.Value() } diff --git a/eth/backend.go b/eth/backend.go index 4644b8a93..d6ad3381d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -11,8 +11,6 @@ import ( "strings" "time" - "github.com/ethereum/go-ethereum/metrics" - "github.com/ethereum/ethash" "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/common" @@ -26,6 +24,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/miner" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discover" @@ -57,10 +56,9 @@ var ( ) type Config struct { - Name string - ProtocolVersion int - NetworkId int - GenesisNonce int + Name string + NetworkId int + GenesisNonce int BlockChainVersion int SkipBcVersionCheck bool // e.g. blockchain export @@ -226,7 +224,6 @@ type Ethereum struct { autodagquit chan bool etherbase common.Address clientVersion string - ethVersionId int netVersionId int shhVersionId int } @@ -291,14 +288,20 @@ func New(config *Config) (*Ethereum, error) { nodeDb := filepath.Join(config.DataDir, "nodes") // Perform database sanity checks - d, _ := blockDb.Get([]byte("ProtocolVersion")) - protov := int(common.NewValue(d).Uint()) - if protov != config.ProtocolVersion && protov != 0 { - path := filepath.Join(config.DataDir, "blockchain") - return nil, fmt.Errorf("Database version mismatch. Protocol(%d / %d). `rm -rf %s`", protov, config.ProtocolVersion, path) - } - saveProtocolVersion(blockDb, config.ProtocolVersion) - glog.V(logger.Info).Infof("Protocol Version: %v, Network Id: %v", config.ProtocolVersion, config.NetworkId) + /* + // The databases were previously tied to protocol versions. Currently we + // are moving away from this decision as approaching Frontier. The below + // check was left in for now but should eventually be just dropped. + + d, _ := blockDb.Get([]byte("ProtocolVersion")) + protov := int(common.NewValue(d).Uint()) + if protov != config.ProtocolVersion && protov != 0 { + path := filepath.Join(config.DataDir, "blockchain") + return nil, fmt.Errorf("Database version mismatch. Protocol(%d / %d). `rm -rf %s`", protov, config.ProtocolVersion, path) + } + saveProtocolVersion(blockDb, config.ProtocolVersion) + */ + glog.V(logger.Info).Infof("Protocol Versions: %v, Network Id: %v", ProtocolVersions, config.NetworkId) if !config.SkipBcVersionCheck { b, _ := blockDb.Get([]byte("BlockchainVersion")) @@ -321,7 +324,6 @@ func New(config *Config) (*Ethereum, error) { DataDir: config.DataDir, etherbase: common.HexToAddress(config.Etherbase), clientVersion: config.Name, // TODO should separate from Name - ethVersionId: config.ProtocolVersion, netVersionId: config.NetworkId, NatSpec: config.NatSpec, MinerThreads: config.MinerThreads, @@ -345,7 +347,7 @@ func New(config *Config) (*Ethereum, error) { eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.chainManager, eth.EventMux()) eth.chainManager.SetProcessor(eth.blockProcessor) - eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.pow, eth.chainManager) + eth.protocolManager = NewProtocolManager(config.NetworkId, eth.eventMux, eth.txPool, eth.pow, eth.chainManager) eth.miner = miner.New(eth, eth.EventMux(), eth.pow) eth.miner.SetGasPrice(config.GasPrice) @@ -358,7 +360,7 @@ func New(config *Config) (*Ethereum, error) { if err != nil { return nil, err } - protocols := []p2p.Protocol{eth.protocolManager.SubProtocol} + protocols := append([]p2p.Protocol{}, eth.protocolManager.SubProtocols...) if config.Shh { protocols = append(protocols, eth.whisper.Protocol()) } @@ -495,7 +497,7 @@ func (s *Ethereum) PeerCount() int { return s.net.PeerCoun func (s *Ethereum) Peers() []*p2p.Peer { return s.net.Peers() } func (s *Ethereum) MaxPeers() int { return s.net.MaxPeers } func (s *Ethereum) ClientVersion() string { return s.clientVersion } -func (s *Ethereum) EthVersion() int { return s.ethVersionId } +func (s *Ethereum) EthVersion() int { return int(s.protocolManager.SubProtocols[0].Version) } func (s *Ethereum) NetVersion() int { return s.netVersionId } func (s *Ethereum) ShhVersion() int { return s.shhVersionId } func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManager.downloader } @@ -504,7 +506,7 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolMana func (s *Ethereum) Start() error { jsonlogger.LogJson(&logger.LogStarting{ ClientString: s.net.Name, - ProtocolVersion: ProtocolVersion, + ProtocolVersion: s.EthVersion(), }) err := s.net.Start() if err != nil { @@ -560,7 +562,7 @@ done: func (s *Ethereum) StartForTest() { jsonlogger.LogJson(&logger.LogStarting{ ClientString: s.net.Name, - ProtocolVersion: ProtocolVersion, + ProtocolVersion: s.EthVersion(), }) } @@ -667,14 +669,20 @@ func (self *Ethereum) StopAutoDAG() { glog.V(logger.Info).Infof("Automatic pregeneration of ethash DAG OFF (ethash dir: %s)", ethash.DefaultDir) } -func saveProtocolVersion(db common.Database, protov int) { - d, _ := db.Get([]byte("ProtocolVersion")) - protocolVersion := common.NewValue(d).Uint() +/* + // The databases were previously tied to protocol versions. Currently we + // are moving away from this decision as approaching Frontier. The below + // code was left in for now but should eventually be just dropped. + + func saveProtocolVersion(db common.Database, protov int) { + d, _ := db.Get([]byte("ProtocolVersion")) + protocolVersion := common.NewValue(d).Uint() - if protocolVersion == 0 { - db.Put([]byte("ProtocolVersion"), common.NewValue(protov).Bytes()) + if protocolVersion == 0 { + db.Put([]byte("ProtocolVersion"), common.NewValue(protov).Bytes()) + } } -} +*/ func saveBlockchainVersion(db common.Database, bcVersion int) { d, _ := db.Get([]byte("BlockchainVersion")) diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 39976aae1..c788048e9 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -19,18 +19,24 @@ import ( "gopkg.in/fatih/set.v0" ) +const ( + eth60 = 60 // Constant to check for old protocol support + eth61 = 61 // Constant to check for new protocol support +) + var ( - MinHashFetch = 512 // Minimum amount of hashes to not consider a peer stalling - MaxHashFetch = 2048 // Amount of hashes to be fetched per retrieval request - MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request + MinHashFetch = 512 // Minimum amount of hashes to not consider a peer stalling + MaxHashFetch = 512 // Amount of hashes to be fetched per retrieval request + MaxBlockFetch = 128 // Amount of blocks to be fetched per retrieval request hashTTL = 5 * time.Second // Time it takes for a hash request to time out blockSoftTTL = 3 * time.Second // Request completion threshold for increasing or decreasing a peer's bandwidth blockHardTTL = 3 * blockSoftTTL // Maximum time allowance before a block request is considered expired crossCheckCycle = time.Second // Period after which to check for expired cross checks - maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out - maxBlockProcess = 256 // Number of blocks to import at once into the chain + maxQueuedHashes = 256 * 1024 // Maximum number of hashes to queue for import (DOS protection) + maxBannedHashes = 4096 // Number of bannable hashes before phasing old ones out + maxBlockProcess = 256 // Number of blocks to import at once into the chain ) var ( @@ -58,6 +64,9 @@ type hashCheckFn func(common.Hash) bool // blockRetrievalFn is a callback type for retrieving a block from the local chain. type blockRetrievalFn func(common.Hash) *types.Block +// headRetrievalFn is a callback type for retrieving the head block from the local chain. +type headRetrievalFn func() *types.Block + // chainInsertFn is a callback type to insert a batch of blocks into the local chain. type chainInsertFn func(types.Blocks) (int, error) @@ -98,6 +107,7 @@ type Downloader struct { // Callbacks hasBlock hashCheckFn // Checks if a block is present in the chain getBlock blockRetrievalFn // Retrieves a block from the chain + headBlock headRetrievalFn // Retrieves the head block from the chain insertChain chainInsertFn // Injects a batch of blocks into the chain dropPeer peerDropFn // Drops a peer for misbehaving @@ -109,8 +119,9 @@ type Downloader struct { // Channels newPeerCh chan *peer - hashCh chan hashPack - blockCh chan blockPack + hashCh chan hashPack // Channel receiving inbound hashes + blockCh chan blockPack // Channel receiving inbound blocks + processCh chan bool // Channel to signal the block fetcher of new or finished work cancelCh chan struct{} // Channel to cancel mid-flight syncs cancelLock sync.RWMutex // Lock to protect the cancel channel in delivers @@ -123,7 +134,7 @@ type Block struct { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { +func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, headBlock headRetrievalFn, insertChain chainInsertFn, dropPeer peerDropFn) *Downloader { // Create the base downloader downloader := &Downloader{ mux: mux, @@ -131,11 +142,13 @@ func New(mux *event.TypeMux, hasBlock hashCheckFn, getBlock blockRetrievalFn, in peers: newPeerSet(), hasBlock: hasBlock, getBlock: getBlock, + headBlock: headBlock, insertChain: insertChain, dropPeer: dropPeer, newPeerCh: make(chan *peer, 1), hashCh: make(chan hashPack, 1), blockCh: make(chan blockPack, 1), + processCh: make(chan bool, 1), } // Inject all the known bad hashes downloader.banned = set.New() @@ -175,7 +188,7 @@ func (d *Downloader) Synchronising() bool { // RegisterPeer injects a new download peer into the set of block source to be // used for fetching hashes and blocks from. -func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) error { +func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) error { // If the peer wants to send a banned hash, reject if d.banned.Has(head) { glog.V(logger.Debug).Infoln("Register rejected, head hash banned:", id) @@ -183,7 +196,7 @@ func (d *Downloader) RegisterPeer(id string, head common.Hash, getHashes hashFet } // Otherwise try to construct and register the peer glog.V(logger.Detail).Infoln("Registering peer", id) - if err := d.peers.Register(newPeer(id, head, getHashes, getBlocks)); err != nil { + if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks)); err != nil { glog.V(logger.Error).Infoln("Register failed:", err) return err } @@ -289,12 +302,38 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) { } }() - glog.V(logger.Debug).Infoln("Synchronizing with the network using:", p.id) - if err = d.fetchHashes(p, hash); err != nil { - return err - } - if err = d.fetchBlocks(); err != nil { - return err + glog.V(logger.Debug).Infof("Synchronizing with the network using: %s, eth/%d", p.id, p.version) + switch p.version { + case eth60: + // Old eth/60 version, use reverse hash retrieval algorithm + if err = d.fetchHashes60(p, hash); err != nil { + return err + } + if err = d.fetchBlocks60(); err != nil { + return err + } + case eth61: + // New eth/61, use forward, concurrent hash and block retrieval algorithm + number, err := d.findAncestor(p) + if err != nil { + return err + } + errc := make(chan error, 2) + go func() { errc <- d.fetchHashes(p, number+1) }() + go func() { errc <- d.fetchBlocks(number + 1) }() + + // If any fetcher fails, cancel the other + if err := <-errc; err != nil { + d.cancel() + <-errc + return err + } + return <-errc + + default: + // Something very wrong, stop right here + glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version) + return errBadPeer } glog.V(logger.Debug).Infoln("Synchronization completed") @@ -326,10 +365,10 @@ func (d *Downloader) Terminate() { d.cancel() } -// fetchHahes starts retrieving hashes backwards from a specific peer and hash, +// fetchHashes60 starts retrieving hashes backwards from a specific peer and hash, // up until it finds a common ancestor. If the source peer times out, alternative // ones are tried for continuation. -func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { +func (d *Downloader) fetchHashes60(p *peer, h common.Hash) error { var ( start = time.Now() active = p // active peer will help determine the current active peer @@ -346,12 +385,12 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { <-timeout.C // timeout channel should be initially empty. getHashes := func(from common.Hash) { - go active.getHashes(from) + go active.getRelHashes(from) timeout.Reset(hashTTL) } // Add the hash to the queue, and start hash retrieval. - d.queue.Insert([]common.Hash{h}) + d.queue.Insert([]common.Hash{h}, false) getHashes(h) attempted[p.id] = true @@ -377,7 +416,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { if d.banned.Has(hash) { glog.V(logger.Debug).Infof("Peer (%s) sent a known invalid chain", active.id) - d.queue.Insert(hashPack.hashes[:index+1]) + d.queue.Insert(hashPack.hashes[:index+1], false) if err := d.banBlocks(active.id, hash); err != nil { glog.V(logger.Debug).Infof("Failed to ban batch of blocks: %v", err) } @@ -395,7 +434,7 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { } } // Insert all the new hashes, but only continue if got something useful - inserts := d.queue.Insert(hashPack.hashes) + inserts := d.queue.Insert(hashPack.hashes, false) if len(inserts) == 0 && !done { glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes", active.id) return errBadPeer @@ -422,9 +461,9 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { continue } // We're done, prepare the download cache and proceed pulling the blocks - offset := 0 + offset := uint64(0) if block := d.getBlock(head); block != nil { - offset = int(block.NumberU64() + 1) + offset = block.NumberU64() + 1 } d.queue.Prepare(offset) finished = true @@ -481,10 +520,10 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error { return nil } -// fetchBlocks iteratively downloads the entire schedules block-chain, taking +// fetchBlocks60 iteratively downloads the entire schedules block-chain, taking // any available peers, reserving a chunk of blocks for each, wait for delivery // and periodically checking for timeouts. -func (d *Downloader) fetchBlocks() error { +func (d *Downloader) fetchBlocks60() error { glog.V(logger.Debug).Infoln("Downloading", d.queue.Pending(), "block(s)") start := time.Now() @@ -619,6 +658,332 @@ out: return nil } +// findAncestor tries to locate the common ancestor block of the local chain and +// a remote peers blockchain. In the general case when our node was in sync and +// on the correct chain, checking the top N blocks should already get us a match. +// In the rare scenario when we ended up on a long soft fork (i.e. none of the +// head blocks match), we do a binary search to find the common ancestor. +func (d *Downloader) findAncestor(p *peer) (uint64, error) { + glog.V(logger.Debug).Infof("%v: looking for common ancestor", p) + + // Request out head blocks to short circuit ancestor location + head := d.headBlock().NumberU64() + from := int64(head) - int64(MaxHashFetch) + if from < 0 { + from = 0 + } + go p.getAbsHashes(uint64(from), MaxHashFetch) + + // Wait for the remote response to the head fetch + number, hash := uint64(0), common.Hash{} + timeout := time.After(hashTTL) + + for finished := false; !finished; { + select { + case <-d.cancelCh: + return 0, errCancelHashFetch + + case hashPack := <-d.hashCh: + // Discard anything not from the origin peer + if hashPack.peerId != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + break + } + // Make sure the peer actually gave something valid + hashes := hashPack.hashes + if len(hashes) == 0 { + glog.V(logger.Debug).Infof("%v: empty head hash set", p) + return 0, errEmptyHashSet + } + // Check if a common ancestor was found + finished = true + for i := len(hashes) - 1; i >= 0; i-- { + if d.hasBlock(hashes[i]) { + number, hash = uint64(from)+uint64(i), hashes[i] + break + } + } + + case <-d.blockCh: + // Out of bounds blocks received, ignore them + + case <-timeout: + glog.V(logger.Debug).Infof("%v: head hash timeout", p) + return 0, errTimeout + } + } + // If the head fetch already found an ancestor, return + if !common.EmptyHash(hash) { + glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x]", p, number, hash[:4]) + return number, nil + } + // Ancestor not found, we need to binary search over our chain + start, end := uint64(0), head + for start+1 < end { + // Split our chain interval in two, and request the hash to cross check + check := (start + end) / 2 + + timeout := time.After(hashTTL) + go p.getAbsHashes(uint64(check), 1) + + // Wait until a reply arrives to this request + for arrived := false; !arrived; { + select { + case <-d.cancelCh: + return 0, errCancelHashFetch + + case hashPack := <-d.hashCh: + // Discard anything not from the origin peer + if hashPack.peerId != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + break + } + // Make sure the peer actually gave something valid + hashes := hashPack.hashes + if len(hashes) != 1 { + glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes)) + return 0, errBadPeer + } + arrived = true + + // Modify the search interval based on the response + block := d.getBlock(hashes[0]) + if block == nil { + end = check + break + } + if block.NumberU64() != check { + glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check) + return 0, errBadPeer + } + start = check + + case <-d.blockCh: + // Out of bounds blocks received, ignore them + + case <-timeout: + glog.V(logger.Debug).Infof("%v: search hash timeout", p) + return 0, errTimeout + } + } + } + return start, nil +} + +// fetchHashes keeps retrieving hashes from the requested number, until no more +// are returned, potentially throttling on the way. +func (d *Downloader) fetchHashes(p *peer, from uint64) error { + glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from) + + // Create a timeout timer, and the associated hash fetcher + timeout := time.NewTimer(0) // timer to dump a non-responsive active peer + <-timeout.C // timeout channel should be initially empty + defer timeout.Stop() + + getHashes := func(from uint64) { + glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from) + + go p.getAbsHashes(from, MaxHashFetch) + timeout.Reset(hashTTL) + } + // Start pulling hashes, until all are exhausted + getHashes(from) + for { + select { + case <-d.cancelCh: + return errCancelHashFetch + + case hashPack := <-d.hashCh: + // Make sure the active peer is giving us the hashes + if hashPack.peerId != p.id { + glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", hashPack.peerId) + break + } + timeout.Stop() + + // If no more hashes are inbound, notify the block fetcher and return + if len(hashPack.hashes) == 0 { + glog.V(logger.Debug).Infof("%v: no available hashes", p) + + select { + case d.processCh <- false: + case <-d.cancelCh: + } + return nil + } + // Otherwise insert all the new hashes, aborting in case of junk + glog.V(logger.Detail).Infof("%v: inserting %d hashes from #%d", p, len(hashPack.hashes), from) + + inserts := d.queue.Insert(hashPack.hashes, true) + if len(inserts) != len(hashPack.hashes) { + glog.V(logger.Debug).Infof("%v: stale hashes", p) + return errBadPeer + } + // Notify the block fetcher of new hashes, but stop if queue is full + cont := d.queue.Pending() < maxQueuedHashes + select { + case d.processCh <- cont: + default: + } + if !cont { + return nil + } + // Queue not yet full, fetch the next batch + from += uint64(len(hashPack.hashes)) + getHashes(from) + + case <-timeout.C: + glog.V(logger.Debug).Infof("%v: hash request timed out", p) + return errTimeout + } + } +} + +// fetchBlocks iteratively downloads the scheduled hashes, taking any available +// peers, reserving a chunk of blocks for each, waiting for delivery and also +// periodically checking for timeouts. +func (d *Downloader) fetchBlocks(from uint64) error { + glog.V(logger.Debug).Infof("Downloading blocks from #%d", from) + defer glog.V(logger.Debug).Infof("Block download terminated") + + // Create a timeout timer for scheduling expiration tasks + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + update := make(chan struct{}, 1) + + // Prepare the queue and fetch blocks until the hash fetcher's done + d.queue.Prepare(from) + finished := false + + for { + select { + case <-d.cancelCh: + return errCancelBlockFetch + + case blockPack := <-d.blockCh: + // If the peer was previously banned and failed to deliver it's pack + // in a reasonable time frame, ignore it's message. + if peer := d.peers.Peer(blockPack.peerId); peer != nil { + // Deliver the received chunk of blocks, and demote in case of errors + err := d.queue.Deliver(blockPack.peerId, blockPack.blocks) + switch err { + case nil: + // If no blocks were delivered, demote the peer (need the delivery above) + if len(blockPack.blocks) == 0 { + peer.Demote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: no blocks delivered", peer) + break + } + // All was successful, promote the peer and potentially start processing + peer.Promote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blockPack.blocks)) + go d.process() + + case errInvalidChain: + // The hash chain is invalid (blocks are not ordered properly), abort + return err + + case errNoFetchesPending: + // Peer probably timed out with its delivery but came through + // in the end, demote, but allow to to pull from this peer. + peer.Demote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: out of bound delivery", peer) + + case errStaleDelivery: + // Delivered something completely else than requested, usually + // caused by a timeout and delivery during a new sync cycle. + // Don't set it to idle as the original request should still be + // in flight. + peer.Demote() + glog.V(logger.Detail).Infof("%s: stale delivery", peer) + + default: + // Peer did something semi-useful, demote but keep it around + peer.Demote() + peer.SetIdle() + glog.V(logger.Detail).Infof("%s: delivery partially failed: %v", peer, err) + go d.process() + } + } + // Blocks arrived, try to update the progress + select { + case update <- struct{}{}: + default: + } + + case cont := <-d.processCh: + // The hash fetcher sent a continuation flag, check if it's done + if !cont { + finished = true + } + // Hashes arrive, try to update the progress + select { + case update <- struct{}{}: + default: + } + + case <-ticker.C: + // Sanity check update the progress + select { + case update <- struct{}{}: + default: + } + + case <-update: + // Short circuit if we lost all our peers + if d.peers.Len() == 0 { + return errNoPeers + } + // Check for block request timeouts and demote the responsible peers + for _, pid := range d.queue.Expire(blockHardTTL) { + if peer := d.peers.Peer(pid); peer != nil { + peer.Demote() + glog.V(logger.Detail).Infof("%s: block delivery timeout", peer) + } + } + // If there's noting more to fetch, wait or terminate + if d.queue.Pending() == 0 { + if d.queue.InFlight() == 0 && finished { + glog.V(logger.Debug).Infof("Block fetching completed") + return nil + } + break + } + // Send a download request to all idle peers, until throttled + for _, peer := range d.peers.IdlePeers() { + // Short circuit if throttling activated + if d.queue.Throttle() { + break + } + // Reserve a chunk of hashes for a peer. A nil can mean either that + // no more hashes are available, or that the peer is known not to + // have them. + request := d.queue.Reserve(peer, peer.Capacity()) + if request == nil { + continue + } + if glog.V(logger.Detail) { + glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes)) + } + // Fetch the chunk and make sure any errors return the hashes to the queue + if err := peer.Fetch(request); err != nil { + glog.V(logger.Error).Infof("%v: fetch failed, rescheduling", peer) + d.queue.Cancel(request) + } + } + // Make sure that we have peers available for fetching. If all peers have been tried + // and all failed throw an error + if !d.queue.Throttle() && d.queue.InFlight() == 0 { + return errPeersUnavailable + } + } + } +} + // banBlocks retrieves a batch of blocks from a peer feeding us invalid hashes, // and bans the head of the retrieved batch. // diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 7feca8782..c5fb00289 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -21,7 +21,7 @@ var ( genesis = core.GenesisBlockForTesting(testdb, common.Address{}, big.NewInt(0)) ) -// makeChain creates a chain of n blocks starting at and including +// makeChain creates a chain of n blocks starting at but not including // parent. the returned hash chain is ordered head->parent. func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common.Hash]*types.Block) { blocks := core.GenerateChain(parent, testdb, n, func(i int, gen *core.BlockGen) { @@ -42,7 +42,7 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common // h2[:f] are different but have a common suffix of length n-f. func makeChainFork(n, f int, parent *types.Block) (h1, h2 []common.Hash, b1, b2 map[common.Hash]*types.Block) { // Create the common suffix. - h, b := makeChain(n-f-1, 0, parent) + h, b := makeChain(n-f, 0, parent) // Create the forks. h1, b1 = makeChain(f, 1, b[h[0]]) h1 = append(h1, h[1:]...) @@ -75,7 +75,7 @@ func newTester() *downloadTester { peerHashes: make(map[string][]common.Hash), peerBlocks: make(map[string]map[common.Hash]*types.Block), } - tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.insertChain, tester.dropPeer) + tester.downloader = New(new(event.TypeMux), tester.hasBlock, tester.getBlock, tester.headBlock, tester.insertChain, tester.dropPeer) return tester } @@ -99,6 +99,11 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block { return dl.ownBlocks[hash] } +// headBlock retrieves the current head block from the canonical chain. +func (dl *downloadTester) headBlock() *types.Block { + return dl.getBlock(dl.ownHashes[len(dl.ownHashes)-1]) +} + // insertChain injects a new batch of blocks into the simulated chain. func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { for i, block := range blocks { @@ -112,15 +117,15 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) (int, error) { } // newPeer registers a new block download source into the downloader. -func (dl *downloadTester) newPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { - return dl.newSlowPeer(id, hashes, blocks, 0) +func (dl *downloadTester) newPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block) error { + return dl.newSlowPeer(id, version, hashes, blocks, 0) } // newSlowPeer registers a new block download source into the downloader, with a // specific delay time on processing the network packets sent to it, simulating // potentially slow network IO. -func (dl *downloadTester) newSlowPeer(id string, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { - err := dl.downloader.RegisterPeer(id, hashes[0], dl.peerGetHashesFn(id, delay), dl.peerGetBlocksFn(id, delay)) +func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Hash, blocks map[common.Hash]*types.Block, delay time.Duration) error { + err := dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, version, delay), dl.peerGetBlocksFn(id, delay)) if err == nil { // Assign the owned hashes and blocks to the peer (deep copy) dl.peerHashes[id] = make([]common.Hash, len(hashes)) @@ -141,10 +146,10 @@ func (dl *downloadTester) dropPeer(id string) { dl.downloader.UnregisterPeer(id) } -// peerGetBlocksFn constructs a getHashes function associated with a particular +// peerGetRelHashesFn constructs a GetHashes function associated with a specific // peer in the download tester. The returned function can be used to retrieve // batches of hashes from the particularly requested peer. -func (dl *downloadTester) peerGetHashesFn(id string, delay time.Duration) func(head common.Hash) error { +func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) func(head common.Hash) error { return func(head common.Hash) error { time.Sleep(delay) @@ -174,13 +179,43 @@ func (dl *downloadTester) peerGetHashesFn(id string, delay time.Duration) func(h } } +// peerGetAbsHashesFn constructs a GetHashesFromNumber function associated with +// a particular peer in the download tester. The returned function can be used to +// retrieve batches of hashes from the particularly requested peer. +func (dl *downloadTester) peerGetAbsHashesFn(id string, version int, delay time.Duration) func(uint64, int) error { + // If the simulated peer runs eth/60, this message is not supported + if version == eth60 { + return func(uint64, int) error { return nil } + } + // Otherwise create a method to request the blocks by number + return func(head uint64, count int) error { + time.Sleep(delay) + + limit := count + if dl.maxHashFetch > 0 { + limit = dl.maxHashFetch + } + // Gather the next batch of hashes + hashes := dl.peerHashes[id] + result := make([]common.Hash, 0, limit) + for i := 0; i < limit && len(hashes)-int(head)-1-i >= 0; i++ { + result = append(result, hashes[len(hashes)-int(head)-1-i]) + } + // Delay delivery a bit to allow attacks to unfold + go func() { + time.Sleep(time.Millisecond) + dl.downloader.DeliverHashes(id, result) + }() + return nil + } +} + // peerGetBlocksFn constructs a getBlocks function associated with a particular // peer in the download tester. The returned function can be used to retrieve // batches of blocks from the particularly requested peer. func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([]common.Hash) error { return func(hashes []common.Hash) error { time.Sleep(delay) - blocks := dl.peerBlocks[id] result := make([]*types.Block, 0, len(hashes)) for _, hash := range hashes { @@ -195,13 +230,13 @@ func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([ } // Tests that simple synchronization, without throttling from a good peer works. -func TestSynchronisation(t *testing.T) { +func TestSynchronisation60(t *testing.T) { // Create a small enough block chain to download and the tester targetBlocks := blockCacheLimit - 15 hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth60, hashes, blocks) // Synchronise with the peer and make sure all blocks were retrieved if err := tester.sync("peer"); err != nil { @@ -212,42 +247,79 @@ func TestSynchronisation(t *testing.T) { } } -// Tests that an inactive downloader will not accept incoming hashes and blocks. -func TestInactiveDownloader(t *testing.T) { +// Tests that simple synchronization against a canonical chain works correctly. +// In this test common ancestor lookup should be short circuited and not require +// binary searching. +func TestCanonicalSynchronisation(t *testing.T) { + // Create a small enough block chain to download + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + tester := newTester() + tester.newPeer("peer", eth61, hashes, blocks) - // Check that neither hashes nor blocks are accepted - if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { - t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("peer"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) } - if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive { - t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + if imported := len(tester.ownBlocks); imported != targetBlocks+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, targetBlocks+1) } } -// Tests that a canceled download wipes all previously accumulated state. -func TestCancel(t *testing.T) { - // Create a small enough block chain to download and the tester - targetBlocks := blockCacheLimit - 15 +// Tests that if a large batch of blocks are being downloaded, it is throttled +// until the cached blocks are retrieved. +func TestThrottling60(t *testing.T) { + // Create a long block chain to download and the tester + targetBlocks := 8 * blockCacheLimit hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth60, hashes, blocks) - // Make sure canceling works with a pristine downloader - tester.downloader.cancel() - hashCount, blockCount := tester.downloader.queue.Size() - if hashCount > 0 || blockCount > 0 { - t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + // Wrap the importer to allow stepping + done := make(chan int) + tester.downloader.insertChain = func(blocks types.Blocks) (int, error) { + n, err := tester.insertChain(blocks) + done <- n + return n, err } - // Synchronise with the peer, but cancel afterwards - if err := tester.sync("peer"); err != nil { - t.Fatalf("failed to synchronise blocks: %v", err) + // Start a synchronisation concurrently + errc := make(chan error) + go func() { + errc <- tester.sync("peer") + }() + // Iteratively take some blocks, always checking the retrieval count + for len(tester.ownBlocks) < targetBlocks+1 { + // Wait a bit for sync to throttle itself + var cached int + for start := time.Now(); time.Since(start) < 3*time.Second; { + time.Sleep(25 * time.Millisecond) + + cached = len(tester.downloader.queue.blockPool) + if cached == blockCacheLimit || len(tester.ownBlocks)+cached == targetBlocks+1 { + break + } + } + // Make sure we filled up the cache, then exhaust it + time.Sleep(25 * time.Millisecond) // give it a chance to screw up + if cached != blockCacheLimit && len(tester.ownBlocks)+cached < targetBlocks+1 { + t.Fatalf("block count mismatch: have %v, want %v", cached, blockCacheLimit) + } + <-done // finish previous blocking import + for cached > maxBlockProcess { + cached -= <-done + } + time.Sleep(25 * time.Millisecond) // yield to the insertion } - tester.downloader.cancel() - hashCount, blockCount = tester.downloader.queue.Size() - if hashCount > 0 || blockCount > 0 { - t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + <-done // finish the last blocking import + + // Check that we haven't pulled more blocks than available + if len(tester.ownBlocks) > targetBlocks+1 { + t.Fatalf("target block count mismatch: have %v, want %v", len(tester.ownBlocks), targetBlocks+1) + } + if err := <-errc; err != nil { + t.Fatalf("block synchronization failed: %v", err) } } @@ -259,7 +331,7 @@ func TestThrottling(t *testing.T) { hashes, blocks := makeChain(targetBlocks, 0, genesis) tester := newTester() - tester.newPeer("peer", hashes, blocks) + tester.newPeer("peer", eth61, hashes, blocks) // Wrap the importer to allow stepping done := make(chan int) @@ -307,6 +379,102 @@ func TestThrottling(t *testing.T) { } } +// Tests that simple synchronization against a forked chain works correctly. In +// this test common ancestor lookup should *not* be short circuited, and a full +// binary search should be executed. +func TestForkedSynchronisation(t *testing.T) { + // Create a long enough forked chain + common, fork := MaxHashFetch, 2*MaxHashFetch + hashesA, hashesB, blocksA, blocksB := makeChainFork(common+fork, fork, genesis) + + tester := newTester() + tester.newPeer("fork A", eth61, hashesA, blocksA) + tester.newPeer("fork B", eth61, hashesB, blocksB) + + // Synchronise with the peer and make sure all blocks were retrieved + if err := tester.sync("fork A"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != common+fork+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+fork+1) + } + // Synchronise with the second peer and make sure that fork is pulled too + if err := tester.sync("fork B"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + if imported := len(tester.ownBlocks); imported != common+2*fork+1 { + t.Fatalf("synchronised block mismatch: have %v, want %v", imported, common+2*fork+1) + } +} + +// Tests that an inactive downloader will not accept incoming hashes and blocks. +func TestInactiveDownloader(t *testing.T) { + tester := newTester() + + // Check that neither hashes nor blocks are accepted + if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } + if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive { + t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive) + } +} + +// Tests that a canceled download wipes all previously accumulated state. +func TestCancel60(t *testing.T) { + // Create a small enough block chain to download and the tester + targetBlocks := blockCacheLimit - 15 + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + tester := newTester() + tester.newPeer("peer", eth60, hashes, blocks) + + // Make sure canceling works with a pristine downloader + tester.downloader.cancel() + hashCount, blockCount := tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } + // Synchronise with the peer, but cancel afterwards + if err := tester.sync("peer"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + tester.downloader.cancel() + hashCount, blockCount = tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } +} + +// Tests that a canceled download wipes all previously accumulated state. +func TestCancel(t *testing.T) { + // Create a small enough block chain to download and the tester + targetBlocks := blockCacheLimit - 15 + if targetBlocks >= MaxHashFetch { + targetBlocks = MaxHashFetch - 15 + } + hashes, blocks := makeChain(targetBlocks, 0, genesis) + + tester := newTester() + tester.newPeer("peer", eth61, hashes, blocks) + + // Make sure canceling works with a pristine downloader + tester.downloader.cancel() + hashCount, blockCount := tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } + // Synchronise with the peer, but cancel afterwards + if err := tester.sync("peer"); err != nil { + t.Fatalf("failed to synchronise blocks: %v", err) + } + tester.downloader.cancel() + hashCount, blockCount = tester.downloader.queue.Size() + if hashCount > 0 || blockCount > 0 { + t.Errorf("block or hash count mismatch: %d hashes, %d blocks, want 0", hashCount, blockCount) + } +} + // Tests that synchronisation from multiple peers works as intended (multi thread sanity test). func TestMultiSynchronisation(t *testing.T) { // Create various peers with various parts of the chain @@ -317,7 +485,7 @@ func TestMultiSynchronisation(t *testing.T) { tester := newTester() for i := 0; i < targetPeers; i++ { id := fmt.Sprintf("peer #%d", i) - tester.newPeer(id, hashes[i*blockCacheLimit:], blocks) + tester.newPeer(id, eth60, hashes[i*blockCacheLimit:], blocks) } // Synchronise with the middle peer and make sure half of the blocks were retrieved id := fmt.Sprintf("peer #%d", targetPeers/2) @@ -347,8 +515,8 @@ func TestSlowSynchronisation(t *testing.T) { targetIODelay := time.Second hashes, blocks := makeChain(targetBlocks, 0, genesis) - tester.newSlowPeer("fast", hashes, blocks, 0) - tester.newSlowPeer("slow", hashes, blocks, targetIODelay) + tester.newSlowPeer("fast", eth60, hashes, blocks, 0) + tester.newSlowPeer("slow", eth60, hashes, blocks, targetIODelay) // Try to sync with the peers (pull hashes from fast) start := time.Now() @@ -370,13 +538,14 @@ func TestSlowSynchronisation(t *testing.T) { func TestNonExistingParentAttack(t *testing.T) { tester := newTester() + // Forge a single-link chain with a forged header hashes, blocks := makeChain(1, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) wrongblock := types.NewBlock(&types.Header{}, nil, nil, nil) wrongblock.Td = blocks[hashes[0]].Td hashes, blocks = makeChain(1, 0, wrongblock) - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err == nil { @@ -401,8 +570,8 @@ func TestRepeatingHashAttack(t *testing.T) { // TODO: Is this thing valid?? // Create a valid chain, but drop the last link hashes, blocks := makeChain(blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", hashes[:len(hashes)-1], blocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, hashes[:len(hashes)-1], blocks) // Try and sync with the malicious node errc := make(chan error) @@ -431,10 +600,10 @@ func TestNonExistingBlockAttack(t *testing.T) { // Create a valid chain, but forge the last link hashes, blocks := makeChain(blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) hashes[len(hashes)/2] = common.Hash{} - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errPeersUnavailable { @@ -453,7 +622,7 @@ func TestInvalidHashOrderAttack(t *testing.T) { // Create a valid long chain, but reverse some hashes within hashes, blocks := makeChain(4*blockCacheLimit, 0, genesis) - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) chunk1 := make([]common.Hash, blockCacheLimit) chunk2 := make([]common.Hash, blockCacheLimit) @@ -462,7 +631,7 @@ func TestInvalidHashOrderAttack(t *testing.T) { copy(hashes[2*blockCacheLimit:], chunk1) copy(hashes[blockCacheLimit:], chunk2) - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errInvalidChain { @@ -489,8 +658,8 @@ func TestMadeupHashChainAttack(t *testing.T) { rand.Read(randomHashes[i][:]) } - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", randomHashes, nil) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, randomHashes, nil) // Try and sync with the malicious node and check that it fails if err := tester.sync("attack"); err != errCrossCheckFailed { @@ -517,7 +686,7 @@ func TestMadeupHashChainDrippingAttack(t *testing.T) { // Try and sync with the attacker, one hash at a time tester.maxHashFetch = 1 - tester.newPeer("attack", randomHashes, nil) + tester.newPeer("attack", eth60, randomHashes, nil) if err := tester.sync("attack"); err != errStallingPeer { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errStallingPeer) } @@ -540,7 +709,7 @@ func TestMadeupBlockChainAttack(t *testing.T) { } // Try and sync with the malicious node and check that it fails tester := newTester() - tester.newPeer("attack", gapped, blocks) + tester.newPeer("attack", eth60, gapped, blocks) if err := tester.sync("attack"); err != errCrossCheckFailed { t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errCrossCheckFailed) } @@ -548,13 +717,13 @@ func TestMadeupBlockChainAttack(t *testing.T) { blockSoftTTL = defaultBlockTTL crossCheckCycle = defaultCrossCheckCycle - tester.newPeer("valid", hashes, blocks) + tester.newPeer("valid", eth60, hashes, blocks) if err := tester.sync("valid"); err != nil { t.Fatalf("failed to synchronise blocks: %v", err) } } -// tests that if one/multiple malicious peers try to feed a banned blockchain to +// Tests that if one/multiple malicious peers try to feed a banned blockchain to // the downloader, it will not keep refetching the same chain indefinitely, but // gradually block pieces of it, until its head is also blocked. func TestBannedChainStarvationAttack(t *testing.T) { @@ -565,8 +734,8 @@ func TestBannedChainStarvationAttack(t *testing.T) { // Create the tester and ban the selected hash. tester := newTester() tester.downloader.banned.Add(forkHashes[fork-1]) - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", forkHashes, forkBlocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, forkHashes, forkBlocks) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. @@ -586,7 +755,7 @@ func TestBannedChainStarvationAttack(t *testing.T) { banned = bans } // Check that after banning an entire chain, bad peers get dropped - if err := tester.newPeer("new attacker", forkHashes, forkBlocks); err != errBannedHead { + if err := tester.newPeer("new attacker", eth60, forkHashes, forkBlocks); err != errBannedHead { t.Fatalf("peer registration mismatch: have %v, want %v", err, errBannedHead) } if peer := tester.downloader.peers.Peer("new attacker"); peer != nil { @@ -618,8 +787,8 @@ func TestBannedChainMemoryExhaustionAttack(t *testing.T) { MaxBlockFetch = 4 maxBannedHashes = 256 - tester.newPeer("valid", hashes, blocks) - tester.newPeer("attack", forkHashes, forkBlocks) + tester.newPeer("valid", eth60, hashes, blocks) + tester.newPeer("attack", eth60, forkHashes, forkBlocks) // Iteratively try to sync, and verify that the banned hash list grows until // the head of the invalid chain is blocked too. @@ -664,7 +833,7 @@ func TestOverlappingDeliveryAttack(t *testing.T) { // Register an attacker that always returns non-requested blocks too tester := newTester() - tester.newPeer("attack", hashes, blocks) + tester.newPeer("attack", eth60, hashes, blocks) rawGetBlocks := tester.downloader.peers.Peer("attack").getBlocks tester.downloader.peers.Peer("attack").getBlocks = func(request []common.Hash) error { @@ -712,7 +881,7 @@ func TestHashAttackerDropping(t *testing.T) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, []common.Hash{genesis.Hash()}, nil); err != nil { + if err := tester.newPeer(id, eth60, []common.Hash{genesis.Hash()}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { @@ -744,7 +913,7 @@ func TestBlockAttackerDropping(t *testing.T) { for i, tt := range tests { // Register a new peer and ensure it's presence id := fmt.Sprintf("test %d", i) - if err := tester.newPeer(id, []common.Hash{common.Hash{}}, nil); err != nil { + if err := tester.newPeer(id, eth60, []common.Hash{common.Hash{}}, nil); err != nil { t.Fatalf("test %d: failed to register new peer: %v", i, err) } if _, ok := tester.peerHashes[id]; !ok { diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index f36e133e4..bd58b4dc8 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -15,7 +15,8 @@ import ( "gopkg.in/fatih/set.v0" ) -type hashFetcherFn func(common.Hash) error +type relativeHashFetcherFn func(common.Hash) error +type absoluteHashFetcherFn func(uint64, int) error type blockFetcherFn func([]common.Hash) error var ( @@ -37,20 +38,25 @@ type peer struct { ignored *set.Set // Set of hashes not to request (didn't have previously) - getHashes hashFetcherFn // Method to retrieve a batch of hashes (mockable for testing) - getBlocks blockFetcherFn // Method to retrieve a batch of blocks (mockable for testing) + getRelHashes relativeHashFetcherFn // Method to retrieve a batch of hashes from an origin hash + getAbsHashes absoluteHashFetcherFn // Method to retrieve a batch of hashes from an absolute position + getBlocks blockFetcherFn // Method to retrieve a batch of blocks + + version int // Eth protocol version number to switch strategies } // newPeer create a new downloader peer, with specific hash and block retrieval // mechanisms. -func newPeer(id string, head common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer { +func newPeer(id string, version int, head common.Hash, getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn) *peer { return &peer{ - id: id, - head: head, - capacity: 1, - getHashes: getHashes, - getBlocks: getBlocks, - ignored: set.New(), + id: id, + head: head, + capacity: 1, + getRelHashes: getRelHashes, + getAbsHashes: getAbsHashes, + getBlocks: getBlocks, + ignored: set.New(), + version: version, } } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index 903f043eb..b24ce42e8 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -40,9 +40,9 @@ type queue struct { pendPool map[string]*fetchRequest // Currently pending block retrieval operations - blockPool map[common.Hash]int // Hash-set of the downloaded data blocks, mapping to cache indexes - blockCache []*Block // Downloaded but not yet delivered blocks - blockOffset int // Offset of the first cached block in the block-chain + blockPool map[common.Hash]uint64 // Hash-set of the downloaded data blocks, mapping to cache indexes + blockCache []*Block // Downloaded but not yet delivered blocks + blockOffset uint64 // Offset of the first cached block in the block-chain lock sync.RWMutex } @@ -53,7 +53,7 @@ func newQueue() *queue { hashPool: make(map[common.Hash]int), hashQueue: prque.New(), pendPool: make(map[string]*fetchRequest), - blockPool: make(map[common.Hash]int), + blockPool: make(map[common.Hash]uint64), blockCache: make([]*Block, blockCacheLimit), } } @@ -69,7 +69,7 @@ func (q *queue) Reset() { q.pendPool = make(map[string]*fetchRequest) - q.blockPool = make(map[common.Hash]int) + q.blockPool = make(map[common.Hash]uint64) q.blockOffset = 0 q.blockCache = make([]*Block, blockCacheLimit) } @@ -130,7 +130,7 @@ func (q *queue) Has(hash common.Hash) bool { // Insert adds a set of hashes for the download queue for scheduling, returning // the new hashes encountered. -func (q *queue) Insert(hashes []common.Hash) []common.Hash { +func (q *queue) Insert(hashes []common.Hash, fifo bool) []common.Hash { q.lock.Lock() defer q.lock.Unlock() @@ -147,7 +147,11 @@ func (q *queue) Insert(hashes []common.Hash) []common.Hash { inserts = append(inserts, hash) q.hashPool[hash] = q.hashCounter - q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first + if fifo { + q.hashQueue.Push(hash, -float32(q.hashCounter)) // Lowest gets schedules first + } else { + q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first + } } return inserts } @@ -175,7 +179,7 @@ func (q *queue) GetBlock(hash common.Hash) *Block { return nil } // Return the block if it's still available in the cache - if q.blockOffset <= index && index < q.blockOffset+len(q.blockCache) { + if q.blockOffset <= index && index < q.blockOffset+uint64(len(q.blockCache)) { return q.blockCache[index-q.blockOffset] } return nil @@ -202,7 +206,7 @@ func (q *queue) TakeBlocks() []*Block { for k, n := len(q.blockCache)-len(blocks), len(q.blockCache); k < n; k++ { q.blockCache[k] = nil } - q.blockOffset += len(blocks) + q.blockOffset += uint64(len(blocks)) return blocks } @@ -318,7 +322,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { continue } // If a requested block falls out of the range, the hash chain is invalid - index := int(block.NumberU64()) - q.blockOffset + index := int(int64(block.NumberU64()) - int64(q.blockOffset)) if index >= len(q.blockCache) || index < 0 { return errInvalidChain } @@ -329,7 +333,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { } delete(request.Hashes, hash) delete(q.hashPool, hash) - q.blockPool[hash] = int(block.NumberU64()) + q.blockPool[hash] = block.NumberU64() } // Return all failed or missing fetches to the queue for hash, index := range request.Hashes { @@ -346,7 +350,7 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) { } // Prepare configures the block cache offset to allow accepting inbound blocks. -func (q *queue) Prepare(offset int) { +func (q *queue) Prepare(offset uint64) { q.lock.Lock() defer q.lock.Unlock() diff --git a/eth/handler.go b/eth/handler.go index 278a2bec2..59bbb480b 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -49,7 +49,7 @@ type ProtocolManager struct { fetcher *fetcher.Fetcher peers *peerSet - SubProtocol p2p.Protocol + SubProtocols []p2p.Protocol eventMux *event.TypeMux txSub event.Subscription @@ -68,8 +68,8 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager { - // Create the protocol manager and initialize peer handlers +func NewProtocolManager(networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, chainman *core.ChainManager) *ProtocolManager { + // Create the protocol manager with the base fields manager := &ProtocolManager{ eventMux: mux, txpool: txpool, @@ -79,18 +79,24 @@ func NewProtocolManager(protocolVersion, networkId int, mux *event.TypeMux, txpo txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } - manager.SubProtocol = p2p.Protocol{ - Name: "eth", - Version: uint(protocolVersion), - Length: ProtocolLength, - Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := manager.newPeer(protocolVersion, networkId, p, rw) - manager.newPeerCh <- peer - return manager.handle(peer) - }, + // Initiate a sub-protocol for every implemented version we can handle + manager.SubProtocols = make([]p2p.Protocol, len(ProtocolVersions)) + for i := 0; i < len(manager.SubProtocols); i++ { + version := ProtocolVersions[i] + + manager.SubProtocols[i] = p2p.Protocol{ + Name: "eth", + Version: version, + Length: ProtocolLengths[i], + Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := manager.newPeer(int(version), networkId, p, rw) + manager.newPeerCh <- peer + return manager.handle(peer) + }, + } } // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.InsertChain, manager.removePeer) + manager.downloader = downloader.New(manager.eventMux, manager.chainman.HasBlock, manager.chainman.GetBlock, manager.chainman.CurrentBlock, manager.chainman.InsertChain, manager.removePeer) validator := func(block *types.Block, parent *types.Block) error { return core.ValidateHeader(pow, block.Header(), parent, true) @@ -152,31 +158,32 @@ func (pm *ProtocolManager) Stop() { } func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - td, current, genesis := pm.chainman.Status() - - return newPeer(pv, nv, genesis, current, td, p, rw) + return newPeer(pv, nv, p, rw) } +// handle is the callback invoked to manage the life cycle of an eth peer. When +// this function terminates, the peer is disconnected. func (pm *ProtocolManager) handle(p *peer) error { - // Execute the Ethereum handshake. - if err := p.handleStatus(); err != nil { + glog.V(logger.Debug).Infof("%v: peer connected [%s]", p, p.Name()) + + // Execute the Ethereum handshake + td, head, genesis := pm.chainman.Status() + if err := p.Handshake(td, head, genesis); err != nil { + glog.V(logger.Debug).Infof("%v: handshake failed: %v", p, err) return err } - - // Register the peer locally. - glog.V(logger.Detail).Infoln("Adding peer", p.id) + // Register the peer locally + glog.V(logger.Detail).Infof("%v: adding peer", p) if err := pm.peers.Register(p); err != nil { - glog.V(logger.Error).Infoln("Addition failed:", err) + glog.V(logger.Error).Infof("%v: addition failed: %v", p, err) return err } defer pm.removePeer(p.id) - // Register the peer in the downloader. If the downloader - // considers it banned, we disconnect. - if err := pm.downloader.RegisterPeer(p.id, p.Head(), p.requestHashes, p.requestBlocks); err != nil { + // Register the peer in the downloader. If the downloader considers it banned, we disconnect + if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks); err != nil { return err } - // Propagate existing transactions. new transactions appearing // after this will be sent via broadcasts. pm.syncTransactions(p) @@ -184,13 +191,17 @@ func (pm *ProtocolManager) handle(p *peer) error { // main loop. handle incoming messages. for { if err := pm.handleMsg(p); err != nil { + glog.V(logger.Debug).Infof("%v: message handling failed: %v", p, err) return err } } return nil } +// handleMsg is invoked whenever an inbound message is received from a remote +// peer. The remote connection is torn down upon returning any error. func (pm *ProtocolManager) handleMsg(p *peer) error { + // Read the next message from the remote peer, and ensure it's fully consumed msg, err := p.rw.ReadMsg() if err != nil { return err @@ -198,58 +209,69 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if msg.Size > ProtocolMaxMsgSize { return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) } - // make sure that the payload has been fully consumed defer msg.Discard() + // Handle the message depending on its contents switch msg.Code { case StatusMsg: + // Status messages should never arrive after the handshake return errResp(ErrExtraStatusMsg, "uncontrolled status message") - case TxMsg: - // TODO: rework using lazy RLP stream - var txs []*types.Transaction - if err := msg.Decode(&txs); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - for i, tx := range txs { - if tx == nil { - return errResp(ErrDecode, "transaction %d is nil", i) - } - jsonlogger.LogJson(&logger.EthTxReceived{ - TxHash: tx.Hash().Hex(), - RemoteId: p.ID().String(), - }) - } - pm.txpool.AddTransactions(txs) - case GetBlockHashesMsg: - var request getBlockHashesMsgData + // Retrieve the number of hashes to return and from which origin hash + var request getBlockHashesData if err := msg.Decode(&request); err != nil { - return errResp(ErrDecode, "->msg %v: %v", msg, err) + return errResp(ErrDecode, "%v: %v", msg, err) } - if request.Amount > uint64(downloader.MaxHashFetch) { request.Amount = uint64(downloader.MaxHashFetch) } - + // Retrieve the hashes from the block chain and return them hashes := pm.chainman.GetBlockHashesFromHash(request.Hash, request.Amount) + if len(hashes) == 0 { + glog.V(logger.Debug).Infof("invalid block hash %x", request.Hash.Bytes()[:4]) + } + return p.SendBlockHashes(hashes) - if glog.V(logger.Debug) { - if len(hashes) == 0 { - glog.Infof("invalid block hash %x", request.Hash.Bytes()[:4]) - } + case GetBlockHashesFromNumberMsg: + // Retrieve and decode the number of hashes to return and from which origin number + var request getBlockHashesFromNumberData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + if request.Amount > uint64(downloader.MaxHashFetch) { + request.Amount = uint64(downloader.MaxHashFetch) } + // Calculate the last block that should be retrieved, and short circuit if unavailable + last := pm.chainman.GetBlockByNumber(request.Number + request.Amount - 1) + if last == nil { + last = pm.chainman.CurrentBlock() + request.Amount = last.NumberU64() - request.Number + 1 + } + if last.NumberU64() < request.Number { + return p.SendBlockHashes(nil) + } + // Retrieve the hashes from the last block backwards, reverse and return + hashes := []common.Hash{last.Hash()} + hashes = append(hashes, pm.chainman.GetBlockHashesFromHash(last.Hash(), request.Amount-1)...) - // returns either requested hashes or nothing (i.e. not found) - return p.sendBlockHashes(hashes) + for i := 0; i < len(hashes)/2; i++ { + hashes[i], hashes[len(hashes)-1-i] = hashes[len(hashes)-1-i], hashes[i] + } + return p.SendBlockHashes(hashes) case BlockHashesMsg: + // A batch of hashes arrived to one of our previous requests msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + reqHashInPacketsMeter.Mark(1) var hashes []common.Hash if err := msgStream.Decode(&hashes); err != nil { break } + reqHashInTrafficMeter.Mark(int64(32 * len(hashes))) + + // Deliver them all to the downloader for queuing err := pm.downloader.DeliverHashes(p.id, hashes) if err != nil { glog.V(logger.Debug).Infoln(err) @@ -293,13 +315,14 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } list = list[:len(list)-2] + "]" - glog.Infof("Peer %s: no blocks found for requested hashes %s", p.id, list) + glog.Infof("%v: no blocks found for requested hashes %s", p, list) } - return p.sendBlocks(blocks) + return p.SendBlocks(blocks) case BlocksMsg: // Decode the arrived block message msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + reqBlockInPacketsMeter.Mark(1) var blocks []*types.Block if err := msgStream.Decode(&blocks); err != nil { @@ -307,8 +330,9 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { blocks = nil } // Update the receive timestamp of each block - for i := 0; i < len(blocks); i++ { - blocks[i].ReceivedAt = msg.ReceivedAt + for _, block := range blocks { + reqBlockInTrafficMeter.Mark(block.Size().Int64()) + block.ReceivedAt = msg.ReceivedAt } // Filter out any explicitly requested blocks, deliver the rest to the downloader if blocks := pm.fetcher.Filter(blocks); len(blocks) > 0 { @@ -323,9 +347,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msgStream.Decode(&hashes); err != nil { break } + propHashInPacketsMeter.Mark(1) + propHashInTrafficMeter.Mark(int64(32 * len(hashes))) + // Mark the hashes as present at the remote node for _, hash := range hashes { - p.blockHashes.Add(hash) + p.MarkBlock(hash) p.SetHead(hash) } // Schedule all the unknown hashes for retrieval @@ -336,15 +363,18 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } for _, hash := range unknown { - pm.fetcher.Notify(p.id, hash, time.Now(), p.requestBlocks) + pm.fetcher.Notify(p.id, hash, time.Now(), p.RequestBlocks) } case NewBlockMsg: // Retrieve and decode the propagated block - var request newBlockMsgData + var request newBlockData if err := msg.Decode(&request); err != nil { return errResp(ErrDecode, "%v: %v", msg, err) } + propBlockInPacketsMeter.Mark(1) + propBlockInTrafficMeter.Mark(request.Block.Size().Int64()) + if err := request.Block.ValidateFields(); err != nil { return errResp(ErrDecode, "block validation %v: %v", msg, err) } @@ -360,7 +390,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { RemoteId: p.ID().String(), }) // Mark the peer as owning the block and schedule it for import - p.blockHashes.Add(request.Block.Hash()) + p.MarkBlock(request.Block.Hash()) p.SetHead(request.Block.Hash()) pm.fetcher.Enqueue(p.id, request.Block) @@ -369,6 +399,29 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.SetTd(request.TD) go pm.synchronise(p) + case TxMsg: + // Transactions arrived, parse all of them and deliver to the pool + var txs []*types.Transaction + if err := msg.Decode(&txs); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + propTxnInPacketsMeter.Mark(1) + for i, tx := range txs { + // Validate and mark the remote transaction + if tx == nil { + return errResp(ErrDecode, "transaction %d is nil", i) + } + p.MarkTransaction(tx.Hash()) + + // Log it's arrival for later analysis + propTxnInTrafficMeter.Mark(tx.Size().Int64()) + jsonlogger.LogJson(&logger.EthTxReceived{ + TxHash: tx.Hash().Hex(), + RemoteId: p.ID().String(), + }) + } + pm.txpool.AddTransactions(txs) + default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) } @@ -385,28 +438,27 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { if propagate { transfer := peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range transfer { - peer.sendNewBlock(block) + peer.SendNewBlock(block) } glog.V(logger.Detail).Infof("propagated block %x to %d peers in %v", hash[:4], len(transfer), time.Since(block.ReceivedAt)) } // Otherwise if the block is indeed in out own chain, announce it if pm.chainman.HasBlock(hash) { for _, peer := range peers { - peer.sendNewBlockHashes([]common.Hash{hash}) + peer.SendNewBlockHashes([]common.Hash{hash}) } glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt)) } } -// BroadcastTx will propagate the block to its connected peers. It will sort -// out which peers do not contain the block in their block set and will do a -// sqrt(peers) to determine the amount of peers we broadcast to. +// BroadcastTx will propagate a transaction to all peers which are not known to +// already have the given transaction. func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) { // Broadcast transaction to a batch of peers not knowing about it peers := pm.peers.PeersWithoutTx(hash) //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] for _, peer := range peers { - peer.sendTransaction(tx) + peer.SendTransactions(types.Transactions{tx}) } glog.V(logger.Detail).Infoln("broadcast tx to", len(peers), "peers") } diff --git a/eth/metrics.go b/eth/metrics.go new file mode 100644 index 000000000..950b50296 --- /dev/null +++ b/eth/metrics.go @@ -0,0 +1,28 @@ +package eth + +import ( + "github.com/ethereum/go-ethereum/metrics" +) + +var ( + propTxnInPacketsMeter = metrics.NewMeter("eth/prop/txns/in/packets") + propTxnInTrafficMeter = metrics.NewMeter("eth/prop/txns/in/traffic") + propTxnOutPacketsMeter = metrics.NewMeter("eth/prop/txns/out/packets") + propTxnOutTrafficMeter = metrics.NewMeter("eth/prop/txns/out/traffic") + propHashInPacketsMeter = metrics.NewMeter("eth/prop/hashes/in/packets") + propHashInTrafficMeter = metrics.NewMeter("eth/prop/hashes/in/traffic") + propHashOutPacketsMeter = metrics.NewMeter("eth/prop/hashes/out/packets") + propHashOutTrafficMeter = metrics.NewMeter("eth/prop/hashes/out/traffic") + propBlockInPacketsMeter = metrics.NewMeter("eth/prop/blocks/in/packets") + propBlockInTrafficMeter = metrics.NewMeter("eth/prop/blocks/in/traffic") + propBlockOutPacketsMeter = metrics.NewMeter("eth/prop/blocks/out/packets") + propBlockOutTrafficMeter = metrics.NewMeter("eth/prop/blocks/out/traffic") + reqHashInPacketsMeter = metrics.NewMeter("eth/req/hashes/in/packets") + reqHashInTrafficMeter = metrics.NewMeter("eth/req/hashes/in/traffic") + reqHashOutPacketsMeter = metrics.NewMeter("eth/req/hashes/out/packets") + reqHashOutTrafficMeter = metrics.NewMeter("eth/req/hashes/out/traffic") + reqBlockInPacketsMeter = metrics.NewMeter("eth/req/blocks/in/packets") + reqBlockInTrafficMeter = metrics.NewMeter("eth/req/blocks/in/traffic") + reqBlockOutPacketsMeter = metrics.NewMeter("eth/req/blocks/out/packets") + reqBlockOutTrafficMeter = metrics.NewMeter("eth/req/blocks/out/traffic") +) diff --git a/eth/peer.go b/eth/peer.go index c7045282b..088417aab 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -20,25 +20,18 @@ var ( errNotRegistered = errors.New("peer is not registered") ) -type statusMsgData struct { - ProtocolVersion uint32 - NetworkId uint32 - TD *big.Int - CurrentBlock common.Hash - GenesisBlock common.Hash -} - -type getBlockHashesMsgData struct { - Hash common.Hash - Amount uint64 -} +const ( + maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) +) type peer struct { *p2p.Peer rw p2p.MsgReadWriter - protv, netid int + version int // Protocol version negotiated + network int // Network ID being on id string @@ -46,27 +39,21 @@ type peer struct { td *big.Int lock sync.RWMutex - genesis, ourHash common.Hash - ourTd *big.Int - - txHashes *set.Set - blockHashes *set.Set + knownTxs *set.Set // Set of transaction hashes known to be known by this peer + knownBlocks *set.Set // Set of block hashes known to be known by this peer } -func newPeer(protv, netid int, genesis, head common.Hash, td *big.Int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { +func newPeer(version, network int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { id := p.ID() return &peer{ Peer: p, rw: rw, - genesis: genesis, - ourHash: head, - ourTd: td, - protv: protv, - netid: netid, + version: version, + network: network, id: fmt.Sprintf("%x", id[:8]), - txHashes: set.New(), - blockHashes: set.New(), + knownTxs: set.New(), + knownBlocks: set.New(), } } @@ -103,68 +90,110 @@ func (p *peer) SetTd(td *big.Int) { p.td.Set(td) } -// sendTransactions sends transactions to the peer and includes the hashes -// in it's tx hash set for future reference. The tx hash will allow the -// manager to check whether the peer has already received this particular -// transaction -func (p *peer) sendTransactions(txs types.Transactions) error { - for _, tx := range txs { - p.txHashes.Add(tx.Hash()) +// MarkBlock marks a block as known for the peer, ensuring that the block will +// never be propagated to this particular peer. +func (p *peer) MarkBlock(hash common.Hash) { + // If we reached the memory allowance, drop a previously known block hash + for p.knownBlocks.Size() >= maxKnownBlocks { + p.knownBlocks.Pop() } + p.knownBlocks.Add(hash) +} +// MarkTransaction marks a transaction as known for the peer, ensuring that it +// will never be propagated to this particular peer. +func (p *peer) MarkTransaction(hash common.Hash) { + // If we reached the memory allowance, drop a previously known transaction hash + for p.knownTxs.Size() >= maxKnownTxs { + p.knownTxs.Pop() + } + p.knownTxs.Add(hash) +} + +// SendTransactions sends transactions to the peer and includes the hashes +// in its transaction hash set for future reference. +func (p *peer) SendTransactions(txs types.Transactions) error { + propTxnOutPacketsMeter.Mark(1) + for _, tx := range txs { + propTxnOutTrafficMeter.Mark(tx.Size().Int64()) + p.knownTxs.Add(tx.Hash()) + } return p2p.Send(p.rw, TxMsg, txs) } -func (p *peer) sendBlockHashes(hashes []common.Hash) error { +// SendBlockHashes sends a batch of known hashes to the remote peer. +func (p *peer) SendBlockHashes(hashes []common.Hash) error { + reqHashOutPacketsMeter.Mark(1) + reqHashOutTrafficMeter.Mark(int64(32 * len(hashes))) + return p2p.Send(p.rw, BlockHashesMsg, hashes) } -func (p *peer) sendBlocks(blocks []*types.Block) error { +// SendBlocks sends a batch of blocks to the remote peer. +func (p *peer) SendBlocks(blocks []*types.Block) error { + reqBlockOutPacketsMeter.Mark(1) + for _, block := range blocks { + reqBlockOutTrafficMeter.Mark(block.Size().Int64()) + } return p2p.Send(p.rw, BlocksMsg, blocks) } -func (p *peer) sendNewBlockHashes(hashes []common.Hash) error { +// SendNewBlockHashes announces the availability of a number of blocks through +// a hash notification. +func (p *peer) SendNewBlockHashes(hashes []common.Hash) error { + propHashOutPacketsMeter.Mark(1) + propHashOutTrafficMeter.Mark(int64(32 * len(hashes))) + for _, hash := range hashes { - p.blockHashes.Add(hash) + p.knownBlocks.Add(hash) } return p2p.Send(p.rw, NewBlockHashesMsg, hashes) } -func (p *peer) sendNewBlock(block *types.Block) error { - p.blockHashes.Add(block.Hash()) +// SendNewBlock propagates an entire block to a remote peer. +func (p *peer) SendNewBlock(block *types.Block) error { + propBlockOutPacketsMeter.Mark(1) + propBlockOutTrafficMeter.Mark(block.Size().Int64()) + p.knownBlocks.Add(block.Hash()) return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, block.Td}) } -func (p *peer) sendTransaction(tx *types.Transaction) error { - p.txHashes.Add(tx.Hash()) - - return p2p.Send(p.rw, TxMsg, []*types.Transaction{tx}) +// RequestHashes fetches a batch of hashes from a peer, starting at from, going +// towards the genesis block. +func (p *peer) RequestHashes(from common.Hash) error { + glog.V(logger.Debug).Infof("Peer [%s] fetching hashes (%d) from %x...\n", p.id, downloader.MaxHashFetch, from[:4]) + return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesData{from, uint64(downloader.MaxHashFetch)}) } -func (p *peer) requestHashes(from common.Hash) error { - glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, downloader.MaxHashFetch, from[:4]) - return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, uint64(downloader.MaxHashFetch)}) +// RequestHashesFromNumber fetches a batch of hashes from a peer, starting at the +// requested block number, going upwards towards the genesis block. +func (p *peer) RequestHashesFromNumber(from uint64, count int) error { + glog.V(logger.Debug).Infof("Peer [%s] fetching hashes (%d) from #%d...\n", p.id, count, from) + return p2p.Send(p.rw, GetBlockHashesFromNumberMsg, getBlockHashesFromNumberData{from, uint64(count)}) } -func (p *peer) requestBlocks(hashes []common.Hash) error { +// RequestBlocks fetches a batch of blocks corresponding to the specified hashes. +func (p *peer) RequestBlocks(hashes []common.Hash) error { glog.V(logger.Debug).Infof("[%s] fetching %v blocks\n", p.id, len(hashes)) return p2p.Send(p.rw, GetBlocksMsg, hashes) } -func (p *peer) handleStatus() error { +// Handshake executes the eth protocol handshake, negotiating version number, +// network IDs, difficulties, head and genesis blocks. +func (p *peer) Handshake(td *big.Int, head common.Hash, genesis common.Hash) error { + // Send out own handshake in a new thread errc := make(chan error, 1) go func() { - errc <- p2p.Send(p.rw, StatusMsg, &statusMsgData{ - ProtocolVersion: uint32(p.protv), - NetworkId: uint32(p.netid), - TD: p.ourTd, - CurrentBlock: p.ourHash, - GenesisBlock: p.genesis, + errc <- p2p.Send(p.rw, StatusMsg, &statusData{ + ProtocolVersion: uint32(p.version), + NetworkId: uint32(p.network), + TD: td, + CurrentBlock: head, + GenesisBlock: genesis, }) }() - - // read and handle remote status + // In the mean time retrieve the remote status message msg, err := p.rw.ReadMsg() if err != nil { return err @@ -175,31 +204,32 @@ func (p *peer) handleStatus() error { if msg.Size > ProtocolMaxMsgSize { return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) } - - var status statusMsgData + // Decode the handshake and make sure everything matches + var status statusData if err := msg.Decode(&status); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - - if status.GenesisBlock != p.genesis { - return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, p.genesis) + if status.GenesisBlock != genesis { + return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesis) } - - if int(status.NetworkId) != p.netid { - return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, p.netid) + if int(status.NetworkId) != p.network { + return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, p.network) } - - if int(status.ProtocolVersion) != p.protv { - return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.protv) + if int(status.ProtocolVersion) != p.version { + return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.version) } - // Set the total difficulty of the peer - p.td = status.TD - // set the best hash of the peer - p.head = status.CurrentBlock - + // Configure the remote peer, and sanity check out handshake too + p.td, p.head = status.TD, status.CurrentBlock return <-errc } +// String implements fmt.Stringer. +func (p *peer) String() string { + return fmt.Sprintf("Peer %s [%s]", p.id, + fmt.Sprintf("eth/%2d", p.version), + ) +} + // peerSet represents the collection of active peers currently participating in // the Ethereum sub-protocol. type peerSet struct { @@ -264,7 +294,7 @@ func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer { list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.blockHashes.Has(hash) { + if !p.knownBlocks.Has(hash) { list = append(list, p) } } @@ -279,7 +309,7 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.txHashes.Has(hash) { + if !p.knownTxs.Has(hash) { list = append(list, p) } } diff --git a/eth/protocol.go b/eth/protocol.go index 57805d9bd..bf9e155c5 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -7,11 +7,15 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) +// Supported versions of the eth protocol (first is primary). +var ProtocolVersions = []uint{61, 60} + +// Number of implemented message corresponding to different protocol versions. +var ProtocolLengths = []uint64{9, 8} + const ( - ProtocolVersion = 60 NetworkId = 0 - ProtocolLength = uint64(8) - ProtocolMaxMsgSize = 10 * 1024 * 1024 + ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message ) // eth protocol message codes @@ -24,6 +28,7 @@ const ( GetBlocksMsg BlocksMsg NewBlockMsg + GetBlockHashesFromNumberMsg ) type errCode int @@ -72,8 +77,31 @@ type chainManager interface { Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash) } -// message structs used for RLP serialization -type newBlockMsgData struct { +// statusData is the network packet for the status message. +type statusData struct { + ProtocolVersion uint32 + NetworkId uint32 + TD *big.Int + CurrentBlock common.Hash + GenesisBlock common.Hash +} + +// getBlockHashesData is the network packet for the hash based block retrieval +// message. +type getBlockHashesData struct { + Hash common.Hash + Amount uint64 +} + +// getBlockHashesFromNumberData is the network packet for the number based block +// retrieval message. +type getBlockHashesFromNumberData struct { + Number uint64 + Amount uint64 +} + +// newBlockData is the network packet for the block propagation message. +type newBlockData struct { Block *types.Block TD *big.Int } diff --git a/eth/protocol_test.go b/eth/protocol_test.go index 60fa35443..4c1579d4e 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -39,15 +39,15 @@ func TestStatusMsgErrors(t *testing.T) { wantError: errResp(ErrNoStatusMsg, "first msg has code 2 (!= 0)"), }, { - code: StatusMsg, data: statusMsgData{10, NetworkId, td, currentBlock, genesis}, + code: StatusMsg, data: statusData{10, NetworkId, td, currentBlock, genesis}, wantError: errResp(ErrProtocolVersionMismatch, "10 (!= 0)"), }, { - code: StatusMsg, data: statusMsgData{ProtocolVersion, 999, td, currentBlock, genesis}, + code: StatusMsg, data: statusData{uint32(ProtocolVersions[0]), 999, td, currentBlock, genesis}, wantError: errResp(ErrNetworkIdMismatch, "999 (!= 0)"), }, { - code: StatusMsg, data: statusMsgData{ProtocolVersion, NetworkId, td, currentBlock, common.Hash{3}}, + code: StatusMsg, data: statusData{uint32(ProtocolVersions[0]), NetworkId, td, currentBlock, common.Hash{3}}, wantError: errResp(ErrGenesisBlockMismatch, "0300000000000000000000000000000000000000000000000000000000000000 (!= %x)", genesis), }, } @@ -167,7 +167,7 @@ func newProtocolManagerForTesting(txAdded chan<- []*types.Transaction) *Protocol db, _ = ethdb.NewMemDatabase() chain, _ = core.NewChainManager(core.GenesisBlock(0, db), db, db, core.FakePow{}, em) txpool = &fakeTxPool{added: txAdded} - pm = NewProtocolManager(ProtocolVersion, 0, em, txpool, core.FakePow{}, chain) + pm = NewProtocolManager(0, em, txpool, core.FakePow{}, chain) ) pm.Start() return pm @@ -188,7 +188,7 @@ func newTestPeer(pm *ProtocolManager) (*testPeer, <-chan error) { func (p *testPeer) handshake(t *testing.T) { td, currentBlock, genesis := p.pm.chainman.Status() - msg := &statusMsgData{ + msg := &statusData{ ProtocolVersion: uint32(p.pm.protVer), NetworkId: uint32(p.pm.netId), TD: td, diff --git a/eth/sync.go b/eth/sync.go index 82abb725f..47fd7363e 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -20,14 +20,6 @@ const ( txsyncPackSize = 100 * 1024 ) -// blockAnnounce is the hash notification of the availability of a new block in -// the network. -type blockAnnounce struct { - hash common.Hash - peer *peer - time time.Time -} - type txsync struct { p *peer txs []*types.Transaction @@ -75,7 +67,7 @@ func (pm *ProtocolManager) txsyncLoop() { // Send the pack in the background. glog.V(logger.Detail).Infof("%v: sending %d transactions (%v)", s.p.Peer, len(pack.txs), size) sending = true - go func() { done <- pack.p.sendTransactions(pack.txs) }() + go func() { done <- pack.p.SendTransactions(pack.txs) }() } // pick chooses the next pending sync. diff --git a/miner/worker.go b/miner/worker.go index f06b6afa1..90914ddcb 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -368,8 +368,8 @@ func (self *worker) commitNewWork() { tstart := time.Now() parent := self.chain.CurrentBlock() tstamp := tstart.Unix() - if tstamp <= parent.Time() { - tstamp = parent.Time() + 1 + if tstamp <= int64(parent.Time()) { + tstamp = int64(parent.Time()) + 1 } // this will ensure we're not going off too far in the future if now := time.Now().Unix(); tstamp > now+4 { @@ -382,7 +382,7 @@ func (self *worker) commitNewWork() { header := &types.Header{ ParentHash: parent.Hash(), Number: num.Add(num, common.Big1), - Difficulty: core.CalcDifficulty(tstamp, parent.Time(), parent.Difficulty()), + Difficulty: core.CalcDifficulty(int64(tstamp), int64(parent.Time()), parent.Difficulty()), GasLimit: core.CalcGasLimit(parent), GasUsed: new(big.Int), Coinbase: self.coinbase, diff --git a/rpc/api/api_test.go b/rpc/api/api_test.go index 7e273ef28..2ac8bcd45 100644 --- a/rpc/api/api_test.go +++ b/rpc/api/api_test.go @@ -76,8 +76,9 @@ func TestCompileSolidity(t *testing.T) { expLanguageVersion := "0" expSource := source - xeth := xeth.NewTest(ð.Ethereum{}, nil) - api := NewEthApi(xeth, codec.JSON) + eth := ð.Ethereum{} + xeth := xeth.NewTest(eth, nil) + api := NewEthApi(xeth, eth, codec.JSON) var rpcRequest shared.Request json.Unmarshal([]byte(jsonstr), &rpcRequest) diff --git a/rpc/api/eth.go b/rpc/api/eth.go index 962c8d0f9..db0b4b024 100644 --- a/rpc/api/eth.go +++ b/rpc/api/eth.go @@ -6,9 +6,12 @@ import ( "math/big" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/rpc/codec" "github.com/ethereum/go-ethereum/rpc/shared" "github.com/ethereum/go-ethereum/xeth" + "gopkg.in/fatih/set.v0" ) const ( @@ -18,9 +21,10 @@ const ( // eth api provider // See https://github.com/ethereum/wiki/wiki/JSON-RPC type ethApi struct { - xeth *xeth.XEth - methods map[string]ethhandler - codec codec.ApiCoder + xeth *xeth.XEth + ethereum *eth.Ethereum + methods map[string]ethhandler + codec codec.ApiCoder } // eth callback handler @@ -71,12 +75,14 @@ var ( "eth_hashrate": (*ethApi).Hashrate, "eth_getWork": (*ethApi).GetWork, "eth_submitWork": (*ethApi).SubmitWork, + "eth_resend": (*ethApi).Resend, + "eth_pendingTransactions": (*ethApi).PendingTransactions, } ) // create new ethApi instance -func NewEthApi(xeth *xeth.XEth, codec codec.Codec) *ethApi { - return ðApi{xeth, ethMapping, codec.New(nil)} +func NewEthApi(xeth *xeth.XEth, eth *eth.Ethereum, codec codec.Codec) *ethApi { + return ðApi{xeth, eth, ethMapping, codec.New(nil)} } // collection with supported methods @@ -548,3 +554,45 @@ func (self *ethApi) SubmitWork(req *shared.Request) (interface{}, error) { } return self.xeth.RemoteMining().SubmitWork(args.Nonce, common.HexToHash(args.Digest), common.HexToHash(args.Header)), nil } + +func (self *ethApi) Resend(req *shared.Request) (interface{}, error) { + args := new(ResendArgs) + if err := self.codec.Decode(req.Params, &args); err != nil { + return nil, shared.NewDecodeParamError(err.Error()) + } + + ret, err := self.xeth.Transact(args.Tx.From, args.Tx.To, args.Tx.Nonce, args.Tx.Value, args.GasLimit, args.GasPrice, args.Tx.Data) + if err != nil { + return nil, err + } + + self.ethereum.TxPool().RemoveTransactions(types.Transactions{args.Tx.tx}) + + return ret, nil +} + +func (self *ethApi) PendingTransactions(req *shared.Request) (interface{}, error) { + txs := self.ethereum.TxPool().GetTransactions() + + // grab the accounts from the account manager. This will help with determining which + // transactions should be returned. + accounts, err := self.ethereum.AccountManager().Accounts() + if err != nil { + return nil, err + } + + // Add the accouns to a new set + accountSet := set.New() + for _, account := range accounts { + accountSet.Add(account.Address) + } + + var ltxs []*tx + for _, tx := range txs { + if from, _ := tx.From(); accountSet.Has(from) { + ltxs = append(ltxs, newTx(tx)) + } + } + + return ltxs, nil +} diff --git a/rpc/api/eth_args.go b/rpc/api/eth_args.go index bf8ffead6..8f64280d3 100644 --- a/rpc/api/eth_args.go +++ b/rpc/api/eth_args.go @@ -4,9 +4,12 @@ import ( "encoding/json" "fmt" "math/big" + "strconv" + "strings" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/state" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc/shared" ) @@ -858,3 +861,178 @@ func (args *SubmitWorkArgs) UnmarshalJSON(b []byte) (err error) { return nil } + +type tx struct { + tx *types.Transaction + + To string + From string + Nonce string + Value string + Data string + GasLimit string + GasPrice string +} + +func newTx(t *types.Transaction) *tx { + from, _ := t.From() + var to string + if t := t.To(); t != nil { + to = t.Hex() + } + + return &tx{ + tx: t, + To: to, + From: from.Hex(), + Value: t.Value().String(), + Nonce: strconv.Itoa(int(t.Nonce())), + Data: "0x" + common.Bytes2Hex(t.Data()), + GasLimit: t.Gas().String(), + GasPrice: t.GasPrice().String(), + } +} + +type ResendArgs struct { + Tx *tx + GasPrice string + GasLimit string +} + +func (tx *tx) UnmarshalJSON(b []byte) (err error) { + var fields map[string]interface{} + if err := json.Unmarshal(b, &fields); err != nil { + return shared.NewDecodeParamError(err.Error()) + } + + var ( + nonce uint64 + to common.Address + amount = new(big.Int).Set(common.Big0) + gasLimit = new(big.Int).Set(common.Big0) + gasPrice = new(big.Int).Set(common.Big0) + data []byte + contractCreation = true + ) + + if val, found := fields["To"]; found { + if strVal, ok := val.(string); ok && len(strVal) > 0 { + tx.To = strVal + to = common.HexToAddress(strVal) + contractCreation = false + } + } + + if val, found := fields["From"]; found { + if strVal, ok := val.(string); ok { + tx.From = strVal + } + } + + if val, found := fields["Nonce"]; found { + if strVal, ok := val.(string); ok { + tx.Nonce = strVal + if nonce, err = strconv.ParseUint(strVal, 10, 64); err != nil { + return shared.NewDecodeParamError(fmt.Sprintf("Unable to decode tx.Nonce - %v", err)) + } + } + } else { + return shared.NewDecodeParamError("tx.Nonce not found") + } + + var parseOk bool + if val, found := fields["Value"]; found { + if strVal, ok := val.(string); ok { + tx.Value = strVal + if _, parseOk = amount.SetString(strVal, 0); !parseOk { + return shared.NewDecodeParamError(fmt.Sprintf("Unable to decode tx.Amount - %v", err)) + } + } + } + + if val, found := fields["Data"]; found { + if strVal, ok := val.(string); ok { + tx.Data = strVal + if strings.HasPrefix(strVal, "0x") { + data = common.Hex2Bytes(strVal[2:]) + } else { + data = common.Hex2Bytes(strVal) + } + } + } + + if val, found := fields["GasLimit"]; found { + if strVal, ok := val.(string); ok { + tx.GasLimit = strVal + if _, parseOk = gasLimit.SetString(strVal, 0); !parseOk { + return shared.NewDecodeParamError(fmt.Sprintf("Unable to decode tx.GasLimit - %v", err)) + } + } + } + + if val, found := fields["GasPrice"]; found { + if strVal, ok := val.(string); ok { + tx.GasPrice = strVal + if _, parseOk = gasPrice.SetString(strVal, 0); !parseOk { + return shared.NewDecodeParamError(fmt.Sprintf("Unable to decode tx.GasPrice - %v", err)) + } + } + } + + if contractCreation { + tx.tx = types.NewContractCreation(nonce, amount, gasLimit, gasPrice, data) + } else { + tx.tx = types.NewTransaction(nonce, to, amount, gasLimit, gasPrice, data) + } + + return nil +} + +func (args *ResendArgs) UnmarshalJSON(b []byte) (err error) { + var obj []interface{} + if err = json.Unmarshal(b, &obj); err != nil { + return shared.NewDecodeParamError(err.Error()) + } + + if len(obj) < 1 { + return shared.NewInsufficientParamsError(len(obj), 1) + } + + data, err := json.Marshal(obj[0]) + if err != nil { + return shared.NewDecodeParamError("Unable to parse transaction object") + } + + trans := new(tx) + err = json.Unmarshal(data, trans) + if err != nil { + return shared.NewDecodeParamError("Unable to parse transaction object") + } + + if trans == nil || trans.tx == nil { + return shared.NewDecodeParamError("Unable to parse transaction object") + } + + gasLimit, gasPrice := trans.GasLimit, trans.GasPrice + + if len(obj) > 1 && obj[1] != nil { + if gp, ok := obj[1].(string); ok { + gasPrice = gp + } else { + return shared.NewInvalidTypeError("gasPrice", "not a string") + } + } + if len(obj) > 2 && obj[2] != nil { + if gl, ok := obj[2].(string); ok { + gasLimit = gl + } else { + return shared.NewInvalidTypeError("gasLimit", "not a string") + } + } + + args.Tx = trans + args.GasPrice = gasPrice + args.GasLimit = gasLimit + + return nil +} diff --git a/rpc/api/eth_js.go b/rpc/api/eth_js.go index e1268eb76..4512cc147 100644 --- a/rpc/api/eth_js.go +++ b/rpc/api/eth_js.go @@ -14,6 +14,21 @@ web3._extend({ params: 2, inputFormatter: [web3._extend.formatters.formatInputString,web3._extend.formatters.formatInputString], outputFormatter: web3._extend.formatters.formatOutputString + }), + new web3._extend.Method({ + name: 'resend', + call: 'eth_resend', + params: 3, + inputFormatter: [function(obj) { return obj; },web3._extend.formatters.formatInputString,web3._extend.formatters.formatInputString], + outputFormatter: web3._extend.formatters.formatOutputString + }) + ], + properties: + [ + new web3._extend.Property({ + name: 'pendingTransactions', + getter: 'eth_pendingTransactions', + outputFormatter: function(obj) { return obj; } }) ] }); diff --git a/rpc/api/mergedapi.go b/rpc/api/mergedapi.go index bc4fa32e8..c40716996 100644 --- a/rpc/api/mergedapi.go +++ b/rpc/api/mergedapi.go @@ -42,7 +42,7 @@ func (self *MergedApi) Methods() []string { // Call the correct API's Execute method for the given request func (self *MergedApi) Execute(req *shared.Request) (interface{}, error) { - glog.V(logger.Detail).Infof("rpc method: %s", req.Method) + glog.V(logger.Detail).Infof("%s %s", req.Method, req.Params) if res, _ := self.handle(req); res != nil { return res, nil diff --git a/rpc/api/utils.go b/rpc/api/utils.go index 6e4835de6..e6a01d3d6 100644 --- a/rpc/api/utils.go +++ b/rpc/api/utils.go @@ -84,6 +84,8 @@ var ( "hashrate", "getWork", "submitWork", + "pendingTransactions", + "resend", }, "miner": []string{ "hashrate", @@ -149,7 +151,7 @@ func ParseApiString(apistr string, codec codec.Codec, xeth *xeth.XEth, eth *eth. case shared.DbApiName: apis[i] = NewDbApi(xeth, eth, codec) case shared.EthApiName: - apis[i] = NewEthApi(xeth, codec) + apis[i] = NewEthApi(xeth, eth, codec) case shared.MinerApiName: apis[i] = NewMinerApi(eth, codec) case shared.NetApiName: diff --git a/tests/util.go b/tests/util.go index 67650c188..ccdba57e0 100644 --- a/tests/util.go +++ b/tests/util.go @@ -120,7 +120,7 @@ type Env struct { coinbase common.Address number *big.Int - time int64 + time uint64 difficulty *big.Int gasLimit *big.Int @@ -150,7 +150,7 @@ func NewEnvFromMap(state *state.StateDB, envValues map[string]string, exeValues //env.parent = common.Hex2Bytes(envValues["previousHash"]) env.coinbase = common.HexToAddress(envValues["currentCoinbase"]) env.number = common.Big(envValues["currentNumber"]) - env.time = common.Big(envValues["currentTimestamp"]).Int64() + env.time = common.Big(envValues["currentTimestamp"]).Uint64() env.difficulty = common.Big(envValues["currentDifficulty"]) env.gasLimit = common.Big(envValues["currentGasLimit"]) env.Gas = new(big.Int) @@ -163,7 +163,7 @@ func (self *Env) BlockNumber() *big.Int { return self.number } //func (self *Env) PrevHash() []byte { return self.parent } func (self *Env) Coinbase() common.Address { return self.coinbase } -func (self *Env) Time() int64 { return self.time } +func (self *Env) Time() uint64 { return self.time } func (self *Env) Difficulty() *big.Int { return self.difficulty } func (self *Env) State() *state.StateDB { return self.state } func (self *Env) GasLimit() *big.Int { return self.gasLimit } diff --git a/xeth/types.go b/xeth/types.go index ed64dc45e..cc06a8dcd 100644 --- a/xeth/types.go +++ b/xeth/types.go @@ -60,7 +60,7 @@ type Block struct { Hash string `json:"hash"` Transactions *common.List `json:"transactions"` Uncles *common.List `json:"uncles"` - Time int64 `json:"time"` + Time uint64 `json:"time"` Coinbase string `json:"coinbase"` Name string `json:"name"` GasLimit string `json:"gasLimit"` @@ -149,7 +149,8 @@ func NewTx(tx *types.Transaction) *Transaction { if to := tx.To(); to != nil { receiver = to.Hex() } else { - receiver = core.AddressFromMessage(tx).Hex() + from, _ := tx.From() + receiver = crypto.CreateAddress(from, tx.Nonce()).Hex() } createsContract := core.MessageCreatesContract(tx) diff --git a/xeth/xeth.go b/xeth/xeth.go index 0dbedff43..2a1366fe1 100644 --- a/xeth/xeth.go +++ b/xeth/xeth.go @@ -802,7 +802,12 @@ func (self *XEth) PushTx(encodedTx string) (string, error) { } if tx.To() == nil { - addr := core.AddressFromMessage(tx) + from, err := tx.From() + if err != nil { + return "", err + } + + addr := crypto.CreateAddress(from, tx.Nonce()) glog.V(logger.Info).Infof("Tx(%x) created: %x\n", tx.Hash(), addr) return addr.Hex(), nil } else { @@ -969,7 +974,7 @@ func (self *XEth) Transact(fromStr, toStr, nonceStr, valueStr, gasStr, gasPriceS } if contractCreation { - addr := core.AddressFromMessage(tx) + addr := crypto.CreateAddress(from, nonce) glog.V(logger.Info).Infof("Tx(%x) created: %x\n", tx.Hash(), addr) return addr.Hex(), nil } else { |