diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-10-24 21:19:09 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-10-24 21:19:09 +0800 |
commit | ca376ead88a5a26626a90abdb62f4de7f6313822 (patch) | |
tree | 71d11e3b6cd40d2bf29033b7e23d30d04e086558 /core/tx_pool.go | |
parent | 6d6a5a93370371a33fb815d7ae47b60c7021c86a (diff) | |
download | dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.gz dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.bz2 dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.lz dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.xz dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.zst dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.zip |
les, light: LES/2 protocol version (#14970)
This PR implements the new LES protocol version extensions:
* new and more efficient Merkle proofs reply format (when replying to
a multiple Merkle proofs request, we just send a single set of trie
nodes containing all necessary nodes)
* BBT (BloomBitsTrie) works similarly to the existing CHT and contains
the bloombits search data to speed up log searches
* GetTxStatusMsg returns the inclusion position or the
pending/queued/unknown state of a transaction referenced by hash
* an optional signature of new block data (number/hash/td) can be
included in AnnounceMsg to provide an option for "very light
clients" (mobile/embedded devices) to skip expensive Ethash check
and accept multiple signatures of somewhat trusted servers (still a
lot better than trusting a single server completely and retrieving
everything through RPC). The new client mode is not implemented in
this PR, just the protocol extension.
Diffstat (limited to 'core/tx_pool.go')
-rw-r--r-- | core/tx_pool.go | 106 |
1 files changed, 85 insertions, 21 deletions
diff --git a/core/tx_pool.go b/core/tx_pool.go index a705e36d6..5fdc91e65 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -192,17 +192,22 @@ type TxPool struct { locals *accountSet // Set of local transaction to exepmt from evicion rules journal *txJournal // Journal of local transaction to back up to disk - pending map[common.Address]*txList // All currently processable transactions - queue map[common.Address]*txList // Queued but non-processable transactions - beats map[common.Address]time.Time // Last heartbeat from each known account - all map[common.Hash]*types.Transaction // All transactions to allow lookups - priced *txPricedList // All transactions sorted by price + pending map[common.Address]*txList // All currently processable transactions + queue map[common.Address]*txList // Queued but non-processable transactions + beats map[common.Address]time.Time // Last heartbeat from each known account + all map[common.Hash]txLookupRec // All transactions to allow lookups + priced *txPricedList // All transactions sorted by price wg sync.WaitGroup // for shutdown sync homestead bool } +type txLookupRec struct { + tx *types.Transaction + pending bool +} + // NewTxPool creates a new transaction pool to gather, sort and filter inbound // trnsactions from the network. func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { @@ -218,7 +223,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), - all: make(map[common.Hash]*types.Transaction), + all: make(map[common.Hash]txLookupRec), chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), } @@ -594,7 +599,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // If the transaction is already known, discard it hash := tx.Hash() - if pool.all[hash] != nil { + if _, ok := pool.all[hash]; ok { log.Trace("Discarding already known transaction", "hash", hash) return false, fmt.Errorf("known transaction: %x", hash) } @@ -635,7 +640,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { pool.priced.Removed() pendingReplaceCounter.Inc(1) } - pool.all[tx.Hash()] = tx + pool.all[tx.Hash()] = txLookupRec{tx, false} pool.priced.Put(tx) pool.journalTx(from, tx) @@ -682,7 +687,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er pool.priced.Removed() queuedReplaceCounter.Inc(1) } - pool.all[hash] = tx + pool.all[hash] = txLookupRec{tx, false} pool.priced.Put(tx) return old != nil, nil } @@ -725,10 +730,13 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pendingReplaceCounter.Inc(1) } - // Failsafe to work around direct pending inserts (tests) - if pool.all[hash] == nil { - pool.all[hash] = tx + if pool.all[hash].tx == nil { + // Failsafe to work around direct pending inserts (tests) + pool.all[hash] = txLookupRec{tx, true} pool.priced.Put(tx) + } else { + // set pending flag to true + pool.all[hash] = txLookupRec{tx, true} } // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() @@ -755,14 +763,16 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { // marking the senders as a local ones in the mean time, ensuring they go around // the local pricing constraints. func (pool *TxPool) AddLocals(txs []*types.Transaction) error { - return pool.addTxs(txs, !pool.config.NoLocals) + pool.addTxs(txs, !pool.config.NoLocals) + return nil } // AddRemotes enqueues a batch of transactions into the pool if they are valid. // If the senders are not among the locally tracked ones, full pricing constraints // will apply. func (pool *TxPool) AddRemotes(txs []*types.Transaction) error { - return pool.addTxs(txs, false) + pool.addTxs(txs, false) + return nil } // addTx enqueues a single transaction into the pool if it is valid. @@ -784,7 +794,7 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { } // addTxs attempts to queue a batch of transactions if they are valid. -func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { +func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error { pool.mu.Lock() defer pool.mu.Unlock() @@ -793,11 +803,13 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { // addTxsLocked attempts to queue a batch of transactions if they are valid, // whilst assuming the transaction pool lock is already held. -func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error { +func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error { // Add the batch of transaction, tracking the accepted ones dirty := make(map[common.Address]struct{}) - for _, tx := range txs { - if replace, err := pool.add(tx, local); err == nil { + txErr := make([]error, len(txs)) + for i, tx := range txs { + var replace bool + if replace, txErr[i] = pool.add(tx, local); txErr[i] == nil { if !replace { from, _ := types.Sender(pool.signer, tx) // already validated dirty[from] = struct{}{} @@ -812,7 +824,58 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error { } pool.promoteExecutables(addrs) } - return nil + return txErr +} + +// TxStatusData is returned by AddOrGetTxStatus for each transaction +type TxStatusData struct { + Status uint + Data []byte +} + +const ( + TxStatusUnknown = iota + TxStatusQueued + TxStatusPending + TxStatusIncluded // Data contains a TxChainPos struct + TxStatusError // Data contains the error string +) + +// AddOrGetTxStatus returns the status (unknown/pending/queued) of a batch of transactions +// identified by their hashes in txHashes. Optionally the transactions themselves can be +// passed too in txs, in which case the function will try adding the previously unknown ones +// to the pool. If a new transaction cannot be added, TxStatusError is returned. Adding already +// known transactions will return their previous status. +// If txs is specified, txHashes is still required and has to match the transactions in txs. + +// Note: TxStatusIncluded is never returned by this function since the pool does not track +// mined transactions. Included status can be checked by the caller (as it happens in the +// LES protocol manager) +func (pool *TxPool) AddOrGetTxStatus(txs []*types.Transaction, txHashes []common.Hash) []TxStatusData { + status := make([]TxStatusData, len(txHashes)) + if txs != nil { + if len(txs) != len(txHashes) { + panic(nil) + } + txErr := pool.addTxs(txs, false) + for i, err := range txErr { + if err != nil { + status[i] = TxStatusData{TxStatusError, ([]byte)(err.Error())} + } + } + } + + for i, hash := range txHashes { + r, ok := pool.all[hash] + if ok { + if r.pending { + status[i] = TxStatusData{TxStatusPending, nil} + } else { + status[i] = TxStatusData{TxStatusQueued, nil} + } + } + } + return status } // Get returns a transaction if it is contained in the pool @@ -821,17 +884,18 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction { pool.mu.RLock() defer pool.mu.RUnlock() - return pool.all[hash] + return pool.all[hash].tx } // removeTx removes a single transaction from the queue, moving all subsequent // transactions back to the future queue. func (pool *TxPool) removeTx(hash common.Hash) { // Fetch the transaction we wish to delete - tx, ok := pool.all[hash] + txl, ok := pool.all[hash] if !ok { return } + tx := txl.tx addr, _ := types.Sender(pool.signer, tx) // already validated during insertion // Remove it from the list of known transactions |