c5_labsci/fork/fork_order.php

207 lines
7.2 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
/*
php /data/ciyon/fork/fork_order.php
/etc/systemd/system/ciyorder.service
[Unit]
Description=ciy order
After=network.target
[Service]
WorkingDirectory=/data/ciyon/fork/
ExecStart=/usr/bin/php /data/ciyon/fork/fork_order.php
Restart=always
RestartSec=5s
[Install]
WantedBy=multi-user.target
systemctl enable ciyorder.service
systemctl start ciyorder.service
*/
// Bootstrap: switch to the parent directory of this script's folder and load
// the two project include files from there.
$path = __DIR__;
$path = str_replace('\\', '/', $path);
$path = dirname($path);
chdir($path);
require $path . '/zciyphp/comm.php';
require $path . '/web/admin/common.php';
set_time_limit(0);

// The pcntl functions and the SIGTERM/SIGCHLD constants used right below only
// exist when the extension is loaded, so the check has to happen before the
// first pcntl call; otherwise PHP aborts with a fatal error instead of this message.
if (!extension_loaded('pcntl')) {
    die("PCNTL扩展未加载无法使用多进程功能\n");
}

// State shared between the signal handlers (captured by reference) and
// mainProcess() (accessed there through `global`).
$shouldStop = false;      // set once SIGTERM has been received
$currentProcesses = 0;    // number of forked children not yet reaped

// SIGTERM: only log and raise the stop flag; mainProcess() reacts to the flag.
pcntl_signal(SIGTERM, function ($signo) use (&$shouldStop, &$currentProcesses) {
    outlog('优雅关闭中:' . $currentProcesses);
    $shouldStop = true;
});

// SIGCHLD: reap every child that has already exited (non-blocking) and keep
// the live-children counter in sync, never letting it drop below zero.
pcntl_signal(SIGCHLD, function ($signo) use (&$currentProcesses) {
    echo "pcntl_signal: $signo\n";
    while (pcntl_waitpid(-1, $status, WNOHANG) > 0) {
        $currentProcesses--;
        if ($currentProcesses < 0)
            $currentProcesses = 0;
    }
});
/**
 * Write one message to this daemon's log channel.
 *
 * Delegates to savelogfile() with the fixed channel name 'fork_order'.
 *
 * @param mixed $msg Message handed to savelogfile() unchanged.
 */
function outlog($msg)
{
    $channel = 'fork_order';
    savelogfile($channel, $msg);
}
/**
 * Master loop of the order daemon.
 *
 * Repeatedly polls `ap_transfer` for rows with orderstatus=20 and forks one
 * child per row, which runs fork_order() and exits. The number of concurrent
 * children is capped at $maxProcesses. When the stop flag is raised (SIGTERM
 * handler), no further children are started; the loop waits up to 60 seconds
 * for running children to be reaped and then exits with status 0.
 *
 * Relies on the globals $shouldStop and $currentProcesses, which the signal
 * handlers installed at file scope update, and on $logpath.
 * NOTE(review): $logpath is not defined in this file — presumably set by the
 * required bootstrap files; confirm.
 * NOTE(review): rows are selected by orderstatus=20 and are only moved to 30
 * inside the child; a row whose child has not written 30 yet when the next
 * poll runs could be fetched again — confirm this cannot double-process.
 * NOTE(review): $db is created before forking, so children inherit it;
 * verify that a child's exit does not disturb the parent's connection.
 *
 * @return void This function does not return; it ends the process via exit().
 */
