forked from bjornbytes/RxLua
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrx.lua
2318 lines (1913 loc) · 64.6 KB
/
rx.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
-- RxLua v0.0.3
-- https://github.com/bjornbytes/rxlua
-- MIT License
--- Internal helper functions shared by the rest of the library.
local util = {}

-- Lua 5.1 has no table.pack; the fallback records the argument count in `n` so nils are countable.
util.pack = table.pack or function(...) return { n = select('#', ...), ... } end
-- table.unpack on Lua 5.2+, the global unpack on Lua 5.1 / LuaJIT.
util.unpack = table.unpack or unpack
-- Equality using ==.
util.eq = function(x, y) return x == y end
-- Does nothing and returns nothing.
util.noop = function() end
-- Returns its first argument.
util.identity = function(x) return x end
-- Returns a function that always returns x.
util.constant = function(x) return function() return x end end

--- Tests whether an object's metatable has the given class as its __index.
-- @arg {*} object - The value to test. Non-tables and tables without a metatable yield false.
-- @arg {table} class - The class table to compare against.
-- @returns {boolean}
util.isa = function(object, class)
  if type(object) ~= 'table' then
    return false
  end
  -- getmetatable returns nil for plain tables (and may return a non-table when the metatable
  -- defines __metatable), so it must be checked before it is indexed.
  local mt = getmetatable(object)
  return type(mt) == 'table' and mt.__index == class
end

--- Calls fn in protected mode and reports a failure to the observer.
-- @arg {Observer} observer - Receives onError(err) if fn raises.
-- @arg {function} fn - The function to call.
-- @arg {*...} ... - Arguments passed to fn.
-- @returns {boolean, *} The success flag and first result (or error value) from pcall.
util.tryWithObserver = function(observer, fn, ...)
  local success, result = pcall(fn, ...)
  if not success then
    observer:onError(result)
  end
  return success, result
end
--- @class Subscription
-- @description A handle representing the link between an Observer and an Observable, as well as any
-- work required to clean up after the Observable completes or the Observer unsubscribes.
local Subscription = {}
Subscription.__index = Subscription
Subscription.__tostring = util.constant('Subscription')

--- Creates a new Subscription.
-- @arg {function=} action - The action to run when the subscription is unsubscribed. It will only
--                           be run once. It is called with the Subscription as its only argument.
-- @returns {Subscription}
function Subscription.create(action)
  local self = {
    action = action or util.noop,
    unsubscribed = false
  }

  return setmetatable(self, Subscription)
end

--- Unsubscribes the subscription, performing any necessary cleanup work. Calls after the first
-- one do nothing.
function Subscription:unsubscribe()
  if self.unsubscribed then return end

  -- Set the flag before running the action: otherwise an action that (directly or indirectly)
  -- unsubscribes this same subscription, or an action that raises and is retried, would run the
  -- cleanup a second time.
  self.unsubscribed = true
  self.action(self)
end
--- @class Observer
-- @description An Observer is a small object holding three callbacks through which it receives
-- values, an error, or a completion signal from an Observable.
local Observer = {}
Observer.__index = Observer
Observer.__tostring = util.constant('Observer')

--- Builds an Observer from up to three callbacks.
-- @arg {function=} onNext - Invoked with each set of values produced. Defaults to a no-op.
-- @arg {function=} onError - Invoked with the error message on failure. Defaults to the global
--                            `error` function, i.e. the message is raised.
-- @arg {function=} onCompleted - Invoked on normal completion. Defaults to a no-op.
-- @returns {Observer}
function Observer.create(onNext, onError, onCompleted)
  local observer = setmetatable({}, Observer)
  observer._onNext = onNext or util.noop
  observer._onError = onError or error
  observer._onCompleted = onCompleted or util.noop
  observer.stopped = false
  return observer
end

--- Delivers zero or more values to the Observer. Ignored once the Observer has stopped.
-- @arg {*...} values
function Observer:onNext(...)
  if self.stopped then return end
  self._onNext(...)
end

