0) { // NOTE(review): tail of a loop/closure whose head lies before the visible source; code tokens below are unchanged
        //echo "pcntl_wexitstatus: $status\n";
        //$exitCode = pcntl_wexitstatus($status);
        //echo "child exited, status code: $exitCode\n";
        // Lower the live-child counter and clamp it at zero.
        $currentProcesses--;
        if ($currentProcesses < 0) $currentProcesses = 0;
    }
});

/**
 * Writes a message to the 'fork_order' log through savelogfile().
 *
 * @param mixed $msg Message handed to savelogfile() unchanged.
 */
function outlog($msg)
{
    savelogfile('fork_order', $msg);
}

/**
 * Dispatcher loop of the parent process.
 *
 * Repeatedly reads up to $batchSize rows of `ap_transfer` with orderstatus=20
 * and forks one child per row (each child runs fork_order()), keeping at most
 * $maxProcesses children alive. Never returns: it loops until $shouldStop is
 * set, then waits up to 60 seconds for children and calls exit(0).
 *
 * Relies on the globals $shouldStop, $currentProcesses and $logpath, which are
 * maintained outside this function (presumably by the signal handlers whose
 * tail is visible at the top of the file — confirm).
 */
function mainProcess()
{
    global $shouldStop, $currentProcesses, $logpath;

    $db = new \ciy\db();
    $maxProcesses = 30;   // original author's sizing note: 1GB = 15-20 processes
    $batchSize = 20;      // number of rows fetched per batch
    $spawnInterval = 100; // pause between child spawns, in milliseconds

    outlog('主进程启动,PID: ' . getmypid());
    $isbusy = false;

    while (true) {
        // Run any pending signal handlers before inspecting the globals they update.
        pcntl_signal_dispatch();

        if ($shouldStop) {
            // Graceful shutdown: give running children at most 60 seconds to finish.
            $time = time();
            while ($currentProcesses > 0 && $time > time() - 60) {
                pcntl_signal_dispatch();
                sleep(1);
            }
            outlog('已关闭:' . $currentProcesses);
            exit(0);
        }

        if ($currentProcesses >= $maxProcesses) {
            // Child limit reached; wait for some children to exit.
            sleep(1);
            continue;
        }

        // Status snapshot rewritten on every iteration (presumably read by an external monitor).
        file_put_contents($logpath . 'fork_order.tak', 'curr=' . $currentProcesses . ',max=' . $maxProcesses . ',batch=' . $batchSize . ',spawn=' . $spawnInterval . ',time=' . time());

        $csql = new \ciy\sql('ap_transfer');
        $csql->where('orderstatus=20');
        $csql->limit(1, $batchSize);
        $orderrows = $db->get($csql);
        if ($orderrows === false || count($orderrows) == 0) {
            // Nothing to do (or the query failed); wait 3 seconds.
            sleep(3);
            continue;
        }

        // NOTE(review): a row stays at orderstatus=20 until its child has run the
        // first UPDATE in fork_order(), so a re-query inside that window could
        // dispatch the same row twice — confirm this is acceptable.
        foreach ($orderrows as $orderrow) {
            if ($currentProcesses >= $maxProcesses) {
                // Log the congestion only once per busy period.
                if (!$isbusy)
                    outlog('任务拥堵:' . $currentProcesses . '/' . $maxProcesses);
                $isbusy = true;
                sleep(3);
                break;
            }
            if ($currentProcesses < $maxProcesses / 2) {
                // Load dropped below half of the limit: the busy period is over.
                if ($isbusy)
                    outlog('拥堵暂缓:' . $currentProcesses . '/' . $maxProcesses);
                $isbusy = false;
            }

            $pid = pcntl_fork();
            if ($pid === -1) {
                // pcntl_fork() reports its errno through pcntl_get_last_error(),
                // not through the posix extension.
                outlog('创建子进程失败:' . pcntl_strerror(pcntl_get_last_error()));
                // Back off instead of immediately retrying fork for every remaining row.
                sleep(1);
                break;
            } elseif ($pid) {
                // Parent: account for the new child and pace the next spawn.
                $currentProcesses++;
                usleep($spawnInterval * 1000);
            } else {
                // Child: process exactly one order; fork_order() exits by itself,
                // the exit below is only a safety net.
                fork_order($orderrow);
                exit;
            }
        }
        usleep(500 * 1000);
    }
}

if (!extension_loaded('pcntl')) {
    die("PCNTL扩展未加载,无法使用多进程功能\n");
}
mainProcess();

/**
 * Processes one `ap_transfer` row inside a child process. Never returns.
 *
 * Steps:
 *  1. Set the row's orderstatus to 30; on DB failure log to 'err_db' and exit(1).
 *  2. If the row carries a signature, verify it with the public key of the
 *     `ap_api` row referenced by `apiid`; any failure yields status 90.
 *  3. Persist the outcome (status 90 on failure, 100 on success) in a
 *     transaction and exit with code 4 (failure) or 0 (success).
 *
 * @param array $orderrow Row of `ap_transfer`; reads the keys id, signature,
 *                        apiid, amount and addtimes.
 * @param bool  $debug    When true, the step-1 DB error is also sent to clog().
 */
