Projekt

Obecné

Profil

Stáhnout (2.68 KB) Statistiky
| Větev: | Revize:
1
'use strict'
2
module.exports = RunQueue
3

    
4
var validate = require('aproba')
5

    
6
function RunQueue (opts) {
7
  validate('Z|O', [opts])
8
  if (!opts) opts = {}
9
  this.finished = false
10
  this.inflight = 0
11
  this.maxConcurrency = opts.maxConcurrency || 1
12
  this.queued = 0
13
  this.queue = []
14
  this.currentPrio = null
15
  this.currentQueue = null
16
  this.Promise = opts.Promise || global.Promise
17
  this.deferred = {}
18
}
19

    
20
RunQueue.prototype = {}
21

    
22
RunQueue.prototype.run = function () {
23
  if (arguments.length !== 0) throw new Error('RunQueue.run takes no arguments')
24
  var self = this
25
  var deferred = this.deferred
26
  if (!deferred.promise) {
27
    deferred.promise = new this.Promise(function (resolve, reject) {
28
      deferred.resolve = resolve
29
      deferred.reject = reject
30
      self._runQueue()
31
    })
32
  }
33
  return deferred.promise
34
}
35

    
36
RunQueue.prototype._runQueue = function () {
37
  var self = this
38

    
39
  while ((this.inflight < this.maxConcurrency) && this.queued) {
40
    if (!this.currentQueue || this.currentQueue.length === 0) {
41
      // wait till the current priority is entirely processed before
42
      // starting a new one
43
      if (this.inflight) return
44
      var prios = Object.keys(this.queue)
45
      for (var ii = 0; ii < prios.length; ++ii) {
46
        var prioQueue = this.queue[prios[ii]]
47
        if (prioQueue.length) {
48
          this.currentQueue = prioQueue
49
          this.currentPrio = prios[ii]
50
          break
51
        }
52
      }
53
    }
54

    
55
    --this.queued
56
    ++this.inflight
57
    var next = this.currentQueue.shift()
58
    var args = next.args || []
59

    
60
    // we explicitly construct a promise here so that queue items can throw
61
    // or immediately return to resolve
62
    var queueEntry = new this.Promise(function (resolve) {
63
      resolve(next.cmd.apply(null, args))
64
    })
65

    
66
    queueEntry.then(function () {
67
      --self.inflight
68
      if (self.finished) return
69
      if (self.queued <= 0 && self.inflight <= 0) {
70
        self.finished = true
71
        self.deferred.resolve()
72
      }
73
      self._runQueue()
74
    }, function (err) {
75
      self.finished = true
76
      self.deferred.reject(err)
77
    })
78
  }
79
}
80

    
81
RunQueue.prototype.add = function (prio, cmd, args) {
82
  if (this.finished) throw new Error("Can't add to a finished queue. Create a new queue.")
83
  if (Math.abs(Math.floor(prio)) !== prio) throw new Error('Priorities must be a positive integer value.')
84
  validate('NFA|NFZ', [prio, cmd, args])
85
  prio = Number(prio)
86
  if (!this.queue[prio]) this.queue[prio] = []
87
  ++this.queued
88
  this.queue[prio].push({cmd: cmd, args: args})
89
  // if this priority is higher than the one we're currently processing,
90
  // switch back to processing its queue.
91
  if (this.currentPrio > prio) {
92
    this.currentQueue = this.queue[prio]
93
    this.currentPrio = prio
94
  }
95
}
(3-3/3)