Skip to content
This repository has been archived by the owner on Sep 4, 2024. It is now read-only.

Latest commit

 

History

History
1822 lines (1265 loc) · 72.9 KB

IObservable.md

File metadata and controls

1822 lines (1265 loc) · 72.9 KB

com.industry.rx_epl.IObservable <>

IObservable is the interface returned by almost all RxEPL operators; this allows the operators to be chained with a fluent syntax.

// Each operator returns an IObservable, so the calls can be chained fluently.
Observable.fromValues([1,2,3,4])
	.map(Lambda.function1("x => x * 10"))
	.sum()
	.subscribe(Subscriber.create().onNext(logValue));

Operators and Methods

Transforms

# .map(action<value: T1> returns T2) returns IObservable<T2> <>

Apply a function to each value and pass on the result.

action multiplyBy10(integer value) returns integer {
	return value * 10;
}

Observable.fromValues([1,2,3])
	.map(multiplyBy10)
	...
	
// Output: 10, 20, 30

See also: FlatMap.

# .flatMap(action<value: T1> returns (sequence<T2> | IObservable<T2> | ISubject<T2>)) returns IObservable<T2> <>

Apply a mapping to each value that results in multiple values (or an Observable containing 1 or more values) and merge each item in the result into the output.

action valueAndPlus1(integer value) returns sequence<integer> {
	return [value, value + 1];
}

Observable.fromValues([1,3,5])
	.flatMap(valueAndPlus1)
	...

// Output: 1, 2, 3, 4, 5, 6

See also: Map.

# .pluck(fieldName: any) returns IObservable<T> <>

Select a particular field from the incoming value.

Note: The fieldName can be of any type (Eg. an integer for IObservable<sequence> or the key type for an IObservable<dictionary>)

event E {
	integer value;
}

Observable.fromValues([E(1), E(2), E(3), E(4)])
	.pluck("value")
	...

// Output: 1,2,3,4

# .scan(action<aggregate: T2, value: T1> returns T2) returns IObservable<T2> <>

Aggregate the data and emit the aggregated value for every incoming event.

Note: The first value is emitted without aggregation

action sum(integer currentSum, integer value) returns integer {
	return currentSum + value;
}

Observable.fromValues([1,2,3])
	.scan(sum)
	...

// Output: 1, 3, 6

See also: ScanWithInitial, Reduce

# .scanWithInitial(action<aggregate: T2, value: T1> returns T2, initialValue: T2) returns IObservable<T2> <>

Aggregate the data and emit the aggregated value for every incoming event. The initial value for the aggregation is supplied.

action sum(integer currentSum, integer value) returns integer {
	return currentSum + value;
}

Observable.fromValues([1,2,3])
	.scanWithInitial(sum, 5)
	...

// Output: 6, 8, 11

See also: Scan, ReduceWithInitial

# .groupBy(action<value: T1> returns T2) returns IObservable<IObservable<T1>> <>

Group the data into separate observables based on a calculated group identifier. These observables are sent on the resulting IObservable. The provided action takes a value and returns a group identifier.

event Fruit {
	string type;
	integer value;
}

action groupId(Fruit fruit) returns string {
	return fruit.type;
}

action sumValuesInGroup(IObservable group) returns IObservable {
	return group
		.reduceWithInitial(Lambda.function2("groupResult, fruit => [fruit.type, groupResult[1] + fruit.value]"), [<any> "", 0]);
}

Observable.fromValues([Fruit("apple", 1), Fruit("banana", 2), Fruit("apple", 3)])
	.groupBy(groupId)
	.flatMap(sumValuesInGroup)
	...

// Output: ["banana", 2], ["apple", 4]

See also: GroupByField, GroupByWindow

# .groupByField(fieldName: any) returns IObservable<IObservable<T>> <>

Group the data into separate observables based on a key field. These observables are sent on the resulting IObservable.

Note: The fieldName can be of any type (Eg. an integer for IObservable<sequence> or the key type for an IObservable<dictionary>)

event Fruit {
	string type;
	integer value;
}

// Reduces a group to [type, running total of value].
// The seed is cast to <any> so the sequence literal can hold both a string and an integer.
action sumValuesInGroup(IObservable group) returns IObservable {
	return group
		.reduceWithInitial(Lambda.function2("groupResult, fruit => [fruit.type, groupResult[1] + fruit.value]"), [<any> "", 0]);
}

Observable.fromValues([Fruit("apple", 1), Fruit("banana", 2), Fruit("apple", 3)])
	.groupByField("type")
	.flatMap(sumValuesInGroup)
	...

// Output: ["banana", 2], ["apple", 4]

See also: GroupBy, GroupByWindow

# .groupByWindow(trigger: IObservable) returns IObservable<IObservable<T>> <>

Partition each value into the current observable window. The observable windows are sent on the resulting IObservable. The provided trigger determines when a new window is created.

Note: The current window is completed when a new window is created.

action source(IResolver r) {
	listener l := on all wait(0.25) {
		r.next(1);
	}
	r.onUnsubscribe(l.quit);
}

action sumValuesInWindow(IObservable group) returns IObservable {
	return group.sumInteger();
}

Observable.create(source)
	.groupByWindow(Observable.interval(1.0))
	.flatMap(sumValuesInWindow)
	...

// Output: 3, 4, 4...

See also: Buffer, GroupBy

# .windowTime(seconds: float) returns IObservable<IObservable<T>> <>

Partition each value into the current observable window. The observable windows are sent on the resulting IObservable. A new window is created every t seconds (starting when the subscription begins).

Note: The current window is completed when a new window is created.

action source(IResolver r) {
	listener l := on all wait(0.25) {
		r.next(1);
	}
	r.onUnsubscribe(l.quit);
}

action sumValuesInWindow(IObservable group) returns IObservable {
	return group.sumInteger();
}

Observable.create(source)
	.windowTime(1.0)
	.flatMap(sumValuesInWindow)
	...

// Output: 3, 4, 4...

