Intro and Kafka Pipeline Setup

In this tutorial series, we will show how to build an end-to-end, real-time analytics system. We will stream the traffic (click) events from our web application to Upstash Kafka, then analyze them in real time using different products. We will implement one simple query with each of the tools:

SELECT city, count() FROM kafka_topic_page_views WHERE timestamp > now() - INTERVAL 15 MINUTE GROUP BY city

Namely, we will query the number of page views from different cities in the last 15 minutes. We keep the query and scenario intentionally simple to make the series easy to follow, but you can easily extend the model for more complex real-time analytics scenarios.

This is the first part of the series, where we will set up the pipeline that streams your web app's click events to Upstash Kafka. We will use edge functions to intercept web requests and send the events to Kafka, and we will show how to do that using either Cloudflare Workers or Vercel Edge.

In the next parts, we will analyze the data by consuming the Kafka topic with different products, including:

  • ksqlDB
  • Decodable
  • StarTree
  • Materialize
  • Tinybird
  • Rockset
  • Immerok

At the end of the series we will share our comparative experiences with each product.

Kafka Setup

Create an Upstash Kafka cluster and a topic as explained here. You will use the URL and credentials in the next steps.
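
If you want to verify that the cluster is reachable, you can produce a test message over the Upstash REST API (the cluster URL and credentials below are placeholders; use the values shown in your Upstash console):

curl -u "UPSTASH_KAFKA_REST_USERNAME:UPSTASH_KAFKA_REST_PASSWORD" \
  https://YOUR_CLUSTER-rest-kafka.upstash.io/produce/mytopic/hello_world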

Option 1: Cloudflare Workers

You can use Cloudflare Workers to intercept incoming requests to your website and run a serverless function. You can add Cloudflare Workers to your existing website without changing any code; you only need to create a site in Cloudflare and set the nameservers for your domain accordingly, so that Cloudflare can intercept the requests to your website and run the Workers function.

We will use Wrangler for the Cloudflare Workers setup, so first install Wrangler.
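
If you do not have Wrangler installed yet, one way is via npm:

npm install -g wrangler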

Create a folder for your project and run wrangler init. Choose the default options:

 kafka-examples git:(master) ✗ wrangler init
⛅️ wrangler 2.1.3
-------------------
Using npm as package manager.
✨ Created wrangler.toml
No package.json found. Would you like to create one? (y/n)
✨ Created package.json
Would you like to use TypeScript? (y/n)
✨ Created tsconfig.json
Would you like to create a Worker at src/index.ts?
None
❯ Fetch handler
Scheduled handler
✨ Created src/index.ts
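
The generated project does not include the Upstash Kafka client used in the next step, so add it to the project first:

npm install @upstash/kafka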

Update src/index.ts as below:

import { Kafka } from "@upstash/kafka"

export interface Env {
}

export default {
  async fetch(
    request: Request,
    env: Env,
    ctx: ExecutionContext
  ): Promise<Response> {
    // Ignore favicon requests so they don't generate noise events
    if (new URL(request.url).pathname == "/favicon.ico") {
      return new Response(null, { status: 200 });
    }

    // Collect the visitor and request information we want to analyze
    let message = {
      country: request.cf?.country,
      city: request.cf?.city,
      region: request.cf?.region,
      url: request.url,
      ip: request.headers.get("x-real-ip"),
      mobile: request.headers.get("sec-ch-ua-mobile"),
      platform: request.headers.get("sec-ch-ua-platform"),
      useragent: request.headers.get("user-agent")
    }

    const kafka = new Kafka({
      url: "https://real-goldfish-14081-us1-rest-kafka.upstash.io",
      username: "ZGVmaW5453tZ29sZhgf65yzQy26tVXO_fm8Y",
      password: "v02ibEEtghfg56OybgOHwjmRdNS3nthrt=",
    })

    const p = kafka.producer()
    const topic = "mytopic"

    // Produce the event without blocking the response
    ctx.waitUntil(p.produce(topic, JSON.stringify(message)))

    // if you use CF Workers to intercept your existing site, uncomment below
    // return await fetch(request);
    return new Response("My website");
  },
};

Replace the url, username, and password above with your own cluster's values, and update the topic name accordingly.

Above, we simply parse the request object and send the useful information to Kafka. You can add or remove fields depending on your requirements.
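
For reference, a single event produced to the topic will look roughly like this (the field values are illustrative):

{
  "country": "US",
  "city": "San Francisco",
  "region": "California",
  "url": "https://mywebsite.com/",
  "ip": "203.0.113.10",
  "mobile": "?0",
  "platform": "\"macOS\"",
  "useragent": "Mozilla/5.0 ..."
}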

You can test the function locally with wrangler dev. Deploy your function to Cloudflare with wrangler publish.
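
By default, wrangler dev serves the Worker at localhost:8787, so you can generate a test event with a plain request:

curl http://localhost:8787/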

The endpoint of the function will be printed. You can check whether the events are collected in Kafka by copying the curl expression from the Upstash console:

curl https://real-goldfish-14081-us1-rest-kafka.upstash.io/consume/GROUP_NAME/GROUP_INSTANCE_NAME/mytopic -H "Kafka-Auto-Offset-Reset: earliest" -u \
REPLACE_HERE
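
If the pipeline works, the consume call returns the collected records as JSON. The response will look roughly like the following (shape shown for illustration, value trimmed for brevity):

[
  {
    "topic": "mytopic",
    "partition": 0,
    "offset": 0,
    "timestamp": 1663940780000,
    "key": "",
    "value": "{\"country\":\"US\",\"city\":\"San Francisco\", ...}"
  }
]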

Option 2: Vercel Edge Middleware

Vercel Edge Middleware allows you to intercept the requests to your application served by the Vercel platform. We will create a simple Next.js application and send the traffic events to Upstash Kafka using Vercel Edge Middleware.

Create a Next.js application:

npx create-next-app@latest --typescript

Create a middleware.ts (or .js) file at the same level as your pages directory. Update the file as below:

import { NextResponse } from 'next/server';
import type { NextRequest, NextFetchEvent } from 'next/server';
import { Kafka } from "@upstash/kafka";

// Run this middleware only on the root route
export const config = {
  matcher: '/',
};

export async function middleware(req: NextRequest, event: NextFetchEvent) {
  const kafka = new Kafka({
    url: "https://real-goldfish-14081-us1-rest-kafka.upstash.io",
    username: "ZGVmaW5453tZ29sZhgf65yzQy26tVXO_fm8Y",
    password: "v02ibEEtghfg56OybgOHwjmRdNS3nthrt=",
  })

  // Collect the visitor and request information we want to analyze
  let message = {
    country: req.geo?.country,
    city: req.geo?.city,
    region: req.geo?.region,
    url: req.url,
    ip: req.headers.get("x-real-ip"),
    mobile: req.headers.get("sec-ch-ua-mobile"),
    platform: req.headers.get("sec-ch-ua-platform"),
    useragent: req.headers.get("user-agent")
  }

  const p = kafka.producer()
  const topic = "mytopic"

  // Produce the event without blocking the response
  event.waitUntil(p.produce(topic, JSON.stringify(message)))

  // Continue to the requested page
  return NextResponse.next();
}

Replace the url, username, and password above with your own cluster's values, and update the topic name accordingly.

Above, we simply parse the request object and send the useful information to Kafka. You can add or remove fields depending on your requirements.
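
Also note that the matcher above runs the middleware only for the root path. To record page views across the whole site, you can widen the matcher; a common pattern (an assumption, adapt it to your routes) excludes Next.js internals and static assets:

export const config = {
  // Match everything except Next.js internals and the favicon
  matcher: '/((?!_next/static|_next/image|favicon.ico).*)',
};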

You can test the function locally with npm run dev. Deploy your application to Vercel with vercel --prod.
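
npm run dev serves the application at localhost:3000 by default, so you can trigger the middleware locally with:

curl http://localhost:3000/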

The endpoint of the function will be printed. You can check whether the events are collected in Kafka by copying the curl expression from the Upstash console:

curl https://real-goldfish-14081-us1-rest-kafka.upstash.io/consume/GROUP_NAME/GROUP_INSTANCE_NAME/mytopic -H "Kafka-Auto-Offset-Reset: earliest" -u \
REPLACE_HERE

Conclusion

We have successfully built the pipeline that collects the traffic data from our web application into Upstash Kafka. In the remainder of the series, we will analyze the data in Kafka with different real-time analytics tools that are capable of reading from Kafka.