Node.js: Batching Parallel Async HTTP Requests

Some DNS providers limit the number of parallel, asynchronous HTTP requests that you can run simultaneously. Not all providers have this feature and it is commonly called DNS RRL, or DNS Response Rate Limiting. These rate limits are typically used for the very good reason of thwarting DDoS attacks. However, these DNS management tools can also limit legitimate (non-spam) IT shops that are performing their jobs.

The good news is that there is a pattern you can use within a Node.js application that lets you use a timer to wait for one batch of asynchronous parallel requests to complete before the next batch is run. The timer doesn’t block the event loop; it simply checks once per interval whether the current batch has finished. Depending on your provider, this can help you prevent DNS errors and avoid incurring the wrath of your DNS provider. And it’s still much more efficient than running the requests sequentially.

Why not just use async.parallelLimit or something similar? As of the time of this writing, the async library doesn’t have batch capabilities. parallelLimit caps how many tasks run at once, but it starts a new task as soon as any slot frees up and offers no pause between groups of requests. The pattern I’m proposing lets you fine-tune outbound HTTP request throttling by setting the delay that occurs between batches. Additionally, you can tokenize your batches, which gives you more control over tasks that depend on the order in which tokens are received.

The commercial use cases for this scenario include polling multiple RSS feeds, as well as JSON and XML feeds. These patterns are typical of large news-feed aggregators and monitoring software.

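Here is the code you’ll need to accomplish this. NOTE: you’ll want to run this code as a child process. It reports its results with process.send(), which only exists when the process has an IPC channel to its parent (for example, when it was started with child_process.fork()). I’m skipping the nitty-gritty of how to use the Node async library and child processes so that this post can stay focused. You may also want to read my post on Node.js: moving intensive tasks to a child process.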

Step 1. Set up a function that executes async.parallel and allows you to pass in both your data array and a token. Rather than having the batch loop depend directly on async.parallel’s final callback, we use a custom event emitter to announce when each batch is complete. This lets you decouple the timer task from the async task-completion event.

/**
 * Runs one batch of task functions through async.parallel and announces the
 * outcome as an "AsyncComplete" event carrying { results, token, error }.
 *
 * NOTE(review): assumes `async` (the async library) and `event` (an
 * EventEmitter instance shared with the batch loop) are in scope, as in the
 * complete listing further down.
 *
 * @param {Array}  data  - task functions, each of the form function(callback){...}
 * @param {Number} token - identifier echoed back in the event so the batch
 *                         loop can tell which batch finished
 */
this._async = function(/* Array */ data, /* Number */ token){
    try{
        async.parallel(data, function(err, results){
            if(err){
                console.log("_async batch " + token + " error: " + (err.message || err));
            }
            // On error async.parallel may hand back partial or no results;
            // normalise so results.length below cannot throw.
            results = results || [];
            console.log("Data retrieved! COUNT = " + results.length);
            try{
                // Emit even on error so the batch loop is never left waiting
                // for a token that would otherwise never arrive.
                event.emit("AsyncComplete", {
                    "results": results,
                    "token": token,
                    "error": err || null
                });
            }
            catch(e){
                console.log("_async event.emit() error: " + e.message + "\n" + e.stack);
            }
        });
    }
    catch(err){
        console.log("_async error: " + err.message + ", " + err.stack);
    }
}

Step 2. Here we set up a timer that ticks once per second and checks whether the current batch has completed. Once it has, we fire off the next batch to async.parallel. We also listen for our custom AsyncComplete event. Each time it fires, we append that batch’s results to an array, until every token has come back. Once all tokens are received, we send the final results array back to the parent process via process.send().

/**
 * Feeds `arr` to this._async() in fixed-size batches. The next batch starts
 * only after the previous one has reported back via "AsyncComplete". Once
 * every batch is in, the collected results (one array per batch, in batch
 * order) are sent to the parent process.
 *
 * NOTE(review): assumes `event` is the same EventEmitter instance that
 * this._async() emits "AsyncComplete" on.
 *
 * @param {Array}  arr       - task functions to run, batchSize at a time
 * @param {Number} batchSize - optional; tasks per batch (default 10)
 * @param {Number} delay     - optional; timer interval in ms between checks (default 1000)
 */
this._loopArray = function(/* Array */ arr, /* Number */ batchSize, /* Number */ delay){

    var self = this;
    var size = batchSize >= 1 ? Math.floor(batchSize) : 10;
    var interval = delay > 0 ? delay : 1000;
    var length = Array.isArray(arr) ? arr.length : 0;

    // Number of batches; a trailing partial batch counts as one.
    var totalTokens = Math.ceil(length / size);

    var resultsArray = [];
    var dispatched = 0; // batches handed to this._async()
    var received = 0;   // batches whose "AsyncComplete" has arrived

    // process.send only exists when this runs as a child with an IPC channel.
    function sendResults(results){
        if(typeof process.send === "function"){
            process.send(results);
        }
        else{
            console.log("_loopArray: process.send() unavailable; run this as a child process");
        }
    }

    if(totalTokens === 0){
        // Nothing to fetch: reply immediately so the parent is not left waiting.
        sendResults(resultsArray);
        return;
    }

    event.on("AsyncComplete", function onComplete(payload){
        console.log("async complete " + payload.token);
        resultsArray.push(payload.results);
        received++;

        if(received === totalTokens){
            // Detach so repeated _loopArray calls don't accumulate listeners.
            event.removeListener("AsyncComplete", onComplete);
            console.log("total has been reached");
            sendResults(resultsArray);
        }
    });

    var timer = setInterval(function(){
        console.log("dispatched= " + dispatched + ", received= " + received + ", " + totalTokens);

        // Start a new batch only when nothing is outstanding.
        if(received === dispatched && dispatched < totalTokens){
            var start = dispatched * size;
            var end = Math.min(start + size, length);
            var segment = arr.slice(start, end);
            console.log("segment length " + segment.length);

            // Update bookkeeping before dispatching: _async may complete
            // synchronously if every task calls back synchronously.
            dispatched++;
            if(dispatched === totalTokens) clearInterval(timer);

            // The batch's end index doubles as its token.
            self._async(segment, end);
        }

        console.log("tick");
    }, interval);
}

Complete code snippet. Here’s all the code you’ll need for the child process.


//Retriever.js - batch processor for async.parallel requests

var http = require("http");
var async = require("async");
var Event = require("events").EventEmitter;

process.on('message',function(msg){

    this._async = function(/* Array */ data, /* Number */ token){
        try{
            async.parallel(data,function(err,results){
                console.log("Data retrieved! COUNT = " + results.length);
                try{
                    var object = {
                        "results":results,
                        "token":token
                    }
                    event.emit("AsyncComplete",object);
                }
                catch(err){
                    console.log("_async process.send() error: " + err.message + "\n" + err.stack);
                }
             }.bind(this._async))
         }
         catch(err){
             console.log("_async error: " + err.message + ", " + err.stack);
         }
    }

    this._loopArray = function(/* Array */ arr){

        var previousVal = 0;
        var segment = null;
        var length = arr.length;

        var remaining = 0; // number of tokens remaining
        var t = 0;         // number of loops counter
        var token = 0;     // token received back from async
        var count = 1;     // internal token up counter

        var totalTokens = this._isEven(arr / 10) ? arr.length / 10 : Math.ceil( arr.length /10);

        var resultsArray = [];
        var result = null;

        event.on("AsyncComplete",function(event){

            console.log("async complete " + event.token);

            result = event.results;
            resultsArray.push(result);
            token = parseInt(event.token);

            if(count == totalTokens){
                console.log("total has been reached");
                process.send(resultsArray);
            }
        }.bind(this))

        if(length > 0){
            var timer = setInterval(function(){
                console.log("t= " + t + ", token= " + token + ", " + totalTokens)

                if(t == token && count <= totalTokens){
                    count++;
                    if(t <= length) t+=10;
                    remaining = length - t;

                    if(remaining > 10){
                        segment = arr.slice(previousVal,t);
                        console.log("segment length " + segment.length)
                        previousVal = t;
                    }
                    else{
                        segment = arr.slice(previousVal,length);
                    }

                    console.log("remaining = " + remaining);

                    if(segment != null && t != 0){
                        this._async(segment,t);
                    }

                    if(remaining <10)clearTimeout(timer);
                }

	        console.log("tick");
            }.bind(this),1000);
        }
    }

    this._isEven= function(value){
        if(value%2 == 0)
            return true;
        else
            return false;
    }

    this._init = function(){
        this._loopArr(this._someArr);
    }.bind(this)()
}

// Last-resort handler: log the failure, then exit. The Node.js docs warn that
// resuming normal operation after an uncaught exception is unsafe. A non-zero
// exit code also lets the parent process observe the failure.
process.on('uncaughtException',function(err){
    // `throw` accepts any value, so err may not be an Error (or may be null).
    var message = (err && err.message) || String(err);
    var stack = (err && err.stack) || "";
    console.log("retriever.js uncaught exception: " + message + "\n" + stack);
    process.exit(1);
})