See also: BufferTime, GroupBy

# .windowCount(count: integer) returns IObservable<IObservable<T>> <>

Partition each value into the current observable window. The observable windows are sent on the resulting IObservable. A new window is created after every n values.

Note: The current window is completed when a new window is created.

action sumValuesInWindow(IObservable group) returns IObservable {
	return group.sumInteger();
}

Observable.fromValues([1,2,3,4,5,6])
	.windowCount(2)
	.flatMap(sumValuesInWindow)
	...

// Output: 3, 7, 11

See also: BufferCount, GroupBy

# .windowTimeOrCount(seconds: float, count: integer) returns IObservable<IObservable<T>> <>

Partition each value into the current observable window. The observable windows are sent on the resulting IObservable. A new window is created after every t seconds or n values (whichever comes first).

Note: The current window is completed when a new window is created.

See also: BufferTimeOrCount, GroupBy

# .buffer(trigger: IObservable) returns IObservable<sequence<any>> <>

Store each value in the current bucket, emitting the bucket (as a sequence<any>) when the trigger fires.

Note: The final bucket will be emitted on completion of the source. Unsubscribing will not trigger emission of the current bucket.

Observable.interval(0.25) // Emits an incrementing integer every 250 millis
	.buffer(Observable.interval(1.0))
	...

// Output: [0,1,2,3], [4,5,6,7], [8,9,10,11]...

See also: GroupByWindow, GroupBy

# .bufferTime(seconds: float) returns IObservable<sequence<any>> <>

Store each value in the current bucket, emitting the bucket (as a sequence<any>) every t seconds.

Note: The final bucket will be emitted on completion of the source. Unsubscribing will not trigger emission of the current bucket.

Observable.interval(0.25) // Emits an incrementing integer every 250 millis
	.bufferTime(1.0)
	...

// Output: [0,1,2,3], [4,5,6,7], [8,9,10,11]...

See also: WindowTime, GroupBy

# .bufferCount(count: integer) returns IObservable<sequence<any>> <>

Store each value in the current bucket, emitting the bucket (as a sequence<any>) every n values.

Note: The final bucket will be emitted on completion of the source. Unsubscribing will not trigger emission of the current bucket.

Observable.fromValues([1,2,3,4,5,6])
	.bufferCount(2)
	...

// Output: [1,2], [3,4], [5,6]

See also: WindowCount, BufferCountSkip, Pairwise

# .bufferCountSkip(count: integer, skip: integer) returns IObservable<sequence<any>> <>

Store each value in the current bucket, emitting the bucket (as a sequence<any>) every count values. The bucket 'slides' such that, after filling for the first time, it emits every skip values with the last count items.

Note: The final bucket and all partial buckets (caused by the sliding) will be emitted on completion. Unsubscribing will not trigger emission of the current bucket.

Note 2: The skip value can be greater than the count, this will cause a gap between buckets.

Observable.fromValues([1,2,3,4,5,6])
	.bufferCountSkip(3,1)
	...

// Output: [1,2,3], [2,3,4], [3,4,5], [4,5,6], [5,6], [6]

See also: BufferCount, Pairwise

# .bufferTimeOrCount(seconds: float, count: integer) returns IObservable<sequence<any>> <>

Store each value in the current bucket, emitting the bucket (as a sequence<any>) every count values or when seconds have elapsed (whichever comes first).

Note: The final bucket will be emitted on completion of the source. Unsubscribing will not trigger emission of the current bucket.

See also: BufferCount, BufferCountSkip

# .pairwise() returns IObservable<sequence<any>> <>

Emit every value and the previous value in a sequence<any>.

Note: If only 1 value is received then no values are emitted.

Observable.fromValues([1,2,3,4,5,6])
	.pairwise()
	...

// Output: [1,2], [2,3], [3,4], [4,5], [5,6]

See also: BufferCount, BufferCountSkip

# .sort(comparator: action<left: T, right: T> returns number) returns IObservable<T> <>

Sort the values by a comparator. The comparator takes 2 values from the observable and should produce a number to indicate which one is larger.

A positive number indicates that the right value should be later in the output, whereas a negative number indicates that the left value should be later.

Note: Uses the heapsort algorithm - there's no guaranteed ordering for equal values.

action comparator(integer left, integer right) returns integer {
	return right - left;
}

Observable.fromValues([4,1,3,2])
	.sort(comparator)
	...

// Output: 1,2,3,4

# .sortAsc() returns IObservable<T> <>

Sort the values in ascending order using the standard > or < comparator. Numbers with different types will be coerced to the same type before comparison.

Note: Uses the heapsort algorithm - there's no guaranteed ordering for equal values

Observable.fromValues([4,1,3,2])
	.sortAsc()
	...

// Output: 1,2,3,4

# .sortDesc() returns IObservable<T> <>

Sort the values in descending order using the standard > or < comparator. Numbers with different types will be coerced to the same type before comparison.

Note: Uses the heapsort algorithm - there's no guaranteed ordering for equal values

Observable.fromValues([4,1,3,2])
	.sortDesc()
	...

// Output: 4,3,2,1

# .toSortedList(comparator: action<left: T, right: T> returns number) returns IObservable<sequence<T>> <>

Sort the values by a comparator. The comparator takes 2 values from the observable and should produce a number to indicate which one is larger. The sorted values are output as a sequence<any>.

A positive number indicates that the right value should be later in the output, whereas a negative number indicates that the left value should be later.

Note: Uses the heapsort algorithm - there's no guaranteed ordering for equal values.

action comparator(integer left, integer right) returns integer {
	return right - left;
}

Observable.fromValues([4,1,3,2])
	.toSortedList(comparator)
	...

// Output: [1,2,3,4]

# .toSortedListAsc() returns IObservable<sequence<T>> <>

Sort the values in ascending order using the standard > or < comparator. Numbers with different types will be coerced to the same type before comparison. The sorted values are output as a sequence<any>.

