diff options
author | obscuren <geffobscura@gmail.com> | 2015-04-16 07:14:27 +0800 |
---|---|---|
committer | obscuren <geffobscura@gmail.com> | 2015-04-16 07:28:24 +0800 |
commit | 3a51c3b584b16b408c3fbf87c4f9719fcfb1c52a (patch) | |
tree | b2d4a5e7fcc1da2fd5b1aa3c139ddc7594c8646c /eth | |
parent | 97d2954e227049a089652d91e6fb0ea1c8115cc6 (diff) | |
parent | c4678ffd77a18a9d03c888fdf242c9e5915b9f5f (diff) | |
download | go-tangerine-3a51c3b584b16b408c3fbf87c4f9719fcfb1c52a.tar go-tangerine-3a51c3b584b16b408c3fbf87c4f9719fcfb1c52a.tar.gz go-tangerine-3a51c3b584b16b408c3fbf87c4f9719fcfb1c52a.tar.bz2 go-tangerine-3a51c3b584b16b408c3fbf87c4f9719fcfb1c52a.tar.lz go-tangerine-3a51c3b584b16b408c3fbf87c4f9719fcfb1c52a.tar.xz go-tangerine-3a51c3b584b16b408c3fbf87c4f9719fcfb1c52a.tar.zst go-tangerine-3a51c3b584b16b408c3fbf87c4f9719fcfb1c52a.zip |
Merge branch 'develop' into downloader-proto
Diffstat (limited to 'eth')
-rw-r--r-- | eth/backend.go | 29 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 70 | ||||
-rw-r--r-- | eth/protocol.go | 12 |
3 files changed, 79 insertions, 32 deletions
diff --git a/eth/backend.go b/eth/backend.go index a71d5721e..3d5c4ba09 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -4,6 +4,7 @@ import ( "crypto/ecdsa" "fmt" "io/ioutil" + "math" "path" "strings" @@ -43,6 +44,9 @@ type Config struct { ProtocolVersion int NetworkId int + BlockChainVersion int + SkipBcVersionCheck bool // e.g. blockchain export + DataDir string LogFile string LogLevel int @@ -151,7 +155,7 @@ type Ethereum struct { } func New(config *Config) (*Ethereum, error) { - // Boostrap database + // Bootstrap database logger.New(config.DataDir, config.LogFile, config.LogLevel) if len(config.LogJSON) > 0 { logger.NewJSONsystem(config.DataDir, config.LogJSON) @@ -181,6 +185,16 @@ func New(config *Config) (*Ethereum, error) { saveProtocolVersion(blockDb, config.ProtocolVersion) glog.V(logger.Info).Infof("Protocol Version: %v, Network Id: %v", config.ProtocolVersion, config.NetworkId) + if !config.SkipBcVersionCheck { + b, _ := blockDb.Get([]byte("BlockchainVersion")) + bcVersion := int(common.NewValue(b).Uint()) + if bcVersion != config.BlockChainVersion && bcVersion != 0 { + return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.\n", bcVersion, config.BlockChainVersion) + } + saveBlockchainVersion(blockDb, config.BlockChainVersion) + } + glog.V(logger.Info).Infof("Blockchain DB Version: %d", config.BlockChainVersion) + eth := &Ethereum{ shutdownChan: make(chan bool), blockDb: blockDb, @@ -439,7 +453,7 @@ func (self *Ethereum) txBroadcastLoop() { // automatically stops if unsubscribe for obj := range self.txSub.Chan() { event := obj.(core.TxPreEvent) - self.net.Broadcast("eth", TxMsg, []*types.Transaction{event.Tx}) + self.net.BroadcastLimited("eth", TxMsg, math.Sqrt, []*types.Transaction{event.Tx}) self.syncAccounts(event.Tx) } } @@ -463,7 +477,7 @@ func (self *Ethereum) blockBroadcastLoop() { for obj := range self.blockSub.Chan() { switch ev := obj.(type) { case core.ChainHeadEvent: - self.net.Broadcast("eth", NewBlockMsg, []interface{}{ev.Block, ev.Block.Td}) + self.net.BroadcastLimited("eth", NewBlockMsg, math.Sqrt, []interface{}{ev.Block, ev.Block.Td}) } } } @@ -476,3 +490,12 @@ func saveProtocolVersion(db common.Database, protov int) { db.Put([]byte("ProtocolVersion"), common.NewValue(protov).Bytes()) } } + +func saveBlockchainVersion(db common.Database, bcVersion int) { + d, _ := db.Get([]byte("BlockchainVersion")) + blockchainVersion := common.NewValue(d).Uint() + + if blockchainVersion == 0 { + db.Put([]byte("BlockchainVersion"), common.NewValue(bcVersion).Bytes()) + } +} diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 83e6b8d32..1707e1395 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -54,8 +54,9 @@ type blockPack struct { } type syncPack struct { - peer *peer - hash common.Hash + peer *peer + hash common.Hash + ignoreInitial bool } func New(hasBlock hashCheckFn, insertChain chainInsertFn, currentTd currentTdFn) *Downloader { @@ -104,11 +105,13 @@ func (d *Downloader) UnregisterPeer(id string) { func (d *Downloader) peerHandler() { // itimer is used to determine when to start ignoring `minDesiredPeerCount` - itimer := time.NewTicker(5 * time.Second) + //itimer := time.NewTicker(5 * time.Second) + itimer := time.NewTimer(5 * time.Second) out: for { select { case <-d.newPeerCh: + itimer.Stop() // Meet the `minDesiredPeerCount` before we select our best peer if len(d.peers) < minDesiredPeerCount { break @@ -137,7 +140,7 @@ func (d *Downloader) selectPeer(p *peer) { } glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) - d.syncCh <- syncPack{p, p.recentHash} + d.syncCh <- syncPack{p, p.recentHash, false} } } @@ -147,11 +150,11 @@ out: select { case sync := <-d.syncCh: selectedPeer := sync.peer - glog.V(logger.Detail).Infoln("Synchronising with network using:", selectedPeer.id) + glog.V(logger.Detail).Infoln("Synchronising with the network using:", selectedPeer.id) // Start the fetcher. This will block the update entirely // interupts need to be send to the appropriate channels // respectively. - if err := d.startFetchingHashes(selectedPeer, sync.hash); err != nil { + if err := d.startFetchingHashes(selectedPeer, sync.hash, sync.ignoreInitial); err != nil { // handle error glog.V(logger.Debug).Infoln("Error fetching hashes:", err) // XXX Reset @@ -178,11 +181,18 @@ out: } // XXX Make synchronous -func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash) error { - glog.V(logger.Debug).Infoln("Downloading hashes") +func (d *Downloader) startFetchingHashes(p *peer, hash common.Hash, ignoreInitial bool) error { + glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", hash.Bytes()[:4], p.id) start := time.Now() + // We ignore the initial hash in some cases (e.g. we received a block without it's parent) + // In such circumstances we don't need to download the block so don't add it to the queue. + if !ignoreInitial { + // Add the hash to the queue first + d.queue.hashPool.Add(hash) + } + // Get the first batch of hashes p.getHashes(hash) atomic.StoreInt32(&d.fetchingHashes, 1) @@ -195,7 +205,7 @@ out: hashSet := set.New() for _, hash := range hashes { if d.hasBlock(hash) { - glog.V(logger.Debug).Infof("Found common hash %x\n", hash) + glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4]) done = true break @@ -207,7 +217,7 @@ out: // Add hashes to the chunk set // Check if we're done fetching - if !done { + if !done && len(hashes) > 0 { //fmt.Println("re-fetch. current =", d.queue.hashPool.Size()) // Get the next set of hashes p.getHashes(hashes[len(hashes)-1]) @@ -218,7 +228,7 @@ out: } } } - glog.V(logger.Detail).Infoln("Download hashes: done. Took", time.Since(start)) + glog.V(logger.Detail).Infof("Downloaded hashes (%d). Took %v\n", d.queue.hashPool.Size(), time.Since(start)) return nil } @@ -242,6 +252,10 @@ out: // from the available peers. if d.queue.hashPool.Size() > 0 { availablePeers := d.peers.get(idleState) + if len(availablePeers) == 0 { + glog.V(logger.Detail).Infoln("No peers available out of", len(d.peers)) + } + for _, peer := range availablePeers { // Get a possible chunk. If nil is returned no chunk // could be returned due to no hashes available. @@ -317,21 +331,33 @@ func (d *Downloader) AddBlock(id string, block *types.Block, td *big.Int) { return } - glog.V(logger.Detail).Infoln("Inserting new block from:", id) - d.queue.addBlock(id, block, td) - + peer := d.peers.getPeer(id) // if the peer is in our healthy list of peers; update the td - // here is a good chance to add the peer back to the list - if peer := d.peers.getPeer(id); peer != nil { - peer.mu.Lock() - peer.td = td - peer.recentHash = block.Hash() - peer.mu.Unlock() + // and add the block. Otherwise just ignore it + if peer == nil { + glog.V(logger.Detail).Infof("Ignored block from bad peer %s\n", id) + return } + peer.mu.Lock() + peer.td = td + peer.recentHash = block.Hash() + peer.mu.Unlock() + + glog.V(logger.Detail).Infoln("Inserting new block from:", id) + d.queue.addBlock(id, block, td) + // if neither go ahead to process if !(d.isFetchingHashes() || d.isDownloadingBlocks()) { - d.process() + // Check if the parent of the received block is known. + // If the block is not know, request it otherwise, request. + phash := block.ParentHash() + if !d.hasBlock(phash) { + glog.V(logger.Detail).Infof("Missing parent %x, requires fetching\n", phash.Bytes()[:4]) + d.syncCh <- syncPack{peer, peer.recentHash, true} + } else { + d.process() + } } } @@ -369,7 +395,7 @@ func (d *Downloader) process() error { // TODO change this. This shite for i, block := range blocks[:max] { if !d.hasBlock(block.ParentHash()) { - d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash()} + d.syncCh <- syncPack{d.peers.bestPeer(), block.Hash(), true} // remove processed blocks blocks = blocks[i:] diff --git a/eth/protocol.go b/eth/protocol.go index b15868898..a85d15a0c 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -324,7 +324,7 @@ func (self *ethProtocol) handle() error { // to simplify backend interface adding a new block // uses AddPeer followed by AddBlock only if peer is the best peer // (or selected as new best peer) - if best, _ := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); best { + if _, suspended := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); !suspended { self.blockPool.AddBlock(request.Block, self.id) } @@ -415,11 +415,9 @@ func (self *ethProtocol) sendStatus() error { } func (self *ethProtocol) protoErrorDisconnect(err *errs.Error) { - //err.Log(self.peer.Logger) err.Log(glog.V(logger.Info)) - /* - if err.Fatal() { - self.peer.Disconnect(p2p.DiscSubprotocolError) - } - */ + if err.Fatal() { + self.peer.Disconnect(p2p.DiscSubprotocolError) + } + } |