管道通信PIPE
管道用于承载简称之间的通讯数据。为了方便理解,可以将管道比作文件,进程A将数据写到管道P中,然后进程B从管道P中读取数据。php提供的管道操作API与操作文件的API基本一样,除了创建管道使用posix_mkfifo函数,读写等操作均与文件操作函数相同。当然,你可以直接使用文件模拟管道,但是那样无法使用管道的特性了。
通过管道通信的大概思路是,首先创建一个管道,然后子进程向管道中写入信息,父进程从管道中读取信息,这样就可以做到父子进程直接实现通信了。
<?php // 创建管道 $pipePath = "pipe"; if( !file_exists( $pipePath ) ){ if( !posix_mkfifo( $pipePath, 0666) ){ exit('make pipe false!' . PHP_EOL); } } // 创建进程,子进程写管道,父进程读管道 // 通过 pcntl_fork函数创建一个子进程。 // pcntl_fork 函数 很特殊,它调用一次拥有 多个返回值。 // 在父进程中:它返回 子进程的ID 这个值是 大于0 的。 // 在子进程中,它返回0。当返回 -1 时表示创建进程失败。 $pid = pcntl_fork(); if( $pid == 0 ){ // 子进程写管道 $file = fopen( $pipePath, 'w'); fwrite( $file, 'hello world'); sleep(1); exit; }else{ // 父进程读管道 $file = fopen( $pipePath, 'r'); // 设置成读取非阻塞 // 当读取是非阻塞的情况下,父进程进行读取信息的时候不会等待, // 管道中没有消息也会立马返回。 // stream_set_blocking( $file, False); echo fread( $file, 20) . PHP_EOL; pcntl_wait($status); // 回收子进程 }
消息队列
消息队列是存放在内存中的一种队列数据结构。
<?php // 获取父进程id $parentPid = posix_getpid(); echo "parent progress pid:{$parentPid}\n"; $childList = array(); // 创建消息队列,定义消息类型 $id = ftok(__FILE__, 'm'); $msgQueue = msg_get_queue($id); const MSG_TYEP = 1; // 生产者 function producer() { global $msgQueue; $pid = posix_getpid(); $repeatNum = 5; for ($i = 0; $i <= $repeatNum; $i++) { $str = "({$pid}) progress create! {$i}"; msg_send($msgQueue, MSG_TYEP, $str); $rand = rand(1, 3); sleep($rand); } } // 消费者 function consumer() { global $msgQueue; $pid = posix_getpid(); $repeatNum = 6; for ($i = 1; $i<= $repeatNum; $i++) { $rel = msg_receive($msgQueue, MSG_TYEP, $msgType, 1024, $message); echo "{$message} | consumer({$pid}) destroy \n"; $rand = rand(1, 3); sleep($rand); } } function createProgress($callback) { $pid = pcntl_fork(); if ($pid == -1) { // 创建失败 exit("fork progresses error\n"); } elseif ($pid == 0) { // 子进程执行程序 $pid = posix_getpid(); $callback(); exit("({$pid})child progress end!\n"); } else { // 父进程 return $pid; } } for ($i = 0; $i < 3; $i++) { $pid = createProgress('producer'); $childList[$pid] = 1; echo "create producer progresses: {$pid}\n"; } for ($i = 0; $i < 2; $i++) { $pid = createProgress('consumer'); $childList[$pid] = 1; echo "create consumer progresses: {$pid}\n"; } while (!empty($childList)) { $childPid = pcntl_wait($status); if ($childPid > 0) { unset($childList[$childPid]); } } echo "({$parentPid})main progress end!\n";
运行结果:
create producer progresses: 21432
create producer progresses: 21433
create producer progresses: 21434
create consumer progresses: 21435
(21426) progress create! 2 | consumer(21435) destroy
(21424) progress create! 1 | consumer(21436) destroy
create consumer progresses: 21436
(21426) progress create! 3 | consumer(21436) destroy
(21426) progress create! 4 | consumer(21435) destroy
(21425) progress create! 3 | consumer(21436) destroy
(21424) progress create! 2 | consumer(21435) destroy
(21426) progress create! 5 | consumer(21435) destroy
(21424) progress create! 3 | consumer(21436) destroy
(21433)child progress end!
(21425) progress create! 4 | consumer(21435) destroy
(21424) progress create! 4 | consumer(21436) destroy
(21434)child progress end!
(21424) progress create! 5 | consumer(21435) destroy
(21425) progress create! 5 | consumer(21436) destroy
(21432)child progress end!
(21435)child progress end!
(21436)child progress end!
(21431)main progress end!
信号量与共享内存
<?php $parentPid = posix_getpid(); echo "parent progress pid:{$parentPid}\n"; // 创建共享内存,创建信号量,定义共享key // ftok(文件路径,资源标识符) 创建一个IPC通信所需的id $shm_id = ftok(__FILE__, 'm'); $shm_id = ftok(__FILE__, 's'); // shm_attach(id) 创建或者打开一个共享内存 $shareMemory = shm_attach($shm_id); // 返回一个可用户访问系统信号量的id $signal = sem_get($shm_id); const SHARE_KEY = 1; // 生产者 function producer() { global $shareMemory; global $signal; $pid = posix_getpid(); $repeatNum = 5; for ($i = 1; $i <= $repeatNum; $i++) { // 获得信号量 - 阻塞进程,直到信号量被获取到[lock锁机制的关键] sem_acquire($signal); // 检查某个key是否存在与共享内存中 if (shm_has_var($shareMemory, SHARE_KEY)) { // 获取共享内存中的key的值 $count = shm_get_var($shareMemory, SHARE_KEY); $count ++; // 为共享内存中的key赋值 shm_put_var($shareMemory, SHARE_KEY, $count); echo "({$pid}) count: {$count}\n"; } else { // 初始化 shm_put_var($shareMemory, SHARE_KEY, 0); echo "({$pid}) count: 0\n"; } // 释放 sem_release($signal); } } function createProgress($callback) { $pid = pcntl_fork(); if ($pid == -1) { // 创建失败 exit("fork progress error!\n"); } elseif ($pid == 0) { // 子进程 $pid = posix_getpid(); $callback(); exit("({$pid}) child progress end!\n"); } else { // 父进程 return $pid; } } // 3个写进程 for ($i = 0; $i < 3; $i ++) { $pid = createProgress('producer'); $childList[$pid] = 1; echo "create producer child progress: {$pid} \n"; } // 等待所有子进程 while (!empty($childList)) { $childPid = pcntl_wait($status); if ($childPid > 0) { unset($childList[$childPid]); } } // 释放共享内存与信号量 shm_remove($shareMemory); sem_remove($signal); echo "({$parentPid}) main progress end!\n";
运行结果:
使用信号量来实现共享内存的锁机制
parent progress pid:31720
create producer child progress: 31721
create producer child progress: 31722
(31721) count: 0
(31721) count: 1
(31721) count: 2
(31721) count: 3
(31721) count: 4
(31721) child progress end!
create producer child progress: 31723
(31722) count: 5
(31722) count: 6
(31722) count: 7
(31722) count: 8
(31722) count: 9
(31722) child progress end!
(31723) count: 10
(31723) count: 11
(31723) count: 12
(31723) count: 13
(31723) count: 14
(31723) child progress end!
(31720) main progress end!
无锁情况
Warning: sem_release(): SysV semaphore 4357894312 (key 0x73048925) is not currently acquired in /Users/easyboom/www/example/信号量与共享内存.php on line 38
以上就是PHP进程间通信的几种方法详解的详细内容,更多关于PHP进程间通信的资料请关注寻技术其它相关文章!