Note: Uses the heapsort algorithm - there's no guaranteed ordering for equal values

Observable.fromValues([4,1,3,2])
	.toSortedListAsc()
	...

// Output: [1,2,3,4]

# .toSortedListDesc() returns IObservable<sequence<T>> <>

Sort the values in descending order using the standard > or < comparator. Numbers with different types will be coerced to the same type before comparison. The sorted values are output as a sequence<any>.

Note: Uses the heapsort algorithm - there's no guaranteed ordering for equal values

Observable.fromValues([4,1,3,2])
	.toSortedListDesc()
	...

// Output: [4,3,2,1]

Filters

# .filter(predicate: action<value: T> returns boolean) returns IObservable<T> <>

Filter the values by the provided predicate.

action greaterThan3(integer value) returns boolean {
	return value > 3;
}

Observable.fromValues([1,2,3,4,5,6])
	.filter(greaterThan3)
	...

// Output: 4, 5, 6

# .distinct() returns IObservable<T> <>

Remove all duplicate values.

Note: This requires storing every unique value and therefore care should be taken when used on long running observables. A safer alternative is DistinctUntilChanged

Observable.fromValues([1,2,1,2,3,2,3])
	.distinct()
	...

// Output: 1, 2, 3

See also: DistinctUntilChanged

# .distinctUntilChanged() returns IObservable<T> <>

Remove all back-to-back duplicates.

Observable.fromValues([1,1,2,2,1,2,3,3])
	.distinctUntilChanged() // Drops a value only when it equals the one immediately before it
	...

// Output: 1, 2, 1, 2, 3

See also: Distinct

# .distinctBy(getKey: action<value: T1> returns T2) returns IObservable<T1> <>

Remove all duplicates.

The getKey action returns the value by which to measure uniqueness.

Note: This requires storing every unique value and therefore care should be taken when used on long running observables. A safer alternative is DistinctByUntilChanged

action getUniqueKey(integer value) returns integer {
	return value % 3;
}

Observable.fromValues([1,2,3,4,5,6])
	.distinctBy(getUniqueKey)
	...

// Output: 1, 2, 3

# .distinctByUntilChanged(getKey: action<value: T1> returns T2) returns IObservable<T1> <>

Remove all back-to-back duplicates.

The getKey action returns the value by which to measure uniqueness.

action getUniqueKey(integer value) returns integer {
	return value % 3;
}

Observable.fromValues([1,1,1,4,4,4,5,6,7,8])
	.distinctByUntilChanged(getUniqueKey) // Keys are 1,1,1,1,1,1,2,0,1,2 - only back-to-back repeats are dropped
	...

// Output: 1,5,6,7,8

# .distinctByField(fieldName: any) returns IObservable<T> <>

Remove values with a duplicate fieldName value.

Note: This requires storing every unique value and therefore care should be taken when used on long running observables. A safer alternative is DistinctByFieldUntilChanged

Note: The fieldName can be of any type (Eg. an integer for IObservable<sequence> or the key type for an IObservable<dictionary>)

event E {
	integer value;
}

Observable.fromValues([E(1),E(2),E(1),E(2),E(3)])
	.distinctByField("value")
	...

// Output: E(1), E(2), E(3)

# .distinctByFieldUntilChanged(fieldName: any) returns IObservable<T> <>

Remove back-to-back values with a duplicate fieldName value.

Note: The fieldName can be of any type (Eg. an integer for IObservable<sequence> or the key type for an IObservable<dictionary>)

event E {
	integer value;
}

Observable.fromValues([E(1),E(1),E(2),E(1),E(3)])
	.distinctByFieldUntilChanged("value")
	...

// Output: E(1), E(2), E(1), E(3)

# .take(count: integer) returns IObservable<T> <>

Take only the first count items.

Observable.fromValues([1,2,3,4])
	.take(3)
	...

// Output: 1,2,3

# .first() returns IObservable<T> <>

Take only the first item.

Observable.fromValues([1,2,3,4])
	.first()
	...

// Output: 1

# .takeLast(count: integer) returns IObservable<T> <>

Take only the last count items.

Note: The Observable must complete otherwise you won't receive a value.

Observable.fromValues([1,2,3,4])
	.takeLast(3)
	...

// Output: 2,3,4

# .last() returns IObservable<T> <>

Take only the last item.

Note: The Observable must complete otherwise you won't receive a value.

Observable.fromValues([1,2,3,4])
	.last()
	...

// Output: 4

# .skip(count: integer) returns IObservable<T> <>

Skip the first count values.

Observable.fromValues([1,2,3,4])
	.skip(2)
	...

// Output: 3,4

# .skipLast(count: integer) returns IObservable<T> <>

Skip the last count values.

Observable.fromValues([1,2,3,4])
	.skipLast(2)
	...

// Output: 1,2

# .takeUntil(trigger: IObservable) returns IObservable<T> <>

Take values until the trigger receives a value.

Observable.interval(0.1) // Emits an incrementing integer every 100 millis
	.takeUntil(Observable.interval(1.0))
	...

// Output: 0,1,2,3,4,5,6,7,8,9

# .takeWhile(predicate: action<value: T> returns boolean) returns IObservable<T> <>

Take until the predicate for a received value returns false.

// Predicate: returns true when the received value is below 3.
action isLessThan3(integer value) returns boolean {
	return value < 3;
}

Observable.fromValues([0,1,2,3,4])
	.takeWhile(isLessThan3)
	...

// Output: 0,1,2

# .skipUntil(trigger: IObservable) returns IObservable<T> <>

Skip values until the trigger receives a value.

Observable.interval(0.1) // Emits an incrementing integer every 100 millis
	.skipUntil(Observable.interval(1.0))
	...

// Output: 10,11,12,13...

# .skipWhile(predicate: action<value: T> returns boolean) returns IObservable<T> <>

Skip until the predicate for a received value returns false.

// Predicate: returns true when the received value is below 3.
action isLessThan3(integer value) returns boolean {
	return value < 3;
}

