aboutsummaryrefslogtreecommitdiffstats
path: root/common/prque/lazyqueue.go
blob: 92ddd77f677a15b64f7ef95378517026336b9753 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package prque

import (
    "container/heap"
    "time"

    "github.com/ethereum/go-ethereum/common/mclock"
)

// LazyQueue is a priority queue data structure where priorities can change over
// time and are only evaluated on demand.
// Two callbacks are required:
// - priority evaluates the actual priority of an item
// - maxPriority gives an upper estimate for the priority in any moment between
//   now and the given absolute time
// If the upper estimate is exceeded then Update should be called for that item.
// A global Refresh function should also be called periodically.
//
// Item positions are reported to the user through the setIndex callback as a
// single "virtual" index; the value handed out can later be passed to Remove or
// Update to address the item.
type LazyQueue struct {
    clock mclock.Clock // time source read by Refresh and MultiPop
    // Items are stored in one of two internal queues ordered by estimated max
    // priority until the next and the next-after-next refresh. Update and Refresh
    // always place items in queue[1].
    queue       [2]*sstack
    popQueue    *sstack             // heap ordered by actual priority; scratch space used by MultiPop
    period      time.Duration       // refresh period given to NewLazyQueue; Refresh advances maxUntil by it
    maxUntil    mclock.AbsTime      // absolute time passed to maxPriority when Push estimates an item
    indexOffset int                 // flipped between 0 and 1 by Refresh; used by Remove to pick the internal queue
    setIndex    SetIndexCallback    // user callback receiving the translated (virtual) index of an item
    priority    PriorityCallback    // evaluates the actual priority of an item
    maxPriority MaxPriorityCallback // gives the upper priority estimate of an item
}

// Callback types supplied to NewLazyQueue. Both receive the item data together
// with an absolute time and return a priority value.
type (
    PriorityCallback    func(data interface{}, now mclock.AbsTime) int64   // actual priority callback
    MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback
)

// NewLazyQueue builds an empty lazy queue.
//
// setIndex is notified about item index changes, priority and maxPriority are
// the actual and estimated priority callbacks, clock is the time source and
// refreshPeriod is the period used by Refresh to advance the estimate horizon.
func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue {
    lq := new(LazyQueue)
    lq.clock = clock
    lq.period = refreshPeriod
    lq.setIndex = setIndex
    lq.priority = priority
    lq.maxPriority = maxPriority
    // The pop queue gets no index callback: items held there are in transit.
    lq.popQueue = newSstack(nil)

    // Reset creates the two internal queues, the initial Refresh sets maxUntil.
    lq.Reset()
    lq.Refresh()
    return lq
}

// Reset clears the contents of the queue by replacing both internal queues with
// fresh, empty ones.
//
// The new queues are created in their initial arrangement (queue[0] reports
// even virtual indexes through setIndex0, queue[1] reports odd ones through
// setIndex1), so indexOffset has to be returned to its initial value as well.
// Otherwise, after an odd number of Refresh calls, Remove and Update would map
// a virtual index to the wrong internal queue.
//
// NOTE(review): items held by the discarded queues do not appear to receive a
// setIndex(-1) notification from this method - confirm callers do not rely on one.
func (q *LazyQueue) Reset() {
    q.queue[0] = newSstack(q.setIndex0)
    q.queue[1] = newSstack(q.setIndex1)
    q.indexOffset = 0
}

// Refresh should be called at least with the frequency specified by the
// refreshPeriod parameter of NewLazyQueue.
//
// Every item of queue[0] is re-estimated up to one period from now and moved
// into queue[1]; then the two queues change places and the estimate horizon
// used by subsequent pushes is extended by one more period.
func (q *LazyQueue) Refresh() {
    step := mclock.AbsTime(q.period)
    q.maxUntil = q.clock.Now() + step

    // Drain queue[0]; Push re-inserts each item into queue[1] with a new estimate.
    for q.queue[0].Len() > 0 {
        entry := heap.Pop(q.queue[0]).(*item)
        q.Push(entry.value)
    }

    // Swap the queues and flip the offset so that virtual indexes stay valid.
    q.queue[1], q.queue[0] = q.queue[0], q.queue[1]
    q.indexOffset ^= 1
    q.maxUntil += step
}

// Push inserts an item into queue[1], ordered by its estimated maximum
// priority until the current estimate horizon (maxUntil).
func (q *LazyQueue) Push(data interface{}) {
    estimate := q.maxPriority(data, q.maxUntil)
    entry := &item{value: data, priority: estimate}
    heap.Push(q.queue[1], entry)
}

