Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(skipLast): add skipLast operator #2316

Merged
merged 4 commits into from
May 9, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion MIGRATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ enabling "composite" subscription behavior.
|`shareValue`|No longer implemented|
|`singleInstance`|`share`|
|`skipLastWithTime`|No longer implemented|
|`skipLast`|No longer implemented|
|`skipUntilWithTime`|No longer implemented|
|`slice(start, end)`|`skip(start).take(end - start)`|
|`some`|`first(fn, () => true, false)`|
Expand Down
3 changes: 3 additions & 0 deletions doc/decision-tree-widget/tree.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ children:
- label: based on custom logic
children:
- label: skipWhile
- label: from the end of the Observable
children:
- label: skipLast
- label: until another Observable emits a value
children:
- label: skipUntil
Expand Down
1 change: 1 addition & 0 deletions doc/operators.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ There are operators for different purposes, and they may be categorized as: crea
- [`sampleTime`](../class/es6/Observable.js~Observable.html#instance-method-sampleTime)
- [`single`](../class/es6/Observable.js~Observable.html#instance-method-single)
- [`skip`](../class/es6/Observable.js~Observable.html#instance-method-skip)
- [`skipLast`](../class/es6/Observable.js~Observable.html#instance-method-skipLast)
- [`skipUntil`](../class/es6/Observable.js~Observable.html#instance-method-skipUntil)
- [`skipWhile`](../class/es6/Observable.js~Observable.html#instance-method-skipWhile)
- [`take`](../class/es6/Observable.js~Observable.html#instance-method-take)
Expand Down
18 changes: 18 additions & 0 deletions perf/micro/current-thread-scheduler/operators/skiplast.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldSkipLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.currentThread).skipLast(50);
var newSkipLastWithImmediateScheduler = RxNew.Observable.range(0, 500, RxNew.Scheduler.queue).skipLast(50);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old skipLast with current thread scheduler', function () {
oldSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new skipLast with current thread scheduler', function () {
newSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
18 changes: 18 additions & 0 deletions perf/micro/immediate-scheduler/operators/skiplast.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldSkipLastWithImmediateScheduler = RxOld.Observable.range(0, 500, RxOld.Scheduler.immediate).skipLast(50);
var newSkipLastWithImmediateScheduler = RxNew.Observable.range(0, 500).skipLast(50);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old skipLast with immediate scheduler', function () {
oldSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new skipLast with immediate scheduler', function () {
newSkipLastWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
155 changes: 155 additions & 0 deletions spec/operators/skipLast-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, expectObservable, expectSubscriptions};

const Observable = Rx.Observable;

/** @test {takeLast} */
describe('Observable.prototype.skipLast', () => {
asDiagram('skipLast(2)')('should skip two values of an observable with many values', () => {
const e1 = cold('--a-----b----c---d--|');
const e1subs = '^ !';
const expected = '-------------a---b--|';
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is something really wrong about this marble diagram. The bottom a should not be emitted at the same time as the top c, because when c occurs, we don't actually yet know that it's the second-last emission. We can only know that when the e1 completes. So the expected should be (ab|) when the source completes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As soon as c is emitted, it is known that a should not be skipped because it is not one of the last 2 elements and can be emitted.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, funny. I may have misunderstood skipLast. Is this how RxJS 4 works?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this looks correct. if the skipLast(n) buffer grows to n + 1, we know we can drain to n and emit.


expectObservable(e1.skipLast(2)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should skip last three values', () => {
const e1 = cold('--a-----b----c---d--|');
const e1subs = '^ !';
const expected = '-----------------a--|';

expectObservable(e1.skipLast(3)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should skip all values when trying to take larger then source', () => {
const e1 = cold('--a-----b----c---d--|');
const e1subs = '^ !';
const expected = '--------------------|';

expectObservable(e1.skipLast(5)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should skip all element when try to take exact', () => {
const e1 = cold('--a-----b----c---d--|');
const e1subs = '^ !';
const expected = '--------------------|';

expectObservable(e1.skipLast(4)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should not skip any values', () => {
const e1 = cold('--a-----b----c---d--|');
const e1subs = '^ !';
const expected = '--a-----b----c---d--|';

expectObservable(e1.skipLast(0)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should work with empty', () => {
const e1 = cold('|');
const e1subs = '(^!)';
const expected = '|';

expectObservable(e1.skipLast(42)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should go on forever on never', () => {
const e1 = cold('-');
const e1subs = '^';
const expected = '-';

expectObservable(e1.skipLast(42)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should skip one value from an observable with one value', () => {
const e1 = hot('---(a|)');
const e1subs = '^ ! ';
const expected = '---| ';

expectObservable(e1.skipLast(1)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should skip one value from an observable with many values', () => {
const e1 = hot('--a--^--b----c---d--|');
const e1subs = '^ !';
const expected = '--------b---c--|';

expectObservable(e1.skipLast(1)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should work with empty and early emission', () => {
const e1 = hot('--a--^----|');
const e1subs = '^ !';
const expected = '-----|';

expectObservable(e1.skipLast(42)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should propagate error from the source observable', () => {
const e1 = hot('---^---#', null, 'too bad');
const e1subs = '^ !';
const expected = '----#';

expectObservable(e1.skipLast(42)).toBe(expected, null, 'too bad');
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should propagate error from an observable with values', () => {
const e1 = hot('---^--a--b--#');
const e1subs = '^ !';
const expected = '---------#';

expectObservable(e1.skipLast(42)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should allow unsubscribing explicitly and early', () => {
const e1 = hot('---^--a--b-----c--d--e--|');
const unsub = ' ! ';
const e1subs = '^ ! ';
const expected = '---------- ';

expectObservable(e1.skipLast(42), unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should work with throw', () => {
const e1 = cold('#');
const e1subs = '(^!)';
const expected = '#';

expectObservable(e1.skipLast(42)).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});

it('should throw if total is less than zero', () => {
expect(() => { Observable.range(0, 10).skipLast(-1); })
.to.throw(Rx.ArgumentOutOfRangeError);
});

it('should not break unsubscription chain when unsubscribed explicitly', () => {
const e1 = hot('---^--a--b-----c--d--e--|');
const unsub = ' ! ';
const e1subs = '^ ! ';
const expected = '---------- ';

const result = e1
.mergeMap((x: string) => Observable.of(x))
.skipLast(42)
.mergeMap((x: string) => Observable.of(x));

expectObservable(result, unsub).toBe(expected);
expectSubscriptions(e1.subscriptions).toBe(e1subs);
});
});
1 change: 1 addition & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ import './add/operator/sequenceEqual';
import './add/operator/share';
import './add/operator/single';
import './add/operator/skip';
import './add/operator/skipLast';
import './add/operator/skipUntil';
import './add/operator/skipWhile';
import './add/operator/startWith';
Expand Down
10 changes: 10 additions & 0 deletions src/add/operator/skipLast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Observable } from '../../Observable';
import { skipLast } from '../../operator/skipLast';

Observable.prototype.skipLast = skipLast;

declare module '../../Observable' {
interface Observable<T> {
skipLast: typeof skipLast;
}
}
88 changes: 88 additions & 0 deletions src/operator/skipLast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { Operator } from '../Operator';
import { Subscriber } from '../Subscriber';
import { ArgumentOutOfRangeError } from '../util/ArgumentOutOfRangeError';
import { Observable } from '../Observable';
import { TeardownLogic } from '../Subscription';

/**
* Skip the last `count` values emitted by the source Observable.
*
* <img src="./img/skipLast.png" width="100%">
*
* `skipLast` returns an Observable that accumulates a queue with a length
* enough to store the first `count` values. As more values are received,
* values are taken from the front of the queue and produced on the result
* sequence. This causes values to be delayed.
*
* @example <caption>Skip the last 2 values of an Observable with many values</caption>
* var many = Rx.Observable.range(1, 5);
* var skipLastTwo = many.skipLast(2);
* skipLastTwo.subscribe(x => console.log(x));
*
* // Results in:
* // 1 2 3
*
* @see {@link skip}
* @see {@link skipUntil}
* @see {@link skipWhile}
* @see {@link take}
*
* @throws {ArgumentOutOfRangeError} When using `skipLast(i)`, it throws
* ArgumentOutOrRangeError if `i < 0`.
*
* @param {number} count Number of elements to skip from the end of the source Observable.
* @returns {Observable<T>} An Observable that skips the last count values
* emitted by the source Observable.
* @method skipLast
* @owner Observable
*/
export function skipLast<T>(this: Observable<T>, count: number): Observable<T> {
return this.lift(new SkipLastOperator(count));
}

class SkipLastOperator<T> implements Operator<T, T> {
constructor(private total: number) {
if (this.total < 0) {
throw new ArgumentOutOfRangeError;
}
}

call(subscriber: Subscriber<T>, source: any): TeardownLogic {
if (this.total === 0) {
// If we don't want to skip any values then just subscribe
// to Subscriber without any further logic.
return source.subscribe(new Subscriber(subscriber));
} else {
return source.subscribe(new SkipLastSubscriber(subscriber, this.total));
}
}
}

/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class SkipLastSubscriber<T> extends Subscriber<T> {
private ring: T[] = [];
private count: number = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few things:

  1. rename these to start with underscores. I know we're not doing that everywhere, but we should be with private and protected properties as a signal to non-TS users.
  2. Rename total to _skipCount or _numberToSkip, something more descriptive.
  3. We should be initializing the ring buffer in the ctor to be the same size as the _skipCount, it should perform better than constantly using push.
  4. Instead of keeping a _count, we can keep a _currentIndex or _index, such that when we increment it we do: this._currentIndex = (_currentIndex + 1) % _skipCount; That way it doesn't inflate past where it needs to be.

Copy link
Contributor Author

@martinsik martinsik Feb 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Blesh I think we have to keep _count instead of just _currentIndex because we need to know when to start remitting buffered items since the buffer is now fixed size.


constructor(destination: Subscriber<T>, private total: number) {
super(destination);
}

protected _next(value: T): void {
const len = this.ring.length;

if (len < this.total) {
this.ring.push(value);
this.count++;
} else {
const idx = this.count++ % this.total;
const oldValue = this.ring[idx];

this.ring[idx] = value;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

be sure if your accessing the same property more than once to pull it out into a const. That way we don't have the cost of looking it up over and over.

this.destination.next(oldValue);
}
}
}