Observable.fromValues([0,1,2,3,4])
	.skipWhile(isLessThan3)
	...

// Output: 3,4

# .debounce(seconds: float) returns IObservable<T> <>

Only emit values after there has been a t seconds gap between values.

Note: If there is never a t seconds gap in the data then a value will not be emitted.

action source(IResolver resolver) {
	on wait(0.0) { resolver.next(0); }
	on wait(0.1) { resolver.next(1); }
	on wait(0.2) { resolver.next(2); }
	on wait(1.3) { resolver.next(3); }
	on wait(1.4) { resolver.next(4); }
	on wait(3.0) { resolver.complete(); }
}

Observable.create(source)
	.debounce(1.0)
	...

// Output: 2,4

# .throttleFirst(seconds: float) returns IObservable<T> <>

When a value arrives emit it but then don't emit any more until t seconds have elapsed.

Note: The time window starts when a value is received, and restarts when a value is received after the time has elapsed.

action source(IResolver resolver) {
	on wait(0.0) { resolver.next(0); }
	on wait(0.1) { resolver.next(1); }
	on wait(0.2) { resolver.next(2); }
	on wait(1.3) { resolver.next(3); }
	on wait(1.4) { resolver.next(4); }
	on wait(3.0) { resolver.complete(); }
}

Observable.create(source)
	.throttleFirst(1.0)
	...

// Output: 0, 3

# .throttleLast(seconds: float) returns IObservable<T> <>

When a value arrives start throttling (without sending a value). After t seconds have elapsed emit the last value received in the throttling period.

Note: The time window starts when a value is received, and restarts when a value is received after the time has elapsed.

action source(IResolver resolver) {
	on wait(0.0) { resolver.next(0); }
	on wait(0.1) { resolver.next(1); }
	on wait(0.2) { resolver.next(2); }
	on wait(1.3) { resolver.next(3); }
	on wait(1.4) { resolver.next(4); }
	on wait(3.0) { resolver.complete(); }
}

Observable.create(source)
	.throttleLast(1.0)
	...

// Output: 2, 4

# .sample(trigger: IObservable) returns IObservable<T> <>

Take the most recently received value whenever the trigger fires.

Observable.interval(0.1) // Emits an incrementing integer every 100 millis
	.sample(Observable.interval(1.0))
	...

// Output: 9,19,29

# .sampleTime(seconds: float) returns IObservable<T> <>

Take the most recently received value every t seconds.

Observable.interval(0.1) // Emits an incrementing integer every 100 millis
	.sampleTime(1.0) // Takes the most recently received value once per second
	...

// Output: 9,19,29

# .sampleCount(count: integer) returns IObservable<T> <>

Take only every count value.

Observable.fromValues([0,1,2,3,4,5])
	.sampleCount(2)
	...

// Output: 1,3,5

# .sampleTimeOrCount(seconds: float, count: integer) returns IObservable<T> <>

Take a value whenever t seconds or count values have been received (Whichever comes first).

Note: The timer is reset after a value is emitted.

# .elementAt(index: integer) returns IObservable<T> <>

Take only the element at the given (zero-based) index.

Observable.fromValues([0,1,2,3])
	.elementAt(2)
	...

// Output: 2

Combiners

# .merge(other: sequence<IObservable<T>>) returns IObservable<T> <>

Merge the outputs of all of the provided other observables.

Observable.interval(0.1)
	.merge([Observable.interval(0.1)])
	...

// Output: 0,0,1,1,2,2,3,3...

# .mergeAll() returns IObservable<T> <>

Removes a layer of nesting from observables. Received values which are IObservables are merged into the output. Received values which are sequences/dictionaries have their values merged into the output.

Observable.fromValues([<any> Observable.interval(0.1), [1,2,3]])
	.mergeAll()
	...

// Output: 1,2,3,0,1,2,3,4...

# .withLatestFrom(other: sequence<IObservable<any>>, combiner: action<values: sequence<any>> returns any) returns IObservable<T> <>

Every time a value is received, take the latest values from the other observables and produce an output by running the combiner.

Note: The combiner takes the values in the same order as the observables are defined (starting with the main source observable).

action createSequenceString(sequence<any> values) returns any {
	sequence<string> strings := new sequence<string>;
	any value;
	for value in values {
		strings.append(value.valueToString());
	}
	return "[" + ",".join(strings) + "]";
}

Observable.interval(1.0)
	.withLatestFrom([Observable.interval(0.1)], createSequenceString)
	...

// Output: "[0,9]","[1,19]","[2,29]"...

# .withLatestFromToSequence(other: sequence<IObservable<any>>) returns IObservable<T> <>

Every time a value is received, take the latest values from the other observables and produce a sequence<any> containing the values from all.

Note: The resulting sequence contains the values in the same order as the observables are defined (starting with the main source observable).

Observable.interval(1.0)
	.withLatestFromToSequence([Observable.interval(0.1)]) // No combiner: the values are emitted as a sequence<any>
	...

// Output: [0,9], [1,19], [2,29]...

# .combineLatest(other: sequence<IObservable<any>>, combiner: action<values: sequence<any>> returns any) returns IObservable<T> <>

Every time a value is received, from either the source or the other observables, produce an output by running the combiner.

Note: The combiner takes the values in the same order as the observables are defined (starting with the main source observable).

action createSequenceString(sequence<any> values) returns any {
	sequence<string> strings := new sequence<string>;
	any value;
	for value in values {
		strings.append(value.valueToString());
	}
	return "[" + ",".join(strings) + "]";
}

Observable.interval(1.0)
	.combineLatest([Observable.interval(0.5)], createSequenceString)
	...

// Output: "[0,0]","[0,1]","[0,2]","[1,2]","[1,3]","[1,4]","[2,4]"...

# .combineLatestToSequence(other: sequence<IObservable<any>>) returns IObservable<T> <>

Every time a value is received, from either the source or the other observables, produce a sequence<any> containing the values from all.

