diff options
-rw-r--r-- | cmd/geth/admin.go | 26 | ||||
-rw-r--r-- | eth/downloader/downloader.go | 36 | ||||
-rw-r--r-- | eth/downloader/downloader_test.go | 6 | ||||
-rw-r--r-- | eth/downloader/peer.go | 8 | ||||
-rw-r--r-- | eth/downloader/queue.go | 22 | ||||
-rw-r--r-- | eth/handler.go | 6 | ||||
-rw-r--r-- | eth/peer.go | 6 |
7 files changed, 74 insertions, 36 deletions
diff --git a/cmd/geth/admin.go b/cmd/geth/admin.go index f8c717187..2ac155e33 100644 --- a/cmd/geth/admin.go +++ b/cmd/geth/admin.go @@ -48,6 +48,32 @@ func (js *jsre) adminBindings() { debug := t.Object() debug.Set("printBlock", js.printBlock) debug.Set("dumpBlock", js.dumpBlock) + debug.Set("getBlockRlp", js.getBlockRlp) +} + +func (js *jsre) getBlockRlp(call otto.FunctionCall) otto.Value { + var block *types.Block + if len(call.ArgumentList) > 0 { + if call.Argument(0).IsNumber() { + num, _ := call.Argument(0).ToInteger() + block = js.ethereum.ChainManager().GetBlockByNumber(uint64(num)) + } else if call.Argument(0).IsString() { + hash, _ := call.Argument(0).ToString() + block = js.ethereum.ChainManager().GetBlock(common.HexToHash(hash)) + } else { + fmt.Println("invalid argument for dump. Either hex string or number") + } + + } else { + block = js.ethereum.ChainManager().CurrentBlock() + } + if block == nil { + fmt.Println("block not found") + return otto.UndefinedValue() + } + + encoded, _ := rlp.EncodeToBytes(block) + return js.re.ToVal(fmt.Sprintf("%x", encoded)) } func (js *jsre) setExtra(call otto.FunctionCall) otto.Value { diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index 6768c3e67..2b9d1cbee 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -18,14 +18,15 @@ import ( ) const ( - maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk - minDesiredPeerCount = 5 // Amount of peers desired to start syncing - peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount - blockTtl = 15 * time.Second // The amount of time it takes for a block request to time out - hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out + maxBlockFetch = 256 // Amount of max blocks to be fetched per chunk + peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount + blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out + hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out ) var ( + minDesiredPeerCount = 5 // Amount of peers desired to start syncing + errLowTd = errors.New("peer's TD is too low") errBusy = errors.New("busy") errUnknownPeer = errors.New("peer's unknown or unhealthy") @@ -127,11 +128,11 @@ out: for { select { case <-d.newPeerCh: - itimer.Stop() // Meet the `minDesiredPeerCount` before we select our best peer if len(d.peers) < minDesiredPeerCount { break } + itimer.Stop() d.selectPeer(d.peers.bestPeer()) case <-itimer.C: @@ -154,17 +155,18 @@ func (d *Downloader) selectPeer(p *peer) { // Make sure it's doing neither. Once done we can restart the // downloading process if the TD is higher. For now just get on // with whatever is going on. This prevents unecessary switching. - if !d.isBusy() { - // selected peer must be better than our own - // XXX we also check the peer's recent hash to make sure we - // don't have it. Some peers report (i think) incorrect TD. - if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) { - return - } - - glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) - d.syncCh <- syncPack{p, p.recentHash, false} + if d.isBusy() { + return } + // selected peer must be better than our own + // XXX we also check the peer's recent hash to make sure we + // don't have it. Some peers report (i think) incorrect TD. + if p.td.Cmp(d.currentTd()) <= 0 || d.hasBlock(p.recentHash) { + return + } + + glog.V(logger.Detail).Infoln("New peer with highest TD =", p.td) + d.syncCh <- syncPack{p, p.recentHash, false} } @@ -307,7 +309,7 @@ out: if len(d.queue.fetching) == 0 { d.queue.reset() - return fmt.Errorf("%v avaialable = %d. total = %d", errPeersUnavailable, len(availablePeers), len(d.peers)) + return fmt.Errorf("%v peers avaialable = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(availablePeers), len(d.peers), d.queue.hashPool.Size()) } } else if len(d.queue.fetching) == 0 { diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 6cf99b678..249d8a533 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -73,7 +73,7 @@ func (dl *downloadTester) insertChain(blocks types.Blocks) error { } func (dl *downloadTester) getHashes(hash common.Hash) error { - dl.downloader.HashCh <- dl.hashes + dl.downloader.hashCh <- dl.hashes return nil } @@ -109,6 +109,8 @@ func TestDownload(t *testing.T) { glog.SetV(logger.Detail) glog.SetToStderr(true) + minDesiredPeerCount = 4 + hashes := createHashes(0, 1000) blocks := createBlocksFromHashes(hashes) tester := newTester(t, hashes, blocks) @@ -123,7 +125,7 @@ success: case <-tester.done: break success case <-time.After(10 * time.Second): // XXX this could actually fail on a slow computer - t.Error("timout") + t.Error("timeout") } } diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 7065ca105..bcb8ad43a 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -71,7 +71,7 @@ type peer struct { td *big.Int recentHash common.Hash - requested *set.Set + ignored *set.Set getHashes hashFetcherFn getBlocks blockFetcherFn @@ -86,7 +86,7 @@ func newPeer(id string, td *big.Int, hash common.Hash, getHashes hashFetcherFn, getHashes: getHashes, getBlocks: getBlocks, state: idleState, - requested: set.New(), + ignored: set.New(), } } @@ -99,8 +99,6 @@ func (p *peer) fetch(chunk *chunk) error { return errors.New("peer already fetching chunk") } - p.requested.Merge(chunk.hashes) - // set working state p.state = workingState // convert the set to a fetchable slice @@ -137,5 +135,5 @@ func (p *peer) demote() { func (p *peer) reset() { p.state = idleState - p.requested.Clear() + p.ignored.Clear() } diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go index ce3aa9850..adbc2a0d0 100644 --- a/eth/downloader/queue.go +++ b/eth/downloader/queue.go @@ -56,16 +56,18 @@ func (c *queue) get(p *peer, max int) *chunk { // Create a new set of hashes hashes, i := set.New(), 0 c.hashPool.Each(func(v interface{}) bool { + // break on limit if i == limit { return false } - - // Skip any hashes that have previously been requested from the peer - if !p.requested.Has(v) { - hashes.Add(v) - i++ + // skip any hashes that have previously been requested from the peer + if p.ignored.Has(v) { + return true } + hashes.Add(v) + i++ + return true }) // if no hashes can be requested return a nil chunk @@ -79,7 +81,7 @@ func (c *queue) get(p *peer, max int) *chunk { // Create a new chunk for the seperated hashes. The time is being used // to reset the chunk (timeout) - chunk := &chunk{hashes, time.Now()} + chunk := &chunk{p, hashes, time.Now()} // register as 'fetching' state c.fetching[p.id] = chunk @@ -111,6 +113,12 @@ func (c *queue) deliver(id string, blocks []*types.Block) { // If the chunk was never requested simply ignore it if chunk != nil { delete(c.fetching, id) + // check the length of the returned blocks. If the length of blocks is 0 + // we'll assume the peer doesn't know about the chain. + if len(blocks) == 0 { + // So we can ignore the blocks we didn't know about + chunk.peer.ignored.Merge(chunk.hashes) + } // seperate the blocks and the hashes blockHashes := chunk.fetchedHashes(blocks) @@ -118,7 +126,6 @@ func (c *queue) deliver(id string, blocks []*types.Block) { c.blockHashes.Merge(blockHashes) // Add the blocks c.blocks = append(c.blocks, blocks...) - // Add back whatever couldn't be delivered c.hashPool.Merge(chunk.hashes) c.fetchPool.Separate(chunk.hashes) @@ -134,6 +141,7 @@ func (c *queue) put(hashes *set.Set) { } type chunk struct { + peer *peer hashes *set.Set itime time.Time } diff --git a/eth/handler.go b/eth/handler.go index f1f462a89..780ec3931 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -36,6 +36,7 @@ pm.chainman.InsertChain(blocks) import ( "fmt" + "math/big" "sync" "github.com/ethereum/go-ethereum/common" @@ -273,12 +274,11 @@ func (self *ProtocolManager) handleMsg(p *peer) error { if self.chainman.HasBlock(hash) { break } - /* XXX unsure about this + /* XXX unsure about this */ if self.chainman.Td().Cmp(request.TD) > 0 && new(big.Int).Add(request.Block.Number(), big.NewInt(7)).Cmp(self.chainman.CurrentBlock().Number()) < 0 { - glog.V(logger.Debug).Infoln("dropped block", request.Block.Number(), "due to low TD", request.TD) + glog.V(logger.Debug).Infof("[%s] dropped block %v due to low TD %v\n", p.id, request.Block.Number(), request.TD) break } - */ // Attempt to insert the newly received by checking if the parent exists. // if the parent exists we process the block and propagate to our peers diff --git a/eth/peer.go b/eth/peer.go index 8cedbd85a..972880845 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -6,6 +6,8 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/logger" + "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/p2p" "gopkg.in/fatih/set.v0" ) @@ -85,12 +87,12 @@ func (p *peer) sendNewBlock(block *types.Block) error { } func (p *peer) requestHashes(from common.Hash) error { - p.Debugf("fetching hashes (%d) %x...\n", maxHashes, from[0:4]) + glog.V(logger.Debug).Infof("[%s] fetching hashes (%d) %x...\n", p.id, maxHashes, from[:4]) return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesMsgData{from, maxHashes}) } func (p *peer) requestBlocks(hashes []common.Hash) error { - p.Debugf("fetching %v blocks", len(hashes)) + glog.V(logger.Debug).Infof("[%s] fetching %v blocks\n", p.id, len(hashes)) return p2p.Send(p.rw, GetBlocksMsg, hashes) } |