Skip to content

Commit

Permalink
feat(dropRepeats): implement core instance operator dropRepeats()
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Apr 6, 2016
1 parent 76879a5 commit b7dccf9
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 1 deletion.
7 changes: 6 additions & 1 deletion src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {MapToOperator} from './operator/MapToOperator';
import {FilterOperator} from './operator/FilterOperator';
import {TakeOperator} from './operator/TakeOperator';
import {DropOperator} from './operator/DropOperator';
import {DropRepeatsOperator} from './operator/DropRepeatsOperator';
import {DebugOperator} from './operator/DebugOperator';
import {FoldOperator} from './operator/FoldOperator';
import {LastOperator} from './operator/LastOperator';
Expand Down Expand Up @@ -141,7 +142,7 @@ export class Stream<T> implements InternalListener<T> {
});
}

static ['throw'](err: any): Stream<void> {
static throw(err: any): Stream<void> {
return new Stream<void>({
_start(il: InternalListener<void>) { il._e(err); },
_stop: noop,
Expand Down Expand Up @@ -196,6 +197,10 @@ export class Stream<T> implements InternalListener<T> {
return new Stream<T>(new DropOperator(amount, this));
}

dropRepeats(isEqual: (x: T, y: T) => boolean = null): Stream<T> {
return new Stream<T>(new DropRepeatsOperator(isEqual, this));
}

last(): Stream<T> {
return new Stream<T>(new LastOperator(this));
}
Expand Down
48 changes: 48 additions & 0 deletions src/operator/DropRepeatsOperator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import {InternalListener} from '../InternalListener';
import {Operator} from '../Operator';
import {Stream} from '../Stream';
import {emptyListener} from '../utils/emptyListener';
import {empty} from '../utils/empty';

export class Proxy<T> implements InternalListener<T> {
constructor(private out: Stream<T>,
private op: DropRepeatsOperator<T>) {
}

isEq(x: T, y: T) {
return this.op.compare ? this.op.compare(x, y) : x === y;
}

_n(t: T) {
const op = this.op;
if (op.v === empty || !this.isEq(t, op.v)) {
this.out._n(t);
}
op.v = t;
}

_e(err: any) {
this.out._e(err);
}

_c() {
this.out._c();
}
}

export class DropRepeatsOperator<T> implements Operator<T, T> {
private proxy: InternalListener<T> = emptyListener;
public v: T = <any> empty;

constructor(public compare: (x: T, y: T) => boolean,
public ins: Stream<T>) {
}

_start(out: Stream<T>): void {
this.ins._add(this.proxy = new Proxy(out, this));
}

_stop(): void {
this.ins._remove(this.proxy);
}
}
37 changes: 37 additions & 0 deletions tests/operator/dropRepeats.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import xs from '../../src/index';
import * as assert from 'assert';

describe('Stream.prototype.dropRepeats', () => {
it('should drop consecutive duplicate numbers (as events)', (done) => {
const stream = xs.of(1, 2, 1, 1, 1, 2, 3, 4, 3, 3).dropRepeats();
const expected = [1, 2, 1, 2, 3, 4, 3];

stream.addListener({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});

it('should drop consecutive \'duplicate\' strings, with a custom isEqual', (done) => {
const stream = xs.of('a', 'b', 'a', 'A', 'B', 'b')
.dropRepeats((x, y) => x.toLowerCase() === y.toLowerCase());
const expected = ['a', 'b', 'a', 'B'];

stream.addListener({
next: (x: string) => {
assert.equal(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.equal(expected.length, 0);
done();
},
});
});
});
1 change: 1 addition & 0 deletions tests/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ describe('Stream', () => {
assert.equal(typeof stream.filter, 'function');
assert.equal(typeof stream.take, 'function');
assert.equal(typeof stream.drop, 'function');
assert.equal(typeof stream.dropRepeats, 'function');
assert.equal(typeof stream.last, 'function');
assert.equal(typeof stream.startWith, 'function');
assert.equal(typeof stream.endWhen, 'function');
Expand Down

0 comments on commit b7dccf9

Please sign in to comment.