aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--common/db.go1
-rw-r--r--core/chain_manager.go10
-rw-r--r--eth/backend.go67
-rw-r--r--ethdb/database.go28
-rw-r--r--ethdb/memory_database.go4
-rw-r--r--tests/block_test_util.go82
6 files changed, 144 insertions, 48 deletions
diff --git a/common/db.go b/common/db.go
index 408b1e755..ae13c7557 100644
--- a/common/db.go
+++ b/common/db.go
@@ -7,4 +7,5 @@ type Database interface {
Delete(key []byte) error
LastKnownTD() []byte
Close()
+ Flush() error
}
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 47f84b80a..a09b2e63b 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -342,14 +342,14 @@ func (self *ChainManager) Export(w io.Writer) error {
}
func (bc *ChainManager) insert(block *types.Block) {
- bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
- bc.currentBlock = block
- bc.lastBlockHash = block.Hash()
-
key := append(blockNumPre, block.Number().Bytes()...)
- bc.blockDb.Put(key, bc.lastBlockHash.Bytes())
+ bc.blockDb.Put(key, block.Hash().Bytes())
// Push block to cache
bc.cache.Push(block)
+
+ bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
+ bc.currentBlock = block
+ bc.lastBlockHash = block.Hash()
}
func (bc *ChainManager) write(block *types.Block) {
diff --git a/eth/backend.go b/eth/backend.go
index a78d34057..356e7fd1a 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -5,6 +5,7 @@ import (
"fmt"
"path"
"strings"
+ "time"
"github.com/ethereum/ethash"
"github.com/ethereum/go-ethereum/accounts"
@@ -124,6 +125,8 @@ type Ethereum struct {
blockDb common.Database // Block chain database
stateDb common.Database // State changes database
extraDb common.Database // Extra database (txs, etc)
+ // Closed when databases are flushed and closed
+ databasesClosed chan bool
//*** SERVICES ***
// State manager for processing new blocks and managing the over all states
@@ -198,18 +201,19 @@ func New(config *Config) (*Ethereum, error) {
glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion)
eth := &Ethereum{
- shutdownChan: make(chan bool),
- blockDb: blockDb,
- stateDb: stateDb,
- extraDb: extraDb,
- eventMux: &event.TypeMux{},
- accountManager: config.AccountManager,
- DataDir: config.DataDir,
- etherbase: common.HexToAddress(config.Etherbase),
- clientVersion: config.Name, // TODO should separate from Name
- ethVersionId: config.ProtocolVersion,
- netVersionId: config.NetworkId,
- NatSpec: config.NatSpec,
+ shutdownChan: make(chan bool),
+ databasesClosed: make(chan bool),
+ blockDb: blockDb,
+ stateDb: stateDb,
+ extraDb: extraDb,
+ eventMux: &event.TypeMux{},
+ accountManager: config.AccountManager,
+ DataDir: config.DataDir,
+ etherbase: common.HexToAddress(config.Etherbase),
+ clientVersion: config.Name, // TODO should separate from Name
+ ethVersionId: config.ProtocolVersion,
+ netVersionId: config.NetworkId,
+ NatSpec: config.NatSpec,
}
eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
@@ -377,6 +381,9 @@ func (s *Ethereum) Start() error {
}
}
+ // periodically flush databases
+ go s.syncDatabases()
+
// Start services
go s.txPool.Start()
s.protocolManager.Start()
@@ -393,6 +400,34 @@ func (s *Ethereum) Start() error {
return nil
}
+func (s *Ethereum) syncDatabases() {
+ ticker := time.NewTicker(1 * time.Minute)
+done:
+ for {
+ select {
+ case <-ticker.C:
+ // don't change the order of database flushes
+ if err := s.extraDb.Flush(); err != nil {
+ glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err)
+ }
+ if err := s.stateDb.Flush(); err != nil {
+ glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err)
+ }
+ if err := s.blockDb.Flush(); err != nil {
+ glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err)
+ }
+ case <-s.shutdownChan:
+ break done
+ }
+ }
+
+ s.blockDb.Close()
+ s.stateDb.Close()
+ s.extraDb.Close()
+
+ close(s.databasesClosed)
+}
+
func (s *Ethereum) StartForTest() {
jsonlogger.LogJson(&logger.LogStarting{
ClientString: s.net.Name,
@@ -413,12 +448,7 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error {
}
func (s *Ethereum) Stop() {
- // Close the database
- defer s.blockDb.Close()
- defer s.stateDb.Close()
- defer s.extraDb.Close()
-
- s.txSub.Unsubscribe() // quits txBroadcastLoop
+ s.txSub.Unsubscribe() // quits txBroadcastLoop
s.protocolManager.Stop()
s.txPool.Stop()
@@ -433,6 +463,7 @@ func (s *Ethereum) Stop() {
// This function will wait for a shutdown and resumes main thread execution
func (s *Ethereum) WaitForShutdown() {
+ <-s.databasesClosed
<-s.shutdownChan
}
diff --git a/ethdb/database.go b/ethdb/database.go
index eb562f852..57a3f9ee6 100644
--- a/ethdb/database.go
+++ b/ethdb/database.go
@@ -2,7 +2,6 @@ package ethdb
import (
"sync"
- "time"
"github.com/ethereum/go-ethereum/compression/rle"
"github.com/ethereum/go-ethereum/logger"
@@ -35,8 +34,6 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) {
}
database.makeQueue()
- go database.update()
-
return database, nil
}
@@ -111,35 +108,16 @@ func (self *LDBDatabase) Flush() error {
}
self.makeQueue() // reset the queue
+ glog.V(logger.Detail).Infoln("Flush database: ", self.fn)
+
return self.db.Write(batch, nil)
}
func (self *LDBDatabase) Close() {
- self.quit <- struct{}{}
- <-self.quit
- glog.V(logger.Info).Infoln("flushed and closed db:", self.fn)
-}
-
-func (self *LDBDatabase) update() {
- ticker := time.NewTicker(1 * time.Minute)
-done:
- for {
- select {
- case <-ticker.C:
- if err := self.Flush(); err != nil {
- glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
- }
- case <-self.quit:
- break done
- }
- }
-
if err := self.Flush(); err != nil {
glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
}
- // Close the leveldb database
self.db.Close()
-
- self.quit <- struct{}{}
+ glog.V(logger.Error).Infoln("flushed and closed db:", self.fn)
}
diff --git a/ethdb/memory_database.go b/ethdb/memory_database.go
index d4988d0d8..f5d5faee7 100644
--- a/ethdb/memory_database.go
+++ b/ethdb/memory_database.go
@@ -65,3 +65,7 @@ func (db *MemDatabase) LastKnownTD() []byte {
return data
}
+
+func (db *MemDatabase) Flush() error {
+ return nil
+}
diff --git a/tests/block_test_util.go b/tests/block_test_util.go
index df651d24e..f34c5d200 100644
--- a/tests/block_test_util.go
+++ b/tests/block_test_util.go
@@ -160,7 +160,89 @@ func (t *BlockTest) TryBlocksInsert(chainManager *core.ChainManager) error {
if b.BlockHeader == nil {
return fmt.Errorf("Block insertion should have failed")
}
+ err = validateBlockHeader(b.BlockHeader, cb.Header())
+ if err != nil {
+ return fmt.Errorf("Block header validation failed: ", err)
+ }
+ }
+ return nil
+}
+
+func validateBlockHeader(h *btHeader, h2 *types.Header) error {
+ expectedBloom := mustConvertBytes(h.Bloom)
+ if !bytes.Equal(expectedBloom, h2.Bloom.Bytes()) {
+ return fmt.Errorf("Bloom: expected: %v, decoded: %v", expectedBloom, h2.Bloom.Bytes())
+ }
+
+ expectedCoinbase := mustConvertBytes(h.Coinbase)
+ if !bytes.Equal(expectedCoinbase, h2.Coinbase.Bytes()) {
+ return fmt.Errorf("Coinbase: expected: %v, decoded: %v", expectedCoinbase, h2.Coinbase.Bytes())
+ }
+
+ expectedMixHashBytes := mustConvertBytes(h.MixHash)
+ if !bytes.Equal(expectedMixHashBytes, h2.MixDigest.Bytes()) {
+ return fmt.Errorf("MixHash: expected: %v, decoded: %v", expectedMixHashBytes, h2.MixDigest.Bytes())
+ }
+
+ expectedNonce := mustConvertBytes(h.Nonce)
+ if !bytes.Equal(expectedNonce, h2.Nonce[:]) {
+ return fmt.Errorf("Nonce: expected: %v, decoded: %v", expectedNonce, h2.Nonce[:])
+ }
+
+ expectedNumber := mustConvertBigInt(h.Number, 16)
+ if expectedNumber.Cmp(h2.Number) != 0 {
+ return fmt.Errorf("Number: expected: %v, decoded: %v", expectedNumber, h2.Number)
+ }
+
+ expectedParentHash := mustConvertBytes(h.ParentHash)
+ if !bytes.Equal(expectedParentHash, h2.ParentHash.Bytes()) {
+ return fmt.Errorf("Parent hash: expected: %v, decoded: %v", expectedParentHash, h2.ParentHash.Bytes())
+ }
+
+ expectedReceiptHash := mustConvertBytes(h.ReceiptTrie)
+ if !bytes.Equal(expectedReceiptHash, h2.ReceiptHash.Bytes()) {
+ return fmt.Errorf("Receipt hash: expected: %v, decoded: %v", expectedReceiptHash, h2.ReceiptHash.Bytes())
+ }
+
+ expectedTxHash := mustConvertBytes(h.TransactionsTrie)
+ if !bytes.Equal(expectedTxHash, h2.TxHash.Bytes()) {
+ return fmt.Errorf("Tx hash: expected: %v, decoded: %v", expectedTxHash, h2.TxHash.Bytes())
+ }
+
+ expectedStateHash := mustConvertBytes(h.StateRoot)
+ if !bytes.Equal(expectedStateHash, h2.Root.Bytes()) {
+ return fmt.Errorf("State hash: expected: %v, decoded: %v", expectedStateHash, h2.Root.Bytes())
+ }
+
+ expectedUncleHash := mustConvertBytes(h.UncleHash)
+ if !bytes.Equal(expectedUncleHash, h2.UncleHash.Bytes()) {
+ return fmt.Errorf("Uncle hash: expected: %v, decoded: %v", expectedUncleHash, h2.UncleHash.Bytes())
+ }
+
+ expectedExtraData := mustConvertBytes(h.ExtraData)
+ if !bytes.Equal(expectedExtraData, h2.Extra) {
+ return fmt.Errorf("Extra data: expected: %v, decoded: %v", expectedExtraData, h2.Extra)
}
+
+ expectedDifficulty := mustConvertBigInt(h.Difficulty, 16)
+ if expectedDifficulty.Cmp(h2.Difficulty) != 0 {
+ return fmt.Errorf("Difficulty: expected: %v, decoded: %v", expectedDifficulty, h2.Difficulty)
+ }
+
+ expectedGasLimit := mustConvertBigInt(h.GasLimit, 16)
+ if expectedGasLimit.Cmp(h2.GasLimit) != 0 {
+ return fmt.Errorf("GasLimit: expected: %v, decoded: %v", expectedGasLimit, h2.GasLimit)
+ }
+ expectedGasUsed := mustConvertBigInt(h.GasUsed, 16)
+ if expectedGasUsed.Cmp(h2.GasUsed) != 0 {
+ return fmt.Errorf("GasUsed: expected: %v, decoded: %v", expectedGasUsed, h2.GasUsed)
+ }
+
+ expectedTimestamp := mustConvertUint(h.Timestamp, 16)
+ if expectedTimestamp != h2.Time {
+ return fmt.Errorf("Timestamp: expected: %v, decoded: %v", expectedTimestamp, h2.Time)
+ }
+
return nil
}