From 09f24f35eff61861c21b854a648c3afec579ff47 Mon Sep 17 00:00:00 2001 From: Wei-Ning Huang Date: Wed, 21 Nov 2018 16:14:51 +0800 Subject: core: add global signature cache and improve concurrency (#42) From the go trace result, the bottleneck hides in the lock of StoreTxCache. To improve this, we update the cache in a batched fassion. --- core/blockchain.go | 2 +- core/tx_pool.go | 2 +- core/types/transaction.go | 36 -------------- core/types/transaction_signing.go | 99 +++++++++++++++++++++++++++++++++------ 4 files changed, 86 insertions(+), 53 deletions(-) (limited to 'core') diff --git a/core/blockchain.go b/core/blockchain.go index 983448de8..e7b1c846f 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -268,7 +268,7 @@ func (bc *BlockChain) AddConfirmedBlock(block *coreTypes.Block) error { if err != nil { return err } - _, err = transactions.TouchSenders(types.MakeSigner(bc.Config(), new(big.Int))) + _, err = types.GlobalSigCache.Add(types.NewEIP155Signer(bc.Config().ChainID), transactions) if err != nil { return err } diff --git a/core/tx_pool.go b/core/tx_pool.go index fc36d50bf..0a87be25b 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -966,7 +966,7 @@ func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { } } - types.DeleteTxCacheByHash(hash) + types.GlobalSigCache.Prune([]common.Hash{hash}) } // promoteExecutables moves transactions that have become processable from the diff --git a/core/types/transaction.go b/core/types/transaction.go index 5e11c0dbc..857ac2137 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -21,8 +21,6 @@ import ( "errors" "io" "math/big" - "runtime" - "sync" "sync/atomic" "github.com/dexon-foundation/dexon/common" @@ -273,40 +271,6 @@ func (s Transactions) GetRlp(i int) []byte { return enc } -// TouchSenders calculates the sender of each transaction and update the cache. -func (s Transactions) TouchSenders(signer Signer) (errorTx *Transaction, err error) { - num := runtime.NumCPU() - batchSize := len(s) / num - wg := sync.WaitGroup{} - wg.Add(num) - txError := make(chan error, 1) - for i := 0; i < num; i++ { - go func(txs Transactions) { - defer wg.Done() - for _, tx := range txs { - if len(txError) > 0 { - return - } - _, err := Sender(signer, tx) - if err != nil { - select { - case txError <- err: - errorTx = tx - default: - } - return - } - } - }(s[i*batchSize : (i+1)*batchSize]) - } - wg.Wait() - select { - case err = <-txError: - default: - } - return -} - // TxDifference returns a new set which is the difference between a and b. func TxDifference(a, b Transactions) Transactions { keep := make(Transactions, 0, len(a)) diff --git a/core/types/transaction_signing.go b/core/types/transaction_signing.go index a6c5c1f16..47c7a2f91 100644 --- a/core/types/transaction_signing.go +++ b/core/types/transaction_signing.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "math/big" + "runtime" "sync" "github.com/dexon-foundation/dexon/common" @@ -32,25 +33,95 @@ var ( ErrInvalidChainId = errors.New("invalid chain id for signer") ) -var ( - txCache = &sync.Map{} -) +var GlobalSigCache *globalSigCache -func DeleteTxCacheByHash(hash common.Hash) { - txCache.Delete(hash) +func init() { + GlobalSigCache = newGlobalSigCache() } -func StoreTxCache(key common.Hash, value common.Address) { - txCache.Store(key, value) +// globalSigCache stores the mapping between txHash and sender address. +// Since ECRecover is slow, and we run ECRecover very frequently (in +// app.VerifyBlock, app.ConfirmedBlock), so we need to cache it globally. +type globalSigCache struct { + cache map[common.Hash]common.Address + cacheMu sync.RWMutex } -func LoadTxCache(key common.Hash) (common.Address, bool) { - addr, ok := txCache.Load(key) - if !ok { - return common.Address{}, ok +func newGlobalSigCache() *globalSigCache { + return &globalSigCache{ + cache: make(map[common.Hash]common.Address), } +} + +type resultEntry struct { + Hash common.Hash + Addr common.Address +} - return addr.(common.Address), ok +// Add adds a list of transactions into sig cache. +func (c *globalSigCache) Add(signer Signer, txs Transactions) (errorTx *Transaction, err error) { + num := runtime.NumCPU() + batchSize := len(txs) / num + wg := sync.WaitGroup{} + wg.Add(num) + txError := make(chan error, 1) + + for i := 0; i < num; i++ { + go func(txs Transactions) { + defer wg.Done() + results := make([]resultEntry, len(txs)) + for i, tx := range txs { + if len(txError) > 0 { + return + } + addr, err := Sender(signer, tx) + if err != nil { + select { + case txError <- err: + errorTx = tx + default: + } + return + } + results[i] = resultEntry{ + Hash: tx.Hash(), + Addr: addr, + } + } + // Acquire lock and set cache. + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + for _, r := range results { + c.cache[r.Hash] = r.Addr + } + }(txs[i*batchSize : (i+1)*batchSize]) + } + wg.Wait() + + select { + case err = <-txError: + default: + } + return +} + +// Prune removes a list of hashes of tx from the cache. +func (c *globalSigCache) Prune(hashes []common.Hash) { + c.cacheMu.Lock() + defer c.cacheMu.Unlock() + + for _, hash := range hashes { + delete(c.cache, hash) + } +} + +// Get returns a single address given a tx hash. +func (c *globalSigCache) Get(hash common.Hash) (common.Address, bool) { + c.cacheMu.RLock() + defer c.cacheMu.RUnlock() + + res, ok := c.cache[hash] + return res, ok } // sigCache is used to cache the derived sender and contains @@ -147,7 +218,7 @@ func (s EIP155Signer) Equal(s2 Signer) bool { var big8 = big.NewInt(8) func (s EIP155Signer) Sender(tx *Transaction) (common.Address, error) { - addr, ok := LoadTxCache(tx.Hash()) + addr, ok := GlobalSigCache.Get(tx.Hash()) if ok { return addr, nil } @@ -165,8 +236,6 @@ func (s EIP155Signer) Sender(tx *Transaction) (common.Address, error) { if err != nil { return common.Address{}, err } - - StoreTxCache(tx.Hash(), addr) return addr, nil } -- cgit v1.2.3