aboutsummaryrefslogtreecommitdiffstats
path: root/light/txpool.go
blob: 767a797bdceca2ab7775a4efb7f3d115e9ec9d29 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package light

import (
    "context"
    "fmt"
    "sync"
    "time"

    "github.com/ethereum/go-ethereum/common"
    "github.com/ethereum/go-ethereum/core"
    "github.com/ethereum/go-ethereum/core/rawdb"
    "github.com/ethereum/go-ethereum/core/state"
    "github.com/ethereum/go-ethereum/core/types"
    "github.com/ethereum/go-ethereum/ethdb"
    "github.com/ethereum/go-ethereum/event"
    "github.com/ethereum/go-ethereum/log"
    "github.com/ethereum/go-ethereum/params"
    "github.com/ethereum/go-ethereum/rlp"
)

// chainHeadChanSize is the capacity of the buffered channel on which the pool
// receives ChainHeadEvent notifications.
const chainHeadChanSize = 10

// txPermanent is the number of blocks that have to be mined after a mined
// transaction before that transaction is considered permanent, i.e. no
// rollback is expected any more.
var txPermanent uint64 = 500

// TxPool implements the transaction pool for light clients, which keeps track
// of the status of locally created transactions, detecting if they are included
// in a block (mined) or rolled back. There are no queued transactions since we
// always receive all locally signed transactions in the same order as they are
// created.
type TxPool struct {
    // config is consulted on every new head to refresh signer and homestead.
    config       *params.ChainConfig
    // signer is used to recover the sender address of transactions.
    signer       types.Signer
    // quit is closed by Stop.
    quit         chan bool
    // txFeed delivers a NewTxsEvent for every transaction accepted by the pool;
    // scope tracks its subscriptions so that Stop can close them all.
    txFeed       event.Feed
    scope        event.SubscriptionScope
    // chainHeadCh receives chain head events through chainHeadSub; the event
    // loop exits when the subscription's error channel fires.
    chainHeadCh  chan core.ChainHeadEvent
    chainHeadSub event.Subscription
    // mu is held while the pool processes a new head and by the methods that
    // add, remove or list transactions.
    mu           sync.RWMutex
    chain        *LightChain
    odr          OdrBackend
    chainDb      ethdb.Database
    relay        TxRelayBackend
    // head is the hash of the last block the pool has processed.
    head         common.Hash
    nonce        map[common.Address]uint64            // "pending" nonce
    pending      map[common.Hash]*types.Transaction   // pending transactions by tx hash
    mined        map[common.Hash][]*types.Transaction // mined transactions by block hash
    clearIdx     uint64                               // earliest block nr that can contain mined tx info

    // homestead is refreshed from config on every new head and selects the
    // intrinsic gas rules used when validating transactions.
    homestead bool
}

// TxRelayBackend provides an interface to the mechanism that forwards transactions
// to the ETH network. The implementations of the functions should be non-blocking.
type TxRelayBackend interface {
    // Send instructs the backend to forward new transactions.
    Send(txs types.Transactions)
    // NewHead notifies the backend about a new head after it has been processed
    // by the tx pool, including the transactions mined and rolled back since the
    // last event.
    NewHead(head common.Hash, mined []common.Hash, rollback []common.Hash)
    // Discard notifies the backend about transactions that should be discarded,
    // either because they have been replaced by a re-send or because they have
    // been mined long ago and no rollback is expected.
    Discard(hashes []common.Hash)
}

