0) {
        //echo "pcntl_wexitstatus: $status\n";
        //$exitCode = pcntl_wexitstatus($status);
        //echo "child exited, status code: $exitCode\n";
        $currentProcesses--;
        if ($currentProcesses < 0)
            $currentProcesses = 0;
    }
});

/**
 * Write a message to the 'fork_apinotify' log through savelogfile().
 *
 * @param string $msg Text to log.
 */
function outlog($msg) {
    savelogfile('fork_apinotify', $msg);
}

/**
 * Main loop of the notifier daemon.
 *
 * Polls table 'ap_transfer' for rows whose nexttimes is earlier than the
 * current time and forks one child per row (running fork_apinotify()),
 * keeping at most $maxProcesses children alive. Never returns: the loop only
 * ends through exit(0) once $shouldStop is set.
 *
 * Uses the globals $shouldStop, $currentProcesses and $logpath.
 * NOTE(review): $shouldStop is only read here and $currentProcesses is only
 * incremented here; presumably both are also maintained by signal handlers
 * installed at file scope before this function runs - confirm.
 *
 * NOTE(review): a row is only marked in-flight by the child, after the fork.
 * If the next poll happens before the child's update lands, or a row carries
 * an empty notifyurl, the same row is selected again - confirm whether that
 * is acceptable.
 */
function mainProcess() {
    global $shouldStop, $currentProcesses, $logpath;

    $db = new \ciy\db();
    $maxProcesses = 30;   // roughly 15-20 processes per 1GB of memory
    $batchSize = 20;      // rows fetched per polling query
    $spawnInterval = 100; // pause between child launches (milliseconds)

    outlog('主进程启动,PID: ' . getmypid());
    $isbusy = false;

    while (true) {
        // Run any pending signal handlers.
        pcntl_signal_dispatch();

        if ($shouldStop) {
            // Graceful shutdown: give running children up to 60 seconds.
            $time = time();
            while ($currentProcesses > 0 && $time > time() - 60) {
                pcntl_signal_dispatch();
                sleep(1);
            }
            outlog('已关闭:' . $currentProcesses);
            exit(0);
        }

        if ($currentProcesses >= $maxProcesses) {
            // Pool is full; wait for children to exit.
            sleep(1);
            continue;
        }

        // Heartbeat/status file, rewritten on every polling round.
        file_put_contents($logpath . 'fork_apinotify.tak', 'curr=' . $currentProcesses . ',max=' . $maxProcesses . ',batch=' . $batchSize . ',spawn=' . $spawnInterval . ',time=' . time());

        $csql = new \ciy\sql('ap_transfer');
        // nexttimes state machine:
        //   9999999999999  default; after a 90/100 notification nexttimes=0 and the row waits to be pushed
        //   9999999999998  push succeeded
        //   on failure     first time retimes=now; nexttimes=now+(span since retimes)+10s,
        //                  until after 24 hours it becomes 9999999999997
        //   retimes>0      a push had to be retried (used to judge merchant API quality)
        //   9999999999997  push kept failing (used to judge the merchant's API integration)
        //   9999999999996  worker script exited abnormally (bug) or was forcibly killed
        $csql->where('nexttimes<', time());
        $csql->limit(1, $batchSize);
        $orderrows = $db->get($csql);
        if ($orderrows === false || count($orderrows) == 0) {
            // Nothing to do; poll again in 3 seconds.
            sleep(3);
            continue;
        }

        foreach ($orderrows as $orderrow) {
            if (empty($orderrow['notifyurl']))
                continue;

            if ($currentProcesses >= $maxProcesses) {
                // Log congestion once per episode, then drop the rest of this batch.
                if (!$isbusy)
                    outlog('任务拥堵:' . $currentProcesses . '/' . $maxProcesses);
                $isbusy = true;
                sleep(3);
                break;
            }
            if ($currentProcesses < $maxProcesses / 2) {
                // Congestion is considered over once the pool is under half full.
                if ($isbusy)
                    outlog('拥堵暂缓:' . $currentProcesses . '/' . $maxProcesses);
                $isbusy = false;
            }

            $pid = pcntl_fork();
            if ($pid == -1) {
                outlog('创建子进程失败:' . pcntl_strerror(posix_get_last_error()));
            } elseif ($pid) {
                // Parent: account for the new child and throttle spawning.
                $currentProcesses++;
                usleep($spawnInterval * 1000);
            } else {
                // Child: fork_apinotify() ends the process itself; this exit is a safety net.
                fork_apinotify($orderrow);
                exit;
            }
        }
        usleep(500 * 1000);
    }
}