// Update refreshes the upper priority estimate of the item with the given
// queue index by removing it and pushing it back.
//
// A negative index denotes an item that is not in the queue. Remove returns nil
// in that case, and pushing that nil back would insert a bogus entry (and hand
// nil to the maxPriority callback), so such calls are ignored.
func (q *LazyQueue) Update(index int) {
    if index < 0 {
        return
    }
    q.Push(q.Remove(index))
}

// Pop removes the item with the greatest actual priority and returns it
// together with that priority. If nothing is popped the zero values
// (nil, 0) are returned.
func (q *LazyQueue) Pop() (interface{}, int64) {
    var (
        value interface{}
        pri   int64
    )
    // Record the first delivered item and stop MultiPop right away.
    takeFirst := func(data interface{}, priority int64) bool {
        value, pri = data, priority
        return false
    }
    q.MultiPop(takeFirst)
    return value, pri
}

// peekIndex tells which internal queue (0 or 1) currently holds the item with
// the highest estimated priority. It returns -1 if both queues are empty; on
// equal estimates queue 0 is preferred.
func (q *LazyQueue) peekIndex() int {
    len0, len1 := q.queue[0].Len(), q.queue[1].Len()
    switch {
    case len0 == 0 && len1 == 0:
        return -1
    case len0 == 0:
        return 1
    case len1 == 0:
        return 0
    }
    // Both queues have items: compare their top elements.
    top0 := q.queue[0].blocks[0][0].priority
    top1 := q.queue[1].blocks[0][0].priority
    if top1 > top0 {
        return 1
    }
    return 0
}

// MultiPop pops multiple items from the queue and is more efficient than calling
// Pop multiple times. Popped items are passed to the callback together with
// their actual priority, which is evaluated with the clock value read once at
// the beginning of the call. MultiPop returns when the callback returns false
// or there are no more items to pop.
//
// Items are moved from the internal queues (ordered by estimate) into popQueue
// (ordered by actual priority). An item is delivered once its actual priority
// exceeds the best estimate still waiting in the internal queues, or once those
// are empty. Items that were moved to popQueue but not delivered when the
// callback returns false are pushed back into the queue.
//
// The callback may change the queue (for example by pushing items back), so the
// head of the internal queues is looked up again after every callback; a stale
// index could point to a queue that has since become empty or that no longer
// holds the highest estimate.
func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) {
    now := q.clock.Now()
    nextIndex := q.peekIndex()
    for nextIndex != -1 {
        // Move the best-estimated item to popQueue under its actual priority.
        data := heap.Pop(q.queue[nextIndex]).(*item).value
        heap.Push(q.popQueue, &item{data, q.priority(data, now)})
        nextIndex = q.peekIndex()
        // Deliver everything that can no longer be beaten by a waiting item.
        for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) {
            i := heap.Pop(q.popQueue).(*item)
            if !callback(i.value, i.priority) {
                // Stop requested: return the undelivered items to the queue.
                for q.popQueue.Len() != 0 {
                    q.Push(heap.Pop(q.popQueue).(*item).value)
                }
                return
            }
            // Re-check because the callback is allowed to modify the queue.
            nextIndex = q.peekIndex()
        }
    }
}

// PopItem behaves like Pop but returns only the popped item, discarding its
// priority value.
func (q *LazyQueue) PopItem() interface{} {
    data, _ := q.Pop()
    return data
}

// Remove removes the item with the given virtual index and returns its data.
// A negative index yields nil without touching the queue.
func (q *LazyQueue) Remove(index int) interface{} {
    if index < 0 {
        return nil
    }
    // The lowest bit of the virtual index, corrected by indexOffset, selects
    // the internal queue; the remaining bits are the position inside it.
    which := (index & 1) ^ q.indexOffset
    pos := index >> 1
    removed := heap.Remove(q.queue[which], pos).(*item)
    return removed.value
}

// Empty reports whether both internal queues are empty.
func (q *LazyQueue) Empty() bool {
    for _, s := range q.queue {
        if s.Len() != 0 {
            return false
        }
    }
    return true
}

// Size returns the combined number of items held by the two internal queues.
func (q *LazyQueue) Size() int {
    total := 0
    for _, s := range q.queue {
        total += s.Len()
    }
    return total
}

// setIndex0 translates an internal queue index to the virtual index space of
// LazyQueue: position i becomes the even index 2*i, while -1 is passed through
// unchanged.
func (q *LazyQueue) setIndex0(data interface{}, index int) {
    virtual := -1
    if index != -1 {
        virtual = 2 * index
    }
    q.setIndex(data, virtual)
}

// setIndex1 translates an internal queue index to the virtual index space of
// LazyQueue: position i becomes the odd index 2*i+1. No special case is needed
// for -1 because 2*(-1)+1 is -1 again.
func (q *LazyQueue) setIndex1(data interface{}, index int) {
    virtual := 2*index + 1
    q.setIndex(data, virtual)
}