diff options
author | Sonic <sonic@cobinhood.com> | 2018-09-19 13:49:30 +0800 |
---|---|---|
committer | Wei-Ning Huang <w@dexon.org> | 2018-12-19 18:45:23 +0800 |
commit | 92fdb2d4f9a137971af1a94225e106db42d839cb (patch) | |
tree | 275a64a886e11422c018eda3ab0660b1650e2e2b | |
parent | 54436e915f7b59be46b78f2bccb2d217c07b88bd (diff) | |
download | dexon-92fdb2d4f9a137971af1a94225e106db42d839cb.tar dexon-92fdb2d4f9a137971af1a94225e106db42d839cb.tar.gz dexon-92fdb2d4f9a137971af1a94225e106db42d839cb.tar.bz2 dexon-92fdb2d4f9a137971af1a94225e106db42d839cb.tar.lz dexon-92fdb2d4f9a137971af1a94225e106db42d839cb.tar.xz dexon-92fdb2d4f9a137971af1a94225e106db42d839cb.tar.zst dexon-92fdb2d4f9a137971af1a94225e106db42d839cb.zip |
Copy codebase from eth
-rw-r--r-- | dex/config.go | 135 | ||||
-rw-r--r-- | dex/handler.go | 787 | ||||
-rw-r--r-- | dex/helper_test.go | 201 | ||||
-rw-r--r-- | dex/metrics.go | 139 | ||||
-rw-r--r-- | dex/peer.go | 522 | ||||
-rw-r--r-- | dex/protocol.go | 183 | ||||
-rw-r--r-- | dex/protocol_test.go | 223 | ||||
-rw-r--r-- | dex/sync.go | 217 |
8 files changed, 2407 insertions, 0 deletions
diff --git a/dex/config.go b/dex/config.go new file mode 100644 index 000000000..375fbcc3c --- /dev/null +++ b/dex/config.go @@ -0,0 +1,135 @@ +// Copyright 2017 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "math/big" + "os" + "os/user" + "path/filepath" + "runtime" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/gasprice" + "github.com/ethereum/go-ethereum/params" +) + +// DefaultConfig contains default settings for use on the Ethereum main net. +var DefaultConfig = Config{ + SyncMode: downloader.FastSync, + Ethash: ethash.Config{ + CacheDir: "ethash", + CachesInMem: 2, + CachesOnDisk: 3, + DatasetsInMem: 1, + DatasetsOnDisk: 2, + }, + NetworkId: 1, + LightPeers: 100, + DatabaseCache: 768, + TrieCleanCache: 256, + TrieDirtyCache: 256, + TrieTimeout: 60 * time.Minute, + MinerGasFloor: 8000000, + MinerGasCeil: 8000000, + MinerGasPrice: big.NewInt(params.GWei), + MinerRecommit: 3 * time.Second, + + TxPool: core.DefaultTxPoolConfig, + GPO: gasprice.Config{ + Blocks: 20, + Percentile: 60, + }, +} + +func init() { + home := os.Getenv("HOME") + if home == "" { + if user, err := user.Current(); err == nil { + home = user.HomeDir + } + } + if runtime.GOOS == "windows" { + DefaultConfig.Ethash.DatasetDir = filepath.Join(home, "AppData", "Ethash") + } else { + DefaultConfig.Ethash.DatasetDir = filepath.Join(home, ".ethash") + } +} + +//go:generate gencodec -type Config -field-override configMarshaling -formats toml -out gen_config.go + +type Config struct { + // The genesis block, which is inserted if the database is empty. + // If nil, the Ethereum main net block is used. + Genesis *core.Genesis `toml:",omitempty"` + + // Protocol options + NetworkId uint64 // Network ID to use for selecting peers to connect to + SyncMode downloader.SyncMode + NoPruning bool + + // Light client options + LightServ int `toml:",omitempty"` // Maximum percentage of time allowed for serving LES requests + LightPeers int `toml:",omitempty"` // Maximum number of LES client peers + + // Database options + SkipBcVersionCheck bool `toml:"-"` + DatabaseHandles int `toml:"-"` + DatabaseCache int + TrieCleanCache int + TrieDirtyCache int + TrieTimeout time.Duration + + // Mining-related options + Etherbase common.Address `toml:",omitempty"` + MinerNotify []string `toml:",omitempty"` + MinerExtraData []byte `toml:",omitempty"` + MinerGasFloor uint64 + MinerGasCeil uint64 + MinerGasPrice *big.Int + MinerRecommit time.Duration + MinerNoverify bool + + // Ethash options + Ethash ethash.Config + + // Transaction pool options + TxPool core.TxPoolConfig + + // Gas Price Oracle options + GPO gasprice.Config + + // Enables tracking of SHA3 preimages in the VM + EnablePreimageRecording bool + + // Miscellaneous options + DocRoot string `toml:"-"` + + // Type of the EWASM interpreter ("" for detault) + EWASMInterpreter string + // Type of the EVM interpreter ("" for default) + EVMInterpreter string +} + +type configMarshaling struct { + MinerExtraData hexutil.Bytes +} diff --git a/dex/handler.go b/dex/handler.go new file mode 100644 index 000000000..9f4522627 --- /dev/null +++ b/dex/handler.go @@ -0,0 +1,787 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "encoding/json" + "errors" + "fmt" + "math" + "math/big" + "sync" + "sync/atomic" + "time" + + "github.com/dexon-foundation/dexon/p2p/enode" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus" + "github.com/ethereum/go-ethereum/consensus/misc" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/eth/fetcher" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" +) + +const ( + softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. + estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header + + // txChanSize is the size of channel listening to NewTxsEvent. + // The number is referenced from the size of tx pool. + txChanSize = 4096 +) + +var ( + daoChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the DAO handshake challenge +) + +// errIncompatibleConfig is returned if the requested protocols and configs are +// not compatible (low protocol version restrictions and high requirements). +var errIncompatibleConfig = errors.New("incompatible configuration") + +func errResp(code errCode, format string, v ...interface{}) error { + return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) +} + +type ProtocolManager struct { + networkID uint64 + + fastSync uint32 // Flag whether fast sync is enabled (gets disabled if we already have blocks) + acceptTxs uint32 // Flag whether we're considered synchronised (enables transaction processing) + + txpool txPool + blockchain *core.BlockChain + chainconfig *params.ChainConfig + maxPeers int + + downloader *downloader.Downloader + fetcher *fetcher.Fetcher + peers *peerSet + + SubProtocols []p2p.Protocol + + eventMux *event.TypeMux + txsCh chan core.NewTxsEvent + txsSub event.Subscription + minedBlockSub *event.TypeMuxSubscription + + // channels for fetcher, syncer, txsyncLoop + newPeerCh chan *peer + txsyncCh chan *txsync + quitSync chan struct{} + noMorePeers chan struct{} + + // wait group is used for graceful shutdowns during downloading + // and processing + wg sync.WaitGroup +} + +// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable +// with the Ethereum network. +func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { + // Create the protocol manager with the base fields + manager := &ProtocolManager{ + networkID: networkID, + eventMux: mux, + txpool: txpool, + blockchain: blockchain, + chainconfig: config, + peers: newPeerSet(), + newPeerCh: make(chan *peer), + noMorePeers: make(chan struct{}), + txsyncCh: make(chan *txsync), + quitSync: make(chan struct{}), + } + // Figure out whether to allow fast sync or not + if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 { + log.Warn("Blockchain not empty, fast sync disabled") + mode = downloader.FullSync + } + if mode == downloader.FastSync { + manager.fastSync = uint32(1) + } + // Initiate a sub-protocol for every implemented version we can handle + manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions)) + for i, version := range ProtocolVersions { + // Skip protocol version if incompatible with the mode of operation + if mode == downloader.FastSync && version < eth63 { + continue + } + // Compatible; initialise the sub-protocol + version := version // Closure for the run + manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{ + Name: ProtocolName, + Version: version, + Length: ProtocolLengths[i], + Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { + peer := manager.newPeer(int(version), p, rw) + select { + case manager.newPeerCh <- peer: + manager.wg.Add(1) + defer manager.wg.Done() + return manager.handle(peer) + case <-manager.quitSync: + return p2p.DiscQuitting + } + }, + NodeInfo: func() interface{} { + return manager.NodeInfo() + }, + PeerInfo: func(id enode.ID) interface{} { + if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil { + return p.Info() + } + return nil + }, + }) + } + if len(manager.SubProtocols) == 0 { + return nil, errIncompatibleConfig + } + // Construct the different synchronisation mechanisms + manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer) + + validator := func(header *types.Header) error { + return engine.VerifyHeader(blockchain, header, true) + } + heighter := func() uint64 { + return blockchain.CurrentBlock().NumberU64() + } + inserter := func(blocks types.Blocks) (int, error) { + // If fast sync is running, deny importing weird blocks + if atomic.LoadUint32(&manager.fastSync) == 1 { + log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash()) + return 0, nil + } + atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import + return manager.blockchain.InsertChain(blocks) + } + manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) + + return manager, nil +} + +func (pm *ProtocolManager) removePeer(id string) { + // Short circuit if the peer was already removed + peer := pm.peers.Peer(id) + if peer == nil { + return + } + log.Debug("Removing Ethereum peer", "peer", id) + + // Unregister the peer from the downloader and Ethereum peer set + pm.downloader.UnregisterPeer(id) + if err := pm.peers.Unregister(id); err != nil { + log.Error("Peer removal failed", "peer", id, "err", err) + } + // Hard disconnect at the networking layer + if peer != nil { + peer.Peer.Disconnect(p2p.DiscUselessPeer) + } +} + +func (pm *ProtocolManager) Start(maxPeers int) { + pm.maxPeers = maxPeers + + // broadcast transactions + pm.txsCh = make(chan core.NewTxsEvent, txChanSize) + pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh) + go pm.txBroadcastLoop() + + // broadcast mined blocks + pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{}) + go pm.minedBroadcastLoop() + + // start sync handlers + go pm.syncer() + go pm.txsyncLoop() +} + +func (pm *ProtocolManager) Stop() { + log.Info("Stopping Ethereum protocol") + + pm.txsSub.Unsubscribe() // quits txBroadcastLoop + pm.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop + + // Quit the sync loop. + // After this send has completed, no new peers will be accepted. + pm.noMorePeers <- struct{}{} + + // Quit fetcher, txsyncLoop. + close(pm.quitSync) + + // Disconnect existing sessions. + // This also closes the gate for any new registrations on the peer set. + // sessions which are already established but not added to pm.peers yet + // will exit when they try to register. + pm.peers.Close() + + // Wait for all peer handler goroutines and the loops to come down. + pm.wg.Wait() + + log.Info("Ethereum protocol stopped") +} + +func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { + return newPeer(pv, p, newMeteredMsgWriter(rw)) +} + +// handle is the callback invoked to manage the life cycle of an eth peer. When +// this function terminates, the peer is disconnected. +func (pm *ProtocolManager) handle(p *peer) error { + // Ignore maxPeers if this is a trusted peer + if pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted { + return p2p.DiscTooManyPeers + } + p.Log().Debug("Ethereum peer connected", "name", p.Name()) + + // Execute the Ethereum handshake + var ( + genesis = pm.blockchain.Genesis() + head = pm.blockchain.CurrentHeader() + hash = head.Hash() + number = head.Number.Uint64() + td = pm.blockchain.GetTd(hash, number) + ) + if err := p.Handshake(pm.networkID, td, hash, genesis.Hash()); err != nil { + p.Log().Debug("Ethereum handshake failed", "err", err) + return err + } + if rw, ok := p.rw.(*meteredMsgReadWriter); ok { + rw.Init(p.version) + } + // Register the peer locally + if err := pm.peers.Register(p); err != nil { + p.Log().Error("Ethereum peer registration failed", "err", err) + return err + } + defer pm.removePeer(p.id) + + // Register the peer in the downloader. If the downloader considers it banned, we disconnect + if err := pm.downloader.RegisterPeer(p.id, p.version, p); err != nil { + return err + } + // Propagate existing transactions. new transactions appearing + // after this will be sent via broadcasts. + pm.syncTransactions(p) + + // If we're DAO hard-fork aware, validate any remote peer with regard to the hard-fork + if daoBlock := pm.chainconfig.DAOForkBlock; daoBlock != nil { + // Request the peer's DAO fork header for extra-data validation + if err := p.RequestHeadersByNumber(daoBlock.Uint64(), 1, 0, false); err != nil { + return err + } + // Start a timer to disconnect if the peer doesn't reply in time + p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() { + p.Log().Debug("Timed out DAO fork-check, dropping") + pm.removePeer(p.id) + }) + // Make sure it's cleaned up if the peer dies off + defer func() { + if p.forkDrop != nil { + p.forkDrop.Stop() + p.forkDrop = nil + } + }() + } + // main loop. handle incoming messages. + for { + if err := pm.handleMsg(p); err != nil { + p.Log().Debug("Ethereum message handling failed", "err", err) + return err + } + } +} + +// handleMsg is invoked whenever an inbound message is received from a remote +// peer. The remote connection is torn down upon returning any error. +func (pm *ProtocolManager) handleMsg(p *peer) error { + // Read the next message from the remote peer, and ensure it's fully consumed + msg, err := p.rw.ReadMsg() + if err != nil { + return err + } + if msg.Size > ProtocolMaxMsgSize { + return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) + } + defer msg.Discard() + + // Handle the message depending on its contents + switch { + case msg.Code == StatusMsg: + // Status messages should never arrive after the handshake + return errResp(ErrExtraStatusMsg, "uncontrolled status message") + + // Block header query, collect the requested headers and reply + case msg.Code == GetBlockHeadersMsg: + // Decode the complex header query + var query getBlockHeadersData + if err := msg.Decode(&query); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + hashMode := query.Origin.Hash != (common.Hash{}) + first := true + maxNonCanonical := uint64(100) + + // Gather headers until the fetch or network limits is reached + var ( + bytes common.StorageSize + headers []*types.Header + unknown bool + ) + for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit && len(headers) < downloader.MaxHeaderFetch { + // Retrieve the next header satisfying the query + var origin *types.Header + if hashMode { + if first { + first = false + origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash) + if origin != nil { + query.Origin.Number = origin.Number.Uint64() + } + } else { + origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number) + } + } else { + origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number) + } + if origin == nil { + break + } + headers = append(headers, origin) + bytes += estHeaderRlpSize + + // Advance to the next header of the query + switch { + case hashMode && query.Reverse: + // Hash based traversal towards the genesis block + ancestor := query.Skip + 1 + if ancestor == 0 { + unknown = true + } else { + query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical) + unknown = (query.Origin.Hash == common.Hash{}) + } + case hashMode && !query.Reverse: + // Hash based traversal towards the leaf block + var ( + current = origin.Number.Uint64() + next = current + query.Skip + 1 + ) + if next <= current { + infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ") + p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) + unknown = true + } else { + if header := pm.blockchain.GetHeaderByNumber(next); header != nil { + nextHash := header.Hash() + expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical) + if expOldHash == query.Origin.Hash { + query.Origin.Hash, query.Origin.Number = nextHash, next + } else { + unknown = true + } + } else { + unknown = true + } + } + case query.Reverse: + // Number based traversal towards the genesis block + if query.Origin.Number >= query.Skip+1 { + query.Origin.Number -= query.Skip + 1 + } else { + unknown = true + } + + case !query.Reverse: + // Number based traversal towards the leaf block + query.Origin.Number += query.Skip + 1 + } + } + return p.SendBlockHeaders(headers) + + case msg.Code == BlockHeadersMsg: + // A batch of headers arrived to one of our previous requests + var headers []*types.Header + if err := msg.Decode(&headers); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // If no headers were received, but we're expending a DAO fork check, maybe it's that + if len(headers) == 0 && p.forkDrop != nil { + // Possibly an empty reply to the fork header checks, sanity check TDs + verifyDAO := true + + // If we already have a DAO header, we can check the peer's TD against it. If + // the peer's ahead of this, it too must have a reply to the DAO check + if daoHeader := pm.blockchain.GetHeaderByNumber(pm.chainconfig.DAOForkBlock.Uint64()); daoHeader != nil { + if _, td := p.Head(); td.Cmp(pm.blockchain.GetTd(daoHeader.Hash(), daoHeader.Number.Uint64())) >= 0 { + verifyDAO = false + } + } + // If we're seemingly on the same chain, disable the drop timer + if verifyDAO { + p.Log().Debug("Seems to be on the same side of the DAO fork") + p.forkDrop.Stop() + p.forkDrop = nil + return nil + } + } + // Filter out any explicitly requested headers, deliver the rest to the downloader + filter := len(headers) == 1 + if filter { + // If it's a potential DAO fork check, validate against the rules + if p.forkDrop != nil && pm.chainconfig.DAOForkBlock.Cmp(headers[0].Number) == 0 { + // Disable the fork drop timer + p.forkDrop.Stop() + p.forkDrop = nil + + // Validate the header and either drop the peer or continue + if err := misc.VerifyDAOHeaderExtraData(pm.chainconfig, headers[0]); err != nil { + p.Log().Debug("Verified to be on the other side of the DAO fork, dropping") + return err + } + p.Log().Debug("Verified to be on the same side of the DAO fork") + return nil + } + // Irrelevant of the fork checks, send the header to the fetcher just in case + headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) + } + if len(headers) > 0 || !filter { + err := pm.downloader.DeliverHeaders(p.id, headers) + if err != nil { + log.Debug("Failed to deliver headers", "err", err) + } + } + + case msg.Code == GetBlockBodiesMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather blocks until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + bodies []rlp.RawValue + ) + for bytes < softResponseLimit && len(bodies) < downloader.MaxBlockFetch { + // Retrieve the hash of the next block + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested block body, stopping if enough was found + if data := pm.blockchain.GetBodyRLP(hash); len(data) != 0 { + bodies = append(bodies, data) + bytes += len(data) + } + } + return p.SendBlockBodiesRLP(bodies) + + case msg.Code == BlockBodiesMsg: + // A batch of block bodies arrived to one of our previous requests + var request blockBodiesData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver them all to the downloader for queuing + transactions := make([][]*types.Transaction, len(request)) + uncles := make([][]*types.Header, len(request)) + + for i, body := range request { + transactions[i] = body.Transactions + uncles[i] = body.Uncles + } + // Filter out any explicitly requested bodies, deliver the rest to the downloader + filter := len(transactions) > 0 || len(uncles) > 0 + if filter { + transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now()) + } + if len(transactions) > 0 || len(uncles) > 0 || !filter { + err := pm.downloader.DeliverBodies(p.id, transactions, uncles) + if err != nil { + log.Debug("Failed to deliver bodies", "err", err) + } + } + + case p.version >= eth63 && msg.Code == GetNodeDataMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather state data until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + data [][]byte + ) + for bytes < softResponseLimit && len(data) < downloader.MaxStateFetch { + // Retrieve the hash of the next state entry + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested state entry, stopping if enough was found + if entry, err := pm.blockchain.TrieNode(hash); err == nil { + data = append(data, entry) + bytes += len(entry) + } + } + return p.SendNodeData(data) + + case p.version >= eth63 && msg.Code == NodeDataMsg: + // A batch of node state data arrived to one of our previous requests + var data [][]byte + if err := msg.Decode(&data); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver all to the downloader + if err := pm.downloader.DeliverNodeData(p.id, data); err != nil { + log.Debug("Failed to deliver node state data", "err", err) + } + + case p.version >= eth63 && msg.Code == GetReceiptsMsg: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather state data until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + receipts []rlp.RawValue + ) + for bytes < softResponseLimit && len(receipts) < downloader.MaxReceiptFetch { + // Retrieve the hash of the next block + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested block's receipts, skipping if unknown to us + results := pm.blockchain.GetReceiptsByHash(hash) + if results == nil { + if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { + continue + } + } + // If known, encode and queue for response packet + if encoded, err := rlp.EncodeToBytes(results); err != nil { + log.Error("Failed to encode receipt", "err", err) + } else { + receipts = append(receipts, encoded) + bytes += len(encoded) + } + } + return p.SendReceiptsRLP(receipts) + + case p.version >= eth63 && msg.Code == ReceiptsMsg: + // A batch of receipts arrived to one of our previous requests + var receipts [][]*types.Receipt + if err := msg.Decode(&receipts); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Deliver all to the downloader + if err := pm.downloader.DeliverReceipts(p.id, receipts); err != nil { + log.Debug("Failed to deliver receipts", "err", err) + } + + case msg.Code == NewBlockHashesMsg: + var announces newBlockHashesData + if err := msg.Decode(&announces); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + // Mark the hashes as present at the remote node + for _, block := range announces { + p.MarkBlock(block.Hash) + } + // Schedule all the unknown hashes for retrieval + unknown := make(newBlockHashesData, 0, len(announces)) + for _, block := range announces { + if !pm.blockchain.HasBlock(block.Hash, block.Number) { + unknown = append(unknown, block) + } + } + for _, block := range unknown { + pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) + } + + case msg.Code == NewBlockMsg: + // Retrieve and decode the propagated block + var request newBlockData + if err := msg.Decode(&request); err != nil { + return errResp(ErrDecode, "%v: %v", msg, err) + } + request.Block.ReceivedAt = msg.ReceivedAt + request.Block.ReceivedFrom = p + + // Mark the peer as owning the block and schedule it for import + p.MarkBlock(request.Block.Hash()) + pm.fetcher.Enqueue(p.id, request.Block) + + // Assuming the block is importable by the peer, but possibly not yet done so, + // calculate the head hash and TD that the peer truly must have. + var ( + trueHead = request.Block.ParentHash() + trueTD = new(big.Int).Sub(request.TD, request.Block.Difficulty()) + ) + // Update the peers total difficulty if better than the previous + if _, td := p.Head(); trueTD.Cmp(td) > 0 { + p.SetHead(trueHead, trueTD) + + // Schedule a sync if above ours. Note, this will not fire a sync for a gap of + // a singe block (as the true TD is below the propagated block), however this + // scenario should easily be covered by the fetcher. + currentBlock := pm.blockchain.CurrentBlock() + if trueTD.Cmp(pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64())) > 0 { + go pm.synchronise(p) + } + } + + case msg.Code == TxMsg: + // Transactions arrived, make sure we have a valid and fresh chain to handle them + if atomic.LoadUint32(&pm.acceptTxs) == 0 { + break + } + // Transactions can be processed, parse all of them and deliver to the pool + var txs []*types.Transaction + if err := msg.Decode(&txs); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + for i, tx := range txs { + // Validate and mark the remote transaction + if tx == nil { + return errResp(ErrDecode, "transaction %d is nil", i) + } + p.MarkTransaction(tx.Hash()) + } + pm.txpool.AddRemotes(txs) + + default: + return errResp(ErrInvalidMsgCode, "%v", msg.Code) + } + return nil +} + +// BroadcastBlock will either propagate a block to a subset of it's peers, or +// will only announce it's availability (depending what's requested). +func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { + hash := block.Hash() + peers := pm.peers.PeersWithoutBlock(hash) + + // If propagation is requested, send to a subset of the peer + if propagate { + // Calculate the TD of the block (it's not imported yet, so block.Td is not valid) + var td *big.Int + if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil { + td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1)) + } else { + log.Error("Propagating dangling block", "number", block.Number(), "hash", hash) + return + } + // Send the block to a subset of our peers + transfer := peers[:int(math.Sqrt(float64(len(peers))))] + for _, peer := range transfer { + peer.AsyncSendNewBlock(block, td) + } + log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + return + } + // Otherwise if the block is indeed in out own chain, announce it + if pm.blockchain.HasBlock(hash, block.NumberU64()) { + for _, peer := range peers { + peer.AsyncSendNewBlockHash(block) + } + log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt))) + } +} + +// BroadcastTxs will propagate a batch of transactions to all peers which are not known to +// already have the given transaction. +func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { + var txset = make(map[*peer]types.Transactions) + + // Broadcast transactions to a batch of peers not knowing about it + for _, tx := range txs { + peers := pm.peers.PeersWithoutTx(tx.Hash()) + for _, peer := range peers { + txset[peer] = append(txset[peer], tx) + } + log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) + } + // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] + for peer, txs := range txset { + peer.AsyncSendTransactions(txs) + } +} + +// Mined broadcast loop +func (pm *ProtocolManager) minedBroadcastLoop() { + // automatically stops if unsubscribe + for obj := range pm.minedBlockSub.Chan() { + if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok { + pm.BroadcastBlock(ev.Block, true) // First propagate block to peers + pm.BroadcastBlock(ev.Block, false) // Only then announce to the rest + } + } +} + +func (pm *ProtocolManager) txBroadcastLoop() { + for { + select { + case event := <-pm.txsCh: + pm.BroadcastTxs(event.Txs) + + // Err() channel will be closed when unsubscribing. + case <-pm.txsSub.Err(): + return + } + } +} + +// NodeInfo represents a short summary of the Ethereum sub-protocol metadata +// known about the host peer. +type NodeInfo struct { + Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4) + Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain + Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block + Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules + Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block +} + +// NodeInfo retrieves some protocol metadata about the running host node. +func (pm *ProtocolManager) NodeInfo() *NodeInfo { + currentBlock := pm.blockchain.CurrentBlock() + return &NodeInfo{ + Network: pm.networkID, + Difficulty: pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()), + Genesis: pm.blockchain.Genesis().Hash(), + Config: pm.blockchain.Config(), + Head: currentBlock.Hash(), + } +} diff --git a/dex/helper_test.go b/dex/helper_test.go new file mode 100644 index 000000000..3d2ab0aba --- /dev/null +++ b/dex/helper_test.go @@ -0,0 +1,201 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// This file contains some shares testing functionality, common to multiple +// different files and modules being tested. + +package eth + +import ( + "crypto/ecdsa" + "crypto/rand" + "math/big" + "sort" + "sync" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/consensus/ethash" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" + "github.com/ethereum/go-ethereum/params" +) + +var ( + testBankKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + testBank = crypto.PubkeyToAddress(testBankKey.PublicKey) +) + +// newTestProtocolManager creates a new protocol manager for testing purposes, +// with the given number of blocks already known, and potential notification +// channels for different events. +func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func(int, *core.BlockGen), newtx chan<- []*types.Transaction) (*ProtocolManager, *ethdb.MemDatabase, error) { + var ( + evmux = new(event.TypeMux) + engine = ethash.NewFaker() + db = ethdb.NewMemDatabase() + gspec = &core.Genesis{ + Config: params.TestChainConfig, + Alloc: core.GenesisAlloc{testBank: {Balance: big.NewInt(1000000)}}, + } + genesis = gspec.MustCommit(db) + blockchain, _ = core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}) + ) + chain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, blocks, generator) + if _, err := blockchain.InsertChain(chain); err != nil { + panic(err) + } + + pm, err := NewProtocolManager(gspec.Config, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db) + if err != nil { + return nil, nil, err + } + pm.Start(1000) + return pm, db, nil +} + +// newTestProtocolManagerMust creates a new protocol manager for testing purposes, +// with the given number of blocks already known, and potential notification +// channels for different events. In case of an error, the constructor force- +// fails the test. +func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks int, generator func(int, *core.BlockGen), newtx chan<- []*types.Transaction) (*ProtocolManager, *ethdb.MemDatabase) { + pm, db, err := newTestProtocolManager(mode, blocks, generator, newtx) + if err != nil { + t.Fatalf("Failed to create protocol manager: %v", err) + } + return pm, db +} + +// testTxPool is a fake, helper transaction pool for testing purposes +type testTxPool struct { + txFeed event.Feed + pool []*types.Transaction // Collection of all transactions + added chan<- []*types.Transaction // Notification channel for new transactions + + lock sync.RWMutex // Protects the transaction pool +} + +// AddRemotes appends a batch of transactions to the pool, and notifies any +// listeners if the addition channel is non nil +func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error { + p.lock.Lock() + defer p.lock.Unlock() + + p.pool = append(p.pool, txs...) + if p.added != nil { + p.added <- txs + } + return make([]error, len(txs)) +} + +// Pending returns all the transactions known to the pool +func (p *testTxPool) Pending() (map[common.Address]types.Transactions, error) { + p.lock.RLock() + defer p.lock.RUnlock() + + batches := make(map[common.Address]types.Transactions) + for _, tx := range p.pool { + from, _ := types.Sender(types.HomesteadSigner{}, tx) + batches[from] = append(batches[from], tx) + } + for _, batch := range batches { + sort.Sort(types.TxByNonce(batch)) + } + return batches, nil +} + +func (p *testTxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { + return p.txFeed.Subscribe(ch) +} + +// newTestTransaction create a new dummy transaction. +func newTestTransaction(from *ecdsa.PrivateKey, nonce uint64, datasize int) *types.Transaction { + tx := types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, datasize)) + tx, _ = types.SignTx(tx, types.HomesteadSigner{}, from) + return tx +} + +// testPeer is a simulated peer to allow testing direct network calls. +type testPeer struct { + net p2p.MsgReadWriter // Network layer reader/writer to simulate remote messaging + app *p2p.MsgPipeRW // Application layer reader/writer to simulate the local side + *peer +} + +// newTestPeer creates a new peer registered at the given protocol manager. +func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*testPeer, <-chan error) { + // Create a message pipe to communicate through + app, net := p2p.MsgPipe() + + // Generate a random id and create the peer + var id discover.NodeID + rand.Read(id[:]) + + peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net) + + // Start the peer on a new thread + errc := make(chan error, 1) + go func() { + select { + case pm.newPeerCh <- peer: + errc <- pm.handle(peer) + case <-pm.quitSync: + errc <- p2p.DiscQuitting + } + }() + tp := &testPeer{app: app, net: net, peer: peer} + // Execute any implicitly requested handshakes and return + if shake { + var ( + genesis = pm.blockchain.Genesis() + head = pm.blockchain.CurrentHeader() + td = pm.blockchain.GetTd(head.Hash(), head.Number.Uint64()) + ) + tp.handshake(nil, td, head.Hash(), genesis.Hash()) + } + return tp, errc +} + +// handshake simulates a trivial handshake that expects the same state from the +// remote side as we are simulating locally. +func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, genesis common.Hash) { + msg := &statusData{ + ProtocolVersion: uint32(p.version), + NetworkId: DefaultConfig.NetworkId, + TD: td, + CurrentBlock: head, + GenesisBlock: genesis, + } + if err := p2p.ExpectMsg(p.app, StatusMsg, msg); err != nil { + t.Fatalf("status recv: %v", err) + } + if err := p2p.Send(p.app, StatusMsg, msg); err != nil { + t.Fatalf("status send: %v", err) + } +} + +// close terminates the local side of the peer, notifying the remote protocol +// manager of termination. +func (p *testPeer) close() { + p.app.Close() +} diff --git a/dex/metrics.go b/dex/metrics.go new file mode 100644 index 000000000..0533a2a87 --- /dev/null +++ b/dex/metrics.go @@ -0,0 +1,139 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/p2p" +) + +var ( + propTxnInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/packets", nil) + propTxnInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/in/traffic", nil) + propTxnOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/packets", nil) + propTxnOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/txns/out/traffic", nil) + propHashInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/packets", nil) + propHashInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/in/traffic", nil) + propHashOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/packets", nil) + propHashOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/hashes/out/traffic", nil) + propBlockInPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/packets", nil) + propBlockInTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/in/traffic", nil) + propBlockOutPacketsMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/packets", nil) + propBlockOutTrafficMeter = metrics.NewRegisteredMeter("eth/prop/blocks/out/traffic", nil) + reqHeaderInPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/in/packets", nil) + reqHeaderInTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/in/traffic", nil) + reqHeaderOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/headers/out/packets", nil) + reqHeaderOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/headers/out/traffic", nil) + reqBodyInPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/packets", nil) + reqBodyInTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/in/traffic", nil) + reqBodyOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/packets", nil) + reqBodyOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/bodies/out/traffic", nil) + reqStateInPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/in/packets", nil) + reqStateInTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/in/traffic", nil) + reqStateOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/states/out/packets", nil) + reqStateOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/states/out/traffic", nil) + reqReceiptInPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/packets", nil) + reqReceiptInTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/in/traffic", nil) + reqReceiptOutPacketsMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/packets", nil) + reqReceiptOutTrafficMeter = metrics.NewRegisteredMeter("eth/req/receipts/out/traffic", nil) + miscInPacketsMeter = metrics.NewRegisteredMeter("eth/misc/in/packets", nil) + miscInTrafficMeter = metrics.NewRegisteredMeter("eth/misc/in/traffic", nil) + miscOutPacketsMeter = metrics.NewRegisteredMeter("eth/misc/out/packets", nil) + miscOutTrafficMeter = metrics.NewRegisteredMeter("eth/misc/out/traffic", nil) +) + +// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of +// accumulating the above defined metrics based on the data stream contents. +type meteredMsgReadWriter struct { + p2p.MsgReadWriter // Wrapped message stream to meter + version int // Protocol version to select correct meters +} + +// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the +// metrics system is disabled, this function returns the original object. +func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter { + if !metrics.Enabled { + return rw + } + return &meteredMsgReadWriter{MsgReadWriter: rw} +} + +// Init sets the protocol version used by the stream to know which meters to +// increment in case of overlapping message ids between protocol versions. +func (rw *meteredMsgReadWriter) Init(version int) { + rw.version = version +} + +func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) { + // Read the message and short circuit in case of an error + msg, err := rw.MsgReadWriter.ReadMsg() + if err != nil { + return msg, err + } + // Account for the data traffic + packets, traffic := miscInPacketsMeter, miscInTrafficMeter + switch { + case msg.Code == BlockHeadersMsg: + packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter + case msg.Code == BlockBodiesMsg: + packets, traffic = reqBodyInPacketsMeter, reqBodyInTrafficMeter + + case rw.version >= eth63 && msg.Code == NodeDataMsg: + packets, traffic = reqStateInPacketsMeter, reqStateInTrafficMeter + case rw.version >= eth63 && msg.Code == ReceiptsMsg: + packets, traffic = reqReceiptInPacketsMeter, reqReceiptInTrafficMeter + + case msg.Code == NewBlockHashesMsg: + packets, traffic = propHashInPacketsMeter, propHashInTrafficMeter + case msg.Code == NewBlockMsg: + packets, traffic = propBlockInPacketsMeter, propBlockInTrafficMeter + case msg.Code == TxMsg: + packets, traffic = propTxnInPacketsMeter, propTxnInTrafficMeter + } + packets.Mark(1) + traffic.Mark(int64(msg.Size)) + + return msg, err +} + +func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error { + // Account for the data traffic + packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter + switch { + case msg.Code == BlockHeadersMsg: + packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter + case msg.Code == BlockBodiesMsg: + packets, traffic = reqBodyOutPacketsMeter, reqBodyOutTrafficMeter + + case rw.version >= eth63 && msg.Code == NodeDataMsg: + packets, traffic = reqStateOutPacketsMeter, reqStateOutTrafficMeter + case rw.version >= eth63 && msg.Code == ReceiptsMsg: + packets, traffic = reqReceiptOutPacketsMeter, reqReceiptOutTrafficMeter + + case msg.Code == NewBlockHashesMsg: + packets, traffic = propHashOutPacketsMeter, propHashOutTrafficMeter + case msg.Code == NewBlockMsg: + packets, traffic = propBlockOutPacketsMeter, propBlockOutTrafficMeter + case msg.Code == TxMsg: + packets, traffic = propTxnOutPacketsMeter, propTxnOutTrafficMeter + } + packets.Mark(1) + traffic.Mark(int64(msg.Size)) + + // Send the packet to the p2p layer + return rw.MsgReadWriter.WriteMsg(msg) +} diff --git a/dex/peer.go b/dex/peer.go new file mode 100644 index 000000000..b5f450855 --- /dev/null +++ b/dex/peer.go @@ -0,0 +1,522 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "errors" + "fmt" + "math/big" + "sync" + "time" + + mapset "github.com/deckarep/golang-set" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" +) + +var ( + errClosed = errors.New("peer set is closed") + errAlreadyRegistered = errors.New("peer is already registered") + errNotRegistered = errors.New("peer is not registered") +) + +const ( + maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) + + // maxQueuedTxs is the maximum number of transaction lists to queue up before + // dropping broadcasts. This is a sensitive number as a transaction list might + // contain a single transaction, or thousands. + maxQueuedTxs = 128 + + // maxQueuedProps is the maximum number of block propagations to queue up before + // dropping broadcasts. There's not much point in queueing stale blocks, so a few + // that might cover uncles should be enough. + maxQueuedProps = 4 + + // maxQueuedAnns is the maximum number of block announcements to queue up before + // dropping broadcasts. Similarly to block propagations, there's no point to queue + // above some healthy uncle limit, so use that. + maxQueuedAnns = 4 + + handshakeTimeout = 5 * time.Second +) + +// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known +// about a connected peer. +type PeerInfo struct { + Version int `json:"version"` // Ethereum protocol version negotiated + Difficulty *big.Int `json:"difficulty"` // Total difficulty of the peer's blockchain + Head string `json:"head"` // SHA3 hash of the peer's best owned block +} + +// propEvent is a block propagation, waiting for its turn in the broadcast queue. +type propEvent struct { + block *types.Block + td *big.Int +} + +type peer struct { + id string + + *p2p.Peer + rw p2p.MsgReadWriter + + version int // Protocol version negotiated + forkDrop *time.Timer // Timed connection dropper if forks aren't validated in time + + head common.Hash + td *big.Int + lock sync.RWMutex + + knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownBlocks mapset.Set // Set of block hashes known to be known by this peer + queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer + queuedProps chan *propEvent // Queue of blocks to broadcast to the peer + queuedAnns chan *types.Block // Queue of blocks to announce to the peer + term chan struct{} // Termination channel to stop the broadcaster +} + +func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { + return &peer{ + Peer: p, + rw: rw, + version: version, + id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), + knownTxs: mapset.NewSet(), + knownBlocks: mapset.NewSet(), + queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), + queuedProps: make(chan *propEvent, maxQueuedProps), + queuedAnns: make(chan *types.Block, maxQueuedAnns), + term: make(chan struct{}), + } +} + +// broadcast is a write loop that multiplexes block propagations, announcements +// and transaction broadcasts into the remote peer. The goal is to have an async +// writer that does not lock up node internals. +func (p *peer) broadcast() { + for { + select { + case txs := <-p.queuedTxs: + if err := p.SendTransactions(txs); err != nil { + return + } + p.Log().Trace("Broadcast transactions", "count", len(txs)) + + case prop := <-p.queuedProps: + if err := p.SendNewBlock(prop.block, prop.td); err != nil { + return + } + p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) + + case block := <-p.queuedAnns: + if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { + return + } + p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) + + case <-p.term: + return + } + } +} + +// close signals the broadcast goroutine to terminate. +func (p *peer) close() { + close(p.term) +} + +// Info gathers and returns a collection of metadata known about a peer. +func (p *peer) Info() *PeerInfo { + hash, td := p.Head() + + return &PeerInfo{ + Version: p.version, + Difficulty: td, + Head: hash.Hex(), + } +} + +// Head retrieves a copy of the current head hash and total difficulty of the +// peer. +func (p *peer) Head() (hash common.Hash, td *big.Int) { + p.lock.RLock() + defer p.lock.RUnlock() + + copy(hash[:], p.head[:]) + return hash, new(big.Int).Set(p.td) +} + +// SetHead updates the head hash and total difficulty of the peer. +func (p *peer) SetHead(hash common.Hash, td *big.Int) { + p.lock.Lock() + defer p.lock.Unlock() + + copy(p.head[:], hash[:]) + p.td.Set(td) +} + +// MarkBlock marks a block as known for the peer, ensuring that the block will +// never be propagated to this particular peer. +func (p *peer) MarkBlock(hash common.Hash) { + // If we reached the memory allowance, drop a previously known block hash + for p.knownBlocks.Cardinality() >= maxKnownBlocks { + p.knownBlocks.Pop() + } + p.knownBlocks.Add(hash) +} + +// MarkTransaction marks a transaction as known for the peer, ensuring that it +// will never be propagated to this particular peer. +func (p *peer) MarkTransaction(hash common.Hash) { + // If we reached the memory allowance, drop a previously known transaction hash + for p.knownTxs.Cardinality() >= maxKnownTxs { + p.knownTxs.Pop() + } + p.knownTxs.Add(hash) +} + +// SendTransactions sends transactions to the peer and includes the hashes +// in its transaction hash set for future reference. +func (p *peer) SendTransactions(txs types.Transactions) error { + for _, tx := range txs { + p.knownTxs.Add(tx.Hash()) + } + return p2p.Send(p.rw, TxMsg, txs) +} + +// AsyncSendTransactions queues list of transactions propagation to a remote +// peer. If the peer's broadcast queue is full, the event is silently dropped. +func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { + select { + case p.queuedTxs <- txs: + for _, tx := range txs { + p.knownTxs.Add(tx.Hash()) + } + default: + p.Log().Debug("Dropping transaction propagation", "count", len(txs)) + } +} + +// SendNewBlockHashes announces the availability of a number of blocks through +// a hash notification. +func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { + for _, hash := range hashes { + p.knownBlocks.Add(hash) + } + request := make(newBlockHashesData, len(hashes)) + for i := 0; i < len(hashes); i++ { + request[i].Hash = hashes[i] + request[i].Number = numbers[i] + } + return p2p.Send(p.rw, NewBlockHashesMsg, request) +} + +// AsyncSendNewBlockHash queues the availability of a block for propagation to a +// remote peer. If the peer's broadcast queue is full, the event is silently +// dropped. +func (p *peer) AsyncSendNewBlockHash(block *types.Block) { + select { + case p.queuedAnns <- block: + p.knownBlocks.Add(block.Hash()) + default: + p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash()) + } +} + +// SendNewBlock propagates an entire block to a remote peer. +func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { + p.knownBlocks.Add(block.Hash()) + return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) +} + +// AsyncSendNewBlock queues an entire block for propagation to a remote peer. If +// the peer's broadcast queue is full, the event is silently dropped. +func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { + select { + case p.queuedProps <- &propEvent{block: block, td: td}: + p.knownBlocks.Add(block.Hash()) + default: + p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash()) + } +} + +// SendBlockHeaders sends a batch of block headers to the remote peer. +func (p *peer) SendBlockHeaders(headers []*types.Header) error { + return p2p.Send(p.rw, BlockHeadersMsg, headers) +} + +// SendBlockBodies sends a batch of block contents to the remote peer. +func (p *peer) SendBlockBodies(bodies []*blockBody) error { + return p2p.Send(p.rw, BlockBodiesMsg, blockBodiesData(bodies)) +} + +// SendBlockBodiesRLP sends a batch of block contents to the remote peer from +// an already RLP encoded format. +func (p *peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error { + return p2p.Send(p.rw, BlockBodiesMsg, bodies) +} + +// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the +// hashes requested. +func (p *peer) SendNodeData(data [][]byte) error { + return p2p.Send(p.rw, NodeDataMsg, data) +} + +// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the +// ones requested from an already RLP encoded format. +func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error { + return p2p.Send(p.rw, ReceiptsMsg, receipts) +} + +// RequestOneHeader is a wrapper around the header query functions to fetch a +// single header. It is used solely by the fetcher. +func (p *peer) RequestOneHeader(hash common.Hash) error { + p.Log().Debug("Fetching single header", "hash", hash) + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: uint64(1), Skip: uint64(0), Reverse: false}) +} + +// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the +// specified header query, based on the hash of an origin block. +func (p *peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { + p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) +} + +// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the +// specified header query, based on the number of an origin block. +func (p *peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { + p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) + return p2p.Send(p.rw, GetBlockHeadersMsg, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) +} + +// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes +// specified. +func (p *peer) RequestBodies(hashes []common.Hash) error { + p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) + return p2p.Send(p.rw, GetBlockBodiesMsg, hashes) +} + +// RequestNodeData fetches a batch of arbitrary data from a node's known state +// data, corresponding to the specified hashes. +func (p *peer) RequestNodeData(hashes []common.Hash) error { + p.Log().Debug("Fetching batch of state data", "count", len(hashes)) + return p2p.Send(p.rw, GetNodeDataMsg, hashes) +} + +// RequestReceipts fetches a batch of transaction receipts from a remote node. +func (p *peer) RequestReceipts(hashes []common.Hash) error { + p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) + return p2p.Send(p.rw, GetReceiptsMsg, hashes) +} + +// Handshake executes the eth protocol handshake, negotiating version number, +// network IDs, difficulties, head and genesis blocks. +func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash) error { + // Send out own handshake in a new thread + errc := make(chan error, 2) + var status statusData // safe to read after two values have been received from errc + + go func() { + errc <- p2p.Send(p.rw, StatusMsg, &statusData{ + ProtocolVersion: uint32(p.version), + NetworkId: network, + TD: td, + CurrentBlock: head, + GenesisBlock: genesis, + }) + }() + go func() { + errc <- p.readStatus(network, &status, genesis) + }() + timeout := time.NewTimer(handshakeTimeout) + defer timeout.Stop() + for i := 0; i < 2; i++ { + select { + case err := <-errc: + if err != nil { + return err + } + case <-timeout.C: + return p2p.DiscReadTimeout + } + } + p.td, p.head = status.TD, status.CurrentBlock + return nil +} + +func (p *peer) readStatus(network uint64, status *statusData, genesis common.Hash) (err error) { + msg, err := p.rw.ReadMsg() + if err != nil { + return err + } + if msg.Code != StatusMsg { + return errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg) + } + if msg.Size > ProtocolMaxMsgSize { + return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) + } + // Decode the handshake and make sure everything matches + if err := msg.Decode(&status); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + if status.GenesisBlock != genesis { + return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", status.GenesisBlock[:8], genesis[:8]) + } + if status.NetworkId != network { + return errResp(ErrNetworkIdMismatch, "%d (!= %d)", status.NetworkId, network) + } + if int(status.ProtocolVersion) != p.version { + return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, p.version) + } + return nil +} + +// String implements fmt.Stringer. +func (p *peer) String() string { + return fmt.Sprintf("Peer %s [%s]", p.id, + fmt.Sprintf("eth/%2d", p.version), + ) +} + +// peerSet represents the collection of active peers currently participating in +// the Ethereum sub-protocol. +type peerSet struct { + peers map[string]*peer + lock sync.RWMutex + closed bool +} + +// newPeerSet creates a new peer set to track the active participants. +func newPeerSet() *peerSet { + return &peerSet{ + peers: make(map[string]*peer), + } +} + +// Register injects a new peer into the working set, or returns an error if the +// peer is already known. If a new peer it registered, its broadcast loop is also +// started. +func (ps *peerSet) Register(p *peer) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + if ps.closed { + return errClosed + } + if _, ok := ps.peers[p.id]; ok { + return errAlreadyRegistered + } + ps.peers[p.id] = p + go p.broadcast() + + return nil +} + +// Unregister removes a remote peer from the active set, disabling any further +// actions to/from that particular entity. +func (ps *peerSet) Unregister(id string) error { + ps.lock.Lock() + defer ps.lock.Unlock() + + p, ok := ps.peers[id] + if !ok { + return errNotRegistered + } + delete(ps.peers, id) + p.close() + + return nil +} + +// Peer retrieves the registered peer with the given id. +func (ps *peerSet) Peer(id string) *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return ps.peers[id] +} + +// Len returns if the current number of peers in the set. +func (ps *peerSet) Len() int { + ps.lock.RLock() + defer ps.lock.RUnlock() + + return len(ps.peers) +} + +// PeersWithoutBlock retrieves a list of peers that do not have a given block in +// their set of known hashes. +func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownBlocks.Contains(hash) { + list = append(list, p) + } + } + return list +} + +// PeersWithoutTx retrieves a list of peers that do not have a given transaction +// in their set of known hashes. +func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + list := make([]*peer, 0, len(ps.peers)) + for _, p := range ps.peers { + if !p.knownTxs.Contains(hash) { + list = append(list, p) + } + } + return list +} + +// BestPeer retrieves the known peer with the currently highest total difficulty. +func (ps *peerSet) BestPeer() *peer { + ps.lock.RLock() + defer ps.lock.RUnlock() + + var ( + bestPeer *peer + bestTd *big.Int + ) + for _, p := range ps.peers { + if _, td := p.Head(); bestPeer == nil || td.Cmp(bestTd) > 0 { + bestPeer, bestTd = p, td + } + } + return bestPeer +} + +// Close disconnects all peers. +// No new peers can be registered after Close has returned. +func (ps *peerSet) Close() { + ps.lock.Lock() + defer ps.lock.Unlock() + + for _, p := range ps.peers { + p.Disconnect(p2p.DiscQuitting) + } + ps.closed = true +} diff --git a/dex/protocol.go b/dex/protocol.go new file mode 100644 index 000000000..0e90e6a2e --- /dev/null +++ b/dex/protocol.go @@ -0,0 +1,183 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "fmt" + "io" + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rlp" +) + +// Constants to match up protocol versions and messages +const ( + eth62 = 62 + eth63 = 63 +) + +// ProtocolName is the official short name of the protocol used during capability negotiation. +var ProtocolName = "eth" + +// ProtocolVersions are the upported versions of the eth protocol (first is primary). +var ProtocolVersions = []uint{eth63, eth62} + +// ProtocolLengths are the number of implemented message corresponding to different protocol versions. +var ProtocolLengths = []uint64{17, 8} + +const ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message + +// eth protocol message codes +const ( + // Protocol messages belonging to eth/62 + StatusMsg = 0x00 + NewBlockHashesMsg = 0x01 + TxMsg = 0x02 + GetBlockHeadersMsg = 0x03 + BlockHeadersMsg = 0x04 + GetBlockBodiesMsg = 0x05 + BlockBodiesMsg = 0x06 + NewBlockMsg = 0x07 + + // Protocol messages belonging to eth/63 + GetNodeDataMsg = 0x0d + NodeDataMsg = 0x0e + GetReceiptsMsg = 0x0f + ReceiptsMsg = 0x10 +) + +type errCode int + +const ( + ErrMsgTooLarge = iota + ErrDecode + ErrInvalidMsgCode + ErrProtocolVersionMismatch + ErrNetworkIdMismatch + ErrGenesisBlockMismatch + ErrNoStatusMsg + ErrExtraStatusMsg + ErrSuspendedPeer +) + +func (e errCode) String() string { + return errorToString[int(e)] +} + +// XXX change once legacy code is out +var errorToString = map[int]string{ + ErrMsgTooLarge: "Message too long", + ErrDecode: "Invalid message", + ErrInvalidMsgCode: "Invalid message code", + ErrProtocolVersionMismatch: "Protocol version mismatch", + ErrNetworkIdMismatch: "NetworkId mismatch", + ErrGenesisBlockMismatch: "Genesis block mismatch", + ErrNoStatusMsg: "No status message", + ErrExtraStatusMsg: "Extra status message", + ErrSuspendedPeer: "Suspended peer", +} + +type txPool interface { + // AddRemotes should add the given transactions to the pool. + AddRemotes([]*types.Transaction) []error + + // Pending should return pending transactions. + // The slice should be modifiable by the caller. + Pending() (map[common.Address]types.Transactions, error) + + // SubscribeNewTxsEvent should return an event subscription of + // NewTxsEvent and send events to the given channel. + SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription +} + +// statusData is the network packet for the status message. +type statusData struct { + ProtocolVersion uint32 + NetworkId uint64 + TD *big.Int + CurrentBlock common.Hash + GenesisBlock common.Hash +} + +// newBlockHashesData is the network packet for the block announcements. +type newBlockHashesData []struct { + Hash common.Hash // Hash of one particular block being announced + Number uint64 // Number of one particular block being announced +} + +// getBlockHeadersData represents a block header query. +type getBlockHeadersData struct { + Origin hashOrNumber // Block from which to retrieve headers + Amount uint64 // Maximum number of headers to retrieve + Skip uint64 // Blocks to skip between consecutive headers + Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis) +} + +// hashOrNumber is a combined field for specifying an origin block. +type hashOrNumber struct { + Hash common.Hash // Block hash from which to retrieve headers (excludes Number) + Number uint64 // Block hash from which to retrieve headers (excludes Hash) +} + +// EncodeRLP is a specialized encoder for hashOrNumber to encode only one of the +// two contained union fields. +func (hn *hashOrNumber) EncodeRLP(w io.Writer) error { + if hn.Hash == (common.Hash{}) { + return rlp.Encode(w, hn.Number) + } + if hn.Number != 0 { + return fmt.Errorf("both origin hash (%x) and number (%d) provided", hn.Hash, hn.Number) + } + return rlp.Encode(w, hn.Hash) +} + +// DecodeRLP is a specialized decoder for hashOrNumber to decode the contents +// into either a block hash or a block number. +func (hn *hashOrNumber) DecodeRLP(s *rlp.Stream) error { + _, size, _ := s.Kind() + origin, err := s.Raw() + if err == nil { + switch { + case size == 32: + err = rlp.DecodeBytes(origin, &hn.Hash) + case size <= 8: + err = rlp.DecodeBytes(origin, &hn.Number) + default: + err = fmt.Errorf("invalid input size %d for origin", size) + } + } + return err +} + +// newBlockData is the network packet for the block propagation message. +type newBlockData struct { + Block *types.Block + TD *big.Int +} + +// blockBody represents the data content of a single block. +type blockBody struct { + Transactions []*types.Transaction // Transactions contained within a block + Uncles []*types.Header // Uncles contained within a block +} + +// blockBodiesData is the network packet for block content distribution. +type blockBodiesData []*blockBody diff --git a/dex/protocol_test.go b/dex/protocol_test.go new file mode 100644 index 000000000..aa43dfa92 --- /dev/null +++ b/dex/protocol_test.go @@ -0,0 +1,223 @@ +// Copyright 2014 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "fmt" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rlp" +) + +func init() { + // log.Root().SetHandler(log.LvlFilterHandler(log.LvlTrace, log.StreamHandler(os.Stderr, log.TerminalFormat(false)))) +} + +var testAccount, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") + +// Tests that handshake failures are detected and reported correctly. +func TestStatusMsgErrors62(t *testing.T) { testStatusMsgErrors(t, 62) } +func TestStatusMsgErrors63(t *testing.T) { testStatusMsgErrors(t, 63) } + +func testStatusMsgErrors(t *testing.T, protocol int) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + var ( + genesis = pm.blockchain.Genesis() + head = pm.blockchain.CurrentHeader() + td = pm.blockchain.GetTd(head.Hash(), head.Number.Uint64()) + ) + defer pm.Stop() + + tests := []struct { + code uint64 + data interface{} + wantError error + }{ + { + code: TxMsg, data: []interface{}{}, + wantError: errResp(ErrNoStatusMsg, "first msg has code 2 (!= 0)"), + }, + { + code: StatusMsg, data: statusData{10, DefaultConfig.NetworkId, td, head.Hash(), genesis.Hash()}, + wantError: errResp(ErrProtocolVersionMismatch, "10 (!= %d)", protocol), + }, + { + code: StatusMsg, data: statusData{uint32(protocol), 999, td, head.Hash(), genesis.Hash()}, + wantError: errResp(ErrNetworkIdMismatch, "999 (!= 1)"), + }, + { + code: StatusMsg, data: statusData{uint32(protocol), DefaultConfig.NetworkId, td, head.Hash(), common.Hash{3}}, + wantError: errResp(ErrGenesisBlockMismatch, "0300000000000000 (!= %x)", genesis.Hash().Bytes()[:8]), + }, + } + + for i, test := range tests { + p, errc := newTestPeer("peer", protocol, pm, false) + // The send call might hang until reset because + // the protocol might not read the payload. + go p2p.Send(p.app, test.code, test.data) + + select { + case err := <-errc: + if err == nil { + t.Errorf("test %d: protocol returned nil error, want %q", i, test.wantError) + } else if err.Error() != test.wantError.Error() { + t.Errorf("test %d: wrong error: got %q, want %q", i, err, test.wantError) + } + case <-time.After(2 * time.Second): + t.Errorf("protocol did not shut down within 2 seconds") + } + p.close() + } +} + +// This test checks that received transactions are added to the local pool. +func TestRecvTransactions62(t *testing.T) { testRecvTransactions(t, 62) } +func TestRecvTransactions63(t *testing.T) { testRecvTransactions(t, 63) } + +func testRecvTransactions(t *testing.T, protocol int) { + txAdded := make(chan []*types.Transaction) + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, txAdded) + pm.acceptTxs = 1 // mark synced to accept transactions + p, _ := newTestPeer("peer", protocol, pm, true) + defer pm.Stop() + defer p.close() + + tx := newTestTransaction(testAccount, 0, 0) + if err := p2p.Send(p.app, TxMsg, []interface{}{tx}); err != nil { + t.Fatalf("send error: %v", err) + } + select { + case added := <-txAdded: + if len(added) != 1 { + t.Errorf("wrong number of added transactions: got %d, want 1", len(added)) + } else if added[0].Hash() != tx.Hash() { + t.Errorf("added wrong tx hash: got %v, want %v", added[0].Hash(), tx.Hash()) + } + case <-time.After(2 * time.Second): + t.Errorf("no NewTxsEvent received within 2 seconds") + } +} + +// This test checks that pending transactions are sent. +func TestSendTransactions62(t *testing.T) { testSendTransactions(t, 62) } +func TestSendTransactions63(t *testing.T) { testSendTransactions(t, 63) } + +func testSendTransactions(t *testing.T, protocol int) { + pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) + defer pm.Stop() + + // Fill the pool with big transactions. + const txsize = txsyncPackSize / 10 + alltxs := make([]*types.Transaction, 100) + for nonce := range alltxs { + alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), txsize) + } + pm.txpool.AddRemotes(alltxs) + + // Connect several peers. They should all receive the pending transactions. + var wg sync.WaitGroup + checktxs := func(p *testPeer) { + defer wg.Done() + defer p.close() + seen := make(map[common.Hash]bool) + for _, tx := range alltxs { + seen[tx.Hash()] = false + } + for n := 0; n < len(alltxs) && !t.Failed(); { + var txs []*types.Transaction + msg, err := p.app.ReadMsg() + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != TxMsg { + t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code) + } + if err := msg.Decode(&txs); err != nil { + t.Errorf("%v: %v", p.Peer, err) + } + for _, tx := range txs { + hash := tx.Hash() + seentx, want := seen[hash] + if seentx { + t.Errorf("%v: got tx more than once: %x", p.Peer, hash) + } + if !want { + t.Errorf("%v: got unexpected tx: %x", p.Peer, hash) + } + seen[hash] = true + n++ + } + } + } + for i := 0; i < 3; i++ { + p, _ := newTestPeer(fmt.Sprintf("peer #%d", i), protocol, pm, true) + wg.Add(1) + go checktxs(p) + } + wg.Wait() +} + +// Tests that the custom union field encoder and decoder works correctly. +func TestGetBlockHeadersDataEncodeDecode(t *testing.T) { + // Create a "random" hash for testing + var hash common.Hash + for i := range hash { + hash[i] = byte(i) + } + // Assemble some table driven tests + tests := []struct { + packet *getBlockHeadersData + fail bool + }{ + // Providing the origin as either a hash or a number should both work + {fail: false, packet: &getBlockHeadersData{Origin: hashOrNumber{Number: 314}}}, + {fail: false, packet: &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}}}, + + // Providing arbitrary query field should also work + {fail: false, packet: &getBlockHeadersData{Origin: hashOrNumber{Number: 314}, Amount: 314, Skip: 1, Reverse: true}}, + {fail: false, packet: &getBlockHeadersData{Origin: hashOrNumber{Hash: hash}, Amount: 314, Skip: 1, Reverse: true}}, + + // Providing both the origin hash and origin number must fail + {fail: true, packet: &getBlockHeadersData{Origin: hashOrNumber{Hash: hash, Number: 314}}}, + } + // Iterate over each of the tests and try to encode and then decode + for i, tt := range tests { + bytes, err := rlp.EncodeToBytes(tt.packet) + if err != nil && !tt.fail { + t.Fatalf("test %d: failed to encode packet: %v", i, err) + } else if err == nil && tt.fail { + t.Fatalf("test %d: encode should have failed", i) + } + if !tt.fail { + packet := new(getBlockHeadersData) + if err := rlp.DecodeBytes(bytes, packet); err != nil { + t.Fatalf("test %d: failed to decode packet: %v", i, err) + } + if packet.Origin.Hash != tt.packet.Origin.Hash || packet.Origin.Number != tt.packet.Origin.Number || packet.Amount != tt.packet.Amount || + packet.Skip != tt.packet.Skip || packet.Reverse != tt.packet.Reverse { + t.Fatalf("test %d: encode decode mismatch: have %+v, want %+v", i, packet, tt.packet) + } + } + } +} diff --git a/dex/sync.go b/dex/sync.go new file mode 100644 index 000000000..e49e40087 --- /dev/null +++ b/dex/sync.go @@ -0,0 +1,217 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package eth + +import ( + "math/rand" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/downloader" + "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/discover" +) + +const ( + forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + + // This is the target size for the packs of transactions sent by txsyncLoop. + // A pack can get larger than this if a single transactions exceeds this size. + txsyncPackSize = 100 * 1024 +) + +type txsync struct { + p *peer + txs []*types.Transaction +} + +// syncTransactions starts sending all currently pending transactions to the given peer. +func (pm *ProtocolManager) syncTransactions(p *peer) { + var txs types.Transactions + pending, _ := pm.txpool.Pending() + for _, batch := range pending { + txs = append(txs, batch...) + } + if len(txs) == 0 { + return + } + select { + case pm.txsyncCh <- &txsync{p, txs}: + case <-pm.quitSync: + } +} + +// txsyncLoop takes care of the initial transaction sync for each new +// connection. When a new peer appears, we relay all currently pending +// transactions. In order to minimise egress bandwidth usage, we send +// the transactions in small packs to one peer at a time. +func (pm *ProtocolManager) txsyncLoop() { + var ( + pending = make(map[discover.NodeID]*txsync) + sending = false // whether a send is active + pack = new(txsync) // the pack that is being sent + done = make(chan error, 1) // result of the send + ) + + // send starts a sending a pack of transactions from the sync. + send := func(s *txsync) { + // Fill pack with transactions up to the target size. + size := common.StorageSize(0) + pack.p = s.p + pack.txs = pack.txs[:0] + for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { + pack.txs = append(pack.txs, s.txs[i]) + size += s.txs[i].Size() + } + // Remove the transactions that will be sent. + s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] + if len(s.txs) == 0 { + delete(pending, s.p.ID()) + } + // Send the pack in the background. + s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size) + sending = true + go func() { done <- pack.p.SendTransactions(pack.txs) }() + } + + // pick chooses the next pending sync. + pick := func() *txsync { + if len(pending) == 0 { + return nil + } + n := rand.Intn(len(pending)) + 1 + for _, s := range pending { + if n--; n == 0 { + return s + } + } + return nil + } + + for { + select { + case s := <-pm.txsyncCh: + pending[s.p.ID()] = s + if !sending { + send(s) + } + case err := <-done: + sending = false + // Stop tracking peers that cause send failures. + if err != nil { + pack.p.Log().Debug("Transaction send failed", "err", err) + delete(pending, pack.p.ID()) + } + // Schedule the next send. + if s := pick(); s != nil { + send(s) + } + case <-pm.quitSync: + return + } + } +} + +// syncer is responsible for periodically synchronising with the network, both +// downloading hashes and blocks as well as handling the announcement handler. +func (pm *ProtocolManager) syncer() { + // Start and ensure cleanup of sync mechanisms + pm.fetcher.Start() + defer pm.fetcher.Stop() + defer pm.downloader.Terminate() + + // Wait for different events to fire synchronisation operations + forceSync := time.NewTicker(forceSyncCycle) + defer forceSync.Stop() + + for { + select { + case <-pm.newPeerCh: + // Make sure we have peers to select from, then sync + if pm.peers.Len() < minDesiredPeerCount { + break + } + go pm.synchronise(pm.peers.BestPeer()) + + case <-forceSync.C: + // Force a sync even if not enough peers are present + go pm.synchronise(pm.peers.BestPeer()) + + case <-pm.noMorePeers: + return + } + } +} + +// synchronise tries to sync up our local block chain with a remote peer. +func (pm *ProtocolManager) synchronise(peer *peer) { + // Short circuit if no peers are available + if peer == nil { + return + } + // Make sure the peer's TD is higher than our own + currentBlock := pm.blockchain.CurrentBlock() + td := pm.blockchain.GetTd(currentBlock.Hash(), currentBlock.NumberU64()) + + pHead, pTd := peer.Head() + if pTd.Cmp(td) <= 0 { + return + } + // Otherwise try to sync with the downloader + mode := downloader.FullSync + if atomic.LoadUint32(&pm.fastSync) == 1 { + // Fast sync was explicitly requested, and explicitly granted + mode = downloader.FastSync + } else if currentBlock.NumberU64() == 0 && pm.blockchain.CurrentFastBlock().NumberU64() > 0 { + // The database seems empty as the current block is the genesis. Yet the fast + // block is ahead, so fast sync was enabled for this node at a certain point. + // The only scenario where this can happen is if the user manually (or via a + // bad block) rolled back a fast sync node below the sync point. In this case + // however it's safe to reenable fast sync. + atomic.StoreUint32(&pm.fastSync, 1) + mode = downloader.FastSync + } + + if mode == downloader.FastSync { + // Make sure the peer's total difficulty we are synchronizing is higher. + if pm.blockchain.GetTdByHash(pm.blockchain.CurrentFastBlock().Hash()).Cmp(pTd) >= 0 { + return + } + } + + // Run the sync cycle, and disable fast sync if we've went past the pivot block + if err := pm.downloader.Synchronise(peer.id, pHead, pTd, mode); err != nil { + return + } + if atomic.LoadUint32(&pm.fastSync) == 1 { + log.Info("Fast sync complete, auto disabling") + atomic.StoreUint32(&pm.fastSync, 0) + } + atomic.StoreUint32(&pm.acceptTxs, 1) // Mark initial sync done + if head := pm.blockchain.CurrentBlock(); head.NumberU64() > 0 { + // We've completed a sync cycle, notify all peers of new state. This path is + // essential in star-topology networks where a gateway node needs to notify + // all its out-of-date peers of the availability of a new block. This failure + // scenario will most often crop up in private and hackathon networks with + // degenerate connectivity, but it should be healthy for the mainnet too to + // more reliably update peers or the local TD state. + go pm.BroadcastBlock(head, false) + } +} |