Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support transactions #20

Merged
merged 17 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM node:9.11.1-alpine AS builder
FROM node:15.12.0-alpine AS builder

RUN apk --no-cache add \
bash \
Expand All @@ -20,7 +20,7 @@ COPY package-lock.json /app/package-lock.json

RUN npm install --production

FROM node:9.11.1-alpine
FROM node:15.12.0-alpine

RUN apk --no-cache add \
libsasl openssl lz4-libs
Expand Down
3 changes: 3 additions & 0 deletions bin/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#! /bin/sh

# End-to-end test driver: build and start the full docker-compose stack
# (zookeeper, kafka, exporter) and wait for it to finish. If the run
# succeeds, force-remove the stopped containers to leave a clean state.
docker-compose -f ./docker-compose.yml up --build && docker-compose rm -f
21 changes: 19 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
version: '2'
version: "3.4"
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
logging:
driver: none
kafka:
image: wurstmeister/kafka
ports:
Expand All @@ -13,8 +15,23 @@ services:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
logging:
driver: none
exporter:
build: .
build:
context: .
dockerfile: Dockerfile
args:
NODE_ENV: development
depends_on:
- zookeeper
- kafka
environment:
KAFKA_URL: kafka:9092
ZOOKEEPER_URL: zookeeper:2181
KAFKA_TOPIC: san_exporter_test
ports:
# port for checking health.
- "127.0.0.1:3000:3000"
entrypoint: "/bin/sh"
command: ["-c", "docker/wait_for_services.sh && node examples/send_dates.js && node examples/send_dates_transaction.js"]
10 changes: 10 additions & 0 deletions docker/wait_for_services.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#! /bin/sh

# Block until the Kafka broker is reachable before the exporter examples run.
# "kafka" is the docker-compose service name; 9092 is the broker port set in
# docker-compose.yml. -z: scan without sending data; -w30: 30s timeout per try.

# Wait for Kafka
until nc -z -v -w30 kafka 9092
do
echo 'Waiting for Kafka...'
sleep 1
done

echo "Kafka is up and running"
26 changes: 13 additions & 13 deletions examples/send_dates.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@ const pkg = require('../package.json');
const { Exporter } = require('../index')
const exporter = new Exporter(pkg.name)

