diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/network/priorityqueue | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/network/priorityqueue')
-rw-r--r-- | swarm/network/priorityqueue/priorityqueue.go | 111 | ||||
-rw-r--r-- | swarm/network/priorityqueue/priorityqueue_test.go | 97 |
2 files changed, 208 insertions, 0 deletions
diff --git a/swarm/network/priorityqueue/priorityqueue.go b/swarm/network/priorityqueue/priorityqueue.go new file mode 100644 index 000000000..fab638c9e --- /dev/null +++ b/swarm/network/priorityqueue/priorityqueue.go @@ -0,0 +1,111 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +// package priority_queue implement a channel based priority queue +// over arbitrary types. It provides an +// an autopop loop applying a function to the items always respecting +// their priority. The structure is only quasi consistent ie., if a lower +// priority item is autopopped, it is guaranteed that there was a point +// when no higher priority item was present, ie. it is not guaranteed +// that there was any point where the lower priority item was present +// but the higher was not + +package priorityqueue + +import ( + "context" + "errors" +) + +var ( + errContention = errors.New("queue contention") + errBadPriority = errors.New("bad priority") + + wakey = struct{}{} +) + +// PriorityQueue is the basic structure +type PriorityQueue struct { + queues []chan interface{} + wakeup chan struct{} +} + +// New is the constructor for PriorityQueue +func New(n int, l int) *PriorityQueue { + var queues = make([]chan interface{}, n) + for i := range queues { + queues[i] = make(chan interface{}, l) + } + return &PriorityQueue{ + queues: queues, + wakeup: make(chan struct{}, 1), + } +} + +// Run is a forever loop popping items from the queues +func (pq *PriorityQueue) Run(ctx context.Context, f func(interface{})) { + top := len(pq.queues) - 1 + p := top +READ: + for { + q := pq.queues[p] + select { + case <-ctx.Done(): + return + case x := <-q: + f(x) + p = top + default: + if p > 0 { + p-- + continue READ + } + p = top + select { + case <-ctx.Done(): + return + case <-pq.wakeup: + } + } + } +} + +// Push pushes an item to the appropriate queue specified in the priority argument +// if context is given it waits until either the item is pushed or the Context aborts +// otherwise returns errContention if the queue is full +func (pq *PriorityQueue) Push(ctx context.Context, x interface{}, p int) error { + if p < 0 || p >= len(pq.queues) { + return errBadPriority + } + if ctx == nil { + select { + case pq.queues[p] <- x: + default: + return errContention + } + } else { + select { + case pq.queues[p] <- x: + case <-ctx.Done(): + return ctx.Err() + } + } + select { + case pq.wakeup <- wakey: + default: + } + return nil +} diff --git a/swarm/network/priorityqueue/priorityqueue_test.go b/swarm/network/priorityqueue/priorityqueue_test.go new file mode 100644 index 000000000..cd54250f8 --- /dev/null +++ b/swarm/network/priorityqueue/priorityqueue_test.go @@ -0,0 +1,97 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. +package priorityqueue + +import ( + "context" + "sync" + "testing" +) + +func TestPriorityQueue(t *testing.T) { + var results []string + wg := sync.WaitGroup{} + pq := New(3, 2) + wg.Add(1) + go pq.Run(context.Background(), func(v interface{}) { + results = append(results, v.(string)) + wg.Done() + }) + pq.Push(context.Background(), "2.0", 2) + wg.Wait() + if results[0] != "2.0" { + t.Errorf("expected first result %q, got %q", "2.0", results[0]) + } + +Loop: + for i, tc := range []struct { + priorities []int + values []string + results []string + errors []error + }{ + { + priorities: []int{0}, + values: []string{""}, + results: []string{""}, + }, + { + priorities: []int{0, 1}, + values: []string{"0.0", "1.0"}, + results: []string{"1.0", "0.0"}, + }, + { + priorities: []int{1, 0}, + values: []string{"1.0", "0.0"}, + results: []string{"1.0", "0.0"}, + }, + { + priorities: []int{0, 1, 1}, + values: []string{"0.0", "1.0", "1.1"}, + results: []string{"1.0", "1.1", "0.0"}, + }, + { + priorities: []int{0, 0, 0}, + values: []string{"0.0", "0.0", "0.1"}, + errors: []error{nil, nil, errContention}, + }, + } { + var results []string + wg := sync.WaitGroup{} + pq := New(3, 2) + wg.Add(len(tc.values)) + for j, value := range tc.values { + err := pq.Push(nil, value, tc.priorities[j]) + if tc.errors != nil && err != tc.errors[j] { + t.Errorf("expected push error %v, got %v", tc.errors[j], err) + continue Loop + } + if err != nil { + continue Loop + } + } + go pq.Run(context.Background(), func(v interface{}) { + results = append(results, v.(string)) + wg.Done() + }) + wg.Wait() + for k, result := range tc.results { + if results[k] != result { + t.Errorf("test case %v: expected %v element %q, got %q", i, k, result, results[k]) + } + } + } +} |