Skip to content

Commit

Permalink
fix: share and connect no longer bundle scheduling code by default
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Mar 9, 2022
1 parent c45f9d2 commit a054287
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 6 deletions.
3 changes: 2 additions & 1 deletion src/internal/observable/innerFrom.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { isArrayLike } from '../util/isArrayLike';
import { isPromise } from '../util/isPromise';
import { Observable } from '../Observable';
import { ObservableInput, ReadableStreamLike } from '../types';
import { ObservableInput, ObservedValueOf, ReadableStreamLike } from '../types';
import { isInteropObservable } from '../util/isInteropObservable';
import { isAsyncIterable } from '../util/isAsyncIterable';
import { createInvalidObservableTypeError } from '../util/throwUnobservableError';
Expand All @@ -12,6 +12,7 @@ import { isFunction } from '../util/isFunction';
import { reportUnhandledError } from '../util/reportUnhandledError';
import { observable as Symbol_observable } from '../symbol/observable';

export function innerFrom<O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>;
export function innerFrom<T>(input: ObservableInput<T>): Observable<T> {
if (input instanceof Observable) {
return input;
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/connect.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { OperatorFunction, ObservableInput, ObservedValueOf, SubjectLike } from '../types';
import { Observable } from '../Observable';
import { Subject } from '../Subject';
import { from } from '../observable/from';
import { innerFrom } from '../observable/innerFrom';
import { operate } from '../util/lift';
import { fromSubscribable } from '../observable/fromSubscribable';

Expand Down Expand Up @@ -103,7 +103,7 @@ export function connect<T, O extends ObservableInput<unknown>>(
const { connector } = config;
return operate((source, subscriber) => {
const subject = connector();
from(selector(fromSubscribable(subject))).subscribe(subscriber);
innerFrom(selector(fromSubscribable(subject))).subscribe(subscriber);
subscriber.add(source.subscribe(subject));
});
}
2 changes: 1 addition & 1 deletion src/internal/operators/onErrorResumeNext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export function onErrorResumeNext<T, A extends readonly unknown[]>(
if (remaining.length > 0) {
let nextSource: Observable<A[number]>;
try {
nextSource = innerFrom<T | A[number]>(remaining.shift()!);
nextSource = innerFrom(remaining.shift()!);
} catch (err) {
subscribeNext();
return;
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/share.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Observable } from '../Observable';
import { from } from '../observable/from';
import { innerFrom } from '../observable/innerFrom';
import { take } from '../operators/take';
import { Subject } from '../Subject';
import { SafeSubscriber } from '../Subscriber';
Expand Down Expand Up @@ -232,7 +232,7 @@ export function share<T>(options: ShareConfig<T> = {}): MonoTypeOperatorFunction
dest.complete();
},
});
from(source).subscribe(connection);
innerFrom(source).subscribe(connection);
}
})(wrapperSource);
};
Expand Down

0 comments on commit a054287

Please sign in to comment.