Note: The resulting sequence contains the values in the same order as the observables are defined (starting with the main source observable).

Observable.interval(1.0)
	.combineLatestToSequence([Observable.interval(0.5)]) // The other observable emits twice as often as the source
	...

// Output: [0,0],[0,1],[0,2],[1,2],[1,3],[1,4],[2,4]...

# .zip(other: sequence<IObservable<any>>, combiner: action<values: sequence<any>> returns any) returns IObservable<T> <>

Combine multiple observables by taking the n'th value from every observable, producing an output by running the combiner.

I.e. The first output is the result from the combiner running on the first value from every source.

Note: The combiner takes the values in the same order as the observables are defined (starting with the main source observable).

Note2: The result is terminated when any of the sources run out of values, but only after the output for those values has been generated.

Note3: This requires storing values until their counterpart in another observable is found, this could be expensive with long running observables.

action createSequenceString(sequence<any> values) returns any {
	sequence<string> strings := new sequence<string>;
	any value;
	for value in values {
		strings.append(value.valueToString());
	}
	return "[" + ",".join(strings) + "]";
}

Observable.interval(1.0)
	.zip([Observable.interval(0.5), Observable.fromValues(["a","b","c"])], createSequenceString)
	...

// Output: "[0,0,a]","[1,1,b]","[2,2,c]"

# .zipToSequence(other: sequence<IObservable<any>>) returns IObservable<T> <>

Combine multiple observables by taking the n'th value from every observable, producing a sequence<any> containing all of the n'th values.

I.e. The first output is a sequence<any> containing the first value from every source.

Note: The output sequence<any> contains the values in the same order as the observables are defined (starting with the main source observable).

Note2: The result is terminated when any of the sources run out of values, but only after the output for those values has been generated.

Note3: This requires storing values until their counterpart in another observable is found, this could be expensive with long running observables.

Observable.interval(1.0)
	.zipToSequence([Observable.interval(0.5), Observable.fromValues(["a","b","c"])])
	...

// Output: [0,0,a],[1,1,b],[2,2,c]

# .concat(other: sequence<IObservable<any>>) returns IObservable<T> <>

After the current observable completes, instead of completing, connect to the next source observable. This repeats until all sources have completed.

Note: This will potentially miss values if the other observables are "hot" (Miss values when not connected. Eg. Values from a channel or stream).

Observable.fromValues([1,2,3])
	.concat([Observable.fromValues([4,5,6])])
	...

// Output: 1,2,3,4,5,6

See also: Repeat

# .concatAll() returns IObservable<T2> <>

For a source that provides IObservables (or sequences) it will concatenate the contents. The next IObservable will be connected to after the previous one completes.

Note: This will potentially miss values if the subsequent observables are "hot" (Miss values when not connected. Eg. Values from a channel or stream).

Observable.fromValues([
		Observable.range(1,5).delay(3.0),
		Observable.fromValues([6,7,8])
	])
	.concatAll()
	...

// Output: 1,2,3,4,5,6,7,8

See also: MergeAll

# .startWith(startingValues: sequence<any>) returns IObservable<T> <>

Start the current observable with the values provided.

Observable.fromValues([1,2,3])
	.startWith([<any>4,5,6])
	...

// Output: 4,5,6,1,2,3

# .switchMap(mapper: action<T1> returns IObservable<T2>) returns IObservable<T2> <>

Run a mapping function on every value, the result of which is an IObservable. This observable will be the source of output values until the next observable is provided by the mapping function.

Note: This will complete after the last produced observable completes.

action toRange0ToN(integer n) returns IObservable {
	return Observable.range(0, n);
}

Observable.fromValues([1,2,3,4])
	.switchMap(toRange0ToN)
	...

// Output: 0,1,0,1,2,0,1,2,3,0,1,2,3,4

# .switchOnNext() returns IObservable<any> <>

Takes every provided observable and connects to it until another is received, at which point it switches to the new one.

The source must be an IObservable<IObservable<any>>.

Note: This will complete after the last produced observable completes.

Observable.fromValues([Observable.range(0,3), Observable.range(4,6)])
	.switchOnNext()
	...

// Output: 0,1,2,3,4,5,6

Error Handling

# .catchError(substitute: IObservable<T>) returns IObservable<T> <>

Switch to an alternative data source in the event of an error.

action someActionThatThrows(integer value) returns integer {
	throw com.apama.exceptions.Exception("Ahhhh", "RuntimeException");
}

Observable.fromValues([1,2,3,4])
	.map(someActionThatThrows) // Always throws, so an error is raised for the first value
	.catchError(Observable.fromValues([5,6,7,8])) // Substitute source used in the event of an error
	...

// Output: 5,6,7,8

# .retry(attempts: integer) returns IObservable<T> <>

Reconnect to the data source in the event of an error, retrying at most attempts times.

action sometimesThrows(integer value) returns integer {
	if (5).rand() = 0 {
		throw com.apama.exceptions.Exception("Ahhhh", "RuntimeException");
	}
	return value;
}

Observable.fromValues([1,2,3,4])
	.map(sometimesThrows)
	.retry(3)
	...

// Example Output: 1,1,2,3,1,2,3,4
// Example Output: 1,1,2,1,2,1, Exception("Ahhhh", "RuntimeException")

Utils

# .subscribe(Subscriber) returns ISubscription <>

Connect to the source observable, and register listeners for values, errors and completion.

The returned ISubscription can be used to manually terminate the subscription.

action logValue(any value) {
	log value.valueToString();
}

action logError(com.apama.exceptions.Exception e) {
	log e.toString();
}

action logComplete() {
	log "Done!";
}

Observable.fromValues([1,2,3])
	.subscribe(Subscriber.create().onNext(logValue).onError(logError).onComplete(logComplete));

// Output: 1,2,3,Done!

# .subscribeOn(Subscriber, context) returns ISubscription <>
# .subscribeOnNew(Subscriber) returns ISubscription <>

