node.js: batching parallel async http requests

Some DNS providers limit the number of parallel, asynchronous HTTP requests that you can run simultaneously. Not all providers have this feature and it is commonly called DNS RRL, or DNS Response Rate Limiting. These rate limits are typically used for the very good reason of thwarting DDoS attacks. However, these DNS management tools can also limit legitimate (non-spam) IT shops that are performing their jobs.

The good news is there is a pattern you can use within a Node.js application that let’s you use a blocking timer to wait for one batch of asynchronous parallel requests to complete before the next batch is run. Depending on your provider, this can help you help prevent DNS errors as well as incurring the wrath of your DNS provider. And, it’s still much more efficient than running synchronous requests.

Why not just use async.parallelLimit or something similar? As of the time of this writing, the async library doesn’t have batch capabilities. Therefore, the pattern I’m proposing allows you to fine tune the output HTTP request throttling by giving you the ability to set the delay that occurs between batches. Additionally, you can tokenize your batches to provide more control over handling specific tasks related to the order in which tokens are received.

The commercial use cases for this scenario include polling multiple RSS feeds, as well as JSON and xml feeds. These patterns are typical of large news feed aggregators and monitoring software.

Here is the code you’ll need to accomplish this. NOTE: you’ll want to run this code as a child process.  I’m skipping the nitty-gritty of how to use the Node async library and child processes so that this post can stay focused. You may also want to read my post on Node.js: moving intensive tasks to a child process.

Step 1. Set up a function that executes async.parallel and allows you to pass in both your data array and a token. Since async.parallel doesn’t let you inject a callback, we use a custom event emitter to announce when each batch is complete. This let’s you decouple the blocking timer task from the async task completion event.

this._async = function(/* Array */ data, /* Number */ token){
    try{
        async.parallel(data,function(err,results){
            console.log("Data retrieved! COUNT = " + results.length);
            try{
                var object = {
                    "results":results,
                    "token":token
                }
                event.emit("AsyncComplete",object);
            }
            catch(err){
                console.log("_async process.send() error: " + err.message + "\n" + err.stack);
            }
	}.bind(this._async))
     }
     catch(err){
         console.log("_async error: " + err.message + ", " + err.stack);
     }
}

Step 2. Here we set up our timer to loop in intervals of one second to wait until the current batch job completes. Once it’s complete then we fire off the next batch to async.parallel. We also listen for our custom AsyncComplete event to fire, and once that does then we take the results and build an array until we have all the tokens back. Once all tokens are received then we send the final results array back to the parent process via process.send().

this._loopArray = function(/* Array */ arr){

    var previousVal = 0;
    var segment = null;
    var length = arr.length;

    var remaining = 0; // number of tokens remaining
    var t = 0;         // number of loops counter
    var token = 0;     // token received back from async
    var count = 1;     // internal token up counter

    var totalTokens = this._isEven(arr / 10) ? arr.length / 10 : Math.ceil( arr.length /10);

    var resultsArray = [];
    var result = null;

    event.on("AsyncComplete",function(event){

        console.log("async complete " + event.token);

        result = event.results;
        resultsArray.push(result);
        token = parseInt(event.token);

        if(count == totalTokens){
            console.log("total has been reached");
            process.send(resultsArray);
        }
    }.bind(this))

    if(length > 0){
        var timer = setInterval(function(){

        console.log("t= " + t + ", token= " + token + ", " + totalTokens)

            if(t == token && count <= totalTokens){
                count++;
                if(t <= length) t+=10;
                remaining = length - t;

                if(remaining > 10){
                    segment = arr.slice(previousVal,t);
                    console.log("segment length " + segment.length)
                    previousVal = t;
                }
                else{
                    segment = arr.slice(previousVal,length);
                }

                console.log("remaining = " + remaining);

                if(segment != null && t != 0){
                    this._async(segment,t);
                }

                if(remaining <10)clearTimeout(timer);
            }

	    console.log("tick");
        }.bind(this),1000);
    }
}

Complete code snippet. Here’s all the code you’ll need for the child process.


//Retriever.js - batch processor for async.parallel requests

var http = require("http");
var async = require("async");
var Event = require("events").EventEmitter;