function mainProcess() {
    global $shouldStop, $currentProcesses, $logpath;
    $db = new \ciy\db();
    $maxProcesses = 30;   // author's sizing note: 1GB = 15-20 processes
    $batchSize = 20;      // number of rows fetched per poll
    $spawnInterval = 100; // pause between two forks, in milliseconds
    outlog('主进程启动PID: ' . getmypid());
    $isbusy = false;
    while (true) {
        // Run pending SIGTERM/SIGCHLD handlers so the flag and counter are current.
        pcntl_signal_dispatch();
        if ($shouldStop) {
            $time = time();
            // Wait at most 60 seconds for children; dispatch after each sleep so
            // a child reaped during the sleep is seen immediately.
            while ($currentProcesses > 0 && $time > time() - 60) {
                sleep(1);
                pcntl_signal_dispatch();
            }
            outlog('已关闭:' . $currentProcesses);
            exit(0);
        }
        if ($currentProcesses >= $maxProcesses) {
            // At the cap: wait for children to exit before polling again.
            sleep(1);
            continue;
        }
        // Heartbeat/status file, overwritten on every poll.
        file_put_contents($logpath . 'fork_order.tak', 'curr=' . $currentProcesses . ',max=' . $maxProcesses . ',batch=' . $batchSize . ',spawn=' . $spawnInterval . ',time=' . time());
        $csql = new \ciy\sql('ap_transfer');
        $csql->where('orderstatus=20');
        $csql->limit(1, $batchSize);
        $orderrows = $db->get($csql);
        if ($orderrows === false || count($orderrows) == 0) {
            // Query failed or nothing pending: back off for 3 seconds.
            sleep(3);
            continue;
        }
        foreach ($orderrows as $orderrow) {
            // Handle signals between forks as well: a batch can take seconds,
            // and without this the master would keep spawning after SIGTERM
            // and work with a stale child counter.
            pcntl_signal_dispatch();
            if ($shouldStop)
                break;
            if ($currentProcesses >= $maxProcesses) {
                // Log the congestion once, then abandon the rest of the batch.
                if (!$isbusy)
                    outlog('任务拥堵:' . $currentProcesses . '/' . $maxProcesses);
                $isbusy = true;
                sleep(3);
                break;
            }
            if ($currentProcesses < $maxProcesses / 2) {
                // Load dropped below half the cap: log the recovery once.
                if ($isbusy)
                    outlog('拥堵暂缓:' . $currentProcesses . '/' . $maxProcesses);
                $isbusy = false;
            }
            $pid = pcntl_fork();
            if ($pid == -1) {
                // The errno of a failed pcntl call is reported by pcntl_get_last_error().
                outlog('创建子进程失败:' . pcntl_strerror(pcntl_get_last_error()));
            } elseif ($pid) {
                // Parent: account for the new child and throttle the spawn rate.
                $currentProcesses++;
                usleep($spawnInterval * 1000);
            } else {
                // Child: process exactly one order, then terminate.
                fork_order($orderrow);
                exit;
            }
        }
        // Skip the inter-poll pause when shutting down so the stop is handled promptly.
        if (!$shouldStop)
            usleep(500 * 1000);
    }
}
// Entry point: abort with a message when the pcntl extension is not loaded,
// otherwise hand control to the master loop.
if (extension_loaded('pcntl') === false) {
    exit("PCNTL扩展未加载无法使用多进程功能\n");
}

mainProcess();
/**
 * Process a single `ap_transfer` row; intended to run inside a forked child.
 *
 * Steps:
 *  1. Set the row's orderstatus to 30. On DB failure: log and exit(1).
 *  2. If the row carries a non-empty signature, load the `ap_api` row named by
 *     `apiid` and verify the signature over
 *     "amount=<amount>,addtimes=<addtimes>" with that row's `pubkey`.
 *  3. On a verification problem write orderstatus 90 and exit(4); otherwise
 *     write orderstatus 100 and exit(0), or exit(1) if that final write failed.
 *
 * The failure reason is carried in $fail['errmsg'] but is not persisted or
 * logged by this function.
 *
 * @param array $orderrow Row from `ap_transfer`; reads id, signature, apiid, amount, addtimes.
 * @param bool  $debug    When true, a failure of the initial status update is also passed to clog().
 * @return void Never returns; always terminates the process via exit().
 */
