aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
authorFelföldi Zsolt <zsfelfoldi@gmail.com>2017-10-24 21:19:09 +0800
committerFelix Lange <fjl@users.noreply.github.com>2017-10-24 21:19:09 +0800
commitca376ead88a5a26626a90abdb62f4de7f6313822 (patch)
tree71d11e3b6cd40d2bf29033b7e23d30d04e086558 /core
parent6d6a5a93370371a33fb815d7ae47b60c7021c86a (diff)
downloaddexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar
dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.gz
dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.bz2
dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.lz
dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.xz
dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.tar.zst
dexon-ca376ead88a5a26626a90abdb62f4de7f6313822.zip
les, light: LES/2 protocol version (#14970)
This PR implements the new LES protocol version extensions: * new and more efficient Merkle proofs reply format (when replying to a multiple Merkle proofs request, we just send a single set of trie nodes containing all necessary nodes) * BBT (BloomBitsTrie) works similarly to the existing CHT and contains the bloombits search data to speed up log searches * GetTxStatusMsg returns the inclusion position or the pending/queued/unknown state of a transaction referenced by hash * an optional signature of new block data (number/hash/td) can be included in AnnounceMsg to provide an option for "very light clients" (mobile/embedded devices) to skip expensive Ethash check and accept multiple signatures of somewhat trusted servers (still a lot better than trusting a single server completely and retrieving everything through RPC). The new client mode is not implemented in this PR, just the protocol extension.
Diffstat (limited to 'core')
-rw-r--r--core/bloombits/matcher.go51
-rw-r--r--core/bloombits/matcher_test.go9
-rw-r--r--core/chain_indexer.go69
-rw-r--r--core/chain_indexer_test.go4
-rw-r--r--core/database_util.go15
-rw-r--r--core/tx_list.go10
-rw-r--r--core/tx_pool.go106
7 files changed, 199 insertions, 65 deletions
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go
index e33de018a..32a660337 100644
--- a/core/bloombits/matcher.go
+++ b/core/bloombits/matcher.go
@@ -18,6 +18,7 @@ package bloombits
import (
"bytes"
+ "context"
"errors"
"math"
"sort"
@@ -60,6 +61,8 @@ type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
+ Error error
+ Context context.Context
}
// Matcher is a pipelined system of schedulers and logic matchers which perform
@@ -137,7 +140,7 @@ func (m *Matcher) addScheduler(idx uint) {
// Start starts the matching process and returns a stream of bloom matches in
// a given range of blocks. If there are no more matches in the range, the result
// channel is closed.
-func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession, error) {
+func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
if atomic.SwapUint32(&m.running, 1) == 1 {
return nil, errors.New("matcher already running")
@@ -149,6 +152,7 @@ func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession
matcher: m,
quit: make(chan struct{}),
kill: make(chan struct{}),
+ ctx: ctx,
}
for _, scheduler := range m.schedulers {
scheduler.reset()
@@ -502,15 +506,28 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
type MatcherSession struct {
matcher *Matcher
- quit chan struct{} // Quit channel to request pipeline termination
- kill chan struct{} // Term channel to signal non-graceful forced shutdown
- pend sync.WaitGroup
+ quit chan struct{} // Quit channel to request pipeline termination
+ kill chan struct{} // Term channel to signal non-graceful forced shutdown
+ ctx context.Context
+ err error
+ stopping bool
+ lock sync.Mutex
+ pend sync.WaitGroup
}
// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
-func (s *MatcherSession) Close(timeout time.Duration) {
+func (s *MatcherSession) Close() {
+ s.lock.Lock()
+ stopping := s.stopping
+ s.stopping = true
+ s.lock.Unlock()
+ // ensure that we only close the session once
+ if stopping {
+ return
+ }
+
// Bail out if the matcher is not running
select {
case <-s.quit:
@@ -519,10 +536,26 @@ func (s *MatcherSession) Close(timeout time.Duration) {
}
// Signal termination and wait for all goroutines to tear down
close(s.quit)
- time.AfterFunc(timeout, func() { close(s.kill) })
+ time.AfterFunc(time.Second, func() { close(s.kill) })
s.pend.Wait()
}
+// setError sets an error and stops the session
+func (s *MatcherSession) setError(err error) {
+ s.lock.Lock()
+ s.err = err
+ s.lock.Unlock()
+ s.Close()
+}
+
+// Error returns an error if one has happened during the session
+func (s *MatcherSession) Error() error {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+
+ return s.err
+}
+
// AllocateRetrieval assigns a bloom bit index to a client process that can either
// immediately reuest and fetch the section contents assigned to this bit or wait
// a little while for more sections to be requested.
@@ -618,9 +651,13 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
case mux <- request:
// Retrieval accepted, something must arrive before we're aborting
- request <- &Retrieval{Bit: bit, Sections: sections}
+ request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}
result := <-request
+ if result.Error != nil {
+ s.setError(result.Error)
+ }
+
s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
}
}
diff --git a/core/bloombits/matcher_test.go b/core/bloombits/matcher_test.go
index 2e15e7aac..0d8544136 100644
--- a/core/bloombits/matcher_test.go
+++ b/core/bloombits/matcher_test.go
@@ -17,6 +17,7 @@
package bloombits
import (
+ "context"
"math/rand"
"sync/atomic"
"testing"
@@ -144,7 +145,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
quit := make(chan struct{})
matches := make(chan uint64, 16)
- session, err := matcher.Start(0, blocks-1, matches)
+ session, err := matcher.Start(context.Background(), 0, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
@@ -163,13 +164,13 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
}
// If we're testing intermittent mode, abort and restart the pipeline
if intermittent {
- session.Close(time.Second)
+ session.Close()
close(quit)
quit = make(chan struct{})
matches = make(chan uint64, 16)
- session, err = matcher.Start(i+1, blocks-1, matches)
+ session, err = matcher.Start(context.Background(), i+1, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
@@ -183,7 +184,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match)
}
// Clean up the session and ensure we match the expected retrieval count
- session.Close(time.Second)
+ session.Close()
close(quit)
if retrievals != 0 && requested != retrievals {
diff --git a/core/chain_indexer.go b/core/chain_indexer.go
index f4c207dcc..837c908ab 100644
--- a/core/chain_indexer.go
+++ b/core/chain_indexer.go
@@ -36,13 +36,14 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
- Reset(section uint64)
+ Reset(section uint64, lastSectionHead common.Hash) error
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(header *types.Header)
- // Commit finalizes the section metadata and stores it into the database.
+ // Commit finalizes the section metadata and stores it into the database. This
+ // interface will usually be a batch writer.
Commit() error
}
@@ -100,11 +101,34 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken
return c
}
+// AddKnownSectionHead marks a new section head as known/processed if it is newer
+// than the already known best section head
+func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
+ c.lock.Lock()
+ defer c.lock.Unlock()
+
+ if section < c.storedSections {
+ return
+ }
+ c.setSectionHead(section, shead)
+ c.setValidSections(section + 1)
+}
+
+// IndexerChain interface is used for connecting the indexer to a blockchain
+type IndexerChain interface {
+ CurrentHeader() *types.Header
+ SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
+}
+
// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
-func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
- go c.eventLoop(currentHeader, chainEventer)
+func (c *ChainIndexer) Start(chain IndexerChain) {
+ ch := make(chan ChainEvent, 10)
+ sub := chain.SubscribeChainEvent(ch)
+ currentHeader := chain.CurrentHeader()
+
+ go c.eventLoop(currentHeader, ch, sub)
}
// Close tears down all goroutines belonging to the indexer and returns any error
@@ -125,12 +149,14 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
}
+
// Close all children
for _, child := range c.children {
if err := child.Close(); err != nil {
errs = append(errs, err)
}
}
+
// Return any failures
switch {
case len(errs) == 0:
@@ -147,12 +173,10 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
-func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
+func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)
- events := make(chan ChainEvent, 10)
- sub := chainEventer(events)
defer sub.Unsubscribe()
// Fire the initial new head event to start any outstanding processing
@@ -169,7 +193,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(
errc <- nil
return
- case ev, ok := <-events:
+ case ev, ok := <-ch:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
@@ -178,7 +202,9 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
- c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)
+ if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
+ c.newHead(h.Number.Uint64(), true)
+ }
}
c.newHead(header.Number.Uint64(), false)
@@ -233,9 +259,10 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
var (
- updating bool
- updated time.Time
+ updated time.Time
+ updateMsg bool
)
+
for {
select {
case errc := <-c.quit:
@@ -250,7 +277,7 @@ func (c *ChainIndexer) updateLoop() {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
- updating = true
+ updateMsg = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
@@ -259,7 +286,7 @@ func (c *ChainIndexer) updateLoop() {
section := c.storedSections
var oldHead common.Hash
if section > 0 {
- oldHead = c.sectionHead(section - 1)
+ oldHead = c.SectionHead(section - 1)
}
// Process the newly defined section in the background
c.lock.Unlock()
@@ -270,11 +297,11 @@ func (c *ChainIndexer) updateLoop() {
c.lock.Lock()
// If processing succeeded and no reorgs occcurred, mark the section completed
- if err == nil && oldHead == c.sectionHead(section-1) {
+ if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
- if c.storedSections == c.knownSections && updating {
- updating = false
+ if c.storedSections == c.knownSections && updateMsg {
+ updateMsg = false
c.log.Info("Finished upgrading chain index")
}
@@ -311,7 +338,11 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
c.log.Trace("Processing new chain section", "section", section)
// Reset and partial processing
- c.backend.Reset(section)
+
+ if err := c.backend.Reset(section, lastHead); err != nil {
+ c.setValidSections(0)
+ return common.Hash{}, err
+ }
for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := GetCanonicalHash(c.chainDb, number)
@@ -341,7 +372,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()
- return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1)
+ return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
}
// AddChildIndexer adds a child ChainIndexer that can use the output of this one
@@ -383,7 +414,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) {
// sectionHead retrieves the last block hash of a processed section from the
// index database.
-func (c *ChainIndexer) sectionHead(section uint64) common.Hash {
+func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)
diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go
index b761e8a5b..d685d3f8d 100644
--- a/core/chain_indexer_test.go
+++ b/core/chain_indexer_test.go
@@ -23,6 +23,7 @@ import (
"testing"
"time"
+ "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
)
@@ -208,9 +209,10 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
return b.stored * b.indexer.sectionSize
}
-func (b *testChainIndexBackend) Reset(section uint64) {
+func (b *testChainIndexBackend) Reset(section uint64, lastSectionHead common.Hash) error {
b.section = section
b.headerCnt = 0
+ return nil
}
func (b *testChainIndexBackend) Process(header *types.Header) {
diff --git a/core/database_util.go b/core/database_util.go
index 1730a048e..c6b125dae 100644
--- a/core/database_util.go
+++ b/core/database_util.go
@@ -74,9 +74,9 @@ var (
preimageHitCounter = metrics.NewCounter("db/preimage/hits")
)
-// txLookupEntry is a positional metadata to help looking up the data content of
+// TxLookupEntry is a positional metadata to help looking up the data content of
// a transaction or receipt given only its hash.
-type txLookupEntry struct {
+type TxLookupEntry struct {
BlockHash common.Hash
BlockIndex uint64
Index uint64
@@ -260,7 +260,7 @@ func GetTxLookupEntry(db DatabaseReader, hash common.Hash) (common.Hash, uint64,
return common.Hash{}, 0, 0
}
// Parse and return the contents of the lookup entry
- var entry txLookupEntry
+ var entry TxLookupEntry
if err := rlp.DecodeBytes(data, &entry); err != nil {
log.Error("Invalid lookup entry RLP", "hash", hash, "err", err)
return common.Hash{}, 0, 0
@@ -296,7 +296,7 @@ func GetTransaction(db DatabaseReader, hash common.Hash) (*types.Transaction, co
if len(data) == 0 {
return nil, common.Hash{}, 0, 0
}
- var entry txLookupEntry
+ var entry TxLookupEntry
if err := rlp.DecodeBytes(data, &entry); err != nil {
return nil, common.Hash{}, 0, 0
}
@@ -332,14 +332,13 @@ func GetReceipt(db DatabaseReader, hash common.Hash) (*types.Receipt, common.Has
// GetBloomBits retrieves the compressed bloom bit vector belonging to the given
// section and bit index from the.
-func GetBloomBits(db DatabaseReader, bit uint, section uint64, head common.Hash) []byte {
+func GetBloomBits(db DatabaseReader, bit uint, section uint64, head common.Hash) ([]byte, error) {
key := append(append(bloomBitsPrefix, make([]byte, 10)...), head.Bytes()...)
binary.BigEndian.PutUint16(key[1:], uint16(bit))
binary.BigEndian.PutUint64(key[3:], section)
- bits, _ := db.Get(key)
- return bits
+ return db.Get(key)
}
// WriteCanonicalHash stores the canonical hash for the given block number.
@@ -465,7 +464,7 @@ func WriteBlockReceipts(db ethdb.Putter, hash common.Hash, number uint64, receip
func WriteTxLookupEntries(db ethdb.Putter, block *types.Block) error {
// Iterate over each transaction and encode its metadata
for i, tx := range block.Transactions() {
- entry := txLookupEntry{
+ entry := TxLookupEntry{
BlockHash: block.Hash(),
BlockIndex: block.NumberU64(),
Index: uint64(i),
diff --git a/core/tx_list.go b/core/tx_list.go
index 2935929d7..94721aa5f 100644
--- a/core/tx_list.go
+++ b/core/tx_list.go
@@ -384,13 +384,13 @@ func (h *priceHeap) Pop() interface{} {
// txPricedList is a price-sorted heap to allow operating on transactions pool
// contents in a price-incrementing way.
type txPricedList struct {
- all *map[common.Hash]*types.Transaction // Pointer to the map of all transactions
- items *priceHeap // Heap of prices of all the stored transactions
- stales int // Number of stale price points to (re-heap trigger)
+ all *map[common.Hash]txLookupRec // Pointer to the map of all transactions
+ items *priceHeap // Heap of prices of all the stored transactions
+ stales int // Number of stale price points to (re-heap trigger)
}
// newTxPricedList creates a new price-sorted transaction heap.
-func newTxPricedList(all *map[common.Hash]*types.Transaction) *txPricedList {
+func newTxPricedList(all *map[common.Hash]txLookupRec) *txPricedList {
return &txPricedList{
all: all,
items: new(priceHeap),
@@ -416,7 +416,7 @@ func (l *txPricedList) Removed() {
l.stales, l.items = 0, &reheap
for _, tx := range *l.all {
- *l.items = append(*l.items, tx)
+ *l.items = append(*l.items, tx.tx)
}
heap.Init(l.items)
}
diff --git a/core/tx_pool.go b/core/tx_pool.go
index a705e36d6..5fdc91e65 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -192,17 +192,22 @@ type TxPool struct {
locals *accountSet // Set of local transaction to exepmt from evicion rules
journal *txJournal // Journal of local transaction to back up to disk
- pending map[common.Address]*txList // All currently processable transactions
- queue map[common.Address]*txList // Queued but non-processable transactions
- beats map[common.Address]time.Time // Last heartbeat from each known account
- all map[common.Hash]*types.Transaction // All transactions to allow lookups
- priced *txPricedList // All transactions sorted by price
+ pending map[common.Address]*txList // All currently processable transactions
+ queue map[common.Address]*txList // Queued but non-processable transactions
+ beats map[common.Address]time.Time // Last heartbeat from each known account
+ all map[common.Hash]txLookupRec // All transactions to allow lookups
+ priced *txPricedList // All transactions sorted by price
wg sync.WaitGroup // for shutdown sync
homestead bool
}
+type txLookupRec struct {
+ tx *types.Transaction
+ pending bool
+}
+
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// trnsactions from the network.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
@@ -218,7 +223,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
- all: make(map[common.Hash]*types.Transaction),
+ all: make(map[common.Hash]txLookupRec),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
@@ -594,7 +599,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// If the transaction is already known, discard it
hash := tx.Hash()
- if pool.all[hash] != nil {
+ if _, ok := pool.all[hash]; ok {
log.Trace("Discarding already known transaction", "hash", hash)
return false, fmt.Errorf("known transaction: %x", hash)
}
@@ -635,7 +640,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
- pool.all[tx.Hash()] = tx
+ pool.all[tx.Hash()] = txLookupRec{tx, false}
pool.priced.Put(tx)
pool.journalTx(from, tx)
@@ -682,7 +687,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
pool.priced.Removed()
queuedReplaceCounter.Inc(1)
}
- pool.all[hash] = tx
+ pool.all[hash] = txLookupRec{tx, false}
pool.priced.Put(tx)
return old != nil, nil
}
@@ -725,10 +730,13 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
pendingReplaceCounter.Inc(1)
}
- // Failsafe to work around direct pending inserts (tests)
- if pool.all[hash] == nil {
- pool.all[hash] = tx
+ if pool.all[hash].tx == nil {
+ // Failsafe to work around direct pending inserts (tests)
+ pool.all[hash] = txLookupRec{tx, true}
pool.priced.Put(tx)
+ } else {
+ // set pending flag to true
+ pool.all[hash] = txLookupRec{tx, true}
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
@@ -755,14 +763,16 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error {
// marking the senders as a local ones in the mean time, ensuring they go around
// the local pricing constraints.
func (pool *TxPool) AddLocals(txs []*types.Transaction) error {
- return pool.addTxs(txs, !pool.config.NoLocals)
+ pool.addTxs(txs, !pool.config.NoLocals)
+ return nil
}
// AddRemotes enqueues a batch of transactions into the pool if they are valid.
// If the senders are not among the locally tracked ones, full pricing constraints
// will apply.
func (pool *TxPool) AddRemotes(txs []*types.Transaction) error {
- return pool.addTxs(txs, false)
+ pool.addTxs(txs, false)
+ return nil
}
// addTx enqueues a single transaction into the pool if it is valid.
@@ -784,7 +794,7 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error {
}
// addTxs attempts to queue a batch of transactions if they are valid.
-func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
+func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error {
pool.mu.Lock()
defer pool.mu.Unlock()
@@ -793,11 +803,13 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) error {
// addTxsLocked attempts to queue a batch of transactions if they are valid,
// whilst assuming the transaction pool lock is already held.
-func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error {
+func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
// Add the batch of transaction, tracking the accepted ones
dirty := make(map[common.Address]struct{})
- for _, tx := range txs {
- if replace, err := pool.add(tx, local); err == nil {
+ txErr := make([]error, len(txs))
+ for i, tx := range txs {
+ var replace bool
+ if replace, txErr[i] = pool.add(tx, local); txErr[i] == nil {
if !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
dirty[from] = struct{}{}
@@ -812,7 +824,58 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) error {
}
pool.promoteExecutables(addrs)
}
- return nil
+ return txErr
+}
+
+// TxStatusData is returned by AddOrGetTxStatus for each transaction
+type TxStatusData struct {
+ Status uint
+ Data []byte
+}
+
+const (
+ TxStatusUnknown = iota
+ TxStatusQueued
+ TxStatusPending
+ TxStatusIncluded // Data contains a TxChainPos struct
+ TxStatusError // Data contains the error string
+)
+
+// AddOrGetTxStatus returns the status (unknown/pending/queued) of a batch of transactions
+// identified by their hashes in txHashes. Optionally the transactions themselves can be
+// passed too in txs, in which case the function will try adding the previously unknown ones
+// to the pool. If a new transaction cannot be added, TxStatusError is returned. Adding already
+// known transactions will return their previous status.
+// If txs is specified, txHashes is still required and has to match the transactions in txs.
+
+// Note: TxStatusIncluded is never returned by this function since the pool does not track
+// mined transactions. Included status can be checked by the caller (as it happens in the
+// LES protocol manager)
+func (pool *TxPool) AddOrGetTxStatus(txs []*types.Transaction, txHashes []common.Hash) []TxStatusData {
+ status := make([]TxStatusData, len(txHashes))
+ if txs != nil {
+ if len(txs) != len(txHashes) {
+ panic(nil)
+ }
+ txErr := pool.addTxs(txs, false)
+ for i, err := range txErr {
+ if err != nil {
+ status[i] = TxStatusData{TxStatusError, ([]byte)(err.Error())}
+ }
+ }
+ }
+
+ for i, hash := range txHashes {
+ r, ok := pool.all[hash]
+ if ok {
+ if r.pending {
+ status[i] = TxStatusData{TxStatusPending, nil}
+ } else {
+ status[i] = TxStatusData{TxStatusQueued, nil}
+ }
+ }
+ }
+ return status
}
// Get returns a transaction if it is contained in the pool
@@ -821,17 +884,18 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
pool.mu.RLock()
defer pool.mu.RUnlock()
- return pool.all[hash]
+ return pool.all[hash].tx
}
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash) {
// Fetch the transaction we wish to delete
- tx, ok := pool.all[hash]
+ txl, ok := pool.all[hash]
if !ok {
return
}
+ tx := txl.tx
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
// Remove it from the list of known transactions