Skip to content

Commit bbde8e9

Browse files
authored
Merge pull request #83 from rimbu-org/feature/add-stream-transformers
Feature/add stream transformers
2 parents 4d6b741 + 050facc commit bbde8e9

28 files changed

+2661
-214
lines changed

deno_dist/stream/async-custom/async-fast-iterator-base.ts

+110
Original file line numberDiff line numberDiff line change
@@ -1245,3 +1245,113 @@ export class AsyncReduceAllIterator<I, R> extends AsyncFastIteratorBase<R> {
12451245
) as any;
12461246
}
12471247
}
1248+
1249+
export class AsyncDistinctPreviousIterator<T> extends AsyncFastIteratorBase<T> {
1250+
constructor(readonly source: AsyncFastIterator<T>, readonly eq: Eq<T>) {
1251+
super();
1252+
this.return = (): Promise<void> => closeIters(source);
1253+
}
1254+
1255+
readonly previous = [] as T[];
1256+
1257+
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<T | O> {
1258+
const done = Symbol('Done');
1259+
1260+
let next: T | typeof done;
1261+
const source = this.source;
1262+
const previous = this.previous;
1263+
1264+
while (done !== (next = await source.fastNext(done))) {
1265+
previous.push(next);
1266+
1267+
if (previous.length === 1) {
1268+
return next;
1269+
}
1270+
1271+
const prev = previous.shift()!;
1272+
1273+
if (!this.eq(prev, next)) {
1274+
return next;
1275+
}
1276+
}
1277+
1278+
return AsyncOptLazy.toMaybePromise(otherwise!);
1279+
}
1280+
}
1281+
1282+
export class AsyncWindowIterator<T, R> extends AsyncFastIteratorBase<R> {
1283+
constructor(
1284+
readonly source: AsyncFastIterator<T>,
1285+
readonly windowSize: number,
1286+
readonly skipAmount: number,
1287+
readonly collector: AsyncReducer<T, R>
1288+
) {
1289+
super();
1290+
this.return = (): Promise<void> => closeIters(source);
1291+
}
1292+
1293+
state = new Set<{
1294+
result: unknown;
1295+
size: number;
1296+
halted: boolean;
1297+
halt: () => void;
1298+
}>();
1299+
index = 0;
1300+
1301+
async fastNext<O>(otherwise?: AsyncOptLazy<O>): Promise<R | O> {
1302+
const source = this.source;
1303+
const collector = this.collector;
1304+
const windowSize = this.windowSize;
1305+
const skipAmount = this.skipAmount;
1306+
const done = Symbol('Done');
1307+
const state = this.state;
1308+
1309+
let next: T | typeof done;
1310+
let result: R | typeof done = done;
1311+
1312+
while (done !== (next = await source.fastNext(done))) {
1313+
for (const current of state) {
1314+
current.result = await collector.next(
1315+
current.result,
1316+
next,
1317+
current.size,
1318+
current.halt
1319+
);
1320+
current.size++;
1321+
1322+
if (current.size >= windowSize || current.halted) {
1323+
result = await collector.stateToResult(current.result);
1324+
state.delete(current);
1325+
}
1326+
}
1327+
1328+
if (this.index % skipAmount === 0) {
1329+
const newState = {
1330+
result: await AsyncOptLazy.toMaybePromise(collector.init),
1331+
size: 1,
1332+
halted: false,
1333+
halt(): void {
1334+
this.halted = true;
1335+
},
1336+
};
1337+
1338+
newState.result = await collector.next(
1339+
AsyncOptLazy.toMaybePromise(collector.init),
1340+
next,
1341+
0,
1342+
newState.halt
1343+
);
1344+
1345+
state.add(newState);
1346+
}
1347+
1348+
this.index++;
1349+
1350+
if (done !== result) {
1351+
return result;
1352+
}
1353+
}
1354+
1355+
return AsyncOptLazy.toMaybePromise(otherwise!);
1356+
}
1357+
}

0 commit comments

Comments
 (0)