From 07e85d8e146c0aa08fcfe0871d1e9b69487e67ea Mon Sep 17 00:00:00 2001
From: Bas van Kervel <basvankervel@gmail.com>
Date: Wed, 22 Apr 2015 12:46:41 +0200
Subject: Moved leveldb update loop to eth/backend

---
 common/db.go             |  1 +
 eth/backend.go           | 65 +++++++++++++++++++++++++++++++++++-------------
 ethdb/database.go        | 28 +++------------------
 ethdb/memory_database.go |  4 +++
 4 files changed, 56 insertions(+), 42 deletions(-)

diff --git a/common/db.go b/common/db.go
index 408b1e755..ae13c7557 100644
--- a/common/db.go
+++ b/common/db.go
@@ -7,4 +7,5 @@ type Database interface {
 	Delete(key []byte) error
 	LastKnownTD() []byte
 	Close()
+	Flush() error
 }
diff --git a/eth/backend.go b/eth/backend.go
index 88456e448..18c3397c8 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -6,6 +6,7 @@ import (
 	"math"
 	"path"
 	"strings"
+	"time"
 
 	"github.com/ethereum/ethash"
 	"github.com/ethereum/go-ethereum/accounts"
@@ -124,6 +125,8 @@ type Ethereum struct {
 	blockDb common.Database // Block chain database
 	stateDb common.Database // State changes database
 	extraDb common.Database // Extra database (txs, etc)
+	// Closed when databases are flushed and closed
+	databasesClosed chan bool
 
 	//*** SERVICES ***
 	// State manager for processing new blocks and managing the over all states
@@ -199,18 +202,19 @@ func New(config *Config) (*Ethereum, error) {
 	glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion)
 
 	eth := &Ethereum{
-		shutdownChan:   make(chan bool),
-		blockDb:        blockDb,
-		stateDb:        stateDb,
-		extraDb:        extraDb,
-		eventMux:       &event.TypeMux{},
-		accountManager: config.AccountManager,
-		DataDir:        config.DataDir,
-		etherbase:      common.HexToAddress(config.Etherbase),
-		clientVersion:  config.Name, // TODO should separate from Name
-		ethVersionId:   config.ProtocolVersion,
-		netVersionId:   config.NetworkId,
-		NatSpec:        config.NatSpec,
+		shutdownChan:    make(chan bool),
+		databasesClosed: make(chan bool),
+		blockDb:         blockDb,
+		stateDb:         stateDb,
+		extraDb:         extraDb,
+		eventMux:        &event.TypeMux{},
+		accountManager:  config.AccountManager,
+		DataDir:         config.DataDir,
+		etherbase:       common.HexToAddress(config.Etherbase),
+		clientVersion:   config.Name, // TODO should separate from Name
+		ethVersionId:    config.ProtocolVersion,
+		netVersionId:    config.NetworkId,
+		NatSpec:         config.NatSpec,
 	}
 
 	eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
@@ -378,6 +382,9 @@ func (s *Ethereum) Start() error {
 		}
 	}
 
+	// periodically flush databases
+	go s.syncDatabases()
+
 	// Start services
 	s.txPool.Start()
 
@@ -397,6 +404,34 @@ func (s *Ethereum) Start() error {
 	return nil
 }
 
+func (s *Ethereum) syncDatabases() {
+	ticker := time.NewTicker(1 * time.Minute)
+done:
+	for {
+		select {
+		case <-ticker.C:
+			// don't change the order of database flushes
+			if err := s.extraDb.Flush(); err != nil {
+				glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err)
+			}
+			if err := s.stateDb.Flush(); err != nil {
+				glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err)
+			}
+			if err := s.blockDb.Flush(); err != nil {
+				glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err)
+			}
+		case <-s.shutdownChan:
+			break done
+		}
+	}
+
+	s.blockDb.Close()
+	s.stateDb.Close()
+	s.extraDb.Close()
+
+	close(s.databasesClosed)
+}
+
 func (s *Ethereum) StartForTest() {
 	jsonlogger.LogJson(&logger.LogStarting{
 		ClientString:    s.net.Name,
@@ -417,11 +452,6 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error {
 }
 
 func (s *Ethereum) Stop() {
-	// Close the database
-	defer s.blockDb.Close()
-	defer s.stateDb.Close()
-	defer s.extraDb.Close()
-
 	s.txSub.Unsubscribe()         // quits txBroadcastLoop
 	s.minedBlockSub.Unsubscribe() // quits blockBroadcastLoop
 
@@ -437,6 +467,7 @@ func (s *Ethereum) Stop() {
 
 // This function will wait for a shutdown and resumes main thread execution
 func (s *Ethereum) WaitForShutdown() {
+	<-s.databasesClosed
 	<-s.shutdownChan
 }
 
diff --git a/ethdb/database.go b/ethdb/database.go
index eb562f852..57a3f9ee6 100644
--- a/ethdb/database.go
+++ b/ethdb/database.go
@@ -2,7 +2,6 @@ package ethdb
 
 import (
 	"sync"
-	"time"
 
 	"github.com/ethereum/go-ethereum/compression/rle"
 	"github.com/ethereum/go-ethereum/logger"
@@ -35,8 +34,6 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) {
 	}
 	database.makeQueue()
 
-	go database.update()
-
 	return database, nil
 }
 
@@ -111,35 +108,16 @@ func (self *LDBDatabase) Flush() error {
 	}
 	self.makeQueue() // reset the queue
 
+	glog.V(logger.Detail).Infoln("Flush database: ", self.fn)
+
 	return self.db.Write(batch, nil)
 }
 
 func (self *LDBDatabase) Close() {
-	self.quit <- struct{}{}
-	<-self.quit
-	glog.V(logger.Info).Infoln("flushed and closed db:", self.fn)
-}
-
-func (self *LDBDatabase) update() {
-	ticker := time.NewTicker(1 * time.Minute)
-done:
-	for {
-		select {
-		case <-ticker.C:
-			if err := self.Flush(); err != nil {
-				glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
-			}
-		case <-self.quit:
-			break done
-		}
-	}
-
 	if err := self.Flush(); err != nil {
 		glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
 	}
 
-	// Close the leveldb database
 	self.db.Close()
-
-	self.quit <- struct{}{}
+	glog.V(logger.Error).Infoln("flushed and closed db:", self.fn)
 }
diff --git a/ethdb/memory_database.go b/ethdb/memory_database.go
index d4988d0d8..f5d5faee7 100644
--- a/ethdb/memory_database.go
+++ b/ethdb/memory_database.go
@@ -65,3 +65,7 @@ func (db *MemDatabase) LastKnownTD() []byte {
 
 	return data
 }
+
+func (db *MemDatabase) Flush() error {
+	return nil
+}
-- 
cgit v1.2.3


From c9e22976f59f788a9776beab346e7dcf25af48ac Mon Sep 17 00:00:00 2001
From: Bas van Kervel <basvankervel@gmail.com>
Date: Wed, 22 Apr 2015 12:50:33 +0200
Subject: change order of block insert and update LastBlock

---
 core/chain_manager.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/chain_manager.go b/core/chain_manager.go
index 1df56b27f..e5fcd4afc 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -342,14 +342,14 @@ func (self *ChainManager) Export(w io.Writer) error {
 }
 
 func (bc *ChainManager) insert(block *types.Block) {
-	bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
-	bc.currentBlock = block
-	bc.lastBlockHash = block.Hash()
-
 	key := append(blockNumPre, block.Number().Bytes()...)
 	bc.blockDb.Put(key, bc.lastBlockHash.Bytes())
 	// Push block to cache
 	bc.cache.Push(block)
+
+	bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
+	bc.currentBlock = block
+	bc.lastBlockHash = block.Hash()
 }
 
 func (bc *ChainManager) write(block *types.Block) {
-- 
cgit v1.2.3


From 5cfa0e918799f296580d01093b2e3ec921b93ba4 Mon Sep 17 00:00:00 2001
From: Bas van Kervel <basvankervel@gmail.com>
Date: Thu, 23 Apr 2015 17:35:05 +0200
Subject: bugfix, wrong hash stored in blockDb

---
 core/chain_manager.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/chain_manager.go b/core/chain_manager.go
index e5fcd4afc..0003a6011 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -343,7 +343,7 @@ func (self *ChainManager) Export(w io.Writer) error {
 
 func (bc *ChainManager) insert(block *types.Block) {
 	key := append(blockNumPre, block.Number().Bytes()...)
-	bc.blockDb.Put(key, bc.lastBlockHash.Bytes())
+	bc.blockDb.Put(key, block.Hash().Bytes())
 	// Push block to cache
 	bc.cache.Push(block)
 
-- 
cgit v1.2.3


From c273ed7d82f3d5beab7c213fbe1f5c0942adf0bd Mon Sep 17 00:00:00 2001
From: Bas van Kervel <basvankervel@gmail.com>
Date: Wed, 22 Apr 2015 12:46:41 +0200
Subject: Moved leveldb update loop to eth/backend

change order of block insert and update LastBlock

bugfix, wrong hash stored in blockDb
---
 common/db.go             |  1 +
 core/chain_manager.go    | 10 ++++----
 eth/backend.go           | 67 +++++++++++++++++++++++++++++++++++-------------
 ethdb/database.go        | 28 +++-----------------
 ethdb/memory_database.go |  4 +++
 5 files changed, 62 insertions(+), 48 deletions(-)

diff --git a/common/db.go b/common/db.go
index 408b1e755..ae13c7557 100644
--- a/common/db.go
+++ b/common/db.go
@@ -7,4 +7,5 @@ type Database interface {
 	Delete(key []byte) error
 	LastKnownTD() []byte
 	Close()
+	Flush() error
 }
diff --git a/core/chain_manager.go b/core/chain_manager.go
index 47f84b80a..a09b2e63b 100644
--- a/core/chain_manager.go
+++ b/core/chain_manager.go
@@ -342,14 +342,14 @@ func (self *ChainManager) Export(w io.Writer) error {
 }
 
 func (bc *ChainManager) insert(block *types.Block) {
-	bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
-	bc.currentBlock = block
-	bc.lastBlockHash = block.Hash()
-
 	key := append(blockNumPre, block.Number().Bytes()...)
-	bc.blockDb.Put(key, bc.lastBlockHash.Bytes())
+	bc.blockDb.Put(key, block.Hash().Bytes())
 	// Push block to cache
 	bc.cache.Push(block)
+
+	bc.blockDb.Put([]byte("LastBlock"), block.Hash().Bytes())
+	bc.currentBlock = block
+	bc.lastBlockHash = block.Hash()
 }
 
 func (bc *ChainManager) write(block *types.Block) {
diff --git a/eth/backend.go b/eth/backend.go
index 646a4eaf2..0e671f1bf 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"path"
 	"strings"
+	"time"
 
 	"github.com/ethereum/ethash"
 	"github.com/ethereum/go-ethereum/accounts"
@@ -123,6 +124,8 @@ type Ethereum struct {
 	blockDb common.Database // Block chain database
 	stateDb common.Database // State changes database
 	extraDb common.Database // Extra database (txs, etc)
+	// Closed when databases are flushed and closed
+	databasesClosed chan bool
 
 	//*** SERVICES ***
 	// State manager for processing new blocks and managing the over all states
@@ -197,18 +200,19 @@ func New(config *Config) (*Ethereum, error) {
 	glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion)
 
 	eth := &Ethereum{
-		shutdownChan:   make(chan bool),
-		blockDb:        blockDb,
-		stateDb:        stateDb,
-		extraDb:        extraDb,
-		eventMux:       &event.TypeMux{},
-		accountManager: config.AccountManager,
-		DataDir:        config.DataDir,
-		etherbase:      common.HexToAddress(config.Etherbase),
-		clientVersion:  config.Name, // TODO should separate from Name
-		ethVersionId:   config.ProtocolVersion,
-		netVersionId:   config.NetworkId,
-		NatSpec:        config.NatSpec,
+		shutdownChan:    make(chan bool),
+		databasesClosed: make(chan bool),
+		blockDb:         blockDb,
+		stateDb:         stateDb,
+		extraDb:         extraDb,
+		eventMux:        &event.TypeMux{},
+		accountManager:  config.AccountManager,
+		DataDir:         config.DataDir,
+		etherbase:       common.HexToAddress(config.Etherbase),
+		clientVersion:   config.Name, // TODO should separate from Name
+		ethVersionId:    config.ProtocolVersion,
+		netVersionId:    config.NetworkId,
+		NatSpec:         config.NatSpec,
 	}
 
 	eth.chainManager = core.NewChainManager(blockDb, stateDb, eth.EventMux())
@@ -376,6 +380,9 @@ func (s *Ethereum) Start() error {
 		}
 	}
 
+	// periodically flush databases
+	go s.syncDatabases()
+
 	// Start services
 	go s.txPool.Start()
 	s.protocolManager.Start()
@@ -392,6 +399,34 @@ func (s *Ethereum) Start() error {
 	return nil
 }
 
+func (s *Ethereum) syncDatabases() {
+	ticker := time.NewTicker(1 * time.Minute)
+done:
+	for {
+		select {
+		case <-ticker.C:
+			// don't change the order of database flushes
+			if err := s.extraDb.Flush(); err != nil {
+				glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err)
+			}
+			if err := s.stateDb.Flush(); err != nil {
+				glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err)
+			}
+			if err := s.blockDb.Flush(); err != nil {
+				glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err)
+			}
+		case <-s.shutdownChan:
+			break done
+		}
+	}
+
+	s.blockDb.Close()
+	s.stateDb.Close()
+	s.extraDb.Close()
+
+	close(s.databasesClosed)
+}
+
 func (s *Ethereum) StartForTest() {
 	jsonlogger.LogJson(&logger.LogStarting{
 		ClientString:    s.net.Name,
@@ -412,12 +447,7 @@ func (self *Ethereum) SuggestPeer(nodeURL string) error {
 }
 
 func (s *Ethereum) Stop() {
-	// Close the database
-	defer s.blockDb.Close()
-	defer s.stateDb.Close()
-	defer s.extraDb.Close()
-
-	s.txSub.Unsubscribe() // quits txBroadcastLoop
+	s.txSub.Unsubscribe()         // quits txBroadcastLoop
 
 	s.protocolManager.Stop()
 	s.txPool.Stop()
@@ -432,6 +462,7 @@ func (s *Ethereum) Stop() {
 
 // This function will wait for a shutdown and resumes main thread execution
 func (s *Ethereum) WaitForShutdown() {
+	<-s.databasesClosed
 	<-s.shutdownChan
 }
 
diff --git a/ethdb/database.go b/ethdb/database.go
index eb562f852..57a3f9ee6 100644
--- a/ethdb/database.go
+++ b/ethdb/database.go
@@ -2,7 +2,6 @@ package ethdb
 
 import (
 	"sync"
-	"time"
 
 	"github.com/ethereum/go-ethereum/compression/rle"
 	"github.com/ethereum/go-ethereum/logger"
@@ -35,8 +34,6 @@ func NewLDBDatabase(file string) (*LDBDatabase, error) {
 	}
 	database.makeQueue()
 
-	go database.update()
-
 	return database, nil
 }
 
@@ -111,35 +108,16 @@ func (self *LDBDatabase) Flush() error {
 	}
 	self.makeQueue() // reset the queue
 
+	glog.V(logger.Detail).Infoln("Flush database: ", self.fn)
+
 	return self.db.Write(batch, nil)
 }
 
 func (self *LDBDatabase) Close() {
-	self.quit <- struct{}{}
-	<-self.quit
-	glog.V(logger.Info).Infoln("flushed and closed db:", self.fn)
-}
-
-func (self *LDBDatabase) update() {
-	ticker := time.NewTicker(1 * time.Minute)
-done:
-	for {
-		select {
-		case <-ticker.C:
-			if err := self.Flush(); err != nil {
-				glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
-			}
-		case <-self.quit:
-			break done
-		}
-	}
-
 	if err := self.Flush(); err != nil {
 		glog.V(logger.Error).Infof("error: flush '%s': %v\n", self.fn, err)
 	}
 
-	// Close the leveldb database
 	self.db.Close()
-
-	self.quit <- struct{}{}
+	glog.V(logger.Error).Infoln("flushed and closed db:", self.fn)
 }
diff --git a/ethdb/memory_database.go b/ethdb/memory_database.go
index d4988d0d8..f5d5faee7 100644
--- a/ethdb/memory_database.go
+++ b/ethdb/memory_database.go
@@ -65,3 +65,7 @@ func (db *MemDatabase) LastKnownTD() []byte {
 
 	return data
 }
+
+func (db *MemDatabase) Flush() error {
+	return nil
+}
-- 
cgit v1.2.3