Connect to the source observable on a different context, and register listeners for values, errors and completion.

The returned ISubscription can be used to manually terminate the subscription.

Important Note: This is not recommended for complicated observables! Instead create the observable on a spawned context, or use ObserveOn.

action logValue(any value) {
	log value.valueToString();
}

action logComplete() {
	log "Done!";
}

Observable.fromValues([1,2,3])
	.subscribeOn(Subscriber.create().onNext(logValue).onComplete(logComplete), context("Context2"));

// Output (on Context2): 1,2,3,Done!

A better alternative:

// Creates the observable and subscribes to it within a single action,
// so the whole chain can be spawned to another context.
action createAndRunObservable() {
	ISubscription s := Observable.fromValues([1,2,3])
		.subscribe(Subscriber.create().onNext(logValue).onComplete(logComplete));
}

spawn createAndRunObservable() to context("Context2");

# .do(Subscriber) returns IObservable<T> <>

Snoops the output of an observable (at the point where the do is added), registering listeners for values, errors and completion, without subscribing to the observable.

This is very useful for debugging.

action logValue(integer value) {
	log value.toString();
}

action logDone() {
	log "Done!";
}

Observable.fromValues([1,2,3,4])
	.do(Subscriber.create().onNext(logValue).onComplete(logDone))
	...

// Do Output: 1,2,3,4,Done!
// Output: 1,2,3,4

See also: Subscribe

# .delay(seconds:float) returns IObservable<T> <>

Delays values and completion by a number of seconds.

Note: This will also make every value asynchronous.

Observable.fromValues([1,2,3])
	.delay(1.0)
	...

// Output (after 1 second): 1,2,3

# .async() returns IObservable<T> <>

Make every value asynchronous.

Observable.fromValues([1,2,3])
	.async()
	...

// Output (Async): 1,2,3

# .observeOn(action<source: IObservable, dispose: action<>> , context) returns IDisposable <>
# .observeOnNew(action<source: IObservable, dispose: action<>>) returns IDisposable <>

Continue processing the observable on a different context.

The dispose action terminates the cross-context communication. The cross-context communication can be terminated from either side by calling either the provided dispose action or by calling the .dispose() method of the returned IDisposable.

Important Note: It is important to terminate the cross-context communication to avoid a memory leak.

action doOnDifferentContext(IObservable source, action<> dispose) {
	ISubscription s := source
		.map(...)
		.reduce(...)
		.subscribe(...);
	
	s.onUnsubscribe(dispose); // We'll never reconnect so, to avoid a memory leak, we notify the original source of the data that we are done
}

IDisposable d := Observable.fromValues([1,2,3,4])
	.observeOn(doOnDifferentContext, context("Context2"));

# .toChannel(channelName: string) returns IDisposable <>

Send every value to a channel. Primitive values are wrapped inside a WrappedAny so that they can be sent.

The resulting IDisposable can be used to manually terminate the observable.

Note: Errors are thrown when received, completion causes the observable to terminate.

// The <any> cast lets the sequence literal hold both integers and an event.
IDisposable d := Observable.fromValues([<any> 1,2,3,E(4)])
	.toChannel("OutputChannel");

// Output on "OutputChannel": WrappedAny(1),WrappedAny(2),WrappedAny(3),E(4)

# .toStream() returns DisposableStream<any> <>

Output every value into a stream.

The resulting DisposableStream should be used to manually terminate the stream (rather than the normal .quit()). The stream will automatically terminate if the source completes.

DisposableStream d := Observable.fromValues([1,2,3,4])
	.toStream();

stream<any> strm := d.getStream();

d.dispose(); // Use instead of strm.quit();

# .timestamp() returns IObservable<TimestampedValue<T>> <>

Give every value a timestamp as it arrives at the operator.

Note: Uses currentTime which, by default, has only 100 millisecond precision.

Observable.interval(1.0)
	.timestamp()
	...

// Output: TimestampedValue(0, 1.0), TimestampedValue(1, 2.0), TimestampedValue(2, 3.0)... 

# .updateTimestamp() returns IObservable<TimestampedValue<T>> <>

Update the timestamp on every item as it arrives at the operator.

Un-timestamped values will be timestamped.

Note: Uses currentTime which, by default, has only 100 millisecond precision.

Observable.interval(1.0)
	.timestamp()
	.delay(1.0)
	.updateTimestamp()
	...

// Output: TimestampedValue(0, 2.0), TimestampedValue(1, 3.0), TimestampedValue(2, 4.0)... 

# .timeInterval() returns IObservable<TimeInterval<T>> <>

Emit the time between events, as a TimeInterval. For the first value, the interval is the time between subscription and receiving that value.

Note: Uses currentTime which, by default, has only 100 millisecond precision.

Observable.interval(1.0)
	.timeInterval()
	...

// Output: TimeInterval(1.0,0), TimeInterval(1.0,1), TimeInterval(1.0,2)...

# .pausable(trigger: IObservable) returns IObservable<T> <>

Enable and disable receiving values based on the trigger. Every time a value is received on the trigger observable the gate flips open or closed. The gate starts closed, the first value received opens the gate.

Values received while the gate is closed are discarded. To instead buffer them, use PausableBuffered.

Observable.interval(1.0)
	.pausable(Observable.interval(2.0))
	...
// Output: 1, 2, 5, 6, 9

# .pausableBuffered(trigger: IObservable) returns IObservable<T> <>

Enable and disable buffering of values based on the trigger. Every time a value is received on the trigger observable the gate flips open or closed. Values received while the gate is closed are buffered and emitted as soon as the gate reopens. The gate starts closed, the first value received opens the gate.

Observable.interval(1.0)
	.pausableBuffered(Observable.interval(2.0))
	...
// Output: 1,2,3,4

# .let(operator: Pipeable) returns IObservable<any> <>

Add a single operator to the observable. Can be used by extension libraries or for custom operators.

Note: Pipeable = action<action<IObserver> returns ISubscription> returns action<IObserver> returns ISubscription

