Skip to content

Commit

Permalink
refactor: clean up pipeline with generic split, improved fetch, error…
Browse files Browse the repository at this point in the history
… reporting
  • Loading branch information
jhohlfeld committed Oct 19, 2023
1 parent fa851fe commit dd42c2f
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 48 deletions.
16 changes: 9 additions & 7 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
import { pipeline } from "node:stream/promises";
import { logger } from "./logger";
import { request, response } from "./stream/destatis";
import { EntityStream } from "./stream/entity";
import { FetchStream } from "./stream/fetch";
import { extractData, jsonl, split } from "./stream/util";
import { fetcher } from "./stream/fetcher";
import { TimerStream } from "./stream/timer";
import { logger } from "./logger";
import { extractData, jsonParse, jsonl, split } from "./stream/util";

const entities = new EntityStream();
const timer = new TimerStream({ throttleTime: 1500 });
const fetcher = new FetchStream();
const timer = new TimerStream({ throttleTime: 300 });

pipeline(
process.stdin,
split,
split("\n"),
jsonParse,
entities,
request,
timer,
Expand All @@ -21,7 +21,9 @@ pipeline(
extractData,
jsonl,
process.stdout
);
)
.then(() => logger.info("Pipeline succeeded"))
.catch((err: Error) => logger.error({ msg: "Pipeline failed", err }));

fetcher.on("data", ({ responseTime }) => {
logger.info({ msg: "fetching done", responseTime });
Expand Down
18 changes: 16 additions & 2 deletions src/stream/destatis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,22 @@ export const response = new Transform({
_,
callback
) => {
const { data } = JSON.parse(await response.text())[0];
callback(null, {
let data;
let error;
try {
data = JSON.parse(await response.text())[0].data;
} catch (err) {
error = new Error(
`Error parsing response for this entity: ${JSON.stringify(entity)}`,
{
cause: {
err: err instanceof Error ? err : new Error(String(err)),
entity,
},
}
);
}
callback(error, {
entity,
data: parseLocalityData(data)
.flatMap(filterLocalityFields(localityFields))
Expand Down
31 changes: 0 additions & 31 deletions src/stream/fetch.ts

This file was deleted.

25 changes: 25 additions & 0 deletions src/stream/fetcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Transform } from "node:stream";
import { Entity } from "./entity";

export const fetcher = new Transform({
objectMode: true,
async transform(
{ entity, request }: { entity: Entity; request: Request },
_,
callback
) {
const start = Date.now();

let error;
let data;

try {
const response = await fetch(request);
data = { entity, response, responseTime: Date.now() - start };
} catch (err) {
error = err instanceof Error ? err : new Error(String(err));
}

callback(error, data);
},
});
59 changes: 51 additions & 8 deletions src/stream/util.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
import { Transform } from "node:stream";

export const split = new Transform({
export function split(delimiter: string) {
const stream = new Transform({
readableObjectMode: true,
transform: (chunk: Buffer, _, callback) => {
chunk
.toString("utf8")
.split(delimiter)
.filter((str) => str.length > 0)
.map((str) => str.trim())
.forEach((str) => stream.push(str));
callback();
},
});
return stream;
}

export const jsonParse = new Transform({
objectMode: true,
transform: (chunk: Buffer, _, callback) => {
chunk
.toString("utf8")
.split(",")
.map((str) => str.trim())
.forEach((str) => split.push(str));
callback(null);
transform(chunk: string, _, callback) {
try {
callback(null, JSON.parse(chunk));
} catch (err) {
const error = new Error(`Error parsing chunk '${chunk}'`, {
cause: {
chunk,
err: err instanceof Error ? err : new Error(String(err)),
},
});
callback(error);
}
},
});

Expand All @@ -26,6 +47,28 @@ export const jsonl = new Transform({
},
});

export function getUpperQuartile(data: Array<number>): number | undefined {
if (data.length < 4) {
return data.slice(-1).shift();
}

const sortedData = [...data].sort((a, b) => a - b);
const medianIndex = Math.floor(sortedData.length / 2);

const upperHalf =
sortedData.length % 2 === 0
? sortedData.slice(medianIndex)
: sortedData.slice(medianIndex + 1);

if (upperHalf.length % 2 === 0) {
const upperMiddle = upperHalf.length / 2;
return (upperHalf[upperMiddle - 1] + upperHalf[upperMiddle]) / 2;
} else {
const upperMedianIndex = Math.floor(upperHalf.length / 2);
return upperHalf[upperMedianIndex];
}
}

export function getMedian(data: Array<number>): number | undefined {
const sortedData = data.toSorted((a, b) => a - b);
const medianIndex = Math.floor(sortedData.length / 2);
Expand Down

0 comments on commit dd42c2f

Please sign in to comment.