From 49a513bdebd7c4402b3a7f2f169a31c34f2ca9df Mon Sep 17 00:00:00 2001 From: Bas van Kervel Date: Mon, 13 Apr 2015 10:13:52 +0200 Subject: Added blockchain DB versioning support, closes #650 --- eth/backend.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index c7a5b233f..f073ec6e6 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -42,6 +42,9 @@ type Config struct { ProtocolVersion int NetworkId int + BlockChainVersion int + SkipBcVersionCheck bool // e.g. blockchain export + DataDir string LogFile string LogLevel int @@ -149,7 +152,7 @@ type Ethereum struct { } func New(config *Config) (*Ethereum, error) { - // Boostrap database + // Bootstrap database logger.New(config.DataDir, config.LogFile, config.LogLevel) if len(config.LogJSON) > 0 { logger.NewJSONsystem(config.DataDir, config.LogJSON) @@ -179,6 +182,16 @@ func New(config *Config) (*Ethereum, error) { saveProtocolVersion(blockDb, config.ProtocolVersion) glog.V(logger.Info).Infof("Protocol Version: %v, Network Id: %v", config.ProtocolVersion, config.NetworkId) + if !config.SkipBcVersionCheck { + b, _ := blockDb.Get([]byte("BlockchainVersion")) + bcVersion := int(common.NewValue(b).Uint()) + if bcVersion != config.BlockChainVersion && bcVersion != 0 { + return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, config.BlockChainVersion) + } + saveBlockchainVersion(blockDb, config.BlockChainVersion) + } + glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion) + eth := &Ethereum{ shutdownChan: make(chan bool), blockDb: blockDb, @@ -472,3 +485,12 @@ func saveProtocolVersion(db common.Database, protov int) { db.Put([]byte("ProtocolVersion"), common.NewValue(protov).Bytes()) } } + +func saveBlockchainVersion(db common.Database, bcVersion int) { + d, _ := db.Get([]byte("BlockchainVersion")) + blockchainVersion := common.NewValue(d).Uint() + + if blockchainVersion == 0 { + db.Put([]byte("BlockchainVersion"), common.NewValue(bcVersion).Bytes()) + } +} -- cgit v1.2.3 From 3d57e377a4e95941fd3f572b42e073b40d10d27c Mon Sep 17 00:00:00 2001 From: zelig Date: Sun, 12 Apr 2015 20:25:09 +0100 Subject: blockpool stability fixes: - follow up locks and fix them - chainManager: call SetQueued for parentErr future blocks, uncomment TD checks, unskip test - make ErrIncorrectTD non-fatal to be forgiving to genuine mistaken nodes (temp) but demote them to guard against stuck best peers. - add purging to bounded nodeCache (config nodeCacheSize) - use nodeCache when creating blockpool entries and let non-best peers add blocks (performance boost) - minor error in addError - reduce idleBestPeerTimeout to 1 minute - correct status counts and unskip status passing status test - glogified logging --- eth/protocol.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) (limited to 'eth') diff --git a/eth/protocol.go b/eth/protocol.go index 878038f74..1a19307db 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -299,7 +299,7 @@ func (self *ethProtocol) handle() error { // to simplify backend interface adding a new block // uses AddPeer followed by AddBlock only if peer is the best peer // (or selected as new best peer) - if best, _ := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); best { + if _, suspended := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); !suspended { self.blockPool.AddBlock(request.Block, self.id) } @@ -384,11 +384,9 @@ func (self *ethProtocol) sendStatus() error { } func (self *ethProtocol) protoErrorDisconnect(err *errs.Error) { - //err.Log(self.peer.Logger) err.Log(glog.V(logger.Info)) - /* - if err.Fatal() { - self.peer.Disconnect(p2p.DiscSubprotocolError) - } - */ + if err.Fatal() { + self.peer.Disconnect(p2p.DiscSubprotocolError) + } + } -- cgit v1.2.3 From 9800c84348b5492dd87802f82ef54c5b9676a52a Mon Sep 17 00:00:00 2001 From: obscuren Date: Tue, 14 Apr 2015 12:49:15 +0200 Subject: eth: limit the amount of peers that will receive Block/Tx messages All transaction and block messages are now limited using `sqrt(peers)` --- eth/backend.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'eth') diff --git a/eth/backend.go b/eth/backend.go index f073ec6e6..cde7b167d 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -4,6 +4,7 @@ import ( "crypto/ecdsa" "fmt" "io/ioutil" + "math" "path" "strings" @@ -448,7 +449,7 @@ func (self *Ethereum) txBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.txSub.Chan() { event := obj.(core.TxPreEvent) - self.net.Broadcast("eth", TxMsg, []*types.Transaction{event.Tx}) + self.net.BroadcastLimited("eth", TxMsg, math.Sqrt, []*types.Transaction{event.Tx}) self.syncAccounts(event.Tx) } } @@ -472,7 +473,7 @@ func (self *Ethereum) blockBroadcastLoop() { for obj := range self.blockSub.Chan() { switch ev := obj.(type) { case core.ChainHeadEvent: - self.net.Broadcast("eth", NewBlockMsg, []interface{}{ev.Block, ev.Block.Td}) + self.net.BroadcastLimited("eth", NewBlockMsg, math.Sqrt, []interface{}{ev.Block, ev.Block.Td}) } } } -- cgit v1.2.3 From c4678ffd77a18a9d03c888fdf242c9e5915b9f5f Mon Sep 17 00:00:00 2001 From: obscuren Date: Thu, 16 Apr 2015 00:14:31 +0200 Subject: downloader: updated downloader and fixed issues with catch up Properly ignore blocks coming from peers not in our peer list (blocked) and do never request anything from bad peers. Added some checks to account for blocks known when requesting hashes (missing parents). --- eth/downloader/downloader.go | 70 ++++++++++++++++++++++++++++++-------------- 1 file changed, 48 insertions(+), 22 deletions(-) (limited to 'eth') diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 83e6b8d32..1707e1395 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -54,8 +54,9 @@ type blockPack struct { } type syncPack struct { - peer *peer - hash common.Hash + peer *peer + hash common.Hash + ignoreInitial bool } func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) *Downloader { @@ -104,11 +105,13 @@ func (d *Downloader) UnregisterPeer(id string) { func (d *Downloader) peerHandler() { // itimer is used to determine when to start ignoring `minDesiredPeerCount` - itimer := time.NewTicker(5 * time.Second) + //itimer := time.NewTicker(5 * time.Second) + itimer := time.NewTimer(5 * time.Second) out: for { select { case <-d.newPeerCh: + itimer.Stop() // Meet the `minDesiredPeerCount` before we select our best peer if len(d.peers) < minDesiredPeerCount { break @@ -137,7 +140,7 @@ func (d *Downloader) selectPeer(p *peer) { } glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) - d.syncCh <- syncPack{p, p.recentHash} + d.syncCh <- syncPack{p, p.recentHash, false} } } @@ -147,11 +150,11 @@ out: select { case sync := <-d.syncCh: selectedPeer := sync.peer - glog.V(logger.Detail).Infoln("Synchronising with network using:", selectedPeer.id) + glog.V(logger.Detail).Infoln("Synchronising with the network using:", selectedPeer.id) // Start the fetcher. This will block the update entirely // interupts need to be send to the appropriate channels // respectively. - if err := d.startFetchingHashes(selectedPeer, sync.hash); err != nil { + if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil { // handle error glog.V(logger.Debug).Infoln("Error fetching hashes:", err) // XXX Reset @@ -178,11 +181,18 @@ out: } // XXX Make synchronous -func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash) error { - glog.V(logger.Debug).Infoln("Downloading hashes") +func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error { + glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id) start := time.Now() + // We ignore the initial hash in some cases (e.g. we received a block without it's parent) + // In such circumstances we don't need to download the block so don't add it to the queue. + if !ignoreInitial { + // Add the hash to the queue first + d.queue.hashPool.Add(hash) + } + // Get the first batch of hashes p.getHashes(hash) atomic.StoreInt32(&d.fetchingHashes, 1) @@ -195,7 +205,7 @@ out: hashSet := set.New() for _, hash := range hashes { if d.hasBlock(hash) { - glog.V(logger.Debug).Infof("Found common hash %x\n", hash) + glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4]) done = true break @@ -207,7 +217,7 @@ out: // Add hashes to the chunk set // Check if we're done fetching - if !done { + if !done && len(hashes) > 0 { //fmt.Println("re-fetch. current =", d.queue.hashPool.Size()) // Get the next set of hashes p.getHashes(hashes[len(hashes)-1]) @@ -218,7 +228,7 @@ out: } } } - glog.V(logger.Detail).Infoln("Download hashes: done. Took", time.Since(start)) + glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start)) return nil } @@ -242,6 +252,10 @@ out: // from the available peers. if d.queue.hashPool.Size() > 0 { availablePeers := d.peers.get(idleState) + if len(availablePeers) == 0 { + glog.V(logger.Detail).Infoln("No peers available out of", len(d.peers)) + } + for _, peer := range availablePeers { // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. @@ -317,21 +331,33 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) { return } - glog.V(logger.Detail).Infoln("Inserting new block from:", id) - d.queue.addBlock(id, block, td) - + peer := d.peers.getPeer(id) // if the peer is in our healthy list of peers; update the td - // here is a good chance to add the peer back to the list - if peer := d.peers.getPeer(id); peer != nil { - peer.mu.Lock() - peer.td = td - peer.recentHash = block.Hash() - peer.mu.Unlock() + // and add the block. Otherwise just ignore it + if peer == nil { + glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id) + return } + peer.mu.Lock() + peer.td = td + peer.recentHash = block.Hash() + peer.mu.Unlock() + + glog.V(logger.Detail).Infoln("Inserting new block from:", id) + d.queue.addBlock(id, block, td) + // if neither go ahead to process if !(d.isFetchingHashes() || d.isDownloadingBlocks()) { - d.process() + // Check if the parent of the received block is known. + // If the block is not know, request it otherwise, request. + phash := block.ParentHash() + if !d.hasBlock(phash) { + glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4]) + d.syncCh <- syncPack{peer, peer.recentHash, true} + } else { + d.process() + } } } @@ -369,7 +395,7 @@ func (d *Downloader) process() error { // TODO change this. This shite for i, block := range blocks[:max] { if !d.hasBlock(block.ParentHash()) { - d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash()} + d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash(), true} // remove processed blocks blocks = blocks[i:] -- cgit v1.2.3