PHP-Swoole同步/异步任务(task)的使用介绍

1.前言

swoole中的task和进程池比较类似,都是多进程任务的处理,task封装在server里面,在socket方面支持的更加友善。task进程通信方式和进程池一样有socket和消息队列等方式,下面我们来看下使用示例。

2.socket模式

<?php
const MAXREPLAY = 5;

$server = new swoole_server('0.0.0.0', 7777, SWOOLE_BASE);

$server->set([
    'worker_num' => 1,
    'task_worker_num' => 2,
    'task_ipc_mode' => 1, //1:socket 2:消息队列 3:消息队列竞争模式
]);

$server->on('workerStart', function($server, $fd) {
    if ($server->taskworker) {
        echo 'task worker:';
    } else {
        echo 'worker:';
    }
    echo  $server->worker_id . PHP_EOL;
});

$server->on('PipeMessage',function ($server, $src_worker_id,$message) {
    echo "来自于{$src_worker_id}的管道消息".PHP_EOL;
    var_dump($message);
    $server->task($message); // 异步任务
    $server->taskwait($message ,10); //同步任务
});

$server->on('receive', function($server, $fd, $reactor_id, $data) {
    var_dump('接收消息' , $data);
    $data = ['tid' => time() , 'data' => $data, 'fd' => $fd];
    $server->task($data);
});

$server->on('task', function($server, $task_id, $reactor_id, $data) {
    if (isset($data['task_id'])) {
        if ($data['replay'] < MAXREPLAY) {
            //to do
            ++$data['replay'];
            $server->sendMessage($data,0);
        } else {
            $finish['task_id'] = $data['task_id'];
            $finish['status'] = 0;
            $finish['error'] = '任务执行次数超过' . MAXREPLAY . '执行失败';
            $finish['fd'] = $data['fd'];
            $server->finish($finish);
        }
    } else {
        $data['replay'] = 0;
        $data['task_id'] = $task_id;
        $server->sendMessage($data,0);
    }
});

$server->on('finish', function($server, $task_id, $data) {
    echo 'finish task_id:' . $data['task_id'] . PHP_EOL;
    $server->send($data['fd'], $data['error']);
});

$server->on('close', function() {
    echo 'close' . PHP_EOL;
});

$server->start();
<?php

$client=new swoole\Client(SWOOLE_SOCK_TCP);

$client->connect('127.0.0.1',7777);

$message = '123';

$client->send($message);

echo $client->recv() . PHP_EOL;

$client->close();

3.消息队列

服务端采用消息队列处理任务以后,客户端有两种方式的实现:

1.客户端采用TCP方式发送任务

2.客户端采用消息队列的方式发送任务

后一种发送消息的时候,需要封装成swoole的消息队列头才能被服务器识别,而且这种方式,服务器需要配置消息队列竞争模式才能够从消息队列中读取数据。

<?php
$server = new swoole_server('0.0.0.0', 7777, SWOOLE_BASE);

$server->set([
    'worker_num' => 1,
    'task_worker_num' => 2,
    'task_ipc_mode' => 3, //1:socket 2:消息队列 3:消息队列竞争模式
    'message_queue_key' => 0x70,
    'task_tmpdir' => '/tmp/',
]);

$server->on('workerStart', function($server, $fd) {
    if ($server->taskworker) {
        echo 'task worker:';
    } else {
        echo 'worker:';
    }
    echo  $server->worker_id . PHP_EOL;
});

$server->on('PipeMessage',function ($server, $src_worker_id,$message) {
    echo "来自于{$src_worker_id}的管道消息".PHP_EOL;
    var_dump($message);
});

$server->on('receive', function($server, $fd, $reactor_id, $data) {
    var_dump('接收消息' , $data);
});

$server->on('task', function($server, $task_id, $reactor_id, $data) {
   var_dump($data);
});

$server->on('finish', function($server, $task_id, $data) {
    echo 'finish task_id:' . $task_id . PHP_EOL;
});

$server->on('close', function() {
    echo 'close' . PHP_EOL;
});

$server->start();
<?php

$msg_queue = msg_get_queue(0x70);

$taskId = 0;
$workId = 0;

function pack_swoole($data)
{
    global $taskId, $workId;
    $fromFd = 0;
    $type = 7;
    if (!is_string($data)) {
        $data = serialize($data);
        $fromFd |= 2;
    }
    if (strlen($data) >= 8180) {
        $tmpFile = tempnam('/tmp/', 'swoole.task');
        file_put_contents($tmpFile, $data);
        $data = pack('l', strlen($data)) . $tmpFile . "\0";
        $fromFd |= 1;
        $len = 128 + 24;
    } else {
        $len = strlen($data);
    }
    //typedef struct _swDataHead
    //{
    //    int fd;
    //    uint16_t len;
    //    int16_t reactor_id;
    //    uint8_t type;
    //    uint8_t flags;
    //    uint16_t server_fd;
    //} swDataHead;
    return pack('lSsCCS', $taskId++, $len, $workId, $type, 0, $fromFd) . $data;
}
// 字符串
echo msg_send($msg_queue, $workId + 1, pack_swoole('XXXXhi php'), false);
// 数组
echo msg_send($msg_queue, $workId + 1, pack_swoole(['data' => str_repeat('A', 1024), 'type' => 1]), false);
// 大包
echo msg_send($msg_queue, $workId + 1, pack_swoole(['data' => str_repeat('B', 1024 * 32), 'type' => 2]), false);




客户端端的TCP模式可以参考socket客户端示例。客户端发送消息队列模式,我的swoole版本是4.4,会有一些bug,比如服务端接收数据的时候会忽略消息的前4个字节。

bug已经修复 https://github.com/swoole/swoole-src/commit/0efcc6f9a7a5534de1eb82bfeeab319cefc904ab

如无特殊说明,文章均为本站原创,转载请注明出处。如发现有什么不对的地方,希望得到您的指点。

发表评论

电子邮件地址不会被公开。 必填项已用*标注