function fork_order($orderrow, $debug = false) {
    $db = new \ciy\db();
    $id = $orderrow['id'];

    // Step 1: flag the order as being processed.
    $updata = array();
    $updata['orderstatus'] = 30;
    $csql = new \ciy\sql('ap_transfer');
    $csql->where('id', $id);
    if ($db->update($csql, $updata) === false) {
        if ($debug) clog('操作status=30:' . $db->error);
        savelogfile('err_db', '操作status=30:' . $db->error);
        exit(1);
    }

    // Step 2: verify the signature once; any problem ends in status 90.
    $fail = null;
    if (!empty($orderrow['signature'])) {
        $csql = new \ciy\sql('ap_api');
        $csql->where('id', $orderrow['apiid']);
        $apirow = $db->getone($csql);
        if (!is_array($apirow)) {
            $fail = array('errmsg' => 'API未找到', 'status' => 90);
        } elseif (empty($apirow['pubkey'])) {
            $fail = array('errmsg' => '数字证书无公钥', 'status' => 90);
        } else {
            $waitsignstr = 'amount=' . $orderrow['amount'] . ',addtimes=' . $orderrow['addtimes'];
            $retsign = verifysign_api($apirow['pubkey'], $orderrow['signature'], $waitsignstr);
            // Fail closed: only an explicit `true` counts as verified. Checking
            // just for a string would accept any other non-true result.
            if ($retsign !== true) {
                $errmsg = is_string($retsign) ? $retsign : '数字证书验签失败';
                $fail = array('errmsg' => $errmsg, 'status' => 90);
            }
        }
    }

    // Step 3: persist the outcome.
    if ($fail) {
        fork_order_save_status($db, $id, $fail['status'], $fail['status'] == 90, 'fail');
        exit(4);
    }
    $saved = fork_order_save_status($db, $id, 100, true, 'succ');
    // Report a failed final write like a failed initial write, not as success.
    exit($saved ? 0 : 1);
}

/**
 * Write the final orderstatus of an `ap_transfer` row inside a transaction.
 *
 * Sets `uptimes` to tostamp() and `orderstatus` to $status, optionally resets
 * `nexttimes` to 0. On failure the transaction is rolled back and the error
 * is logged to the 'err_db' channel.
 *
 * @param object $db        Database handle used for the transaction.
 * @param mixed  $id        Primary key of the row to update.
 * @param int    $status    New orderstatus value.
 * @param bool   $resetNext Whether to also set nexttimes to 0.
 * @param string $label     Tag placed in the error message ('fail' or 'succ').
 * @return bool True when the update was committed, false otherwise.
 */
function fork_order_save_status($db, $id, $status, $resetNext, $label) {
    try {
        $db->begin();
        $updata = array();
        $updata['uptimes'] = tostamp();
        $updata['orderstatus'] = $status;
        if ($resetNext)
            $updata['nexttimes'] = 0;
        $csql = new \ciy\sql('ap_transfer');
        $csql->where('id', $id);
        if ($db->update($csql, $updata) === false)
            throw new \Exception('操作order ' . $label . '失败:' . $db->error);
        $db->commit();
        return true;
    } catch (\Exception $ex) {
        $db->rollback();
        savelogfile('err_db', $ex->getMessage());
        return false;
    }
}
/**
 * Verify a hex-encoded signature against a public key.
 *
 * The data passed to openssl_verify() is the raw SHA-256 digest of
 * $waitsignstr, and verification itself uses OPENSSL_ALGO_SHA256.
 *
 * @param string $signKey     PEM public key, or the bare key body without
 *                            BEGIN/END lines (it is then wrapped into a PEM block).
 * @param string $signature   Signature as a hexadecimal string.
 * @param string $waitsignstr The string that was signed.
 * @return true|string True when the signature is valid, otherwise an error message.
 */
function verifysign_api($signKey, $signature, $waitsignstr) {
    if (strpos($signKey, '-----BEGIN RSA PUBLIC KEY-----') === false && strpos($signKey, '-----BEGIN PUBLIC KEY-----') === false) {
        // A bare key body contains no spaces, so wordwrap() only breaks it
        // into 64-character lines when cut_long_words is true.
        $signKey = "-----BEGIN PUBLIC KEY-----\n" . wordwrap($signKey, 64, "\n", true) . "\n-----END PUBLIC KEY-----";
    }
    // Validate the hex form up front so hex2bin() is never called on input it
    // would reject with a warning (odd length or non-hex characters).
    if (!is_string($signature) || preg_match('/^(?:[0-9a-fA-F]{2})+$/D', $signature) !== 1)
        return '签名格式错误';
    $signbin = hex2bin($signature);
    if ($signbin === false)
        return '签名格式错误'; // errors are reported as plain strings, like the other failure paths
    $hashbin = hex2bin(hash('sha256', $waitsignstr));
    $result = openssl_verify($hashbin, $signbin, $signKey, OPENSSL_ALGO_SHA256);
    if ($result === 0)
        return '数字证书验签失败';
    if ($result !== 1)
        return '数字证书验签错误:' . openssl_error_string();
    return true;
}