[Pkg-javascript-commits] [node-async] 50/480: added async.queue

Jonas Smedegaard js at moszumanska.debian.org
Fri May 2 08:58:11 UTC 2014


This is an automated email from the git hooks/post-receive script.

js pushed a commit to branch master
in repository node-async.

commit 0d031be8a1d1fe0ac9e11369eb1ebb3c23800e57
Author: Caolan McMahon <caolan at caolanmcmahon.com>
Date:   Tue Nov 16 12:59:11 2010 +0000

    added async.queue
---
 README.md          |  50 ++++++++++++++++++++++++
 lib/async.js       |  27 +++++++++++++
 test/test-async.js | 110 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 187 insertions(+)

diff --git a/README.md b/README.md
index 657a69d..68d4a76 100644
--- a/README.md
+++ b/README.md
@@ -82,6 +82,7 @@ So far its been tested in IE6, IE7, IE8, FF3.6 and Chrome 5. Usage:
 * [whilst](#whilst)
 * [until](#until)
 * [waterfall](#waterfall)
+* [queue](#queue)
 * [auto](#auto)
 * [iterator](#iterator)
 * [apply](#apply)
@@ -641,6 +642,55 @@ __Example__
 
 ---------------------------------------
 
+<a name="queue" />
+### queue(worker, concurrency)
+
+Creates a queue object with the specified concurrency. Tasks added to the
+queue will be processed in parallel (up to the concurrency limit). If all
+workers are in progress, the task is queued until one is available. Once
+a worker has completed a task, the task's callback is called.
+
+__Arguments__
+
+* worker(task, callback) - An asynchronous function for processing a queued
+  task.
+* concurrency - An optional callback which is called when all the tasks have
+  been completed. The callback may receive an error as an argument.
+
+__Queue objects__
+
+The queue object returned by this function has the following properties and
+methods:
+
+* length() - a function returning the number of items waiting to be processed.
+* concurrency - an integer for determining how many worker functions should be
+  run in parallel. This property can be changed after a queue is created to
+  alter the concurrency on-the-fly.
+* push(task, callback) - add a new task to the queue, the callback is called
+  once the worker has finished processing the task.
+
+__Example__
+
+    // create a queue object with concurrency 2
+
+    var q = async.queue(function (task, callback) {
+        console.log('hello ' + task.name).
+        callback();
+    }, 2);
+
+
+    // add some items to the queue
+
+    q.push({name: 'foo'}, function (err) {
+        console.log('finished processing foo');
+    });
+    q.push({name: 'bar'}, function (err) {
+        console.log('finished processing bar');
+    });
+
+
+---------------------------------------
+
 <a name="auto" />
 ### auto(tasks, [callback])
 
diff --git a/lib/async.js b/lib/async.js
index 9593fb4..3ae2c97 100644
--- a/lib/async.js
+++ b/lib/async.js
@@ -460,6 +460,33 @@
         else callback();
     };
 
+    async.queue = function (worker, concurrency) {
+        var workers = 0;
+        var tasks = [];
+        var q = {
+            concurrency: concurrency,
+            push: function (data, callback) {
+                tasks.push({data: data, callback: callback});
+                async.nextTick(q.process);
+            },
+            process: function () {
+                if (workers < q.concurrency && tasks.length) {
+                    var task = tasks.splice(0,1)[0];
+                    workers++;
+                    worker(task.data, function () {
+                        workers--;
+                        task.callback.apply(task, arguments);
+                        q.process();
+                    });
+                }
+            },
+            length: function () {
+                return tasks.length;
+            }
+        };
+        return q;
+    };
+
     var _console_fn = function(name){
         return function(fn){
             var args = Array.prototype.slice.call(arguments, 1);
diff --git a/test/test-async.js b/test/test-async.js
index 09df38c..7683c66 100644
--- a/test/test-async.js
+++ b/test/test-async.js
@@ -1050,3 +1050,113 @@ exports['whilst'] = function (test) {
         }
     );
 };
+
+exports['queue'] = function (test) {
+    var call_order = [],
+        delays = [20,10,30,10];
+
+    // worker1: --1-4
+    // worker2: -2---3
+    // order of completion: 2,1,4,3
+
+    var q = async.queue(function (task, callback) {
+        setTimeout(function () {
+            call_order.push('process ' + task);
+            callback('error', 'arg');
+        }, delays.splice(0,1)[0]);
+    }, 2);
+
+    q.push(1, function (err, arg) {
+        test.equal(err, 'error');
+        test.equal(arg, 'arg');
+        test.equal(q.length(), 1);
+        call_order.push('callback ' + 1);
+    });
+    q.push(2, function (err, arg) {
+        test.equal(err, 'error');
+        test.equal(arg, 'arg');
+        test.equal(q.length(), 2);
+        call_order.push('callback ' + 2);
+    });
+    q.push(3, function (err, arg) {
+        test.equal(err, 'error');
+        test.equal(arg, 'arg');
+        test.equal(q.length(), 0);
+        call_order.push('callback ' + 3);
+    });
+    q.push(4, function (err, arg) {
+        test.equal(err, 'error');
+        test.equal(arg, 'arg');
+        test.equal(q.length(), 0);
+        call_order.push('callback ' + 4);
+    });
+    test.equal(q.length(), 4);
+    test.equal(q.concurrency, 2);
+
+    setTimeout(function () {
+        test.same(call_order, [
+            'process 2', 'callback 2',
+            'process 1', 'callback 1',
+            'process 4', 'callback 4',
+            'process 3', 'callback 3'
+        ]);
+        test.equal(q.concurrency, 2);
+        test.equal(q.length(), 0);
+        test.done();
+    }, 60);
+};
+
+exports['queue changing concurrency'] = function (test) {
+    var call_order = [],
+        delays = [20,10,30,10];
+
+    // worker1: --1-2---3-4
+    // order of completion: 1,2,3,4
+
+    var q = async.queue(function (task, callback) {
+        setTimeout(function () {
+            call_order.push('process ' + task);
+            callback('error', 'arg');
+        }, delays.splice(0,1)[0]);
+    }, 2);
+
+    q.push(1, function (err, arg) {
+        test.equal(err, 'error');
+        test.equal(arg, 'arg');
+        test.equal(q.length(), 3);
+        call_order.push('callback ' + 1);
+    });
+    q.push(2, function (err, arg) {
+        test.equal(err, 'error');
+        test.equal(arg, 'arg');
+        test.equal(q.length(), 2);
+        call_order.push('callback ' + 2);
+    });
+    q.push(3, function (err, arg) {
+        test.equal(err, 'error');
+        test.equal(arg, 'arg');
+        test.equal(q.length(), 1);
+        call_order.push('callback ' + 3);
+    });
+    q.push(4, function (err, arg) {
+        test.equal(err, 'error');
+        test.equal(arg, 'arg');
+        test.equal(q.length(), 0);
+        call_order.push('callback ' + 4);
+    });
+    test.equal(q.length(), 4);
+    test.equal(q.concurrency, 2);
+    q.concurrency = 1;
+
+    setTimeout(function () {
+        test.same(call_order, [
+            'process 1', 'callback 1',
+            'process 2', 'callback 2',
+            'process 3', 'callback 3',
+            'process 4', 'callback 4'
+        ]);
+        test.equal(q.concurrency, 1);
+        test.equal(q.length(), 0);
+        test.done();
+    }, 100);
+};

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-javascript/node-async.git



More information about the Pkg-javascript-commits mailing list