using com.industry.rx_epl.operators.Sum;

Observable.fromValues([1,2,3,4])
	.let(Sum.create())
	...

// Output: 10

# .pipe(operators: sequence<Pipeable>) returns IObservable<any> <>

Add multiple operators to the observable. Can be used by extension libraries or for custom operators.

Note: Pipeable = action<action<IObserver> returns ISubscription> returns action<IObserver> returns ISubscription

using com.industry.rx_epl.operators.Map;
using com.industry.rx_epl.operators.Sum;

action multiplyBy10(integer value) returns integer {
	return value * 10;
}

Observable.fromValues([1,2,3,4])
	.pipe([Map.create(multiplyBy10), Sum.create()])
	...

// Output: 100

# .pipeOn(operators: sequence<Pipeable>, context) returns IObservable<any> <>
# .pipeOnNew(operators: sequence<Pipeable>) returns IObservable<any> <>

Add multiple operators to the observable and run those operators on a different context. Useful to allow slow operators to run without blocking the main thread. The results are returned to the original context.

Note: Pipeable = action<action<IObserver> returns ISubscription> returns action<IObserver> returns ISubscription

using com.industry.rx_epl.operators.Map;
using com.industry.rx_epl.operators.Sum;

action multiplyBy10(integer value) returns integer {
	return value * 10;
}

Observable.fromValues([1,2,3,4])
	.pipeOn([Map.create(multiplyBy10), Sum.create()], context("Context2"))
	...

// Output: 100

See also: ObserveOn

# .complexPipe(operator: action<IObservable<T1>> returns IObservable<T2>) returns IObservable<T2> <>

Add a series of custom operations to the observable.

action customOperator(IObservable source) returns IObservable {
	return source.withLatestFromToSequence([source.skipLast(1)]); // Similar to pairwise...
}

Observable.interval(1.0)
	.complexPipe(customOperator)
	...

// Output: [1,0], [2,1], [3,2]

# .complexPipeOn(operator: action<IObservable<T1>> returns IObservable<T2>, context) returns IObservable<T2> <>
# .complexPipeOnNew(operator: action<IObservable<T1>> returns IObservable<T2>) returns IObservable<T2> <>

Add a series of custom operations to the observable and run those operations on a different context. Useful to allow slow operators to run without blocking the main thread. The results are returned to the original context.

action customOperator(IObservable source) returns IObservable {
	return source.withLatestFromToSequence([source.skipLast(1)]); // Similar to pairwise...
}

Observable.interval(1.0)
	.complexPipeOn(customOperator, context("Context2"))
	...

// Output: [1,0], [2,1], [3,2]

# .decouple() returns IObservable<T> <>

Separate the resulting observable from the source observable.

Uses: If the subscription uses SubscribeOn but the source is a shared observable you may need to decouple to get the expected results. (See: Gotcha)

Important Note: A similar result can be achieved with ObserveOn and this is advised.

# .getSync() returns any <>

Get the value from a synchronous observable. An empty any will be returned if the observable does not return a value immediately.

Note: If multiple values are received then the last synchronous value will be returned.

integer value := <integer> Observable.fromValues([1,2,3,4])
	.sumInteger()
	.getSync();

// value = 10

# .getSyncOr(default: any) returns any <>

Get the value from a synchronous observable. The default value will be returned if the observable does not return a value immediately.

Note: If multiple values are received then the last synchronous value will be returned.

integer value := <integer> Observable.just(10)
	.getSyncOr(3);
	
// value = 10

integer value2 := <integer> Observable.timer(10, 1.0)
	.getSyncOr(3);
	
// value2 = 3

# .repeat(count: integer) returns IObservable<T> <>

Repeat an observable, by reconnecting to the source count-1 times.

Observable.fromValues([1,2,3,4])
	.repeat(2)
	...

// Output: 1,2,3,4,1,2,3,4

# .publish() returns IObservable<T> <>

Publish allows a single observable subscription to be shared among various other subscribers, delaying the upstream subscription until .connect() is called.

This is explained further in the User Guide.

IObservable o := Observable.interval(1.0).publish();

ISubscription s := o.subscribe(Subscriber.create().onNext(printValue));
// Output (When connect is called): 0, 1, 2, 3...

on wait(2.0) {
    ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue));
    // Output (When connect is called): 0, 1, 2, 3...

    IDisposable d := o.connect();
}

# .connect() returns IDisposable <>

Connect to a published observable, causing it to start emitting values.

The returned IDisposable can be used to disconnect, terminating the connected observables (without completion).

See also: RefCount

# .publishReplay(count: integer) returns IObservable<T> <>

Similar to Publish, except it stores up to count values received after .connect() is called, replaying the latest to any late subscriptions.

Note: Values are replayed to new subscriptions even if the source completes. To empty the replay cache dispose of the IDisposable returned from .connect().

IObservable o := Observable.interval(1.0).publishReplay(2);

ISubscription s := o.subscribe(Subscriber.create().onNext(printValue));
// Output (When connect is called): 0, 1, 2, 3...
IDisposable d := o.connect();

on wait(2.0) {
    ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue));
    // Output: 0, 1, 2, 3...
    // Would have missed 0,1 if .publish() had been used
}

# .refCount() returns IObservable<T> <>

Automatically .connect() to the upstream observable on the first subscription and disconnect from it after the last unsubscription.

IObservable o := Observable.interval(1.0).publish().refCount(); // can be replaced with .share()

ISubscription s := o.subscribe(Subscriber.create().onNext(printValue));
// Output: 0, 1, 2, 3...

on wait(2.0) {
    ISubscription s2 := o.subscribe(Subscriber.create().onNext(printValue));
    // Output: 2, 3...
}

# .share() returns IObservable<T> <>

Shorthand for .publish().refCount()

# .shareReplay(count: integer) returns IObservable<T> <>

Shorthand for .publishReplay(count).refCount()

# .observeToChannel(channel: string) returns IDisposable <>

