aboutsummaryrefslogtreecommitdiffstats
path: root/swarm/network/stream/intervals/intervals.go
blob: 5fd820da877302620c44c280a20975dfdcef18f4 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
// Copyright 2018 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package intervals

import (
    "bytes"
    "fmt"
    "strconv"
    "sync"
)

// Intervals stores a list of uint64 ranges. Its purpose is to provide
// methods to add new ranges and to retrieve the missing range that
// needs to be added next.
// It may be used in synchronization of streaming data to persist
// retrieved data ranges between sessions.
type Intervals struct {
    start  uint64 // lower bound: add clamps range starts to it and ignores ranges that end below it
    ranges [][2]uint64 // {start, end} pairs, both inclusive; Next and Last treat the first and last elements as the lowest and highest ranges
    mu     sync.RWMutex // taken by Add and Merge for writing and by Next and Last for reading
}

// NewIntervals returns an empty Intervals whose lower bound is start.
// No range below that bound is stored by Add or reported by Next:
// ranges reaching below it are cut off at start. The bound is useful
// for tracking "live" synchronization, where a sync session begins at
// a specific value, so that "live" intervals can later be merged
// safely with historical ones.
func NewIntervals(start uint64) *Intervals {
    iv := new(Intervals)
    iv.start = start
    return iv
}

// Add adds a new range to intervals. Range start and end values
// are both inclusive.
// The write lock is held for the whole call; the actual insertion is
// delegated to add.
func (i *Intervals) Add(start, end uint64) {
    i.mu.Lock()
    defer i.mu.Unlock()

    i.add(start, end)
}

// add stores the inclusive range [start, end] in i.ranges, fusing it
// with every stored range that it overlaps or directly touches, so that
// the stored ranges stay sorted, disjoint and non-adjacent. It does no
// locking of its own; Add and Merge call it with i.mu held for writing.
//
// The part of the range that lies below i.start is cut off. A range that
// lies entirely below i.start, or whose start is greater than its end,
// is ignored. A range that is already covered by a stored range leaves
// the list unchanged.
func (i *Intervals) add(start, end uint64) {
    if start < i.start {
        start = i.start
    }
    if end < i.start || start > end {
        return
    }

    // Skip the stored ranges that end before start without touching it,
    // i.e. those with r[1]+1 < start. The test is written as a guarded
    // subtraction so that it cannot wrap when r[1] is the maximal uint64.
    lo := 0
    for lo < len(i.ranges) && i.ranges[lo][1] < start && start-i.ranges[lo][1] > 1 {
        lo++
    }

    // Absorb the following ranges that begin at or before end+1, widening
    // [start, end] to cover them. Again the comparison avoids computing
    // end+1, which would wrap for the maximal uint64.
    hi := lo
    for hi < len(i.ranges) && (i.ranges[hi][0] <= end || i.ranges[hi][0]-end == 1) {
        if i.ranges[hi][0] < start {
            start = i.ranges[hi][0]
        }
        if i.ranges[hi][1] > end {
            end = i.ranges[hi][1]
        }
        hi++
    }

    if lo == hi {
        // Nothing to fuse with: open a slot at position lo.
        i.ranges = append(i.ranges, [2]uint64{})
        copy(i.ranges[lo+1:], i.ranges[lo:])
        i.ranges[lo] = [2]uint64{start, end}
        return
    }

    // Ranges lo..hi-1 collapse into the single widened range.
    i.ranges[lo] = [2]uint64{start, end}
    i.ranges = append(i.ranges[:lo+1], i.ranges[hi:]...)
}

// Merge adds all the intervals from m to the current Intervals.
//
// Merging nil, or merging an Intervals with itself, is a no-op: there is
// nothing new to add, and taking the read lock and the write lock of the
// same mutex would block forever.
//
// The ranges of m are copied while its read lock is held and that lock is
// released before the write lock of i is taken, so the two locks are never
// held together and concurrent a.Merge(b) and b.Merge(a) calls cannot
// deadlock each other.
func (i *Intervals) Merge(m *Intervals) {
    if m == nil || m == i {
        return
    }

    m.mu.RLock()
    src := make([][2]uint64, len(m.ranges))
    copy(src, m.ranges)
    m.mu.RUnlock()

    i.mu.Lock()
    defer i.mu.Unlock()

    for _, r := range src {
        i.add(r[0], r[1])
    }
}

