c5_labsci/fork/fork_apinotify.php
2026-01-27 00:52:00 +08:00

185 lines
6.5 KiB
PHP
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
/*
php /data/ciyon/fork/fork_apinotify.php
/etc/systemd/system/ciyapinotify.service
[Unit]
Description=ciy apinotify
After=network.target
[Service]
WorkingDirectory=/data/ciyon/fork/
ExecStart=/usr/bin/php /data/ciyon/fork/fork_apinotify.php
Restart=always
RestartSec=5s
[Install]
WantedBy=multi-user.target
systemctl enable ciyapinotify.service
systemctl start ciyapinotify.service
*/
// Project root = parent of this script's directory, with backslashes
// normalised to forward slashes before the parent is taken.
$path = dirname(str_replace('\\', '/', __DIR__));
// Run from the project root, then load the shared bootstrap files.
chdir($path);
require $path . '/zciyphp/comm.php';
require $path . '/web/admin/common.php';
// Long-running daemon: remove the script execution time limit.
set_time_limit(0);
// pcntl_signal() below is the first pcntl call in this script, so the
// extension must be verified here; otherwise a missing extension ends in an
// "undefined function" fatal error instead of this readable message.
if (!extension_loaded('pcntl')) {
    die("PCNTL扩展未加载无法使用多进程功能\n");
}
// Set by the SIGTERM handler; polled by the main loop to begin shutdown.
$shouldStop = false;
// Number of forked children that have not been reaped yet.
$currentProcesses = 0;
// SIGTERM: log the number of live children and request a graceful stop.
pcntl_signal(SIGTERM, function ($signo) use (&$shouldStop, &$currentProcesses) {
    outlog('优雅关闭中:' . $currentProcesses);
    $shouldStop = true;
});
// SIGCHLD: reap every child that has exited so far and keep the counter in sync.
// Handlers only run when pcntl_signal_dispatch() is called.
pcntl_signal(SIGCHLD, function ($signo) use (&$currentProcesses) {
    echo "pcntl_signal: $signo\n";
    // Several exits can be folded into one signal, so drain with WNOHANG
    // until no finished child is left.
    while (pcntl_waitpid(-1, $status, WNOHANG) > 0) {
        $currentProcesses--;
        // Never let the counter go negative.
        if ($currentProcesses < 0)
            $currentProcesses = 0;
    }
});
/**
 * Write a message to this daemon's log via savelogfile().
 *
 * @param mixed $msg Message handed unchanged to savelogfile().
 */
function outlog($msg)
{
    $logName = 'fork_apinotify';
    savelogfile($logName, $msg);
}
/**
 * Dispatcher loop of the notify daemon.
 *
 * Repeatedly fetches up to $batchSize rows from `ap_transfer` whose
 * `nexttimes` is earlier than the current time and forks one child per row;
 * each child runs fork_apinotify() for its row. The number of concurrent
 * children is capped at $maxProcesses. When $shouldStop has been set (SIGTERM
 * handler), the loop waits up to 60 seconds for running children and then
 * exits with status 0. This function never returns.
 *
 * Uses the globals $shouldStop and $currentProcesses (maintained by the
 * signal handlers) and $logpath (presumably defined by the included
 * bootstrap files — TODO confirm).
 *
 * NOTE(review): rows with an empty notifyurl are skipped but never updated,
 * so they stay due and are fetched again on every round.
 * NOTE(review): the parent's $db object is inherited by every forked child;
 * whether a child's exit affects the parent's connection depends on \ciy\db.
 */
