Projekt

Obecné

Profil

Stáhnout (3.38 KB) Statistiky
| Větev: | Revize:
1 cb15593b Cajova-Houba
<?php
2
3
namespace Illuminate\Queue;
4
5
use Pheanstalk\Pheanstalk;
6
use Pheanstalk\Job as PheanstalkJob;
7
use Illuminate\Queue\Jobs\BeanstalkdJob;
8
use Illuminate\Contracts\Queue\Queue as QueueContract;
9
10
class BeanstalkdQueue extends Queue implements QueueContract
11
{
12
    /**
13
     * The Pheanstalk instance.
14
     *
15
     * @var \Pheanstalk\Pheanstalk
16
     */
17
    protected $pheanstalk;
18
19
    /**
20
     * The name of the default tube.
21
     *
22
     * @var string
23
     */
24
    protected $default;
25
26
    /**
27
     * The "time to run" for all pushed jobs.
28
     *
29
     * @var int
30
     */
31
    protected $timeToRun;
32
33
    /**
34
     * Create a new Beanstalkd queue instance.
35
     *
36
     * @param  \Pheanstalk\Pheanstalk  $pheanstalk
37
     * @param  string  $default
38
     * @param  int  $timeToRun
39
     * @return void
40
     */
41
    public function __construct(Pheanstalk $pheanstalk, $default, $timeToRun)
42
    {
43
        $this->default = $default;
44
        $this->timeToRun = $timeToRun;
45
        $this->pheanstalk = $pheanstalk;
46
    }
47
48
    /**
49
     * Push a new job onto the queue.
50
     *
51
     * @param  string  $job
52
     * @param  mixed   $data
53
     * @param  string  $queue
54
     * @return mixed
55
     */
56
    public function push($job, $data = '', $queue = null)
57
    {
58
        return $this->pushRaw($this->createPayload($job, $data), $queue);
59
    }
60
61
    /**
62
     * Push a raw payload onto the queue.
63
     *
64
     * @param  string  $payload
65
     * @param  string  $queue
66
     * @param  array   $options
67
     * @return mixed
68
     */
69
    public function pushRaw($payload, $queue = null, array $options = [])
70
    {
71
        return $this->pheanstalk->useTube($this->getQueue($queue))->put(
72
            $payload, Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, $this->timeToRun
73
        );
74
    }
75
76
    /**
77
     * Push a new job onto the queue after a delay.
78
     *
79
     * @param  \DateTime|int  $delay
80
     * @param  string  $job
81
     * @param  mixed   $data
82
     * @param  string  $queue
83
     * @return mixed
84
     */
85
    public function later($delay, $job, $data = '', $queue = null)
86
    {
87
        $payload = $this->createPayload($job, $data);
88
89
        $pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue));
90
91
        return $pheanstalk->put($payload, Pheanstalk::DEFAULT_PRIORITY, $this->getSeconds($delay), $this->timeToRun);
92
    }
93
94
    /**
95
     * Pop the next job off of the queue.
96
     *
97
     * @param  string  $queue
98
     * @return \Illuminate\Contracts\Queue\Job|null
99
     */
100
    public function pop($queue = null)
101
    {
102
        $queue = $this->getQueue($queue);
103
104
        $job = $this->pheanstalk->watchOnly($queue)->reserve(0);
105
106
        if ($job instanceof PheanstalkJob) {
107
            return new BeanstalkdJob($this->container, $this->pheanstalk, $job, $queue);
108
        }
109
    }
110
111
    /**
112
     * Delete a message from the Beanstalk queue.
113
     *
114
     * @param  string  $queue
115
     * @param  string  $id
116
     * @return void
117
     */
118
    public function deleteMessage($queue, $id)
119
    {
120
        $this->pheanstalk->useTube($this->getQueue($queue))->delete($id);
121
    }
122
123
    /**
124
     * Get the queue or return the default.
125
     *
126
     * @param  string|null  $queue
127
     * @return string
128
     */
129
    public function getQueue($queue)
130
    {
131
        return $queue ?: $this->default;
132
    }
133
134
    /**
135
     * Get the underlying Pheanstalk instance.
136
     *
137
     * @return \Pheanstalk\Pheanstalk
138
     */
139
    public function getPheanstalk()
140
    {
141
        return $this->pheanstalk;
142
    }
143
}