Skip to content

Commit

Permalink
fix(retryWhen): fix internal unsubscriptions
Browse files Browse the repository at this point in the history
Reimplement retryWhen operator. Fix retryWhen to tear down resources
(internal retried subscriptions and the subscription to the notifier
Observable) whenever the result is unsubscribed.
  • Loading branch information
Andre Medeiros authored and benlesh committed Oct 17, 2015
1 parent aa683f9 commit 5aff5e8
Showing 1 changed file with 80 additions and 21 deletions.
101 changes: 80 additions & 21 deletions src/operators/retryWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,49 +14,108 @@ export default function retryWhen<T>(notifier: (errors: Observable<any>) => Obse

class RetryWhenOperator<T, R> implements Operator<T, R> {
constructor(protected notifier: (errors: Observable<any>) => Observable<any>,
protected original: Observable<T>) {
protected source: Observable<T>) {
}

call(subscriber: Subscriber<T>): Subscriber<T> {
return new RetryWhenSubscriber<T>(subscriber, this.notifier, this.original);
return new FirstRetryWhenSubscriber<T>(subscriber, this.notifier, this.source);
}
}

class RetryWhenSubscriber<T> extends Subscriber<T> {
class FirstRetryWhenSubscriber<T> extends Subscriber<T> {
lastSubscription: Subscription<T>;
notificationSubscription: Subscription<T>;
errors: Subject<any>;
retryNotifications: Observable<any>;

constructor(destination: Subscriber<T>,
constructor(public destination: Subscriber<T>,
public notifier: (errors: Observable<any>) => Observable<any>,
public original: Observable<T>) {
super(destination);
public source: Observable<T>) {
super(null);
this.lastSubscription = this;
}

_error(err: any) {
if (!this.retryNotifications) {
this.errors = new Subject();
const notifications = tryCatch(this.notifier).call(this, this.errors);
if (notifications === errorObject) {
this.destination.error(errorObject.e);
} else {
this.retryNotifications = notifications;
this.add(notifications._subscribe(new RetryNotificationSubscriber(this)));
_next(value: T) {
this.destination.next(value);
}

error(err?) {
if (!this.isUnsubscribed) {
super.unsubscribe();
if (!this.retryNotifications) {
this.errors = new Subject();
const notifications = tryCatch(this.notifier).call(this, this.errors);
if (notifications === errorObject) {
this.destination.error(errorObject.e);
} else {
this.retryNotifications = notifications;
const notificationSubscriber = new RetryNotificationSubscriber(this);
this.notificationSubscription = notifications.subscribe(notificationSubscriber);
}
}
this.errors.next(err);
}
this.errors.next(err);
}

finalError(err: any) {
destinationError(err: any) {
this.tearDown();
this.destination.error(err);
}

_complete() {
this.destinationComplete();
}

destinationComplete() {
this.tearDown();
this.destination.complete();
}

unsubscribe() {
const lastSubscription = this.lastSubscription;
if (lastSubscription === this) {
super.unsubscribe();
} else {
this.tearDown();
}
}

tearDown() {
super.unsubscribe();
this.lastSubscription.unsubscribe();
const notificationSubscription = this.notificationSubscription;
if (notificationSubscription) {
notificationSubscription.unsubscribe();
}
}

resubscribe() {
this.original.subscribe(this);
this.lastSubscription.unsubscribe();
const nextSubscriber = new MoreRetryWhenSubscriber(this);
this.lastSubscription = this.source.subscribe(nextSubscriber);
}
}

class MoreRetryWhenSubscriber<T> extends Subscriber<T> {
constructor(private parent: FirstRetryWhenSubscriber<T>) {
super(null);
}

_next(value: T) {
this.parent.destination.next(value);
}

_error(err: any) {
this.parent.errors.next(err);
}

_complete() {
this.parent.destinationComplete();
}
}

class RetryNotificationSubscriber<T> extends Subscriber<T> {
constructor(public parent: RetryWhenSubscriber<any>) {
constructor(public parent: FirstRetryWhenSubscriber<any>) {
super(null);
}

Expand All @@ -65,10 +124,10 @@ class RetryNotificationSubscriber<T> extends Subscriber<T> {
}

_error(err: any) {
this.parent.finalError(err);
this.parent.destinationError(err);
}

_complete() {
this.parent.complete();
this.parent.destinationComplete();
}
}

0 comments on commit 5aff5e8

Please sign in to comment.