// NewTxPool creates a new light transaction pool bound to the given chain and
// relay backend. It subscribes to chain head events and starts the event loop
// goroutine, which runs until the subscription is torn down by Stop.
func NewTxPool(config *params.ChainConfig, chain *LightChain, relay TxRelayBackend) *TxPool {
    // Read the current header and the ODR backend exactly once, so that head
    // and clearIdx always describe the same block even if the chain advances
    // while the pool is being constructed.
    header := chain.CurrentHeader()
    odr := chain.Odr()

    pool := &TxPool{
        config:      config,
        signer:      types.NewEIP155Signer(config.ChainID),
        nonce:       make(map[common.Address]uint64),
        pending:     make(map[common.Hash]*types.Transaction),
        mined:       make(map[common.Hash][]*types.Transaction),
        quit:        make(chan bool),
        chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize),
        chain:       chain,
        relay:       relay,
        odr:         odr,
        chainDb:     odr.Database(),
        head:        header.Hash(),
        clearIdx:    header.Number.Uint64(),
    }
    // Subscribe events from blockchain
    pool.chainHeadSub = pool.chain.SubscribeChainHeadEvent(pool.chainHeadCh)
    go pool.eventLoop()

    return pool
}

// currentState builds the light state that belongs to the chain's current head
// header, backed by the pool's ODR backend.
func (pool *TxPool) currentState(ctx context.Context) *state.StateDB {
    header := pool.chain.CurrentHeader()
    return NewState(ctx, header, pool.odr)
}

// GetNonce returns the "pending" nonce of a given address. It always queries
// the nonce belonging to the latest header too in order to detect if another
// client using the same key sent a transaction.
//
// The result is the larger of the on-chain nonce and the locally tracked one;
// the local entry is raised to the on-chain value when it lags behind or does
// not exist yet. An error is returned if the state lookup failed.
func (pool *TxPool) GetNonce(ctx context.Context, addr common.Address) (uint64, error) {
    // Resolve the on-chain nonce before taking the pool lock, so the lock is
    // not held for the duration of the state lookup.
    statedb := pool.currentState(ctx)
    nonce := statedb.GetNonce(addr)
    if err := statedb.Error(); err != nil {
        return 0, err
    }

    // The nonce map is also written by add while holding the pool lock, so it
    // must not be touched without it.
    pool.mu.Lock()
    defer pool.mu.Unlock()

    if local, ok := pool.nonce[addr]; ok && local > nonce {
        // Locally created transactions are ahead of the chain
        return local, nil
    }
    pool.nonce[addr] = nonce
    return nonce, nil
}

// txStateChanges stores the recent changes between pending/mined states of
// transactions, keyed by transaction hash. True means mined, false means rolled
// back, no entry means no change.
type txStateChanges map[common.Hash]bool

// setState records that a tx has recently been mined (true) or rolled back
// (false). Recording the opposite of an already stored state cancels the two
// out, so the entry is dropped instead of being overwritten.
func (txc txStateChanges) setState(txHash common.Hash, mined bool) {
    if prev, seen := txc[txHash]; seen && prev != mined {
        delete(txc, txHash)
        return
    }
    txc[txHash] = mined
}

// getLists splits the recorded changes into the hashes of mined transactions
// and the hashes of rolled back ones. Both results are nil when there is no
// entry of the respective kind.
func (txc txStateChanges) getLists() (mined []common.Hash, rollback []common.Hash) {
    for txHash, isMined := range txc {
        if !isMined {
            rollback = append(rollback, txHash)
            continue
        }
        mined = append(mined, txHash)
    }
    return mined, rollback
}

// checkMinedTxs looks for currently pending transactions in the block with the
// given hash and number. Every one found is moved from the pending set to the
// mined set of that block and flagged as mined in txc; the block's tx lookup
// entries are written to the database as well. Errors from retrieving the block
// or its receipts are returned unchanged.
func (pool *TxPool) checkMinedTxs(ctx context.Context, hash common.Hash, number uint64, txc txStateChanges) error {
    // Nothing is pending, so there is nothing that could have been mined
    if len(pool.pending) == 0 {
        return nil
    }
    block, err := GetBlock(ctx, pool.odr, hash, number)
    if err != nil {
        return err
    }
    // Extend the block's already known mined list with the pending transactions
    // contained in the block
    minedTxs := pool.mined[hash]
    for _, tx := range block.Transactions() {
        if _, isPending := pool.pending[tx.Hash()]; isPending {
            minedTxs = append(minedTxs, tx)
        }
    }
    // No local transaction in this block, leave the pool untouched
    if minedTxs == nil {
        return nil
    }
    // Retrieve the receipts of the block (only the ODR caching side effect is
    // wanted, the result itself is ignored) and write the lookup table
    if _, err := GetBlockReceipts(ctx, pool.odr, hash, number); err != nil {
        return err
    }
    rawdb.WriteTxLookupEntries(pool.chainDb, block)

    // Move the transactions from pending to mined
    for _, tx := range minedTxs {
        txHash := tx.Hash()
        delete(pool.pending, txHash)
        txc.setState(txHash, true)
    }
    pool.mined[hash] = minedTxs
    return nil
}