--- Tells the Observer that the sequence failed. The Observer is marked as stopped before the
-- error callback runs; later notifications are ignored.
-- @arg {string=} message - A string describing what went wrong.
function Observer:onError(message)
  if self.stopped then return end
  self.stopped = true
  self._onError(message)
end

--- Tells the Observer that the sequence finished normally. The Observer is marked as stopped
-- before the completion callback runs; later notifications are ignored.
function Observer:onCompleted()
  if self.stopped then return end
  self.stopped = true
  self._onCompleted()
end
--- @class Observable
-- @description An Observable pushes values to the Observers subscribed to it.
local Observable = {}
Observable.__index = Observable
Observable.__tostring = util.constant('Observable')

--- Wraps a subscription function in a new Observable.
-- @arg {function} subscribe - Called with an Observer each time the Observable is subscribed to;
--                             it is responsible for producing values.
-- @returns {Observable}
function Observable.create(subscribe)
  return setmetatable({ _subscribe = subscribe }, Observable)
end
--- Subscribes to this Observable. Accepts either a ready-made Observer (any table) as the first
-- argument, or up to three callbacks from which an Observer is created.
-- @arg {function|table} onNext - Called when the Observable produces a value, or an Observer.
-- @arg {function} onError - Called when the Observable terminates due to an error.
-- @arg {function} onCompleted - Called when the Observable completes normally.
-- @returns {*} Whatever the subscription function returns.
function Observable:subscribe(onNext, onError, onCompleted)
  local observer = onNext
  if type(observer) ~= 'table' then
    observer = Observer.create(onNext, onError, onCompleted)
  end
  return self._subscribe(observer)
end
--- Returns an Observable that completes as soon as it is subscribed to, without producing values.
-- @returns {Observable}
function Observable.empty()
  local function subscribe(observer)
    observer:onCompleted()
  end

  return Observable.create(subscribe)
end
--- Returns an Observable whose subscription function does nothing: it never produces a value,
-- an error, or a completion.
-- @returns {Observable}
function Observable.never()
  local function subscribe() end

  return Observable.create(subscribe)
end
--- Returns an Observable that reports an error as soon as it is subscribed to.
-- @arg {string=} message - The error passed to each Observer's onError.
-- @returns {Observable}
function Observable.throw(message)
  local function subscribe(observer)
    observer:onError(message)
  end

  return Observable.create(subscribe)
end
--- Creates an Observable that produces each of the given values in order, then completes.
-- The argument count is taken from select('#', ...), so nil arguments are emitted too.
-- @arg {*...} values
-- @returns {Observable}
function Observable.of(...)
  local total = select('#', ...)
  local values = {...}

  return Observable.create(function(observer)
    local index = 1
    while index <= total do
      observer:onNext(values[index])
      index = index + 1
    end

    observer:onCompleted()
  end)
end
--- Creates an Observable that counts through a range the way a numeric Lua for loop does, then
-- completes.
-- @arg {number} initial - The first value of the range, or the upper limit (counting from 1) when
--                         neither limit nor step is given.
-- @arg {number=} limit - The last value of the range.
-- @arg {number=1} step - The amount added on each iteration.
-- @returns {Observable}
function Observable.fromRange(initial, limit, step)
  local first, last = initial, limit
  if not (limit or step) then
    -- Single-argument form: fromRange(n) counts 1..n.
    first, last = 1, initial
  end
  local increment = step or 1

  return Observable.create(function(observer)
    for value = first, last, increment do
      observer:onNext(value)
    end

    observer:onCompleted()
  end)
end
--- Creates an Observable that walks a table with an iterator and produces each value, then
-- completes.
-- @arg {table} t - The table used to create the Observable.
-- @arg {function=pairs} iterator - An iterator used to iterate the table, e.g. pairs or ipairs.
-- @arg {boolean} keys - When truthy, the key is passed as a second value alongside each value.
-- @returns {Observable}
function Observable.fromTable(t, iterator, keys)
  local iterate = iterator or pairs

  local function subscribe(observer)
    for key, value in iterate(t) do
      observer:onNext(value, keys and key or nil)
    end

    observer:onCompleted()
  end

  return Observable.create(subscribe)
