aboutsummaryrefslogtreecommitdiffstats
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/bloombits/matcher.go67
-rw-r--r--core/chain_indexer.go49
-rw-r--r--core/chain_indexer_test.go2
-rw-r--r--core/tx_list.go10
-rw-r--r--core/tx_pool.go119
5 files changed, 99 insertions, 148 deletions
diff --git a/core/bloombits/matcher.go b/core/bloombits/matcher.go
index 32a660337..a75f8c085 100644
--- a/core/bloombits/matcher.go
+++ b/core/bloombits/matcher.go
@@ -57,12 +57,16 @@ type partialMatches struct {
// Retrieval represents a request for retrieval task assignments for a given
// bit with the given number of fetch elements, or a response for such a request.
// It can also have the actual results set to be used as a delivery data struct.
+//
+// The contest and error fields are used by the light client to terminate matching
+// early if an error is enountered on some path of the pipeline.
type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
- Error error
- Context context.Context
+
+ Context context.Context
+ Error error
}
// Matcher is a pipelined system of schedulers and logic matchers which perform
@@ -506,54 +510,31 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
type MatcherSession struct {
matcher *Matcher
- quit chan struct{} // Quit channel to request pipeline termination
- kill chan struct{} // Term channel to signal non-graceful forced shutdown
- ctx context.Context
- err error
- stopping bool
- lock sync.Mutex
- pend sync.WaitGroup
+ closer sync.Once // Sync object to ensure we only ever close once
+ quit chan struct{} // Quit channel to request pipeline termination
+ kill chan struct{} // Term channel to signal non-graceful forced shutdown
+
+ ctx context.Context // Context used by the light client to abort filtering
+ err atomic.Value // Global error to track retrieval failures deep in the chain
+
+ pend sync.WaitGroup
}
// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
func (s *MatcherSession) Close() {
- s.lock.Lock()
- stopping := s.stopping
- s.stopping = true
- s.lock.Unlock()
- // ensure that we only close the session once
- if stopping {
- return
- }
-
- // Bail out if the matcher is not running
- select {
- case <-s.quit:
- return
- default:
- }
- // Signal termination and wait for all goroutines to tear down
- close(s.quit)
- time.AfterFunc(time.Second, func() { close(s.kill) })
- s.pend.Wait()
+ s.closer.Do(func() {
+ // Signal termination and wait for all goroutines to tear down
+ close(s.quit)
+ time.AfterFunc(time.Second, func() { close(s.kill) })
+ s.pend.Wait()
+ })
}
-// setError sets an error and stops the session
-func (s *MatcherSession) setError(err error) {
- s.lock.Lock()
- s.err = err
- s.lock.Unlock()
- s.Close()
-}
-
-// Error returns an error if one has happened during the session
+// Error returns any failure encountered during the matching session.
func (s *MatcherSession) Error() error {
- s.lock.Lock()
- defer s.lock.Unlock()
-
- return s.err
+ return s.err.Load().(error)
}
// AllocateRetrieval assigns a bloom bit index to a client process that can either
@@ -655,9 +636,9 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
result := <-request
if result.Error != nil {
- s.setError(result.Error)
+ s.err.Store(result.Error)
+ s.Close()
}
-
s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
}
}
diff --git a/core/chain_indexer.go b/core/chain_indexer.go
index 837c908ab..7e7500dc8 100644
--- a/core/chain_indexer.go
+++ b/core/chain_indexer.go
@@ -36,17 +36,25 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
- Reset(section uint64, lastSectionHead common.Hash) error
+ Reset(section uint64, prevHead common.Hash) error
// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(header *types.Header)
- // Commit finalizes the section metadata and stores it into the database. This
- // interface will usually be a batch writer.
+ // Commit finalizes the section metadata and stores it into the database.
Commit() error
}
+// ChainIndexerChain interface is used for connecting the indexer to a blockchain
+type ChainIndexerChain interface {
+ // CurrentHeader retrieves the latest locally known header.
+ CurrentHeader() *types.Header
+
+ // SubscribeChainEvent subscribes to new head header notifications.
+ SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
+}
+
// ChainIndexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A ChainIndexer is
// connected to the blockchain through the event system by starting a
@@ -114,21 +122,14 @@ func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
c.setValidSections(section + 1)
}
-// IndexerChain interface is used for connecting the indexer to a blockchain
-type IndexerChain interface {
- CurrentHeader() *types.Header
- SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
-}
-
// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
-func (c *ChainIndexer) Start(chain IndexerChain) {
- ch := make(chan ChainEvent, 10)
- sub := chain.SubscribeChainEvent(ch)
- currentHeader := chain.CurrentHeader()
+func (c *ChainIndexer) Start(chain ChainIndexerChain) {
+ events := make(chan ChainEvent, 10)
+ sub := chain.SubscribeChainEvent(events)
- go c.eventLoop(currentHeader, ch, sub)
+ go c.eventLoop(chain.CurrentHeader(), events, sub)
}
// Close tears down all goroutines belonging to the indexer and returns any error
@@ -149,14 +150,12 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
}
-
// Close all children
for _, child := range c.children {
if err := child.Close(); err != nil {
errs = append(errs, err)
}
}
-
// Return any failures
switch {
case len(errs) == 0:
@@ -173,7 +172,7 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
-func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) {
+func (c *ChainIndexer) eventLoop(currentHeader *types.Header, events chan ChainEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)
@@ -193,7 +192,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent
errc <- nil
return
- case ev, ok := <-ch:
+ case ev, ok := <-events:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
@@ -202,6 +201,8 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
+ // Reorg to the common ancestor (might not exist in light sync mode, skip reorg then)
+ // TODO(karalabe, zsfelfoldi): This seems a bit brittle, can we detect this case explicitly?
if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
c.newHead(h.Number.Uint64(), true)
}
@@ -259,8 +260,8 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
var (
- updated time.Time
- updateMsg bool
+ updating bool
+ updated time.Time
)
for {
@@ -277,7 +278,7 @@ func (c *ChainIndexer) updateLoop() {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
- updateMsg = true
+ updating = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
@@ -300,8 +301,8 @@ func (c *ChainIndexer) updateLoop() {
if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
- if c.storedSections == c.knownSections && updateMsg {
- updateMsg = false
+ if c.storedSections == c.knownSections && updating {
+ updating = false
c.log.Info("Finished upgrading chain index")
}
@@ -412,7 +413,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) {
c.storedSections = sections // needed if new > old
}
-// sectionHead retrieves the last block hash of a processed section from the
+// SectionHead retrieves the last block hash of a processed section from the
// index database.
func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte
diff --git a/core/chain_indexer_test.go b/core/chain_indexer_test.go
index d685d3f8d..9fc09eda5 100644
--- a/core/chain_indexer_test.go
+++ b/core/chain_indexer_test.go
@@ -209,7 +209,7 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
return b.stored * b.indexer.sectionSize
}
-func (b *testChainIndexBackend) Reset(section uint64, lastSectionHead common.Hash) error {
+func (b *testChainIndexBackend) Reset(section uint64, prevHead common.Hash) error {
b.section = section
b.headerCnt = 0
return nil
diff --git a/core/tx_list.go b/core/tx_list.go
index 94721aa5f..2935929d7 100644
--- a/core/tx_list.go
+++ b/core/tx_list.go
@@ -384,13 +384,13 @@ func (h *priceHeap) Pop() interface{} {
// txPricedList is a price-sorted heap to allow operating on transactions pool
// contents in a price-incrementing way.
type txPricedList struct {
- all *map[common.Hash]txLookupRec // Pointer to the map of all transactions
- items *priceHeap // Heap of prices of all the stored transactions
- stales int // Number of stale price points to (re-heap trigger)
+ all *map[common.Hash]*types.Transaction // Pointer to the map of all transactions
+ items *priceHeap // Heap of prices of all the stored transactions
+ stales int // Number of stale price points to (re-heap trigger)
}
// newTxPricedList creates a new price-sorted transaction heap.
-func newTxPricedList(all *map[common.Hash]txLookupRec) *txPricedList {
+func newTxPricedList(all *map[common.Hash]*types.Transaction) *txPricedList {
return &txPricedList{
all: all,
items: new(priceHeap),
@@ -416,7 +416,7 @@ func (l *txPricedList) Removed() {
l.stales, l.items = 0, &reheap
for _, tx := range *l.all {
- *l.items = append(*l.items, tx.tx)
+ *l.items = append(*l.items, tx)
}
heap.Init(l.items)
}
diff --git a/core/tx_pool.go b/core/tx_pool.go
index 5fdc91e65..0f008ddc0 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -103,6 +103,16 @@ var (
underpricedTxCounter = metrics.NewCounter("txpool/underpriced")
)
+// TxStatus is the current status of a transaction as seen py the pool.
+type TxStatus uint
+
+const (
+ TxStatusUnknown TxStatus = iota
+ TxStatusQueued
+ TxStatusPending
+ TxStatusIncluded
+)
+
// blockChain provides the state of blockchain and current gas limit to do
// some pre checks in tx pool and event subscribers.
type blockChain interface {
@@ -192,22 +202,17 @@ type TxPool struct {
locals *accountSet // Set of local transaction to exepmt from evicion rules
journal *txJournal // Journal of local transaction to back up to disk
- pending map[common.Address]*txList // All currently processable transactions
- queue map[common.Address]*txList // Queued but non-processable transactions
- beats map[common.Address]time.Time // Last heartbeat from each known account
- all map[common.Hash]txLookupRec // All transactions to allow lookups
- priced *txPricedList // All transactions sorted by price
+ pending map[common.Address]*txList // All currently processable transactions
+ queue map[common.Address]*txList // Queued but non-processable transactions
+ beats map[common.Address]time.Time // Last heartbeat from each known account
+ all map[common.Hash]*types.Transaction // All transactions to allow lookups
+ priced *txPricedList // All transactions sorted by price
wg sync.WaitGroup // for shutdown sync
homestead bool
}
-type txLookupRec struct {
- tx *types.Transaction
- pending bool
-}
-
// NewTxPool creates a new transaction pool to gather, sort and filter inbound
// trnsactions from the network.
func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
@@ -223,7 +228,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block
pending: make(map[common.Address]*txList),
queue: make(map[common.Address]*txList),
beats: make(map[common.Address]time.Time),
- all: make(map[common.Hash]txLookupRec),
+ all: make(map[common.Hash]*types.Transaction),
chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize),
gasPrice: new(big.Int).SetUint64(config.PriceLimit),
}
@@ -599,7 +604,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error {
func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
// If the transaction is already known, discard it
hash := tx.Hash()
- if _, ok := pool.all[hash]; ok {
+ if pool.all[hash] != nil {
log.Trace("Discarding already known transaction", "hash", hash)
return false, fmt.Errorf("known transaction: %x", hash)
}
@@ -640,7 +645,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (bool, error) {
pool.priced.Removed()
pendingReplaceCounter.Inc(1)
}
- pool.all[tx.Hash()] = txLookupRec{tx, false}
+ pool.all[tx.Hash()] = tx
pool.priced.Put(tx)
pool.journalTx(from, tx)
@@ -687,7 +692,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction) (bool, er
pool.priced.Removed()
queuedReplaceCounter.Inc(1)
}
- pool.all[hash] = txLookupRec{tx, false}
+ pool.all[hash] = tx
pool.priced.Put(tx)
return old != nil, nil
}
@@ -730,13 +735,10 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T
pendingReplaceCounter.Inc(1)
}
- if pool.all[hash].tx == nil {
- // Failsafe to work around direct pending inserts (tests)
- pool.all[hash] = txLookupRec{tx, true}
+ // Failsafe to work around direct pending inserts (tests)
+ if pool.all[hash] == nil {
+ pool.all[hash] = tx
pool.priced.Put(tx)
- } else {
- // set pending flag to true
- pool.all[hash] = txLookupRec{tx, true}
}
// Set the potentially new pending nonce and notify any subsystems of the new tx
pool.beats[addr] = time.Now()
@@ -762,17 +764,15 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error {
// AddLocals enqueues a batch of transactions into the pool if they are valid,
// marking the senders as a local ones in the mean time, ensuring they go around
// the local pricing constraints.
-func (pool *TxPool) AddLocals(txs []*types.Transaction) error {
- pool.addTxs(txs, !pool.config.NoLocals)
- return nil
+func (pool *TxPool) AddLocals(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, !pool.config.NoLocals)
}
// AddRemotes enqueues a batch of transactions into the pool if they are valid.
// If the senders are not among the locally tracked ones, full pricing constraints
// will apply.
-func (pool *TxPool) AddRemotes(txs []*types.Transaction) error {
- pool.addTxs(txs, false)
- return nil
+func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
+ return pool.addTxs(txs, false)
}
// addTx enqueues a single transaction into the pool if it is valid.
@@ -806,10 +806,11 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error {
func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
// Add the batch of transaction, tracking the accepted ones
dirty := make(map[common.Address]struct{})
- txErr := make([]error, len(txs))
+ errs := make([]error, len(txs))
+
for i, tx := range txs {
var replace bool
- if replace, txErr[i] = pool.add(tx, local); txErr[i] == nil {
+ if replace, errs[i] = pool.add(tx, local); errs[i] == nil {
if !replace {
from, _ := types.Sender(pool.signer, tx) // already validated
dirty[from] = struct{}{}
@@ -824,54 +825,23 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error {
}
pool.promoteExecutables(addrs)
}
- return txErr
-}
-
-// TxStatusData is returned by AddOrGetTxStatus for each transaction
-type TxStatusData struct {
- Status uint
- Data []byte
+ return errs
}
-const (
- TxStatusUnknown = iota
- TxStatusQueued
- TxStatusPending
- TxStatusIncluded // Data contains a TxChainPos struct
- TxStatusError // Data contains the error string
-)
-
-// AddOrGetTxStatus returns the status (unknown/pending/queued) of a batch of transactions
-// identified by their hashes in txHashes. Optionally the transactions themselves can be
-// passed too in txs, in which case the function will try adding the previously unknown ones
-// to the pool. If a new transaction cannot be added, TxStatusError is returned. Adding already
-// known transactions will return their previous status.
-// If txs is specified, txHashes is still required and has to match the transactions in txs.
-
-// Note: TxStatusIncluded is never returned by this function since the pool does not track
-// mined transactions. Included status can be checked by the caller (as it happens in the
-// LES protocol manager)
-func (pool *TxPool) AddOrGetTxStatus(txs []*types.Transaction, txHashes []common.Hash) []TxStatusData {
- status := make([]TxStatusData, len(txHashes))
- if txs != nil {
- if len(txs) != len(txHashes) {
- panic(nil)
- }
- txErr := pool.addTxs(txs, false)
- for i, err := range txErr {
- if err != nil {
- status[i] = TxStatusData{TxStatusError, ([]byte)(err.Error())}
- }
- }
- }
+// Status returns the status (unknown/pending/queued) of a batch of transactions
+// identified by their hashes.
+func (pool *TxPool) Status(hashes []common.Hash) []TxStatus {
+ pool.mu.RLock()
+ defer pool.mu.RUnlock()
- for i, hash := range txHashes {
- r, ok := pool.all[hash]
- if ok {
- if r.pending {
- status[i] = TxStatusData{TxStatusPending, nil}
+ status := make([]TxStatus, len(hashes))
+ for i, hash := range hashes {
+ if tx := pool.all[hash]; tx != nil {
+ from, _ := types.Sender(pool.signer, tx) // already validated
+ if pool.pending[from].txs.items[tx.Nonce()] != nil {
+ status[i] = TxStatusPending
} else {
- status[i] = TxStatusData{TxStatusQueued, nil}
+ status[i] = TxStatusQueued
}
}
}
@@ -884,18 +854,17 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
pool.mu.RLock()
defer pool.mu.RUnlock()
- return pool.all[hash].tx
+ return pool.all[hash]
}
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash) {
// Fetch the transaction we wish to delete
- txl, ok := pool.all[hash]
+ tx, ok := pool.all[hash]
if !ok {
return
}
- tx := txl.tx
addr, _ := types.Sender(pool.signer, tx) // already validated during insertion
// Remove it from the list of known transactions