// rollbackTxs marks the transactions mined in the given, recently rolled back
// block as rolled back: they are moved back into the pending set, flagged in
// txc and their positional lookup entries are removed from the database.
// Blocks without locally known mined transactions are ignored.
func (pool *TxPool) rollbackTxs(hash common.Hash, txc txStateChanges) {
    list, ok := pool.mined[hash]
    if !ok {
        // Nothing was mined in this block, no need to touch the database
        return
    }
    batch := pool.chainDb.NewBatch()
    for _, tx := range list {
        txHash := tx.Hash()
        rawdb.DeleteTxLookupEntry(batch, txHash)
        pool.pending[txHash] = tx
        txc.setState(txHash, false)
    }
    delete(pool.mined, hash)

    // The in-memory state is already rolled back; a failed write only leaves
    // stale lookup entries behind, so report it instead of dropping it silently.
    if err := batch.Write(); err != nil {
        log.Error("Failed to delete tx lookup entries of rolled back block", "block", hash, "err", err)
    }
}

// reorgOnNewHead sets a new head header, processing (and rolling back if necessary)
// the blocks since the last known head and returns a txStateChanges map containing
// the recently mined and rolled back transaction hashes. If an error (context
// timeout) occurs during checking new blocks, it leaves the locally known head
// at the latest checked block and still returns a valid txStateChanges, making it
// possible to continue checking the missing blocks at the next chain head event.
//
// Besides pool.head it updates pool.pending, pool.mined and pool.clearIdx, and
// it asks the relay backend to discard transactions mined more than txPermanent
// blocks ago.
func (pool *TxPool) reorgOnNewHead(ctx context.Context, newHeader *types.Header) (txStateChanges, error) {
    txc := make(txStateChanges)
    // NOTE(review): oldh is dereferenced below without a nil check, both here
    // and after each step to its parent - confirm the chain always knows these
    // headers.
    oldh := pool.chain.GetHeaderByHash(pool.head)
    newh := newHeader
    // find common ancestor, create list of rolled back and new block hashes
    var oldHashes, newHashes []common.Hash
    for oldh.Hash() != newh.Hash() {
        // step back on the old branch while it is not shorter than the new one
        if oldh.Number.Uint64() >= newh.Number.Uint64() {
            oldHashes = append(oldHashes, oldh.Hash())
            oldh = pool.chain.GetHeader(oldh.ParentHash, oldh.Number.Uint64()-1)
        }
        // step back on the new branch while it is longer than the old one
        if oldh.Number.Uint64() < newh.Number.Uint64() {
            newHashes = append(newHashes, newh.Hash())
            newh = pool.chain.GetHeader(newh.ParentHash, newh.Number.Uint64()-1)
            if newh == nil {
                // happens when CHT syncing, nothing to do
                newh = oldh
            }
        }
    }
    // oldh is now the point where the walk stopped; mined tx info may exist
    // from that block on
    if oldh.Number.Uint64() < pool.clearIdx {
        pool.clearIdx = oldh.Number.Uint64()
    }
    // roll back old blocks
    for _, hash := range oldHashes {
        pool.rollbackTxs(hash, txc)
    }
    pool.head = oldh.Hash()
    // check mined txs of new blocks (array is in reversed order)
    for i := len(newHashes) - 1; i >= 0; i-- {
        hash := newHashes[i]
        // newHashes[0] is the new head itself, so entry i is i blocks below it
        if err := pool.checkMinedTxs(ctx, hash, newHeader.Number.Uint64()-uint64(i), txc); err != nil {
            // head stays at the last successfully checked block
            return txc, err
        }
        pool.head = hash
    }

    // clear old mined tx entries of old blocks
    if idx := newHeader.Number.Uint64(); idx > pool.clearIdx+txPermanent {
        idx2 := idx - txPermanent
        if len(pool.mined) > 0 {
            for i := pool.clearIdx; i < idx2; i++ {
                hash := rawdb.ReadCanonicalHash(pool.chainDb, i)
                if list, ok := pool.mined[hash]; ok {
                    hashes := make([]common.Hash, len(list))
                    // note: this i shadows the block number of the outer loop
                    for i, tx := range list {
                        hashes[i] = tx.Hash()
                    }
                    pool.relay.Discard(hashes)
                    delete(pool.mined, hash)
                }
            }
        }
        pool.clearIdx = idx2
    }

    return txc, nil
}

