Background tasks in Node with Bull and Redis

Published on February 21st, 2022 in Express/NodeJs

Node is a great platform for asynchronous programming. However, since all JavaScript code runs on a single thread by default, a Node process on its own makes poor use of modern multi-core systems. It is also fairly easy for one CPU-intensive task to block everything else, severely degrading performance.
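
The blocking effect is easy to observe. The following minimal sketch (the helper names `burnCpu` and `measureTimerDelay` are made up for illustration) schedules a zero-delay timer and then spins the CPU; the timer callback cannot run until the loop finishes:

```javascript
// Busy-wait for roughly `ms` milliseconds, standing in for CPU-heavy work.
function burnCpu(ms) {
  const end = Date.now() + ms;
  while (Date.now() < end) {
    // spin: nothing else can run on this thread in the meantime
  }
}

// Resolves with the number of milliseconds a 0 ms timer took to fire.
function measureTimerDelay(blockMs) {
  return new Promise((resolve) => {
    const scheduledAt = Date.now();
    setTimeout(() => resolve(Date.now() - scheduledAt), 0);
    burnCpu(blockMs); // runs before the timer callback gets a chance
  });
}

measureTimerDelay(100).then((delay) => {
  console.log(`0 ms timer fired after ${delay} ms`);
});
```

In a server, every pending request would be stuck behind that loop in the same way.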

One solution is to move the CPU-intensive work to other processes and free up the main process to continue serving user requests. A popular Node package for this is bull, which implements a message queue on top of Redis.

With bull, handing off work to a background worker consists of defining a queue:

import Queue from "bull";
const queue = new Queue("my-queue");

attaching consumers to the queue to receive the messages and do the required work:

queue.process(async job => {
    console.log(`Hello ${job.data.name}!`);
});

and putting messages on the queue to trigger the consumers:

queue.add({name: "world"});

Background task pattern

Working at the level of queues, messages, consumers, and producers is very flexible and allows us to use many different queuing patterns. However, for the specific problem of just offloading something to a background worker, it is a bit too verbose.

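To simplify how we use it, we can accept some constraints and use them to hide away a few details: we'll only use one queue, and every function exported from src/tasks.js is treated as a task handler.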

Then we can have a helper module (I usually put this in src/utils/queue.js) that sets things up:

import Queue from "bull";

// This imports every exported symbol from "src/tasks.js"; each one is
// treated as a message consumer (i.e. a worker function)
import * as taskHandlers from "../tasks.js";

// we'll only use one queue
const taskQueue = new Queue("background-tasks");

// proxy functions that will make it easy to invoke remote tasks
export const tasks = {};

// keep track of the jobs we started from this process so our proxies can return
// when the job is done
export const startedJobs = {};

/**
 * Initializes proxy functions for all consumers (workers) from "src/tasks.js"
 */
async function init() {
  for (const name in taskHandlers) {
    // anonymous proxy function
    tasks[name] = async (data) => {
      // adds the job to the queue with the provided data
      const job = await taskQueue.add(name, data);

      return new Promise((resolve, reject) => {
        // remember our promise so we can resolve/reject it later
        startedJobs[job.id] = { resolve, reject };
      });
    };
  }
}
init();

// Completed jobs that we (our process) started are resolved
taskQueue.on('global:completed', (jobId, result) => {
  const localJob = startedJobs[jobId];
  if (!localJob) return;

  delete startedJobs[jobId];
  // if the task returned a result, bull will have it JSON-encoded
  const resultData = JSON.parse(result);
  localJob.resolve(resultData);
});

// Failed jobs that we (our process) started are rejected
taskQueue.on('global:failed', (jobId, err) => {
  const localJob = startedJobs[jobId];
  if (!localJob) return;

  delete startedJobs[jobId];
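  // a global event may deliver the failure reason as a plain string,
  // so make sure callers always get an Error to catch
  localJob.reject(err instanceof Error ? err : new Error(String(err)));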
});

/**
 * Start task workers
 *
 * Attaches task consumers (handler functions) to
 * the queue to actually execute the tasks.
 *
 * This function should be called in processes that
 * want to listen to the task queue and execute queued
 * tasks.
 *
 * Usually, it's only used from "scripts/worker.js".
 */
export function startWorkers() {
  for (const name in taskHandlers) {
    const handler = taskHandlers[name];
    taskQueue.process(name, handler);
  }
}
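
To see how the promise bookkeeping fits together without needing Redis, here is a minimal sketch that swaps bull for a tiny in-memory stand-in. `FakeQueue`, `makeProxy`, and the `double` task are hypothetical names; the stand-in only mimics the shape of the calls used in the helper (`add`, `process`, `on`) and is not how bull works internally.

```javascript
// Hypothetical in-memory stand-in for the queue; NOT bull's implementation.
class FakeQueue {
  constructor() {
    this.handlers = {};  // task name -> handler function
    this.listeners = {}; // event name -> array of callbacks
    this.nextId = 1;
  }

  on(event, callback) {
    if (!this.listeners[event]) this.listeners[event] = [];
    this.listeners[event].push(callback);
  }

  emit(event, ...args) {
    for (const callback of this.listeners[event] || []) callback(...args);
  }

  process(name, handler) {
    this.handlers[name] = handler;
  }

  async add(name, data) {
    const job = { id: String(this.nextId++), name, data };
    // Run the handler on a later tick, like a worker picking the job up.
    setTimeout(async () => {
      try {
        const result = await this.handlers[name](job);
        // Results travel as JSON strings, matching the listener in the helper.
        const encoded = JSON.stringify(result === undefined ? null : result);
        this.emit("global:completed", job.id, encoded);
      } catch (err) {
        this.emit("global:failed", job.id, err.message);
      }
    }, 0);
    return job;
  }
}

const queue = new FakeQueue();
const pending = {}; // job id -> { resolve, reject }

// Same idea as the proxy functions in the helper: enqueue, then wait.
function makeProxy(name) {
  return async (data) => {
    const job = await queue.add(name, data);
    return new Promise((resolve, reject) => {
      pending[job.id] = { resolve, reject };
    });
  };
}

queue.on("global:completed", (jobId, result) => {
  const entry = pending[jobId];
  if (!entry) return; // started by some other process
  delete pending[jobId];
  entry.resolve(JSON.parse(result));
});

queue.on("global:failed", (jobId, reason) => {
  const entry = pending[jobId];
  if (!entry) return;
  delete pending[jobId];
  entry.reject(new Error(reason));
});

// "Worker" side: register a handler. "Producer" side: call the proxy.
queue.process("double", async (job) => job.data.value * 2);
const double = makeProxy("double");

double({ value: 21 }).then((result) => console.log(result)); // logs 42
```

The key point is that the caller's promise is settled by an event listener rather than by the code that enqueued the job, which is what lets the real work happen in another process.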

I also like to define a worker script (scripts/worker.js) that just runs the worker functions:

import { startWorkers } from "../src/utils/queue.js";
startWorkers();

And for good measure, add it as a script in package.json:

"scripts": {
    ...
    "worker": "node scripts/worker"
}

The tasks themselves are defined in src/tasks.js. Here's an example:

export async function helloWorld() {
    console.log("Hello, World!");
}
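
Because the helper registers each exported function directly with `taskQueue.process(name, handler)`, a task receives the job object and finds the argument given to its proxy under `job.data`; whatever it returns is what the proxy's promise resolves with. A minimal sketch of a task that takes input and returns a result, where `addNumbers` and its `a`/`b` fields are hypothetical names and the direct call at the end fakes a job object purely for illustration:

```javascript
// In src/tasks.js this would be declared with "export" so the helper registers it.
async function addNumbers(job) {
  // job.data is whatever was passed to tasks.addNumbers(...)
  const { a, b } = job.data;
  // The return value is JSON-encoded on its way back, so keep it serializable.
  return { sum: a + b };
}

// Calling the handler directly with a fake job, just to show the shape.
addNumbers({ data: { a: 2, b: 3 } }).then((result) => {
  console.log(result.sum); // logs 5
});
```

With the helper in place, the equivalent call from application code would be `tasks.addNumbers({ a: 2, b: 3 })`.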

Running background tasks

Now we can invoke background tasks like this:

import { tasks } from "src/utils/queue.js";

tasks.helloWorld();
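
Each proxy returns a promise that settles only when a worker reports the job as completed or failed, so it can be awaited to get the task's result. If no worker is running, that promise simply stays pending. One way to guard against this is to race it against a timer. A minimal sketch, where `withTimeout` is a hypothetical helper that is part of neither bull nor the helper module above:

```javascript
// Hypothetical helper: rejects if `promise` has not settled within `ms`.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Whichever settles first wins; clear the timer either way.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Stand-in for a task proxy whose job is never picked up by a worker.
const neverSettles = new Promise(() => {});

withTimeout(neverSettles, 100).catch((err) => {
  console.log(err.message); // logs "Timed out after 100 ms"
});
```

In application code this would look like `await withTimeout(tasks.helloWorld(), 5000)`. Note that timing out only stops the caller from waiting; the job itself stays on the queue.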

When we run the worker:

npm run worker

we should see the output Hello, World! in the worker's console.

We can also run multiple workers across different machines, or even add/remove workers as needed to scale our processing capacity.

Further work

This is just a simple but useful abstraction over the message queue that the bull package gives us. We haven't touched on concurrency, priorities, multiple queues, delayed, repeated, or retried tasks, and other features. Be sure to read the package documentation to see what else bull is capable of. There's also a rewrite in TypeScript called BullMQ, so if you prefer TypeScript, be sure to check that one out.

If you'd like to play around with background tasks and see this in action, you can create a demo Node/Express project on API Bakery and get this auto-generated and ready to test and use. Make sure you enable Bull support, and check src/utils/queue.js, src/tasks.js, scripts/worker.js and the documentation in README.md.


