1.前言
swoole中的task和进程池比较类似,都是多进程任务的处理,task封装在server里面,在socket方面支持的更加友善。task进程通信方式和进程池一样有socket和消息队列等方式,下面我们来看下使用示例。
2.socket模式
<?php
const MAXREPLAY = 5;
$server = new swoole_server('0.0.0.0', 7777, SWOOLE_BASE);
$server->set([
'worker_num' => 1,
'task_worker_num' => 2,
'task_ipc_mode' => 1, //1:socket 2:消息队列 3:消息队列竞争模式
]);
$server->on('workerStart', function($server, $fd) {
if ($server->taskworker) {
echo 'task worker:';
} else {
echo 'worker:';
}
echo $server->worker_id . PHP_EOL;
});
$server->on('PipeMessage',function ($server, $src_worker_id,$message) {
echo "来自于{$src_worker_id}的管道消息".PHP_EOL;
var_dump($message);
$server->task($message); // 异步任务
$server->taskwait($message ,10); //同步任务
});
$server->on('receive', function($server, $fd, $reactor_id, $data) {
var_dump('接收消息' , $data);
$data = ['tid' => time() , 'data' => $data, 'fd' => $fd];
$server->task($data);
});
$server->on('task', function($server, $task_id, $reactor_id, $data) {
if (isset($data['task_id'])) {
if ($data['replay'] < MAXREPLAY) {
//to do
++$data['replay'];
$server->sendMessage($data,0);
} else {
$finish['task_id'] = $data['task_id'];
$finish['status'] = 0;
$finish['error'] = '任务执行次数超过' . MAXREPLAY . '执行失败';
$finish['fd'] = $data['fd'];
$server->finish($finish);
}
} else {
$data['replay'] = 0;
$data['task_id'] = $task_id;
$server->sendMessage($data,0);
}
});
$server->on('finish', function($server, $task_id, $data) {
echo 'finish task_id:' . $data['task_id'] . PHP_EOL;
$server->send($data['fd'], $data['error']);
});
$server->on('close', function() {
echo 'close' . PHP_EOL;
});
$server->start();
<?php
$client=new swoole\Client(SWOOLE_SOCK_TCP);
$client->connect('127.0.0.1',7777);
$message = '123';
$client->send($message);
echo $client->recv() . PHP_EOL;
$client->close();
3.消息队列
服务端采用消息队列处理任务以后,客户端有两种方式的实现:
1.客户端采用TCP方式发送任务
2.客户端采用消息队列的方式发送任务
后一种发送消息的时候,需要封装成swoole的消息队列头才能被服务器识别,而且这种方式,服务器需要配置消息队列竞争模式才能够从消息队列中读取数据。
<?php
$server = new swoole_server('0.0.0.0', 7777, SWOOLE_BASE);
$server->set([
'worker_num' => 1,
'task_worker_num' => 2,
'task_ipc_mode' => 3, //1:socket 2:消息队列 3:消息队列竞争模式
'message_queue_key' => 0x70,
'task_tmpdir' => '/tmp/',
]);
$server->on('workerStart', function($server, $fd) {
if ($server->taskworker) {
echo 'task worker:';
} else {
echo 'worker:';
}
echo $server->worker_id . PHP_EOL;
});
$server->on('PipeMessage',function ($server, $src_worker_id,$message) {
echo "来自于{$src_worker_id}的管道消息".PHP_EOL;
var_dump($message);
});
$server->on('receive', function($server, $fd, $reactor_id, $data) {
var_dump('接收消息' , $data);
});
$server->on('task', function($server, $task_id, $reactor_id, $data) {
var_dump($data);
});
$server->on('finish', function($server, $task_id, $data) {
echo 'finish task_id:' . $task_id . PHP_EOL;
});
$server->on('close', function() {
echo 'close' . PHP_EOL;
});
$server->start();
<?php
$msg_queue = msg_get_queue(0x70);
$taskId = 0;
$workId = 0;
function pack_swoole($data)
{
global $taskId, $workId;
$fromFd = 0;
$type = 7;
if (!is_string($data)) {
$data = serialize($data);
$fromFd |= 2;
}
if (strlen($data) >= 8180) {
$tmpFile = tempnam('/tmp/', 'swoole.task');
file_put_contents($tmpFile, $data);
$data = pack('l', strlen($data)) . $tmpFile . "\0";
$fromFd |= 1;
$len = 128 + 24;
} else {
$len = strlen($data);
}
//typedef struct _swDataHead
//{
// int fd;
// uint16_t len;
// int16_t reactor_id;
// uint8_t type;
// uint8_t flags;
// uint16_t server_fd;
//} swDataHead;
return pack('lSsCCS', $taskId++, $len, $workId, $type, 0, $fromFd) . $data;
}
// 字符串
echo msg_send($msg_queue, $workId + 1, pack_swoole('XXXXhi php'), false);
// 数组
echo msg_send($msg_queue, $workId + 1, pack_swoole(['data' => str_repeat('A', 1024), 'type' => 1]), false);
// 大包
echo msg_send($msg_queue, $workId + 1, pack_swoole(['data' => str_repeat('B', 1024 * 32), 'type' => 2]), false);
客户端端的TCP模式可以参考socket客户端示例。客户端发送消息队列模式,我的swoole版本是4.4,会有一些bug,比如服务端接收数据的时候会忽略消息的前4个字节。
bug已经修复 https://github.com/swoole/swoole-src/commit/0efcc6f9a7a5534de1eb82bfeeab319cefc904ab
如无特殊说明,文章均为本站原创,转载请注明出处。如发现有什么不对的地方,希望得到您的指点。