im/extend/easyTask/Process/Win.php

459 lines
11 KiB
PHP
Raw Permalink Normal View History

2023-09-26 18:09:46 +08:00
<?php
namespace easyTask\Process;
use easyTask\Wts;
use easyTask\Wpc;
use easyTask\Env;
use easyTask\Helper;
use \Exception as Exception;
use \Throwable as Throwable;
/**
* Class Win
* @package easyTask\Process
*/
class Win extends Process
{
/**
* Wts服务
* @var Wts
*/
protected $wts;
/**
* 虚拟进程列表
* @var array
*/
protected $workerList;
/**
* 实体进程容器
* @var array
*/
protected $wpcContainer;
/**
* AutoRec事件
* @var bool
*/
protected $autoRecEvent;
/**
* 构造函数
* @param array $taskList
*/
public function __construct($taskList)
{
$this->wts = new Wts();
parent::__construct($taskList);
}
/**
* 开始运行
*/
public function start()
{
//构建基础
$this->make();
//启动检查
$this->checkForRun();
//进程分配
$func = function ($name) {
$this->executeByProcessName($name);
};
if (!$this->wts->allocateProcess($func))
{
Helper::showError('unexpected error, process has been allocated');
}
}
/**
* 启动检查
*/
protected function checkForRun()
{
if (!Env::get('phpPath'))
{
Helper::showError('please use setPhpPath api to set phpPath');
}
if (!$this->chkCanStart())
{
Helper::showError('please close the running process first');
}
}
/**
* 检查进程
* @return bool
*/
protected function chkCanStart()
{
$workerList = $this->workerList;
foreach ($workerList as $name => $item)
{
$status = $this->wts->getProcessStatus($name);
if (!$status)
{
return true;
}
}
return false;
}
/**
* 跟进进程名称执行任务
* @param string $name
* @throws Exception|Throwable
*/
protected function executeByProcessName($name)
{
switch ($name)
{
case 'master':
$this->master();
break;
case 'manager':
$this->manager();
break;
default:
$this->invoker($name);
}
}
/**
* 构建任务
*/
protected function make()
{
$list = [];
if (!$this->wts->getProcessStatus('manager'))
{
$list = ['master', 'manager'];
}
foreach ($list as $name)
{
$this->wts->joinProcess($name);
}
foreach ($this->taskList as $key => $item)
{
//提取参数
$alas = $item['alas'];
$used = $item['used'];
//根据Worker数构建
for ($i = 0; $i < $used; $i++)
{
$name = $item['name'] = $alas . '___' . $i;
$this->workerList[$name] = $item;
$this->wts->joinProcess($name);
}
}
}
/**
* 主进程
* @throws Exception
*/
protected function master()
{
//创建常驻进程
$this->forkItemExec();
//查询状态
$i = $this->taskCount + 15;
while ($i--)
{
$status = $this->wts->getProcessStatus('manager');
if ($status)
{
$this->status();
break;
}
Helper::sleep(1);
}
}
/**
* 常驻进程
*/
protected function manager()
{
//分配子进程
$this->allocate();
//后台常驻运行
$this->daemonWait();
}
/**
* 分配子进程
*/
protected function allocate()
{
//清理进程信息
$this->wts->cleanProcessInfo();
foreach ($this->taskList as $key => $item)
{
//提取参数
$used = $item['used'];
//根据Worker数创建子进程
for ($i = 0; $i < $used; $i++)
{
$this->joinWpcContainer($this->forkItemExec());
}
}
}
/**
* 注册实体进程
* @param Wpc $wpc
*/
protected function joinWpcContainer($wpc)
{
$this->wpcContainer[] = $wpc;
foreach ($this->wpcContainer as $key => $wpc)
{
if ($wpc->hasExited())
{
unset($this->wpcContainer[$key]);
}
}
}
/**
* 创建任务执行子进程
* @return Wpc
*/
protected function forkItemExec()
{
$wpc = null;
try
{
//提取参数
$argv = Helper::getCliInput(2);
$file = array_shift($argv);;
$char = join(' ', $argv);
$work = dirname(array_shift($argv));
$style = Env::get('daemon') ? 1 : 0;
//创建进程
$wpc = new Wpc();
$wpc->setFile($file);
$wpc->setArgument($char);
$wpc->setStyle($style);
$wpc->setWorkDir($work);
$pid = $wpc->start();
if (!$pid) Helper::showError('create process failed,please try again', true);
}
catch (Exception $exception)
{
Helper::showError(Helper::convert_char($exception->getMessage()), true);
}
return $wpc;
}
/**
* 执行器
* @param string $name 任务名称
* @throws Throwable
*/
protected function invoker($name)
{
//提取字典
$taskDict = $this->workerList;
if (!isset($taskDict[$name]))
{
Helper::showError("the task name $name is not exist" . json_encode($taskDict));
}
//提取Task字典
$item = $taskDict[$name];
//输出信息
$pid = getmypid();
$title = Env::get('prefix') . '_' . $item['alas'];
Helper::showInfo("this worker $title is start");
//设置进程标题
Helper::cli_set_process_title($title);
//保存进程信息
$item['pid'] = $pid;
$this->wts->saveProcessInfo([
'pid' => $pid,
'name' => $item['name'],
'alas' => $item['alas'],
'started' => date('Y-m-d H:i:s', $this->startTime),
'time' => $item['time']
]);
//执行任务
$this->executeInvoker($item);
}
/**
* 通过默认定时执行
* @param array $item 执行项目
* @throws Throwable
*/
protected function invokeByDefault($item)
{
while (true)
{
//CPU休息
Helper::sleep($item['time']);
//执行任务
$this->execute($item);
}
exit;
}
/**
* 检查常驻进程是否存活
* @param array $item
*/
protected function checkDaemonForExit($item)
{
//检查进程存活
$status = $this->wts->getProcessStatus('manager');
if (!$status)
{
$text = Env::get('prefix') . '_' . $item['alas'];
Helper::showInfo("listened exit command, this worker $text is exiting safely", true);
}
}
/**
* 后台常驻运行
*/
protected function daemonWait()
{
//进程标题
Helper::cli_set_process_title(Env::get('prefix'));
//输出信息
$text = "this manager";
Helper::showInfo("$text is start");;
//挂起进程
while (true)
{
//CPU休息
Helper::sleep(1);
//接收命令status/stop
$this->commander->waitCommandForExecute(2, function ($command) use ($text) {
$commandType = $command['type'];
switch ($commandType)
{
case 'status':
$this->commander->send([
'type' => 'status',
'msgType' => 1,
'status' => $this->getReport(),
]);
Helper::showInfo("listened status command, $text is reported");
break;
case 'stop':
if ($command['force']) $this->stopWorkerByForce();
Helper::showInfo("listened exit command, $text is exiting safely", true);
break;
}
}, $this->startTime);
//检查进程
if (Env::get('canAutoRec'))
{
$this->getReport(true);
if ($this->autoRecEvent)
{
$this->autoRecEvent = false;
}
}
}
}
/**
* 获取报告
* @param bool $output
* @return array
* @throws
*/
protected function getReport($output = false)
{
$report = $this->workerStatus($this->taskCount);
foreach ($report as $key => $item)
{
if ($item['status'] == 'stop' && Env::get('canAutoRec'))
{
$this->joinWpcContainer($this->forkItemExec());
if ($output)
{
$this->autoRecEvent = true;
Helper::showInfo("the worker {$item['name']}(pid:{$item['pid']}) is stop,try to fork a new one");
}
}
}
return $report;
}
/**
* 查看进程状态
* @param int $count
* @return array
*/
protected function workerStatus($count)
{
//构建报告
$report = $infoData = [];
$tryTotal = 10;
while ($tryTotal--)
{
Helper::sleep(1);
$infoData = $this->wts->getProcessInfo();
if ($count == count($infoData)) break;
}
//组装数据
$pid = getmypid();
$prefix = Env::get('prefix');
foreach ($infoData as $name => $item)
{
$report[] = [
'pid' => $item['pid'],
'name' => "{$prefix}_{$item['alas']}",
'started' => $item['started'],
'time' => $item['time'],
'status' => $this->wts->getProcessStatus($name) ? 'active' : 'stop',
'ppid' => $pid,
];
}
return $report;
}
/**
* 强制关闭所有进程
*/
protected function stopWorkerByForce()
{
foreach ($this->wpcContainer as $wpc)
{
try
{
$wpc->stop(2);
}
catch (Exception $exception)
{
Helper::showError(Helper::convert_char($exception->getMessage()), false);
}
}
}
}