aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--cmd/geth/main.go4
-rw-r--r--eth/downloader/peer.go45
-rw-r--r--eth/downloader/queue.go12
-rw-r--r--jsre/jsre_test.go2
-rw-r--r--tests/init.go7
-rw-r--r--whisper/whisper_test.go13
6 files changed, 65 insertions, 18 deletions
diff --git a/cmd/geth/main.go b/cmd/geth/main.go
index e74ddd0d0..0fb654eed 100644
--- a/cmd/geth/main.go
+++ b/cmd/geth/main.go
@@ -542,10 +542,10 @@ func unlockAccount(ctx *cli.Context, am *accounts.Manager, addr string, i int, i
func blockRecovery(ctx *cli.Context) {
utils.CheckLegalese(utils.MustDataDir(ctx))
- arg := ctx.Args().First()
- if len(ctx.Args()) < 1 && len(arg) > 0 {
+ if len(ctx.Args()) < 1 {
glog.Fatal("recover requires block number or hash")
}
+ arg := ctx.Args().First()
cfg := utils.MakeEthConfig(ClientIdentifier, nodeNameVersion, ctx)
utils.CheckLegalese(cfg.DataDir)
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 1f457cb15..9ba6dabbd 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -28,9 +28,11 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
- "gopkg.in/fatih/set.v0"
)
+// Maximum number of entries allowed on the list or lacking items.
+const maxLackingHashes = 4096
+
// Hash and block fetchers belonging to eth/61 and below
type relativeHashFetcherFn func(common.Hash) error
type absoluteHashFetcherFn func(uint64, int) error
@@ -67,7 +69,8 @@ type peer struct {
receiptStarted time.Time // Time instance when the last receipt fetch was started
stateStarted time.Time // Time instance when the last node data fetch was started
- ignored *set.Set // Set of hashes not to request (didn't have previously)
+ lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
+ lackingLock sync.RWMutex // Lock protecting the lacking hashes list
getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
@@ -95,7 +98,7 @@ func newPeer(id string, version int, head common.Hash,
blockCapacity: 1,
receiptCapacity: 1,
stateCapacity: 1,
- ignored: set.New(),
+ lacking: make(map[common.Hash]struct{}),
getRelHashes: getRelHashes,
getAbsHashes: getAbsHashes,
@@ -119,7 +122,10 @@ func (p *peer) Reset() {
atomic.StoreInt32(&p.blockCapacity, 1)
atomic.StoreInt32(&p.receiptCapacity, 1)
atomic.StoreInt32(&p.stateCapacity, 1)
- p.ignored.Clear()
+
+ p.lackingLock.Lock()
+ p.lacking = make(map[common.Hash]struct{})
+ p.lackingLock.Unlock()
}
// Fetch61 sends a block retrieval request to the remote peer.
@@ -305,13 +311,42 @@ func (p *peer) Demote() {
}
}
+// MarkLacking appends a new entity to the set of items (blocks, receipts, states)
+// that a peer is known not to have (i.e. have been requested before). If the
+// set reaches its maximum allowed capacity, items are randomly dropped off.
+func (p *peer) MarkLacking(hash common.Hash) {
+ p.lackingLock.Lock()
+ defer p.lackingLock.Unlock()
+
+ for len(p.lacking) >= maxLackingHashes {
+ for drop, _ := range p.lacking {
+ delete(p.lacking, drop)
+ break
+ }
+ }
+ p.lacking[hash] = struct{}{}
+}
+
+// Lacks retrieves whether the hash of a blockchain item is on the peers lacking
+// list (i.e. whether we know that the peer does not have it).
+func (p *peer) Lacks(hash common.Hash) bool {
+ p.lackingLock.RLock()
+ defer p.lackingLock.RUnlock()
+
+ _, ok := p.lacking[hash]
+ return ok
+}
+
// String implements fmt.Stringer.
func (p *peer) String() string {
+ p.lackingLock.RLock()
+ defer p.lackingLock.RUnlock()
+
return fmt.Sprintf("Peer %s [%s]", p.id,
fmt.Sprintf("reputation %3d, ", atomic.LoadInt32(&p.rep))+
fmt.Sprintf("block cap %3d, ", atomic.LoadInt32(&p.blockCapacity))+
fmt.Sprintf("receipt cap %3d, ", atomic.LoadInt32(&p.receiptCapacity))+
- fmt.Sprintf("ignored %4d", p.ignored.Size()),
+ fmt.Sprintf("lacking %4d", len(p.lacking)),
)
}
diff --git a/eth/downloader/queue.go b/eth/downloader/queue.go
index 56b46e285..1fb5b6e12 100644
--- a/eth/downloader/queue.go
+++ b/eth/downloader/queue.go
@@ -501,7 +501,7 @@ func (q *queue) reserveHashes(p *peer, count int, taskQueue *prque.Prque, taskGe
for proc := 0; (allowance == 0 || proc < allowance) && len(send) < count && !taskQueue.Empty(); proc++ {
hash, priority := taskQueue.Pop()
- if p.ignored.Has(hash) {
+ if p.Lacks(hash.(common.Hash)) {
skip[hash.(common.Hash)] = int(priority)
} else {
send[hash.(common.Hash)] = int(priority)
@@ -607,7 +607,7 @@ func (q *queue) reserveHeaders(p *peer, count int, taskPool map[common.Hash]*typ
continue
}
// Otherwise unless the peer is known not to have the data, add to the retrieve list
- if p.ignored.Has(header.Hash()) {
+ if p.Lacks(header.Hash()) {
skip = append(skip, header)
} else {
send = append(send, header)
@@ -781,7 +781,7 @@ func (q *queue) DeliverBlocks(id string, blocks []*types.Block) error {
// If no blocks were retrieved, mark them as unavailable for the origin peer
if len(blocks) == 0 {
for hash, _ := range request.Hashes {
- request.Peer.ignored.Add(hash)
+ request.Peer.MarkLacking(hash)
}
}
// Iterate over the downloaded blocks and add each of them
@@ -877,8 +877,8 @@ func (q *queue) deliver(id string, taskPool map[common.Hash]*types.Header, taskQ
// If no data items were retrieved, mark them as unavailable for the origin peer
if results == 0 {
- for hash, _ := range request.Headers {
- request.Peer.ignored.Add(hash)
+ for _, header := range request.Headers {
+ request.Peer.MarkLacking(header.Hash())
}
}
// Assemble each of the results with their headers and retrieved data parts
@@ -944,7 +944,7 @@ func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, i
// If no data was retrieved, mark their hashes as unavailable for the origin peer
if len(data) == 0 {
for hash, _ := range request.Hashes {
- request.Peer.ignored.Add(hash)
+ request.Peer.MarkLacking(hash)
}
}
// Iterate over the downloaded data and verify each of them
diff --git a/jsre/jsre_test.go b/jsre/jsre_test.go
index 8450f546c..ffb6999db 100644
--- a/jsre/jsre_test.go
+++ b/jsre/jsre_test.go
@@ -85,7 +85,7 @@ func TestNatto(t *testing.T) {
if err != nil {
t.Errorf("expected no error, got %v", err)
}
- time.Sleep(time.Millisecond * 10)
+ time.Sleep(100 * time.Millisecond)
val, err := jsre.Run("msg")
if err != nil {
t.Errorf("expected no error, got %v", err)
diff --git a/tests/init.go b/tests/init.go
index 3f8b8c684..a86970499 100644
--- a/tests/init.go
+++ b/tests/init.go
@@ -56,13 +56,16 @@ var (
VmSkipTests = []string{}
)
+// Disable reporting bad blocks for the tests
+func init() {
+ core.DisableBadBlockReporting = true
+}
+
func readJson(reader io.Reader, value interface{}) error {
data, err := ioutil.ReadAll(reader)
if err != nil {
return fmt.Errorf("Error reading JSON file", err.Error())
}
-
- core.DisableBadBlockReporting = true
if err = json.Unmarshal(data, &value); err != nil {
if syntaxerr, ok := err.(*json.SyntaxError); ok {
line := findLine(data, syntaxerr.Offset)
diff --git a/whisper/whisper_test.go b/whisper/whisper_test.go
index b5a919984..1a9a8667a 100644
--- a/whisper/whisper_test.go
+++ b/whisper/whisper_test.go
@@ -189,13 +189,22 @@ func TestMessageExpiration(t *testing.T) {
t.Fatalf("failed to inject message: %v", err)
}
// Check that the message is inside the cache
- if _, ok := node.messages[envelope.Hash()]; !ok {
+ node.poolMu.RLock()
+ _, found := node.messages[envelope.Hash()]
+ node.poolMu.RUnlock()
+
+ if !found {
t.Fatalf("message not found in cache")
}
// Wait for expiration and check cache again
time.Sleep(time.Second) // wait for expiration
time.Sleep(expirationCycle) // wait for cleanup cycle
- if _, ok := node.messages[envelope.Hash()]; ok {
+
+ node.poolMu.RLock()
+ _, found = node.messages[envelope.Hash()]
+ node.poolMu.RUnlock()
+
+ if found {
t.Fatalf("message not expired from cache")
}
}