Skip to main content

Consume Messages Using REST API

info

If you do not have a Kafka cluster and/or topic already, follow these steps to create one.

In the cluster details section of the Upstash Console, scroll down the REST API section and and copy UPSTASH_KAFKA_REST_URL, UPSTASH_KAFKA_REST_USERNAME and UPSTASH_KAFKA_REST_PASSWORD using the copy icons next to them.

We will use a Node.js sample code to show how to consume messages using the REST API. Our sample will use a topic named cities and consume previously produced city names from this topic using Kafka consumer groups and automatic offset committing.

Replace following parameters in the code snippets below with your actual values.

const address = "https://tops-stingray-7863-eu1-rest-kafka.upstash.io"
const user = 'G9wcy1zdGluZ3JheS03ODYzJMUX'
const pass = 'eUmYCkAlxEhihIc7Hooi2IA2pz2fw=='
const auth = Buffer.from(`${user}:${pass}`).toString('base64')
const topic = 'cities'

Following code will consume city names using mygroup consumer group id and myconsumer consumer id from the topic starting from the latest offset and print the consumed messages and their offsets to the console:

async function consumeTopic(groupId, consumerId, topic) {
const response = await fetch(`${address}/consume/${groupId}/${consumerId}/${topic}`, {
headers: {'Authorization': `Basic ${auth}`}
});
const messages = await response.json();
messages.forEach(m => {
console.log(`Message: ${m.value}, Offset: ${m.offset}`);
});
}

consumeTopic('mygroup', 'myconsumer', topic)

By default consume API starts consuming from the latest offset. It's also possible to start from the earliest offset by passing Kafka-Auto-Offset-Reset: earliest request header:

async function consumeTopic(groupId, consumerId, topic, offsetReset) {
const response = await fetch(`${address}/consume/${groupId}/${consumerId}/${topic}`, {
headers: {
'Authorization': `Basic ${auth}`,
'Kafka-Auto-Offset-Reset': offsetReset
}
});
const messages = await response.json();
messages.forEach(m => {
console.log(`Message: ${m.value}, Offset: ${m.offset}`);
});
}

consumeTopic('mygroup', 'myconsumer', topic, 'earliest')

We can also go deeper and turn off auto-commit behaviour of the consumer to manually commit the offsets later. To turn off auto commit, we should send Kafka-Enable-Auto-Commit: false header. This allows us to commit the offsets only when all messages processed successfully.

async function consumeTopicWithoutCommit(groupId, consumerId, topic, offsetReset) {
const response = await fetch(`${address}/consume/${groupId}/${consumerId}/${topic}`, {
headers: {
'Authorization': `Basic ${auth}`,
'Kafka-Auto-Offset-Reset': offsetReset,
'Kafka-Enable-Auto-Commit': 'false'
}
});
const messages = await response.json();
messages.forEach(m => {
console.log(`Message: ${m.value}, Offset: ${m.offset}`);
});
}

async function commitOffsetsFor(groupId, consumerId) {
const response = await fetch(`${address}/commit/${groupId}/${consumerId}`, {
headers: {'Authorization': `Basic ${auth}`}
});
const resp = await response.json();
console.log(`Result: ${resp.result}, Error: ${resp.error}, Status: ${resp.status}`);
}

consumeTopicWithoutCommit('mygroup', 'myconsumer', topic, 'earliest')
commitOffsetsFor('mygroup', 'myconsumer')

For more info about using the REST API see Kafka REST Consume API section.