Skip to content

Commit

Permalink
feat: add graph producers (#1)
Browse files Browse the repository at this point in the history
* feat: add graph producers

* feat: add docker compose

* feat: containerization

* fix: remove workspace config

* feat: enable node 20.*

* fix: linter cmds

* feat: adjust producers to return events

* fix: tests

* chore: finish migration from notifications to events
  • Loading branch information
aleortega authored Jul 5, 2024
1 parent 1f6c774 commit 2b47109
Show file tree
Hide file tree
Showing 38 changed files with 2,647 additions and 45 deletions.
33 changes: 33 additions & 0 deletions .env.default
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# This file should be safe to commit and can act as documentation for all
# the possible configurations of our server.

# This file contains the default environment variables, by default,
# it is third in precedence:
# 1. process environment variables
# 2. `.env` file contents
# 3. `.env.default` file contents.

HTTP_SERVER_PORT=5002
HTTP_SERVER_HOST=0.0.0.0
ENV=

# Needed to run migrations
CONNECTION_STRING=''

PG_COMPONENT_PSQL_PORT=''
PG_COMPONENT_PSQL_HOST=''
PG_COMPONENT_PSQL_DATABASE=''
PG_COMPONENT_PSQL_USER=''
PG_COMPONENT_PSQL_PASSWORD=''

MARKETPLACE_BASE_URL=https://decentraland.org/marketplace

NETWORK=mainnet
MARKETPLACE_SUBGRAPH_URL=https://api.thegraph.com/subgraphs/name/decentraland/marketplace
COLLECTIONS_L2_SUBGRAPH_URL=https://api.thegraph.com/subgraphs/name/decentraland/collections-matic-mainnet
RENTALS_SUBGRAPH_URL=https://api.thegraph.com/subgraphs/name/decentraland/rentals-ethereum-mainnet
LAND_MANAGER_SUBGRAPH_URL=https://api.thegraph.com/subgraphs/name/decentraland/land-manager

AWS_SNS_ARN=''
AWS_SNS_ENDPOINT=''
AWS_SQS_ENDPOINT=''
17 changes: 17 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
version: 2
updates:
- package-ecosystem: npm
directory: "/"
schedule:
interval: weekly
day: monday
time: "09:00"
timezone: "America/Buenos_Aires"
allow:
- dependency-name: "@dcl/*"
- dependency-name: "@catalyst/*"
- dependency-name: "@well-known-components/*"
- dependency-name: "dcl-*"
versioning-strategy: auto
commit-message:
prefix: "chore: "
9 changes: 9 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: build

on:
push:
pull_request:

jobs:
build:
uses: decentraland/platform-actions/.github/workflows/apps-with-db-build.yml@main
11 changes: 11 additions & 0 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
name: CI/CD on PR

on:
pull_request:

jobs:
pr:
uses: decentraland/platform-actions/.github/workflows/apps-pr.yml@main
with:
service-name: worlds-content-server
secrets: inherit
9 changes: 9 additions & 0 deletions .github/workflows/validate-pr-title.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
name: validate-pr-title

on:
pull_request:
types: [edited, opened, reopened, synchronize]

jobs:
title-matches-convention:
uses: decentraland/actions/.github/workflows/validate-pr-title.yml@main
51 changes: 51 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
ARG RUN

FROM node:lts-alpine as builderenv

RUN apk add --no-cache git

WORKDIR /app

# some packages require a build step
COPY package.json /app/package.json
COPY yarn.lock /app/yarn.lock
RUN yarn install --frozen-lockfile

# build the app
COPY . /app

# Make commit hash available to application
ARG COMMIT_HASH
RUN echo "COMMIT_HASH=$COMMIT_HASH" >> .env

RUN yarn build

# remove devDependencies, keep only used dependencies
RUN yarn install --prod --frozen-lockfile

########################## END OF BUILD STAGE ##########################

FROM node:lts-alpine

RUN apk update && apk upgrade
RUN apk add --no-cache tini

# NODE_ENV is used to configure some runtime options, like JSON logger
ENV NODE_ENV production

ARG COMMIT_HASH=local
ENV COMMIT_HASH=${COMMIT_HASH:-local}

ARG CURRENT_VERSION=Unknown
ENV CURRENT_VERSION=${CURRENT_VERSION:-Unknown}

WORKDIR /app
COPY --from=builderenv /app /app

# Please _DO NOT_ use a custom ENTRYPOINT because it may prevent signals
# (i.e. SIGTERM) to reach the service
# Read more here: https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/
# and: https://www.ctl.io/developers/blog/post/gracefully-stopping-docker-containers/
ENTRYPOINT ["/sbin/tini", "--", "/app/entrypoint.sh"]

RUN chmod +x /app/entrypoint.sh
17 changes: 17 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
version: '3.8'
services:
postgres:
container_name: "blockchain_event_notifier_db"
image: 'postgres:latest'
restart: always
user: postgres
volumes:
- postgres_volume:/var/lib/postgresql/data
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=pass1234
- POSTGRES_DB=blockchain_event_notifier_db
ports:
- '5455:5432'
volumes:
postgres_volume:
18 changes: 18 additions & 0 deletions entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#!/bin/sh

finish() {
echo "killing service..."
kill -SIGTERM "$pid" 2>/dev/null;
}

trap finish SIGINT SIGQUIT SIGTERM

echo "running migrations"
./node_modules/.bin/node-pg-migrate -m lib/migrations -d CONNECTION_STRING up

echo "starting service..."
/usr/local/bin/node --trace-warnings --abort-on-uncaught-exception --unhandled-rejections=strict dist/index.js &

pid=$!
echo "runnig on $pid"
wait "$pid"
11 changes: 11 additions & 0 deletions jest.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module.exports = {
moduleFileExtensions: ['ts', 'js'],
transform: {
'^.+\\.(ts|tsx)$': ['ts-jest', { tsconfig: 'test/tsconfig.json' }]
},
coverageDirectory: 'coverage',
collectCoverageFrom: ['src/**/*.ts'],
coveragePathIgnorePatterns: ['/node_modules/', '/src/index.ts'],
testMatch: ['**/*.spec.(ts)'],
testEnvironment: 'node'
}
23 changes: 19 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,43 @@
"license": "Apache-2.0",
"private": true,
"engines": {
"node": "18.*",
"node": "18.* || 20.*",
"npm": "8.* || 9.*"
},
"scripts": {
"build": "tsc -p tsconfig.json",
"start": "node --trace-warnings --abort-on-uncaught-exception --unhandled-rejections=strict dist/index.js",
"migrate": "./node_modules/.bin/node-pg-migrate --envPath ./.env.local --tsconfig ./tsconfig.json -j ts -m ./src/migrations -d CONNECTION_STRING",
"test": "jest --forceExit --detectOpenHandles --coverage --verbose --passWithNoTests",
"lint:check": "yarn workspaces run lint:check",
"lint:fix": "yarn workspaces run lint:fix"
"rundb:local": "docker-compose up -d && docker exec blockchain_event_notifier_db bash -c \"until pg_isready; do sleep 1; done\" && sleep 5",
"lint:check": "eslint '**/*.{js,ts}'",
"lint:fix": "eslint '**/*.{js,ts}' --fix"
},
"dependencies": {
"@aws-sdk/client-sns": "^3.609.0",
"@dcl/catalyst-contracts": "^4.4.1",
"@dcl/platform-server-commons": "^0.0.4",
"@dcl/schemas": "^11.10.4",
"@well-known-components/env-config-provider": "^1.2.0",
"@well-known-components/interfaces": "^1.4.3",
"@well-known-components/logger": "^3.1.3",
"@well-known-components/metrics": "^2.1.0"
"@well-known-components/metrics": "^2.1.0",
"@well-known-components/pg-component": "^0.2.3",
"@well-known-components/thegraph-component": "^1.6.0",
"cron": "^3.1.7"
},
"devDependencies": {
"@dcl/eslint-config": "^2.2.1",
"@types/node": "^20.14.9",
"@well-known-components/test-helpers": "^1.5.6",
"ts-node": "^10.9.2",
"typescript": "^5.5.3"
},
"prettier": {
"printWidth": 120,
"semi": false,
"singleQuote": true,
"trailingComma": "none",
"tabWidth": 2
}
}
56 changes: 56 additions & 0 deletions src/adapters/create-producer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { CronJob } from 'cron'
import { AppComponents, IEventGenerator, IEventProducer } from '../types'

