Projekt

Obecné

Profil

Stáhnout (3.75 KB) Statistiky
| Větev: | Revize:
1
<?php
2

    
3
namespace Illuminate\Queue;
4

    
5
use Aws\Sqs\SqsClient;
6
use Illuminate\Queue\Jobs\SqsJob;
7
use Illuminate\Contracts\Queue\Queue as QueueContract;
8

    
9
class SqsQueue extends Queue implements QueueContract
10
{
11
    /**
12
     * The Amazon SQS instance.
13
     *
14
     * @var \Aws\Sqs\SqsClient
15
     */
16
    protected $sqs;
17

    
18
    /**
19
     * The name of the default tube.
20
     *
21
     * @var string
22
     */
23
    protected $default;
24

    
25
    /**
26
     * The sqs prefix url.
27
     *
28
     * @var string
29
     */
30
    protected $prefix;
31

    
32
    /**
33
     * The job creator callback.
34
     *
35
     * @var callable|null
36
     */
37
    protected $jobCreator;
38

    
39
    /**
40
     * Create a new Amazon SQS queue instance.
41
     *
42
     * @param  \Aws\Sqs\SqsClient  $sqs
43
     * @param  string  $default
44
     * @param  string  $prefix
45
     * @return void
46
     */
47
    public function __construct(SqsClient $sqs, $default, $prefix = '')
48
    {
49
        $this->sqs = $sqs;
50
        $this->prefix = $prefix;
51
        $this->default = $default;
52
    }
53

    
54
    /**
55
     * Push a new job onto the queue.
56
     *
57
     * @param  string  $job
58
     * @param  mixed   $data
59
     * @param  string  $queue
60
     * @return mixed
61
     */
62
    public function push($job, $data = '', $queue = null)
63
    {
64
        return $this->pushRaw($this->createPayload($job, $data), $queue);
65
    }
66

    
67
    /**
68
     * Push a raw payload onto the queue.
69
     *
70
     * @param  string  $payload
71
     * @param  string  $queue
72
     * @param  array   $options
73
     * @return mixed
74
     */
75
    public function pushRaw($payload, $queue = null, array $options = [])
76
    {
77
        $response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload]);
78

    
79
        return $response->get('MessageId');
80
    }
81

    
82
    /**
83
     * Push a new job onto the queue after a delay.
84
     *
85
     * @param  \DateTime|int  $delay
86
     * @param  string  $job
87
     * @param  mixed   $data
88
     * @param  string  $queue
89
     * @return mixed
90
     */
91
    public function later($delay, $job, $data = '', $queue = null)
92
    {
93
        $payload = $this->createPayload($job, $data);
94

    
95
        $delay = $this->getSeconds($delay);
96

    
97
        return $this->sqs->sendMessage([
98
            'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay,
99

    
100
        ])->get('MessageId');
101
    }
102

    
103
    /**
104
     * Pop the next job off of the queue.
105
     *
106
     * @param  string  $queue
107
     * @return \Illuminate\Contracts\Queue\Job|null
108
     */
109
    public function pop($queue = null)
110
    {
111
        $queue = $this->getQueue($queue);
112

    
113
        $response = $this->sqs->receiveMessage(
114
            ['QueueUrl' => $queue, 'AttributeNames' => ['ApproximateReceiveCount']]
115
        );
116

    
117
        if (count($response['Messages']) > 0) {
118
            if ($this->jobCreator) {
119
                return call_user_func($this->jobCreator, $this->container, $this->sqs, $queue, $response);
120
            } else {
121
                return new SqsJob($this->container, $this->sqs, $queue, $response['Messages'][0]);
122
            }
123
        }
124
    }
125

    
126
    /**
127
     * Define the job creator callback for the connection.
128
     *
129
     * @param  callable  $callback
130
     * @return $this
131
     */
132
    public function createJobsUsing(callable $callback)
133
    {
134
        $this->jobCreator = $callback;
135

    
136
        return $this;
137
    }
138

    
139
    /**
140
     * Get the queue or return the default.
141
     *
142
     * @param  string|null  $queue
143
     * @return string
144
     */
145
    public function getQueue($queue)
146
    {
147
        $queue = $queue ?: $this->default;
148

    
149
        if (filter_var($queue, FILTER_VALIDATE_URL) !== false) {
150
            return $queue;
151
        }
152

    
153
        return rtrim($this->prefix, '/').'/'.($queue);
154
    }
155

    
156
    /**
157
     * Get the underlying SQS instance.
158
     *
159
     * @return \Aws\Sqs\SqsClient
160
     */
161
    public function getSqs()
162
    {
163
        return $this->sqs;
164
    }
165
}
(15-15/18)