diff options
author | ethersphere <thesw@rm.eth> | 2018-06-20 20:06:27 +0800 |
---|---|---|
committer | ethersphere <thesw@rm.eth> | 2018-06-22 03:10:31 +0800 |
commit | e187711c6545487d4cac3701f0f506bb536234e2 (patch) | |
tree | d2f6150f70b84b36e49a449082aeda267b4b9046 /swarm/network/stream/intervals | |
parent | 574378edb50c907b532946a1d4654dbd6701b20a (diff) | |
download | go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.gz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.bz2 go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.lz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.xz go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.tar.zst go-tangerine-e187711c6545487d4cac3701f0f506bb536234e2.zip |
swarm: network rewrite merge
Diffstat (limited to 'swarm/network/stream/intervals')
-rw-r--r-- | swarm/network/stream/intervals/dbstore_test.go | 42 | ||||
-rw-r--r-- | swarm/network/stream/intervals/intervals.go | 206 | ||||
-rw-r--r-- | swarm/network/stream/intervals/intervals_test.go | 395 | ||||
-rw-r--r-- | swarm/network/stream/intervals/store_test.go | 80 |
4 files changed, 723 insertions, 0 deletions
diff --git a/swarm/network/stream/intervals/dbstore_test.go b/swarm/network/stream/intervals/dbstore_test.go new file mode 100644 index 000000000..6716e593a --- /dev/null +++ b/swarm/network/stream/intervals/dbstore_test.go @@ -0,0 +1,42 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package intervals + +import ( + "io/ioutil" + "os" + "testing" + + "github.com/ethereum/go-ethereum/swarm/state" +) + +// TestDBStore tests basic functionality of DBStore. +func TestDBStore(t *testing.T) { + dir, err := ioutil.TempDir("", "intervals_test_db_store") + if err != nil { + panic(err) + } + defer os.RemoveAll(dir) + + store, err := state.NewDBStore(dir) + if err != nil { + t.Fatal(err) + } + defer store.Close() + + testStore(t, store) +} diff --git a/swarm/network/stream/intervals/intervals.go b/swarm/network/stream/intervals/intervals.go new file mode 100644 index 000000000..5fd820da8 --- /dev/null +++ b/swarm/network/stream/intervals/intervals.go @@ -0,0 +1,206 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package intervals + +import ( + "bytes" + "fmt" + "strconv" + "sync" +) + +// Intervals store a list of intervals. Its purpose is to provide +// methods to add new intervals and retrieve missing intervals that +// need to be added. +// It may be used in synchronization of streaming data to persist +// retrieved data ranges between sessions. +type Intervals struct { + start uint64 + ranges [][2]uint64 + mu sync.RWMutex +} + +// New creates a new instance of Intervals. +// Start argument limits the lower bound of intervals. +// No range bellow start bound will be added by Add method or +// returned by Next method. This limit may be used for +// tracking "live" synchronization, where the sync session +// starts from a specific value, and if "live" sync intervals +// need to be merged with historical ones, it can be safely done. +func NewIntervals(start uint64) *Intervals { + return &Intervals{ + start: start, + } +} + +// Add adds a new range to intervals. Range start and end are values +// are both inclusive. +func (i *Intervals) Add(start, end uint64) { + i.mu.Lock() + defer i.mu.Unlock() + + i.add(start, end) +} + +func (i *Intervals) add(start, end uint64) { + if start < i.start { + start = i.start + } + if end < i.start { + return + } + minStartJ := -1 + maxEndJ := -1 + j := 0 + for ; j < len(i.ranges); j++ { + if minStartJ < 0 { + if (start <= i.ranges[j][0] && end+1 >= i.ranges[j][0]) || (start <= i.ranges[j][1]+1 && end+1 >= i.ranges[j][1]) { + if i.ranges[j][0] < start { + start = i.ranges[j][0] + } + minStartJ = j + } + } + if (start <= i.ranges[j][1] && end+1 >= i.ranges[j][1]) || (start <= i.ranges[j][0] && end+1 >= i.ranges[j][0]) { + if i.ranges[j][1] > end { + end = i.ranges[j][1] + } + maxEndJ = j + } + if end+1 <= i.ranges[j][0] { + break + } + } + if minStartJ < 0 && maxEndJ < 0 { + i.ranges = append(i.ranges[:j], append([][2]uint64{{start, end}}, i.ranges[j:]...)...) + return + } + if minStartJ >= 0 { + i.ranges[minStartJ][0] = start + } + if maxEndJ >= 0 { + i.ranges[maxEndJ][1] = end + } + if minStartJ >= 0 && maxEndJ >= 0 && minStartJ != maxEndJ { + i.ranges[maxEndJ][0] = start + i.ranges = append(i.ranges[:minStartJ], i.ranges[maxEndJ:]...) + } +} + +// Merge adds all the intervals from the the m Interval to current one. +func (i *Intervals) Merge(m *Intervals) { + m.mu.RLock() + defer m.mu.RUnlock() + i.mu.Lock() + defer i.mu.Unlock() + + for _, r := range m.ranges { + i.add(r[0], r[1]) + } +} + +// Next returns the first range interval that is not fulfilled. Returned +// start and end values are both inclusive, meaning that the whole range +// including start and end need to be added in order to full the gap +// in intervals. +// Returned value for end is 0 if the next interval is after the whole +// range that is stored in Intervals. Zero end value represents no limit +// on the next interval length. +func (i *Intervals) Next() (start, end uint64) { + i.mu.RLock() + defer i.mu.RUnlock() + + l := len(i.ranges) + if l == 0 { + return i.start, 0 + } + if i.ranges[0][0] != i.start { + return i.start, i.ranges[0][0] - 1 + } + if l == 1 { + return i.ranges[0][1] + 1, 0 + } + return i.ranges[0][1] + 1, i.ranges[1][0] - 1 +} + +// Last returns the value that is at the end of the last interval. +func (i *Intervals) Last() (end uint64) { + i.mu.RLock() + defer i.mu.RUnlock() + + l := len(i.ranges) + if l == 0 { + return 0 + } + return i.ranges[l-1][1] +} + +// String returns a descriptive representation of range intervals +// in [] notation, as a list of two element vectors. +func (i *Intervals) String() string { + return fmt.Sprint(i.ranges) +} + +// MarshalBinary encodes Intervals parameters into a semicolon separated list. +// The first element in the list is base36-encoded start value. The following +// elements are two base36-encoded value ranges separated by comma. +func (i *Intervals) MarshalBinary() (data []byte, err error) { + d := make([][]byte, len(i.ranges)+1) + d[0] = []byte(strconv.FormatUint(i.start, 36)) + for j := range i.ranges { + r := i.ranges[j] + d[j+1] = []byte(strconv.FormatUint(r[0], 36) + "," + strconv.FormatUint(r[1], 36)) + } + return bytes.Join(d, []byte(";")), nil +} + +// UnmarshalBinary decodes data according to the Intervals.MarshalBinary format. +func (i *Intervals) UnmarshalBinary(data []byte) (err error) { + d := bytes.Split(data, []byte(";")) + l := len(d) + if l == 0 { + return nil + } + if l >= 1 { + i.start, err = strconv.ParseUint(string(d[0]), 36, 64) + if err != nil { + return err + } + } + if l == 1 { + return nil + } + + i.ranges = make([][2]uint64, 0, l-1) + for j := 1; j < l; j++ { + r := bytes.SplitN(d[j], []byte(","), 2) + if len(r) < 2 { + return fmt.Errorf("range %d has less then 2 elements", j) + } + start, err := strconv.ParseUint(string(r[0]), 36, 64) + if err != nil { + return fmt.Errorf("parsing the first element in range %d: %v", j, err) + } + end, err := strconv.ParseUint(string(r[1]), 36, 64) + if err != nil { + return fmt.Errorf("parsing the second element in range %d: %v", j, err) + } + i.ranges = append(i.ranges, [2]uint64{start, end}) + } + + return nil +} diff --git a/swarm/network/stream/intervals/intervals_test.go b/swarm/network/stream/intervals/intervals_test.go new file mode 100644 index 000000000..b5212f0d9 --- /dev/null +++ b/swarm/network/stream/intervals/intervals_test.go @@ -0,0 +1,395 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package intervals + +import "testing" + +// Test tests Interval methods Add, Next and Last for various +// initial state. +func Test(t *testing.T) { + for i, tc := range []struct { + startLimit uint64 + initial [][2]uint64 + start uint64 + end uint64 + expected string + nextStart uint64 + nextEnd uint64 + last uint64 + }{ + { + initial: nil, + start: 0, + end: 0, + expected: "[[0 0]]", + nextStart: 1, + nextEnd: 0, + last: 0, + }, + { + initial: nil, + start: 0, + end: 10, + expected: "[[0 10]]", + nextStart: 11, + nextEnd: 0, + last: 10, + }, + { + initial: nil, + start: 5, + end: 15, + expected: "[[5 15]]", + nextStart: 0, + nextEnd: 4, + last: 15, + }, + { + initial: [][2]uint64{{0, 0}}, + start: 0, + end: 0, + expected: "[[0 0]]", + nextStart: 1, + nextEnd: 0, + last: 0, + }, + { + initial: [][2]uint64{{0, 0}}, + start: 5, + end: 15, + expected: "[[0 0] [5 15]]", + nextStart: 1, + nextEnd: 4, + last: 15, + }, + { + initial: [][2]uint64{{5, 15}}, + start: 5, + end: 15, + expected: "[[5 15]]", + nextStart: 0, + nextEnd: 4, + last: 15, + }, + { + initial: [][2]uint64{{5, 15}}, + start: 5, + end: 20, + expected: "[[5 20]]", + nextStart: 0, + nextEnd: 4, + last: 20, + }, + { + initial: [][2]uint64{{5, 15}}, + start: 10, + end: 20, + expected: "[[5 20]]", + nextStart: 0, + nextEnd: 4, + last: 20, + }, + { + initial: [][2]uint64{{5, 15}}, + start: 0, + end: 20, + expected: "[[0 20]]", + nextStart: 21, + nextEnd: 0, + last: 20, + }, + { + initial: [][2]uint64{{5, 15}}, + start: 2, + end: 10, + expected: "[[2 15]]", + nextStart: 0, + nextEnd: 1, + last: 15, + }, + { + initial: [][2]uint64{{5, 15}}, + start: 2, + end: 4, + expected: "[[2 15]]", + nextStart: 0, + nextEnd: 1, + last: 15, + }, + { + initial: [][2]uint64{{5, 15}}, + start: 2, + end: 5, + expected: "[[2 15]]", + nextStart: 0, + nextEnd: 1, + last: 15, + }, + { + initial: [][2]uint64{{5, 15}}, + start: 2, + end: 3, + expected: "[[2 3] [5 15]]", + nextStart: 0, + nextEnd: 1, + last: 15, + }, + { + initial: [][2]uint64{{5, 15}}, + start: 2, + end: 4, + expected: "[[2 15]]", + nextStart: 0, + nextEnd: 1, + last: 15, + }, + { + initial: [][2]uint64{{0, 1}, {5, 15}}, + start: 2, + end: 4, + expected: "[[0 15]]", + nextStart: 16, + nextEnd: 0, + last: 15, + }, + { + initial: [][2]uint64{{0, 5}, {15, 20}}, + start: 2, + end: 10, + expected: "[[0 10] [15 20]]", + nextStart: 11, + nextEnd: 14, + last: 20, + }, + { + initial: [][2]uint64{{0, 5}, {15, 20}}, + start: 8, + end: 18, + expected: "[[0 5] [8 20]]", + nextStart: 6, + nextEnd: 7, + last: 20, + }, + { + initial: [][2]uint64{{0, 5}, {15, 20}}, + start: 2, + end: 17, + expected: "[[0 20]]", + nextStart: 21, + nextEnd: 0, + last: 20, + }, + { + initial: [][2]uint64{{0, 5}, {15, 20}}, + start: 2, + end: 25, + expected: "[[0 25]]", + nextStart: 26, + nextEnd: 0, + last: 25, + }, + { + initial: [][2]uint64{{0, 5}, {15, 20}}, + start: 5, + end: 14, + expected: "[[0 20]]", + nextStart: 21, + nextEnd: 0, + last: 20, + }, + { + initial: [][2]uint64{{0, 5}, {15, 20}}, + start: 6, + end: 14, + expected: "[[0 20]]", + nextStart: 21, + nextEnd: 0, + last: 20, + }, + { + initial: [][2]uint64{{0, 5}, {15, 20}, {30, 40}}, + start: 6, + end: 29, + expected: "[[0 40]]", + nextStart: 41, + nextEnd: 0, + last: 40, + }, + { + initial: [][2]uint64{{0, 5}, {15, 20}, {30, 40}, {50, 60}}, + start: 3, + end: 55, + expected: "[[0 60]]", + nextStart: 61, + nextEnd: 0, + last: 60, + }, + { + initial: [][2]uint64{{0, 5}, {15, 20}, {30, 40}, {50, 60}}, + start: 21, + end: 49, + expected: "[[0 5] [15 60]]", + nextStart: 6, + nextEnd: 14, + last: 60, + }, + { + initial: [][2]uint64{{0, 5}, {15, 20}, {30, 40}, {50, 60}}, + start: 0, + end: 100, + expected: "[[0 100]]", + nextStart: 101, + nextEnd: 0, + last: 100, + }, + { + startLimit: 100, + initial: nil, + start: 0, + end: 0, + expected: "[]", + nextStart: 100, + nextEnd: 0, + last: 0, + }, + { + startLimit: 100, + initial: nil, + start: 20, + end: 30, + expected: "[]", + nextStart: 100, + nextEnd: 0, + last: 0, + }, + { + startLimit: 100, + initial: nil, + start: 50, + end: 100, + expected: "[[100 100]]", + nextStart: 101, + nextEnd: 0, + last: 100, + }, + { + startLimit: 100, + initial: nil, + start: 50, + end: 110, + expected: "[[100 110]]", + nextStart: 111, + nextEnd: 0, + last: 110, + }, + { + startLimit: 100, + initial: nil, + start: 120, + end: 130, + expected: "[[120 130]]", + nextStart: 100, + nextEnd: 119, + last: 130, + }, + { + startLimit: 100, + initial: nil, + start: 120, + end: 130, + expected: "[[120 130]]", + nextStart: 100, + nextEnd: 119, + last: 130, + }, + } { + intervals := NewIntervals(tc.startLimit) + intervals.ranges = tc.initial + intervals.Add(tc.start, tc.end) + got := intervals.String() + if got != tc.expected { + t.Errorf("interval #%d: expected %s, got %s", i, tc.expected, got) + } + nextStart, nextEnd := intervals.Next() + if nextStart != tc.nextStart { + t.Errorf("interval #%d, expected next start %d, got %d", i, tc.nextStart, nextStart) + } + if nextEnd != tc.nextEnd { + t.Errorf("interval #%d, expected next end %d, got %d", i, tc.nextEnd, nextEnd) + } + last := intervals.Last() + if last != tc.last { + t.Errorf("interval #%d, expected last %d, got %d", i, tc.last, last) + } + } +} + +func TestMerge(t *testing.T) { + for i, tc := range []struct { + initial [][2]uint64 + merge [][2]uint64 + expected string + }{ + { + initial: nil, + merge: nil, + expected: "[]", + }, + { + initial: [][2]uint64{{10, 20}}, + merge: nil, + expected: "[[10 20]]", + }, + { + initial: nil, + merge: [][2]uint64{{15, 25}}, + expected: "[[15 25]]", + }, + { + initial: [][2]uint64{{0, 100}}, + merge: [][2]uint64{{150, 250}}, + expected: "[[0 100] [150 250]]", + }, + { + initial: [][2]uint64{{0, 100}}, + merge: [][2]uint64{{101, 250}}, + expected: "[[0 250]]", + }, + { + initial: [][2]uint64{{0, 10}, {30, 40}}, + merge: [][2]uint64{{20, 25}, {41, 50}}, + expected: "[[0 10] [20 25] [30 50]]", + }, + { + initial: [][2]uint64{{0, 5}, {15, 20}, {30, 40}, {50, 60}}, + merge: [][2]uint64{{6, 25}}, + expected: "[[0 25] [30 40] [50 60]]", + }, + } { + intervals := NewIntervals(0) + intervals.ranges = tc.initial + m := NewIntervals(0) + m.ranges = tc.merge + + intervals.Merge(m) + + got := intervals.String() + if got != tc.expected { + t.Errorf("interval #%d: expected %s, got %s", i, tc.expected, got) + } + } +} diff --git a/swarm/network/stream/intervals/store_test.go b/swarm/network/stream/intervals/store_test.go new file mode 100644 index 000000000..0ab14c065 --- /dev/null +++ b/swarm/network/stream/intervals/store_test.go @@ -0,0 +1,80 @@ +// Copyright 2018 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>. + +package intervals + +import ( + "errors" + "testing" + + "github.com/ethereum/go-ethereum/swarm/state" +) + +var ErrNotFound = errors.New("not found") + +// TestInmemoryStore tests basic functionality of InmemoryStore. +func TestInmemoryStore(t *testing.T) { + testStore(t, state.NewInmemoryStore()) +} + +// testStore is a helper function to test various Store implementations. +func testStore(t *testing.T, s state.Store) { + key1 := "key1" + i1 := NewIntervals(0) + i1.Add(10, 20) + if err := s.Put(key1, i1); err != nil { + t.Fatal(err) + } + i := &Intervals{} + err := s.Get(key1, i) + if err != nil { + t.Fatal(err) + } + if i.String() != i1.String() { + t.Errorf("expected interval %s, got %s", i1, i) + } + + key2 := "key2" + i2 := NewIntervals(0) + i2.Add(10, 20) + if err := s.Put(key2, i2); err != nil { + t.Fatal(err) + } + err = s.Get(key2, i) + if err != nil { + t.Fatal(err) + } + if i.String() != i2.String() { + t.Errorf("expected interval %s, got %s", i2, i) + } + + if err := s.Delete(key1); err != nil { + t.Fatal(err) + } + if err := s.Get(key1, i); err != state.ErrNotFound { + t.Errorf("expected error %v, got %s", state.ErrNotFound, err) + } + if err := s.Get(key2, i); err != nil { + t.Errorf("expected error %v, got %s", nil, err) + } + + if err := s.Delete(key2); err != nil { + t.Fatal(err) + } + if err := s.Get(key2, i); err != state.ErrNotFound { + t.Errorf("expected error %v, got %s", state.ErrNotFound, err) + } +} |