export async function createProducer(
components: Pick<AppComponents, 'logs' | 'database' | 'eventPublisher'>,
producer: IEventGenerator
): Promise<IEventProducer> {
const { logs, database, eventPublisher } = components
const logger = logs.getLogger(`producer-${producer.eventType}`)

let lastSuccessfulRun: number | undefined

async function runProducer(lastSuccessfulRun: number) {
logger.info(`Checking for updates since ${lastSuccessfulRun}.`)

const produced = await producer.run(lastSuccessfulRun)
await database.updateLastUpdateForEventType(produced.eventType, produced.lastRun)

for (const event of produced.records) {
await eventPublisher.publishMessage(event)
}
logger.info(`Published ${produced.records.length} new events.`)

return produced.lastRun
}

async function start(): Promise<void> {
logger.info(`Scheduling producer for ${producer.eventType}.`)

const job = new CronJob(
'0 * * * * *',
async function () {
try {
if (!lastSuccessfulRun) {
lastSuccessfulRun = await database.fetchLastUpdateForEventType(producer.eventType)
}
lastSuccessfulRun = await runProducer(lastSuccessfulRun!)
} catch (e: any) {
logger.error(`Couldn't run producer: ${e.message}.`)
}
},
null,
false,
'UCT'
)
job.start()
}

return {
start,
eventType: () => producer.eventType,
runProducerSinceDate: async (date: number) => {
await runProducer(date)
}
}
}
39 changes: 39 additions & 0 deletions src/adapters/database.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import SQL from 'sql-template-strings'
import { IPgComponent } from '@well-known-components/pg-component'
import { DatabaseComponent } from '../types'

