Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rename finalBlocksOnly #27

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import { banner } from "./src/banner.js";
import { toJSON, toText } from "./src/http.js";
import { ping } from "./src/ping.js";
import { keyPair, parsePrivateKey } from "./src/auth.js";
import { Metadata, boolean } from "./src/schemas.js";

export async function action(options: WebhookRunOptions) {
const cursor = fileCursor.readCursor(options.cursorPath);
const finalBlocksOnly = boolean.parse(options.finalBlocksOnly);

// Block Emitter
const { emitter, moduleHash } = await setup({...options, cursor});
Expand All @@ -34,7 +36,7 @@ export async function action(options: WebhookRunOptions) {
emitter.on("output", async (data, cursor, clock) => {
if (!clock.timestamp) return;
const chain = new URL(options.substreamsEndpoint).hostname.split(".")[0];
const metadata = {
const metadata: Metadata = {
status: 200,
cursor,
session: {
Expand All @@ -49,7 +51,7 @@ export async function action(options: WebhookRunOptions) {
manifest: {
substreamsEndpoint: options.substreamsEndpoint,
chain,
finalBlockOnly: options.finalBlocksOnly,
finalBlocksOnly,
moduleName: options.moduleName,
type: data.getType().typeName,
moduleHash,
Expand All @@ -60,7 +62,7 @@ export async function action(options: WebhookRunOptions) {

// Queue POST
queue.add(async () => {
await postWebhook(options.webhookUrl, body, privateKey, options);
await postWebhook(options.webhookUrl, body, metadata, privateKey, options);
fs.writeFileSync(options.cursorPath, cursor);
});
});
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"version": "0.9.1",
"version": "0.9.2",
"name": "substreams-sink-webhook",
"description": "Substreams Sink Webhook",
"type": "module",
Expand Down
4 changes: 2 additions & 2 deletions src/ping.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export async function ping(url: string, privateKey?: Hex) {

// send valid signature (must respond with 200)
try {
await postWebhook(url, body, privateKey, { maximumAttempts: 0 });
await postWebhook(url, body, undefined, privateKey, { maximumAttempts: 0 });
} catch (_e) {
return false;
}
Expand All @@ -20,7 +20,7 @@ export async function ping(url: string, privateKey?: Hex) {

// send invalid signature (must NOT respond with 200)
try {
await postWebhook(url, body, invalidprivateKey, { maximumAttempts: 0 });
await postWebhook(url, body, undefined, invalidprivateKey, { maximumAttempts: 0 });
return false;
} catch (_e) {
return true;
Expand Down
10 changes: 6 additions & 4 deletions src/postWebhook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Hex } from "@noble/curves/abstract/utils";
import { logger } from "substreams-sink";
import { createTimestamp, sign } from "./auth.js";
import logUpdate from "log-update";
import { Metadata } from "./schemas.js";

function awaitSetTimeout(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
Expand All @@ -21,16 +22,17 @@ let start = now();
// let lastUpdate = now();

// TO-DO replace with Prometheus metrics
function logProgress() {
function logProgress(metadata?: Metadata) {
if ( !metadata ) return;
const delta = now() - start;
const rate = Math.round(blocks / delta);
const minutes = Math.round(delta / 60);
const seconds = delta % 60;
if ( blocks ) logUpdate(`[app] blocks=${blocks} [${rate} b/s] (${minutes}m ${seconds}s)`);
if ( blocks ) logUpdate(`[app] timestamp=${metadata.clock.timestamp} block_number=${metadata.clock.number} blocks=${blocks} [${rate} b/s] (${minutes}m ${seconds}s)`);
blocks++;
}

export async function postWebhook(url: string, body: string, secretKey?: Hex, options: PostWebhookOptions = {}) {
export async function postWebhook(url: string, body: string, metadata?: Metadata, secretKey?: Hex, options: PostWebhookOptions = {}) {
// Retry Policy
const initialInterval = 1000; // 1s
const maximumAttempts = options.maximumAttempts ?? 100 * initialInterval;
Expand Down Expand Up @@ -81,7 +83,7 @@ export async function postWebhook(url: string, body: string, secretKey?: Hex, op
continue;
}
// success
logProgress();
logProgress(metadata);
return { url, status };
} catch (e: any) {
const error = e.cause;
Expand Down
22 changes: 17 additions & 5 deletions src/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,29 @@ export const ManifestSchema = z.object({
type: z.string(),
moduleHash: z.string(),
chain: z.string(),
finalBlockOnly: boolean,
finalBlocksOnly: boolean,
});
export type Manifest = z.infer<typeof ManifestSchema>;

export const SessionSchema = z.object({
traceId: z.string(),
resolvedStartBlock: z.number(),
});
export type Session = z.infer<typeof SessionSchema>;

export const MetadataSchema = z.object({
status: z.number(),
cursor: z.string(),
session: SessionSchema,
clock: ClockSchema,
manifest: ManifestSchema,
});
export type Metadata = z.infer<typeof MetadataSchema>;

export const makePayloadBody = <S extends z.Schema>(dataSchema: S) =>
z.object({
cursor: z.string(),
session: z.object({
traceId: z.string(),
resolvedStartBlock: z.number(),
}),
session: SessionSchema,
clock: ClockSchema,
manifest: ManifestSchema,
data: dataSchema,
Expand Down
Loading