207 lines
7.2 KiB
PHP
207 lines
7.2 KiB
PHP
<?php
|
||
/*
|
||
php /data/ciyon/fork/fork_order.php
|
||
|
||
/etc/systemd/system/ciyorder.service
|
||
[Unit]
|
||
Description=ciy order
|
||
After=network.target
|
||
[Service]
|
||
WorkingDirectory=/data/ciyon/fork/
|
||
ExecStart=/usr/bin/php /data/ciyon/fork/fork_order.php
|
||
Restart=always
|
||
RestartSec=5s
|
||
[Install]
|
||
WantedBy=multi-user.target
|
||
|
||
systemctl enable ciyorder.service
|
||
systemctl start ciyorder.service
|
||
|
||
*/
|
||
$path = __DIR__;
|
||
$path = str_replace('\\', '/', $path);
|
||
$path = dirname($path);
|
||
chdir($path);
|
||
require $path . '/zciyphp/comm.php';
|
||
require $path . '/web/admin/common.php';
|
||
set_time_limit(0);
|
||
$shouldStop = false;
|
||
$currentProcesses = 0;
|
||
|
||
pcntl_signal(SIGTERM, function ($signo) use (&$shouldStop, &$currentProcesses) {
|
||
outlog('优雅关闭中:' . $currentProcesses);
|
||
$shouldStop = true;
|
||
});
|
||
pcntl_signal(SIGCHLD, function ($signo) use (&$currentProcesses) {
|
||
echo "pcntl_signal: $signo\n";
|
||
while (pcntl_waitpid(-1, $status, WNOHANG) > 0) {
|
||
//echo "pcntl_wexitstatus: $status\n";
|
||
//$exitCode = pcntl_wexitstatus($status);
|
||
//echo "子进程退出,状态码: $exitCode\n";
|
||
$currentProcesses--;
|
||
if ($currentProcesses < 0)
|
||
$currentProcesses = 0;
|
||
}
|
||
});
|
||
function outlog($msg) {
|
||
savelogfile('fork_order', $msg);
|
||
}
|
||
function mainProcess() {
|
||
global $shouldStop, $currentProcesses, $logpath;
|
||
$db = new \ciy\db();
|
||
$maxProcesses = 30; //1GB=15-20个进程
|
||
$batchSize = 20; // 每次批量获取的记录数
|
||
$spawnInterval = 100; // 子进程启动间隔(毫秒)
|
||
// 安装SIGCHLD信号处理器,避免僵尸进程
|
||
|
||
outlog('主进程启动,PID: ' . getmypid());
|
||
$isbusy = false;
|
||
while (true) {
|
||
pcntl_signal_dispatch();
|
||
if ($shouldStop) {
|
||
$time = time();
|
||
while ($currentProcesses > 0 && $time > time() - 60) {
|
||
pcntl_signal_dispatch();
|
||
sleep(1);
|
||
}
|
||
outlog('已关闭:' . $currentProcesses);
|
||
exit(0);
|
||
}
|
||
|
||
if ($currentProcesses >= $maxProcesses) {
|
||
//echo "达到最大子进程数限制 ($maxProcesses),等待子进程退出...\n";
|
||
sleep(1);
|
||
continue;
|
||
}
|
||
file_put_contents($logpath . 'fork_order.tak', 'curr=' . $currentProcesses . ',max=' . $maxProcesses . ',batch=' . $batchSize . ',spawn=' . $spawnInterval . ',time=' . time());
|
||
|
||
$csql = new \ciy\sql('ap_transfer');
|
||
$csql->where('orderstatus=20');
|
||
$csql->limit(1, $batchSize);
|
||
$orderrows = $db->get($csql);
|
||
if ($orderrows === false || count($orderrows) == 0) {
|
||
//echo "没有待处理任务,等待3秒...\n";
|
||
sleep(3);
|
||
continue;
|
||
}
|
||
//outlog('发现待处理任务: ' . count($orderrows) . '个');
|
||
foreach ($orderrows as $orderrow) {
|
||
if ($currentProcesses >= $maxProcesses) {
|
||
if (!$isbusy)
|
||
outlog('任务拥堵:' . $currentProcesses . '/' . $maxProcesses);
|
||
$isbusy = true;
|
||
sleep(3);
|
||
break;
|
||
}
|
||
if ($currentProcesses < $maxProcesses / 2) {
|
||
if ($isbusy)
|
||
outlog('拥堵暂缓:' . $currentProcesses . '/' . $maxProcesses);
|
||
$isbusy = false;
|
||
}
|
||
$pid = pcntl_fork();
|
||
if ($pid == -1) {
|
||
outlog('创建子进程失败:' . pcntl_strerror(posix_get_last_error()));
|
||
} elseif ($pid) {
|
||
//echo "创建子进程成功,PID: $pid, 处理任务 ID: $id\n";
|
||
$currentProcesses++;
|
||
usleep($spawnInterval * 1000);
|
||
} else {
|
||
//echo "子进程开始执行任务,PID: " . getmypid() . ", 任务 ID: $id\n";
|
||
fork_order($orderrow);
|
||
exit;
|
||
}
|
||
}
|
||
usleep(500 * 1000);
|
||
}
|
||
}
|
||
if (!extension_loaded('pcntl')) {
|
||
die("PCNTL扩展未加载,无法使用多进程功能\n");
|
||
}
|
||
mainProcess();
|
||
|
||
function fork_order($orderrow, $debug = false) {
|
||
$db = new \ciy\db();
|
||
$id = $orderrow['id'];
|
||
$updata = array();
|
||
$updata['orderstatus'] = 30;
|
||
$csql = new \ciy\sql('ap_transfer');
|
||
$csql->where('id', $id);
|
||
if ($db->update($csql, $updata) === false) {
|
||
if ($debug) clog('操作status=30:' . $db->error);
|
||
savelogfile('err_db', '操作status=30:' . $db->error);
|
||
exit(1);
|
||
}
|
||
//验签一次,验签失败,则90错误
|
||
$fail = null;
|
||
if (!empty($orderrow['signature'])) {
|
||
$csql = new \ciy\sql('ap_api');
|
||
$csql->where('id', $orderrow['apiid']);
|
||
$apirow = $db->getone($csql);
|
||
if (!is_array($apirow))
|
||
$fail = array('errmsg' => 'API未找到', 'status' => 90);
|
||
else {
|
||
if (empty($apirow['pubkey'])) {
|
||
$fail = array('errmsg' => '数字证书无公钥', 'status' => 90);
|
||
} else {
|
||
$waitsignstr = 'amount=' . $orderrow['amount'] . ',addtimes=' . $orderrow['addtimes'];
|
||
$retsign = verifysign_api($apirow['pubkey'], $orderrow['signature'], $waitsignstr);
|
||
if (is_string($retsign))
|
||
$fail = array('errmsg' => $retsign, 'status' => 90);
|
||
}
|
||
}
|
||
}
|
||
//$fail = array('errmsg' => $ret, 'status' => 90); //处理具体业务,失败赋值
|
||
|
||
if ($fail) {
|
||
try {
|
||
$db->begin();
|
||
$updata = array();
|
||
$updata['uptimes'] = tostamp();
|
||
$updata['orderstatus'] = $fail['status'];
|
||
if ($fail['status'] == 90)
|
||
$updata['nexttimes'] = 0;
|
||
$csql = new \ciy\sql('ap_transfer');
|
||
$csql->where('id', $id);
|
||
if ($db->update($csql, $updata) === false)
|
||
throw new \Exception('操作order fail失败:' . $db->error);
|
||
$db->commit();
|
||
} catch (\Exception $ex) {
|
||
$db->rollback();
|
||
savelogfile('err_db', $ex->getMessage());
|
||
}
|
||
exit(4);
|
||
}else{
|
||
try {
|
||
$db->begin();
|
||
$updata = array();
|
||
$updata['uptimes'] = tostamp();
|
||
$updata['orderstatus'] = 100;
|
||
$updata['nexttimes'] = 0;
|
||
$csql = new \ciy\sql('ap_transfer');
|
||
$csql->where('id', $id);
|
||
if ($db->update($csql, $updata) === false)
|
||
throw new \Exception('操作order succ失败:' . $db->error);
|
||
$db->commit();
|
||
} catch (\Exception $ex) {
|
||
$db->rollback();
|
||
savelogfile('err_db', $ex->getMessage());
|
||
}
|
||
exit(0);
|
||
}
|
||
}
|
||
|
||
function verifysign_api($signKey, $signature, $waitsignstr) {
|
||
if (strpos($signKey, '-----BEGIN RSA PUBLIC KEY-----') === false && strpos($signKey, '-----BEGIN PUBLIC KEY-----') === false)
|
||
$signKey = "-----BEGIN PUBLIC KEY-----\n" . wordwrap($signKey, 64, "\n") . "\n-----END PUBLIC KEY-----";
|
||
$signbin = hex2bin($signature);
|
||
if ($signbin === false)
|
||
return errjson('签名格式错误');
|
||
$hashbin = hex2bin(hash('sha256', $waitsignstr));
|
||
$result = openssl_verify($hashbin, $signbin, $signKey, OPENSSL_ALGO_SHA256);
|
||
if ($result === 0)
|
||
return '数字证书验签失败';
|
||
else if ($result !== 1)
|
||
return '数字证书验签错误:' . openssl_error_string();
|
||
return true;
|
||
}
|