diff options
author | Sonic <sonic@dexon.org> | 2019-05-06 14:52:18 +0800 |
---|---|---|
committer | Sonic <sonic@dexon.org> | 2019-05-06 14:52:18 +0800 |
commit | fb9741ac8feeab30be376a1574cb4e935f8d5ae2 (patch) | |
tree | 31dfbf9e565b62e590280f6458baa412e26eafd6 /les | |
parent | e069e2a2b92afab5cb4b238848225300822e7526 (diff) | |
download | dexon-fb9741ac8feeab30be376a1574cb4e935f8d5ae2.tar dexon-fb9741ac8feeab30be376a1574cb4e935f8d5ae2.tar.gz dexon-fb9741ac8feeab30be376a1574cb4e935f8d5ae2.tar.bz2 dexon-fb9741ac8feeab30be376a1574cb4e935f8d5ae2.tar.lz dexon-fb9741ac8feeab30be376a1574cb4e935f8d5ae2.tar.xz dexon-fb9741ac8feeab30be376a1574cb4e935f8d5ae2.tar.zst dexon-fb9741ac8feeab30be376a1574cb4e935f8d5ae2.zip |
les: rename package to lds
Diffstat (limited to 'les')
-rw-r--r-- | les/api_backend.go | 211 | ||||
-rw-r--r-- | les/backend.go | 262 | ||||
-rw-r--r-- | les/bloombits.go | 74 | ||||
-rw-r--r-- | les/commons.go | 120 | ||||
-rw-r--r-- | les/distributor.go | 283 | ||||
-rw-r--r-- | les/distributor_test.go | 185 | ||||
-rw-r--r-- | les/execqueue.go | 97 | ||||
-rw-r--r-- | les/execqueue_test.go | 62 | ||||
-rw-r--r-- | les/fetcher.go | 785 | ||||
-rw-r--r-- | les/flowcontrol/control.go | 183 | ||||
-rw-r--r-- | les/flowcontrol/manager.go | 224 | ||||
-rw-r--r-- | les/freeclient.go | 278 | ||||
-rw-r--r-- | les/freeclient_test.go | 139 | ||||
-rw-r--r-- | les/handler.go | 1250 | ||||
-rw-r--r-- | les/handler_test.go | 564 | ||||
-rw-r--r-- | les/helper_test.go | 445 | ||||
-rw-r--r-- | les/metrics.go | 111 | ||||
-rw-r--r-- | les/odr.go | 129 | ||||
-rw-r--r-- | les/odr_requests.go | 580 | ||||
-rw-r--r-- | les/odr_test.go | 204 | ||||
-rw-r--r-- | les/peer.go | 656 | ||||
-rw-r--r-- | les/protocol.go | 226 | ||||
-rw-r--r-- | les/randselect.go | 170 | ||||
-rw-r--r-- | les/randselect_test.go | 67 | ||||
-rw-r--r-- | les/request_test.go | 122 | ||||
-rw-r--r-- | les/retrieve.go | 414 | ||||
-rw-r--r-- | les/server.go | 387 | ||||
-rw-r--r-- | les/serverpool.go | 855 | ||||
-rw-r--r-- | les/sync.go | 79 | ||||
-rw-r--r-- | les/txrelay.go | 175 |
30 files changed, 0 insertions, 9337 deletions
diff --git a/les/api_backend.go b/les/api_backend.go deleted file mode 100644 index f69e67c60..000000000 --- a/les/api_backend.go +++ /dev/null @@ -1,211 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "context" - "math/big" - - "github.com/dexon-foundation/dexon/accounts" - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/common/math" - "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/core/bloombits" - "github.com/dexon-foundation/dexon/core/rawdb" - "github.com/dexon-foundation/dexon/core/state" - "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/core/vm" - "github.com/dexon-foundation/dexon/eth/gasprice" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/event" - "github.com/dexon-foundation/dexon/internal/ethapi" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/params" - "github.com/dexon-foundation/dexon/rpc" -) - -type LesApiBackend struct { - eth *LightEthereum - gpo *gasprice.Oracle -} - -func (b *LesApiBackend) ChainConfig() *params.ChainConfig { - return b.eth.chainConfig -} - -func (b *LesApiBackend) CurrentBlock() *types.Block { - return types.NewBlockWithHeader(b.eth.BlockChain().CurrentHeader()) -} - -func (b *LesApiBackend) SetHead(number uint64) { - b.eth.protocolManager.downloader.Cancel() - b.eth.blockchain.SetHead(number) -} - -func (b *LesApiBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) { - if blockNr == rpc.LatestBlockNumber || blockNr == rpc.PendingBlockNumber { - return b.eth.blockchain.CurrentHeader(), nil - } - return b.eth.blockchain.GetHeaderByNumberOdr(ctx, uint64(blockNr)) -} - -func (b *LesApiBackend) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { - return b.eth.blockchain.GetHeaderByHash(hash), nil -} - -func (b *LesApiBackend) BlockByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Block, error) { - header, err := b.HeaderByNumber(ctx, blockNr) - if header == nil || err != nil { - return nil, err - } - return b.GetBlock(ctx, header.Hash()) -} - -func (b *LesApiBackend) StateAndHeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*state.StateDB, *types.Header, error) { - header, err := b.HeaderByNumber(ctx, blockNr) - if header == nil || err != nil { - return nil, nil, err - } - return light.NewState(ctx, header, b.eth.odr), header, nil -} - -func (b *LesApiBackend) GetBlock(ctx context.Context, blockHash common.Hash) (*types.Block, error) { - return b.eth.blockchain.GetBlockByHash(ctx, blockHash) -} - -func (b *LesApiBackend) GetReceipts(ctx context.Context, hash common.Hash) (types.Receipts, error) { - if number := rawdb.ReadHeaderNumber(b.eth.chainDb, hash); number != nil { - return light.GetBlockReceipts(ctx, b.eth.odr, hash, *number) - } - return nil, nil -} - -func (b *LesApiBackend) GetLogs(ctx context.Context, hash common.Hash) ([][]*types.Log, error) { - if number := rawdb.ReadHeaderNumber(b.eth.chainDb, hash); number != nil { - return light.GetBlockLogs(ctx, b.eth.odr, hash, *number) - } - return nil, nil -} - -func (b *LesApiBackend) GetTd(hash common.Hash) *big.Int { - return b.eth.blockchain.GetTdByHash(hash) -} - -func (b *LesApiBackend) GetEVM(ctx context.Context, msg core.Message, state *state.StateDB, header *types.Header) (*vm.EVM, func() error, error) { - state.SetBalance(msg.From(), math.MaxBig256) - context := core.NewEVMContext(msg, header, b.eth.blockchain, nil) - return vm.NewEVM(context, state, b.eth.chainConfig, vm.Config{}), state.Error, nil -} - -func (b *LesApiBackend) SendTx(ctx context.Context, signedTx *types.Transaction) error { - return b.eth.txPool.Add(ctx, signedTx) -} - -func (b *LesApiBackend) SendTxs(ctx context.Context, signedTxs []*types.Transaction) []error { - b.eth.txPool.AddBatch(ctx, signedTxs) - return nil -} - -func (b *LesApiBackend) RemoveTx(txHash common.Hash) { - b.eth.txPool.RemoveTx(txHash) -} - -func (b *LesApiBackend) GetPoolTransactions() (types.Transactions, error) { - return b.eth.txPool.GetTransactions() -} - -func (b *LesApiBackend) GetPoolTransaction(txHash common.Hash) *types.Transaction { - return b.eth.txPool.GetTransaction(txHash) -} - -func (b *LesApiBackend) GetPoolNonce(ctx context.Context, addr common.Address) (uint64, error) { - return b.eth.txPool.GetNonce(ctx, addr) -} - -func (b *LesApiBackend) Stats() (pending int, queued int) { - return b.eth.txPool.Stats(), 0 -} - -func (b *LesApiBackend) TxPoolContent() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) { - return b.eth.txPool.Content() -} - -func (b *LesApiBackend) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription { - return b.eth.txPool.SubscribeNewTxsEvent(ch) -} - -func (b *LesApiBackend) SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription { - return b.eth.blockchain.SubscribeChainEvent(ch) -} - -func (b *LesApiBackend) SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription { - return b.eth.blockchain.SubscribeChainHeadEvent(ch) -} - -func (b *LesApiBackend) SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription { - return b.eth.blockchain.SubscribeChainSideEvent(ch) -} - -func (b *LesApiBackend) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription { - return b.eth.blockchain.SubscribeLogsEvent(ch) -} - -func (b *LesApiBackend) SubscribeRemovedLogsEvent(ch chan<- core.RemovedLogsEvent) event.Subscription { - return b.eth.blockchain.SubscribeRemovedLogsEvent(ch) -} - -func (b *LesApiBackend) Downloader() ethapi.Downloader { - return b.eth.Downloader() -} - -func (b *LesApiBackend) ProtocolVersion() int { - return b.eth.LesVersion() + 10000 -} - -func (b *LesApiBackend) SuggestPrice(ctx context.Context) (*big.Int, error) { - return b.gpo.SuggestPrice(ctx) -} - -func (b *LesApiBackend) ChainDb() ethdb.Database { - return b.eth.chainDb -} - -func (b *LesApiBackend) EventMux() *event.TypeMux { - return b.eth.eventMux -} - -func (b *LesApiBackend) AccountManager() *accounts.Manager { - return b.eth.accountManager -} - -func (b *LesApiBackend) RPCGasCap() *big.Int { - return b.eth.config.RPCGasCap -} - -func (b *LesApiBackend) BloomStatus() (uint64, uint64) { - if b.eth.bloomIndexer == nil { - return 0, 0 - } - sections, _, _ := b.eth.bloomIndexer.Sections() - return params.BloomBitsBlocksClient, sections -} - -func (b *LesApiBackend) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) { - for i := 0; i < bloomFilterThreads; i++ { - go session.Multiplex(bloomRetrievalBatch, bloomRetrievalWait, b.eth.bloomRequests) - } -} diff --git a/les/backend.go b/les/backend.go deleted file mode 100644 index b56222bba..000000000 --- a/les/backend.go +++ /dev/null @@ -1,262 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package les implements the Light Ethereum Subprotocol. -package les - -import ( - "fmt" - "sync" - "time" - - "github.com/dexon-foundation/dexon/accounts" - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/common/hexutil" - "github.com/dexon-foundation/dexon/consensus" - "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/core/bloombits" - "github.com/dexon-foundation/dexon/core/rawdb" - "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/eth" - "github.com/dexon-foundation/dexon/eth/downloader" - "github.com/dexon-foundation/dexon/eth/filters" - "github.com/dexon-foundation/dexon/eth/gasprice" - "github.com/dexon-foundation/dexon/event" - "github.com/dexon-foundation/dexon/internal/ethapi" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/node" - "github.com/dexon-foundation/dexon/p2p" - "github.com/dexon-foundation/dexon/p2p/discv5" - "github.com/dexon-foundation/dexon/params" - rpc "github.com/dexon-foundation/dexon/rpc" -) - -type LightEthereum struct { - lesCommons - - odr *LesOdr - relay *LesTxRelay - chainConfig *params.ChainConfig - // Channel for shutting down the service - shutdownChan chan bool - - // Handlers - peers *peerSet - txPool *light.TxPool - blockchain *light.LightChain - serverPool *serverPool - reqDist *requestDistributor - retriever *retrieveManager - - bloomRequests chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests - bloomIndexer *core.ChainIndexer - - ApiBackend *LesApiBackend - - eventMux *event.TypeMux - engine consensus.Engine - accountManager *accounts.Manager - - networkId uint64 - netRPCService *ethapi.PublicNetAPI - - wg sync.WaitGroup -} - -func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) { - chainDb, err := eth.CreateDB(ctx, config, "lightchaindata") - if err != nil { - return nil, err - } - chainConfig, genesisHash, genesisErr := core.SetupGenesisBlockWithOverride(chainDb, config.Genesis, config.ConstantinopleOverride) - if _, isCompat := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !isCompat { - return nil, genesisErr - } - log.Info("Initialised chain configuration", "config", chainConfig) - - peers := newPeerSet() - quitSync := make(chan struct{}) - - leth := &LightEthereum{ - lesCommons: lesCommons{ - chainDb: chainDb, - config: config, - iConfig: light.DefaultClientIndexerConfig, - }, - chainConfig: chainConfig, - eventMux: ctx.EventMux, - peers: peers, - reqDist: newRequestDistributor(peers, quitSync), - accountManager: ctx.AccountManager, - engine: eth.CreateConsensusEngine(ctx, chainConfig, &config.Ethash, nil, false, chainDb), - shutdownChan: make(chan bool), - networkId: config.NetworkId, - bloomRequests: make(chan chan *bloombits.Retrieval), - bloomIndexer: eth.NewBloomIndexer(chainDb, params.BloomBitsBlocksClient, params.HelperTrieConfirmations), - } - - leth.relay = NewLesTxRelay(peers, leth.reqDist) - leth.serverPool = newServerPool(chainDb, quitSync, &leth.wg) - leth.retriever = newRetrieveManager(peers, leth.reqDist, leth.serverPool) - - leth.odr = NewLesOdr(chainDb, light.DefaultClientIndexerConfig, leth.retriever) - leth.chtIndexer = light.NewChtIndexer(chainDb, leth.odr, params.CHTFrequencyClient, params.HelperTrieConfirmations) - leth.bloomTrieIndexer = light.NewBloomTrieIndexer(chainDb, leth.odr, params.BloomBitsBlocksClient, params.BloomTrieFrequency) - leth.odr.SetIndexers(leth.chtIndexer, leth.bloomTrieIndexer, leth.bloomIndexer) - - // Note: NewLightChain adds the trusted checkpoint so it needs an ODR with - // indexers already set but not started yet - if leth.blockchain, err = light.NewLightChain(leth.odr, leth.chainConfig, leth.engine); err != nil { - return nil, err - } - // Note: AddChildIndexer starts the update process for the child - leth.bloomIndexer.AddChildIndexer(leth.bloomTrieIndexer) - leth.chtIndexer.Start(leth.blockchain) - leth.bloomIndexer.Start(leth.blockchain) - - // Rewind the chain in case of an incompatible config upgrade. - if compat, ok := genesisErr.(*params.ConfigCompatError); ok { - log.Warn("Rewinding chain to upgrade configuration", "err", compat) - leth.blockchain.SetHead(compat.RewindTo) - rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig) - } - - leth.txPool = light.NewTxPool(leth.chainConfig, leth.blockchain, leth.relay) - if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, light.DefaultClientIndexerConfig, true, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, leth.serverPool, quitSync, &leth.wg); err != nil { - return nil, err - } - leth.ApiBackend = &LesApiBackend{leth, nil} - gpoParams := config.GPO - if gpoParams.Default == nil { - gpoParams.Default = config.MinerGasPrice - } - leth.ApiBackend.gpo = gasprice.NewOracle(leth.ApiBackend, gpoParams) - return leth, nil -} - -func lesTopic(genesisHash common.Hash, protocolVersion uint) discv5.Topic { - var name string - switch protocolVersion { - case lpv1: - name = "LES" - case lpv2: - name = "LES2" - default: - panic(nil) - } - return discv5.Topic(name + "@" + common.Bytes2Hex(genesisHash.Bytes()[0:8])) -} - -type LightDummyAPI struct{} - -// Etherbase is the address that mining rewards will be send to -func (s *LightDummyAPI) Etherbase() (common.Address, error) { - return common.Address{}, fmt.Errorf("not supported") -} - -// Coinbase is the address that mining rewards will be send to (alias for Etherbase) -func (s *LightDummyAPI) Coinbase() (common.Address, error) { - return common.Address{}, fmt.Errorf("not supported") -} - -// Hashrate returns the POW hashrate -func (s *LightDummyAPI) Hashrate() hexutil.Uint { - return 0 -} - -// Mining returns an indication if this node is currently mining. -func (s *LightDummyAPI) Mining() bool { - return false -} - -// APIs returns the collection of RPC services the ethereum package offers. -// NOTE, some of these services probably need to be moved to somewhere else. -func (s *LightEthereum) APIs() []rpc.API { - return append(ethapi.GetAPIs(s.ApiBackend), []rpc.API{ - { - Namespace: "eth", - Version: "1.0", - Service: &LightDummyAPI{}, - Public: true, - }, { - Namespace: "eth", - Version: "1.0", - Service: downloader.NewPublicDownloaderAPI(s.protocolManager.downloader, s.eventMux), - Public: true, - }, { - Namespace: "eth", - Version: "1.0", - Service: filters.NewPublicFilterAPI(s.ApiBackend, true), - Public: true, - }, { - Namespace: "net", - Version: "1.0", - Service: s.netRPCService, - Public: true, - }, - }...) -} - -func (s *LightEthereum) ResetWithGenesisBlock(gb *types.Block) { - s.blockchain.ResetWithGenesisBlock(gb) -} - -func (s *LightEthereum) BlockChain() *light.LightChain { return s.blockchain } -func (s *LightEthereum) TxPool() *light.TxPool { return s.txPool } -func (s *LightEthereum) Engine() consensus.Engine { return s.engine } -func (s *LightEthereum) LesVersion() int { return int(ClientProtocolVersions[0]) } -func (s *LightEthereum) Downloader() ethapi.Downloader { return s.protocolManager.downloader } -func (s *LightEthereum) EventMux() *event.TypeMux { return s.eventMux } - -// Protocols implements node.Service, returning all the currently configured -// network protocols to start. -func (s *LightEthereum) Protocols() []p2p.Protocol { - return s.makeProtocols(ClientProtocolVersions) -} - -// Start implements node.Service, starting all internal goroutines needed by the -// Ethereum protocol implementation. -func (s *LightEthereum) Start(srvr *p2p.Server) error { - log.Warn("Light client mode is an experimental feature") - s.startBloomHandlers(params.BloomBitsBlocksClient) - s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.networkId) - // clients are searching for the first advertised protocol in the list - protocolVersion := AdvertiseProtocolVersions[0] - s.serverPool.start(srvr, lesTopic(s.blockchain.Genesis().Hash(), protocolVersion)) - s.protocolManager.Start(s.config.LightPeers) - return nil -} - -// Stop implements node.Service, terminating all internal goroutines used by the -// Ethereum protocol. -func (s *LightEthereum) Stop() error { - s.odr.Stop() - s.bloomIndexer.Close() - s.chtIndexer.Close() - s.blockchain.Stop() - s.protocolManager.Stop() - s.txPool.Stop() - s.engine.Close() - - s.eventMux.Stop() - - time.Sleep(time.Millisecond * 200) - s.chainDb.Close() - close(s.shutdownChan) - - return nil -} diff --git a/les/bloombits.go b/les/bloombits.go deleted file mode 100644 index 6d0142c21..000000000 --- a/les/bloombits.go +++ /dev/null @@ -1,74 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "time" - - "github.com/dexon-foundation/dexon/common/bitutil" - "github.com/dexon-foundation/dexon/light" -) - -const ( - // bloomServiceThreads is the number of goroutines used globally by an Ethereum - // instance to service bloombits lookups for all running filters. - bloomServiceThreads = 16 - - // bloomFilterThreads is the number of goroutines used locally per filter to - // multiplex requests onto the global servicing goroutines. - bloomFilterThreads = 3 - - // bloomRetrievalBatch is the maximum number of bloom bit retrievals to service - // in a single batch. - bloomRetrievalBatch = 16 - - // bloomRetrievalWait is the maximum time to wait for enough bloom bit requests - // to accumulate request an entire batch (avoiding hysteresis). - bloomRetrievalWait = time.Microsecond * 100 -) - -// startBloomHandlers starts a batch of goroutines to accept bloom bit database -// retrievals from possibly a range of filters and serving the data to satisfy. -func (eth *LightEthereum) startBloomHandlers(sectionSize uint64) { - for i := 0; i < bloomServiceThreads; i++ { - go func() { - for { - select { - case <-eth.shutdownChan: - return - - case request := <-eth.bloomRequests: - task := <-request - task.Bitsets = make([][]byte, len(task.Sections)) - compVectors, err := light.GetBloomBits(task.Context, eth.odr, task.Bit, task.Sections) - if err == nil { - for i := range task.Sections { - if blob, err := bitutil.DecompressBytes(compVectors[i], int(sectionSize/8)); err == nil { - task.Bitsets[i] = blob - } else { - task.Error = err - } - } - } else { - task.Error = err - } - request <- task - } - } - }() - } -} diff --git a/les/commons.go b/les/commons.go deleted file mode 100644 index 0d0dbca7e..000000000 --- a/les/commons.go +++ /dev/null @@ -1,120 +0,0 @@ -// Copyright 2018 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "fmt" - "math/big" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/eth" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/p2p" - "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/params" -) - -// lesCommons contains fields needed by both server and client. -type lesCommons struct { - config *eth.Config - iConfig *light.IndexerConfig - chainDb ethdb.Database - protocolManager *ProtocolManager - chtIndexer, bloomTrieIndexer *core.ChainIndexer -} - -// NodeInfo represents a short summary of the Ethereum sub-protocol metadata -// known about the host peer. -type NodeInfo struct { - Network uint64 `json:"network"` // Ethereum network ID (1=Frontier, 2=Morden, Ropsten=3, Rinkeby=4) - Difficulty *big.Int `json:"difficulty"` // Total difficulty of the host's blockchain - Genesis common.Hash `json:"genesis"` // SHA3 hash of the host's genesis block - Config *params.ChainConfig `json:"config"` // Chain configuration for the fork rules - Head common.Hash `json:"head"` // SHA3 hash of the host's best owned block - CHT params.TrustedCheckpoint `json:"cht"` // Trused CHT checkpoint for fast catchup -} - -// makeProtocols creates protocol descriptors for the given LES versions. -func (c *lesCommons) makeProtocols(versions []uint) []p2p.Protocol { - protos := make([]p2p.Protocol, len(versions)) - for i, version := range versions { - version := version - protos[i] = p2p.Protocol{ - Name: "les", - Version: version, - Length: ProtocolLengths[version], - NodeInfo: c.nodeInfo, - Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - return c.protocolManager.runPeer(version, p, rw) - }, - PeerInfo: func(id enode.ID) interface{} { - if p := c.protocolManager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil { - return p.Info() - } - return nil - }, - } - } - return protos -} - -// nodeInfo retrieves some protocol metadata about the running host node. -func (c *lesCommons) nodeInfo() interface{} { - var cht params.TrustedCheckpoint - sections, _, _ := c.chtIndexer.Sections() - sections2, _, _ := c.bloomTrieIndexer.Sections() - - if !c.protocolManager.lightSync { - // convert to client section size if running in server mode - sections /= c.iConfig.PairChtSize / c.iConfig.ChtSize - } - - if sections2 < sections { - sections = sections2 - } - if sections > 0 { - sectionIndex := sections - 1 - sectionHead := c.bloomTrieIndexer.SectionHead(sectionIndex) - var chtRoot common.Hash - if c.protocolManager.lightSync { - chtRoot = light.GetChtRoot(c.chainDb, sectionIndex, sectionHead) - } else { - idxV2 := (sectionIndex+1)*c.iConfig.PairChtSize/c.iConfig.ChtSize - 1 - chtRoot = light.GetChtRoot(c.chainDb, idxV2, sectionHead) - } - cht = params.TrustedCheckpoint{ - SectionIndex: sectionIndex, - SectionHead: sectionHead, - CHTRoot: chtRoot, - BloomRoot: light.GetBloomTrieRoot(c.chainDb, sectionIndex, sectionHead), - } - } - - chain := c.protocolManager.blockchain - head := chain.CurrentHeader() - hash := head.Hash() - return &NodeInfo{ - Network: c.config.NetworkId, - Difficulty: chain.GetTd(hash, head.Number.Uint64()), - Genesis: chain.Genesis().Hash(), - Config: chain.Config(), - Head: chain.CurrentHeader().Hash(), - CHT: cht, - } -} diff --git a/les/distributor.go b/les/distributor.go deleted file mode 100644 index f90765b62..000000000 --- a/les/distributor.go +++ /dev/null @@ -1,283 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package light implements on-demand retrieval capable state and chain objects -// for the Ethereum Light Client. -package les - -import ( - "container/list" - "sync" - "time" -) - -// requestDistributor implements a mechanism that distributes requests to -// suitable peers, obeying flow control rules and prioritizing them in creation -// order (even when a resend is necessary). -type requestDistributor struct { - reqQueue *list.List - lastReqOrder uint64 - peers map[distPeer]struct{} - peerLock sync.RWMutex - stopChn, loopChn chan struct{} - loopNextSent bool - lock sync.Mutex -} - -// distPeer is an LES server peer interface for the request distributor. -// waitBefore returns either the necessary waiting time before sending a request -// with the given upper estimated cost or the estimated remaining relative buffer -// value after sending such a request (in which case the request can be sent -// immediately). At least one of these values is always zero. -type distPeer interface { - waitBefore(uint64) (time.Duration, float64) - canQueue() bool - queueSend(f func()) -} - -// distReq is the request abstraction used by the distributor. It is based on -// three callback functions: -// - getCost returns the upper estimate of the cost of sending the request to a given peer -// - canSend tells if the server peer is suitable to serve the request -// - request prepares sending the request to the given peer and returns a function that -// does the actual sending. Request order should be preserved but the callback itself should not -// block until it is sent because other peers might still be able to receive requests while -// one of them is blocking. Instead, the returned function is put in the peer's send queue. -type distReq struct { - getCost func(distPeer) uint64 - canSend func(distPeer) bool - request func(distPeer) func() - - reqOrder uint64 - sentChn chan distPeer - element *list.Element -} - -// newRequestDistributor creates a new request distributor -func newRequestDistributor(peers *peerSet, stopChn chan struct{}) *requestDistributor { - d := &requestDistributor{ - reqQueue: list.New(), - loopChn: make(chan struct{}, 2), - stopChn: stopChn, - peers: make(map[distPeer]struct{}), - } - if peers != nil { - peers.notify(d) - } - go d.loop() - return d -} - -// registerPeer implements peerSetNotify -func (d *requestDistributor) registerPeer(p *peer) { - d.peerLock.Lock() - d.peers[p] = struct{}{} - d.peerLock.Unlock() -} - -// unregisterPeer implements peerSetNotify -func (d *requestDistributor) unregisterPeer(p *peer) { - d.peerLock.Lock() - delete(d.peers, p) - d.peerLock.Unlock() -} - -// registerTestPeer adds a new test peer -func (d *requestDistributor) registerTestPeer(p distPeer) { - d.peerLock.Lock() - d.peers[p] = struct{}{} - d.peerLock.Unlock() -} - -// distMaxWait is the maximum waiting time after which further necessary waiting -// times are recalculated based on new feedback from the servers -const distMaxWait = time.Millisecond * 10 - -// main event loop -func (d *requestDistributor) loop() { - for { - select { - case <-d.stopChn: - d.lock.Lock() - elem := d.reqQueue.Front() - for elem != nil { - req := elem.Value.(*distReq) - close(req.sentChn) - req.sentChn = nil - elem = elem.Next() - } - d.lock.Unlock() - return - case <-d.loopChn: - d.lock.Lock() - d.loopNextSent = false - loop: - for { - peer, req, wait := d.nextRequest() - if req != nil && wait == 0 { - chn := req.sentChn // save sentChn because remove sets it to nil - d.remove(req) - send := req.request(peer) - if send != nil { - peer.queueSend(send) - } - chn <- peer - close(chn) - } else { - if wait == 0 { - // no request to send and nothing to wait for; the next - // queued request will wake up the loop - break loop - } - d.loopNextSent = true // a "next" signal has been sent, do not send another one until this one has been received - if wait > distMaxWait { - // waiting times may be reduced by incoming request replies, if it is too long, recalculate it periodically - wait = distMaxWait - } - go func() { - time.Sleep(wait) - d.loopChn <- struct{}{} - }() - break loop - } - } - d.lock.Unlock() - } - } -} - -// selectPeerItem represents a peer to be selected for a request by weightedRandomSelect -type selectPeerItem struct { - peer distPeer - req *distReq - weight int64 -} - -// Weight implements wrsItem interface -func (sp selectPeerItem) Weight() int64 { - return sp.weight -} - -// nextRequest returns the next possible request from any peer, along with the -// associated peer and necessary waiting time -func (d *requestDistributor) nextRequest() (distPeer, *distReq, time.Duration) { - checkedPeers := make(map[distPeer]struct{}) - elem := d.reqQueue.Front() - var ( - bestPeer distPeer - bestReq *distReq - bestWait time.Duration - sel *weightedRandomSelect - ) - - d.peerLock.RLock() - defer d.peerLock.RUnlock() - - for (len(d.peers) > 0 || elem == d.reqQueue.Front()) && elem != nil { - req := elem.Value.(*distReq) - canSend := false - for peer := range d.peers { - if _, ok := checkedPeers[peer]; !ok && peer.canQueue() && req.canSend(peer) { - canSend = true - cost := req.getCost(peer) - wait, bufRemain := peer.waitBefore(cost) - if wait == 0 { - if sel == nil { - sel = newWeightedRandomSelect() - } - sel.update(selectPeerItem{peer: peer, req: req, weight: int64(bufRemain*1000000) + 1}) - } else { - if bestReq == nil || wait < bestWait { - bestPeer = peer - bestReq = req - bestWait = wait - } - } - checkedPeers[peer] = struct{}{} - } - } - next := elem.Next() - if !canSend && elem == d.reqQueue.Front() { - close(req.sentChn) - d.remove(req) - } - elem = next - } - - if sel != nil { - c := sel.choose().(selectPeerItem) - return c.peer, c.req, 0 - } - return bestPeer, bestReq, bestWait -} - -// queue adds a request to the distribution queue, returns a channel where the -// receiving peer is sent once the request has been sent (request callback returned). -// If the request is cancelled or timed out without suitable peers, the channel is -// closed without sending any peer references to it. -func (d *requestDistributor) queue(r *distReq) chan distPeer { - d.lock.Lock() - defer d.lock.Unlock() - - if r.reqOrder == 0 { - d.lastReqOrder++ - r.reqOrder = d.lastReqOrder - } - - back := d.reqQueue.Back() - if back == nil || r.reqOrder > back.Value.(*distReq).reqOrder { - r.element = d.reqQueue.PushBack(r) - } else { - before := d.reqQueue.Front() - for before.Value.(*distReq).reqOrder < r.reqOrder { - before = before.Next() - } - r.element = d.reqQueue.InsertBefore(r, before) - } - - if !d.loopNextSent { - d.loopNextSent = true - d.loopChn <- struct{}{} - } - - r.sentChn = make(chan distPeer, 1) - return r.sentChn -} - -// cancel removes a request from the queue if it has not been sent yet (returns -// false if it has been sent already). It is guaranteed that the callback functions -// will not be called after cancel returns. -func (d *requestDistributor) cancel(r *distReq) bool { - d.lock.Lock() - defer d.lock.Unlock() - - if r.sentChn == nil { - return false - } - - close(r.sentChn) - d.remove(r) - return true -} - -// remove removes a request from the queue -func (d *requestDistributor) remove(r *distReq) { - r.sentChn = nil - if r.element != nil { - d.reqQueue.Remove(r.element) - r.element = nil - } -} diff --git a/les/distributor_test.go b/les/distributor_test.go deleted file mode 100644 index 8c7621f26..000000000 --- a/les/distributor_test.go +++ /dev/null @@ -1,185 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package light implements on-demand retrieval capable state and chain objects -// for the Ethereum Light Client. -package les - -import ( - "math/rand" - "sync" - "testing" - "time" -) - -type testDistReq struct { - cost, procTime, order uint64 - canSendTo map[*testDistPeer]struct{} -} - -func (r *testDistReq) getCost(dp distPeer) uint64 { - return r.cost -} - -func (r *testDistReq) canSend(dp distPeer) bool { - _, ok := r.canSendTo[dp.(*testDistPeer)] - return ok -} - -func (r *testDistReq) request(dp distPeer) func() { - return func() { dp.(*testDistPeer).send(r) } -} - -type testDistPeer struct { - sent []*testDistReq - sumCost uint64 - lock sync.RWMutex -} - -func (p *testDistPeer) send(r *testDistReq) { - p.lock.Lock() - defer p.lock.Unlock() - - p.sent = append(p.sent, r) - p.sumCost += r.cost -} - -func (p *testDistPeer) worker(t *testing.T, checkOrder bool, stop chan struct{}) { - var last uint64 - for { - wait := time.Millisecond - p.lock.Lock() - if len(p.sent) > 0 { - rq := p.sent[0] - wait = time.Duration(rq.procTime) - p.sumCost -= rq.cost - if checkOrder { - if rq.order <= last { - t.Errorf("Requests processed in wrong order") - } - last = rq.order - } - p.sent = p.sent[1:] - } - p.lock.Unlock() - select { - case <-stop: - return - case <-time.After(wait): - } - } -} - -const ( - testDistBufLimit = 10000000 - testDistMaxCost = 1000000 - testDistPeerCount = 5 - testDistReqCount = 5000 - testDistMaxResendCount = 3 -) - -func (p *testDistPeer) waitBefore(cost uint64) (time.Duration, float64) { - p.lock.RLock() - sumCost := p.sumCost + cost - p.lock.RUnlock() - if sumCost < testDistBufLimit { - return 0, float64(testDistBufLimit-sumCost) / float64(testDistBufLimit) - } - return time.Duration(sumCost - testDistBufLimit), 0 -} - -func (p *testDistPeer) canQueue() bool { - return true -} - -func (p *testDistPeer) queueSend(f func()) { - f() -} - -func TestRequestDistributor(t *testing.T) { - testRequestDistributor(t, false) -} - -func TestRequestDistributorResend(t *testing.T) { - testRequestDistributor(t, true) -} - -func testRequestDistributor(t *testing.T, resend bool) { - stop := make(chan struct{}) - defer close(stop) - - dist := newRequestDistributor(nil, stop) - var peers [testDistPeerCount]*testDistPeer - for i := range peers { - peers[i] = &testDistPeer{} - go peers[i].worker(t, !resend, stop) - dist.registerTestPeer(peers[i]) - } - - var wg sync.WaitGroup - - for i := 1; i <= testDistReqCount; i++ { - cost := uint64(rand.Int63n(testDistMaxCost)) - procTime := uint64(rand.Int63n(int64(cost + 1))) - rq := &testDistReq{ - cost: cost, - procTime: procTime, - order: uint64(i), - canSendTo: make(map[*testDistPeer]struct{}), - } - for _, peer := range peers { - if rand.Intn(2) != 0 { - rq.canSendTo[peer] = struct{}{} - } - } - - wg.Add(1) - req := &distReq{ - getCost: rq.getCost, - canSend: rq.canSend, - request: rq.request, - } - chn := dist.queue(req) - go func() { - cnt := 1 - if resend && len(rq.canSendTo) != 0 { - cnt = rand.Intn(testDistMaxResendCount) + 1 - } - for i := 0; i < cnt; i++ { - if i != 0 { - chn = dist.queue(req) - } - p := <-chn - if p == nil { - if len(rq.canSendTo) != 0 { - t.Errorf("Request that could have been sent was dropped") - } - } else { - peer := p.(*testDistPeer) - if _, ok := rq.canSendTo[peer]; !ok { - t.Errorf("Request sent to wrong peer") - } - } - } - wg.Done() - }() - if rand.Intn(1000) == 0 { - time.Sleep(time.Duration(rand.Intn(5000000))) - } - } - - wg.Wait() -} diff --git a/les/execqueue.go b/les/execqueue.go deleted file mode 100644 index 614721bf0..000000000 --- a/les/execqueue.go +++ /dev/null @@ -1,97 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import "sync" - -// execQueue implements a queue that executes function calls in a single thread, -// in the same order as they have been queued. -type execQueue struct { - mu sync.Mutex - cond *sync.Cond - funcs []func() - closeWait chan struct{} -} - -// newExecQueue creates a new execution queue. -func newExecQueue(capacity int) *execQueue { - q := &execQueue{funcs: make([]func(), 0, capacity)} - q.cond = sync.NewCond(&q.mu) - go q.loop() - return q -} - -func (q *execQueue) loop() { - for f := q.waitNext(false); f != nil; f = q.waitNext(true) { - f() - } - close(q.closeWait) -} - -func (q *execQueue) waitNext(drop bool) (f func()) { - q.mu.Lock() - if drop { - // Remove the function that just executed. We do this here instead of when - // dequeuing so len(q.funcs) includes the function that is running. - q.funcs = append(q.funcs[:0], q.funcs[1:]...) - } - for !q.isClosed() { - if len(q.funcs) > 0 { - f = q.funcs[0] - break - } - q.cond.Wait() - } - q.mu.Unlock() - return f -} - -func (q *execQueue) isClosed() bool { - return q.closeWait != nil -} - -// canQueue returns true if more function calls can be added to the execution queue. -func (q *execQueue) canQueue() bool { - q.mu.Lock() - ok := !q.isClosed() && len(q.funcs) < cap(q.funcs) - q.mu.Unlock() - return ok -} - -// queue adds a function call to the execution queue. Returns true if successful. -func (q *execQueue) queue(f func()) bool { - q.mu.Lock() - ok := !q.isClosed() && len(q.funcs) < cap(q.funcs) - if ok { - q.funcs = append(q.funcs, f) - q.cond.Signal() - } - q.mu.Unlock() - return ok -} - -// quit stops the exec queue. -// quit waits for the current execution to finish before returning. -func (q *execQueue) quit() { - q.mu.Lock() - if !q.isClosed() { - q.closeWait = make(chan struct{}) - q.cond.Signal() - } - q.mu.Unlock() - <-q.closeWait -} diff --git a/les/execqueue_test.go b/les/execqueue_test.go deleted file mode 100644 index cd45b03f2..000000000 --- a/les/execqueue_test.go +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "testing" -) - -func TestExecQueue(t *testing.T) { - var ( - N = 10000 - q = newExecQueue(N) - counter int - execd = make(chan int) - testexit = make(chan struct{}) - ) - defer q.quit() - defer close(testexit) - - check := func(state string, wantOK bool) { - c := counter - counter++ - qf := func() { - select { - case execd <- c: - case <-testexit: - } - } - if q.canQueue() != wantOK { - t.Fatalf("canQueue() == %t for %s", !wantOK, state) - } - if q.queue(qf) != wantOK { - t.Fatalf("canQueue() == %t for %s", !wantOK, state) - } - } - - for i := 0; i < N; i++ { - check("queue below cap", true) - } - check("full queue", false) - for i := 0; i < N; i++ { - if c := <-execd; c != i { - t.Fatal("execution out of order") - } - } - q.quit() - check("closed queue", false) -} diff --git a/les/fetcher.go b/les/fetcher.go deleted file mode 100644 index 5183fdc92..000000000 --- a/les/fetcher.go +++ /dev/null @@ -1,785 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package les implements the Light Ethereum Subprotocol. -package les - -import ( - "math/big" - "sync" - "time" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/common/mclock" - "github.com/dexon-foundation/dexon/consensus" - "github.com/dexon-foundation/dexon/core/rawdb" - "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/log" -) - -const ( - blockDelayTimeout = time.Second * 10 // timeout for a peer to announce a head that has already been confirmed by others - maxNodeCount = 20 // maximum number of fetcherTreeNode entries remembered for each peer - serverStateAvailable = 100 // number of recent blocks where state availability is assumed -) - -// lightFetcher implements retrieval of newly announced headers. It also provides a peerHasBlock function for the -// ODR system to ensure that we only request data related to a certain block from peers who have already processed -// and announced that block. -type lightFetcher struct { - pm *ProtocolManager - odr *LesOdr - chain *light.LightChain - - lock sync.Mutex // lock protects access to the fetcher's internal state variables except sent requests - maxConfirmedTd *big.Int - peers map[*peer]*fetcherPeerInfo - lastUpdateStats *updateStatsEntry - syncing bool - syncDone chan *peer - - reqMu sync.RWMutex // reqMu protects access to sent header fetch requests - requested map[uint64]fetchRequest - deliverChn chan fetchResponse - timeoutChn chan uint64 - requestChn chan bool // true if initiated from outside -} - -// fetcherPeerInfo holds fetcher-specific information about each active peer -type fetcherPeerInfo struct { - root, lastAnnounced *fetcherTreeNode - nodeCnt int - confirmedTd *big.Int - bestConfirmed *fetcherTreeNode - nodeByHash map[common.Hash]*fetcherTreeNode - firstUpdateStats *updateStatsEntry -} - -// fetcherTreeNode is a node of a tree that holds information about blocks recently -// announced and confirmed by a certain peer. Each new announce message from a peer -// adds nodes to the tree, based on the previous announced head and the reorg depth. -// There are three possible states for a tree node: -// - announced: not downloaded (known) yet, but we know its head, number and td -// - intermediate: not known, hash and td are empty, they are filled out when it becomes known -// - known: both announced by this peer and downloaded (from any peer). -// This structure makes it possible to always know which peer has a certain block, -// which is necessary for selecting a suitable peer for ODR requests and also for -// canonizing new heads. It also helps to always download the minimum necessary -// amount of headers with a single request. -type fetcherTreeNode struct { - hash common.Hash - number uint64 - td *big.Int - known, requested bool - parent *fetcherTreeNode - children []*fetcherTreeNode -} - -// fetchRequest represents a header download request -type fetchRequest struct { - hash common.Hash - amount uint64 - peer *peer - sent mclock.AbsTime - timeout bool -} - -// fetchResponse represents a header download response -type fetchResponse struct { - reqID uint64 - headers []*types.Header - peer *peer -} - -// newLightFetcher creates a new light fetcher -func newLightFetcher(pm *ProtocolManager) *lightFetcher { - f := &lightFetcher{ - pm: pm, - chain: pm.blockchain.(*light.LightChain), - odr: pm.odr, - peers: make(map[*peer]*fetcherPeerInfo), - deliverChn: make(chan fetchResponse, 100), - requested: make(map[uint64]fetchRequest), - timeoutChn: make(chan uint64), - requestChn: make(chan bool, 100), - syncDone: make(chan *peer), - maxConfirmedTd: big.NewInt(0), - } - pm.peers.notify(f) - - f.pm.wg.Add(1) - go f.syncLoop() - return f -} - -// syncLoop is the main event loop of the light fetcher -func (f *lightFetcher) syncLoop() { - requesting := false - defer f.pm.wg.Done() - for { - select { - case <-f.pm.quitSync: - return - // when a new announce is received, request loop keeps running until - // no further requests are necessary or possible - case newAnnounce := <-f.requestChn: - f.lock.Lock() - s := requesting - requesting = false - var ( - rq *distReq - reqID uint64 - syncing bool - ) - if !f.syncing && !(newAnnounce && s) { - rq, reqID, syncing = f.nextRequest() - } - f.lock.Unlock() - - if rq != nil { - requesting = true - if _, ok := <-f.pm.reqDist.queue(rq); ok { - if syncing { - f.lock.Lock() - f.syncing = true - f.lock.Unlock() - } else { - go func() { - time.Sleep(softRequestTimeout) - f.reqMu.Lock() - req, ok := f.requested[reqID] - if ok { - req.timeout = true - f.requested[reqID] = req - } - f.reqMu.Unlock() - // keep starting new requests while possible - f.requestChn <- false - }() - } - } else { - f.requestChn <- false - } - } - case reqID := <-f.timeoutChn: - f.reqMu.Lock() - req, ok := f.requested[reqID] - if ok { - delete(f.requested, reqID) - } - f.reqMu.Unlock() - if ok { - f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), true) - req.peer.Log().Debug("Fetching data timed out hard") - go f.pm.removePeer(req.peer.id) - } - case resp := <-f.deliverChn: - f.reqMu.Lock() - req, ok := f.requested[resp.reqID] - if ok && req.peer != resp.peer { - ok = false - } - if ok { - delete(f.requested, resp.reqID) - } - f.reqMu.Unlock() - if ok { - f.pm.serverPool.adjustResponseTime(req.peer.poolEntry, time.Duration(mclock.Now()-req.sent), req.timeout) - } - f.lock.Lock() - if !ok || !(f.syncing || f.processResponse(req, resp)) { - resp.peer.Log().Debug("Failed processing response") - go f.pm.removePeer(resp.peer.id) - } - f.lock.Unlock() - case p := <-f.syncDone: - f.lock.Lock() - p.Log().Debug("Done synchronising with peer") - f.checkSyncedHeaders(p) - f.syncing = false - f.lock.Unlock() - f.requestChn <- false - } - } -} - -// registerPeer adds a new peer to the fetcher's peer set -func (f *lightFetcher) registerPeer(p *peer) { - p.lock.Lock() - p.hasBlock = func(hash common.Hash, number uint64, hasState bool) bool { - return f.peerHasBlock(p, hash, number, hasState) - } - p.lock.Unlock() - - f.lock.Lock() - defer f.lock.Unlock() - - f.peers[p] = &fetcherPeerInfo{nodeByHash: make(map[common.Hash]*fetcherTreeNode)} -} - -// unregisterPeer removes a new peer from the fetcher's peer set -func (f *lightFetcher) unregisterPeer(p *peer) { - p.lock.Lock() - p.hasBlock = nil - p.lock.Unlock() - - f.lock.Lock() - defer f.lock.Unlock() - - // check for potential timed out block delay statistics - f.checkUpdateStats(p, nil) - delete(f.peers, p) -} - -// announce processes a new announcement message received from a peer, adding new -// nodes to the peer's block tree and removing old nodes if necessary -func (f *lightFetcher) announce(p *peer, head *announceData) { - f.lock.Lock() - defer f.lock.Unlock() - p.Log().Debug("Received new announcement", "number", head.Number, "hash", head.Hash, "reorg", head.ReorgDepth) - - fp := f.peers[p] - if fp == nil { - p.Log().Debug("Announcement from unknown peer") - return - } - - if fp.lastAnnounced != nil && head.Td.Cmp(fp.lastAnnounced.td) <= 0 { - // announced tds should be strictly monotonic - p.Log().Debug("Received non-monotonic td", "current", head.Td, "previous", fp.lastAnnounced.td) - go f.pm.removePeer(p.id) - return - } - - n := fp.lastAnnounced - for i := uint64(0); i < head.ReorgDepth; i++ { - if n == nil { - break - } - n = n.parent - } - // n is now the reorg common ancestor, add a new branch of nodes - if n != nil && (head.Number >= n.number+maxNodeCount || head.Number <= n.number) { - // if announced head block height is lower or same as n or too far from it to add - // intermediate nodes then discard previous announcement info and trigger a resync - n = nil - fp.nodeCnt = 0 - fp.nodeByHash = make(map[common.Hash]*fetcherTreeNode) - } - if n != nil { - // check if the node count is too high to add new nodes, discard oldest ones if necessary - locked := false - for uint64(fp.nodeCnt)+head.Number-n.number > maxNodeCount && fp.root != nil { - if !locked { - f.chain.LockChain() - defer f.chain.UnlockChain() - locked = true - } - // if one of root's children is canonical, keep it, delete other branches and root itself - var newRoot *fetcherTreeNode - for i, nn := range fp.root.children { - if rawdb.ReadCanonicalHash(f.pm.chainDb, nn.number) == nn.hash { - fp.root.children = append(fp.root.children[:i], fp.root.children[i+1:]...) - nn.parent = nil - newRoot = nn - break - } - } - fp.deleteNode(fp.root) - if n == fp.root { - n = newRoot - } - fp.root = newRoot - if newRoot == nil || !f.checkKnownNode(p, newRoot) { - fp.bestConfirmed = nil - fp.confirmedTd = nil - } - - if n == nil { - break - } - } - if n != nil { - for n.number < head.Number { - nn := &fetcherTreeNode{number: n.number + 1, parent: n} - n.children = append(n.children, nn) - n = nn - fp.nodeCnt++ - } - n.hash = head.Hash - n.td = head.Td - fp.nodeByHash[n.hash] = n - } - } - if n == nil { - // could not find reorg common ancestor or had to delete entire tree, a new root and a resync is needed - if fp.root != nil { - fp.deleteNode(fp.root) - } - n = &fetcherTreeNode{hash: head.Hash, number: head.Number, td: head.Td} - fp.root = n - fp.nodeCnt++ - fp.nodeByHash[n.hash] = n - fp.bestConfirmed = nil - fp.confirmedTd = nil - } - - f.checkKnownNode(p, n) - p.lock.Lock() - p.headInfo = head - fp.lastAnnounced = n - p.lock.Unlock() - f.checkUpdateStats(p, nil) - f.requestChn <- true -} - -// peerHasBlock returns true if we can assume the peer knows the given block -// based on its announcements -func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64, hasState bool) bool { - f.lock.Lock() - defer f.lock.Unlock() - - fp := f.peers[p] - if fp == nil || fp.root == nil { - return false - } - - if hasState { - if fp.lastAnnounced == nil || fp.lastAnnounced.number > number+serverStateAvailable { - return false - } - } - - if f.syncing { - // always return true when syncing - // false positives are acceptable, a more sophisticated condition can be implemented later - return true - } - - if number >= fp.root.number { - // it is recent enough that if it is known, is should be in the peer's block tree - return fp.nodeByHash[hash] != nil - } - f.chain.LockChain() - defer f.chain.UnlockChain() - // if it's older than the peer's block tree root but it's in the same canonical chain - // as the root, we can still be sure the peer knows it - // - // when syncing, just check if it is part of the known chain, there is nothing better we - // can do since we do not know the most recent block hash yet - return rawdb.ReadCanonicalHash(f.pm.chainDb, fp.root.number) == fp.root.hash && rawdb.ReadCanonicalHash(f.pm.chainDb, number) == hash -} - -// requestAmount calculates the amount of headers to be downloaded starting -// from a certain head backwards -func (f *lightFetcher) requestAmount(p *peer, n *fetcherTreeNode) uint64 { - amount := uint64(0) - nn := n - for nn != nil && !f.checkKnownNode(p, nn) { - nn = nn.parent - amount++ - } - if nn == nil { - amount = n.number - } - return amount -} - -// requestedID tells if a certain reqID has been requested by the fetcher -func (f *lightFetcher) requestedID(reqID uint64) bool { - f.reqMu.RLock() - _, ok := f.requested[reqID] - f.reqMu.RUnlock() - return ok -} - -// nextRequest selects the peer and announced head to be requested next, amount -// to be downloaded starting from the head backwards is also returned -func (f *lightFetcher) nextRequest() (*distReq, uint64, bool) { - var ( - bestHash common.Hash - bestAmount uint64 - ) - bestTd := f.maxConfirmedTd - bestSyncing := false - - for p, fp := range f.peers { - for hash, n := range fp.nodeByHash { - if !f.checkKnownNode(p, n) && !n.requested && (bestTd == nil || n.td.Cmp(bestTd) >= 0) { - amount := f.requestAmount(p, n) - if bestTd == nil || n.td.Cmp(bestTd) > 0 || amount < bestAmount { - bestHash = hash - bestAmount = amount - bestTd = n.td - bestSyncing = fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) - } - } - } - } - if bestTd == f.maxConfirmedTd { - return nil, 0, false - } - - var rq *distReq - reqID := genReqID() - if bestSyncing { - rq = &distReq{ - getCost: func(dp distPeer) uint64 { - return 0 - }, - canSend: func(dp distPeer) bool { - p := dp.(*peer) - f.lock.Lock() - defer f.lock.Unlock() - - fp := f.peers[p] - return fp != nil && fp.nodeByHash[bestHash] != nil - }, - request: func(dp distPeer) func() { - go func() { - p := dp.(*peer) - p.Log().Debug("Synchronisation started") - f.pm.synchronise(p) - f.syncDone <- p - }() - return nil - }, - } - } else { - rq = &distReq{ - getCost: func(dp distPeer) uint64 { - p := dp.(*peer) - return p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) - }, - canSend: func(dp distPeer) bool { - p := dp.(*peer) - f.lock.Lock() - defer f.lock.Unlock() - - fp := f.peers[p] - if fp == nil { - return false - } - n := fp.nodeByHash[bestHash] - return n != nil && !n.requested - }, - request: func(dp distPeer) func() { - p := dp.(*peer) - f.lock.Lock() - fp := f.peers[p] - if fp != nil { - n := fp.nodeByHash[bestHash] - if n != nil { - n.requested = true - } - } - f.lock.Unlock() - - cost := p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)) - p.fcServer.QueueRequest(reqID, cost) - f.reqMu.Lock() - f.requested[reqID] = fetchRequest{hash: bestHash, amount: bestAmount, peer: p, sent: mclock.Now()} - f.reqMu.Unlock() - go func() { - time.Sleep(hardRequestTimeout) - f.timeoutChn <- reqID - }() - return func() { p.RequestHeadersByHash(reqID, cost, bestHash, int(bestAmount), 0, true) } - }, - } - } - return rq, reqID, bestSyncing -} - -// deliverHeaders delivers header download request responses for processing -func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types.Header) { - f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer} -} - -// processResponse processes header download request responses, returns true if successful -func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool { - if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash { - req.peer.Log().Debug("Response content mismatch", "requested", len(resp.headers), "reqfrom", resp.headers[0], "delivered", req.amount, "delfrom", req.hash) - return false - } - headers := make([]*types.Header, req.amount) - for i, header := range resp.headers { - headers[int(req.amount)-1-i] = header - } - if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil { - if err == consensus.ErrFutureBlock { - return true - } - log.Debug("Failed to insert header chain", "err", err) - return false - } - tds := make([]*big.Int, len(headers)) - for i, header := range headers { - td := f.chain.GetTd(header.Hash(), header.Number.Uint64()) - if td == nil { - log.Debug("Total difficulty not found for header", "index", i+1, "number", header.Number, "hash", header.Hash()) - return false - } - tds[i] = td - } - f.newHeaders(headers, tds) - return true -} - -// newHeaders updates the block trees of all active peers according to a newly -// downloaded and validated batch or headers -func (f *lightFetcher) newHeaders(headers []*types.Header, tds []*big.Int) { - var maxTd *big.Int - for p, fp := range f.peers { - if !f.checkAnnouncedHeaders(fp, headers, tds) { - p.Log().Debug("Inconsistent announcement") - go f.pm.removePeer(p.id) - } - if fp.confirmedTd != nil && (maxTd == nil || maxTd.Cmp(fp.confirmedTd) > 0) { - maxTd = fp.confirmedTd - } - } - if maxTd != nil { - f.updateMaxConfirmedTd(maxTd) - } -} - -// checkAnnouncedHeaders updates peer's block tree if necessary after validating -// a batch of headers. It searches for the latest header in the batch that has a -// matching tree node (if any), and if it has not been marked as known already, -// sets it and its parents to known (even those which are older than the currently -// validated ones). Return value shows if all hashes, numbers and Tds matched -// correctly to the announced values (otherwise the peer should be dropped). -func (f *lightFetcher) checkAnnouncedHeaders(fp *fetcherPeerInfo, headers []*types.Header, tds []*big.Int) bool { - var ( - n *fetcherTreeNode - header *types.Header - td *big.Int - ) - - for i := len(headers) - 1; ; i-- { - if i < 0 { - if n == nil { - // no more headers and nothing to match - return true - } - // we ran out of recently delivered headers but have not reached a node known by this peer yet, continue matching - hash, number := header.ParentHash, header.Number.Uint64()-1 - td = f.chain.GetTd(hash, number) - header = f.chain.GetHeader(hash, number) - if header == nil || td == nil { - log.Error("Missing parent of validated header", "hash", hash, "number", number) - return false - } - } else { - header = headers[i] - td = tds[i] - } - hash := header.Hash() - number := header.Number.Uint64() - if n == nil { - n = fp.nodeByHash[hash] - } - if n != nil { - if n.td == nil { - // node was unannounced - if nn := fp.nodeByHash[hash]; nn != nil { - // if there was already a node with the same hash, continue there and drop this one - nn.children = append(nn.children, n.children...) - n.children = nil - fp.deleteNode(n) - n = nn - } else { - n.hash = hash - n.td = td - fp.nodeByHash[hash] = n - } - } - // check if it matches the header - if n.hash != hash || n.number != number || n.td.Cmp(td) != 0 { - // peer has previously made an invalid announcement - return false - } - if n.known { - // we reached a known node that matched our expectations, return with success - return true - } - n.known = true - if fp.confirmedTd == nil || td.Cmp(fp.confirmedTd) > 0 { - fp.confirmedTd = td - fp.bestConfirmed = n - } - n = n.parent - if n == nil { - return true - } - } - } -} - -// checkSyncedHeaders updates peer's block tree after synchronisation by marking -// downloaded headers as known. If none of the announced headers are found after -// syncing, the peer is dropped. -func (f *lightFetcher) checkSyncedHeaders(p *peer) { - fp := f.peers[p] - if fp == nil { - p.Log().Debug("Unknown peer to check sync headers") - return - } - n := fp.lastAnnounced - var td *big.Int - for n != nil { - if td = f.chain.GetTd(n.hash, n.number); td != nil { - break - } - n = n.parent - } - // now n is the latest downloaded header after syncing - if n == nil { - p.Log().Debug("Synchronisation failed") - go f.pm.removePeer(p.id) - } else { - header := f.chain.GetHeader(n.hash, n.number) - f.newHeaders([]*types.Header{header}, []*big.Int{td}) - } -} - -// checkKnownNode checks if a block tree node is known (downloaded and validated) -// If it was not known previously but found in the database, sets its known flag -func (f *lightFetcher) checkKnownNode(p *peer, n *fetcherTreeNode) bool { - if n.known { - return true - } - td := f.chain.GetTd(n.hash, n.number) - if td == nil { - return false - } - header := f.chain.GetHeader(n.hash, n.number) - // check the availability of both header and td because reads are not protected by chain db mutex - // Note: returning false is always safe here - if header == nil { - return false - } - - fp := f.peers[p] - if fp == nil { - p.Log().Debug("Unknown peer to check known nodes") - return false - } - if !f.checkAnnouncedHeaders(fp, []*types.Header{header}, []*big.Int{td}) { - p.Log().Debug("Inconsistent announcement") - go f.pm.removePeer(p.id) - } - if fp.confirmedTd != nil { - f.updateMaxConfirmedTd(fp.confirmedTd) - } - return n.known -} - -// deleteNode deletes a node and its child subtrees from a peer's block tree -func (fp *fetcherPeerInfo) deleteNode(n *fetcherTreeNode) { - if n.parent != nil { - for i, nn := range n.parent.children { - if nn == n { - n.parent.children = append(n.parent.children[:i], n.parent.children[i+1:]...) - break - } - } - } - for { - if n.td != nil { - delete(fp.nodeByHash, n.hash) - } - fp.nodeCnt-- - if len(n.children) == 0 { - return - } - for i, nn := range n.children { - if i == 0 { - n = nn - } else { - fp.deleteNode(nn) - } - } - } -} - -// updateStatsEntry items form a linked list that is expanded with a new item every time a new head with a higher Td -// than the previous one has been downloaded and validated. The list contains a series of maximum confirmed Td values -// and the time these values have been confirmed, both increasing monotonically. A maximum confirmed Td is calculated -// both globally for all peers and also for each individual peer (meaning that the given peer has announced the head -// and it has also been downloaded from any peer, either before or after the given announcement). -// The linked list has a global tail where new confirmed Td entries are added and a separate head for each peer, -// pointing to the next Td entry that is higher than the peer's max confirmed Td (nil if it has already confirmed -// the current global head). -type updateStatsEntry struct { - time mclock.AbsTime - td *big.Int - next *updateStatsEntry -} - -// updateMaxConfirmedTd updates the block delay statistics of active peers. Whenever a new highest Td is confirmed, -// adds it to the end of a linked list together with the time it has been confirmed. Then checks which peers have -// already confirmed a head with the same or higher Td (which counts as zero block delay) and updates their statistics. -// Those who have not confirmed such a head by now will be updated by a subsequent checkUpdateStats call with a -// positive block delay value. -func (f *lightFetcher) updateMaxConfirmedTd(td *big.Int) { - if f.maxConfirmedTd == nil || td.Cmp(f.maxConfirmedTd) > 0 { - f.maxConfirmedTd = td - newEntry := &updateStatsEntry{ - time: mclock.Now(), - td: td, - } - if f.lastUpdateStats != nil { - f.lastUpdateStats.next = newEntry - } - f.lastUpdateStats = newEntry - for p := range f.peers { - f.checkUpdateStats(p, newEntry) - } - } -} - -// checkUpdateStats checks those peers who have not confirmed a certain highest Td (or a larger one) by the time it -// has been confirmed by another peer. If they have confirmed such a head by now, their stats are updated with the -// block delay which is (this peer's confirmation time)-(first confirmation time). After blockDelayTimeout has passed, -// the stats are updated with blockDelayTimeout value. In either case, the confirmed or timed out updateStatsEntry -// items are removed from the head of the linked list. -// If a new entry has been added to the global tail, it is passed as a parameter here even though this function -// assumes that it has already been added, so that if the peer's list is empty (all heads confirmed, head is nil), -// it can set the new head to newEntry. -func (f *lightFetcher) checkUpdateStats(p *peer, newEntry *updateStatsEntry) { - now := mclock.Now() - fp := f.peers[p] - if fp == nil { - p.Log().Debug("Unknown peer to check update stats") - return - } - if newEntry != nil && fp.firstUpdateStats == nil { - fp.firstUpdateStats = newEntry - } - for fp.firstUpdateStats != nil && fp.firstUpdateStats.time <= now-mclock.AbsTime(blockDelayTimeout) { - f.pm.serverPool.adjustBlockDelay(p.poolEntry, blockDelayTimeout) - fp.firstUpdateStats = fp.firstUpdateStats.next - } - if fp.confirmedTd != nil { - for fp.firstUpdateStats != nil && fp.firstUpdateStats.td.Cmp(fp.confirmedTd) <= 0 { - f.pm.serverPool.adjustBlockDelay(p.poolEntry, time.Duration(now-fp.firstUpdateStats.time)) - fp.firstUpdateStats = fp.firstUpdateStats.next - } - } -} diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go deleted file mode 100644 index bb4a73b6d..000000000 --- a/les/flowcontrol/control.go +++ /dev/null @@ -1,183 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package flowcontrol implements a client side flow control mechanism -package flowcontrol - -import ( - "sync" - "time" - - "github.com/dexon-foundation/dexon/common/mclock" -) - -const fcTimeConst = time.Millisecond - -type ServerParams struct { - BufLimit, MinRecharge uint64 -} - -type ClientNode struct { - params *ServerParams - bufValue uint64 - lastTime mclock.AbsTime - lock sync.Mutex - cm *ClientManager - cmNode *cmNode -} - -func NewClientNode(cm *ClientManager, params *ServerParams) *ClientNode { - node := &ClientNode{ - cm: cm, - params: params, - bufValue: params.BufLimit, - lastTime: mclock.Now(), - } - node.cmNode = cm.addNode(node) - return node -} - -func (peer *ClientNode) Remove(cm *ClientManager) { - cm.removeNode(peer.cmNode) -} - -func (peer *ClientNode) recalcBV(time mclock.AbsTime) { - dt := uint64(time - peer.lastTime) - if time < peer.lastTime { - dt = 0 - } - peer.bufValue += peer.params.MinRecharge * dt / uint64(fcTimeConst) - if peer.bufValue > peer.params.BufLimit { - peer.bufValue = peer.params.BufLimit - } - peer.lastTime = time -} - -func (peer *ClientNode) AcceptRequest() (uint64, bool) { - peer.lock.Lock() - defer peer.lock.Unlock() - - time := mclock.Now() - peer.recalcBV(time) - return peer.bufValue, peer.cm.accept(peer.cmNode, time) -} - -func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) { - peer.lock.Lock() - defer peer.lock.Unlock() - - time := mclock.Now() - peer.recalcBV(time) - peer.bufValue -= cost - rcValue, rcost := peer.cm.processed(peer.cmNode, time) - if rcValue < peer.params.BufLimit { - bv := peer.params.BufLimit - rcValue - if bv > peer.bufValue { - peer.bufValue = bv - } - } - return peer.bufValue, rcost -} - -type ServerNode struct { - bufEstimate uint64 - lastTime mclock.AbsTime - params *ServerParams - sumCost uint64 // sum of req costs sent to this server - pending map[uint64]uint64 // value = sumCost after sending the given req - lock sync.RWMutex -} - -func NewServerNode(params *ServerParams) *ServerNode { - return &ServerNode{ - bufEstimate: params.BufLimit, - lastTime: mclock.Now(), - params: params, - pending: make(map[uint64]uint64), - } -} - -func (peer *ServerNode) recalcBLE(time mclock.AbsTime) { - dt := uint64(time - peer.lastTime) - if time < peer.lastTime { - dt = 0 - } - peer.bufEstimate += peer.params.MinRecharge * dt / uint64(fcTimeConst) - if peer.bufEstimate > peer.params.BufLimit { - peer.bufEstimate = peer.params.BufLimit - } - peer.lastTime = time -} - -// safetyMargin is added to the flow control waiting time when estimated buffer value is low -const safetyMargin = time.Millisecond - -func (peer *ServerNode) canSend(maxCost uint64) (time.Duration, float64) { - peer.recalcBLE(mclock.Now()) - maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst) - if maxCost > peer.params.BufLimit { - maxCost = peer.params.BufLimit - } - if peer.bufEstimate >= maxCost { - return 0, float64(peer.bufEstimate-maxCost) / float64(peer.params.BufLimit) - } - return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge), 0 -} - -// CanSend returns the minimum waiting time required before sending a request -// with the given maximum estimated cost. Second return value is the relative -// estimated buffer level after sending the request (divided by BufLimit). -func (peer *ServerNode) CanSend(maxCost uint64) (time.Duration, float64) { - peer.lock.RLock() - defer peer.lock.RUnlock() - - return peer.canSend(maxCost) -} - -// QueueRequest should be called when the request has been assigned to the given -// server node, before putting it in the send queue. It is mandatory that requests -// are sent in the same order as the QueueRequest calls are made. -func (peer *ServerNode) QueueRequest(reqID, maxCost uint64) { - peer.lock.Lock() - defer peer.lock.Unlock() - - peer.bufEstimate -= maxCost - peer.sumCost += maxCost - peer.pending[reqID] = peer.sumCost -} - -// GotReply adjusts estimated buffer value according to the value included in -// the latest request reply. -func (peer *ServerNode) GotReply(reqID, bv uint64) { - - peer.lock.Lock() - defer peer.lock.Unlock() - - if bv > peer.params.BufLimit { - bv = peer.params.BufLimit - } - sc, ok := peer.pending[reqID] - if !ok { - return - } - delete(peer.pending, reqID) - cc := peer.sumCost - sc - peer.bufEstimate = 0 - if bv > cc { - peer.bufEstimate = bv - cc - } - peer.lastTime = mclock.Now() -} diff --git a/les/flowcontrol/manager.go b/les/flowcontrol/manager.go deleted file mode 100644 index ee565c83d..000000000 --- a/les/flowcontrol/manager.go +++ /dev/null @@ -1,224 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package flowcontrol implements a client side flow control mechanism -package flowcontrol - -import ( - "sync" - "time" - - "github.com/dexon-foundation/dexon/common/mclock" -) - -const rcConst = 1000000 - -type cmNode struct { - node *ClientNode - lastUpdate mclock.AbsTime - serving, recharging bool - rcWeight uint64 - rcValue, rcDelta, startValue int64 - finishRecharge mclock.AbsTime -} - -func (node *cmNode) update(time mclock.AbsTime) { - dt := int64(time - node.lastUpdate) - node.rcValue += node.rcDelta * dt / rcConst - node.lastUpdate = time - if node.recharging && time >= node.finishRecharge { - node.recharging = false - node.rcDelta = 0 - node.rcValue = 0 - } -} - -func (node *cmNode) set(serving bool, simReqCnt, sumWeight uint64) { - if node.serving && !serving { - node.recharging = true - sumWeight += node.rcWeight - } - node.serving = serving - if node.recharging && serving { - node.recharging = false - sumWeight -= node.rcWeight - } - - node.rcDelta = 0 - if serving { - node.rcDelta = int64(rcConst / simReqCnt) - } - if node.recharging { - node.rcDelta = -int64(node.node.cm.rcRecharge * node.rcWeight / sumWeight) - node.finishRecharge = node.lastUpdate + mclock.AbsTime(node.rcValue*rcConst/(-node.rcDelta)) - } -} - -type ClientManager struct { - lock sync.Mutex - nodes map[*cmNode]struct{} - simReqCnt, sumWeight, rcSumValue uint64 - maxSimReq, maxRcSum uint64 - rcRecharge uint64 - resumeQueue chan chan bool - time mclock.AbsTime -} - -func NewClientManager(rcTarget, maxSimReq, maxRcSum uint64) *ClientManager { - cm := &ClientManager{ - nodes: make(map[*cmNode]struct{}), - resumeQueue: make(chan chan bool), - rcRecharge: rcConst * rcConst / (100*rcConst/rcTarget - rcConst), - maxSimReq: maxSimReq, - maxRcSum: maxRcSum, - } - go cm.queueProc() - return cm -} - -func (self *ClientManager) Stop() { - self.lock.Lock() - defer self.lock.Unlock() - - // signal any waiting accept routines to return false - self.nodes = make(map[*cmNode]struct{}) - close(self.resumeQueue) -} - -func (self *ClientManager) addNode(cnode *ClientNode) *cmNode { - time := mclock.Now() - node := &cmNode{ - node: cnode, - lastUpdate: time, - finishRecharge: time, - rcWeight: 1, - } - self.lock.Lock() - defer self.lock.Unlock() - - self.nodes[node] = struct{}{} - self.update(mclock.Now()) - return node -} - -func (self *ClientManager) removeNode(node *cmNode) { - self.lock.Lock() - defer self.lock.Unlock() - - time := mclock.Now() - self.stop(node, time) - delete(self.nodes, node) - self.update(time) -} - -// recalc sumWeight -func (self *ClientManager) updateNodes(time mclock.AbsTime) (rce bool) { - var sumWeight, rcSum uint64 - for node := range self.nodes { - rc := node.recharging - node.update(time) - if rc && !node.recharging { - rce = true - } - if node.recharging { - sumWeight += node.rcWeight - } - rcSum += uint64(node.rcValue) - } - self.sumWeight = sumWeight - self.rcSumValue = rcSum - return -} - -func (self *ClientManager) update(time mclock.AbsTime) { - for { - firstTime := time - for node := range self.nodes { - if node.recharging && node.finishRecharge < firstTime { - firstTime = node.finishRecharge - } - } - if self.updateNodes(firstTime) { - for node := range self.nodes { - if node.recharging { - node.set(node.serving, self.simReqCnt, self.sumWeight) - } - } - } else { - self.time = time - return - } - } -} - -func (self *ClientManager) canStartReq() bool { - return self.simReqCnt < self.maxSimReq && self.rcSumValue < self.maxRcSum -} - -func (self *ClientManager) queueProc() { - for rc := range self.resumeQueue { - for { - time.Sleep(time.Millisecond * 10) - self.lock.Lock() - self.update(mclock.Now()) - cs := self.canStartReq() - self.lock.Unlock() - if cs { - break - } - } - close(rc) - } -} - -func (self *ClientManager) accept(node *cmNode, time mclock.AbsTime) bool { - self.lock.Lock() - defer self.lock.Unlock() - - self.update(time) - if !self.canStartReq() { - resume := make(chan bool) - self.lock.Unlock() - self.resumeQueue <- resume - <-resume - self.lock.Lock() - if _, ok := self.nodes[node]; !ok { - return false // reject if node has been removed or manager has been stopped - } - } - self.simReqCnt++ - node.set(true, self.simReqCnt, self.sumWeight) - node.startValue = node.rcValue - self.update(self.time) - return true -} - -func (self *ClientManager) stop(node *cmNode, time mclock.AbsTime) { - if node.serving { - self.update(time) - self.simReqCnt-- - node.set(false, self.simReqCnt, self.sumWeight) - self.update(time) - } -} - -func (self *ClientManager) processed(node *cmNode, time mclock.AbsTime) (rcValue, rcCost uint64) { - self.lock.Lock() - defer self.lock.Unlock() - - self.stop(node, time) - return uint64(node.rcValue), uint64(node.rcValue - node.startValue) -} diff --git a/les/freeclient.go b/les/freeclient.go deleted file mode 100644 index 10c7bba35..000000000 --- a/les/freeclient.go +++ /dev/null @@ -1,278 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package les implements the Light Ethereum Subprotocol. -package les - -import ( - "io" - "math" - "sync" - "time" - - "github.com/dexon-foundation/dexon/common/mclock" - "github.com/dexon-foundation/dexon/common/prque" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/rlp" -) - -// freeClientPool implements a client database that limits the connection time -// of each client and manages accepting/rejecting incoming connections and even -// kicking out some connected clients. The pool calculates recent usage time -// for each known client (a value that increases linearly when the client is -// connected and decreases exponentially when not connected). Clients with lower -// recent usage are preferred, unknown nodes have the highest priority. Already -// connected nodes receive a small bias in their favor in order to avoid accepting -// and instantly kicking out clients. -// -// Note: the pool can use any string for client identification. Using signature -// keys for that purpose would not make sense when being known has a negative -// value for the client. Currently the LES protocol manager uses IP addresses -// (without port address) to identify clients. -type freeClientPool struct { - db ethdb.Database - lock sync.Mutex - clock mclock.Clock - closed bool - - connectedLimit, totalLimit int - - addressMap map[string]*freeClientPoolEntry - connPool, disconnPool *prque.Prque - startupTime mclock.AbsTime - logOffsetAtStartup int64 -} - -const ( - recentUsageExpTC = time.Hour // time constant of the exponential weighting window for "recent" server usage - fixedPointMultiplier = 0x1000000 // constant to convert logarithms to fixed point format - connectedBias = time.Minute // this bias is applied in favor of already connected clients in order to avoid kicking them out very soon -) - -// newFreeClientPool creates a new free client pool -func newFreeClientPool(db ethdb.Database, connectedLimit, totalLimit int, clock mclock.Clock) *freeClientPool { - pool := &freeClientPool{ - db: db, - clock: clock, - addressMap: make(map[string]*freeClientPoolEntry), - connPool: prque.New(poolSetIndex), - disconnPool: prque.New(poolSetIndex), - connectedLimit: connectedLimit, - totalLimit: totalLimit, - } - pool.loadFromDb() - return pool -} - -func (f *freeClientPool) stop() { - f.lock.Lock() - f.closed = true - f.saveToDb() - f.lock.Unlock() -} - -// connect should be called after a successful handshake. If the connection was -// rejected, there is no need to call disconnect. -// -// Note: the disconnectFn callback should not block. -func (f *freeClientPool) connect(address string, disconnectFn func()) bool { - f.lock.Lock() - defer f.lock.Unlock() - - if f.closed { - return false - } - e := f.addressMap[address] - now := f.clock.Now() - var recentUsage int64 - if e == nil { - e = &freeClientPoolEntry{address: address, index: -1} - f.addressMap[address] = e - } else { - if e.connected { - log.Debug("Client already connected", "address", address) - return false - } - recentUsage = int64(math.Exp(float64(e.logUsage-f.logOffset(now)) / fixedPointMultiplier)) - } - e.linUsage = recentUsage - int64(now) - // check whether (linUsage+connectedBias) is smaller than the highest entry in the connected pool - if f.connPool.Size() == f.connectedLimit { - i := f.connPool.PopItem().(*freeClientPoolEntry) - if e.linUsage+int64(connectedBias)-i.linUsage < 0 { - // kick it out and accept the new client - f.connPool.Remove(i.index) - f.calcLogUsage(i, now) - i.connected = false - f.disconnPool.Push(i, -i.logUsage) - log.Debug("Client kicked out", "address", i.address) - i.disconnectFn() - } else { - // keep the old client and reject the new one - f.connPool.Push(i, i.linUsage) - log.Debug("Client rejected", "address", address) - return false - } - } - f.disconnPool.Remove(e.index) - e.connected = true - e.disconnectFn = disconnectFn - f.connPool.Push(e, e.linUsage) - if f.connPool.Size()+f.disconnPool.Size() > f.totalLimit { - f.disconnPool.Pop() - } - log.Debug("Client accepted", "address", address) - return true -} - -// disconnect should be called when a connection is terminated. If the disconnection -// was initiated by the pool itself using disconnectFn then calling disconnect is -// not necessary but permitted. -func (f *freeClientPool) disconnect(address string) { - f.lock.Lock() - defer f.lock.Unlock() - - if f.closed { - return - } - e := f.addressMap[address] - now := f.clock.Now() - if !e.connected { - log.Debug("Client already disconnected", "address", address) - return - } - - f.connPool.Remove(e.index) - f.calcLogUsage(e, now) - e.connected = false - f.disconnPool.Push(e, -e.logUsage) - log.Debug("Client disconnected", "address", address) -} - -// logOffset calculates the time-dependent offset for the logarithmic -// representation of recent usage -func (f *freeClientPool) logOffset(now mclock.AbsTime) int64 { - // Note: fixedPointMultiplier acts as a multiplier here; the reason for dividing the divisor - // is to avoid int64 overflow. We assume that int64(recentUsageExpTC) >> fixedPointMultiplier. - logDecay := int64((time.Duration(now - f.startupTime)) / (recentUsageExpTC / fixedPointMultiplier)) - return f.logOffsetAtStartup + logDecay -} - -// calcLogUsage converts recent usage from linear to logarithmic representation -// when disconnecting a peer or closing the client pool -func (f *freeClientPool) calcLogUsage(e *freeClientPoolEntry, now mclock.AbsTime) { - dt := e.linUsage + int64(now) - if dt < 1 { - dt = 1 - } - e.logUsage = int64(math.Log(float64(dt))*fixedPointMultiplier) + f.logOffset(now) -} - -// freeClientPoolStorage is the RLP representation of the pool's database storage -type freeClientPoolStorage struct { - LogOffset uint64 - List []*freeClientPoolEntry -} - -// loadFromDb restores pool status from the database storage -// (automatically called at initialization) -func (f *freeClientPool) loadFromDb() { - enc, err := f.db.Get([]byte("freeClientPool")) - if err != nil { - return - } - var storage freeClientPoolStorage - err = rlp.DecodeBytes(enc, &storage) - if err != nil { - log.Error("Failed to decode client list", "err", err) - return - } - f.logOffsetAtStartup = int64(storage.LogOffset) - f.startupTime = f.clock.Now() - for _, e := range storage.List { - log.Debug("Loaded free client record", "address", e.address, "logUsage", e.logUsage) - f.addressMap[e.address] = e - f.disconnPool.Push(e, -e.logUsage) - } -} - -// saveToDb saves pool status to the database storage -// (automatically called during shutdown) -func (f *freeClientPool) saveToDb() { - now := f.clock.Now() - storage := freeClientPoolStorage{ - LogOffset: uint64(f.logOffset(now)), - List: make([]*freeClientPoolEntry, len(f.addressMap)), - } - i := 0 - for _, e := range f.addressMap { - if e.connected { - f.calcLogUsage(e, now) - } - storage.List[i] = e - i++ - } - enc, err := rlp.EncodeToBytes(storage) - if err != nil { - log.Error("Failed to encode client list", "err", err) - } else { - f.db.Put([]byte("freeClientPool"), enc) - } -} - -// freeClientPoolEntry represents a client address known by the pool. -// When connected, recent usage is calculated as linUsage + int64(clock.Now()) -// When disconnected, it is calculated as exp(logUsage - logOffset) where logOffset -// also grows linearly with time while the server is running. -// Conversion between linear and logarithmic representation happens when connecting -// or disconnecting the node. -// -// Note: linUsage and logUsage are values used with constantly growing offsets so -// even though they are close to each other at any time they may wrap around int64 -// limits over time. Comparison should be performed accordingly. -type freeClientPoolEntry struct { - address string - connected bool - disconnectFn func() - linUsage, logUsage int64 - index int -} - -func (e *freeClientPoolEntry) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, []interface{}{e.address, uint64(e.logUsage)}) -} - -func (e *freeClientPoolEntry) DecodeRLP(s *rlp.Stream) error { - var entry struct { - Address string - LogUsage uint64 - } - if err := s.Decode(&entry); err != nil { - return err - } - e.address = entry.Address - e.logUsage = int64(entry.LogUsage) - e.connected = false - e.index = -1 - return nil -} - -// poolSetIndex callback is used by both priority queues to set/update the index of -// the element in the queue. Index is needed to remove elements other than the top one. -func poolSetIndex(a interface{}, i int) { - a.(*freeClientPoolEntry).index = i -} diff --git a/les/freeclient_test.go b/les/freeclient_test.go deleted file mode 100644 index de879fc12..000000000 --- a/les/freeclient_test.go +++ /dev/null @@ -1,139 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package light implements on-demand retrieval capable state and chain objects -// for the Ethereum Light Client. -package les - -import ( - "fmt" - "math/rand" - "testing" - "time" - - "github.com/dexon-foundation/dexon/common/mclock" - "github.com/dexon-foundation/dexon/ethdb" -) - -func TestFreeClientPoolL10C100(t *testing.T) { - testFreeClientPool(t, 10, 100) -} - -func TestFreeClientPoolL40C200(t *testing.T) { - testFreeClientPool(t, 40, 200) -} - -func TestFreeClientPoolL100C300(t *testing.T) { - testFreeClientPool(t, 100, 300) -} - -const testFreeClientPoolTicks = 500000 - -func testFreeClientPool(t *testing.T, connLimit, clientCount int) { - var ( - clock mclock.Simulated - db = ethdb.NewMemDatabase() - pool = newFreeClientPool(db, connLimit, 10000, &clock) - connected = make([]bool, clientCount) - connTicks = make([]int, clientCount) - disconnCh = make(chan int, clientCount) - ) - peerId := func(i int) string { - return fmt.Sprintf("test peer #%d", i) - } - disconnFn := func(i int) func() { - return func() { - disconnCh <- i - } - } - - // pool should accept new peers up to its connected limit - for i := 0; i < connLimit; i++ { - if pool.connect(peerId(i), disconnFn(i)) { - connected[i] = true - } else { - t.Fatalf("Test peer #%d rejected", i) - } - } - // since all accepted peers are new and should not be kicked out, the next one should be rejected - if pool.connect(peerId(connLimit), disconnFn(connLimit)) { - connected[connLimit] = true - t.Fatalf("Peer accepted over connected limit") - } - - // randomly connect and disconnect peers, expect to have a similar total connection time at the end - for tickCounter := 0; tickCounter < testFreeClientPoolTicks; tickCounter++ { - clock.Run(1 * time.Second) - - i := rand.Intn(clientCount) - if connected[i] { - pool.disconnect(peerId(i)) - connected[i] = false - connTicks[i] += tickCounter - } else { - if pool.connect(peerId(i), disconnFn(i)) { - connected[i] = true - connTicks[i] -= tickCounter - } - } - pollDisconnects: - for { - select { - case i := <-disconnCh: - pool.disconnect(peerId(i)) - if connected[i] { - connTicks[i] += tickCounter - connected[i] = false - } - default: - break pollDisconnects - } - } - } - - expTicks := testFreeClientPoolTicks * connLimit / clientCount - expMin := expTicks - expTicks/10 - expMax := expTicks + expTicks/10 - - // check if the total connected time of peers are all in the expected range - for i, c := range connected { - if c { - connTicks[i] += testFreeClientPoolTicks - } - if connTicks[i] < expMin || connTicks[i] > expMax { - t.Errorf("Total connected time of test node #%d (%d) outside expected range (%d to %d)", i, connTicks[i], expMin, expMax) - } - } - - // a previously unknown peer should be accepted now - if !pool.connect("newPeer", func() {}) { - t.Fatalf("Previously unknown peer rejected") - } - - // close and restart pool - pool.stop() - pool = newFreeClientPool(db, connLimit, 10000, &clock) - - // try connecting all known peers (connLimit should be filled up) - for i := 0; i < clientCount; i++ { - pool.connect(peerId(i), func() {}) - } - // expect pool to remember known nodes and kick out one of them to accept a new one - if !pool.connect("newPeer2", func() {}) { - t.Errorf("Previously unknown peer rejected after restarting pool") - } - pool.stop() -} diff --git a/les/handler.go b/les/handler.go deleted file mode 100644 index 9c0aa0c29..000000000 --- a/les/handler.go +++ /dev/null @@ -1,1250 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package les implements the Light Ethereum Subprotocol. -package les - -import ( - "encoding/binary" - "encoding/json" - "fmt" - "math/big" - "net" - "sync" - "time" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/common/mclock" - "github.com/dexon-foundation/dexon/consensus" - "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/core/rawdb" - "github.com/dexon-foundation/dexon/core/state" - "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/eth/downloader" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/event" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p" - "github.com/dexon-foundation/dexon/p2p/discv5" - "github.com/dexon-foundation/dexon/params" - "github.com/dexon-foundation/dexon/rlp" - "github.com/dexon-foundation/dexon/trie" -) - -const ( - softResponseLimit = 2 * 1024 * 1024 // Target maximum size of returned blocks, headers or node data. - estHeaderRlpSize = 500 // Approximate size of an RLP encoded block header - - ethVersion = 63 // equivalent eth version for the downloader - - MaxHeaderFetch = 192 // Amount of block headers to be fetched per retrieval request - MaxBodyFetch = 32 // Amount of block bodies to be fetched per retrieval request - MaxReceiptFetch = 128 // Amount of transaction receipts to allow fetching per request - MaxCodeFetch = 64 // Amount of contract codes to allow fetching per request - MaxProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request - MaxHelperTrieProofsFetch = 64 // Amount of merkle proofs to be fetched per retrieval request - MaxTxSend = 64 // Amount of transactions to be send per request - MaxTxStatus = 256 // Amount of transactions to queried per request - - disableClientRemovePeer = false -) - -func errResp(code errCode, format string, v ...interface{}) error { - return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) -} - -type BlockChain interface { - Config() *params.ChainConfig - HasHeader(hash common.Hash, number uint64) bool - GetHeader(hash common.Hash, number uint64) *types.Header - GetHeaderByHash(hash common.Hash) *types.Header - CurrentHeader() *types.Header - GetTd(hash common.Hash, number uint64) *big.Int - State() (*state.StateDB, error) - InsertHeaderChain(chain []*types.Header, checkFreq int) (int, error) - Rollback(chain []common.Hash) - GetHeaderByNumber(number uint64) *types.Header - GetAncestor(hash common.Hash, number, ancestor uint64, maxNonCanonical *uint64) (common.Hash, uint64) - Genesis() *types.Block - SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription -} - -type txPool interface { - AddRemotes(txs []*types.Transaction) []error - Status(hashes []common.Hash) []core.TxStatus -} - -type ProtocolManager struct { - lightSync bool - txpool txPool - txrelay *LesTxRelay - networkId uint64 - chainConfig *params.ChainConfig - iConfig *light.IndexerConfig - blockchain BlockChain - chainDb ethdb.Database - odr *LesOdr - server *LesServer - serverPool *serverPool - clientPool *freeClientPool - lesTopic discv5.Topic - reqDist *requestDistributor - retriever *retrieveManager - - downloader *downloader.Downloader - fetcher *lightFetcher - peers *peerSet - maxPeers int - - eventMux *event.TypeMux - - // channels for fetcher, syncer, txsyncLoop - newPeerCh chan *peer - quitSync chan struct{} - noMorePeers chan struct{} - - // wait group is used for graceful shutdowns during downloading - // and processing - wg *sync.WaitGroup -} - -// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable -// with the ethereum network. -func NewProtocolManager(chainConfig *params.ChainConfig, indexerConfig *light.IndexerConfig, lightSync bool, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, serverPool *serverPool, quitSync chan struct{}, wg *sync.WaitGroup) (*ProtocolManager, error) { - // Create the protocol manager with the base fields - manager := &ProtocolManager{ - lightSync: lightSync, - eventMux: mux, - blockchain: blockchain, - chainConfig: chainConfig, - iConfig: indexerConfig, - chainDb: chainDb, - odr: odr, - networkId: networkId, - txpool: txpool, - txrelay: txrelay, - serverPool: serverPool, - peers: peers, - newPeerCh: make(chan *peer), - quitSync: quitSync, - wg: wg, - noMorePeers: make(chan struct{}), - } - if odr != nil { - manager.retriever = odr.retriever - manager.reqDist = odr.retriever.dist - } - - removePeer := manager.removePeer - if disableClientRemovePeer { - removePeer = func(id string) {} - } - - if lightSync { - manager.downloader = downloader.New(downloader.LightSync, chainDb, manager.eventMux, nil, blockchain, removePeer) - manager.peers.notify((*downloaderPeerNotify)(manager)) - manager.fetcher = newLightFetcher(manager) - } - - return manager, nil -} - -// removePeer initiates disconnection from a peer by removing it from the peer set -func (pm *ProtocolManager) removePeer(id string) { - pm.peers.Unregister(id) -} - -func (pm *ProtocolManager) Start(maxPeers int) { - pm.maxPeers = maxPeers - - if pm.lightSync { - go pm.syncer() - } else { - pm.clientPool = newFreeClientPool(pm.chainDb, maxPeers, 10000, mclock.System{}) - go func() { - for range pm.newPeerCh { - } - }() - } -} - -func (pm *ProtocolManager) Stop() { - // Showing a log message. During download / process this could actually - // take between 5 to 10 seconds and therefor feedback is required. - log.Info("Stopping light Ethereum protocol") - - // Quit the sync loop. - // After this send has completed, no new peers will be accepted. - pm.noMorePeers <- struct{}{} - - close(pm.quitSync) // quits syncer, fetcher - if pm.clientPool != nil { - pm.clientPool.stop() - } - - // Disconnect existing sessions. - // This also closes the gate for any new registrations on the peer set. - // sessions which are already established but not added to pm.peers yet - // will exit when they try to register. - pm.peers.Close() - - // Wait for any process action - pm.wg.Wait() - - log.Info("Light Ethereum protocol stopped") -} - -// runPeer is the p2p protocol run function for the given version. -func (pm *ProtocolManager) runPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter) error { - var entry *poolEntry - peer := pm.newPeer(int(version), pm.networkId, p, rw) - if pm.serverPool != nil { - entry = pm.serverPool.connect(peer, peer.Node()) - } - peer.poolEntry = entry - select { - case pm.newPeerCh <- peer: - pm.wg.Add(1) - defer pm.wg.Done() - err := pm.handle(peer) - if entry != nil { - pm.serverPool.disconnect(entry) - } - return err - case <-pm.quitSync: - if entry != nil { - pm.serverPool.disconnect(entry) - } - return p2p.DiscQuitting - } -} - -func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - return newPeer(pv, nv, p, newMeteredMsgWriter(rw)) -} - -// handle is the callback invoked to manage the life cycle of a les peer. When -// this function terminates, the peer is disconnected. -func (pm *ProtocolManager) handle(p *peer) error { - // Ignore maxPeers if this is a trusted peer - // In server mode we try to check into the client pool after handshake - if pm.lightSync && pm.peers.Len() >= pm.maxPeers && !p.Peer.Info().Network.Trusted { - return p2p.DiscTooManyPeers - } - - p.Log().Debug("Light Ethereum peer connected", "name", p.Name()) - - // Execute the LES handshake - var ( - genesis = pm.blockchain.Genesis() - head = pm.blockchain.CurrentHeader() - hash = head.Hash() - number = head.Number.Uint64() - td = pm.blockchain.GetTd(hash, number) - ) - if err := p.Handshake(td, hash, number, genesis.Hash(), pm.server); err != nil { - p.Log().Debug("Light Ethereum handshake failed", "err", err) - return err - } - - if !pm.lightSync && !p.Peer.Info().Network.Trusted { - addr, ok := p.RemoteAddr().(*net.TCPAddr) - // test peer address is not a tcp address, don't use client pool if can not typecast - if ok { - id := addr.IP.String() - if !pm.clientPool.connect(id, func() { go pm.removePeer(p.id) }) { - return p2p.DiscTooManyPeers - } - defer pm.clientPool.disconnect(id) - } - } - - if rw, ok := p.rw.(*meteredMsgReadWriter); ok { - rw.Init(p.version) - } - // Register the peer locally - if err := pm.peers.Register(p); err != nil { - p.Log().Error("Light Ethereum peer registration failed", "err", err) - return err - } - defer func() { - if pm.server != nil && pm.server.fcManager != nil && p.fcClient != nil { - p.fcClient.Remove(pm.server.fcManager) - } - pm.removePeer(p.id) - }() - // Register the peer in the downloader. If the downloader considers it banned, we disconnect - if pm.lightSync { - p.lock.Lock() - head := p.headInfo - p.lock.Unlock() - if pm.fetcher != nil { - pm.fetcher.announce(p, head) - } - - if p.poolEntry != nil { - pm.serverPool.registered(p.poolEntry) - } - } - - stop := make(chan struct{}) - defer close(stop) - go func() { - // new block announce loop - for { - select { - case announce := <-p.announceChn: - p.SendAnnounce(announce) - case <-stop: - return - } - } - }() - - // main loop. handle incoming messages. - for { - if err := pm.handleMsg(p); err != nil { - p.Log().Debug("Light Ethereum message handling failed", "err", err) - return err - } - } -} - -var reqList = []uint64{GetBlockHeadersMsg, GetBlockBodiesMsg, GetCodeMsg, GetReceiptsMsg, GetProofsV1Msg, SendTxMsg, SendTxV2Msg, GetTxStatusMsg, GetHeaderProofsMsg, GetProofsV2Msg, GetHelperTrieProofsMsg} - -// handleMsg is invoked whenever an inbound message is received from a remote -// peer. The remote connection is torn down upon returning any error. -func (pm *ProtocolManager) handleMsg(p *peer) error { - // Read the next message from the remote peer, and ensure it's fully consumed - msg, err := p.rw.ReadMsg() - if err != nil { - return err - } - p.Log().Trace("Light Ethereum message arrived", "code", msg.Code, "bytes", msg.Size) - - costs := p.fcCosts[msg.Code] - reject := func(reqCnt, maxCnt uint64) bool { - if p.fcClient == nil || reqCnt > maxCnt { - return true - } - bufValue, _ := p.fcClient.AcceptRequest() - cost := costs.baseCost + reqCnt*costs.reqCost - if cost > pm.server.defParams.BufLimit { - cost = pm.server.defParams.BufLimit - } - if cost > bufValue { - recharge := time.Duration((cost - bufValue) * 1000000 / pm.server.defParams.MinRecharge) - p.Log().Error("Request came too early", "recharge", common.PrettyDuration(recharge)) - return true - } - return false - } - - if msg.Size > ProtocolMaxMsgSize { - return errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) - } - defer msg.Discard() - - var deliverMsg *Msg - - // Handle the message depending on its contents - switch msg.Code { - case StatusMsg: - p.Log().Trace("Received status message") - // Status messages should never arrive after the handshake - return errResp(ErrExtraStatusMsg, "uncontrolled status message") - - // Block header query, collect the requested headers and reply - case AnnounceMsg: - p.Log().Trace("Received announce message") - if p.requestAnnounceType == announceTypeNone { - return errResp(ErrUnexpectedResponse, "") - } - - var req announceData - if err := msg.Decode(&req); err != nil { - return errResp(ErrDecode, "%v: %v", msg, err) - } - - if p.requestAnnounceType == announceTypeSigned { - if err := req.checkSignature(p.ID()); err != nil { - p.Log().Trace("Invalid announcement signature", "err", err) - return err - } - p.Log().Trace("Valid announcement signature") - } - - p.Log().Trace("Announce message content", "number", req.Number, "hash", req.Hash, "td", req.Td, "reorg", req.ReorgDepth) - if pm.fetcher != nil { - pm.fetcher.announce(p, &req) - } - - case GetBlockHeadersMsg: - p.Log().Trace("Received block header request") - // Decode the complex header query - var req struct { - ReqID uint64 - Query getBlockHeadersData - } - if err := msg.Decode(&req); err != nil { - return errResp(ErrDecode, "%v: %v", msg, err) - } - - query := req.Query - if reject(query.Amount, MaxHeaderFetch) { - return errResp(ErrRequestRejected, "") - } - - hashMode := query.Origin.Hash != (common.Hash{}) - first := true - maxNonCanonical := uint64(100) - - // Gather headers until the fetch or network limits is reached - var ( - bytes common.StorageSize - headers []*types.Header - unknown bool - ) - for !unknown && len(headers) < int(query.Amount) && bytes < softResponseLimit { - // Retrieve the next header satisfying the query - var origin *types.Header - if hashMode { - if first { - first = false - origin = pm.blockchain.GetHeaderByHash(query.Origin.Hash) - if origin != nil { - query.Origin.Number = origin.Number.Uint64() - } - } else { - origin = pm.blockchain.GetHeader(query.Origin.Hash, query.Origin.Number) - } - } else { - origin = pm.blockchain.GetHeaderByNumber(query.Origin.Number) - } - if origin == nil { - break - } - headers = append(headers, origin) - bytes += estHeaderRlpSize - - // Advance to the next header of the query - switch { - case hashMode && query.Reverse: - // Hash based traversal towards the genesis block - ancestor := query.Skip + 1 - if ancestor == 0 { - unknown = true - } else { - query.Origin.Hash, query.Origin.Number = pm.blockchain.GetAncestor(query.Origin.Hash, query.Origin.Number, ancestor, &maxNonCanonical) - unknown = (query.Origin.Hash == common.Hash{}) - } - case hashMode && !query.Reverse: - // Hash based traversal towards the leaf block - var ( - current = origin.Number.Uint64() - next = current + query.Skip + 1 - ) - if next <= current { - infos, _ := json.MarshalIndent(p.Peer.Info(), "", " ") - p.Log().Warn("GetBlockHeaders skip overflow attack", "current", current, "skip", query.Skip, "next", next, "attacker", infos) - unknown = true - } else { - if header := pm.blockchain.GetHeaderByNumber(next); header != nil { - nextHash := header.Hash() - expOldHash, _ := pm.blockchain.GetAncestor(nextHash, next, query.Skip+1, &maxNonCanonical) - if expOldHash == query.Origin.Hash { - query.Origin.Hash, query.Origin.Number = nextHash, next - } else { - unknown = true - } - } else { - unknown = true - } - } - case query.Reverse: - // Number based traversal towards the genesis block - if query.Origin.Number >= query.Skip+1 { - query.Origin.Number -= query.Skip + 1 - } else { - unknown = true - } - - case !query.Reverse: - // Number based traversal towards the leaf block - query.Origin.Number += query.Skip + 1 - } - } - - bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + query.Amount*costs.reqCost) - pm.server.fcCostStats.update(msg.Code, query.Amount, rcost) - return p.SendBlockHeaders(req.ReqID, bv, headers) - - case BlockHeadersMsg: - if pm.downloader == nil { - return errResp(ErrUnexpectedResponse, "") - } - - p.Log().Trace("Received block header response message") - // A batch of headers arrived to one of our previous requests - var resp struct { - ReqID, BV uint64 - Headers []*types.Header - } - if err := msg.Decode(&resp); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - p.fcServer.GotReply(resp.ReqID, resp.BV) - if pm.fetcher != nil && pm.fetcher.requestedID(resp.ReqID) { - pm.fetcher.deliverHeaders(p, resp.ReqID, resp.Headers) - } else { - err := pm.downloader.DeliverHeaders(p.id, resp.Headers) - if err != nil { - log.Debug(fmt.Sprint(err)) - } - } - - case GetBlockBodiesMsg: - p.Log().Trace("Received block bodies request") - // Decode the retrieval message - var req struct { - ReqID uint64 - Hashes []common.Hash - } - if err := msg.Decode(&req); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - // Gather blocks until the fetch or network limits is reached - var ( - bytes int - bodies []rlp.RawValue - ) - reqCnt := len(req.Hashes) - if reject(uint64(reqCnt), MaxBodyFetch) { - return errResp(ErrRequestRejected, "") - } - for _, hash := range req.Hashes { - if bytes >= softResponseLimit { - break - } - // Retrieve the requested block body, stopping if enough was found - if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil { - if data := rawdb.ReadBodyRLP(pm.chainDb, hash, *number); len(data) != 0 { - bodies = append(bodies, data) - bytes += len(data) - } - } - } - bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) - pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - return p.SendBlockBodiesRLP(req.ReqID, bv, bodies) - - case BlockBodiesMsg: - if pm.odr == nil { - return errResp(ErrUnexpectedResponse, "") - } - - p.Log().Trace("Received block bodies response") - // A batch of block bodies arrived to one of our previous requests - var resp struct { - ReqID, BV uint64 - Data []*types.Body - } - if err := msg.Decode(&resp); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - p.fcServer.GotReply(resp.ReqID, resp.BV) - deliverMsg = &Msg{ - MsgType: MsgBlockBodies, - ReqID: resp.ReqID, - Obj: resp.Data, - } - - case GetCodeMsg: - p.Log().Trace("Received code request") - // Decode the retrieval message - var req struct { - ReqID uint64 - Reqs []CodeReq - } - if err := msg.Decode(&req); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - // Gather state data until the fetch or network limits is reached - var ( - bytes int - data [][]byte - ) - reqCnt := len(req.Reqs) - if reject(uint64(reqCnt), MaxCodeFetch) { - return errResp(ErrRequestRejected, "") - } - for _, req := range req.Reqs { - // Retrieve the requested state entry, stopping if enough was found - if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil { - if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil { - statedb, err := pm.blockchain.State() - if err != nil { - continue - } - account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey)) - if err != nil { - continue - } - code, _ := statedb.Database().TrieDB().Node(common.BytesToHash(account.CodeHash)) - - data = append(data, code) - if bytes += len(code); bytes >= softResponseLimit { - break - } - } - } - } - bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) - pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - return p.SendCode(req.ReqID, bv, data) - - case CodeMsg: - if pm.odr == nil { - return errResp(ErrUnexpectedResponse, "") - } - - p.Log().Trace("Received code response") - // A batch of node state data arrived to one of our previous requests - var resp struct { - ReqID, BV uint64 - Data [][]byte - } - if err := msg.Decode(&resp); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - p.fcServer.GotReply(resp.ReqID, resp.BV) - deliverMsg = &Msg{ - MsgType: MsgCode, - ReqID: resp.ReqID, - Obj: resp.Data, - } - - case GetReceiptsMsg: - p.Log().Trace("Received receipts request") - // Decode the retrieval message - var req struct { - ReqID uint64 - Hashes []common.Hash - } - if err := msg.Decode(&req); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - // Gather state data until the fetch or network limits is reached - var ( - bytes int - receipts []rlp.RawValue - ) - reqCnt := len(req.Hashes) - if reject(uint64(reqCnt), MaxReceiptFetch) { - return errResp(ErrRequestRejected, "") - } - for _, hash := range req.Hashes { - if bytes >= softResponseLimit { - break - } - // Retrieve the requested block's receipts, skipping if unknown to us - var results types.Receipts - if number := rawdb.ReadHeaderNumber(pm.chainDb, hash); number != nil { - results = rawdb.ReadReceipts(pm.chainDb, hash, *number) - } - if results == nil { - if header := pm.blockchain.GetHeaderByHash(hash); header == nil || header.ReceiptHash != types.EmptyRootHash { - continue - } - } - // If known, encode and queue for response packet - if encoded, err := rlp.EncodeToBytes(results); err != nil { - log.Error("Failed to encode receipt", "err", err) - } else { - receipts = append(receipts, encoded) - bytes += len(encoded) - } - } - bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) - pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - return p.SendReceiptsRLP(req.ReqID, bv, receipts) - - case ReceiptsMsg: - if pm.odr == nil { - return errResp(ErrUnexpectedResponse, "") - } - - p.Log().Trace("Received receipts response") - // A batch of receipts arrived to one of our previous requests - var resp struct { - ReqID, BV uint64 - Receipts []types.Receipts - } - if err := msg.Decode(&resp); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - p.fcServer.GotReply(resp.ReqID, resp.BV) - deliverMsg = &Msg{ - MsgType: MsgReceipts, - ReqID: resp.ReqID, - Obj: resp.Receipts, - } - - case GetProofsV1Msg: - p.Log().Trace("Received proofs request") - // Decode the retrieval message - var req struct { - ReqID uint64 - Reqs []ProofReq - } - if err := msg.Decode(&req); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - // Gather state data until the fetch or network limits is reached - var ( - bytes int - proofs proofsData - ) - reqCnt := len(req.Reqs) - if reject(uint64(reqCnt), MaxProofsFetch) { - return errResp(ErrRequestRejected, "") - } - for _, req := range req.Reqs { - // Retrieve the requested state entry, stopping if enough was found - if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil { - if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil { - statedb, err := pm.blockchain.State() - if err != nil { - continue - } - var trie state.Trie - if len(req.AccKey) > 0 { - account, err := pm.getAccount(statedb, header.Root, common.BytesToHash(req.AccKey)) - if err != nil { - continue - } - trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root) - } else { - trie, _ = statedb.Database().OpenTrie(header.Root) - } - if trie != nil { - var proof light.NodeList - trie.Prove(req.Key, 0, &proof) - - proofs = append(proofs, proof) - if bytes += proof.DataSize(); bytes >= softResponseLimit { - break - } - } - } - } - } - bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) - pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - return p.SendProofs(req.ReqID, bv, proofs) - - case GetProofsV2Msg: - p.Log().Trace("Received les/2 proofs request") - // Decode the retrieval message - var req struct { - ReqID uint64 - Reqs []ProofReq - } - if err := msg.Decode(&req); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - // Gather state data until the fetch or network limits is reached - var ( - lastBHash common.Hash - statedb *state.StateDB - root common.Hash - ) - reqCnt := len(req.Reqs) - if reject(uint64(reqCnt), MaxProofsFetch) { - return errResp(ErrRequestRejected, "") - } - - nodes := light.NewNodeSet() - - for _, req := range req.Reqs { - // Look up the state belonging to the request - if statedb == nil || req.BHash != lastBHash { - statedb, root, lastBHash = nil, common.Hash{}, req.BHash - - if number := rawdb.ReadHeaderNumber(pm.chainDb, req.BHash); number != nil { - if header := rawdb.ReadHeader(pm.chainDb, req.BHash, *number); header != nil { - statedb, _ = pm.blockchain.State() - root = header.Root - } - } - } - if statedb == nil { - continue - } - // Pull the account or storage trie of the request - var trie state.Trie - if len(req.AccKey) > 0 { - account, err := pm.getAccount(statedb, root, common.BytesToHash(req.AccKey)) - if err != nil { - continue - } - trie, _ = statedb.Database().OpenStorageTrie(common.BytesToHash(req.AccKey), account.Root) - } else { - trie, _ = statedb.Database().OpenTrie(root) - } - if trie == nil { - continue - } - // Prove the user's request from the account or stroage trie - trie.Prove(req.Key, req.FromLevel, nodes) - if nodes.DataSize() >= softResponseLimit { - break - } - } - bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) - pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - return p.SendProofsV2(req.ReqID, bv, nodes.NodeList()) - - case ProofsV1Msg: - if pm.odr == nil { - return errResp(ErrUnexpectedResponse, "") - } - - p.Log().Trace("Received proofs response") - // A batch of merkle proofs arrived to one of our previous requests - var resp struct { - ReqID, BV uint64 - Data []light.NodeList - } - if err := msg.Decode(&resp); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - p.fcServer.GotReply(resp.ReqID, resp.BV) - deliverMsg = &Msg{ - MsgType: MsgProofsV1, - ReqID: resp.ReqID, - Obj: resp.Data, - } - - case ProofsV2Msg: - if pm.odr == nil { - return errResp(ErrUnexpectedResponse, "") - } - - p.Log().Trace("Received les/2 proofs response") - // A batch of merkle proofs arrived to one of our previous requests - var resp struct { - ReqID, BV uint64 - Data light.NodeList - } - if err := msg.Decode(&resp); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - p.fcServer.GotReply(resp.ReqID, resp.BV) - deliverMsg = &Msg{ - MsgType: MsgProofsV2, - ReqID: resp.ReqID, - Obj: resp.Data, - } - - case GetHeaderProofsMsg: - p.Log().Trace("Received headers proof request") - // Decode the retrieval message - var req struct { - ReqID uint64 - Reqs []ChtReq - } - if err := msg.Decode(&req); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - // Gather state data until the fetch or network limits is reached - var ( - bytes int - proofs []ChtResp - ) - reqCnt := len(req.Reqs) - if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) { - return errResp(ErrRequestRejected, "") - } - trieDb := trie.NewDatabase(ethdb.NewTable(pm.chainDb, light.ChtTablePrefix)) - for _, req := range req.Reqs { - if header := pm.blockchain.GetHeaderByNumber(req.BlockNum); header != nil { - sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, req.ChtNum*pm.iConfig.ChtSize-1) - if root := light.GetChtRoot(pm.chainDb, req.ChtNum-1, sectionHead); root != (common.Hash{}) { - trie, err := trie.New(root, trieDb) - if err != nil { - continue - } - var encNumber [8]byte - binary.BigEndian.PutUint64(encNumber[:], req.BlockNum) - - var proof light.NodeList - trie.Prove(encNumber[:], 0, &proof) - - proofs = append(proofs, ChtResp{Header: header, Proof: proof}) - if bytes += proof.DataSize() + estHeaderRlpSize; bytes >= softResponseLimit { - break - } - } - } - } - bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) - pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - return p.SendHeaderProofs(req.ReqID, bv, proofs) - - case GetHelperTrieProofsMsg: - p.Log().Trace("Received helper trie proof request") - // Decode the retrieval message - var req struct { - ReqID uint64 - Reqs []HelperTrieReq - } - if err := msg.Decode(&req); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - // Gather state data until the fetch or network limits is reached - var ( - auxBytes int - auxData [][]byte - ) - reqCnt := len(req.Reqs) - if reject(uint64(reqCnt), MaxHelperTrieProofsFetch) { - return errResp(ErrRequestRejected, "") - } - - var ( - lastIdx uint64 - lastType uint - root common.Hash - auxTrie *trie.Trie - ) - nodes := light.NewNodeSet() - for _, req := range req.Reqs { - if auxTrie == nil || req.Type != lastType || req.TrieIdx != lastIdx { - auxTrie, lastType, lastIdx = nil, req.Type, req.TrieIdx - - var prefix string - if root, prefix = pm.getHelperTrie(req.Type, req.TrieIdx); root != (common.Hash{}) { - auxTrie, _ = trie.New(root, trie.NewDatabase(ethdb.NewTable(pm.chainDb, prefix))) - } - } - if req.AuxReq == auxRoot { - var data []byte - if root != (common.Hash{}) { - data = root[:] - } - auxData = append(auxData, data) - auxBytes += len(data) - } else { - if auxTrie != nil { - auxTrie.Prove(req.Key, req.FromLevel, nodes) - } - if req.AuxReq != 0 { - data := pm.getHelperTrieAuxData(req) - auxData = append(auxData, data) - auxBytes += len(data) - } - } - if nodes.DataSize()+auxBytes >= softResponseLimit { - break - } - } - bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) - pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - return p.SendHelperTrieProofs(req.ReqID, bv, HelperTrieResps{Proofs: nodes.NodeList(), AuxData: auxData}) - - case HeaderProofsMsg: - if pm.odr == nil { - return errResp(ErrUnexpectedResponse, "") - } - - p.Log().Trace("Received headers proof response") - var resp struct { - ReqID, BV uint64 - Data []ChtResp - } - if err := msg.Decode(&resp); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - p.fcServer.GotReply(resp.ReqID, resp.BV) - deliverMsg = &Msg{ - MsgType: MsgHeaderProofs, - ReqID: resp.ReqID, - Obj: resp.Data, - } - - case HelperTrieProofsMsg: - if pm.odr == nil { - return errResp(ErrUnexpectedResponse, "") - } - - p.Log().Trace("Received helper trie proof response") - var resp struct { - ReqID, BV uint64 - Data HelperTrieResps - } - if err := msg.Decode(&resp); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - - p.fcServer.GotReply(resp.ReqID, resp.BV) - deliverMsg = &Msg{ - MsgType: MsgHelperTrieProofs, - ReqID: resp.ReqID, - Obj: resp.Data, - } - - case SendTxMsg: - if pm.txpool == nil { - return errResp(ErrRequestRejected, "") - } - // Transactions arrived, parse all of them and deliver to the pool - var txs []*types.Transaction - if err := msg.Decode(&txs); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - reqCnt := len(txs) - if reject(uint64(reqCnt), MaxTxSend) { - return errResp(ErrRequestRejected, "") - } - pm.txpool.AddRemotes(txs) - - _, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) - pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - - case SendTxV2Msg: - if pm.txpool == nil { - return errResp(ErrRequestRejected, "") - } - // Transactions arrived, parse all of them and deliver to the pool - var req struct { - ReqID uint64 - Txs []*types.Transaction - } - if err := msg.Decode(&req); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - reqCnt := len(req.Txs) - if reject(uint64(reqCnt), MaxTxSend) { - return errResp(ErrRequestRejected, "") - } - - hashes := make([]common.Hash, len(req.Txs)) - for i, tx := range req.Txs { - hashes[i] = tx.Hash() - } - stats := pm.txStatus(hashes) - for i, stat := range stats { - if stat.Status == core.TxStatusUnknown { - if errs := pm.txpool.AddRemotes([]*types.Transaction{req.Txs[i]}); errs[0] != nil { - stats[i].Error = errs[0].Error() - continue - } - stats[i] = pm.txStatus([]common.Hash{hashes[i]})[0] - } - } - - bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) - pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - - return p.SendTxStatus(req.ReqID, bv, stats) - - case GetTxStatusMsg: - if pm.txpool == nil { - return errResp(ErrUnexpectedResponse, "") - } - // Transactions arrived, parse all of them and deliver to the pool - var req struct { - ReqID uint64 - Hashes []common.Hash - } - if err := msg.Decode(&req); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - reqCnt := len(req.Hashes) - if reject(uint64(reqCnt), MaxTxStatus) { - return errResp(ErrRequestRejected, "") - } - bv, rcost := p.fcClient.RequestProcessed(costs.baseCost + uint64(reqCnt)*costs.reqCost) - pm.server.fcCostStats.update(msg.Code, uint64(reqCnt), rcost) - - return p.SendTxStatus(req.ReqID, bv, pm.txStatus(req.Hashes)) - - case TxStatusMsg: - if pm.odr == nil { - return errResp(ErrUnexpectedResponse, "") - } - - p.Log().Trace("Received tx status response") - var resp struct { - ReqID, BV uint64 - Status []txStatus - } - if err := msg.Decode(&resp); err != nil { - return errResp(ErrDecode, "msg %v: %v", msg, err) - } - - p.fcServer.GotReply(resp.ReqID, resp.BV) - - default: - p.Log().Trace("Received unknown message", "code", msg.Code) - return errResp(ErrInvalidMsgCode, "%v", msg.Code) - } - - if deliverMsg != nil { - err := pm.retriever.deliver(p, deliverMsg) - if err != nil { - p.responseErrors++ - if p.responseErrors > maxResponseErrors { - return err - } - } - } - return nil -} - -// getAccount retrieves an account from the state based at root. -func (pm *ProtocolManager) getAccount(statedb *state.StateDB, root, hash common.Hash) (state.Account, error) { - trie, err := trie.New(root, statedb.Database().TrieDB()) - if err != nil { - return state.Account{}, err - } - blob, err := trie.TryGet(hash[:]) - if err != nil { - return state.Account{}, err - } - var account state.Account - if err = rlp.DecodeBytes(blob, &account); err != nil { - return state.Account{}, err - } - return account, nil -} - -// getHelperTrie returns the post-processed trie root for the given trie ID and section index -func (pm *ProtocolManager) getHelperTrie(id uint, idx uint64) (common.Hash, string) { - switch id { - case htCanonical: - idxV1 := (idx+1)*(pm.iConfig.PairChtSize/pm.iConfig.ChtSize) - 1 - sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idxV1+1)*pm.iConfig.ChtSize-1) - return light.GetChtRoot(pm.chainDb, idxV1, sectionHead), light.ChtTablePrefix - case htBloomBits: - sectionHead := rawdb.ReadCanonicalHash(pm.chainDb, (idx+1)*pm.iConfig.BloomTrieSize-1) - return light.GetBloomTrieRoot(pm.chainDb, idx, sectionHead), light.BloomTrieTablePrefix - } - return common.Hash{}, "" -} - -// getHelperTrieAuxData returns requested auxiliary data for the given HelperTrie request -func (pm *ProtocolManager) getHelperTrieAuxData(req HelperTrieReq) []byte { - if req.Type == htCanonical && req.AuxReq == auxHeader && len(req.Key) == 8 { - blockNum := binary.BigEndian.Uint64(req.Key) - hash := rawdb.ReadCanonicalHash(pm.chainDb, blockNum) - return rawdb.ReadHeaderRLP(pm.chainDb, hash, blockNum) - } - return nil -} - -func (pm *ProtocolManager) txStatus(hashes []common.Hash) []txStatus { - stats := make([]txStatus, len(hashes)) - for i, stat := range pm.txpool.Status(hashes) { - // Save the status we've got from the transaction pool - stats[i].Status = stat - - // If the transaction is unknown to the pool, try looking it up locally - if stat == core.TxStatusUnknown { - if block, number, index := rawdb.ReadTxLookupEntry(pm.chainDb, hashes[i]); block != (common.Hash{}) { - stats[i].Status = core.TxStatusIncluded - stats[i].Lookup = &rawdb.TxLookupEntry{BlockHash: block, BlockIndex: number, Index: index} - } - } - } - return stats -} - -// downloaderPeerNotify implements peerSetNotify -type downloaderPeerNotify ProtocolManager - -type peerConnection struct { - manager *ProtocolManager - peer *peer -} - -func (pc *peerConnection) Head() (common.Hash, *big.Int) { - return pc.peer.HeadAndTd() -} - -func (pc *peerConnection) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { - reqID := genReqID() - rq := &distReq{ - getCost: func(dp distPeer) uint64 { - peer := dp.(*peer) - return peer.GetRequestCost(GetBlockHeadersMsg, amount) - }, - canSend: func(dp distPeer) bool { - return dp.(*peer) == pc.peer - }, - request: func(dp distPeer) func() { - peer := dp.(*peer) - cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) - peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } - }, - } - _, ok := <-pc.manager.reqDist.queue(rq) - if !ok { - return light.ErrNoPeers - } - return nil -} - -func (pc *peerConnection) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { - reqID := genReqID() - rq := &distReq{ - getCost: func(dp distPeer) uint64 { - peer := dp.(*peer) - return peer.GetRequestCost(GetBlockHeadersMsg, amount) - }, - canSend: func(dp distPeer) bool { - return dp.(*peer) == pc.peer - }, - request: func(dp distPeer) func() { - peer := dp.(*peer) - cost := peer.GetRequestCost(GetBlockHeadersMsg, amount) - peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } - }, - } - _, ok := <-pc.manager.reqDist.queue(rq) - if !ok { - return light.ErrNoPeers - } - return nil -} - -func (d *downloaderPeerNotify) registerPeer(p *peer) { - pm := (*ProtocolManager)(d) - pc := &peerConnection{ - manager: pm, - peer: p, - } - pm.downloader.RegisterLightPeer(p.id, ethVersion, pc) -} - -func (d *downloaderPeerNotify) unregisterPeer(p *peer) { - pm := (*ProtocolManager)(d) - pm.downloader.UnregisterPeer(p.id) -} diff --git a/les/handler_test.go b/les/handler_test.go deleted file mode 100644 index 30bf382f1..000000000 --- a/les/handler_test.go +++ /dev/null @@ -1,564 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "encoding/binary" - "math/big" - "math/rand" - "testing" - "time" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/consensus/ethash" - "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/core/rawdb" - "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/crypto" - "github.com/dexon-foundation/dexon/eth/downloader" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/p2p" - "github.com/dexon-foundation/dexon/params" - "github.com/dexon-foundation/dexon/rlp" - "github.com/dexon-foundation/dexon/trie" -) - -func expectResponse(r p2p.MsgReader, msgcode, reqID, bv uint64, data interface{}) error { - type resp struct { - ReqID, BV uint64 - Data interface{} - } - return p2p.ExpectMsg(r, msgcode, resp{reqID, bv, data}) -} - -// Tests that block headers can be retrieved from a remote chain based on user queries. -func TestGetBlockHeadersLes1(t *testing.T) { testGetBlockHeaders(t, 1) } -func TestGetBlockHeadersLes2(t *testing.T) { testGetBlockHeaders(t, 2) } - -func testGetBlockHeaders(t *testing.T, protocol int) { - server, tearDown := newServerEnv(t, downloader.MaxHashFetch+15, protocol, nil) - defer tearDown() - bc := server.pm.blockchain.(*core.BlockChain) - - // Create a "random" unknown hash for testing - var unknown common.Hash - for i := range unknown { - unknown[i] = byte(i) - } - // Create a batch of tests for various scenarios - limit := uint64(MaxHeaderFetch) - tests := []struct { - query *getBlockHeadersData // The query to execute for header retrieval - expect []common.Hash // The hashes of the block whose headers are expected - }{ - // A single random block should be retrievable by hash and number too - { - &getBlockHeadersData{Origin: hashOrNumber{Hash: bc.GetBlockByNumber(limit / 2).Hash()}, Amount: 1}, - []common.Hash{bc.GetBlockByNumber(limit / 2).Hash()}, - }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 1}, - []common.Hash{bc.GetBlockByNumber(limit / 2).Hash()}, - }, - // Multiple headers should be retrievable in both directions - { - &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3}, - []common.Hash{ - bc.GetBlockByNumber(limit / 2).Hash(), - bc.GetBlockByNumber(limit/2 + 1).Hash(), - bc.GetBlockByNumber(limit/2 + 2).Hash(), - }, - }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Amount: 3, Reverse: true}, - []common.Hash{ - bc.GetBlockByNumber(limit / 2).Hash(), - bc.GetBlockByNumber(limit/2 - 1).Hash(), - bc.GetBlockByNumber(limit/2 - 2).Hash(), - }, - }, - // Multiple headers with skip lists should be retrievable - { - &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3}, - []common.Hash{ - bc.GetBlockByNumber(limit / 2).Hash(), - bc.GetBlockByNumber(limit/2 + 4).Hash(), - bc.GetBlockByNumber(limit/2 + 8).Hash(), - }, - }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: limit / 2}, Skip: 3, Amount: 3, Reverse: true}, - []common.Hash{ - bc.GetBlockByNumber(limit / 2).Hash(), - bc.GetBlockByNumber(limit/2 - 4).Hash(), - bc.GetBlockByNumber(limit/2 - 8).Hash(), - }, - }, - // The chain endpoints should be retrievable - { - &getBlockHeadersData{Origin: hashOrNumber{Number: 0}, Amount: 1}, - []common.Hash{bc.GetBlockByNumber(0).Hash()}, - }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64()}, Amount: 1}, - []common.Hash{bc.CurrentBlock().Hash()}, - }, - // Ensure protocol limits are honored - /*{ - &getBlockHeadersData{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() - 1}, Amount: limit + 10, Reverse: true}, - bc.GetBlockHashesFromHash(bc.CurrentBlock().Hash(), limit), - },*/ - // Check that requesting more than available is handled gracefully - { - &getBlockHeadersData{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() - 4}, Skip: 3, Amount: 3}, - []common.Hash{ - bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 4).Hash(), - bc.GetBlockByNumber(bc.CurrentBlock().NumberU64()).Hash(), - }, - }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 3, Amount: 3, Reverse: true}, - []common.Hash{ - bc.GetBlockByNumber(4).Hash(), - bc.GetBlockByNumber(0).Hash(), - }, - }, - // Check that requesting more than available is handled gracefully, even if mid skip - { - &getBlockHeadersData{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() - 4}, Skip: 2, Amount: 3}, - []common.Hash{ - bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 4).Hash(), - bc.GetBlockByNumber(bc.CurrentBlock().NumberU64() - 1).Hash(), - }, - }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: 4}, Skip: 2, Amount: 3, Reverse: true}, - []common.Hash{ - bc.GetBlockByNumber(4).Hash(), - bc.GetBlockByNumber(1).Hash(), - }, - }, - // Check that non existing headers aren't returned - { - &getBlockHeadersData{Origin: hashOrNumber{Hash: unknown}, Amount: 1}, - []common.Hash{}, - }, { - &getBlockHeadersData{Origin: hashOrNumber{Number: bc.CurrentBlock().NumberU64() + 1}, Amount: 1}, - []common.Hash{}, - }, - } - // Run each of the tests and verify the results against the chain - var reqID uint64 - for i, tt := range tests { - // Collect the headers to expect in the response - headers := []*types.Header{} - for _, hash := range tt.expect { - headers = append(headers, bc.GetHeaderByHash(hash)) - } - // Send the hash request and verify the response - reqID++ - cost := server.tPeer.GetRequestCost(GetBlockHeadersMsg, int(tt.query.Amount)) - sendRequest(server.tPeer.app, GetBlockHeadersMsg, reqID, cost, tt.query) - if err := expectResponse(server.tPeer.app, BlockHeadersMsg, reqID, testBufLimit, headers); err != nil { - t.Errorf("test %d: headers mismatch: %v", i, err) - } - } -} - -// Tests that block contents can be retrieved from a remote chain based on their hashes. -func TestGetBlockBodiesLes1(t *testing.T) { testGetBlockBodies(t, 1) } -func TestGetBlockBodiesLes2(t *testing.T) { testGetBlockBodies(t, 2) } - -func testGetBlockBodies(t *testing.T, protocol int) { - server, tearDown := newServerEnv(t, downloader.MaxBlockFetch+15, protocol, nil) - defer tearDown() - bc := server.pm.blockchain.(*core.BlockChain) - - // Create a batch of tests for various scenarios - limit := MaxBodyFetch - tests := []struct { - random int // Number of blocks to fetch randomly from the chain - explicit []common.Hash // Explicitly requested blocks - available []bool // Availability of explicitly requested blocks - expected int // Total number of existing blocks to expect - }{ - {1, nil, nil, 1}, // A single random block should be retrievable - {10, nil, nil, 10}, // Multiple random blocks should be retrievable - {limit, nil, nil, limit}, // The maximum possible blocks should be retrievable - //{limit + 1, nil, nil, limit}, // No more than the possible block count should be returned - {0, []common.Hash{bc.Genesis().Hash()}, []bool{true}, 1}, // The genesis block should be retrievable - {0, []common.Hash{bc.CurrentBlock().Hash()}, []bool{true}, 1}, // The chains head block should be retrievable - {0, []common.Hash{{}}, []bool{false}, 0}, // A non existent block should not be returned - - // Existing and non-existing blocks interleaved should not cause problems - {0, []common.Hash{ - {}, - bc.GetBlockByNumber(1).Hash(), - {}, - bc.GetBlockByNumber(10).Hash(), - {}, - bc.GetBlockByNumber(100).Hash(), - {}, - }, []bool{false, true, false, true, false, true, false}, 3}, - } - // Run each of the tests and verify the results against the chain - var reqID uint64 - for i, tt := range tests { - // Collect the hashes to request, and the response to expect - hashes, seen := []common.Hash{}, make(map[int64]bool) - bodies := []*types.Body{} - - for j := 0; j < tt.random; j++ { - for { - num := rand.Int63n(int64(bc.CurrentBlock().NumberU64())) - if !seen[num] { - seen[num] = true - - block := bc.GetBlockByNumber(uint64(num)) - hashes = append(hashes, block.Hash()) - if len(bodies) < tt.expected { - bodies = append(bodies, &types.Body{Transactions: block.Transactions(), Uncles: block.Uncles()}) - } - break - } - } - } - for j, hash := range tt.explicit { - hashes = append(hashes, hash) - if tt.available[j] && len(bodies) < tt.expected { - block := bc.GetBlockByHash(hash) - bodies = append(bodies, &types.Body{Transactions: block.Transactions(), Uncles: block.Uncles()}) - } - } - reqID++ - // Send the hash request and verify the response - cost := server.tPeer.GetRequestCost(GetBlockBodiesMsg, len(hashes)) - sendRequest(server.tPeer.app, GetBlockBodiesMsg, reqID, cost, hashes) - if err := expectResponse(server.tPeer.app, BlockBodiesMsg, reqID, testBufLimit, bodies); err != nil { - t.Errorf("test %d: bodies mismatch: %v", i, err) - } - } -} - -// Tests that the contract codes can be retrieved based on account addresses. -func TestGetCodeLes1(t *testing.T) { testGetCode(t, 1) } -func TestGetCodeLes2(t *testing.T) { testGetCode(t, 2) } - -func testGetCode(t *testing.T, protocol int) { - // Assemble the test environment - server, tearDown := newServerEnv(t, 4, protocol, nil) - defer tearDown() - bc := server.pm.blockchain.(*core.BlockChain) - - var codereqs []*CodeReq - var codes [][]byte - - for i := uint64(0); i <= bc.CurrentBlock().NumberU64(); i++ { - header := bc.GetHeaderByNumber(i) - req := &CodeReq{ - BHash: header.Hash(), - AccKey: crypto.Keccak256(testContractAddr[:]), - } - codereqs = append(codereqs, req) - if i >= testContractDeployed { - codes = append(codes, testContractCodeDeployed) - } - } - - cost := server.tPeer.GetRequestCost(GetCodeMsg, len(codereqs)) - sendRequest(server.tPeer.app, GetCodeMsg, 42, cost, codereqs) - if err := expectResponse(server.tPeer.app, CodeMsg, 42, testBufLimit, codes); err != nil { - t.Errorf("codes mismatch: %v", err) - } -} - -// Tests that the transaction receipts can be retrieved based on hashes. -func TestGetReceiptLes1(t *testing.T) { testGetReceipt(t, 1) } -func TestGetReceiptLes2(t *testing.T) { testGetReceipt(t, 2) } - -func testGetReceipt(t *testing.T, protocol int) { - // Assemble the test environment - server, tearDown := newServerEnv(t, 4, protocol, nil) - defer tearDown() - bc := server.pm.blockchain.(*core.BlockChain) - - // Collect the hashes to request, and the response to expect - hashes, receipts := []common.Hash{}, []types.Receipts{} - for i := uint64(0); i <= bc.CurrentBlock().NumberU64(); i++ { - block := bc.GetBlockByNumber(i) - - hashes = append(hashes, block.Hash()) - receipts = append(receipts, rawdb.ReadReceipts(server.db, block.Hash(), block.NumberU64())) - } - // Send the hash request and verify the response - cost := server.tPeer.GetRequestCost(GetReceiptsMsg, len(hashes)) - sendRequest(server.tPeer.app, GetReceiptsMsg, 42, cost, hashes) - if err := expectResponse(server.tPeer.app, ReceiptsMsg, 42, testBufLimit, receipts); err != nil { - t.Errorf("receipts mismatch: %v", err) - } -} - -// Tests that trie merkle proofs can be retrieved -func TestGetProofsLes1(t *testing.T) { testGetProofs(t, 1) } -func TestGetProofsLes2(t *testing.T) { testGetProofs(t, 2) } - -func testGetProofs(t *testing.T, protocol int) { - // Assemble the test environment - server, tearDown := newServerEnv(t, 4, protocol, nil) - defer tearDown() - bc := server.pm.blockchain.(*core.BlockChain) - - var ( - proofreqs []ProofReq - proofsV1 [][]rlp.RawValue - ) - proofsV2 := light.NewNodeSet() - - accounts := []common.Address{testBankAddress, acc1Addr, acc2Addr, {}} - for i := uint64(0); i <= bc.CurrentBlock().NumberU64(); i++ { - header := bc.GetHeaderByNumber(i) - root := header.Root - trie, _ := trie.New(root, trie.NewDatabase(server.db)) - - for _, acc := range accounts { - req := ProofReq{ - BHash: header.Hash(), - Key: crypto.Keccak256(acc[:]), - } - proofreqs = append(proofreqs, req) - - switch protocol { - case 1: - var proof light.NodeList - trie.Prove(crypto.Keccak256(acc[:]), 0, &proof) - proofsV1 = append(proofsV1, proof) - case 2: - trie.Prove(crypto.Keccak256(acc[:]), 0, proofsV2) - } - } - } - // Send the proof request and verify the response - switch protocol { - case 1: - cost := server.tPeer.GetRequestCost(GetProofsV1Msg, len(proofreqs)) - sendRequest(server.tPeer.app, GetProofsV1Msg, 42, cost, proofreqs) - if err := expectResponse(server.tPeer.app, ProofsV1Msg, 42, testBufLimit, proofsV1); err != nil { - t.Errorf("proofs mismatch: %v", err) - } - case 2: - cost := server.tPeer.GetRequestCost(GetProofsV2Msg, len(proofreqs)) - sendRequest(server.tPeer.app, GetProofsV2Msg, 42, cost, proofreqs) - if err := expectResponse(server.tPeer.app, ProofsV2Msg, 42, testBufLimit, proofsV2.NodeList()); err != nil { - t.Errorf("proofs mismatch: %v", err) - } - } -} - -// Tests that CHT proofs can be correctly retrieved. -func TestGetCHTProofsLes1(t *testing.T) { testGetCHTProofs(t, 1) } -func TestGetCHTProofsLes2(t *testing.T) { testGetCHTProofs(t, 2) } - -func testGetCHTProofs(t *testing.T, protocol int) { - config := light.TestServerIndexerConfig - frequency := config.ChtSize - if protocol == 2 { - frequency = config.PairChtSize - } - - waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { - expectSections := frequency / config.ChtSize - for { - cs, _, _ := cIndexer.Sections() - bs, _, _ := bIndexer.Sections() - if cs >= expectSections && bs >= expectSections { - break - } - time.Sleep(10 * time.Millisecond) - } - } - server, tearDown := newServerEnv(t, int(frequency+config.ChtConfirms), protocol, waitIndexers) - defer tearDown() - bc := server.pm.blockchain.(*core.BlockChain) - - // Assemble the proofs from the different protocols - header := bc.GetHeaderByNumber(frequency - 1) - rlp, _ := rlp.EncodeToBytes(header) - - key := make([]byte, 8) - binary.BigEndian.PutUint64(key, frequency-1) - - proofsV1 := []ChtResp{{ - Header: header, - }} - proofsV2 := HelperTrieResps{ - AuxData: [][]byte{rlp}, - } - switch protocol { - case 1: - root := light.GetChtRoot(server.db, 0, bc.GetHeaderByNumber(frequency-1).Hash()) - trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.ChtTablePrefix))) - - var proof light.NodeList - trie.Prove(key, 0, &proof) - proofsV1[0].Proof = proof - - case 2: - root := light.GetChtRoot(server.db, (frequency/config.ChtSize)-1, bc.GetHeaderByNumber(frequency-1).Hash()) - trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.ChtTablePrefix))) - trie.Prove(key, 0, &proofsV2.Proofs) - } - // Assemble the requests for the different protocols - requestsV1 := []ChtReq{{ - ChtNum: frequency / config.ChtSize, - BlockNum: frequency - 1, - }} - requestsV2 := []HelperTrieReq{{ - Type: htCanonical, - TrieIdx: frequency/config.PairChtSize - 1, - Key: key, - AuxReq: auxHeader, - }} - // Send the proof request and verify the response - switch protocol { - case 1: - cost := server.tPeer.GetRequestCost(GetHeaderProofsMsg, len(requestsV1)) - sendRequest(server.tPeer.app, GetHeaderProofsMsg, 42, cost, requestsV1) - if err := expectResponse(server.tPeer.app, HeaderProofsMsg, 42, testBufLimit, proofsV1); err != nil { - t.Errorf("proofs mismatch: %v", err) - } - case 2: - cost := server.tPeer.GetRequestCost(GetHelperTrieProofsMsg, len(requestsV2)) - sendRequest(server.tPeer.app, GetHelperTrieProofsMsg, 42, cost, requestsV2) - if err := expectResponse(server.tPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofsV2); err != nil { - t.Errorf("proofs mismatch: %v", err) - } - } -} - -// Tests that bloombits proofs can be correctly retrieved. -func TestGetBloombitsProofs(t *testing.T) { - config := light.TestServerIndexerConfig - - waitIndexers := func(cIndexer, bIndexer, btIndexer *core.ChainIndexer) { - for { - cs, _, _ := cIndexer.Sections() - bs, _, _ := bIndexer.Sections() - bts, _, _ := btIndexer.Sections() - if cs >= 8 && bs >= 8 && bts >= 1 { - break - } - time.Sleep(10 * time.Millisecond) - } - } - server, tearDown := newServerEnv(t, int(config.BloomTrieSize+config.BloomTrieConfirms), 2, waitIndexers) - defer tearDown() - bc := server.pm.blockchain.(*core.BlockChain) - - // Request and verify each bit of the bloom bits proofs - for bit := 0; bit < 2048; bit++ { - // Assemble the request and proofs for the bloombits - key := make([]byte, 10) - - binary.BigEndian.PutUint16(key[:2], uint16(bit)) - // Only the first bloom section has data. - binary.BigEndian.PutUint64(key[2:], 0) - - requests := []HelperTrieReq{{ - Type: htBloomBits, - TrieIdx: 0, - Key: key, - }} - var proofs HelperTrieResps - - root := light.GetBloomTrieRoot(server.db, 0, bc.GetHeaderByNumber(config.BloomTrieSize-1).Hash()) - trie, _ := trie.New(root, trie.NewDatabase(ethdb.NewTable(server.db, light.BloomTrieTablePrefix))) - trie.Prove(key, 0, &proofs.Proofs) - - // Send the proof request and verify the response - cost := server.tPeer.GetRequestCost(GetHelperTrieProofsMsg, len(requests)) - sendRequest(server.tPeer.app, GetHelperTrieProofsMsg, 42, cost, requests) - if err := expectResponse(server.tPeer.app, HelperTrieProofsMsg, 42, testBufLimit, proofs); err != nil { - t.Errorf("bit %d: proofs mismatch: %v", bit, err) - } - } -} - -func TestTransactionStatusLes2(t *testing.T) { - db := ethdb.NewMemDatabase() - pm := newTestProtocolManagerMust(t, false, 0, nil, nil, nil, db) - chain := pm.blockchain.(*core.BlockChain) - config := core.DefaultTxPoolConfig - config.Journal = "" - txpool := core.NewTxPool(config, params.TestChainConfig, chain) - pm.txpool = txpool - peer, _ := newTestPeer(t, "peer", 2, pm, true) - defer peer.close() - - var reqID uint64 - - test := func(tx *types.Transaction, send bool, expStatus txStatus) { - reqID++ - if send { - cost := peer.GetRequestCost(SendTxV2Msg, 1) - sendRequest(peer.app, SendTxV2Msg, reqID, cost, types.Transactions{tx}) - } else { - cost := peer.GetRequestCost(GetTxStatusMsg, 1) - sendRequest(peer.app, GetTxStatusMsg, reqID, cost, []common.Hash{tx.Hash()}) - } - if err := expectResponse(peer.app, TxStatusMsg, reqID, testBufLimit, []txStatus{expStatus}); err != nil { - t.Errorf("transaction status mismatch") - } - } - - signer := types.HomesteadSigner{} - - // test error status by sending an underpriced transaction - tx0, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey) - test(tx0, true, txStatus{Status: core.TxStatusUnknown, Error: core.ErrUnderpriced.Error()}) - - tx1, _ := types.SignTx(types.NewTransaction(0, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey) - test(tx1, false, txStatus{Status: core.TxStatusUnknown}) // query before sending, should be unknown - test(tx1, true, txStatus{Status: core.TxStatusPending}) // send valid processable tx, should return pending - test(tx1, true, txStatus{Status: core.TxStatusPending}) // adding it again should not return an error - - tx2, _ := types.SignTx(types.NewTransaction(1, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey) - tx3, _ := types.SignTx(types.NewTransaction(2, acc1Addr, big.NewInt(10000), params.TxGas, big.NewInt(100000000000), nil), signer, testBankKey) - // send transactions in the wrong order, tx3 should be queued - test(tx3, true, txStatus{Status: core.TxStatusQueued}) - test(tx2, true, txStatus{Status: core.TxStatusPending}) - // query again, now tx3 should be pending too - test(tx3, false, txStatus{Status: core.TxStatusPending}) - - // generate and add a block with tx1 and tx2 included - gchain, _ := core.GenerateChain(params.TestChainConfig, chain.GetBlockByNumber(0), ethash.NewFaker(), db, 1, func(i int, block *core.BlockGen) { - block.AddTx(tx1) - block.AddTx(tx2) - }) - if _, err := chain.InsertChain(gchain); err != nil { - panic(err) - } - // wait until TxPool processes the inserted block - for i := 0; i < 10; i++ { - if pending, _ := txpool.Stats(); pending == 1 { - break - } - time.Sleep(100 * time.Millisecond) - } - if pending, _ := txpool.Stats(); pending != 1 { - t.Fatalf("pending count mismatch: have %d, want 1", pending) - } - - // check if their status is included now - block1hash := rawdb.ReadCanonicalHash(db, 1) - test(tx1, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.TxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 0}}) - test(tx2, false, txStatus{Status: core.TxStatusIncluded, Lookup: &rawdb.TxLookupEntry{BlockHash: block1hash, BlockIndex: 1, Index: 1}}) -} diff --git a/les/helper_test.go b/les/helper_test.go deleted file mode 100644 index 2ce78f0c9..000000000 --- a/les/helper_test.go +++ /dev/null @@ -1,445 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// This file contains some shares testing functionality, common to multiple -// different files and modules being tested. - -package les - -import ( - "crypto/rand" - "math/big" - "sync" - "testing" - "time" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/consensus/ethash" - "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/core/vm" - "github.com/dexon-foundation/dexon/crypto" - "github.com/dexon-foundation/dexon/eth" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/event" - "github.com/dexon-foundation/dexon/les/flowcontrol" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/p2p" - "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/params" -) - -var ( - testBankKey, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") - testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey) - testBankFunds = big.NewInt(1000000000000000000) - - acc1Key, _ = crypto.HexToECDSA("8a1f9a8f95be41cd7ccb6168179afb4504aefe388d1e14474d32c45c72ce7b7a") - acc2Key, _ = crypto.HexToECDSA("49a7b37aa6f6645917e7b807e9d1c00d4fa71f18343b0d4122a4d2df64dd6fee") - acc1Addr = crypto.PubkeyToAddress(acc1Key.PublicKey) - acc2Addr = crypto.PubkeyToAddress(acc2Key.PublicKey) - - testContractCode = common.Hex2Bytes("606060405260cc8060106000396000f360606040526000357c01000000000000000000000000000000000000000000000000000000009004806360cd2685146041578063c16431b914606b57603f565b005b6055600480803590602001909190505060a9565b6040518082815260200191505060405180910390f35b60886004808035906020019091908035906020019091905050608a565b005b80600060005083606481101560025790900160005b50819055505b5050565b6000600060005082606481101560025790900160005b5054905060c7565b91905056") - testContractAddr common.Address - testContractCodeDeployed = testContractCode[16:] - testContractDeployed = uint64(2) - - testEventEmitterCode = common.Hex2Bytes("60606040523415600e57600080fd5b7f57050ab73f6b9ebdd9f76b8d4997793f48cf956e965ee070551b9ca0bb71584e60405160405180910390a160358060476000396000f3006060604052600080fd00a165627a7a723058203f727efcad8b5811f8cb1fc2620ce5e8c63570d697aef968172de296ea3994140029") - testEventEmitterAddr common.Address - - testBufLimit = uint64(100) -) - -/* -contract test { - - uint256[100] data; - - function Put(uint256 addr, uint256 value) { - data[addr] = value; - } - - function Get(uint256 addr) constant returns (uint256 value) { - return data[addr]; - } -} -*/ - -func testChainGen(i int, block *core.BlockGen) { - signer := types.HomesteadSigner{} - - switch i { - case 0: - // In block 1, the test bank sends account #1 some ether. - tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBankAddress), acc1Addr, big.NewInt(10000), params.TxGas, nil, nil), signer, testBankKey) - block.AddTx(tx) - case 1: - // In block 2, the test bank sends some more ether to account #1. - // acc1Addr passes it on to account #2. - // acc1Addr creates a test contract. - // acc1Addr creates a test event. - nonce := block.TxNonce(acc1Addr) - - tx1, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBankAddress), acc1Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, testBankKey) - tx2, _ := types.SignTx(types.NewTransaction(nonce, acc2Addr, big.NewInt(1000), params.TxGas, nil, nil), signer, acc1Key) - tx3, _ := types.SignTx(types.NewContractCreation(nonce+1, big.NewInt(0), 200000, big.NewInt(0), testContractCode), signer, acc1Key) - testContractAddr = crypto.CreateAddress(acc1Addr, nonce+1) - tx4, _ := types.SignTx(types.NewContractCreation(nonce+2, big.NewInt(0), 200000, big.NewInt(0), testEventEmitterCode), signer, acc1Key) - testEventEmitterAddr = crypto.CreateAddress(acc1Addr, nonce+2) - block.AddTx(tx1) - block.AddTx(tx2) - block.AddTx(tx3) - block.AddTx(tx4) - case 2: - // Block 3 is empty but was mined by account #2. - block.SetCoinbase(acc2Addr) - block.SetExtra([]byte("yeehaw")) - data := common.Hex2Bytes("C16431B900000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000000000001") - tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBankAddress), testContractAddr, big.NewInt(0), 100000, nil, data), signer, testBankKey) - block.AddTx(tx) - case 3: - // Block 4 includes blocks 2 and 3 as uncle headers (with modified extra data). - b2 := block.PrevBlock(1).Header() - b2.Extra = []byte("foo") - block.AddUncle(b2) - b3 := block.PrevBlock(2).Header() - b3.Extra = []byte("foo") - block.AddUncle(b3) - data := common.Hex2Bytes("C16431B900000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000002") - tx, _ := types.SignTx(types.NewTransaction(block.TxNonce(testBankAddress), testContractAddr, big.NewInt(0), 100000, nil, data), signer, testBankKey) - block.AddTx(tx) - } -} - -// testIndexers creates a set of indexers with specified params for testing purpose. -func testIndexers(db ethdb.Database, odr light.OdrBackend, iConfig *light.IndexerConfig) (*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer) { - chtIndexer := light.NewChtIndexer(db, odr, iConfig.ChtSize, iConfig.ChtConfirms) - bloomIndexer := eth.NewBloomIndexer(db, iConfig.BloomSize, iConfig.BloomConfirms) - bloomTrieIndexer := light.NewBloomTrieIndexer(db, odr, iConfig.BloomSize, iConfig.BloomTrieSize) - bloomIndexer.AddChildIndexer(bloomTrieIndexer) - return chtIndexer, bloomIndexer, bloomTrieIndexer -} - -func testRCL() RequestCostList { - cl := make(RequestCostList, len(reqList)) - for i, code := range reqList { - cl[i].MsgCode = code - cl[i].BaseCost = 0 - cl[i].ReqCost = 0 - } - return cl -} - -// newTestProtocolManager creates a new protocol manager for testing purposes, -// with the given number of blocks already known, potential notification -// channels for different events and relative chain indexers array. -func newTestProtocolManager(lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database) (*ProtocolManager, error) { - var ( - evmux = new(event.TypeMux) - engine = ethash.NewFaker() - gspec = core.Genesis{ - Config: params.TestChainConfig, - Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}, - } - genesis = gspec.MustCommit(db) - chain BlockChain - ) - if peers == nil { - peers = newPeerSet() - } - - if lightSync { - chain, _ = light.NewLightChain(odr, gspec.Config, engine) - } else { - blockchain, _ := core.NewBlockChain(db, nil, gspec.Config, engine, vm.Config{}, nil) - gchain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, blocks, generator) - if _, err := blockchain.InsertChain(gchain); err != nil { - panic(err) - } - chain = blockchain - } - - indexConfig := light.TestServerIndexerConfig - if lightSync { - indexConfig = light.TestClientIndexerConfig - } - pm, err := NewProtocolManager(gspec.Config, indexConfig, lightSync, NetworkId, evmux, engine, peers, chain, nil, db, odr, nil, nil, make(chan struct{}), new(sync.WaitGroup)) - if err != nil { - return nil, err - } - if !lightSync { - srv := &LesServer{lesCommons: lesCommons{protocolManager: pm}} - pm.server = srv - - srv.defParams = &flowcontrol.ServerParams{ - BufLimit: testBufLimit, - MinRecharge: 1, - } - - srv.fcManager = flowcontrol.NewClientManager(50, 10, 1000000000) - srv.fcCostStats = newCostStats(nil) - } - pm.Start(1000) - return pm, nil -} - -// newTestProtocolManagerMust creates a new protocol manager for testing purposes, -// with the given number of blocks already known, potential notification -// channels for different events and relative chain indexers array. In case of an error, the constructor force- -// fails the test. -func newTestProtocolManagerMust(t *testing.T, lightSync bool, blocks int, generator func(int, *core.BlockGen), odr *LesOdr, peers *peerSet, db ethdb.Database) *ProtocolManager { - pm, err := newTestProtocolManager(lightSync, blocks, generator, odr, peers, db) - if err != nil { - t.Fatalf("Failed to create protocol manager: %v", err) - } - return pm -} - -// testPeer is a simulated peer to allow testing direct network calls. -type testPeer struct { - net p2p.MsgReadWriter // Network layer reader/writer to simulate remote messaging - app *p2p.MsgPipeRW // Application layer reader/writer to simulate the local side - *peer -} - -// newTestPeer creates a new peer registered at the given protocol manager. -func newTestPeer(t *testing.T, name string, version int, pm *ProtocolManager, shake bool) (*testPeer, <-chan error) { - // Create a message pipe to communicate through - app, net := p2p.MsgPipe() - - // Generate a random id and create the peer - var id enode.ID - rand.Read(id[:]) - - peer := pm.newPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net) - - // Start the peer on a new thread - errc := make(chan error, 1) - go func() { - select { - case pm.newPeerCh <- peer: - errc <- pm.handle(peer) - case <-pm.quitSync: - errc <- p2p.DiscQuitting - } - }() - tp := &testPeer{ - app: app, - net: net, - peer: peer, - } - // Execute any implicitly requested handshakes and return - if shake { - var ( - genesis = pm.blockchain.Genesis() - head = pm.blockchain.CurrentHeader() - td = pm.blockchain.GetTd(head.Hash(), head.Number.Uint64()) - ) - tp.handshake(t, td, head.Hash(), head.Number.Uint64(), genesis.Hash()) - } - return tp, errc -} - -func newTestPeerPair(name string, version int, pm, pm2 *ProtocolManager) (*peer, <-chan error, *peer, <-chan error) { - // Create a message pipe to communicate through - app, net := p2p.MsgPipe() - - // Generate a random id and create the peer - var id enode.ID - rand.Read(id[:]) - - peer := pm.newPeer(version, NetworkId, p2p.NewPeer(id, name, nil), net) - peer2 := pm2.newPeer(version, NetworkId, p2p.NewPeer(id, name, nil), app) - - // Start the peer on a new thread - errc := make(chan error, 1) - errc2 := make(chan error, 1) - go func() { - select { - case pm.newPeerCh <- peer: - errc <- pm.handle(peer) - case <-pm.quitSync: - errc <- p2p.DiscQuitting - } - }() - go func() { - select { - case pm2.newPeerCh <- peer2: - errc2 <- pm2.handle(peer2) - case <-pm2.quitSync: - errc2 <- p2p.DiscQuitting - } - }() - return peer, errc, peer2, errc2 -} - -// handshake simulates a trivial handshake that expects the same state from the -// remote side as we are simulating locally. -func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, headNum uint64, genesis common.Hash) { - var expList keyValueList - expList = expList.add("protocolVersion", uint64(p.version)) - expList = expList.add("networkId", uint64(NetworkId)) - expList = expList.add("headTd", td) - expList = expList.add("headHash", head) - expList = expList.add("headNum", headNum) - expList = expList.add("genesisHash", genesis) - sendList := make(keyValueList, len(expList)) - copy(sendList, expList) - expList = expList.add("serveHeaders", nil) - expList = expList.add("serveChainSince", uint64(0)) - expList = expList.add("serveStateSince", uint64(0)) - expList = expList.add("txRelay", nil) - expList = expList.add("flowControl/BL", testBufLimit) - expList = expList.add("flowControl/MRR", uint64(1)) - expList = expList.add("flowControl/MRC", testRCL()) - - if err := p2p.ExpectMsg(p.app, StatusMsg, expList); err != nil { - t.Fatalf("status recv: %v", err) - } - if err := p2p.Send(p.app, StatusMsg, sendList); err != nil { - t.Fatalf("status send: %v", err) - } - - p.fcServerParams = &flowcontrol.ServerParams{ - BufLimit: testBufLimit, - MinRecharge: 1, - } -} - -// close terminates the local side of the peer, notifying the remote protocol -// manager of termination. -func (p *testPeer) close() { - p.app.Close() -} - -// TestEntity represents a network entity for testing with necessary auxiliary fields. -type TestEntity struct { - db ethdb.Database - rPeer *peer - tPeer *testPeer - peers *peerSet - pm *ProtocolManager - // Indexers - chtIndexer *core.ChainIndexer - bloomIndexer *core.ChainIndexer - bloomTrieIndexer *core.ChainIndexer -} - -// newServerEnv creates a server testing environment with a connected test peer for testing purpose. -func newServerEnv(t *testing.T, blocks int, protocol int, waitIndexers func(*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer)) (*TestEntity, func()) { - db := ethdb.NewMemDatabase() - cIndexer, bIndexer, btIndexer := testIndexers(db, nil, light.TestServerIndexerConfig) - - pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, nil, db) - peer, _ := newTestPeer(t, "peer", protocol, pm, true) - - cIndexer.Start(pm.blockchain.(*core.BlockChain)) - bIndexer.Start(pm.blockchain.(*core.BlockChain)) - - // Wait until indexers generate enough index data. - if waitIndexers != nil { - waitIndexers(cIndexer, bIndexer, btIndexer) - } - - return &TestEntity{ - db: db, - tPeer: peer, - pm: pm, - chtIndexer: cIndexer, - bloomIndexer: bIndexer, - bloomTrieIndexer: btIndexer, - }, func() { - peer.close() - // Note bloom trie indexer will be closed by it parent recursively. - cIndexer.Close() - bIndexer.Close() - } -} - -// newClientServerEnv creates a client/server arch environment with a connected les server and light client pair -// for testing purpose. -func newClientServerEnv(t *testing.T, blocks int, protocol int, waitIndexers func(*core.ChainIndexer, *core.ChainIndexer, *core.ChainIndexer), newPeer bool) (*TestEntity, *TestEntity, func()) { - db, ldb := ethdb.NewMemDatabase(), ethdb.NewMemDatabase() - peers, lPeers := newPeerSet(), newPeerSet() - - dist := newRequestDistributor(lPeers, make(chan struct{})) - rm := newRetrieveManager(lPeers, dist, nil) - odr := NewLesOdr(ldb, light.TestClientIndexerConfig, rm) - - cIndexer, bIndexer, btIndexer := testIndexers(db, nil, light.TestServerIndexerConfig) - lcIndexer, lbIndexer, lbtIndexer := testIndexers(ldb, odr, light.TestClientIndexerConfig) - odr.SetIndexers(lcIndexer, lbtIndexer, lbIndexer) - - pm := newTestProtocolManagerMust(t, false, blocks, testChainGen, nil, peers, db) - lpm := newTestProtocolManagerMust(t, true, 0, nil, odr, lPeers, ldb) - - startIndexers := func(clientMode bool, pm *ProtocolManager) { - if clientMode { - lcIndexer.Start(pm.blockchain.(*light.LightChain)) - lbIndexer.Start(pm.blockchain.(*light.LightChain)) - } else { - cIndexer.Start(pm.blockchain.(*core.BlockChain)) - bIndexer.Start(pm.blockchain.(*core.BlockChain)) - } - } - - startIndexers(false, pm) - startIndexers(true, lpm) - - // Execute wait until function if it is specified. - if waitIndexers != nil { - waitIndexers(cIndexer, bIndexer, btIndexer) - } - - var ( - peer, lPeer *peer - err1, err2 <-chan error - ) - if newPeer { - peer, err1, lPeer, err2 = newTestPeerPair("peer", protocol, pm, lpm) - select { - case <-time.After(time.Millisecond * 100): - case err := <-err1: - t.Fatalf("peer 1 handshake error: %v", err) - case err := <-err2: - t.Fatalf("peer 2 handshake error: %v", err) - } - } - - return &TestEntity{ - db: db, - pm: pm, - rPeer: peer, - peers: peers, - chtIndexer: cIndexer, - bloomIndexer: bIndexer, - bloomTrieIndexer: btIndexer, - }, &TestEntity{ - db: ldb, - pm: lpm, - rPeer: lPeer, - peers: lPeers, - chtIndexer: lcIndexer, - bloomIndexer: lbIndexer, - bloomTrieIndexer: lbtIndexer, - }, func() { - // Note bloom trie indexers will be closed by their parents recursively. - cIndexer.Close() - bIndexer.Close() - lcIndexer.Close() - lbIndexer.Close() - } -} diff --git a/les/metrics.go b/les/metrics.go deleted file mode 100644 index a221282a0..000000000 --- a/les/metrics.go +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "github.com/dexon-foundation/dexon/metrics" - "github.com/dexon-foundation/dexon/p2p" -) - -var ( - /* propTxnInPacketsMeter = metrics.NewMeter("eth/prop/txns/in/packets") - propTxnInTrafficMeter = metrics.NewMeter("eth/prop/txns/in/traffic") - propTxnOutPacketsMeter = metrics.NewMeter("eth/prop/txns/out/packets") - propTxnOutTrafficMeter = metrics.NewMeter("eth/prop/txns/out/traffic") - propHashInPacketsMeter = metrics.NewMeter("eth/prop/hashes/in/packets") - propHashInTrafficMeter = metrics.NewMeter("eth/prop/hashes/in/traffic") - propHashOutPacketsMeter = metrics.NewMeter("eth/prop/hashes/out/packets") - propHashOutTrafficMeter = metrics.NewMeter("eth/prop/hashes/out/traffic") - propBlockInPacketsMeter = metrics.NewMeter("eth/prop/blocks/in/packets") - propBlockInTrafficMeter = metrics.NewMeter("eth/prop/blocks/in/traffic") - propBlockOutPacketsMeter = metrics.NewMeter("eth/prop/blocks/out/packets") - propBlockOutTrafficMeter = metrics.NewMeter("eth/prop/blocks/out/traffic") - reqHashInPacketsMeter = metrics.NewMeter("eth/req/hashes/in/packets") - reqHashInTrafficMeter = metrics.NewMeter("eth/req/hashes/in/traffic") - reqHashOutPacketsMeter = metrics.NewMeter("eth/req/hashes/out/packets") - reqHashOutTrafficMeter = metrics.NewMeter("eth/req/hashes/out/traffic") - reqBlockInPacketsMeter = metrics.NewMeter("eth/req/blocks/in/packets") - reqBlockInTrafficMeter = metrics.NewMeter("eth/req/blocks/in/traffic") - reqBlockOutPacketsMeter = metrics.NewMeter("eth/req/blocks/out/packets") - reqBlockOutTrafficMeter = metrics.NewMeter("eth/req/blocks/out/traffic") - reqHeaderInPacketsMeter = metrics.NewMeter("eth/req/headers/in/packets") - reqHeaderInTrafficMeter = metrics.NewMeter("eth/req/headers/in/traffic") - reqHeaderOutPacketsMeter = metrics.NewMeter("eth/req/headers/out/packets") - reqHeaderOutTrafficMeter = metrics.NewMeter("eth/req/headers/out/traffic") - reqBodyInPacketsMeter = metrics.NewMeter("eth/req/bodies/in/packets") - reqBodyInTrafficMeter = metrics.NewMeter("eth/req/bodies/in/traffic") - reqBodyOutPacketsMeter = metrics.NewMeter("eth/req/bodies/out/packets") - reqBodyOutTrafficMeter = metrics.NewMeter("eth/req/bodies/out/traffic") - reqStateInPacketsMeter = metrics.NewMeter("eth/req/states/in/packets") - reqStateInTrafficMeter = metrics.NewMeter("eth/req/states/in/traffic") - reqStateOutPacketsMeter = metrics.NewMeter("eth/req/states/out/packets") - reqStateOutTrafficMeter = metrics.NewMeter("eth/req/states/out/traffic") - reqReceiptInPacketsMeter = metrics.NewMeter("eth/req/receipts/in/packets") - reqReceiptInTrafficMeter = metrics.NewMeter("eth/req/receipts/in/traffic") - reqReceiptOutPacketsMeter = metrics.NewMeter("eth/req/receipts/out/packets") - reqReceiptOutTrafficMeter = metrics.NewMeter("eth/req/receipts/out/traffic")*/ - miscInPacketsMeter = metrics.NewRegisteredMeter("les/misc/in/packets", nil) - miscInTrafficMeter = metrics.NewRegisteredMeter("les/misc/in/traffic", nil) - miscOutPacketsMeter = metrics.NewRegisteredMeter("les/misc/out/packets", nil) - miscOutTrafficMeter = metrics.NewRegisteredMeter("les/misc/out/traffic", nil) -) - -// meteredMsgReadWriter is a wrapper around a p2p.MsgReadWriter, capable of -// accumulating the above defined metrics based on the data stream contents. -type meteredMsgReadWriter struct { - p2p.MsgReadWriter // Wrapped message stream to meter - version int // Protocol version to select correct meters -} - -// newMeteredMsgWriter wraps a p2p MsgReadWriter with metering support. If the -// metrics system is disabled, this function returns the original object. -func newMeteredMsgWriter(rw p2p.MsgReadWriter) p2p.MsgReadWriter { - if !metrics.Enabled { - return rw - } - return &meteredMsgReadWriter{MsgReadWriter: rw} -} - -// Init sets the protocol version used by the stream to know which meters to -// increment in case of overlapping message ids between protocol versions. -func (rw *meteredMsgReadWriter) Init(version int) { - rw.version = version -} - -func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) { - // Read the message and short circuit in case of an error - msg, err := rw.MsgReadWriter.ReadMsg() - if err != nil { - return msg, err - } - // Account for the data traffic - packets, traffic := miscInPacketsMeter, miscInTrafficMeter - packets.Mark(1) - traffic.Mark(int64(msg.Size)) - - return msg, err -} - -func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error { - // Account for the data traffic - packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter - packets.Mark(1) - traffic.Mark(int64(msg.Size)) - - // Send the packet to the p2p layer - return rw.MsgReadWriter.WriteMsg(msg) -} diff --git a/les/odr.go b/les/odr.go deleted file mode 100644 index 4c05ce7f9..000000000 --- a/les/odr.go +++ /dev/null @@ -1,129 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "context" - - "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/log" -) - -// LesOdr implements light.OdrBackend -type LesOdr struct { - db ethdb.Database - indexerConfig *light.IndexerConfig - chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer - retriever *retrieveManager - stop chan struct{} -} - -func NewLesOdr(db ethdb.Database, config *light.IndexerConfig, retriever *retrieveManager) *LesOdr { - return &LesOdr{ - db: db, - indexerConfig: config, - retriever: retriever, - stop: make(chan struct{}), - } -} - -// Stop cancels all pending retrievals -func (odr *LesOdr) Stop() { - close(odr.stop) -} - -// Database returns the backing database -func (odr *LesOdr) Database() ethdb.Database { - return odr.db -} - -// SetIndexers adds the necessary chain indexers to the ODR backend -func (odr *LesOdr) SetIndexers(chtIndexer, bloomTrieIndexer, bloomIndexer *core.ChainIndexer) { - odr.chtIndexer = chtIndexer - odr.bloomTrieIndexer = bloomTrieIndexer - odr.bloomIndexer = bloomIndexer -} - -// ChtIndexer returns the CHT chain indexer -func (odr *LesOdr) ChtIndexer() *core.ChainIndexer { - return odr.chtIndexer -} - -// BloomTrieIndexer returns the bloom trie chain indexer -func (odr *LesOdr) BloomTrieIndexer() *core.ChainIndexer { - return odr.bloomTrieIndexer -} - -// BloomIndexer returns the bloombits chain indexer -func (odr *LesOdr) BloomIndexer() *core.ChainIndexer { - return odr.bloomIndexer -} - -// IndexerConfig returns the indexer config. -func (odr *LesOdr) IndexerConfig() *light.IndexerConfig { - return odr.indexerConfig -} - -const ( - MsgBlockBodies = iota - MsgCode - MsgReceipts - MsgProofsV1 - MsgProofsV2 - MsgHeaderProofs - MsgHelperTrieProofs -) - -// Msg encodes a LES message that delivers reply data for a request -type Msg struct { - MsgType int - ReqID uint64 - Obj interface{} -} - -// Retrieve tries to fetch an object from the LES network. -// If the network retrieval was successful, it stores the object in local db. -func (odr *LesOdr) Retrieve(ctx context.Context, req light.OdrRequest) (err error) { - lreq := LesRequest(req) - - reqID := genReqID() - rq := &distReq{ - getCost: func(dp distPeer) uint64 { - return lreq.GetCost(dp.(*peer)) - }, - canSend: func(dp distPeer) bool { - p := dp.(*peer) - return lreq.CanSend(p) - }, - request: func(dp distPeer) func() { - p := dp.(*peer) - cost := lreq.GetCost(p) - p.fcServer.QueueRequest(reqID, cost) - return func() { lreq.Request(reqID, p) } - }, - } - - if err = odr.retriever.retrieve(ctx, reqID, rq, func(p distPeer, msg *Msg) error { return lreq.Validate(odr.db, msg) }, odr.stop); err == nil { - // retrieved from network, store in db - req.StoreResult(odr.db) - } else { - log.Debug("Failed to retrieve data from network", "err", err) - } - return -} diff --git a/les/odr_requests.go b/les/odr_requests.go deleted file mode 100644 index 1a3887141..000000000 --- a/les/odr_requests.go +++ /dev/null @@ -1,580 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package light implements on-demand retrieval capable state and chain objects -// for the Ethereum Light Client. -package les - -import ( - "encoding/binary" - "errors" - "fmt" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/core/rawdb" - "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/crypto" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/rlp" - "github.com/dexon-foundation/dexon/trie" -) - -var ( - errInvalidMessageType = errors.New("invalid message type") - errInvalidEntryCount = errors.New("invalid number of response entries") - errHeaderUnavailable = errors.New("header unavailable") - errTxHashMismatch = errors.New("transaction hash mismatch") - errUncleHashMismatch = errors.New("uncle hash mismatch") - errReceiptHashMismatch = errors.New("receipt hash mismatch") - errDataHashMismatch = errors.New("data hash mismatch") - errCHTHashMismatch = errors.New("cht hash mismatch") - errCHTNumberMismatch = errors.New("cht number mismatch") - errUselessNodes = errors.New("useless nodes in merkle proof nodeset") -) - -type LesOdrRequest interface { - GetCost(*peer) uint64 - CanSend(*peer) bool - Request(uint64, *peer) error - Validate(ethdb.Database, *Msg) error -} - -func LesRequest(req light.OdrRequest) LesOdrRequest { - switch r := req.(type) { - case *light.BlockRequest: - return (*BlockRequest)(r) - case *light.ReceiptsRequest: - return (*ReceiptsRequest)(r) - case *light.TrieRequest: - return (*TrieRequest)(r) - case *light.CodeRequest: - return (*CodeRequest)(r) - case *light.ChtRequest: - return (*ChtRequest)(r) - case *light.BloomRequest: - return (*BloomRequest)(r) - default: - return nil - } -} - -// BlockRequest is the ODR request type for block bodies -type BlockRequest light.BlockRequest - -// GetCost returns the cost of the given ODR request according to the serving -// peer's cost table (implementation of LesOdrRequest) -func (r *BlockRequest) GetCost(peer *peer) uint64 { - return peer.GetRequestCost(GetBlockBodiesMsg, 1) -} - -// CanSend tells if a certain peer is suitable for serving the given request -func (r *BlockRequest) CanSend(peer *peer) bool { - return peer.HasBlock(r.Hash, r.Number, false) -} - -// Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *BlockRequest) Request(reqID uint64, peer *peer) error { - peer.Log().Debug("Requesting block body", "hash", r.Hash) - return peer.RequestBodies(reqID, r.GetCost(peer), []common.Hash{r.Hash}) -} - -// Valid processes an ODR request reply message from the LES network -// returns true and stores results in memory if the message was a valid reply -// to the request (implementation of LesOdrRequest) -func (r *BlockRequest) Validate(db ethdb.Database, msg *Msg) error { - log.Debug("Validating block body", "hash", r.Hash) - - // Ensure we have a correct message with a single block body - if msg.MsgType != MsgBlockBodies { - return errInvalidMessageType - } - bodies := msg.Obj.([]*types.Body) - if len(bodies) != 1 { - return errInvalidEntryCount - } - body := bodies[0] - - // Retrieve our stored header and validate block content against it - header := rawdb.ReadHeader(db, r.Hash, r.Number) - if header == nil { - return errHeaderUnavailable - } - if header.TxHash != types.DeriveSha(types.Transactions(body.Transactions)) { - return errTxHashMismatch - } - if header.UncleHash != types.CalcUncleHash(body.Uncles) { - return errUncleHashMismatch - } - // Validations passed, encode and store RLP - data, err := rlp.EncodeToBytes(body) - if err != nil { - return err - } - r.Rlp = data - return nil -} - -// ReceiptsRequest is the ODR request type for block receipts by block hash -type ReceiptsRequest light.ReceiptsRequest - -// GetCost returns the cost of the given ODR request according to the serving -// peer's cost table (implementation of LesOdrRequest) -func (r *ReceiptsRequest) GetCost(peer *peer) uint64 { - return peer.GetRequestCost(GetReceiptsMsg, 1) -} - -// CanSend tells if a certain peer is suitable for serving the given request -func (r *ReceiptsRequest) CanSend(peer *peer) bool { - return peer.HasBlock(r.Hash, r.Number, false) -} - -// Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *ReceiptsRequest) Request(reqID uint64, peer *peer) error { - peer.Log().Debug("Requesting block receipts", "hash", r.Hash) - return peer.RequestReceipts(reqID, r.GetCost(peer), []common.Hash{r.Hash}) -} - -// Valid processes an ODR request reply message from the LES network -// returns true and stores results in memory if the message was a valid reply -// to the request (implementation of LesOdrRequest) -func (r *ReceiptsRequest) Validate(db ethdb.Database, msg *Msg) error { - log.Debug("Validating block receipts", "hash", r.Hash) - - // Ensure we have a correct message with a single block receipt - if msg.MsgType != MsgReceipts { - return errInvalidMessageType - } - receipts := msg.Obj.([]types.Receipts) - if len(receipts) != 1 { - return errInvalidEntryCount - } - receipt := receipts[0] - - // Retrieve our stored header and validate receipt content against it - header := rawdb.ReadHeader(db, r.Hash, r.Number) - if header == nil { - return errHeaderUnavailable - } - if header.ReceiptHash != types.DeriveSha(receipt) { - return errReceiptHashMismatch - } - // Validations passed, store and return - r.Receipts = receipt - return nil -} - -type ProofReq struct { - BHash common.Hash - AccKey, Key []byte - FromLevel uint -} - -// ODR request type for state/storage trie entries, see LesOdrRequest interface -type TrieRequest light.TrieRequest - -// GetCost returns the cost of the given ODR request according to the serving -// peer's cost table (implementation of LesOdrRequest) -func (r *TrieRequest) GetCost(peer *peer) uint64 { - switch peer.version { - case lpv1: - return peer.GetRequestCost(GetProofsV1Msg, 1) - case lpv2: - return peer.GetRequestCost(GetProofsV2Msg, 1) - default: - panic(nil) - } -} - -// CanSend tells if a certain peer is suitable for serving the given request -func (r *TrieRequest) CanSend(peer *peer) bool { - return peer.HasBlock(r.Id.BlockHash, r.Id.BlockNumber, true) -} - -// Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *TrieRequest) Request(reqID uint64, peer *peer) error { - peer.Log().Debug("Requesting trie proof", "root", r.Id.Root, "key", r.Key) - req := ProofReq{ - BHash: r.Id.BlockHash, - AccKey: r.Id.AccKey, - Key: r.Key, - } - return peer.RequestProofs(reqID, r.GetCost(peer), []ProofReq{req}) -} - -// Valid processes an ODR request reply message from the LES network -// returns true and stores results in memory if the message was a valid reply -// to the request (implementation of LesOdrRequest) -func (r *TrieRequest) Validate(db ethdb.Database, msg *Msg) error { - log.Debug("Validating trie proof", "root", r.Id.Root, "key", r.Key) - - switch msg.MsgType { - case MsgProofsV1: - proofs := msg.Obj.([]light.NodeList) - if len(proofs) != 1 { - return errInvalidEntryCount - } - nodeSet := proofs[0].NodeSet() - // Verify the proof and store if checks out - if _, _, err := trie.VerifyProof(r.Id.Root, r.Key, nodeSet); err != nil { - return fmt.Errorf("merkle proof verification failed: %v", err) - } - r.Proof = nodeSet - return nil - - case MsgProofsV2: - proofs := msg.Obj.(light.NodeList) - // Verify the proof and store if checks out - nodeSet := proofs.NodeSet() - reads := &readTraceDB{db: nodeSet} - if _, _, err := trie.VerifyProof(r.Id.Root, r.Key, reads); err != nil { - return fmt.Errorf("merkle proof verification failed: %v", err) - } - // check if all nodes have been read by VerifyProof - if len(reads.reads) != nodeSet.KeyCount() { - return errUselessNodes - } - r.Proof = nodeSet - return nil - - default: - return errInvalidMessageType - } -} - -type CodeReq struct { - BHash common.Hash - AccKey []byte -} - -// ODR request type for node data (used for retrieving contract code), see LesOdrRequest interface -type CodeRequest light.CodeRequest - -// GetCost returns the cost of the given ODR request according to the serving -// peer's cost table (implementation of LesOdrRequest) -func (r *CodeRequest) GetCost(peer *peer) uint64 { - return peer.GetRequestCost(GetCodeMsg, 1) -} - -// CanSend tells if a certain peer is suitable for serving the given request -func (r *CodeRequest) CanSend(peer *peer) bool { - return peer.HasBlock(r.Id.BlockHash, r.Id.BlockNumber, true) -} - -// Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *CodeRequest) Request(reqID uint64, peer *peer) error { - peer.Log().Debug("Requesting code data", "hash", r.Hash) - req := CodeReq{ - BHash: r.Id.BlockHash, - AccKey: r.Id.AccKey, - } - return peer.RequestCode(reqID, r.GetCost(peer), []CodeReq{req}) -} - -// Valid processes an ODR request reply message from the LES network -// returns true and stores results in memory if the message was a valid reply -// to the request (implementation of LesOdrRequest) -func (r *CodeRequest) Validate(db ethdb.Database, msg *Msg) error { - log.Debug("Validating code data", "hash", r.Hash) - - // Ensure we have a correct message with a single code element - if msg.MsgType != MsgCode { - return errInvalidMessageType - } - reply := msg.Obj.([][]byte) - if len(reply) != 1 { - return errInvalidEntryCount - } - data := reply[0] - - // Verify the data and store if checks out - if hash := crypto.Keccak256Hash(data); r.Hash != hash { - return errDataHashMismatch - } - r.Data = data - return nil -} - -const ( - // helper trie type constants - htCanonical = iota // Canonical hash trie - htBloomBits // BloomBits trie - - // applicable for all helper trie requests - auxRoot = 1 - // applicable for htCanonical - auxHeader = 2 -) - -type HelperTrieReq struct { - Type uint - TrieIdx uint64 - Key []byte - FromLevel, AuxReq uint -} - -type HelperTrieResps struct { // describes all responses, not just a single one - Proofs light.NodeList - AuxData [][]byte -} - -// legacy LES/1 -type ChtReq struct { - ChtNum, BlockNum uint64 - FromLevel uint -} - -// legacy LES/1 -type ChtResp struct { - Header *types.Header - Proof []rlp.RawValue -} - -// ODR request type for requesting headers by Canonical Hash Trie, see LesOdrRequest interface -type ChtRequest light.ChtRequest - -// GetCost returns the cost of the given ODR request according to the serving -// peer's cost table (implementation of LesOdrRequest) -func (r *ChtRequest) GetCost(peer *peer) uint64 { - switch peer.version { - case lpv1: - return peer.GetRequestCost(GetHeaderProofsMsg, 1) - case lpv2: - return peer.GetRequestCost(GetHelperTrieProofsMsg, 1) - default: - panic(nil) - } -} - -// CanSend tells if a certain peer is suitable for serving the given request -func (r *ChtRequest) CanSend(peer *peer) bool { - peer.lock.RLock() - defer peer.lock.RUnlock() - - return peer.headInfo.Number >= r.Config.ChtConfirms && r.ChtNum <= (peer.headInfo.Number-r.Config.ChtConfirms)/r.Config.ChtSize -} - -// Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *ChtRequest) Request(reqID uint64, peer *peer) error { - peer.Log().Debug("Requesting CHT", "cht", r.ChtNum, "block", r.BlockNum) - var encNum [8]byte - binary.BigEndian.PutUint64(encNum[:], r.BlockNum) - req := HelperTrieReq{ - Type: htCanonical, - TrieIdx: r.ChtNum, - Key: encNum[:], - AuxReq: auxHeader, - } - switch peer.version { - case lpv1: - var reqsV1 ChtReq - if req.Type != htCanonical || req.AuxReq != auxHeader || len(req.Key) != 8 { - return fmt.Errorf("Request invalid in LES/1 mode") - } - blockNum := binary.BigEndian.Uint64(req.Key) - // convert HelperTrie request to old CHT request - reqsV1 = ChtReq{ChtNum: (req.TrieIdx + 1) * (r.Config.ChtSize / r.Config.PairChtSize), BlockNum: blockNum, FromLevel: req.FromLevel} - return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []ChtReq{reqsV1}) - case lpv2: - return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), []HelperTrieReq{req}) - default: - panic(nil) - } -} - -// Valid processes an ODR request reply message from the LES network -// returns true and stores results in memory if the message was a valid reply -// to the request (implementation of LesOdrRequest) -func (r *ChtRequest) Validate(db ethdb.Database, msg *Msg) error { - log.Debug("Validating CHT", "cht", r.ChtNum, "block", r.BlockNum) - - switch msg.MsgType { - case MsgHeaderProofs: // LES/1 backwards compatibility - proofs := msg.Obj.([]ChtResp) - if len(proofs) != 1 { - return errInvalidEntryCount - } - proof := proofs[0] - - // Verify the CHT - var encNumber [8]byte - binary.BigEndian.PutUint64(encNumber[:], r.BlockNum) - - value, _, err := trie.VerifyProof(r.ChtRoot, encNumber[:], light.NodeList(proof.Proof).NodeSet()) - if err != nil { - return err - } - var node light.ChtNode - if err := rlp.DecodeBytes(value, &node); err != nil { - return err - } - if node.Hash != proof.Header.Hash() { - return errCHTHashMismatch - } - // Verifications passed, store and return - r.Header = proof.Header - r.Proof = light.NodeList(proof.Proof).NodeSet() - r.Td = node.Td - case MsgHelperTrieProofs: - resp := msg.Obj.(HelperTrieResps) - if len(resp.AuxData) != 1 { - return errInvalidEntryCount - } - nodeSet := resp.Proofs.NodeSet() - headerEnc := resp.AuxData[0] - if len(headerEnc) == 0 { - return errHeaderUnavailable - } - header := new(types.Header) - if err := rlp.DecodeBytes(headerEnc, header); err != nil { - return errHeaderUnavailable - } - - // Verify the CHT - var encNumber [8]byte - binary.BigEndian.PutUint64(encNumber[:], r.BlockNum) - - reads := &readTraceDB{db: nodeSet} - value, _, err := trie.VerifyProof(r.ChtRoot, encNumber[:], reads) - if err != nil { - return fmt.Errorf("merkle proof verification failed: %v", err) - } - if len(reads.reads) != nodeSet.KeyCount() { - return errUselessNodes - } - - var node light.ChtNode - if err := rlp.DecodeBytes(value, &node); err != nil { - return err - } - if node.Hash != header.Hash() { - return errCHTHashMismatch - } - if r.BlockNum != header.Number.Uint64() { - return errCHTNumberMismatch - } - // Verifications passed, store and return - r.Header = header - r.Proof = nodeSet - r.Td = node.Td - default: - return errInvalidMessageType - } - return nil -} - -type BloomReq struct { - BloomTrieNum, BitIdx, SectionIndex, FromLevel uint64 -} - -// ODR request type for requesting headers by Canonical Hash Trie, see LesOdrRequest interface -type BloomRequest light.BloomRequest - -// GetCost returns the cost of the given ODR request according to the serving -// peer's cost table (implementation of LesOdrRequest) -func (r *BloomRequest) GetCost(peer *peer) uint64 { - return peer.GetRequestCost(GetHelperTrieProofsMsg, len(r.SectionIndexList)) -} - -// CanSend tells if a certain peer is suitable for serving the given request -func (r *BloomRequest) CanSend(peer *peer) bool { - peer.lock.RLock() - defer peer.lock.RUnlock() - - if peer.version < lpv2 { - return false - } - return peer.headInfo.Number >= r.Config.BloomTrieConfirms && r.BloomTrieNum <= (peer.headInfo.Number-r.Config.BloomTrieConfirms)/r.Config.BloomTrieSize -} - -// Request sends an ODR request to the LES network (implementation of LesOdrRequest) -func (r *BloomRequest) Request(reqID uint64, peer *peer) error { - peer.Log().Debug("Requesting BloomBits", "bloomTrie", r.BloomTrieNum, "bitIdx", r.BitIdx, "sections", r.SectionIndexList) - reqs := make([]HelperTrieReq, len(r.SectionIndexList)) - - var encNumber [10]byte - binary.BigEndian.PutUint16(encNumber[:2], uint16(r.BitIdx)) - - for i, sectionIdx := range r.SectionIndexList { - binary.BigEndian.PutUint64(encNumber[2:], sectionIdx) - reqs[i] = HelperTrieReq{ - Type: htBloomBits, - TrieIdx: r.BloomTrieNum, - Key: common.CopyBytes(encNumber[:]), - } - } - return peer.RequestHelperTrieProofs(reqID, r.GetCost(peer), reqs) -} - -// Valid processes an ODR request reply message from the LES network -// returns true and stores results in memory if the message was a valid reply -// to the request (implementation of LesOdrRequest) -func (r *BloomRequest) Validate(db ethdb.Database, msg *Msg) error { - log.Debug("Validating BloomBits", "bloomTrie", r.BloomTrieNum, "bitIdx", r.BitIdx, "sections", r.SectionIndexList) - - // Ensure we have a correct message with a single proof element - if msg.MsgType != MsgHelperTrieProofs { - return errInvalidMessageType - } - resps := msg.Obj.(HelperTrieResps) - proofs := resps.Proofs - nodeSet := proofs.NodeSet() - reads := &readTraceDB{db: nodeSet} - - r.BloomBits = make([][]byte, len(r.SectionIndexList)) - - // Verify the proofs - var encNumber [10]byte - binary.BigEndian.PutUint16(encNumber[:2], uint16(r.BitIdx)) - - for i, idx := range r.SectionIndexList { - binary.BigEndian.PutUint64(encNumber[2:], idx) - value, _, err := trie.VerifyProof(r.BloomTrieRoot, encNumber[:], reads) - if err != nil { - return err - } - r.BloomBits[i] = value - } - - if len(reads.reads) != nodeSet.KeyCount() { - return errUselessNodes - } - r.Proofs = nodeSet - return nil -} - -// readTraceDB stores the keys of database reads. We use this to check that received node -// sets contain only the trie nodes necessary to make proofs pass. -type readTraceDB struct { - db trie.DatabaseReader - reads map[string]struct{} -} - -// Get returns a stored node -func (db *readTraceDB) Get(k []byte) ([]byte, error) { - if db.reads == nil { - db.reads = make(map[string]struct{}) - } - db.reads[string(k)] = struct{}{} - return db.db.Get(k) -} - -// Has returns true if the node set contains the given key -func (db *readTraceDB) Has(key []byte) (bool, error) { - _, err := db.Get(key) - return err == nil, nil -} diff --git a/les/odr_test.go b/les/odr_test.go deleted file mode 100644 index 9a6b1ef88..000000000 --- a/les/odr_test.go +++ /dev/null @@ -1,204 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "bytes" - "context" - "math/big" - "testing" - "time" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/common/math" - "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/core/rawdb" - "github.com/dexon-foundation/dexon/core/state" - "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/core/vm" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/params" - "github.com/dexon-foundation/dexon/rlp" -) - -type odrTestFn func(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte - -func TestOdrGetBlockLes1(t *testing.T) { testOdr(t, 1, 1, odrGetBlock) } - -func TestOdrGetBlockLes2(t *testing.T) { testOdr(t, 2, 1, odrGetBlock) } - -func odrGetBlock(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte { - var block *types.Block - if bc != nil { - block = bc.GetBlockByHash(bhash) - } else { - block, _ = lc.GetBlockByHash(ctx, bhash) - } - if block == nil { - return nil - } - rlp, _ := rlp.EncodeToBytes(block) - return rlp -} - -func TestOdrGetReceiptsLes1(t *testing.T) { testOdr(t, 1, 1, odrGetReceipts) } - -func TestOdrGetReceiptsLes2(t *testing.T) { testOdr(t, 2, 1, odrGetReceipts) } - -func odrGetReceipts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte { - var receipts types.Receipts - if bc != nil { - if number := rawdb.ReadHeaderNumber(db, bhash); number != nil { - receipts = rawdb.ReadReceipts(db, bhash, *number) - } - } else { - if number := rawdb.ReadHeaderNumber(db, bhash); number != nil { - receipts, _ = light.GetBlockReceipts(ctx, lc.Odr(), bhash, *number) - } - } - if receipts == nil { - return nil - } - rlp, _ := rlp.EncodeToBytes(receipts) - return rlp -} - -func TestOdrAccountsLes1(t *testing.T) { testOdr(t, 1, 1, odrAccounts) } - -func TestOdrAccountsLes2(t *testing.T) { testOdr(t, 2, 1, odrAccounts) } - -func odrAccounts(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte { - dummyAddr := common.HexToAddress("1234567812345678123456781234567812345678") - acc := []common.Address{testBankAddress, acc1Addr, acc2Addr, dummyAddr} - - var ( - res []byte - st *state.StateDB - err error - ) - for _, addr := range acc { - if bc != nil { - header := bc.GetHeaderByHash(bhash) - st, err = state.New(header.Root, state.NewDatabase(db)) - } else { - header := lc.GetHeaderByHash(bhash) - st = light.NewState(ctx, header, lc.Odr()) - } - if err == nil { - bal := st.GetBalance(addr) - rlp, _ := rlp.EncodeToBytes(bal) - res = append(res, rlp...) - } - } - return res -} - -func TestOdrContractCallLes1(t *testing.T) { testOdr(t, 1, 2, odrContractCall) } - -func TestOdrContractCallLes2(t *testing.T) { testOdr(t, 2, 2, odrContractCall) } - -type callmsg struct { - types.Message -} - -func (callmsg) CheckNonce() bool { return false } - -func odrContractCall(ctx context.Context, db ethdb.Database, config *params.ChainConfig, bc *core.BlockChain, lc *light.LightChain, bhash common.Hash) []byte { - data := common.Hex2Bytes("60CD26850000000000000000000000000000000000000000000000000000000000000000") - - var res []byte - for i := 0; i < 3; i++ { - data[35] = byte(i) - if bc != nil { - header := bc.GetHeaderByHash(bhash) - statedb, err := state.New(header.Root, state.NewDatabase(db)) - - if err == nil { - from := statedb.GetOrNewStateObject(testBankAddress) - from.SetBalance(math.MaxBig256) - - msg := callmsg{types.NewMessage(from.Address(), &testContractAddr, 0, new(big.Int), 100000, new(big.Int), data, false)} - - context := core.NewEVMContext(msg, header, bc, nil) - vmenv := vm.NewEVM(context, statedb, config, vm.Config{}) - - //vmenv := core.NewEnv(statedb, config, bc, msg, header, vm.Config{}) - gp := new(core.GasPool).AddGas(math.MaxUint64) - ret, _, _, _ := core.ApplyMessage(vmenv, msg, gp) - res = append(res, ret...) - } - } else { - header := lc.GetHeaderByHash(bhash) - state := light.NewState(ctx, header, lc.Odr()) - state.SetBalance(testBankAddress, math.MaxBig256) - msg := callmsg{types.NewMessage(testBankAddress, &testContractAddr, 0, new(big.Int), 100000, new(big.Int), data, false)} - context := core.NewEVMContext(msg, header, lc, nil) - vmenv := vm.NewEVM(context, state, config, vm.Config{}) - gp := new(core.GasPool).AddGas(math.MaxUint64) - ret, _, _, _ := core.ApplyMessage(vmenv, msg, gp) - if state.Error() == nil { - res = append(res, ret...) - } - } - } - return res -} - -// testOdr tests odr requests whose validation guaranteed by block headers. -func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { - // Assemble the test environment - server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true) - defer tearDown() - client.pm.synchronise(client.rPeer) - - test := func(expFail uint64) { - for i := uint64(0); i <= server.pm.blockchain.CurrentHeader().Number.Uint64(); i++ { - bhash := rawdb.ReadCanonicalHash(server.db, i) - b1 := fn(light.NoOdr, server.db, server.pm.chainConfig, server.pm.blockchain.(*core.BlockChain), nil, bhash) - - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() - b2 := fn(ctx, client.db, client.pm.chainConfig, nil, client.pm.blockchain.(*light.LightChain), bhash) - - eq := bytes.Equal(b1, b2) - exp := i < expFail - if exp && !eq { - t.Errorf("odr mismatch") - } - if !exp && eq { - t.Errorf("unexpected odr match") - } - } - } - // temporarily remove peer to test odr fails - // expect retrievals to fail (except genesis block) without a les peer - client.peers.Unregister(client.rPeer.id) - time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed - test(expFail) - // expect all retrievals to pass - client.peers.Register(client.rPeer) - time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed - client.peers.lock.Lock() - client.rPeer.hasBlock = func(common.Hash, uint64, bool) bool { return true } - client.peers.lock.Unlock() - test(5) - // still expect all retrievals to pass, now data should be cached locally - client.peers.Unregister(client.rPeer.id) - time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed - test(5) -} diff --git a/les/peer.go b/les/peer.go deleted file mode 100644 index bd849516b..000000000 --- a/les/peer.go +++ /dev/null @@ -1,656 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package les implements the Light Ethereum Subprotocol. -package les - -import ( - "errors" - "fmt" - "math/big" - "sync" - "time" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/eth" - "github.com/dexon-foundation/dexon/les/flowcontrol" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/p2p" - "github.com/dexon-foundation/dexon/rlp" -) - -var ( - errClosed = errors.New("peer set is closed") - errAlreadyRegistered = errors.New("peer is already registered") - errNotRegistered = errors.New("peer is not registered") - errInvalidHelpTrieReq = errors.New("invalid help trie request") -) - -const maxResponseErrors = 50 // number of invalid responses tolerated (makes the protocol less brittle but still avoids spam) - -const ( - announceTypeNone = iota - announceTypeSimple - announceTypeSigned -) - -type peer struct { - *p2p.Peer - - rw p2p.MsgReadWriter - - version int // Protocol version negotiated - network uint64 // Network ID being on - - announceType, requestAnnounceType uint64 - - id string - - headInfo *announceData - lock sync.RWMutex - - announceChn chan announceData - sendQueue *execQueue - - poolEntry *poolEntry - hasBlock func(common.Hash, uint64, bool) bool - responseErrors int - - fcClient *flowcontrol.ClientNode // nil if the peer is server only - fcServer *flowcontrol.ServerNode // nil if the peer is client only - fcServerParams *flowcontrol.ServerParams - fcCosts requestCostTable -} - -func newPeer(version int, network uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - id := p.ID() - - return &peer{ - Peer: p, - rw: rw, - version: version, - network: network, - id: fmt.Sprintf("%x", id[:8]), - announceChn: make(chan announceData, 20), - } -} - -func (p *peer) canQueue() bool { - return p.sendQueue.canQueue() -} - -func (p *peer) queueSend(f func()) { - p.sendQueue.queue(f) -} - -// Info gathers and returns a collection of metadata known about a peer. -func (p *peer) Info() *eth.PeerInfo { - return ð.PeerInfo{ - Version: p.version, - Difficulty: p.Td(), - Head: fmt.Sprintf("%x", p.Head()), - } -} - -// Head retrieves a copy of the current head (most recent) hash of the peer. -func (p *peer) Head() (hash common.Hash) { - p.lock.RLock() - defer p.lock.RUnlock() - - copy(hash[:], p.headInfo.Hash[:]) - return hash -} - -func (p *peer) HeadAndTd() (hash common.Hash, td *big.Int) { - p.lock.RLock() - defer p.lock.RUnlock() - - copy(hash[:], p.headInfo.Hash[:]) - return hash, p.headInfo.Td -} - -func (p *peer) headBlockInfo() blockInfo { - p.lock.RLock() - defer p.lock.RUnlock() - - return blockInfo{Hash: p.headInfo.Hash, Number: p.headInfo.Number, Td: p.headInfo.Td} -} - -// Td retrieves the current total difficulty of a peer. -func (p *peer) Td() *big.Int { - p.lock.RLock() - defer p.lock.RUnlock() - - return new(big.Int).Set(p.headInfo.Td) -} - -// waitBefore implements distPeer interface -func (p *peer) waitBefore(maxCost uint64) (time.Duration, float64) { - return p.fcServer.CanSend(maxCost) -} - -func sendRequest(w p2p.MsgWriter, msgcode, reqID, cost uint64, data interface{}) error { - type req struct { - ReqID uint64 - Data interface{} - } - return p2p.Send(w, msgcode, req{reqID, data}) -} - -func sendResponse(w p2p.MsgWriter, msgcode, reqID, bv uint64, data interface{}) error { - type resp struct { - ReqID, BV uint64 - Data interface{} - } - return p2p.Send(w, msgcode, resp{reqID, bv, data}) -} - -func (p *peer) GetRequestCost(msgcode uint64, amount int) uint64 { - p.lock.RLock() - defer p.lock.RUnlock() - - cost := p.fcCosts[msgcode].baseCost + p.fcCosts[msgcode].reqCost*uint64(amount) - if cost > p.fcServerParams.BufLimit { - cost = p.fcServerParams.BufLimit - } - return cost -} - -// HasBlock checks if the peer has a given block -func (p *peer) HasBlock(hash common.Hash, number uint64, hasState bool) bool { - p.lock.RLock() - hasBlock := p.hasBlock - p.lock.RUnlock() - return hasBlock != nil && hasBlock(hash, number, hasState) -} - -// SendAnnounce announces the availability of a number of blocks through -// a hash notification. -func (p *peer) SendAnnounce(request announceData) error { - return p2p.Send(p.rw, AnnounceMsg, request) -} - -// SendBlockHeaders sends a batch of block headers to the remote peer. -func (p *peer) SendBlockHeaders(reqID, bv uint64, headers []*types.Header) error { - return sendResponse(p.rw, BlockHeadersMsg, reqID, bv, headers) -} - -// SendBlockBodiesRLP sends a batch of block contents to the remote peer from -// an already RLP encoded format. -func (p *peer) SendBlockBodiesRLP(reqID, bv uint64, bodies []rlp.RawValue) error { - return sendResponse(p.rw, BlockBodiesMsg, reqID, bv, bodies) -} - -// SendCodeRLP sends a batch of arbitrary internal data, corresponding to the -// hashes requested. -func (p *peer) SendCode(reqID, bv uint64, data [][]byte) error { - return sendResponse(p.rw, CodeMsg, reqID, bv, data) -} - -// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the -// ones requested from an already RLP encoded format. -func (p *peer) SendReceiptsRLP(reqID, bv uint64, receipts []rlp.RawValue) error { - return sendResponse(p.rw, ReceiptsMsg, reqID, bv, receipts) -} - -// SendProofs sends a batch of legacy LES/1 merkle proofs, corresponding to the ones requested. -func (p *peer) SendProofs(reqID, bv uint64, proofs proofsData) error { - return sendResponse(p.rw, ProofsV1Msg, reqID, bv, proofs) -} - -// SendProofsV2 sends a batch of merkle proofs, corresponding to the ones requested. -func (p *peer) SendProofsV2(reqID, bv uint64, proofs light.NodeList) error { - return sendResponse(p.rw, ProofsV2Msg, reqID, bv, proofs) -} - -// SendHeaderProofs sends a batch of legacy LES/1 header proofs, corresponding to the ones requested. -func (p *peer) SendHeaderProofs(reqID, bv uint64, proofs []ChtResp) error { - return sendResponse(p.rw, HeaderProofsMsg, reqID, bv, proofs) -} - -// SendHelperTrieProofs sends a batch of HelperTrie proofs, corresponding to the ones requested. -func (p *peer) SendHelperTrieProofs(reqID, bv uint64, resp HelperTrieResps) error { - return sendResponse(p.rw, HelperTrieProofsMsg, reqID, bv, resp) -} - -// SendTxStatus sends a batch of transaction status records, corresponding to the ones requested. -func (p *peer) SendTxStatus(reqID, bv uint64, stats []txStatus) error { - return sendResponse(p.rw, TxStatusMsg, reqID, bv, stats) -} - -// RequestHeadersByHash fetches a batch of blocks' headers corresponding to the -// specified header query, based on the hash of an origin block. -func (p *peer) RequestHeadersByHash(reqID, cost uint64, origin common.Hash, amount int, skip int, reverse bool) error { - p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) - return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Hash: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) -} - -// RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the -// specified header query, based on the number of an origin block. -func (p *peer) RequestHeadersByNumber(reqID, cost, origin uint64, amount int, skip int, reverse bool) error { - p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) - return sendRequest(p.rw, GetBlockHeadersMsg, reqID, cost, &getBlockHeadersData{Origin: hashOrNumber{Number: origin}, Amount: uint64(amount), Skip: uint64(skip), Reverse: reverse}) -} - -// RequestBodies fetches a batch of blocks' bodies corresponding to the hashes -// specified. -func (p *peer) RequestBodies(reqID, cost uint64, hashes []common.Hash) error { - p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) - return sendRequest(p.rw, GetBlockBodiesMsg, reqID, cost, hashes) -} - -// RequestCode fetches a batch of arbitrary data from a node's known state -// data, corresponding to the specified hashes. -func (p *peer) RequestCode(reqID, cost uint64, reqs []CodeReq) error { - p.Log().Debug("Fetching batch of codes", "count", len(reqs)) - return sendRequest(p.rw, GetCodeMsg, reqID, cost, reqs) -} - -// RequestReceipts fetches a batch of transaction receipts from a remote node. -func (p *peer) RequestReceipts(reqID, cost uint64, hashes []common.Hash) error { - p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) - return sendRequest(p.rw, GetReceiptsMsg, reqID, cost, hashes) -} - -// RequestProofs fetches a batch of merkle proofs from a remote node. -func (p *peer) RequestProofs(reqID, cost uint64, reqs []ProofReq) error { - p.Log().Debug("Fetching batch of proofs", "count", len(reqs)) - switch p.version { - case lpv1: - return sendRequest(p.rw, GetProofsV1Msg, reqID, cost, reqs) - case lpv2: - return sendRequest(p.rw, GetProofsV2Msg, reqID, cost, reqs) - default: - panic(nil) - } -} - -// RequestHelperTrieProofs fetches a batch of HelperTrie merkle proofs from a remote node. -func (p *peer) RequestHelperTrieProofs(reqID, cost uint64, data interface{}) error { - switch p.version { - case lpv1: - reqs, ok := data.([]ChtReq) - if !ok { - return errInvalidHelpTrieReq - } - p.Log().Debug("Fetching batch of header proofs", "count", len(reqs)) - return sendRequest(p.rw, GetHeaderProofsMsg, reqID, cost, reqs) - case lpv2: - reqs, ok := data.([]HelperTrieReq) - if !ok { - return errInvalidHelpTrieReq - } - p.Log().Debug("Fetching batch of HelperTrie proofs", "count", len(reqs)) - return sendRequest(p.rw, GetHelperTrieProofsMsg, reqID, cost, reqs) - default: - panic(nil) - } -} - -// RequestTxStatus fetches a batch of transaction status records from a remote node. -func (p *peer) RequestTxStatus(reqID, cost uint64, txHashes []common.Hash) error { - p.Log().Debug("Requesting transaction status", "count", len(txHashes)) - return sendRequest(p.rw, GetTxStatusMsg, reqID, cost, txHashes) -} - -// SendTxStatus sends a batch of transactions to be added to the remote transaction pool. -func (p *peer) SendTxs(reqID, cost uint64, txs types.Transactions) error { - p.Log().Debug("Fetching batch of transactions", "count", len(txs)) - switch p.version { - case lpv1: - return p2p.Send(p.rw, SendTxMsg, txs) // old message format does not include reqID - case lpv2: - return sendRequest(p.rw, SendTxV2Msg, reqID, cost, txs) - default: - panic(nil) - } -} - -type keyValueEntry struct { - Key string - Value rlp.RawValue -} -type keyValueList []keyValueEntry -type keyValueMap map[string]rlp.RawValue - -func (l keyValueList) add(key string, val interface{}) keyValueList { - var entry keyValueEntry - entry.Key = key - if val == nil { - val = uint64(0) - } - enc, err := rlp.EncodeToBytes(val) - if err == nil { - entry.Value = enc - } - return append(l, entry) -} - -func (l keyValueList) decode() keyValueMap { - m := make(keyValueMap) - for _, entry := range l { - m[entry.Key] = entry.Value - } - return m -} - -func (m keyValueMap) get(key string, val interface{}) error { - enc, ok := m[key] - if !ok { - return errResp(ErrMissingKey, "%s", key) - } - if val == nil { - return nil - } - return rlp.DecodeBytes(enc, val) -} - -func (p *peer) sendReceiveHandshake(sendList keyValueList) (keyValueList, error) { - // Send out own handshake in a new thread - errc := make(chan error, 1) - go func() { - errc <- p2p.Send(p.rw, StatusMsg, sendList) - }() - // In the mean time retrieve the remote status message - msg, err := p.rw.ReadMsg() - if err != nil { - return nil, err - } - if msg.Code != StatusMsg { - return nil, errResp(ErrNoStatusMsg, "first msg has code %x (!= %x)", msg.Code, StatusMsg) - } - if msg.Size > ProtocolMaxMsgSize { - return nil, errResp(ErrMsgTooLarge, "%v > %v", msg.Size, ProtocolMaxMsgSize) - } - // Decode the handshake - var recvList keyValueList - if err := msg.Decode(&recvList); err != nil { - return nil, errResp(ErrDecode, "msg %v: %v", msg, err) - } - if err := <-errc; err != nil { - return nil, err - } - return recvList, nil -} - -// Handshake executes the les protocol handshake, negotiating version number, -// network IDs, difficulties, head and genesis blocks. -func (p *peer) Handshake(td *big.Int, head common.Hash, headNum uint64, genesis common.Hash, server *LesServer) error { - p.lock.Lock() - defer p.lock.Unlock() - - var send keyValueList - send = send.add("protocolVersion", uint64(p.version)) - send = send.add("networkId", p.network) - send = send.add("headTd", td) - send = send.add("headHash", head) - send = send.add("headNum", headNum) - send = send.add("genesisHash", genesis) - if server != nil { - send = send.add("serveHeaders", nil) - send = send.add("serveChainSince", uint64(0)) - send = send.add("serveStateSince", uint64(0)) - send = send.add("txRelay", nil) - send = send.add("flowControl/BL", server.defParams.BufLimit) - send = send.add("flowControl/MRR", server.defParams.MinRecharge) - list := server.fcCostStats.getCurrentList() - send = send.add("flowControl/MRC", list) - p.fcCosts = list.decode() - } else { - p.requestAnnounceType = announceTypeSimple // set to default until "very light" client mode is implemented - send = send.add("announceType", p.requestAnnounceType) - } - recvList, err := p.sendReceiveHandshake(send) - if err != nil { - return err - } - recv := recvList.decode() - - var rGenesis, rHash common.Hash - var rVersion, rNetwork, rNum uint64 - var rTd *big.Int - - if err := recv.get("protocolVersion", &rVersion); err != nil { - return err - } - if err := recv.get("networkId", &rNetwork); err != nil { - return err - } - if err := recv.get("headTd", &rTd); err != nil { - return err - } - if err := recv.get("headHash", &rHash); err != nil { - return err - } - if err := recv.get("headNum", &rNum); err != nil { - return err - } - if err := recv.get("genesisHash", &rGenesis); err != nil { - return err - } - - if rGenesis != genesis { - return errResp(ErrGenesisBlockMismatch, "%x (!= %x)", rGenesis[:8], genesis[:8]) - } - if rNetwork != p.network { - return errResp(ErrNetworkIdMismatch, "%d (!= %d)", rNetwork, p.network) - } - if int(rVersion) != p.version { - return errResp(ErrProtocolVersionMismatch, "%d (!= %d)", rVersion, p.version) - } - if server != nil { - // until we have a proper peer connectivity API, allow LES connection to other servers - /*if recv.get("serveStateSince", nil) == nil { - return errResp(ErrUselessPeer, "wanted client, got server") - }*/ - if recv.get("announceType", &p.announceType) != nil { - p.announceType = announceTypeSimple - } - p.fcClient = flowcontrol.NewClientNode(server.fcManager, server.defParams) - } else { - if recv.get("serveChainSince", nil) != nil { - return errResp(ErrUselessPeer, "peer cannot serve chain") - } - if recv.get("serveStateSince", nil) != nil { - return errResp(ErrUselessPeer, "peer cannot serve state") - } - if recv.get("txRelay", nil) != nil { - return errResp(ErrUselessPeer, "peer cannot relay transactions") - } - params := &flowcontrol.ServerParams{} - if err := recv.get("flowControl/BL", ¶ms.BufLimit); err != nil { - return err - } - if err := recv.get("flowControl/MRR", ¶ms.MinRecharge); err != nil { - return err - } - var MRC RequestCostList - if err := recv.get("flowControl/MRC", &MRC); err != nil { - return err - } - p.fcServerParams = params - p.fcServer = flowcontrol.NewServerNode(params) - p.fcCosts = MRC.decode() - } - - p.headInfo = &announceData{Td: rTd, Hash: rHash, Number: rNum} - return nil -} - -// String implements fmt.Stringer. -func (p *peer) String() string { - return fmt.Sprintf("Peer %s [%s]", p.id, - fmt.Sprintf("les/%d", p.version), - ) -} - -// peerSetNotify is a callback interface to notify services about added or -// removed peers -type peerSetNotify interface { - registerPeer(*peer) - unregisterPeer(*peer) -} - -// peerSet represents the collection of active peers currently participating in -// the Light Ethereum sub-protocol. -type peerSet struct { - peers map[string]*peer - lock sync.RWMutex - notifyList []peerSetNotify - closed bool -} - -// newPeerSet creates a new peer set to track the active participants. -func newPeerSet() *peerSet { - return &peerSet{ - peers: make(map[string]*peer), - } -} - -// notify adds a service to be notified about added or removed peers -func (ps *peerSet) notify(n peerSetNotify) { - ps.lock.Lock() - ps.notifyList = append(ps.notifyList, n) - peers := make([]*peer, 0, len(ps.peers)) - for _, p := range ps.peers { - peers = append(peers, p) - } - ps.lock.Unlock() - - for _, p := range peers { - n.registerPeer(p) - } -} - -// Register injects a new peer into the working set, or returns an error if the -// peer is already known. -func (ps *peerSet) Register(p *peer) error { - ps.lock.Lock() - if ps.closed { - ps.lock.Unlock() - return errClosed - } - if _, ok := ps.peers[p.id]; ok { - ps.lock.Unlock() - return errAlreadyRegistered - } - ps.peers[p.id] = p - p.sendQueue = newExecQueue(100) - peers := make([]peerSetNotify, len(ps.notifyList)) - copy(peers, ps.notifyList) - ps.lock.Unlock() - - for _, n := range peers { - n.registerPeer(p) - } - return nil -} - -// Unregister removes a remote peer from the active set, disabling any further -// actions to/from that particular entity. It also initiates disconnection at the networking layer. -func (ps *peerSet) Unregister(id string) error { - ps.lock.Lock() - if p, ok := ps.peers[id]; !ok { - ps.lock.Unlock() - return errNotRegistered - } else { - delete(ps.peers, id) - peers := make([]peerSetNotify, len(ps.notifyList)) - copy(peers, ps.notifyList) - ps.lock.Unlock() - - for _, n := range peers { - n.unregisterPeer(p) - } - p.sendQueue.quit() - p.Peer.Disconnect(p2p.DiscUselessPeer) - return nil - } -} - -// AllPeerIDs returns a list of all registered peer IDs -func (ps *peerSet) AllPeerIDs() []string { - ps.lock.RLock() - defer ps.lock.RUnlock() - - res := make([]string, len(ps.peers)) - idx := 0 - for id := range ps.peers { - res[idx] = id - idx++ - } - return res -} - -// Peer retrieves the registered peer with the given id. -func (ps *peerSet) Peer(id string) *peer { - ps.lock.RLock() - defer ps.lock.RUnlock() - - return ps.peers[id] -} - -// Len returns if the current number of peers in the set. -func (ps *peerSet) Len() int { - ps.lock.RLock() - defer ps.lock.RUnlock() - - return len(ps.peers) -} - -// BestPeer retrieves the known peer with the currently highest total difficulty. -func (ps *peerSet) BestPeer() *peer { - ps.lock.RLock() - defer ps.lock.RUnlock() - - var ( - bestPeer *peer - bestTd *big.Int - ) - for _, p := range ps.peers { - if td := p.Td(); bestPeer == nil || td.Cmp(bestTd) > 0 { - bestPeer, bestTd = p, td - } - } - return bestPeer -} - -// AllPeers returns all peers in a list -func (ps *peerSet) AllPeers() []*peer { - ps.lock.RLock() - defer ps.lock.RUnlock() - - list := make([]*peer, len(ps.peers)) - i := 0 - for _, peer := range ps.peers { - list[i] = peer - i++ - } - return list -} - -// Close disconnects all peers. -// No new peers can be registered after Close has returned. -func (ps *peerSet) Close() { - ps.lock.Lock() - defer ps.lock.Unlock() - - for _, p := range ps.peers { - p.Disconnect(p2p.DiscQuitting) - } - ps.closed = true -} diff --git a/les/protocol.go b/les/protocol.go deleted file mode 100644 index 4d4519a42..000000000 --- a/les/protocol.go +++ /dev/null @@ -1,226 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package les implements the Light Ethereum Subprotocol. -package les - -import ( - "crypto/ecdsa" - "errors" - "fmt" - "io" - "math/big" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/core/rawdb" - "github.com/dexon-foundation/dexon/crypto" - "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/rlp" -) - -// Constants to match up protocol versions and messages -const ( - lpv1 = 1 - lpv2 = 2 -) - -// Supported versions of the les protocol (first is primary) -var ( - ClientProtocolVersions = []uint{lpv2, lpv1} - ServerProtocolVersions = []uint{lpv2, lpv1} - AdvertiseProtocolVersions = []uint{lpv2} // clients are searching for the first advertised protocol in the list -) - -// Number of implemented message corresponding to different protocol versions. -var ProtocolLengths = map[uint]uint64{lpv1: 15, lpv2: 22} - -const ( - NetworkId = 1 - ProtocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message -) - -// les protocol message codes -const ( - // Protocol messages belonging to LPV1 - StatusMsg = 0x00 - AnnounceMsg = 0x01 - GetBlockHeadersMsg = 0x02 - BlockHeadersMsg = 0x03 - GetBlockBodiesMsg = 0x04 - BlockBodiesMsg = 0x05 - GetReceiptsMsg = 0x06 - ReceiptsMsg = 0x07 - GetProofsV1Msg = 0x08 - ProofsV1Msg = 0x09 - GetCodeMsg = 0x0a - CodeMsg = 0x0b - SendTxMsg = 0x0c - GetHeaderProofsMsg = 0x0d - HeaderProofsMsg = 0x0e - // Protocol messages belonging to LPV2 - GetProofsV2Msg = 0x0f - ProofsV2Msg = 0x10 - GetHelperTrieProofsMsg = 0x11 - HelperTrieProofsMsg = 0x12 - SendTxV2Msg = 0x13 - GetTxStatusMsg = 0x14 - TxStatusMsg = 0x15 -) - -type errCode int - -const ( - ErrMsgTooLarge = iota - ErrDecode - ErrInvalidMsgCode - ErrProtocolVersionMismatch - ErrNetworkIdMismatch - ErrGenesisBlockMismatch - ErrNoStatusMsg - ErrExtraStatusMsg - ErrSuspendedPeer - ErrUselessPeer - ErrRequestRejected - ErrUnexpectedResponse - ErrInvalidResponse - ErrTooManyTimeouts - ErrMissingKey -) - -func (e errCode) String() string { - return errorToString[int(e)] -} - -// XXX change once legacy code is out -var errorToString = map[int]string{ - ErrMsgTooLarge: "Message too long", - ErrDecode: "Invalid message", - ErrInvalidMsgCode: "Invalid message code", - ErrProtocolVersionMismatch: "Protocol version mismatch", - ErrNetworkIdMismatch: "NetworkId mismatch", - ErrGenesisBlockMismatch: "Genesis block mismatch", - ErrNoStatusMsg: "No status message", - ErrExtraStatusMsg: "Extra status message", - ErrSuspendedPeer: "Suspended peer", - ErrRequestRejected: "Request rejected", - ErrUnexpectedResponse: "Unexpected response", - ErrInvalidResponse: "Invalid response", - ErrTooManyTimeouts: "Too many request timeouts", - ErrMissingKey: "Key missing from list", -} - -type announceBlock struct { - Hash common.Hash // Hash of one particular block being announced - Number uint64 // Number of one particular block being announced - Td *big.Int // Total difficulty of one particular block being announced -} - -// announceData is the network packet for the block announcements. -type announceData struct { - Hash common.Hash // Hash of one particular block being announced - Number uint64 // Number of one particular block being announced - Td *big.Int // Total difficulty of one particular block being announced - ReorgDepth uint64 - Update keyValueList -} - -// sign adds a signature to the block announcement by the given privKey -func (a *announceData) sign(privKey *ecdsa.PrivateKey) { - rlp, _ := rlp.EncodeToBytes(announceBlock{a.Hash, a.Number, a.Td}) - sig, _ := crypto.Sign(crypto.Keccak256(rlp), privKey) - a.Update = a.Update.add("sign", sig) -} - -// checkSignature verifies if the block announcement has a valid signature by the given pubKey -func (a *announceData) checkSignature(id enode.ID) error { - var sig []byte - if err := a.Update.decode().get("sign", &sig); err != nil { - return err - } - rlp, _ := rlp.EncodeToBytes(announceBlock{a.Hash, a.Number, a.Td}) - recPubkey, err := crypto.SigToPub(crypto.Keccak256(rlp), sig) - if err != nil { - return err - } - if id == enode.PubkeyToIDV4(recPubkey) { - return nil - } - return errors.New("wrong signature") -} - -type blockInfo struct { - Hash common.Hash // Hash of one particular block being announced - Number uint64 // Number of one particular block being announced - Td *big.Int // Total difficulty of one particular block being announced -} - -// getBlockHeadersData represents a block header query. -type getBlockHeadersData struct { - Origin hashOrNumber // Block from which to retrieve headers - Amount uint64 // Maximum number of headers to retrieve - Skip uint64 // Blocks to skip between consecutive headers - Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis) -} - -// hashOrNumber is a combined field for specifying an origin block. -type hashOrNumber struct { - Hash common.Hash // Block hash from which to retrieve headers (excludes Number) - Number uint64 // Block hash from which to retrieve headers (excludes Hash) -} - -// EncodeRLP is a specialized encoder for hashOrNumber to encode only one of the -// two contained union fields. -func (hn *hashOrNumber) EncodeRLP(w io.Writer) error { - if hn.Hash == (common.Hash{}) { - return rlp.Encode(w, hn.Number) - } - if hn.Number != 0 { - return fmt.Errorf("both origin hash (%x) and number (%d) provided", hn.Hash, hn.Number) - } - return rlp.Encode(w, hn.Hash) -} - -// DecodeRLP is a specialized decoder for hashOrNumber to decode the contents -// into either a block hash or a block number. -func (hn *hashOrNumber) DecodeRLP(s *rlp.Stream) error { - _, size, _ := s.Kind() - origin, err := s.Raw() - if err == nil { - switch { - case size == 32: - err = rlp.DecodeBytes(origin, &hn.Hash) - case size <= 8: - err = rlp.DecodeBytes(origin, &hn.Number) - default: - err = fmt.Errorf("invalid input size %d for origin", size) - } - } - return err -} - -// CodeData is the network response packet for a node data retrieval. -type CodeData []struct { - Value []byte -} - -type proofsData [][]rlp.RawValue - -type txStatus struct { - Status core.TxStatus - Lookup *rawdb.TxLookupEntry `rlp:"nil"` - Error string -} diff --git a/les/randselect.go b/les/randselect.go deleted file mode 100644 index 1cc1d3d3e..000000000 --- a/les/randselect.go +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package les implements the Light Ethereum Subprotocol. -package les - -import ( - "math/rand" -) - -// wrsItem interface should be implemented by any entries that are to be selected from -// a weightedRandomSelect set. Note that recalculating monotonously decreasing item -// weights on-demand (without constantly calling update) is allowed -type wrsItem interface { - Weight() int64 -} - -// weightedRandomSelect is capable of weighted random selection from a set of items -type weightedRandomSelect struct { - root *wrsNode - idx map[wrsItem]int -} - -// newWeightedRandomSelect returns a new weightedRandomSelect structure -func newWeightedRandomSelect() *weightedRandomSelect { - return &weightedRandomSelect{root: &wrsNode{maxItems: wrsBranches}, idx: make(map[wrsItem]int)} -} - -// update updates an item's weight, adds it if it was non-existent or removes it if -// the new weight is zero. Note that explicitly updating decreasing weights is not necessary. -func (w *weightedRandomSelect) update(item wrsItem) { - w.setWeight(item, item.Weight()) -} - -// remove removes an item from the set -func (w *weightedRandomSelect) remove(item wrsItem) { - w.setWeight(item, 0) -} - -// setWeight sets an item's weight to a specific value (removes it if zero) -func (w *weightedRandomSelect) setWeight(item wrsItem, weight int64) { - idx, ok := w.idx[item] - if ok { - w.root.setWeight(idx, weight) - if weight == 0 { - delete(w.idx, item) - } - } else { - if weight != 0 { - if w.root.itemCnt == w.root.maxItems { - // add a new level - newRoot := &wrsNode{sumWeight: w.root.sumWeight, itemCnt: w.root.itemCnt, level: w.root.level + 1, maxItems: w.root.maxItems * wrsBranches} - newRoot.items[0] = w.root - newRoot.weights[0] = w.root.sumWeight - w.root = newRoot - } - w.idx[item] = w.root.insert(item, weight) - } - } -} - -// choose randomly selects an item from the set, with a chance proportional to its -// current weight. If the weight of the chosen element has been decreased since the -// last stored value, returns it with a newWeight/oldWeight chance, otherwise just -// updates its weight and selects another one -func (w *weightedRandomSelect) choose() wrsItem { - for { - if w.root.sumWeight == 0 { - return nil - } - val := rand.Int63n(w.root.sumWeight) - choice, lastWeight := w.root.choose(val) - weight := choice.Weight() - if weight != lastWeight { - w.setWeight(choice, weight) - } - if weight >= lastWeight || rand.Int63n(lastWeight) < weight { - return choice - } - } -} - -const wrsBranches = 8 // max number of branches in the wrsNode tree - -// wrsNode is a node of a tree structure that can store wrsItems or further wrsNodes. -type wrsNode struct { - items [wrsBranches]interface{} - weights [wrsBranches]int64 - sumWeight int64 - level, itemCnt, maxItems int -} - -// insert recursively inserts a new item to the tree and returns the item index -func (n *wrsNode) insert(item wrsItem, weight int64) int { - branch := 0 - for n.items[branch] != nil && (n.level == 0 || n.items[branch].(*wrsNode).itemCnt == n.items[branch].(*wrsNode).maxItems) { - branch++ - if branch == wrsBranches { - panic(nil) - } - } - n.itemCnt++ - n.sumWeight += weight - n.weights[branch] += weight - if n.level == 0 { - n.items[branch] = item - return branch - } - var subNode *wrsNode - if n.items[branch] == nil { - subNode = &wrsNode{maxItems: n.maxItems / wrsBranches, level: n.level - 1} - n.items[branch] = subNode - } else { - subNode = n.items[branch].(*wrsNode) - } - subIdx := subNode.insert(item, weight) - return subNode.maxItems*branch + subIdx -} - -// setWeight updates the weight of a certain item (which should exist) and returns -// the change of the last weight value stored in the tree -func (n *wrsNode) setWeight(idx int, weight int64) int64 { - if n.level == 0 { - oldWeight := n.weights[idx] - n.weights[idx] = weight - diff := weight - oldWeight - n.sumWeight += diff - if weight == 0 { - n.items[idx] = nil - n.itemCnt-- - } - return diff - } - branchItems := n.maxItems / wrsBranches - branch := idx / branchItems - diff := n.items[branch].(*wrsNode).setWeight(idx-branch*branchItems, weight) - n.weights[branch] += diff - n.sumWeight += diff - if weight == 0 { - n.itemCnt-- - } - return diff -} - -// choose recursively selects an item from the tree and returns it along with its weight -func (n *wrsNode) choose(val int64) (wrsItem, int64) { - for i, w := range n.weights { - if val < w { - if n.level == 0 { - return n.items[i].(wrsItem), n.weights[i] - } - return n.items[i].(*wrsNode).choose(val) - } - val -= w - } - panic(nil) -} diff --git a/les/randselect_test.go b/les/randselect_test.go deleted file mode 100644 index 9ae7726dd..000000000 --- a/les/randselect_test.go +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "math/rand" - "testing" -) - -type testWrsItem struct { - idx int - widx *int -} - -func (t *testWrsItem) Weight() int64 { - w := *t.widx - if w == -1 || w == t.idx { - return int64(t.idx + 1) - } - return 0 -} - -func TestWeightedRandomSelect(t *testing.T) { - testFn := func(cnt int) { - s := newWeightedRandomSelect() - w := -1 - list := make([]testWrsItem, cnt) - for i := range list { - list[i] = testWrsItem{idx: i, widx: &w} - s.update(&list[i]) - } - w = rand.Intn(cnt) - c := s.choose() - if c == nil { - t.Errorf("expected item, got nil") - } else { - if c.(*testWrsItem).idx != w { - t.Errorf("expected another item") - } - } - w = -2 - if s.choose() != nil { - t.Errorf("expected nil, got item") - } - } - testFn(1) - testFn(10) - testFn(100) - testFn(1000) - testFn(10000) - testFn(100000) - testFn(1000000) -} diff --git a/les/request_test.go b/les/request_test.go deleted file mode 100644 index c99ebab53..000000000 --- a/les/request_test.go +++ /dev/null @@ -1,122 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "context" - "testing" - "time" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/core/rawdb" - "github.com/dexon-foundation/dexon/crypto" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/light" -) - -var testBankSecureTrieKey = secAddr(testBankAddress) - -func secAddr(addr common.Address) []byte { - return crypto.Keccak256(addr[:]) -} - -type accessTestFn func(db ethdb.Database, bhash common.Hash, number uint64) light.OdrRequest - -func TestBlockAccessLes1(t *testing.T) { testAccess(t, 1, tfBlockAccess) } - -func TestBlockAccessLes2(t *testing.T) { testAccess(t, 2, tfBlockAccess) } - -func tfBlockAccess(db ethdb.Database, bhash common.Hash, number uint64) light.OdrRequest { - return &light.BlockRequest{Hash: bhash, Number: number} -} - -func TestReceiptsAccessLes1(t *testing.T) { testAccess(t, 1, tfReceiptsAccess) } - -func TestReceiptsAccessLes2(t *testing.T) { testAccess(t, 2, tfReceiptsAccess) } - -func tfReceiptsAccess(db ethdb.Database, bhash common.Hash, number uint64) light.OdrRequest { - return &light.ReceiptsRequest{Hash: bhash, Number: number} -} - -func TestTrieEntryAccessLes1(t *testing.T) { testAccess(t, 1, tfTrieEntryAccess) } - -func TestTrieEntryAccessLes2(t *testing.T) { testAccess(t, 2, tfTrieEntryAccess) } - -func tfTrieEntryAccess(db ethdb.Database, bhash common.Hash, number uint64) light.OdrRequest { - if number := rawdb.ReadHeaderNumber(db, bhash); number != nil { - return &light.TrieRequest{Id: light.StateTrieID(rawdb.ReadHeader(db, bhash, *number)), Key: testBankSecureTrieKey} - } - return nil -} - -func TestCodeAccessLes1(t *testing.T) { testAccess(t, 1, tfCodeAccess) } - -func TestCodeAccessLes2(t *testing.T) { testAccess(t, 2, tfCodeAccess) } - -func tfCodeAccess(db ethdb.Database, bhash common.Hash, num uint64) light.OdrRequest { - number := rawdb.ReadHeaderNumber(db, bhash) - if number != nil { - return nil - } - header := rawdb.ReadHeader(db, bhash, *number) - if header.Number.Uint64() < testContractDeployed { - return nil - } - sti := light.StateTrieID(header) - ci := light.StorageTrieID(sti, crypto.Keccak256Hash(testContractAddr[:]), common.Hash{}) - return &light.CodeRequest{Id: ci, Hash: crypto.Keccak256Hash(testContractCodeDeployed)} -} - -func testAccess(t *testing.T, protocol int, fn accessTestFn) { - // Assemble the test environment - server, client, tearDown := newClientServerEnv(t, 4, protocol, nil, true) - defer tearDown() - client.pm.synchronise(client.rPeer) - - test := func(expFail uint64) { - for i := uint64(0); i <= server.pm.blockchain.CurrentHeader().Number.Uint64(); i++ { - bhash := rawdb.ReadCanonicalHash(server.db, i) - if req := fn(client.db, bhash, i); req != nil { - ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) - defer cancel() - err := client.pm.odr.Retrieve(ctx, req) - got := err == nil - exp := i < expFail - if exp && !got { - t.Errorf("object retrieval failed") - } - if !exp && got { - t.Errorf("unexpected object retrieval success") - } - } - } - } - - // temporarily remove peer to test odr fails - client.peers.Unregister(client.rPeer.id) - time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed - // expect retrievals to fail (except genesis block) without a les peer - test(0) - - client.peers.Register(client.rPeer) - time.Sleep(time.Millisecond * 10) // ensure that all peerSetNotify callbacks are executed - client.rPeer.lock.Lock() - client.rPeer.hasBlock = func(common.Hash, uint64, bool) bool { return true } - client.rPeer.lock.Unlock() - // expect all retrievals to pass - test(5) -} diff --git a/les/retrieve.go b/les/retrieve.go deleted file mode 100644 index bc8763f3e..000000000 --- a/les/retrieve.go +++ /dev/null @@ -1,414 +0,0 @@ -// Copyright 2017 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package light implements on-demand retrieval capable state and chain objects -// for the Ethereum Light Client. -package les - -import ( - "context" - "crypto/rand" - "encoding/binary" - "fmt" - "sync" - "time" - - "github.com/dexon-foundation/dexon/common/mclock" - "github.com/dexon-foundation/dexon/light" -) - -var ( - retryQueue = time.Millisecond * 100 - softRequestTimeout = time.Millisecond * 500 - hardRequestTimeout = time.Second * 10 -) - -// retrieveManager is a layer on top of requestDistributor which takes care of -// matching replies by request ID and handles timeouts and resends if necessary. -type retrieveManager struct { - dist *requestDistributor - peers *peerSet - serverPool peerSelector - - lock sync.RWMutex - sentReqs map[uint64]*sentReq -} - -// validatorFunc is a function that processes a reply message -type validatorFunc func(distPeer, *Msg) error - -// peerSelector receives feedback info about response times and timeouts -type peerSelector interface { - adjustResponseTime(*poolEntry, time.Duration, bool) -} - -// sentReq represents a request sent and tracked by retrieveManager -type sentReq struct { - rm *retrieveManager - req *distReq - id uint64 - validate validatorFunc - - eventsCh chan reqPeerEvent - stopCh chan struct{} - stopped bool - err error - - lock sync.RWMutex // protect access to sentTo map - sentTo map[distPeer]sentReqToPeer - - lastReqQueued bool // last request has been queued but not sent - lastReqSentTo distPeer // if not nil then last request has been sent to given peer but not timed out - reqSrtoCount int // number of requests that reached soft (but not hard) timeout -} - -// sentReqToPeer notifies the request-from-peer goroutine (tryRequest) about a response -// delivered by the given peer. Only one delivery is allowed per request per peer, -// after which delivered is set to true, the validity of the response is sent on the -// valid channel and no more responses are accepted. -type sentReqToPeer struct { - delivered bool - valid chan bool -} - -// reqPeerEvent is sent by the request-from-peer goroutine (tryRequest) to the -// request state machine (retrieveLoop) through the eventsCh channel. -type reqPeerEvent struct { - event int - peer distPeer -} - -const ( - rpSent = iota // if peer == nil, not sent (no suitable peers) - rpSoftTimeout - rpHardTimeout - rpDeliveredValid - rpDeliveredInvalid -) - -// newRetrieveManager creates the retrieve manager -func newRetrieveManager(peers *peerSet, dist *requestDistributor, serverPool peerSelector) *retrieveManager { - return &retrieveManager{ - peers: peers, - dist: dist, - serverPool: serverPool, - sentReqs: make(map[uint64]*sentReq), - } -} - -// retrieve sends a request (to multiple peers if necessary) and waits for an answer -// that is delivered through the deliver function and successfully validated by the -// validator callback. It returns when a valid answer is delivered or the context is -// cancelled. -func (rm *retrieveManager) retrieve(ctx context.Context, reqID uint64, req *distReq, val validatorFunc, shutdown chan struct{}) error { - sentReq := rm.sendReq(reqID, req, val) - select { - case <-sentReq.stopCh: - case <-ctx.Done(): - sentReq.stop(ctx.Err()) - case <-shutdown: - sentReq.stop(fmt.Errorf("Client is shutting down")) - } - return sentReq.getError() -} - -// sendReq starts a process that keeps trying to retrieve a valid answer for a -// request from any suitable peers until stopped or succeeded. -func (rm *retrieveManager) sendReq(reqID uint64, req *distReq, val validatorFunc) *sentReq { - r := &sentReq{ - rm: rm, - req: req, - id: reqID, - sentTo: make(map[distPeer]sentReqToPeer), - stopCh: make(chan struct{}), - eventsCh: make(chan reqPeerEvent, 10), - validate: val, - } - - canSend := req.canSend - req.canSend = func(p distPeer) bool { - // add an extra check to canSend: the request has not been sent to the same peer before - r.lock.RLock() - _, sent := r.sentTo[p] - r.lock.RUnlock() - return !sent && canSend(p) - } - - request := req.request - req.request = func(p distPeer) func() { - // before actually sending the request, put an entry into the sentTo map - r.lock.Lock() - r.sentTo[p] = sentReqToPeer{false, make(chan bool, 1)} - r.lock.Unlock() - return request(p) - } - rm.lock.Lock() - rm.sentReqs[reqID] = r - rm.lock.Unlock() - - go r.retrieveLoop() - return r -} - -// deliver is called by the LES protocol manager to deliver reply messages to waiting requests -func (rm *retrieveManager) deliver(peer distPeer, msg *Msg) error { - rm.lock.RLock() - req, ok := rm.sentReqs[msg.ReqID] - rm.lock.RUnlock() - - if ok { - return req.deliver(peer, msg) - } - return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID) -} - -// reqStateFn represents a state of the retrieve loop state machine -type reqStateFn func() reqStateFn - -// retrieveLoop is the retrieval state machine event loop -func (r *sentReq) retrieveLoop() { - go r.tryRequest() - r.lastReqQueued = true - state := r.stateRequesting - - for state != nil { - state = state() - } - - r.rm.lock.Lock() - delete(r.rm.sentReqs, r.id) - r.rm.lock.Unlock() -} - -// stateRequesting: a request has been queued or sent recently; when it reaches soft timeout, -// a new request is sent to a new peer -func (r *sentReq) stateRequesting() reqStateFn { - select { - case ev := <-r.eventsCh: - r.update(ev) - switch ev.event { - case rpSent: - if ev.peer == nil { - // request send failed, no more suitable peers - if r.waiting() { - // we are already waiting for sent requests which may succeed so keep waiting - return r.stateNoMorePeers - } - // nothing to wait for, no more peers to ask, return with error - r.stop(light.ErrNoPeers) - // no need to go to stopped state because waiting() already returned false - return nil - } - case rpSoftTimeout: - // last request timed out, try asking a new peer - go r.tryRequest() - r.lastReqQueued = true - return r.stateRequesting - case rpDeliveredInvalid: - // if it was the last sent request (set to nil by update) then start a new one - if !r.lastReqQueued && r.lastReqSentTo == nil { - go r.tryRequest() - r.lastReqQueued = true - } - return r.stateRequesting - case rpDeliveredValid: - r.stop(nil) - return r.stateStopped - } - return r.stateRequesting - case <-r.stopCh: - return r.stateStopped - } -} - -// stateNoMorePeers: could not send more requests because no suitable peers are available. -// Peers may become suitable for a certain request later or new peers may appear so we -// keep trying. -func (r *sentReq) stateNoMorePeers() reqStateFn { - select { - case <-time.After(retryQueue): - go r.tryRequest() - r.lastReqQueued = true - return r.stateRequesting - case ev := <-r.eventsCh: - r.update(ev) - if ev.event == rpDeliveredValid { - r.stop(nil) - return r.stateStopped - } - if r.waiting() { - return r.stateNoMorePeers - } - r.stop(light.ErrNoPeers) - return nil - case <-r.stopCh: - return r.stateStopped - } -} - -// stateStopped: request succeeded or cancelled, just waiting for some peers -// to either answer or time out hard -func (r *sentReq) stateStopped() reqStateFn { - for r.waiting() { - r.update(<-r.eventsCh) - } - return nil -} - -// update updates the queued/sent flags and timed out peers counter according to the event -func (r *sentReq) update(ev reqPeerEvent) { - switch ev.event { - case rpSent: - r.lastReqQueued = false - r.lastReqSentTo = ev.peer - case rpSoftTimeout: - r.lastReqSentTo = nil - r.reqSrtoCount++ - case rpHardTimeout: - r.reqSrtoCount-- - case rpDeliveredValid, rpDeliveredInvalid: - if ev.peer == r.lastReqSentTo { - r.lastReqSentTo = nil - } else { - r.reqSrtoCount-- - } - } -} - -// waiting returns true if the retrieval mechanism is waiting for an answer from -// any peer -func (r *sentReq) waiting() bool { - return r.lastReqQueued || r.lastReqSentTo != nil || r.reqSrtoCount > 0 -} - -// tryRequest tries to send the request to a new peer and waits for it to either -// succeed or time out if it has been sent. It also sends the appropriate reqPeerEvent -// messages to the request's event channel. -func (r *sentReq) tryRequest() { - sent := r.rm.dist.queue(r.req) - var p distPeer - select { - case p = <-sent: - case <-r.stopCh: - if r.rm.dist.cancel(r.req) { - p = nil - } else { - p = <-sent - } - } - - r.eventsCh <- reqPeerEvent{rpSent, p} - if p == nil { - return - } - - reqSent := mclock.Now() - srto, hrto := false, false - - r.lock.RLock() - s, ok := r.sentTo[p] - r.lock.RUnlock() - if !ok { - panic(nil) - } - - defer func() { - // send feedback to server pool and remove peer if hard timeout happened - pp, ok := p.(*peer) - if ok && r.rm.serverPool != nil { - respTime := time.Duration(mclock.Now() - reqSent) - r.rm.serverPool.adjustResponseTime(pp.poolEntry, respTime, srto) - } - if hrto { - pp.Log().Debug("Request timed out hard") - if r.rm.peers != nil { - r.rm.peers.Unregister(pp.id) - } - } - - r.lock.Lock() - delete(r.sentTo, p) - r.lock.Unlock() - }() - - select { - case ok := <-s.valid: - if ok { - r.eventsCh <- reqPeerEvent{rpDeliveredValid, p} - } else { - r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p} - } - return - case <-time.After(softRequestTimeout): - srto = true - r.eventsCh <- reqPeerEvent{rpSoftTimeout, p} - } - - select { - case ok := <-s.valid: - if ok { - r.eventsCh <- reqPeerEvent{rpDeliveredValid, p} - } else { - r.eventsCh <- reqPeerEvent{rpDeliveredInvalid, p} - } - case <-time.After(hardRequestTimeout): - hrto = true - r.eventsCh <- reqPeerEvent{rpHardTimeout, p} - } -} - -// deliver a reply belonging to this request -func (r *sentReq) deliver(peer distPeer, msg *Msg) error { - r.lock.Lock() - defer r.lock.Unlock() - - s, ok := r.sentTo[peer] - if !ok || s.delivered { - return errResp(ErrUnexpectedResponse, "reqID = %v", msg.ReqID) - } - valid := r.validate(peer, msg) == nil - r.sentTo[peer] = sentReqToPeer{true, s.valid} - s.valid <- valid - if !valid { - return errResp(ErrInvalidResponse, "reqID = %v", msg.ReqID) - } - return nil -} - -// stop stops the retrieval process and sets an error code that will be returned -// by getError -func (r *sentReq) stop(err error) { - r.lock.Lock() - if !r.stopped { - r.stopped = true - r.err = err - close(r.stopCh) - } - r.lock.Unlock() -} - -// getError returns any retrieval error (either internally generated or set by the -// stop function) after stopCh has been closed -func (r *sentReq) getError() error { - return r.err -} - -// genReqID generates a new random request ID -func genReqID() uint64 { - var rnd [8]byte - rand.Read(rnd[:]) - return binary.BigEndian.Uint64(rnd[:]) -} diff --git a/les/server.go b/les/server.go deleted file mode 100644 index d681ba359..000000000 --- a/les/server.go +++ /dev/null @@ -1,387 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package les implements the Light Ethereum Subprotocol. -package les - -import ( - "crypto/ecdsa" - "encoding/binary" - "math" - "sync" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/core" - "github.com/dexon-foundation/dexon/core/rawdb" - "github.com/dexon-foundation/dexon/core/types" - "github.com/dexon-foundation/dexon/eth" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/les/flowcontrol" - "github.com/dexon-foundation/dexon/light" - "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p" - "github.com/dexon-foundation/dexon/p2p/discv5" - "github.com/dexon-foundation/dexon/params" - "github.com/dexon-foundation/dexon/rlp" -) - -type LesServer struct { - lesCommons - - fcManager *flowcontrol.ClientManager // nil if our node is client only - fcCostStats *requestCostStats - defParams *flowcontrol.ServerParams - lesTopics []discv5.Topic - privateKey *ecdsa.PrivateKey - quitSync chan struct{} -} - -func NewLesServer(eth *eth.Ethereum, config *eth.Config) (*LesServer, error) { - quitSync := make(chan struct{}) - pm, err := NewProtocolManager(eth.BlockChain().Config(), light.DefaultServerIndexerConfig, false, config.NetworkId, eth.EventMux(), eth.Engine(), newPeerSet(), eth.BlockChain(), eth.TxPool(), eth.ChainDb(), nil, nil, nil, quitSync, new(sync.WaitGroup)) - if err != nil { - return nil, err - } - - lesTopics := make([]discv5.Topic, len(AdvertiseProtocolVersions)) - for i, pv := range AdvertiseProtocolVersions { - lesTopics[i] = lesTopic(eth.BlockChain().Genesis().Hash(), pv) - } - - srv := &LesServer{ - lesCommons: lesCommons{ - config: config, - chainDb: eth.ChainDb(), - iConfig: light.DefaultServerIndexerConfig, - chtIndexer: light.NewChtIndexer(eth.ChainDb(), nil, params.CHTFrequencyServer, params.HelperTrieProcessConfirmations), - bloomTrieIndexer: light.NewBloomTrieIndexer(eth.ChainDb(), nil, params.BloomBitsBlocks, params.BloomTrieFrequency), - protocolManager: pm, - }, - quitSync: quitSync, - lesTopics: lesTopics, - } - - logger := log.New() - - chtV1SectionCount, _, _ := srv.chtIndexer.Sections() // indexer still uses LES/1 4k section size for backwards server compatibility - chtV2SectionCount := chtV1SectionCount / (params.CHTFrequencyClient / params.CHTFrequencyServer) - if chtV2SectionCount != 0 { - // convert to LES/2 section - chtLastSection := chtV2SectionCount - 1 - // convert last LES/2 section index back to LES/1 index for chtIndexer.SectionHead - chtLastSectionV1 := (chtLastSection+1)*(params.CHTFrequencyClient/params.CHTFrequencyServer) - 1 - chtSectionHead := srv.chtIndexer.SectionHead(chtLastSectionV1) - chtRoot := light.GetChtRoot(pm.chainDb, chtLastSectionV1, chtSectionHead) - logger.Info("Loaded CHT", "section", chtLastSection, "head", chtSectionHead, "root", chtRoot) - } - bloomTrieSectionCount, _, _ := srv.bloomTrieIndexer.Sections() - if bloomTrieSectionCount != 0 { - bloomTrieLastSection := bloomTrieSectionCount - 1 - bloomTrieSectionHead := srv.bloomTrieIndexer.SectionHead(bloomTrieLastSection) - bloomTrieRoot := light.GetBloomTrieRoot(pm.chainDb, bloomTrieLastSection, bloomTrieSectionHead) - logger.Info("Loaded bloom trie", "section", bloomTrieLastSection, "head", bloomTrieSectionHead, "root", bloomTrieRoot) - } - - srv.chtIndexer.Start(eth.BlockChain()) - pm.server = srv - - srv.defParams = &flowcontrol.ServerParams{ - BufLimit: 300000000, - MinRecharge: 50000, - } - srv.fcManager = flowcontrol.NewClientManager(uint64(config.LightServ), 10, 1000000000) - srv.fcCostStats = newCostStats(eth.ChainDb()) - return srv, nil -} - -func (s *LesServer) Protocols() []p2p.Protocol { - return s.makeProtocols(ServerProtocolVersions) -} - -// Start starts the LES server -func (s *LesServer) Start(srvr *p2p.Server) { - s.protocolManager.Start(s.config.LightPeers) - if srvr.DiscV5 != nil { - for _, topic := range s.lesTopics { - topic := topic - go func() { - logger := log.New("topic", topic) - logger.Info("Starting topic registration") - defer logger.Info("Terminated topic registration") - - srvr.DiscV5.RegisterTopic(topic, s.quitSync) - }() - } - } - s.privateKey = srvr.PrivateKey - s.protocolManager.blockLoop() -} - -func (s *LesServer) SetBloomBitsIndexer(bloomIndexer *core.ChainIndexer) { - bloomIndexer.AddChildIndexer(s.bloomTrieIndexer) -} - -// Stop stops the LES service -func (s *LesServer) Stop() { - s.chtIndexer.Close() - // bloom trie indexer is closed by parent bloombits indexer - s.fcCostStats.store() - s.fcManager.Stop() - go func() { - <-s.protocolManager.noMorePeers - }() - s.protocolManager.Stop() -} - -type requestCosts struct { - baseCost, reqCost uint64 -} - -type requestCostTable map[uint64]*requestCosts - -type RequestCostList []struct { - MsgCode, BaseCost, ReqCost uint64 -} - -func (list RequestCostList) decode() requestCostTable { - table := make(requestCostTable) - for _, e := range list { - table[e.MsgCode] = &requestCosts{ - baseCost: e.BaseCost, - reqCost: e.ReqCost, - } - } - return table -} - -type linReg struct { - sumX, sumY, sumXX, sumXY float64 - cnt uint64 -} - -const linRegMaxCnt = 100000 - -func (l *linReg) add(x, y float64) { - if l.cnt >= linRegMaxCnt { - sub := float64(l.cnt+1-linRegMaxCnt) / linRegMaxCnt - l.sumX -= l.sumX * sub - l.sumY -= l.sumY * sub - l.sumXX -= l.sumXX * sub - l.sumXY -= l.sumXY * sub - l.cnt = linRegMaxCnt - 1 - } - l.cnt++ - l.sumX += x - l.sumY += y - l.sumXX += x * x - l.sumXY += x * y -} - -func (l *linReg) calc() (b, m float64) { - if l.cnt == 0 { - return 0, 0 - } - cnt := float64(l.cnt) - d := cnt*l.sumXX - l.sumX*l.sumX - if d < 0.001 { - return l.sumY / cnt, 0 - } - m = (cnt*l.sumXY - l.sumX*l.sumY) / d - b = (l.sumY / cnt) - (m * l.sumX / cnt) - return b, m -} - -func (l *linReg) toBytes() []byte { - var arr [40]byte - binary.BigEndian.PutUint64(arr[0:8], math.Float64bits(l.sumX)) - binary.BigEndian.PutUint64(arr[8:16], math.Float64bits(l.sumY)) - binary.BigEndian.PutUint64(arr[16:24], math.Float64bits(l.sumXX)) - binary.BigEndian.PutUint64(arr[24:32], math.Float64bits(l.sumXY)) - binary.BigEndian.PutUint64(arr[32:40], l.cnt) - return arr[:] -} - -func linRegFromBytes(data []byte) *linReg { - if len(data) != 40 { - return nil - } - l := &linReg{} - l.sumX = math.Float64frombits(binary.BigEndian.Uint64(data[0:8])) - l.sumY = math.Float64frombits(binary.BigEndian.Uint64(data[8:16])) - l.sumXX = math.Float64frombits(binary.BigEndian.Uint64(data[16:24])) - l.sumXY = math.Float64frombits(binary.BigEndian.Uint64(data[24:32])) - l.cnt = binary.BigEndian.Uint64(data[32:40]) - return l -} - -type requestCostStats struct { - lock sync.RWMutex - db ethdb.Database - stats map[uint64]*linReg -} - -type requestCostStatsRlp []struct { - MsgCode uint64 - Data []byte -} - -var rcStatsKey = []byte("_requestCostStats") - -func newCostStats(db ethdb.Database) *requestCostStats { - stats := make(map[uint64]*linReg) - for _, code := range reqList { - stats[code] = &linReg{cnt: 100} - } - - if db != nil { - data, err := db.Get(rcStatsKey) - var statsRlp requestCostStatsRlp - if err == nil { - err = rlp.DecodeBytes(data, &statsRlp) - } - if err == nil { - for _, r := range statsRlp { - if stats[r.MsgCode] != nil { - if l := linRegFromBytes(r.Data); l != nil { - stats[r.MsgCode] = l - } - } - } - } - } - - return &requestCostStats{ - db: db, - stats: stats, - } -} - -func (s *requestCostStats) store() { - s.lock.Lock() - defer s.lock.Unlock() - - statsRlp := make(requestCostStatsRlp, len(reqList)) - for i, code := range reqList { - statsRlp[i].MsgCode = code - statsRlp[i].Data = s.stats[code].toBytes() - } - - if data, err := rlp.EncodeToBytes(statsRlp); err == nil { - s.db.Put(rcStatsKey, data) - } -} - -func (s *requestCostStats) getCurrentList() RequestCostList { - s.lock.Lock() - defer s.lock.Unlock() - - list := make(RequestCostList, len(reqList)) - //fmt.Println("RequestCostList") - for idx, code := range reqList { - b, m := s.stats[code].calc() - //fmt.Println(code, s.stats[code].cnt, b/1000000, m/1000000) - if m < 0 { - b += m - m = 0 - } - if b < 0 { - b = 0 - } - - list[idx].MsgCode = code - list[idx].BaseCost = uint64(b * 2) - list[idx].ReqCost = uint64(m * 2) - } - return list -} - -func (s *requestCostStats) update(msgCode, reqCnt, cost uint64) { - s.lock.Lock() - defer s.lock.Unlock() - - c, ok := s.stats[msgCode] - if !ok || reqCnt == 0 { - return - } - c.add(float64(reqCnt), float64(cost)) -} - -func (pm *ProtocolManager) blockLoop() { - pm.wg.Add(1) - headCh := make(chan core.ChainHeadEvent, 10) - headSub := pm.blockchain.SubscribeChainHeadEvent(headCh) - go func() { - var lastHead *types.Header - lastBroadcastTd := common.Big0 - for { - select { - case ev := <-headCh: - peers := pm.peers.AllPeers() - if len(peers) > 0 { - header := ev.Block.Header() - hash := header.Hash() - number := header.Number.Uint64() - td := rawdb.ReadTd(pm.chainDb, hash, number) - if td != nil && td.Cmp(lastBroadcastTd) > 0 { - var reorg uint64 - if lastHead != nil { - reorg = lastHead.Number.Uint64() - rawdb.FindCommonAncestor(pm.chainDb, header, lastHead).Number.Uint64() - } - lastHead = header - lastBroadcastTd = td - - log.Debug("Announcing block to peers", "number", number, "hash", hash, "td", td, "reorg", reorg) - - announce := announceData{Hash: hash, Number: number, Td: td, ReorgDepth: reorg} - var ( - signed bool - signedAnnounce announceData - ) - - for _, p := range peers { - switch p.announceType { - - case announceTypeSimple: - select { - case p.announceChn <- announce: - default: - pm.removePeer(p.id) - } - - case announceTypeSigned: - if !signed { - signedAnnounce = announce - signedAnnounce.sign(pm.server.privateKey) - signed = true - } - - select { - case p.announceChn <- signedAnnounce: - default: - pm.removePeer(p.id) - } - } - } - } - } - case <-pm.quitSync: - headSub.Unsubscribe() - pm.wg.Done() - return - } - } - }() -} diff --git a/les/serverpool.go b/les/serverpool.go deleted file mode 100644 index 5990344f4..000000000 --- a/les/serverpool.go +++ /dev/null @@ -1,855 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -// Package les implements the Light Ethereum Subprotocol. -package les - -import ( - "crypto/ecdsa" - "fmt" - "io" - "math" - "math/rand" - "net" - "strconv" - "sync" - "time" - - "github.com/dexon-foundation/dexon/common/mclock" - "github.com/dexon-foundation/dexon/crypto" - "github.com/dexon-foundation/dexon/ethdb" - "github.com/dexon-foundation/dexon/log" - "github.com/dexon-foundation/dexon/p2p" - "github.com/dexon-foundation/dexon/p2p/discv5" - "github.com/dexon-foundation/dexon/p2p/enode" - "github.com/dexon-foundation/dexon/rlp" -) - -const ( - // After a connection has been ended or timed out, there is a waiting period - // before it can be selected for connection again. - // waiting period = base delay * (1 + random(1)) - // base delay = shortRetryDelay for the first shortRetryCnt times after a - // successful connection, after that longRetryDelay is applied - shortRetryCnt = 5 - shortRetryDelay = time.Second * 5 - longRetryDelay = time.Minute * 10 - // maxNewEntries is the maximum number of newly discovered (never connected) nodes. - // If the limit is reached, the least recently discovered one is thrown out. - maxNewEntries = 1000 - // maxKnownEntries is the maximum number of known (already connected) nodes. - // If the limit is reached, the least recently connected one is thrown out. - // (not that unlike new entries, known entries are persistent) - maxKnownEntries = 1000 - // target for simultaneously connected servers - targetServerCount = 5 - // target for servers selected from the known table - // (we leave room for trying new ones if there is any) - targetKnownSelect = 3 - // after dialTimeout, consider the server unavailable and adjust statistics - dialTimeout = time.Second * 30 - // targetConnTime is the minimum expected connection duration before a server - // drops a client without any specific reason - targetConnTime = time.Minute * 10 - // new entry selection weight calculation based on most recent discovery time: - // unity until discoverExpireStart, then exponential decay with discoverExpireConst - discoverExpireStart = time.Minute * 20 - discoverExpireConst = time.Minute * 20 - // known entry selection weight is dropped by a factor of exp(-failDropLn) after - // each unsuccessful connection (restored after a successful one) - failDropLn = 0.1 - // known node connection success and quality statistics have a long term average - // and a short term value which is adjusted exponentially with a factor of - // pstatRecentAdjust with each dial/connection and also returned exponentially - // to the average with the time constant pstatReturnToMeanTC - pstatReturnToMeanTC = time.Hour - // node address selection weight is dropped by a factor of exp(-addrFailDropLn) after - // each unsuccessful connection (restored after a successful one) - addrFailDropLn = math.Ln2 - // responseScoreTC and delayScoreTC are exponential decay time constants for - // calculating selection chances from response times and block delay times - responseScoreTC = time.Millisecond * 100 - delayScoreTC = time.Second * 5 - timeoutPow = 10 - // initStatsWeight is used to initialize previously unknown peers with good - // statistics to give a chance to prove themselves - initStatsWeight = 1 -) - -// connReq represents a request for peer connection. -type connReq struct { - p *peer - node *enode.Node - result chan *poolEntry -} - -// disconnReq represents a request for peer disconnection. -type disconnReq struct { - entry *poolEntry - stopped bool - done chan struct{} -} - -// registerReq represents a request for peer registration. -type registerReq struct { - entry *poolEntry - done chan struct{} -} - -// serverPool implements a pool for storing and selecting newly discovered and already -// known light server nodes. It received discovered nodes, stores statistics about -// known nodes and takes care of always having enough good quality servers connected. -type serverPool struct { - db ethdb.Database - dbKey []byte - server *p2p.Server - quit chan struct{} - wg *sync.WaitGroup - connWg sync.WaitGroup - - topic discv5.Topic - - discSetPeriod chan time.Duration - discNodes chan *enode.Node - discLookups chan bool - - entries map[enode.ID]*poolEntry - timeout, enableRetry chan *poolEntry - adjustStats chan poolStatAdjust - - connCh chan *connReq - disconnCh chan *disconnReq - registerCh chan *registerReq - - knownQueue, newQueue poolEntryQueue - knownSelect, newSelect *weightedRandomSelect - knownSelected, newSelected int - fastDiscover bool -} - -// newServerPool creates a new serverPool instance -func newServerPool(db ethdb.Database, quit chan struct{}, wg *sync.WaitGroup) *serverPool { - pool := &serverPool{ - db: db, - quit: quit, - wg: wg, - entries: make(map[enode.ID]*poolEntry), - timeout: make(chan *poolEntry, 1), - adjustStats: make(chan poolStatAdjust, 100), - enableRetry: make(chan *poolEntry, 1), - connCh: make(chan *connReq), - disconnCh: make(chan *disconnReq), - registerCh: make(chan *registerReq), - knownSelect: newWeightedRandomSelect(), - newSelect: newWeightedRandomSelect(), - fastDiscover: true, - } - pool.knownQueue = newPoolEntryQueue(maxKnownEntries, pool.removeEntry) - pool.newQueue = newPoolEntryQueue(maxNewEntries, pool.removeEntry) - return pool -} - -func (pool *serverPool) start(server *p2p.Server, topic discv5.Topic) { - pool.server = server - pool.topic = topic - pool.dbKey = append([]byte("serverPool/"), []byte(topic)...) - pool.wg.Add(1) - pool.loadNodes() - - if pool.server.DiscV5 != nil { - pool.discSetPeriod = make(chan time.Duration, 1) - pool.discNodes = make(chan *enode.Node, 100) - pool.discLookups = make(chan bool, 100) - go pool.discoverNodes() - } - pool.checkDial() - go pool.eventLoop() -} - -// discoverNodes wraps SearchTopic, converting result nodes to enode.Node. -func (pool *serverPool) discoverNodes() { - ch := make(chan *discv5.Node) - go func() { - pool.server.DiscV5.SearchTopic(pool.topic, pool.discSetPeriod, ch, pool.discLookups) - close(ch) - }() - for n := range ch { - pubkey, err := decodePubkey64(n.ID[:]) - if err != nil { - continue - } - pool.discNodes <- enode.NewV4(pubkey, n.IP, int(n.TCP), int(n.UDP)) - } -} - -// connect should be called upon any incoming connection. If the connection has been -// dialed by the server pool recently, the appropriate pool entry is returned. -// Otherwise, the connection should be rejected. -// Note that whenever a connection has been accepted and a pool entry has been returned, -// disconnect should also always be called. -func (pool *serverPool) connect(p *peer, node *enode.Node) *poolEntry { - log.Debug("Connect new entry", "enode", p.id) - req := &connReq{p: p, node: node, result: make(chan *poolEntry, 1)} - select { - case pool.connCh <- req: - case <-pool.quit: - return nil - } - return <-req.result -} - -// registered should be called after a successful handshake -func (pool *serverPool) registered(entry *poolEntry) { - log.Debug("Registered new entry", "enode", entry.node.ID()) - req := ®isterReq{entry: entry, done: make(chan struct{})} - select { - case pool.registerCh <- req: - case <-pool.quit: - return - } - <-req.done -} - -// disconnect should be called when ending a connection. Service quality statistics -// can be updated optionally (not updated if no registration happened, in this case -// only connection statistics are updated, just like in case of timeout) -func (pool *serverPool) disconnect(entry *poolEntry) { - stopped := false - select { - case <-pool.quit: - stopped = true - default: - } - log.Debug("Disconnected old entry", "enode", entry.node.ID()) - req := &disconnReq{entry: entry, stopped: stopped, done: make(chan struct{})} - - // Block until disconnection request is served. - pool.disconnCh <- req - <-req.done -} - -const ( - pseBlockDelay = iota - pseResponseTime - pseResponseTimeout -) - -// poolStatAdjust records are sent to adjust peer block delay/response time statistics -type poolStatAdjust struct { - adjustType int - entry *poolEntry - time time.Duration -} - -// adjustBlockDelay adjusts the block announce delay statistics of a node -func (pool *serverPool) adjustBlockDelay(entry *poolEntry, time time.Duration) { - if entry == nil { - return - } - pool.adjustStats <- poolStatAdjust{pseBlockDelay, entry, time} -} - -// adjustResponseTime adjusts the request response time statistics of a node -func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, timeout bool) { - if entry == nil { - return - } - if timeout { - pool.adjustStats <- poolStatAdjust{pseResponseTimeout, entry, time} - } else { - pool.adjustStats <- poolStatAdjust{pseResponseTime, entry, time} - } -} - -// eventLoop handles pool events and mutex locking for all internal functions -func (pool *serverPool) eventLoop() { - lookupCnt := 0 - var convTime mclock.AbsTime - if pool.discSetPeriod != nil { - pool.discSetPeriod <- time.Millisecond * 100 - } - - // disconnect updates service quality statistics depending on the connection time - // and disconnection initiator. - disconnect := func(req *disconnReq, stopped bool) { - // Handle peer disconnection requests. - entry := req.entry - if entry.state == psRegistered { - connAdjust := float64(mclock.Now()-entry.regTime) / float64(targetConnTime) - if connAdjust > 1 { - connAdjust = 1 - } - if stopped { - // disconnect requested by ourselves. - entry.connectStats.add(1, connAdjust) - } else { - // disconnect requested by server side. - entry.connectStats.add(connAdjust, 1) - } - } - entry.state = psNotConnected - - if entry.knownSelected { - pool.knownSelected-- - } else { - pool.newSelected-- - } - pool.setRetryDial(entry) - pool.connWg.Done() - close(req.done) - } - - for { - select { - case entry := <-pool.timeout: - if !entry.removed { - pool.checkDialTimeout(entry) - } - - case entry := <-pool.enableRetry: - if !entry.removed { - entry.delayedRetry = false - pool.updateCheckDial(entry) - } - - case adj := <-pool.adjustStats: - switch adj.adjustType { - case pseBlockDelay: - adj.entry.delayStats.add(float64(adj.time), 1) - case pseResponseTime: - adj.entry.responseStats.add(float64(adj.time), 1) - adj.entry.timeoutStats.add(0, 1) - case pseResponseTimeout: - adj.entry.timeoutStats.add(1, 1) - } - - case node := <-pool.discNodes: - entry := pool.findOrNewNode(node) - pool.updateCheckDial(entry) - - case conv := <-pool.discLookups: - if conv { - if lookupCnt == 0 { - convTime = mclock.Now() - } - lookupCnt++ - if pool.fastDiscover && (lookupCnt == 50 || time.Duration(mclock.Now()-convTime) > time.Minute) { - pool.fastDiscover = false - if pool.discSetPeriod != nil { - pool.discSetPeriod <- time.Minute - } - } - } - - case req := <-pool.connCh: - // Handle peer connection requests. - entry := pool.entries[req.p.ID()] - if entry == nil { - entry = pool.findOrNewNode(req.node) - } - if entry.state == psConnected || entry.state == psRegistered { - req.result <- nil - continue - } - pool.connWg.Add(1) - entry.peer = req.p - entry.state = psConnected - addr := &poolEntryAddress{ - ip: req.node.IP(), - port: uint16(req.node.TCP()), - lastSeen: mclock.Now(), - } - entry.lastConnected = addr - entry.addr = make(map[string]*poolEntryAddress) - entry.addr[addr.strKey()] = addr - entry.addrSelect = *newWeightedRandomSelect() - entry.addrSelect.update(addr) - req.result <- entry - - case req := <-pool.registerCh: - // Handle peer registration requests. - entry := req.entry - entry.state = psRegistered - entry.regTime = mclock.Now() - if !entry.known { - pool.newQueue.remove(entry) - entry.known = true - } - pool.knownQueue.setLatest(entry) - entry.shortRetry = shortRetryCnt - close(req.done) - - case req := <-pool.disconnCh: - // Handle peer disconnection requests. - disconnect(req, req.stopped) - - case <-pool.quit: - if pool.discSetPeriod != nil { - close(pool.discSetPeriod) - } - - // Spawn a goroutine to close the disconnCh after all connections are disconnected. - go func() { - pool.connWg.Wait() - close(pool.disconnCh) - }() - - // Handle all remaining disconnection requests before exit. - for req := range pool.disconnCh { - disconnect(req, true) - } - pool.saveNodes() - pool.wg.Done() - return - } - } -} - -func (pool *serverPool) findOrNewNode(node *enode.Node) *poolEntry { - now := mclock.Now() - entry := pool.entries[node.ID()] - if entry == nil { - log.Debug("Discovered new entry", "id", node.ID()) - entry = &poolEntry{ - node: node, - addr: make(map[string]*poolEntryAddress), - addrSelect: *newWeightedRandomSelect(), - shortRetry: shortRetryCnt, - } - pool.entries[node.ID()] = entry - // initialize previously unknown peers with good statistics to give a chance to prove themselves - entry.connectStats.add(1, initStatsWeight) - entry.delayStats.add(0, initStatsWeight) - entry.responseStats.add(0, initStatsWeight) - entry.timeoutStats.add(0, initStatsWeight) - } - entry.lastDiscovered = now - addr := &poolEntryAddress{ip: node.IP(), port: uint16(node.TCP())} - if a, ok := entry.addr[addr.strKey()]; ok { - addr = a - } else { - entry.addr[addr.strKey()] = addr - } - addr.lastSeen = now - entry.addrSelect.update(addr) - if !entry.known { - pool.newQueue.setLatest(entry) - } - return entry -} - -// loadNodes loads known nodes and their statistics from the database -func (pool *serverPool) loadNodes() { - enc, err := pool.db.Get(pool.dbKey) - if err != nil { - return - } - var list []*poolEntry - err = rlp.DecodeBytes(enc, &list) - if err != nil { - log.Debug("Failed to decode node list", "err", err) - return - } - for _, e := range list { - log.Debug("Loaded server stats", "id", e.node.ID(), "fails", e.lastConnected.fails, - "conn", fmt.Sprintf("%v/%v", e.connectStats.avg, e.connectStats.weight), - "delay", fmt.Sprintf("%v/%v", time.Duration(e.delayStats.avg), e.delayStats.weight), - "response", fmt.Sprintf("%v/%v", time.Duration(e.responseStats.avg), e.responseStats.weight), - "timeout", fmt.Sprintf("%v/%v", e.timeoutStats.avg, e.timeoutStats.weight)) - pool.entries[e.node.ID()] = e - pool.knownQueue.setLatest(e) - pool.knownSelect.update((*knownEntry)(e)) - } -} - -// saveNodes saves known nodes and their statistics into the database. Nodes are -// ordered from least to most recently connected. -func (pool *serverPool) saveNodes() { - list := make([]*poolEntry, len(pool.knownQueue.queue)) - for i := range list { - list[i] = pool.knownQueue.fetchOldest() - } - enc, err := rlp.EncodeToBytes(list) - if err == nil { - pool.db.Put(pool.dbKey, enc) - } -} - -// removeEntry removes a pool entry when the entry count limit is reached. -// Note that it is called by the new/known queues from which the entry has already -// been removed so removing it from the queues is not necessary. -func (pool *serverPool) removeEntry(entry *poolEntry) { - pool.newSelect.remove((*discoveredEntry)(entry)) - pool.knownSelect.remove((*knownEntry)(entry)) - entry.removed = true - delete(pool.entries, entry.node.ID()) -} - -// setRetryDial starts the timer which will enable dialing a certain node again -func (pool *serverPool) setRetryDial(entry *poolEntry) { - delay := longRetryDelay - if entry.shortRetry > 0 { - entry.shortRetry-- - delay = shortRetryDelay - } - delay += time.Duration(rand.Int63n(int64(delay) + 1)) - entry.delayedRetry = true - go func() { - select { - case <-pool.quit: - case <-time.After(delay): - select { - case <-pool.quit: - case pool.enableRetry <- entry: - } - } - }() -} - -// updateCheckDial is called when an entry can potentially be dialed again. It updates -// its selection weights and checks if new dials can/should be made. -func (pool *serverPool) updateCheckDial(entry *poolEntry) { - pool.newSelect.update((*discoveredEntry)(entry)) - pool.knownSelect.update((*knownEntry)(entry)) - pool.checkDial() -} - -// checkDial checks if new dials can/should be made. It tries to select servers both -// based on good statistics and recent discovery. -func (pool *serverPool) checkDial() { - fillWithKnownSelects := !pool.fastDiscover - for pool.knownSelected < targetKnownSelect { - entry := pool.knownSelect.choose() - if entry == nil { - fillWithKnownSelects = false - break - } - pool.dial((*poolEntry)(entry.(*knownEntry)), true) - } - for pool.knownSelected+pool.newSelected < targetServerCount { - entry := pool.newSelect.choose() - if entry == nil { - break - } - pool.dial((*poolEntry)(entry.(*discoveredEntry)), false) - } - if fillWithKnownSelects { - // no more newly discovered nodes to select and since fast discover period - // is over, we probably won't find more in the near future so select more - // known entries if possible - for pool.knownSelected < targetServerCount { - entry := pool.knownSelect.choose() - if entry == nil { - break - } - pool.dial((*poolEntry)(entry.(*knownEntry)), true) - } - } -} - -// dial initiates a new connection -func (pool *serverPool) dial(entry *poolEntry, knownSelected bool) { - if pool.server == nil || entry.state != psNotConnected { - return - } - entry.state = psDialed - entry.knownSelected = knownSelected - if knownSelected { - pool.knownSelected++ - } else { - pool.newSelected++ - } - addr := entry.addrSelect.choose().(*poolEntryAddress) - log.Debug("Dialing new peer", "lesaddr", entry.node.ID().String()+"@"+addr.strKey(), "set", len(entry.addr), "known", knownSelected) - entry.dialed = addr - go func() { - pool.server.AddPeer(entry.node) - select { - case <-pool.quit: - case <-time.After(dialTimeout): - select { - case <-pool.quit: - case pool.timeout <- entry: - } - } - }() -} - -// checkDialTimeout checks if the node is still in dialed state and if so, resets it -// and adjusts connection statistics accordingly. -func (pool *serverPool) checkDialTimeout(entry *poolEntry) { - if entry.state != psDialed { - return - } - log.Debug("Dial timeout", "lesaddr", entry.node.ID().String()+"@"+entry.dialed.strKey()) - entry.state = psNotConnected - if entry.knownSelected { - pool.knownSelected-- - } else { - pool.newSelected-- - } - entry.connectStats.add(0, 1) - entry.dialed.fails++ - pool.setRetryDial(entry) -} - -const ( - psNotConnected = iota - psDialed - psConnected - psRegistered -) - -// poolEntry represents a server node and stores its current state and statistics. -type poolEntry struct { - peer *peer - pubkey [64]byte // secp256k1 key of the node - addr map[string]*poolEntryAddress - node *enode.Node - lastConnected, dialed *poolEntryAddress - addrSelect weightedRandomSelect - - lastDiscovered mclock.AbsTime - known, knownSelected bool - connectStats, delayStats poolStats - responseStats, timeoutStats poolStats - state int - regTime mclock.AbsTime - queueIdx int - removed bool - - delayedRetry bool - shortRetry int -} - -// poolEntryEnc is the RLP encoding of poolEntry. -type poolEntryEnc struct { - Pubkey []byte - IP net.IP - Port uint16 - Fails uint - CStat, DStat, RStat, TStat poolStats -} - -func (e *poolEntry) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, &poolEntryEnc{ - Pubkey: encodePubkey64(e.node.Pubkey()), - IP: e.lastConnected.ip, - Port: e.lastConnected.port, - Fails: e.lastConnected.fails, - CStat: e.connectStats, - DStat: e.delayStats, - RStat: e.responseStats, - TStat: e.timeoutStats, - }) -} - -func (e *poolEntry) DecodeRLP(s *rlp.Stream) error { - var entry poolEntryEnc - if err := s.Decode(&entry); err != nil { - return err - } - pubkey, err := decodePubkey64(entry.Pubkey) - if err != nil { - return err - } - addr := &poolEntryAddress{ip: entry.IP, port: entry.Port, fails: entry.Fails, lastSeen: mclock.Now()} - e.node = enode.NewV4(pubkey, entry.IP, int(entry.Port), int(entry.Port)) - e.addr = make(map[string]*poolEntryAddress) - e.addr[addr.strKey()] = addr - e.addrSelect = *newWeightedRandomSelect() - e.addrSelect.update(addr) - e.lastConnected = addr - e.connectStats = entry.CStat - e.delayStats = entry.DStat - e.responseStats = entry.RStat - e.timeoutStats = entry.TStat - e.shortRetry = shortRetryCnt - e.known = true - return nil -} - -func encodePubkey64(pub *ecdsa.PublicKey) []byte { - return crypto.FromECDSAPub(pub)[1:] -} - -func decodePubkey64(b []byte) (*ecdsa.PublicKey, error) { - return crypto.UnmarshalPubkey(append([]byte{0x04}, b...)) -} - -// discoveredEntry implements wrsItem -type discoveredEntry poolEntry - -// Weight calculates random selection weight for newly discovered entries -func (e *discoveredEntry) Weight() int64 { - if e.state != psNotConnected || e.delayedRetry { - return 0 - } - t := time.Duration(mclock.Now() - e.lastDiscovered) - if t <= discoverExpireStart { - return 1000000000 - } - return int64(1000000000 * math.Exp(-float64(t-discoverExpireStart)/float64(discoverExpireConst))) -} - -// knownEntry implements wrsItem -type knownEntry poolEntry - -// Weight calculates random selection weight for known entries -func (e *knownEntry) Weight() int64 { - if e.state != psNotConnected || !e.known || e.delayedRetry { - return 0 - } - return int64(1000000000 * e.connectStats.recentAvg() * math.Exp(-float64(e.lastConnected.fails)*failDropLn-e.responseStats.recentAvg()/float64(responseScoreTC)-e.delayStats.recentAvg()/float64(delayScoreTC)) * math.Pow(1-e.timeoutStats.recentAvg(), timeoutPow)) -} - -// poolEntryAddress is a separate object because currently it is necessary to remember -// multiple potential network addresses for a pool entry. This will be removed after -// the final implementation of v5 discovery which will retrieve signed and serial -// numbered advertisements, making it clear which IP/port is the latest one. -type poolEntryAddress struct { - ip net.IP - port uint16 - lastSeen mclock.AbsTime // last time it was discovered, connected or loaded from db - fails uint // connection failures since last successful connection (persistent) -} - -func (a *poolEntryAddress) Weight() int64 { - t := time.Duration(mclock.Now() - a.lastSeen) - return int64(1000000*math.Exp(-float64(t)/float64(discoverExpireConst)-float64(a.fails)*addrFailDropLn)) + 1 -} - -func (a *poolEntryAddress) strKey() string { - return a.ip.String() + ":" + strconv.Itoa(int(a.port)) -} - -// poolStats implement statistics for a certain quantity with a long term average -// and a short term value which is adjusted exponentially with a factor of -// pstatRecentAdjust with each update and also returned exponentially to the -// average with the time constant pstatReturnToMeanTC -type poolStats struct { - sum, weight, avg, recent float64 - lastRecalc mclock.AbsTime -} - -// init initializes stats with a long term sum/update count pair retrieved from the database -func (s *poolStats) init(sum, weight float64) { - s.sum = sum - s.weight = weight - var avg float64 - if weight > 0 { - avg = s.sum / weight - } - s.avg = avg - s.recent = avg - s.lastRecalc = mclock.Now() -} - -// recalc recalculates recent value return-to-mean and long term average -func (s *poolStats) recalc() { - now := mclock.Now() - s.recent = s.avg + (s.recent-s.avg)*math.Exp(-float64(now-s.lastRecalc)/float64(pstatReturnToMeanTC)) - if s.sum == 0 { - s.avg = 0 - } else { - if s.sum > s.weight*1e30 { - s.avg = 1e30 - } else { - s.avg = s.sum / s.weight - } - } - s.lastRecalc = now -} - -// add updates the stats with a new value -func (s *poolStats) add(value, weight float64) { - s.weight += weight - s.sum += value * weight - s.recalc() -} - -// recentAvg returns the short-term adjusted average -func (s *poolStats) recentAvg() float64 { - s.recalc() - return s.recent -} - -func (s *poolStats) EncodeRLP(w io.Writer) error { - return rlp.Encode(w, []interface{}{math.Float64bits(s.sum), math.Float64bits(s.weight)}) -} - -func (s *poolStats) DecodeRLP(st *rlp.Stream) error { - var stats struct { - SumUint, WeightUint uint64 - } - if err := st.Decode(&stats); err != nil { - return err - } - s.init(math.Float64frombits(stats.SumUint), math.Float64frombits(stats.WeightUint)) - return nil -} - -// poolEntryQueue keeps track of its least recently accessed entries and removes -// them when the number of entries reaches the limit -type poolEntryQueue struct { - queue map[int]*poolEntry // known nodes indexed by their latest lastConnCnt value - newPtr, oldPtr, maxCnt int - removeFromPool func(*poolEntry) -} - -// newPoolEntryQueue returns a new poolEntryQueue -func newPoolEntryQueue(maxCnt int, removeFromPool func(*poolEntry)) poolEntryQueue { - return poolEntryQueue{queue: make(map[int]*poolEntry), maxCnt: maxCnt, removeFromPool: removeFromPool} -} - -// fetchOldest returns and removes the least recently accessed entry -func (q *poolEntryQueue) fetchOldest() *poolEntry { - if len(q.queue) == 0 { - return nil - } - for { - if e := q.queue[q.oldPtr]; e != nil { - delete(q.queue, q.oldPtr) - q.oldPtr++ - return e - } - q.oldPtr++ - } -} - -// remove removes an entry from the queue -func (q *poolEntryQueue) remove(entry *poolEntry) { - if q.queue[entry.queueIdx] == entry { - delete(q.queue, entry.queueIdx) - } -} - -// setLatest adds or updates a recently accessed entry. It also checks if an old entry -// needs to be removed and removes it from the parent pool too with a callback function. -func (q *poolEntryQueue) setLatest(entry *poolEntry) { - if q.queue[entry.queueIdx] == entry { - delete(q.queue, entry.queueIdx) - } else { - if len(q.queue) == q.maxCnt { - e := q.fetchOldest() - q.remove(e) - q.removeFromPool(e) - } - } - entry.queueIdx = q.newPtr - q.queue[entry.queueIdx] = entry - q.newPtr++ -} diff --git a/les/sync.go b/les/sync.go deleted file mode 100644 index 42afb3933..000000000 --- a/les/sync.go +++ /dev/null @@ -1,79 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "context" - "time" - - "github.com/dexon-foundation/dexon/core/rawdb" - "github.com/dexon-foundation/dexon/eth/downloader" - "github.com/dexon-foundation/dexon/light" -) - -// syncer is responsible for periodically synchronising with the network, both -// downloading hashes and blocks as well as handling the announcement handler. -func (pm *ProtocolManager) syncer() { - // Start and ensure cleanup of sync mechanisms - //pm.fetcher.Start() - //defer pm.fetcher.Stop() - defer pm.downloader.Terminate() - - // Wait for different events to fire synchronisation operations - //forceSync := time.Tick(forceSyncCycle) - for { - select { - case <-pm.newPeerCh: - /* // Make sure we have peers to select from, then sync - if pm.peers.Len() < minDesiredPeerCount { - break - } - go pm.synchronise(pm.peers.BestPeer()) - */ - /*case <-forceSync: - // Force a sync even if not enough peers are present - go pm.synchronise(pm.peers.BestPeer()) - */ - case <-pm.noMorePeers: - return - } - } -} - -func (pm *ProtocolManager) needToSync(peerHead blockInfo) bool { - head := pm.blockchain.CurrentHeader() - currentTd := rawdb.ReadTd(pm.chainDb, head.Hash(), head.Number.Uint64()) - return currentTd != nil && peerHead.Td.Cmp(currentTd) > 0 -} - -// synchronise tries to sync up our local block chain with a remote peer. -func (pm *ProtocolManager) synchronise(peer *peer) { - // Short circuit if no peers are available - if peer == nil { - return - } - - // Make sure the peer's TD is higher than our own. - if !pm.needToSync(peer.headBlockInfo()) { - return - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - pm.blockchain.(*light.LightChain).SyncCht(ctx) - pm.downloader.Synchronise(peer.id, peer.Head(), peer.Td(), downloader.LightSync) -} diff --git a/les/txrelay.go b/les/txrelay.go deleted file mode 100644 index 2d57baead..000000000 --- a/les/txrelay.go +++ /dev/null @@ -1,175 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. - -package les - -import ( - "sync" - - "github.com/dexon-foundation/dexon/common" - "github.com/dexon-foundation/dexon/core/types" -) - -type ltrInfo struct { - tx *types.Transaction - sentTo map[*peer]struct{} -} - -type LesTxRelay struct { - txSent map[common.Hash]*ltrInfo - txPending map[common.Hash]struct{} - ps *peerSet - peerList []*peer - peerStartPos int - lock sync.RWMutex - - reqDist *requestDistributor -} - -func NewLesTxRelay(ps *peerSet, reqDist *requestDistributor) *LesTxRelay { - r := &LesTxRelay{ - txSent: make(map[common.Hash]*ltrInfo), - txPending: make(map[common.Hash]struct{}), - ps: ps, - reqDist: reqDist, - } - ps.notify(r) - return r -} - -func (self *LesTxRelay) registerPeer(p *peer) { - self.lock.Lock() - defer self.lock.Unlock() - - self.peerList = self.ps.AllPeers() -} - -func (self *LesTxRelay) unregisterPeer(p *peer) { - self.lock.Lock() - defer self.lock.Unlock() - - self.peerList = self.ps.AllPeers() -} - -// send sends a list of transactions to at most a given number of peers at -// once, never resending any particular transaction to the same peer twice -func (self *LesTxRelay) send(txs types.Transactions, count int) { - sendTo := make(map[*peer]types.Transactions) - - self.peerStartPos++ // rotate the starting position of the peer list - if self.peerStartPos >= len(self.peerList) { - self.peerStartPos = 0 - } - - for _, tx := range txs { - hash := tx.Hash() - ltr, ok := self.txSent[hash] - if !ok { - ltr = <rInfo{ - tx: tx, - sentTo: make(map[*peer]struct{}), - } - self.txSent[hash] = ltr - self.txPending[hash] = struct{}{} - } - - if len(self.peerList) > 0 { - cnt := count - pos := self.peerStartPos - for { - peer := self.peerList[pos] - if _, ok := ltr.sentTo[peer]; !ok { - sendTo[peer] = append(sendTo[peer], tx) - ltr.sentTo[peer] = struct{}{} - cnt-- - } - if cnt == 0 { - break // sent it to the desired number of peers - } - pos++ - if pos == len(self.peerList) { - pos = 0 - } - if pos == self.peerStartPos { - break // tried all available peers - } - } - } - } - - for p, list := range sendTo { - pp := p - ll := list - - reqID := genReqID() - rq := &distReq{ - getCost: func(dp distPeer) uint64 { - peer := dp.(*peer) - return peer.GetRequestCost(SendTxMsg, len(ll)) - }, - canSend: func(dp distPeer) bool { - return dp.(*peer) == pp - }, - request: func(dp distPeer) func() { - peer := dp.(*peer) - cost := peer.GetRequestCost(SendTxMsg, len(ll)) - peer.fcServer.QueueRequest(reqID, cost) - return func() { peer.SendTxs(reqID, cost, ll) } - }, - } - self.reqDist.queue(rq) - } -} - -func (self *LesTxRelay) Send(txs types.Transactions) { - self.lock.Lock() - defer self.lock.Unlock() - - self.send(txs, 3) -} - -func (self *LesTxRelay) NewHead(head common.Hash, mined []common.Hash, rollback []common.Hash) { - self.lock.Lock() - defer self.lock.Unlock() - - for _, hash := range mined { - delete(self.txPending, hash) - } - - for _, hash := range rollback { - self.txPending[hash] = struct{}{} - } - - if len(self.txPending) > 0 { - txs := make(types.Transactions, len(self.txPending)) - i := 0 - for hash := range self.txPending { - txs[i] = self.txSent[hash].tx - i++ - } - self.send(txs, 1) - } -} - -func (self *LesTxRelay) Discard(hashes []common.Hash) { - self.lock.Lock() - defer self.lock.Unlock() - - for _, hash := range hashes { - delete(self.txSent, hash) - delete(self.txPending, hash) - } -} |