aboutsummaryrefslogtreecommitdiffstats
path: root/dex
diff options
context:
space:
mode:
authorSonic <sonic@cobinhood.com>2018-09-19 13:49:30 +0800
committerWei-Ning Huang <w@dexon.org>2019-04-09 21:32:48 +0800
commit46211993755749081d85561595baca388e663310 (patch)
tree714e482bc71106f677762134bdd850b36ad73a3c /dex
parentfaa3e4f294a69d4eddbba1e7888c932297f35225 (diff)
downloadgo-tangerine-46211993755749081d85561595baca388e663310.tar
go-tangerine-46211993755749081d85561595baca388e663310.tar.gz
go-tangerine-46211993755749081d85561595baca388e663310.tar.bz2
go-tangerine-46211993755749081d85561595baca388e663310.tar.lz
go-tangerine-46211993755749081d85561595baca388e663310.tar.xz
go-tangerine-46211993755749081d85561595baca388e663310.tar.zst
go-tangerine-46211993755749081d85561595baca388e663310.zip
Copy codebase from eth
Diffstat (limited to 'dex')
-rw-r--r--dex/config.go135
-rw-r--r--dex/handler.go787
-rw-r--r--dex/helper_test.go201
-rw-r--r--dex/metrics.go139
-rw-r--r--dex/peer.go522
-rw-r--r--dex/protocol.go183
-rw-r--r--dex/protocol_test.go223
-rw-r--r--dex/sync.go217
8 files changed, 2407 insertions, 0 deletions
diff --git a/dex/config.go b/dex/config.go
new file mode 100644
index 000000000..375fbcc3c
--- /dev/null
+++ b/dex/config.go
@@ -0,0 +1,135 @@
+// Copyright 2017 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "math/big"
+ "os"
+ "os/user"
+ "path/filepath"
+ "runtime"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/common/hexutil"
+ "github.com/ethereum/go-ethereum/consensus/ethash"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/eth/gasprice"
+ "github.com/ethereum/go-ethereum/params"
+)
+
+// DefaultConfig contains default settings for use on the Ethereum main net.
+var DefaultConfig = Config{
+ SyncMode: downloader.FastSync,
+ Ethash: ethash.Config{
+ CacheDir: "ethash",
+ CachesInMem: 2,
+ CachesOnDisk: 3,
+ DatasetsInMem: 1,
+ DatasetsOnDisk: 2,
+ },
+ NetworkId: 1,
+ LightPeers: 100,
+ DatabaseCache: 768,
+ TrieCleanCache: 256,
+ TrieDirtyCache: 256,
+ TrieTimeout: 60 * time.Minute,
+ MinerGasFloor: 8000000,
+ MinerGasCeil: 8000000,
+ MinerGasPrice: big.NewInt(params.GWei),
+ MinerRecommit: 3 * time.Second,
+
+ TxPool: core.DefaultTxPoolConfig,
+ GPO: gasprice.Config{
+ Blocks: 20,
+ Percentile: 60,
+ },
+}
+
+func init() {
+ home := os.Getenv("HOME")
+ if home == "" {
+ if user, err := user.Current(); err == nil {
+ home = user.HomeDir
+ }
+ }
+ if runtime.GOOS == "windows" {
+ DefaultConfig.Ethash.DatasetDir = filepath.Join(home, "AppData", "Ethash")
+ } else {
+ DefaultConfig.Ethash.DatasetDir = filepath.Join(home, ".ethash")
+ }
+}
+
+//go:generate gencodec -type Config -field-override configMarshaling -formats toml -out gen_config.go
+
+type Config struct {
+ // The genesis block, which is inserted if the database is empty.
+ // If nil, the Ethereum main net block is used.
+ Genesis *core.Genesis `toml:",omitempty"`
+
+ // Protocol options
+ NetworkId uint64 // Network ID to use for selecting peers to connect to
+ SyncMode downloader.SyncMode
+ NoPruning bool
+
+ // Light client options
+ LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests
+ LightPeers int `toml:",omitempty"` // Maximum number of LES client peers
+
+ // Database options
+ SkipBcVersionCheck bool `toml:"-"`
+ DatabaseHandles int `toml:"-"`
+ DatabaseCache int
+ TrieCleanCache int
+ TrieDirtyCache int
+ TrieTimeout time.Duration
+
+ // Mining-related options
+ Etherbase common.Address `toml:",omitempty"`
+ MinerNotify []string `toml:",omitempty"`
+ MinerExtraData []byte `toml:",omitempty"`
+ MinerGasFloor uint64
+ MinerGasCeil uint64
+ MinerGasPrice *big.Int
+ MinerRecommit time.Duration
+ MinerNoverify bool
+
+ // Ethash options
+ Ethash ethash.Config
+
+ // Transaction pool options
+ TxPool core.TxPoolConfig
+
+ // Gas Price Oracle options
+ GPO gasprice.Config
+
+ // Enables tracking of SHA3 preimages in the VM
+ EnablePreimageRecording bool
+
+ // Miscellaneous options
+ DocRoot string `toml:"-"`
+
+ // Type of the EWASM interpreter ("" for detault)
+ EWASMInterpreter string
+ // Type of the EVM interpreter ("" for default)
+ EVMInterpreter string
+}
+
+type configMarshaling struct {
+ MinerExtraData hexutil.Bytes
+}
diff --git a/dex/handler.go b/dex/handler.go
new file mode 100644
index 000000000..9f4522627
--- /dev/null
+++ b/dex/handler.go
@@ -0,0 +1,787 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "math"
+ "math/big"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/dexon-foundation/dexon/p2p/enode"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/consensus"
+ "github.com/ethereum/go-ethereum/consensus/misc"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/eth/fetcher"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/params"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+const (
+ softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data.
+ estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header
+
+ // txChanSize is the size of channel listening to NewTxsEvent.
+ // The number is referenced from the size of tx pool.
+ txChanSize = 4096
+)
+
+var (
+ daoChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the DAO handshake challenge
+)
+
+// errIncompatibleConfig is returned if the requested protocols and configs are
+// not compatible (low protocol version restrictions and high requirements).
+var errIncompatibleConfig = errors.New("incompatible configuration")
+
+func errResp(code errCode, format string, v ...interface{}) error {
+ return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
+}
+
+type ProtocolManager struct {
+ networkID uint64
+
+ fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks)
+ acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing)
+
+ txpool txPool
+ blockchain *core.BlockChain
+ chainconfig *params.ChainConfig
+ maxPeers int
+
+ downloader *downloader.Downloader
+ fetcher *fetcher.Fetcher
+ peers *peerSet
+
+ SubProtocols []p2p.Protocol
+
+ eventMux *event.TypeMux
+ txsCh chan core.NewTxsEvent
+ txsSub event.Subscription
+ minedBlockSub *event.TypeMuxSubscription
+
+ // channels for fetcher, syncer, txsyncLoop
+ newPeerCh chan *peer
+ txsyncCh chan *txsync
+ quitSync chan struct{}
+ noMorePeers chan struct{}
+
+ // wait group is used for graceful shutdowns during downloading
+ // and processing
+ wg sync.WaitGroup
+}
+
+// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
+// with the Ethereum network.
+func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
+ // Create the protocol manager with the base fields
+ manager := &ProtocolManager{
+ networkID: networkID,
+ eventMux: mux,
+ txpool: txpool,
+ blockchain: blockchain,
+ chainconfig: config,
+ peers: newPeerSet(),
+ newPeerCh: make(chan *peer),
+ noMorePeers: make(chan struct{}),
+ txsyncCh: make(chan *txsync),
+ quitSync: make(chan struct{}),
+ }
+ // Figure out whether to allow fast sync or not
+ if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
+ log.Warn("Blockchain not empty, fast sync disabled")
+ mode = downloader.FullSync
+ }
+ if mode == downloader.FastSync {
+ manager.fastSync = uint32(1)
+ }
+ // Initiate a sub-protocol for every implemented version we can handle
+ manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
+ for i, version := range ProtocolVersions {
+ // Skip protocol version if incompatible with the mode of operation
+ if mode == downloader.FastSync && version < eth63 {
+ continue
+ }
+ // Compatible; initialise the sub-protocol
+ version := version // Closure for the run
+ manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
+ Name: ProtocolName,
+ Version: version,
+ Length: ProtocolLengths[i],
+ Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
+ peer := manager.newPeer(int(version), p, rw)
+ select {
+ case manager.newPeerCh <- peer:
+ manager.wg.Add(1)
+ defer manager.wg.Done()
+ return manager.handle(peer)
+ case <-manager.quitSync:
+ return p2p.DiscQuitting
+ }
+ },
+ NodeInfo: func() interface{} {
+ return manager.NodeInfo()
+ },
+ PeerInfo: func(id enode.ID) interface{} {
+ if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
+ return p.Info()
+ }
+ return nil
+ },
+ })
+ }
+ if len(manager.SubProtocols) == 0 {
+ return nil, errIncompatibleConfig
+ }
+ // Construct the different synchronisation mechanisms
+ manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
+
+ validator := func(header *types.Header) error {
+ return engine.VerifyHeader(blockchain, header, true)
+ }
+ heighter := func() uint64 {
+ return blockchain.CurrentBlock().NumberU64()
+ }
+ inserter := func(blocks types.Blocks) (int, error) {
+ // If fast sync is running, deny importing weird blocks
+ if atomic.LoadUint32(&manager.fastSync) == 1 {
+ log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
+ return 0, nil
+ }
+ atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
+ return manager.blockchain.InsertChain(blocks)
+ }
+ manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
+
+ return manager, nil
+}
+
+func (pm *ProtocolManager) removePeer(id string) {
+ // Short circuit if the peer was already removed
+ peer := pm.peers.Peer(id)
+ if peer == nil {
+ return
+ }
+ log.Debug("Removing Ethereum peer", "peer", id)
+
+ // Unregister the peer from the downloader and Ethereum peer set
+ pm.downloader.UnregisterPeer(id)
+ if err := pm.peers.Unregister(id); err != nil {
+ log.Error("Peer removal failed", "peer", id, "err", err)
+ }
+ // Hard disconnect at the networking layer
+ if peer != nil {
+ peer.Peer.Disconnect(p2p.DiscUselessPeer)
+ }
+}
+
+func (pm *ProtocolManager) Start(maxPeers int) {
+ pm.maxPeers = maxPeers
+
+ // broadcast transactions
+ pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
+ pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
+ go pm.txBroadcastLoop()
+
+ // broadcast mined blocks
+ pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
+ go pm.minedBroadcastLoop()
+
+ // start sync handlers
+ go pm.syncer()
+ go pm.txsyncLoop()
+}
+
+func (pm *ProtocolManager) Stop() {
+ log.Info("Stopping Ethereum protocol")
+
+ pm.txsSub.Unsubscribe() // quits txBroadcastLoop
+ pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
+
+ // Quit the sync loop.
+ // After this send has completed, no new peers will be accepted.
+ pm.noMorePeers <- struct{}{}
+
+ // Quit fetcher, txsyncLoop.
+ close(pm.quitSync)
+
+ // Disconnect existing sessions.
+ // This also closes the gate for any new registrations on the peer set.
+ // sessions which are already established but not added to pm.peers yet
+ // will exit when they try to register.
+ pm.peers.Close()
+
+ // Wait for all peer handler goroutines and the loops to come down.
+ pm.wg.Wait()
+
+ log.Info("Ethereum protocol stopped")
+}
+
+func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
+ return newPeer(pv, p, newMeteredMsgWriter(rw))
+}
+
+// handle is the callback invoked to manage the life cycle of an eth peer. When
+// this function terminates, the peer is disconnected.
+func (pm *ProtocolManager) handle(p *peer) error {
+ // Ignore maxPeers if this is a trusted peer
+ if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted {
+ return p2p.DiscTooManyPeers
+ }
+ p.Log().Debug("Ethereum peer connected", "name", p.Name())
+
+ // Execute the Ethereum handshake
+ var (
+ genesis = pm.blockchain.Genesis()
+ head = pm.blockchain.CurrentHeader()
+ hash = head.Hash()
+ number = head.Number.Uint64()
+ td = pm.blockchain.GetTd(hash, number)
+ )
+ if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil {
+ p.Log().Debug("Ethereum handshake failed", "err", err)
+ return err
+ }
+ if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
+ rw.Init(p.version)
+ }
+ // Register the peer locally
+ if err := pm.peers.Register(p); err != nil {
+ p.Log().Error("Ethereum peer registration failed", "err", err)
+ return err
+ }
+ defer pm.removePeer(p.id)
+
+ // Register the peer in the downloader. If the downloader considers it banned, we disconnect
+ if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil {
+ return err
+ }
+ // Propagate existing transactions. new transactions appearing
+ // after this will be sent via broadcasts.
+ pm.syncTransactions(p)
+
+ // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork
+ if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil {
+ // Request the peer's DAO fork header for extra-data validation
+ if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil {
+ return err
+ }
+ // Start a timer to disconnect if the peer doesn't reply in time
+ p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() {
+ p.Log().Debug("Timed out DAO fork-check, dropping")
+ pm.removePeer(p.id)
+ })
+ // Make sure it's cleaned up if the peer dies off
+ defer func() {
+ if p.forkDrop != nil {
+ p.forkDrop.Stop()
+ p.forkDrop = nil
+ }
+ }()
+ }
+ // main loop. handle incoming messages.
+ for {
+ if err := pm.handleMsg(p); err != nil {
+ p.Log().Debug("Ethereum message handling failed", "err", err)
+ return err
+ }
+ }
+}
+
+// handleMsg is invoked whenever an inbound message is received from a remote
+// peer. The remote connection is torn down upon returning any error.
+func (pm *ProtocolManager) handleMsg(p *peer) error {
+ // Read the next message from the remote peer, and ensure it's fully consumed
+ msg, err := p.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Size > ProtocolMaxMsgSize {
+ return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+ defer msg.Discard()
+
+ // Handle the message depending on its contents
+ switch {
+ case msg.Code == StatusMsg:
+ // Status messages should never arrive after the handshake
+ return errResp(ErrExtraStatusMsg, "uncontrolled status message")
+
+ // Block header query, collect the requested headers and reply
+ case msg.Code == GetBlockHeadersMsg:
+ // Decode the complex header query
+ var query getBlockHeadersData
+ if err := msg.Decode(&query); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ hashMode := query.Origin.Hash != (common.Hash{})
+ first := true
+ maxNonCanonical := uint64(100)
+
+ // Gather headers until the fetch or network limits is reached
+ var (
+ bytes common.StorageSize
+ headers []*types.Header
+ unknown bool
+ )
+ for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch {
+ // Retrieve the next header satisfying the query
+ var origin *types.Header
+ if hashMode {
+ if first {
+ first = false
+ origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash)
+ if origin != nil {
+ query.Origin.Number = origin.Number.Uint64()
+ }
+ } else {
+ origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number)
+ }
+ } else {
+ origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number)
+ }
+ if origin == nil {
+ break
+ }
+ headers = append(headers, origin)
+ bytes += estHeaderRlpSize
+
+ // Advance to the next header of the query
+ switch {
+ case hashMode && query.Reverse:
+ // Hash based traversal towards the genesis block
+ ancestor := query.Skip + 1
+ if ancestor == 0 {
+ unknown = true
+ } else {
+ query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical)
+ unknown = (query.Origin.Hash == common.Hash{})
+ }
+ case hashMode && !query.Reverse:
+ // Hash based traversal towards the leaf block
+ var (
+ current = origin.Number.Uint64()
+ next = current + query.Skip + 1
+ )
+ if next <= current {
+ infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ")
+ p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos)
+ unknown = true
+ } else {
+ if header := pm.blockchain.GetHeaderByNumber(next); header != nil {
+ nextHash := header.Hash()
+ expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical)
+ if expOldHash == query.Origin.Hash {
+ query.Origin.Hash, query.Origin.Number = nextHash, next
+ } else {
+ unknown = true
+ }
+ } else {
+ unknown = true
+ }
+ }
+ case query.Reverse:
+ // Number based traversal towards the genesis block
+ if query.Origin.Number >= query.Skip+1 {
+ query.Origin.Number -= query.Skip + 1
+ } else {
+ unknown = true
+ }
+
+ case !query.Reverse:
+ // Number based traversal towards the leaf block
+ query.Origin.Number += query.Skip + 1
+ }
+ }
+ return p.SendBlockHeaders(headers)
+
+ case msg.Code == BlockHeadersMsg:
+ // A batch of headers arrived to one of our previous requests
+ var headers []*types.Header
+ if err := msg.Decode(&headers); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // If no headers were received, but we're expending a DAO fork check, maybe it's that
+ if len(headers) == 0 && p.forkDrop != nil {
+ // Possibly an empty reply to the fork header checks, sanity check TDs
+ verifyDAO := true
+
+ // If we already have a DAO header, we can check the peer's TD against it. If
+ // the peer's ahead of this, it too must have a reply to the DAO check
+ if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil {
+ if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 {
+ verifyDAO = false
+ }
+ }
+ // If we're seemingly on the same chain, disable the drop timer
+ if verifyDAO {
+ p.Log().Debug("Seems to be on the same side of the DAO fork")
+ p.forkDrop.Stop()
+ p.forkDrop = nil
+ return nil
+ }
+ }
+ // Filter out any explicitly requested headers, deliver the rest to the downloader
+ filter := len(headers) == 1
+ if filter {
+ // If it's a potential DAO fork check, validate against the rules
+ if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 {
+ // Disable the fork drop timer
+ p.forkDrop.Stop()
+ p.forkDrop = nil
+
+ // Validate the header and either drop the peer or continue
+ if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil {
+ p.Log().Debug("Verified to be on the other side of the DAO fork, dropping")
+ return err
+ }
+ p.Log().Debug("Verified to be on the same side of the DAO fork")
+ return nil
+ }
+ // Irrelevant of the fork checks, send the header to the fetcher just in case
+ headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
+ }
+ if len(headers) > 0 || !filter {
+ err := pm.downloader.DeliverHeaders(p.id, headers)
+ if err != nil {
+ log.Debug("Failed to deliver headers", "err", err)
+ }
+ }
+
+ case msg.Code == GetBlockBodiesMsg:
+ // Decode the retrieval message
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather blocks until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes int
+ bodies []rlp.RawValue
+ )
+ for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch {
+ // Retrieve the hash of the next block
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested block body, stopping if enough was found
+ if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 {
+ bodies = append(bodies, data)
+ bytes += len(data)
+ }
+ }
+ return p.SendBlockBodiesRLP(bodies)
+
+ case msg.Code == BlockBodiesMsg:
+ // A batch of block bodies arrived to one of our previous requests
+ var request blockBodiesData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver them all to the downloader for queuing
+ transactions := make([][]*types.Transaction, len(request))
+ uncles := make([][]*types.Header, len(request))
+
+ for i, body := range request {
+ transactions[i] = body.Transactions
+ uncles[i] = body.Uncles
+ }
+ // Filter out any explicitly requested bodies, deliver the rest to the downloader
+ filter := len(transactions) > 0 || len(uncles) > 0
+ if filter {
+ transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
+ }
+ if len(transactions) > 0 || len(uncles) > 0 || !filter {
+ err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
+ if err != nil {
+ log.Debug("Failed to deliver bodies", "err", err)
+ }
+ }
+
+ case p.version >= eth63 && msg.Code == GetNodeDataMsg:
+ // Decode the retrieval message
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather state data until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes int
+ data [][]byte
+ )
+ for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch {
+ // Retrieve the hash of the next state entry
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested state entry, stopping if enough was found
+ if entry, err := pm.blockchain.TrieNode(hash); err == nil {
+ data = append(data, entry)
+ bytes += len(entry)
+ }
+ }
+ return p.SendNodeData(data)
+
+ case p.version >= eth63 && msg.Code == NodeDataMsg:
+ // A batch of node state data arrived to one of our previous requests
+ var data [][]byte
+ if err := msg.Decode(&data); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver all to the downloader
+ if err := pm.downloader.DeliverNodeData(p.id, data); err != nil {
+ log.Debug("Failed to deliver node state data", "err", err)
+ }
+
+ case p.version >= eth63 && msg.Code == GetReceiptsMsg:
+ // Decode the retrieval message
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather state data until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes int
+ receipts []rlp.RawValue
+ )
+ for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch {
+ // Retrieve the hash of the next block
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested block's receipts, skipping if unknown to us
+ results := pm.blockchain.GetReceiptsByHash(hash)
+ if results == nil {
+ if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash {
+ continue
+ }
+ }
+ // If known, encode and queue for response packet
+ if encoded, err := rlp.EncodeToBytes(results); err != nil {
+ log.Error("Failed to encode receipt", "err", err)
+ } else {
+ receipts = append(receipts, encoded)
+ bytes += len(encoded)
+ }
+ }
+ return p.SendReceiptsRLP(receipts)
+
+ case p.version >= eth63 && msg.Code == ReceiptsMsg:
+ // A batch of receipts arrived to one of our previous requests
+ var receipts [][]*types.Receipt
+ if err := msg.Decode(&receipts); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Deliver all to the downloader
+ if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil {
+ log.Debug("Failed to deliver receipts", "err", err)
+ }
+
+ case msg.Code == NewBlockHashesMsg:
+ var announces newBlockHashesData
+ if err := msg.Decode(&announces); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ // Mark the hashes as present at the remote node
+ for _, block := range announces {
+ p.MarkBlock(block.Hash)
+ }
+ // Schedule all the unknown hashes for retrieval
+ unknown := make(newBlockHashesData, 0, len(announces))
+ for _, block := range announces {
+ if !pm.blockchain.HasBlock(block.Hash, block.Number) {
+ unknown = append(unknown, block)
+ }
+ }
+ for _, block := range unknown {
+ pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
+ }
+
+ case msg.Code == NewBlockMsg:
+ // Retrieve and decode the propagated block
+ var request newBlockData
+ if err := msg.Decode(&request); err != nil {
+ return errResp(ErrDecode, "%v: %v", msg, err)
+ }
+ request.Block.ReceivedAt = msg.ReceivedAt
+ request.Block.ReceivedFrom = p
+
+ // Mark the peer as owning the block and schedule it for import
+ p.MarkBlock(request.Block.Hash())
+ pm.fetcher.Enqueue(p.id, request.Block)
+
+ // Assuming the block is importable by the peer, but possibly not yet done so,
+ // calculate the head hash and TD that the peer truly must have.
+ var (
+ trueHead = request.Block.ParentHash()
+ trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty())
+ )
+ // Update the peers total difficulty if better than the previous
+ if _, td := p.Head(); trueTD.Cmp(td) > 0 {
+ p.SetHead(trueHead, trueTD)
+
+ // Schedule a sync if above ours. Note, this will not fire a sync for a gap of
+ // a singe block (as the true TD is below the propagated block), however this
+ // scenario should easily be covered by the fetcher.
+ currentBlock := pm.blockchain.CurrentBlock()
+ if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 {
+ go pm.synchronise(p)
+ }
+ }
+
+ case msg.Code == TxMsg:
+ // Transactions arrived, make sure we have a valid and fresh chain to handle them
+ if atomic.LoadUint32(&pm.acceptTxs) == 0 {
+ break
+ }
+ // Transactions can be processed, parse all of them and deliver to the pool
+ var txs []*types.Transaction
+ if err := msg.Decode(&txs); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ for i, tx := range txs {
+ // Validate and mark the remote transaction
+ if tx == nil {
+ return errResp(ErrDecode, "transaction %d is nil", i)
+ }
+ p.MarkTransaction(tx.Hash())
+ }
+ pm.txpool.AddRemotes(txs)
+
+ default:
+ return errResp(ErrInvalidMsgCode, "%v", msg.Code)
+ }
+ return nil
+}
+
+// BroadcastBlock will either propagate a block to a subset of it's peers, or
+// will only announce it's availability (depending what's requested).
+func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
+ hash := block.Hash()
+ peers := pm.peers.PeersWithoutBlock(hash)
+
+ // If propagation is requested, send to a subset of the peer
+ if propagate {
+ // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
+ var td *big.Int
+ if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
+ td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
+ } else {
+ log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
+ return
+ }
+ // Send the block to a subset of our peers
+ transfer := peers[:int(math.Sqrt(float64(len(peers))))]
+ for _, peer := range transfer {
+ peer.AsyncSendNewBlock(block, td)
+ }
+ log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
+ return
+ }
+ // Otherwise if the block is indeed in out own chain, announce it
+ if pm.blockchain.HasBlock(hash, block.NumberU64()) {
+ for _, peer := range peers {
+ peer.AsyncSendNewBlockHash(block)
+ }
+ log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
+ }
+}
+
+// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
+// already have the given transaction.
+func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
+ var txset = make(map[*peer]types.Transactions)
+
+ // Broadcast transactions to a batch of peers not knowing about it
+ for _, tx := range txs {
+ peers := pm.peers.PeersWithoutTx(tx.Hash())
+ for _, peer := range peers {
+ txset[peer] = append(txset[peer], tx)
+ }
+ log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
+ }
+ // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
+ for peer, txs := range txset {
+ peer.AsyncSendTransactions(txs)
+ }
+}
+
+// Mined broadcast loop
+func (pm *ProtocolManager) minedBroadcastLoop() {
+ // automatically stops if unsubscribe
+ for obj := range pm.minedBlockSub.Chan() {
+ if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
+ pm.BroadcastBlock(ev.Block, true) // First propagate block to peers
+ pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest
+ }
+ }
+}
+
+func (pm *ProtocolManager) txBroadcastLoop() {
+ for {
+ select {
+ case event := <-pm.txsCh:
+ pm.BroadcastTxs(event.Txs)
+
+ // Err() channel will be closed when unsubscribing.
+ case <-pm.txsSub.Err():
+ return
+ }
+ }
+}
+
+// NodeInfo represents a short summary of the Ethereum sub-protocol metadata
+// known about the host peer.
+type NodeInfo struct {
+ Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4)
+ Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain
+ Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block
+ Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules
+ Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block
+}
+
+// NodeInfo retrieves some protocol metadata about the running host node.
+func (pm *ProtocolManager) NodeInfo() *NodeInfo {
+ currentBlock := pm.blockchain.CurrentBlock()
+ return &NodeInfo{
+ Network: pm.networkID,
+ Difficulty: pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()),
+ Genesis: pm.blockchain.Genesis().Hash(),
+ Config: pm.blockchain.Config(),
+ Head: currentBlock.Hash(),
+ }
+}
diff --git a/dex/helper_test.go b/dex/helper_test.go
new file mode 100644
index 000000000..3d2ab0aba
--- /dev/null
+++ b/dex/helper_test.go
@@ -0,0 +1,201 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+// This file contains some shares testing functionality, common to multiple
+// different files and modules being tested.
+
+package eth
+
+import (
+ "crypto/ecdsa"
+ "crypto/rand"
+ "math/big"
+ "sort"
+ "sync"
+ "testing"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/consensus/ethash"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/core/vm"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/ethdb"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+ "github.com/ethereum/go-ethereum/params"
+)
+
+var (
+ testBankKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+ testBank = crypto.PubkeyToAddress(testBankKey.PublicKey)
+)
+
+// newTestProtocolManager creates a new protocol manager for testing purposes,
+// with the given number of blocks already known, and potential notification
+// channels for different events.
+func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func(int, *core.BlockGen), newtx chan<- []*types.Transaction) (*ProtocolManager, *ethdb.MemDatabase, error) {
+ var (
+ evmux = new(event.TypeMux)
+ engine = ethash.NewFaker()
+ db = ethdb.NewMemDatabase()
+ gspec = &core.Genesis{
+ Config: params.TestChainConfig,
+ Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}},
+ }
+ genesis = gspec.MustCommit(db)
+ blockchain, _ = core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{})
+ )
+ chain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, blocks, generator)
+ if _, err := blockchain.InsertChain(chain); err != nil {
+ panic(err)
+ }
+
+ pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db)
+ if err != nil {
+ return nil, nil, err
+ }
+ pm.Start(1000)
+ return pm, db, nil
+}
+
+// newTestProtocolManagerMust creates a new protocol manager for testing purposes,
+// with the given number of blocks already known, and potential notification
+// channels for different events. In case of an error, the constructor force-
+// fails the test.
+func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks int, generator func(int, *core.BlockGen), newtx chan<- []*types.Transaction) (*ProtocolManager, *ethdb.MemDatabase) {
+ pm, db, err := newTestProtocolManager(mode, blocks, generator, newtx)
+ if err != nil {
+ t.Fatalf("Failed to create protocol manager: %v", err)
+ }
+ return pm, db
+}
+
+// testTxPool is a fake, helper transaction pool for testing purposes
+type testTxPool struct {
+ txFeed event.Feed
+ pool []*types.Transaction // Collection of all transactions
+ added chan<- []*types.Transaction // Notification channel for new transactions
+
+ lock sync.RWMutex // Protects the transaction pool
+}
+
+// AddRemotes appends a batch of transactions to the pool, and notifies any
+// listeners if the addition channel is non nil
+func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ p.pool = append(p.pool, txs...)
+ if p.added != nil {
+ p.added <- txs
+ }
+ return make([]error, len(txs))
+}
+
+// Pending returns all the transactions known to the pool
+func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ batches := make(map[common.Address]types.Transactions)
+ for _, tx := range p.pool {
+ from, _ := types.Sender(types.HomesteadSigner{}, tx)
+ batches[from] = append(batches[from], tx)
+ }
+ for _, batch := range batches {
+ sort.Sort(types.TxByNonce(batch))
+ }
+ return batches, nil
+}
+
+func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
+ return p.txFeed.Subscribe(ch)
+}
+
+// newTestTransaction create a new dummy transaction.
+func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction {
+ tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, datasize))
+ tx, _ = types.SignTx(tx, types.HomesteadSigner{}, from)
+ return tx
+}
+
+// testPeer is a simulated peer to allow testing direct network calls.
+type testPeer struct {
+ net p2p.MsgReadWriter // Network layer reader/writer to simulate remote messaging
+ app *p2p.MsgPipeRW // Application layer reader/writer to simulate the local side
+ *peer
+}
+
+// newTestPeer creates a new peer registered at the given protocol manager.
+func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*testPeer, <-chan error) {
+ // Create a message pipe to communicate through
+ app, net := p2p.MsgPipe()
+
+ // Generate a random id and create the peer
+ var id discover.NodeID
+ rand.Read(id[:])
+
+ peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net)
+
+ // Start the peer on a new thread
+ errc := make(chan error, 1)
+ go func() {
+ select {
+ case pm.newPeerCh <- peer:
+ errc <- pm.handle(peer)
+ case <-pm.quitSync:
+ errc <- p2p.DiscQuitting
+ }
+ }()
+ tp := &testPeer{app: app, net: net, peer: peer}
+ // Execute any implicitly requested handshakes and return
+ if shake {
+ var (
+ genesis = pm.blockchain.Genesis()
+ head = pm.blockchain.CurrentHeader()
+ td = pm.blockchain.GetTd(head.Hash(), head.Number.Uint64())
+ )
+ tp.handshake(nil, td, head.Hash(), genesis.Hash())
+ }
+ return tp, errc
+}
+
+// handshake simulates a trivial handshake that expects the same state from the
+// remote side as we are simulating locally.
+func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, genesis common.Hash) {
+ msg := &statusData{
+ ProtocolVersion: uint32(p.version),
+ NetworkId: DefaultConfig.NetworkId,
+ TD: td,
+ CurrentBlock: head,
+ GenesisBlock: genesis,
+ }
+ if err := p2p.ExpectMsg(p.app, StatusMsg, msg); err != nil {
+ t.Fatalf("status recv: %v", err)
+ }
+ if err := p2p.Send(p.app, StatusMsg, msg); err != nil {
+ t.Fatalf("status send: %v", err)
+ }
+}
+
+// close terminates the local side of the peer, notifying the remote protocol
+// manager of termination.
+func (p *testPeer) close() {
+ p.app.Close()
+}
diff --git a/dex/metrics.go b/dex/metrics.go
new file mode 100644
index 000000000..0533a2a87
--- /dev/null
+++ b/dex/metrics.go
@@ -0,0 +1,139 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "github.com/ethereum/go-ethereum/metrics"
+ "github.com/ethereum/go-ethereum/p2p"
+)
+
+var (
+ propTxnInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/packets", nil)
+ propTxnInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/traffic", nil)
+ propTxnOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/packets", nil)
+ propTxnOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/traffic", nil)
+ propHashInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/packets", nil)
+ propHashInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/traffic", nil)
+ propHashOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/packets", nil)
+ propHashOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/traffic", nil)
+ propBlockInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/packets", nil)
+ propBlockInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/traffic", nil)
+ propBlockOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/packets", nil)
+ propBlockOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/traffic", nil)
+ reqHeaderInPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/in/packets", nil)
+ reqHeaderInTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/in/traffic", nil)
+ reqHeaderOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/out/packets", nil)
+ reqHeaderOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/out/traffic", nil)
+ reqBodyInPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/packets", nil)
+ reqBodyInTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/traffic", nil)
+ reqBodyOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/packets", nil)
+ reqBodyOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/traffic", nil)
+ reqStateInPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/in/packets", nil)
+ reqStateInTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/in/traffic", nil)
+ reqStateOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/out/packets", nil)
+ reqStateOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/out/traffic", nil)
+ reqReceiptInPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/packets", nil)
+ reqReceiptInTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/traffic", nil)
+ reqReceiptOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/packets", nil)
+ reqReceiptOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/traffic", nil)
+ miscInPacketsMeter = metrics.NewRegisteredMeter("eth/misc/in/packets", nil)
+ miscInTrafficMeter = metrics.NewRegisteredMeter("eth/misc/in/traffic", nil)
+ miscOutPacketsMeter = metrics.NewRegisteredMeter("eth/misc/out/packets", nil)
+ miscOutTrafficMeter = metrics.NewRegisteredMeter("eth/misc/out/traffic", nil)
+)
+
+// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of
+// accumulating the above defined metrics based on the data stream contents.
+type meteredMsgReadWriter struct {
+ p2p.MsgReadWriter // Wrapped message stream to meter
+ version int // Protocol version to select correct meters
+}
+
+// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the
+// metrics system is disabled, this function returns the original object.
+func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter {
+ if !metrics.Enabled {
+ return rw
+ }
+ return &meteredMsgReadWriter{MsgReadWriter: rw}
+}
+
+// Init sets the protocol version used by the stream to know which meters to
+// increment in case of overlapping message ids between protocol versions.
+func (rw *meteredMsgReadWriter) Init(version int) {
+ rw.version = version
+}
+
+func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) {
+ // Read the message and short circuit in case of an error
+ msg, err := rw.MsgReadWriter.ReadMsg()
+ if err != nil {
+ return msg, err
+ }
+ // Account for the data traffic
+ packets, traffic := miscInPacketsMeter, miscInTrafficMeter
+ switch {
+ case msg.Code == BlockHeadersMsg:
+ packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter
+ case msg.Code == BlockBodiesMsg:
+ packets, traffic = reqBodyInPacketsMeter, reqBodyInTrafficMeter
+
+ case rw.version >= eth63 && msg.Code == NodeDataMsg:
+ packets, traffic = reqStateInPacketsMeter, reqStateInTrafficMeter
+ case rw.version >= eth63 && msg.Code == ReceiptsMsg:
+ packets, traffic = reqReceiptInPacketsMeter, reqReceiptInTrafficMeter
+
+ case msg.Code == NewBlockHashesMsg:
+ packets, traffic = propHashInPacketsMeter, propHashInTrafficMeter
+ case msg.Code == NewBlockMsg:
+ packets, traffic = propBlockInPacketsMeter, propBlockInTrafficMeter
+ case msg.Code == TxMsg:
+ packets, traffic = propTxnInPacketsMeter, propTxnInTrafficMeter
+ }
+ packets.Mark(1)
+ traffic.Mark(int64(msg.Size))
+
+ return msg, err
+}
+
+func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error {
+ // Account for the data traffic
+ packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter
+ switch {
+ case msg.Code == BlockHeadersMsg:
+ packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter
+ case msg.Code == BlockBodiesMsg:
+ packets, traffic = reqBodyOutPacketsMeter, reqBodyOutTrafficMeter
+
+ case rw.version >= eth63 && msg.Code == NodeDataMsg:
+ packets, traffic = reqStateOutPacketsMeter, reqStateOutTrafficMeter
+ case rw.version >= eth63 && msg.Code == ReceiptsMsg:
+ packets, traffic = reqReceiptOutPacketsMeter, reqReceiptOutTrafficMeter
+
+ case msg.Code == NewBlockHashesMsg:
+ packets, traffic = propHashOutPacketsMeter, propHashOutTrafficMeter
+ case msg.Code == NewBlockMsg:
+ packets, traffic = propBlockOutPacketsMeter, propBlockOutTrafficMeter
+ case msg.Code == TxMsg:
+ packets, traffic = propTxnOutPacketsMeter, propTxnOutTrafficMeter
+ }
+ packets.Mark(1)
+ traffic.Mark(int64(msg.Size))
+
+ // Send the packet to the p2p layer
+ return rw.MsgReadWriter.WriteMsg(msg)
+}
diff --git a/dex/peer.go b/dex/peer.go
new file mode 100644
index 000000000..b5f450855
--- /dev/null
+++ b/dex/peer.go
@@ -0,0 +1,522 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "errors"
+ "fmt"
+ "math/big"
+ "sync"
+ "time"
+
+ mapset "github.com/deckarep/golang-set"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+var (
+ errClosed = errors.New("peer set is closed")
+ errAlreadyRegistered = errors.New("peer is already registered")
+ errNotRegistered = errors.New("peer is not registered")
+)
+
+const (
+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
+
+ // maxQueuedTxs is the maximum number of transaction lists to queue up before
+ // dropping broadcasts. This is a sensitive number as a transaction list might
+ // contain a single transaction, or thousands.
+ maxQueuedTxs = 128
+
+ // maxQueuedProps is the maximum number of block propagations to queue up before
+ // dropping broadcasts. There's not much point in queueing stale blocks, so a few
+ // that might cover uncles should be enough.
+ maxQueuedProps = 4
+
+ // maxQueuedAnns is the maximum number of block announcements to queue up before
+ // dropping broadcasts. Similarly to block propagations, there's no point to queue
+ // above some healthy uncle limit, so use that.
+ maxQueuedAnns = 4
+
+ handshakeTimeout = 5 * time.Second
+)
+
+// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
+// about a connected peer.
+type PeerInfo struct {
+ Version int `json:"version"` // Ethereum protocol version negotiated
+ Difficulty *big.Int `json:"difficulty"` // Total difficulty of the peer's blockchain
+ Head string `json:"head"` // SHA3 hash of the peer's best owned block
+}
+
+// propEvent is a block propagation, waiting for its turn in the broadcast queue.
+type propEvent struct {
+ block *types.Block
+ td *big.Int
+}
+
+type peer struct {
+ id string
+
+ *p2p.Peer
+ rw p2p.MsgReadWriter
+
+ version int // Protocol version negotiated
+ forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time
+
+ head common.Hash
+ td *big.Int
+ lock sync.RWMutex
+
+ knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownBlocks mapset.Set // Set of block hashes known to be known by this peer
+ queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
+ queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
+ queuedAnns chan *types.Block // Queue of blocks to announce to the peer
+ term chan struct{} // Termination channel to stop the broadcaster
+}
+
+func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
+ return &peer{
+ Peer: p,
+ rw: rw,
+ version: version,
+ id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
+ knownTxs: mapset.NewSet(),
+ knownBlocks: mapset.NewSet(),
+ queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
+ queuedProps: make(chan *propEvent, maxQueuedProps),
+ queuedAnns: make(chan *types.Block, maxQueuedAnns),
+ term: make(chan struct{}),
+ }
+}
+
+// broadcast is a write loop that multiplexes block propagations, announcements
+// and transaction broadcasts into the remote peer. The goal is to have an async
+// writer that does not lock up node internals.
+func (p *peer) broadcast() {
+ for {
+ select {
+ case txs := <-p.queuedTxs:
+ if err := p.SendTransactions(txs); err != nil {
+ return
+ }
+ p.Log().Trace("Broadcast transactions", "count", len(txs))
+
+ case prop := <-p.queuedProps:
+ if err := p.SendNewBlock(prop.block, prop.td); err != nil {
+ return
+ }
+ p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
+
+ case block := <-p.queuedAnns:
+ if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
+ return
+ }
+ p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash())
+
+ case <-p.term:
+ return
+ }
+ }
+}
+
+// close signals the broadcast goroutine to terminate.
+func (p *peer) close() {
+ close(p.term)
+}
+
+// Info gathers and returns a collection of metadata known about a peer.
+func (p *peer) Info() *PeerInfo {
+ hash, td := p.Head()
+
+ return &PeerInfo{
+ Version: p.version,
+ Difficulty: td,
+ Head: hash.Hex(),
+ }
+}
+
+// Head retrieves a copy of the current head hash and total difficulty of the
+// peer.
+func (p *peer) Head() (hash common.Hash, td *big.Int) {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
+
+ copy(hash[:], p.head[:])
+ return hash, new(big.Int).Set(p.td)
+}
+
+// SetHead updates the head hash and total difficulty of the peer.
+func (p *peer) SetHead(hash common.Hash, td *big.Int) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ copy(p.head[:], hash[:])
+ p.td.Set(td)
+}
+
+// MarkBlock marks a block as known for the peer, ensuring that the block will
+// never be propagated to this particular peer.
+func (p *peer) MarkBlock(hash common.Hash) {
+ // If we reached the memory allowance, drop a previously known block hash
+ for p.knownBlocks.Cardinality() >= maxKnownBlocks {
+ p.knownBlocks.Pop()
+ }
+ p.knownBlocks.Add(hash)
+}
+
+// MarkTransaction marks a transaction as known for the peer, ensuring that it
+// will never be propagated to this particular peer.
+func (p *peer) MarkTransaction(hash common.Hash) {
+ // If we reached the memory allowance, drop a previously known transaction hash
+ for p.knownTxs.Cardinality() >= maxKnownTxs {
+ p.knownTxs.Pop()
+ }
+ p.knownTxs.Add(hash)
+}
+
+// SendTransactions sends transactions to the peer and includes the hashes
+// in its transaction hash set for future reference.
+func (p *peer) SendTransactions(txs types.Transactions) error {
+ for _, tx := range txs {
+ p.knownTxs.Add(tx.Hash())
+ }
+ return p2p.Send(p.rw, TxMsg, txs)
+}
+
+// AsyncSendTransactions queues list of transactions propagation to a remote
+// peer. If the peer's broadcast queue is full, the event is silently dropped.
+func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
+ select {
+ case p.queuedTxs <- txs:
+ for _, tx := range txs {
+ p.knownTxs.Add(tx.Hash())
+ }
+ default:
+ p.Log().Debug("Dropping transaction propagation", "count", len(txs))
+ }
+}
+
+// SendNewBlockHashes announces the availability of a number of blocks through
+// a hash notification.
+func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
+ for _, hash := range hashes {
+ p.knownBlocks.Add(hash)
+ }
+ request := make(newBlockHashesData, len(hashes))
+ for i := 0; i < len(hashes); i++ {
+ request[i].Hash = hashes[i]
+ request[i].Number = numbers[i]
+ }
+ return p2p.Send(p.rw, NewBlockHashesMsg, request)
+}
+
+// AsyncSendNewBlockHash queues the availability of a block for propagation to a
+// remote peer. If the peer's broadcast queue is full, the event is silently
+// dropped.
+func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
+ select {
+ case p.queuedAnns <- block:
+ p.knownBlocks.Add(block.Hash())
+ default:
+ p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
+ }
+}
+
+// SendNewBlock propagates an entire block to a remote peer.
+func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
+ p.knownBlocks.Add(block.Hash())
+ return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
+}
+
+// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If
+// the peer's broadcast queue is full, the event is silently dropped.
+func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
+ select {
+ case p.queuedProps <- &propEvent{block: block, td: td}:
+ p.knownBlocks.Add(block.Hash())
+ default:
+ p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
+ }
+}
+
+// SendBlockHeaders sends a batch of block headers to the remote peer.
+func (p *peer) SendBlockHeaders(headers []*types.Header) error {
+ return p2p.Send(p.rw, BlockHeadersMsg, headers)
+}
+
+// SendBlockBodies sends a batch of block contents to the remote peer.
+func (p *peer) SendBlockBodies(bodies []*blockBody) error {
+ return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies))
+}
+
+// SendBlockBodiesRLP sends a batch of block contents to the remote peer from
+// an already RLP encoded format.
+func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error {
+ return p2p.Send(p.rw, BlockBodiesMsg, bodies)
+}
+
+// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the
+// hashes requested.
+func (p *peer) SendNodeData(data [][]byte) error {
+ return p2p.Send(p.rw, NodeDataMsg, data)
+}
+
+// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the
+// ones requested from an already RLP encoded format.
+func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
+ return p2p.Send(p.rw, ReceiptsMsg, receipts)
+}
+
+// RequestOneHeader is a wrapper around the header query functions to fetch a
+// single header. It is used solely by the fetcher.
+func (p *peer) RequestOneHeader(hash common.Hash) error {
+ p.Log().Debug("Fetching single header", "hash", hash)
+ return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false})
+}
+
+// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the
+// specified header query, based on the hash of an origin block.
+func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error {
+ p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse)
+ return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
+}
+
+// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the
+// specified header query, based on the number of an origin block.
+func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error {
+ p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse)
+ return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse})
+}
+
+// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes
+// specified.
+func (p *peer) RequestBodies(hashes []common.Hash) error {
+ p.Log().Debug("Fetching batch of block bodies", "count", len(hashes))
+ return p2p.Send(p.rw, GetBlockBodiesMsg, hashes)
+}
+
+// RequestNodeData fetches a batch of arbitrary data from a node's known state
+// data, corresponding to the specified hashes.
+func (p *peer) RequestNodeData(hashes []common.Hash) error {
+ p.Log().Debug("Fetching batch of state data", "count", len(hashes))
+ return p2p.Send(p.rw, GetNodeDataMsg, hashes)
+}
+
+// RequestReceipts fetches a batch of transaction receipts from a remote node.
+func (p *peer) RequestReceipts(hashes []common.Hash) error {
+ p.Log().Debug("Fetching batch of receipts", "count", len(hashes))
+ return p2p.Send(p.rw, GetReceiptsMsg, hashes)
+}
+
+// Handshake executes the eth protocol handshake, negotiating version number,
+// network IDs, difficulties, head and genesis blocks.
+func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash) error {
+ // Send out own handshake in a new thread
+ errc := make(chan error, 2)
+ var status statusData // safe to read after two values have been received from errc
+
+ go func() {
+ errc <- p2p.Send(p.rw, StatusMsg, &statusData{
+ ProtocolVersion: uint32(p.version),
+ NetworkId: network,
+ TD: td,
+ CurrentBlock: head,
+ GenesisBlock: genesis,
+ })
+ }()
+ go func() {
+ errc <- p.readStatus(network, &status, genesis)
+ }()
+ timeout := time.NewTimer(handshakeTimeout)
+ defer timeout.Stop()
+ for i := 0; i < 2; i++ {
+ select {
+ case err := <-errc:
+ if err != nil {
+ return err
+ }
+ case <-timeout.C:
+ return p2p.DiscReadTimeout
+ }
+ }
+ p.td, p.head = status.TD, status.CurrentBlock
+ return nil
+}
+
+func (p *peer) readStatus(network uint64, status *statusData, genesis common.Hash) (err error) {
+ msg, err := p.rw.ReadMsg()
+ if err != nil {
+ return err
+ }
+ if msg.Code != StatusMsg {
+ return errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg)
+ }
+ if msg.Size > ProtocolMaxMsgSize {
+ return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize)
+ }
+ // Decode the handshake and make sure everything matches
+ if err := msg.Decode(&status); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ if status.GenesisBlock != genesis {
+ return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock[:8], genesis[:8])
+ }
+ if status.NetworkId != network {
+ return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, network)
+ }
+ if int(status.ProtocolVersion) != p.version {
+ return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.version)
+ }
+ return nil
+}
+
+// String implements fmt.Stringer.
+func (p *peer) String() string {
+ return fmt.Sprintf("Peer %s [%s]", p.id,
+ fmt.Sprintf("eth/%2d", p.version),
+ )
+}
+
+// peerSet represents the collection of active peers currently participating in
+// the Ethereum sub-protocol.
+type peerSet struct {
+ peers map[string]*peer
+ lock sync.RWMutex
+ closed bool
+}
+
+// newPeerSet creates a new peer set to track the active participants.
+func newPeerSet() *peerSet {
+ return &peerSet{
+ peers: make(map[string]*peer),
+ }
+}
+
+// Register injects a new peer into the working set, or returns an error if the
+// peer is already known. If a new peer it registered, its broadcast loop is also
+// started.
+func (ps *peerSet) Register(p *peer) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ if ps.closed {
+ return errClosed
+ }
+ if _, ok := ps.peers[p.id]; ok {
+ return errAlreadyRegistered
+ }
+ ps.peers[p.id] = p
+ go p.broadcast()
+
+ return nil
+}
+
+// Unregister removes a remote peer from the active set, disabling any further
+// actions to/from that particular entity.
+func (ps *peerSet) Unregister(id string) error {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ p, ok := ps.peers[id]
+ if !ok {
+ return errNotRegistered
+ }
+ delete(ps.peers, id)
+ p.close()
+
+ return nil
+}
+
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) Peer(id string) *peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ return ps.peers[id]
+}
+
+// Len returns if the current number of peers in the set.
+func (ps *peerSet) Len() int {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ return len(ps.peers)
+}
+
+// PeersWithoutBlock retrieves a list of peers that do not have a given block in
+// their set of known hashes.
+func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownBlocks.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
+}
+
+// PeersWithoutTx retrieves a list of peers that do not have a given transaction
+// in their set of known hashes.
+func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ list := make([]*peer, 0, len(ps.peers))
+ for _, p := range ps.peers {
+ if !p.knownTxs.Contains(hash) {
+ list = append(list, p)
+ }
+ }
+ return list
+}
+
+// BestPeer retrieves the known peer with the currently highest total difficulty.
+func (ps *peerSet) BestPeer() *peer {
+ ps.lock.RLock()
+ defer ps.lock.RUnlock()
+
+ var (
+ bestPeer *peer
+ bestTd *big.Int
+ )
+ for _, p := range ps.peers {
+ if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 {
+ bestPeer, bestTd = p, td
+ }
+ }
+ return bestPeer
+}
+
+// Close disconnects all peers.
+// No new peers can be registered after Close has returned.
+func (ps *peerSet) Close() {
+ ps.lock.Lock()
+ defer ps.lock.Unlock()
+
+ for _, p := range ps.peers {
+ p.Disconnect(p2p.DiscQuitting)
+ }
+ ps.closed = true
+}
diff --git a/dex/protocol.go b/dex/protocol.go
new file mode 100644
index 000000000..0e90e6a2e
--- /dev/null
+++ b/dex/protocol.go
@@ -0,0 +1,183 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "fmt"
+ "io"
+ "math/big"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/event"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+// Constants to match up protocol versions and messages
+const (
+ eth62 = 62
+ eth63 = 63
+)
+
+// ProtocolName is the official short name of the protocol used during capability negotiation.
+var ProtocolName = "eth"
+
+// ProtocolVersions are the upported versions of the eth protocol (first is primary).
+var ProtocolVersions = []uint{eth63, eth62}
+
+// ProtocolLengths are the number of implemented message corresponding to different protocol versions.
+var ProtocolLengths = []uint64{17, 8}
+
+const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
+
+// eth protocol message codes
+const (
+ // Protocol messages belonging to eth/62
+ StatusMsg = 0x00
+ NewBlockHashesMsg = 0x01
+ TxMsg = 0x02
+ GetBlockHeadersMsg = 0x03
+ BlockHeadersMsg = 0x04
+ GetBlockBodiesMsg = 0x05
+ BlockBodiesMsg = 0x06
+ NewBlockMsg = 0x07
+
+ // Protocol messages belonging to eth/63
+ GetNodeDataMsg = 0x0d
+ NodeDataMsg = 0x0e
+ GetReceiptsMsg = 0x0f
+ ReceiptsMsg = 0x10
+)
+
+type errCode int
+
+const (
+ ErrMsgTooLarge = iota
+ ErrDecode
+ ErrInvalidMsgCode
+ ErrProtocolVersionMismatch
+ ErrNetworkIdMismatch
+ ErrGenesisBlockMismatch
+ ErrNoStatusMsg
+ ErrExtraStatusMsg
+ ErrSuspendedPeer
+)
+
+func (e errCode) String() string {
+ return errorToString[int(e)]
+}
+
+// XXX change once legacy code is out
+var errorToString = map[int]string{
+ ErrMsgTooLarge: "Message too long",
+ ErrDecode: "Invalid message",
+ ErrInvalidMsgCode: "Invalid message code",
+ ErrProtocolVersionMismatch: "Protocol version mismatch",
+ ErrNetworkIdMismatch: "NetworkId mismatch",
+ ErrGenesisBlockMismatch: "Genesis block mismatch",
+ ErrNoStatusMsg: "No status message",
+ ErrExtraStatusMsg: "Extra status message",
+ ErrSuspendedPeer: "Suspended peer",
+}
+
+type txPool interface {
+ // AddRemotes should add the given transactions to the pool.
+ AddRemotes([]*types.Transaction) []error
+
+ // Pending should return pending transactions.
+ // The slice should be modifiable by the caller.
+ Pending() (map[common.Address]types.Transactions, error)
+
+ // SubscribeNewTxsEvent should return an event subscription of
+ // NewTxsEvent and send events to the given channel.
+ SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription
+}
+
+// statusData is the network packet for the status message.
+type statusData struct {
+ ProtocolVersion uint32
+ NetworkId uint64
+ TD *big.Int
+ CurrentBlock common.Hash
+ GenesisBlock common.Hash
+}
+
+// newBlockHashesData is the network packet for the block announcements.
+type newBlockHashesData []struct {
+ Hash common.Hash // Hash of one particular block being announced
+ Number uint64 // Number of one particular block being announced
+}
+
+// getBlockHeadersData represents a block header query.
+type getBlockHeadersData struct {
+ Origin hashOrNumber // Block from which to retrieve headers
+ Amount uint64 // Maximum number of headers to retrieve
+ Skip uint64 // Blocks to skip between consecutive headers
+ Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis)
+}
+
+// hashOrNumber is a combined field for specifying an origin block.
+type hashOrNumber struct {
+ Hash common.Hash // Block hash from which to retrieve headers (excludes Number)
+ Number uint64 // Block hash from which to retrieve headers (excludes Hash)
+}
+
+// EncodeRLP is a specialized encoder for hashOrNumber to encode only one of the
+// two contained union fields.
+func (hn *hashOrNumber) EncodeRLP(w io.Writer) error {
+ if hn.Hash == (common.Hash{}) {
+ return rlp.Encode(w, hn.Number)
+ }
+ if hn.Number != 0 {
+ return fmt.Errorf("both origin hash (%x) and number (%d) provided", hn.Hash, hn.Number)
+ }
+ return rlp.Encode(w, hn.Hash)
+}
+
+// DecodeRLP is a specialized decoder for hashOrNumber to decode the contents
+// into either a block hash or a block number.
+func (hn *hashOrNumber) DecodeRLP(s *rlp.Stream) error {
+ _, size, _ := s.Kind()
+ origin, err := s.Raw()
+ if err == nil {
+ switch {
+ case size == 32:
+ err = rlp.DecodeBytes(origin, &hn.Hash)
+ case size <= 8:
+ err = rlp.DecodeBytes(origin, &hn.Number)
+ default:
+ err = fmt.Errorf("invalid input size %d for origin", size)
+ }
+ }
+ return err
+}
+
+// newBlockData is the network packet for the block propagation message.
+type newBlockData struct {
+ Block *types.Block
+ TD *big.Int
+}
+
+// blockBody represents the data content of a single block.
+type blockBody struct {
+ Transactions []*types.Transaction // Transactions contained within a block
+ Uncles []*types.Header // Uncles contained within a block
+}
+
+// blockBodiesData is the network packet for block content distribution.
+type blockBodiesData []*blockBody
diff --git a/dex/protocol_test.go b/dex/protocol_test.go
new file mode 100644
index 000000000..aa43dfa92
--- /dev/null
+++ b/dex/protocol_test.go
@@ -0,0 +1,223 @@
+// Copyright 2014 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/p2p"
+ "github.com/ethereum/go-ethereum/rlp"
+)
+
+func init() {
+ // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(false))))
+}
+
+var testAccount, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+
+// Tests that handshake failures are detected and reported correctly.
+func TestStatusMsgErrors62(t *testing.T) { testStatusMsgErrors(t, 62) }
+func TestStatusMsgErrors63(t *testing.T) { testStatusMsgErrors(t, 63) }
+
+func testStatusMsgErrors(t *testing.T, protocol int) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ var (
+ genesis = pm.blockchain.Genesis()
+ head = pm.blockchain.CurrentHeader()
+ td = pm.blockchain.GetTd(head.Hash(), head.Number.Uint64())
+ )
+ defer pm.Stop()
+
+ tests := []struct {
+ code uint64
+ data interface{}
+ wantError error
+ }{
+ {
+ code: TxMsg, data: []interface{}{},
+ wantError: errResp(ErrNoStatusMsg, "first msg has code 2 (!= 0)"),
+ },
+ {
+ code: StatusMsg, data: statusData{10, DefaultConfig.NetworkId, td, head.Hash(), genesis.Hash()},
+ wantError: errResp(ErrProtocolVersionMismatch, "10 (!= %d)", protocol),
+ },
+ {
+ code: StatusMsg, data: statusData{uint32(protocol), 999, td, head.Hash(), genesis.Hash()},
+ wantError: errResp(ErrNetworkIdMismatch, "999 (!= 1)"),
+ },
+ {
+ code: StatusMsg, data: statusData{uint32(protocol), DefaultConfig.NetworkId, td, head.Hash(), common.Hash{3}},
+ wantError: errResp(ErrGenesisBlockMismatch, "0300000000000000 (!= %x)", genesis.Hash().Bytes()[:8]),
+ },
+ }
+
+ for i, test := range tests {
+ p, errc := newTestPeer("peer", protocol, pm, false)
+ // The send call might hang until reset because
+ // the protocol might not read the payload.
+ go p2p.Send(p.app, test.code, test.data)
+
+ select {
+ case err := <-errc:
+ if err == nil {
+ t.Errorf("test %d: protocol returned nil error, want %q", i, test.wantError)
+ } else if err.Error() != test.wantError.Error() {
+ t.Errorf("test %d: wrong error: got %q, want %q", i, err, test.wantError)
+ }
+ case <-time.After(2 * time.Second):
+ t.Errorf("protocol did not shut down within 2 seconds")
+ }
+ p.close()
+ }
+}
+
+// This test checks that received transactions are added to the local pool.
+func TestRecvTransactions62(t *testing.T) { testRecvTransactions(t, 62) }
+func TestRecvTransactions63(t *testing.T) { testRecvTransactions(t, 63) }
+
+func testRecvTransactions(t *testing.T, protocol int) {
+ txAdded := make(chan []*types.Transaction)
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, txAdded)
+ pm.acceptTxs = 1 // mark synced to accept transactions
+ p, _ := newTestPeer("peer", protocol, pm, true)
+ defer pm.Stop()
+ defer p.close()
+
+ tx := newTestTransaction(testAccount, 0, 0)
+ if err := p2p.Send(p.app, TxMsg, []interface{}{tx}); err != nil {
+ t.Fatalf("send error: %v", err)
+ }
+ select {
+ case added := <-txAdded:
+ if len(added) != 1 {
+ t.Errorf("wrong number of added transactions: got %d, want 1", len(added))
+ } else if added[0].Hash() != tx.Hash() {
+ t.Errorf("added wrong tx hash: got %v, want %v", added[0].Hash(), tx.Hash())
+ }
+ case <-time.After(2 * time.Second):
+ t.Errorf("no NewTxsEvent received within 2 seconds")
+ }
+}
+
+// This test checks that pending transactions are sent.
+func TestSendTransactions62(t *testing.T) { testSendTransactions(t, 62) }
+func TestSendTransactions63(t *testing.T) { testSendTransactions(t, 63) }
+
+func testSendTransactions(t *testing.T, protocol int) {
+ pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
+ defer pm.Stop()
+
+ // Fill the pool with big transactions.
+ const txsize = txsyncPackSize / 10
+ alltxs := make([]*types.Transaction, 100)
+ for nonce := range alltxs {
+ alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize)
+ }
+ pm.txpool.AddRemotes(alltxs)
+
+ // Connect several peers. They should all receive the pending transactions.
+ var wg sync.WaitGroup
+ checktxs := func(p *testPeer) {
+ defer wg.Done()
+ defer p.close()
+ seen := make(map[common.Hash]bool)
+ for _, tx := range alltxs {
+ seen[tx.Hash()] = false
+ }
+ for n := 0; n < len(alltxs) && !t.Failed(); {
+ var txs []*types.Transaction
+ msg, err := p.app.ReadMsg()
+ if err != nil {
+ t.Errorf("%v: read error: %v", p.Peer, err)
+ } else if msg.Code != TxMsg {
+ t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code)
+ }
+ if err := msg.Decode(&txs); err != nil {
+ t.Errorf("%v: %v", p.Peer, err)
+ }
+ for _, tx := range txs {
+ hash := tx.Hash()
+ seentx, want := seen[hash]
+ if seentx {
+ t.Errorf("%v: got tx more than once: %x", p.Peer, hash)
+ }
+ if !want {
+ t.Errorf("%v: got unexpected tx: %x", p.Peer, hash)
+ }
+ seen[hash] = true
+ n++
+ }
+ }
+ }
+ for i := 0; i < 3; i++ {
+ p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), protocol, pm, true)
+ wg.Add(1)
+ go checktxs(p)
+ }
+ wg.Wait()
+}
+
+// Tests that the custom union field encoder and decoder works correctly.
+func TestGetBlockHeadersDataEncodeDecode(t *testing.T) {
+ // Create a "random" hash for testing
+ var hash common.Hash
+ for i := range hash {
+ hash[i] = byte(i)
+ }
+ // Assemble some table driven tests
+ tests := []struct {
+ packet *getBlockHeadersData
+ fail bool
+ }{
+ // Providing the origin as either a hash or a number should both work
+ {fail: false, packet: &getBlockHeadersData{Origin: hashOrNumber{Number: 314}}},
+ {fail: false, packet: &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}}},
+
+ // Providing arbitrary query field should also work
+ {fail: false, packet: &getBlockHeadersData{Origin: hashOrNumber{Number: 314}, Amount: 314, Skip: 1, Reverse: true}},
+ {fail: false, packet: &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: 314, Skip: 1, Reverse: true}},
+
+ // Providing both the origin hash and origin number must fail
+ {fail: true, packet: &getBlockHeadersData{Origin: hashOrNumber{Hash: hash, Number: 314}}},
+ }
+ // Iterate over each of the tests and try to encode and then decode
+ for i, tt := range tests {
+ bytes, err := rlp.EncodeToBytes(tt.packet)
+ if err != nil && !tt.fail {
+ t.Fatalf("test %d: failed to encode packet: %v", i, err)
+ } else if err == nil && tt.fail {
+ t.Fatalf("test %d: encode should have failed", i)
+ }
+ if !tt.fail {
+ packet := new(getBlockHeadersData)
+ if err := rlp.DecodeBytes(bytes, packet); err != nil {
+ t.Fatalf("test %d: failed to decode packet: %v", i, err)
+ }
+ if packet.Origin.Hash != tt.packet.Origin.Hash || packet.Origin.Number != tt.packet.Origin.Number || packet.Amount != tt.packet.Amount ||
+ packet.Skip != tt.packet.Skip || packet.Reverse != tt.packet.Reverse {
+ t.Fatalf("test %d: encode decode mismatch: have %+v, want %+v", i, packet, tt.packet)
+ }
+ }
+ }
+}
diff --git a/dex/sync.go b/dex/sync.go
new file mode 100644
index 000000000..e49e40087
--- /dev/null
+++ b/dex/sync.go
@@ -0,0 +1,217 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package eth
+
+import (
+ "math/rand"
+ "sync/atomic"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/downloader"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/p2p/discover"
+)
+
+const (
+ forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available
+ minDesiredPeerCount = 5 // Amount of peers desired to start syncing
+
+ // This is the target size for the packs of transactions sent by txsyncLoop.
+ // A pack can get larger than this if a single transactions exceeds this size.
+ txsyncPackSize = 100 * 1024
+)
+
+type txsync struct {
+ p *peer
+ txs []*types.Transaction
+}
+
+// syncTransactions starts sending all currently pending transactions to the given peer.
+func (pm *ProtocolManager) syncTransactions(p *peer) {
+ var txs types.Transactions
+ pending, _ := pm.txpool.Pending()
+ for _, batch := range pending {
+ txs = append(txs, batch...)
+ }
+ if len(txs) == 0 {
+ return
+ }
+ select {
+ case pm.txsyncCh <- &txsync{p, txs}:
+ case <-pm.quitSync:
+ }
+}
+
+// txsyncLoop takes care of the initial transaction sync for each new
+// connection. When a new peer appears, we relay all currently pending
+// transactions. In order to minimise egress bandwidth usage, we send
+// the transactions in small packs to one peer at a time.
+func (pm *ProtocolManager) txsyncLoop() {
+ var (
+ pending = make(map[discover.NodeID]*txsync)
+ sending = false // whether a send is active
+ pack = new(txsync) // the pack that is being sent
+ done = make(chan error, 1) // result of the send
+ )
+
+ // send starts a sending a pack of transactions from the sync.
+ send := func(s *txsync) {
+ // Fill pack with transactions up to the target size.
+ size := common.StorageSize(0)
+ pack.p = s.p
+ pack.txs = pack.txs[:0]
+ for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
+ pack.txs = append(pack.txs, s.txs[i])
+ size += s.txs[i].Size()
+ }
+ // Remove the transactions that will be sent.
+ s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
+ if len(s.txs) == 0 {
+ delete(pending, s.p.ID())
+ }
+ // Send the pack in the background.
+ s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
+ sending = true
+ go func() { done <- pack.p.SendTransactions(pack.txs) }()
+ }
+
+ // pick chooses the next pending sync.
+ pick := func() *txsync {
+ if len(pending) == 0 {
+ return nil
+ }
+ n := rand.Intn(len(pending)) + 1
+ for _, s := range pending {
+ if n--; n == 0 {
+ return s
+ }
+ }
+ return nil
+ }
+
+ for {
+ select {
+ case s := <-pm.txsyncCh:
+ pending[s.p.ID()] = s
+ if !sending {
+ send(s)
+ }
+ case err := <-done:
+ sending = false
+ // Stop tracking peers that cause send failures.
+ if err != nil {
+ pack.p.Log().Debug("Transaction send failed", "err", err)
+ delete(pending, pack.p.ID())
+ }
+ // Schedule the next send.
+ if s := pick(); s != nil {
+ send(s)
+ }
+ case <-pm.quitSync:
+ return
+ }
+ }
+}
+
+// syncer is responsible for periodically synchronising with the network, both
+// downloading hashes and blocks as well as handling the announcement handler.
+func (pm *ProtocolManager) syncer() {
+ // Start and ensure cleanup of sync mechanisms
+ pm.fetcher.Start()
+ defer pm.fetcher.Stop()
+ defer pm.downloader.Terminate()
+
+ // Wait for different events to fire synchronisation operations
+ forceSync := time.NewTicker(forceSyncCycle)
+ defer forceSync.Stop()
+
+ for {
+ select {
+ case <-pm.newPeerCh:
+ // Make sure we have peers to select from, then sync
+ if pm.peers.Len() < minDesiredPeerCount {
+ break
+ }
+ go pm.synchronise(pm.peers.BestPeer())
+
+ case <-forceSync.C:
+ // Force a sync even if not enough peers are present
+ go pm.synchronise(pm.peers.BestPeer())
+
+ case <-pm.noMorePeers:
+ return
+ }
+ }
+}
+
+// synchronise tries to sync up our local block chain with a remote peer.
+func (pm *ProtocolManager) synchronise(peer *peer) {
+ // Short circuit if no peers are available
+ if peer == nil {
+ return
+ }
+ // Make sure the peer's TD is higher than our own
+ currentBlock := pm.blockchain.CurrentBlock()
+ td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())
+
+ pHead, pTd := peer.Head()
+ if pTd.Cmp(td) <= 0 {
+ return
+ }
+ // Otherwise try to sync with the downloader
+ mode := downloader.FullSync
+ if atomic.LoadUint32(&pm.fastSync) == 1 {
+ // Fast sync was explicitly requested, and explicitly granted
+ mode = downloader.FastSync
+ } else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 {
+ // The database seems empty as the current block is the genesis. Yet the fast
+ // block is ahead, so fast sync was enabled for this node at a certain point.
+ // The only scenario where this can happen is if the user manually (or via a
+ // bad block) rolled back a fast sync node below the sync point. In this case
+ // however it's safe to reenable fast sync.
+ atomic.StoreUint32(&pm.fastSync, 1)
+ mode = downloader.FastSync
+ }
+
+ if mode == downloader.FastSync {
+ // Make sure the peer's total difficulty we are synchronizing is higher.
+ if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 {
+ return
+ }
+ }
+
+ // Run the sync cycle, and disable fast sync if we've went past the pivot block
+ if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil {
+ return
+ }
+ if atomic.LoadUint32(&pm.fastSync) == 1 {
+ log.Info("Fast sync complete, auto disabling")
+ atomic.StoreUint32(&pm.fastSync, 0)
+ }
+ atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done
+ if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 {
+ // We've completed a sync cycle, notify all peers of new state. This path is
+ // essential in star-topology networks where a gateway node needs to notify
+ // all its out-of-date peers of the availability of a new block. This failure
+ // scenario will most often crop up in private and hackathon networks with
+ // degenerate connectivity, but it should be healthy for the mainnet too to
+ // more reliably update peers or the local TD state.
+ go pm.BroadcastBlock(head, false)
+ }
+}