php socket服务的模型以及完成多进程IO复用libevent
发布时间:2022-02-23 23:33:36 所属栏目:PHP教程 来源:互联网
导读:端口复用技术,这样就可以很好的解决惊群问题和stream_socket_server性能瓶颈的问题. /** * 多进程IO复用libevent * 同时处理多个连接 * 端口复用---建议php7 */ class Xtgxiso_server { public $socket = false; public $master = array(); public $onCon
端口复用技术,这样就可以很好的解决惊群问题和stream_socket_server性能瓶颈的问题. /** * 多进程IO复用libevent * 同时处理多个连接 * 端口复用---建议php7 */ class Xtgxiso_server { public $socket = false; public $master = array(); public $onConnect = null; public $onMessage = null; public $onClose = null; public $process_num = 2; private $pids = array(); public $receive = array(); private $host='127.0.0.1'; private $port = 1215; function __construct($host="0.0.0.0",$port=1215){ //产生子进程分支 $pid = pcntl_fork(); if ($pid == -1) { die("could not fork"); //pcntl_fork返回-1标明创建子进程失败 } else if ($pid) { exit(); //父进程中pcntl_fork返回创建的子进程进程号 } else { // 子进程pcntl_fork返回的时0 } // 从当前终端分离 if (posix_setsid() == -1) { die("could not detach from terminal"); } umask(0); $this->host = $host; $this->port = $port; } private function start_worker_process(){ $pid = pcntl_fork(); switch ($pid) { case -1: echo "fork error : {$i} /r/n"; exit; case 0: $context_option['socket']['so_reuseport'] = 1; $context = stream_context_create($context_option); $this->socket = stream_socket_server("tcp://".$this->host.":".$this->port, $errno, $errstr,STREAM_SERVER_BIND | STREAM_SERVER_LISTEN,$context); if (!$this->socket) die($errstr."--".$errno); stream_set_blocking($this->socket,0); $id = (int)$this->socket; $this->master[$id] = $this->socket; $base = event_base_new(); $event = event_new(); event_set($event, $this->socket, EV_READ | EV_PERSIST, array(__CLASS__, 'ev_accept'), $base); event_base_set($event, $base); event_add($event); echo posix_getpid()." start run.../n"; event_base_loop($base); default: $this->pids[$pid] = $pid; break; } } public function run(){ for($i = 1; $i <= $this->process_num; $i++){ $this->start_worker_process(); } while(1){ foreach ($this->pids as $i => $pid) { if($pid) { $res = pcntl_waitpid($pid, $status,WNOHANG); if ( $res == -1 || $res > 0 ){ $this->start_worker_process(); unset($this->pids[$pid]); } } } sleep(1); } } public function ev_accept($socket, $flag, $base){ $connection = @stream_socket_accept($socket); echo posix_getpid()." -- accepted " . stream_socket_get_name($connection,true) . "/n"; if ( !$connection ){ return; } stream_set_blocking($connection, 0); $id = (int)$connection; if($this->onConnect) { call_user_func($this->onConnect, $connection); } $buffer = event_buffer_new($connection, array(__CLASS__, 'ev_read'), array(__CLASS__, 'ev_write'), array(__CLASS__, 'ev_error'), $id); event_buffer_base_set($buffer, $base); event_buffer_timeout_set($buffer, 30, 30); event_buffer_watermark_set($buffer, EV_READ, 0, 0xffffff); event_buffer_priority_set($buffer, 10); event_buffer_enable($buffer, EV_READ | EV_PERSIST); $this->master[$id] = $connection; $this->buffer[$id] = $buffer; $this->receive[$id] = ''; } function ev_read($buffer, $id) { while( 1 ) { $read = event_buffer_read($buffer, 3); if($read === '' || $read === false) { break; } $pos = strpos($read, "/n"); if($pos === false) { $this->receive[$id] .= $read; //echo "received:".$read.";not all package,continue recdiveing/n"; }else{ $this->receive[$id] .= trim(substr ($read,0,$pos+1)); $read = substr($read,$pos+1); if($this->onMessage) { call_user_func($this->onMessage,$this->master[$id],$this->receive[$id]); } $server = new Xtgxiso_server(); $server->onConnect = function($conn){ echo "onConnect -- accepted " . stream_socket_get_name($conn,true) . "/n"; fwrite($conn,"conn success/n"); }; $server->onMessage = function($conn,$msg){ echo "onMessage --" . $msg . "/n"; fwrite($conn,"received ".$msg."/n"); }; //Cuoxin.com $server->onClose = function($conn){ echo "onClose --" . stream_socket_get_name($conn,true) . "/n"; }; $server->run(); 经过多次服务模型的演变,基本我们实现了一个高性能的服务模型! (编辑:黄山站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
站长推荐