Projekt

Obecné

Profil

Stáhnout (3.88 KB) Statistiky
| Větev: | Revize:
1
<?php
2

    
3
namespace Illuminate\Bus;
4

    
5
use Closure;
6
use RuntimeException;
7
use Illuminate\Pipeline\Pipeline;
8
use Illuminate\Contracts\Queue\Queue;
9
use Illuminate\Contracts\Queue\ShouldQueue;
10
use Illuminate\Contracts\Container\Container;
11
use Illuminate\Contracts\Bus\QueueingDispatcher;
12

    
13
class Dispatcher implements QueueingDispatcher
14
{
15
    /**
16
     * The container implementation.
17
     *
18
     * @var \Illuminate\Contracts\Container\Container
19
     */
20
    protected $container;
21

    
22
    /**
23
     * The pipeline instance for the bus.
24
     *
25
     * @var \Illuminate\Pipeline\Pipeline
26
     */
27
    protected $pipeline;
28

    
29
    /**
30
     * The pipes to send commands through before dispatching.
31
     *
32
     * @var array
33
     */
34
    protected $pipes = [];
35

    
36
    /**
37
     * The queue resolver callback.
38
     *
39
     * @var \Closure|null
40
     */
41
    protected $queueResolver;
42

    
43
    /**
44
     * Create a new command dispatcher instance.
45
     *
46
     * @param  \Illuminate\Contracts\Container\Container  $container
47
     * @param  \Closure|null  $queueResolver
48
     * @return void
49
     */
50
    public function __construct(Container $container, Closure $queueResolver = null)
51
    {
52
        $this->container = $container;
53
        $this->queueResolver = $queueResolver;
54
        $this->pipeline = new Pipeline($container);
55
    }
56

    
57
    /**
58
     * Dispatch a command to its appropriate handler.
59
     *
60
     * @param  mixed  $command
61
     * @return mixed
62
     */
63
    public function dispatch($command)
64
    {
65
        if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
66
            return $this->dispatchToQueue($command);
67
        } else {
68
            return $this->dispatchNow($command);
69
        }
70
    }
71

    
72
    /**
73
     * Dispatch a command to its appropriate handler in the current process.
74
     *
75
     * @param  mixed  $command
76
     * @return mixed
77
     */
78
    public function dispatchNow($command)
79
    {
80
        return $this->pipeline->send($command)->through($this->pipes)->then(function ($command) {
81
            return $this->container->call([$command, 'handle']);
82
        });
83
    }
84

    
85
    /**
86
     * Determine if the given command should be queued.
87
     *
88
     * @param  mixed  $command
89
     * @return bool
90
     */
91
    protected function commandShouldBeQueued($command)
92
    {
93
        return $command instanceof ShouldQueue;
94
    }
95

    
96
    /**
97
     * Dispatch a command to its appropriate handler behind a queue.
98
     *
99
     * @param  mixed  $command
100
     * @return mixed
101
     *
102
     * @throws \RuntimeException
103
     */
104
    public function dispatchToQueue($command)
105
    {
106
        $connection = isset($command->connection) ? $command->connection : null;
107

    
108
        $queue = call_user_func($this->queueResolver, $connection);
109

    
110
        if (! $queue instanceof Queue) {
111
            throw new RuntimeException('Queue resolver did not return a Queue implementation.');
112
        }
113

    
114
        if (method_exists($command, 'queue')) {
115
            return $command->queue($queue, $command);
116
        } else {
117
            return $this->pushCommandToQueue($queue, $command);
118
        }
119
    }
120

    
121
    /**
122
     * Push the command onto the given queue instance.
123
     *
124
     * @param  \Illuminate\Contracts\Queue\Queue  $queue
125
     * @param  mixed  $command
126
     * @return mixed
127
     */
128
    protected function pushCommandToQueue($queue, $command)
129
    {
130
        if (isset($command->queue, $command->delay)) {
131
            return $queue->laterOn($command->queue, $command->delay, $command);
132
        }
133

    
134
        if (isset($command->queue)) {
135
            return $queue->pushOn($command->queue, $command);
136
        }
137

    
138
        if (isset($command->delay)) {
139
            return $queue->later($command->delay, $command);
140
        }
141

    
142
        return $queue->push($command);
143
    }
144

    
145
    /**
146
     * Set the pipes through which commands should be piped before dispatching.
147
     *
148
     * @param  array  $pipes
149
     * @return $this
150
     */
151
    public function pipeThrough(array $pipes)
152
    {
153
        $this->pipes = $pipes;
154

    
155
        return $this;
156
    }
157
}
(2-2/4)