Skip to main content

Connect Using Kafka Clients

Connecting to Upstash Kafka using any Kafka client is very straightforward. If you do not have a Kafka cluster and/or topic already, follow these steps to create one.

After creating a cluster and a topic, go to the cluster details page in the Upstash Console and copy the bootstrap endpoint, username, and password.

Then replace the following parameters in the code snippets below for your favourite Kafka client or language.

  • {{ BOOTSTRAP_ENDPOINT }}
  • {{ UPSTASH_KAFKA_USERNAME }}
  • {{ UPSTASH_KAFKA_PASSWORD }}
  • {{ TOPIC_NAME }}
  • {{ GROUP_NAME }} (used only by the consumer example)

Create a Topic

/**
 * Creates the topic {{ TOPIC_NAME }} on the cluster reachable at {{ BOOTSTRAP_ENDPOINT }}.
 *
 * <p>The admin client is configured for SASL_SSL with the SCRAM-SHA-512 mechanism,
 * using {{ UPSTASH_KAFKA_USERNAME }} / {{ UPSTASH_KAFKA_PASSWORD }} as credentials.
 */
class CreateTopic {
    /**
     * Builds the client configuration, submits a single create-topic request and waits
     * on the returned future.
     *
     * @param args unused
     * @throws Exception if waiting on the create-topic future fails
     */
    public static void main(String[] args) throws Exception {
        // Topic sizing. These are starting values only: adjust them to whatever your
        // cluster plan allows. The replication factor is a short because that is the
        // type the NewTopic(String, int, short) constructor expects.
        int partitions = 1;
        short replicationFactor = 1;

        var props = new Properties();
        props.put("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        props.put("sasl.mechanism", "SCRAM-SHA-512");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" " +
            "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";");

        // try-with-resources closes the admin client even when the request fails.
        try (var admin = Admin.create(props)) {
            admin.createTopics(
                Set.of(new NewTopic("{{ TOPIC_NAME }}", partitions, replicationFactor))
            ).all().get(); // get() surfaces any creation error as an exception
        }
    }
}

Produce a Message

/**
 * Sends one string message ("Hello Upstash!") to {{ TOPIC_NAME }}.
 *
 * <p>The producer authenticates over SASL_SSL with SCRAM-SHA-512 and serializes both
 * keys and values as strings.
 */
class Produce {
    /**
     * Configures a producer, sends a single record without a key, flushes and closes.
     *
     * @param args unused
     * @throws Exception declared so the example needs no explicit error handling
     */
    public static void main(String[] args) throws Exception {
        // Credentials for the SCRAM login module, assembled once up front.
        final String jaasConfig =
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" "
                + "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";";
        final String stringSerializer = "org.apache.kafka.common.serialization.StringSerializer";

        var config = new Properties();
        // Connection and authentication.
        config.setProperty("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        config.setProperty("security.protocol", "SASL_SSL");
        config.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        config.setProperty("sasl.jaas.config", jaasConfig);
        // Keys and values are both plain strings.
        config.setProperty("key.serializer", stringSerializer);
        config.setProperty("value.serializer", stringSerializer);

        try (var kafkaProducer = new KafkaProducer<String, String>(config)) {
            var message = new ProducerRecord<String, String>("{{ TOPIC_NAME }}", "Hello Upstash!");
            kafkaProducer.send(message);
            // Push the buffered record out before the producer is closed.
            kafkaProducer.flush();
        }
    }
}

Consume Messages

/**
 * Reads messages from {{ TOPIC_NAME }} as a member of consumer group {{ GROUP_NAME }}
 * and prints each record to standard output.
 *
 * <p>The consumer authenticates over SASL_SSL with SCRAM-SHA-512, deserializes keys and
 * values as strings, and sets "auto.offset.reset" to "earliest".
 */
class Consume {
    /**
     * Configures a consumer, subscribes to the topic, polls once for up to ten seconds
     * and prints whatever that single poll returned.
     *
     * @param args unused
     * @throws Exception declared so the example needs no explicit error handling
     */
    public static void main(String[] args) throws Exception {
        // Credentials for the SCRAM login module, assembled once up front.
        final String jaasConfig =
            "org.apache.kafka.common.security.scram.ScramLoginModule required "
                + "username=\"{{ UPSTASH_KAFKA_USERNAME }}\" "
                + "password=\"{{ UPSTASH_KAFKA_PASSWORD }}\";";
        final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

        var config = new Properties();
        // Connection and authentication.
        config.setProperty("bootstrap.servers", "{{ BOOTSTRAP_ENDPOINT }}");
        config.setProperty("security.protocol", "SASL_SSL");
        config.setProperty("sasl.mechanism", "SCRAM-SHA-512");
        config.setProperty("sasl.jaas.config", jaasConfig);
        // Consumer-specific settings.
        config.setProperty("group.id", "{{ GROUP_NAME }}");
        config.setProperty("auto.offset.reset", "earliest");
        config.setProperty("key.deserializer", stringDeserializer);
        config.setProperty("value.deserializer", stringDeserializer);

        try (var kafkaConsumer = new KafkaConsumer<String, String>(config)) {
            kafkaConsumer.subscribe(Collections.singleton("{{ TOPIC_NAME }}"));
            // A single poll; the example does not loop.
            var batch = kafkaConsumer.poll(Duration.ofSeconds(10));
            batch.forEach(System.out::println);
        }
    }
}