Projekt

Obecné

Profil

Stáhnout (8.43 KB) Statistiky
| Větev: | Revize:
1
<?php
2

    
3
namespace Illuminate\Queue;
4

    
5
use DateTime;
6
use Carbon\Carbon;
7
use Illuminate\Database\Connection;
8
use Illuminate\Queue\Jobs\DatabaseJob;
9
use Illuminate\Contracts\Queue\Queue as QueueContract;
10

    
11
class DatabaseQueue extends Queue implements QueueContract
12
{
13
    /**
14
     * The database connection instance.
15
     *
16
     * @var \Illuminate\Database\Connection
17
     */
18
    protected $database;
19

    
20
    /**
21
     * The database table that holds the jobs.
22
     *
23
     * @var string
24
     */
25
    protected $table;
26

    
27
    /**
28
     * The name of the default queue.
29
     *
30
     * @var string
31
     */
32
    protected $default;
33

    
34
    /**
35
     * The expiration time of a job.
36
     *
37
     * @var int|null
38
     */
39
    protected $expire = 60;
40

    
41
    /**
42
     * Create a new database queue instance.
43
     *
44
     * @param  \Illuminate\Database\Connection  $database
45
     * @param  string  $table
46
     * @param  string  $default
47
     * @param  int  $expire
48
     * @return void
49
     */
50
    public function __construct(Connection $database, $table, $default = 'default', $expire = 60)
51
    {
52
        $this->table = $table;
53
        $this->expire = $expire;
54
        $this->default = $default;
55
        $this->database = $database;
56
    }
57

    
58
    /**
59
     * Push a new job onto the queue.
60
     *
61
     * @param  string  $job
62
     * @param  mixed   $data
63
     * @param  string  $queue
64
     * @return mixed
65
     */
66
    public function push($job, $data = '', $queue = null)
67
    {
68
        return $this->pushToDatabase(0, $queue, $this->createPayload($job, $data));
69
    }
70

    
71
    /**
72
     * Push a raw payload onto the queue.
73
     *
74
     * @param  string  $payload
75
     * @param  string  $queue
76
     * @param  array   $options
77
     * @return mixed
78
     */
79
    public function pushRaw($payload, $queue = null, array $options = [])
80
    {
81
        return $this->pushToDatabase(0, $queue, $payload);
82
    }
83

    
84
    /**
85
     * Push a new job onto the queue after a delay.
86
     *
87
     * @param  \DateTime|int  $delay
88
     * @param  string  $job
89
     * @param  mixed   $data
90
     * @param  string  $queue
91
     * @return void
92
     */
93
    public function later($delay, $job, $data = '', $queue = null)
94
    {
95
        return $this->pushToDatabase($delay, $queue, $this->createPayload($job, $data));
96
    }
97

    
98
    /**
99
     * Push an array of jobs onto the queue.
100
     *
101
     * @param  array   $jobs
102
     * @param  mixed   $data
103
     * @param  string  $queue
104
     * @return mixed
105
     */
106
    public function bulk($jobs, $data = '', $queue = null)
107
    {
108
        $queue = $this->getQueue($queue);
109

    
110
        $availableAt = $this->getAvailableAt(0);
111

    
112
        $records = array_map(function ($job) use ($queue, $data, $availableAt) {
113
            return $this->buildDatabaseRecord(
114
                $queue, $this->createPayload($job, $data), $availableAt
115
            );
116
        }, (array) $jobs);
117

    
118
        return $this->database->table($this->table)->insert($records);
119
    }
120

    
121
    /**
122
     * Release a reserved job back onto the queue.
123
     *
124
     * @param  string  $queue
125
     * @param  \StdClass  $job
126
     * @param  int  $delay
127
     * @return mixed
128
     */
129
    public function release($queue, $job, $delay)
130
    {
131
        return $this->pushToDatabase($delay, $queue, $job->payload, $job->attempts);
132
    }
133

    
134
    /**
135
     * Push a raw payload to the database with a given delay.
136
     *
137
     * @param  \DateTime|int  $delay
138
     * @param  string|null  $queue
139
     * @param  string  $payload
140
     * @param  int  $attempts
141
     * @return mixed
142
     */
143
    protected function pushToDatabase($delay, $queue, $payload, $attempts = 0)
144
    {
145
        $attributes = $this->buildDatabaseRecord(
146
            $this->getQueue($queue), $payload, $this->getAvailableAt($delay), $attempts
147
        );
148

    
149
        return $this->database->table($this->table)->insertGetId($attributes);
150
    }
151

    
152
    /**
153
     * Pop the next job off of the queue.
154
     *
155
     * @param  string  $queue
156
     * @return \Illuminate\Contracts\Queue\Job|null
157
     */
158
    public function pop($queue = null)
159
    {
160
        $queue = $this->getQueue($queue);
161

    
162
        $this->database->beginTransaction();
163

    
164
        if ($job = $this->getNextAvailableJob($queue)) {
165
            $job = $this->markJobAsReserved($job);
166

    
167
            $this->database->commit();
168

    
169
            return new DatabaseJob(
170
                $this->container, $this, $job, $queue
171
            );
172
        }
173

    
174
        $this->database->commit();
175
    }
176

    
177
    /**
178
     * Get the next available job for the queue.
179
     *
180
     * @param  string|null  $queue
181
     * @return \StdClass|null
182
     */
183
    protected function getNextAvailableJob($queue)
184
    {
185
        $job = $this->database->table($this->table)
186
                    ->lockForUpdate()
187
                    ->where('queue', $this->getQueue($queue))
188
                    ->where(function ($query) {
189
                        $this->isAvailable($query);
190
                        $this->isReservedButExpired($query);
191
                    })
192
                    ->orderBy('id', 'asc')
193
                    ->first();
194

    
195
        return $job ? (object) $job : null;
196
    }
197

    
198
    /**
199
     * Modify the query to check for available jobs.
200
     *
201
     * @param  \Illuminate\Database\Query\Builder  $query
202
     * @return void
203
     */
204
    protected function isAvailable($query)
205
    {
206
        $query->where(function ($query) {
207
            $query->where('reserved', 0);
208
            $query->where('available_at', '<=', $this->getTime());
209
        });
210
    }
211

    
212
    /**
213
     * Modify the query to check for jobs that are reserved but have expired.
214
     *
215
     * @param  \Illuminate\Database\Query\Builder  $query
216
     * @return void
217
     */
218
    protected function isReservedButExpired($query)
219
    {
220
        $expiration = Carbon::now()->subSeconds($this->expire)->getTimestamp();
221

    
222
        $query->orWhere(function ($query) use ($expiration) {
223
            $query->where('reserved', 1);
224
            $query->where('reserved_at', '<=', $expiration);
225
        });
226
    }
227

    
228
    /**
229
     * Mark the given job ID as reserved.
230
     *
231
     * @param \stdClass $job
232
     * @return \stdClass
233
     */
234
    protected function markJobAsReserved($job)
235
    {
236
        $job->reserved = 1;
237
        $job->attempts = $job->attempts + 1;
238
        $job->reserved_at = $this->getTime();
239

    
240
        $this->database->table($this->table)->where('id', $job->id)->update([
241
            'reserved' => $job->reserved,
242
            'reserved_at' => $job->reserved_at,
243
            'attempts' => $job->attempts,
244
        ]);
245

    
246
        return $job;
247
    }
248

    
249
    /**
250
     * Delete a reserved job from the queue.
251
     *
252
     * @param  string  $queue
253
     * @param  string  $id
254
     * @return void
255
     */
256
    public function deleteReserved($queue, $id)
257
    {
258
        $this->database->beginTransaction();
259

    
260
        if ($this->database->table($this->table)->lockForUpdate()->find($id)) {
261
            $this->database->table($this->table)->where('id', $id)->delete();
262
        }
263

    
264
        $this->database->commit();
265
    }
266

    
267
    /**
268
     * Get the "available at" UNIX timestamp.
269
     *
270
     * @param  \DateTime|int  $delay
271
     * @return int
272
     */
273
    protected function getAvailableAt($delay)
274
    {
275
        $availableAt = $delay instanceof DateTime ? $delay : Carbon::now()->addSeconds($delay);
276

    
277
        return $availableAt->getTimestamp();
278
    }
279

    
280
    /**
281
     * Create an array to insert for the given job.
282
     *
283
     * @param  string|null  $queue
284
     * @param  string  $payload
285
     * @param  int  $availableAt
286
     * @param  int  $attempts
287
     * @return array
288
     */
289
    protected function buildDatabaseRecord($queue, $payload, $availableAt, $attempts = 0)
290
    {
291
        return [
292
            'queue' => $queue,
293
            'attempts' => $attempts,
294
            'reserved' => 0,
295
            'reserved_at' => null,
296
            'available_at' => $availableAt,
297
            'created_at' => $this->getTime(),
298
            'payload' => $payload,
299
        ];
300
    }
301

    
302
    /**
303
     * Get the queue or return the default.
304
     *
305
     * @param  string|null  $queue
306
     * @return string
307
     */
308
    protected function getQueue($queue)
309
    {
310
        return $queue ?: $this->default;
311
    }
312

    
313
    /**
314
     * Get the underlying database instance.
315
     *
316
     * @return \Illuminate\Database\Connection
317
     */
318
    public function getDatabase()
319
    {
320
        return $this->database;
321
    }
322

    
323
    /**
324
     * Get the expiration time in seconds.
325
     *
326
     * @return int|null
327
     */
328
    public function getExpire()
329
    {
330
        return $this->expire;
331
    }
332

    
333
    /**
334
     * Set the expiration time in seconds.
335
     *
336
     * @param  int|null  $seconds
337
     * @return void
338
     */
339
    public function setExpire($seconds)
340
    {
341
        $this->expire = $seconds;
342
    }
343
}
(4-4/18)