From 525116dbff916825463931361f75e75e955c12e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felf=C3=B6ldi=20Zsolt?= Date: Wed, 22 Mar 2017 20:44:22 +0100 Subject: les: implement request distributor, fix blocking issues (#3660) * les: implement request distributor, fix blocking issues * core: moved header validation before chain mutex lock --- les/distributor_test.go | 192 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 les/distributor_test.go (limited to 'les/distributor_test.go') diff --git a/les/distributor_test.go b/les/distributor_test.go new file mode 100644 index 000000000..f2eb80729 --- /dev/null +++ b/les/distributor_test.go @@ -0,0 +1,192 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +// Package light implements on-demand retrieval capable state and chain objects +// for the Ethereum Light Client. +package les + +import ( + "math/rand" + "sync" + "testing" + "time" +) + +type testDistReq struct { + cost, procTime, order uint64 + canSendTo map[*testDistPeer]struct{} +} + +func (r *testDistReq) getCost(dp distPeer) uint64 { + return r.cost +} + +func (r *testDistReq) canSend(dp distPeer) bool { + _, ok := r.canSendTo[dp.(*testDistPeer)] + return ok +} + +func (r *testDistReq) request(dp distPeer) func() { + return func() { dp.(*testDistPeer).send(r) } +} + +type testDistPeer struct { + sent []*testDistReq + sumCost uint64 + lock sync.RWMutex +} + +func (p *testDistPeer) send(r *testDistReq) { + p.lock.Lock() + defer p.lock.Unlock() + + p.sent = append(p.sent, r) + p.sumCost += r.cost +} + +func (p *testDistPeer) worker(t *testing.T, checkOrder bool, stop chan struct{}) { + var last uint64 + for { + wait := time.Millisecond + p.lock.Lock() + if len(p.sent) > 0 { + rq := p.sent[0] + wait = time.Duration(rq.procTime) + p.sumCost -= rq.cost + if checkOrder { + if rq.order <= last { + t.Errorf("Requests processed in wrong order") + } + last = rq.order + } + p.sent = p.sent[1:] + } + p.lock.Unlock() + select { + case <-stop: + return + case <-time.After(wait): + } + } +} + +const ( + testDistBufLimit = 10000000 + testDistMaxCost = 1000000 + testDistPeerCount = 5 + testDistReqCount = 50000 + testDistMaxResendCount = 3 +) + +func (p *testDistPeer) waitBefore(cost uint64) (time.Duration, float64) { + p.lock.RLock() + sumCost := p.sumCost + cost + p.lock.RUnlock() + if sumCost < testDistBufLimit { + return 0, float64(testDistBufLimit-sumCost) / float64(testDistBufLimit) + } else { + return time.Duration(sumCost - testDistBufLimit), 0 + } +} + +func (p *testDistPeer) canQueue() bool { + return true +} + +func (p *testDistPeer) queueSend(f func()) { + f() +} + +func TestRequestDistributor(t *testing.T) { + testRequestDistributor(t, false) +} + +func TestRequestDistributorResend(t *testing.T) { + testRequestDistributor(t, true) +} + +func testRequestDistributor(t *testing.T, resend bool) { + stop := make(chan struct{}) + defer close(stop) + + var peers [testDistPeerCount]*testDistPeer + for i, _ := range peers { + peers[i] = &testDistPeer{} + go peers[i].worker(t, !resend, stop) + } + + dist := newRequestDistributor(func() map[distPeer]struct{} { + m := make(map[distPeer]struct{}) + for _, peer := range peers { + m[peer] = struct{}{} + } + return m + }, stop) + + var wg sync.WaitGroup + + for i := 1; i <= testDistReqCount; i++ { + cost := uint64(rand.Int63n(testDistMaxCost)) + procTime := uint64(rand.Int63n(int64(cost + 1))) + rq := &testDistReq{ + cost: cost, + procTime: procTime, + order: uint64(i), + canSendTo: make(map[*testDistPeer]struct{}), + } + for _, peer := range peers { + if rand.Intn(2) != 0 { + rq.canSendTo[peer] = struct{}{} + } + } + + wg.Add(1) + req := &distReq{ + getCost: rq.getCost, + canSend: rq.canSend, + request: rq.request, + } + chn := dist.queue(req) + go func() { + cnt := 1 + if resend && len(rq.canSendTo) != 0 { + cnt = rand.Intn(testDistMaxResendCount) + 1 + } + for i := 0; i < cnt; i++ { + if i != 0 { + chn = dist.queue(req) + } + p := <-chn + if p == nil { + if len(rq.canSendTo) != 0 { + t.Errorf("Request that could have been sent was dropped") + } + } else { + peer := p.(*testDistPeer) + if _, ok := rq.canSendTo[peer]; !ok { + t.Errorf("Request sent to wrong peer") + } + } + } + wg.Done() + }() + if rand.Intn(1000) == 0 { + time.Sleep(time.Duration(rand.Intn(5000000))) + } + } + + wg.Wait() +} -- cgit v1.2.3