Skip to content

Commit

Permalink
Use separate eventstream per namespace (#135)
Browse files Browse the repository at this point in the history
Use separate eventstream per namespace

Signed-off-by: Nicko Guyer <nicko.guyer@kaleido.io>
  • Loading branch information
nguyer authored Feb 29, 2024
1 parent 6f759a2 commit 572b515
Show file tree
Hide file tree
Showing 44 changed files with 601 additions and 368 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
PORT=3000
ETHCONNECT_URL=http://127.0.0.1:5102
ETHCONNECT_INSTANCE=/contracts/erc1155
ETHCONNECT_TOPIC=token
CONTRACT_ADDRESS=
AUTO_INIT=true
10 changes: 5 additions & 5 deletions .github/workflows/docker_main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
docker:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- name: Set build tag
id: build_tag_generator
Expand All @@ -25,16 +25,16 @@ jobs:
--label build_date=$(date -u +"%Y-%m-%dT%H:%M:%SZ") \
--label tag=${{ steps.build_tag_generator.outputs.BUILD_TAG }} \
--tag ghcr.io/hyperledger/firefly-tokens-erc1155:${{ steps.build_tag_generator.outputs.BUILD_TAG }} .
- name: Tag release
run: docker tag ghcr.io/hyperledger/firefly-tokens-erc1155:${{ steps.build_tag_generator.outputs.BUILD_TAG }} ghcr.io/hyperledger/firefly-tokens-erc1155:head

- name: Push docker image
run: |
echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin
docker push ghcr.io/hyperledger/firefly-tokens-erc1155:${{ steps.build_tag_generator.outputs.BUILD_TAG }}
- name: Push head tag
run: |
echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin
docker push ghcr.io/hyperledger/firefly-tokens-erc1155:head
docker push ghcr.io/hyperledger/firefly-tokens-erc1155:head
4 changes: 2 additions & 2 deletions .github/workflows/docker_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
docker:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
with:
fetch-depth: 0

Expand All @@ -30,7 +30,7 @@ jobs:
run: |
echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin
docker push ghcr.io/hyperledger/firefly-tokens-erc1155:${GITHUB_REF##*/}
- name: Push head tag
run: |
echo ${{ secrets.GITHUB_TOKEN }} | docker login ghcr.io -u $GITHUB_ACTOR --password-stdin
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Use Node.js
uses: actions/setup-node@v3
with:
node-version: '16.x'
node-version: '20.9.0'
- run: npm ci
- run: npm run test

Expand All @@ -25,12 +25,12 @@ jobs:
run:
working-directory: ./samples/solidity
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4

- name: Use Node.js
uses: actions/setup-node@v3
with:
node-version: '16.x'
node-version: '20.9.0'
- run: npm ci
- run: npm run compile
- run: npm run test
33 changes: 33 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Run Tests",
"runtimeExecutable": "npm",
"args": ["run", "test"],
"request": "launch",
"type": "node",
"outputCapture": "std"
},
{
"name": "Run E2E Tests",
"runtimeExecutable": "npm",
"args": ["run", "test:e2e"],
"request": "launch",
"type": "node",
"outputCapture": "std"
},
{
"type": "node",
"request": "launch",
"name": "Launch Program",
"skipFiles": ["<node_internals>/**"],
"program": "${file}",
"preLaunchTask": "tsc: build - tsconfig.json",
"outFiles": ["${workspaceFolder}/dist/**/*.js"]
}
]
}
9 changes: 9 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"solidity.compileUsingRemoteVersion": "v0.6.12+commit.27d51765",
"editor.codeActionsOnSave": {
"source.fixAll.eslint": "explicit"
},
"eslint.validate": ["javascript"],
"solidity.defaultCompiler": "remote",
"cSpell.words": ["eventstream", "fftm"]
}
16 changes: 9 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM node:16-alpine3.15 as solidity-builder
FROM node:20-alpine3.17 as solidity-build
RUN apk add python3 alpine-sdk
USER node
WORKDIR /home/node
Expand All @@ -7,22 +7,24 @@ RUN npm install
ADD --chown=node:node ./samples/solidity .
RUN npx hardhat compile