export type DatabaseComponents = {
pg: IPgComponent
}

export function createDatabaseComponent({ pg }: Pick<DatabaseComponents, 'pg'>): DatabaseComponent {
async function fetchLastUpdateForEventType(eventType: string): Promise<number> {
const result = await pg.query<{ last_successful_run_at: number }>(SQL`
SELECT *
FROM cursors
WHERE id = ${eventType};
`)
if (result.rowCount === 0) {
return Date.now()
}

return result.rows[0].last_successful_run_at
}

async function updateLastUpdateForEventType(eventType: string, timestamp: number): Promise<void> {
const query = SQL`
INSERT INTO cursors (id, last_successful_run_at, created_at, updated_at)
VALUES (${eventType}, ${timestamp}, ${Date.now()}, ${Date.now()})
ON CONFLICT (id) DO UPDATE
SET last_successful_run_at = ${timestamp},
updated_at = ${Date.now()};
`

await pg.query<any>(query)
}

return {
fetchLastUpdateForEventType,
updateLastUpdateForEventType
}
}
24 changes: 24 additions & 0 deletions src/adapters/event-publisher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { PublishCommand, SNSClient } from '@aws-sdk/client-sns'
import { AppComponents, EventNotification, IEventPublisher } from '../types'

export async function createEventPublisher({ config }: Pick<AppComponents, 'config'>): Promise<IEventPublisher> {
const snsArn = await config.requireString('AWS_SNS_ARN')
const optionalEndpoint = await config.getString('AWS_SNS_ENDPOINT')

const client = new SNSClient({
endpoint: optionalEndpoint
})

async function publishMessage(event: EventNotification): Promise<string | undefined> {
const { MessageId } = await client.send(
new PublishCommand({
TopicArn: snsArn,
Message: JSON.stringify(event)
})
)

return MessageId
}

return { publishMessage }
}
33 changes: 33 additions & 0 deletions src/adapters/producer-registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { AppComponents, IEventProducer, IProducerRegistry } from '../types'

export async function createProducerRegistry(components: Pick<AppComponents, 'logs'>): Promise<IProducerRegistry> {
const producers: Map<string, IEventProducer> = new Map<string, IEventProducer>()
const { logs } = components
const logger = logs.getLogger('producer-registry')

function addProducer(producer: IEventProducer) {
if (producers.has(producer.eventType())) {
throw new Error(`Producer for ${producer.eventType} already exists`)
}
logger.info(`Adding producer for ${producer.eventType()}.`)
producers.set(producer.eventType(), producer)
}

async function start(): Promise<void> {
await Promise.all([...producers.values()].map((producer) => producer.start()))
}

function getProducer(eventType: string): IEventProducer {
const producer = producers.get(eventType)
if (!producer) {
throw new Error(`Producer for ${eventType} not found`)
}
return producer
}

return {
addProducer,
getProducer,
start
}
}
Loading

0 comments on commit 2b47109

Please sign in to comment.