<?php
/*
Run manually:
    php /data/ciyon/fork/fork_apinotify.php

Run as a systemd service. Unit file /etc/systemd/system/ciyapinotify.service:
    [Unit]
    Description=ciy apinotify
    After=network.target
    [Service]
    WorkingDirectory=/data/ciyon/fork/
    ExecStart=/usr/bin/php /data/ciyon/fork/fork_apinotify.php
    Restart=always
    RestartSec=5s
    [Install]
    WantedBy=multi-user.target

Enable and start the service:
    systemctl enable ciyapinotify.service
    systemctl start ciyapinotify.service
*/
// The project root is the parent of this script's directory. Backslashes are
// normalised to forward slashes first so the path has one form on every OS.
$path = dirname(str_replace('\\', '/', __DIR__));

// Work from the project root and pull in the shared framework code and the
// admin helpers before anything else runs.
chdir($path);
require $path . '/zciyphp/comm.php';
require $path . '/web/admin/common.php';

// The script is meant to run indefinitely, so lift the execution time limit.
set_time_limit(0);
// Fail fast with a readable message: without the extension, the first
// pcntl_* call below would abort the script with a fatal error instead.
if (!extension_loaded('pcntl')) {
    die("PCNTL扩展未加载,无法使用多进程功能\n");
}

// Set to true by the SIGTERM handler; mainProcess() checks it on every loop
// iteration to begin shutting down.
$shouldStop = false;
// Count of forked children that have not been reaped yet. Incremented in
// mainProcess() after a successful fork, decremented by the SIGCHLD handler.
$currentProcesses = 0;

// SIGTERM: log the number of children still running and request shutdown.
// Both variables are captured by reference so the handler sees and changes
// the same values that mainProcess() reads through `global`.
pcntl_signal(SIGTERM, function ($signo) use (&$shouldStop, &$currentProcesses) {
    outlog('优雅关闭中:' . $currentProcesses);
    $shouldStop = true;
});
// SIGCHLD: reap every child that has already exited, without blocking, and
// decrement the live-child counter once per reaped child. The counter is
// clamped so it can never go below zero.
pcntl_signal(SIGCHLD, function ($signo) use (&$currentProcesses) {
    echo "pcntl_signal: $signo\n";
    for (;;) {
        $reapedPid = pcntl_waitpid(-1, $status, WNOHANG);
        if ($reapedPid <= 0) {
            // 0: no exited child is waiting; -1: error or no children left.
            break;
        }
        $currentProcesses = max(0, $currentProcesses - 1);
    }
});
/**
 * Writes one message to this daemon's log through savelogfile().
 *
 * @param mixed $msg Message passed to savelogfile() unchanged.
 */
function outlog($msg) {
    // Every entry from this script goes under the same log name.
    $logName = 'fork_apinotify';
    savelogfile($logName, $msg);
}
/**
 * Dispatcher loop of the daemon.
 *
 * Polls the ap_transfer table for rows whose nexttimes is earlier than the
 * current time and forks one child per row to run fork_apinotify(), keeping
 * at most $maxProcesses children alive at once.
 *
 * Does not return: the loop runs until $shouldStop is set (by the SIGTERM
 * handler), then waits up to 60 seconds for children and calls exit(0).
 *
 * Uses the globals $shouldStop and $currentProcesses (maintained by the
 * signal handlers registered at file scope) and $logpath (not defined in
 * this file; presumably set by the included bootstrap code).
 */
function mainProcess() {
    global $shouldStop, $currentProcesses, $logpath;
    $db = new \ciy\db();
    $maxProcesses = 30; // child cap; original sizing note: 1GB = 15-20 processes
    $batchSize = 20; // number of rows fetched per poll
    $spawnInterval = 100; // pause after each successful fork, in milliseconds

    outlog('主进程启动,PID: ' . getmypid());
    $isbusy = false;
    while (true) {
        // Run any pending signal handlers (SIGTERM / SIGCHLD) before deciding
        // what to do in this iteration.
        pcntl_signal_dispatch();
        if ($shouldStop) {
            // Graceful shutdown: give running children up to 60 seconds to
            // finish, then exit regardless of how many are left.
            $time = time();
            while ($currentProcesses > 0 && $time > time() - 60) {
                pcntl_signal_dispatch();
                sleep(1);
            }
            outlog('已关闭:' . $currentProcesses);
            exit(0);
        }

        if ($currentProcesses >= $maxProcesses) {
            // At the child cap: wait for some children to exit.
            sleep(1);
            continue;
        }
        // Status snapshot rewritten on every poll (presumably a heartbeat
        // file for external monitoring).
        file_put_contents($logpath . 'fork_apinotify.tak', 'curr=' . $currentProcesses . ',max=' . $maxProcesses . ',batch=' . $batchSize . ',spawn=' . $spawnInterval . ',time=' . time());

        $csql = new \ciy\sql('ap_transfer');
        // nexttimes state machine:
        //   9999999999999  default; after a 90/100 notification it becomes 0,
        //                  which means "waiting to be pushed"
        //   9999999999998  push succeeded
        //   on failure     first failure: retimes=now; nexttimes is then
        //                  now + (span since retimes) + 10s, until about 24
        //                  hours have passed, then 9999999999997
        //   retimes > 0    a second push was needed (used to judge the
        //                  quality of the merchant's API)
        //   9999999999997  pushes never succeeded (used to judge the
        //                  merchant's API integration ability)
        //   9999999999996  the forked worker exited abnormally (bug) or was
        //                  forcibly killed
        $csql->where('nexttimes<', time());
        $csql->limit(1, $batchSize);
        $orderrows = $db->get($csql);
        if ($orderrows === false || count($orderrows) == 0) {
            // Query failed or nothing is due: idle for 3 seconds.
            sleep(3);
            continue;
        }
        foreach ($orderrows as $orderrow) {
            // NOTE(review): rows with an empty notifyurl are skipped but not
            // updated, so they stay due and are fetched again on every poll;
            // confirm such rows cannot fill a whole batch.
            if (empty($orderrow['notifyurl']))
                continue;
            if ($currentProcesses >= $maxProcesses) {
                // Congested: log once per congestion episode, back off and
                // drop the rest of this batch (it is re-fetched later).
                if (!$isbusy)
                    outlog('任务拥堵:' . $currentProcesses . '/' . $maxProcesses);
                $isbusy = true;
                sleep(3);
                break;
            }
            if ($currentProcesses < $maxProcesses / 2) {
                // Load fell below half of the cap: log recovery once.
                if ($isbusy)
                    outlog('拥堵暂缓:' . $currentProcesses . '/' . $maxProcesses);
                $isbusy = false;
            }
            $pid = pcntl_fork();
            if ($pid === -1) {
                // pcntl_fork() reports its error number through
                // pcntl_get_last_error(); posix_get_last_error() only tracks
                // posix_* calls and would log an unrelated reason.
                outlog('创建子进程失败:' . pcntl_strerror(pcntl_get_last_error()));
            } elseif ($pid) {
                // Parent: account for the new child and pace the next fork.
                $currentProcesses++;
                usleep($spawnInterval * 1000);
            } else {
                // Child: process exactly one row. fork_apinotify() ends the
                // process itself; the exit below is only a safety net.
                // NOTE(review): the row is marked as claimed inside the child,
                // so a slow child could let the parent select the same row
                // again on its next poll; confirm this is acceptable.
                fork_apinotify($orderrow);
                exit;
            }
        }
        // Short pause between polls.
        usleep(500 * 1000);
    }
}
// Entry point: stop with an explanatory message when the pcntl extension is
// missing, otherwise hand control to the dispatcher loop (which never returns).
extension_loaded('pcntl') or exit("PCNTL扩展未加载,无法使用多进程功能\n");