// Next reports the first gap in the stored intervals. The returned start
// and end are both inclusive: adding exactly that range fills the gap.
//
// The gap begins at the lower bound when no range is stored or when the
// first stored range does not begin at the lower bound; otherwise it
// begins right after the first stored range.
//
// An end of 0 means that the gap is not followed by any stored range,
// so there is no limit on the length of the next interval.
func (i *Intervals) Next() (start, end uint64) {
    i.mu.RLock()
    defer i.mu.RUnlock()

    start = i.start
    if len(i.ranges) == 0 {
        return start, 0
    }

    first := i.ranges[0]
    if first[0] != start {
        // the gap sits in front of the first stored range
        return start, first[0] - 1
    }

    // the gap follows the first stored range and is closed by the
    // second one, when there is a second one
    start = first[1] + 1
    if len(i.ranges) > 1 {
        end = i.ranges[1][0] - 1
    }
    return start, end
}

// Last returns the end value of the last stored interval, or 0 when no
// interval is stored.
func (i *Intervals) Last() (end uint64) {
    i.mu.RLock()
    defer i.mu.RUnlock()

    if n := len(i.ranges); n > 0 {
        end = i.ranges[n-1][1]
    }
    return end
}

// String returns a descriptive representation of range intervals
// in [] notation, as a list of two element vectors, for example
// "[[0 7] [10 12]]".
//
// The read lock is held while the ranges are formatted, like in Next
// and Last, so the output is a consistent snapshot even when Add or
// Merge run concurrently.
func (i *Intervals) String() string {
    i.mu.RLock()
    defer i.mu.RUnlock()

    return fmt.Sprint(i.ranges)
}

// MarshalBinary encodes Intervals parameters into a semicolon separated list.
// The first element in the list is the base36-encoded start value. Each
// following element is one range: its base36-encoded start and end values
// separated by a comma, for example "5;a,k;10,1z".
//
// The read lock is held while the state is encoded so that the result is
// a consistent snapshot. The returned error is always nil.
func (i *Intervals) MarshalBinary() (data []byte, err error) {
    i.mu.RLock()
    defer i.mu.RUnlock()

    var buf bytes.Buffer
    buf.WriteString(strconv.FormatUint(i.start, 36))
    for _, r := range i.ranges {
        buf.WriteByte(';')
        buf.WriteString(strconv.FormatUint(r[0], 36))
        buf.WriteByte(',')
        buf.WriteString(strconv.FormatUint(r[1], 36))
    }
    return buf.Bytes(), nil
}

// UnmarshalBinary decodes data according to the Intervals.MarshalBinary format:
// a semicolon separated list whose first element is the base36-encoded start
// value and whose following elements are "start,end" pairs in base36.
//
// The decoded state replaces the current one as a whole: ranges stored
// before the call are dropped, also when data holds only the start value.
// The input is parsed completely before anything is assigned, so on error
// the receiver is left unchanged. The assignment is done under the write
// lock.
//
// An error is returned when the start value or any range element is not a
// valid base36 unsigned 64-bit number, or when a range has no comma.
func (i *Intervals) UnmarshalBinary(data []byte) (err error) {
    // bytes.Split with a non-empty separator always returns at least one
    // element, so parts[0] exists even for empty input (and then fails to
    // parse as a number).
    parts := bytes.Split(data, []byte(";"))

    start, err := strconv.ParseUint(string(parts[0]), 36, 64)
    if err != nil {
        return err
    }

    // ranges stays nil when data carries no range at all
    var ranges [][2]uint64
    if len(parts) > 1 {
        ranges = make([][2]uint64, 0, len(parts)-1)
    }
    for j := 1; j < len(parts); j++ {
        r := bytes.SplitN(parts[j], []byte(","), 2)
        if len(r) < 2 {
            return fmt.Errorf("range %d has less then 2 elements", j)
        }
        var pair [2]uint64
        if pair[0], err = strconv.ParseUint(string(r[0]), 36, 64); err != nil {
            return fmt.Errorf("parsing the first element in range %d: %v", j, err)
        }
        if pair[1], err = strconv.ParseUint(string(r[1]), 36, 64); err != nil {
            return fmt.Errorf("parsing the second element in range %d: %v", j, err)
        }
        ranges = append(ranges, pair)
    }

    i.mu.Lock()
    defer i.mu.Unlock()

    i.start = start
    i.ranges = ranges
    return nil
}