// blockCheckTimeout bounds the time spent on checking new blocks for mined
// transactions after a chain head event. If the limit is hit, checking is
// resumed at the next chain head event.
const blockCheckTimeout = 3 * time.Second

// eventLoop consumes chain head events, handing each new head header to
// setNewHead (which also notifies the tx relay backend about the new head hash
// and the tx state changes). It returns once the chain head subscription
// reports an error, i.e. when the system is stopped.
func (pool *TxPool) eventLoop() {
    for {
        select {
        case <-pool.chainHeadSub.Err():
            // System stopped
            return

        case ev := <-pool.chainHeadCh:
            header := ev.Block.Header()
            pool.setNewHead(header)
            // hack in order to avoid hogging the lock; this part will
            // be replaced by a subsequent PR.
            time.Sleep(time.Millisecond)
        }
    }
}

// setNewHead processes a new chain head while holding the pool lock: it
// re-checks the blocks since the previously known head (bounded by
// blockCheckTimeout), reports the resulting head and tx state changes to the
// relay backend and refreshes the fork dependent settings of the pool.
func (pool *TxPool) setNewHead(head *types.Header) {
    pool.mu.Lock()
    defer pool.mu.Unlock()

    ctx, cancel := context.WithTimeout(context.Background(), blockCheckTimeout)
    defer cancel()

    // The error is dropped on purpose: reorgOnNewHead returns a usable change
    // set even on failure and leaves pool.head at the last checked block.
    changes, _ := pool.reorgOnNewHead(ctx, head)
    minedHashes, rolledBack := changes.getLists()
    pool.relay.NewHead(pool.head, minedHashes, rolledBack)

    number := head.Number
    pool.homestead = pool.config.IsHomestead(number)
    pool.signer = types.MakeSigner(pool.config, number)
}

// Stop stops the light transaction pool: it closes every subscription handed
// out by the pool, unsubscribes from the chain head events and closes the quit
// channel.
// NOTE(review): quit is closed unconditionally, so Stop looks like it must be
// called at most once - confirm with callers.
func (pool *TxPool) Stop() {
    // Unsubscribe all subscriptions registered from txpool
    pool.scope.Close()
    // Unsubscribe subscriptions registered from blockchain
    pool.chainHeadSub.Unsubscribe()
    close(pool.quit)
    log.Info("Transaction pool stopped")
}

// SubscribeNewTxsEvent subscribes the given channel to core.NewTxsEvent
// notifications. The subscription is tracked by the pool's scope, so it is
// closed together with the pool.
func (pool *TxPool) SubscribeNewTxsEvent(ch chan<- core.NewTxsEvent) event.Subscription {
    sub := pool.txFeed.Subscribe(ch)
    return pool.scope.Track(sub)
}

