diff --git a/MIGRATION.md b/MIGRATION.md index 311deb9af8..b01487527e 100644 --- a/MIGRATION.md +++ b/MIGRATION.md @@ -110,7 +110,6 @@ enabling "composite" subscription behavior. |`shareValue`|No longer implemented| |`singleInstance`|`share`| |`skipLastWithTime`|No longer implemented| -|`skipLast`|No longer implemented| |`skipUntilWithTime`|No longer implemented| |`slice(start, end)`|`skip(start).take(end - start)`| |`some`|`first(fn, () => true, false)`| diff --git a/doc/decision-tree-widget/tree.yml b/doc/decision-tree-widget/tree.yml index 1b49884565..7bd1485a38 100644 --- a/doc/decision-tree-widget/tree.yml +++ b/doc/decision-tree-widget/tree.yml @@ -59,6 +59,9 @@ children: - label: based on custom logic children: - label: skipWhile + - label: from the end of the Observable + children: + - label: skipLast - label: until another Observable emits a value children: - label: skipUntil diff --git a/doc/operators.md b/doc/operators.md index f6e9b6291b..9a2fd27baa 100644 --- a/doc/operators.md +++ b/doc/operators.md @@ -178,6 +178,7 @@ There are operators for different purposes, and they may be categorized as: crea - [`sampleTime`](../class/es6/Observable.js~Observable.html#instance-method-sampleTime) - [`single`](../class/es6/Observable.js~Observable.html#instance-method-single) - [`skip`](../class/es6/Observable.js~Observable.html#instance-method-skip) +- [`skipLast`](../class/es6/Observable.js~Observable.html#instance-method-skipLast) - [`skipUntil`](../class/es6/Observable.js~Observable.html#instance-method-skipUntil) - [`skipWhile`](../class/es6/Observable.js~Observable.html#instance-method-skipWhile) - [`take`](../class/es6/Observable.js~Observable.html#instance-method-take) diff --git a/perf/micro/current-thread-scheduler/operators/skiplast.js b/perf/micro/current-thread-scheduler/operators/skiplast.js new file mode 100644 index 0000000000..a885498e17 --- /dev/null +++ b/perf/micro/current-thread-scheduler/operators/skiplast.js @@ -0,0 +1,18 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldSkipLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.currentThread).skipLast(50); + var newSkipLastWithImmediateScheduler = RxNew.Observable.range(0, 500, RxNew.Scheduler.queue).skipLast(50); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old skipLast with current thread scheduler', function () { + oldSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new skipLast with current thread scheduler', function () { + newSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/perf/micro/immediate-scheduler/operators/skiplast.js b/perf/micro/immediate-scheduler/operators/skiplast.js new file mode 100644 index 0000000000..390f076561 --- /dev/null +++ b/perf/micro/immediate-scheduler/operators/skiplast.js @@ -0,0 +1,18 @@ +var RxOld = require('rx'); +var RxNew = require('../../../../index'); + +module.exports = function (suite) { + var oldSkipLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.immediate).skipLast(50); + var newSkipLastWithImmediateScheduler = RxNew.Observable.range(0, 500).skipLast(50); + + function _next(x) { } + function _error(e) { } + function _complete() { } + return suite + .add('old skipLast with immediate scheduler', function () { + oldSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }) + .add('new skipLast with immediate scheduler', function () { + newSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete); + }); +}; diff --git a/spec/operators/skipLast-spec.ts b/spec/operators/skipLast-spec.ts new file mode 100644 index 0000000000..e3688b71ec --- /dev/null +++ b/spec/operators/skipLast-spec.ts @@ -0,0 +1,155 @@ +import {expect} from 'chai'; +import * as Rx from '../../dist/cjs/Rx'; +declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions}; + +const Observable = Rx.Observable; + +/** @test {takeLast} */ +describe('Observable.prototype.skipLast', () => { + asDiagram('skipLast(2)')('should skip two values of an observable with many values', () => { + const e1 = cold('--a-----b----c---d--|'); + const e1subs = '^ !'; + const expected = '-------------a---b--|'; + + expectObservable(e1.skipLast(2)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should skip last three values', () => { + const e1 = cold('--a-----b----c---d--|'); + const e1subs = '^ !'; + const expected = '-----------------a--|'; + + expectObservable(e1.skipLast(3)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should skip all values when trying to take larger then source', () => { + const e1 = cold('--a-----b----c---d--|'); + const e1subs = '^ !'; + const expected = '--------------------|'; + + expectObservable(e1.skipLast(5)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should skip all element when try to take exact', () => { + const e1 = cold('--a-----b----c---d--|'); + const e1subs = '^ !'; + const expected = '--------------------|'; + + expectObservable(e1.skipLast(4)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should not skip any values', () => { + const e1 = cold('--a-----b----c---d--|'); + const e1subs = '^ !'; + const expected = '--a-----b----c---d--|'; + + expectObservable(e1.skipLast(0)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should work with empty', () => { + const e1 = cold('|'); + const e1subs = '(^!)'; + const expected = '|'; + + expectObservable(e1.skipLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should go on forever on never', () => { + const e1 = cold('-'); + const e1subs = '^'; + const expected = '-'; + + expectObservable(e1.skipLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should skip one value from an observable with one value', () => { + const e1 = hot('---(a|)'); + const e1subs = '^ ! '; + const expected = '---| '; + + expectObservable(e1.skipLast(1)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should skip one value from an observable with many values', () => { + const e1 = hot('--a--^--b----c---d--|'); + const e1subs = '^ !'; + const expected = '--------b---c--|'; + + expectObservable(e1.skipLast(1)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should work with empty and early emission', () => { + const e1 = hot('--a--^----|'); + const e1subs = '^ !'; + const expected = '-----|'; + + expectObservable(e1.skipLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should propagate error from the source observable', () => { + const e1 = hot('---^---#', null, 'too bad'); + const e1subs = '^ !'; + const expected = '----#'; + + expectObservable(e1.skipLast(42)).toBe(expected, null, 'too bad'); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should propagate error from an observable with values', () => { + const e1 = hot('---^--a--b--#'); + const e1subs = '^ !'; + const expected = '---------#'; + + expectObservable(e1.skipLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should allow unsubscribing explicitly and early', () => { + const e1 = hot('---^--a--b-----c--d--e--|'); + const unsub = ' ! '; + const e1subs = '^ ! '; + const expected = '---------- '; + + expectObservable(e1.skipLast(42), unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should work with throw', () => { + const e1 = cold('#'); + const e1subs = '(^!)'; + const expected = '#'; + + expectObservable(e1.skipLast(42)).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); + + it('should throw if total is less than zero', () => { + expect(() => { Observable.range(0, 10).skipLast(-1); }) + .to.throw(Rx.ArgumentOutOfRangeError); + }); + + it('should not break unsubscription chain when unsubscribed explicitly', () => { + const e1 = hot('---^--a--b-----c--d--e--|'); + const unsub = ' ! '; + const e1subs = '^ ! '; + const expected = '---------- '; + + const result = e1 + .mergeMap((x: string) => Observable.of(x)) + .skipLast(42) + .mergeMap((x: string) => Observable.of(x)); + + expectObservable(result, unsub).toBe(expected); + expectSubscriptions(e1.subscriptions).toBe(e1subs); + }); +}); \ No newline at end of file diff --git a/src/Rx.ts b/src/Rx.ts index 3bebef958c..709612afc2 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -112,6 +112,7 @@ import './add/operator/sequenceEqual'; import './add/operator/share'; import './add/operator/single'; import './add/operator/skip'; +import './add/operator/skipLast'; import './add/operator/skipUntil'; import './add/operator/skipWhile'; import './add/operator/startWith'; diff --git a/src/add/operator/skipLast.ts b/src/add/operator/skipLast.ts new file mode 100644 index 0000000000..31484fe523 --- /dev/null +++ b/src/add/operator/skipLast.ts @@ -0,0 +1,10 @@ +import { Observable } from '../../Observable'; +import { skipLast } from '../../operator/skipLast'; + +Observable.prototype.skipLast = skipLast; + +declare module '../../Observable' { + interface Observable { + skipLast: typeof skipLast; + } +} \ No newline at end of file diff --git a/src/operator/skipLast.ts b/src/operator/skipLast.ts new file mode 100644 index 0000000000..a4dfe5ed35 --- /dev/null +++ b/src/operator/skipLast.ts @@ -0,0 +1,90 @@ +import { Operator } from '../Operator'; +import { Subscriber } from '../Subscriber'; +import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError'; +import { Observable } from '../Observable'; +import { TeardownLogic } from '../Subscription'; + +/** + * Skip the last `count` values emitted by the source Observable. + * + * + * + * `skipLast` returns an Observable that accumulates a queue with a length + * enough to store the first `count` values. As more values are received, + * values are taken from the front of the queue and produced on the result + * sequence. This causes values to be delayed. + * + * @example Skip the last 2 values of an Observable with many values + * var many = Rx.Observable.range(1, 5); + * var skipLastTwo = many.skipLast(2); + * skipLastTwo.subscribe(x => console.log(x)); + * + * // Results in: + * // 1 2 3 + * + * @see {@link skip} + * @see {@link skipUntil} + * @see {@link skipWhile} + * @see {@link take} + * + * @throws {ArgumentOutOfRangeError} When using `skipLast(i)`, it throws + * ArgumentOutOrRangeError if `i < 0`. + * + * @param {number} count Number of elements to skip from the end of the source Observable. + * @returns {Observable} An Observable that skips the last count values + * emitted by the source Observable. + * @method skipLast + * @owner Observable + */ +export function skipLast(this: Observable, count: number): Observable { + return this.lift(new SkipLastOperator(count)); +} + +class SkipLastOperator implements Operator { + constructor(private _skipCount: number) { + if (this._skipCount < 0) { + throw new ArgumentOutOfRangeError; + } + } + + call(subscriber: Subscriber, source: any): TeardownLogic { + if (this._skipCount === 0) { + // If we don't want to skip any values then just subscribe + // to Subscriber without any further logic. + return source.subscribe(new Subscriber(subscriber)); + } else { + return source.subscribe(new SkipLastSubscriber(subscriber, this._skipCount)); + } + } +} + +/** + * We need this JSDoc comment for affecting ESDoc. + * @ignore + * @extends {Ignored} + */ +class SkipLastSubscriber extends Subscriber { + private _ring: T[]; + private _count: number = 0; + + constructor(destination: Subscriber, private _skipCount: number) { + super(destination); + this._ring = new Array(_skipCount); + } + + protected _next(value: T): void { + const skipCount = this._skipCount; + const count = this._count++; + + if (count < skipCount) { + this._ring[count] = value; + } else { + const currentIndex = count % skipCount; + const ring = this._ring; + const oldValue = ring[currentIndex]; + + ring[currentIndex] = value; + this.destination.next(oldValue); + } + } +} \ No newline at end of file