if (!extension_loaded('pcntl')) {
    die("PCNTL扩展未加载,无法使用多进程功能\n");
}
mainProcess();

/**
 * Child-process worker: push one 'ap_transfer' row to its notifyurl and
 * record the outcome in nexttimes/retimes.
 *
 * Steps: mark the row in-flight (nexttimes=9999999999996), build the payload
 * (the row without 'signature', plus 'hash' = sha256 of the key-sorted
 * "k=v&k=v" string), send it as JSON, then:
 *   - response contains 'success' -> nexttimes=9999999999998
 *   - first failure (retimes == 0) -> retimes=now, nexttimes=now+10
 *   - later failure                -> nexttimes=now+(now-retimes)+10, or
 *                                     9999999999997 once the retry window is over
 *
 * This function never returns; it always ends the process with exit(1),
 * whatever the outcome of the push.
 *
 * @param array $orderrow Row from 'ap_transfer'; reads 'id', 'notifyurl', 'retimes'.
 * @param bool  $debug    When true, database errors are also passed to clog().
 */
function fork_apinotify($orderrow, $debug = false) {
    $db = new \ciy\db();
    $id = $orderrow['id'];

    // Mark the row in-flight. If this process dies before the final update,
    // the row keeps this marker value.
    $updata = array();
    $updata['nexttimes'] = 9999999999996;
    $csql = new \ciy\sql('ap_transfer');
    $csql->where('id', $id);
    if ($db->update($csql, $updata) === false) {
        // Previously ignored; log it like the other update failures below.
        if ($debug)
            clog('操作notify失败mark:' . $db->error);
        savelogfile('err_db', '操作notify失败mark:' . $db->error);
    }

    // The payload is the order row itself. (The original note mentions adding
    // a timestamp, sign and pairsign; this code only adds 'hash'.)
    $retapi = $orderrow;
    unset($retapi['signature']);
    ksort($retapi);
    $pairs = array();
    foreach ($retapi as $key => $value) {
        $pairs[] = $key . '=' . $value;
    }
    $retapi['hash'] = sha256(implode('&', $pairs));

    $http = new \ciy\http();
    $http->set_timeout(10);
    $http->request($orderrow['notifyurl'], json_encode($retapi));
    $result = $http->get_data();

    // A non-string result (e.g. a failed request) counts as a failed push.
    if (is_string($result) && strpos($result, 'success') !== false) {
        $updata = array();
        $updata['nexttimes'] = 9999999999998;
        $csql = new \ciy\sql('ap_transfer');
        $csql->where('id', $id);
        if ($db->update($csql, $updata) === false) {
            if ($debug)
                clog('操作notify失败succ:' . $db->error);
            savelogfile('err_db', '操作notify失败succ:' . $db->error);
        }
    } else {
        $updata = array();
        $retimes = $orderrow['retimes'];
        $now = time();
        if ($retimes == 0) {
            // First failure: remember when retrying started, retry in 10 seconds.
            $updata['retimes'] = $now;
            $updata['nexttimes'] = $now + 10;
        } else {
            // Back-off: wait as long again as has already elapsed since the
            // first failure, plus 10 seconds. The previous code overwrote the
            // "now" variable with the next-attempt time and then reused it,
            // which applied the span twice (now + 3*span + 30) and tested the
            // cut-off against 2*span + 10 instead of the elapsed span.
            $elapsed = $now - $retimes;
            // NOTE(review): the state notes in mainProcess() say retries stop
            // after 24 hours (86400s) but the constant here is 136400 - confirm.
            if ($elapsed < 136400)
                $updata['nexttimes'] = $now + $elapsed + 10;
            else
                $updata['nexttimes'] = 9999999999997;
        }
        $csql = new \ciy\sql('ap_transfer');
        $csql->where('id', $id);
        if ($db->update($csql, $updata) === false) {
            if ($debug)
                clog('操作notify失败fail:' . $db->error);
            savelogfile('err_db', '操作notify失败fail:' . $db->error);
        }
    }
    exit(1);
}