-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathiter.go
419 lines (360 loc) · 11.2 KB
/
iter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
package trakt
import (
"encoding/json"
"errors"
"net/http"
"sync"
)
// NoLimit is the page-limit sentinel meaning "no cap": an iterator whose
// limit equals NoLimit is never considered to have reached its page limit.
const NoLimit = int64(-1)
// defaultPage is the page number handed to setDefaultPagination when a
// paginating iterator is created.
const defaultPage = 1

// defaultLimit is the limit handed to setDefaultPagination when a
// paginating iterator is created.
const defaultLimit = 10
// listMeta holds the pagination state shared by the list iterators.
//
// NOTE(review): the original author's note says the item count is only
// populated when the total_count include option is passed in (see the
// tests for an example); that cannot be confirmed from this file.
type listMeta struct {
	// in order: the page this metadata describes (X-Pagination-Page), the
	// limit reported by the API (X-Pagination-Limit), the number of pages
	// (X-Pagination-Page-Count) and the number of items
	// (X-Pagination-Item-Count).
	currentPage, limit, totalPages, totalCount int64
}

// meta hands back the receiver itself. Types embedding *listMeta get this
// method promoted, which lets them satisfy iterationFrame.
func (l *listMeta) meta() *listMeta {
	return l
}
// unmarshalHeaders populates the frame from the HTTP response headers
// instead of the response body: the pagination position is read from the
// X-Pagination-* headers and the complete header set is kept on the frame.
// Any listMeta previously held by the frame is replaced.
//
// The returned error is always nil.
func (f *frame) unmarshalHeaders(h http.Header) error {
	pagination := new(listMeta)
	// http.Header.Get canonicalises the key, so the lower-case "limit"
	// still matches "X-Pagination-Limit".
	pagination.limit = parseInt(h.Get(`X-Pagination-limit`))
	pagination.currentPage = parseInt(h.Get(`X-Pagination-Page`))
	pagination.totalPages = parseInt(h.Get(`X-Pagination-Page-Count`))
	pagination.totalCount = parseInt(h.Get(`X-Pagination-Item-Count`))

	f.listMeta = pagination
	f.h = h
	return nil
}
// newEmptyFrame builds a blank frame ready to receive the next page of
// results: it carries an empty (non-nil) receiver slice and zeroed
// pagination metadata.
func newEmptyFrame() iterationFrame {
	entries := []*json.RawMessage{}
	empty := &frame{
		listMeta: &listMeta{},
		r:        &entries,
	}
	return empty
}
// frame wraps the results of a single page (a "frame") of a larger,
// possibly paginated, result set.
type frame struct {
	// pagination position of this frame, derived from the response headers.
	*listMeta

	// r is the receiver for the raw response of this frame.
	//
	// Entries are kept as json.RawMessage so that the first json.Unmarshal
	// on the frame only has to confirm the payload is a JSON array; each
	// element stays as raw JSON until it is scanned. Two benefits:
	//   - the frame does not need to know the concrete entry type.
	//   - entries are decoded on demand, so nothing is spent decoding
	//     entries that are never read.
	r *[]*json.RawMessage

	// h is the header set of the response that produced this frame. It is
	// refreshed on every frame, but the iterator only keeps the headers of
	// the first one. They carry values that stay constant for the lifetime
	// of an iterator, e.g. the sorting headers returned with a user's
	// watchlist.
	h http.Header
}
// rcv implements iterationFrame. It returns the pointer to the slice the
// raw HTTP body should be unmarshalled into.
func (f *frame) rcv() *[]*json.RawMessage {
	return f.r
}
// headers implements iterationFrame. It returns the HTTP headers stored on
// the frame.
func (f *frame) headers() http.Header {
	return f.h
}
// iterationFrame is one window (page) of a complete pagination result.
type iterationFrame interface {
	// meta reports where this frame sits within the complete result set.
	meta() *listMeta
	// headers returns the HTTP headers that accompanied this frame.
	headers() http.Header
	// rcv returns the receiver the response data is unmarshalled into.
	rcv() *[]*json.RawMessage
}
// BasicIterator is an iterator that is not concerned with paging; use it
// where no page limit needs to be set.
type BasicIterator interface {
	// Next advances the cursor to the next entry, fetching the next frame
	// of results from the API first when that is required.
	Next() bool
	// Scan decodes the entry under the cursor into rcv.
	Scan(rcv interface{}) error
	// Err reports any error that occurred while fetching a frame, a frame
	// being the set of results for a single page.
	Err() error

	// getPage is the internal hook used to fetch the next page of results.
	getPage() iterationFrame
	// headers returns the headers received with the initial result.
	headers() http.Header
}
// Iterator is the general-purpose iterator: a BasicIterator that can
// additionally be capped to a number of pages.
type Iterator interface {
	BasicIterator

	// PageLimit sets an absolute limit on the pages iterated through.
	// Iteration still ends earlier when the result set has fewer pages
	// than the limit.
	PageLimit(page int64)
}
// singleIter is the iterator used when we want to simulate pagination over
// a result that arrives as one complete set.
//
// Its state is guarded by the embedded RWMutex; the type is intended to be
// thread-safe, with all exported methods callable from multiple goroutines.
type singleIter struct {
	sync.RWMutex

	// cur is the entry the cursor currently points at.
	cur *json.RawMessage

	// err is the error produced by the most recent frame query.
	err error

	// values holds the not yet consumed entries of the current frame.
	values []*json.RawMessage

	// query is invoked to fetch a single frame of results.
	query queryFunc

	// listParams are the parameters passed to query; they carry the
	// initial page and limit for the frame.
	listParams ListParamsContainer

	// lazyLoad selects between loading the first page when the iterator is
	// created (false) and deferring it to the first call to Next (true).
	lazyLoad bool

	// loaded records that the initial load has been attempted.
	loaded bool

	// initialHeaders are the headers received with the initial response.
	//
	// WARNING: on a lazy-loaded iterator these only become available after
	// the first call to Next.
	initialHeaders http.Header
}
// isLazyLoad reports, under the read lock, whether the iterator defers its
// first query until Next is called.
func (s *singleIter) isLazyLoad() (lazy bool) {
	s.withRLock(func() { lazy = s.lazyLoad })
	return lazy
}
// hasLoaded reports, under the read lock, whether the initial load has
// already been performed (or at least attempted).
func (s *singleIter) hasLoaded() (done bool) {
	s.withRLock(func() { done = s.loaded })
	return done
}
// withRLock runs fn while holding the iterator's read lock. The lock is
// released through defer, so it is dropped even if fn panics.
func (s *singleIter) withRLock(fn func()) {
	s.RWMutex.RLock()
	defer s.RWMutex.RUnlock()

	fn()
}
// withLock runs fn while holding the iterator's exclusive (read/write)
// lock. The lock is released through defer, so it is dropped even if fn
// panics.
func (s *singleIter) withLock(fn func()) {
	s.RWMutex.Lock()
	defer s.RWMutex.Unlock()

	fn()
}
// headers returns the headers of the initial response the iterator
// received. The value is nil until a first frame has been loaded
// successfully.
func (s *singleIter) headers() http.Header {
	var initial http.Header
	s.withRLock(func() { initial = s.initialHeaders })
	return initial
}
// Scan attempts to scan the entry the cursor currently points at into the
// provided receiver rcv, which is handed to json.Unmarshal as-is.
//
// An error is returned when the cursor is not positioned on an entry, or
// when the entry cannot be unmarshalled into rcv.
func (s *singleIter) Scan(rcv interface{}) error {
	var cur *json.RawMessage
	// only the cursor is read here, so the shared lock is sufficient and
	// concurrent readers (Err, headers, other Scan calls) are not blocked.
	s.withRLock(func() { cur = s.cur })
	if cur == nil {
		return errors.New("nothing left in result set")
	}
	return json.Unmarshal(*cur, rcv)
}
// Err returns the error recorded by the most recent query the iterator
// performed, or nil when that query succeeded.
func (s *singleIter) Err() (err error) {
	s.withRLock(func() { err = s.err })
	return err
}
// PageLimit is a no-op on a single iterator: it only ever deals with one
// page, so the limit is effectively already fixed at 1. The method exists
// so that singleIter satisfies Iterator.
func (s *singleIter) PageLimit(_ int64) {
	// intentionally empty.
}
// getPage runs the iterator's query with the current list parameters and
// stores the outcome on the iterator:
//   - s.err is overwritten with the query's error (nil on success).
//   - on success the frame's entries become the pending values.
//   - the frame's headers are recorded as initialHeaders, but only for the
//     first successful load.
//   - s.loaded is set, whether or not the query succeeded.
//
// The frame returned by the query is handed back unchanged; it may be nil.
// A nil frame returned without an error is treated as an empty page rather
// than being dereferenced.
func (s *singleIter) getPage() iterationFrame {
	// snapshot the parameters under the read lock before taking the
	// exclusive lock for the query itself.
	var params ListParamsContainer
	s.withRLock(func() { params = s.listParams })

	var window iterationFrame
	s.withLock(func() {
		window, s.err = s.query(params)
		if s.err != nil || window == nil {
			// nothing usable came back; still record that a load has
			// been attempted so lazy iterators do not retry forever.
			s.loaded = true
			return
		}

		// if we have a receiver response on the frame.
		if rcv := window.rcv(); rcv != nil {
			s.values = *rcv
		}

		// if this is the initial request, set the headers.
		if !s.loaded {
			s.initialHeaders = window.headers()
		}

		// ensure we always set that we have loaded /
		// attempted to load at least a single frame.
		s.loaded = true
	})
	return window
}
// Next moves the cursor to the next entry of the result set and reports
// whether an entry is available for Scan.
//
// On a lazy-loaded iterator the first call performs the initial query.
// Next returns false when the last query failed (see Err) or when no
// entries remain. In both cases the cursor is cleared, so a following Scan
// reports "nothing left in result set" instead of silently returning the
// previously scanned entry again.
func (s *singleIter) Next() bool {
	// if we are lazy loading and nothing has been fetched yet,
	// trigger the initial load.
	if s.isLazyLoad() && !s.hasLoaded() {
		var pending int
		s.withRLock(func() { pending = len(s.values) })
		if pending == 0 {
			s.getPage()
		}
	}

	var advanced bool
	s.withLock(func() {
		if s.err != nil || len(s.values) == 0 {
			// no entry to point at: drop the stale cursor.
			s.cur = nil
			return
		}
		s.cur, s.values = s.values[0], s.values[1:]
		advanced = true
	})
	return advanced
}
// iter builds on top of singleIter and adds the ability to move from one
// page to the next.
//
// Its state is guarded by the RWMutex embedded in singleIter; the type is
// intended to be thread-safe, with all exported methods callable from
// multiple goroutines.
type iter struct {
	singleIter

	// meta is the pagination metadata of the most recently fetched frame.
	meta *listMeta

	// limit is the highest page number the iterator may fetch, or NoLimit.
	limit int64
}
// PageLimit caps the pages the iterator will paginate through. Passing
// NoLimit removes the cap.
func (it *iter) PageLimit(page int64) {
	it.withLock(func() { it.limit = page })
}
// Next moves the cursor to the next entry of the result set and reports
// whether an entry is available for Scan.
//
// When the entries of the current page are used up, the next page is
// fetched first, provided the API reported further pages and the page
// limit has not been reached. Next returns false once a query has failed
// (see Err) or no entries remain.
func (it *iter) Next() bool {
	var (
		err     error
		lm      *listMeta
		pending int
	)
	// a consistent snapshot of the state is all that is needed here, so
	// the shared lock is sufficient.
	it.withRLock(func() {
		err, lm, pending = it.err, it.meta, len(it.values)
	})
	if err != nil {
		return false
	}

	if it.isLazyLoad() && !it.hasLoaded() {
		// perform the initial page query.
		it.getPage()
		return it.singleIter.Next()
	}

	if lm == nil {
		// the loaded frame carried no pagination metadata, so there is
		// nothing to paginate on: just drain what is buffered.
		return it.singleIter.Next()
	}

	np := lm.currentPage + 1
	if pending == 0 && lm.totalPages > lm.currentPage && !it.hasReachedLimit(np) {
		// update the page number, then fetch that page.
		it.withLock(func() {
			it.listParams.setPagination(np, lm.limit)
		})
		it.getPage()
	}
	return it.singleIter.Next()
}
// hasReachedLimit reports whether fetching nextPage would exceed the
// configured page limit. It is always false when the limit is NoLimit.
func (it *iter) hasReachedLimit(nextPage int64) bool {
	// the limit is only read, so the shared lock is sufficient.
	it.RLock()
	defer it.RUnlock()

	return it.limit != NoLimit && nextPage > it.limit
}
// getPage fetches the next frame through the embedded singleIter and, when
// a frame came back, records that frame's pagination metadata on the
// iterator. The frame is returned unchanged and may be nil.
func (it *iter) getPage() iterationFrame {
	window := it.singleIter.getPage()
	if window == nil {
		return window
	}

	it.withLock(func() { it.meta = window.meta() })
	return window
}
// queryFunc fetches one page of a listing: given the list parameters it
// returns the frame holding that page, or the error that prevented the
// page from being retrieved.
type queryFunc func(params ListParamsContainer) (page iterationFrame, err error)
// newIterator creates a paginating iterator for the given queryFunc and
// list parameters. By default the iterator has no page limit and walks
// through every page available.
//
// lazyLoad selects when the first page is fetched: immediately, while the
// iterator is being created (false), or on the first call to Next (true).
func newIterator(p ListParamsContainer, query queryFunc, lazyLoad bool) Iterator {
	// ensure default pagination values are defined.
	p.setDefaultPagination(defaultPage, defaultLimit)

	paged := &iter{limit: NoLimit}
	paged.listParams = p
	paged.query = query
	paged.lazyLoad = lazyLoad

	if lazyLoad {
		return paged
	}
	paged.getPage()
	return paged
}
// newSimulatedIterator creates an iterator that only ever walks through a
// single frame (result set) and never attempts to move on to another page.
// It exists so that every list of results, paginated or not, is exposed
// through an iterator, keeping the library consistent.
//
// lazyLoad selects when that frame is fetched: immediately, while the
// iterator is being created (false), or on the first call to Next (true).
func newSimulatedIterator(p ListParamsContainer, query queryFunc, lazyLoad bool) Iterator {
	single := new(singleIter)
	single.query = query
	single.listParams = p
	single.lazyLoad = lazyLoad

	if lazyLoad {
		return single
	}
	single.getPage()
	return single
}