function mainProcess() {
    global $shouldStop, $currentProcesses, $logpath;
    $db = new \ciy\db();
    $maxProcesses = 30; // max concurrent children (roughly 15-20 processes per 1 GB of RAM)
    $batchSize = 20; // rows fetched per polling round
    $spawnInterval = 100; // pause between two forks, in milliseconds
    outlog('主进程启动PID: ' . getmypid());
    $isbusy = false; // true while the "congested" state has been logged
    while (true) {
        // Run pending SIGTERM / SIGCHLD handlers.
        pcntl_signal_dispatch();
        if ($shouldStop) {
            // Graceful shutdown: give running children up to 60 seconds.
            $time = time();
            while ($currentProcesses > 0 && $time > time() - 60) {
                pcntl_signal_dispatch();
                sleep(1);
            }
            outlog('已关闭:' . $currentProcesses);
            exit(0);
        }
        if ($currentProcesses >= $maxProcesses) {
            // Child limit reached; wait for children to exit.
            sleep(1);
            continue;
        }
        // Heartbeat/status file with the current counters and a timestamp.
        file_put_contents($logpath . 'fork_apinotify.tak', 'curr=' . $currentProcesses . ',max=' . $maxProcesses . ',batch=' . $batchSize . ',spawn=' . $spawnInterval . ',time=' . time());
        $csql = new \ciy\sql('ap_transfer');
        // nexttimes state values:
        //  - default 9999999999999; set to 0 (waiting for push) once a 90/100 notification exists
        //  - push succeeded: 9999999999998
        //  - push failed: first time retimes=now; afterwards nexttimes=now+(span since retimes)+10s,
        //    until after 24 hours it becomes 9999999999997
        //  - retimes>0 tells whether a second push was needed (merchant API quality)
        //  - 9999999999997 means pushing kept failing (merchant API integration ability)
        //  - 9999999999996 means the fork script exited abnormally (bug) or was force-closed
        $csql->where('nexttimes<', time());
        $csql->limit(1, $batchSize);
        $orderrows = $db->get($csql);
        if ($orderrows === false || count($orderrows) == 0) {
            // Query failed or nothing is due; poll again in 3 seconds.
            sleep(3);
            continue;
        }
        foreach ($orderrows as $orderrow) {
            // Dispatch inside the batch as well: reaps finished children so the
            // counter is accurate for the limit check below, and lets a SIGTERM
            // stop further forks immediately instead of after the whole batch.
            pcntl_signal_dispatch();
            if ($shouldStop)
                break;
            if (empty($orderrow['notifyurl']))
                continue;
            if ($currentProcesses >= $maxProcesses) {
                // Log the congested state only once per congestion episode.
                if (!$isbusy)
                    outlog('任务拥堵:' . $currentProcesses . '/' . $maxProcesses);
                $isbusy = true;
                sleep(3);
                break;
            }
            if ($currentProcesses < $maxProcesses / 2) {
                // Load dropped below half the limit: congestion is over.
                if ($isbusy)
                    outlog('拥堵暂缓:' . $currentProcesses . '/' . $maxProcesses);
                $isbusy = false;
            }
            $pid = pcntl_fork();
            if ($pid == -1) {
                // The errno of a failed pcntl call is reported by pcntl_get_last_error().
                outlog('创建子进程失败:' . pcntl_strerror(pcntl_get_last_error()));
            } elseif ($pid) {
                // Parent: count the child and throttle the spawn rate.
                $currentProcesses++;
                usleep($spawnInterval * 1000);
            } else {
                // Child: process exactly one row, then terminate.
                fork_apinotify($orderrow);
                exit;
            }
        }
        usleep(500 * 1000);
    }
}
// Forking requires the pcntl extension; abort with a readable message without it.
if (extension_loaded('pcntl') === false) {
    die("PCNTL扩展未加载无法使用多进程功能\n");
}
// Enter the dispatcher loop.
mainProcess();
/**
 * Push one `ap_transfer` row to its notify URL and record the outcome.
 *
 * Steps:
 *  1. Mark the row as in progress (nexttimes = 9999999999996).
 *  2. Build the payload: the row without 'signature', plus 'hash' =
 *     sha256() of the key-sorted "key=value" pairs joined with '&'.
 *  3. Send the JSON-encoded payload to $orderrow['notifyurl'] (timeout 10).
 *  4. If the response contains 'success', set nexttimes = 9999999999998.
 *     Otherwise schedule a retry: the first failure stores retimes = now and
 *     retries after 10 s; later failures retry after (now - retimes) + 10 s,
 *     until the cutoff is reached and nexttimes becomes 9999999999997.
 *
 * This function never returns: every path ends the process with exit(1).
 * NOTE(review): the exit status is 1 even after a successful push.
 *
 * @param array $orderrow Row of ap_transfer; 'id', 'notifyurl' and 'retimes' are read.
 * @param bool  $debug    When true, database failures are also passed to clog().
 */
function fork_apinotify($orderrow, $debug = false) {
    $db = new \ciy\db();
    $id = $orderrow['id'];

    // Claim the row first so the dispatcher does not select it again while
    // this push is running. If the claim cannot be written, do not push:
    // the row would stay due and be pushed again on every polling round.
    $updata = array();
    $updata['nexttimes'] = 9999999999996;
    $csql = new \ciy\sql('ap_transfer');
    $csql->where('id', $id);
    if ($db->update($csql, $updata) === false) {
        if ($debug) clog('操作notify失败claim:' . $db->error);
        savelogfile('err_db', '操作notify失败claim:' . $db->error);
        exit(1);
    }

    // Payload = the row itself (minus 'signature') plus the hash computed below.
    $retapi = $orderrow;
    unset($retapi['signature']);
    ksort($retapi);
    $pairs = array();
    foreach ($retapi as $key => $value) {
        $pairs[] = $key . '=' . $value;
    }
    $retapi['hash'] = sha256(implode('&', $pairs));

    $http = new \ciy\http();
    $http->set_timeout(10);
    $http->request($orderrow['notifyurl'], json_encode($retapi));
    $result = $http->get_data();

    // A response that is not a string is treated as a failed push.
    $delivered = is_string($result) && strpos($result, 'success') !== false;

    $updata = array();
    if ($delivered) {
        $updata['nexttimes'] = 9999999999998;
        $tag = 'succ';
    } else {
        $retimes = $orderrow['retimes'];
        $now = time();
        if ($retimes == 0) {
            // First failure: remember when retrying started, retry in 10 s.
            $updata['retimes'] = $now;
            $updata['nexttimes'] = $now + 10;
        } else {
            // Back off by the time elapsed since the first failure plus 10 s.
            // The formula is applied exactly once (it used to be applied twice,
            // which scheduled now + 3 * elapsed + 30).
            $next = $now + ($now - $retimes) + 10;
            // NOTE(review): the schedule comment in mainProcess mentions 24 hours
            // (86400 s) while this cutoff is 136400 s — confirm the intended value.
            if ($next - $retimes < 136400)
                $updata['nexttimes'] = $next;
            else
                $updata['nexttimes'] = 9999999999997;
        }
        $tag = 'fail';
    }

    $csql = new \ciy\sql('ap_transfer');
    $csql->where('id', $id);
    if ($db->update($csql, $updata) === false) {
        if ($debug) clog('操作notify失败' . $tag . ':' . $db->error);
        savelogfile('err_db', '操作notify失败' . $tag . ':' . $db->error);
    }
    exit(1);
}