From 4800c94392e814a2cb9d343aab4706be0cd0851d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Wed, 6 May 2015 15:32:53 +0300 Subject: eth/downloader: prioritize block fetch based on chain position, cap memory use --- Godeps/Godeps.json | 4 + .../cookiejar.v2/collections/prque/example_test.go | 44 +++++++++ .../cookiejar.v2/collections/prque/prque.go | 75 ++++++++++++++ .../cookiejar.v2/collections/prque/prque_test.go | 110 +++++++++++++++++++++ .../cookiejar.v2/collections/prque/sstack.go | 103 +++++++++++++++++++ .../cookiejar.v2/collections/prque/sstack_test.go | 93 +++++++++++++++++ 6 files changed, 429 insertions(+) create mode 100644 Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/example_test.go create mode 100644 Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go create mode 100644 Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go create mode 100644 Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go create mode 100644 Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go (limited to 'Godeps') diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 2480ff9a2..a5b27e76c 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -98,6 +98,10 @@ "Comment": "v0.1.0-3-g27c4092", "Rev": "27c40922c40b43fe04554d8223a402af3ea333f3" }, + { + "ImportPath": "gopkg.in/karalabe/cookiejar.v2/collections/prque", + "Rev": "cf5d8079df7c4501217638e1e3a6e43f94822548" + }, { "ImportPath": "gopkg.in/qml.v1/cdata", "Rev": "1116cb9cd8dee23f8d444ded354eb53122739f99" diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/example_test.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/example_test.go new file mode 100644 index 000000000..7b2e5ee84 --- /dev/null +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/example_test.go @@ -0,0 +1,44 @@ +// CookieJar - A contestant's algorithm toolbox +// Copyright (c) 2013 Peter Szilagyi. All rights reserved. +// +// CookieJar is dual licensed: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later +// version. +// +// The toolbox is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// Alternatively, the CookieJar toolbox may be used in accordance with the terms +// and conditions contained in a signed written agreement between you and the +// author(s). + +package prque_test + +import ( + "fmt" + + "gopkg.in/karalabe/cookiejar.v2/collections/prque" +) + +// Insert some data into a priority queue and pop them out in prioritized order. +func Example_usage() { + // Define some data to push into the priority queue + prio := []float32{77.7, 22.2, 44.4, 55.5, 11.1, 88.8, 33.3, 99.9, 0.0, 66.6} + data := []string{"zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine"} + + // Create the priority queue and insert the prioritized data + pq := prque.New() + for i := 0; i < len(data); i++ { + pq.Push(data[i], prio[i]) + } + // Pop out the data and print them + for !pq.Empty() { + val, prio := pq.Pop() + fmt.Printf("%.1f:%s ", prio, val) + } + // Output: + // 99.9:seven 88.8:five 77.7:zero 66.6:nine 55.5:three 44.4:two 33.3:six 22.2:one 11.1:four 0.0:eight +} diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go new file mode 100644 index 000000000..8225e8c53 --- /dev/null +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go @@ -0,0 +1,75 @@ +// CookieJar - A contestant's algorithm toolbox +// Copyright (c) 2013 Peter Szilagyi. All rights reserved. +// +// CookieJar is dual licensed: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later +// version. +// +// The toolbox is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// Alternatively, the CookieJar toolbox may be used in accordance with the terms +// and conditions contained in a signed written agreement between you and the +// author(s). + +// Package prque implements a priority queue data structure supporting arbitrary +// value types and float priorities. +// +// The reasoning behind using floats for the priorities vs. ints or interfaces +// was larger flexibility without sacrificing too much performance or code +// complexity. +// +// If you would like to use a min-priority queue, simply negate the priorities. +// +// Internally the queue is based on the standard heap package working on a +// sortable version of the block based stack. +package prque + +import ( + "container/heap" +) + +// Priority queue data structure. +type Prque struct { + cont *sstack +} + +// Creates a new priority queue. +func New() *Prque { + return &Prque{newSstack()} +} + +// Pushes a value with a given priority into the queue, expanding if necessary. +func (p *Prque) Push(data interface{}, priority float32) { + heap.Push(p.cont, &item{data, priority}) +} + +// Pops the value with the greates priority off the stack and returns it. +// Currently no shrinking is done. +func (p *Prque) Pop() (interface{}, float32) { + item := heap.Pop(p.cont).(*item) + return item.value, item.priority +} + +// Pops only the item from the queue, dropping the associated priority value. +func (p *Prque) PopItem() interface{} { + return heap.Pop(p.cont).(*item).value +} + +// Checks whether the priority queue is empty. +func (p *Prque) Empty() bool { + return p.cont.Len() == 0 +} + +// Returns the number of element in the priority queue. +func (p *Prque) Size() int { + return p.cont.Len() +} + +// Clears the contents of the priority queue. +func (p *Prque) Reset() { + p.cont.Reset() +} diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go new file mode 100644 index 000000000..811c53c73 --- /dev/null +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go @@ -0,0 +1,110 @@ +// CookieJar - A contestant's algorithm toolbox +// Copyright (c) 2013 Peter Szilagyi. All rights reserved. +// +// CookieJar is dual licensed: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later +// version. +// +// The toolbox is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// Alternatively, the CookieJar toolbox may be used in accordance with the terms +// and conditions contained in a signed written agreement between you and the +// author(s). + +package prque + +import ( + "math/rand" + "testing" +) + +func TestPrque(t *testing.T) { + // Generate a batch of random data and a specific priority order + size := 16 * blockSize + prio := rand.Perm(size) + data := make([]int, size) + for i := 0; i < size; i++ { + data[i] = rand.Int() + } + queue := New() + for rep := 0; rep < 2; rep++ { + // Fill a priority queue with the above data + for i := 0; i < size; i++ { + queue.Push(data[i], float32(prio[i])) + if queue.Size() != i+1 { + t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1) + } + } + // Create a map the values to the priorities for easier verification + dict := make(map[float32]int) + for i := 0; i < size; i++ { + dict[float32(prio[i])] = data[i] + } + // Pop out the elements in priority order and verify them + prevPrio := float32(size + 1) + for !queue.Empty() { + val, prio := queue.Pop() + if prio > prevPrio { + t.Errorf("invalid priority order: %v after %v.", prio, prevPrio) + } + prevPrio = prio + if val != dict[prio] { + t.Errorf("push/pop mismatch: have %v, want %v.", val, dict[prio]) + } + delete(dict, prio) + } + } +} + +func TestReset(t *testing.T) { + // Fill the queue with some random data + size := 16 * blockSize + queue := New() + for i := 0; i < size; i++ { + queue.Push(rand.Int(), rand.Float32()) + } + // Reset and ensure it's empty + queue.Reset() + if !queue.Empty() { + t.Errorf("priority queue not empty after reset: %v", queue) + } +} + +func BenchmarkPush(b *testing.B) { + // Create some initial data + data := make([]int, b.N) + prio := make([]float32, b.N) + for i := 0; i < len(data); i++ { + data[i] = rand.Int() + prio[i] = rand.Float32() + } + // Execute the benchmark + b.ResetTimer() + queue := New() + for i := 0; i < len(data); i++ { + queue.Push(data[i], prio[i]) + } +} + +func BenchmarkPop(b *testing.B) { + // Create some initial data + data := make([]int, b.N) + prio := make([]float32, b.N) + for i := 0; i < len(data); i++ { + data[i] = rand.Int() + prio[i] = rand.Float32() + } + queue := New() + for i := 0; i < len(data); i++ { + queue.Push(data[i], prio[i]) + } + // Execute the benchmark + b.ResetTimer() + for !queue.Empty() { + queue.Pop() + } +} diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go new file mode 100644 index 000000000..55375a091 --- /dev/null +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go @@ -0,0 +1,103 @@ +// CookieJar - A contestant's algorithm toolbox +// Copyright (c) 2013 Peter Szilagyi. All rights reserved. +// +// CookieJar is dual licensed: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later +// version. +// +// The toolbox is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// Alternatively, the CookieJar toolbox may be used in accordance with the terms +// and conditions contained in a signed written agreement between you and the +// author(s). + +package prque + +// The size of a block of data +const blockSize = 4096 + +// A prioritized item in the sorted stack. +type item struct { + value interface{} + priority float32 +} + +// Internal sortable stack data structure. Implements the Push and Pop ops for +// the stack (heap) functionality and the Len, Less and Swap methods for the +// sortability requirements of the heaps. +type sstack struct { + size int + capacity int + offset int + + blocks [][]*item + active []*item +} + +// Creates a new, empty stack. +func newSstack() *sstack { + result := new(sstack) + result.active = make([]*item, blockSize) + result.blocks = [][]*item{result.active} + result.capacity = blockSize + return result +} + +// Pushes a value onto the stack, expanding it if necessary. Required by +// heap.Interface. +func (s *sstack) Push(data interface{}) { + if s.size == s.capacity { + s.active = make([]*item, blockSize) + s.blocks = append(s.blocks, s.active) + s.capacity += blockSize + s.offset = 0 + } else if s.offset == blockSize { + s.active = s.blocks[s.size/blockSize] + s.offset = 0 + } + s.active[s.offset] = data.(*item) + s.offset++ + s.size++ +} + +// Pops a value off the stack and returns it. Currently no shrinking is done. +// Required by heap.Interface. +func (s *sstack) Pop() (res interface{}) { + s.size-- + s.offset-- + if s.offset < 0 { + s.offset = blockSize - 1 + s.active = s.blocks[s.size/blockSize] + } + res, s.active[s.offset] = s.active[s.offset], nil + return +} + +// Returns the length of the stack. Required by sort.Interface. +func (s *sstack) Len() int { + return s.size +} + +// Compares the priority of two elements of the stack (higher is first). +// Required by sort.Interface. +func (s *sstack) Less(i, j int) bool { + return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority +} + +// Swapts two elements in the stack. Required by sort.Interface. +func (s *sstack) Swap(i, j int) { + ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize + s.blocks[ib][io], s.blocks[jb][jo] = s.blocks[jb][jo], s.blocks[ib][io] +} + +// Resets the stack, effectively clearing its contents. +func (s *sstack) Reset() { + s.size = 0 + s.offset = 0 + s.active = s.blocks[0] + s.capacity = blockSize +} diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go new file mode 100644 index 000000000..7bdc08bf5 --- /dev/null +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go @@ -0,0 +1,93 @@ +// CookieJar - A contestant's algorithm toolbox +// Copyright (c) 2013 Peter Szilagyi. All rights reserved. +// +// CookieJar is dual licensed: you can redistribute it and/or modify it under +// the terms of the GNU General Public License as published by the Free Software +// Foundation, either version 3 of the License, or (at your option) any later +// version. +// +// The toolbox is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// Alternatively, the CookieJar toolbox may be used in accordance with the terms +// and conditions contained in a signed written agreement between you and the +// author(s). + +package prque + +import ( + "math/rand" + "sort" + "testing" +) + +func TestSstack(t *testing.T) { + // Create some initial data + size := 16 * blockSize + data := make([]*item, size) + for i := 0; i < size; i++ { + data[i] = &item{rand.Int(), rand.Float32()} + } + stack := newSstack() + for rep := 0; rep < 2; rep++ { + // Push all the data into the stack, pop out every second + secs := []*item{} + for i := 0; i < size; i++ { + stack.Push(data[i]) + if i%2 == 0 { + secs = append(secs, stack.Pop().(*item)) + } + } + rest := []*item{} + for stack.Len() > 0 { + rest = append(rest, stack.Pop().(*item)) + } + // Make sure the contents of the resulting slices are ok + for i := 0; i < size; i++ { + if i%2 == 0 && data[i] != secs[i/2] { + t.Errorf("push/pop mismatch: have %v, want %v.", secs[i/2], data[i]) + } + if i%2 == 1 && data[i] != rest[len(rest)-i/2-1] { + t.Errorf("push/pop mismatch: have %v, want %v.", rest[len(rest)-i/2-1], data[i]) + } + } + } +} + +func TestSstackSort(t *testing.T) { + // Create some initial data + size := 16 * blockSize + data := make([]*item, size) + for i := 0; i < size; i++ { + data[i] = &item{rand.Int(), float32(i)} + } + // Push all the data into the stack + stack := newSstack() + for _, val := range data { + stack.Push(val) + } + // Sort and pop the stack contents (should reverse the order) + sort.Sort(stack) + for _, val := range data { + out := stack.Pop() + if out != val { + t.Errorf("push/pop mismatch after sort: have %v, want %v.", out, val) + } + } +} + +func TestSstackReset(t *testing.T) { + // Push some stuff onto the stack + size := 16 * blockSize + stack := newSstack() + for i := 0; i < size; i++ { + stack.Push(&item{i, float32(i)}) + } + // Clear and verify + stack.Reset() + if stack.Len() != 0 { + t.Errorf("stack not empty after reset: %v", stack) + } +} -- cgit v1.2.3 From 43901c92825389b694fb5488c520cf5122f022de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Thu, 7 May 2015 14:40:50 +0300 Subject: eth/downloader: fix priority queue reset, add throttling test --- Godeps/Godeps.json | 2 +- .../cookiejar.v2/collections/prque/prque.go | 2 +- .../cookiejar.v2/collections/prque/prque_test.go | 43 ++++++++++++++++++---- .../cookiejar.v2/collections/prque/sstack.go | 7 +--- .../cookiejar.v2/collections/prque/sstack_test.go | 30 +++++++++++---- 5 files changed, 63 insertions(+), 21 deletions(-) (limited to 'Godeps') diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index a5b27e76c..012475c08 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -100,7 +100,7 @@ }, { "ImportPath": "gopkg.in/karalabe/cookiejar.v2/collections/prque", - "Rev": "cf5d8079df7c4501217638e1e3a6e43f94822548" + "Rev": "0b2e270613f5d7ba262a5749b9e32270131497a2" }, { "ImportPath": "gopkg.in/qml.v1/cdata", diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go index 8225e8c53..a1009f3be 100644 --- a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go @@ -71,5 +71,5 @@ func (p *Prque) Size() int { // Clears the contents of the priority queue. func (p *Prque) Reset() { - p.cont.Reset() + *p = *New() } diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go index 811c53c73..daba691e1 100644 --- a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque_test.go @@ -61,16 +61,45 @@ func TestPrque(t *testing.T) { } func TestReset(t *testing.T) { - // Fill the queue with some random data + // Generate a batch of random data and a specific priority order size := 16 * blockSize - queue := New() + prio := rand.Perm(size) + data := make([]int, size) for i := 0; i < size; i++ { - queue.Push(rand.Int(), rand.Float32()) + data[i] = rand.Int() } - // Reset and ensure it's empty - queue.Reset() - if !queue.Empty() { - t.Errorf("priority queue not empty after reset: %v", queue) + queue := New() + for rep := 0; rep < 2; rep++ { + // Fill a priority queue with the above data + for i := 0; i < size; i++ { + queue.Push(data[i], float32(prio[i])) + if queue.Size() != i+1 { + t.Errorf("queue size mismatch: have %v, want %v.", queue.Size(), i+1) + } + } + // Create a map the values to the priorities for easier verification + dict := make(map[float32]int) + for i := 0; i < size; i++ { + dict[float32(prio[i])] = data[i] + } + // Pop out half the elements in priority order and verify them + prevPrio := float32(size + 1) + for i := 0; i < size/2; i++ { + val, prio := queue.Pop() + if prio > prevPrio { + t.Errorf("invalid priority order: %v after %v.", prio, prevPrio) + } + prevPrio = prio + if val != dict[prio] { + t.Errorf("push/pop mismatch: have %v, want %v.", val, dict[prio]) + } + delete(dict, prio) + } + // Reset and ensure it's empty + queue.Reset() + if !queue.Empty() { + t.Errorf("priority queue not empty after reset: %v", queue) + } } } diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go index 55375a091..c11347f9d 100644 --- a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go @@ -88,7 +88,7 @@ func (s *sstack) Less(i, j int) bool { return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority } -// Swapts two elements in the stack. Required by sort.Interface. +// Swaps two elements in the stack. Required by sort.Interface. func (s *sstack) Swap(i, j int) { ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize s.blocks[ib][io], s.blocks[jb][jo] = s.blocks[jb][jo], s.blocks[ib][io] @@ -96,8 +96,5 @@ func (s *sstack) Swap(i, j int) { // Resets the stack, effectively clearing its contents. func (s *sstack) Reset() { - s.size = 0 - s.offset = 0 - s.active = s.blocks[0] - s.capacity = blockSize + *s = *newSstack() } diff --git a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go index 7bdc08bf5..bcb5b830b 100644 --- a/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go +++ b/Godeps/_workspace/src/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack_test.go @@ -79,15 +79,31 @@ func TestSstackSort(t *testing.T) { } func TestSstackReset(t *testing.T) { - // Push some stuff onto the stack + // Create some initial data size := 16 * blockSize - stack := newSstack() + data := make([]*item, size) for i := 0; i < size; i++ { - stack.Push(&item{i, float32(i)}) + data[i] = &item{rand.Int(), rand.Float32()} } - // Clear and verify - stack.Reset() - if stack.Len() != 0 { - t.Errorf("stack not empty after reset: %v", stack) + stack := newSstack() + for rep := 0; rep < 2; rep++ { + // Push all the data into the stack, pop out every second + secs := []*item{} + for i := 0; i < size; i++ { + stack.Push(data[i]) + if i%2 == 0 { + secs = append(secs, stack.Pop().(*item)) + } + } + // Reset and verify both pulled and stack contents + stack.Reset() + if stack.Len() != 0 { + t.Errorf("stack not empty after reset: %v", stack) + } + for i := 0; i < size; i++ { + if i%2 == 0 && data[i] != secs[i/2] { + t.Errorf("push/pop mismatch: have %v, want %v.", secs[i/2], data[i]) + } + } } } -- cgit v1.2.3