Swoole深入学习(2)Process

php 进阶

Posted by feimo on January 11, 2018

swoole-1.7.2增加了一个进程管理模块,用来替代PHP的pcntl扩展。pcntl是php新增的一个多进程扩展,用来实现多进程,但是有很多不完善的地方,swoole 就完善了这些地方,而且使得使用非常简单。

创建一个多进程

swoole创建多进程很简单:new Swoole\Process(‘callback_function’) 就可以了。

比如我要同时创建10个进程,就for 循环10次就可以了。

for($i=0; $i<=10 ; $i++){
    $process = new Swoole\Process('callback_function');
    $pid = $process->start();
    echo PHP_EOL . $pid;//
}

进程间的通信

如果是非常简单的多进程执行任务,那么进程间就不需要通讯了,实际情况下,很多业务是需要通讯的,比如,发邮件,如果子进程发送失败了,那么是要通知主进程的等等。

再看进程间通信之前,先看下 Swoole\Process 的几个参数:

Swoole\Process(mixed $function, $redirect_stdin_stdout = false, $create_pipe = true);

它有三个参数:

  • $function:子进程创建成功后要执行的函数

  • $redirect_stdin_stdout:重定向子进程的标准输入和输出。 设置为true,则在进程内echo将不是打印屏幕,而是写入到管道,读取键盘输入将变为从管道中读取数据。 默认为false,阻塞读取。

  • $create_pipe:是否创建管道,启用$redirect_stdin_stdout后,此选项将忽略用户参数,强制为true 如果子进程内没有进程间通信,可以设置为false。

swoole_process进程间支持2种通信方式

  • 管道pipe
  • 消息队列

管道通信

管道通信是swoole_process默认的一种通信方式。当然我们也可以在实例化的时候通过参数来设定:

$process = new Swoole\Process('callback_function', false, true);

这样就创建了一个管道通信进程。我们打印下$process这个对象的值:

var_dump($process)
object(swoole_process)#1 (3) {
  ["pipe"]=>
  int(2)
  ["callback"]=>
  string(26) "callback_function"
  ["pid"]=>
  int(4333)
}

里面有个字段pipe是管道id,还有一个pid是进程id,所以: 每次创建一个进程后,就会随之创建一个管道,主进程想和哪一个进程通信,就向那个进程的管道写入/读取数据。 管道有2个方法,分别来写入数据,和读取数据:

write()  和  read() 

来个例子,看下如何通过管道通信

<?php
/**
 * Created by PhpStorm.
 * User: feimo
 * Date: 2018\1\11 
 * Time: 14:41
 */

//进程数量
$worker_num = 2;
$workers = [];
for ($i = 0; $i < $worker_num; $i++) {
    $process = new Swoole\Process('callback_function', false);
    $pid = $process->start();
    //将每一个进程的句柄存起来
    $workers[$pid] = $process;
}
// 主进程,通过管道给子进程发送数据
foreach ($workers as $pid => $process) {
    //向子进程管道里写内容:$process->write($data);
    $process->write("hello worker[$pid]\n");
    //从子进程管道里面读取信息:$process->read();
    echo "From Worker: ".$process->read();
}
//子进程执行的回调函数
function callback_function($worker){
    //从主进程管道中读取
    $recv = $worker->read();
    echo PHP_EOL. "From Master: $recv\n";
    //向主进程管道中写入数据
    $worker->write("hello master , this pipe  is ". $worker->pipe .";  this  pid  is ".$worker->pid."\n");
    $worker->exit(0);
}

运行看下效果:

$ php process_pipe.php
From Master: hello worker[6759]
From Worker: hello master , this pipe  is 4;  this  pid  is 6759
From Master: hello worker[6760]
From Worker: hello master , this pipe  is 6;  this  pid  is 6760

注意 主进程和子进程中的 read() 不能一开始都读,得写反。不然管道中没数据,就会阻塞住了。

所以可以参考下面的图: 所以一般的顺序是:

master-->write()

work-->read()

work-->write()

master-->read()

这样才能有序的使用通道,才不会被阻塞。而且是一对一的,write 2 次,也要read 2次,先write先read。

