Skip to content

Commit

Permalink
feat(multiplex): add multiplex operator to WebSocketSubject
Browse files Browse the repository at this point in the history
- fix to ensure expected web socket close behavior
  • Loading branch information
benlesh committed Jan 13, 2016
1 parent 580f69a commit 904d617
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 43 deletions.
39 changes: 39 additions & 0 deletions spec/observables/dom/webSocket-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,45 @@ describe('Observable.webSocket', function () {
expect(closes[1]).toBe(expected[1]);
});
});

describe('multiplex', function () {
it('should multiplex over the websocket', function () {
var results = [];
var subject = Observable.webSocket('ws://websocket');
var source = subject.multiplex(function () {
return { sub: 'foo'};
}, function () {
return { unsub: 'foo' };
}, function (value) {
return value.name === 'foo';
});

var sub = source.subscribe(function (x) {
results.push(x.value);
});
var socket = MockWebSocket.lastSocket();
socket.open();

expect(socket.lastMessageSent()).toEqual({ sub: 'foo' });

[1, 2, 3, 4, 5].map(function (x) {
return {
name: x % 3 === 0 ? 'bar' : 'foo',
value: x
};
}).forEach(function (x) {
socket.triggerMessage(JSON.stringify(x));
});

expect(results).toEqual([1, 2, 4, 5]);

spyOn(socket, 'close').and.callThrough();
sub.unsubscribe();
expect(socket.lastMessageSent()).toEqual({ unsub: 'foo' });

expect(socket.close).toHaveBeenCalled();
});
});
});

var sockets = [];
Expand Down
76 changes: 33 additions & 43 deletions src/observable/dom/webSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,38 @@ export class WebSocketSubject<T> extends Subject<T> {
return sock;
}

multiplex(subMsg: any, unsubMsg: any, messageFilter: (value: T) => boolean) {
return this.lift(new MultiplexOperator(this, subMsg, unsubMsg, messageFilter));
// TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures
multiplex(subMsg: () => any, unsubMsg: () => any, messageFilter: (value: T) => boolean) {
const self = this;
return new Observable(observer => {
const result = tryCatch(subMsg)();
if (result === errorObject) {
observer.error(errorObject.e);
} else {
self.next(result);
}

const subscription = self.subscribe(x => {
const result = tryCatch(messageFilter)(x);
if (result === errorObject) {
observer.error(errorObject.e);
} else if (result) {
observer.next(x);
}
},
err => observer.error(err),
() => observer.complete());

return () => {
const result = tryCatch(unsubMsg)();
if (result === errorObject) {
observer.error(errorObject.e);
} else {
self.next(result);
}
subscription.unsubscribe();
};
});
}

_unsubscribe() {
Expand Down Expand Up @@ -157,12 +187,11 @@ export class WebSocketSubject<T> extends Subject<T> {
self._finalNext(result);
}
};
return subscription;
}

return new Subscription(() => {
subscription.unsubscribe();
if (this.observers.length === 0) {
if (!this.observers || this.observers.length === 0) {
const { socket } = this;
if (socket && socket.readyState < 2) {
socket.close();
Expand All @@ -173,43 +202,4 @@ export class WebSocketSubject<T> extends Subject<T> {
}
});
}
}

export class MultiplexOperator<T, R> implements Operator<T, R> {
constructor(private socketSubject: WebSocketSubject<T>,
private subscribeMessage: any,
private unsubscribeMessage,
private messageFilter: (data: any) => R) {
// noop
}

call(subscriber: Subscriber<R>) {
return new MultiplexSubscriber(subscriber, this.socketSubject, this.subscribeMessage, this.unsubscribeMessage, this.messageFilter);
}
}

export class MultiplexSubscriber<T> extends Subscriber<T> {
constructor(destination: Observer<T>,
private socketSubject: WebSocketSubject<any>,
private subscribeMessage: any,
private unsubscribeMessage: any,
private messageFilter: (data: any) => T) {
super(destination);

socketSubject.next(subscribeMessage);
}

next(value: any) {
const pass = tryCatch(this.messageFilter)(value);
if (pass === errorObject) {
this.destination.error(errorObject.e);
} else if (pass) {
this.destination.next(value);
}
}

unsubscribe() {
this.socketSubject.next(this.unsubscribeMessage);
super.unsubscribe();
}
}

0 comments on commit 904d617

Please sign in to comment.