aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cmd/geth/admin.go26
-rw-r--r--cmd/geth/main.go6
-rw-r--r--cmd/rlpdump/main.go2
-rw-r--r--cmd/utils/cmd.go2
-rw-r--r--cmd/utils/flags.go9
-rw-r--r--core/block_processor.go5
-rw-r--r--core/chain_manager.go5
-rw-r--r--core/types/transaction.go2
-rw-r--r--crypto/crypto.go5
-rw-r--r--eth/backend.go65
-rw-r--r--eth/downloader/downloader.go242
-rw-r--r--eth/downloader/downloader_test.go7
-rw-r--r--eth/downloader/peer.go24
-rw-r--r--eth/downloader/queue.go31
-rw-r--r--eth/downloader/synchronous.go79
-rw-r--r--eth/handler.go334
-rw-r--r--eth/peer.go145
-rw-r--r--eth/protocol.go331
-rw-r--r--eth/protocol_test.go18
-rw-r--r--p2p/discover/udp.go2
-rw-r--r--p2p/message.go3
-rw-r--r--p2p/peer_error.go2
-rw-r--r--rlp/decode.go332
-rw-r--r--rlp/decode_test.go287
-rw-r--r--rlp/encode.go8
-rw-r--r--rlp/typecache.go51
-rw-r--r--whisper/envelope.go9
-rw-r--r--whisper/peer.go2
28 files changed, 1331 insertions, 703 deletions
diff --git a/cmd/geth/admin.go b/cmd/geth/admin.go
index f8c717187..2ac155e33 100644
--- a/cmd/geth/admin.go
+++ b/cmd/geth/admin.go
@@ -48,6 +48,32 @@ func (js *jsre) adminBindings() {
debug := t.Object()
debug.Set("printBlock", js.printBlock)
debug.Set("dumpBlock", js.dumpBlock)
+ debug.Set("getBlockRlp", js.getBlockRlp)
+}
+
+func (js *jsre) getBlockRlp(call otto.FunctionCall) otto.Value {
+ var block *types.Block
+ if len(call.ArgumentList) > 0 {
+ if call.Argument(0).IsNumber() {
+ num, _ := call.Argument(0).ToInteger()
+ block = js.ethereum.ChainManager().GetBlockByNumber(uint64(num))
+ } else if call.Argument(0).IsString() {
+ hash, _ := call.Argument(0).ToString()
+ block = js.ethereum.ChainManager().GetBlock(common.HexToHash(hash))
+ } else {
+ fmt.Println("invalid argument for dump. Either hex string or number")
+ }
+
+ } else {
+ block = js.ethereum.ChainManager().CurrentBlock()
+ }
+ if block == nil {
+ fmt.Println("block not found")
+ return otto.UndefinedValue()
+ }
+
+ encoded, _ := rlp.EncodeToBytes(block)
+ return js.re.ToVal(fmt.Sprintf("%x", encoded))
}
func (js *jsre) setExtra(call otto.FunctionCall) otto.Value {
diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index e18b92a2e..dab167bbb 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -31,6 +31,8 @@ import (
"strconv"
"time"
+ "path"
+
"github.com/codegangsta/cli"
"github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/accounts"
@@ -42,13 +44,12 @@ import (
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/logger"
"github.com/peterh/liner"
- "path"
)
import _ "net/http/pprof"
const (
ClientIdentifier = "Geth"
- Version = "0.9.9"
+ Version = "0.9.10"
)
var app = utils.NewApp(Version, "the go-ethereum command line interface")
@@ -217,6 +218,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
},
}
app.Flags = []cli.Flag{
+ utils.IdentityFlag,
utils.UnlockedAccountFlag,
utils.PasswordFileFlag,
utils.BootnodesFlag,
diff --git a/cmd/rlpdump/main.go b/cmd/rlpdump/main.go
index 8567dcff8..528ccc6bd 100644
--- a/cmd/rlpdump/main.go
+++ b/cmd/rlpdump/main.go
@@ -78,7 +78,7 @@ func main() {
os.Exit(2)
}
- s := rlp.NewStream(r)
+ s := rlp.NewStream(r, 0)
for {
if err := dump(s, 0); err != nil {
if err != io.EOF {
diff --git a/cmd/utils/cmd.go b/cmd/utils/cmd.go
index 7286f5c5e..64faf6ad1 100644
--- a/cmd/utils/cmd.go
+++ b/cmd/utils/cmd.go
@@ -154,7 +154,7 @@ func ImportChain(chainmgr *core.ChainManager, fn string) error {
defer fh.Close()
chainmgr.Reset()
- stream := rlp.NewStream(fh)
+ stream := rlp.NewStream(fh, 0)
var i, n int
batchSize := 2500
diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 8141fae82..a1d9eedda 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -89,6 +89,10 @@ var (
Usage: "Blockchain version",
Value: core.BlockChainVersion,
}
+ IdentityFlag = cli.StringFlag{
+ Name: "identity",
+ Usage: "node name",
+ }
// miner settings
MinerThreadsFlag = cli.IntFlag{
@@ -242,6 +246,11 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config {
// Set the log dir
glog.SetLogDir(ctx.GlobalString(LogFileFlag.Name))
+ customName := ctx.GlobalString(IdentityFlag.Name)
+ if len(customName) > 0 {
+ clientID += "/" + customName
+ }
+
return &eth.Config{
Name: common.MakeName(clientID, version),
DataDir: ctx.GlobalString(DataDirFlag.Name),
diff --git a/core/block_processor.go b/core/block_processor.go
index d5a29b258..e3c284979 100644
--- a/core/block_processor.go
+++ b/core/block_processor.go
@@ -323,7 +323,7 @@ func (sm *BlockProcessor) VerifyUncles(statedb *state.StateDB, block, parent *ty
}
uncles.Add(block.Hash())
- for _, uncle := range block.Uncles() {
+ for i, uncle := range block.Uncles() {
if uncles.Has(uncle.Hash()) {
// Error not unique
return UncleError("Uncle not unique")
@@ -340,9 +340,8 @@ func (sm *BlockProcessor) VerifyUncles(statedb *state.StateDB, block, parent *ty
}
if err := sm.ValidateHeader(uncle, ancestorHeaders[uncle.ParentHash]); err != nil {
- return ValidationError(fmt.Sprintf("%v", err))
+ return ValidationError(fmt.Sprintf("uncle[%d](%x) header invalid: %v", i, uncle.Hash().Bytes()[:4], err))
}
-
}
return nil
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 8371a129d..4f1e1e68a 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -330,14 +330,13 @@ func (self *ChainManager) GetBlockHashesFromHash(hash common.Hash, max uint64) (
}
// XXX Could be optimised by using a different database which only holds hashes (i.e., linked list)
for i := uint64(0); i < max; i++ {
- parentHash := block.Header().ParentHash
- block = self.GetBlock(parentHash)
+ block = self.GetBlock(block.ParentHash())
if block == nil {
break
}
chain = append(chain, block.Hash())
- if block.Header().Number.Cmp(common.Big0) <= 0 {
+ if block.Number().Cmp(common.Big0) <= 0 {
break
}
}
diff --git a/core/types/transaction.go b/core/types/transaction.go
index 6646bdf29..d8dcd7424 100644
--- a/core/types/transaction.go
+++ b/core/types/transaction.go
@@ -22,7 +22,7 @@ type Transaction struct {
AccountNonce uint64
Price *big.Int
GasLimit *big.Int
- Recipient *common.Address // nil means contract creation
+ Recipient *common.Address `rlp:"nil"` // nil means contract creation
Amount *big.Int
Payload []byte
V byte
diff --git a/crypto/crypto.go b/crypto/crypto.go
index 9865c87c4..89423e0c4 100644
--- a/crypto/crypto.go
+++ b/crypto/crypto.go
@@ -120,6 +120,7 @@ func HexToECDSA(hexkey string) (*ecdsa.PrivateKey, error) {
}
// LoadECDSA loads a secp256k1 private key from the given file.
+// The key data is expected to be hex-encoded.
func LoadECDSA(file string) (*ecdsa.PrivateKey, error) {
buf := make([]byte, 64)
fd, err := os.Open(file)
@@ -139,8 +140,8 @@ func LoadECDSA(file string) (*ecdsa.PrivateKey, error) {
return ToECDSA(key), nil
}
-// SaveECDSA saves a secp256k1 private key to the given file with restrictive
-// permissions
+// SaveECDSA saves a secp256k1 private key to the given file with
+// restrictive permissions. The key data is saved hex-encoded.
func SaveECDSA(file string, key *ecdsa.PrivateKey) error {
k := hex.EncodeToString(FromECDSA(key))
return ioutil.WriteFile(file, []byte(k), 0600)
diff --git a/eth/backend.go b/eth/backend.go
index cde7b167d..264753aba 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -3,19 +3,18 @@ package eth
import (
"crypto/ecdsa"
"fmt"
- "io/ioutil"
"math"
"path"
"strings"
"github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/accounts"
- "github.com/ethereum/go-ethereum/blockpool"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
@@ -110,7 +109,7 @@ func (cfg *Config) nodeKey() (*ecdsa.PrivateKey, error) {
if key, err = crypto.GenerateKey(); err != nil {
return nil, fmt.Errorf("could not generate server key: %v", err)
}
- if err := ioutil.WriteFile(keyfile, crypto.FromECDSA(key), 0600); err != nil {
+ if err := crypto.SaveECDSA(keyfile, key); err != nil {
glog.V(logger.Error).Infoln("could not persist nodekey: ", err)
}
return key, nil
@@ -127,19 +126,20 @@ type Ethereum struct {
//*** SERVICES ***
// State manager for processing new blocks and managing the over all states
- blockProcessor *core.BlockProcessor
- txPool *core.TxPool
- chainManager *core.ChainManager
- blockPool *blockpool.BlockPool
- accountManager *accounts.Manager
- whisper *whisper.Whisper
- pow *ethash.Ethash
-
- net *p2p.Server
- eventMux *event.TypeMux
- txSub event.Subscription
- blockSub event.Subscription
- miner *miner.Miner
+ blockProcessor *core.BlockProcessor
+ txPool *core.TxPool
+ chainManager *core.ChainManager
+ accountManager *accounts.Manager
+ whisper *whisper.Whisper
+ pow *ethash.Ethash
+ protocolManager *ProtocolManager
+ downloader *downloader.Downloader
+
+ net *p2p.Server
+ eventMux *event.TypeMux
+ txSub event.Subscription
+ minedBlockSub event.Subscription
+ miner *miner.Miner
// logger logger.LogSystem
@@ -208,6 +208,7 @@ func New(config *Config) (*Ethereum, error) {
}
eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
+ eth.downloader = downloader.New(eth.chainManager.HasBlock, eth.chainManager.InsertChain, eth.chainManager.Td)
eth.pow = ethash.New(eth.chainManager)
eth.txPool = core.NewTxPool(eth.EventMux(), eth.chainManager.State)
eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
@@ -215,23 +216,16 @@ func New(config *Config) (*Ethereum, error) {
eth.whisper = whisper.New()
eth.shhVersionId = int(eth.whisper.Version())
eth.miner = miner.New(eth, eth.pow, config.MinerThreads)
-
- hasBlock := eth.chainManager.HasBlock
- insertChain := eth.chainManager.InsertChain
- td := eth.chainManager.Td()
- eth.blockPool = blockpool.New(hasBlock, insertChain, eth.pow.Verify, eth.EventMux(), td)
+ eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.downloader)
netprv, err := config.nodeKey()
if err != nil {
return nil, err
}
-
- ethProto := EthProtocol(config.ProtocolVersion, config.NetworkId, eth.txPool, eth.chainManager, eth.blockPool)
- protocols := []p2p.Protocol{ethProto}
+ protocols := []p2p.Protocol{eth.protocolManager.SubProtocol}
if config.Shh {
protocols = append(protocols, eth.whisper.Protocol())
}
-
eth.net = &p2p.Server{
PrivateKey: netprv,
Name: config.Name,
@@ -349,7 +343,6 @@ func (s *Ethereum) AccountManager() *accounts.Manager { return s.accountManag
func (s *Ethereum) ChainManager() *core.ChainManager { return s.chainManager }
func (s *Ethereum) BlockProcessor() *core.BlockProcessor { return s.blockProcessor }
func (s *Ethereum) TxPool() *core.TxPool { return s.txPool }
-func (s *Ethereum) BlockPool() *blockpool.BlockPool { return s.blockPool }
func (s *Ethereum) Whisper() *whisper.Whisper { return s.whisper }
func (s *Ethereum) EventMux() *event.TypeMux { return s.eventMux }
func (s *Ethereum) BlockDb() common.Database { return s.blockDb }
@@ -363,6 +356,7 @@ func (s *Ethereum) ClientVersion() string { return s.clientVersio
func (s *Ethereum) EthVersion() int { return s.ethVersionId }
func (s *Ethereum) NetVersion() int { return s.netVersionId }
func (s *Ethereum) ShhVersion() int { return s.shhVersionId }
+func (s *Ethereum) Downloader() *downloader.Downloader { return s.downloader }
// Start the ethereum
func (s *Ethereum) Start() error {
@@ -380,7 +374,6 @@ func (s *Ethereum) Start() error {
// Start services
s.txPool.Start()
- s.blockPool.Start()
if s.whisper != nil {
s.whisper.Start()
@@ -391,8 +384,8 @@ func (s *Ethereum) Start() error {
go s.txBroadcastLoop()
// broadcast mined blocks
- s.blockSub = s.eventMux.Subscribe(core.ChainHeadEvent{})
- go s.blockBroadcastLoop()
+ s.minedBlockSub = s.eventMux.Subscribe(core.NewMinedBlockEvent{})
+ go s.minedBroadcastLoop()
glog.V(logger.Info).Infoln("Server started")
return nil
@@ -406,7 +399,6 @@ func (s *Ethereum) StartForTest() {
// Start services
s.txPool.Start()
- s.blockPool.Start()
}
func (self *Ethereum) SuggestPeer(nodeURL string) error {
@@ -424,12 +416,11 @@ func (s *Ethereum) Stop() {
defer s.stateDb.Close()
defer s.extraDb.Close()
- s.txSub.Unsubscribe() // quits txBroadcastLoop
- s.blockSub.Unsubscribe() // quits blockBroadcastLoop
+ s.txSub.Unsubscribe() // quits txBroadcastLoop
+ s.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
s.txPool.Stop()
s.eventMux.Stop()
- s.blockPool.Stop()
if s.whisper != nil {
s.whisper.Stop()
}
@@ -468,12 +459,12 @@ func (self *Ethereum) syncAccounts(tx *types.Transaction) {
}
}
-func (self *Ethereum) blockBroadcastLoop() {
+func (self *Ethereum) minedBroadcastLoop() {
// automatically stops if unsubscribe
- for obj := range self.blockSub.Chan() {
+ for obj := range self.minedBlockSub.Chan() {
switch ev := obj.(type) {
- case core.ChainHeadEvent:
- self.net.BroadcastLimited("eth", NewBlockMsg, math.Sqrt, []interface{}{ev.Block, ev.Block.Td})
+ case core.NewMinedBlockEvent:
+ self.protocolManager.BroadcastBlock(ev.Block.Hash(), ev.Block)
}
}
}
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 4e795af6d..c4af5e17b 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -1,6 +1,8 @@
package downloader
import (
+ "errors"
+ "fmt"
"math"
"math/big"
"sync"
@@ -16,8 +18,22 @@ import (
)
const (
- maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk
- minDesiredPeerCount = 3 // Amount of peers desired to start syncing
+ maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk
+ peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
+ hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out
+)
+
+var (
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+ blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
+
+ errLowTd = errors.New("peer's TD is too low")
+ errBusy = errors.New("busy")
+ errUnknownPeer = errors.New("peer's unknown or unhealthy")
+ errBadPeer = errors.New("action from bad peer ignored")
+ errTimeout = errors.New("timeout")
+ errEmptyHashSet = errors.New("empty hash set by peer")
+ errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
)
type hashCheckFn func(common.Hash) bool
@@ -26,9 +42,10 @@ type hashIterFn func() (common.Hash, error)
type currentTdFn func() *big.Int
type Downloader struct {
- mu sync.RWMutex
- queue *queue
- peers peers
+ mu sync.RWMutex
+ queue *queue
+ peers peers
+ activePeer string
// Callbacks
hasBlock hashCheckFn
@@ -43,7 +60,7 @@ type Downloader struct {
// Channels
newPeerCh chan *peer
syncCh chan syncPack
- HashCh chan []common.Hash
+ hashCh chan []common.Hash
blockCh chan blockPack
quit chan struct{}
}
@@ -68,7 +85,7 @@ func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn)
currentTd: currentTd,
newPeerCh: make(chan *peer, 1),
syncCh: make(chan syncPack, 1),
- HashCh: make(chan []common.Hash, 1),
+ hashCh: make(chan []common.Hash, 1),
blockCh: make(chan blockPack, 1),
quit: make(chan struct{}),
}
@@ -82,7 +99,7 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH
d.mu.Lock()
defer d.mu.Unlock()
- glog.V(logger.Detail).Infoln("Register peer", id)
+ glog.V(logger.Detail).Infoln("Register peer", id, "TD =", td)
// Create a new peer and add it to the list of known peers
peer := newPeer(id, td, hash, getHashes, getBlocks)
@@ -94,6 +111,7 @@ func (d *Downloader) RegisterPeer(id string, td *big.Int, hash common.Hash, getH
return nil
}
+// UnregisterPeer unregister's a peer. This will prevent any action from the specified peer.
func (d *Downloader) UnregisterPeer(id string) {
d.mu.Lock()
defer d.mu.Unlock()
@@ -105,22 +123,28 @@ func (d *Downloader) UnregisterPeer(id string) {
func (d *Downloader) peerHandler() {
// itimer is used to determine when to start ignoring `minDesiredPeerCount`
- //itimer := time.NewTicker(5 * time.Second)
- itimer := time.NewTimer(5 * time.Second)
+ itimer := time.NewTimer(peerCountTimeout)
out:
for {
select {
case <-d.newPeerCh:
- itimer.Stop()
// Meet the `minDesiredPeerCount` before we select our best peer
if len(d.peers) < minDesiredPeerCount {
break
}
+ itimer.Stop()
+
d.selectPeer(d.peers.bestPeer())
case <-itimer.C:
// The timer will make sure that the downloader keeps an active state
// in which it attempts to always check the network for highest td peers
- d.selectPeer(d.peers.bestPeer())
+ // Either select the peer or restart the timer if no peers could
+ // be selected.
+ if peer := d.peers.bestPeer(); peer != nil {
+ d.selectPeer(d.peers.bestPeer())
+ } else {
+ itimer.Reset(5 * time.Second)
+ }
case <-d.quit:
break out
}
@@ -131,17 +155,19 @@ func (d *Downloader) selectPeer(p *peer) {
// Make sure it's doing neither. Once done we can restart the
// downloading process if the TD is higher. For now just get on
// with whatever is going on. This prevents unecessary switching.
- if !(d.isFetchingHashes() || d.isDownloadingBlocks() || d.isProcessing()) {
- // selected peer must be better than our own
- // XXX we also check the peer's recent hash to make sure we
- // don't have it. Some peers report (i think) incorrect TD.
- if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
- return
- }
-
- glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
- d.syncCh <- syncPack{p, p.recentHash, false}
+ if d.isBusy() {
+ return
+ }
+ // selected peer must be better than our own
+ // XXX we also check the peer's recent hash to make sure we
+ // don't have it. Some peers report (i think) incorrect TD.
+ if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
+ return
}
+
+ glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td)
+ d.syncCh <- syncPack{p, p.recentHash, false}
+
}
func (d *Downloader) update() {
@@ -149,30 +175,13 @@ out:
for {
select {
case sync := <-d.syncCh:
- selectedPeer := sync.peer
- glog.V(logger.Detail).Infoln("Synchronising with network using:", selectedPeer.id)
- // Start the fetcher. This will block the update entirely
- // interupts need to be send to the appropriate channels
- // respectively.
- if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil {
- // handle error
- glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
- // XXX Reset
+ var peer *peer = sync.peer
+ err := d.getFromPeer(peer, sync.hash, sync.ignoreInitial)
+ if err != nil {
+ glog.V(logger.Detail).Infoln(err)
break
}
- // Start fetching blocks in paralel. The strategy is simple
- // take any available peers, seserve a chunk for each peer available,
- // let the peer deliver the chunkn and periodically check if a peer
- // has timedout. When done downloading, process blocks.
- if err := d.startFetchingBlocks(selectedPeer); err != nil {
- glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
- // XXX reset
- break
- }
-
- glog.V(logger.Detail).Infoln("Sync completed")
-
d.process()
case <-d.quit:
break out
@@ -182,6 +191,9 @@ out:
// XXX Make synchronous
func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error {
+ atomic.StoreInt32(&d.fetchingHashes, 1)
+ defer atomic.StoreInt32(&d.fetchingHashes, 0)
+
glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id)
start := time.Now()
@@ -192,15 +204,17 @@ func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitia
// Add the hash to the queue first
d.queue.hashPool.Add(hash)
}
-
// Get the first batch of hashes
p.getHashes(hash)
- atomic.StoreInt32(&d.fetchingHashes, 1)
+
+ failureResponseTimer := time.NewTimer(hashTtl)
out:
for {
select {
- case hashes := <-d.HashCh:
+ case hashes := <-d.hashCh:
+ failureResponseTimer.Reset(hashTtl)
+
var done bool // determines whether we're done fetching hashes (i.e. common hash found)
hashSet := set.New()
for _, hash := range hashes {
@@ -216,26 +230,40 @@ out:
d.queue.put(hashSet)
// Add hashes to the chunk set
- // Check if we're done fetching
- if !done && len(hashes) > 0 {
- //fmt.Println("re-fetch. current =", d.queue.hashPool.Size())
+ if len(hashes) == 0 { // Make sure the peer actually gave you something valid
+ glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", p.id)
+ d.queue.reset()
+
+ return errEmptyHashSet
+ } else if !done { // Check if we're done fetching
// Get the next set of hashes
p.getHashes(hashes[len(hashes)-1])
- atomic.StoreInt32(&d.fetchingHashes, 1)
- } else {
- atomic.StoreInt32(&d.fetchingHashes, 0)
+ } else { // we're done
break out
}
+ case <-failureResponseTimer.C:
+ glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
+ // TODO instead of reseting the queue select a new peer from which we can start downloading hashes.
+ // 1. check for peer's best hash to be included in the current hash set;
+ // 2. resume from last point (hashes[len(hashes)-1]) using the newly selected peer.
+ d.queue.reset()
+
+ return errTimeout
}
}
- glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start))
+ glog.V(logger.Detail).Infof("Downloaded hashes (%d) in %v\n", d.queue.hashPool.Size(), time.Since(start))
return nil
}
func (d *Downloader) startFetchingBlocks(p *peer) error {
- glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "blocks")
+ glog.V(logger.Detail).Infoln("Downloading", d.queue.hashPool.Size(), "block(s)")
atomic.StoreInt32(&d.downloadingBlocks, 1)
+ defer atomic.StoreInt32(&d.downloadingBlocks, 0)
+ // Defer the peer reset. This will empty the peer requested set
+ // and makes sure there are no lingering peers with an incorrect
+ // state
+ defer d.peers.reset()
start := time.Now()
@@ -245,18 +273,18 @@ out:
for {
select {
case blockPack := <-d.blockCh:
- d.peers[blockPack.peerId].promote()
- d.queue.deliver(blockPack.peerId, blockPack.blocks)
- d.peers.setState(blockPack.peerId, idleState)
+ // If the peer was previously banned and failed to deliver it's pack
+ // in a reasonable time frame, ignore it's message.
+ if d.peers[blockPack.peerId] != nil {
+ d.peers[blockPack.peerId].promote()
+ d.queue.deliver(blockPack.peerId, blockPack.blocks)
+ d.peers.setState(blockPack.peerId, idleState)
+ }
case <-ticker.C:
// If there are unrequested hashes left start fetching
// from the available peers.
if d.queue.hashPool.Size() > 0 {
availablePeers := d.peers.get(idleState)
- if len(availablePeers) == 0 {
- glog.V(logger.Detail).Infoln("No peers available out of", len(d.peers))
- }
-
for _, peer := range availablePeers {
// Get a possible chunk. If nil is returned no chunk
// could be returned due to no hashes available.
@@ -265,7 +293,6 @@ out:
continue
}
- //fmt.Println("fetching for", peer.id)
// XXX make fetch blocking.
// Fetch the chunk and check for error. If the peer was somehow
// already fetching a chunk due to a bug, it will be returned to
@@ -276,13 +303,19 @@ out:
d.queue.put(chunk.hashes)
}
}
- atomic.StoreInt32(&d.downloadingBlocks, 1)
+
+ // make sure that we have peers available for fetching. If all peers have been tried
+ // and all failed throw an error
+ if len(d.queue.fetching) == 0 {
+ d.queue.reset()
+
+ return fmt.Errorf("%v peers avaialable = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(availablePeers), len(d.peers), d.queue.hashPool.Size())
+ }
+
} else if len(d.queue.fetching) == 0 {
// When there are no more queue and no more `fetching`. We can
// safely assume we're done. Another part of the process will check
// for parent errors and will re-request anything that's missing
- atomic.StoreInt32(&d.downloadingBlocks, 0)
- // Break out so that we can process with processing blocks
break out
} else {
// Check for bad peers. Bad peers may indicate a peer not responding
@@ -293,10 +326,10 @@ out:
d.queue.mu.Lock()
var badPeers []string
for pid, chunk := range d.queue.fetching {
- if time.Since(chunk.itime) > 5*time.Second {
+ if time.Since(chunk.itime) > blockTtl {
badPeers = append(badPeers, pid)
// remove peer as good peer from peer list
- d.UnregisterPeer(pid)
+ //d.UnregisterPeer(pid)
}
}
d.queue.mu.Unlock()
@@ -313,26 +346,48 @@ out:
d.queue.deliver(pid, nil)
if peer := d.peers[pid]; peer != nil {
peer.demote()
+ peer.reset()
}
}
}
- //fmt.Println(d.queue.hashPool.Size(), len(d.queue.fetching))
}
}
- glog.V(logger.Detail).Infoln("Download blocks: done. Took", time.Since(start))
+ glog.V(logger.Detail).Infoln("Downloaded block(s) in", time.Since(start))
+
+ return nil
+}
+
+// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
+// the protocol handler.
+func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) {
+ d.blockCh <- blockPack{id, blocks}
+}
+
+func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
+ // make sure that the hashes that are being added are actually from the peer
+ // that's the current active peer. hashes that have been received from other
+ // peers are dropped and ignored.
+ if d.activePeer != id {
+ return fmt.Errorf("received hashes from %s while active peer is %s", id, d.activePeer)
+ }
+
+ d.hashCh <- hashes
return nil
}
// Add an (unrequested) block to the downloader. This is usually done through the
// NewBlockMsg by the protocol handler.
-func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
+// Adding blocks is done synchronously. if there are missing blocks, blocks will be
+// fetched first. If the downloader is busy or if some other processed failed an error
+// will be returned.
+func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) error {
hash := block.Hash()
if d.hasBlock(hash) {
- return
+ return fmt.Errorf("known block %x", hash.Bytes()[:4])
}
peer := d.peers.getPeer(id)
@@ -340,7 +395,7 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
// and add the block. Otherwise just ignore it
if peer == nil {
glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id)
- return
+ return errBadPeer
}
peer.mu.Lock()
@@ -353,23 +408,24 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) {
d.queue.addBlock(id, block, td)
// if neither go ahead to process
- if !(d.isFetchingHashes() || d.isDownloadingBlocks()) {
- // Check if the parent of the received block is known.
- // If the block is not know, request it otherwise, request.
- phash := block.ParentHash()
- if !d.hasBlock(phash) {
- glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4])
- d.syncCh <- syncPack{peer, peer.recentHash, true}
- } else {
- d.process()
+ if d.isBusy() {
+ return errBusy
+ }
+
+ // Check if the parent of the received block is known.
+ // If the block is not know, request it otherwise, request.
+ phash := block.ParentHash()
+ if !d.hasBlock(phash) {
+ glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4])
+
+ // Get the missing hashes from the peer (synchronously)
+ err := d.getFromPeer(peer, peer.recentHash, true)
+ if err != nil {
+ return err
}
}
-}
-// Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
-// the protocol handler.
-func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) {
- d.blockCh <- blockPack{id, blocks}
+ return d.process()
}
func (d *Downloader) process() error {
@@ -383,8 +439,11 @@ func (d *Downloader) process() error {
// to a seperate goroutine where it periodically checks for linked pieces.
types.BlockBy(types.Number).Sort(d.queue.blocks)
blocks := d.queue.blocks
+ if len(blocks) == 0 {
+ return nil
+ }
- glog.V(logger.Debug).Infoln("Inserting chain with", len(blocks), "blocks")
+ glog.V(logger.Debug).Infof("Inserting chain with %d blocks (#%v - #%v)\n", len(blocks), blocks[0].Number(), blocks[len(blocks)-1].Number())
var err error
// Loop untill we're out of blocks
@@ -408,6 +467,11 @@ func (d *Downloader) process() error {
}
}
break
+ } else if err != nil {
+ // Reset chain completely. This needs much, much improvement.
+ // instead: check all blocks leading down to this block false block and remove it
+ blocks = nil
+ break
}
blocks = blocks[max:]
}
@@ -432,3 +496,7 @@ func (d *Downloader) isDownloadingBlocks() bool {
func (d *Downloader) isProcessing() bool {
return atomic.LoadInt32(&d.processingBlocks) == 1
}
+
+func (d *Downloader) isBusy() bool {
+ return d.isFetchingHashes() || d.isDownloadingBlocks() || d.isProcessing()
+}
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 6cf99b678..1d449cfba 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -73,7 +73,7 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) error {
}
func (dl *downloadTester) getHashes(hash common.Hash) error {
- dl.downloader.HashCh <- dl.hashes
+ dl.downloader.hashCh <- dl.hashes
return nil
}
@@ -109,6 +109,9 @@ func TestDownload(t *testing.T) {
glog.SetV(logger.Detail)
glog.SetToStderr(true)
+ minDesiredPeerCount = 4
+ blockTtl = 1 * time.Second
+
hashes := createHashes(0, 1000)
blocks := createBlocksFromHashes(hashes)
tester := newTester(t, hashes, blocks)
@@ -123,7 +126,7 @@ success:
case <-tester.done:
break success
case <-time.After(10 * time.Second): // XXX this could actually fail on a slow computer
- t.Error("timout")
+ t.Error("timeout")
}
}
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 4cd306a05..bcb8ad43a 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -6,6 +6,7 @@ import (
"sync"
"github.com/ethereum/go-ethereum/common"
+ "gopkg.in/fatih/set.v0"
)
const (
@@ -19,6 +20,12 @@ type blockFetcherFn func([]common.Hash) error
// XXX make threadsafe!!!!
type peers map[string]*peer
+func (p peers) reset() {
+ for _, peer := range p {
+ peer.reset()
+ }
+}
+
func (p peers) get(state int) []*peer {
var peers []*peer
for _, peer := range p {
@@ -64,13 +71,23 @@ type peer struct {
td *big.Int
recentHash common.Hash
+ ignored *set.Set
+
getHashes hashFetcherFn
getBlocks blockFetcherFn
}
// create a new peer
func newPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, getBlocks blockFetcherFn) *peer {
- return &peer{id: id, td: td, recentHash: hash, getHashes: getHashes, getBlocks: getBlocks, state: idleState}
+ return &peer{
+ id: id,
+ td: td,
+ recentHash: hash,
+ getHashes: getHashes,
+ getBlocks: getBlocks,
+ state: idleState,
+ ignored: set.New(),
+ }
}
// fetch a chunk using the peer
@@ -115,3 +132,8 @@ func (p *peer) demote() {
p.rep = 0
}
}
+
+func (p *peer) reset() {
+ p.state = idleState
+ p.ignored.Clear()
+}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 4d1aa4e93..adbc2a0d0 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -31,6 +31,17 @@ func newqueue() *queue {
}
}
+func (c *queue) reset() {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ c.hashPool.Clear()
+ c.fetchPool.Clear()
+ c.blockHashes.Clear()
+ c.blocks = nil
+ c.fetching = make(map[string]*chunk)
+}
+
// reserve a `max` set of hashes for `p` peer.
func (c *queue) get(p *peer, max int) *chunk {
c.mu.Lock()
@@ -45,22 +56,32 @@ func (c *queue) get(p *peer, max int) *chunk {
// Create a new set of hashes
hashes, i := set.New(), 0
c.hashPool.Each(func(v interface{}) bool {
+ // break on limit
if i == limit {
return false
}
+ // skip any hashes that have previously been requested from the peer
+ if p.ignored.Has(v) {
+ return true
+ }
hashes.Add(v)
i++
return true
})
+ // if no hashes can be requested return a nil chunk
+ if hashes.Size() == 0 {
+ return nil
+ }
+
// remove the fetchable hashes from hash pool
c.hashPool.Separate(hashes)
c.fetchPool.Merge(hashes)
// Create a new chunk for the seperated hashes. The time is being used
// to reset the chunk (timeout)
- chunk := &chunk{hashes, time.Now()}
+ chunk := &chunk{p, hashes, time.Now()}
// register as 'fetching' state
c.fetching[p.id] = chunk
@@ -92,6 +113,12 @@ func (c *queue) deliver(id string, blocks []*types.Block) {
// If the chunk was never requested simply ignore it
if chunk != nil {
delete(c.fetching, id)
+ // check the length of the returned blocks. If the length of blocks is 0
+ // we'll assume the peer doesn't know about the chain.
+ if len(blocks) == 0 {
+ // So we can ignore the blocks we didn't know about
+ chunk.peer.ignored.Merge(chunk.hashes)
+ }
// seperate the blocks and the hashes
blockHashes := chunk.fetchedHashes(blocks)
@@ -99,7 +126,6 @@ func (c *queue) deliver(id string, blocks []*types.Block) {
c.blockHashes.Merge(blockHashes)
// Add the blocks
c.blocks = append(c.blocks, blocks...)
-
// Add back whatever couldn't be delivered
c.hashPool.Merge(chunk.hashes)
c.fetchPool.Separate(chunk.hashes)
@@ -115,6 +141,7 @@ func (c *queue) put(hashes *set.Set) {
}
type chunk struct {
+ peer *peer
hashes *set.Set
itime time.Time
}
diff --git a/eth/downloader/synchronous.go b/eth/downloader/synchronous.go
new file mode 100644
index 000000000..7bb49d24e
--- /dev/null
+++ b/eth/downloader/synchronous.go
@@ -0,0 +1,79 @@
+package downloader
+
+import (
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+)
+
+// THIS IS PENDING AND TO DO CHANGES FOR MAKING THE DOWNLOADER SYNCHRONOUS
+
+// SynchroniseWithPeer will select the peer and use it for synchronising. If an empty string is given
+// it will use the best peer possible and synchronise if it's TD is higher than our own. If any of the
+// checks fail an error will be returned. This method is synchronous
+func (d *Downloader) SynchroniseWithPeer(id string) (types.Blocks, error) {
+ // Check if we're busy
+ if d.isBusy() {
+ return nil, errBusy
+ }
+
+ // Attempt to select a peer. This can either be nothing, which returns, best peer
+ // or selected peer. If no peer could be found an error will be returned
+ var p *peer
+ if len(id) == 0 {
+ p = d.peers[id]
+ if p == nil {
+ return nil, errUnknownPeer
+ }
+ } else {
+ p = d.peers.bestPeer()
+ }
+
+ // Make sure our td is lower than the peer's td
+ if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) {
+ return nil, errLowTd
+ }
+
+ // Get the hash from the peer and initiate the downloading progress.
+ err := d.getFromPeer(p, p.recentHash, false)
+ if err != nil {
+ return nil, err
+ }
+
+ return d.queue.blocks, nil
+}
+
+// Synchronise will synchronise using the best peer.
+func (d *Downloader) Synchronise() (types.Blocks, error) {
+ return d.SynchroniseWithPeer("")
+}
+
+func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) error {
+ d.activePeer = p.id
+
+ glog.V(logger.Detail).Infoln("Synchronising with the network using:", p.id)
+ // Start the fetcher. This will block the update entirely
+ // interupts need to be send to the appropriate channels
+ // respectively.
+ if err := d.startFetchingHashes(p, hash, ignoreInitial); err != nil {
+ // handle error
+ glog.V(logger.Debug).Infoln("Error fetching hashes:", err)
+ // XXX Reset
+ return err
+ }
+
+ // Start fetching blocks in paralel. The strategy is simple
+ // take any available peers, seserve a chunk for each peer available,
+ // let the peer deliver the chunkn and periodically check if a peer
+ // has timedout. When done downloading, process blocks.
+ if err := d.startFetchingBlocks(p); err != nil {
+ glog.V(logger.Debug).Infoln("Error downloading blocks:", err)
+ // XXX reset
+ return err
+ }
+
+ glog.V(logger.Detail).Infoln("Sync completed")
+
+ return nil
+}
diff --git a/eth/handler.go b/eth/handler.go
new file mode 100644
index 000000000..5c0660d84
--- /dev/null
+++ b/eth/handler.go
@@ -0,0 +1,334 @@
+package eth
+
+// XXX Fair warning, most of the code is re-used from the old protocol. Please be aware that most of this will actually change
+// The idea is that most of the calls within the protocol will become synchronous.
+// Block downloading and block processing will be complete seperate processes
+/*
+# Possible scenarios
+
+// Synching scenario
+// Use the best peer to synchronise
+blocks, err := pm.downloader.Synchronise()
+if err != nil {
+ // handle
+ break
+}
+pm.chainman.InsertChain(blocks)
+
+// Receiving block with known parent
+if parent_exist {
+ if err := pm.chainman.InsertChain(block); err != nil {
+ // handle
+ break
+ }
+ pm.BroadcastBlock(block)
+}
+
+// Receiving block with unknown parent
+blocks, err := pm.downloader.SynchroniseWithPeer(peer)
+if err != nil {
+ // handle
+ break
+}
+pm.chainman.InsertChain(blocks)
+
+*/
+
+import (
+ "fmt"
+ "math/big"
+ "sync"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+func errResp(code errCode, format string, v ...interface{}) error {
+ return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
+}
+
+type hashFetcherFn func(common.Hash) error
+type blockFetcherFn func([]common.Hash) error
+
+// extProt is an interface which is passed around so we can expose GetHashes and GetBlock without exposing it to the rest of the protocol
+// extProt is passed around to peers which require to GetHashes and GetBlocks
+type extProt struct {
+ getHashes hashFetcherFn
+ getBlocks blockFetcherFn
+}
+
+func (ep extProt) GetHashes(hash common.Hash) error { return ep.getHashes(hash) }
+func (ep extProt) GetBlock(hashes []common.Hash) error { return ep.getBlocks(hashes) }
+
+type ProtocolManager struct {
+ protVer, netId int
+ txpool txPool
+ chainman *core.ChainManager
+ downloader *downloader.Downloader
+
+ pmu sync.Mutex
+ peers map[string]*peer
+
+ SubProtocol p2p.Protocol
+}
+
+// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
+// with the ethereum network.
+func NewProtocolManager(protocolVersion, networkId int, txpool txPool, chainman *core.ChainManager, downloader *downloader.Downloader) *ProtocolManager {
+ manager := &ProtocolManager{
+ txpool: txpool,
+ chainman: chainman,
+ downloader: downloader,
+ peers: make(map[string]*peer),
+ }
+
+ manager.SubProtocol = p2p.Protocol{
+ Name: "eth",
+ Version: uint(protocolVersion),
+ Length: ProtocolLength,
+ Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ peer := manager.newPeer(protocolVersion, networkId, p, rw)
+ err := manager.handle(peer)
+ //glog.V(logger.Detail).Infof("[%s]: %v\n", peer.id, err)
+
+ return err
+ },
+ }
+
+ return manager
+}
+
+func (pm *ProtocolManager) newPeer(pv, nv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
+
+ td, current, genesis := pm.chainman.Status()
+
+ return newPeer(pv, nv, genesis, current, td, p, rw)
+}
+
+func (pm *ProtocolManager) handle(p *peer) error {
+ if err := p.handleStatus(); err != nil {
+ return err
+ }
+ pm.pmu.Lock()
+ pm.peers[p.id] = p
+ pm.pmu.Unlock()
+
+ pm.downloader.RegisterPeer(p.id, p.td, p.currentHash, p.requestHashes, p.requestBlocks)
+ defer func() {
+ pm.pmu.Lock()
+ defer pm.pmu.Unlock()
+ delete(pm.peers, p.id)
+ pm.downloader.UnregisterPeer(p.id)
+ }()
+
+ // propagate existing transactions. new transactions appearing
+ // after this will be sent via broadcasts.
+ if err := p.sendTransactions(pm.txpool.GetTransactions()); err != nil {
+ return err
+ }
+
+ // main loop. handle incoming messages.
+ for {
+ if err := pm.handleMsg(p); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (self *ProtocolManager) handleMsg(p *peer) error {
+ msg, err := p.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Size > ProtocolMaxMsgSize {
+ return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+ // make sure that the payload has been fully consumed
+ defer msg.Discard()
+
+ switch msg.Code {
+ case GetTxMsg: // ignore
+ case StatusMsg:
+ return errResp(ErrExtraStatusMsg, "uncontrolled status message")
+
+ case TxMsg:
+ // TODO: rework using lazy RLP stream
+ var txs []*types.Transaction
+ if err := msg.Decode(&txs); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ for i, tx := range txs {
+ if tx == nil {
+ return errResp(ErrDecode, "transaction %d is nil", i)
+ }
+ jsonlogger.LogJson(&logger.EthTxReceived{
+ TxHash: tx.Hash().Hex(),
+ RemoteId: p.ID().String(),
+ })
+ }
+ self.txpool.AddTransactions(txs)
+
+ case GetBlockHashesMsg:
+ var request getBlockHashesMsgData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "->msg %v: %v", msg, err)
+ }
+
+ if request.Amount > maxHashes {
+ request.Amount = maxHashes
+ }
+
+ hashes := self.chainman.GetBlockHashesFromHash(request.Hash, request.Amount)
+
+ if glog.V(logger.Debug) {
+ if len(hashes) == 0 {
+ glog.Infof("invalid block hash %x", request.Hash.Bytes()[:4])
+ }
+ }
+
+ // returns either requested hashes or nothing (i.e. not found)
+ return p.sendBlockHashes(hashes)
+ case BlockHashesMsg:
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+
+ var hashes []common.Hash
+ if err := msgStream.Decode(&hashes); err != nil {
+ break
+ }
+ err := self.downloader.AddHashes(p.id, hashes)
+ if err != nil {
+ glog.V(logger.Debug).Infoln(err)
+ }
+
+ case GetBlocksMsg:
+ var blocks []*types.Block
+
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ var i int
+ for {
+ i++
+ var hash common.Hash
+ err := msgStream.Decode(&hash)
+ if err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+
+ block := self.chainman.GetBlock(hash)
+ if block != nil {
+ blocks = append(blocks, block)
+ }
+ if i == maxBlocks {
+ break
+ }
+ }
+ return p.sendBlocks(blocks)
+ case BlocksMsg:
+ var blocks []*types.Block
+
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if err := msgStream.Decode(&blocks); err != nil {
+ glog.V(logger.Detail).Infoln("Decode error", err)
+ blocks = nil
+ }
+ self.downloader.DeliverChunk(p.id, blocks)
+
+ case NewBlockMsg:
+ var request newBlockMsgData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ if err := request.Block.ValidateFields(); err != nil {
+ return errResp(ErrDecode, "block validation %v: %v", msg, err)
+ }
+ hash := request.Block.Hash()
+ // Add the block hash as a known hash to the peer. This will later be used to detirmine
+ // who should receive this.
+ p.blockHashes.Add(hash)
+
+ _, chainHead, _ := self.chainman.Status()
+
+ jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
+ BlockHash: hash.Hex(),
+ BlockNumber: request.Block.Number(), // this surely must be zero
+ ChainHeadHash: chainHead.Hex(),
+ BlockPrevHash: request.Block.ParentHash().Hex(),
+ RemoteId: p.ID().String(),
+ })
+
+ // Make sure the block isn't already known. If this is the case simply drop
+ // the message and move on. If the TD is < currentTd; drop it as well. If this
+ // chain at some point becomes canonical, the downloader will fetch it.
+ if self.chainman.HasBlock(hash) {
+ break
+ }
+ /* XXX unsure about this */
+ if self.chainman.Td().Cmp(request.TD) > 0 && new(big.Int).Add(request.Block.Number(), big.NewInt(7)).Cmp(self.chainman.CurrentBlock().Number()) < 0 {
+ glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, request.Block.Number(), request.TD)
+ break
+ }
+
+ // Attempt to insert the newly received by checking if the parent exists.
+ // if the parent exists we process the block and propagate to our peers
+ // if the parent does not exists we delegate to the downloader.
+ // NOTE we can reduce chatter by dropping blocks with Td < currentTd
+ if self.chainman.HasBlock(request.Block.ParentHash()) {
+ if err := self.chainman.InsertChain(types.Blocks{request.Block}); err != nil {
+ // handle error
+ return nil
+ }
+ self.BroadcastBlock(hash, request.Block)
+ //fmt.Println(request.Block.Hash().Hex(), "our calculated TD =", request.Block.Td, "their TD =", request.TD)
+ } else {
+ // adding blocks is synchronous
+ go func() {
+ err := self.downloader.AddBlock(p.id, request.Block, request.TD)
+ if err != nil {
+ glog.V(logger.Detail).Infoln("downloader err:", err)
+ return
+ }
+ self.BroadcastBlock(hash, request.Block)
+ //fmt.Println(request.Block.Hash().Hex(), "our calculated TD =", request.Block.Td, "their TD =", request.TD)
+ }()
+ }
+ default:
+ return errResp(ErrInvalidMsgCode, "%v", msg.Code)
+ }
+ return nil
+}
+
+// BroadcastBlock will propagate the block to its connected peers. It will sort
+// out which peers do not contain the block in their block set and will do a
+// sqrt(peers) to determine the amount of peers we broadcast to.
+func (pm *ProtocolManager) BroadcastBlock(hash common.Hash, block *types.Block) {
+ pm.pmu.Lock()
+ defer pm.pmu.Unlock()
+
+ // Find peers who don't know anything about the given hash. Peers that
+ // don't know about the hash will be a candidate for the broadcast loop
+ var peers []*peer
+ for _, peer := range pm.peers {
+ if !peer.blockHashes.Has(hash) {
+ peers = append(peers, peer)
+ }
+ }
+ // Broadcast block to peer set
+ // XXX due to the current shit state of the network disable the limit
+ //peers = peers[:int(math.Sqrt(float64(len(peers))))]
+ for _, peer := range peers {
+ peer.sendNewBlock(block)
+ }
+ glog.V(logger.Detail).Infoln("broadcast block to", len(peers), "peers")
+}
diff --git a/eth/peer.go b/eth/peer.go
new file mode 100644
index 000000000..972880845
--- /dev/null
+++ b/eth/peer.go
@@ -0,0 +1,145 @@
+package eth
+
+import (
+ "fmt"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/logger"
+ "github.com/ethereum/go-ethereum/logger/glog"
+ "github.com/ethereum/go-ethereum/p2p"
+ "gopkg.in/fatih/set.v0"
+)
+
+type statusMsgData struct {
+ ProtocolVersion uint32
+ NetworkId uint32
+ TD *big.Int
+ CurrentBlock common.Hash
+ GenesisBlock common.Hash
+}
+
+type getBlockHashesMsgData struct {
+ Hash common.Hash
+ Amount uint64
+}
+
+type peer struct {
+ *p2p.Peer
+
+ rw p2p.MsgReadWriter
+
+ protv, netid int
+
+ currentHash common.Hash
+ id string
+ td *big.Int
+
+ genesis, ourHash common.Hash
+ ourTd *big.Int
+
+ txHashes *set.Set
+ blockHashes *set.Set
+}
+
+func newPeer(protv, netid int, genesis, currentHash common.Hash, td *big.Int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
+ id := p.ID()
+
+ return &peer{
+ Peer: p,
+ rw: rw,
+ genesis: genesis,
+ ourHash: currentHash,
+ ourTd: td,
+ protv: protv,
+ netid: netid,
+ id: fmt.Sprintf("%x", id[:8]),
+ txHashes: set.New(),
+ blockHashes: set.New(),
+ }
+}
+
+// sendTransactions sends transactions to the peer and includes the hashes
+// in it's tx hash set for future reference. The tx hash will allow the
+// manager to check whether the peer has already received this particular
+// transaction
+func (p *peer) sendTransactions(txs types.Transactions) error {
+ for _, tx := range txs {
+ p.txHashes.Add(tx.Hash())
+ }
+
+ return p2p.Send(p.rw, TxMsg, txs)
+}
+
+func (p *peer) sendBlockHashes(hashes []common.Hash) error {
+ return p2p.Send(p.rw, BlockHashesMsg, hashes)
+}
+
+func (p *peer) sendBlocks(blocks []*types.Block) error {
+ return p2p.Send(p.rw, BlocksMsg, blocks)
+}
+
+func (p *peer) sendNewBlock(block *types.Block) error {
+ p.blockHashes.Add(block.Hash())
+
+ return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, block.Td})
+}
+
+func (p *peer) requestHashes(from common.Hash) error {
+ glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, maxHashes, from[:4])
+ return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes})
+}
+
+func (p *peer) requestBlocks(hashes []common.Hash) error {
+ glog.V(logger.Debug).Infof("[%s] fetching %v blocks\n", p.id, len(hashes))
+ return p2p.Send(p.rw, GetBlocksMsg, hashes)
+}
+
+func (p *peer) handleStatus() error {
+ errc := make(chan error, 1)
+ go func() {
+ errc <- p2p.Send(p.rw, StatusMsg, &statusMsgData{
+ ProtocolVersion: uint32(p.protv),
+ NetworkId: uint32(p.netid),
+ TD: p.ourTd,
+ CurrentBlock: p.ourHash,
+ GenesisBlock: p.genesis,
+ })
+ }()
+
+ // read and handle remote status
+ msg, err := p.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Code != StatusMsg {
+ return errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
+ }
+ if msg.Size > ProtocolMaxMsgSize {
+ return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+
+ var status statusMsgData
+ if err := msg.Decode(&status); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+
+ if status.GenesisBlock != p.genesis {
+ return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, p.genesis)
+ }
+
+ if int(status.NetworkId) != p.netid {
+ return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, p.netid)
+ }
+
+ if int(status.ProtocolVersion) != p.protv {
+ return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.protv)
+ }
+ // Set the total difficulty of the peer
+ p.td = status.TD
+ // set the best hash of the peer
+ p.currentHash = status.CurrentBlock
+
+ return <-errc
+}
diff --git a/eth/protocol.go b/eth/protocol.go
index 1a19307db..48f37b59c 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -1,16 +1,10 @@
package eth
import (
- "fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/errs"
- "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/logger/glog"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/rlp"
)
const (
@@ -18,8 +12,8 @@ const (
NetworkId = 0
ProtocolLength = uint64(8)
ProtocolMaxMsgSize = 10 * 1024 * 1024
- maxHashes = 256
- maxBlocks = 64
+ maxHashes = 512
+ maxBlocks = 128
)
// eth protocol message codes
@@ -34,6 +28,8 @@ const (
NewBlockMsg
)
+type errCode int
+
const (
ErrMsgTooLarge = iota
ErrDecode
@@ -46,6 +42,11 @@ const (
ErrSuspendedPeer
)
+func (e errCode) String() string {
+ return errorToString[int(e)]
+}
+
+// XXX change once legacy code is out
var errorToString = map[int]string{
ErrMsgTooLarge: "Message too long",
ErrDecode: "Invalid message",
@@ -58,20 +59,6 @@ var errorToString = map[int]string{
ErrSuspendedPeer: "Suspended peer",
}
-// ethProtocol represents the ethereum wire protocol
-// instance is running on each peer
-type ethProtocol struct {
- txPool txPool
- chainManager chainManager
- blockPool blockPool
- peer *p2p.Peer
- id string
- rw p2p.MsgReadWriter
- errors *errs.Errors
- protocolVersion int
- networkId int
-}
-
// backend is the interface the ethereum protocol backend should implement
// used as an argument to EthProtocol
type txPool interface {
@@ -85,308 +72,8 @@ type chainManager interface {
Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
}
-type blockPool interface {
- AddBlockHashes(next func() (common.Hash, bool), peerId string)
- AddBlock(block *types.Block, peerId string)
- AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool)
- RemovePeer(peerId string)
-}
-
// message structs used for RLP serialization
type newBlockMsgData struct {
Block *types.Block
TD *big.Int
}
-
-type getBlockHashesMsgData struct {
- Hash common.Hash
- Amount uint64
-}
-
-type statusMsgData struct {
- ProtocolVersion uint32
- NetworkId uint32
- TD *big.Int
- CurrentBlock common.Hash
- GenesisBlock common.Hash
-}
-
-// main entrypoint, wrappers starting a server running the eth protocol
-// use this constructor to attach the protocol ("class") to server caps
-// the Dev p2p layer then runs the protocol instance on each peer
-func EthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool) p2p.Protocol {
- return p2p.Protocol{
- Name: "eth",
- Version: uint(protocolVersion),
- Length: ProtocolLength,
- Run: func(peer *p2p.Peer, rw p2p.MsgReadWriter) error {
- return runEthProtocol(protocolVersion, networkId, txPool, chainManager, blockPool, peer, rw)
- },
- }
-}
-
-// the main loop that handles incoming messages
-// note RemovePeer in the post-disconnect hook
-func runEthProtocol(protocolVersion, networkId int, txPool txPool, chainManager chainManager, blockPool blockPool, peer *p2p.Peer, rw p2p.MsgReadWriter) (err error) {
- id := peer.ID()
- self := &ethProtocol{
- txPool: txPool,
- chainManager: chainManager,
- blockPool: blockPool,
- rw: rw,
- peer: peer,
- protocolVersion: protocolVersion,
- networkId: networkId,
- errors: &errs.Errors{
- Package: "ETH",
- Errors: errorToString,
- },
- id: fmt.Sprintf("%x", id[:8]),
- }
-
- // handshake.
- if err := self.handleStatus(); err != nil {
- return err
- }
- defer self.blockPool.RemovePeer(self.id)
-
- // propagate existing transactions. new transactions appearing
- // after this will be sent via broadcasts.
- if err := p2p.Send(rw, TxMsg, txPool.GetTransactions()); err != nil {
- return err
- }
-
- // main loop. handle incoming messages.
- for {
- if err := self.handle(); err != nil {
- return err
- }
- }
-}
-
-func (self *ethProtocol) handle() error {
- msg, err := self.rw.ReadMsg()
- if err != nil {
- return err
- }
- if msg.Size > ProtocolMaxMsgSize {
- return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
- }
- // make sure that the payload has been fully consumed
- defer msg.Discard()
-
- switch msg.Code {
- case GetTxMsg: // ignore
- case StatusMsg:
- return self.protoError(ErrExtraStatusMsg, "")
-
- case TxMsg:
- // TODO: rework using lazy RLP stream
- var txs []*types.Transaction
- if err := msg.Decode(&txs); err != nil {
- return self.protoError(ErrDecode, "msg %v: %v", msg, err)
- }
- for i, tx := range txs {
- if tx == nil {
- return self.protoError(ErrDecode, "transaction %d is nil", i)
- }
- jsonlogger.LogJson(&logger.EthTxReceived{
- TxHash: tx.Hash().Hex(),
- RemoteId: self.peer.ID().String(),
- })
- }
- self.txPool.AddTransactions(txs)
-
- case GetBlockHashesMsg:
- var request getBlockHashesMsgData
- if err := msg.Decode(&request); err != nil {
- return self.protoError(ErrDecode, "->msg %v: %v", msg, err)
- }
-
- if request.Amount > maxHashes {
- request.Amount = maxHashes
- }
- hashes := self.chainManager.GetBlockHashesFromHash(request.Hash, request.Amount)
- return p2p.Send(self.rw, BlockHashesMsg, hashes)
-
- case BlockHashesMsg:
- msgStream := rlp.NewStream(msg.Payload)
- if _, err := msgStream.List(); err != nil {
- return err
- }
-
- var i int
- iter := func() (hash common.Hash, ok bool) {
- err := msgStream.Decode(&hash)
- if err == rlp.EOL {
- return common.Hash{}, false
- } else if err != nil {
- self.protoError(ErrDecode, "msg %v: after %v hashes : %v", msg, i, err)
- return common.Hash{}, false
- }
-
- i++
- return hash, true
- }
- self.blockPool.AddBlockHashes(iter, self.id)
-
- case GetBlocksMsg:
- msgStream := rlp.NewStream(msg.Payload)
- if _, err := msgStream.List(); err != nil {
- return err
- }
-
- var blocks []*types.Block
- var i int
- for {
- i++
- var hash common.Hash
- err := msgStream.Decode(&hash)
- if err == rlp.EOL {
- break
- } else if err != nil {
- return self.protoError(ErrDecode, "msg %v: %v", msg, err)
- }
-
- block := self.chainManager.GetBlock(hash)
- if block != nil {
- blocks = append(blocks, block)
- }
- if i == maxBlocks {
- break
- }
- }
- return p2p.Send(self.rw, BlocksMsg, blocks)
-
- case BlocksMsg:
- msgStream := rlp.NewStream(msg.Payload)
- if _, err := msgStream.List(); err != nil {
- return err
- }
- for {
- var block types.Block
- if err := msgStream.Decode(&block); err != nil {
- if err == rlp.EOL {
- break
- } else {
- return self.protoError(ErrDecode, "msg %v: %v", msg, err)
- }
- }
- if err := block.ValidateFields(); err != nil {
- return self.protoError(ErrDecode, "block validation %v: %v", msg, err)
- }
- self.blockPool.AddBlock(&block, self.id)
- }
-
- case NewBlockMsg:
- var request newBlockMsgData
- if err := msg.Decode(&request); err != nil {
- return self.protoError(ErrDecode, "%v: %v", msg, err)
- }
- if err := request.Block.ValidateFields(); err != nil {
- return self.protoError(ErrDecode, "block validation %v: %v", msg, err)
- }
- hash := request.Block.Hash()
- _, chainHead, _ := self.chainManager.Status()
-
- jsonlogger.LogJson(&logger.EthChainReceivedNewBlock{
- BlockHash: hash.Hex(),
- BlockNumber: request.Block.Number(), // this surely must be zero
- ChainHeadHash: chainHead.Hex(),
- BlockPrevHash: request.Block.ParentHash().Hex(),
- RemoteId: self.peer.ID().String(),
- })
- // to simplify backend interface adding a new block
- // uses AddPeer followed by AddBlock only if peer is the best peer
- // (or selected as new best peer)
- if _, suspended := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); !suspended {
- self.blockPool.AddBlock(request.Block, self.id)
- }
-
- default:
- return self.protoError(ErrInvalidMsgCode, "%v", msg.Code)
- }
- return nil
-}
-
-func (self *ethProtocol) handleStatus() error {
- if err := self.sendStatus(); err != nil {
- return err
- }
-
- // read and handle remote status
- msg, err := self.rw.ReadMsg()
- if err != nil {
- return err
- }
- if msg.Code != StatusMsg {
- return self.protoError(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
- }
- if msg.Size > ProtocolMaxMsgSize {
- return self.protoError(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
- }
-
- var status statusMsgData
- if err := msg.Decode(&status); err != nil {
- return self.protoError(ErrDecode, "msg %v: %v", msg, err)
- }
-
- _, _, genesisBlock := self.chainManager.Status()
-
- if status.GenesisBlock != genesisBlock {
- return self.protoError(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock, genesisBlock)
- }
-
- if int(status.NetworkId) != self.networkId {
- return self.protoError(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, self.networkId)
- }
-
- if int(status.ProtocolVersion) != self.protocolVersion {
- return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, self.protocolVersion)
- }
-
- _, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
- if suspended {
- return self.protoError(ErrSuspendedPeer, "")
- }
-
- self.peer.Debugf("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])
-
- return nil
-}
-
-func (self *ethProtocol) requestBlockHashes(from common.Hash) error {
- self.peer.Debugf("fetching hashes (%d) %x...\n", maxHashes, from[0:4])
- return p2p.Send(self.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes})
-}
-
-func (self *ethProtocol) requestBlocks(hashes []common.Hash) error {
- self.peer.Debugf("fetching %v blocks", len(hashes))
- return p2p.Send(self.rw, GetBlocksMsg, hashes)
-}
-
-func (self *ethProtocol) protoError(code int, format string, params ...interface{}) (err *errs.Error) {
- err = self.errors.New(code, format, params...)
- //err.Log(self.peer.Logger)
- err.Log(glog.V(logger.Info))
- return
-}
-
-func (self *ethProtocol) sendStatus() error {
- td, currentBlock, genesisBlock := self.chainManager.Status()
- return p2p.Send(self.rw, StatusMsg, &statusMsgData{
- ProtocolVersion: uint32(self.protocolVersion),
- NetworkId: uint32(self.networkId),
- TD: td,
- CurrentBlock: currentBlock,
- GenesisBlock: genesisBlock,
- })
-}
-
-func (self *ethProtocol) protoErrorDisconnect(err *errs.Error) {
- err.Log(glog.V(logger.Info))
- if err.Fatal() {
- self.peer.Disconnect(p2p.DiscSubprotocolError)
- }
-
-}
diff --git a/eth/protocol_test.go b/eth/protocol_test.go
index 7c724f7a7..d44f66b89 100644
--- a/eth/protocol_test.go
+++ b/eth/protocol_test.go
@@ -1,20 +1,7 @@
package eth
-import (
- "log"
- "math/big"
- "os"
- "testing"
- "time"
-
- "github.com/ethereum/go-ethereum/common"
- "github.com/ethereum/go-ethereum/core/types"
- "github.com/ethereum/go-ethereum/crypto"
- "github.com/ethereum/go-ethereum/errs"
- ethlogger "github.com/ethereum/go-ethereum/logger"
- "github.com/ethereum/go-ethereum/p2p"
- "github.com/ethereum/go-ethereum/p2p/discover"
-)
+/*
+TODO All of these tests need to be re-written
var logsys = ethlogger.NewStdLogSystem(os.Stdout, log.LstdFlags, ethlogger.LogLevel(ethlogger.DebugDetailLevel))
@@ -398,3 +385,4 @@ func TestTransactionsMsg(t *testing.T) {
eth.checkError(ErrDecode, delay)
}
+*/
diff --git a/p2p/discover/udp.go b/p2p/discover/udp.go
index 61a0abed9..07a1a739c 100644
--- a/p2p/discover/udp.go
+++ b/p2p/discover/udp.go
@@ -413,7 +413,7 @@ func decodePacket(buf []byte) (packet, NodeID, []byte, error) {
default:
return nil, fromID, hash, fmt.Errorf("unknown type: %d", ptype)
}
- err = rlp.Decode(bytes.NewReader(sigdata[1:]), req)
+ err = rlp.DecodeBytes(sigdata[1:], req)
return req, fromID, hash, err
}
diff --git a/p2p/message.go b/p2p/message.go
index b42acbe3c..be6405d6f 100644
--- a/p2p/message.go
+++ b/p2p/message.go
@@ -32,7 +32,8 @@ type Msg struct {
//
// For the decoding rules, please see package rlp.
func (msg Msg) Decode(val interface{}) error {
- if err := rlp.Decode(msg.Payload, val); err != nil {
+ s := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if err := s.Decode(val); err != nil {
return newPeerError(errInvalidMsg, "(code %x) (size %d) %v", msg.Code, msg.Size, err)
}
return nil
diff --git a/p2p/peer_error.go b/p2p/peer_error.go
index 402131630..a912f6064 100644
--- a/p2p/peer_error.go
+++ b/p2p/peer_error.go
@@ -57,7 +57,7 @@ func (self *peerError) Error() string {
return self.message
}
-type DiscReason byte
+type DiscReason uint
const (
DiscRequested DiscReason = iota
diff --git a/rlp/decode.go b/rlp/decode.go
index 3b5617475..6952ecaea 100644
--- a/rlp/decode.go
+++ b/rlp/decode.go
@@ -9,6 +9,7 @@ import (
"io"
"math/big"
"reflect"
+ "strings"
)
var (
@@ -35,25 +36,35 @@ type Decoder interface {
// If the type implements the Decoder interface, decode calls
// DecodeRLP.
//
-// To decode into a pointer, Decode will set the pointer to nil if the
-// input has size zero or the input is a single byte with value zero.
-// If the input has nonzero size, Decode will allocate a new value of
-// the type being pointed to.
+// To decode into a pointer, Decode will decode into the value pointed
+// to. If the pointer is nil, a new value of the pointer's element
+// type is allocated. If the pointer is non-nil, the existing value
+// will reused.
//
// To decode into a struct, Decode expects the input to be an RLP
// list. The decoded elements of the list are assigned to each public
-// field in the order given by the struct's definition. If the input
-// list has too few elements, no error is returned and the remaining
-// fields will have the zero value.
-// Recursive struct types are supported.
+// field in the order given by the struct's definition. The input list
+// must contain an element for each decoded field. Decode returns an
+// error if there are too few or too many elements.
+//
+// The decoding of struct fields honours one particular struct tag,
+// "nil". This tag applies to pointer-typed fields and changes the
+// decoding rules for the field such that input values of size zero
+// decode as a nil pointer. This tag can be useful when decoding recursive
+// types.
+//
+// type StructWithEmptyOK struct {
+// Foo *[20]byte `rlp:"nil"`
+// }
//
// To decode into a slice, the input must be a list and the resulting
-// slice will contain the input elements in order.
-// As a special case, if the slice has a byte-size element type, the input
-// can also be an RLP string.
+// slice will contain the input elements in order. For byte slices,
+// the input must be an RLP string. Array types decode similarly, with
+// the additional restriction that the number of input elements (or
+// bytes) must match the array's length.
//
// To decode into a Go string, the input must be an RLP string. The
-// bytes are taken as-is and will not necessarily be valid UTF-8.
+// input bytes are taken as-is and will not necessarily be valid UTF-8.
//
// To decode into an unsigned integer type, the input must also be an RLP
// string. The bytes are interpreted as a big endian representation of
@@ -64,20 +75,28 @@ type Decoder interface {
// To decode into an interface value, Decode stores one of these
// in the value:
//
-// []interface{}, for RLP lists
-// []byte, for RLP strings
+// []interface{}, for RLP lists
+// []byte, for RLP strings
//
// Non-empty interface types are not supported, nor are booleans,
// signed integers, floating point numbers, maps, channels and
// functions.
+//
+// Note that Decode does not set an input limit for all readers
+// and may be vulnerable to panics cause by huge value sizes. If
+// you need an input limit, use
+//
+// NewStream(r, limit).Decode(val)
func Decode(r io.Reader, val interface{}) error {
- return NewStream(r).Decode(val)
+ // TODO: this could use a Stream from a pool.
+ return NewStream(r, 0).Decode(val)
}
// DecodeBytes parses RLP data from b into val.
// Please see the documentation of Decode for the decoding rules.
func DecodeBytes(b []byte, val interface{}) error {
- return NewStream(bytes.NewReader(b)).Decode(val)
+ // TODO: this could use a Stream from a pool.
+ return NewStream(bytes.NewReader(b), uint64(len(b))).Decode(val)
}
type decodeError struct {
@@ -100,7 +119,9 @@ func (err *decodeError) Error() string {
func wrapStreamError(err error, typ reflect.Type) error {
switch err {
case ErrCanonInt:
- return &decodeError{msg: "canon int error appends zero's", typ: typ}
+ return &decodeError{msg: "non-canonical integer (leading zero bytes)", typ: typ}
+ case ErrCanonSize:
+ return &decodeError{msg: "non-canonical size information", typ: typ}
case ErrExpectedList:
return &decodeError{msg: "expected input list", typ: typ}
case ErrExpectedString:
@@ -125,7 +146,7 @@ var (
bigInt = reflect.TypeOf(big.Int{})
)
-func makeDecoder(typ reflect.Type) (dec decoder, err error) {
+func makeDecoder(typ reflect.Type, tags tags) (dec decoder, err error) {
kind := typ.Kind()
switch {
case typ.Implements(decoderInterface):
@@ -145,6 +166,9 @@ func makeDecoder(typ reflect.Type) (dec decoder, err error) {
case kind == reflect.Struct:
return makeStructDecoder(typ)
case kind == reflect.Ptr:
+ if tags.nilOK {
+ return makeOptionalPtrDecoder(typ)
+ }
return makePtrDecoder(typ)
case kind == reflect.Interface:
return decodeInterface, nil
@@ -186,12 +210,10 @@ func decodeBigInt(s *Stream, val reflect.Value) error {
i = new(big.Int)
val.Set(reflect.ValueOf(i))
}
-
- // Reject big integers which are zero appended
+ // Reject leading zero bytes
if len(b) > 0 && b[0] == 0 {
return wrapStreamError(ErrCanonInt, val.Type())
}
-
i.SetBytes(b)
return nil
}
@@ -205,7 +227,7 @@ func makeListDecoder(typ reflect.Type) (decoder, error) {
return decodeByteSlice, nil
}
}
- etypeinfo, err := cachedTypeInfo1(etype)
+ etypeinfo, err := cachedTypeInfo1(etype, tags{})
if err != nil {
return nil, err
}
@@ -259,19 +281,10 @@ func decodeListSlice(s *Stream, val reflect.Value, elemdec decoder) error {
}
func decodeListArray(s *Stream, val reflect.Value, elemdec decoder) error {
- size, err := s.List()
+ _, err := s.List()
if err != nil {
- return err
- }
- if size == 0 {
- zero(val, 0)
- return s.ListEnd()
+ return wrapStreamError(err, val.Type())
}
-
- // The approach here is stolen from package json, although we differ
- // in the semantics for arrays. package json discards remaining
- // elements that would not fit into the array. We generate an error in
- // this case because we'd be losing information.
vlen := val.Len()
i := 0
for ; i < vlen; i++ {
@@ -282,24 +295,18 @@ func decodeListArray(s *Stream, val reflect.Value, elemdec decoder) error {
}
}
if i < vlen {
- zero(val, i)
+ return &decodeError{msg: "input list has too few elements", typ: val.Type()}
}
return wrapStreamError(s.ListEnd(), val.Type())
}
func decodeByteSlice(s *Stream, val reflect.Value) error {
- kind, _, err := s.Kind()
- if err != nil {
- return err
- }
- if kind == List {
- return decodeListSlice(s, val, decodeUint)
- }
b, err := s.Bytes()
- if err == nil {
- val.SetBytes(b)
+ if err != nil {
+ return wrapStreamError(err, val.Type())
}
- return err
+ val.SetBytes(b)
+ return nil
}
func decodeByteArray(s *Stream, val reflect.Value) error {
@@ -307,42 +314,38 @@ func decodeByteArray(s *Stream, val reflect.Value) error {
if err != nil {
return err
}
+ vlen := val.Len()
switch kind {
case Byte:
- if val.Len() == 0 {
+ if vlen == 0 {
return &decodeError{msg: "input string too long", typ: val.Type()}
}
+ if vlen > 1 {
+ return &decodeError{msg: "input string too short", typ: val.Type()}
+ }
bv, _ := s.Uint()
val.Index(0).SetUint(bv)
- zero(val, 1)
case String:
- if uint64(val.Len()) < size {
+ if uint64(vlen) < size {
return &decodeError{msg: "input string too long", typ: val.Type()}
}
- slice := val.Slice(0, int(size)).Interface().([]byte)
+ if uint64(vlen) > size {
+ return &decodeError{msg: "input string too short", typ: val.Type()}
+ }
+ slice := val.Slice(0, vlen).Interface().([]byte)
if err := s.readFull(slice); err != nil {
return err
}
- zero(val, int(size))
+ // Reject cases where single byte encoding should have been used.
+ if size == 1 && slice[0] < 56 {
+ return wrapStreamError(ErrCanonSize, val.Type())
+ }
case List:
- return decodeListArray(s, val, decodeUint)
+ return wrapStreamError(ErrExpectedString, val.Type())
}
return nil
}
-func zero(val reflect.Value, start int) {
- z := reflect.Zero(val.Type().Elem())
- end := val.Len()
- for i := start; i < end; i++ {
- val.Index(i).Set(z)
- }
-}
-
-type field struct {
- index int
- info *typeinfo
-}
-
func makeStructDecoder(typ reflect.Type) (decoder, error) {
fields, err := structFields(typ)
if err != nil {
@@ -355,8 +358,7 @@ func makeStructDecoder(typ reflect.Type) (decoder, error) {
for _, f := range fields {
err = f.info.decoder(s, val.Field(f.index))
if err == EOL {
- // too few elements. leave the rest at their zero value.
- break
+ return &decodeError{msg: "too few elements", typ: typ}
} else if err != nil {
return addErrorContext(err, "."+typ.Field(f.index).Name)
}
@@ -366,15 +368,41 @@ func makeStructDecoder(typ reflect.Type) (decoder, error) {
return dec, nil
}
+// makePtrDecoder creates a decoder that decodes into
+// the pointer's element type.
func makePtrDecoder(typ reflect.Type) (decoder, error) {
etype := typ.Elem()
- etypeinfo, err := cachedTypeInfo1(etype)
+ etypeinfo, err := cachedTypeInfo1(etype, tags{})
if err != nil {
return nil, err
}
dec := func(s *Stream, val reflect.Value) (err error) {
- _, size, err := s.Kind()
- if err != nil || size == 0 && s.byteval == 0 {
+ newval := val
+ if val.IsNil() {
+ newval = reflect.New(etype)
+ }
+ if err = etypeinfo.decoder(s, newval.Elem()); err == nil {
+ val.Set(newval)
+ }
+ return err
+ }
+ return dec, nil
+}
+
+// makeOptionalPtrDecoder creates a decoder that decodes empty values
+// as nil. Non-empty values are decoded into a value of the element type,
+// just like makePtrDecoder does.
+//
+// This decoder is used for pointer-typed struct fields with struct tag "nil".
+func makeOptionalPtrDecoder(typ reflect.Type) (decoder, error) {
+ etype := typ.Elem()
+ etypeinfo, err := cachedTypeInfo1(etype, tags{})
+ if err != nil {
+ return nil, err
+ }
+ dec := func(s *Stream, val reflect.Value) (err error) {
+ kind, size, err := s.Kind()
+ if err != nil || size == 0 && kind != Byte {
// rearm s.Kind. This is important because the input
// position must advance to the next value even though
// we don't read anything.
@@ -465,15 +493,18 @@ var (
// has been reached during streaming.
EOL = errors.New("rlp: end of list")
- // Other errors
+ // Actual Errors
ErrExpectedString = errors.New("rlp: expected String or Byte")
ErrExpectedList = errors.New("rlp: expected List")
- ErrCanonInt = errors.New("rlp: expected Int")
+ ErrCanonInt = errors.New("rlp: non-canonical integer format")
+ ErrCanonSize = errors.New("rlp: non-canonical size information")
ErrElemTooLarge = errors.New("rlp: element is larger than containing list")
+ ErrValueTooLarge = errors.New("rlp: value size exceeds available input length")
// internal errors
- errNotInList = errors.New("rlp: call of ListEnd outside of any list")
- errNotAtEOL = errors.New("rlp: call of ListEnd not positioned at EOL")
+ errNotInList = errors.New("rlp: call of ListEnd outside of any list")
+ errNotAtEOL = errors.New("rlp: call of ListEnd not positioned at EOL")
+ errUintOverflow = errors.New("rlp: uint overflow")
)
// ByteReader must be implemented by any input reader for a Stream. It
@@ -496,23 +527,44 @@ type ByteReader interface {
//
// Stream is not safe for concurrent use.
type Stream struct {
- r ByteReader
+ r ByteReader
+
+ // number of bytes remaining to be read from r.
+ remaining uint64
+ limited bool
+
+ // auxiliary buffer for integer decoding
uintbuf []byte
kind Kind // kind of value ahead
size uint64 // size of value ahead
byteval byte // value of single byte in type tag
+ kinderr error // error from last readKind
stack []listpos
}
type listpos struct{ pos, size uint64 }
-// NewStream creates a new stream reading from r.
-// If r does not implement ByteReader, the Stream will
-// introduce its own buffering.
-func NewStream(r io.Reader) *Stream {
+// NewStream creates a new decoding stream reading from r.
+//
+// If r implements the ByteReader interface, Stream will
+// not introduce any buffering.
+//
+// For non-toplevel values, Stream returns ErrElemTooLarge
+// for values that do not fit into the enclosing list.
+//
+// Stream supports an optional input limit. If a limit is set, the
+// size of any toplevel value will be checked against the remaining
+// input length. Stream operations that encounter a value exceeding
+// the remaining input length will return ErrValueTooLarge. The limit
+// can be set by passing a non-zero value for inputLimit.
+//
+// If r is a bytes.Reader or strings.Reader, the input limit is set to
+// the length of r's underlying data unless an explicit limit is
+// provided.
+func NewStream(r io.Reader, inputLimit uint64) *Stream {
s := new(Stream)
- s.Reset(r)
+ s.Reset(r, inputLimit)
return s
}
@@ -520,7 +572,7 @@ func NewStream(r io.Reader) *Stream {
// at an encoded list of the given length.
func NewListStream(r io.Reader, len uint64) *Stream {
s := new(Stream)
- s.Reset(r)
+ s.Reset(r, len)
s.kind = List
s.size = len
return s
@@ -543,6 +595,9 @@ func (s *Stream) Bytes() ([]byte, error) {
if err = s.readFull(b); err != nil {
return nil, err
}
+ if size == 1 && b[0] < 56 {
+ return nil, ErrCanonSize
+ }
return b, nil
default:
return nil, ErrExpectedString
@@ -574,8 +629,6 @@ func (s *Stream) Raw() ([]byte, error) {
return buf, nil
}
-var errUintOverflow = errors.New("rlp: uint overflow")
-
// Uint reads an RLP string of up to 8 bytes and returns its contents
// as an unsigned integer. If the input does not contain an RLP string, the
// returned error will be ErrExpectedString.
@@ -590,13 +643,27 @@ func (s *Stream) uint(maxbits int) (uint64, error) {
}
switch kind {
case Byte:
+ if s.byteval == 0 {
+ return 0, ErrCanonInt
+ }
s.kind = -1 // rearm Kind
return uint64(s.byteval), nil
case String:
if size > uint64(maxbits/8) {
return 0, errUintOverflow
}
- return s.readUint(byte(size))
+ v, err := s.readUint(byte(size))
+ switch {
+ case err == ErrCanonSize:
+ // Adjust error because we're not reading a size right now.
+ return 0, ErrCanonInt
+ case err != nil:
+ return 0, err
+ case size > 0 && v < 56:
+ return 0, ErrCanonSize
+ default:
+ return v, nil
+ }
default:
return 0, ErrExpectedString
}
@@ -653,7 +720,7 @@ func (s *Stream) Decode(val interface{}) error {
if rval.IsNil() {
return errDecodeIntoNil
}
- info, err := cachedTypeInfo(rtyp.Elem())
+ info, err := cachedTypeInfo(rtyp.Elem(), tags{})
if err != nil {
return err
}
@@ -667,17 +734,40 @@ func (s *Stream) Decode(val interface{}) error {
}
// Reset discards any information about the current decoding context
-// and starts reading from r. If r does not also implement ByteReader,
-// Stream will do its own buffering.
-func (s *Stream) Reset(r io.Reader) {
+// and starts reading from r. This method is meant to facilitate reuse
+// of a preallocated Stream across many decoding operations.
+//
+// If r does not also implement ByteReader, Stream will do its own
+// buffering.
+func (s *Stream) Reset(r io.Reader, inputLimit uint64) {
+ if inputLimit > 0 {
+ s.remaining = inputLimit
+ s.limited = true
+ } else {
+ // Attempt to automatically discover
+ // the limit when reading from a byte slice.
+ switch br := r.(type) {
+ case *bytes.Reader:
+ s.remaining = uint64(br.Len())
+ s.limited = true
+ case *strings.Reader:
+ s.remaining = uint64(br.Len())
+ s.limited = true
+ default:
+ s.limited = false
+ }
+ }
+ // Wrap r with a buffer if it doesn't have one.
bufr, ok := r.(ByteReader)
if !ok {
bufr = bufio.NewReader(r)
}
s.r = bufr
+ // Reset the decoding context.
s.stack = s.stack[:0]
s.size = 0
s.kind = -1
+ s.kinderr = nil
if s.uintbuf == nil {
s.uintbuf = make([]byte, 8)
}
@@ -700,19 +790,31 @@ func (s *Stream) Kind() (kind Kind, size uint64, err error) {
tos = &s.stack[len(s.stack)-1]
}
if s.kind < 0 {
+ s.kinderr = nil
+ // Don't read further if we're at the end of the
+ // innermost list.
if tos != nil && tos.pos == tos.size {
return 0, 0, EOL
}
- kind, size, err = s.readKind()
- if err != nil {
- return 0, 0, err
+ s.kind, s.size, s.kinderr = s.readKind()
+ if s.kinderr == nil {
+ if tos == nil {
+ // At toplevel, check that the value is smaller
+ // than the remaining input length.
+ if s.limited && s.size > s.remaining {
+ s.kinderr = ErrValueTooLarge
+ }
+ } else {
+ // Inside a list, check that the value doesn't overflow the list.
+ if s.size > tos.size-tos.pos {
+ s.kinderr = ErrElemTooLarge
+ }
+ }
}
- s.kind, s.size = kind, size
- }
- if tos != nil && tos.pos+s.size > tos.size {
- return 0, 0, ErrElemTooLarge
}
- return s.kind, s.size, nil
+ // Note: this might return a sticky error generated
+ // by an earlier call to readKind.
+ return s.kind, s.size, s.kinderr
}
func (s *Stream) readKind() (kind Kind, size uint64, err error) {
@@ -741,6 +843,9 @@ func (s *Stream) readKind() (kind Kind, size uint64, err error) {
// would be encoded as 0xB90400 followed by the string. The range of
// the first byte is thus [0xB8, 0xBF].
size, err = s.readUint(b - 0xB7)
+ if err == nil && size < 56 {
+ err = ErrCanonSize
+ }
return String, size, err
case b < 0xF8:
// If the total payload of a list
@@ -757,27 +862,46 @@ func (s *Stream) readKind() (kind Kind, size uint64, err error) {
// the concatenation of the RLP encodings of the items. The
// range of the first byte is thus [0xF8, 0xFF].
size, err = s.readUint(b - 0xF7)
+ if err == nil && size < 56 {
+ err = ErrCanonSize
+ }
return List, size, err
}
}
func (s *Stream) readUint(size byte) (uint64, error) {
- if size == 1 {
+ switch size {
+ case 0:
+ s.kind = -1 // rearm Kind
+ return 0, nil
+ case 1:
b, err := s.readByte()
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return uint64(b), err
+ default:
+ start := int(8 - size)
+ for i := 0; i < start; i++ {
+ s.uintbuf[i] = 0
+ }
+ if err := s.readFull(s.uintbuf[start:]); err != nil {
+ return 0, err
+ }
+ if s.uintbuf[start] == 0 {
+ // Note: readUint is also used to decode integer
+ // values. The error needs to be adjusted to become
+ // ErrCanonInt in this case.
+ return 0, ErrCanonSize
+ }
+ return binary.BigEndian.Uint64(s.uintbuf), nil
}
- start := int(8 - size)
- for i := 0; i < start; i++ {
- s.uintbuf[i] = 0
- }
- err := s.readFull(s.uintbuf[start:])
- return binary.BigEndian.Uint64(s.uintbuf), err
}
func (s *Stream) readFull(buf []byte) (err error) {
+ if s.limited && s.remaining < uint64(len(buf)) {
+ return ErrValueTooLarge
+ }
s.willRead(uint64(len(buf)))
var nn, n int
for n < len(buf) && err == nil {
@@ -791,6 +915,9 @@ func (s *Stream) readFull(buf []byte) (err error) {
}
func (s *Stream) readByte() (byte, error) {
+ if s.limited && s.remaining == 0 {
+ return 0, io.EOF
+ }
s.willRead(1)
b, err := s.r.ReadByte()
if len(s.stack) > 0 && err == io.EOF {
@@ -801,6 +928,9 @@ func (s *Stream) readByte() (byte, error) {
func (s *Stream) willRead(n uint64) {
s.kind = -1 // rearm Kind
+ if s.limited {
+ s.remaining -= n
+ }
if len(s.stack) > 0 {
s.stack[len(s.stack)-1].pos += n
}
diff --git a/rlp/decode_test.go b/rlp/decode_test.go
index 73a31c67f..d07520bd0 100644
--- a/rlp/decode_test.go
+++ b/rlp/decode_test.go
@@ -21,22 +21,18 @@ func TestStreamKind(t *testing.T) {
{"7F", Byte, 0},
{"80", String, 0},
{"B7", String, 55},
- {"B800", String, 0},
{"B90400", String, 1024},
- {"BA000400", String, 1024},
- {"BB00000400", String, 1024},
{"BFFFFFFFFFFFFFFFFF", String, ^uint64(0)},
{"C0", List, 0},
{"C8", List, 8},
{"F7", List, 55},
- {"F800", List, 0},
- {"F804", List, 4},
{"F90400", List, 1024},
{"FFFFFFFFFFFFFFFFFF", List, ^uint64(0)},
}
for i, test := range tests {
- s := NewStream(bytes.NewReader(unhex(test.input)))
+ // using plainReader to inhibit input limit errors.
+ s := NewStream(newPlainReader(unhex(test.input)), 0)
kind, len, err := s.Kind()
if err != nil {
t.Errorf("test %d: Kind returned error: %v", i, err)
@@ -70,29 +66,85 @@ func TestNewListStream(t *testing.T) {
}
func TestStreamErrors(t *testing.T) {
+ withoutInputLimit := func(b []byte) *Stream {
+ return NewStream(newPlainReader(b), 0)
+ }
+ withCustomInputLimit := func(limit uint64) func([]byte) *Stream {
+ return func(b []byte) *Stream {
+ return NewStream(bytes.NewReader(b), limit)
+ }
+ }
+
type calls []string
tests := []struct {
string
calls
- error
+ newStream func([]byte) *Stream // uses bytes.Reader if nil
+ error error
}{
- {"", calls{"Kind"}, io.EOF},
- {"", calls{"List"}, io.EOF},
- {"", calls{"Uint"}, io.EOF},
- {"C0", calls{"Bytes"}, ErrExpectedString},
- {"C0", calls{"Uint"}, ErrExpectedString},
- {"81", calls{"Bytes"}, io.ErrUnexpectedEOF},
- {"81", calls{"Uint"}, io.ErrUnexpectedEOF},
- {"BFFFFFFFFFFFFFFF", calls{"Bytes"}, io.ErrUnexpectedEOF},
- {"89000000000000000001", calls{"Uint"}, errUintOverflow},
- {"00", calls{"List"}, ErrExpectedList},
- {"80", calls{"List"}, ErrExpectedList},
- {"C0", calls{"List", "Uint"}, EOL},
- {"C801", calls{"List", "Uint", "Uint"}, io.ErrUnexpectedEOF},
- {"C8C9", calls{"List", "Kind"}, ErrElemTooLarge},
- {"C3C2010201", calls{"List", "List", "Uint", "Uint", "ListEnd", "Uint"}, EOL},
- {"00", calls{"ListEnd"}, errNotInList},
- {"C40102", calls{"List", "Uint", "ListEnd"}, errNotAtEOL},
+ {"C0", calls{"Bytes"}, nil, ErrExpectedString},
+ {"C0", calls{"Uint"}, nil, ErrExpectedString},
+ {"89000000000000000001", calls{"Uint"}, nil, errUintOverflow},
+ {"00", calls{"List"}, nil, ErrExpectedList},
+ {"80", calls{"List"}, nil, ErrExpectedList},
+ {"C0", calls{"List", "Uint"}, nil, EOL},
+ {"C8C9010101010101010101", calls{"List", "Kind"}, nil, ErrElemTooLarge},
+ {"C3C2010201", calls{"List", "List", "Uint", "Uint", "ListEnd", "Uint"}, nil, EOL},
+ {"00", calls{"ListEnd"}, nil, errNotInList},
+ {"C401020304", calls{"List", "Uint", "ListEnd"}, nil, errNotAtEOL},
+
+ // Non-canonical integers (e.g. leading zero bytes).
+ {"00", calls{"Uint"}, nil, ErrCanonInt},
+ {"820002", calls{"Uint"}, nil, ErrCanonInt},
+ {"8133", calls{"Uint"}, nil, ErrCanonSize},
+ {"8156", calls{"Uint"}, nil, nil},
+
+ // Size tags must use the smallest possible encoding.
+ // Leading zero bytes in the size tag are also rejected.
+ {"8100", calls{"Uint"}, nil, ErrCanonSize},
+ {"8100", calls{"Bytes"}, nil, ErrCanonSize},
+ {"B800", calls{"Kind"}, withoutInputLimit, ErrCanonSize},
+ {"B90000", calls{"Kind"}, withoutInputLimit, ErrCanonSize},
+ {"B90055", calls{"Kind"}, withoutInputLimit, ErrCanonSize},
+ {"BA0002FFFF", calls{"Bytes"}, withoutInputLimit, ErrCanonSize},
+ {"F800", calls{"Kind"}, withoutInputLimit, ErrCanonSize},
+ {"F90000", calls{"Kind"}, withoutInputLimit, ErrCanonSize},
+ {"F90055", calls{"Kind"}, withoutInputLimit, ErrCanonSize},
+ {"FA0002FFFF", calls{"List"}, withoutInputLimit, ErrCanonSize},
+
+ // Expected EOF
+ {"", calls{"Kind"}, nil, io.EOF},
+ {"", calls{"Uint"}, nil, io.EOF},
+ {"", calls{"List"}, nil, io.EOF},
+ {"8158", calls{"Uint", "Uint"}, nil, io.EOF},
+ {"C0", calls{"List", "ListEnd", "List"}, nil, io.EOF},
+
+ // Input limit errors.
+ {"81", calls{"Bytes"}, nil, ErrValueTooLarge},
+ {"81", calls{"Uint"}, nil, ErrValueTooLarge},
+ {"81", calls{"Raw"}, nil, ErrValueTooLarge},
+ {"BFFFFFFFFFFFFFFFFFFF", calls{"Bytes"}, nil, ErrValueTooLarge},
+ {"C801", calls{"List"}, nil, ErrValueTooLarge},
+
+ // Test for list element size check overflow.
+ {"CD04040404FFFFFFFFFFFFFFFFFF0303", calls{"List", "Uint", "Uint", "Uint", "Uint", "List"}, nil, ErrElemTooLarge},
+
+ // Test for input limit overflow. Since we are counting the limit
+ // down toward zero in Stream.remaining, reading too far can overflow
+ // remaining to a large value, effectively disabling the limit.
+ {"C40102030401", calls{"Raw", "Uint"}, withCustomInputLimit(5), io.EOF},
+ {"C4010203048158", calls{"Raw", "Uint"}, withCustomInputLimit(6), ErrValueTooLarge},
+
+ // Check that the same calls are fine without a limit.
+ {"C40102030401", calls{"Raw", "Uint"}, withoutInputLimit, nil},
+ {"C4010203048158", calls{"Raw", "Uint"}, withoutInputLimit, nil},
+
+ // Unexpected EOF. This only happens when there is
+ // no input limit, so the reader needs to be 'dumbed down'.
+ {"81", calls{"Bytes"}, withoutInputLimit, io.ErrUnexpectedEOF},
+ {"81", calls{"Uint"}, withoutInputLimit, io.ErrUnexpectedEOF},
+ {"BFFFFFFFFFFFFFFF", calls{"Bytes"}, withoutInputLimit, io.ErrUnexpectedEOF},
+ {"C801", calls{"List", "Uint", "Uint"}, withoutInputLimit, io.ErrUnexpectedEOF},
// This test verifies that the input position is advanced
// correctly when calling Bytes for empty strings. Kind can be called
@@ -109,12 +161,15 @@ func TestStreamErrors(t *testing.T) {
"Bytes", // past final element
"Bytes", // this one should fail
- }, EOL},
+ }, nil, EOL},
}
testfor:
for i, test := range tests {
- s := NewStream(bytes.NewReader(unhex(test.string)))
+ if test.newStream == nil {
+ test.newStream = func(b []byte) *Stream { return NewStream(bytes.NewReader(b), 0) }
+ }
+ s := test.newStream(unhex(test.string))
rs := reflect.ValueOf(s)
for j, call := range test.calls {
fval := rs.MethodByName(call)
@@ -124,11 +179,17 @@ testfor:
err = lastret.(error).Error()
}
if j == len(test.calls)-1 {
- if err != test.error.Error() {
- t.Errorf("test %d: last call (%s) error mismatch\ngot: %s\nwant: %v",
+ want := "<nil>"
+ if test.error != nil {
+ want = test.error.Error()
+ }
+ if err != want {
+ t.Log(test)
+ t.Errorf("test %d: last call (%s) error mismatch\ngot: %s\nwant: %s",
i, call, err, test.error)
}
} else if err != "<nil>" {
+ t.Log(test)
t.Errorf("test %d: call %d (%s) unexpected error: %q", i, j, call, err)
continue testfor
}
@@ -137,7 +198,7 @@ testfor:
}
func TestStreamList(t *testing.T) {
- s := NewStream(bytes.NewReader(unhex("C80102030405060708")))
+ s := NewStream(bytes.NewReader(unhex("C80102030405060708")), 0)
len, err := s.List()
if err != nil {
@@ -166,7 +227,7 @@ func TestStreamList(t *testing.T) {
}
func TestStreamRaw(t *testing.T) {
- s := NewStream(bytes.NewReader(unhex("C58401010101")))
+ s := NewStream(bytes.NewReader(unhex("C58401010101")), 0)
s.List()
want := unhex("8401010101")
@@ -219,7 +280,7 @@ type simplestruct struct {
type recstruct struct {
I uint
- Child *recstruct
+ Child *recstruct `rlp:"nil"`
}
var (
@@ -229,78 +290,58 @@ var (
)
)
-var (
- sharedByteArray [5]byte
- sharedPtr = new(*uint)
-)
-
var decodeTests = []decodeTest{
// integers
{input: "05", ptr: new(uint32), value: uint32(5)},
{input: "80", ptr: new(uint32), value: uint32(0)},
- {input: "8105", ptr: new(uint32), value: uint32(5)},
{input: "820505", ptr: new(uint32), value: uint32(0x0505)},
{input: "83050505", ptr: new(uint32), value: uint32(0x050505)},
{input: "8405050505", ptr: new(uint32), value: uint32(0x05050505)},
{input: "850505050505", ptr: new(uint32), error: "rlp: input string too long for uint32"},
{input: "C0", ptr: new(uint32), error: "rlp: expected input string or byte for uint32"},
+ {input: "00", ptr: new(uint32), error: "rlp: non-canonical integer (leading zero bytes) for uint32"},
+ {input: "8105", ptr: new(uint32), error: "rlp: non-canonical size information for uint32"},
+ {input: "820004", ptr: new(uint32), error: "rlp: non-canonical integer (leading zero bytes) for uint32"},
+ {input: "B8020004", ptr: new(uint32), error: "rlp: non-canonical size information for uint32"},
// slices
{input: "C0", ptr: new([]uint), value: []uint{}},
{input: "C80102030405060708", ptr: new([]uint), value: []uint{1, 2, 3, 4, 5, 6, 7, 8}},
+ {input: "F8020004", ptr: new([]uint), error: "rlp: non-canonical size information for []uint"},
// arrays
- {input: "C0", ptr: new([5]uint), value: [5]uint{}},
{input: "C50102030405", ptr: new([5]uint), value: [5]uint{1, 2, 3, 4, 5}},
+ {input: "C0", ptr: new([5]uint), error: "rlp: input list has too few elements for [5]uint"},
+ {input: "C102", ptr: new([5]uint), error: "rlp: input list has too few elements for [5]uint"},
{input: "C6010203040506", ptr: new([5]uint), error: "rlp: input list has too many elements for [5]uint"},
+ {input: "F8020004", ptr: new([5]uint), error: "rlp: non-canonical size information for [5]uint"},
+
+ // zero sized arrays
+ {input: "C0", ptr: new([0]uint), value: [0]uint{}},
+ {input: "C101", ptr: new([0]uint), error: "rlp: input list has too many elements for [0]uint"},
// byte slices
{input: "01", ptr: new([]byte), value: []byte{1}},
{input: "80", ptr: new([]byte), value: []byte{}},
{input: "8D6162636465666768696A6B6C6D", ptr: new([]byte), value: []byte("abcdefghijklm")},
- {input: "C0", ptr: new([]byte), value: []byte{}},
- {input: "C3010203", ptr: new([]byte), value: []byte{1, 2, 3}},
-
- {
- input: "C3820102",
- ptr: new([]byte),
- error: "rlp: input string too long for uint8, decoding into ([]uint8)[0]",
- },
+ {input: "C0", ptr: new([]byte), error: "rlp: expected input string or byte for []uint8"},
+ {input: "8105", ptr: new([]byte), error: "rlp: non-canonical size information for []uint8"},
// byte arrays
- {input: "01", ptr: new([5]byte), value: [5]byte{1}},
- {input: "80", ptr: new([5]byte), value: [5]byte{}},
+ {input: "02", ptr: new([1]byte), value: [1]byte{2}},
{input: "850102030405", ptr: new([5]byte), value: [5]byte{1, 2, 3, 4, 5}},
- {input: "C0", ptr: new([5]byte), value: [5]byte{}},
- {input: "C3010203", ptr: new([5]byte), value: [5]byte{1, 2, 3, 0, 0}},
- {
- input: "C3820102",
- ptr: new([5]byte),
- error: "rlp: input string too long for uint8, decoding into ([5]uint8)[0]",
- },
- {
- input: "86010203040506",
- ptr: new([5]byte),
- error: "rlp: input string too long for [5]uint8",
- },
- {
- input: "850101",
- ptr: new([5]byte),
- error: io.ErrUnexpectedEOF.Error(),
- },
-
- // byte array reuse (should be zeroed)
- {input: "850102030405", ptr: &sharedByteArray, value: [5]byte{1, 2, 3, 4, 5}},
- {input: "8101", ptr: &sharedByteArray, value: [5]byte{1}}, // kind: String
- {input: "850102030405", ptr: &sharedByteArray, value: [5]byte{1, 2, 3, 4, 5}},
- {input: "01", ptr: &sharedByteArray, value: [5]byte{1}}, // kind: Byte
- {input: "C3010203", ptr: &sharedByteArray, value: [5]byte{1, 2, 3, 0, 0}},
- {input: "C101", ptr: &sharedByteArray, value: [5]byte{1}}, // kind: List
+ // byte array errors
+ {input: "02", ptr: new([5]byte), error: "rlp: input string too short for [5]uint8"},
+ {input: "80", ptr: new([5]byte), error: "rlp: input string too short for [5]uint8"},
+ {input: "820000", ptr: new([5]byte), error: "rlp: input string too short for [5]uint8"},
+ {input: "C0", ptr: new([5]byte), error: "rlp: expected input string or byte for [5]uint8"},
+ {input: "C3010203", ptr: new([5]byte), error: "rlp: expected input string or byte for [5]uint8"},
+ {input: "86010203040506", ptr: new([5]byte), error: "rlp: input string too long for [5]uint8"},
+ {input: "8105", ptr: new([1]byte), error: "rlp: non-canonical size information for [1]uint8"},
// zero sized byte arrays
{input: "80", ptr: new([0]byte), value: [0]byte{}},
- {input: "C0", ptr: new([0]byte), value: [0]byte{}},
{input: "01", ptr: new([0]byte), error: "rlp: input string too long for [0]uint8"},
{input: "8101", ptr: new([0]byte), error: "rlp: input string too long for [0]uint8"},
@@ -312,20 +353,44 @@ var decodeTests = []decodeTest{
// big ints
{input: "01", ptr: new(*big.Int), value: big.NewInt(1)},
{input: "89FFFFFFFFFFFFFFFFFF", ptr: new(*big.Int), value: veryBigInt},
- {input: "820001", ptr: new(big.Int), error: "rlp: canon int error appends zero's for *big.Int"},
{input: "10", ptr: new(big.Int), value: *big.NewInt(16)}, // non-pointer also works
{input: "C0", ptr: new(*big.Int), error: "rlp: expected input string or byte for *big.Int"},
+ {input: "820001", ptr: new(big.Int), error: "rlp: non-canonical integer (leading zero bytes) for *big.Int"},
+ {input: "8105", ptr: new(big.Int), error: "rlp: non-canonical size information for *big.Int"},
// structs
- {input: "C0", ptr: new(simplestruct), value: simplestruct{0, ""}},
- {input: "C105", ptr: new(simplestruct), value: simplestruct{5, ""}},
- {input: "C50583343434", ptr: new(simplestruct), value: simplestruct{5, "444"}},
{
- input: "C501C302C103",
+ input: "C50583343434",
+ ptr: new(simplestruct),
+ value: simplestruct{5, "444"},
+ },
+ {
+ input: "C601C402C203C0",
ptr: new(recstruct),
value: recstruct{1, &recstruct{2, &recstruct{3, nil}}},
},
+ // struct errors
+ {
+ input: "C0",
+ ptr: new(simplestruct),
+ error: "rlp: too few elements for rlp.simplestruct",
+ },
+ {
+ input: "C105",
+ ptr: new(simplestruct),
+ error: "rlp: too few elements for rlp.simplestruct",
+ },
+ {
+ input: "C7C50583343434C0",
+ ptr: new([]*simplestruct),
+ error: "rlp: too few elements for rlp.simplestruct, decoding into ([]*rlp.simplestruct)[1]",
+ },
+ {
+ input: "83222222",
+ ptr: new(simplestruct),
+ error: "rlp: expected input list for rlp.simplestruct",
+ },
{
input: "C3010101",
ptr: new(simplestruct),
@@ -338,20 +403,16 @@ var decodeTests = []decodeTest{
},
// pointers
- {input: "00", ptr: new(*uint), value: (*uint)(nil)},
- {input: "80", ptr: new(*uint), value: (*uint)(nil)},
- {input: "C0", ptr: new(*uint), value: (*uint)(nil)},
+ {input: "00", ptr: new(*[]byte), value: &[]byte{0}},
+ {input: "80", ptr: new(*uint), value: uintp(0)},
+ {input: "C0", ptr: new(*uint), error: "rlp: expected input string or byte for uint"},
{input: "07", ptr: new(*uint), value: uintp(7)},
- {input: "8108", ptr: new(*uint), value: uintp(8)},
+ {input: "8158", ptr: new(*uint), value: uintp(0x58)},
{input: "C109", ptr: new(*[]uint), value: &[]uint{9}},
{input: "C58403030303", ptr: new(*[][]byte), value: &[][]byte{{3, 3, 3, 3}}},
// check that input position is advanced also for empty values.
- {input: "C3808005", ptr: new([]*uint), value: []*uint{nil, nil, uintp(5)}},
-
- // pointer should be reset to nil
- {input: "05", ptr: sharedPtr, value: uintp(5)},
- {input: "80", ptr: sharedPtr, value: (*uint)(nil)},
+ {input: "C3808005", ptr: new([]*uint), value: []*uint{uintp(0), uintp(0), uintp(5)}},
// interface{}
{input: "00", ptr: new(interface{}), value: []byte{0}},
@@ -401,11 +462,17 @@ func TestDecodeWithByteReader(t *testing.T) {
})
}
-// dumbReader reads from a byte slice but does not
-// implement ReadByte.
-type dumbReader []byte
+// plainReader reads from a byte slice but does not
+// implement ReadByte. It is also not recognized by the
+// size validation. This is useful to test how the decoder
+// behaves on a non-buffered input stream.
+type plainReader []byte
+
+func newPlainReader(b []byte) io.Reader {
+ return (*plainReader)(&b)
+}
-func (r *dumbReader) Read(buf []byte) (n int, err error) {
+func (r *plainReader) Read(buf []byte) (n int, err error) {
if len(*r) == 0 {
return 0, io.EOF
}
@@ -416,15 +483,14 @@ func (r *dumbReader) Read(buf []byte) (n int, err error) {
func TestDecodeWithNonByteReader(t *testing.T) {
runTests(t, func(input []byte, into interface{}) error {
- r := dumbReader(input)
- return Decode(&r, into)
+ return Decode(newPlainReader(input), into)
})
}
func TestDecodeStreamReset(t *testing.T) {
- s := NewStream(nil)
+ s := NewStream(nil, 0)
runTests(t, func(input []byte, into interface{}) error {
- s.Reset(bytes.NewReader(input))
+ s.Reset(bytes.NewReader(input), 0)
return s.Decode(into)
})
}
@@ -516,9 +582,36 @@ func ExampleDecode() {
// Decoded value: rlp.example{A:0xa, B:0x14, private:0x0, String:"foobar"}
}
+func ExampleDecode_structTagNil() {
+ // In this example, we'll use the "nil" struct tag to change
+ // how a pointer-typed field is decoded. The input contains an RLP
+ // list of one element, an empty string.
+ input := []byte{0xC1, 0x80}
+
+ // This type uses the normal rules.
+ // The empty input string is decoded as a pointer to an empty Go string.
+ var normalRules struct {
+ String *string
+ }
+ Decode(bytes.NewReader(input), &normalRules)
+ fmt.Printf("normal: String = %q\n", *normalRules.String)
+
+ // This type uses the struct tag.
+ // The empty input string is decoded as a nil pointer.
+ var withEmptyOK struct {
+ String *string `rlp:"nil"`
+ }
+ Decode(bytes.NewReader(input), &withEmptyOK)
+ fmt.Printf("with nil tag: String = %v\n", withEmptyOK.String)
+
+ // Output:
+ // normal: String = ""
+ // with nil tag: String = <nil>
+}
+
func ExampleStream() {
input, _ := hex.DecodeString("C90A1486666F6F626172")
- s := NewStream(bytes.NewReader(input))
+ s := NewStream(bytes.NewReader(input), 0)
// Check what kind of value lies ahead
kind, size, _ := s.Kind()
diff --git a/rlp/encode.go b/rlp/encode.go
index 6cf6776d6..10ff0ae79 100644
--- a/rlp/encode.go
+++ b/rlp/encode.go
@@ -194,7 +194,7 @@ func (w *encbuf) Write(b []byte) (int, error) {
func (w *encbuf) encode(val interface{}) error {
rval := reflect.ValueOf(val)
- ti, err := cachedTypeInfo(rval.Type())
+ ti, err := cachedTypeInfo(rval.Type(), tags{})
if err != nil {
return err
}
@@ -485,7 +485,7 @@ func writeInterface(val reflect.Value, w *encbuf) error {
return nil
}
eval := val.Elem()
- ti, err := cachedTypeInfo(eval.Type())
+ ti, err := cachedTypeInfo(eval.Type(), tags{})
if err != nil {
return err
}
@@ -493,7 +493,7 @@ func writeInterface(val reflect.Value, w *encbuf) error {
}
func makeSliceWriter(typ reflect.Type) (writer, error) {
- etypeinfo, err := cachedTypeInfo1(typ.Elem())
+ etypeinfo, err := cachedTypeInfo1(typ.Elem(), tags{})
if err != nil {
return nil, err
}
@@ -530,7 +530,7 @@ func makeStructWriter(typ reflect.Type) (writer, error) {
}
func makePtrWriter(typ reflect.Type) (writer, error) {
- etypeinfo, err := cachedTypeInfo1(typ.Elem())
+ etypeinfo, err := cachedTypeInfo1(typ.Elem(), tags{})
if err != nil {
return nil, err
}
diff --git a/rlp/typecache.go b/rlp/typecache.go
index 398f25d90..d512012e9 100644
--- a/rlp/typecache.go
+++ b/rlp/typecache.go
@@ -7,7 +7,7 @@ import (
var (
typeCacheMutex sync.RWMutex
- typeCache = make(map[reflect.Type]*typeinfo)
+ typeCache = make(map[typekey]*typeinfo)
)
type typeinfo struct {
@@ -15,13 +15,25 @@ type typeinfo struct {
writer
}
+// represents struct tags
+type tags struct {
+ nilOK bool
+}
+
+type typekey struct {
+ reflect.Type
+ // the key must include the struct tags because they
+ // might generate a different decoder.
+ tags
+}
+
type decoder func(*Stream, reflect.Value) error
type writer func(reflect.Value, *encbuf) error
-func cachedTypeInfo(typ reflect.Type) (*typeinfo, error) {
+func cachedTypeInfo(typ reflect.Type, tags tags) (*typeinfo, error) {
typeCacheMutex.RLock()
- info := typeCache[typ]
+ info := typeCache[typekey{typ, tags}]
typeCacheMutex.RUnlock()
if info != nil {
return info, nil
@@ -29,11 +41,12 @@ func cachedTypeInfo(typ reflect.Type) (*typeinfo, error) {
// not in the cache, need to generate info for this type.
typeCacheMutex.Lock()
defer typeCacheMutex.Unlock()
- return cachedTypeInfo1(typ)
+ return cachedTypeInfo1(typ, tags)
}
-func cachedTypeInfo1(typ reflect.Type) (*typeinfo, error) {
- info := typeCache[typ]
+func cachedTypeInfo1(typ reflect.Type, tags tags) (*typeinfo, error) {
+ key := typekey{typ, tags}
+ info := typeCache[key]
if info != nil {
// another goroutine got the write lock first
return info, nil
@@ -41,21 +54,27 @@ func cachedTypeInfo1(typ reflect.Type) (*typeinfo, error) {
// put a dummmy value into the cache before generating.
// if the generator tries to lookup itself, it will get
// the dummy value and won't call itself recursively.
- typeCache[typ] = new(typeinfo)
- info, err := genTypeInfo(typ)
+ typeCache[key] = new(typeinfo)
+ info, err := genTypeInfo(typ, tags)
if err != nil {
// remove the dummy value if the generator fails
- delete(typeCache, typ)
+ delete(typeCache, key)
return nil, err
}
- *typeCache[typ] = *info
- return typeCache[typ], err
+ *typeCache[key] = *info
+ return typeCache[key], err
+}
+
+type field struct {
+ index int
+ info *typeinfo
}
func structFields(typ reflect.Type) (fields []field, err error) {
for i := 0; i < typ.NumField(); i++ {
if f := typ.Field(i); f.PkgPath == "" { // exported
- info, err := cachedTypeInfo1(f.Type)
+ tags := parseStructTag(f.Tag.Get("rlp"))
+ info, err := cachedTypeInfo1(f.Type, tags)
if err != nil {
return nil, err
}
@@ -65,9 +84,13 @@ func structFields(typ reflect.Type) (fields []field, err error) {
return fields, nil
}
-func genTypeInfo(typ reflect.Type) (info *typeinfo, err error) {
+func parseStructTag(tag string) tags {
+ return tags{nilOK: tag == "nil"}
+}
+
+func genTypeInfo(typ reflect.Type, tags tags) (info *typeinfo, err error) {
info = new(typeinfo)
- if info.decoder, err = makeDecoder(typ); err != nil {
+ if info.decoder, err = makeDecoder(typ, tags); err != nil {
return nil, err
}
if info.writer, err = makeWriter(typ); err != nil {
diff --git a/whisper/envelope.go b/whisper/envelope.go
index 0a817e26e..07762c300 100644
--- a/whisper/envelope.go
+++ b/whisper/envelope.go
@@ -109,16 +109,17 @@ func (self *Envelope) Hash() common.Hash {
return self.hash
}
-// rlpenv is an Envelope but is not an rlp.Decoder.
-// It is used for decoding because we need to
-type rlpenv Envelope
-
// DecodeRLP decodes an Envelope from an RLP data stream.
func (self *Envelope) DecodeRLP(s *rlp.Stream) error {
raw, err := s.Raw()
if err != nil {
return err
}
+ // The decoding of Envelope uses the struct fields but also needs
+ // to compute the hash of the whole RLP-encoded envelope. This
+ // type has the same structure as Envelope but is not an
+ // rlp.Decoder so we can reuse the Envelope struct definition.
+ type rlpenv Envelope
if err := rlp.DecodeBytes(raw, (*rlpenv)(self)); err != nil {
return err
}
diff --git a/whisper/peer.go b/whisper/peer.go
index e4301f37c..28abf4260 100644
--- a/whisper/peer.go
+++ b/whisper/peer.go
@@ -66,7 +66,7 @@ func (self *peer) handshake() error {
if packet.Code != statusCode {
return fmt.Errorf("peer sent %x before status packet", packet.Code)
}
- s := rlp.NewStream(packet.Payload)
+ s := rlp.NewStream(packet.Payload, uint64(packet.Size))
if _, err := s.List(); err != nil {
return fmt.Errorf("bad status message: %v", err)
}