diff options
Diffstat (limited to 'les/fetcher.go')
-rw-r--r-- | les/fetcher.go | 295 |
1 files changed, 295 insertions, 0 deletions
diff --git a/les/fetcher.go b/les/fetcher.go new file mode 100644 index 000000000..3fa5cf0e2 --- /dev/null +++ b/les/fetcher.go @@ -0,0 +1,295 @@ +// Copyright 2015 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// Package les implements the Light Ethereum Subprotocol. +package les + +import ( + "math/big" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" +) + +type lightFetcher struct { + pm *ProtocolManager + odr *LesOdr + chain BlockChain + + headAnnouncedMu sync.Mutex + headAnnouncedBy map[common.Hash][]*peer + currentTd *big.Int + deliverChn chan fetchResponse + reqMu sync.RWMutex + requested map[uint64]fetchRequest + timeoutChn chan uint64 + notifyChn chan bool // true if initiated from outside + syncing bool + syncDone chan struct{} +} + +type fetchRequest struct { + hash common.Hash + amount uint64 + peer *peer +} + +type fetchResponse struct { + reqID uint64 + headers []*types.Header +} + +func newLightFetcher(pm *ProtocolManager) *lightFetcher { + f := &lightFetcher{ + pm: pm, + chain: pm.blockchain, + odr: pm.odr, + headAnnouncedBy: make(map[common.Hash][]*peer), + deliverChn: make(chan fetchResponse, 100), + requested: make(map[uint64]fetchRequest), + timeoutChn: make(chan uint64), + notifyChn: make(chan bool, 100), + syncDone: make(chan struct{}), + currentTd: big.NewInt(0), + } + go f.syncLoop() + return f +} + +func (f *lightFetcher) notify(p *peer, head *announceData) { + var headHash common.Hash + if head == nil { + // initial notify + headHash = p.Head() + } else { + if core.GetTd(f.pm.chainDb, head.Hash, head.Number) != nil { + head.haveHeaders = head.Number + } + //fmt.Println("notify", p.id, head.Number, head.ReorgDepth, head.haveHeaders) + if !p.addNotify(head) { + //fmt.Println("addNotify fail") + f.pm.removePeer(p.id) + } + headHash = head.Hash + } + f.headAnnouncedMu.Lock() + f.headAnnouncedBy[headHash] = append(f.headAnnouncedBy[headHash], p) + f.headAnnouncedMu.Unlock() + f.notifyChn <- true +} + +func (f *lightFetcher) gotHeader(header *types.Header) { + f.headAnnouncedMu.Lock() + defer f.headAnnouncedMu.Unlock() + + hash := header.Hash() + peerList := f.headAnnouncedBy[hash] + if peerList == nil { + return + } + number := header.GetNumberU64() + td := core.GetTd(f.pm.chainDb, hash, number) + for _, peer := range peerList { + peer.lock.Lock() + ok := peer.gotHeader(hash, number, td) + peer.lock.Unlock() + if !ok { + //fmt.Println("gotHeader fail") + f.pm.removePeer(peer.id) + } + } + delete(f.headAnnouncedBy, hash) +} + +func (f *lightFetcher) nextRequest() (*peer, *announceData) { + var bestPeer *peer + bestTd := f.currentTd + for _, peer := range f.pm.peers.AllPeers() { + peer.lock.RLock() + if !peer.headInfo.requested && (peer.headInfo.Td.Cmp(bestTd) > 0 || + (bestPeer != nil && peer.headInfo.Td.Cmp(bestTd) == 0 && peer.headInfo.haveHeaders > bestPeer.headInfo.haveHeaders)) { + bestPeer = peer + bestTd = peer.headInfo.Td + } + peer.lock.RUnlock() + } + if bestPeer == nil { + return nil, nil + } + bestPeer.lock.Lock() + res := bestPeer.headInfo + res.requested = true + bestPeer.lock.Unlock() + for _, peer := range f.pm.peers.AllPeers() { + if peer != bestPeer { + peer.lock.Lock() + if peer.headInfo.Hash == bestPeer.headInfo.Hash && peer.headInfo.haveHeaders == bestPeer.headInfo.haveHeaders { + peer.headInfo.requested = true + } + peer.lock.Unlock() + } + } + return bestPeer, res +} + +func (f *lightFetcher) deliverHeaders(reqID uint64, headers []*types.Header) { + f.deliverChn <- fetchResponse{reqID: reqID, headers: headers} +} + +func (f *lightFetcher) requestedID(reqID uint64) bool { + f.reqMu.RLock() + _, ok := f.requested[reqID] + f.reqMu.RUnlock() + return ok +} + +func (f *lightFetcher) request(p *peer, block *announceData) { + //fmt.Println("request", p.id, block.Number, block.haveHeaders) + amount := block.Number - block.haveHeaders + if amount == 0 { + return + } + if amount > 100 { + f.syncing = true + go func() { + //fmt.Println("f.pm.synchronise(p)") + f.pm.synchronise(p) + //fmt.Println("sync done") + f.syncDone <- struct{}{} + }() + return + } + + reqID := f.odr.getNextReqID() + f.reqMu.Lock() + f.requested[reqID] = fetchRequest{hash: block.Hash, amount: amount, peer: p} + f.reqMu.Unlock() + cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount)) + p.fcServer.SendRequest(reqID, cost) + go p.RequestHeadersByHash(reqID, cost, block.Hash, int(amount), 0, true) + go func() { + time.Sleep(hardRequestTimeout) + f.timeoutChn <- reqID + }() +} + +func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool { + if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash { + return false + } + headers := make([]*types.Header, req.amount) + for i, header := range resp.headers { + headers[int(req.amount)-1-i] = header + } + if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil { + return false + } + for _, header := range headers { + td := core.GetTd(f.pm.chainDb, header.Hash(), header.GetNumberU64()) + if td == nil { + return false + } + if td.Cmp(f.currentTd) > 0 { + f.currentTd = td + } + f.gotHeader(header) + } + return true +} + +func (f *lightFetcher) checkSyncedHeaders() { + //fmt.Println("checkSyncedHeaders()") + for _, peer := range f.pm.peers.AllPeers() { + peer.lock.Lock() + h := peer.firstHeadInfo + remove := false + loop: + for h != nil { + if td := core.GetTd(f.pm.chainDb, h.Hash, h.Number); td != nil { + //fmt.Println(" found", h.Number) + ok := peer.gotHeader(h.Hash, h.Number, td) + if !ok { + remove = true + break loop + } + if td.Cmp(f.currentTd) > 0 { + f.currentTd = td + } + } + h = h.next + } + peer.lock.Unlock() + if remove { + //fmt.Println("checkSync fail") + f.pm.removePeer(peer.id) + } + } +} + +func (f *lightFetcher) syncLoop() { + f.pm.wg.Add(1) + defer f.pm.wg.Done() + + srtoNotify := false + for { + select { + case <-f.pm.quitSync: + return + case ext := <-f.notifyChn: + //fmt.Println("<-f.notifyChn", f.syncing, ext, srtoNotify) + s := srtoNotify + srtoNotify = false + if !f.syncing && !(ext && s) { + if p, r := f.nextRequest(); r != nil { + srtoNotify = true + go func() { + time.Sleep(softRequestTimeout) + f.notifyChn <- false + }() + f.request(p, r) + } + } + case reqID := <-f.timeoutChn: + f.reqMu.Lock() + req, ok := f.requested[reqID] + if ok { + delete(f.requested, reqID) + } + f.reqMu.Unlock() + if ok { + //fmt.Println("hard timeout") + f.pm.removePeer(req.peer.id) + } + case resp := <-f.deliverChn: + //fmt.Println("<-f.deliverChn", f.syncing) + f.reqMu.Lock() + req, ok := f.requested[resp.reqID] + delete(f.requested, resp.reqID) + f.reqMu.Unlock() + if !ok || !(f.syncing || f.processResponse(req, resp)) { + //fmt.Println("processResponse fail") + f.pm.removePeer(req.peer.id) + } + case <-f.syncDone: + //fmt.Println("<-f.syncDone", f.syncing) + f.checkSyncedHeaders() + f.syncing = false + } + } +} |