// Stats reports how many (locally created) transactions are currently pending.
func (pool *TxPool) Stats() (pending int) {
    pool.mu.RLock()
    defer pool.mu.RUnlock()

    return len(pool.pending)
}

// validateTx checks whether a transaction is valid according to the consensus
// rules, judged against the state of the current head.
//
// It returns core.ErrInvalidSender, core.ErrNonceTooLow, core.ErrGasLimit,
// core.ErrNegativeValue, core.ErrInsufficientFunds or core.ErrIntrinsicGas for
// the respective rule violation, the error of the intrinsic gas calculation,
// or the error recorded by the state if retrieving account data failed.
func (pool *TxPool) validateTx(ctx context.Context, tx *types.Transaction) error {
    // Validate the transaction sender and its signature.
    from, err := types.Sender(pool.signer, tx)
    if err != nil {
        return core.ErrInvalidSender
    }
    // Check for nonce errors
    currentState := pool.currentState(ctx)
    if n := currentState.GetNonce(from); n > tx.Nonce() {
        return core.ErrNonceTooLow
    }

    // Check the transaction doesn't exceed the current
    // block limit gas.
    header := pool.chain.GetHeaderByHash(pool.head)
    if header.GasLimit < tx.Gas() {
        return core.ErrGasLimit
    }

    // Transactions can't be negative. This may never happen
    // using RLP decoded transactions but may occur if you create
    // a transaction using the RPC for example.
    if tx.Value().Sign() < 0 {
        return core.ErrNegativeValue
    }

    // Transactor should have enough funds to cover the costs
    // cost == V + GP * GL
    balance := currentState.GetBalance(from)
    // The state reports retrieval failures only through Error(). Consult it
    // before judging the balance, otherwise a failed lookup would surface as
    // a misleading "insufficient funds" instead of the real error.
    if err := currentState.Error(); err != nil {
        return err
    }
    if balance.Cmp(tx.Cost()) < 0 {
        return core.ErrInsufficientFunds
    }

    // Should supply enough intrinsic gas
    gas, err := core.IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead)
    if err != nil {
        return err
    }
    if tx.Gas() < gas {
        return core.ErrIntrinsicGas
    }
    return currentState.Error()
}

// add validates a new transaction and marks it pending if it is processable,
// raising the locally stored nonce of the sender when the transaction is ahead
// of it. Subscribers of the tx feed are notified asynchronously. Already
// pending transactions and validation failures are reported as errors.
func (pool *TxPool) add(ctx context.Context, tx *types.Transaction) error {
    hash := tx.Hash()

    if pool.pending[hash] != nil {
        return fmt.Errorf("Known transaction (%x)", hash[:4])
    }
    if err := pool.validateTx(ctx, tx); err != nil {
        return err
    }

    if _, exists := pool.pending[hash]; !exists {
        pool.pending[hash] = tx

        sender, _ := types.Sender(pool.signer, tx)
        if next := tx.Nonce() + 1; next > pool.nonce[sender] {
            pool.nonce[sender] = next
        }

        // The event is posted from a separate goroutine: a subscriber may call
        // back into the pool ("Remove transaction") during the post, which
        // would wait for the global tx pool lock and deadlock.
        go pool.txFeed.Send(core.NewTxsEvent{Txs: types.Transactions{tx}})
    }

    // Print a log message if low enough level is set
    log.Debug("Pooled new transaction", "hash", hash, "from", log.Lazy{Fn: func() common.Address { from, _ := types.Sender(pool.signer, tx); return from }}, "to", tx.To())
    return nil
}

// Add adds a transaction to the pool if valid, passes it to the tx relay
// backend and stores its RLP encoding in the chain database under the
// transaction hash. Encoding and validation failures are returned; a failure
// to store the transaction is only logged, since at that point the transaction
// has already been pooled and relayed.
func (pool *TxPool) Add(ctx context.Context, tx *types.Transaction) error {
    pool.mu.Lock()
    defer pool.mu.Unlock()

    data, err := rlp.EncodeToBytes(tx)
    if err != nil {
        return err
    }

    if err := pool.add(ctx, tx); err != nil {
        return err
    }
    pool.relay.Send(types.Transactions{tx})

    if err := pool.chainDb.Put(tx.Hash().Bytes(), data); err != nil {
        log.Warn("Failed to store pooled transaction", "hash", tx.Hash(), "err", err)
    }
    return nil
}

