PHP多线程pthreads-v3-线程池的使用及加锁同步的演示

1.线程池的介绍

当我们在进行多线程开发的时候,假如你需要100个线程来执行业务逻辑,你就得创建100个线程。这样依赖内存得开销非常大,线程也不太好管理。在此技术背景下,线程池的概念就出现了,通过预创建一个线程组,线程组中拥有处理线程的多个worker线程,把需要处理的业务线程添加到线程组,线程组再分配给worker,这样就不再需要创建100个线程,而是通过线程复用的方式来处理业务。当分配的job超过线程池中worker数量时,job会放到队列中等待被worker执行。

2.PHP线程池的结构图

3.worker的介绍

因为PHP的内存管理由脚本的生命周期决定,所以理论上来说每个线程在脚本执行完之前的上下文都是持久化的。而Worker类在PHP中继承自Pthread类,文档上说具备持久化线程上下文。通过添加Thread类,可以方便的管理多个Thread,当Worker执行的时候,就会调用被添加的Thread类的run方法。

3.1当job被添加到worker以后,本身不会创建线程

<?php
class MyWoker extends Worker
{

    public function run()
    {
        echo 'In Worker Tid:' . $this->getThreadId() . PHP_EOL;
    }

}

class Job extends Thread
{
    public function run()
    {
        echo 'In Job Tid:' . $this->getThreadId() . PHP_EOL;
        echo 'Worker Tid:' . $this->worker->getThreadId() . PHP_EOL;
    }
}

$worker = new MyWoker();
$job = new Job();
$worker->stack($job);
$worker->start();
$worker->join();

4.Pool的介绍

线程池就是可以看做是管理worker的worker,当job超出worker数量时,job会排队执行 。

4.1job超出worker数量时的演示

<?php
class Job extends Thread
{
    public function run()
    {
        echo 'Worker Tid:' . $this->worker->getThreadId() . PHP_EOL;
    }
}

$pool = new pool(3);
$pool->submit(new Job());
$pool->submit(new Job());
$pool->submit(new Job());
$pool->submit(new Job());
$pool->submit(new Job());
$pool->submit(new Job());

while($pool->collect());

$pool->shutdown();
我们看到worker的线程id始终是这3个,pool起到了线程复用的效果

4.2Pool线程池同步加锁的使用

<?php

class Mutex extends Threaded
{

    public function __construct($lock=false)
    {
        $this->lock = $lock;
        file_put_contents('data.txt', 0);
    }

    public function lock()
    {
        $this->lock = true;
    }

    public function unLock()
    {
        $this->lock = false;
    }

    public function isLock()
    {
        return $this->lock;
    }

    public function plusVal()
    {
        $val = $this->getVal();
        file_put_contents('data.txt', ++$val);
    }

    public function getVal()
    {
        return file_get_contents('data.txt');
    }
}

class MyWorker extends Worker
{
    public function __construct(Mutex $mutex)
    {
        $this->mutex = $mutex;
    }

    public function run()
    {
        echo 'In Worker Tid:' . $this->getThreadId() . PHP_EOL;
    }

    public function readWrite()
    {
        $this->synchronized(function($pthread){
            if (!$pthread->mutex->isLock()) {
                $pthread->mutex->lock();
                echo "value: " . $pthread->mutex->getVal() . PHP_EOL;
                $pthread->mutex->plusVal();
                $pthread->mutex->unLock();
            } else {
                echo 'locked ' . PHP_EOL;
            }
        },$this);
    }
}

class Job extends Thread
{
    public $lock = false;
    public function run()
    {
        echo 'In Job Worker Tid:' . $this->worker->getThreadId() . PHP_EOL;
        $this->worker->readWrite();

    }
}

$mutex = new Mutex();
$pool = new Pool(3, 'MyWorker', [$mutex]);

for($i = 0; $i < 100; $i++) {
    $pool->submit(new Job());
}

while($pool->collect());

$pool->shutdown();
如无特殊说明,文章均为本站原创,转载请注明出处。如发现有什么不对的地方,希望得到您的指点。

发表评论

电子邮件地址不会被公开。 必填项已用*标注