1
|
<?php
|
2
|
|
3
|
namespace Illuminate\Queue;
|
4
|
|
5
|
use Pheanstalk\Pheanstalk;
|
6
|
use Pheanstalk\Job as PheanstalkJob;
|
7
|
use Illuminate\Queue\Jobs\BeanstalkdJob;
|
8
|
use Illuminate\Contracts\Queue\Queue as QueueContract;
|
9
|
|
10
|
class BeanstalkdQueue extends Queue implements QueueContract
|
11
|
{
|
12
|
/**
|
13
|
* The Pheanstalk instance.
|
14
|
*
|
15
|
* @var \Pheanstalk\Pheanstalk
|
16
|
*/
|
17
|
protected $pheanstalk;
|
18
|
|
19
|
/**
|
20
|
* The name of the default tube.
|
21
|
*
|
22
|
* @var string
|
23
|
*/
|
24
|
protected $default;
|
25
|
|
26
|
/**
|
27
|
* The "time to run" for all pushed jobs.
|
28
|
*
|
29
|
* @var int
|
30
|
*/
|
31
|
protected $timeToRun;
|
32
|
|
33
|
/**
|
34
|
* Create a new Beanstalkd queue instance.
|
35
|
*
|
36
|
* @param \Pheanstalk\Pheanstalk $pheanstalk
|
37
|
* @param string $default
|
38
|
* @param int $timeToRun
|
39
|
* @return void
|
40
|
*/
|
41
|
public function __construct(Pheanstalk $pheanstalk, $default, $timeToRun)
|
42
|
{
|
43
|
$this->default = $default;
|
44
|
$this->timeToRun = $timeToRun;
|
45
|
$this->pheanstalk = $pheanstalk;
|
46
|
}
|
47
|
|
48
|
/**
|
49
|
* Push a new job onto the queue.
|
50
|
*
|
51
|
* @param string $job
|
52
|
* @param mixed $data
|
53
|
* @param string $queue
|
54
|
* @return mixed
|
55
|
*/
|
56
|
public function push($job, $data = '', $queue = null)
|
57
|
{
|
58
|
return $this->pushRaw($this->createPayload($job, $data), $queue);
|
59
|
}
|
60
|
|
61
|
/**
|
62
|
* Push a raw payload onto the queue.
|
63
|
*
|
64
|
* @param string $payload
|
65
|
* @param string $queue
|
66
|
* @param array $options
|
67
|
* @return mixed
|
68
|
*/
|
69
|
public function pushRaw($payload, $queue = null, array $options = [])
|
70
|
{
|
71
|
return $this->pheanstalk->useTube($this->getQueue($queue))->put(
|
72
|
$payload, Pheanstalk::DEFAULT_PRIORITY, Pheanstalk::DEFAULT_DELAY, $this->timeToRun
|
73
|
);
|
74
|
}
|
75
|
|
76
|
/**
|
77
|
* Push a new job onto the queue after a delay.
|
78
|
*
|
79
|
* @param \DateTime|int $delay
|
80
|
* @param string $job
|
81
|
* @param mixed $data
|
82
|
* @param string $queue
|
83
|
* @return mixed
|
84
|
*/
|
85
|
public function later($delay, $job, $data = '', $queue = null)
|
86
|
{
|
87
|
$payload = $this->createPayload($job, $data);
|
88
|
|
89
|
$pheanstalk = $this->pheanstalk->useTube($this->getQueue($queue));
|
90
|
|
91
|
return $pheanstalk->put($payload, Pheanstalk::DEFAULT_PRIORITY, $this->getSeconds($delay), $this->timeToRun);
|
92
|
}
|
93
|
|
94
|
/**
|
95
|
* Pop the next job off of the queue.
|
96
|
*
|
97
|
* @param string $queue
|
98
|
* @return \Illuminate\Contracts\Queue\Job|null
|
99
|
*/
|
100
|
public function pop($queue = null)
|
101
|
{
|
102
|
$queue = $this->getQueue($queue);
|
103
|
|
104
|
$job = $this->pheanstalk->watchOnly($queue)->reserve(0);
|
105
|
|
106
|
if ($job instanceof PheanstalkJob) {
|
107
|
return new BeanstalkdJob($this->container, $this->pheanstalk, $job, $queue);
|
108
|
}
|
109
|
}
|
110
|
|
111
|
/**
|
112
|
* Delete a message from the Beanstalk queue.
|
113
|
*
|
114
|
* @param string $queue
|
115
|
* @param string $id
|
116
|
* @return void
|
117
|
*/
|
118
|
public function deleteMessage($queue, $id)
|
119
|
{
|
120
|
$this->pheanstalk->useTube($this->getQueue($queue))->delete($id);
|
121
|
}
|
122
|
|
123
|
/**
|
124
|
* Get the queue or return the default.
|
125
|
*
|
126
|
* @param string|null $queue
|
127
|
* @return string
|
128
|
*/
|
129
|
public function getQueue($queue)
|
130
|
{
|
131
|
return $queue ?: $this->default;
|
132
|
}
|
133
|
|
134
|
/**
|
135
|
* Get the underlying Pheanstalk instance.
|
136
|
*
|
137
|
* @return \Pheanstalk\Pheanstalk
|
138
|
*/
|
139
|
public function getPheanstalk()
|
140
|
{
|
141
|
return $this->pheanstalk;
|
142
|
}
|
143
|
}
|