Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: leveldb #44

Merged
merged 2 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
"ethers": "^5.7.2",
"exponential-backoff": "^3.1.1",
"graphql-request": "^6.1.0",
"level": "^8.0.0",
"level-ts": "^2.1.0",
"node-fetch": "2",
"node-slack": "^0.0.7",
"ts-node": "^10.9.1",
Expand Down
11 changes: 10 additions & 1 deletion src/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
export interface WatchtowerOptions {
contract: string;
publish: boolean;
}

Expand All @@ -11,8 +10,18 @@ export interface RunOptions extends WatchtowerOptions {
rpc: string[];
deploymentBlock: string[];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was the deploymentBlock an array?
is this how you plan to run it in multiple networks? If so, in principle, it feels a better idea to use separated instances

pageSize: string;
silent: boolean;
slackWebhook?: string;
sentryDsn?: string;
logglyToken?: string;
oneShot: boolean;
}

export type SingularRunOptions = Omit<RunOptions, "rpc" | "deploymentBlock"> & {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should have "plural" TBH, this should be the default run

rpc: string;
deploymentBlock: string;
};

export interface ReplayBlockOptions extends WatchtowerReplayOptions {
block: string;
}
Expand Down
137 changes: 87 additions & 50 deletions src/types/model.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,31 @@
import Slack = require("node-slack");

import { Context, Storage } from "@tenderly/actions";
import { Transaction as SentryTransaction } from "@sentry/node";
import { BytesLike, ethers } from "ethers";

import { apiUrl, getProvider } from "./utils";
import type { IConditionalOrder } from "./types/ComposableCoW";
import { apiUrl } from "../utils";
import type { IConditionalOrder } from "./generated/ComposableCoW";
import { PollResult, SupportedChainId } from "@cowprotocol/cow-sdk";
import DBService from "../utils/db";

// Standardise the storage key
const LAST_NOTIFIED_ERROR_STORAGE_KEY = "LAST_NOTIFIED_ERROR";
const LAST_PROCESSED_BLOCK_STORAGE_KEY = "LAST_PROCESSED_BLOCK";
const CONDITIONAL_ORDER_REGISTRY_STORAGE_KEY = "CONDITIONAL_ORDER_REGISTRY";
const CONDITIONAL_ORDER_REGISTRY_VERSION_KEY =
"CONDITIONAL_ORDER_REGISTRY_VERSION";
const CONDITIONAL_ORDER_REGISTRY_VERSION = 1;

