Connect with upstash-kafka

upstash-kafka is an HTTP/REST-based Kafka client built on top of the Upstash Kafka REST API.

It is the only connectionless (HTTP-based) Kafka client and is designed to work with:

  • Serverless functions (AWS Lambda ...)
  • Cloudflare Workers (see the example)
  • Fastly Compute@Edge
  • Next.js Edge, Remix, Nuxt ...
  • Client side web/mobile applications
  • WebAssembly
  • and other environments where HTTP is preferred over TCP.

Quick Start

Install

npm install @upstash/kafka

Authenticate

Copy the URL, username, and password from the Upstash Console.

import { Kafka } from "@upstash/kafka"

const kafka = new Kafka({
  url: "<UPSTASH_KAFKA_REST_URL>",
  username: "<UPSTASH_KAFKA_REST_USERNAME>",
  password: "<UPSTASH_KAFKA_REST_PASSWORD>",
})
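
Hard-coding credentials is rarely desirable outside of a quick test. The following is a minimal sketch, not part of the library, of collecting the three values from environment variables before constructing the client. The helper name `loadKafkaConfig` is hypothetical, and the variable names are assumed to match the placeholders shown in the snippet.

```typescript
// Hypothetical helper (not part of @upstash/kafka): collects the three
// credentials from an environment-like object and fails fast if any is missing.
type KafkaConfig = { url: string; username: string; password: string }

function loadKafkaConfig(env: Record<string, string | undefined>): KafkaConfig {
  const read = (name: string): string => {
    const value = env[name]
    if (!value) {
      throw new Error(`Missing environment variable: ${name}`)
    }
    return value
  }
  return {
    url: read("UPSTASH_KAFKA_REST_URL"),
    username: read("UPSTASH_KAFKA_REST_USERNAME"),
    password: read("UPSTASH_KAFKA_REST_PASSWORD"),
  }
}

// Usage (in Node.js): const kafka = new Kafka(loadKafkaConfig(process.env))
```

Passing the environment in as an argument keeps the helper usable in runtimes where `process.env` is not available and the values are exposed some other way.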

Produce

const p = kafka.producer()
const message = { hello: "world" } // Objects will get serialized using `JSON.stringify`
const response = await p.produce("TOPIC", message)
const response2 = await p.produce("TOPIC", message, {
  partition: 1,
  timestamp: 4567,
  key: "KEY",
  headers: [{ key: "TRACE-ID", value: "32h67jk" }],
})
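
Because object values are serialized with `JSON.stringify`, they travel as JSON text. As a rough illustration in plain TypeScript, with no client involved, the round trip looks like this. The `Greeting` type and the `decodeValue` helper are hypothetical names, and the sketch assumes the consumed value arrives as a string.

```typescript
type Greeting = { hello: string }

// What the producer effectively sends for an object value.
const greeting: Greeting = { hello: "world" }
const encoded: string = JSON.stringify(greeting)

// Hypothetical consumer-side helper: parse the JSON text back into an object,
// returning undefined rather than throwing on malformed input.
function decodeValue<T>(raw: string): T | undefined {
  try {
    return JSON.parse(raw) as T
  } catch {
    return undefined
  }
}

const decoded = decodeValue<Greeting>(encoded)
```

Note that `JSON.parse` does not validate the shape of the result; the cast to `T` is only a type-level assertion.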

Produce Many

const p = kafka.producer()
const res = await p.produceMany([
  {
    topic: "TOPIC",
    value: "MESSAGE",
    // ...options
  },
  {
    topic: "TOPIC-2",
    value: "MESSAGE-2",
    // ...options
  },
])

Consume

Note: When a new consumer instance is created, it may return empty messages until consumer group coordination is completed.

const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId: "group_1",
  instanceId: "instance_1",
  topics: ["test.topic"],
  autoOffsetReset: "earliest",
})
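
Since a freshly created consumer instance may return empty results for a while, one way to cope is to poll a few times before giving up. The sketch below is an assumption-laden illustration, not library functionality: `consumeWithRetry`, `maxAttempts`, and `delayMs` are hypothetical names, and the defaults are arbitrary.

```typescript
// Hypothetical helper (not part of @upstash/kafka): calls `poll` until it
// returns at least one message or the attempts run out.
async function consumeWithRetry<T>(
  poll: () => Promise<T[]>,
  maxAttempts = 5,
  delayMs = 500,
): Promise<T[]> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const messages = await poll()
    if (messages.length > 0) {
      return messages
    }
    if (attempt < maxAttempts) {
      // Wait before the next attempt to give group coordination time to finish.
      await new Promise<void>((resolve) => setTimeout(resolve, delayMs))
    }
  }
  return [] // still empty after all attempts
}

// Usage: const messages = await consumeWithRetry(() => c.consume({ ... }))
```

An empty result after all attempts can also simply mean the topic has no new messages, so callers should not treat it as an error.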

Commit

By default, consume commits offsets automatically. To commit manually, set autoCommit to false and call commit yourself, as below:

const consumerGroupId = "mygroup"
const instanceId = "myinstance"
const topic = "my.topic"

const c = kafka.consumer()
const messages = await c.consume({
  consumerGroupId,
  instanceId,
  topics: [topic],
  autoCommit: false,
})

for (const message of messages) {
  // message handling logic

  await c.commit({
    consumerGroupId,
    instanceId,
    offset: {
      topic: message.topic,
      partition: message.partition,
      offset: message.offset,
    },
  })
}
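
The loop above issues a separate commit call for every message. A possible alternative, sketched here under assumptions, is to handle the whole batch first and then commit only the highest offset seen for each topic and partition. `latestOffsets` is a hypothetical helper, not part of the library, and the message shape is assumed to carry the `topic`, `partition`, and `offset` fields used in the loop.

```typescript
type TopicPartitionOffset = { topic: string; partition: number; offset: number }

// Hypothetical helper: reduces a batch to one entry per topic/partition,
// keeping the highest offset seen for each.
function latestOffsets(messages: TopicPartitionOffset[]): TopicPartitionOffset[] {
  const latest = new Map<string, TopicPartitionOffset>()
  for (const { topic, partition, offset } of messages) {
    const key = `${topic}:${partition}`
    const seen = latest.get(key)
    if (seen === undefined || offset > seen.offset) {
      latest.set(key, { topic, partition, offset })
    }
  }
  return Array.from(latest.values())
}

// Usage: after handling the batch, commit each remaining entry:
// for (const offset of latestOffsets(messages)) {
//   await c.commit({ consumerGroupId, instanceId, offset })
// }
```

The trade-off is that a failure midway through the batch leaves every message in it uncommitted, so those messages may be delivered again.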

Fetch

const c = kafka.consumer()
const messages = await c.fetch({
  topic: "greeting",
  partition: 3,
  offset: 42,
  timeout: 1000,
})

Examples

See here for more examples.