
Job Processing and Event Queue with Serverless Redis

Motivation

Serverless functions are great for many tasks thanks to their dynamic scaling and flexible pricing models. But when a task is composed of long-running, complex steps, it is not feasible to run it in a single serverless function. A simple solution is to offload the complicated work from the serverless function and process it asynchronously in your preferred environment: other serverless functions, serverless containers or traditional server-based processes. To offload your tasks, you need a reliable event queue. In this article we will use Upstash Redis for this purpose.

Scenario

You are developing a New Employee Registration form for your company. Saving employee records to the database is the easy part. Here are some of the other things that need to happen:

  • Create accounts (email, slack etc).
  • Send email to the employee.
  • Send email to the hiring manager and others.
  • Create a JIRA ticket for the IT department so they will set up the employee’s computer.

This list can be longer for bigger companies. At the same time, you have two requirements:

  • You want the form to be responsive. You do not want a new employee to wait for minutes after clicking submit.
  • The above steps are subject to change. You do not want to update the form's code whenever a new procedure is added.

Decoupling the side procedures will solve the above issues. When a new employee is registered, you can push a new event to the related task queue; then another process will consume the task.
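
To illustrate the decoupling, here is a minimal sketch of how a consumer might keep the side procedures in a plain list, so that adding a procedure means adding one entry rather than touching the form. The step names and the runOnboardingSteps helper are hypothetical and are not part of the sample application; the step bodies are placeholders.

```javascript
// Hypothetical side procedures. Each step receives the employee record.
const onboardingSteps = [
  { name: 'create-accounts', run: async (employee) => `accounts requested for ${employee.email}` },
  { name: 'email-employee', run: async (employee) => `welcome email queued for ${employee.name}` },
  { name: 'create-it-ticket', run: async (employee) => `ticket opened for ${employee.name}` },
];

// Runs every step in order and records whether it succeeded.
// A failing step does not prevent the remaining steps from running.
async function runOnboardingSteps(employee, steps) {
  const results = [];
  for (const step of steps) {
    try {
      const detail = await step.run(employee);
      results.push({ step: step.name, ok: true, detail });
    } catch (err) {
      results.push({ step: step.name, ok: false, detail: err.message });
    }
  }
  return results;
}
```

Collecting failures instead of throwing is a design choice for this sketch. In a real consumer you may prefer to let the error propagate so that the queue can retry the job.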

Let’s build the sample application:

Project Setup

The project will consist of two modules:

  • Producer will be a serverless function which will receive input parameters required to register a new employee. It will also produce events for the task queue.
  • Consumer will be a worker application which will continuously consume the task queue.

(See the source code)

Tech Stack

Upstash Database

You can create a free Redis database from Upstash. The good thing about Upstash is its per-request pricing with a $120 price cap: you pay for what you use, but never more than $120.

After creating a database, copy the endpoint, port and password, as you will need them in the next steps.
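
The code in this article hard-codes these values for brevity. As a minimal sketch of an alternative, the helper below reads them from environment variables and returns an object in the shape used for the redis option later on. The variable names (UPSTASH_REDIS_HOST and so on) and the helper itself are assumptions made for illustration, not names defined by Upstash or Bull.

```javascript
// Builds the Redis connection options from environment variables.
// The variable names are hypothetical; adapt them to your deployment.
function redisOptionsFromEnv(env) {
  const host = env.UPSTASH_REDIS_HOST;
  const port = Number(env.UPSTASH_REDIS_PORT);
  const password = env.UPSTASH_REDIS_PASSWORD;

  if (!host || !Number.isInteger(port) || port <= 0 || !password) {
    throw new Error('Missing or invalid Redis connection settings');
  }

  const options = { host, port, password };
  // Keep TLS enabled unless it was explicitly switched off for the database.
  if (env.UPSTASH_REDIS_TLS !== 'false') {
    options.tls = {};
  }
  return options;
}

// Usage (not executed here): const redis = redisOptionsFromEnv(process.env);
```

This keeps the password out of the source code and lets the producer and consumer share one configuration.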

Producer Code

Our producer is the serverless function that receives the request parameters and adds a task to the queue. In a real application this code would also save the employee record to the database, but that part is left out here for the sake of simplicity.

1- Create a Serverless project with the serverless command:

➜ serverless
Serverless: No project detected. Do you want to create a new one? Yes
Serverless: What do you want to make? AWS Node.js
Serverless: What do you want to call this project? producer
Project successfully created in 'producer' folder.
You can monitor, troubleshoot, and test your new service with a free Serverless account.
Serverless: Would you like to enable this? No
You can run the “serverless” command again if you change your mind later.

2- Install bull:

npm install bull

3- Function code:

const Queue = require('bull');

// Advanced Bull settings that reduce how often the queue polls Redis.
const settings = {
  stalledInterval: 300000, // How often to check for stalled jobs (use 0 to never check).
  guardInterval: 5000, // Poll interval for delayed jobs and added jobs.
  drainDelay: 300 // A timeout for when the queue is in drained state (empty, waiting for jobs).
};

module.exports.hello = async (event) => {
  // The settings go inside the options object: when the second argument is an
  // options object, Bull does not read a third argument.
  const taskQueue = new Queue('employee registration', {
    redis: {port: 32016, host: 'us1-upward-ant-32016.upstash.io', password: 'ake4ff120d6b4216df220736be7eab087', tls: {}},
    settings
  });
  await taskQueue.add({event: event});
  // TODO save the employee record to a database
  return { message: 'New employee event enqueued! 34', event };
};

Note1: Do not forget to replace the Redis endpoint, port and password with your own. Remove the tls entry if you disabled TLS.

Note2: We pass extra parameters (settings) to the event queue (Bull) so that it does not exhaust your Upstash quota. Adjust the interval parameters depending on how much event latency you can tolerate.
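
The producer above adds jobs with Bull's default job options. Since the motivation is a reliable queue, you may also want failed jobs to be retried. The following is a minimal sketch under that assumption: enqueueEmployee is a hypothetical helper, and the retry values are illustrative. It relies only on Bull's add(data, opts) method and its attempts, backoff and removeOnComplete job options, and takes the queue as a parameter so it can be tried without a Redis connection.

```javascript
// Adds a registration event to the queue with retry options.
// `queue` is expected to be a Bull queue, or any object with a compatible add(data, opts).
async function enqueueEmployee(queue, event) {
  const jobOptions = {
    attempts: 3, // Try the job up to 3 times before marking it as failed.
    backoff: { type: 'exponential', delay: 5000 }, // Wait longer between successive retries.
    removeOnComplete: true, // Do not keep completed jobs in Redis.
  };
  return queue.add({ event }, jobOptions);
}

// Usage inside the handler (not executed here):
// await enqueueEmployee(taskQueue, event);
```

Removing completed jobs keeps the database small, at the cost of losing the job history.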

Consumer Code

We will write a basic Node application to consume the events. Create a new directory and run npm init and npm install bull. Then create index.js as below:

const Queue = require('bull');

// Advanced Bull settings that reduce how often the queue polls Redis.
const settings = {
  stalledInterval: 300000, // How often to check for stalled jobs (use 0 to never check).
  guardInterval: 5000, // Poll interval for delayed jobs and added jobs.
  drainDelay: 300 // A timeout for when the queue is in drained state (empty, waiting for jobs).
};

// The settings go inside the options object: when the second argument is an
// options object, Bull does not read a third argument.
const taskQueue = new Queue('employee registration', {
  redis: {port: 32016, host: 'us1-upward-ant-32016.upstash.io', password: 'ake4ff120d6b4216df220736be7eab087', tls: {}},
  settings
});

taskQueue.process(function (job, done) {
  console.log(job.data);
  // TODO process the new employee event
  done(); // Mark the job as completed.
}).catch(err => {
  console.log(err);
});

Note1: As with the producer, replace the Redis endpoint, port and password with your own, and remove the tls entry if you disabled TLS.

Note2: The settings passed to the event queue (Bull) are there so that the consumer does not exhaust your Upstash quota while it waits for jobs. Adjust the interval parameters depending on how much event latency you can tolerate.
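
To get a feel for the trade-off, the sketch below estimates how many times per day each timer fires while the queue is idle. It assumes, as I understand Bull's behavior, that stalledInterval and guardInterval are in milliseconds and drainDelay is in seconds. The helper name is hypothetical, and the numbers are timer firings, not an exact count of billed Redis commands.

```javascript
const MS_PER_DAY = 24 * 60 * 60 * 1000;

// Rough number of timer firings per day for an idle queue.
function idleTimerFiringsPerDay(settings) {
  return {
    // A stalledInterval of 0 disables the stalled-job check.
    stalledChecks: settings.stalledInterval > 0
      ? Math.floor(MS_PER_DAY / settings.stalledInterval)
      : 0,
    guardPolls: Math.floor(MS_PER_DAY / settings.guardInterval),
    // drainDelay is assumed to be in seconds, so convert it to milliseconds.
    drainTimeouts: Math.floor(MS_PER_DAY / (settings.drainDelay * 1000)),
  };
}

console.log(idleTimerFiringsPerDay({ stalledInterval: 300000, guardInterval: 5000, drainDelay: 300 }));
// → { stalledChecks: 288, guardPolls: 17280, drainTimeouts: 288 }
```

With these settings guardInterval dominates, so it is the first value to raise if you need fewer requests and can accept more delay.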

Test the Application

First, run the consumer application:

node index

To test the producer code, run:

serverless invoke local -f hello -d '{"name":"Bill Gates", "email":"bill@upstash.com", "position":"Developer", "date":"20210620"}'

The producer will print the value returned by the function:

[Image: producer log output]

And the consumer will print the data of the job it received:

[Image: consumer log output]