Using a Persistent Local Queue in Node.js

Datetime: 2016-08-23 00:45:09          Topic: Node.js, LevelDB

As we saw in the previous article of this series, an in-memory queue can be useful for streamlining a workload and for recovering from errors by retrying, but it has some problems.

If, for a period of time, the production of messages exceeds the worker capacity, the queue may start taking too much memory.

Also, if the Node.js process goes down, you will lose whatever pending work it had. This may or may not pose a problem, depending on the application.

Taking the work queue out of memory and putting it into persistent storage avoids both problems. By keeping only the current work in memory, we can absorb work peaks, since the backlog gets written to disk. And when the Node.js process starts again, any pending work is resumed.
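To make the idea concrete before we bring in a real database, here is a minimal sketch that assumes an append-only JSON-lines file as the persistent store. The createDiskQueue function and the temporary file name are hypothetical, made up for this illustration, and the approach is far too naive for production use (every shift rewrites the whole file).

```javascript
// Sketch only: an append-only JSON-lines file stands in for the persistent store.
var fs = require('fs');
var os = require('os');
var path = require('path');

function createDiskQueue(file) {
  // Append one pending work unit to disk; nothing is kept in memory.
  function push(item) {
    fs.appendFileSync(file, JSON.stringify(item) + '\n');
  }

  // Read back whatever is still pending, e.g. right after a restart.
  function pending() {
    if (!fs.existsSync(file)) return [];
    return fs.readFileSync(file, 'utf8')
      .split('\n')
      .filter(Boolean)
      .map(function(line) { return JSON.parse(line); });
  }

  // Remove and return the oldest work unit once it has been processed.
  function shift() {
    var items = pending();
    var first = items.shift();
    fs.writeFileSync(file, items.map(function(item) {
      return JSON.stringify(item) + '\n';
    }).join(''));
    return first;
  }

  return { push: push, pending: pending, shift: shift };
}

// Usage: a second queue object on the same file sees the work left behind
// by the first one, which is what happens when a process restarts.
var file = path.join(os.tmpdir(), 'disk-queue-sketch-' + process.pid + '.jsonl');
if (fs.existsSync(file)) fs.unlinkSync(file);

var before = createDiskQueue(file);
before.push({ id: 1, event: 'door opened' });
before.push({ id: 2, event: 'alert' });

var afterRestart = createDiskQueue(file);
console.log('pending after restart:', afterRestart.pending().length);
```

The pending work lives in the file rather than in the process, so memory use stays flat no matter how large the backlog grows, and a fresh process can pick up where the old one stopped.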

For the sake of an example, let's say that you're building a home alarm system and want to relay every event (alarm switched off, alarm switched on, door opened, alert, etc.), into a remote service for storage and future audit purposes. This remote service may be down or unreachable at times, but you still want the events to eventually — sorry for the pun — get there.

Let's then build a module that creates and exports a persistent queue where security events will be stored. This module will also be responsible for relaying these events into the remote service.

event_relay.js:

var level = require('level');  
var db = level('./db');

var Jobs = require('level-jobs');

var maxConcurrency = 1;  
var queue = Jobs(db, worker, maxConcurrency);

module.exports = queue;

function worker(event, cb) {  
  sendEventToRemoteService(event, function(err) {
    if (err) console.error('Error processing event %s: %s', event.id, err.message);
    else console.log('event %s successfully relayed', event.id);
    cb(err);
  });
}


function sendEventToRemoteService(event, cb) {  
  setTimeout(function() {
    var err;
    if (Math.random() > 0.5) err = Error('something awful has happened');
    cb(err);
  }, 100);
}

Here we start off by creating a LevelDB database where all the events will be persistently stored, in a folder named db in the current directory:

var level = require('level');  
var db = level('./db');  

LevelDB is a generic key-value store, but we won't use it directly here; instead we'll hand it off to the level-jobs package, which implements a worker queue on top of a LevelDB database:

var Jobs = require('level-jobs');

var maxConcurrency = 1;  
var queue = Jobs(db, worker, maxConcurrency);  

Here we're creating a job queue by providing the LevelDB database that we created earlier and a worker function, and defining a maximum concurrency of 1. The worker function is very similar to the async.queue worker function: it accepts a work unit as the first argument and a callback function as the second argument:

function worker(event, cb) {  
  sendEventToRemoteService(event, function(err) {
    if (err) console.error('Error processing event %s: %s', event.id, err.message);
    else console.log('event %s successfully relayed', event.id);
    cb(err);
  });
}

The worker function simply tries to send the work unit (an event, in our case) to the remote event storage service using the sendEventToRemoteService function. If sending fails for some reason, our callback is invoked with an error, which we propagate to the worker callback. When the worker callback is invoked with an error, level-jobs retries the job until it succeeds or a predefined maximum number of attempts is reached, using a back-off algorithm internally, as in the previous section.
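To illustrate what retrying with back-off means, here is a minimal sketch of the general technique. It is not the code level-jobs uses internally: the retry and backoffDelay functions, the option names and the doubling delay are all assumptions made for this example. The schedule option is injectable so that the usage below can run without real timers; in real code you would leave it out and let setTimeout do the waiting.

```javascript
// Sketch only: not level-jobs' actual retry code.
// Delay before the next attempt: doubles each time, capped at maxDelay.
function backoffDelay(attempt, initialDelay, maxDelay) {
  return Math.min(initialDelay * Math.pow(2, attempt), maxDelay);
}

// Calls task(cb) until it succeeds or maxRetries retries have been used up.
// done(err, attempts) reports the final outcome and the number of attempts.
function retry(task, options, done) {
  var schedule = options.schedule || setTimeout;
  var attempt = 0;

  function run() {
    task(function(err) {
      if (!err) return done(null, attempt + 1);
      if (attempt >= options.maxRetries) return done(err, attempt + 1);
      var delay = backoffDelay(attempt, options.initialDelay, options.maxDelay);
      attempt++;
      schedule(run, delay);
    });
  }

  run();
}

// Usage: a task that fails twice and then succeeds.
var calls = 0;
var delays = [];
var result = {};

function flaky(cb) {
  calls++;
  cb(calls < 3 ? new Error('something awful has happened') : null);
}

retry(flaky, {
  maxRetries: 5,
  initialDelay: 10,
  maxDelay: 300,
  // Hypothetical stand-in for setTimeout: records the delay, runs immediately.
  schedule: function(fn, delay) { delays.push(delay); fn(); }
}, function(err, attempts) {
  result.err = err;
  result.attempts = attempts;
});

console.log('attempts:', result.attempts, 'delays:', delays);
```

Growing the delay between attempts gives a struggling remote service room to recover instead of hammering it with immediate retries, and the cap keeps the wait from growing without bound.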

We then simulate event-delivery errors with a 50% probability in a fake implementation of the sendEventToRemoteService function:

function sendEventToRemoteService(event, cb) {  
  setTimeout(function() {
    var err;
    if (Math.random() > 0.5) err = Error('something awful has happened');
    cb(err);
  }, 100);
}

Let's then create an event producer so that we can test our event relay module:

event_producer.js:

var relay = require('./event_relay');

for (var i = 0; i < 10; i++) {
  relay.push({id: i, event: 'door opened', when: Date.now()});
}

and run it:

$ node event_producer
event 0 successfully relayed  
event 1 successfully relayed  
event 2 successfully relayed  
event 3 successfully relayed  
Error processing event 4: something awful has happened  
Error processing event 4: something awful has happened  
Error processing event 4: something awful has happened  
Error processing event 4: something awful has happened  
event 4 successfully relayed  
Error processing event 5: something awful has happened  
Error processing event 5: something awful has happened  
Error processing event 5: something awful has happened  
event 5 successfully relayed  
event 6 successfully relayed  
Error processing event 7: something awful has happened  
event 7 successfully relayed  
Error processing event 8: something awful has happened  
event 8 successfully relayed  
Error processing event 9: something awful has happened  
event 9 successfully relayed  

We can see from the output that, even though some attempts failed, all the events eventually got relayed. Also, by setting the concurrency to 1, we make sure that no more than one work unit is being processed at any given time, no matter what the job creation rate is, thus saving memory and streamlining the workload.
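The effect of the concurrency limit can be sketched in a few lines. This is an in-memory illustration only, with a hypothetical createLimitedQueue function; unlike level-jobs, it keeps the waiting items in an array rather than on disk, so it shows the scheduling behaviour and nothing else.

```javascript
// Sketch only: an in-memory illustration of a concurrency cap.
function createLimitedQueue(worker, maxConcurrency) {
  var waiting = [];
  var active = 0;

  // Start waiting work units while there is spare capacity.
  function next() {
    while (active < maxConcurrency && waiting.length > 0) {
      active++;
      worker(waiting.shift(), function() {
        active--;
        next();
      });
    }
  }

  return {
    push: function(item) { waiting.push(item); next(); },
    active: function() { return active; },
    waiting: function() { return waiting.length; }
  };
}

// Usage: the worker holds on to its callbacks so that we decide when a
// work unit finishes.
var started = [];
var finish = [];

var q = createLimitedQueue(function(event, cb) {
  started.push(event.id);
  finish.push(cb);
}, 1);

for (var i = 0; i < 3; i++) q.push({ id: i });
console.log('started after pushing:', started);

finish.shift()(); // the first event completes, so the next one may start
console.log('started after one completion:', started);
```

However fast the producer pushes, the worker only ever sees one unit at a time; everything else waits its turn.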

Next article

So far we've seen the usefulness of creating a local queue for streamlining the workload and persisting the work across process restarts. However, depending on the type of work that you're doing, this approach still has some problems: if the work is CPU-intensive, you may need to hand it off to a set of external worker processes.

In the next article of this series we'll be looking at how we can use a queue service to distribute the load between several Node.js processes.

This article was extracted from the Work Queues book, a book from the Node Patterns series.