end
--- Creates an Observable that produces values when the specified coroutine yields.
-- Each subscription schedules a loop that resumes the coroutine, forwards the first value of each
-- successful resume through onNext, reports a failed resume through onError, and completes once
-- the coroutine is dead.
-- @arg {thread|function} fn - A coroutine or function to use to generate values. Note that if a
-- coroutine is used, the values it yields will be shared by all
-- subscribed Observers (influenced by the Scheduler), whereas a new
-- coroutine will be created for each Observer when a function is used.
-- @arg {Scheduler} scheduler - Object whose schedule(fn) method runs the resume loop. The loop
-- calls coroutine.yield() between values, so the scheduler presumably
-- runs it inside a coroutine of its own (TODO confirm against the
-- Scheduler implementations).
-- @returns {Observable}
function Observable.fromCoroutine(fn, scheduler)
return Observable.create(function(observer)
-- A function gets a fresh coroutine per subscription; an existing thread is used as-is.
local thread = type(fn) == 'function' and coroutine.create(fn) or fn
-- Whatever schedule() returns is handed back as the result of subscribing.
return scheduler:schedule(function()
while not observer.stopped do
local success, value = coroutine.resume(thread)
if success then
-- Runs before the 'dead' check below, so the value of the final resume (the coroutine's
-- return value) is forwarded as well.
observer:onNext(value)
else
-- A failed resume carries the error in `value`.
return observer:onError(value)
end
if coroutine.status(thread) == 'dead' then
return observer:onCompleted()
end
-- Hand control back to whatever is running this loop before resuming again.
coroutine.yield()
end
end)
end)
end
--- Creates an Observable that produces values from a file, line by line, then completes.
-- If the file cannot be opened, the Observer receives onError with the filename.
-- @arg {string} filename - The name of the file used to create the Observable
-- @returns {Observable}
function Observable.fromFileByLine(filename)
  return Observable.create(function(observer)
    local file = io.open(filename, 'r')
    if not file then
      return observer:onError(filename)
    end

    -- Read from the handle that was just opened instead of closing it and reopening the file by
    -- name: that avoids a second open and the window in which the file could disappear.
    for line in file:lines() do
      observer:onNext(line)
    end
    file:close()

    return observer:onCompleted()
  end)
end
--- Creates an Observable that calls a factory function on every subscription and subscribes the
-- Observer to the Observable the factory returns.
-- @arg {function} fn - A factory function that returns an Observable.
-- @returns {Observable}
function Observable.defer(fn)
  if type(fn) ~= 'function' then
    error('Expected a function')
  end

  -- The instance overrides `subscribe` itself rather than supplying a `_subscribe` function.
  local deferred = {}

  function deferred.subscribe(_, ...)
    return fn():subscribe(...)
  end

  return setmetatable(deferred, Observable)
end
--- Returns an Observable that repeats a value a specified number of times, then completes.
-- Every subscription receives the full number of repetitions.
-- @arg {*} value - The value to repeat.
-- @arg {number=} count - The number of times to repeat the value. If left unspecified, the value
--                        is repeated an infinite number of times (the subscribe call never
--                        returns).
-- @returns {Observable}
function Observable.replicate(value, count)
  return Observable.create(function(observer)
    -- Count down on a per-subscription copy. Decrementing the shared `count` upvalue would leave
    -- it at zero after the first subscription, so later subscribers would receive no values.
    local remaining = count

    while remaining == nil or remaining > 0 do
      observer:onNext(value)
      if remaining then
        remaining = remaining - 1
      end
    end

    observer:onCompleted()
  end)
end
--- Subscribes to this Observable and prints every notification it produces.
-- @arg {string=} name - Prefixes the printed messages with a name.
-- @arg {function=tostring} formatter - A function that formats one or more values to be printed.
--                                      It must return a string (or number).
-- @returns {*} The result of subscribing.
function Observable:dump(name, formatter)
  name = name and (name .. ' ') or ''
  -- tostring() raises when called with no argument at all, so the default formatter always hands
  -- it exactly one value; an empty onNext() prints "nil".
  formatter = formatter or function(...) return tostring((...)) end

  local onNext = function(...) print(name .. 'onNext: ' .. formatter(...)) end
  -- Errors are not guaranteed to be strings (nil and tables are possible); convert before
  -- concatenating so printing the error cannot itself raise.
  local onError = function(e) print(name .. 'onError: ' .. tostring(e)) end
  local onCompleted = function() print(name .. 'onCompleted') end

  return self:subscribe(onNext, onError, onCompleted)
