From cb1fa523e4930841f4cc92689c850c7831810d03 Mon Sep 17 00:00:00 2001
From: obscuren
Date: Sat, 9 May 2015 12:00:51 +0200
Subject: cmd/geth, cmd/mist, eth, flags: renamed loglevel to verbosity

---
 eth/backend.go | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

(limited to 'eth')

diff --git a/eth/backend.go b/eth/backend.go
index 0f23cde2f..8f0789467 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -5,6 +5,7 @@ import (
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
+	"math/big"
 	"os"
 	"path"
 	"path/filepath"
@@ -53,12 +54,12 @@ type Config struct {
 	BlockChainVersion  int
 	SkipBcVersionCheck bool // e.g. blockchain export

-	DataDir  string
-	LogFile  string
-	LogLevel int
-	LogJSON  string
-	VmDebug  bool
-	NatSpec  bool
+	DataDir   string
+	LogFile   string
+	Verbosity int
+	LogJSON   string
+	VmDebug   bool
+	NatSpec   bool

 	MaxPeers        int
 	MaxPendingPeers int
@@ -76,6 +77,7 @@ type Config struct {
 	Dial bool

 	Etherbase      string
+	GasPrice       *big.Int
 	MinerThreads   int
 	AccountManager *accounts.Manager

@@ -200,7 +202,7 @@ type Ethereum struct {

 func New(config *Config) (*Ethereum, error) {
 	// Bootstrap database
-	logger.New(config.DataDir, config.LogFile, config.LogLevel)
+	logger.New(config.DataDir, config.LogFile, config.Verbosity)
 	if len(config.LogJSON) > 0 {
 		logger.NewJSONsystem(config.DataDir, config.LogJSON)
 	}
@@ -266,6 +268,8 @@ func New(config *Config) (*Ethereum, error) {
 	eth.blockProcessor = core.NewBlockProcessor(stateDb, extraDb, eth.pow, eth.txPool, eth.chainManager, eth.EventMux())
 	eth.chainManager.SetProcessor(eth.blockProcessor)
 	eth.miner = miner.New(eth, eth.pow, config.MinerThreads)
+	eth.miner.SetGasPrice(config.GasPrice)
+
 	eth.protocolManager = NewProtocolManager(config.ProtocolVersion, config.NetworkId, eth.eventMux, eth.txPool, eth.chainManager, eth.downloader)
 	if config.Shh {
 		eth.whisper = whisper.New()
--
cgit v1.2.3


From 05715f27cfd68387f720db115bb8461f8725a341 Mon Sep 17 00:00:00 2001
From: obscuren
Date: Sun, 10 May 2015 00:34:07 +0200
Subject: eth: added a cancel method for the downloader

Added a cancel method to the downloader which gracefully shuts down any
active syncing process (hash fetching or block downloading), resets the
queue and removes any pending blocks. This fixes an issue where the
downloader would stall because of an active ongoing process when an
invalid block was found.
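
The heart of that change is a dedicated cancel channel: every fetch loop selects on it, and Cancel() closes it, drains whatever is still buffered and resets the queue. The standalone Go sketch below illustrates just that pattern in isolation; the fetcher type and the resultCh/cancelCh names are illustrative stand-ins, not the downloader's own identifiers.

    package main

    import (
    	"errors"
    	"fmt"
    	"time"
    )

    // errCancelled is returned by the fetch loop once it has been cancelled.
    var errCancelled = errors.New("fetch cancelled (requested)")

    // fetcher is a stand-in for the downloader: a result channel fed by a
    // fetch loop, plus a cancel channel that aborts that loop.
    type fetcher struct {
    	resultCh chan int
    	cancelCh chan struct{}
    }

    func newFetcher() *fetcher {
    	return &fetcher{
    		resultCh: make(chan int, 16),
    		cancelCh: make(chan struct{}),
    	}
    }

    // run simulates a fetch loop; it stops as soon as cancelCh is closed.
    func (f *fetcher) run() error {
    	for i := 0; ; i++ {
    		select {
    		case <-f.cancelCh:
    			return errCancelled
    		case f.resultCh <- i:
    			time.Sleep(10 * time.Millisecond)
    		}
    	}
    }

    // cancel closes the cancel channel to stop the loop, then drains any
    // results that were already queued so the fetcher is clean for reuse.
    func (f *fetcher) cancel() {
    	close(f.cancelCh)
    	for {
    		select {
    		case <-f.resultCh:
    		default:
    			return
    		}
    	}
    }

    func main() {
    	f := newFetcher()
    	done := make(chan error, 1)
    	go func() { done <- f.run() }()

    	time.Sleep(50 * time.Millisecond)
    	f.cancel()
    	fmt.Println("fetch loop stopped:", <-done)
    }

Closing the channel, rather than sending on it, is what lets a single Cancel call release every goroutine that is blocked in a select at once.
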
---
 eth/downloader/downloader.go      | 62 +++++++++++++++++++++++++++++++++++++--
 eth/downloader/downloader_test.go | 43 +++++++++++++++++++++++++++
 eth/sync.go                       |  3 ++
 3 files changed, 106 insertions(+), 2 deletions(-)

(limited to 'eth')

diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go
index 18f8d2ba8..14ca2cd3d 100644
--- a/eth/downloader/downloader.go
+++ b/eth/downloader/downloader.go
@@ -34,6 +34,9 @@ var (
 	errPeersUnavailable    = errors.New("no peers available or all peers tried for block download process")
 	errAlreadyInPool       = errors.New("hash already in pool")
 	errBlockNumberOverflow = errors.New("received block which overflows")
+	errCancelHashFetch     = errors.New("hash fetching cancelled (requested)")
+	errCancelBlockFetch    = errors.New("block downloading cancelled (requested)")
+	errNoSyncActive        = errors.New("no sync active")
 )

 type hashCheckFn func(common.Hash) bool
@@ -74,6 +77,7 @@ type Downloader struct {
 	newPeerCh chan *peer
 	hashCh    chan hashPack
 	blockCh   chan blockPack
+	cancelCh  chan struct{}
 }

 func New(hasBlock hashCheckFn, getBlock getBlockFn) *Downloader {
@@ -129,6 +133,9 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
 	}
 	defer atomic.StoreInt32(&d.synchronising, 0)
+	// Create cancel channel for aborting midflight
+	d.cancelCh = make(chan struct{})
+
 	// Abort if the queue still contains some leftover data
 	if _, cached := d.queue.Size(); cached > 0 && d.queue.GetHeadBlock() != nil {
 		return errPendingQueue
 	}
@@ -161,7 +168,6 @@ func (d *Downloader) Has(hash common.Hash) bool {
 }

 func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool) (err error) {
-
 	d.activePeer = p.id
 	defer func() {
 		// reset on error
@@ -191,6 +197,42 @@ func (d *Downloader) getFromPeer(p *peer, hash common.Hash, ignoreInitial bool)
 	return nil
 }

+// Cancel cancels all of the operations and resets the queue. It returns true
+// if the cancel operation was completed.
+func (d *Downloader) Cancel() bool {
+	hs, bs := d.queue.Size()
+	// If we're not syncing just return.
+	if atomic.LoadInt32(&d.synchronising) == 0 && hs == 0 && bs == 0 {
+		return false
+	}
+
+	close(d.cancelCh)
+
+	// clean up
+hashDone:
+	for {
+		select {
+		case <-d.hashCh:
+		default:
+			break hashDone
+		}
+	}
+
+blockDone:
+	for {
+		select {
+		case <-d.blockCh:
+		default:
+			break blockDone
+		}
+	}
+
+	// reset the queue
+	d.queue.Reset()
+
+	return true
+}
+
 // XXX Make synchronous
 func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial bool) error {
 	glog.V(logger.Debug).Infof("Downloading hashes (%x) from %s", h[:4], p.id)
@@ -217,6 +259,8 @@ func (d *Downloader) startFetchingHashes(p *peer, h common.Hash, ignoreInitial b
 out:
 	for {
 		select {
+		case <-d.cancelCh:
+			return errCancelHashFetch
 		case hashPack := <-d.hashCh:
 			// Make sure the active peer is giving us the hashes
 			if hashPack.peerId != activePeer.id {
@@ -305,6 +349,8 @@ func (d *Downloader) startFetchingBlocks(p *peer) error {
 out:
 	for {
 		select {
+		case <-d.cancelCh:
+			return errCancelBlockFetch
 		case blockPack := <-d.blockCh:
 			// If the peer was previously banned and failed to deliver it's pack
 			// in a reasonable time frame, ignore it's message.
@@ -394,11 +440,23 @@ out:

 // Deliver a chunk to the downloader. This is usually done through the BlocksMsg by
 // the protocol handler.
-func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) {
+func (d *Downloader) DeliverChunk(id string, blocks []*types.Block) error {
+	// Make sure the downloader is active
+	if atomic.LoadInt32(&d.synchronising) == 0 {
+		return errNoSyncActive
+	}
+
 	d.blockCh <- blockPack{id, blocks}
+
+	return nil
 }

 func (d *Downloader) AddHashes(id string, hashes []common.Hash) error {
+	// Make sure the downloader is active
+	if atomic.LoadInt32(&d.synchronising) == 0 {
+		return errNoSyncActive
+	}
+
 	// make sure that the hashes that are being added are actually from the peer
 	// that's the current active peer. hashes that have been received from other
 	// peers are dropped and ignored.
diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go
index 8ccc4d1a5..d0f8d4c8f 100644
--- a/eth/downloader/downloader_test.go
+++ b/eth/downloader/downloader_test.go
@@ -182,6 +182,49 @@ func TestTaking(t *testing.T) {
 	}
 }

+func TestInactiveDownloader(t *testing.T) {
+	targetBlocks := 1000
+	hashes := createHashes(0, targetBlocks)
+	blocks := createBlocksFromHashSet(createHashSet(hashes))
+	tester := newTester(t, hashes, nil)
+
+	err := tester.downloader.AddHashes("bad peer 001", hashes)
+	if err != errNoSyncActive {
+		t.Error("expected no sync error, got", err)
+	}
+
+	err = tester.downloader.DeliverChunk("bad peer 001", blocks)
+	if err != errNoSyncActive {
+		t.Error("expected no sync error, got", err)
+	}
+}
+
+func TestCancel(t *testing.T) {
+	minDesiredPeerCount = 4
+	blockTtl = 1 * time.Second
+
+	targetBlocks := 1000
+	hashes := createHashes(0, targetBlocks)
+	blocks := createBlocksFromHashes(hashes)
+	tester := newTester(t, hashes, blocks)
+
+	tester.newPeer("peer1", big.NewInt(10000), hashes[0])
+
+	err := tester.sync("peer1", hashes[0])
+	if err != nil {
+		t.Error("download error", err)
+	}
+
+	if !tester.downloader.Cancel() {
+		t.Error("cancel operation unsuccessful")
+	}
+
+	hashSize, blockSize := tester.downloader.queue.Size()
+	if hashSize > 0 || blockSize > 0 {
+		t.Error("block (", blockSize, ") or hash (", hashSize, ") not 0")
+	}
+}
+
 func TestThrottling(t *testing.T) {
 	minDesiredPeerCount = 4
 	blockTtl = 1 * time.Second
diff --git a/eth/sync.go b/eth/sync.go
index c49f5209d..d955eaa50 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -63,6 +63,9 @@ func (pm *ProtocolManager) processBlocks() error {
 		max := int(math.Min(float64(len(blocks)), float64(blockProcAmount)))
 		_, err := pm.chainman.InsertChain(blocks[:max])
 		if err != nil {
+			// cancel download process
+			pm.downloader.Cancel()
+
 			return err
 		}
 		blocks = blocks[max:]
--
cgit v1.2.3


From a2919b5e17197afcb689b8f4144f255a5872f85d Mon Sep 17 00:00:00 2001
From: obscuren
Date: Sun, 10 May 2015 23:12:18 +0200
Subject: core, eth, miner: improved tx removal & fatal error on db sync err

* core: Added GasPriceChange event
* eth: When one of the DB flush methods errors, a fatal error log message
  is given. Hopefully this will prevent corrupted databases from
  occurring.
* miner: remove transactions with low gas price. Closes #906, #903
---
 eth/backend.go | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

(limited to 'eth')

diff --git a/eth/backend.go b/eth/backend.go
index 8f0789467..cdbe35b26 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -451,6 +451,8 @@ func (s *Ethereum) Start() error {
 	return nil
 }

+// sync databases every minute. If flushing fails we exit immediately. The system
+// may not continue under any circumstances.
 func (s *Ethereum) syncDatabases() {
 	ticker := time.NewTicker(1 * time.Minute)
 done:
@@ -459,13 +461,13 @@ done:
 		case <-ticker.C:
 			// don't change the order of database flushes
 			if err := s.extraDb.Flush(); err != nil {
-				glog.V(logger.Error).Infof("error: flush extraDb: %v\n", err)
+				glog.Fatalf("fatal error: flush extraDb: %v\n", err)
 			}
 			if err := s.stateDb.Flush(); err != nil {
-				glog.V(logger.Error).Infof("error: flush stateDb: %v\n", err)
+				glog.Fatalf("fatal error: flush stateDb: %v\n", err)
 			}
 			if err := s.blockDb.Flush(); err != nil {
-				glog.V(logger.Error).Infof("error: flush blockDb: %v\n", err)
+				glog.Fatalf("fatal error: flush blockDb: %v\n", err)
 			}
 		case <-s.shutdownChan:
 			break done
--
cgit v1.2.3
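
The syncDatabases loop above now treats a failed flush as unrecoverable and terminates the process instead of only logging the error. As a rough, self-contained illustration of that policy, a periodic flush loop might look like the sketch below; it uses the standard library's log.Fatalf rather than glog, and the flusher interface, namedDB and memDB types are made up for the example rather than taken from the eth package.

    package main

    import (
    	"log"
    	"time"
    )

    // flusher is a stand-in for the database handles that syncDatabases flushes.
    type flusher interface {
    	Flush() error
    }

    // namedDB pairs a flusher with a name for error reporting.
    type namedDB struct {
    	name string
    	db   flusher
    }

    // syncLoop flushes every database once a minute and terminates the process
    // on the first flush error, mirroring the fatal-on-error policy above.
    func syncLoop(dbs []namedDB, shutdown <-chan struct{}) {
    	ticker := time.NewTicker(1 * time.Minute)
    	defer ticker.Stop()

    	for {
    		select {
    		case <-ticker.C:
    			// Keep the flush order fixed, just as the original loop insists on.
    			for _, d := range dbs {
    				if err := d.db.Flush(); err != nil {
    					// A half-flushed database risks corruption; exit immediately.
    					log.Fatalf("fatal error: flush %s: %v", d.name, err)
    				}
    			}
    		case <-shutdown:
    			return
    		}
    	}
    }

    // memDB is a trivial flusher used only to make the sketch runnable.
    type memDB struct{}

    func (memDB) Flush() error { return nil }

    func main() {
    	shutdown := make(chan struct{})
    	go syncLoop([]namedDB{{"stateDb", memDB{}}, {"blockDb", memDB{}}}, shutdown)

    	// Let the loop run briefly, then ask it to stop.
    	time.Sleep(100 * time.Millisecond)
    	close(shutdown)
    }

Exiting on the first failed flush keeps a partially flushed set of databases from being used any further, which is the corruption the commit message is guarding against.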