PHP 使用协同程序实现合作多任务
原文地址:
http://www.oschina.net/translate/cooperative-multitasking-using-coroutines-in-php
鸟哥加强版
http://www.laruence.com/2015/05/28/3038.html
原文内容(书读百遍其义自见)
PHP5.5一个比较好的新功能是实现对生成器和协同程序的支持。对于生成器,PHP的文档和各种其他的博客文章(就像这一个或这一个)已经有了非常详细的讲解。协同程序相对受到的关注就少了,所以协同程序虽然有很强大的功能但也很难被知晓,解释起来也比较困难。
这篇文章指导你通过使用协同程序来实施任务调度,通过实例实现对技术的理解。我将在前三节做一个简单的背景介绍。如果你已经有了比较好的基础,可以直接跳到“协同多任务处理”一节。
生成器
生成器最基本的思想也是一个函数,这个函数的返回值是依次输出,而不是只返回一个单独的值。或者,换句话说,生成器使你更方便的实现了迭代器接口。下面通过实现一个xrange函数来简单说明:
<?php function xrange($start, $end, $step = 1) { for ($i = $start; $i <= $end; $i += $step) { yield $i; } } foreach (xrange(1, 1000000) as $num) { echo $num, "\n"; }
上面这个xrange()函数提供了和PHP的内建函数range()一样的功能。但是不同的是range()函数返回的是一个包含属组值从1到100万的数组(注:请查看手册)。而xrange()函数返回的是依次输出这些值的一个迭代器,而且并不会真正以数组形式计算。
这种方法的优点是显而易见的。它可以让你在处理大数据集合的时候不用一次性的加载到内存中。甚至你可以处理无限大的数据流。
当然,也可以不同通过生成器来实现这个功能,而是可以通过继承Iterator接口实现。通过使用生成器实现起来会更方便,而不用再去实现iterator接口中的5个方法了。
生成器为可中断的函数
要从生成器认识协同程序,理解它们内部是如何工作的非常重要:生成器是可中断的函数,在它里面,yield构成了中断点。
紧接着上面的例子,如果你调用xrange(1,1000000)的话,xrange()函数里代码没有真正地运行。相反,PHP只是返回了一个实现了迭代器接口的 生成器类实例:
<?php $range = xrange(1, 1000000); var_dump($range); // object(Generator)#1 var_dump($range instanceof Iterator); // bool(true)
你对某个对象调用迭代器方法一次,其中的代码运行一次。例如,如果你调用$range->rewind(),那么xrange()里的代码运行到控制流 第一次出现yield的地方。在这种情况下,这就意味着当$i=$start时yield $i才运行。传递给yield语句的值是使用$range->current()获取的。
为了继续执行生成器中的代码,你必须调用$range->next()方法。这将再次启动生成器,直到yield语句出现。因此,连续调用next()和current()方法 你将能从生成器里获得所有的值,直到某个点没有再出现yield语句。对xrange()来说,这种情形出现在$i超过$end时。在这中情况下, 控制流将到达函数的终点,因此将不执行任何代码。一旦这种情况发生,vaild()方法将返回假,这时迭代结束。
协程
协程给上面功能添加的主要东西是回送数据给生成器的能力。这将把生成器到调用者的单向通信转变为两者之间的双向通信。
通过调用生成器的send()方法而不是其next()方法传递数据给协程。下面的logger()协程是这种通信如何运行的例子:
<?php function logger($fileName) { $fileHandle = fopen($fileName, 'a'); while (true) { fwrite($fileHandle, yield . "\n"); } } $logger = logger(__DIR__ . '/log'); $logger->send('Foo'); $logger->send('Bar')
正如你能看到,这儿yield没有作为一个语句来使用,而是用作一个表达式。即它有一个返回值。yield的返回值是传递给send()方法的值。 在这个例子里,yield将首先返回”Foo”,然后返回”Bar”。
上面的例子里yield仅作为接收者。混合两种用法是可能的,即既可接收也可发送。接收和发送通信如何进行的例子如下:
<?php function gen() { $ret = (yield 'yield1'); var_dump($ret); $ret = (yield 'yield2'); var_dump($ret); } $gen = gen(); var_dump($gen->current()); // string(6) "yield1" var_dump($gen->send('ret1')); // string(4) "ret1" (the first var_dump in gen) // string(6) "yield2" (the var_dump of the ->send() return value) var_dump($gen->send('ret2')); // string(4) "ret2" (again from within gen) // NULL (the return value of ->send())
马上理解输出的精确顺序有点困难,因此确定你知道为什按照这种方式输出。我愿意特别指出的有两点:第一点,yield表达式两边使用 圆括号不是偶然。由于技术原因(虽然我已经考虑为赋值增加一个异常,就像Python那样),圆括号是必须的。第二点,你可能已经注意到 调用current()之前没有调用rewind()。如果是这么做的,那么已经隐含地执行了rewind操作。
多任务协作
如果阅读了上面的logger()例子,那么你认为“为了双向通信我为什么要使用协程呢? 为什么我不能只用常见的类呢?”,你这么问完全正确。上面的例子演示了基本用法,然而上下文中没有真正的展示出使用协程的优点。这就是列举许多协程例子的理由。正如上面介绍里提到的,协程是非常强大的概念,不过这样的应用很稀少而且常常十分复杂。给出一些简单而真实的例子很难。
在这篇文章里,我决定去做的是使用协程实现多任务协作。我们尽力解决的问题是你想并发地运行多任务(或者“程序”)。不过处理器在一个时刻只能运行一个任务(这篇文章的目标是不考虑多核的)。因此处理器需要在不同的任务之间进行切换,而且总是让每个任务运行 “一小会儿”。
多任务协作这个术语中的“协作”说明了如何进行这种切换的:它要求当前正在运行的任务自动把控制传回给调度器,这样它就可以运行其他任务了。这与“抢占”多任务相反,抢占多任务是这样的:调度器可以中断运行了一段时间的任务,不管它喜欢还是不喜欢。协作多任务在Windows的早期版本(windows95)和Mac OS中有使用,不过它们后来都切换到使用抢先多任务了。理由相当明确:如果你依靠程序自动传回 控制的话,那么坏行为的软件将很容易为自身占用整个CPU,不与其他任务共享。
这个时候你应当明白协程和任务调度之间的联系:yield指令提供了任务中断自身的一种方法,然后把控制传递给调度器。因此协程可以运行多个其他任务。更进一步来说,yield可以用来在任务和调度器之间进行通信。
我们的目的是 对 “任务”用更轻量级的包装的协程函数:
<?php class Task { protected $taskId; protected $coroutine; protected $sendValue = null; protected $beforeFirstYield = true; public function __construct($taskId, Generator $coroutine) { $this->taskId = $taskId; $this->coroutine = $coroutine; } public function getTaskId() { return $this->taskId; } public function setSendValue($sendValue) { $this->sendValue = $sendValue; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } public function isFinished() { return !$this->coroutine->valid(); } }
一个任务是用 任务ID标记一个协程。使用setSendValue()方法,你可以指定哪些值将被发送到下次的恢复(在之后你会了解到我们需要这个)。 run()函数确实没有做什么,除了调用send()方法的协同程序。要理解为什么添加beforeFirstYieldflag,需要考虑下面的代码片段:
<?php class Scheduler { protected $maxTaskId = 0; protected $taskMap = []; // taskId => task protected $taskQueue; public function __construct() { $this->taskQueue = new SplQueue(); } public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine); $this->taskMap[$tid] = $task; $this->schedule($task); return $tid; } public function schedule(Task $task) { $this->taskQueue->enqueue($task); } public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $task->run(); if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } } }
通过添加 beforeFirstYieldcondition 我们可以确定 first yield 的值 被返回。
调度器现在不得不比多任务循环要做稍微多点了,然后才运行多任务:
<?php class Scheduler { protected $maxTaskId = 0; protected $taskMap = []; // taskId => task protected $taskQueue; public function __construct() { $this->taskQueue = new SplQueue(); } public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine); $this->taskMap[$tid] = $task; $this->schedule($task); return $tid; } public function schedule(Task $task) { $this->taskQueue->enqueue($task); } public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $task->run(); if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } } }
newTask()方法(使用下一个空闲的任务id)创建一个新任务,然后把这个任务放入任务映射数组里。接着它通过把任务放入任务队列里来实现对任务的调度。接着run()方法扫描任务队列,运行任务。如果一个任务结束了,那么它将从队列里删除,否则它将在队列的末尾再次被调度。
让我们看看下面具有两个简单(并且没有什么意义)任务的调度器:
<?php function task1() { for ($i = 1; $i <= 10; ++$i) { echo "This is task 1 iteration $i.\n"; yield; } } function task2() { for ($i = 1; $i <= 5; ++$i) { echo "This is task 2 iteration $i.\n"; yield; } } $scheduler = new Scheduler; $scheduler->newTask(task1()); $scheduler->newTask(task2()); $scheduler->run();
两个任务都仅仅回显一条信息,然后使用yield把控制回传给调度器。输出结果如下:
This is task 1 iteration 1. This is task 2 iteration 1. This is task 1 iteration 2. This is task 2 iteration 2. This is task 1 iteration 3. This is task 2 iteration 3. This is task 1 iteration 4. This is task 2 iteration 4. This is task 1 iteration 5. This is task 2 iteration 5. This is task 1 iteration 6. This is task 1 iteration 7. This is task 1 iteration 8. This is task 1 iteration 9. This is task 1 iteration 10.
输出确实如我们所期望的:对前五个迭代来说,两个任务是交替运行的,接着第二个任务结束后,只有第一个任务继续运行。
与调度器之间通信
既然调度器已经运行了,那么我们就转向日程表的下一项:任务和调度器之间的通信。我们将使用进程用来和操作系统会话的同样的方式来通信:系统调用。我们需要系统调用的理由是操作系统与进程相比它处在不同的权限级别上。因此为了执行特权级别的操作(如杀死另一个进程),就不得不以某种方式把控制传回给内核,这样内核就可以执行所说的操作了。再说一遍,这种行为在内部是通过使用中断指令来实现的。过去使用的是通用的int指令,如今使用的是更特殊并且更快速的syscall/sysenter指令。
我们的任务调度系统将反映这种设计:不是简单地把调度器传递给任务(这样久允许它做它想做的任何事),我们将通过给yield表达式传递信息来与系统调用通信。这儿yield即是中断,也是传递信息给调度器(和从调度器传递出信息)的方法。
为了说明系统调用,我将对可调用的系统调用做一个小小的封装:
<?php class SystemCall { protected $callback; public function __construct(callable $callback) { $this->callback = $callback; } public function __invoke(Task $task, Scheduler $scheduler) { $callback = $this->callback; // Can't call it directly in PHP :/ return $callback($task, $scheduler); } }
它将像其他任何可调用那样(使用_invoke)运行,不过它要求调度器把正在调用的任务和自身传递给这个函数。为了解决这个问题 我们不得不微微的修改调度器的run方法:
<?php public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $retval = $task->run(); if ($retval instanceof SystemCall) { $retval($task, $this); continue; } if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } }
第一个系统调用除了返回任务ID外什么都没有做:
<?php function task($max) { $tid = (yield getTaskId()); // <-- here's the syscall! for ($i = 1; $i <= $max; ++$i) { echo "This is task $tid iteration $i.\n"; yield; } } $scheduler = new Scheduler; $scheduler->newTask(task(10)); $scheduler->newTask(task(5)); $scheduler->run();
这个函数确实设置任务id为下一次发送的值,并再次调度了这个任务。由于使用了系统调用,所以调度器不能自动调用任务,我们需要手工调度任务(稍后你将明白为什么这么做)。要使用这个新的系统调用的话,我们要重新编写以前的例子:
<?php function task($max) { $tid = (yield getTaskId()); // <-- here's the syscall! for ($i = 1; $i <= $max; ++$i) { echo "This is task $tid iteration $i.\n"; yield; } } $scheduler = new Scheduler; $scheduler->newTask(task(10)); $scheduler->newTask(task(5)); $scheduler->run();
这段代码将给出与前一个例子相同的输出。注意系统调用同其他任何调用一样正常地运行,不过预先增加了yield。要创建新的任务,然后再杀死它们的话,需要两个以上的系统调用:
<?php function newTask(Generator $coroutine) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($coroutine) { $task->setSendValue($scheduler->newTask($coroutine)); $scheduler->schedule($task); } ); } function killTask($tid) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($tid) { $task->setSendValue($scheduler->killTask($tid)); $scheduler->schedule($task); } ); }
killTask函数需要在调度器里增加一个方法:
<?php public function killTask($tid) { if (!isset($this->taskMap[$tid])) { return false; } unset($this->taskMap[$tid]); // This is a bit ugly and could be optimized so it does not have to walk the queue, // but assuming that killing tasks is rather rare I won't bother with it now foreach ($this->taskQueue as $i => $task) { if ($task->getTaskId() === $tid) { unset($this->taskQueue[$i]); break; } } return true; }
用来测试新功能的微脚本:
<?php function childTask() { $tid = (yield getTaskId()); while (true) { echo "Child task $tid still alive!\n"; yield; } } function task() { $tid = (yield getTaskId()); $childTid = (yield newTask(childTask())); for ($i = 1; $i <= 6; ++$i) { echo "Parent task $tid iteration $i.\n"; yield; if ($i == 3) yield killTask($childTid); } } $scheduler = new Scheduler; $scheduler->newTask(task()); $scheduler->run();
这段代码将打印以下信息:
Parent task 1 iteration 1. Child task 2 still alive! Parent task 1 iteration 2. Child task 2 still alive! Parent task 1 iteration 3. Child task 2 still alive! Parent task 1 iteration 4. Parent task 1 iteration 5. Parent task 1 iteration 6.
整体的代码
//任务类 class Task { protected $taskId; protected $coroutine; protected $sendValue = null; protected $beforeFirstYield = true; public function __construct($taskId, Generator $coroutine) { $this->taskId = $taskId; $this->coroutine = $coroutine; } public function getTaskId() { return $this->taskId; } public function setSendValue($sendValue) { $this->sendValue = $sendValue; } public function run() { if ($this->beforeFirstYield) { $this->beforeFirstYield = false; return $this->coroutine->current(); } else { $retval = $this->coroutine->send($this->sendValue); $this->sendValue = null; return $retval; } } public function isFinished() { return !$this->coroutine->valid(); } } //调度器 class Scheduler { protected $maxTaskId = 0; protected $taskMap = []; // taskId => task protected $taskQueue; public function __construct() { $this->taskQueue = new SplQueue(); } public function newTask(Generator $coroutine) { $tid = ++$this->maxTaskId; $task = new Task($tid, $coroutine); $this->taskMap[$tid] = $task; $this->schedule($task); return $tid; } public function killTask($tid) { if (!isset($this->taskMap[$tid])) { return false; } unset($this->taskMap[$tid]); // This is a bit ugly and could be optimized so it does not have to walk the queue, // but assuming that killing tasks is rather rare I won't bother with it now foreach ($this->taskQueue as $i => $task) { if ($task->getTaskId() === $tid) { unset($this->taskQueue[$i]); break; } } return true; } public function schedule(Task $task) { $this->taskQueue->enqueue($task); } public function run_() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $task->run(); if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } } public function run() { while (!$this->taskQueue->isEmpty()) { $task = $this->taskQueue->dequeue(); $retval = $task->run(); if ($retval instanceof SystemCall) { $retval($task, $this); continue; } if ($task->isFinished()) { unset($this->taskMap[$task->getTaskId()]); } else { $this->schedule($task); } } } } //回调函数 class SystemCall { protected $callback; public function __construct(callable $callback) { $this->callback = $callback; } public function __invoke(Task $task, Scheduler $scheduler) { $callback = $this->callback; // Can't call it directly in PHP :/ return $callback($task, $scheduler); } } function getTaskId() { return new SystemCall(function(Task $task, Scheduler $scheduler) { $task->setSendValue($task->getTaskId()); $scheduler->schedule($task); }); } function newTask(Generator $coroutine) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($coroutine) { $task->setSendValue($scheduler->newTask($coroutine)); $scheduler->schedule($task); } ); } function killTask($tid) { return new SystemCall( function(Task $task, Scheduler $scheduler) use ($tid) { $task->setSendValue($scheduler->killTask($tid)); $scheduler->schedule($task); } ); } //第三次测试的代码 function childTask() { $tid = (yield getTaskId()); while (true) { echo "Child task $tid still alive!\n<br/>"; yield; } } function task() { $tid = (yield getTaskId()); $childTid = (yield newTask(childTask())); for ($i = 1; $i <= 6; ++$i) { echo "Parent task $tid iteration $i.\n<br/>"; yield; if ($i == 3) yield killTask($childTid); } } $scheduler = new Scheduler; $scheduler->newTask(task()); $scheduler->run();