diff options
-rw-r--r-- | VERSION | 2 | ||||
-rw-r--r-- | cmd/swarm/upload.go | 82 | ||||
-rw-r--r-- | core/database_util.go | 6 | ||||
-rw-r--r-- | les/fetcher.go | 46 | ||||
-rw-r--r-- | les/flowcontrol/control.go | 121 | ||||
-rw-r--r-- | les/flowcontrol/manager.go | 41 | ||||
-rw-r--r-- | les/handler.go | 69 | ||||
-rw-r--r-- | les/helper_test.go | 19 | ||||
-rw-r--r-- | les/odr.go | 10 | ||||
-rw-r--r-- | les/odr_test.go | 9 | ||||
-rw-r--r-- | les/peer.go | 4 | ||||
-rw-r--r-- | les/request_test.go | 7 | ||||
-rw-r--r-- | les/serverpool.go | 66 | ||||
-rw-r--r-- | light/txpool_test.go | 35 | ||||
-rw-r--r-- | params/version.go | 2 |
15 files changed, 354 insertions, 165 deletions
@@ -1 +1 @@ -1.5.6 +1.5.7 diff --git a/cmd/swarm/upload.go b/cmd/swarm/upload.go index d048bbc40..d8039d45b 100644 --- a/cmd/swarm/upload.go +++ b/cmd/swarm/upload.go @@ -50,8 +50,6 @@ func upload(ctx *cli.Context) { var ( file = args[0] client = &client{api: bzzapi} - mroot manifest - entry manifestEntry ) fi, err := os.Stat(expandPath(file)) if err != nil { @@ -61,14 +59,21 @@ func upload(ctx *cli.Context) { if !recursive { log.Fatal("argument is a directory and recursive upload is disabled") } - mroot, err = client.uploadDirectory(file, defaultPath) - } else { - entry, err = client.uploadFile(file, fi) - mroot = manifest{[]manifestEntry{entry}} + if !wantManifest { + log.Fatal("manifest is required for directory uploads") + } + mhash, err := client.uploadDirectory(file, defaultPath) + if err != nil { + log.Fatal(err) + } + fmt.Println(mhash) + return } + entry, err := client.uploadFile(file, fi) if err != nil { log.Fatalln("upload failed:", err) } + mroot := manifest{[]manifestEntry{entry}} if !wantManifest { // Print the manifest. This is the only output to stdout. mrootJSON, _ := json.MarshalIndent(mroot, "", " ") @@ -123,43 +128,43 @@ type manifest struct { Entries []manifestEntry `json:"entries,omitempty"` } -func (c *client) uploadFile(file string, fi os.FileInfo) (manifestEntry, error) { - hash, err := c.uploadFileContent(file, fi) - m := manifestEntry{ - Hash: hash, - ContentType: mime.TypeByExtension(filepath.Ext(fi.Name())), +func (c *client) uploadDirectory(dir string, defaultPath string) (string, error) { + mhash, err := c.postRaw("application/json", 2, ioutil.NopCloser(bytes.NewReader([]byte("{}")))) + if err != nil { + return "", fmt.Errorf("failed to upload empty manifest") } - return m, err -} - -func (c *client) uploadDirectory(dir string, defaultPath string) (manifest, error) { - dirm := manifest{} if len(defaultPath) > 0 { fi, err := os.Stat(defaultPath) if err != nil { - log.Fatal(err) + return "", err } - entry, err := c.uploadFile(defaultPath, fi) + mhash, err = c.uploadToManifest(mhash, "", defaultPath, fi) if err != nil { - log.Fatal(err) + return "", err } - entry.Path = "" - dirm.Entries = append(dirm.Entries, entry) } prefix := filepath.ToSlash(filepath.Clean(dir)) + "/" - err := filepath.Walk(dir, func(path string, fi os.FileInfo, err error) error { + err = filepath.Walk(dir, func(path string, fi os.FileInfo, err error) error { if err != nil || fi.IsDir() { return err } if !strings.HasPrefix(path, dir) { return fmt.Errorf("path %s outside directory %s", path, dir) } - entry, err := c.uploadFile(path, fi) - entry.Path = strings.TrimPrefix(filepath.ToSlash(filepath.Clean(path)), prefix) - dirm.Entries = append(dirm.Entries, entry) + uripath := strings.TrimPrefix(filepath.ToSlash(filepath.Clean(path)), prefix) + mhash, err = c.uploadToManifest(mhash, uripath, path, fi) return err }) - return dirm, err + return mhash, err +} + +func (c *client) uploadFile(file string, fi os.FileInfo) (manifestEntry, error) { + hash, err := c.uploadFileContent(file, fi) + m := manifestEntry{ + Hash: hash, + ContentType: mime.TypeByExtension(filepath.Ext(fi.Name())), + } + return m, err } func (c *client) uploadFileContent(file string, fi os.FileInfo) (string, error) { @@ -181,6 +186,31 @@ func (c *client) uploadManifest(m manifest) (string, error) { return c.postRaw("application/json", int64(len(jsm)), ioutil.NopCloser(bytes.NewReader(jsm))) } +func (c *client) uploadToManifest(mhash string, path string, fpath string, fi os.FileInfo) (string, error) { + fd, err := os.Open(fpath) + if err != nil { + return "", err + } + defer fd.Close() + log.Printf("uploading file %s (%d bytes) and adding path %v", fpath, fi.Size(), path) + req, err := http.NewRequest("PUT", c.api+"/bzz:/"+mhash+"/"+path, fd) + if err != nil { + return "", err + } + req.Header.Set("content-type", mime.TypeByExtension(filepath.Ext(fi.Name()))) + req.ContentLength = fi.Size() + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + if resp.StatusCode >= 400 { + return "", fmt.Errorf("bad status: %s", resp.Status) + } + content, err := ioutil.ReadAll(resp.Body) + return string(content), err +} + func (c *client) postRaw(mimetype string, size int64, body io.ReadCloser) (string, error) { req, err := http.NewRequest("POST", c.api+"/bzzr:/", body) if err != nil { diff --git a/core/database_util.go b/core/database_util.go index 84669de35..2060b8b6a 100644 --- a/core/database_util.go +++ b/core/database_util.go @@ -23,6 +23,7 @@ import ( "errors" "fmt" "math/big" + "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -63,6 +64,8 @@ var ( oldBlockHashPrefix = []byte("block-hash-") // [deprecated by the header/block split, remove eventually] ChainConfigNotFoundErr = errors.New("ChainConfig not found") // general config not found error + + mipmapBloomMu sync.Mutex // protect against race condition when updating mipmap blooms ) // encodeBlockNumber encodes a block number as big endian uint64 @@ -564,6 +567,9 @@ func mipmapKey(num, level uint64) []byte { // WriteMapmapBloom writes each address included in the receipts' logs to the // MIP bloom bin. func WriteMipmapBloom(db ethdb.Database, number uint64, receipts types.Receipts) error { + mipmapBloomMu.Lock() + defer mipmapBloomMu.Unlock() + batch := db.NewBatch() for _, level := range MIPMapLevels { key := mipmapKey(number, level) diff --git a/les/fetcher.go b/les/fetcher.go index 4a0830a8a..de706de5e 100644 --- a/les/fetcher.go +++ b/les/fetcher.go @@ -125,7 +125,7 @@ func (f *lightFetcher) syncLoop() { f.pm.wg.Add(1) defer f.pm.wg.Done() - requestStarted := false + requesting := false for { select { case <-f.pm.quitSync: @@ -134,13 +134,13 @@ func (f *lightFetcher) syncLoop() { // no further requests are necessary or possible case newAnnounce := <-f.requestChn: f.lock.Lock() - s := requestStarted - requestStarted = false + s := requesting + requesting = false if !f.syncing && !(newAnnounce && s) { - if peer, node, amount := f.nextRequest(); node != nil { - requestStarted = true - reqID, started := f.request(peer, node, amount) - if started { + reqID := getNextReqID() + if peer, node, amount, retry := f.nextRequest(reqID); node != nil { + requesting = true + if reqID, ok := f.request(peer, reqID, node, amount); ok { go func() { time.Sleep(softRequestTimeout) f.reqMu.Lock() @@ -154,6 +154,14 @@ func (f *lightFetcher) syncLoop() { f.requestChn <- false }() } + } else { + if retry { + requesting = true + go func() { + time.Sleep(time.Millisecond * 100) + f.requestChn <- false + }() + } } } f.lock.Unlock() @@ -344,10 +352,11 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo } // request initiates a header download request from a certain peer -func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint64, bool) { +func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) { fp := f.peers[p] if fp == nil { glog.V(logger.Debug).Infof("request: unknown peer") + p.fcServer.DeassignRequest(reqID) return 0, false } if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) { @@ -357,10 +366,10 @@ func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint f.pm.synchronise(p) f.syncDone <- p }() + p.fcServer.DeassignRequest(reqID) return 0, false } - reqID := getNextReqID() n.requested = true cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount)) p.fcServer.SendRequest(reqID, cost) @@ -400,7 +409,7 @@ func (f *lightFetcher) requestedID(reqID uint64) bool { // nextRequest selects the peer and announced head to be requested next, amount // to be downloaded starting from the head backwards is also returned -func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) { +func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) { var ( bestHash common.Hash bestAmount uint64 @@ -420,21 +429,24 @@ func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) { } } if bestTd == f.maxConfirmedTd { - return nil, nil, 0 + return nil, nil, 0, false } - peer := f.pm.serverPool.selectPeer(func(p *peer) (bool, uint64) { + peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) { fp := f.peers[p] if fp == nil || fp.nodeByHash[bestHash] == nil { return false, 0 } return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount))) }) + if !locked { + return nil, nil, 0, true + } var node *fetcherTreeNode if peer != nil { node = f.peers[peer].nodeByHash[bestHash] } - return peer, node, bestAmount + return peer, node, bestAmount, false } // deliverHeaders delivers header download request responses for processing @@ -442,9 +454,10 @@ func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer} } -// processResponse processes header download request responses +// processResponse processes header download request responses, returns true if successful func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool { if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash { + glog.V(logger.Debug).Infof("response mismatch %v %016x != %v %016x", len(resp.headers), resp.headers[0].Hash().Bytes()[:8], req.amount, req.hash[:8]) return false } headers := make([]*types.Header, req.amount) @@ -452,12 +465,17 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo headers[int(req.amount)-1-i] = header } if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil { + if err == core.BlockFutureErr { + return true + } + glog.V(logger.Debug).Infof("InsertHeaderChain error: %v", err) return false } tds := make([]*big.Int, len(headers)) for i, header := range headers { td := f.chain.GetTd(header.Hash(), header.Number.Uint64()) if td == nil { + glog.V(logger.Debug).Infof("TD not found for header %v of %v", i+1, len(headers)) return false } tds[i] = td diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go index acb131ea4..e45537cf5 100644 --- a/les/flowcontrol/control.go +++ b/les/flowcontrol/control.go @@ -24,7 +24,7 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" ) -const fcTimeConst = 1000000 +const fcTimeConst = time.Millisecond type ServerParams struct { BufLimit, MinRecharge uint64 @@ -33,7 +33,7 @@ type ServerParams struct { type ClientNode struct { params *ServerParams bufValue uint64 - lastTime int64 + lastTime mclock.AbsTime lock sync.Mutex cm *ClientManager cmNode *cmNode @@ -44,7 +44,7 @@ func NewClientNode(cm *ClientManager, params *ServerParams) *ClientNode { cm: cm, params: params, bufValue: params.BufLimit, - lastTime: getTime(), + lastTime: mclock.Now(), } node.cmNode = cm.addNode(node) return node @@ -54,12 +54,12 @@ func (peer *ClientNode) Remove(cm *ClientManager) { cm.removeNode(peer.cmNode) } -func (peer *ClientNode) recalcBV(time int64) { +func (peer *ClientNode) recalcBV(time mclock.AbsTime) { dt := uint64(time - peer.lastTime) if time < peer.lastTime { dt = 0 } - peer.bufValue += peer.params.MinRecharge * dt / fcTimeConst + peer.bufValue += peer.params.MinRecharge * dt / uint64(fcTimeConst) if peer.bufValue > peer.params.BufLimit { peer.bufValue = peer.params.BufLimit } @@ -70,7 +70,7 @@ func (peer *ClientNode) AcceptRequest() (uint64, bool) { peer.lock.Lock() defer peer.lock.Unlock() - time := getTime() + time := mclock.Now() peer.recalcBV(time) return peer.bufValue, peer.cm.accept(peer.cmNode, time) } @@ -79,7 +79,7 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) { peer.lock.Lock() defer peer.lock.Unlock() - time := getTime() + time := mclock.Now() peer.recalcBV(time) peer.bufValue -= cost peer.recalcBV(time) @@ -94,66 +94,127 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) { } type ServerNode struct { - bufEstimate uint64 - lastTime int64 - params *ServerParams - sumCost uint64 // sum of req costs sent to this server - pending map[uint64]uint64 // value = sumCost after sending the given req - lock sync.RWMutex + bufEstimate uint64 + lastTime mclock.AbsTime + params *ServerParams + sumCost uint64 // sum of req costs sent to this server + pending map[uint64]uint64 // value = sumCost after sending the given req + assignedRequest uint64 // when != 0, only the request with the given ID can be sent to this peer + assignToken chan struct{} // send to this channel before assigning, read from it after deassigning + lock sync.RWMutex } func NewServerNode(params *ServerParams) *ServerNode { return &ServerNode{ bufEstimate: params.BufLimit, - lastTime: getTime(), + lastTime: mclock.Now(), params: params, pending: make(map[uint64]uint64), + assignToken: make(chan struct{}, 1), } } -func getTime() int64 { - return int64(mclock.Now()) -} - -func (peer *ServerNode) recalcBLE(time int64) { +func (peer *ServerNode) recalcBLE(time mclock.AbsTime) { dt := uint64(time - peer.lastTime) if time < peer.lastTime { dt = 0 } - peer.bufEstimate += peer.params.MinRecharge * dt / fcTimeConst + peer.bufEstimate += peer.params.MinRecharge * dt / uint64(fcTimeConst) if peer.bufEstimate > peer.params.BufLimit { peer.bufEstimate = peer.params.BufLimit } peer.lastTime = time } -func (peer *ServerNode) canSend(maxCost uint64) uint64 { +// safetyMargin is added to the flow control waiting time when estimated buffer value is low +const safetyMargin = time.Millisecond * 200 + +func (peer *ServerNode) canSend(maxCost uint64) time.Duration { + maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst) + if maxCost > peer.params.BufLimit { + maxCost = peer.params.BufLimit + } if peer.bufEstimate >= maxCost { return 0 } - return (maxCost - peer.bufEstimate) * fcTimeConst / peer.params.MinRecharge + return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge) } -func (peer *ServerNode) CanSend(maxCost uint64) uint64 { +// CanSend returns the minimum waiting time required before sending a request +// with the given maximum estimated cost +func (peer *ServerNode) CanSend(maxCost uint64) time.Duration { peer.lock.RLock() defer peer.lock.RUnlock() return peer.canSend(maxCost) } +// AssignRequest tries to assign the server node to the given request, guaranteeing +// that once it returns true, no request will be sent to the node before this one +func (peer *ServerNode) AssignRequest(reqID uint64) bool { + select { + case peer.assignToken <- struct{}{}: + default: + return false + } + peer.lock.Lock() + peer.assignedRequest = reqID + peer.lock.Unlock() + return true +} + +// MustAssignRequest waits until the node can be assigned to the given request. +// It is always guaranteed that assignments are released in a short amount of time. +func (peer *ServerNode) MustAssignRequest(reqID uint64) { + peer.assignToken <- struct{}{} + peer.lock.Lock() + peer.assignedRequest = reqID + peer.lock.Unlock() +} + +// DeassignRequest releases a request assignment in case the planned request +// is not being sent. +func (peer *ServerNode) DeassignRequest(reqID uint64) { + peer.lock.Lock() + if peer.assignedRequest == reqID { + peer.assignedRequest = 0 + <-peer.assignToken + } + peer.lock.Unlock() +} + +// IsAssigned returns true if the server node has already been assigned to a request +// (note that this function returning false does not guarantee that you can assign a request +// immediately afterwards, its only purpose is to help peer selection) +func (peer *ServerNode) IsAssigned() bool { + peer.lock.RLock() + locked := peer.assignedRequest != 0 + peer.lock.RUnlock() + return locked +} + // blocks until request can be sent func (peer *ServerNode) SendRequest(reqID, maxCost uint64) { peer.lock.Lock() defer peer.lock.Unlock() - peer.recalcBLE(getTime()) - for peer.bufEstimate < maxCost { - wait := time.Duration(peer.canSend(maxCost)) + if peer.assignedRequest != reqID { + peer.lock.Unlock() + peer.MustAssignRequest(reqID) + peer.lock.Lock() + } + + peer.recalcBLE(mclock.Now()) + wait := peer.canSend(maxCost) + for wait > 0 { peer.lock.Unlock() time.Sleep(wait) peer.lock.Lock() - peer.recalcBLE(getTime()) + peer.recalcBLE(mclock.Now()) + wait = peer.canSend(maxCost) } + peer.assignedRequest = 0 + <-peer.assignToken peer.bufEstimate -= maxCost peer.sumCost += maxCost if reqID >= 0 { @@ -162,14 +223,18 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) { } func (peer *ServerNode) GotReply(reqID, bv uint64) { + peer.lock.Lock() defer peer.lock.Unlock() + if bv > peer.params.BufLimit { + bv = peer.params.BufLimit + } sc, ok := peer.pending[reqID] if !ok { return } delete(peer.pending, reqID) peer.bufEstimate = bv - (peer.sumCost - sc) - peer.lastTime = getTime() + peer.lastTime = mclock.Now() } diff --git a/les/flowcontrol/manager.go b/les/flowcontrol/manager.go index f9f029466..28cc6f0fe 100644 --- a/les/flowcontrol/manager.go +++ b/les/flowcontrol/manager.go @@ -20,22 +20,23 @@ package flowcontrol import ( "sync" "time" + + "github.com/ethereum/go-ethereum/common/mclock" ) const rcConst = 1000000 type cmNode struct { - node *ClientNode - lastUpdate int64 - reqAccepted int64 - serving, recharging bool - rcWeight uint64 - rcValue, rcDelta int64 - finishRecharge, startValue int64 + node *ClientNode + lastUpdate mclock.AbsTime + serving, recharging bool + rcWeight uint64 + rcValue, rcDelta, startValue int64 + finishRecharge mclock.AbsTime } -func (node *cmNode) update(time int64) { - dt := time - node.lastUpdate +func (node *cmNode) update(time mclock.AbsTime) { + dt := int64(time - node.lastUpdate) node.rcValue += node.rcDelta * dt / rcConst node.lastUpdate = time if node.recharging && time >= node.finishRecharge { @@ -62,7 +63,7 @@ func (node *cmNode) set(serving bool, simReqCnt, sumWeight uint64) { } if node.recharging { node.rcDelta = -int64(node.node.cm.rcRecharge * node.rcWeight / sumWeight) - node.finishRecharge = node.lastUpdate + node.rcValue*rcConst/(-node.rcDelta) + node.finishRecharge = node.lastUpdate + mclock.AbsTime(node.rcValue*rcConst/(-node.rcDelta)) } } @@ -73,7 +74,7 @@ type ClientManager struct { maxSimReq, maxRcSum uint64 rcRecharge uint64 resumeQueue chan chan bool - time int64 + time mclock.AbsTime } func NewClientManager(rcTarget, maxSimReq, maxRcSum uint64) *ClientManager { @@ -98,7 +99,7 @@ func (self *ClientManager) Stop() { } func (self *ClientManager) addNode(cnode *ClientNode) *cmNode { - time := getTime() + time := mclock.Now() node := &cmNode{ node: cnode, lastUpdate: time, @@ -109,7 +110,7 @@ func (self *ClientManager) addNode(cnode *ClientNode) *cmNode { defer self.lock.Unlock() self.nodes[node] = struct{}{} - self.update(getTime()) + self.update(mclock.Now()) return node } @@ -117,14 +118,14 @@ func (self *ClientManager) removeNode(node *cmNode) { self.lock.Lock() defer self.lock.Unlock() - time := getTime() + time := mclock.Now() self.stop(node, time) delete(self.nodes, node) self.update(time) } // recalc sumWeight -func (self *ClientManager) updateNodes(time int64) (rce bool) { +func (self *ClientManager) updateNodes(time mclock.AbsTime) (rce bool) { var sumWeight, rcSum uint64 for node := range self.nodes { rc := node.recharging @@ -142,7 +143,7 @@ func (self *ClientManager) updateNodes(time int64) (rce bool) { return } -func (self *ClientManager) update(time int64) { +func (self *ClientManager) update(time mclock.AbsTime) { for { firstTime := time for node := range self.nodes { @@ -172,7 +173,7 @@ func (self *ClientManager) queueProc() { for { time.Sleep(time.Millisecond * 10) self.lock.Lock() - self.update(getTime()) + self.update(mclock.Now()) cs := self.canStartReq() self.lock.Unlock() if cs { @@ -183,7 +184,7 @@ func (self *ClientManager) queueProc() { } } -func (self *ClientManager) accept(node *cmNode, time int64) bool { +func (self *ClientManager) accept(node *cmNode, time mclock.AbsTime) bool { self.lock.Lock() defer self.lock.Unlock() @@ -205,7 +206,7 @@ func (self *ClientManager) accept(node *cmNode, time int64) bool { return true } -func (self *ClientManager) stop(node *cmNode, time int64) { +func (self *ClientManager) stop(node *cmNode, time mclock.AbsTime) { if node.serving { self.update(time) self.simReqCnt-- @@ -214,7 +215,7 @@ func (self *ClientManager) stop(node *cmNode, time int64) { } } -func (self *ClientManager) processed(node *cmNode, time int64) (rcValue, rcCost uint64) { +func (self *ClientManager) processed(node *cmNode, time mclock.AbsTime) (rcValue, rcCost uint64) { self.lock.Lock() defer self.lock.Unlock() diff --git a/les/handler.go b/les/handler.go index b024841f2..603ce9ad4 100644 --- a/les/handler.go +++ b/les/handler.go @@ -24,6 +24,7 @@ import ( "math/big" "net" "sync" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" @@ -228,6 +229,12 @@ func (pm *ProtocolManager) removePeer(id string) { if peer == nil { return } + if err := pm.peers.Unregister(id); err != nil { + if err == errNotRegistered { + return + } + glog.V(logger.Error).Infoln("Removal failed:", err) + } glog.V(logger.Debug).Infoln("Removing peer", id) // Unregister the peer from the downloader and Ethereum peer set @@ -241,9 +248,6 @@ func (pm *ProtocolManager) removePeer(id string) { pm.fetcher.removePeer(peer) } } - if err := pm.peers.Unregister(id); err != nil { - glog.V(logger.Error).Infoln("Removal failed:", err) - } // Hard disconnect at the networking layer if peer != nil { peer.Peer.Disconnect(p2p.DiscUselessPeer) @@ -340,12 +344,14 @@ func (pm *ProtocolManager) handle(p *peer) error { requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error { reqID := getNextReqID() cost := p.GetRequestCost(GetBlockHeadersMsg, amount) + p.fcServer.MustAssignRequest(reqID) p.fcServer.SendRequest(reqID, cost) return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse) } requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error { reqID := getNextReqID() cost := p.GetRequestCost(GetBlockHeadersMsg, amount) + p.fcServer.MustAssignRequest(reqID) p.fcServer.SendRequest(reqID, cost) return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse) } @@ -404,26 +410,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { return err } - var costs *requestCosts - var reqCnt, maxReqs int - glog.V(logger.Debug).Infoln("msg:", msg.Code, msg.Size) - if rc, ok := p.fcCosts[msg.Code]; ok { // check if msg is a supported request type - costs = rc - if p.fcClient == nil { - return errResp(ErrRequestRejected, "") + + costs := p.fcCosts[msg.Code] + reject := func(reqCnt, maxCnt uint64) bool { + if p.fcClient == nil || reqCnt > maxCnt { + return true } - bv, ok := p.fcClient.AcceptRequest() - if !ok || bv < costs.baseCost { - return errResp(ErrRequestRejected, "") + bufValue, _ := p.fcClient.AcceptRequest() + cost := costs.baseCost + reqCnt*costs.reqCost + if cost > pm.server.defParams.BufLimit { + cost = pm.server.defParams.BufLimit } - maxReqs = 10000 - if bv < pm.server.defParams.BufLimit { - d := bv - costs.baseCost - if d/10000 < costs.reqCost { - maxReqs = int(d / costs.reqCost) - } + if cost > bufValue { + glog.V(logger.Error).Infof("Request from %v came %v too early", p.id, time.Duration((cost-bufValue)*1000000/pm.server.defParams.MinRecharge)) + return true } + return false } if msg.Size > ProtocolMaxMsgSize { @@ -450,7 +453,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth) if pm.fetcher != nil { - go pm.fetcher.announce(p, &req) + pm.fetcher.announce(p, &req) } case GetBlockHeadersMsg: @@ -465,7 +468,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } query := req.Query - if query.Amount > uint64(maxReqs) || query.Amount > MaxHeaderFetch { + if reject(query.Amount, MaxHeaderFetch) { return errResp(ErrRequestRejected, "") } @@ -573,8 +576,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { bytes int bodies []rlp.RawValue ) - reqCnt = len(req.Hashes) - if reqCnt > maxReqs || reqCnt > MaxBodyFetch { + reqCnt := len(req.Hashes) + if reject(uint64(reqCnt), MaxBodyFetch) { return errResp(ErrRequestRejected, "") } for _, hash := range req.Hashes { @@ -627,8 +630,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { bytes int data [][]byte ) - reqCnt = len(req.Reqs) - if reqCnt > maxReqs || reqCnt > MaxCodeFetch { + reqCnt := len(req.Reqs) + if reject(uint64(reqCnt), MaxCodeFetch) { return errResp(ErrRequestRejected, "") } for _, req := range req.Reqs { @@ -688,8 +691,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { bytes int receipts []rlp.RawValue ) - reqCnt = len(req.Hashes) - if reqCnt > maxReqs || reqCnt > MaxReceiptFetch { + reqCnt := len(req.Hashes) + if reject(uint64(reqCnt), MaxReceiptFetch) { return errResp(ErrRequestRejected, "") } for _, hash := range req.Hashes { @@ -751,8 +754,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { bytes int proofs proofsData ) - reqCnt = len(req.Reqs) - if reqCnt > maxReqs || reqCnt > MaxProofsFetch { + reqCnt := len(req.Reqs) + if reject(uint64(reqCnt), MaxProofsFetch) { return errResp(ErrRequestRejected, "") } for _, req := range req.Reqs { @@ -818,8 +821,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { bytes int proofs []ChtResp ) - reqCnt = len(req.Reqs) - if reqCnt > maxReqs || reqCnt > MaxHeaderProofsFetch { + reqCnt := len(req.Reqs) + if reject(uint64(reqCnt), MaxHeaderProofsFetch) { return errResp(ErrRequestRejected, "") } for _, req := range req.Reqs { @@ -872,8 +875,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if err := msg.Decode(&txs); err != nil { return errResp(ErrDecode, "msg %v: %v", msg, err) } - reqCnt = len(txs) - if reqCnt > maxReqs || reqCnt > MaxTxSend { + reqCnt := len(txs) + if reject(uint64(reqCnt), MaxTxSend) { return errResp(ErrRequestRejected, "") } diff --git a/les/helper_test.go b/les/helper_test.go index ec98389cb..3d6bf3c29 100644 --- a/les/helper_test.go +++ b/les/helper_test.go @@ -336,10 +336,23 @@ func (p *testPeer) close() { p.app.Close() } -type testServerPool peer +type testServerPool struct { + peer *peer + lock sync.RWMutex +} + +func (p *testServerPool) setPeer(peer *peer) { + p.lock.Lock() + defer p.lock.Unlock() + + p.peer = peer +} + +func (p *testServerPool) selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer { + p.lock.RLock() + defer p.lock.RUnlock() -func (p *testServerPool) selectPeer(func(*peer) (bool, uint64)) *peer { - return (*peer)(p) + return p.peer } func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) { diff --git a/les/odr.go b/les/odr.go index 8878508c4..88c7d85a5 100644 --- a/les/odr.go +++ b/les/odr.go @@ -40,7 +40,7 @@ var ( type peerDropFn func(id string) type odrPeerSelector interface { - selectPeer(func(*peer) (bool, uint64)) *peer + selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer adjustResponseTime(*poolEntry, time.Duration, bool) } @@ -116,6 +116,7 @@ func (self *LesOdr) Deliver(peer *peer, msg *Msg) error { if req.valFunc(self.db, msg) { close(delivered) req.lock.Lock() + delete(req.sentTo, peer) if req.answered != nil { close(req.answered) req.answered = nil @@ -150,6 +151,7 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha select { case <-delivered: case <-time.After(hardRequestTimeout): + glog.V(logger.Debug).Infof("ODR hard request timeout from peer %v", peer.id) go self.removePeer(peer.id) case <-self.stop: return @@ -187,12 +189,12 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro for { var p *peer if self.serverPool != nil { - p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) { - if !lreq.CanSend(p) { + p = self.serverPool.selectPeerWait(reqID, func(p *peer) (bool, time.Duration) { + if _, ok := exclude[p]; ok || !lreq.CanSend(p) { return false, 0 } return true, p.fcServer.CanSend(lreq.GetCost(p)) - }) + }, ctx.Done()) } if p == nil { select { diff --git a/les/odr_test.go b/les/odr_test.go index b5cbda838..622d89e5c 100644 --- a/les/odr_test.go +++ b/les/odr_test.go @@ -160,7 +160,8 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { pm, db, odr := newTestProtocolManagerMust(t, false, 4, testChainGen) lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) - pool := (*testServerPool)(lpeer) + pool := &testServerPool{} + pool.setPeer(lpeer) odr.serverPool = pool select { case <-time.After(time.Millisecond * 100): @@ -190,13 +191,13 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) { } // temporarily remove peer to test odr fails - odr.serverPool = nil + pool.setPeer(nil) // expect retrievals to fail (except genesis block) without a les peer test(expFail) - odr.serverPool = pool + pool.setPeer(lpeer) // expect all retrievals to pass test(5) - odr.serverPool = nil + pool.setPeer(nil) // still expect all retrievals to pass, now data should be cached locally test(5) } diff --git a/les/peer.go b/les/peer.go index 8d4a83f59..d5008ded1 100644 --- a/les/peer.go +++ b/les/peer.go @@ -241,7 +241,9 @@ func (p *peer) RequestHeaderProofs(reqID, cost uint64, reqs []*ChtReq) error { func (p *peer) SendTxs(cost uint64, txs types.Transactions) error { glog.V(logger.Debug).Infof("%v relaying %v txs", p, len(txs)) - p.fcServer.SendRequest(0, cost) + reqID := getNextReqID() + p.fcServer.MustAssignRequest(reqID) + p.fcServer.SendRequest(reqID, cost) return p2p.Send(p.rw, SendTxMsg, txs) } diff --git a/les/request_test.go b/les/request_test.go index 03b946771..10e9edf8b 100644 --- a/les/request_test.go +++ b/les/request_test.go @@ -71,7 +71,8 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen) lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil) _, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm) - pool := (*testServerPool)(lpeer) + pool := &testServerPool{} + pool.setPeer(lpeer) odr.serverPool = pool select { case <-time.After(time.Millisecond * 100): @@ -102,10 +103,10 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) { } // temporarily remove peer to test odr fails - odr.serverPool = nil + pool.setPeer(nil) // expect retrievals to fail (except genesis block) without a les peer test(0) - odr.serverPool = pool + pool.setPeer(lpeer) // expect all retrievals to pass test(5) } diff --git a/les/serverpool.go b/les/serverpool.go index 80c446eef..e3b7cf620 100644 --- a/les/serverpool.go +++ b/les/serverpool.go @@ -265,33 +265,77 @@ func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration, type selectPeerItem struct { peer *peer weight int64 + wait time.Duration } func (sp selectPeerItem) Weight() int64 { return sp.weight } -// selectPeer selects a suitable peer for a request -func (pool *serverPool) selectPeer(canSend func(*peer) (bool, uint64)) *peer { +// selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request +// and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed +// after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time. +func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) { pool.lock.Lock() - defer pool.lock.Unlock() - + type selectPeer struct { + peer *peer + rstat, tstat float64 + } + var list []selectPeer sel := newWeightedRandomSelect() for _, entry := range pool.entries { if entry.state == psRegistered { - p := entry.peer - ok, cost := canSend(p) - if ok { - w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(entry.responseStats.recentAvg()+float64(cost))/float64(responseScoreTC))*math.Pow((1-entry.timeoutStats.recentAvg()), timeoutPow))) - sel.update(selectPeerItem{peer: p, weight: w}) + if !entry.peer.fcServer.IsAssigned() { + list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()}) } } } + pool.lock.Unlock() + + for _, sp := range list { + ok, wait := canSend(sp.peer) + if ok { + w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow))) + sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait}) + } + } choice := sel.choose() if choice == nil { - return nil + return nil, 0, false + } + peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait + locked := false + if wait < time.Millisecond*100 { + if peer.fcServer.AssignRequest(reqID) { + ok, w := canSend(peer) + wait = time.Duration(w) + if ok && wait < time.Millisecond*100 { + locked = true + } else { + peer.fcServer.DeassignRequest(reqID) + wait = time.Millisecond * 100 + } + } + } else { + wait = time.Millisecond * 100 + } + return peer, wait, locked +} + +// selectPeer selects a suitable peer for a request, waiting until an assignment to +// the request is guaranteed or the process is aborted. +func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer { + for { + peer, wait, locked := pool.selectPeer(reqID, canSend) + if locked { + return peer + } + select { + case <-abort: + return nil + case <-time.After(wait): + } } - return choice.(selectPeerItem).peer } // eventLoop handles pool events and mutex locking for all internal functions diff --git a/light/txpool_test.go b/light/txpool_test.go index 6927c54f8..e5a4670aa 100644 --- a/light/txpool_test.go +++ b/light/txpool_test.go @@ -32,20 +32,22 @@ import ( ) type testTxRelay struct { - send, nhMined, nhRollback, discard int + send, discard, mined chan int } func (self *testTxRelay) Send(txs types.Transactions) { - self.send = len(txs) + self.send <- len(txs) } func (self *testTxRelay) NewHead(head common.Hash, mined []common.Hash, rollback []common.Hash) { - self.nhMined = len(mined) - self.nhRollback = len(rollback) + m := len(mined) + if m != 0 { + self.mined <- m + } } func (self *testTxRelay) Discard(hashes []common.Hash) { - self.discard = len(hashes) + self.discard <- len(hashes) } const poolTestTxs = 1000 @@ -94,7 +96,11 @@ func TestTxPool(t *testing.T) { } odr := &testOdr{sdb: sdb, ldb: ldb} - relay := &testTxRelay{} + relay := &testTxRelay{ + send: make(chan int, 1), + discard: make(chan int, 1), + mined: make(chan int, 1), + } lightchain, _ := NewLightChain(odr, testChainConfig(), pow, evmux) lightchain.SetValidator(bproc{}) txPermanent = 50 @@ -106,36 +112,33 @@ func TestTxPool(t *testing.T) { s := sentTx(i - 1) e := sentTx(i) for i := s; i < e; i++ { - relay.send = 0 pool.Add(ctx, testTx[i]) - got := relay.send + got := <-relay.send exp := 1 if got != exp { t.Errorf("relay.Send expected len = %d, got %d", exp, got) } } - relay.nhMined = 0 - relay.nhRollback = 0 - relay.discard = 0 if _, err := lightchain.InsertHeaderChain([]*types.Header{block.Header()}, 1); err != nil { panic(err) } - time.Sleep(time.Millisecond * 30) - got := relay.nhMined + got := <-relay.mined exp := minedTx(i) - minedTx(i-1) if got != exp { t.Errorf("relay.NewHead expected len(mined) = %d, got %d", exp, got) } - got = relay.discard exp = 0 if i > int(txPermanent)+1 { exp = minedTx(i-int(txPermanent)-1) - minedTx(i-int(txPermanent)-2) } - if got != exp { - t.Errorf("relay.Discard expected len = %d, got %d", exp, got) + if exp != 0 { + got = <-relay.discard + if got != exp { + t.Errorf("relay.Discard expected len = %d, got %d", exp, got) + } } } } diff --git a/params/version.go b/params/version.go index ba7a57381..6eb920a21 100644 --- a/params/version.go +++ b/params/version.go @@ -21,7 +21,7 @@ import "fmt" const ( VersionMajor = 1 // Major version component of the current release VersionMinor = 5 // Minor version component of the current release - VersionPatch = 6 // Patch version component of the current release + VersionPatch = 7 // Patch version component of the current release VersionMeta = "unstable" // Version metadata to append to the version string ) |