process.on('message',function(msg){

    this._async = function(/* Array */ data, /* Number */ token){
        try{
            async.parallel(data,function(err,results){
                console.log("Data retrieved! COUNT = " + results.length);
                try{
                    var object = {
                        "results":results,
                        "token":token
                    }
                    event.emit("AsyncComplete",object);
                }
                catch(err){
                    console.log("_async process.send() error: " + err.message + "\n" + err.stack);
                }
             }.bind(this._async))
         }
         catch(err){
             console.log("_async error: " + err.message + ", " + err.stack);
         }
    }

    this._loopArray = function(/* Array */ arr){

        var previousVal = 0;
        var segment = null;
        var length = arr.length;

        var remaining = 0; // number of tokens remaining
        var t = 0;         // number of loops counter
        var token = 0;     // token received back from async
        var count = 1;     // internal token up counter

        var totalTokens = this._isEven(arr / 10) ? arr.length / 10 : Math.ceil( arr.length /10);

        var resultsArray = [];
        var result = null;

        event.on("AsyncComplete",function(event){

            console.log("async complete " + event.token);

            result = event.results;
            resultsArray.push(result);
            token = parseInt(event.token);

            if(count == totalTokens){
                console.log("total has been reached");
                process.send(resultsArray);
            }
        }.bind(this))

        if(length > 0){
            var timer = setInterval(function(){
                console.log("t= " + t + ", token= " + token + ", " + totalTokens)

                if(t == token && count <= totalTokens){
                    count++;
                    if(t <= length) t+=10;
                    remaining = length - t;

                    if(remaining > 10){
                        segment = arr.slice(previousVal,t);
                        console.log("segment length " + segment.length)
                        previousVal = t;
                    }
                    else{
                        segment = arr.slice(previousVal,length);
                    }

                    console.log("remaining = " + remaining);

                    if(segment != null && t != 0){
                        this._async(segment,t);
                    }

                    if(remaining <10)clearTimeout(timer);
                }

	        console.log("tick");
            }.bind(this),1000);
        }
    }

    this._isEven= function(value){
        if(value%2 == 0)
            return true;
        else
            return false;
    }

    this._init = function(){
        this._loopArr(this._someArr);
    }.bind(this)()
}

process.on('uncaughtException',function(err){
    console.log("retriever.js uncaught exception: " + err.message + "\n" + err.stack);
})

Node.js: moving intensive tasks to a child process

If you have code in your Node application that runs longer than a few seconds then you should consider moving it off the main thread, especially if it is a task that you will be running many times per hour or day. Because Node is single threaded, long running processes can block other code from executing and give end users the perception that your application slow. An example of a CPU intensive task might be retrieving an RSS feed, or multiple RSS feeds, at regular intervals and then post-processing the data.

There are several different ways to handle intensive and repetitive tasks in Node, for this post I’m only going to focus on #2:

  1. External Cron Job. You can implement a server-based cron process that runs completely outside of your application process and kicks off a node application at regular intervals and then writes the results to a database or static file.
  2. Child Process. Or, you can create a new, separate process to handle your long running code and then use what’s called a ‘signal’ to pass data between the new process and your main Node application. This pattern is based on Unix/Linux signals.

There are plenty of articles on the internet that discuss how to set up cron jobs so I’m going to skip that. There are very few if any full blog articles that discuss in detail how to build an application with child processes that handle repetitive and intensive tasks. There are Stack Overflow snippets that are great, but they leave out much of the nitty-gritty of how to get everything working, especially for newb’s.

The good news is the steps for implementing a child process are fairly straightforward. The following psuedo-code snippets demonstrate the steps. You can download or fork the full source code from this github repo.

Step 1. Create a new file for the code that you want to run in a separate process. We’ll name this file retriever.js and it will contain a timer and our mock intensive task. Note that the timer doesn’t have to be in the same file, I just put it there for convenience to help illustrate my point.

I also recommend setting up a counter to keep track of the total number of errors related to sending data from retriever.js back to index.js or specifically related to your intensive task. It’s important for you to know that the child process “can” continue to run even if the parent process stops accepting signals. When this happens the child process will throw errors. By counting the number of errors associated with your task or sending/receiving you can force the child process to fail gracefully.

var timers = require("timers"),
    http = require("http")
    ___backgroundTimer;

process.on('message',function(msg){

    this._longRunningTask = function(data){
        var finalArray = []
        for(var url in data){
            //TODO do something here to create the 'result'
            finalArray.push(result);
        }

        //Send the results back to index.js
        if(finalArray != []){
            var data = {
                "error":null,
                "content":finalArray
            }

            try{
                process.send(data);
            }
            catch(err){
                console.log("retriever.js: problem with process.send() " + err.message + ", " + err.stack);
            }
        }
        else{
            console.log("retriever.js: no data processed");
        }
    }

    this._startTimer = function(){
        var count = 0;

        ___backgroundTimer = timers.setInterval(function(){

            try{
                var date = new Date();
                console.log("retriever.js: datetime tick: " + date.toUTCString());
                this._longRunningTask(msg.content);
            }
            catch(err){
                count++;
                if(count == 3){
                    console.log("retriever.js: shutdown timer...too many errors. " + err.message);
                    clearInterval(___backgroundTimer);
                    process.disconnect();
                }
                else{
                    console.log("retriever.js error: " + err.message + "\n" + err.stack);
                }
            }
        },msg.interval);
    }

    this._init = function(){
        if(msg.content != null || msg.content != "" && msg.start == true){
            this._startTimer();
        }
        else{
            console.log("retriever.js: content empty. Unable to start timer.");
        }
    }.bind(this)()

})

process.on('uncaughtException',function(err){
    console.log("retriever.js: " + err.message + "\n" + err.stack + "\n Stopping background timer");
    clearInterval(___backgroundTimer);
})

Step 2. Create a fork of the current process in index.js. The fork request is executed immediately by the application.

var childProcess = require("child_process");
this._retrieveChild = childProcess.fork("./background/retriever");

Step 3. Pass message(s) from index.js to the forked process using the send() method. Note, since the data being passed back-and-forth is automatically serialized you’ll need to use JavaScript primitives such as Object, String, Integer and Array. Any non-Primitive data will have to be manually serialized and de-serialized down to its component parts.


var _finalizedData = null,
    _httpRequestArray = ["https://someurl","https://someurl2","https://someurl3"];

var data = {
    "start":true,
    "interval": 60 * 60 * 1000,
    "content": _httpRequestArray
}

this._retrieveChild.send(data);

Step 4. Receive messages from the forked process and process them in index.js.

this._retrieveChild.on('message', function(msg){
    console.log("Recv'd message from background process.");
    _finalizedData = msg.content;
}.bind(this))

Step 5. Verify that everything works by running the application and opening it in a web page. You can also use a terminal window and grep for any node processes. If your code was implemented correctly it should run without any errors and grep show your background process running separately from node:


bash-3.2$ ps aux | grep node
andy        79497   1.2  0.1  3039268  15040 s000  S     1:27PM   0:02.87 /usr/local/bin/node --debug-brk ./samples/currentweather/utils/retriever
andy        79531   0.0  0.0  2432768    612 s000  U+    1:31PM   0:00.00 grep node

References:

Github repo: node-background-processer
Node Child Process Class
Linux Signals Fundamentals – Part 1