
kafka.do

kafka.do is a simple API for managing Kafka-based queues. Below are the available endpoints.

Endpoints

List all registered servers

GET /

Consume from a queue

GET /:queue

Produce a message to a queue

GET /:queue/send/:message

Send a batch of messages

POST /:queue/sendBatch

Payload:

["message1", "message2"]

Acknowledge all messages as consumed

GET /:queue/ackAll

Mark all messages to be retried

GET /:queue/retryAll

Acknowledge a message as consumed

GET /:queue/ack/:messageId

Mark a message to be retried

GET /:queue/retry/:messageId
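
As a rough sketch of how these endpoints fit together (assuming the service is reachable at https://kafka.do and returns JSON; substitute your own deployment URL and queue name):

// Sketch only: base URL and the "events" queue are illustrative.
const base = 'https://kafka.do'

// Produce a single message to the "events" queue.
await fetch(`${base}/events/send/hello-world`)

// Produce a batch of messages.
await fetch(`${base}/events/sendBatch`, {
  method: 'POST',
  headers: { 'content-type': 'application/json' },
  body: JSON.stringify(['message1', 'message2']),
})

// Consume the next batch from the queue.
const batch = await fetch(`${base}/events`).then((res) => res.json())

// Acknowledge everything in the batch, or mark it all for retry.
await fetch(`${base}/events/ackAll`)
// await fetch(`${base}/events/retryAll`)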

Parameters

Each queue endpoint registers its topic if it is not already registered and accepts the following parameters to change queue behavior (see the sketch after this list):

  • maxBatchSize: The maximum number of messages allowed in each batch.
  • maxBatchTimeout: The maximum number of seconds to wait until a batch is full.
  • maxRetries: The maximum number of retries for a message, if it fails or retryAll is invoked.
  • deadLetterQueue: The name of another queue to send a message if it fails processing at least maxRetries times. If a deadLetterQueue is not defined, messages that repeatedly fail processing will eventually be discarded. If there is no queue with the specified name, it will be created automatically.
  • maxConcurrency: The maximum number of concurrent consumers allowed to run at once. Leaving this unset means that the number of invocations will scale to the currently supported maximum.
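
The README does not show how these parameters are supplied; the sketch below assumes they are passed as query-string values on the queue endpoint, which matches the GET-based API above. Queue name and values are illustrative:

// Sketch only: assumes tuning parameters are query-string values on the
// consume request. Adjust names and values for your own queues.
const url = new URL('https://kafka.do/events')
url.searchParams.set('maxBatchSize', '10')    // up to 10 messages per batch
url.searchParams.set('maxBatchTimeout', '5')  // wait at most 5 seconds for a full batch
url.searchParams.set('maxRetries', '3')       // retry a failed message up to 3 times
url.searchParams.set('deadLetterQueue', 'events-dlq')
const batch = await fetch(url).then((res) => res.json())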

Cloudflare Worker Queue Compatibility

Producer

import { QueueProducer } from 'kafka.do'

export default {
  fetch: async (req, env, ctx) => {
    QueueProducer('MY_QUEUE', env)
    await env.MY_QUEUE.send({
      url: req.url,
      method: req.method,
      headers: Object.fromEntries(req.headers),
    })
    return new Response('Sent!')
  },
}

You can also send multiple messages at once:

const sendResultsToQueue = async (results: Array<any>, env: Environment) => {
  const batch: MessageSendRequest[] = results.map((value) => ({
    body: JSON.stringify(value),
  }))
  await env.queue.sendBatch(batch)
}

QueuesContentType

type QueuesContentType = 'text' | 'bytes' | 'json' | 'v8'
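
These are the content types supported by Cloudflare's native Queues binding. As a hedged example (assuming kafka.do's send() accepts the same optional options object as the native binding), a content type can be set per message:

// Sketch only: assumes send() takes the same optional { contentType }
// argument as Cloudflare's native Queues producer binding.
await env.MY_QUEUE.send({ hello: 'world' }, { contentType: 'json' })
await env.MY_QUEUE.send('plain text payload', { contentType: 'text' })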

Consumer

import { QueueConsumer } from 'kafka.do'

export default QueueConsumer({
  async queue(batch: MessageBatch, env: Environment, ctx: ExecutionContext): Promise<void> {
    for (const message of batch.messages) {
      console.log('Received', message)
    }
  },
})

MessageBatch

interface MessageBatch<Body = unknown> {
  readonly queue: string
  readonly messages: Message<Body>[]
  ackAll(): void
  retryAll(): void
}

Message

interface Message<Body = unknown> {
  readonly id: string
  readonly timestamp: Date
  readonly body: Body
  ack(): void
  retry(): void
}
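
Messages can also be acknowledged or retried individually instead of using the batch-level ackAll()/retryAll() helpers. A minimal sketch, where handleMessage is a hypothetical application function:

import { QueueConsumer } from 'kafka.do'

// Hypothetical application-level handler; replace with your own logic.
declare function handleMessage(body: unknown): Promise<void>

export default QueueConsumer({
  async queue(batch: MessageBatch, env: Environment, ctx: ExecutionContext): Promise<void> {
    for (const message of batch.messages) {
      try {
        await handleMessage(message.body)
        message.ack()   // mark this message as consumed
      } catch (err) {
        message.retry() // redeliver, up to maxRetries
      }
    }
  },
})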
