Projekt

Obecné

Profil

Stáhnout (7.96 KB) Statistiky
| Větev: | Revize:
1 cb15593b Cajova-Houba
<?php
2
3
namespace Illuminate\Queue;
4
5
use Illuminate\Support\Arr;
6
use Illuminate\Support\Str;
7
use Illuminate\Redis\Database;
8
use Illuminate\Queue\Jobs\RedisJob;
9
use Illuminate\Contracts\Queue\Queue as QueueContract;
10
11
class RedisQueue extends Queue implements QueueContract
12
{
13
    /**
14
     * The Redis database instance.
15
     *
16
     * @var \Illuminate\Redis\Database
17
     */
18
    protected $redis;
19
20
    /**
21
     * The connection name.
22
     *
23
     * @var string
24
     */
25
    protected $connection;
26
27
    /**
28
     * The name of the default queue.
29
     *
30
     * @var string
31
     */
32
    protected $default;
33
34
    /**
35
     * The expiration time of a job.
36
     *
37
     * @var int|null
38
     */
39
    protected $expire = 60;
40
41
    /**
42
     * Create a new Redis queue instance.
43
     *
44
     * @param  \Illuminate\Redis\Database  $redis
45
     * @param  string  $default
46
     * @param  string  $connection
47
     * @return void
48
     */
49
    public function __construct(Database $redis, $default = 'default', $connection = null)
50
    {
51
        $this->redis = $redis;
52
        $this->default = $default;
53
        $this->connection = $connection;
54
    }
55
56
    /**
57
     * Push a new job onto the queue.
58
     *
59
     * @param  string  $job
60
     * @param  mixed   $data
61
     * @param  string  $queue
62
     * @return mixed
63
     */
64
    public function push($job, $data = '', $queue = null)
65
    {
66
        return $this->pushRaw($this->createPayload($job, $data), $queue);
67
    }
68
69
    /**
70
     * Push a raw payload onto the queue.
71
     *
72
     * @param  string  $payload
73
     * @param  string  $queue
74
     * @param  array   $options
75
     * @return mixed
76
     */
77
    public function pushRaw($payload, $queue = null, array $options = [])
78
    {
79
        $this->getConnection()->rpush($this->getQueue($queue), $payload);
80
81
        return Arr::get(json_decode($payload, true), 'id');
82
    }
83
84
    /**
85
     * Push a new job onto the queue after a delay.
86
     *
87
     * @param  \DateTime|int  $delay
88
     * @param  string  $job
89
     * @param  mixed   $data
90
     * @param  string  $queue
91
     * @return mixed
92
     */
93
    public function later($delay, $job, $data = '', $queue = null)
94
    {
95
        $payload = $this->createPayload($job, $data);
96
97
        $delay = $this->getSeconds($delay);
98
99
        $this->getConnection()->zadd($this->getQueue($queue).':delayed', $this->getTime() + $delay, $payload);
100
101
        return Arr::get(json_decode($payload, true), 'id');
102
    }
103
104
    /**
105
     * Release a reserved job back onto the queue.
106
     *
107
     * @param  string  $queue
108
     * @param  string  $payload
109
     * @param  int  $delay
110
     * @param  int  $attempts
111
     * @return void
112
     */
113
    public function release($queue, $payload, $delay, $attempts)
114
    {
115
        $payload = $this->setMeta($payload, 'attempts', $attempts);
116
117
        $this->getConnection()->zadd($this->getQueue($queue).':delayed', $this->getTime() + $delay, $payload);
118
    }
119
120
    /**
121
     * Pop the next job off of the queue.
122
     *
123
     * @param  string  $queue
124
     * @return \Illuminate\Contracts\Queue\Job|null
125
     */
126
    public function pop($queue = null)
127
    {
128
        $original = $queue ?: $this->default;
129
130
        $queue = $this->getQueue($queue);
131
132
        if (! is_null($this->expire)) {
133
            $this->migrateAllExpiredJobs($queue);
134
        }
135
136
        $job = $this->getConnection()->lpop($queue);
137
138
        if (! is_null($job)) {
139
            $this->getConnection()->zadd($queue.':reserved', $this->getTime() + $this->expire, $job);
140
141
            return new RedisJob($this->container, $this, $job, $original);
142
        }
143
    }
144
145
    /**
146
     * Delete a reserved job from the queue.
147
     *
148
     * @param  string  $queue
149
     * @param  string  $job
150
     * @return void
151
     */
152
    public function deleteReserved($queue, $job)
153
    {
154
        $this->getConnection()->zrem($this->getQueue($queue).':reserved', $job);
155
    }
156
157
    /**
158
     * Migrate all of the waiting jobs in the queue.
159
     *
160
     * @param  string  $queue
161
     * @return void
162
     */
163
    protected function migrateAllExpiredJobs($queue)
164
    {
165
        $this->migrateExpiredJobs($queue.':delayed', $queue);
166
167
        $this->migrateExpiredJobs($queue.':reserved', $queue);
168
    }
169
170
    /**
171
     * Migrate the delayed jobs that are ready to the regular queue.
172
     *
173
     * @param  string  $from
174
     * @param  string  $to
175
     * @return void
176
     */
177
    public function migrateExpiredJobs($from, $to)
178
    {
179
        $options = ['cas' => true, 'watch' => $from, 'retry' => 10];
180
181
        $this->getConnection()->transaction($options, function ($transaction) use ($from, $to) {
182
            // First we need to get all of jobs that have expired based on the current time
183
            // so that we can push them onto the main queue. After we get them we simply
184
            // remove them from this "delay" queues. All of this within a transaction.
185
            $jobs = $this->getExpiredJobs(
186
                $transaction, $from, $time = $this->getTime()
187
            );
188
189
            // If we actually found any jobs, we will remove them from the old queue and we
190
            // will insert them onto the new (ready) "queue". This means they will stand
191
            // ready to be processed by the queue worker whenever their turn comes up.
192
            if (count($jobs) > 0) {
193
                $this->removeExpiredJobs($transaction, $from, $time);
194
195
                $this->pushExpiredJobsOntoNewQueue($transaction, $to, $jobs);
196
            }
197
        });
198
    }
199
200
    /**
201
     * Get the expired jobs from a given queue.
202
     *
203
     * @param  \Predis\Transaction\MultiExec  $transaction
204
     * @param  string  $from
205
     * @param  int  $time
206
     * @return array
207
     */
208
    protected function getExpiredJobs($transaction, $from, $time)
209
    {
210
        return $transaction->zrangebyscore($from, '-inf', $time);
211
    }
212
213
    /**
214
     * Remove the expired jobs from a given queue.
215
     *
216
     * @param  \Predis\Transaction\MultiExec  $transaction
217
     * @param  string  $from
218
     * @param  int  $time
219
     * @return void
220
     */
221
    protected function removeExpiredJobs($transaction, $from, $time)
222
    {
223
        $transaction->multi();
224
225
        $transaction->zremrangebyscore($from, '-inf', $time);
226
    }
227
228
    /**
229
     * Push all of the given jobs onto another queue.
230
     *
231
     * @param  \Predis\Transaction\MultiExec  $transaction
232
     * @param  string  $to
233
     * @param  array  $jobs
234
     * @return void
235
     */
236
    protected function pushExpiredJobsOntoNewQueue($transaction, $to, $jobs)
237
    {
238
        call_user_func_array([$transaction, 'rpush'], array_merge([$to], $jobs));
239
    }
240
241
    /**
242
     * Create a payload string from the given job and data.
243
     *
244
     * @param  string  $job
245
     * @param  mixed   $data
246
     * @param  string  $queue
247
     * @return string
248
     */
249
    protected function createPayload($job, $data = '', $queue = null)
250
    {
251
        $payload = parent::createPayload($job, $data);
252
253
        $payload = $this->setMeta($payload, 'id', $this->getRandomId());
254
255
        return $this->setMeta($payload, 'attempts', 1);
256
    }
257
258
    /**
259
     * Get a random ID string.
260
     *
261
     * @return string
262
     */
263
    protected function getRandomId()
264
    {
265
        return Str::random(32);
266
    }
267
268
    /**
269
     * Get the queue or return the default.
270
     *
271
     * @param  string|null  $queue
272
     * @return string
273
     */
274
    protected function getQueue($queue)
275
    {
276
        return 'queues:'.($queue ?: $this->default);
277
    }
278
279
    /**
280
     * Get the connection for the queue.
281
     *
282
     * @return \Predis\ClientInterface
283
     */
284
    protected function getConnection()
285
    {
286
        return $this->redis->connection($this->connection);
287
    }
288
289
    /**
290
     * Get the underlying Redis instance.
291
     *
292
     * @return \Illuminate\Redis\Database
293
     */
294
    public function getRedis()
295
    {
296
        return $this->redis;
297
    }
298
299
    /**
300
     * Get the expiration time in seconds.
301
     *
302
     * @return int|null
303
     */
304
    public function getExpire()
305
    {
306
        return $this->expire;
307
    }
308
309
    /**
310
     * Set the expiration time in seconds.
311
     *
312
     * @param  int|null  $seconds
313
     * @return void
314
     */
315
    public function setExpire($seconds)
316
    {
317
        $this->expire = $seconds;
318
    }
319
}