// AddBatch adds all valid transactions of the batch to the pool and passes the
// accepted ones to the tx relay backend in a single call. Transactions that
// fail to be added are skipped silently.
func (pool *TxPool) AddBatch(ctx context.Context, txs []*types.Transaction) {
    pool.mu.Lock()
    defer pool.mu.Unlock()

    var accepted types.Transactions
    for _, tx := range txs {
        if err := pool.add(ctx, tx); err != nil {
            continue
        }
        accepted = append(accepted, tx)
    }
    if len(accepted) == 0 {
        return
    }
    pool.relay.Send(accepted)
}

// GetTransaction returns a transaction if it is pending in the pool and nil
// otherwise. The lookup takes the read lock, as the pending set is modified
// concurrently when transactions are added, removed or mined.
func (pool *TxPool) GetTransaction(hash common.Hash) *types.Transaction {
    pool.mu.RLock()
    defer pool.mu.RUnlock()

    // A missing key yields the nil zero value
    return pool.pending[hash]
}

// GetTransactions returns all currently pending transactions in no particular
// order. The returned slice is freshly allocated and may be modified by the
// caller; the error is always nil.
func (pool *TxPool) GetTransactions() (txs types.Transactions, err error) {
    pool.mu.RLock()
    defer pool.mu.RUnlock()

    txs = make(types.Transactions, 0, len(pool.pending))
    for _, tx := range pool.pending {
        txs = append(txs, tx)
    }
    return txs, nil
}

// Content retrieves the data content of the transaction pool: the pending
// transactions grouped by sender account, and the queued ones. A light pool
// has no queued transactions, so the second map is always empty. Transactions
// of one account are not sorted.
func (pool *TxPool) Content() (map[common.Address]types.Transactions, map[common.Address]types.Transactions) {
    pool.mu.RLock()
    defer pool.mu.RUnlock()

    var (
        pending = make(map[common.Address]types.Transactions)
        queued  = make(map[common.Address]types.Transactions)
    )
    // Group the pending transactions by their sender
    for _, tx := range pool.pending {
        sender, _ := types.Sender(pool.signer, tx)
        pending[sender] = append(pending[sender], tx)
    }
    return pending, queued
}

// RemoveTransactions removes all given transactions from the pending set and
// from the chain database, then tells the relay backend to discard them. A
// failure to update the database is logged; the transactions are dropped from
// the pool and the relay regardless.
func (pool *TxPool) RemoveTransactions(txs types.Transactions) {
    pool.mu.Lock()
    defer pool.mu.Unlock()

    hashes := make([]common.Hash, 0, len(txs))
    batch := pool.chainDb.NewBatch()
    for _, tx := range txs {
        hash := tx.Hash()
        delete(pool.pending, hash)
        batch.Delete(hash.Bytes())
        hashes = append(hashes, hash)
    }
    if err := batch.Write(); err != nil {
        log.Warn("Failed to delete removed transactions from database", "count", len(hashes), "err", err)
    }
    pool.relay.Discard(hashes)
}

// RemoveTx removes the transaction with the given hash from the pending set
// and from the chain database, then tells the relay backend to discard it. A
// failure to update the database is logged; the transaction is dropped from
// the pool and the relay regardless.
func (pool *TxPool) RemoveTx(hash common.Hash) {
    pool.mu.Lock()
    defer pool.mu.Unlock()

    delete(pool.pending, hash)
    if err := pool.chainDb.Delete(hash[:]); err != nil {
        log.Warn("Failed to delete removed transaction from database", "hash", hash, "err", err)
    }
    pool.relay.Discard([]common.Hash{hash})
}