function fork_order($orderrow, $debug = false)
{
    $db = new \ciy\db();
    $id = $orderrow['id'];

    // Step 1: move the order to status 30 before doing any work.
    $updata = array();
    $updata['orderstatus'] = 30;
    $csql = new \ciy\sql('ap_transfer');
    $csql->where('id', $id);
    if ($db->update($csql, $updata) === false) {
        if ($debug)
            clog('操作status=30:' . $db->error);
        savelogfile('err_db', '操作status=30:' . $db->error);
        exit(1);
    }

    // Step 2: verify the signature once; a verification failure becomes status 90.
    $fail = null;
    if (!empty($orderrow['signature'])) {
        $csql = new \ciy\sql('ap_api');
        $csql->where('id', $orderrow['apiid']);
        $apirow = $db->getone($csql);
        if (!is_array($apirow)) {
            $fail = array('errmsg' => 'API未找到', 'status' => 90);
        } elseif (empty($apirow['pubkey'])) {
            $fail = array('errmsg' => '数字证书无公钥', 'status' => 90);
        } else {
            $waitsignstr = 'amount=' . $orderrow['amount'] . ',addtimes=' . $orderrow['addtimes'];
            $retsign = verifysign_api($apirow['pubkey'], $orderrow['signature'], $waitsignstr);
            // verifysign_api() returns true on success and an error string otherwise.
            if (is_string($retsign))
                $fail = array('errmsg' => $retsign, 'status' => 90);
        }
    }

    // Business processing goes here; on failure assign $fail,
    // e.g. $fail = array('errmsg' => $ret, 'status' => 90);

    // Step 3: persist the outcome. Both outcomes share one transaction path and
    // differ only in target status, log message and exit code.
    if ($fail !== null) {
        // The failure reason was previously dropped; keep it in the log for diagnosis.
        outlog('order fail id=' . $id . ',status=' . $fail['status'] . ',errmsg=' . $fail['errmsg']);
        $newstatus = $fail['status'];
        $resetnext = ($fail['status'] == 90);
        $errprefix = '操作order fail失败:';
        $exitcode = 4;
    } else {
        $newstatus = 100;
        $resetnext = true;
        $errprefix = '操作order succ失败:';
        $exitcode = 0;
    }

    try {
        $db->begin();
        $updata = array();
        $updata['uptimes'] = tostamp();
        $updata['orderstatus'] = $newstatus;
        if ($resetnext)
            $updata['nexttimes'] = 0;
        $csql = new \ciy\sql('ap_transfer');
        $csql->where('id', $id);
        if ($db->update($csql, $updata) === false)
            throw new \Exception($errprefix . $db->error);
        $db->commit();
    } catch (\Exception $ex) {
        $db->rollback();
        savelogfile('err_db', $ex->getMessage());
    }
    // The exit code reflects the processing outcome even when persisting it
    // failed; that failure is only visible in the 'err_db' log.
    exit($exitcode);
}

/**
 * Verifies a hex-encoded RSA signature over a string.
 *
 * The data handed to openssl_verify() is the raw SHA-256 digest of
 * $waitsignstr, and OPENSSL_ALGO_SHA256 makes OpenSSL hash that digest again,
 * so the signer must sign the binary digest in the same way.
 *
 * @param string $signKey     Public key, either a full PEM block or the bare
 *                            base64 body (which is wrapped into a PEM block).
 * @param string $signature   Signature as a hexadecimal string.
 * @param string $waitsignstr The string that was signed.
 * @return true|string true when the signature is valid, otherwise an error
 *                     message string (callers test the result with is_string()).
 */
function verifysign_api($signKey, $signature, $waitsignstr)
{
    if (strpos($signKey, '-----BEGIN RSA PUBLIC KEY-----') === false && strpos($signKey, '-----BEGIN PUBLIC KEY-----') === false) {
        // cut_long_words=true is required: a base64 body has no spaces, so without
        // it wordwrap() would leave the key on one long line.
        $signKey = "-----BEGIN PUBLIC KEY-----\n" . wordwrap($signKey, 64, "\n", true) . "\n-----END PUBLIC KEY-----";
    }

    // Validate the hex up front so hex2bin() never raises a warning, and report
    // the problem as a plain string: a non-string return would not be recognised
    // as a failure by callers that check is_string().
    if (!is_string($signature) || preg_match('/^(?:[0-9a-fA-F]{2})*\z/', $signature) !== 1)
        return '签名格式错误';
    $signbin = hex2bin($signature);
    if ($signbin === false)
        return '签名格式错误';

    $hashbin = hex2bin(hash('sha256', $waitsignstr));
    $result = openssl_verify($hashbin, $signbin, $signKey, OPENSSL_ALGO_SHA256);
    if ($result === 0)
        return '数字证书验签失败';
    if ($result !== 1)
        return '数字证书验签错误:' . openssl_error_string();
    return true;
}