async function pushData() {
const timestamp = Math.floor(new Date() / 1000)

async function pushData(num_iteration) {
let lastPosition = await exporter.getLastPosition()
console.log(`Last position: ${JSON.stringify(lastPosition)}`)
let key = 1
if(lastPosition) {
key = lastPosition.key + 1
}
const key = lastPosition ? lastPosition.key + 1 : 1

console.log("Sending data...")
const now = new Date()
const timestamp = Math.floor(now.getTime() / 1000)
console.log("Sending data, iteration", num_iteration);
await exporter.sendDataWithKey({
timestamp: timestamp,
iso_date: new Date().toISOString(),
iso_date: now.toISOString(),
key: key
}, "key")

Expand All @@ -25,14 +22,17 @@ async function pushData() {

let newPosition = await exporter.getLastPosition()
console.log(`New position: ${JSON.stringify(newPosition)}`)

setTimeout(pushData, 10000)
}

async function work() {
await exporter.connect()
await exporter.connect();
console.log("Connected to Kafka");

for (i = 0; i <= 10; i++) {
await pushData(i)
}

await pushData()
exporter.disconnect();
}

work()
41 changes: 41 additions & 0 deletions examples/send_dates_transaction.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
const pkg = require('../package.json');
const { Exporter } = require('../index')
const exporter = new Exporter(pkg.name, true)

/**
 * Send one timestamped message to Kafka with a monotonically incrementing
 * key, then persist and re-read the position via the module-level
 * transactional `exporter`.
 *
 * The key continues from the last position stored in Zookeeper, or starts
 * at 1 when no position exists yet.
 */
async function pushData() {
  const lastPosition = await exporter.getLastPosition()
  console.log(`Last position: ${JSON.stringify(lastPosition)}`)
  const key = lastPosition ? lastPosition.key + 1 : 1

  const now = new Date()
  const timestamp = Math.floor(now.getTime() / 1000)
  console.log("Sending data with transaction...")
  await exporter.sendDataWithKey({
    timestamp: timestamp,
    iso_date: now.toISOString(),
    key: key
  }, "key")

  const position = {timestamp: timestamp, key: key}
  console.log(`Saving position: ${JSON.stringify(position)}`)
  await exporter.savePosition(position)

  // Read the position back so the log shows the round-trip succeeded.
  const newPosition = await exporter.getLastPosition()
  console.log(`New position: ${JSON.stringify(newPosition)}`)
}

/**
 * Entry point: connect to Kafka/Zookeeper, send eleven messages inside a
 * single producer transaction, commit it, and disconnect.
 */
async function work() {
  console.log("Start sending data with transactions.");
  await exporter.connect();
  // NOTE(review): initTransactions/begin/commit return whatever the
  // underlying producer returns; if those are Promises they should be
  // awaited — confirm against the Exporter implementation.
  exporter.initTransactions();
  exporter.beginTransaction();

  // `let` keeps the loop index function-local; the original `for (i = 0; ...)`
  // leaked `i` as an implicit global.
  for (let i = 0; i <= 10; i++) {
    await pushData()
  }

  exporter.commitTransaction();
  exporter.disconnect();
}

work()
96 changes: 83 additions & 13 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
const Kafka = require("node-rdkafka");
const Kafka = require("@santiment-network/node-rdkafka");
const zk = require("node-zookeeper-client-async");
const { logger } = require('./logger')

const ZOOKEEPER_URL = process.env.ZOOKEEPER_URL || "localhost:2181";
const zookeeperClient = zk.createAsyncClient(ZOOKEEPER_URL);

const KAFKA_COMPRESSION_CODEC = process.env.KAFKA_COMPRESSION_CODEC || "lz4";
const KAFKA_URL = process.env.KAFKA_URL || "localhost:9092";
Expand All @@ -13,6 +12,7 @@ const BUFFERING_MAX_MESSAGES = parseInt(
);
const KAFKA_MESSAGE_MAX_BYTES = parseInt(process.env.KAFKA_MESSAGE_MAX_BYTES || "10485760")
const FORMAT_HEADER = "format=json;";
const TRANSACTIONS_TIMEOUT_MS = 2000;

process.on("unhandledRejection", (reason, p) => {
// Otherwise unhandled promises are not possible to trace with the information logged
Expand All @@ -28,16 +28,23 @@ process.on("unhandledRejection", (reason, p) => {
});

exports.Exporter = class {
constructor(exporter_name) {
constructor(exporter_name, transactional=false) {
this.exporter_name = exporter_name;
this.producer = new Kafka.Producer({
var producer_settings = {
"metadata.broker.list": KAFKA_URL,
"client.id": this.exporter_name,
"compression.codec": KAFKA_COMPRESSION_CODEC,
"queue.buffering.max.messages": BUFFERING_MAX_MESSAGES,
"message.max.bytes": KAFKA_MESSAGE_MAX_BYTES,
"dr_cb": true
});
}
if (transactional) {
producer_settings['transactional.id'] = this.topic_name;
producer_settings['enable.idempotence'] = true;
}

this.producer = new Kafka.Producer(producer_settings);
this.zookeeperClient = zk.createAsyncClient(ZOOKEEPER_URL);
}

get topic_name() {
Expand All @@ -52,26 +59,56 @@ exports.Exporter = class {
return `/${this.exporter_name}/${this.topic_name}/block-number`;
}

/**
* @returns {Promise} Promise, resolved on connection completed.
*/
async connect() {
logger.info(`Connecting to zookeeper host ${ZOOKEEPER_URL}`);
await zookeeperClient.connectAsync();

try {
await this.zookeeperClient.connectAsync();
}
catch(ex) {
console.error("Error connecting to Zookeeper: ", ex);
throw ex;
}

logger.info(`Connecting to kafka host ${KAFKA_URL}`);
this.producer.connect();
return new Promise((resolve, reject) => {
var promise_result = new Promise((resolve, reject) => {
this.producer.on("ready", resolve);
this.producer.on("event.error", reject);
// The user can provide a callback for delivery reports with the
// dedicated method 'subscribeDeliveryReports'.
this.producer.on("delivery-report", function(err, report) {
if(err) {
throw err;
}
});
});
this.producer.connect();
return promise_result;
}

/**
* Disconnect from Zookeeper and Kafka.
* This method is completed once the callback is invoked.
*/
disconnect(callback) {
logger.info(`Disconnecting from zookeeper host ${ZOOKEEPER_URL}`);
this.zookeeperClient.closeAsync().then( () => {
if (this.producer.isConnected()) {
logger.info(`Disconnecting from kafka host ${KAFKA_URL}`);
this.producer.disconnect(callback);
}
else {
logger.info(`Producer is NOT connected to kafka host ${KAFKA_URL}`);
}
})
}

async getLastPosition() {
if (await zookeeperClient.existsAsync(this.zookeeperPositionNode)) {
const previousPosition = await zookeeperClient.getDataAsync(
if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) {
const previousPosition = await this.zookeeperClient.getDataAsync(
this.zookeeperPositionNode
);

Expand Down Expand Up @@ -100,13 +137,13 @@ exports.Exporter = class {
"utf-8"
);

if (await zookeeperClient.existsAsync(this.zookeeperPositionNode)) {
return zookeeperClient.setDataAsync(
if (await this.zookeeperClient.existsAsync(this.zookeeperPositionNode)) {
return this.zookeeperClient.setDataAsync(
this.zookeeperPositionNode,
newNodeValue
);
} else {
return zookeeperClient.mkdirpAsync(
return this.zookeeperClient.mkdirpAsync(
this.zookeeperPositionNode,
newNodeValue
);
Expand Down Expand Up @@ -151,4 +188,37 @@ exports.Exporter = class {
})
);
}

/**
 * Subscribe to Kafka delivery reports.
 *
 * Note: this ADDS the listener; the default error-throwing handler
 * registered at connect time remains attached as well.
 * @param {Function} callback Invoked as (err, report) on message delivery.
 */
async subscribeDeliveryReports(callback) {
this.producer.on("delivery-report", callback);
}

/**
* Unsubscribe from delivery reports, restoring the default error checking.
*/
async unSubscribeDeliveryReports() {
this.producer.on("delivery-report", function(err, report) {
if(err) {
throw err;
}
});
}

/**
 * Prepare the producer for transactional sends. Call once, before
 * beginTransaction(). Bounded by TRANSACTIONS_TIMEOUT_MS.
 * @returns the underlying producer's result — presumably a Promise or a
 *     synchronous return; confirm against the node-rdkafka fork's docs.
 */
initTransactions() {
return this.producer.initTransactions(TRANSACTIONS_TIMEOUT_MS);
}
/** Begin a transaction on the producer. */
beginTransaction() {
return this.producer.beginTransaction();
}
/** Commit the open transaction, bounded by TRANSACTIONS_TIMEOUT_MS. */
commitTransaction() {
return this.producer.commitTransaction(TRANSACTIONS_TIMEOUT_MS);
}
/** Abort the open transaction, bounded by TRANSACTIONS_TIMEOUT_MS. */
abortTransaction() {
return this.producer.abortTransaction(TRANSACTIONS_TIMEOUT_MS);
}

};
Loading