laravel 源码分析具体注释见 https://github.com/FX-Max/source-analysis-laravel
前言 队列 (Queue) 是 laravel 中比较常用的一个功能,队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和响应的时间。本文我们就来分析下队列创建和执行的源码。
本文笔者基于 laravel 5.8.* 版本
队列任务的创建 先通过命令创建一个 Job 类,成功之后会创建如下文件 laravel-src/laravel/app/Jobs/DemoJob.php。
1 2 3 > php artisan make:job DemoJob > Job created successfully.
下面我们来分析一下 Job 类的具体生成过程。
执行 php artisan make:job DemoJob
后,会触发调用如下方法。
laravel-src/laravel/vendor/laravel/framework/src/Illuminate/Foundation/Providers/ArtisanServiceProvider.php
1 2 3 4 5 6 7 8 9 10 11 protected function registerJobMakeCommand ( ) { $this ->app->singleton ('command.job.make' , function ($app ) { return new JobMakeCommand ($app ['files' ]); }); }
接着我们来看下 JobMakeCommand 这个类,这个类里面没有过多的处理逻辑,处理方法在其父类中。
1 class JobMakeCommand extends GeneratorCommand
我们直接看父类中的处理方法,GeneratorCommand->handle(),以下是该方法中的主要方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 public function handle ( ) { $name = $this ->qualifyClass ($this ->getNameInput ()); $path = $this ->getPath ($name ); $this ->makeDirectory ($path ); $this ->files->put ($path , $this ->buildClass ($name )); $this ->info ($this ->type.' created successfully.' ); }
方法就是通过目录和文件,创建对应的类文件,至于新文件的内容,都是基于已经设置好的模板来创建的,具体的内容在 buildClass($name) 方法中。
1 2 3 4 5 6 7 protected function buildClass ($name ) { $stub = $this ->files->get ($this ->getStub ()); return $this ->replaceNamespace ($stub , $name )->replaceClass ($stub , $name ); }
获取模板文件
1 2 3 4 5 6 protected function getStub ( ) { return $this ->option ('sync' ) ? __DIR__ .'/stubs/job.stub' : __DIR__ .'/stubs/job-queued.stub' ; }
job.stub
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 <?php namespace DummyNamespace ;use Illuminate \Bus \Queueable ;use Illuminate \Foundation \Bus \Dispatchable ;class DummyClass { use Dispatchable , Queueable ; public function __construct ( ) { } public function handle ( ) { } }
job-queued.stub
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 <?php namespace DummyNamespace ;use Illuminate \Bus \Queueable ;use Illuminate \Queue \SerializesModels ;use Illuminate \Queue \InteractsWithQueue ;use Illuminate \Contracts \Queue \ShouldQueue ;use Illuminate \Foundation \Bus \Dispatchable ;class DummyClass implements ShouldQueue { use Dispatchable , InteractsWithQueue , Queueable , SerializesModels ; public function __construct ( ) { } public function handle ( ) { } }
下面看一下前面我们创建的一个 Job 类,DemoJob.php,就是来源于模板 job-queued.stub。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 <?php namespace App \Jobs ;use Illuminate \Bus \Queueable ;use Illuminate \Queue \SerializesModels ;use Illuminate \Queue \InteractsWithQueue ;use Illuminate \Contracts \Queue \ShouldQueue ;use Illuminate \Foundation \Bus \Dispatchable ;class DemoJob implements ShouldQueue { use Dispatchable , InteractsWithQueue , Queueable , SerializesModels ; public function __construct ( ) { } public function handle ( ) { } }
至此,我们已经大致明白了队列任务类是如何创建的了。下面我们来分析下其是如何生效运行的。
队列任务的分发 任务类创建后,我们就可以在需要的地方进行任务的分发,常见的方法如下:
1 2 DemoJob::dispatch(); // 任务分发 DemoJob::dispatchNow(); // 同步调度,队列任务不会排队,并立即在当前进程中进行
下面先以 dispatch() 为例分析下分发过程。
1 2 3 4 5 6 7 trait Dispatchable { public static function dispatch() { return new PendingDispatch(new static(...func_get_args())); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 class PendingDispatch { protected $job; public function __construct($job) { echo '[Max] ' . 'PendingDispatch ' . '__construct' . PHP_EOL; $this->job = $job; } public function __destruct() { echo '[Max] ' . 'PendingDispatch ' . '__destruct' . PHP_EOL; app(Dispatcher::class)->dispatch($this->job); } }
重点是 app(Dispatcher::class)->dispatch($this->job) 这部分。
我们先来分析下前部分 app(Dispatcher::class),它是在 laravel 框架中自带的 BusServiceProvider 中向 $app 中注入的。
1 2 3 4 5 6 7 8 9 10 11 class BusServiceProvider extends ServiceProvider implements DeferrableProvider { public function register() { $this->app->singleton(Dispatcher::class, function ($app) { return new Dispatcher($app, function ($connection = null) use ($app) { return $app[QueueFactoryContract::class]->connection($connection); }); }); } }
看一下 Dispatcher 的构造方法,至此,我们已经知道前半部分 app(Dispatcher::class) 是如何来的了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 class Dispatcher implements QueueingDispatcher { protected $container; protected $pipeline; protected $queueResolver; public function __construct(Container $container, Closure $queueResolver = null) { $this->container = $container; /** * Illuminate/Bus/BusServiceProvider.php->register()中 * $queueResolver 传入的是一个闭包 * function ($connection = null) use ($app) { * return $app[QueueFactoryContract::class]->connection($connection); * } */ $this->queueResolver = $queueResolver; $this->pipeline = new Pipeline($container); } public function dispatch($command) { if ($this->queueResolver && $this->commandShouldBeQueued($command)) { // 将 $command 存入队列 return $this->dispatchToQueue($command); } return $this->dispatchNow($command); } }
BusServiceProvider 中注册了 Dispatcher::class ,然后 app(Dispatcher::class)->dispatch($this->job) 调用的即是 Dispatcher->dispatch()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public function dispatchToQueue($command) { // 获取任务所属的 connection $connection = $command->connection ?? null; /* * 获取队列实例,根据 config/queue.php 中的配置 * 此处我们配置 QUEUE_CONNECTION=redis 为例,则获取的是 RedisQueue * 至于如何通过 QUEUE_CONNECTION 的配置获取 queue ,此处先跳过,本文后面会具体分析。 */ $queue = call_user_func($this->queueResolver, $connection); if (! $queue instanceof Queue) { throw new RuntimeException('Queue resolver did not return a Queue implementation.'); } // 我们创建的 DemoJob 无 queue 方法,则不会调用 if (method_exists($command, 'queue')) { return $command->queue($queue, $command); } // 将 job 放入队列 return $this->pushCommandToQueue($queue, $command); } protected function pushCommandToQueue($queue, $command) { // 在指定了 queue 或者 delay 时会调用不同的方法,基本大同小异 if (isset($command->queue, $command->delay)) { return $queue->laterOn($command->queue, $command->delay, $command); } if (isset($command->queue)) { return $queue->pushOn($command->queue, $command); } if (isset($command->delay)) { return $queue->later($command->delay, $command); } // 此处我们先看最简单的无参数时的情况,调用 push() return $queue->push($command); }
笔者的配置是 QUEUE_CONNECTION=redis ,估以此来分析,其他类型的原理基本类似。
配置的是 redis 时, $queue 是 RedisQueue 实例,下面我们看下 RedisQueue->push() 的内容。
Illuminate/Queue/RedisQueue.php
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public function push($job, $data = '', $queue = null) { /** * 获取队列名称 * var_dump($this->getQueue($queue)); * 创建统一的 payload,转成 json * var_dump($this->createPayload($job, $this->getQueue($queue), $data)); */ // 将任务和数据存入队列 return $this->pushRaw($this->createPayload($job, $this->getQueue($queue), $data), $queue); } public function pushRaw($payload, $queue = null, array $options = []) { // 写入 redis 中 $this->getConnection()->eval( LuaScripts::push(), 2, $this->getQueue($queue), $this->getQueue($queue).':notify', $payload ); // 返回 id return json_decode($payload, true)['id'] ?? null; }
至此,我们已经分析完了任务是如何被加入到队列中的。