export const getOrdersStorageKey = (network: string): string => {
return `CONDITIONAL_ORDER_REGISTRY_${network}`;
export const getNetworkStorageKey = (key: string, network: string): string => {
return `${key}_${network}`;
};

export interface ExecutionContext {
registry: Registry;
notificationsEnabled: boolean;
slack?: Slack;
sentryTransaction?: SentryTransaction;
context: Context;
storage: DBService;
}

export interface ReplayPlan {
Expand Down Expand Up @@ -95,9 +97,10 @@ export type ConditionalOrder = {
export class Registry {
version = CONDITIONAL_ORDER_REGISTRY_VERSION;
ownerOrders: Map<Owner, Set<ConditionalOrder>>;
storage: Storage;
storage: DBService;
network: string;
lastNotifiedError: Date | null;
lastProcessedBlock: number | null;

/**
* Instantiates a registry.
Expand All @@ -107,14 +110,16 @@ export class Registry {
*/
constructor(
ownerOrders: Map<Owner, Set<ConditionalOrder>>,
storage: Storage,
storage: DBService,
network: string,
lastNotifiedError: Date | null
lastNotifiedError: Date | null,
lastProcessedBlock: number | null
) {
this.ownerOrders = ownerOrders;
this.storage = storage;
this.network = network;
this.lastNotifiedError = lastNotifiedError;
this.lastProcessedBlock = lastProcessedBlock;
}

/**
Expand All @@ -124,18 +129,33 @@ export class Registry {
* @returns a registry instance
*/
public static async load(
context: Context,
network: string
storage: DBService,
network: string,
genesisBlockNumber: number
): Promise<Registry> {
const str = await context.storage.getStr(getOrdersStorageKey(network));
const lastNotifiedError = await context.storage
.getStr(LAST_NOTIFIED_ERROR_STORAGE_KEY)
.then((isoDate) => (isoDate ? new Date(isoDate) : null))
const db = storage.getDB();
const str = await db.get(
getNetworkStorageKey(CONDITIONAL_ORDER_REGISTRY_STORAGE_KEY, network)
);
const lastNotifiedError = await db
.get(getNetworkStorageKey(LAST_NOTIFIED_ERROR_STORAGE_KEY, network))
.then((isoDate: string | number | Date) =>
isoDate ? new Date(isoDate) : null
)
.catch(() => null);

const lastProcessedBlock = await db
.get(getNetworkStorageKey(LAST_PROCESSED_BLOCK_STORAGE_KEY, network))
.then((blockNumber: string | number) =>
blockNumber ? Number(blockNumber) : genesisBlockNumber
)
.catch(() => null);

// Get the persisted registry version
const version = await context.storage
.getStr(CONDITIONAL_ORDER_REGISTRY_VERSION_KEY)
const version = await db
.get(
getNetworkStorageKey(CONDITIONAL_ORDER_REGISTRY_VERSION_KEY, network)
)
.then((versionString) => Number(versionString))
.catch(() => undefined);

Expand All @@ -148,9 +168,10 @@ export class Registry {
// Return registry (on its latest version)
return new Registry(
ownerOrders,
context.storage,
storage,
network,
lastNotifiedError
lastNotifiedError,
lastProcessedBlock
);
}

Expand All @@ -162,38 +183,54 @@ export class Registry {
* Write the registry to storage.
*/
public async write(): Promise<void> {
await Promise.all([
this.writeOrders(),
this.writeConditionalOrderRegistryVersion(),
this.writeLastNotifiedError(),
]);
}

private async writeOrders(): Promise<void> {
await this.storage.putStr(
getOrdersStorageKey(this.network),
this.stringifyOrders()
);
}

public stringifyOrders(): string {
return JSON.stringify(this.ownerOrders, replacer);
}

private async writeConditionalOrderRegistryVersion(): Promise<void> {
await this.storage.putStr(
CONDITIONAL_ORDER_REGISTRY_VERSION_KEY,
this.version.toString()
);
}
const batch = this.storage
.getDB()
.batch()
.put(
getNetworkStorageKey(
CONDITIONAL_ORDER_REGISTRY_VERSION_KEY,
this.network
),
this.version.toString()
)
.put(
getNetworkStorageKey(
CONDITIONAL_ORDER_REGISTRY_STORAGE_KEY,
this.network
),
this.stringifyOrders()
);

private async writeLastNotifiedError(): Promise<void> {
// Write or delete last notified error
if (this.lastNotifiedError !== null) {
await this.storage.putStr(
LAST_NOTIFIED_ERROR_STORAGE_KEY,
batch.put(
getNetworkStorageKey(LAST_NOTIFIED_ERROR_STORAGE_KEY, this.network),
this.lastNotifiedError.toISOString()
);
} else {
batch.del(
getNetworkStorageKey(LAST_NOTIFIED_ERROR_STORAGE_KEY, this.network)
);
}

// Write or delete last processed block
if (this.lastProcessedBlock !== null) {
batch.put(
getNetworkStorageKey(LAST_PROCESSED_BLOCK_STORAGE_KEY, this.network),
this.lastProcessedBlock.toString()
);
} else {
batch.del(
getNetworkStorageKey(LAST_PROCESSED_BLOCK_STORAGE_KEY, this.network)
);
}

// Write all atomically
await batch.write();
}

public stringifyOrders(): string {
return JSON.stringify(this.ownerOrders, replacer);
}
}

Expand All @@ -213,11 +250,11 @@ export class ChainContext {
}

public static async create(
context: Context,
chainId: SupportedChainId
storage: DBService,
url: string
): Promise<ChainContext> {
const provider = await getProvider(context, chainId);
await provider.getNetwork();
const provider = new ethers.providers.JsonRpcProvider(url);
const chainId = (await provider.getNetwork()).chainId;
return new ChainContext(provider, apiUrl(chainId), chainId);
}
}
Expand Down
65 changes: 29 additions & 36 deletions src/utils/context.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Slack = require("node-slack");
import { Context } from "@tenderly/actions";
import { backOff } from "exponential-backoff";
import DBService from "./db";

import {
init as sentryInit,
Expand All @@ -9,9 +9,10 @@ import {
} from "@sentry/node";
import { CaptureConsole as CaptureConsoleIntegration } from "@sentry/integrations";

import { ExecutionContext, Registry } from "../model";
import { ExecutionContext, Registry } from "../types/model";
import { SupportedChainId } from "@cowprotocol/cow-sdk";
import { initLogging } from "./logging";
import { SingularRunOptions } from "../types";

const NOTIFICATION_WAIT_PERIOD = 1000 * 60 * 60 * 2; // 2h - Don't send more than one notification every 2h

Expand All @@ -20,22 +21,26 @@ let executionContext: ExecutionContext | undefined;
export async function initContext(
transactionName: string,
chainId: SupportedChainId,
context: Context
options: SingularRunOptions
): Promise<ExecutionContext> {
// Init Logging
await _initLogging(transactionName, chainId, context);
_initLogging(transactionName, chainId, options);

// Init registry
const registry = await Registry.load(context, chainId.toString());
// Init storage
const storage = DBService.getInstance();

// Get notifications config (enabled by default)
const notificationsEnabled = await _getNotificationsEnabled(context);
// Init registry
const registry = await Registry.load(
storage,
chainId.toString(),
Number(options.deploymentBlock)
);

// Init slack
const slack = await _getSlack(notificationsEnabled, context);
const slack = _getSlack(options);

// Init Sentry
const sentryTransaction = await _getSentry(transactionName, chainId, context);
const sentryTransaction = _getSentry(transactionName, chainId, options);
if (!sentryTransaction) {
console.warn("SENTRY_DSN secret is not set. Sentry will be disabled");
}
Expand All @@ -44,54 +49,42 @@ export async function initContext(
registry,
slack,
sentryTransaction,
notificationsEnabled,
context,
notificationsEnabled: !options.silent,
storage,
};

return executionContext;
}

async function _getNotificationsEnabled(context: Context): Promise<boolean> {
// Get notifications config (enabled by default)
return context.secrets
.get("NOTIFICATIONS_ENABLED")
.then((value) => (value ? value !== "false" : true))
.catch(() => true);
}

async function _getSlack(
notificationsEnabled: boolean,
context: Context
): Promise<Slack | undefined> {
function _getSlack(options: SingularRunOptions): Slack | undefined {
if (executionContext) {
return executionContext?.slack;
}

// Init slack
const webhookUrl = await context.secrets
.get("SLACK_WEBHOOK_URL")
.catch(() => "");
if (!notificationsEnabled) {
const webhookUrl = options.slackWebhook || "";

if (options.silent && !webhookUrl) {
return undefined;
}

if (!webhookUrl) {
throw new Error(
"SLACK_WEBHOOK_URL secret is required when NOTIFICATIONS_ENABLED is true"
"SLACK_WEBHOOK_URL must be set if not running in silent mode"
);
}

return new Slack(webhookUrl);
}

async function _getSentry(
function _getSentry(
transactionName: string,
chainId: SupportedChainId,
context: Context
): Promise<SentryTransaction | undefined> {
options: SingularRunOptions
): SentryTransaction | undefined {
// Init Sentry
if (!executionContext) {
const sentryDsn = await context.secrets.get("SENTRY_DSN").catch(() => "");
const sentryDsn = options.sentryDsn || "";
sentryInit({
dsn: sentryDsn,
debug: false,
Expand Down Expand Up @@ -237,12 +230,12 @@ function handlePromiseErrors<T>(
/**
* Init Logging with Loggly
*/
async function _initLogging(
function _initLogging(
transactionName: string,
chainId: SupportedChainId,
context: Context
options: SingularRunOptions
) {
const logglyToken = await context.secrets.get("LOGGLY_TOKEN").catch(() => "");
const { logglyToken } = options;
if (logglyToken) {
initLogging(logglyToken, [transactionName, `chain_${chainId}`]);
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/utils/contracts.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ethers } from "ethers";
import { ComposableCoW__factory } from "../types";
import { ComposableCoW__factory } from "../types/generated";

// Selectors that are required to be part of the contract's bytecode in order to be considered compatible
const REQUIRED_SELECTORS = [
Expand Down
Loading