end
--- Produces a single boolean telling whether every value from the source satisfies a predicate:
-- false as soon as one value fails, true if the source completes without a failure.
-- An error raised by the predicate is reported through onError.
-- @arg {function=identity} predicate - The predicate used to evaluate objects.
-- @returns {Observable}
function Observable:all(predicate)
  local test = predicate or util.identity

  return Observable.create(function(observer)
    -- Emits false and completes on the first value that fails the test.
    local function check(...)
      if test(...) then return end
      observer:onNext(false)
      observer:onCompleted()
    end

    local function onNext(...)
      util.tryWithObserver(observer, check, ...)
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      observer:onNext(true)
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end
--- Given a set of Observables, produces values from only the first one to produce a notification
-- (value, error or completion). The other Observable is unsubscribed and its notifications are
-- ignored. More than two Observables are handled by chaining pairwise.
-- @arg {Observable...} observables
-- @returns {Observable} The first argument itself when fewer than two Observables are given.
function Observable.amb(a, b, ...)
  if not a or not b then return a end

  return Observable.create(function(observer)
    local subscriptionA, subscriptionB
    -- 'a' or 'b' once one of the sources has produced its first notification.
    local winner

    -- Sources are allowed to return nil from subscribe, so every unsubscribe is guarded.
    local function unsubscribeA()
      if subscriptionA then subscriptionA:unsubscribe() end
    end

    local function unsubscribeB()
      if subscriptionB then subscriptionB:unsubscribe() end
    end

    -- Builds a handler that forwards observer[key] only while `id` is the winning source. The
    -- first source to notify becomes the winner and the other one is unsubscribed. Tracking the
    -- winner explicitly matters for sources that emit synchronously during subscribe, when the
    -- loser's subscription does not exist yet and therefore cannot be cancelled.
    local function forward(id, unsubscribeLoser, key)
      return function(...)
        if winner == nil then
          winner = id
          unsubscribeLoser()
        end

        if winner == id then
          return observer[key](observer, ...)
        end
      end
    end

    subscriptionA = a:subscribe(
      forward('a', unsubscribeB, 'onNext'),
      forward('a', unsubscribeB, 'onError'),
      forward('a', unsubscribeB, 'onCompleted')
    )

    -- If `a` already won while subscribing there is no reason to start `b` at all.
    if winner == nil then
      subscriptionB = b:subscribe(
        forward('b', unsubscribeA, 'onNext'),
        forward('b', unsubscribeA, 'onError'),
        forward('b', unsubscribeA, 'onCompleted')
      )

      -- `a` may have won while `b` was being subscribed, before subscriptionB was assigned.
      if winner == 'a' then unsubscribeB() end
    end

    return Subscription.create(function()
      unsubscribeA()
      unsubscribeB()
    end)
  end):amb(...)
end
--- Returns an Observable that, when the source completes, produces the arithmetic mean of the
-- values the source produced. Nothing is produced if the source produced no values.
-- @returns {Observable}
function Observable:average()
  return Observable.create(function(observer)
    local total, seen = 0, 0

    local function accumulate(value)
      total = total + value
      seen = seen + 1
    end

    local function fail(e)
      observer:onError(e)
    end

    local function finish()
      if seen > 0 then
        observer:onNext(total / seen)
      end

      observer:onCompleted()
    end

    return self:subscribe(accumulate, fail, finish)
  end)
end
--- Returns an Observable that collects values from the source and produces them in groups, each
-- group delivered as multiple values of a single onNext. A partially filled group is flushed
-- before an error or completion is forwarded.
-- @arg {number} size - The size of the buffer.
-- @returns {Observable}
function Observable:buffer(size)
  if type(size) ~= 'number' then
    error('Expected a number')
  end

  return Observable.create(function(observer)
    local pending = {}

    -- Delivers the collected values (if any) and starts a new, empty group.
    local function flush()
      if #pending == 0 then return end
      observer:onNext(util.unpack(pending))
      pending = {}
    end

    local function onNext(...)
      local incoming = {...}
      for index = 1, #incoming do
        pending[#pending + 1] = incoming[index]
        if #pending >= size then
          flush()
        end
      end
    end

    local function onError(message)
      flush()
      return observer:onError(message)
    end

    local function onCompleted()
      flush()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end
--- Returns an Observable that intercepts an error from the source and continues with the values
-- of a replacement Observable. With no handler, an error simply completes the sequence. If the
-- handler raises, the raised error is forwarded; if it returns nothing, the original error is.
-- @arg {function|Observable} handler - An Observable or a function that returns an Observable to
--                                      replace the source Observable in the event of an error.
--                                      A function handler receives the error.
-- @returns {Observable}
function Observable:catch(handler)
  -- Normalise to a function (or keep a falsy value meaning "no handler").
  local recover = handler
  if handler and type(handler) ~= 'function' then
    recover = util.constant(handler)
  end

  return Observable.create(function(observer)
    local subscription

    local function onNext(...)
      return observer:onNext(...)
    end

    local function onError(e)
      if not recover then
        return observer:onCompleted()
      end

      local success, _continue = pcall(recover, e)
      if not (success and _continue) then
        observer:onError(success and e or _continue)
        return
      end

      if subscription then subscription:unsubscribe() end
      _continue:subscribe(observer)
    end

    local function onCompleted()
      observer:onCompleted()
    end

    subscription = self:subscribe(onNext, onError, onCompleted)
    return subscription
  end)
end
--- Returns a new Observable that, once every source has produced at least one value, runs a
-- combinator on the most recent value of each source whenever any of them produces a new value,
-- and produces the combinator's results. Completes when all sources have completed.
-- @arg {Observable...} observables - One or more Observables to combine with this one.
-- @arg {function} combinator - A function that combines the latest result from each Observable.
--                              When the last argument is not a function, the latest values
--                              themselves are produced.
-- @returns {Observable}
function Observable:combineLatest(...)
  local sources = {...}
  local combinator = table.remove(sources)
  if type(combinator) ~= 'function' then
    -- The last argument was a source after all; put it back and pass values through unchanged.
    table.insert(sources, combinator)
    combinator = function(...) return ... end
  end
  table.insert(sources, 1, self)

  return Observable.create(function(observer)
    local latest = {}
    -- Entry i stays set until source i has produced its first value.
    local pending = {util.unpack(sources)}
    local completed = {}
    local subscription = {}

    local function emitCombined()
      observer:onNext(combinator(util.unpack(latest)))
    end

    local function onNext(i)
      return function(value)
        latest[i] = value
        pending[i] = nil

        if next(pending) == nil then
          util.tryWithObserver(observer, emitCombined)
        end
      end
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted(i)
      return function()
        completed[#completed + 1] = i

        if #completed == #sources then
          observer:onCompleted()
        end
      end
    end

    for i = 1, #sources do
      subscription[i] = sources[i]:subscribe(onNext(i), onError, onCompleted(i))
    end

    return Subscription.create(function ()
      for i = 1, #sources do
        local handle = subscription[i]
        if handle then handle:unsubscribe() end
      end
    end)
  end)
end
--- Returns a new Observable that produces the source's values with falsy ones (nil and false)
-- filtered out.
-- @returns {Observable}
function Observable:compact()
  local keepTruthy = util.identity
  return self:filter(keepTruthy)
end
--- Returns a new Observable that produces the values of this Observable followed by the values of
-- each given Observable, in the order they are specified. Each source is subscribed to only when
-- the previous one completes.
-- @arg {Observable...} sources - The Observables to concatenate.
-- @returns {Observable} This Observable itself when no sources are given.
function Observable:concat(other, ...)
  if not other then return self end

  local others = {...}

  return Observable.create(function(observer)
    local function onNext(...)
      return observer:onNext(...)
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    -- Runs when this Observable completes: continue with the concatenation of the rest.
    local function chain()
      local rest = other:concat(util.unpack(others))
      return rest:subscribe(onNext, onError, onCompleted)
    end

    return self:subscribe(onNext, onError, chain)
  end)
end
--- Returns a new Observable that produces a single boolean value representing whether or not the
-- specified value was produced by the original. True is produced (and the source unsubscribed)
-- as soon as the value is seen; false is produced if the source completes without it.
-- @arg {*} value - The value to search for. == is used for equality testing. Searching for nil
--                  also matches an onNext that carried no values at all.
-- @returns {Observable}
function Observable:contains(value)
  return Observable.create(function(observer)
    local subscription

    local function found()
      observer:onNext(true)
      if subscription then subscription:unsubscribe() end
      return observer:onCompleted()
    end

    local function onNext(...)
      local args = util.pack(...)

      -- Use the recorded argument count rather than #args: the length operator is unspecified
      -- for tables with nil holes, so a match located after a nil argument could be missed.
      if args.n == 0 and value == nil then
        return found()
      end

      for i = 1, args.n do
        if args[i] == value then
          return found()
        end
      end
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      observer:onNext(false)
      return observer:onCompleted()
    end

    subscription = self:subscribe(onNext, onError, onCompleted)
    return subscription
  end)
end
--- Returns an Observable that, when the source completes, produces the number of values the
-- source produced that satisfied an optional predicate. An error raised by the predicate is
-- reported through onError.
-- @arg {function=} predicate - The predicate used to match values. Every value is counted when
--                              it is omitted.
-- @returns {Observable}
function Observable:count(predicate)
  local matches = predicate or util.constant(true)

  return Observable.create(function(observer)
    local total = 0

    local function tally(...)
      if matches(...) then
        total = total + 1
      end
    end

    local function onNext(...)
      util.tryWithObserver(observer, tally, ...)
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      observer:onNext(total)
      observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end
--- Returns a new throttled Observable that waits to produce values until a timeout has expired, at
-- which point it produces the latest value from the source Observable. Whenever the source
-- Observable produces a value, the timeout is reset. Errors and completion are debounced the same
-- way, each kind of notification with its own timeout.
-- @arg {number=0} time - The amount of time to wait before producing the last value. It is passed
--                        unchanged as the second argument of scheduler:schedule.
-- @arg {Scheduler} scheduler - The scheduler to run the Observable on. The value returned by its
--                              schedule method must have an unsubscribe method.
-- @returns {Observable}
function Observable:debounce(time, scheduler)
  time = time or 0

  return Observable.create(function(observer)
    -- Pending scheduled notification per kind ('onNext', 'onError', 'onCompleted').
    local debounced = {}

    local function wrap(key)
      return function(...)
        -- A newer notification of the same kind replaces the one still waiting.
        if debounced[key] then
          debounced[key]:unsubscribe()
        end

        local values = util.pack(...)

        debounced[key] = scheduler:schedule(function()
          -- Bound the unpack with the recorded count so nil values are delivered intact.
          return observer[key](observer, util.unpack(values, 1, values.n))
        end, time)
      end
    end

    local subscription = self:subscribe(wrap('onNext'), wrap('onError'), wrap('onCompleted'))

    return Subscription.create(function()
      if subscription then subscription:unsubscribe() end
      for _, timeout in pairs(debounced) do
        timeout:unsubscribe()
      end
    end)
  end)
end
--- Returns a new Observable that produces a default set of items if the source Observable produces
-- no values.
-- @arg {*...} values - Zero or more values to produce (as a single onNext) if the source
--                      completes without emitting anything.
-- @returns {Observable}
function Observable:defaultIfEmpty(...)
  local defaults = util.pack(...)

  return Observable.create(function(observer)
    local hasValue = false

    local function onNext(...)
      hasValue = true
      observer:onNext(...)
    end

    local function onError(e)
      observer:onError(e)
    end

    local function onCompleted()
      if not hasValue then
        -- Bound the unpack with the recorded count: a plain unpack relies on the length operator
        -- and would drop or truncate defaults that contain nil.
        observer:onNext(util.unpack(defaults, 1, defaults.n))
      end

      observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end
--- Returns a new Observable that produces the values of the original delayed by a time period.
-- Errors and completion are delayed the same way.
-- @arg {number|function} time - An amount in milliseconds to delay by, or a function which returns
--                               this value. A function is called once per notification.
-- @arg {Scheduler} scheduler - The scheduler to run the Observable on. The value returned by its
--                              schedule method must have an unsubscribe method.
-- @returns {Observable}
function Observable:delay(time, scheduler)
  time = type(time) ~= 'function' and util.constant(time) or time

  return Observable.create(function(observer)
    -- Handles of every scheduled notification, so they can be cancelled on unsubscribe.
    local actions = {}

    local function delay(key)
      return function(...)
        local values = util.pack(...)
        local handle = scheduler:schedule(function()
          -- Bound the unpack with the recorded count so nil values are delivered intact.
          observer[key](observer, util.unpack(values, 1, values.n))
        end, time())
        table.insert(actions, handle)
      end
    end

    local subscription = self:subscribe(delay('onNext'), delay('onError'), delay('onCompleted'))

    return Subscription.create(function()
      if subscription then subscription:unsubscribe() end
      for i = 1, #actions do
        actions[i]:unsubscribe()
      end
    end)
  end)
end
--- Returns a new Observable that produces the values from the original with duplicates removed.
-- Only the first value of each onNext is considered. Values are compared by table-key identity;
-- nil is treated as a value like any other, and NaN (which never equals itself) is always
-- produced.
-- @returns {Observable}
function Observable:distinct()
  return Observable.create(function(observer)
    local values = {}
    -- nil cannot be used as a table key, so it is tracked separately.
    local seenNil = false

    local function onNext(x)
      if x == nil then
        if not seenNil then
          observer:onNext(x)
        end
        seenNil = true
        return
      end

      if x ~= x then
        -- NaN cannot be used as a table key either, and is distinct from every value.
        observer:onNext(x)
        return
      end

      if not values[x] then
        observer:onNext(x)
      end

      values[x] = true
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end
--- Returns an Observable that only produces values from the original if they are different from
-- the previous value. Only the first value of each onNext is compared; any additional values are
-- forwarded along with it. An error raised by the comparator is reported through onError.
-- @arg {function} comparator - A function used to compare 2 values. If unspecified, == is used.
--                              It is called as comparator(newValue, previousValue) and should
--                              return true when they are considered equal.
-- @returns {Observable}
function Observable:distinctUntilChanged(comparator)
  comparator = comparator or util.eq

  return Observable.create(function(observer)
    local first = true
    local currentValue = nil

    local function onNext(value, ...)
      -- Hand the extra values to the protected call as varargs instead of packing and unpacking
      -- them: unpacking relies on the length operator and would lose nil values.
      util.tryWithObserver(observer, function(...)
        if first or not comparator(value, currentValue) then
          observer:onNext(value, ...)
          currentValue = value
          first = false
        end
      end, ...)
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end
--- Returns an Observable that produces only the nth element of the source and then completes.
-- If the source completes earlier, the result completes without producing anything.
-- @arg {number} index - The index of the item, with an index of 1 representing the first.
-- @returns {Observable}
function Observable:elementAt(index)
  if type(index) ~= 'number' then
    error('Expected a number')
  end

  return Observable.create(function(observer)
    local subscription
    local position = 1

    local function onNext(...)
      if position ~= index then
        position = position + 1
        return
      end

      observer:onNext(...)
      observer:onCompleted()
      if subscription then
        subscription:unsubscribe()
      end
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    subscription = self:subscribe(onNext, onError, onCompleted)
    return subscription
  end)
end
--- Returns a new Observable that forwards only the values of the source for which a predicate
-- returns a truthy result. An error raised by the predicate is reported through onError.
-- @arg {function=identity} predicate - The predicate used to filter values.
-- @returns {Observable}
function Observable:filter(predicate)
  local accept = predicate or util.identity

  return Observable.create(function(observer)
    local function forwardIfAccepted(...)
      if accept(...) then
        return observer:onNext(...)
      end
    end

    local function onNext(...)
      util.tryWithObserver(observer, forwardIfAccepted, ...)
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end
--- Returns a new Observable that produces the first value of the source satisfying a predicate
-- and then completes. If no value matches, it completes when the source does. An error raised by
-- the predicate is reported through onError.
-- @arg {function=identity} predicate - The predicate used to find a value.
-- @returns {Observable}
function Observable:find(predicate)
  local matches = predicate or util.identity

  return Observable.create(function(observer)
    local function emitIfMatch(...)
      if not matches(...) then return end
      observer:onNext(...)
      return observer:onCompleted()
    end

    local function onNext(...)
      util.tryWithObserver(observer, emitIfMatch, ...)
    end

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    return self:subscribe(onNext, onError, onCompleted)
  end)
end
--- Returns a new Observable that only produces the first result of the original.
-- Implemented as take(1); `take` is defined outside this section of the file.
-- @returns {Observable}
function Observable:first()
return self:take(1)
end
--- Returns a new Observable that transforms each item emitted by the source into an Observable
-- and then flattens the emissions of those Observables into a single Observable.
-- @arg {function=identity} callback - The function to transform values from the original
--                                     Observable.
-- @returns {Observable}
function Observable:flatMap(callback)
  local project = callback or util.identity
  local mapped = self:map(project)
  return mapped:flatten()
end
--- Returns a new Observable that uses a callback to turn each value of the source into an
-- Observable and produces values from the most recent of these only: the previous inner
-- Observable is unsubscribed whenever the source produces a new value. Completion of an inner
-- Observable is not forwarded; the result completes when the source does. An error raised by the
-- callback is reported through onError.
-- @arg {function=identity} callback - The function used to convert values to Observables.
-- @returns {Observable}
function Observable:flatMapLatest(callback)
  local project = callback or util.identity

  return Observable.create(function(observer)
    local innerSubscription

    local function onNext(...)
      observer:onNext(...)
    end

    local function onError(e)
      return observer:onError(e)
    end

    local function onCompleted()
      return observer:onCompleted()
    end

    -- Subscribes to the Observable built from the given values, with value and error handlers
    -- only.
    local function switchTo(...)
      innerSubscription = project(...):subscribe(onNext, onError)
    end

    local function subscribeInner(...)
      if innerSubscription then
        innerSubscription:unsubscribe()
      end

      return util.tryWithObserver(observer, switchTo, ...)
    end

    local subscription = self:subscribe(subscribeInner, onError, onCompleted)

    return Subscription.create(function()
      if innerSubscription then
        innerSubscription:unsubscribe()
      end

      if subscription then
        subscription:unsubscribe()
      end
    end)
  end)
end
--- Returns a new Observable that subscribes to every Observable produced by the source and
-- forwards their values. It completes once the source and all inner Observables have completed;
-- an error from any of them is forwarded.
-- @returns {Observable}
function Observable:flatten()
  return Observable.create(function(observer)
    local subscriptions = {}
    -- Number of Observables still running: the source itself plus every inner Observable.
    local remaining = 1

    local function onError(message)
      return observer:onError(message)
    end

    local function onCompleted()
      remaining = remaining - 1
      if remaining ~= 0 then return end
      return observer:onCompleted()
    end

    local function forward(...)
      observer:onNext(...)
    end

    local function onNext(observable)
      remaining = remaining + 1
      local inner = observable:subscribe(forward, onError, onCompleted)
      subscriptions[#subscriptions + 1] = inner
    end

    subscriptions[#subscriptions + 1] = self:subscribe(onNext, onError, onCompleted)

    return Subscription.create(function ()
      for index = 1, #subscriptions do
        subscriptions[index]:unsubscribe()
      end
    end)
  end)
end
--- Returns an Observable that terminates when the source terminates but does not produce any
-- elements.
-- @returns {Observable}