mainProcess();
/**
 * Worker run inside a forked child: pushes one ap_transfer row to its
 * notifyurl and records the outcome in the row's nexttimes / retimes columns.
 *
 * Never returns. The process always ends with exit(1), whether or not the
 * push succeeded.
 *
 * nexttimes values written here:
 *   9999999999996  row claimed by this worker (remains if the worker dies
 *                  before recording a result)
 *   9999999999998  the response body contained "success"
 *   9999999999997  retry window exhausted, no further pushes
 *   anything else  unix time of the next retry
 *
 * @param array $orderrow ap_transfer row. Reads 'id', 'notifyurl' and
 *                        'retimes'; every column except 'signature' is sent
 *                        and included in the hash.
 * @param bool  $debug    When true, DB failures are also reported via clog().
 */
function fork_apinotify($orderrow, $debug = false) {
    $db = new \ciy\db();
    $id = $orderrow['id'];

    // Updates the current row by id; returns whatever $db->update() returns
    // (false on failure, as checked by the callers below).
    $saveRow = function ($fields) use ($db, $id) {
        $csql = new \ciy\sql('ap_transfer');
        $csql->where('id', $id);
        return $db->update($csql, $fields);
    };

    // Claim the row before sending so the dispatcher's "nexttimes < now"
    // query no longer selects it. If the claim cannot be stored, send nothing:
    // the row is still due and will be picked up again, whereas sending now
    // would repeat the notification on every poll.
    if ($saveRow(array('nexttimes' => 9999999999996)) === false) {
        if ($debug) clog('操作notify失败lock:' . $db->error);
        savelogfile('err_db', '操作notify失败lock:' . $db->error);
        exit(1);
    }

    // Payload = the row itself minus 'signature', plus a 'hash' field:
    // sha256 over "key=value" pairs sorted by key and joined with '&'.
    $retapi = $orderrow;
    unset($retapi['signature']);
    ksort($retapi);
    $pairs = array();
    foreach ($retapi as $key => $value) {
        $pairs[] = $key . '=' . $value;
    }
    $retapi['hash'] = sha256(implode('&', $pairs));

    $http = new \ciy\http();
    $http->set_timeout(10);
    $http->request($orderrow['notifyurl'], json_encode($retapi));
    $result = $http->get_data();

    // Only a string body can contain the acknowledgement; a failed request
    // may yield a non-string value, which counts as "not delivered".
    $delivered = is_string($result) && strpos($result, 'success') !== false;

    if ($delivered) {
        if ($saveRow(array('nexttimes' => 9999999999998)) === false) {
            if ($debug) clog('操作notify失败succ:' . $db->error);
            savelogfile('err_db', '操作notify失败succ:' . $db->error);
        }
    } else {
        $updata = array();
        $now = time();
        // Normalise so an empty or NULL column counts as "never failed
        // before" and the arithmetic below always works on integers.
        $retimes = (int)$orderrow['retimes'];
        if ($retimes === 0) {
            // First failure: remember when it happened, retry in 10 seconds.
            $updata['retimes'] = $now;
            $updata['nexttimes'] = $now + 10;
        } else {
            // Back-off: wait as long as has already passed since the first
            // failure, plus 10 seconds. The offset is applied exactly once.
            $next = $now + ($now - $retimes) + 10;
            if ($next - $retimes < 136400)
                $updata['nexttimes'] = $next;
            else
                // The next slot is too far from the first failure: give up.
                $updata['nexttimes'] = 9999999999997;
        }
        if ($saveRow($updata) === false) {
            if ($debug) clog('操作notify失败fail:' . $db->error);
            savelogfile('err_db', '操作notify失败fail:' . $db->error);
        }
    }
    exit(1);
}