FROM node:16-alpine3.15 as builder
FROM node:20-alpine3.17 as build
WORKDIR /root
ADD package*.json ./
RUN npm install
ADD . .
RUN npm run build

FROM node:16-alpine3.15
FROM node:20-alpine3.17
RUN apk add curl
# We also need to keep copying it to the old location to maintain compatibility with the FireFly CLI
COPY --from=solidity-build --chown=1001:0 /home/node/artifacts/contracts/ERC1155MixedFungible.sol/ERC1155MixedFungible.json /root/contracts/
WORKDIR /app
ADD package*.json ./
RUN npm install --production
COPY --from=solidity-builder /home/node/contracts contracts/source
COPY --from=solidity-builder /home/node/artifacts/contracts/ERC1155MixedFungible.sol contracts
COPY --from=builder /root/dist dist
COPY --from=builder /root/.env /app/.env
COPY --from=solidity-build /home/node/contracts contracts/source
COPY --from=solidity-build /home/node/artifacts/contracts/ERC1155MixedFungible.sol contracts
COPY --from=build /root/dist dist
COPY --from=build /root/.env /app/.env
RUN chgrp -R 0 /app/ \
&& chmod -R g+rwX /app/
USER 1001
Expand Down
2 changes: 1 addition & 1 deletion src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2021 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.interfaces.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.module.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
2 changes: 1 addition & 1 deletion src/event-stream/event-stream.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2022 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down
78 changes: 46 additions & 32 deletions src/event-stream/event-stream.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,11 +18,11 @@ import { HttpService } from '@nestjs/axios';
import { Injectable, Logger } from '@nestjs/common';
import { AxiosRequestConfig } from 'axios';
import { lastValueFrom } from 'rxjs';
import * as WebSocket from 'ws';
import { IAbiMethod } from '../tokens/tokens.interfaces';
import { getHttpRequestOptions, getWebsocketOptions } from '../utils';
import { Context } from '../request-context/request-context.decorator';
import WebSocket from 'ws';
import { FFRequestIDHeader } from '../request-context/constants';
import { Context, newContext } from '../request-context/request-context.decorator';
import { IAbiMethod } from '../tokens/tokens.interfaces';
import { eventStreamName, getHttpRequestOptions, getWebsocketOptions } from '../utils';
import {
Event,
EventBatch,
Expand All @@ -46,6 +46,7 @@ export class EventStreamSocket {
constructor(
private url: string,
private topic: string,
private namespace: string,
private username: string,
private password: string,
private handleEvents: (events: EventBatch) => void,
Expand All @@ -67,7 +68,7 @@ export class EventStreamSocket {
} else {
this.logger.log('Event stream websocket connected');
}
this.produce({ type: 'listen', topic: this.topic });
this.produce({ type: 'listen', topic: `${eventStreamName(this.topic, this.namespace)}` });
this.produce({ type: 'listenreplies' });
this.ping();
})
Expand All @@ -83,6 +84,7 @@ export class EventStreamSocket {
}
})
.on('message', (message: string) => {
this.logger.verbose(`WS => ${message}`);
this.handleMessage(JSON.parse(message));
})
.on('pong', () => {
Expand All @@ -109,7 +111,19 @@ export class EventStreamSocket {
}

ack(batchNumber: number | undefined) {
this.produce({ type: 'ack', topic: this.topic, batchNumber });
this.produce({
type: 'ack',
topic: `${eventStreamName(this.topic, this.namespace)}`,
batchNumber,
});
}

nack(batchNumber: number | undefined) {
this.produce({
type: 'nack',
topic: `${eventStreamName(this.topic, this.namespace)}`,
batchNumber,
});
}

close() {
Expand Down Expand Up @@ -176,11 +190,12 @@ export class EventStreamService {
return config;
}

async getStreams(): Promise<EventStream[]> {
async getStreams(ctx: Context): Promise<EventStream[]> {
const response = await lastValueFrom(
this.http.get<EventStream[]>(new URL('/eventstreams', this.baseUrl).href, {
...getHttpRequestOptions(this.username, this.password),
}),
this.http.get<EventStream[]>(
new URL('/eventstreams', this.baseUrl).href,
this.requestOptions(ctx),
),
);
return response.data;
}
Expand All @@ -192,13 +207,14 @@ export class EventStreamService {
batchSize: 50,
batchTimeoutMS: 500,
type: 'websocket',
websocket: { topic },
websocket: { topic: name },
blockedReryDelaySec: 30, // intentional due to spelling error in ethconnect
inputs: true,
timestamps: true,
};

const existingStreams = await this.getStreams();
const existingStreams = await this.getStreams(ctx);

const stream = existingStreams.find(s => s.name === streamDetails.name);
if (stream) {
const patchedStreamRes = await lastValueFrom(
Expand All @@ -207,12 +223,10 @@ export class EventStreamService {
{
...streamDetails,
},
{
...this.requestOptions(ctx),
},
this.requestOptions(ctx),
),
);
this.logger.log(`Event stream for ${topic}: ${stream.id}`);
this.logger.log(`Event stream for ${name}: ${stream.id}`);
return patchedStreamRes.data;
}
const newStreamRes = await lastValueFrom(
Expand All @@ -221,28 +235,25 @@ export class EventStreamService {
{
...streamDetails,
},
{
...this.requestOptions(ctx),
},
this.requestOptions(ctx),
),
);
this.logger.log(`Event stream for ${topic}: ${newStreamRes.data.id}`);
this.logger.log(`Event stream for ${name}: ${newStreamRes.data.id}`);
return newStreamRes.data;
}

async deleteStream(ctx: Context, id: string) {
await lastValueFrom(
this.http.delete(new URL(`/eventstreams/${id}`, this.baseUrl).href, {
...this.requestOptions(ctx),
}),
this.http.delete(new URL(`/eventstreams/${id}`, this.baseUrl).href, this.requestOptions(ctx)),
);
}

async getSubscriptions(ctx: Context): Promise<EventStreamSubscription[]> {
const response = await lastValueFrom(
this.http.get<EventStreamSubscription[]>(new URL('/subscriptions', this.baseUrl).href, {
...this.requestOptions(ctx),
}),
this.http.get<EventStreamSubscription[]>(
new URL('/subscriptions', this.baseUrl).href,
this.requestOptions(ctx),
),
);
return response.data;
}
Expand Down Expand Up @@ -275,7 +286,7 @@ export class EventStreamService {
): Promise<EventStreamSubscription> {
const response = await lastValueFrom(
this.http.post<EventStreamSubscription>(
new URL(`/subscriptions`, instancePath).href,
`${instancePath}/subscriptions`,
{
name,
stream: streamId,
Expand All @@ -284,9 +295,7 @@ export class EventStreamService {
address,
methods,
},
{
...this.requestOptions(ctx),
},
this.requestOptions(ctx),
),
);
this.logger.log(`Created subscription ${name}: ${response.data.id}`);
Expand Down Expand Up @@ -337,15 +346,20 @@ export class EventStreamService {
return true;
}

connect(
async connect(
url: string,
topic: string,
namespace: string,
handleEvents: (events: EventBatch) => void,
handleReceipt: (receipt: EventStreamReply) => void,
) {
const name = eventStreamName(topic, namespace);
await this.createOrUpdateStream(newContext(), name, topic);

return new EventStreamSocket(
url,
topic,
namespace,
this.username,
this.password,
handleEvents,
Expand Down
Loading

0 comments on commit 572b515

Please sign in to comment.