-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcompletable.go
298 lines (235 loc) · 8.18 KB
/
completable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
package promise
import (
"fmt"
"sync"
"sync/atomic"
)
// Unfortunately there are no atomic operations smaller values than 32
const (
PENDING uint32 = iota
FULFILLED
REJECTED
)
type CompletablePromise struct {
state uint32
cause error
value interface{}
mutex sync.Mutex
cond *sync.Cond
compute func(interface{}) interface{}
handle func(error)
dependencies []Completable
}
func completable(compute func(interface{}) interface{}, handle func(error)) *CompletablePromise {
completable := new(CompletablePromise)
completable.cond = sync.NewCond(&completable.mutex)
completable.compute = compute
completable.handle = handle
completable.state = PENDING
completable.dependencies = make([]Completable, 0)
return completable
}
// Generate a new completable promise. This provides an implementation of the
// `promise.Completable` interface which is threadsafe.
func Promise() Completable {
return completable(func(x interface{}) interface{} { return x }, nil)
}
func (promise *CompletablePromise) State() uint32 {
return atomic.LoadUint32(&promise.state)
}
// Determine if the promise has been resolved.
func (promise *CompletablePromise) Resolved() bool {
return promise.State() == FULFILLED
}
func (promise *CompletablePromise) Rejected() bool {
return promise.State() == REJECTED
}
// Return the value of the promise, if it was resolved successfully, or return
// the cause of failure if it was not. Block until the promise is either
// completed or rejected.
func (promise *CompletablePromise) Get() (interface{}, error) {
if promise.State() == PENDING {
promise.mutex.Lock()
for promise.State() == PENDING {
// wait unlocks its associated mutex (incase you were wondering)
// so we cannot guarantee that the state has actually changed.
promise.cond.Wait()
}
promise.mutex.Unlock()
}
return promise.value, promise.cause
}
func (promise *CompletablePromise) depend(compute func(interface{}) interface{}) Thenable {
andThen := completable(compute, nil)
promise.dependencies = append(promise.dependencies, andThen)
return andThen
}
// The private version of this is used for `Combine` to call, so that it won't
// attempt to acquire the mutex twice.
func (promise *CompletablePromise) then(compute func(interface{}) interface{}) Thenable {
switch promise.State() {
case PENDING:
return promise.depend(compute)
case REJECTED:
return Rejected(promise.cause)
case FULFILLED:
return Completed(compute(promise.value))
}
panic("Invalid state")
}
// Compose this promise into one which is complete when the following code has
// executed.
func (promise *CompletablePromise) Then(compute func(interface{}) interface{}) Thenable {
switch promise.State() {
case PENDING:
promise.mutex.Lock()
defer promise.mutex.Unlock()
return promise.then(compute)
case REJECTED:
return Rejected(promise.cause)
case FULFILLED:
return Completed(compute(promise.value))
}
panic("Invalid state")
}
// Compose this promise into another one which handles an upstream error with
// the given handler.
func (promise *CompletablePromise) Catch(handle func(error)) Thenable {
if promise.State() == PENDING {
promise.mutex.Lock()
defer promise.mutex.Unlock()
// Double check now that we have the lock that this is still true.
if promise.State() == PENDING {
rejectable := completable(nil, handle)
promise.dependencies = append(promise.dependencies, rejectable)
return rejectable
}
}
if promise.State() == REJECTED {
handle(promise.cause)
return Rejected(promise.cause)
}
return promise
}
// Error due to an illegal second state transition, after figuring out what
// caused the previous state transition.
func panicStateComplete(rejected bool) {
var method string
if rejected {
method = "Reject()"
} else {
method = "Complete()"
}
panic(fmt.Sprintf("%s was already called on this promise", method))
}
func (promise *CompletablePromise) complete(value interface{}) interface{} {
// This should rarely actually be blocking, there's a separate mutex for
// each completable promise and the mutex is only acquired during assembly
// and completion.
promise.mutex.Lock()
defer promise.mutex.Unlock()
composed := value
if promise.compute != nil {
// Because this composition function
composed = promise.compute(value)
}
if promise.State() != PENDING {
panicStateComplete(promise.State() == REJECTED)
}
if composed != nil {
promise.value = composed
}
atomic.StoreUint32(&promise.state, FULFILLED)
return composed
}
// Complete this promise with a given value.
// It is considered a programming error to complete a promise multiple times.
// The promise is to be completed once, and not thereafter.
func (promise *CompletablePromise) Complete(value interface{}) {
// Transition the state of this promise (which requires the lock). At this
// point all subsequent calls to Then() or Complete() will be called on a
// Completed promise, meaning they will be satisfied immediately.
composed := promise.complete(value)
// So now that the condition has been satisified, broadcast to all waiters
// that thie task is now complete. They should be in the `Get()` wait loop,
// above.
promise.cond.Broadcast()
for _, dependency := range promise.dependencies {
dependency.Complete(composed)
}
}
// Reject this promise and all of its dependencies.
// Reject this promise, and along with it all promises which were derived from
// it.
func (promise *CompletablePromise) Reject(cause error) {
if cause == nil {
panic(fmt.Sprintf("Reject() requires a non-nil cause"))
}
promise.mutex.Lock()
if promise.State() != PENDING {
panicStateComplete(promise.State() == REJECTED)
}
promise.cause = cause
atomic.StoreUint32(&promise.state, REJECTED)
promise.mutex.Unlock()
// Unlike the Complete() routine, which executes the transformation
// *before* actually storing the value or transitioning the state, this
// transitions after. The reason for that is two-fold: The return value of
// the handle() callback is not *stored*, and the second is that we want a
// promise accessed from within the Catch() handler to be in a rejected
// state.
if promise.handle != nil {
promise.handle(cause)
}
// Now that this is all done, notify all of the handlers that yeah, we're
// done.
promise.cond.Broadcast()
for _, dependency := range promise.dependencies {
dependency.Reject(cause)
}
}
// Combine this promise with another by applying the combinator `create` to the
// value once it is available. `create` must return an instance of a
// `Thenable`. The instance *may* be `Completable`. Returns a new completable
// promise which is completed when the returned promise, and this promise, are
// completed...but no sooner.
func (promise *CompletablePromise) Combine(create func(interface{}) Thenable) Thenable {
if promise.State() == PENDING {
promise.mutex.Lock()
defer promise.mutex.Unlock()
if promise.State() == PENDING {
// So, this may seem a little whacky, but what is happening here is
// that seeing as there is presently no value from which to generate
// the new promise, a callback is registered using Then() which
// executes the supplied transform function, and when the promise that
// was returned by *that* transform produces a result, it is copied
// over to the placeholder thus satisfying the request.
placeholder := Promise()
// So, is it possible that Combine() is called, and the promise is
// completed while it's being combined? Should *not* be.
//
// Perhaps all access to promise.state should be atomic. We are
// using the double lock idiom here, after all...
// It's important that the internal then() is used here, because the
// external one allocates a mutex lock. sync.Mutex is not a reentrant lock
// type, unfortunately.
promise.depend(func(awaited interface{}) interface{} {
create(awaited).Then(func(composed interface{}) interface{} {
placeholder.Complete(composed)
return nil
}).Catch(func(err error) {
placeholder.Reject(err)
})
return nil
}).Catch(func(err error) {
placeholder.Reject(err)
})
return placeholder
}
}
if promise.state == REJECTED {
return Rejected(promise.cause)
} else {
return create(promise.value)
}
}