Implementing a job queue in Node.js
What is a job queue and when will you need one?
A job queue holds a list of jobs waiting to be executed, either until some condition is met or until a worker/processor is free to execute them.
This is pretty useful when you want to execute a time-consuming process at a given pace.
For example, your server receives 10 requests in a minute, but you can only handle 5 requests per minute because of computing power limitations. If you try to execute all 10 processes in that minute, your system might not take the load and could crash. To solve this, you execute 5 of the requests in the first minute and add the other 5 to a queue; those get executed once a worker is idle or extra computing power is available.
The naive approach
The first approach that came to me was using an array to store the job payloads and take jobs off the front of the array one by one.
The code:
// array for storing and simulating a queue
const queue = []
// a counter for keeping track of currently running jobs
let running = 0
// max number of jobs that can be executed concurrently
const MAX_JOBS = 3 // change this to any number according to your requirement
app.post("/", (req, res) => {
try {
// pick the data you need
const { name, id } = req.body
// add the data to the end of the queue
queue.push({name, id})
} catch (e) {
console.error(e)
}
res.status(200).end()
})
const execute = async () => {
// execute only if currently running processes are fewer than MAX_JOBS
if (running < MAX_JOBS && queue.length !== 0) {
// increment currently running jobs
running += 1
// get the first element/job from the queue
const { name, id } = queue.shift()
// replace the below function with your actual code
await timeConsumingFunction(name, id)
running -= 1
}
}
// poll every minute to call execute function
setInterval(() => {
execute()
}, 60 * 1000)
The above code gets the job done in an ideal world, but it will fail in the real world:
- What happens when you restart the server? You lose track of the currently running processes and of the jobs waiting in the queue.
- What happens when a job fails during processing? You lose that job forever because it's already been popped from the queue.
- What about multiple workers/instances? You can't share the queue with other instances because it lives in this server's memory and is only accessible there, so there is no chance of a distributed setup.
To solve these problems we need persistent storage. Let's use Redis.
What is Redis?
Redis is a key-value database. We are using it here because it provides data structures (lists, sets, hashes, etc.) and TTL (time to live) out of the box.
We will be using the `list` data structure of Redis to simulate a queue. To learn more about Redis, head to https://redis.io/docs/
For interacting with Redis we will be using the `ioredis` library for Node.js: https://github.com/luin/ioredis
Here, we will still be using the same pattern as before; only the code for creating and handling the queue will change. We will now have two queues: `todo` and `processing`. The `todo` queue will contain jobs waiting to be executed, and `processing` will contain jobs that are currently being processed. We are doing this to have a fail-safe mechanism.
When a job is ready to be executed, we move it from `todo` to `processing`, and we only remove it from `processing` if the execution was successful. If the execution fails, the job is moved to the back of the `todo` queue. This way we never really lose failed jobs and can execute them again.
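As a rough sketch (not the final code), this movement between the two lists boils down to a handful of Redis list commands. Here `redis` is an ioredis connection, `job` is the serialized payload, and the helper name is just for illustration:
// sketch only: how one job travels through the two lists
const moveJobAround = async (redis, job) => {
  await redis.rpush("todo", job) // a new job waits at the back of `todo`
  await redis.lmove("todo", "processing", "LEFT", "RIGHT") // a worker picks up the oldest job
  // ...the job runs here...
  await redis.lrem("processing", 1, job) // success: remove it from `processing`
  // on failure we would instead send it back to `todo`:
  // await redis.lmove("processing", "todo", "LEFT", "RIGHT")
}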
Let's revisit the three questions:
- What happens when you restart the server? No data is lost, as the job data is stored in a persistent database. On a restart, you can still access the jobs waiting to be processed.
- What happens when a job fails during processing? If the system crashes or the job execution fails, the failed job is moved back from the `processing` list to the `todo` list and gets processed again after a while.
- What about multiple workers/instances? Multiple workers and instances can connect to Redis and execute the jobs without relying on your main server.
We will have two extra parameters on each job to handle failures.
We will use `tries` to keep track of how many times the job has been sent to the `processing` list; we will simply delete the job after 3 tries if it doesn't succeed.
The other parameter is `timestamp`, used to check whether a job has been sitting in the `processing` queue for a long time (most probably because the system crashed during its execution). If the difference between the current time and the `timestamp` is more than 10 minutes, we move the job back to the `todo` queue.
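So a job sitting in the `processing` list might look something like this (the values are made up for illustration; the fields match the code below):
// example job object before it is stringified into the `processing` list
{
  name: "example-job",        // data picked from the request body
  id: 42,
  tries: 1,                   // how many times it has entered `processing`
  timestamp: 1700000000000    // Date.now() when it was picked up
}
Let's first set up the Redis connection in a separate file, `redis.js`: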
// import the library after installing it
const { default: Redis } = require("ioredis");
// pass the connection parameters or pass nothing if connecting to local host
// for localhost, change to `const redis = new Redis()`
const redis = new Redis({
port: process.env.REDIS_PORT,
host: process.env.REDIS_HOST,
password: process.env.REDIS_PASSWORD
});
// export the connection to import in index file
module.exports = { redis };
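If you want to make sure the connection works before wiring up the queue, a quick ping is enough (this check is optional and not part of the queue code):
// optional sanity check: PING resolves to "PONG" when the connection is healthy
const { redis } = require("./redis")

redis
  .ping()
  .then((reply) => console.log("Redis connection OK:", reply))
  .catch((err) => console.error("Could not reach Redis:", err))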
Let's also change the `index.js` file to use Redis instead of an array. We import the connection we created above into our main file and push new jobs to the back of the `todo` list in Redis:
// other imports
// importing the redis client connection instance that we created above
const { redis } = require("./redis");
// a counter for keeping track of currently running jobs
let running = 0
// max number of jobs that can be executed concurrently
const MAX_JOBS = 3 // change this to any number according to your requirement
app.post("/", (req, res) => {
try {
// pick the data you need
const { name, id } = req.body
// push the data to the back of the list/queue
await redis.rpush(
'todo', // list name in redis, this will store jobs to be processed
JSON.stringify({ name, id })
)
} catch (e) {
console.error(e)
}
res.status(200).end()
})
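For illustration, a request like the one below would enqueue a single job (this assumes the app uses express.json() or a similar body parser so that req.body is populated, and that the server listens on port 3000):
// hypothetical client call that enqueues one job via the endpoint above
fetch("http://localhost:3000/", {
  method: "POST",
  headers: { "Content-Type": "application/json" },
  body: JSON.stringify({ name: "example-job", id: 42 }),
})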
A worker will always pick the first/oldest job in the queue. But suppose two workers run at the same time and both access the same job, i.e. the first element of the queue.
We don't want this to happen, otherwise the same job could be executed multiple times, or we could run into other unexpected behavior.
For this reason, we are going to use a locking system that locks the queue for the current worker; other workers wait until this worker has finished accessing the job (note that I said accessing the job and not executing it: we release the lock as soon as the job is accessed/popped from the queue).
Don't worry if you are seeing this concept for the first time. The implementation is quite simple using Redis.
We use a simple SET command in Redis to keep a flag that says whether a queue is locked. Every 5 seconds we check whether the flag is set; if it's not, we acquire the lock by setting the flag to `locked` (this is just a string and can be anything you want!).
We also set the lock to expire after 10 seconds, because that's more than enough time to access the job.
// function for acquiring the lock for this worker
// param `key` is the name of the queue
// so `key` can either take value of `todo` or `processing` as these are our lists
const acquireLock = async (key) => {
return new Promise((resolve, reject) => {
// check every 5 seconds if lock can be acquired
const interval = setInterval(() => {
// set `todo-lock` or `processing-lock` to "locked" if it doesn't already exist
// "NX" means only set the key if it does not already exist; returns OK on success, null otherwise
// "EX", 10 sets the expiry of the lock; the key is deleted after 10 seconds
redis
.set(`${key}-lock`, "locked", "NX", "EX", 10)
.then((res) => {
if (res) { // if res is not null, then we set the lock successfully and can clear the interval
resolve(res);
clearInterval(interval);
}
})
.catch((err) => {
reject(err);
clearInterval(interval);
});
}, 5000);
});
};
// for deleting the lock programmatically
// although it will expire after 10 seconds
const releaseLock = async (key) => {
// delete the lock for `todo` or `processing` queue
await redis.del(`${key}-lock`);
};
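Put together, every access to a queue follows the same lock-then-release pattern. Here is a small sketch of it (the `withLock` helper is just for illustration; the actual `execute` function below calls `acquireLock`/`releaseLock` directly):
// sketch of the pattern: lock the queue, touch the list, release the lock
const withLock = async (key, fn) => {
  await acquireLock(key) // wait here until this worker holds `${key}-lock`
  try {
    return await fn() // read/move jobs while holding the lock
  } finally {
    await releaseLock(key) // always release, even if `fn` throws
  }
}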
Let's also update the `execute` function. It will first try to acquire the lock for the `todo` queue, then read the first job and move it to the `processing` queue, then release the lock on the `todo` queue and execute the job.
After a successful execution, it will also delete the job from the `processing` queue (again after acquiring the corresponding lock).
const execute = async () => {
// execute only if currently running processes are fewer than MAX_JOBS
if (running < MAX_JOBS) {
running += 1 // reserve a slot for this run; released at the end of this block
try {
// try to acquire the lock for the `todo` queue if another worker hasn't acquired it already
await acquireLock("todo");
// get the first job
const newProcess = await redis.lindex("todo", 0);
if (newProcess) {
const copy = JSON.parse(newProcess);
copy.timestamp = Date.now(); // add a unix timestamp to the job, we will use this later
copy.tries = copy.tries ? copy.tries + 1 : 1; // update the number of tries for this job
await redis
.multi() // multi is for chaining redis commands in one transaction
.lset("todo", 0, JSON.stringify(copy)) // set the first job with `timestamp` and `tries`
.lmove("todo", "processing", "LEFT", "RIGHT") // move the first job to the back of `processing`
.exec(); // run both commands as one atomic transaction (Redis does not roll back on failure)
// release lock on `todo` queue as we have accessed the job and moved to `processing` queue
await releaseLock("todo");
// replace this function with your actual code
await timeConsumingFunction(copy.name, copy.id);
// job has been executed at this point
await acquireLock("processing"); // acquire lock for `processing` queue
await redis.lrem("processing", 1, JSON.stringify(copy)); // delete the current job from processing as it has been executed
await releaseLock("processing"); // release lock for `processing` queue
} else {
await releaseLock("todo"); // release lock for `todo` queue if there was no job in the queue
}
} catch (err) {
console.error(err);
}
running -= 1
}
}
// check for new jobs every five minutes
setInterval(() => {
execute();
}, 5 * 60 * 1000); // tweak this as you need
At this point, we have a system that will pick up new jobs and execute them. But what about the jobs that got stuck because the system crashed mid-execution? We've got that covered!
Every 10 minutes (change this according to your needs), we check if there is a job that has been sitting in the `processing` queue for more than 10 minutes (again, adjust as needed), using the `timestamp` we stored.
We also delete a job altogether once its `tries` have exceeded 3 (this part is optional for your use case).
This check doesn't have to run on the same server as the code above, but for simplicity I have deployed it on the same server.
const retry = async () => {
try {
await acquireLock("processing"); // acquire lock for `processing` to avoid conflicts
const res = await redis.lindex("processing", 0); // get the first processing job
if (res) {
const processing = JSON.parse(res);
if (Date.now() - Number(processing.timestamp) >= 10 * 60 * 1000) { // check if it's been more than 10 minutes
if (Number(processing.tries) < 3) { // if `tries` are less than 3, move back to `todo` queue
await redis
.multi()
.lset("processing", 0, JSON.stringify(processing))
.lmove("processing", "todo", "LEFT", "RIGHT")
.exec();
} else {
await redis.lpop("processing"); // otherwise, if the tries are exceeded, delete the job
}
}
await releaseLock("processing"); // release the lock
} else {
await releaseLock("processing"); // relase the lock if no job
}
} catch (err) {
console.error(err);
}
};
// check every 10 minutes for retrying
setInterval(() => {
retry();
}, 10 * 60 * 1000);
That's all for the code; we now have a working queue with a fail-safe mechanism.
The whole code:
// index.js
// other imports
// importing the redis client connection instance that we created above
const { redis } = require("./redis");
// a counter for keeping track of currently running jobs
let running = 0
// max number of jobs that can be executed concurrently
const MAX_JOBS = 3 // change this to any number according to your requirement
app.post("/", (req, res) => {
try {
// pick the data you need
const { name, id } = req.body
// push the data to the back of the list/queue
await redis.rpush(
'todo', // list name in redis, this will store jobs to be processed
JSON.stringify({ name, id })
)
} catch (e) {
console.error(e)
}
res.status(200).end()
})
// function for acquiring the lock for this worker
// param `key` is the name of the queue
// so `key` can either take value of `todo` or `processing` as these are our lists
const acquireLock = async (key) => {
return new Promise((resolve, reject) => {
// check every 5 seconds if lock can be acquired
const interval = setInterval(() => {
// set `todo-lock` or `processing-lock` to "locked" if it doesn't already exist
// "NX" means only set the key if it does not already exist; returns OK on success, null otherwise
// "EX", 10 sets the expiry of the lock; the key is deleted after 10 seconds
redis
.set(`${key}-lock`, "locked", "NX", "EX", 10)
.then((res) => {
if (res) { // if res is not null, then we set the lock successfully and can clear the interval
resolve(res);
clearInterval(interval);
}
})
.catch((err) => {
reject(err);
clearInterval(interval);
});
}, 5000);
});
};
// for deleting the lock programmatically
// although it will expire after 10 seconds
const releaseLock = async (key) => {
// delete the lock for `todo` or `processing` queue
await redis.del(`${key}-lock`);
};
const execute = async () => {
// execute only if currently running processes are fewer than MAX_JOBS
if (running < MAX_JOBS) {
running += 1 // reserve a slot for this run; released at the end of this block
try {
// try to acquire the lock for the `todo` queue if another worker hasn't acquired it already
await acquireLock("todo");
// get the first job
const newProcess = await redis.lindex("todo", 0);
if (newProcess) {
const copy = JSON.parse(newProcess);
copy.timestamp = Date.now(); // add a unix timestamp to the job, we will use this later
copy.tries = copy.tries ? copy.tries + 1 : 1; // update the number of tries for this job
await redis
.multi() // multi is for chaining redis commands in one transaction
.lset("todo", 0, JSON.stringify(copy)) // set the first job with `timestamp` and `tries`
.lmove("todo", "processing", "LEFT", "RIGHT") // move the first job to the back of `processing`
.exec(); // run both commands as one atomic transaction (Redis does not roll back on failure)
// release lock on `todo` queue as we have accessed the job and moved to `processing` queue
await releaseLock("todo");
// replace this function with your actual code
await timeConsumingFunction(copy.name, copy.id);
// job has been executed at this point
await acquireLock("processing"); // acquire lock for `processing` queue
await redis.lrem("processing", 1, JSON.stringify(copy)); // delete the current job from processing as it has been executed
await releaseLock("processing"); // release lock for `processing` queue
} else {
await releaseLock("todo"); // release lock for `todo` queue if there was no job in the queue
}
} catch (err) {
console.error(err);
}
running -= 1
}
}
// check for new jobs every five minutes
setInterval(() => {
execute();
}, 5 * 60 * 1000); // tweak this as you need
const retry = async () => {
try {
await acquireLock("processing"); // acquire lock for `processing` to avoid conflicts
const res = await redis.lindex("processing", 0); // get the first processing job
if (res) {
const processing = JSON.parse(res);
if (Date.now() - Number(processing.timestamp) >= 10 * 60 * 1000) { // check if it's been more than 10 minutes
if (Number(processing.tries) < 3) { // if `tries` are less than 3, move back to `todo` queue
await redis
.multi()
.lset("processing", 0, JSON.stringify(processing))
.lmove("processing", "todo", "LEFT", "RIGHT")
.exec();
} else {
await redis.lpop("processing"); // otherwise, if the tries are exceeded, delete the job
}
}
await releaseLock("processing"); // release the lock
} else {
await releaseLock("processing"); // relase the lock if no job
}
} catch (err) {
console.error(err);
}
};
// check every 10 minutes for retrying
setInterval(() => {
retry();
}, 10 * 60 * 1000);
// redis.js
// import the library after installing it
const { default: Redis } = require("ioredis");
// pass the connection parameters or pass nothing if connecting to local host
// for localhost, change to `const redis = new Redis()`
const redis = new Redis({
port: process.env.REDIS_PORT,
host: process.env.REDIS_HOST,
password: process.env.REDIS_PASSWORD
});
// export the connection to import in index file
module.exports = { redis };
We have successfully implemented a job queue from scratch. But if this was too much for you, there are many Node.js libraries that provide this feature out of the box; one of them is BullMQ: https://docs.bullmq.io/
This library also uses Redis behind the scenes and implements tons of features on top of it. Let's look at an example of how we can use it.
const { Queue, Worker, QueueScheduler } = require('bullmq')
// necessary for retrying failed jobs
const myQueueScheduler = new QueueScheduler('todo');
const MAX_JOBS = 3
// Create a queue; we don't need two queues here
// because BullMQ handles things like locking resources, fail-safes, retries, etc.
const queue = new Queue('todo', {
connection: {
port: process.env.REDIS_PORT, // pass in the redis credentials
host: process.env.REDIS_HOST,
password: process.env.REDIS_PASSWORD
},
defaultJobOptions: {
attempts: 3, // number of attempts for failed jobs
backoff: { // exponential backoff: retry after 1s, 2s, 4s, ...
type: 'exponential',
delay: 1000, // 1 sec
},
},
});
// the worker is like the execute function we wrote above
// it executes jobs from the queue as soon as they are available
const worker = new Worker('todo', async (job) => { // must use the same queue name as above
const { name, id } = job.data;
// your actual time consuming process
await yourTimeConsumingFunction(name, id);
}, {
// max number of jobs that can run concurrently
// another way is to remove this option and run multiple workers like worker1, worker2, etc.
concurrency: MAX_JOBS,
connection: {
port: process.env.REDIS_PORT, // pass in the redis credentials
host: process.env.REDIS_HOST,
password: process.env.REDIS_PASSWORD
}
});
app.post("/", (req, res) => {
try {
// pick the data you need
const { name, id } = req.body
// push the data to the back of the above queue
await queue.add(
'job', // job name, doesn't have to be unique
{ name, id } // actual data to be used during execution
)
} catch (e) {
console.error(e)
}
res.status(200).end()
})
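BullMQ also emits lifecycle events on the worker, so you can log successes and failures without touching the job code; a minimal example:
// optional: observe job results on the worker
worker.on("completed", (job) => {
  console.log(`job ${job.id} completed`)
})
worker.on("failed", (job, err) => {
  console.error(`job ${job?.id} failed:`, err.message)
})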
That's all folks, thanks for reading! See you at the next one!