第二个参数 $redirect_stdin_stdout 说,设置为 true ,子进程会将 echo 写入到主管道。我把上面的代码改一下,看下输出结果是啥。

$process = new Swoole\Process('callback_function', true, true);

紧紧改动了这一行,再看下:

//进程数量
$worker_num = 2;
$workers = [];
for ($i = 0; $i < $worker_num; $i++) {
    $process = new Swoole\Process('callback_function', true);
    $pid = $process->start();
    //将每一个进程的句柄存起来
    $workers[$pid] = $process;
}
// 主进程,通过管道给子进程发送数据 
foreach ($workers as $pid => $process) {
    //向子进程管道里写内容:$process->write($data);
    $process->write("hello worker[$pid]\n");
    //从子进程管道里面读取信息:$process->read();
    echo "From Worker: ".$process->read();
}
//子进程执行的回调函数
function callback_function($worker){
    //从主进程管道中读取    
    $recv = $worker->read();
    //这个echo 相当于在往master管道里写数据。write('From Master: hello worker[9251]')
    echo "From Master: $recv\n";
    //第二次写, 但是主进程没有第二次read(),所以没有被读到。
    $worker->write("hello master , this pipe  is ". $worker->pipe .";  this  pid  is ".$worker->pid."\n");
    $worker->exit(0);
}

运行下输出结果为:

$ php test.php
From Worker: From Master: hello worker[9251]
From Worker: From Master: hello worker[9252]

来分析下,因为$redirect_stdin_stdout为true,所以子进程中echo的内容就到了主管道里面,而不是打印在屏幕上,所以,主进程从管道里读到的内容,就是子进程中echo的内容。 也就造成了上面的输出结果。

那么如何使子进程中的第二个write,能被主进程读到呢?很简单,在主进程中在 read() 一次就可以了:

// 主进程,通过管道给子进程发送数据 
foreach ($workers as $pid => $process) {
    //向子进程管道里写内容:$process->write($data);
    $process->write("hello worker[$pid]\n");
    //从子进程管道里面读取信息:$process->read();
    echo "From Worker: ".$process->read();
    //第二次读
    echo "From Worker: ".$process->read();
}

再看下打印结果:

From Worker: From Master: hello worker[9328]
From Worker: hello master , this pipe  is 4;  this  pid  is 9328
From Worker: From Master: hello worker[9329]
From Worker: hello master , this pipe  is 6;  this  pid  is 9329

消息队列

swoole进程通信还有第二种方式就是“消息队列”,这个消息队列其实就是Linux系统里面的msgqueue。

swoole提供了2个方法,来实现消息队列的通信。

pop() 和 push() 

要使用队列,必须在start方法前使用useQueue。

<?php
$workers = [];
$worker_num = 2;
for($i = 0; $i < $worker_num; $i++)
{
    $process = new Swoole\Process('callback_function',false,false);
    $process->useQueue();
    $pid = $process->start();
    $workers[$pid] = $process;
}
foreach($workers as $pid => $process)
{
    //给子进程发布消息
    $process->push("hello worker[$pid]\n");
}
function callback_function($worker)
{
    //接受来自主进程的消息
    $recv = $worker->pop();
    echo "From Master: $recv\n";
    $worker->exit(0);
}
//等待消息停止
for($i = 0; $i < $worker_num; $i++)
{
    $ret = Swoole\Process::wait();
    $pid = $ret['pid'];
    unset($workers[$pid]);
    echo "Worker Exit, PID=".$pid.PHP_EOL;
}

运行下:

$ php process_msg.php
From Master: hello worker[18219]
From Master: hello worker[18218]
Worker Exit, PID=18218
Worker Exit, PID=18219

消息队列,依赖于linux系统ipcs,所以,如果空间满了,可能会阻塞。或者出现其他一些异常,具体可以参考swoole内核设置:https://wiki.swoole.com/wiki/page/p-server/sysctl.html

与消息队列相关的几个系统命令:

查看消息队列

ipcs -q

ipcrm 删除消息队列

ipcrm -q MessageId

批量删除所有的队列

ipcs -q | sed "$ d; 1,2d" |  awk '{ print "Removing " $2; system("ipcrm -q " $2) }'