kafka.do is a simple API for managing Kafka-based queues. The available endpoints are listed below.
GET /:queue
GET /:queue/send/:message
POST /:queue/sendBatch
["message1", "message2"]
GET /:queue/ackAll
GET /:queue/retryAll
GET /:queue/ack/:messageId
GET /:queue/retry/:messageId
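As a rough illustration of how a client might address these endpoints, the sketch below builds the URL for each route and the request for `sendBatch`. The base URL, the helper names (`endpoints`, `sendBatchRequest`), and the `content-type` header are assumptions made for this example; they are not part of the API description above.

```typescript
// Hypothetical base URL; replace it with the host where the API is deployed.
const BASE_URL = 'https://example.com'

// Percent-encode a path segment so queue names and messages are URL-safe.
const seg = (value: string): string => encodeURIComponent(value)

// One URL builder per endpoint listed above.
const endpoints = {
  queue: (queue: string) => `${BASE_URL}/${seg(queue)}`,
  send: (queue: string, message: string) => `${BASE_URL}/${seg(queue)}/send/${seg(message)}`,
  sendBatch: (queue: string) => `${BASE_URL}/${seg(queue)}/sendBatch`,
  ackAll: (queue: string) => `${BASE_URL}/${seg(queue)}/ackAll`,
  retryAll: (queue: string) => `${BASE_URL}/${seg(queue)}/retryAll`,
  ack: (queue: string, messageId: string) => `${BASE_URL}/${seg(queue)}/ack/${seg(messageId)}`,
  retry: (queue: string, messageId: string) => `${BASE_URL}/${seg(queue)}/retry/${seg(messageId)}`,
}

// Build the POST request for sendBatch: the body is a JSON array of messages.
// The content-type header is an assumption, not something the endpoint list specifies.
function sendBatchRequest(queue: string, messages: string[]) {
  return {
    url: endpoints.sendBatch(queue),
    init: {
      method: 'POST',
      headers: { 'content-type': 'application/json' },
      body: JSON.stringify(messages),
    },
  }
}
```

The returned `url` and `init` can be passed straight to `fetch(url, init)`; the GET routes only need the URL.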
Each queue endpoint registers a topic if it is not already registered and accepts the following parameters to change queue behavior:
maxBatchSize: The maximum number of messages allowed in each batch.
maxBatchTimeout: The maximum number of seconds to wait until a batch is full.
maxRetries: The maximum number of retries for a message, if it fails or retryAll is invoked.
deadLetterQueue: The name of another queue to which a message is sent if it fails processing at least maxRetries times. If a deadLetterQueue is not defined, messages that repeatedly fail processing will eventually be discarded. If there is no queue with the specified name, it will be created automatically.
maxConcurrency: The maximum number of concurrent consumers allowed to run at once. Leaving this unset means that the number of invocations will scale to the currently supported maximum.
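To make the interaction between maxRetries and deadLetterQueue concrete, here is a small in-memory sketch of the routing rule described above. It is only a model of the documented behavior, not the service's implementation; `routeFailedMessage`, `RetryOptions`, and `FailureOutcome` are hypothetical names introduced for illustration.

```typescript
// Possible outcomes for a message whose processing has just failed.
type FailureOutcome =
  | { action: 'retry' }
  | { action: 'deadLetter'; queue: string }
  | { action: 'discard' }

// The two parameters that govern what happens to failing messages.
interface RetryOptions {
  maxRetries: number
  deadLetterQueue?: string
}

// Decide what to do with a message that has failed `failureCount` times so far.
function routeFailedMessage(failureCount: number, options: RetryOptions): FailureOutcome {
  // Fewer than maxRetries failures: the message is retried.
  if (failureCount < options.maxRetries) return { action: 'retry' }
  // At least maxRetries failures: hand off to the dead-letter queue if one is set.
  if (options.deadLetterQueue !== undefined) {
    return { action: 'deadLetter', queue: options.deadLetterQueue }
  }
  // No dead-letter queue configured: the message is dropped.
  return { action: 'discard' }
}
```

For instance, with `maxRetries: 3` a message that has failed once is retried, while one that has failed three times goes to the dead-letter queue if one is configured and is discarded otherwise.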
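import { QueueProducer } from 'kafka.do'

export default {
  fetch: async (req, env, ctx) => {
    // Set up the MY_QUEUE producer; it is used below as env.MY_QUEUE
    QueueProducer('MY_QUEUE', env)
    // Enqueue a summary of the incoming request
    await env.MY_QUEUE.send({
      url: req.url,
      method: req.method,
      headers: Object.fromEntries(req.headers),
    })
    return new Response('Sent!')
  },
}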
You can also send multiple messages at once:
const sendResultsToQueue = async (results: Array<any>, env: Environment) => {
  const batch: MessageSendRequest[] = results.map((value) => ({
    body: JSON.stringify(value),
  }))
  await env.queue.sendBatch(batch)
}
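When the result set is large, it may be convenient to split it into several smaller batches before sending. The following is a minimal sketch of that idea; `chunk` and `toBatches` are hypothetical helpers, the `MessageSendRequest` shape is reduced to the `body` field used in the example above, and any batch size you pick is your own choice since no limit is stated here.

```typescript
// Shape reduced to the field used in the example above: a string body.
interface MessageSendRequest {
  body: string
}

// Split an array into consecutive chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer')
  }
  const chunks: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}

// Serialize each result and group the requests into batches,
// one batch per intended sendBatch call.
function toBatches(results: unknown[], size: number): MessageSendRequest[][] {
  const requests = results.map((value) => ({ body: JSON.stringify(value) }))
  return chunk(requests, size)
}
```

Each element of `toBatches(results, size)` can then be passed to `env.queue.sendBatch(batch)` in a loop.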
type QueuesContentType = 'text' | 'bytes' | 'json' | 'v8'
import { QueueConsumer } from 'kafka.do'

export default QueueConsumer({
  async queue(batch: MessageBatch, env: Environment, ctx: ExecutionContext): Promise<void> {
    for (const message of batch.messages) {
      console.log('Received', message)
    }
  },
})
interface MessageBatch<Body = unknown> {
  readonly queue: string
  readonly messages: Message<Body>[]
  ackAll(): void
  retryAll(): void
}
interface Message<Body = unknown> {
  readonly id: string
  readonly timestamp: Date
  readonly body: Body
  ack(): void
  retry(): void
}
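The per-message `ack()` and `retry()` methods allow a consumer to acknowledge successes and retry failures individually instead of calling `ackAll()` or `retryAll()` on the whole batch. The sketch below shows one way this might look. `processBatch` is a hypothetical helper, the interfaces are repeated locally so the block is self-contained, and the handler is synchronous purely to keep the example short.

```typescript
// Local copies of the interfaces above, so this sketch is self-contained.
interface Message<Body = unknown> {
  readonly id: string
  readonly timestamp: Date
  readonly body: Body
  ack(): void
  retry(): void
}

interface MessageBatch<Body = unknown> {
  readonly queue: string
  readonly messages: Message<Body>[]
  ackAll(): void
  retryAll(): void
}

// Run `handle` on every message body: acknowledge the message if the handler
// returns normally, mark it for retry if the handler throws.
// Returns the ids that were acknowledged and the ids that were retried.
function processBatch<Body>(
  batch: MessageBatch<Body>,
  handle: (body: Body) => void,
): { acked: string[]; retried: string[] } {
  const acked: string[] = []
  const retried: string[] = []
  for (const message of batch.messages) {
    try {
      handle(message.body)
      message.ack()
      acked.push(message.id)
    } catch {
      message.retry()
      retried.push(message.id)
    }
  }
  return { acked, retried }
}
```

Inside a consumer's `queue` function this could be called as `processBatch(batch, (body) => { /* work */ })`; an asynchronous handler would need an `await` inside the `try` block.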