ObserveToChannel and ObserveFromChannel are useful for sending data between different monitor instances which may or may not be on different contexts.

Note: .dispose() should be called on the returned IDisposable when all subscribers are finished to avoid a memory leak.

// Ideally should dispose of this when all subscribers are finished (if ever)
IDisposable d := Observable.interval(1.0).observeToChannel("channelName");

// This could be in a different monitor
ISubscription s := Observable.observeFromChannel("channelName")
        .subscribe(Subscriber.create().onNext(printValue));
        
// Output: 0, 1, 2, 3

# .ignoreElements() returns IObservable<T> <>

Ignore all of the values from the source but pass on termination events (Error or Completion).

Observable.interval(1.0).take(5)
	.ignoreElements()
	.subscribe(Subscriber.create().onNext(logValue).onComplete(logComplete))
	
// Output (5 seconds later): Complete!

Conditional

# .contains(predicate: action<value: T> returns boolean) returns IObservable<boolean> <>

Check if at least one value matches the predicate.

Note: An empty observable returns false.

action greaterThan3(integer value) returns boolean {
	return value > 3;
}

Observable.fromValues([1,2,3,4])
	.contains(greaterThan3)
	...

// Output: true

# .every(predicate: action<value: T> returns boolean) returns IObservable<boolean> <>

Check if every value matches the predicate.

Note: An empty observable returns true.

action lessThan3(integer value) returns boolean {
	return value < 3;
}

Observable.fromValues([1,2,3,4])
	.every(lessThan3)
	...

// Output: false

# .sequenceEqual(others: sequence<IObservable<T>>) returns IObservable<boolean> <>

Check if all of the provided observables contain the same values, in the same order.

Observable.interval(1.0).take(4)
	.sequenceEqual([Observable.fromValues([0,1,2,3]), Observable.range(0,3)])
	...

// Output: true

# .isEmpty() returns IObservable<boolean> <>

Check if the observable completes without emitting any values.

Note: It will return false immediately if any values are received.

Observable.fromValues([1,2,3])
	.isEmpty()
	...

// Output: false

Observable.empty()
	.isEmpty()
	...

// Output: true

# .amb(others: sequence<IObservable<T>>) returns IObservable<T> <>
# .race(others: sequence<IObservable<T>>) returns IObservable<T> <>

Race the observables; the one which provides values first provides all of the values.

Note: Amb and Race are aliases.

Observable.timer("I lost", 5.0)
	.amb([Observable.timer("I won!", 1.0)])
	...

// Output: "I won!"

# .defaultIfEmpty(value: T) returns IObservable<T> <>

Emit a default value if the source completes without emitting any values.

Observable.empty()
	.defaultIfEmpty("A default value")
	...

// Output: "A default value"

# .switchIfEmpty(alternative: IObservable<T>) returns IObservable<T> <>

Switch to a different observable source if the main source completes without emitting any values.

Observable.empty()
	.switchIfEmpty(Observable.fromValues([1,2,3]))
	...

// Output: 1,2,3

Math and Aggregation

# .reduce(action<aggregate: T2, value: T1> returns T2) returns IObservable<T2> <>

Aggregate the data, emitting the aggregated value on completion.

Note: The first value is used as the starting value for the aggregation.

action sum(integer currentSum, integer value) returns integer {
	return currentSum + value;
}

Observable.fromValues([1,2,3])
	.reduce(sum)
	...

// Output: 6

See also: Scan

# .reduceWithInitial(action<aggregate: T2, value: T1> returns T2, initialValue: T2) returns IObservable<T2> <>

Aggregate the data, emitting the aggregated value on completion. The initial value for the aggregation is supplied.

action sum(integer currentSum, integer value) returns integer {
	return currentSum + value;
}

Observable.fromValues([1,2,3])
	.reduceWithInitial(sum, 5)
	...

// Output: 11

See also: ScanWithInitial

# .count() returns IObservable<integer> <>

Count the number of items emitted by the source.

Note: This is the final count, not a rolling count.

Observable.fromValues([1,2,3,4])
	.count()
	...

// Output: 4

# .sum() returns IObservable<float> <>
# .sumInteger() returns IObservable<integer> <>
# .sumFloat() returns IObservable<float> <>
# .sumDecimal() returns IObservable<decimal> <>

Sum the provided values.

Note: .sum() will work for any numeric type (returning a float), whereas .sumInteger(), .sumFloat(), .sumDecimal() will only operate on their respective types.

Observable.fromValues([1,2,3,4])
	.sum()
	...

// Output: 10.0

# .concatString() returns IObservable<string> <>

Concatenate the .valueToString() of every value.

Observable.fromValues([<any> "Hello ", "World", "! ", 1, 2, 3])
	.concatString()
	...

// Output: "Hello World! 123"

# .max() returns IObservable<float> <>
# .maxInteger() returns IObservable<integer> <>
# .maxFloat() returns IObservable<float> <>
# .maxDecimal() returns IObservable<decimal> <>

Find the largest value.

Note: .max() will work for any numeric type (returning a float), whereas .maxInteger(), .maxFloat(), .maxDecimal() will only operate on their respective types.

Observable.fromValues([1,2,3,4])
	.max()
	...

// Output: 4.0

# .min() returns IObservable<float> <>
# .minInteger() returns IObservable<integer> <>
# .minFloat() returns IObservable<float> <>
# .minDecimal() returns IObservable<decimal> <>

Find the smallest value.

Note: .min() will work for any numeric type (returning a float), whereas .minInteger(), .minFloat(), .minDecimal() will only operate on their respective types.

Observable.fromValues([1,2,3,4])
	.min()
	...

// Output: 1.0

# .average() returns IObservable<float> <>
# .averageDecimal() returns IObservable<decimal> <>

Find the arithmetic mean.

Note: Both .average() and .averageDecimal() will operate on numbers of any type, they differ on output type only.

Observable.fromValues([1,2,3,4])
	.average()
	...

// Output: 2.5