370 lines
12 KiB
PHP
370 lines
12 KiB
PHP
<?php
|
||
/* =================================================================================
|
||
* License: GPL-2.0 license
|
||
* Author: 众产® https://ciy.cn/code
|
||
* Version: 0.1.3
|
||
* =================================================================================*/
|
||
|
||
/**
|
||
* getone 获取一条数据。 无数据返回null 出错返回false 判断is_array,确认数据有效
|
||
* get 获取数据集合。 无数据返回array() 出错返回false
|
||
* insert 新增数据。 成功返回影响行数 出错返回false
|
||
* update 更新数据。 成功返回影响行数 出错返回false
|
||
* delete 删除及备份数据。 成功返回影响行数 出错返回false
|
||
* begin 开始事务 成功返回true 出错返回false
|
||
* commit 事务提交 成功返回true 出错返回false
|
||
* rollback 事务回滚 成功返回true 出错返回false
|
||
* tran 事务回调 成功返回true 出错返回false
|
||
* insert_id 获取刚新增数据的id
|
||
*
|
||
*
|
||
$mongo = new \ciy\dbmongo();
|
||
$cmd = new \stdClass();
|
||
$cmd->collection = 'movies';
|
||
$cmd->pageno = 1;
|
||
|
||
$count = -1;
|
||
$rows = $mongo->get($cmd, $count);
|
||
if ($rows === false)
|
||
return clog($mongo->error);
|
||
clog($count);
|
||
foreach ($rows as $row) {
|
||
clog($row);
|
||
}
|
||
return;
|
||
*
|
||
*/
|
||
|
||
namespace ciy;
|
||
|
||
class dbmongo {
|
||
private $dbpst;
|
||
private $linkdb;
|
||
private $session;
|
||
private $database;
|
||
private $lastinsertid;
|
||
private $isTransaction;
|
||
public $error;
|
||
function __construct($dbpst = '') {
|
||
$this->linkdb = false;
|
||
$this->dbpst = $dbpst;
|
||
$this->isTransaction = false;
|
||
$this->error = '';
|
||
}
|
||
|
||
private function errdata($errmsg, $ret = false) {
|
||
$this->error = $errmsg;
|
||
return $ret;
|
||
}
|
||
|
||
private function connect() {
|
||
if ($this->linkdb !== false)
|
||
return $this->linkdb;
|
||
$cfg = webini('mongodb' . $this->dbpst);
|
||
if (is_string($cfg))
|
||
return $cfg;
|
||
|
||
try {
|
||
$uriOptions = [];
|
||
$driverOptions = [];
|
||
$this->linkdb = new \MongoDB\Driver\Manager(
|
||
$cfg['uri'],
|
||
$uriOptions,
|
||
$driverOptions
|
||
);
|
||
|
||
$pingCommand = new \MongoDB\Driver\Command(['ping' => 1]);
|
||
$this->linkdb->executeCommand('admin', $pingCommand);
|
||
$this->database = $cfg['name'];
|
||
return $this->linkdb;
|
||
} catch (\Exception $e) {
|
||
return 'MongoDB connection failed: ' . $e->getMessage();
|
||
}
|
||
}
|
||
private function getTransactionOptions($defaultOptions = null): array {
|
||
if ($this->isTransaction && $this->session) {
|
||
return ['session' => $this->session];
|
||
}
|
||
|
||
return $defaultOptions ? ['writeConcern' => $defaultOptions] : [];
|
||
}
|
||
|
||
/**
|
||
* 获取查询的第一条数据
|
||
*
|
||
* @param \stdClass $params 查询参数
|
||
* - collection: string 集合名
|
||
* - filter: array 查询条件 (可选)
|
||
* - options: array 查询选项 (可选)
|
||
* @return array|false 查询结果或false表示失败
|
||
*/
|
||
public function getone(\stdClass $params) {
|
||
try {
|
||
$link = $this->connect();
|
||
if (is_string($link))
|
||
return $link;
|
||
$params->options = $params->options ?? [];
|
||
$params->options['limit'] = 1;
|
||
$params->filter = $params->filter ?? [];
|
||
$query = new \MongoDB\Driver\Query($params->filter, $params->options);
|
||
$cursor = $link->executeQuery(
|
||
"{$this->database}.{$params->collection}",
|
||
$query,
|
||
$this->getTransactionOptions()
|
||
);
|
||
foreach ($cursor as $document) {
|
||
return $document;
|
||
}
|
||
return null;
|
||
} catch (\Exception $e) {
|
||
return $this->errdata('Query getone failed: ' . $e->getMessage());
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 获取查询的多条数据(带分页)
|
||
*
|
||
* @param \stdClass $params 查询参数
|
||
* - collection: string 集合名
|
||
* - filter: array 查询条件 (可选)
|
||
* - options: array 查询选项 (可选)
|
||
* - pageno: int 页码 (默认1)
|
||
* - pagecount: int 每页数量 (默认10)
|
||
* @return \stdClass|false 包含数据和总数量的对象或false表示失败
|
||
*/
|
||
public function get(\stdClass $params, &$rowcount = -1) {
|
||
try {
|
||
$link = $this->connect();
|
||
if (is_string($link))
|
||
return $link;
|
||
$params->filter = $params->filter ?? new \stdClass();
|
||
$params->options = $params->options ?? [];
|
||
$params->pageno = $params->pageno ?? 1;
|
||
$params->pagecount = $params->pagecount ?? 10;
|
||
|
||
if ($rowcount == -1) {
|
||
$countCommand = new \MongoDB\Driver\Command([
|
||
'count' => $params->collection,
|
||
'query' => $params->filter
|
||
]);
|
||
$countResult = $link->executeCommand($this->database, $countCommand, $this->getTransactionOptions());
|
||
$rowcount = $countResult->toArray()[0]->n ?? 0;
|
||
}
|
||
|
||
$params->options['skip'] = ($params->pageno - 1) * $params->pagecount;
|
||
$params->options['limit'] = $params->pagecount;
|
||
|
||
// 执行查询
|
||
$query = new \MongoDB\Driver\Query($params->filter, $params->options);
|
||
$cursor = $link->executeQuery(
|
||
"{$this->database}.{$params->collection}",
|
||
$query,
|
||
$this->getTransactionOptions()
|
||
);
|
||
return $cursor;
|
||
} catch (\Exception $e) {
|
||
return $this->errdata('Query get failed: ' . $e->getMessage());
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 插入数据
|
||
*
|
||
* @param \stdClass $params 插入参数
|
||
* - collection: string 集合名
|
||
* - document: array 要插入的文档
|
||
* - options: array 插入选项 (可选)
|
||
* @return int|false 插入数量或false表示失败
|
||
*/
|
||
public function insert(\stdClass $params) {
|
||
if (empty($params->document) || !is_array($params->document)) {
|
||
return $this->errdata('Document must be provided and must be an array');
|
||
}
|
||
|
||
try {
|
||
$link = $this->connect();
|
||
if (is_string($link))
|
||
return $link;
|
||
$bulk = new \MongoDB\Driver\BulkWrite();
|
||
$this->lastinsertid = $bulk->insert($params->document);
|
||
|
||
$writeConcern = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000);
|
||
$writeResult = $link->executeBulkWrite("{$this->database}.{$params->collection}", $bulk, $this->getTransactionOptions($writeConcern));
|
||
|
||
if ($writeResult === false) {
|
||
return $this->errdata('Write Error');
|
||
}
|
||
|
||
return $writeResult->getInsertedCount();
|
||
} catch (\Exception $e) {
|
||
return $this->errdata('Insert operation failed: ' . $e->getMessage());
|
||
}
|
||
}
|
||
|
||
public function insert_id() {
|
||
return $this->lastinsertid;
|
||
}
|
||
|
||
/**
|
||
* 更新数据
|
||
*
|
||
* @param \stdClass $params 更新参数
|
||
* - collection: string 集合名
|
||
* - filter: array 查询条件
|
||
* - update: array 更新操作
|
||
* - options: array 更新选项 (可选)
|
||
* @return int|false 更新的文档数量或false表示失败
|
||
*/
|
||
public function update(\stdClass $params) {
|
||
if (empty($params->filter) || !is_array($params->filter)) {
|
||
return $this->errdata('Filter must be provided and must be an array');
|
||
}
|
||
|
||
if (empty($params->update) || !is_array($params->update)) {
|
||
return $this->errdata('Update must be provided and must be an array');
|
||
}
|
||
|
||
try {
|
||
$link = $this->connect();
|
||
if (is_string($link))
|
||
return $link;
|
||
$bulk = new \MongoDB\Driver\BulkWrite();
|
||
|
||
$params->options = $params->options ?? [];
|
||
if (!isset($params->options['multi'])) {
|
||
$params->options['multi'] = false;
|
||
}
|
||
$bulk->update($params->filter, $params->update, $params->options);
|
||
$writeConcern = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000);
|
||
$writeResult = $link->executeBulkWrite("{$this->database}.{$params->collection}", $bulk, $this->getTransactionOptions($writeConcern));
|
||
|
||
if ($writeResult === false) {
|
||
return $this->errdata('Write Error');
|
||
}
|
||
return $writeResult->getModifiedCount();
|
||
} catch (\Exception $e) {
|
||
return $this->errdata('Update operation failed: ' . $e->getMessage());
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 删除数据
|
||
*
|
||
* @param \stdClass $params 删除参数
|
||
* - collection: string 集合名
|
||
* - filter: array 查询条件
|
||
* - options: array 删除选项 (可选)
|
||
* @return int|false 删除的文档数量或false表示失败
|
||
*/
|
||
public function delete(\stdClass $params) {
|
||
if (empty($params->filter) || !is_array($params->filter)) {
|
||
return $this->errdata('Filter must be provided and must be an array');
|
||
}
|
||
|
||
try {
|
||
$link = $this->connect();
|
||
if (is_string($link))
|
||
return $link;
|
||
$bulk = new \MongoDB\Driver\BulkWrite();
|
||
|
||
$params->options = $params->options ?? [];
|
||
if (!isset($params->options['limit'])) {
|
||
$params->options['limit'] = false;
|
||
}
|
||
|
||
$bulk->delete($params->filter, $params->options);
|
||
|
||
$writeConcern = new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000);
|
||
$writeResult = $link->executeBulkWrite("{$this->database}.{$params->collection}", $bulk, $this->getTransactionOptions($writeConcern));
|
||
|
||
if ($writeResult === false) {
|
||
return $this->errdata('Write Error');
|
||
}
|
||
return $writeResult->getDeletedCount();
|
||
} catch (\Exception $e) {
|
||
return $this->errdata('Delete operation failed: ' . $e->getMessage());
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 开始事务
|
||
* @return bool 是否成功
|
||
*/
|
||
public function begin() {
|
||
if ($this->isTransaction) {
|
||
return $this->errdata('Transaction already started');
|
||
}
|
||
|
||
try {
|
||
$link = $this->connect();
|
||
if (is_string($link))
|
||
return $link;
|
||
$this->session = $link->startSession();
|
||
$this->session->startTransaction();
|
||
$this->isTransaction = true;
|
||
return true;
|
||
} catch (\Exception $e) {
|
||
return $this->errdata('Failed to start transaction: ' . $e->getMessage());
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 提交事务
|
||
* @return bool 是否成功
|
||
*/
|
||
public function commit() {
|
||
if (!$this->isTransaction) {
|
||
return $this->errdata('No active transaction to commit');
|
||
}
|
||
|
||
try {
|
||
$this->session->commitTransaction();
|
||
$this->session->endSession();
|
||
$this->isTransaction = false;
|
||
return true;
|
||
} catch (\Exception $e) {
|
||
$this->isTransaction = false;
|
||
return $this->errdata('Failed to commit transaction: ' . $e->getMessage());
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 回滚事务
|
||
* @return bool 是否成功
|
||
*/
|
||
public function rollback() {
|
||
if (!$this->isTransaction) {
|
||
return $this->errdata('No active transaction to rollback');
|
||
}
|
||
|
||
try {
|
||
$this->session->abortTransaction();
|
||
$this->session->endSession();
|
||
$this->isTransaction = false;
|
||
return true;
|
||
} catch (\Exception $e) {
|
||
$this->isTransaction = false;
|
||
return $this->errdata('Failed to rollback transaction: ' . $e->getMessage());
|
||
}
|
||
}
|
||
public function tran($func) {
|
||
if (!($func instanceof \Closure)) {
|
||
$this->error = '没有传递正确的闭包函数';
|
||
return false;
|
||
}
|
||
$this->begin();
|
||
$ret = false;
|
||
try {
|
||
$ret = $func();
|
||
} catch (\Exception $ex) {
|
||
$ret = false;
|
||
$this->error = $ex->getMessage();
|
||
}
|
||
if ($ret === false)
|
||
$this->rollback();
|
||
else
|
||
$this->commit();
|
||
return $ret;
|
||
}
|
||
}
|