diff options
author | Felföldi Zsolt <zsfelfoldi@gmail.com> | 2017-10-24 21:19:09 +0800 |
---|---|---|
committer | Felix Lange <fjl@users.noreply.github.com> | 2017-10-24 21:19:09 +0800 |
commit | ca376ead88a5a26626a90abdb62f4de7f6313822 (patch) | |
tree | 71d11e3b6cd40d2bf29033b7e23d30d04e086558 /core | |
parent | 6d6a5a93370371a33fb815d7ae47b60c7021c86a (diff) | |
download | dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.gz dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.bz2 dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.lz dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.xz dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.zst dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.zip |
les, light: LES/2 protocol version (#14970)
This PR implements the new LES protocol version extensions:
* new and more efficient Merkle proofs reply format (when replying to
a multiple Merkle proofs request, we just send a single set of trie
nodes containing all necessary nodes)
* BBT (BloomBitsTrie) works similarly to the existing CHT and contains
the bloombits search data to speed up log searches
* GetTxStatusMsg returns the inclusion position or the
pending/queued/unknown state of a transaction referenced by hash
* an optional signature of new block data (number/hash/td) can be
included in AnnounceMsg to provide an option for "very light
clients" (mobile/embedded devices) to skip expensive Ethash check
and accept multiple signatures of somewhat trusted servers (still a
lot better than trusting a single server completely and retrieving
everything through RPC). The new client mode is not implemented in
this PR, just the protocol extension.
Diffstat (limited to 'core')
-rw-r--r-- | core/bloombits/matcher.go | 51 | ||||
-rw-r--r-- | core/bloombits/matcher_test.go | 9 | ||||
-rw-r--r-- | core/chain_indexer.go | 69 | ||||
-rw-r--r-- | core/chain_indexer_test.go | 4 | ||||
-rw-r--r-- | core/database_util.go | 15 | ||||
-rw-r--r-- | core/tx_list.go | 10 | ||||
-rw-r--r-- | core/tx_pool.go | 106 |
7 files changed, 199 insertions, 65 deletions
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go index e33de018a..32a660337 100644 --- a/core/bloombits/matcher.go +++ b/core/bloombits/matcher.go @@ -18,6 +18,7 @@ package bloombits import ( "bytes" + "context" "errors" "math" "sort" @@ -60,6 +61,8 @@ type Retrieval struct { Bit uint Sections []uint64 Bitsets [][]byte + Error error + Context context.Context } // Matcher is a pipelined system of schedulers and logic matchers which perform @@ -137,7 +140,7 @@ func (m *Matcher) addScheduler(idx uint) { // Start starts the matching process and returns a stream of bloom matches in // a given range of blocks. If there are no more matches in the range, the result // channel is closed. -func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession, error) { +func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) { // Make sure we're not creating concurrent sessions if atomic.SwapUint32(&m.running, 1) == 1 { return nil, errors.New("matcher already running") @@ -149,6 +152,7 @@ func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession matcher: m, quit: make(chan struct{}), kill: make(chan struct{}), + ctx: ctx, } for _, scheduler := range m.schedulers { scheduler.reset() @@ -502,15 +506,28 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) { type MatcherSession struct { matcher *Matcher - quit chan struct{} // Quit channel to request pipeline termination - kill chan struct{} // Term channel to signal non-graceful forced shutdown - pend sync.WaitGroup + quit chan struct{} // Quit channel to request pipeline termination + kill chan struct{} // Term channel to signal non-graceful forced shutdown + ctx context.Context + err error + stopping bool + lock sync.Mutex + pend sync.WaitGroup } // Close stops the matching process and waits for all subprocesses to terminate // before returning. The timeout may be used for graceful shutdown, allowing the // currently running retrievals to complete before this time. -func (s *MatcherSession) Close(timeout time.Duration) { +func (s *MatcherSession) Close() { + s.lock.Lock() + stopping := s.stopping + s.stopping = true + s.lock.Unlock() + // ensure that we only close the session once + if stopping { + return + } + // Bail out if the matcher is not running select { case <-s.quit: @@ -519,10 +536,26 @@ func (s *MatcherSession) Close(timeout time.Duration) { } // Signal termination and wait for all goroutines to tear down close(s.quit) - time.AfterFunc(timeout, func() { close(s.kill) }) + time.AfterFunc(time.Second, func() { close(s.kill) }) s.pend.Wait() } +// setError sets an error and stops the session +func (s *MatcherSession) setError(err error) { + s.lock.Lock() + s.err = err + s.lock.Unlock() + s.Close() +} + +// Error returns an error if one has happened during the session +func (s *MatcherSession) Error() error { + s.lock.Lock() + defer s.lock.Unlock() + + return s.err +} + // AllocateRetrieval assigns a bloom bit index to a client process that can either // immediately reuest and fetch the section contents assigned to this bit or wait // a little while for more sections to be requested. @@ -618,9 +651,13 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan case mux <- request: // Retrieval accepted, something must arrive before we're aborting - request <- &Retrieval{Bit: bit, Sections: sections} + request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx} result := <-request + if result.Error != nil { + s.setError(result.Error) + } + s.DeliverSections(result.Bit, result.Sections, result.Bitsets) } } diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go index 2e15e7aac..0d8544136 100644 --- a/core/bloombits/matcher_test.go +++ b/core/bloombits/matcher_test.go @@ -17,6 +17,7 @@ package bloombits import ( + "context" "math/rand" "sync/atomic" "testing" @@ -144,7 +145,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt quit := make(chan struct{}) matches := make(chan uint64, 16) - session, err := matcher.Start(0, blocks-1, matches) + session, err := matcher.Start(context.Background(), 0, blocks-1, matches) if err != nil { t.Fatalf("failed to stat matcher session: %v", err) } @@ -163,13 +164,13 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt } // If we're testing intermittent mode, abort and restart the pipeline if intermittent { - session.Close(time.Second) + session.Close() close(quit) quit = make(chan struct{}) matches = make(chan uint64, 16) - session, err = matcher.Start(i+1, blocks-1, matches) + session, err = matcher.Start(context.Background(), i+1, blocks-1, matches) if err != nil { t.Fatalf("failed to stat matcher session: %v", err) } @@ -183,7 +184,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match) } // Clean up the session and ensure we match the expected retrieval count - session.Close(time.Second) + session.Close() close(quit) if retrievals != 0 && requested != retrievals { diff --git a/core/chain_indexer.go b/core/chain_indexer.go index f4c207dcc..837c908ab 100644 --- a/core/chain_indexer.go +++ b/core/chain_indexer.go @@ -36,13 +36,14 @@ import ( type ChainIndexerBackend interface { // Reset initiates the processing of a new chain segment, potentially terminating // any partially completed operations (in case of a reorg). - Reset(section uint64) + Reset(section uint64, lastSectionHead common.Hash) error // Process crunches through the next header in the chain segment. The caller // will ensure a sequential order of headers. Process(header *types.Header) - // Commit finalizes the section metadata and stores it into the database. + // Commit finalizes the section metadata and stores it into the database. This + // interface will usually be a batch writer. Commit() error } @@ -100,11 +101,34 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken return c } +// AddKnownSectionHead marks a new section head as known/processed if it is newer +// than the already known best section head +func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) { + c.lock.Lock() + defer c.lock.Unlock() + + if section < c.storedSections { + return + } + c.setSectionHead(section, shead) + c.setValidSections(section + 1) +} + +// IndexerChain interface is used for connecting the indexer to a blockchain +type IndexerChain interface { + CurrentHeader() *types.Header + SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription +} + // Start creates a goroutine to feed chain head events into the indexer for // cascading background processing. Children do not need to be started, they // are notified about new events by their parents. -func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { - go c.eventLoop(currentHeader, chainEventer) +func (c *ChainIndexer) Start(chain IndexerChain) { + ch := make(chan ChainEvent, 10) + sub := chain.SubscribeChainEvent(ch) + currentHeader := chain.CurrentHeader() + + go c.eventLoop(currentHeader, ch, sub) } // Close tears down all goroutines belonging to the indexer and returns any error @@ -125,12 +149,14 @@ func (c *ChainIndexer) Close() error { errs = append(errs, err) } } + // Close all children for _, child := range c.children { if err := child.Close(); err != nil { errs = append(errs, err) } } + // Return any failures switch { case len(errs) == 0: @@ -147,12 +173,10 @@ func (c *ChainIndexer) Close() error { // eventLoop is a secondary - optional - event loop of the indexer which is only // started for the outermost indexer to push chain head events into a processing // queue. -func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) { +func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) { // Mark the chain indexer as active, requiring an additional teardown atomic.StoreUint32(&c.active, 1) - events := make(chan ChainEvent, 10) - sub := chainEventer(events) defer sub.Unsubscribe() // Fire the initial new head event to start any outstanding processing @@ -169,7 +193,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func( errc <- nil return - case ev, ok := <-events: + case ev, ok := <-ch: // Received a new event, ensure it's not nil (closing) and update if !ok { errc := <-c.quit @@ -178,7 +202,9 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func( } header := ev.Block.Header() if header.ParentHash != prevHash { - c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true) + if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil { + c.newHead(h.Number.Uint64(), true) + } } c.newHead(header.Number.Uint64(), false) @@ -233,9 +259,10 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) { // down into the processing backend. func (c *ChainIndexer) updateLoop() { var ( - updating bool - updated time.Time + updated time.Time + updateMsg bool ) + for { select { case errc := <-c.quit: @@ -250,7 +277,7 @@ func (c *ChainIndexer) updateLoop() { // Periodically print an upgrade log message to the user if time.Since(updated) > 8*time.Second { if c.knownSections > c.storedSections+1 { - updating = true + updateMsg = true c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections) } updated = time.Now() @@ -259,7 +286,7 @@ func (c *ChainIndexer) updateLoop() { section := c.storedSections var oldHead common.Hash if section > 0 { - oldHead = c.sectionHead(section - 1) + oldHead = c.SectionHead(section - 1) } // Process the newly defined section in the background c.lock.Unlock() @@ -270,11 +297,11 @@ func (c *ChainIndexer) updateLoop() { c.lock.Lock() // If processing succeeded and no reorgs occcurred, mark the section completed - if err == nil && oldHead == c.sectionHead(section-1) { + if err == nil && oldHead == c.SectionHead(section-1) { c.setSectionHead(section, newHead) c.setValidSections(section + 1) - if c.storedSections == c.knownSections && updating { - updating = false + if c.storedSections == c.knownSections && updateMsg { + updateMsg = false c.log.Info("Finished upgrading chain index") } @@ -311,7 +338,11 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com c.log.Trace("Processing new chain section", "section", section) // Reset and partial processing - c.backend.Reset(section) + + if err := c.backend.Reset(section, lastHead); err != nil { + c.setValidSections(0) + return common.Hash{}, err + } for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ { hash := GetCanonicalHash(c.chainDb, number) @@ -341,7 +372,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) { c.lock.Lock() defer c.lock.Unlock() - return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1) + return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1) } // AddChildIndexer adds a child ChainIndexer that can use the output of this one @@ -383,7 +414,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) { // sectionHead retrieves the last block hash of a processed section from the // index database. -func (c *ChainIndexer) sectionHead(section uint64) common.Hash { +func (c *ChainIndexer) SectionHead(section uint64) common.Hash { var data [8]byte binary.BigEndian.PutUint64(data[:], section) diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go index b761e8a5b..d685d3f8d 100644 --- a/core/chain_indexer_test.go +++ b/core/chain_indexer_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" ) @@ -208,9 +209,10 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 { return b.stored * b.indexer.sectionSize } -func (b *testChainIndexBackend) Reset(section uint64) { +func (b *testChainIndexBackend) Reset(section uint64, lastSectionHead common.Hash) error { b.section = section b.headerCnt = 0 + return nil } func (b *testChainIndexBackend) Process(header *types.Header) { diff --git a/core/database_util.go b/core/database_util.go index 1730a048e..c6b125dae 100644 --- a/core/database_util.go +++ b/core/database_util.go @@ -74,9 +74,9 @@ var ( preimageHitCounter = metrics.NewCounter("db/preimage/hits") ) -// txLookupEntry is a positional metadata to help looking up the data content of +// TxLookupEntry is a positional metadata to help looking up the data content of // a transaction or receipt given only its hash. -type txLookupEntry struct { +type TxLookupEntry struct { BlockHash common.Hash BlockIndex uint64 Index uint64 @@ -260,7 +260,7 @@ func GetTxLookupEntry(db DatabaseReader, hash common.Hash) (common.Hash, uint64, return common.Hash{}, 0, 0 } // Parse and return the contents of the lookup entry - var entry txLookupEntry + var entry TxLookupEntry if err := rlp.DecodeBytes(data, &entry); err != nil { log.Error("Invalid lookup entry RLP", "hash", hash, "err", err) return common.Hash{}, 0, 0 @@ -296,7 +296,7 @@ func GetTransaction(db DatabaseReader, hash common.Hash) (*types.Transaction, co if len(data) == 0 { return nil, common.Hash{}, 0, 0 } - var entry txLookupEntry + var entry TxLookupEntry if err := rlp.DecodeBytes(data, &entry); err != nil { return nil, common.Hash{}, 0, 0 } @@ -332,14 +332,13 @@ func GetReceipt(db DatabaseReader, hash common.Hash) (*types.Receipt, common.Has // GetBloomBits retrieves the compressed bloom bit vector belonging to the given // section and bit index from the. -func GetBloomBits(db DatabaseReader, bit uint, section uint64, head common.Hash) []byte { +func GetBloomBits(db DatabaseReader, bit uint, section uint64, head common.Hash) ([]byte, error) { key := append(append(bloomBitsPrefix, make([]byte, 10)...), head.Bytes()...) binary.BigEndian.PutUint16(key[1:], uint16(bit)) binary.BigEndian.PutUint64(key[3:], section) - bits, _ := db.Get(key) - return bits + return db.Get(key) } // WriteCanonicalHash stores the canonical hash for the given block number. @@ -465,7 +464,7 @@ func WriteBlockReceipts(db ethdb.Putter, hash common.Hash, number uint64, receip func WriteTxLookupEntries(db ethdb.Putter, block *types.Block) error { // Iterate over each transaction and encode its metadata for i, tx := range block.Transactions() { - entry := txLookupEntry{ + entry := TxLookupEntry{ BlockHash: block.Hash(), BlockIndex: block.NumberU64(), Index: uint64(i), diff --git a/core/tx_list.go b/core/tx_list.go index 2935929d7..94721aa5f 100644 --- a/core/tx_list.go +++ b/core/tx_list.go @@ -384,13 +384,13 @@ func (h *priceHeap) Pop() interface{} { // txPricedList is a price-sorted heap to allow operating on transactions pool // contents in a price-incrementing way. type txPricedList struct { - all *map[common.Hash]*types.Transaction // Pointer to the map of all transactions - items *priceHeap // Heap of prices of all the stored transactions - stales int // Number of stale price points to (re-heap trigger) + all *map[common.Hash]txLookupRec // Pointer to the map of all transactions + items *priceHeap // Heap of prices of all the stored transactions + stales int // Number of stale price points to (re-heap trigger) } // newTxPricedList creates a new price-sorted transaction heap. -func newTxPricedList(all *map[common.Hash]*types.Transaction) *txPricedList { +func newTxPricedList(all *map[common.Hash]txLookupRec) *txPricedList { return &txPricedList{ all: all, items: new(priceHeap), @@ -416,7 +416,7 @@ func (l *txPricedList) Removed() { l.stales, l.items = 0, &reheap for _, tx := range *l.all { - *l.items = append(*l.items, tx) + *l.items = append(*l.items, tx.tx) } heap.Init(l.items) } diff --git a/core/tx_pool.go b/core/tx_pool.go index a705e36d6..5fdc91e65 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -192,17 +192,22 @@ type TxPool struct { locals *accountSet // Set of local transaction to exepmt from evicion rules journal *txJournal // Journal of local transaction to back up to disk - pending map[common.Address]*txList // All currently processable transactions - queue map[common.Address]*txList // Queued but non-processable transactions - beats map[common.Address]time.Time // Last heartbeat from each known account - all map[common.Hash]*types.Transaction // All transactions to allow lookups - priced *txPricedList // All transactions sorted by price + pending map[common.Address]*txList // All currently processable transactions + queue map[common.Address]*txList // Queued but non-processable transactions + beats map[common.Address]time.Time // Last heartbeat from each known account + all map[common.Hash]txLookupRec // All transactions to allow lookups + priced *txPricedList // All transactions sorted by price wg sync.WaitGroup // for shutdown sync homestead bool } +type txLookupRec struct { + tx *types.Transaction + pending bool +} + // NewTxPool creates a new transaction pool to gather, sort and filter inbound // trnsactions from the network. func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool { @@ -218,7 +223,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block pending: make(map[common.Address]*txList), queue: make(map[common.Address]*txList), beats: make(map[common.Address]time.Time), - all: make(map[common.Hash]*types.Transaction), + all: make(map[common.Hash]txLookupRec), chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), } @@ -594,7 +599,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { // If the transaction is already known, discard it hash := tx.Hash() - if pool.all[hash] != nil { + if _, ok := pool.all[hash]; ok { log.Trace("Discarding already known transaction", "hash", hash) return false, fmt.Errorf("known transaction: %x", hash) } @@ -635,7 +640,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) { pool.priced.Removed() pendingReplaceCounter.Inc(1) } - pool.all[tx.Hash()] = tx + pool.all[tx.Hash()] = txLookupRec{tx, false} pool.priced.Put(tx) pool.journalTx(from, tx) @@ -682,7 +687,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er pool.priced.Removed() queuedReplaceCounter.Inc(1) } - pool.all[hash] = tx + pool.all[hash] = txLookupRec{tx, false} pool.priced.Put(tx) return old != nil, nil } @@ -725,10 +730,13 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T pendingReplaceCounter.Inc(1) } - // Failsafe to work around direct pending inserts (tests) - if pool.all[hash] == nil { - pool.all[hash] = tx + if pool.all[hash].tx == nil { + // Failsafe to work around direct pending inserts (tests) + pool.all[hash] = txLookupRec{tx, true} pool.priced.Put(tx) + } else { + // set pending flag to true + pool.all[hash] = txLookupRec{tx, true} } // Set the potentially new pending nonce and notify any subsystems of the new tx pool.beats[addr] = time.Now() @@ -755,14 +763,16 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { // marking the senders as a local ones in the mean time, ensuring they go around // the local pricing constraints. func (pool *TxPool) AddLocals(txs []*types.Transaction) error { - return pool.addTxs(txs, !pool.config.NoLocals) + pool.addTxs(txs, !pool.config.NoLocals) + return nil } // AddRemotes enqueues a batch of transactions into the pool if they are valid. // If the senders are not among the locally tracked ones, full pricing constraints // will apply. func (pool *TxPool) AddRemotes(txs []*types.Transaction) error { - return pool.addTxs(txs, false) + pool.addTxs(txs, false) + return nil } // addTx enqueues a single transaction into the pool if it is valid. @@ -784,7 +794,7 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { } // addTxs attempts to queue a batch of transactions if they are valid. -func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { +func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error { pool.mu.Lock() defer pool.mu.Unlock() @@ -793,11 +803,13 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error { // addTxsLocked attempts to queue a batch of transactions if they are valid, // whilst assuming the transaction pool lock is already held. -func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error { +func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error { // Add the batch of transaction, tracking the accepted ones dirty := make(map[common.Address]struct{}) - for _, tx := range txs { - if replace, err := pool.add(tx, local); err == nil { + txErr := make([]error, len(txs)) + for i, tx := range txs { + var replace bool + if replace, txErr[i] = pool.add(tx, local); txErr[i] == nil { if !replace { from, _ := types.Sender(pool.signer, tx) // already validated dirty[from] = struct{}{} @@ -812,7 +824,58 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error { } pool.promoteExecutables(addrs) } - return nil + return txErr +} + +// TxStatusData is returned by AddOrGetTxStatus for each transaction +type TxStatusData struct { + Status uint + Data []byte +} + +const ( + TxStatusUnknown = iota + TxStatusQueued + TxStatusPending + TxStatusIncluded // Data contains a TxChainPos struct + TxStatusError // Data contains the error string +) + +// AddOrGetTxStatus returns the status (unknown/pending/queued) of a batch of transactions +// identified by their hashes in txHashes. Optionally the transactions themselves can be +// passed too in txs, in which case the function will try adding the previously unknown ones +// to the pool. If a new transaction cannot be added, TxStatusError is returned. Adding already +// known transactions will return their previous status. +// If txs is specified, txHashes is still required and has to match the transactions in txs. + +// Note: TxStatusIncluded is never returned by this function since the pool does not track +// mined transactions. Included status can be checked by the caller (as it happens in the +// LES protocol manager) +func (pool *TxPool) AddOrGetTxStatus(txs []*types.Transaction, txHashes []common.Hash) []TxStatusData { + status := make([]TxStatusData, len(txHashes)) + if txs != nil { + if len(txs) != len(txHashes) { + panic(nil) + } + txErr := pool.addTxs(txs, false) + for i, err := range txErr { + if err != nil { + status[i] = TxStatusData{TxStatusError, ([]byte)(err.Error())} + } + } + } + + for i, hash := range txHashes { + r, ok := pool.all[hash] + if ok { + if r.pending { + status[i] = TxStatusData{TxStatusPending, nil} + } else { + status[i] = TxStatusData{TxStatusQueued, nil} + } + } + } + return status } // Get returns a transaction if it is contained in the pool @@ -821,17 +884,18 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction { pool.mu.RLock() defer pool.mu.RUnlock() - return pool.all[hash] + return pool.all[hash].tx } // removeTx removes a single transaction from the queue, moving all subsequent // transactions back to the future queue. func (pool *TxPool) removeTx(hash common.Hash) { // Fetch the transaction we wish to delete - tx, ok := pool.all[hash] + txl, ok := pool.all[hash] if !ok { return } + tx := txl.tx addr, _ := types.Sender(pool.signer, tx) // already validated during insertion // Remove it from the list of known transactions |