diff options
Diffstat (limited to 'eth/downloader/downloader.go')
-rw-r--r-- | eth/downloader/downloader.go | 91 |
1 files changed, 44 insertions, 47 deletions
diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index a5d03d17e..0861e917a 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -205,27 +205,26 @@ func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, chain BlockC } dl := &Downloader{ - mode: mode, - mux: mux, - queue: newQueue(), - peers: newPeerSet(), - stateDB: stateDb, - rttEstimate: uint64(rttMaxEstimate), - rttConfidence: uint64(1000000), - chain: chain, - lightchain: lightchain, - dropPeer: dropPeer, - headerCh: make(chan dataPack, 1), - bodyCh: make(chan dataPack, 1), - receiptCh: make(chan dataPack, 1), - bodyWakeCh: make(chan bool, 1), - receiptWakeCh: make(chan bool, 1), - headerProcCh: make(chan []*types.Header, 1), - quitCh: make(chan struct{}), - // for stateFetcher + mode: mode, + stateDB: stateDb, + mux: mux, + queue: newQueue(), + peers: newPeerSet(), + rttEstimate: uint64(rttMaxEstimate), + rttConfidence: uint64(1000000), + chain: chain, + lightchain: lightchain, + dropPeer: dropPeer, + headerCh: make(chan dataPack, 1), + bodyCh: make(chan dataPack, 1), + receiptCh: make(chan dataPack, 1), + bodyWakeCh: make(chan bool, 1), + receiptWakeCh: make(chan bool, 1), + headerProcCh: make(chan []*types.Header, 1), + quitCh: make(chan struct{}), + stateCh: make(chan dataPack), stateSyncStart: make(chan *stateSync), trackStateReq: make(chan *stateReq), - stateCh: make(chan dataPack), } go dl.qosTuner() go dl.stateFetcher() @@ -269,13 +268,11 @@ func (d *Downloader) Synchronising() bool { // RegisterPeer injects a new download peer into the set of block source to be // used for fetching hashes and blocks from. -func (d *Downloader) RegisterPeer(id string, version int, currentHead currentHeadRetrievalFn, - getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, - getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error { +func (d *Downloader) RegisterPeer(id string, version int, peer Peer) error { logger := log.New("peer", id) logger.Trace("Registering sync peer") - if err := d.peers.Register(newPeer(id, version, currentHead, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData, logger)); err != nil { + if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil { logger.Error("Failed to register sync peer", "err", err) return err } @@ -395,7 +392,7 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode // syncWithPeer starts a block synchronization based on the hash chain from the // specified peer and head hash. -func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err error) { +func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) { d.mux.Post(StartEvent{}) defer func() { // reset on error @@ -548,12 +545,12 @@ func (d *Downloader) Terminate() { // fetchHeight retrieves the head header of the remote peer to aid in estimating // the total time a pending synchronisation would take. -func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { +func (d *Downloader) fetchHeight(p *peerConnection) (*types.Header, error) { p.log.Debug("Retrieving remote chain height") // Request the advertised remote head block and wait for the response - head, _ := p.currentHead() - go p.getRelHeaders(head, 1, 0, false) + head, _ := p.peer.Head() + go p.peer.RequestHeadersByHash(head, 1, 0, false) ttl := d.requestTTL() timeout := time.After(ttl) @@ -594,7 +591,7 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { // on the correct chain, checking the top N links should already get us a match. // In the rare scenario when we ended up on a long reorganisation (i.e. none of // the head links match), we do a binary search to find the common ancestor. -func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { +func (d *Downloader) findAncestor(p *peerConnection, height uint64) (uint64, error) { // Figure out the valid ancestor range to prevent rewrite attacks floor, ceil := int64(-1), d.lightchain.CurrentHeader().Number.Uint64() @@ -622,7 +619,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { if count > limit { count = limit } - go p.getAbsHeaders(uint64(from), count, 15, false) + go p.peer.RequestHeadersByNumber(uint64(from), count, 15, false) // Wait for the remote response to the head fetch number, hash := uint64(0), common.Hash{} @@ -704,7 +701,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { ttl := d.requestTTL() timeout := time.After(ttl) - go p.getAbsHeaders(uint64(check), 1, 0, false) + go p.peer.RequestHeadersByNumber(uint64(check), 1, 0, false) // Wait until a reply arrives to this request for arrived := false; !arrived; { @@ -765,7 +762,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) { // other peers are only accepted if they map cleanly to the skeleton. If no one // can fill in the skeleton - not even the origin peer - it's assumed invalid and // the origin is dropped. -func (d *Downloader) fetchHeaders(p *peer, from uint64) error { +func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error { p.log.Debug("Directing header downloads", "origin", from) defer p.log.Debug("Header download terminated") @@ -785,10 +782,10 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error { if skeleton { p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) - go p.getAbsHeaders(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) + go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false) } else { p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from) - go p.getAbsHeaders(from, MaxHeaderFetch, 0, false) + go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false) } } // Start pulling the header chain skeleton until all is done @@ -890,12 +887,12 @@ func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ( } expire = func() map[string]int { return d.queue.ExpireHeaders(d.requestTTL()) } throttle = func() bool { return false } - reserve = func(p *peer, count int) (*fetchRequest, bool, error) { + reserve = func(p *peerConnection, count int) (*fetchRequest, bool, error) { return d.queue.ReserveHeaders(p, count), false, nil } - fetch = func(p *peer, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } - capacity = func(p *peer) int { return p.HeaderCapacity(d.requestRTT()) } - setIdle = func(p *peer, accepted int) { p.SetHeadersIdle(accepted) } + fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) } + capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.requestRTT()) } + setIdle = func(p *peerConnection, accepted int) { p.SetHeadersIdle(accepted) } ) err := d.fetchParts(errCancelHeaderFetch, d.headerCh, deliver, d.queue.headerContCh, expire, d.queue.PendingHeaders, d.queue.InFlightHeaders, throttle, reserve, @@ -919,9 +916,9 @@ func (d *Downloader) fetchBodies(from uint64) error { return d.queue.DeliverBodies(pack.peerId, pack.transactions, pack.uncles) } expire = func() map[string]int { return d.queue.ExpireBodies(d.requestTTL()) } - fetch = func(p *peer, req *fetchRequest) error { return p.FetchBodies(req) } - capacity = func(p *peer) int { return p.BlockCapacity(d.requestRTT()) } - setIdle = func(p *peer, accepted int) { p.SetBodiesIdle(accepted) } + fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) } + capacity = func(p *peerConnection) int { return p.BlockCapacity(d.requestRTT()) } + setIdle = func(p *peerConnection, accepted int) { p.SetBodiesIdle(accepted) } ) err := d.fetchParts(errCancelBodyFetch, d.bodyCh, deliver, d.bodyWakeCh, expire, d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ShouldThrottleBlocks, d.queue.ReserveBodies, @@ -943,9 +940,9 @@ func (d *Downloader) fetchReceipts(from uint64) error { return d.queue.DeliverReceipts(pack.peerId, pack.receipts) } expire = func() map[string]int { return d.queue.ExpireReceipts(d.requestTTL()) } - fetch = func(p *peer, req *fetchRequest) error { return p.FetchReceipts(req) } - capacity = func(p *peer) int { return p.ReceiptCapacity(d.requestRTT()) } - setIdle = func(p *peer, accepted int) { p.SetReceiptsIdle(accepted) } + fetch = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) } + capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.requestRTT()) } + setIdle = func(p *peerConnection, accepted int) { p.SetReceiptsIdle(accepted) } ) err := d.fetchParts(errCancelReceiptFetch, d.receiptCh, deliver, d.receiptWakeCh, expire, d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ShouldThrottleReceipts, d.queue.ReserveReceipts, @@ -981,9 +978,9 @@ func (d *Downloader) fetchReceipts(from uint64) error { // - setIdle: network callback to set a peer back to idle and update its estimated capacity (traffic shaping) // - kind: textual label of the type being downloaded to display in log mesages func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool, - expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peer, int) (*fetchRequest, bool, error), - fetchHook func([]*types.Header), fetch func(*peer, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peer) int, - idle func() ([]*peer, int), setIdle func(*peer, int), kind string) error { + expire func() map[string]int, pending func() int, inFlight func() bool, throttle func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, error), + fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int, + idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int), kind string) error { // Create a ticker to detect expired retrieval tasks ticker := time.NewTicker(100 * time.Millisecond) @@ -1261,7 +1258,7 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot { frequency = 1 } - if n, err := d.chain.InsertHeaderChain(chunk, frequency); err != nil { + if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil { // If some headers were inserted, add them too to the rollback list if n > 0 { rollback = append(rollback, chunk[:n]...) |