aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--VERSION2
-rw-r--r--cmd/swarm/upload.go82
-rw-r--r--core/database_util.go6
-rw-r--r--les/fetcher.go46
-rw-r--r--les/flowcontrol/control.go121
-rw-r--r--les/flowcontrol/manager.go41
-rw-r--r--les/handler.go69
-rw-r--r--les/helper_test.go19
-rw-r--r--les/odr.go10
-rw-r--r--les/odr_test.go9
-rw-r--r--les/peer.go4
-rw-r--r--les/request_test.go7
-rw-r--r--les/serverpool.go66
-rw-r--r--light/txpool_test.go35
-rw-r--r--params/version.go2
15 files changed, 354 insertions, 165 deletions
diff --git a/VERSION b/VERSION
index eac1e0ada..f01291b87 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.5.6
+1.5.7
diff --git a/cmd/swarm/upload.go b/cmd/swarm/upload.go
index d048bbc40..d8039d45b 100644
--- a/cmd/swarm/upload.go
+++ b/cmd/swarm/upload.go
@@ -50,8 +50,6 @@ func upload(ctx *cli.Context) {
var (
file = args[0]
client = &client{api: bzzapi}
- mroot manifest
- entry manifestEntry
)
fi, err := os.Stat(expandPath(file))
if err != nil {
@@ -61,14 +59,21 @@ func upload(ctx *cli.Context) {
if !recursive {
log.Fatal("argument is a directory and recursive upload is disabled")
}
- mroot, err = client.uploadDirectory(file, defaultPath)
- } else {
- entry, err = client.uploadFile(file, fi)
- mroot = manifest{[]manifestEntry{entry}}
+ if !wantManifest {
+ log.Fatal("manifest is required for directory uploads")
+ }
+ mhash, err := client.uploadDirectory(file, defaultPath)
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Println(mhash)
+ return
}
+ entry, err := client.uploadFile(file, fi)
if err != nil {
log.Fatalln("upload failed:", err)
}
+ mroot := manifest{[]manifestEntry{entry}}
if !wantManifest {
// Print the manifest. This is the only output to stdout.
mrootJSON, _ := json.MarshalIndent(mroot, "", " ")
@@ -123,43 +128,43 @@ type manifest struct {
Entries []manifestEntry `json:"entries,omitempty"`
}
-func (c *client) uploadFile(file string, fi os.FileInfo) (manifestEntry, error) {
- hash, err := c.uploadFileContent(file, fi)
- m := manifestEntry{
- Hash: hash,
- ContentType: mime.TypeByExtension(filepath.Ext(fi.Name())),
+func (c *client) uploadDirectory(dir string, defaultPath string) (string, error) {
+ mhash, err := c.postRaw("application/json", 2, ioutil.NopCloser(bytes.NewReader([]byte("{}"))))
+ if err != nil {
+ return "", fmt.Errorf("failed to upload empty manifest")
}
- return m, err
-}
-
-func (c *client) uploadDirectory(dir string, defaultPath string) (manifest, error) {
- dirm := manifest{}
if len(defaultPath) > 0 {
fi, err := os.Stat(defaultPath)
if err != nil {
- log.Fatal(err)
+ return "", err
}
- entry, err := c.uploadFile(defaultPath, fi)
+ mhash, err = c.uploadToManifest(mhash, "", defaultPath, fi)
if err != nil {
- log.Fatal(err)
+ return "", err
}
- entry.Path = ""
- dirm.Entries = append(dirm.Entries, entry)
}
prefix := filepath.ToSlash(filepath.Clean(dir)) + "/"
- err := filepath.Walk(dir, func(path string, fi os.FileInfo, err error) error {
+ err = filepath.Walk(dir, func(path string, fi os.FileInfo, err error) error {
if err != nil || fi.IsDir() {
return err
}
if !strings.HasPrefix(path, dir) {
return fmt.Errorf("path %s outside directory %s", path, dir)
}
- entry, err := c.uploadFile(path, fi)
- entry.Path = strings.TrimPrefix(filepath.ToSlash(filepath.Clean(path)), prefix)
- dirm.Entries = append(dirm.Entries, entry)
+ uripath := strings.TrimPrefix(filepath.ToSlash(filepath.Clean(path)), prefix)
+ mhash, err = c.uploadToManifest(mhash, uripath, path, fi)
return err
})
- return dirm, err
+ return mhash, err
+}
+
+func (c *client) uploadFile(file string, fi os.FileInfo) (manifestEntry, error) {
+ hash, err := c.uploadFileContent(file, fi)
+ m := manifestEntry{
+ Hash: hash,
+ ContentType: mime.TypeByExtension(filepath.Ext(fi.Name())),
+ }
+ return m, err
}
func (c *client) uploadFileContent(file string, fi os.FileInfo) (string, error) {
@@ -181,6 +186,31 @@ func (c *client) uploadManifest(m manifest) (string, error) {
return c.postRaw("application/json", int64(len(jsm)), ioutil.NopCloser(bytes.NewReader(jsm)))
}
+func (c *client) uploadToManifest(mhash string, path string, fpath string, fi os.FileInfo) (string, error) {
+ fd, err := os.Open(fpath)
+ if err != nil {
+ return "", err
+ }
+ defer fd.Close()
+ log.Printf("uploading file %s (%d bytes) and adding path %v", fpath, fi.Size(), path)
+ req, err := http.NewRequest("PUT", c.api+"/bzz:/"+mhash+"/"+path, fd)
+ if err != nil {
+ return "", err
+ }
+ req.Header.Set("content-type", mime.TypeByExtension(filepath.Ext(fi.Name())))
+ req.ContentLength = fi.Size()
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return "", err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode >= 400 {
+ return "", fmt.Errorf("bad status: %s", resp.Status)
+ }
+ content, err := ioutil.ReadAll(resp.Body)
+ return string(content), err
+}
+
func (c *client) postRaw(mimetype string, size int64, body io.ReadCloser) (string, error) {
req, err := http.NewRequest("POST", c.api+"/bzzr:/", body)
if err != nil {
diff --git a/core/database_util.go b/core/database_util.go
index 84669de35..2060b8b6a 100644
--- a/core/database_util.go
+++ b/core/database_util.go
@@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"math/big"
+ "sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@@ -63,6 +64,8 @@ var (
oldBlockHashPrefix = []byte("block-hash-") // [deprecated by the header/block split, remove eventually]
ChainConfigNotFoundErr = errors.New("ChainConfig not found") // general config not found error
+
+ mipmapBloomMu sync.Mutex // protect against race condition when updating mipmap blooms
)
// encodeBlockNumber encodes a block number as big endian uint64
@@ -564,6 +567,9 @@ func mipmapKey(num, level uint64) []byte {
// WriteMapmapBloom writes each address included in the receipts' logs to the
// MIP bloom bin.
func WriteMipmapBloom(db ethdb.Database, number uint64, receipts types.Receipts) error {
+ mipmapBloomMu.Lock()
+ defer mipmapBloomMu.Unlock()
+
batch := db.NewBatch()
for _, level := range MIPMapLevels {
key := mipmapKey(number, level)
diff --git a/les/fetcher.go b/les/fetcher.go
index 4a0830a8a..de706de5e 100644
--- a/les/fetcher.go
+++ b/les/fetcher.go
@@ -125,7 +125,7 @@ func (f *lightFetcher) syncLoop() {
f.pm.wg.Add(1)
defer f.pm.wg.Done()
- requestStarted := false
+ requesting := false
for {
select {
case <-f.pm.quitSync:
@@ -134,13 +134,13 @@ func (f *lightFetcher) syncLoop() {
// no further requests are necessary or possible
case newAnnounce := <-f.requestChn:
f.lock.Lock()
- s := requestStarted
- requestStarted = false
+ s := requesting
+ requesting = false
if !f.syncing && !(newAnnounce && s) {
- if peer, node, amount := f.nextRequest(); node != nil {
- requestStarted = true
- reqID, started := f.request(peer, node, amount)
- if started {
+ reqID := getNextReqID()
+ if peer, node, amount, retry := f.nextRequest(reqID); node != nil {
+ requesting = true
+ if reqID, ok := f.request(peer, reqID, node, amount); ok {
go func() {
time.Sleep(softRequestTimeout)
f.reqMu.Lock()
@@ -154,6 +154,14 @@ func (f *lightFetcher) syncLoop() {
f.requestChn <- false
}()
}
+ } else {
+ if retry {
+ requesting = true
+ go func() {
+ time.Sleep(time.Millisecond * 100)
+ f.requestChn <- false
+ }()
+ }
}
}
f.lock.Unlock()
@@ -344,10 +352,11 @@ func (f *lightFetcher) peerHasBlock(p *peer, hash common.Hash, number uint64) bo
}
// request initiates a header download request from a certain peer
-func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint64, bool) {
+func (f *lightFetcher) request(p *peer, reqID uint64, n *fetcherTreeNode, amount uint64) (uint64, bool) {
fp := f.peers[p]
if fp == nil {
glog.V(logger.Debug).Infof("request: unknown peer")
+ p.fcServer.DeassignRequest(reqID)
return 0, false
}
if fp.bestConfirmed == nil || fp.root == nil || !f.checkKnownNode(p, fp.root) {
@@ -357,10 +366,10 @@ func (f *lightFetcher) request(p *peer, n *fetcherTreeNode, amount uint64) (uint
f.pm.synchronise(p)
f.syncDone <- p
}()
+ p.fcServer.DeassignRequest(reqID)
return 0, false
}
- reqID := getNextReqID()
n.requested = true
cost := p.GetRequestCost(GetBlockHeadersMsg, int(amount))
p.fcServer.SendRequest(reqID, cost)
@@ -400,7 +409,7 @@ func (f *lightFetcher) requestedID(reqID uint64) bool {
// nextRequest selects the peer and announced head to be requested next, amount
// to be downloaded starting from the head backwards is also returned
-func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) {
+func (f *lightFetcher) nextRequest(reqID uint64) (*peer, *fetcherTreeNode, uint64, bool) {
var (
bestHash common.Hash
bestAmount uint64
@@ -420,21 +429,24 @@ func (f *lightFetcher) nextRequest() (*peer, *fetcherTreeNode, uint64) {
}
}
if bestTd == f.maxConfirmedTd {
- return nil, nil, 0
+ return nil, nil, 0, false
}
- peer := f.pm.serverPool.selectPeer(func(p *peer) (bool, uint64) {
+ peer, _, locked := f.pm.serverPool.selectPeer(reqID, func(p *peer) (bool, time.Duration) {
fp := f.peers[p]
if fp == nil || fp.nodeByHash[bestHash] == nil {
return false, 0
}
return true, p.fcServer.CanSend(p.GetRequestCost(GetBlockHeadersMsg, int(bestAmount)))
})
+ if !locked {
+ return nil, nil, 0, true
+ }
var node *fetcherTreeNode
if peer != nil {
node = f.peers[peer].nodeByHash[bestHash]
}
- return peer, node, bestAmount
+ return peer, node, bestAmount, false
}
// deliverHeaders delivers header download request responses for processing
@@ -442,9 +454,10 @@ func (f *lightFetcher) deliverHeaders(peer *peer, reqID uint64, headers []*types
f.deliverChn <- fetchResponse{reqID: reqID, headers: headers, peer: peer}
}
-// processResponse processes header download request responses
+// processResponse processes header download request responses, returns true if successful
func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) bool {
if uint64(len(resp.headers)) != req.amount || resp.headers[0].Hash() != req.hash {
+ glog.V(logger.Debug).Infof("response mismatch %v %016x != %v %016x", len(resp.headers), resp.headers[0].Hash().Bytes()[:8], req.amount, req.hash[:8])
return false
}
headers := make([]*types.Header, req.amount)
@@ -452,12 +465,17 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo
headers[int(req.amount)-1-i] = header
}
if _, err := f.chain.InsertHeaderChain(headers, 1); err != nil {
+ if err == core.BlockFutureErr {
+ return true
+ }
+ glog.V(logger.Debug).Infof("InsertHeaderChain error: %v", err)
return false
}
tds := make([]*big.Int, len(headers))
for i, header := range headers {
td := f.chain.GetTd(header.Hash(), header.Number.Uint64())
if td == nil {
+ glog.V(logger.Debug).Infof("TD not found for header %v of %v", i+1, len(headers))
return false
}
tds[i] = td
diff --git a/les/flowcontrol/control.go b/les/flowcontrol/control.go
index acb131ea4..e45537cf5 100644
--- a/les/flowcontrol/control.go
+++ b/les/flowcontrol/control.go
@@ -24,7 +24,7 @@ import (
"github.com/ethereum/go-ethereum/common/mclock"
)
-const fcTimeConst = 1000000
+const fcTimeConst = time.Millisecond
type ServerParams struct {
BufLimit, MinRecharge uint64
@@ -33,7 +33,7 @@ type ServerParams struct {
type ClientNode struct {
params *ServerParams
bufValue uint64
- lastTime int64
+ lastTime mclock.AbsTime
lock sync.Mutex
cm *ClientManager
cmNode *cmNode
@@ -44,7 +44,7 @@ func NewClientNode(cm *ClientManager, params *ServerParams) *ClientNode {
cm: cm,
params: params,
bufValue: params.BufLimit,
- lastTime: getTime(),
+ lastTime: mclock.Now(),
}
node.cmNode = cm.addNode(node)
return node
@@ -54,12 +54,12 @@ func (peer *ClientNode) Remove(cm *ClientManager) {
cm.removeNode(peer.cmNode)
}
-func (peer *ClientNode) recalcBV(time int64) {
+func (peer *ClientNode) recalcBV(time mclock.AbsTime) {
dt := uint64(time - peer.lastTime)
if time < peer.lastTime {
dt = 0
}
- peer.bufValue += peer.params.MinRecharge * dt / fcTimeConst
+ peer.bufValue += peer.params.MinRecharge * dt / uint64(fcTimeConst)
if peer.bufValue > peer.params.BufLimit {
peer.bufValue = peer.params.BufLimit
}
@@ -70,7 +70,7 @@ func (peer *ClientNode) AcceptRequest() (uint64, bool) {
peer.lock.Lock()
defer peer.lock.Unlock()
- time := getTime()
+ time := mclock.Now()
peer.recalcBV(time)
return peer.bufValue, peer.cm.accept(peer.cmNode, time)
}
@@ -79,7 +79,7 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
peer.lock.Lock()
defer peer.lock.Unlock()
- time := getTime()
+ time := mclock.Now()
peer.recalcBV(time)
peer.bufValue -= cost
peer.recalcBV(time)
@@ -94,66 +94,127 @@ func (peer *ClientNode) RequestProcessed(cost uint64) (bv, realCost uint64) {
}
type ServerNode struct {
- bufEstimate uint64
- lastTime int64
- params *ServerParams
- sumCost uint64 // sum of req costs sent to this server
- pending map[uint64]uint64 // value = sumCost after sending the given req
- lock sync.RWMutex
+ bufEstimate uint64
+ lastTime mclock.AbsTime
+ params *ServerParams
+ sumCost uint64 // sum of req costs sent to this server
+ pending map[uint64]uint64 // value = sumCost after sending the given req
+ assignedRequest uint64 // when != 0, only the request with the given ID can be sent to this peer
+ assignToken chan struct{} // send to this channel before assigning, read from it after deassigning
+ lock sync.RWMutex
}
func NewServerNode(params *ServerParams) *ServerNode {
return &ServerNode{
bufEstimate: params.BufLimit,
- lastTime: getTime(),
+ lastTime: mclock.Now(),
params: params,
pending: make(map[uint64]uint64),
+ assignToken: make(chan struct{}, 1),
}
}
-func getTime() int64 {
- return int64(mclock.Now())
-}
-
-func (peer *ServerNode) recalcBLE(time int64) {
+func (peer *ServerNode) recalcBLE(time mclock.AbsTime) {
dt := uint64(time - peer.lastTime)
if time < peer.lastTime {
dt = 0
}
- peer.bufEstimate += peer.params.MinRecharge * dt / fcTimeConst
+ peer.bufEstimate += peer.params.MinRecharge * dt / uint64(fcTimeConst)
if peer.bufEstimate > peer.params.BufLimit {
peer.bufEstimate = peer.params.BufLimit
}
peer.lastTime = time
}
-func (peer *ServerNode) canSend(maxCost uint64) uint64 {
+// safetyMargin is added to the flow control waiting time when estimated buffer value is low
+const safetyMargin = time.Millisecond * 200
+
+func (peer *ServerNode) canSend(maxCost uint64) time.Duration {
+ maxCost += uint64(safetyMargin) * peer.params.MinRecharge / uint64(fcTimeConst)
+ if maxCost > peer.params.BufLimit {
+ maxCost = peer.params.BufLimit
+ }
if peer.bufEstimate >= maxCost {
return 0
}
- return (maxCost - peer.bufEstimate) * fcTimeConst / peer.params.MinRecharge
+ return time.Duration((maxCost - peer.bufEstimate) * uint64(fcTimeConst) / peer.params.MinRecharge)
}
-func (peer *ServerNode) CanSend(maxCost uint64) uint64 {
+// CanSend returns the minimum waiting time required before sending a request
+// with the given maximum estimated cost
+func (peer *ServerNode) CanSend(maxCost uint64) time.Duration {
peer.lock.RLock()
defer peer.lock.RUnlock()
return peer.canSend(maxCost)
}
+// AssignRequest tries to assign the server node to the given request, guaranteeing
+// that once it returns true, no request will be sent to the node before this one
+func (peer *ServerNode) AssignRequest(reqID uint64) bool {
+ select {
+ case peer.assignToken <- struct{}{}:
+ default:
+ return false
+ }
+ peer.lock.Lock()
+ peer.assignedRequest = reqID
+ peer.lock.Unlock()
+ return true
+}
+
+// MustAssignRequest waits until the node can be assigned to the given request.
+// It is always guaranteed that assignments are released in a short amount of time.
+func (peer *ServerNode) MustAssignRequest(reqID uint64) {
+ peer.assignToken <- struct{}{}
+ peer.lock.Lock()
+ peer.assignedRequest = reqID
+ peer.lock.Unlock()
+}
+
+// DeassignRequest releases a request assignment in case the planned request
+// is not being sent.
+func (peer *ServerNode) DeassignRequest(reqID uint64) {
+ peer.lock.Lock()
+ if peer.assignedRequest == reqID {
+ peer.assignedRequest = 0
+ <-peer.assignToken
+ }
+ peer.lock.Unlock()
+}
+
+// IsAssigned returns true if the server node has already been assigned to a request
+// (note that this function returning false does not guarantee that you can assign a request
+// immediately afterwards, its only purpose is to help peer selection)
+func (peer *ServerNode) IsAssigned() bool {
+ peer.lock.RLock()
+ locked := peer.assignedRequest != 0
+ peer.lock.RUnlock()
+ return locked
+}
+
// blocks until request can be sent
func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
peer.lock.Lock()
defer peer.lock.Unlock()
- peer.recalcBLE(getTime())
- for peer.bufEstimate < maxCost {
- wait := time.Duration(peer.canSend(maxCost))
+ if peer.assignedRequest != reqID {
+ peer.lock.Unlock()
+ peer.MustAssignRequest(reqID)
+ peer.lock.Lock()
+ }
+
+ peer.recalcBLE(mclock.Now())
+ wait := peer.canSend(maxCost)
+ for wait > 0 {
peer.lock.Unlock()
time.Sleep(wait)
peer.lock.Lock()
- peer.recalcBLE(getTime())
+ peer.recalcBLE(mclock.Now())
+ wait = peer.canSend(maxCost)
}
+ peer.assignedRequest = 0
+ <-peer.assignToken
peer.bufEstimate -= maxCost
peer.sumCost += maxCost
if reqID >= 0 {
@@ -162,14 +223,18 @@ func (peer *ServerNode) SendRequest(reqID, maxCost uint64) {
}
func (peer *ServerNode) GotReply(reqID, bv uint64) {
+
peer.lock.Lock()
defer peer.lock.Unlock()
+ if bv > peer.params.BufLimit {
+ bv = peer.params.BufLimit
+ }
sc, ok := peer.pending[reqID]
if !ok {
return
}
delete(peer.pending, reqID)
peer.bufEstimate = bv - (peer.sumCost - sc)
- peer.lastTime = getTime()
+ peer.lastTime = mclock.Now()
}
diff --git a/les/flowcontrol/manager.go b/les/flowcontrol/manager.go
index f9f029466..28cc6f0fe 100644
--- a/les/flowcontrol/manager.go
+++ b/les/flowcontrol/manager.go
@@ -20,22 +20,23 @@ package flowcontrol
import (
"sync"
"time"
+
+ "github.com/ethereum/go-ethereum/common/mclock"
)
const rcConst = 1000000
type cmNode struct {
- node *ClientNode
- lastUpdate int64
- reqAccepted int64
- serving, recharging bool
- rcWeight uint64
- rcValue, rcDelta int64
- finishRecharge, startValue int64
+ node *ClientNode
+ lastUpdate mclock.AbsTime
+ serving, recharging bool
+ rcWeight uint64
+ rcValue, rcDelta, startValue int64
+ finishRecharge mclock.AbsTime
}
-func (node *cmNode) update(time int64) {
- dt := time - node.lastUpdate
+func (node *cmNode) update(time mclock.AbsTime) {
+ dt := int64(time - node.lastUpdate)
node.rcValue += node.rcDelta * dt / rcConst
node.lastUpdate = time
if node.recharging && time >= node.finishRecharge {
@@ -62,7 +63,7 @@ func (node *cmNode) set(serving bool, simReqCnt, sumWeight uint64) {
}
if node.recharging {
node.rcDelta = -int64(node.node.cm.rcRecharge * node.rcWeight / sumWeight)
- node.finishRecharge = node.lastUpdate + node.rcValue*rcConst/(-node.rcDelta)
+ node.finishRecharge = node.lastUpdate + mclock.AbsTime(node.rcValue*rcConst/(-node.rcDelta))
}
}
@@ -73,7 +74,7 @@ type ClientManager struct {
maxSimReq, maxRcSum uint64
rcRecharge uint64
resumeQueue chan chan bool
- time int64
+ time mclock.AbsTime
}
func NewClientManager(rcTarget, maxSimReq, maxRcSum uint64) *ClientManager {
@@ -98,7 +99,7 @@ func (self *ClientManager) Stop() {
}
func (self *ClientManager) addNode(cnode *ClientNode) *cmNode {
- time := getTime()
+ time := mclock.Now()
node := &cmNode{
node: cnode,
lastUpdate: time,
@@ -109,7 +110,7 @@ func (self *ClientManager) addNode(cnode *ClientNode) *cmNode {
defer self.lock.Unlock()
self.nodes[node] = struct{}{}
- self.update(getTime())
+ self.update(mclock.Now())
return node
}
@@ -117,14 +118,14 @@ func (self *ClientManager) removeNode(node *cmNode) {
self.lock.Lock()
defer self.lock.Unlock()
- time := getTime()
+ time := mclock.Now()
self.stop(node, time)
delete(self.nodes, node)
self.update(time)
}
// recalc sumWeight
-func (self *ClientManager) updateNodes(time int64) (rce bool) {
+func (self *ClientManager) updateNodes(time mclock.AbsTime) (rce bool) {
var sumWeight, rcSum uint64
for node := range self.nodes {
rc := node.recharging
@@ -142,7 +143,7 @@ func (self *ClientManager) updateNodes(time int64) (rce bool) {
return
}
-func (self *ClientManager) update(time int64) {
+func (self *ClientManager) update(time mclock.AbsTime) {
for {
firstTime := time
for node := range self.nodes {
@@ -172,7 +173,7 @@ func (self *ClientManager) queueProc() {
for {
time.Sleep(time.Millisecond * 10)
self.lock.Lock()
- self.update(getTime())
+ self.update(mclock.Now())
cs := self.canStartReq()
self.lock.Unlock()
if cs {
@@ -183,7 +184,7 @@ func (self *ClientManager) queueProc() {
}
}
-func (self *ClientManager) accept(node *cmNode, time int64) bool {
+func (self *ClientManager) accept(node *cmNode, time mclock.AbsTime) bool {
self.lock.Lock()
defer self.lock.Unlock()
@@ -205,7 +206,7 @@ func (self *ClientManager) accept(node *cmNode, time int64) bool {
return true
}
-func (self *ClientManager) stop(node *cmNode, time int64) {
+func (self *ClientManager) stop(node *cmNode, time mclock.AbsTime) {
if node.serving {
self.update(time)
self.simReqCnt--
@@ -214,7 +215,7 @@ func (self *ClientManager) stop(node *cmNode, time int64) {
}
}
-func (self *ClientManager) processed(node *cmNode, time int64) (rcValue, rcCost uint64) {
+func (self *ClientManager) processed(node *cmNode, time mclock.AbsTime) (rcValue, rcCost uint64) {
self.lock.Lock()
defer self.lock.Unlock()
diff --git a/les/handler.go b/les/handler.go
index b024841f2..603ce9ad4 100644
--- a/les/handler.go
+++ b/les/handler.go
@@ -24,6 +24,7 @@ import (
"math/big"
"net"
"sync"
+ "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
@@ -228,6 +229,12 @@ func (pm *ProtocolManager) removePeer(id string) {
if peer == nil {
return
}
+ if err := pm.peers.Unregister(id); err != nil {
+ if err == errNotRegistered {
+ return
+ }
+ glog.V(logger.Error).Infoln("Removal failed:", err)
+ }
glog.V(logger.Debug).Infoln("Removing peer", id)
// Unregister the peer from the downloader and Ethereum peer set
@@ -241,9 +248,6 @@ func (pm *ProtocolManager) removePeer(id string) {
pm.fetcher.removePeer(peer)
}
}
- if err := pm.peers.Unregister(id); err != nil {
- glog.V(logger.Error).Infoln("Removal failed:", err)
- }
// Hard disconnect at the networking layer
if peer != nil {
peer.Peer.Disconnect(p2p.DiscUselessPeer)
@@ -340,12 +344,14 @@ func (pm *ProtocolManager) handle(p *peer) error {
requestHeadersByHash := func(origin common.Hash, amount int, skip int, reverse bool) error {
reqID := getNextReqID()
cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
+ p.fcServer.MustAssignRequest(reqID)
p.fcServer.SendRequest(reqID, cost)
return p.RequestHeadersByHash(reqID, cost, origin, amount, skip, reverse)
}
requestHeadersByNumber := func(origin uint64, amount int, skip int, reverse bool) error {
reqID := getNextReqID()
cost := p.GetRequestCost(GetBlockHeadersMsg, amount)
+ p.fcServer.MustAssignRequest(reqID)
p.fcServer.SendRequest(reqID, cost)
return p.RequestHeadersByNumber(reqID, cost, origin, amount, skip, reverse)
}
@@ -404,26 +410,23 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
return err
}
- var costs *requestCosts
- var reqCnt, maxReqs int
-
glog.V(logger.Debug).Infoln("msg:", msg.Code, msg.Size)
- if rc, ok := p.fcCosts[msg.Code]; ok { // check if msg is a supported request type
- costs = rc
- if p.fcClient == nil {
- return errResp(ErrRequestRejected, "")
+
+ costs := p.fcCosts[msg.Code]
+ reject := func(reqCnt, maxCnt uint64) bool {
+ if p.fcClient == nil || reqCnt > maxCnt {
+ return true
}
- bv, ok := p.fcClient.AcceptRequest()
- if !ok || bv < costs.baseCost {
- return errResp(ErrRequestRejected, "")
+ bufValue, _ := p.fcClient.AcceptRequest()
+ cost := costs.baseCost + reqCnt*costs.reqCost
+ if cost > pm.server.defParams.BufLimit {
+ cost = pm.server.defParams.BufLimit
}
- maxReqs = 10000
- if bv < pm.server.defParams.BufLimit {
- d := bv - costs.baseCost
- if d/10000 < costs.reqCost {
- maxReqs = int(d / costs.reqCost)
- }
+ if cost > bufValue {
+ glog.V(logger.Error).Infof("Request from %v came %v too early", p.id, time.Duration((cost-bufValue)*1000000/pm.server.defParams.MinRecharge))
+ return true
}
+ return false
}
if msg.Size > ProtocolMaxMsgSize {
@@ -450,7 +453,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
glog.V(logger.Detail).Infoln("AnnounceMsg:", req.Number, req.Hash, req.Td, req.ReorgDepth)
if pm.fetcher != nil {
- go pm.fetcher.announce(p, &req)
+ pm.fetcher.announce(p, &req)
}
case GetBlockHeadersMsg:
@@ -465,7 +468,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
query := req.Query
- if query.Amount > uint64(maxReqs) || query.Amount > MaxHeaderFetch {
+ if reject(query.Amount, MaxHeaderFetch) {
return errResp(ErrRequestRejected, "")
}
@@ -573,8 +576,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bytes int
bodies []rlp.RawValue
)
- reqCnt = len(req.Hashes)
- if reqCnt > maxReqs || reqCnt > MaxBodyFetch {
+ reqCnt := len(req.Hashes)
+ if reject(uint64(reqCnt), MaxBodyFetch) {
return errResp(ErrRequestRejected, "")
}
for _, hash := range req.Hashes {
@@ -627,8 +630,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bytes int
data [][]byte
)
- reqCnt = len(req.Reqs)
- if reqCnt > maxReqs || reqCnt > MaxCodeFetch {
+ reqCnt := len(req.Reqs)
+ if reject(uint64(reqCnt), MaxCodeFetch) {
return errResp(ErrRequestRejected, "")
}
for _, req := range req.Reqs {
@@ -688,8 +691,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bytes int
receipts []rlp.RawValue
)
- reqCnt = len(req.Hashes)
- if reqCnt > maxReqs || reqCnt > MaxReceiptFetch {
+ reqCnt := len(req.Hashes)
+ if reject(uint64(reqCnt), MaxReceiptFetch) {
return errResp(ErrRequestRejected, "")
}
for _, hash := range req.Hashes {
@@ -751,8 +754,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bytes int
proofs proofsData
)
- reqCnt = len(req.Reqs)
- if reqCnt > maxReqs || reqCnt > MaxProofsFetch {
+ reqCnt := len(req.Reqs)
+ if reject(uint64(reqCnt), MaxProofsFetch) {
return errResp(ErrRequestRejected, "")
}
for _, req := range req.Reqs {
@@ -818,8 +821,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
bytes int
proofs []ChtResp
)
- reqCnt = len(req.Reqs)
- if reqCnt > maxReqs || reqCnt > MaxHeaderProofsFetch {
+ reqCnt := len(req.Reqs)
+ if reject(uint64(reqCnt), MaxHeaderProofsFetch) {
return errResp(ErrRequestRejected, "")
}
for _, req := range req.Reqs {
@@ -872,8 +875,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
if err := msg.Decode(&txs); err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
- reqCnt = len(txs)
- if reqCnt > maxReqs || reqCnt > MaxTxSend {
+ reqCnt := len(txs)
+ if reject(uint64(reqCnt), MaxTxSend) {
return errResp(ErrRequestRejected, "")
}
diff --git a/les/helper_test.go b/les/helper_test.go
index ec98389cb..3d6bf3c29 100644
--- a/les/helper_test.go
+++ b/les/helper_test.go
@@ -336,10 +336,23 @@ func (p *testPeer) close() {
p.app.Close()
}
-type testServerPool peer
+type testServerPool struct {
+ peer *peer
+ lock sync.RWMutex
+}
+
+func (p *testServerPool) setPeer(peer *peer) {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ p.peer = peer
+}
+
+func (p *testServerPool) selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer {
+ p.lock.RLock()
+ defer p.lock.RUnlock()
-func (p *testServerPool) selectPeer(func(*peer) (bool, uint64)) *peer {
- return (*peer)(p)
+ return p.peer
}
func (p *testServerPool) adjustResponseTime(*poolEntry, time.Duration, bool) {
diff --git a/les/odr.go b/les/odr.go
index 8878508c4..88c7d85a5 100644
--- a/les/odr.go
+++ b/les/odr.go
@@ -40,7 +40,7 @@ var (
type peerDropFn func(id string)
type odrPeerSelector interface {
- selectPeer(func(*peer) (bool, uint64)) *peer
+ selectPeerWait(uint64, func(*peer) (bool, time.Duration), <-chan struct{}) *peer
adjustResponseTime(*poolEntry, time.Duration, bool)
}
@@ -116,6 +116,7 @@ func (self *LesOdr) Deliver(peer *peer, msg *Msg) error {
if req.valFunc(self.db, msg) {
close(delivered)
req.lock.Lock()
+ delete(req.sentTo, peer)
if req.answered != nil {
close(req.answered)
req.answered = nil
@@ -150,6 +151,7 @@ func (self *LesOdr) requestPeer(req *sentReq, peer *peer, delivered, timeout cha
select {
case <-delivered:
case <-time.After(hardRequestTimeout):
+ glog.V(logger.Debug).Infof("ODR hard request timeout from peer %v", peer.id)
go self.removePeer(peer.id)
case <-self.stop:
return
@@ -187,12 +189,12 @@ func (self *LesOdr) networkRequest(ctx context.Context, lreq LesOdrRequest) erro
for {
var p *peer
if self.serverPool != nil {
- p = self.serverPool.selectPeer(func(p *peer) (bool, uint64) {
- if !lreq.CanSend(p) {
+ p = self.serverPool.selectPeerWait(reqID, func(p *peer) (bool, time.Duration) {
+ if _, ok := exclude[p]; ok || !lreq.CanSend(p) {
return false, 0
}
return true, p.fcServer.CanSend(lreq.GetCost(p))
- })
+ }, ctx.Done())
}
if p == nil {
select {
diff --git a/les/odr_test.go b/les/odr_test.go
index b5cbda838..622d89e5c 100644
--- a/les/odr_test.go
+++ b/les/odr_test.go
@@ -160,7 +160,8 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
pm, db, odr := newTestProtocolManagerMust(t, false, 4, testChainGen)
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
- pool := (*testServerPool)(lpeer)
+ pool := &testServerPool{}
+ pool.setPeer(lpeer)
odr.serverPool = pool
select {
case <-time.After(time.Millisecond * 100):
@@ -190,13 +191,13 @@ func testOdr(t *testing.T, protocol int, expFail uint64, fn odrTestFn) {
}
// temporarily remove peer to test odr fails
- odr.serverPool = nil
+ pool.setPeer(nil)
// expect retrievals to fail (except genesis block) without a les peer
test(expFail)
- odr.serverPool = pool
+ pool.setPeer(lpeer)
// expect all retrievals to pass
test(5)
- odr.serverPool = nil
+ pool.setPeer(nil)
// still expect all retrievals to pass, now data should be cached locally
test(5)
}
diff --git a/les/peer.go b/les/peer.go
index 8d4a83f59..d5008ded1 100644
--- a/les/peer.go
+++ b/les/peer.go
@@ -241,7 +241,9 @@ func (p *peer) RequestHeaderProofs(reqID, cost uint64, reqs []*ChtReq) error {
func (p *peer) SendTxs(cost uint64, txs types.Transactions) error {
glog.V(logger.Debug).Infof("%v relaying %v txs", p, len(txs))
- p.fcServer.SendRequest(0, cost)
+ reqID := getNextReqID()
+ p.fcServer.MustAssignRequest(reqID)
+ p.fcServer.SendRequest(reqID, cost)
return p2p.Send(p.rw, SendTxMsg, txs)
}
diff --git a/les/request_test.go b/les/request_test.go
index 03b946771..10e9edf8b 100644
--- a/les/request_test.go
+++ b/les/request_test.go
@@ -71,7 +71,8 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
pm, db, _ := newTestProtocolManagerMust(t, false, 4, testChainGen)
lpm, ldb, odr := newTestProtocolManagerMust(t, true, 0, nil)
_, err1, lpeer, err2 := newTestPeerPair("peer", protocol, pm, lpm)
- pool := (*testServerPool)(lpeer)
+ pool := &testServerPool{}
+ pool.setPeer(lpeer)
odr.serverPool = pool
select {
case <-time.After(time.Millisecond * 100):
@@ -102,10 +103,10 @@ func testAccess(t *testing.T, protocol int, fn accessTestFn) {
}
// temporarily remove peer to test odr fails
- odr.serverPool = nil
+ pool.setPeer(nil)
// expect retrievals to fail (except genesis block) without a les peer
test(0)
- odr.serverPool = pool
+ pool.setPeer(lpeer)
// expect all retrievals to pass
test(5)
}
diff --git a/les/serverpool.go b/les/serverpool.go
index 80c446eef..e3b7cf620 100644
--- a/les/serverpool.go
+++ b/les/serverpool.go
@@ -265,33 +265,77 @@ func (pool *serverPool) adjustResponseTime(entry *poolEntry, time time.Duration,
type selectPeerItem struct {
peer *peer
weight int64
+ wait time.Duration
}
func (sp selectPeerItem) Weight() int64 {
return sp.weight
}
-// selectPeer selects a suitable peer for a request
-func (pool *serverPool) selectPeer(canSend func(*peer) (bool, uint64)) *peer {
+// selectPeer selects a suitable peer for a request, also returning a necessary waiting time to perform the request
+// and a "locked" flag meaning that the request has been assigned to the given peer and its execution is guaranteed
+// after the given waiting time. If locked flag is false, selectPeer should be called again after the waiting time.
+func (pool *serverPool) selectPeer(reqID uint64, canSend func(*peer) (bool, time.Duration)) (*peer, time.Duration, bool) {
pool.lock.Lock()
- defer pool.lock.Unlock()
-
+ type selectPeer struct {
+ peer *peer
+ rstat, tstat float64
+ }
+ var list []selectPeer
sel := newWeightedRandomSelect()
for _, entry := range pool.entries {
if entry.state == psRegistered {
- p := entry.peer
- ok, cost := canSend(p)
- if ok {
- w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(entry.responseStats.recentAvg()+float64(cost))/float64(responseScoreTC))*math.Pow((1-entry.timeoutStats.recentAvg()), timeoutPow)))
- sel.update(selectPeerItem{peer: p, weight: w})
+ if !entry.peer.fcServer.IsAssigned() {
+ list = append(list, selectPeer{entry.peer, entry.responseStats.recentAvg(), entry.timeoutStats.recentAvg()})
}
}
}
+ pool.lock.Unlock()
+
+ for _, sp := range list {
+ ok, wait := canSend(sp.peer)
+ if ok {
+ w := int64(1000000000 * (peerSelectMinWeight + math.Exp(-(sp.rstat+float64(wait))/float64(responseScoreTC))*math.Pow((1-sp.tstat), timeoutPow)))
+ sel.update(selectPeerItem{peer: sp.peer, weight: w, wait: wait})
+ }
+ }
choice := sel.choose()
if choice == nil {
- return nil
+ return nil, 0, false
+ }
+ peer, wait := choice.(selectPeerItem).peer, choice.(selectPeerItem).wait
+ locked := false
+ if wait < time.Millisecond*100 {
+ if peer.fcServer.AssignRequest(reqID) {
+ ok, w := canSend(peer)
+ wait = time.Duration(w)
+ if ok && wait < time.Millisecond*100 {
+ locked = true
+ } else {
+ peer.fcServer.DeassignRequest(reqID)
+ wait = time.Millisecond * 100
+ }
+ }
+ } else {
+ wait = time.Millisecond * 100
+ }
+ return peer, wait, locked
+}
+
+// selectPeer selects a suitable peer for a request, waiting until an assignment to
+// the request is guaranteed or the process is aborted.
+func (pool *serverPool) selectPeerWait(reqID uint64, canSend func(*peer) (bool, time.Duration), abort <-chan struct{}) *peer {
+ for {
+ peer, wait, locked := pool.selectPeer(reqID, canSend)
+ if locked {
+ return peer
+ }
+ select {
+ case <-abort:
+ return nil
+ case <-time.After(wait):
+ }
}
- return choice.(selectPeerItem).peer
}
// eventLoop handles pool events and mutex locking for all internal functions
diff --git a/light/txpool_test.go b/light/txpool_test.go
index 6927c54f8..e5a4670aa 100644
--- a/light/txpool_test.go
+++ b/light/txpool_test.go
@@ -32,20 +32,22 @@ import (
)
type testTxRelay struct {
- send, nhMined, nhRollback, discard int
+ send, discard, mined chan int
}
func (self *testTxRelay) Send(txs types.Transactions) {
- self.send = len(txs)
+ self.send <- len(txs)
}
func (self *testTxRelay) NewHead(head common.Hash, mined []common.Hash, rollback []common.Hash) {
- self.nhMined = len(mined)
- self.nhRollback = len(rollback)
+ m := len(mined)
+ if m != 0 {
+ self.mined <- m
+ }
}
func (self *testTxRelay) Discard(hashes []common.Hash) {
- self.discard = len(hashes)
+ self.discard <- len(hashes)
}
const poolTestTxs = 1000
@@ -94,7 +96,11 @@ func TestTxPool(t *testing.T) {
}
odr := &testOdr{sdb: sdb, ldb: ldb}
- relay := &testTxRelay{}
+ relay := &testTxRelay{
+ send: make(chan int, 1),
+ discard: make(chan int, 1),
+ mined: make(chan int, 1),
+ }
lightchain, _ := NewLightChain(odr, testChainConfig(), pow, evmux)
lightchain.SetValidator(bproc{})
txPermanent = 50
@@ -106,36 +112,33 @@ func TestTxPool(t *testing.T) {
s := sentTx(i - 1)
e := sentTx(i)
for i := s; i < e; i++ {
- relay.send = 0
pool.Add(ctx, testTx[i])
- got := relay.send
+ got := <-relay.send
exp := 1
if got != exp {
t.Errorf("relay.Send expected len = %d, got %d", exp, got)
}
}
- relay.nhMined = 0
- relay.nhRollback = 0
- relay.discard = 0
if _, err := lightchain.InsertHeaderChain([]*types.Header{block.Header()}, 1); err != nil {
panic(err)
}
- time.Sleep(time.Millisecond * 30)
- got := relay.nhMined
+ got := <-relay.mined
exp := minedTx(i) - minedTx(i-1)
if got != exp {
t.Errorf("relay.NewHead expected len(mined) = %d, got %d", exp, got)
}
- got = relay.discard
exp = 0
if i > int(txPermanent)+1 {
exp = minedTx(i-int(txPermanent)-1) - minedTx(i-int(txPermanent)-2)
}
- if got != exp {
- t.Errorf("relay.Discard expected len = %d, got %d", exp, got)
+ if exp != 0 {
+ got = <-relay.discard
+ if got != exp {
+ t.Errorf("relay.Discard expected len = %d, got %d", exp, got)
+ }
}
}
}
diff --git a/params/version.go b/params/version.go
index ba7a57381..6eb920a21 100644
--- a/params/version.go
+++ b/params/version.go
@@ -21,7 +21,7 @@ import "fmt"
const (
VersionMajor = 1 // Major version component of the current release
VersionMinor = 5 // Minor version component of the current release
- VersionPatch = 6 // Patch version component of the current release
+ VersionPatch = 7 // Patch version component of the current release
VersionMeta = "unstable" // Version metadata to append to the version string
)