diff options
-rw-r--r-- | core/blockchain.go | 55 | ||||
-rw-r--r-- | core/state/state_object.go | 4 | ||||
-rw-r--r-- | core/state/statedb.go | 11 | ||||
-rw-r--r-- | core/tx_pool.go | 65 | ||||
-rw-r--r-- | core/tx_pool_test.go | 90 | ||||
-rw-r--r-- | core/types/block.go | 2 | ||||
-rw-r--r-- | core/types/json_test.go | 8 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 2 | ||||
-rw-r--r-- | eth/handler.go | 2 | ||||
-rw-r--r-- | internal/debug/flags.go | 9 | ||||
-rw-r--r-- | internal/ethapi/api.go | 2 | ||||
-rw-r--r-- | light/trie.go | 6 | ||||
-rw-r--r-- | metrics/metrics.go | 2 | ||||
-rw-r--r-- | swarm/storage/chunker.go | 58 | ||||
-rw-r--r-- | swarm/storage/chunker_test.go | 79 | ||||
-rw-r--r-- | swarm/storage/common_test.go | 42 | ||||
-rw-r--r-- | swarm/storage/pyramid.go | 16 | ||||
-rw-r--r-- | trie/hasher.go | 78 | ||||
-rw-r--r-- | trie/iterator.go | 12 | ||||
-rw-r--r-- | trie/node.go | 63 | ||||
-rw-r--r-- | trie/node_test.go | 58 | ||||
-rw-r--r-- | trie/proof.go | 10 | ||||
-rw-r--r-- | trie/secure_trie.go | 13 | ||||
-rw-r--r-- | trie/secure_trie_test.go | 4 | ||||
-rw-r--r-- | trie/sync.go | 4 | ||||
-rw-r--r-- | trie/trie.go | 114 | ||||
-rw-r--r-- | trie/trie_test.go | 10 |
27 files changed, 550 insertions, 269 deletions
diff --git a/core/blockchain.go b/core/blockchain.go index 1fbcdfc6f..7657fce78 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -269,7 +269,7 @@ func (self *BlockChain) FastSyncCommitHead(hash common.Hash) error { if block == nil { return fmt.Errorf("non existent block [%x…]", hash[:4]) } - if _, err := trie.NewSecure(block.Root(), self.chainDb); err != nil { + if _, err := trie.NewSecure(block.Root(), self.chainDb, 0); err != nil { return err } // If all checks out, manually set the head block @@ -834,19 +834,16 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { // faster than direct delivery and requires much less mutex // acquiring. var ( - stats struct{ queued, processed, ignored int } + stats = insertStats{startTime: time.Now()} events = make([]interface{}, 0, len(chain)) coalescedLogs vm.Logs - tstart = time.Now() - - nonceChecked = make([]bool, len(chain)) + nonceChecked = make([]bool, len(chain)) ) // Start the parallel nonce verifier. nonceAbort, nonceResults := verifyNoncesFromBlocks(self.pow, chain) defer close(nonceAbort) - txcount := 0 for i, block := range chain { if atomic.LoadInt32(&self.procInterrupt) == 1 { glog.V(logger.Debug).Infoln("Premature abort during block chain processing") @@ -941,7 +938,6 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { return i, err } - txcount += len(block.Transactions()) // write the block to the chain and get the status status, err := self.WriteBlock(block) if err != nil { @@ -976,19 +972,54 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) { case SplitStatTy: events = append(events, ChainSplitEvent{block, logs}) } + stats.processed++ + if glog.V(logger.Info) { + stats.report(chain, i) + } } - if (stats.queued > 0 || stats.processed > 0 || stats.ignored > 0) && bool(glog.V(logger.Info)) { - tend := time.Since(tstart) - start, end := chain[0], chain[len(chain)-1] - glog.Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%x / %x]\n", stats.processed, stats.queued, stats.ignored, txcount, tend, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4]) - } go self.postChainEvents(events, coalescedLogs) return 0, nil } +// insertStats tracks and reports on block insertion. +type insertStats struct { + queued, processed, ignored int + lastIndex int + startTime time.Time +} + +const ( + statsReportLimit = 1024 + statsReportTimeLimit = 8 * time.Second +) + +// report prints statistics if some number of blocks have been processed +// or more than a few seconds have passed since the last message. +func (st *insertStats) report(chain []*types.Block, index int) { + limit := statsReportLimit + if index == len(chain)-1 { + limit = 0 // Always print a message for the last block. + } + now := time.Now() + duration := now.Sub(st.startTime) + if duration > statsReportTimeLimit || st.queued > limit || st.processed > limit || st.ignored > limit { + start, end := chain[st.lastIndex], chain[index] + txcount := countTransactions(chain[st.lastIndex : index+1]) + glog.Infof("imported %d block(s) (%d queued %d ignored) including %d txs in %v. #%v [%x / %x]\n", st.processed, st.queued, st.ignored, txcount, duration, end.Number(), start.Hash().Bytes()[:4], end.Hash().Bytes()[:4]) + *st = insertStats{startTime: now, lastIndex: index} + } +} + +func countTransactions(chain []*types.Block) (c int) { + for _, b := range chain { + c += len(b.Transactions()) + } + return c +} + // reorgs takes two blocks, an old chain and a new chain and will reconstruct the blocks and inserts them // to be part of the new canonical chain and accumulates potential missing transactions and post an // event about them diff --git a/core/state/state_object.go b/core/state/state_object.go index 6eab27d9e..edb073173 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -137,9 +137,9 @@ func (self *StateObject) markSuicided() { func (c *StateObject) getTrie(db trie.Database) *trie.SecureTrie { if c.trie == nil { var err error - c.trie, err = trie.NewSecure(c.data.Root, db) + c.trie, err = trie.NewSecure(c.data.Root, db, 0) if err != nil { - c.trie, _ = trie.NewSecure(common.Hash{}, db) + c.trie, _ = trie.NewSecure(common.Hash{}, db, 0) c.setError(fmt.Errorf("can't create storage trie: %v", err)) } } diff --git a/core/state/statedb.go b/core/state/statedb.go index ec9e9392f..dcb897628 100644 --- a/core/state/statedb.go +++ b/core/state/statedb.go @@ -41,7 +41,10 @@ var StartingNonce uint64 const ( // Number of past tries to keep. The arbitrarily chosen value here // is max uncle depth + 1. - maxTrieCacheLength = 8 + maxPastTries = 8 + + // Trie cache generation limit. + maxTrieCacheGen = 100 // Number of codehash->size associations to keep. codeSizeCacheSize = 100000 @@ -86,7 +89,7 @@ type StateDB struct { // Create a new state from a given trie func New(root common.Hash, db ethdb.Database) (*StateDB, error) { - tr, err := trie.NewSecure(root, db) + tr, err := trie.NewSecure(root, db, maxTrieCacheGen) if err != nil { return nil, err } @@ -155,14 +158,14 @@ func (self *StateDB) openTrie(root common.Hash) (*trie.SecureTrie, error) { return &tr, nil } } - return trie.NewSecure(root, self.db) + return trie.NewSecure(root, self.db, maxTrieCacheGen) } func (self *StateDB) pushTrie(t *trie.SecureTrie) { self.lock.Lock() defer self.lock.Unlock() - if len(self.pastTries) >= maxTrieCacheLength { + if len(self.pastTries) >= maxPastTries { copy(self.pastTries, self.pastTries[1:]) self.pastTries[len(self.pastTries)-1] = t } else { diff --git a/core/tx_pool.go b/core/tx_pool.go index 10a110e0b..2c8a5c396 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" + "gopkg.in/karalabe/cookiejar.v2/collections/prque" ) var ( @@ -46,10 +47,12 @@ var ( ) var ( - maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address - maxQueuedInTotal = uint64(65536) // Max limit of queued transactions from all accounts - maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued - evictionInterval = time.Minute // Time interval to check for evictable transactions + minPendingPerAccount = uint64(16) // Min number of guaranteed transaction slots per address + maxPendingTotal = uint64(4096) // Max limit of pending transactions from all accounts (soft) + maxQueuedPerAccount = uint64(64) // Max limit of queued transactions per address + maxQueuedInTotal = uint64(1024) // Max limit of queued transactions from all accounts + maxQueuedLifetime = 3 * time.Hour // Max amount of time transactions from idle accounts are queued + evictionInterval = time.Minute // Time interval to check for evictable transactions ) type stateFn func() (*state.StateDB, error) @@ -481,7 +484,6 @@ func (pool *TxPool) promoteExecutables() { } // Iterate over all accounts and promote any executable transactions queued := uint64(0) - for addr, list := range pool.queue { // Drop all transactions that are deemed too old (low nonce) for _, tx := range list.Forward(state.GetNonce(addr)) { @@ -519,6 +521,59 @@ func (pool *TxPool) promoteExecutables() { delete(pool.queue, addr) } } + // If the pending limit is overflown, start equalizing allowances + pending := uint64(0) + for _, list := range pool.pending { + pending += uint64(list.Len()) + } + if pending > maxPendingTotal { + // Assemble a spam order to penalize large transactors first + spammers := prque.New() + for addr, list := range pool.pending { + // Only evict transactions from high rollers + if uint64(list.Len()) > minPendingPerAccount { + // Skip local accounts as pools should maintain backlogs for themselves + for _, tx := range list.txs.items { + if !pool.localTx.contains(tx.Hash()) { + spammers.Push(addr, float32(list.Len())) + } + break // Checking on transaction for locality is enough + } + } + } + // Gradually drop transactions from offenders + offenders := []common.Address{} + for pending > maxPendingTotal && !spammers.Empty() { + // Retrieve the next offender if not local address + offender, _ := spammers.Pop() + offenders = append(offenders, offender.(common.Address)) + + // Equalize balances until all the same or below threshold + if len(offenders) > 1 { + // Calculate the equalization threshold for all current offenders + threshold := pool.pending[offender.(common.Address)].Len() + + // Iteratively reduce all offenders until below limit or threshold reached + for pending > maxPendingTotal && pool.pending[offenders[len(offenders)-2]].Len() > threshold { + for i := 0; i < len(offenders)-1; i++ { + list := pool.pending[offenders[i]] + list.Cap(list.Len() - 1) + pending-- + } + } + } + } + // If still above threshold, reduce to limit or min allowance + if pending > maxPendingTotal && len(offenders) > 0 { + for pending > maxPendingTotal && uint64(pool.pending[offenders[len(offenders)-1]].Len()) > minPendingPerAccount { + for _, addr := range offenders { + list := pool.pending[addr] + list.Cap(list.Len() - 1) + pending-- + } + } + } + } // If we've queued more transactions than the hard limit, drop oldest ones if queued > maxQueuedInTotal { // Sort all accounts with queued transactions by heartbeat diff --git a/core/tx_pool_test.go b/core/tx_pool_test.go index 4bc5aed38..dbe6fa635 100644 --- a/core/tx_pool_test.go +++ b/core/tx_pool_test.go @@ -618,6 +618,96 @@ func testTransactionLimitingEquivalency(t *testing.T, origin uint64) { } } +// Tests that if the transaction count belonging to multiple accounts go above +// some hard threshold, the higher transactions are dropped to prevent DOS +// attacks. +func TestTransactionPendingGlobalLimiting(t *testing.T) { + // Reduce the queue limits to shorten test time + defer func(old uint64) { maxPendingTotal = old }(maxPendingTotal) + maxPendingTotal = minPendingPerAccount * 10 + + // Create the pool to test the limit enforcement with + db, _ := ethdb.NewMemDatabase() + statedb, _ := state.New(common.Hash{}, db) + + pool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool.resetState() + + // Create a number of test accounts and fund them + state, _ := pool.currentState() + + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + // Generate and queue a batch of transactions + nonces := make(map[common.Address]uint64) + + txs := types.Transactions{} + for _, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + for j := 0; j < int(maxPendingTotal)/len(keys)*2; j++ { + txs = append(txs, transaction(nonces[addr], big.NewInt(100000), key)) + nonces[addr]++ + } + } + // Import the batch and verify that limits have been enforced + pool.AddBatch(txs) + + pending := 0 + for _, list := range pool.pending { + pending += list.Len() + } + if pending > int(maxPendingTotal) { + t.Fatalf("total pending transactions overflow allowance: %d > %d", pending, maxPendingTotal) + } +} + +// Tests that if the transaction count belonging to multiple accounts go above +// some hard threshold, if they are under the minimum guaranteed slot count then +// the transactions are still kept. +func TestTransactionPendingMinimumAllowance(t *testing.T) { + // Reduce the queue limits to shorten test time + defer func(old uint64) { maxPendingTotal = old }(maxPendingTotal) + maxPendingTotal = 0 + + // Create the pool to test the limit enforcement with + db, _ := ethdb.NewMemDatabase() + statedb, _ := state.New(common.Hash{}, db) + + pool := NewTxPool(testChainConfig(), new(event.TypeMux), func() (*state.StateDB, error) { return statedb, nil }, func() *big.Int { return big.NewInt(1000000) }) + pool.resetState() + + // Create a number of test accounts and fund them + state, _ := pool.currentState() + + keys := make([]*ecdsa.PrivateKey, 5) + for i := 0; i < len(keys); i++ { + keys[i], _ = crypto.GenerateKey() + state.AddBalance(crypto.PubkeyToAddress(keys[i].PublicKey), big.NewInt(1000000)) + } + // Generate and queue a batch of transactions + nonces := make(map[common.Address]uint64) + + txs := types.Transactions{} + for _, key := range keys { + addr := crypto.PubkeyToAddress(key.PublicKey) + for j := 0; j < int(minPendingPerAccount)*2; j++ { + txs = append(txs, transaction(nonces[addr], big.NewInt(100000), key)) + nonces[addr]++ + } + } + // Import the batch and verify that limits have been enforced + pool.AddBatch(txs) + + for addr, list := range pool.pending { + if list.Len() != int(minPendingPerAccount) { + t.Errorf("addr %x: total pending transactions mismatch: have %d, want %d", addr, list.Len(), minPendingPerAccount) + } + } +} + // Benchmarks the speed of validating the contents of the pending queue of the // transaction pool. func BenchmarkPendingDemotion100(b *testing.B) { benchmarkPendingDemotion(b, 100) } diff --git a/core/types/block.go b/core/types/block.go index 559fbdd20..fedcfdbbe 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -104,7 +104,7 @@ type jsonHeader struct { Coinbase *common.Address `json:"miner"` Root *common.Hash `json:"stateRoot"` TxHash *common.Hash `json:"transactionsRoot"` - ReceiptHash *common.Hash `json:"receiptRoot"` + ReceiptHash *common.Hash `json:"receiptsRoot"` Bloom *Bloom `json:"logsBloom"` Difficulty *hexBig `json:"difficulty"` Number *hexBig `json:"number"` diff --git a/core/types/json_test.go b/core/types/json_test.go index 605c2b564..e17424c82 100644 --- a/core/types/json_test.go +++ b/core/types/json_test.go @@ -14,19 +14,19 @@ var unmarshalHeaderTests = map[string]struct { wantError error }{ "block 0x1e2200": { - input: `{"difficulty":"0x311ca98cebfe","extraData":"0x7777772e62772e636f6d","gasLimit":"0x47db3d","gasUsed":"0x43760c","hash":"0x3724bc6b9dcd4a2b3a26e0ed9b821e7380b5b3d7dec7166c7983cead62a37e48","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xbcdfc35b86bedf72f0cda046a3c16829a2ef41d1","mixHash":"0x1ccfddb506dac5afc09b6f92eb09a043ffc8e08f7592250af57b9c64c20f9b25","nonce":"0x670bd98c79585197","number":"0x1e2200","parentHash":"0xd3e13296d064e7344f20c57c57b67a022f6bf7741fa42428c2db77e91abdf1f8","receiptRoot":"0xeeab1776c1fafbe853a8ee0c1bafe2e775a1b6fdb6ff3e9f9410ddd4514889ff","sha3Uncles":"0x5fbfa4ec8b089678c53b6798cc0d9260ea40a529e06d5300aae35596262e0eb3","size":"0x57f","stateRoot":"0x62ad2007e4a3f31ea98e5d2fd150d894887bafde36eeac7331a60ae12053ec76","timestamp":"0x579b82f2","totalDifficulty":"0x24fe813c101d00f97","transactions":["0xb293408e85735bfc78b35aa89de8b48e49641e3d82e3d52ea2d44ec42a4e88cf","0x124acc383ff2da6faa0357829084dae64945221af6f6f09da1d11688b779f939","0xee090208b6051c442ccdf9ec19f66389e604d342a6d71144c7227ce995bef46f"],"transactionsRoot":"0xce0042dd9af0c1923dd7f58ca6faa156d39d4ef39fdb65c5bcd1d4b4720096db","uncles":["0x6818a31d1f204cf640c952082940b68b8db6d1b39ee71f7efe0e3629ed5d7eb3"]}`, + input: `{"difficulty":"0x311ca98cebfe","extraData":"0x7777772e62772e636f6d","gasLimit":"0x47db3d","gasUsed":"0x43760c","hash":"0x3724bc6b9dcd4a2b3a26e0ed9b821e7380b5b3d7dec7166c7983cead62a37e48","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xbcdfc35b86bedf72f0cda046a3c16829a2ef41d1","mixHash":"0x1ccfddb506dac5afc09b6f92eb09a043ffc8e08f7592250af57b9c64c20f9b25","nonce":"0x670bd98c79585197","number":"0x1e2200","parentHash":"0xd3e13296d064e7344f20c57c57b67a022f6bf7741fa42428c2db77e91abdf1f8","receiptsRoot":"0xeeab1776c1fafbe853a8ee0c1bafe2e775a1b6fdb6ff3e9f9410ddd4514889ff","sha3Uncles":"0x5fbfa4ec8b089678c53b6798cc0d9260ea40a529e06d5300aae35596262e0eb3","size":"0x57f","stateRoot":"0x62ad2007e4a3f31ea98e5d2fd150d894887bafde36eeac7331a60ae12053ec76","timestamp":"0x579b82f2","totalDifficulty":"0x24fe813c101d00f97","transactions":["0xb293408e85735bfc78b35aa89de8b48e49641e3d82e3d52ea2d44ec42a4e88cf","0x124acc383ff2da6faa0357829084dae64945221af6f6f09da1d11688b779f939","0xee090208b6051c442ccdf9ec19f66389e604d342a6d71144c7227ce995bef46f"],"transactionsRoot":"0xce0042dd9af0c1923dd7f58ca6faa156d39d4ef39fdb65c5bcd1d4b4720096db","uncles":["0x6818a31d1f204cf640c952082940b68b8db6d1b39ee71f7efe0e3629ed5d7eb3"]}`, wantHash: common.HexToHash("0x3724bc6b9dcd4a2b3a26e0ed9b821e7380b5b3d7dec7166c7983cead62a37e48"), }, "bad nonce": { - input: `{"difficulty":"0x311ca98cebfe","extraData":"0x7777772e62772e636f6d","gasLimit":"0x47db3d","gasUsed":"0x43760c","hash":"0x3724bc6b9dcd4a2b3a26e0ed9b821e7380b5b3d7dec7166c7983cead62a37e48","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xbcdfc35b86bedf72f0cda046a3c16829a2ef41d1","mixHash":"0x1ccfddb506dac5afc09b6f92eb09a043ffc8e08f7592250af57b9c64c20f9b25","nonce":"0x670bd98c7958","number":"0x1e2200","parentHash":"0xd3e13296d064e7344f20c57c57b67a022f6bf7741fa42428c2db77e91abdf1f8","receiptRoot":"0xeeab1776c1fafbe853a8ee0c1bafe2e775a1b6fdb6ff3e9f9410ddd4514889ff","sha3Uncles":"0x5fbfa4ec8b089678c53b6798cc0d9260ea40a529e06d5300aae35596262e0eb3","size":"0x57f","stateRoot":"0x62ad2007e4a3f31ea98e5d2fd150d894887bafde36eeac7331a60ae12053ec76","timestamp":"0x579b82f2","totalDifficulty":"0x24fe813c101d00f97","transactions":["0xb293408e85735bfc78b35aa89de8b48e49641e3d82e3d52ea2d44ec42a4e88cf","0x124acc383ff2da6faa0357829084dae64945221af6f6f09da1d11688b779f939","0xee090208b6051c442ccdf9ec19f66389e604d342a6d71144c7227ce995bef46f"],"transactionsRoot":"0xce0042dd9af0c1923dd7f58ca6faa156d39d4ef39fdb65c5bcd1d4b4720096db","uncles":["0x6818a31d1f204cf640c952082940b68b8db6d1b39ee71f7efe0e3629ed5d7eb3"]}`, + input: `{"difficulty":"0x311ca98cebfe","extraData":"0x7777772e62772e636f6d","gasLimit":"0x47db3d","gasUsed":"0x43760c","hash":"0x3724bc6b9dcd4a2b3a26e0ed9b821e7380b5b3d7dec7166c7983cead62a37e48","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xbcdfc35b86bedf72f0cda046a3c16829a2ef41d1","mixHash":"0x1ccfddb506dac5afc09b6f92eb09a043ffc8e08f7592250af57b9c64c20f9b25","nonce":"0x670bd98c7958","number":"0x1e2200","parentHash":"0xd3e13296d064e7344f20c57c57b67a022f6bf7741fa42428c2db77e91abdf1f8","receiptsRoot":"0xeeab1776c1fafbe853a8ee0c1bafe2e775a1b6fdb6ff3e9f9410ddd4514889ff","sha3Uncles":"0x5fbfa4ec8b089678c53b6798cc0d9260ea40a529e06d5300aae35596262e0eb3","size":"0x57f","stateRoot":"0x62ad2007e4a3f31ea98e5d2fd150d894887bafde36eeac7331a60ae12053ec76","timestamp":"0x579b82f2","totalDifficulty":"0x24fe813c101d00f97","transactions":["0xb293408e85735bfc78b35aa89de8b48e49641e3d82e3d52ea2d44ec42a4e88cf","0x124acc383ff2da6faa0357829084dae64945221af6f6f09da1d11688b779f939","0xee090208b6051c442ccdf9ec19f66389e604d342a6d71144c7227ce995bef46f"],"transactionsRoot":"0xce0042dd9af0c1923dd7f58ca6faa156d39d4ef39fdb65c5bcd1d4b4720096db","uncles":["0x6818a31d1f204cf640c952082940b68b8db6d1b39ee71f7efe0e3629ed5d7eb3"]}`, wantError: errBadNonceSize, }, "missing mixHash": { - input: `{"difficulty":"0x311ca98cebfe","extraData":"0x7777772e62772e636f6d","gasLimit":"0x47db3d","gasUsed":"0x43760c","hash":"0x3724bc6b9dcd4a2b3a26e0ed9b821e7380b5b3d7dec7166c7983cead62a37e48","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xbcdfc35b86bedf72f0cda046a3c16829a2ef41d1","nonce":"0x670bd98c79585197","number":"0x1e2200","parentHash":"0xd3e13296d064e7344f20c57c57b67a022f6bf7741fa42428c2db77e91abdf1f8","receiptRoot":"0xeeab1776c1fafbe853a8ee0c1bafe2e775a1b6fdb6ff3e9f9410ddd4514889ff","sha3Uncles":"0x5fbfa4ec8b089678c53b6798cc0d9260ea40a529e06d5300aae35596262e0eb3","size":"0x57f","stateRoot":"0x62ad2007e4a3f31ea98e5d2fd150d894887bafde36eeac7331a60ae12053ec76","timestamp":"0x579b82f2","totalDifficulty":"0x24fe813c101d00f97","transactions":["0xb293408e85735bfc78b35aa89de8b48e49641e3d82e3d52ea2d44ec42a4e88cf","0x124acc383ff2da6faa0357829084dae64945221af6f6f09da1d11688b779f939","0xee090208b6051c442ccdf9ec19f66389e604d342a6d71144c7227ce995bef46f"],"transactionsRoot":"0xce0042dd9af0c1923dd7f58ca6faa156d39d4ef39fdb65c5bcd1d4b4720096db","uncles":["0x6818a31d1f204cf640c952082940b68b8db6d1b39ee71f7efe0e3629ed5d7eb3"]}`, + input: `{"difficulty":"0x311ca98cebfe","extraData":"0x7777772e62772e636f6d","gasLimit":"0x47db3d","gasUsed":"0x43760c","hash":"0x3724bc6b9dcd4a2b3a26e0ed9b821e7380b5b3d7dec7166c7983cead62a37e48","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xbcdfc35b86bedf72f0cda046a3c16829a2ef41d1","nonce":"0x670bd98c79585197","number":"0x1e2200","parentHash":"0xd3e13296d064e7344f20c57c57b67a022f6bf7741fa42428c2db77e91abdf1f8","receiptsRoot":"0xeeab1776c1fafbe853a8ee0c1bafe2e775a1b6fdb6ff3e9f9410ddd4514889ff","sha3Uncles":"0x5fbfa4ec8b089678c53b6798cc0d9260ea40a529e06d5300aae35596262e0eb3","size":"0x57f","stateRoot":"0x62ad2007e4a3f31ea98e5d2fd150d894887bafde36eeac7331a60ae12053ec76","timestamp":"0x579b82f2","totalDifficulty":"0x24fe813c101d00f97","transactions":["0xb293408e85735bfc78b35aa89de8b48e49641e3d82e3d52ea2d44ec42a4e88cf","0x124acc383ff2da6faa0357829084dae64945221af6f6f09da1d11688b779f939","0xee090208b6051c442ccdf9ec19f66389e604d342a6d71144c7227ce995bef46f"],"transactionsRoot":"0xce0042dd9af0c1923dd7f58ca6faa156d39d4ef39fdb65c5bcd1d4b4720096db","uncles":["0x6818a31d1f204cf640c952082940b68b8db6d1b39ee71f7efe0e3629ed5d7eb3"]}`, wantError: errMissingHeaderMixDigest, }, "missing fields": { - input: `{"gasLimit":"0x47db3d","gasUsed":"0x43760c","hash":"0x3724bc6b9dcd4a2b3a26e0ed9b821e7380b5b3d7dec7166c7983cead62a37e48","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xbcdfc35b86bedf72f0cda046a3c16829a2ef41d1","mixHash":"0x1ccfddb506dac5afc09b6f92eb09a043ffc8e08f7592250af57b9c64c20f9b25","nonce":"0x670bd98c79585197","number":"0x1e2200","parentHash":"0xd3e13296d064e7344f20c57c57b67a022f6bf7741fa42428c2db77e91abdf1f8","receiptRoot":"0xeeab1776c1fafbe853a8ee0c1bafe2e775a1b6fdb6ff3e9f9410ddd4514889ff","sha3Uncles":"0x5fbfa4ec8b089678c53b6798cc0d9260ea40a529e06d5300aae35596262e0eb3","size":"0x57f","stateRoot":"0x62ad2007e4a3f31ea98e5d2fd150d894887bafde36eeac7331a60ae12053ec76","timestamp":"0x579b82f2","totalDifficulty":"0x24fe813c101d00f97","transactions":["0xb293408e85735bfc78b35aa89de8b48e49641e3d82e3d52ea2d44ec42a4e88cf","0x124acc383ff2da6faa0357829084dae64945221af6f6f09da1d11688b779f939","0xee090208b6051c442ccdf9ec19f66389e604d342a6d71144c7227ce995bef46f"],"transactionsRoot":"0xce0042dd9af0c1923dd7f58ca6faa156d39d4ef39fdb65c5bcd1d4b4720096db","uncles":["0x6818a31d1f204cf640c952082940b68b8db6d1b39ee71f7efe0e3629ed5d7eb3"]}`, + input: `{"gasLimit":"0x47db3d","gasUsed":"0x43760c","hash":"0x3724bc6b9dcd4a2b3a26e0ed9b821e7380b5b3d7dec7166c7983cead62a37e48","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xbcdfc35b86bedf72f0cda046a3c16829a2ef41d1","mixHash":"0x1ccfddb506dac5afc09b6f92eb09a043ffc8e08f7592250af57b9c64c20f9b25","nonce":"0x670bd98c79585197","number":"0x1e2200","parentHash":"0xd3e13296d064e7344f20c57c57b67a022f6bf7741fa42428c2db77e91abdf1f8","receiptsRoot":"0xeeab1776c1fafbe853a8ee0c1bafe2e775a1b6fdb6ff3e9f9410ddd4514889ff","sha3Uncles":"0x5fbfa4ec8b089678c53b6798cc0d9260ea40a529e06d5300aae35596262e0eb3","size":"0x57f","stateRoot":"0x62ad2007e4a3f31ea98e5d2fd150d894887bafde36eeac7331a60ae12053ec76","timestamp":"0x579b82f2","totalDifficulty":"0x24fe813c101d00f97","transactions":["0xb293408e85735bfc78b35aa89de8b48e49641e3d82e3d52ea2d44ec42a4e88cf","0x124acc383ff2da6faa0357829084dae64945221af6f6f09da1d11688b779f939","0xee090208b6051c442ccdf9ec19f66389e604d342a6d71144c7227ce995bef46f"],"transactionsRoot":"0xce0042dd9af0c1923dd7f58ca6faa156d39d4ef39fdb65c5bcd1d4b4720096db","uncles":["0x6818a31d1f204cf640c952082940b68b8db6d1b39ee71f7efe0e3629ed5d7eb3"]}`, wantError: errMissingHeaderFields, }, } diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index ae9a85ae1..366c248bb 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -286,7 +286,7 @@ func (dl *downloadTester) headFastBlock() *types.Block { func (dl *downloadTester) commitHeadBlock(hash common.Hash) error { // For now only check that the state trie is correct if block := dl.getBlock(hash); block != nil { - _, err := trie.NewSecure(block.Root(), dl.stateDb) + _, err := trie.NewSecure(block.Root(), dl.stateDb, 0) return err } return fmt.Errorf("non existent block: %x", hash[:4]) diff --git a/eth/handler.go b/eth/handler.go index d72185dd3..e478990f7 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -288,7 +288,7 @@ func (pm *ProtocolManager) handle(p *peer) error { } // Start a timer to disconnect if the peer doesn't reply in time p.forkDrop = time.AfterFunc(daoChallengeTimeout, func() { - glog.V(logger.Warn).Infof("%v: timed out DAO fork-check, dropping", p) + glog.V(logger.Debug).Infof("%v: timed out DAO fork-check, dropping", p) pm.removePeer(p.id) }) // Make sure it's cleaned up if the peer dies off diff --git a/internal/debug/flags.go b/internal/debug/flags.go index 9fc5fc4fe..ed17f87c4 100644 --- a/internal/debug/flags.go +++ b/internal/debug/flags.go @@ -52,6 +52,11 @@ var ( Usage: "pprof HTTP server listening port", Value: 6060, } + pprofAddrFlag = cli.StringFlag{ + Name: "pprofaddr", + Usage: "pprof HTTP server listening interface", + Value: "127.0.0.1", + } memprofilerateFlag = cli.IntFlag{ Name: "memprofilerate", Usage: "Turn on memory profiling with the given rate", @@ -74,7 +79,7 @@ var ( // Flags holds all command-line flags required for debugging. var Flags = []cli.Flag{ verbosityFlag, vmoduleFlag, backtraceAtFlag, - pprofFlag, pprofPortFlag, + pprofFlag, pprofAddrFlag, pprofPortFlag, memprofilerateFlag, blockprofilerateFlag, cpuprofileFlag, traceFlag, } @@ -101,7 +106,7 @@ func Setup(ctx *cli.Context) error { // pprof server if ctx.GlobalBool(pprofFlag.Name) { - address := fmt.Sprintf("127.0.0.1:%d", ctx.GlobalInt(pprofPortFlag.Name)) + address := fmt.Sprintf("%s:%d", ctx.GlobalString(pprofAddrFlag.Name), ctx.GlobalInt(pprofPortFlag.Name)) go func() { glog.V(logger.Info).Infof("starting pprof server at http://%s/debug/pprof", address) glog.Errorln(http.ListenAndServe(address, nil)) diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 333c39965..53ea8d186 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -597,7 +597,7 @@ func (s *PublicBlockChainAPI) rpcOutputBlock(b *types.Block, inclTx bool, fullTx "gasUsed": rpc.NewHexNumber(head.GasUsed), "timestamp": rpc.NewHexNumber(head.Time), "transactionsRoot": head.TxHash, - "receiptRoot": head.ReceiptHash, + "receiptsRoot": head.ReceiptHash, } if inclTx { diff --git a/light/trie.go b/light/trie.go index e9c96ea48..42a943d50 100644 --- a/light/trie.go +++ b/light/trie.go @@ -79,7 +79,7 @@ func (t *LightTrie) do(ctx context.Context, fallbackKey []byte, fn func() error) func (t *LightTrie) Get(ctx context.Context, key []byte) (res []byte, err error) { err = t.do(ctx, key, func() (err error) { if t.trie == nil { - t.trie, err = trie.NewSecure(t.originalRoot, t.db) + t.trie, err = trie.NewSecure(t.originalRoot, t.db, 0) } if err == nil { res, err = t.trie.TryGet(key) @@ -98,7 +98,7 @@ func (t *LightTrie) Get(ctx context.Context, key []byte) (res []byte, err error) func (t *LightTrie) Update(ctx context.Context, key, value []byte) (err error) { err = t.do(ctx, key, func() (err error) { if t.trie == nil { - t.trie, err = trie.NewSecure(t.originalRoot, t.db) + t.trie, err = trie.NewSecure(t.originalRoot, t.db, 0) } if err == nil { err = t.trie.TryUpdate(key, value) @@ -112,7 +112,7 @@ func (t *LightTrie) Update(ctx context.Context, key, value []byte) (err error) { func (t *LightTrie) Delete(ctx context.Context, key []byte) (err error) { err = t.do(ctx, key, func() (err error) { if t.trie == nil { - t.trie, err = trie.NewSecure(t.originalRoot, t.db) + t.trie, err = trie.NewSecure(t.originalRoot, t.db, 0) } if err == nil { err = t.trie.TryDelete(key) diff --git a/metrics/metrics.go b/metrics/metrics.go index fcf8b5c32..7f647cd00 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -26,6 +26,7 @@ import ( "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/rcrowley/go-metrics" + "github.com/rcrowley/go-metrics/exp" ) // MetricsEnabledFlag is the CLI flag name to use to enable metrics collections. @@ -44,6 +45,7 @@ func init() { Enabled = true } } + exp.Exp(metrics.DefaultRegistry) } // NewMeter create a new metrics Meter, either a real one of a NOP stub depending diff --git a/swarm/storage/chunker.go b/swarm/storage/chunker.go index 4c8551da9..c0f950de5 100644 --- a/swarm/storage/chunker.go +++ b/swarm/storage/chunker.go @@ -23,8 +23,6 @@ import ( "hash" "io" "sync" - // "github.com/ethereum/go-ethereum/logger" - // "github.com/ethereum/go-ethereum/logger/glog" ) /* @@ -124,12 +122,13 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s jobC := make(chan *hashJob, 2*processors) wg := &sync.WaitGroup{} errC := make(chan error) + quitC := make(chan bool) // wwg = workers waitgroup keeps track of hashworkers spawned by this split call if wwg != nil { wwg.Add(1) } - go self.hashWorker(jobC, chunkC, errC, swg, wwg) + go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg) depth := 0 treeSize := self.chunkSize @@ -141,11 +140,10 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s } key := make([]byte, self.hashFunc().Size()) - // glog.V(logger.Detail).Infof("split request received for data (%v bytes, depth: %v)", size, depth) // this waitgroup member is released after the root hash is calculated wg.Add(1) //launch actual recursive function passing the waitgroups - go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, wg, swg, wwg) + go self.split(depth, treeSize/self.branches, key, data, size, jobC, chunkC, errC, quitC, wg, swg, wwg) // closes internal error channel if all subprocesses in the workgroup finished go func() { @@ -153,7 +151,6 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s wg.Wait() // if storage waitgroup is non-nil, we wait for storage to finish too if swg != nil { - // glog.V(logger.Detail).Infof("Waiting for storage to finish") swg.Wait() } close(errC) @@ -162,14 +159,15 @@ func (self *TreeChunker) Split(data io.Reader, size int64, chunkC chan *Chunk, s select { case err := <-errC: if err != nil { + close(quitC) return nil, err } - // + //TODO: add a timeout } return key, nil } -func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, parentWg, swg, wwg *sync.WaitGroup) { +func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reader, size int64, jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, parentWg, swg, wwg *sync.WaitGroup) { for depth > 0 && size < treeSize { treeSize /= self.branches @@ -180,17 +178,24 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade // leaf nodes -> content chunks chunkData := make([]byte, size+8) binary.LittleEndian.PutUint64(chunkData[0:8], uint64(size)) - data.Read(chunkData[8:]) + var readBytes int64 + for readBytes < size { + n, err := data.Read(chunkData[8+readBytes:]) + readBytes += int64(n) + if err != nil && !(err == io.EOF && readBytes == size) { + errC <- err + return + } + } select { case jobC <- &hashJob{key, chunkData, size, parentWg}: - case <-errC: + case <-quitC: } - // glog.V(logger.Detail).Infof("read %v", size) return } + // dept > 0 // intermediate chunk containing child nodes hashes branchCnt := int64((size + treeSize - 1) / treeSize) - // glog.V(logger.Detail).Infof("intermediate node: setting branches: %v, depth: %v, max subtree size: %v, data size: %v", branches, depth, treeSize, size) var chunk []byte = make([]byte, branchCnt*self.hashSize+8) var pos, i int64 @@ -210,7 +215,7 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade subTreeKey := chunk[8+i*self.hashSize : 8+(i+1)*self.hashSize] childrenWg.Add(1) - self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, childrenWg, swg, wwg) + self.split(depth-1, treeSize/self.branches, subTreeKey, data, secSize, jobC, chunkC, errC, quitC, childrenWg, swg, wwg) i++ pos += treeSize @@ -224,15 +229,15 @@ func (self *TreeChunker) split(depth int, treeSize int64, key Key, data io.Reade wwg.Add(1) } self.workerCount++ - go self.hashWorker(jobC, chunkC, errC, swg, wwg) + go self.hashWorker(jobC, chunkC, errC, quitC, swg, wwg) } select { case jobC <- &hashJob{key, chunk, size, parentWg}: - case <-errC: + case <-quitC: } } -func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, swg, wwg *sync.WaitGroup) { +func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC chan error, quitC chan bool, swg, wwg *sync.WaitGroup) { hasher := self.hashFunc() if wwg != nil { defer wwg.Done() @@ -247,8 +252,7 @@ func (self *TreeChunker) hashWorker(jobC chan *hashJob, chunkC chan *Chunk, errC // now we got the hashes in the chunk, then hash the chunks hasher.Reset() self.hashChunk(hasher, job, chunkC, swg) - // glog.V(logger.Detail).Infof("hash chunk (%v)", job.size) - case <-errC: + case <-quitC: return } } @@ -276,6 +280,7 @@ func (self *TreeChunker) hashChunk(hasher hash.Hash, job *hashJob, chunkC chan * } } job.parentWg.Done() + if chunkC != nil { chunkC <- newChunk } @@ -328,7 +333,6 @@ func (self *LazyChunkReader) Size(quitC chan bool) (n int64, err error) { func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { // this is correct, a swarm doc cannot be zero length, so no EOF is expected if len(b) == 0 { - // glog.V(logger.Detail).Infof("Size query for %v", chunk.Key.Log()) return 0, nil } quitC := make(chan bool) @@ -336,13 +340,10 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { if err != nil { return 0, err } - // glog.V(logger.Detail).Infof("readAt: len(b): %v, off: %v, size: %v ", len(b), off, size) errC := make(chan error) - // glog.V(logger.Detail).Infof("readAt: reading %v into %d bytes at offset %d.", self.chunk.Key.Log(), len(b), off) // } - // glog.V(logger.Detail).Infof("-> want: %v, off: %v size: %v ", want, off, self.size) var treeSize int64 var depth int // calculate depth and max treeSize @@ -364,22 +365,15 @@ func (self *LazyChunkReader) ReadAt(b []byte, off int64) (read int, err error) { return 0, err } - // glog.V(logger.Detail).Infof("ReadAt received %v", err) - // glog.V(logger.Detail).Infof("end: len(b): %v, off: %v, size: %v ", len(b), off, size) if off+int64(len(b)) >= size { - // glog.V(logger.Detail).Infof(" len(b): %v EOF", len(b)) return len(b), io.EOF } - // glog.V(logger.Detail).Infof("ReadAt returning at %d: %v", read, err) return len(b), nil } func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, treeSize int64, chunk *Chunk, parentWg *sync.WaitGroup, errC chan error, quitC chan bool) { defer parentWg.Done() // return NewDPA(&LocalStore{}) - // glog.V(logger.Detail).Infof("inh len(b): %v, off: %v eoff: %v ", len(b), off, eoff) - - // glog.V(logger.Detail).Infof("depth: %v, loff: %v, eoff: %v, chunk.Size: %v, treeSize: %v", depth, off, eoff, chunk.Size, treeSize) // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) @@ -391,7 +385,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr // leaf chunk found if depth == 0 { - // glog.V(logger.Detail).Infof("depth: %v, len(b): %v, off: %v, eoff: %v, chunk.Size: %v %v, treeSize: %v", depth, len(b), off, eoff, chunk.Size, len(chunk.SData), treeSize) extra := 8 + eoff - int64(len(chunk.SData)) if extra > 0 { eoff -= extra @@ -406,7 +399,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr wg := &sync.WaitGroup{} defer wg.Wait() - // glog.V(logger.Detail).Infof("start %v,end %v", start, end) for i := start; i < end; i++ { soff := i * treeSize @@ -425,7 +417,6 @@ func (self *LazyChunkReader) join(b []byte, off int64, eoff int64, depth int, tr wg.Add(1) go func(j int64) { childKey := chunk.SData[8+j*self.hashSize : 8+(j+1)*self.hashSize] - // glog.V(logger.Detail).Infof("subtree ind.ex: %v -> %v", j, childKey.Log()) chunk := retrieve(childKey, self.chunkC, quitC) if chunk == nil { select { @@ -450,7 +441,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk { Key: key, C: make(chan bool), // close channel to signal data delivery } - // glog.V(logger.Detail).Infof("chunk data sent for %v (key interval in chunk %v-%v)", ch.Key.Log(), j*self.chunker.hashSize, (j+1)*self.chunker.hashSize) // submit chunk for retrieval select { case chunkC <- chunk: // submit retrieval request, someone should be listening on the other side (or we will time out globally) @@ -464,7 +454,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk { // this is how we control process leakage (quitC is closed once join is finished (after timeout)) return nil case <-chunk.C: // bells are ringing, data have been delivered - // glog.V(logger.Detail).Infof("chunk data received") } if len(chunk.SData) == 0 { return nil // chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) @@ -476,7 +465,6 @@ func retrieve(key Key, chunkC chan *Chunk, quitC chan bool) *Chunk { // Read keeps a cursor so cannot be called simulateously, see ReadAt func (self *LazyChunkReader) Read(b []byte) (read int, err error) { read, err = self.ReadAt(b, self.off) - // glog.V(logger.Detail).Infof("read: %v, off: %v, error: %v", read, self.off, err) self.off += int64(read) return diff --git a/swarm/storage/chunker_test.go b/swarm/storage/chunker_test.go index e6ca3d087..4f05cd1cc 100644 --- a/swarm/storage/chunker_test.go +++ b/swarm/storage/chunker_test.go @@ -18,6 +18,7 @@ package storage import ( "bytes" + "crypto/rand" "encoding/binary" "fmt" "io" @@ -27,6 +28,7 @@ import ( "time" ) + /* Tests TreeChunker by splitting and joining a random byte slice */ @@ -49,7 +51,7 @@ func (self *chunkerTester) checkChunks(t *testing.T, want int) { } } -func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, swg *sync.WaitGroup) (key Key) { +func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, chunkC chan *Chunk, swg *sync.WaitGroup, expectedError error) (key Key) { // reset self.chunks = make(map[string]*Chunk) @@ -65,14 +67,9 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c select { case <-timeout: self.t.Fatalf("Join timeout error") - - case chunk, ok := <-chunkC: - if !ok { - // glog.V(logger.Info).Infof("chunkC closed quitting") - close(quitC) - return - } - // glog.V(logger.Info).Infof("chunk %v received", len(self.chunks)) + case <-quitC: + return + case chunk := <-chunkC: // self.chunks = append(self.chunks, chunk) self.chunks[chunk.Key.String()] = chunk if chunk.wg != nil { @@ -83,21 +80,16 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c }() } key, err := chunker.Split(data, size, chunkC, swg, nil) - if err != nil { + if err != nil && expectedError == nil { self.t.Fatalf("Split error: %v", err) + } else if expectedError != nil && (err == nil || err.Error() != expectedError.Error()) { + self.t.Fatalf("Not receiving the correct error! Expected %v, received %v", expectedError, err) } if chunkC != nil { if swg != nil { - // glog.V(logger.Info).Infof("Waiting for storage to finish") swg.Wait() - // glog.V(logger.Info).Infof("Storage finished") } - close(chunkC) - } - if chunkC != nil { - // glog.V(logger.Info).Infof("waiting for splitter finished") - <-quitC - // glog.V(logger.Info).Infof("Splitter finished") + close(quitC) } return } @@ -105,11 +97,9 @@ func (self *chunkerTester) Split(chunker Splitter, data io.Reader, size int64, c func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Chunk, quitC chan bool) LazySectionReader { // reset but not the chunks - // glog.V(logger.Info).Infof("Splitter finished") reader := chunker.Join(key, chunkC) timeout := time.After(600 * time.Second) - // glog.V(logger.Info).Infof("Splitter finished") i := 0 go func() { for { @@ -122,15 +112,12 @@ func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Ch close(quitC) return } - // glog.V(logger.Info).Infof("chunk %v: %v", i, chunk.Key.String()) // this just mocks the behaviour of a chunk store retrieval stored, success := self.chunks[chunk.Key.String()] - // glog.V(logger.Info).Infof("chunk %v, success: %v", chunk.Key.String(), success) if !success { self.t.Fatalf("not found") return } - // glog.V(logger.Info).Infof("chunk %v: %v", i, chunk.Key.String()) chunk.SData = stored.SData chunk.Size = int64(binary.LittleEndian.Uint64(chunk.SData[0:8])) close(chunk.C) @@ -141,6 +128,26 @@ func (self *chunkerTester) Join(chunker Chunker, key Key, c int, chunkC chan *Ch return reader } +func testRandomBrokenData(splitter Splitter, n int, tester *chunkerTester) { + data := io.LimitReader(rand.Reader, int64(n)) + brokendata := brokenLimitReader(data, n, n/2) + + buf := make([]byte, n) + _, err := brokendata.Read(buf) + if err == nil || err.Error() != "Broken reader" { + tester.t.Fatalf("Broken reader is not broken, hence broken. Returns: %v", err) + } + + data = io.LimitReader(rand.Reader, int64(n)) + brokendata = brokenLimitReader(data, n, n/2) + + chunkC := make(chan *Chunk, 1000) + swg := &sync.WaitGroup{} + + key := tester.Split(splitter, brokendata, int64(n), chunkC, swg, fmt.Errorf("Broken reader")) + tester.t.Logf(" Key = %v\n", key) +} + func testRandomData(splitter Splitter, n int, tester *chunkerTester) { if tester.inputs == nil { tester.inputs = make(map[uint64][]byte) @@ -151,13 +158,13 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) { data, input = testDataReaderAndSlice(n) tester.inputs[uint64(n)] = input } else { - data = limitReader(bytes.NewReader(input), n) + data = io.LimitReader(bytes.NewReader(input), int64(n)) } chunkC := make(chan *Chunk, 1000) swg := &sync.WaitGroup{} - key := tester.Split(splitter, data, int64(n), chunkC, swg) + key := tester.Split(splitter, data, int64(n), chunkC, swg, nil) tester.t.Logf(" Key = %v\n", key) chunkC = make(chan *Chunk, 1000) @@ -166,9 +173,7 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) { chunker := NewTreeChunker(NewChunkerParams()) reader := tester.Join(chunker, key, 0, chunkC, quitC) output := make([]byte, n) - // glog.V(logger.Info).Infof(" Key = %v\n", key) r, err := reader.Read(output) - // glog.V(logger.Info).Infof(" read = %v %v\n", r, err) if r != n || err != io.EOF { tester.t.Fatalf("read error read: %v n = %v err = %v\n", r, n, err) } @@ -183,7 +188,7 @@ func testRandomData(splitter Splitter, n int, tester *chunkerTester) { func TestRandomData(t *testing.T) { // sizes := []int{123456} - sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 123456} + sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 123456, 2345678} tester := &chunkerTester{t: t} chunker := NewTreeChunker(NewChunkerParams()) for _, s := range sizes { @@ -195,6 +200,16 @@ func TestRandomData(t *testing.T) { } } +func TestRandomBrokenData(t *testing.T) { + sizes := []int{1, 60, 83, 179, 253, 1024, 4095, 4096, 4097, 8191, 8192, 8193, 123456, 2345678} + tester := &chunkerTester{t: t} + chunker := NewTreeChunker(NewChunkerParams()) + for _, s := range sizes { + testRandomBrokenData(chunker, s, tester) + t.Logf("done size: %v", s) + } +} + func readAll(reader LazySectionReader, result []byte) { size := int64(len(result)) @@ -227,7 +242,7 @@ func benchmarkJoin(n int, t *testing.B) { chunkC := make(chan *Chunk, 1000) swg := &sync.WaitGroup{} - key := tester.Split(chunker, data, int64(n), chunkC, swg) + key := tester.Split(chunker, data, int64(n), chunkC, swg, nil) // t.StartTimer() chunkC = make(chan *Chunk, 1000) quitC := make(chan bool) @@ -248,8 +263,7 @@ func benchmarkSplitTree(n int, t *testing.B) { chunker := NewTreeChunker(NewChunkerParams()) tester := &chunkerTester{t: t} data := testDataReader(n) - // glog.V(logger.Info).Infof("splitting data of length %v", n) - tester.Split(chunker, data, int64(n), nil, nil) + tester.Split(chunker, data, int64(n), nil, nil, nil) } stats := new(runtime.MemStats) runtime.ReadMemStats(stats) @@ -262,8 +276,7 @@ func benchmarkSplitPyramid(n int, t *testing.B) { splitter := NewPyramidChunker(NewChunkerParams()) tester := &chunkerTester{t: t} data := testDataReader(n) - // glog.V(logger.Info).Infof("splitting data of length %v", n) - tester.Split(splitter, data, int64(n), nil, nil) + tester.Split(splitter, data, int64(n), nil, nil, nil) } stats := new(runtime.MemStats) runtime.ReadMemStats(stats) diff --git a/swarm/storage/common_test.go b/swarm/storage/common_test.go index e81a82b7b..889b28a70 100644 --- a/swarm/storage/common_test.go +++ b/swarm/storage/common_test.go @@ -19,6 +19,7 @@ package storage import ( "bytes" "crypto/rand" + "fmt" "io" "sync" "testing" @@ -27,32 +28,31 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" ) -type limitedReader struct { - r io.Reader - off int64 - size int64 +type brokenLimitedReader struct { + lr io.Reader + errAt int + off int + size int } -func limitReader(r io.Reader, size int) *limitedReader { - return &limitedReader{r, 0, int64(size)} -} - -func (self *limitedReader) Read(buf []byte) (int, error) { - limit := int64(len(buf)) - left := self.size - self.off - if limit >= left { - limit = left - } - n, err := self.r.Read(buf[:limit]) - if err == nil && limit == left { - err = io.EOF +func brokenLimitReader(data io.Reader, size int, errAt int) *brokenLimitedReader { + return &brokenLimitedReader{ + lr: data, + errAt: errAt, + size: size, } - self.off += int64(n) - return n, err } func testDataReader(l int) (r io.Reader) { - return limitReader(rand.Reader, l) + return io.LimitReader(rand.Reader, int64(l)) +} + +func (self *brokenLimitedReader) Read(buf []byte) (int, error) { + if self.off+len(buf) > self.errAt { + return 0, fmt.Errorf("Broken reader") + } + self.off += len(buf) + return self.lr.Read(buf) } func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) { @@ -60,7 +60,7 @@ func testDataReaderAndSlice(l int) (r io.Reader, slice []byte) { if _, err := rand.Read(slice); err != nil { panic("rand error") } - r = limitReader(bytes.NewReader(slice), l) + r = io.LimitReader(bytes.NewReader(slice), int64(l)) return } diff --git a/swarm/storage/pyramid.go b/swarm/storage/pyramid.go index 3c1ef17a0..79e1927b9 100644 --- a/swarm/storage/pyramid.go +++ b/swarm/storage/pyramid.go @@ -81,7 +81,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk chunks := (size + self.chunkSize - 1) / self.chunkSize depth := int(math.Ceil(math.Log(float64(chunks))/math.Log(float64(self.branches)))) + 1 - // glog.V(logger.Detail).Infof("chunks: %v, depth: %v", chunks, depth) results := Tree{ Chunks: chunks, @@ -99,26 +98,24 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk go self.processor(pend, swg, tasks, chunkC, &results) } // Feed the chunks into the task pool + read := 0 for index := 0; ; index++ { buffer := make([]byte, self.chunkSize+8) n, err := data.Read(buffer[8:]) - last := err == io.ErrUnexpectedEOF || err == io.EOF - // glog.V(logger.Detail).Infof("n: %v, index: %v, depth: %v", n, index, depth) + read += n + last := int64(read) == size || err == io.ErrUnexpectedEOF || err == io.EOF if err != nil && !last { - // glog.V(logger.Info).Infof("error: %v", err) close(abortC) break } binary.LittleEndian.PutUint64(buffer[:8], uint64(n)) pend.Add(1) - // glog.V(logger.Info).Infof("-> task %v (%v)", index, n) select { case tasks <- &Task{Index: int64(index), Size: uint64(n), Data: buffer[:n+8], Last: last}: case <-abortC: return nil, err } if last { - // glog.V(logger.Info).Infof("last task %v (%v)", index, n) break } } @@ -126,7 +123,6 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk close(tasks) pend.Wait() - // glog.V(logger.Info).Infof("len: %v", results.Levels[0][0]) key := results.Levels[0][0].Children[0][:] return key, nil } @@ -134,12 +130,10 @@ func (self *PyramidChunker) Split(data io.Reader, size int64, chunkC chan *Chunk func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Task, chunkC chan *Chunk, results *Tree) { defer pend.Done() - // glog.V(logger.Info).Infof("processor started") // Start processing leaf chunks ad infinitum hasher := self.hashFunc() for task := range tasks { depth, pow := len(results.Levels)-1, self.branches - // glog.V(logger.Info).Infof("task: %v, last: %v", task.Index, task.Last) size := task.Size data := task.Data var node *Node @@ -171,10 +165,8 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas } node = &Node{pending, 0, make([]common.Hash, pending), last} results.Levels[depth][task.Index/pow] = node - // glog.V(logger.Info).Infof("create node %v, %v (%v children, all pending)", depth, task.Index/pow, pending) } node.Pending-- - // glog.V(logger.Info).Infof("pending now: %v", node.Pending) i := task.Index / (pow / self.branches) % self.branches if last { node.Last = true @@ -182,7 +174,6 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas copy(node.Children[i][:], hash) node.Size += size left := node.Pending - // glog.V(logger.Info).Infof("left pending now: %v, node size: %v", left, node.Size) if chunkC != nil { if swg != nil { swg.Add(1) @@ -198,7 +189,6 @@ func (self *PyramidChunker) processor(pend, swg *sync.WaitGroup, tasks chan *Tas results.Lock.Unlock() // If there's more work to be done, leave for others - // glog.V(logger.Info).Infof("left %v", left) if left > 0 { break } diff --git a/trie/hasher.go b/trie/hasher.go index 87e02fb85..e395e00d7 100644 --- a/trie/hasher.go +++ b/trie/hasher.go @@ -27,8 +27,9 @@ import ( ) type hasher struct { - tmp *bytes.Buffer - sha hash.Hash + tmp *bytes.Buffer + sha hash.Hash + cachegen, cachelimit uint16 } // hashers live in a global pool. @@ -38,8 +39,10 @@ var hasherPool = sync.Pool{ }, } -func newHasher() *hasher { - return hasherPool.Get().(*hasher) +func newHasher(cachegen, cachelimit uint16) *hasher { + h := hasherPool.Get().(*hasher) + h.cachegen, h.cachelimit = cachegen, cachelimit + return h } func returnHasherToPool(h *hasher) { @@ -50,8 +53,18 @@ func returnHasherToPool(h *hasher) { // original node initialzied with the computed hash to replace the original one. func (h *hasher) hash(n node, db DatabaseWriter, force bool) (node, node, error) { // If we're not storing the node, just hashing, use avaialble cached data - if hash, dirty := n.cache(); hash != nil && (db == nil || !dirty) { - return hash, n, nil + if hash, dirty := n.cache(); hash != nil { + if db == nil { + return hash, n, nil + } + if n.canUnload(h.cachegen, h.cachelimit) { + // Evict the node from cache. All of its subnodes will have a lower or equal + // cache generation number. + return hash, hash, nil + } + if !dirty { + return hash, n, nil + } } // Trie not processed yet or needs storage, walk the children collapsed, cached, err := h.hashChildren(n, db) @@ -62,19 +75,21 @@ func (h *hasher) hash(n node, db DatabaseWriter, force bool) (node, node, error) if err != nil { return hashNode{}, n, err } - // Cache the hash and RLP blob of the ndoe for later reuse + // Cache the hash of the ndoe for later reuse. if hash, ok := hashed.(hashNode); ok && !force { switch cached := cached.(type) { - case shortNode: - cached.hash = hash + case *shortNode: + cached = cached.copy() + cached.flags.hash = hash if db != nil { - cached.dirty = false + cached.flags.dirty = false } return hashed, cached, nil - case fullNode: - cached.hash = hash + case *fullNode: + cached = cached.copy() + cached.flags.hash = hash if db != nil { - cached.dirty = false + cached.flags.dirty = false } return hashed, cached, nil } @@ -89,40 +104,42 @@ func (h *hasher) hashChildren(original node, db DatabaseWriter) (node, node, err var err error switch n := original.(type) { - case shortNode: + case *shortNode: // Hash the short node's child, caching the newly hashed subtree - cached := n - cached.Key = common.CopyBytes(cached.Key) + collapsed, cached := n.copy(), n.copy() + collapsed.Key = compactEncode(n.Key) + cached.Key = common.CopyBytes(n.Key) - n.Key = compactEncode(n.Key) if _, ok := n.Val.(valueNode); !ok { - if n.Val, cached.Val, err = h.hash(n.Val, db, false); err != nil { - return n, original, err + collapsed.Val, cached.Val, err = h.hash(n.Val, db, false) + if err != nil { + return original, original, err } } - if n.Val == nil { - n.Val = valueNode(nil) // Ensure that nil children are encoded as empty strings. + if collapsed.Val == nil { + collapsed.Val = valueNode(nil) // Ensure that nil children are encoded as empty strings. } - return n, cached, nil + return collapsed, cached, nil - case fullNode: + case *fullNode: // Hash the full node's children, caching the newly hashed subtrees - cached := fullNode{dirty: n.dirty} + collapsed, cached := n.copy(), n.copy() for i := 0; i < 16; i++ { if n.Children[i] != nil { - if n.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false); err != nil { - return n, original, err + collapsed.Children[i], cached.Children[i], err = h.hash(n.Children[i], db, false) + if err != nil { + return original, original, err } } else { - n.Children[i] = valueNode(nil) // Ensure that nil children are encoded as empty strings. + collapsed.Children[i] = valueNode(nil) // Ensure that nil children are encoded as empty strings. } } cached.Children[16] = n.Children[16] - if n.Children[16] == nil { - n.Children[16] = valueNode(nil) + if collapsed.Children[16] == nil { + collapsed.Children[16] = valueNode(nil) } - return n, cached, nil + return collapsed, cached, nil default: // Value and hash nodes don't have children so they're left as were @@ -140,6 +157,7 @@ func (h *hasher) store(n node, db DatabaseWriter, force bool) (node, error) { if err := rlp.Encode(h.tmp, n); err != nil { panic("encode error: " + err.Error()) } + if h.tmp.Len() < 32 && !force { return n, nil // Nodes smaller than 32 bytes are stored inside their parent } diff --git a/trie/iterator.go b/trie/iterator.go index 8cad51aff..afde6e19e 100644 --- a/trie/iterator.go +++ b/trie/iterator.go @@ -56,11 +56,11 @@ func (it *Iterator) makeKey() []byte { key := it.keyBuf[:0] for _, se := range it.nodeIt.stack { switch node := se.node.(type) { - case fullNode: + case *fullNode: if se.child <= 16 { key = append(key, byte(se.child)) } - case shortNode: + case *shortNode: if hasTerm(node.Key) { key = append(key, node.Key[:len(node.Key)-1]...) } else { @@ -148,7 +148,7 @@ func (it *NodeIterator) step() error { if (ancestor == common.Hash{}) { ancestor = parent.parent } - if node, ok := parent.node.(fullNode); ok { + if node, ok := parent.node.(*fullNode); ok { // Full node, traverse all children, then the node itself if parent.child >= len(node.Children) { break @@ -156,7 +156,7 @@ func (it *NodeIterator) step() error { for parent.child++; parent.child < len(node.Children); parent.child++ { if current := node.Children[parent.child]; current != nil { it.stack = append(it.stack, &nodeIteratorState{ - hash: common.BytesToHash(node.hash), + hash: common.BytesToHash(node.flags.hash), node: current, parent: ancestor, child: -1, @@ -164,14 +164,14 @@ func (it *NodeIterator) step() error { break } } - } else if node, ok := parent.node.(shortNode); ok { + } else if node, ok := parent.node.(*shortNode); ok { // Short node, traverse the pointer singleton child, then the node itself if parent.child >= 0 { break } parent.child++ it.stack = append(it.stack, &nodeIteratorState{ - hash: common.BytesToHash(node.hash), + hash: common.BytesToHash(node.flags.hash), node: node.Val, parent: ancestor, child: -1, diff --git a/trie/node.go b/trie/node.go index b97d370be..de9752c93 100644 --- a/trie/node.go +++ b/trie/node.go @@ -30,42 +30,60 @@ var indices = []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b type node interface { fstring(string) string cache() (hashNode, bool) + canUnload(cachegen, cachelimit uint16) bool } type ( fullNode struct { Children [17]node // Actual trie node data to encode/decode (needs custom encoder) - hash hashNode // Cached hash of the node to prevent rehashing (may be nil) - dirty bool // Cached flag whether the node's new or already stored + flags nodeFlag } shortNode struct { Key []byte Val node - hash hashNode // Cached hash of the node to prevent rehashing (may be nil) - dirty bool // Cached flag whether the node's new or already stored + flags nodeFlag } hashNode []byte valueNode []byte ) // EncodeRLP encodes a full node into the consensus RLP format. -func (n fullNode) EncodeRLP(w io.Writer) error { +func (n *fullNode) EncodeRLP(w io.Writer) error { return rlp.Encode(w, n.Children) } -// Cache accessors to retrieve precalculated values (avoid lengthy type switches). -func (n fullNode) cache() (hashNode, bool) { return n.hash, n.dirty } -func (n shortNode) cache() (hashNode, bool) { return n.hash, n.dirty } -func (n hashNode) cache() (hashNode, bool) { return nil, true } -func (n valueNode) cache() (hashNode, bool) { return nil, true } +func (n *fullNode) copy() *fullNode { copy := *n; return © } +func (n *shortNode) copy() *shortNode { copy := *n; return © } + +// nodeFlag contains caching-related metadata about a node. +type nodeFlag struct { + hash hashNode // cached hash of the node (may be nil) + gen uint16 // cache generation counter + dirty bool // whether the node has changes that must be written to the database +} + +// canUnload tells whether a node can be unloaded. +func (n *nodeFlag) canUnload(cachegen, cachelimit uint16) bool { + return !n.dirty && cachegen-n.gen >= cachelimit +} + +func (n *fullNode) canUnload(gen, limit uint16) bool { return n.flags.canUnload(gen, limit) } +func (n *shortNode) canUnload(gen, limit uint16) bool { return n.flags.canUnload(gen, limit) } +func (n hashNode) canUnload(uint16, uint16) bool { return false } +func (n valueNode) canUnload(uint16, uint16) bool { return false } + +func (n *fullNode) cache() (hashNode, bool) { return n.flags.hash, n.flags.dirty } +func (n *shortNode) cache() (hashNode, bool) { return n.flags.hash, n.flags.dirty } +func (n hashNode) cache() (hashNode, bool) { return nil, true } +func (n valueNode) cache() (hashNode, bool) { return nil, true } // Pretty printing. -func (n fullNode) String() string { return n.fstring("") } -func (n shortNode) String() string { return n.fstring("") } -func (n hashNode) String() string { return n.fstring("") } -func (n valueNode) String() string { return n.fstring("") } +func (n *fullNode) String() string { return n.fstring("") } +func (n *shortNode) String() string { return n.fstring("") } +func (n hashNode) String() string { return n.fstring("") } +func (n valueNode) String() string { return n.fstring("") } -func (n fullNode) fstring(ind string) string { +func (n *fullNode) fstring(ind string) string { resp := fmt.Sprintf("[\n%s ", ind) for i, node := range n.Children { if node == nil { @@ -76,7 +94,7 @@ func (n fullNode) fstring(ind string) string { } return resp + fmt.Sprintf("\n%s] ", ind) } -func (n shortNode) fstring(ind string) string { +func (n *shortNode) fstring(ind string) string { return fmt.Sprintf("{%x: %v} ", n.Key, n.Val.fstring(ind+" ")) } func (n hashNode) fstring(ind string) string { @@ -120,6 +138,7 @@ func decodeShort(hash, buf, elems []byte) (node, error) { if err != nil { return nil, err } + flag := nodeFlag{hash: hash} key := compactDecode(kbuf) if key[len(key)-1] == 16 { // value node @@ -127,17 +146,17 @@ func decodeShort(hash, buf, elems []byte) (node, error) { if err != nil { return nil, fmt.Errorf("invalid value node: %v", err) } - return shortNode{key, valueNode(val), hash, false}, nil + return &shortNode{key, append(valueNode{}, val...), flag}, nil } r, _, err := decodeRef(rest) if err != nil { return nil, wrapError(err, "val") } - return shortNode{key, r, hash, false}, nil + return &shortNode{key, r, flag}, nil } -func decodeFull(hash, buf, elems []byte) (fullNode, error) { - n := fullNode{hash: hash} +func decodeFull(hash, buf, elems []byte) (*fullNode, error) { + n := &fullNode{flags: nodeFlag{hash: hash}} for i := 0; i < 16; i++ { cld, rest, err := decodeRef(elems) if err != nil { @@ -150,7 +169,7 @@ func decodeFull(hash, buf, elems []byte) (fullNode, error) { return n, err } if len(val) > 0 { - n.Children[16] = valueNode(val) + n.Children[16] = append(valueNode{}, val...) } return n, nil } @@ -176,7 +195,7 @@ func decodeRef(buf []byte) (node, []byte, error) { // empty node return nil, rest, nil case kind == rlp.String && len(val) == 32: - return hashNode(val), rest, nil + return append(hashNode{}, val...), rest, nil default: return nil, nil, fmt.Errorf("invalid RLP string size %d (want 0 or 32)", len(val)) } diff --git a/trie/node_test.go b/trie/node_test.go new file mode 100644 index 000000000..7ad1ff9e7 --- /dev/null +++ b/trie/node_test.go @@ -0,0 +1,58 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package trie + +import "testing" + +func TestCanUnload(t *testing.T) { + tests := []struct { + flag nodeFlag + cachegen, cachelimit uint16 + want bool + }{ + { + flag: nodeFlag{dirty: true, gen: 0}, + want: false, + }, + { + flag: nodeFlag{dirty: false, gen: 0}, + cachegen: 0, cachelimit: 0, + want: true, + }, + { + flag: nodeFlag{dirty: false, gen: 65534}, + cachegen: 65535, cachelimit: 1, + want: true, + }, + { + flag: nodeFlag{dirty: false, gen: 65534}, + cachegen: 0, cachelimit: 1, + want: true, + }, + { + flag: nodeFlag{dirty: false, gen: 1}, + cachegen: 65535, cachelimit: 1, + want: true, + }, + } + + for _, test := range tests { + if got := test.flag.canUnload(test.cachegen, test.cachelimit); got != test.want { + t.Errorf("%+v\n got %t, want %t", test, got, test.want) + } + } +} diff --git a/trie/proof.go b/trie/proof.go index 116c13a1b..f193b52df 100644 --- a/trie/proof.go +++ b/trie/proof.go @@ -44,7 +44,7 @@ func (t *Trie) Prove(key []byte) []rlp.RawValue { tn := t.root for len(key) > 0 && tn != nil { switch n := tn.(type) { - case shortNode: + case *shortNode: if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) { // The trie doesn't contain the key. tn = nil @@ -53,7 +53,7 @@ func (t *Trie) Prove(key []byte) []rlp.RawValue { key = key[len(n.Key):] } nodes = append(nodes, n) - case fullNode: + case *fullNode: tn = n.Children[key[0]] key = key[1:] nodes = append(nodes, n) @@ -70,7 +70,7 @@ func (t *Trie) Prove(key []byte) []rlp.RawValue { panic(fmt.Sprintf("%T: invalid node: %v", tn, tn)) } } - hasher := newHasher() + hasher := newHasher(0, 0) proof := make([]rlp.RawValue, 0, len(nodes)) for i, n := range nodes { // Don't bother checking for errors here since hasher panics @@ -130,13 +130,13 @@ func VerifyProof(rootHash common.Hash, key []byte, proof []rlp.RawValue) (value func get(tn node, key []byte) ([]byte, node) { for len(key) > 0 { switch n := tn.(type) { - case shortNode: + case *shortNode: if len(key) < len(n.Key) || !bytes.Equal(n.Key, key[:len(n.Key)]) { return nil, nil } tn = n.Val key = key[len(n.Key):] - case fullNode: + case *fullNode: tn = n.Children[key[0]] key = key[1:] case hashNode: diff --git a/trie/secure_trie.go b/trie/secure_trie.go index 2a8b57214..4d9ebe4d3 100644 --- a/trie/secure_trie.go +++ b/trie/secure_trie.go @@ -49,8 +49,12 @@ type SecureTrie struct { // If root is the zero hash or the sha3 hash of an empty string, the // trie is initially empty. Otherwise, New will panic if db is nil // and returns MissingNodeError if the root node cannot be found. +// // Accessing the trie loads nodes from db on demand. -func NewSecure(root common.Hash, db Database) (*SecureTrie, error) { +// Loaded nodes are kept around until their 'cache generation' expires. +// A new cache generation is created by each call to Commit. +// cachelimit sets the number of past cache generations to keep. +func NewSecure(root common.Hash, db Database, cachelimit uint16) (*SecureTrie, error) { if db == nil { panic("NewSecure called with nil database") } @@ -58,9 +62,8 @@ func NewSecure(root common.Hash, db Database) (*SecureTrie, error) { if err != nil { return nil, err } - return &SecureTrie{ - trie: *trie, - }, nil + trie.SetCacheLimit(cachelimit) + return &SecureTrie{trie: *trie}, nil } // Get returns the value for key stored in the trie. @@ -191,7 +194,7 @@ func (t *SecureTrie) secKey(key []byte) []byte { // The caller must not hold onto the return value because it will become // invalid on the next call to hashKey or secKey. func (t *SecureTrie) hashKey(key []byte) []byte { - h := newHasher() + h := newHasher(0, 0) h.sha.Reset() h.sha.Write(key) buf := h.sha.Sum(t.hashKeyBuf[:0]) diff --git a/trie/secure_trie_test.go b/trie/secure_trie_test.go index 3171b8c31..159640fda 100644 --- a/trie/secure_trie_test.go +++ b/trie/secure_trie_test.go @@ -29,7 +29,7 @@ import ( func newEmptySecure() *SecureTrie { db, _ := ethdb.NewMemDatabase() - trie, _ := NewSecure(common.Hash{}, db) + trie, _ := NewSecure(common.Hash{}, db, 0) return trie } @@ -37,7 +37,7 @@ func newEmptySecure() *SecureTrie { func makeTestSecureTrie() (ethdb.Database, *SecureTrie, map[string][]byte) { // Create an empty trie db, _ := ethdb.NewMemDatabase() - trie, _ := NewSecure(common.Hash{}, db) + trie, _ := NewSecure(common.Hash{}, db, 0) // Fill it with some arbitrary data content := make(map[string][]byte) diff --git a/trie/sync.go b/trie/sync.go index 6e9e029b9..3de758536 100644 --- a/trie/sync.go +++ b/trie/sync.go @@ -212,12 +212,12 @@ func (s *TrieSync) children(req *request) ([]*request, error) { children := []child{} switch node := (*req.object).(type) { - case shortNode: + case *shortNode: children = []child{{ node: &node.Val, depth: req.depth + len(node.Key), }} - case fullNode: + case *fullNode: for i := 0; i < 17; i++ { if node.Children[i] != nil { children = append(children, child{ diff --git a/trie/trie.go b/trie/trie.go index 55598af98..65005bae8 100644 --- a/trie/trie.go +++ b/trie/trie.go @@ -62,6 +62,23 @@ type Trie struct { root node db Database originalRoot common.Hash + + // Cache generation values. + // cachegen increase by one with each commit operation. + // new nodes are tagged with the current generation and unloaded + // when their generation is older than than cachegen-cachelimit. + cachegen, cachelimit uint16 +} + +// SetCacheLimit sets the number of 'cache generations' to keep. +// A cache generations is created by a call to Commit. +func (t *Trie) SetCacheLimit(l uint16) { + t.cachelimit = l +} + +// newFlag returns the cache flag value for a newly created node. +func (t *Trie) newFlag() nodeFlag { + return nodeFlag{dirty: true, gen: t.cachegen} } // New creates a trie with an existing root node from db. @@ -120,27 +137,25 @@ func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode return nil, nil, false, nil case valueNode: return n, n, false, nil - case shortNode: + case *shortNode: if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) { // key not found in trie return nil, n, false, nil } value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key)) if err == nil && didResolve { + n = n.copy() n.Val = newnode - return value, n, didResolve, err - } else { - return value, origNode, didResolve, err } - case fullNode: - child := n.Children[key[pos]] - value, newnode, didResolve, err = t.tryGet(child, key, pos+1) + return value, n, didResolve, err + case *fullNode: + value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1) if err == nil && didResolve { + n = n.copy() n.Children[key[pos]] = newnode - return value, n, didResolve, err - } else { - return value, origNode, didResolve, err + } + return value, n, didResolve, err case hashNode: child, err := t.resolveHash(n, key[:pos], key[pos:]) if err != nil { @@ -199,22 +214,19 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error return true, value, nil } switch n := n.(type) { - case shortNode: + case *shortNode: matchlen := prefixLen(key, n.Key) // If the whole key matches, keep this short node as is // and only update the value. if matchlen == len(n.Key) { dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value) - if err != nil { - return false, nil, err + if !dirty || err != nil { + return false, n, err } - if !dirty { - return false, n, nil - } - return true, shortNode{n.Key, nn, nil, true}, nil + return true, &shortNode{n.Key, nn, t.newFlag()}, nil } // Otherwise branch out at the index where they differ. - branch := fullNode{dirty: true} + branch := &fullNode{flags: t.newFlag()} var err error _, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val) if err != nil { @@ -229,21 +241,19 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error return true, branch, nil } // Otherwise, replace it with a short node leading up to the branch. - return true, shortNode{key[:matchlen], branch, nil, true}, nil + return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil - case fullNode: + case *fullNode: dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value) - if err != nil { - return false, nil, err + if !dirty || err != nil { + return false, n, err } - if !dirty { - return false, n, nil - } - n.Children[key[0]], n.hash, n.dirty = nn, nil, true + n = n.copy() + n.Children[key[0]], n.flags.hash, n.flags.dirty = nn, nil, true return true, n, nil case nil: - return true, shortNode{key, value, nil, true}, nil + return true, &shortNode{key, value, t.newFlag()}, nil case hashNode: // We've hit a part of the trie that isn't loaded yet. Load @@ -254,11 +264,8 @@ func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error return false, nil, err } dirty, nn, err := t.insert(rn, prefix, key, value) - if err != nil { - return false, nil, err - } - if !dirty { - return false, rn, nil + if !dirty || err != nil { + return false, rn, err } return true, nn, nil @@ -291,7 +298,7 @@ func (t *Trie) TryDelete(key []byte) error { // nodes on the way up after deleting recursively. func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { switch n := n.(type) { - case shortNode: + case *shortNode: matchlen := prefixLen(key, n.Key) if matchlen < len(n.Key) { return false, n, nil // don't replace n on mismatch @@ -304,34 +311,29 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { // subtrie must contain at least two other values with keys // longer than n.Key. dirty, child, err := t.delete(n.Val, append(prefix, key[:len(n.Key)]...), key[len(n.Key):]) - if err != nil { - return false, nil, err - } - if !dirty { - return false, n, nil + if !dirty || err != nil { + return false, n, err } switch child := child.(type) { - case shortNode: + case *shortNode: // Deleting from the subtrie reduced it to another // short node. Merge the nodes to avoid creating a // shortNode{..., shortNode{...}}. Use concat (which // always creates a new slice) instead of append to // avoid modifying n.Key since it might be shared with // other nodes. - return true, shortNode{concat(n.Key, child.Key...), child.Val, nil, true}, nil + return true, &shortNode{concat(n.Key, child.Key...), child.Val, t.newFlag()}, nil default: - return true, shortNode{n.Key, child, nil, true}, nil + return true, &shortNode{n.Key, child, t.newFlag()}, nil } - case fullNode: + case *fullNode: dirty, nn, err := t.delete(n.Children[key[0]], append(prefix, key[0]), key[1:]) - if err != nil { - return false, nil, err - } - if !dirty { - return false, n, nil + if !dirty || err != nil { + return false, n, err } - n.Children[key[0]], n.hash, n.dirty = nn, nil, true + n = n.copy() + n.Children[key[0]], n.flags.hash, n.flags.dirty = nn, nil, true // Check how many non-nil entries are left after deleting and // reduce the full node to a short node if only one entry is @@ -365,14 +367,14 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { if err != nil { return false, nil, err } - if cnode, ok := cnode.(shortNode); ok { + if cnode, ok := cnode.(*shortNode); ok { k := append([]byte{byte(pos)}, cnode.Key...) - return true, shortNode{k, cnode.Val, nil, true}, nil + return true, &shortNode{k, cnode.Val, t.newFlag()}, nil } } // Otherwise, n is replaced by a one-nibble short node // containing the child. - return true, shortNode{[]byte{byte(pos)}, n.Children[pos], nil, true}, nil + return true, &shortNode{[]byte{byte(pos)}, n.Children[pos], t.newFlag()}, nil } // n still contains at least two values and cannot be reduced. return true, n, nil @@ -392,11 +394,8 @@ func (t *Trie) delete(n node, prefix, key []byte) (bool, node, error) { return false, nil, err } dirty, nn, err := t.delete(rn, prefix, key) - if err != nil { - return false, nil, err - } - if !dirty { - return false, rn, nil + if !dirty || err != nil { + return false, rn, err } return true, nn, nil @@ -471,6 +470,7 @@ func (t *Trie) CommitTo(db DatabaseWriter) (root common.Hash, err error) { return (common.Hash{}), err } t.root = cached + t.cachegen++ return common.BytesToHash(hash.(hashNode)), nil } @@ -478,7 +478,7 @@ func (t *Trie) hashRoot(db DatabaseWriter) (node, node, error) { if t.root == nil { return hashNode(emptyRoot.Bytes()), nil, nil } - h := newHasher() + h := newHasher(t.cachegen, t.cachelimit) defer returnHasherToPool(h) return h.hash(t.root, db, true) } diff --git a/trie/trie_test.go b/trie/trie_test.go index 87a7ec258..32fbe6801 100644 --- a/trie/trie_test.go +++ b/trie/trie_test.go @@ -460,8 +460,7 @@ const benchElemCount = 20000 func benchGet(b *testing.B, commit bool) { trie := new(Trie) if commit { - dir, tmpdb := tempDB() - defer os.RemoveAll(dir) + _, tmpdb := tempDB() trie, _ = New(common.Hash{}, tmpdb) } k := make([]byte, 32) @@ -478,6 +477,13 @@ func benchGet(b *testing.B, commit bool) { for i := 0; i < b.N; i++ { trie.Get(k) } + b.StopTimer() + + if commit { + ldb := trie.db.(*ethdb.LDBDatabase) + ldb.Close() + os.RemoveAll(ldb.Path()) + } } func benchUpdate(b *testing.B, e binary.ByteOrder) *Trie { |