c5_labsci/zciyon/mysql.go
2026-01-27 00:52:00 +08:00

712 lines
17 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
=================================================================================
* License: GPL-2.0 license
* Author: 众产® https://ciy.cn/code
* Version: 0.1.0
=================================================================================
单数据库连接池 NewCiyMysql
=================================================================================
db, err = c.NewCiyMysql("user:pass@tcp(localhost:3306)/dbname", 最大连接数, 空闲连接数, 保活秒) //main初始化
defer db.Close()
db.Err() //查看最后一条错误信息。 db.ClearError() 清空历史错误。 db.Error //查看原始错误数组。
db.Get(csql) //获取多行数据,返回(列表, 总行数, err)。可选第二个参数传入非-1的行数时跳过count查询并原样返回该值
db.Getone(csql) //获取单行数据,返回map[string]any
db.Get1(csql) //获取单行单列数据,返回any
db.GetField(csql) //获取列备注及排序
db.Getdbcodes(table, column) //获取列备注字典
db.Execute(sqlstr, any...) //执行带问号的SQL语句,返回最后插入ID和影响行数
db.Update(csql, map) //更新数据,返回影响行数
db.Insert(csql, map) //新增数据返回最后插入ID
db.Delete(csql, backup) //删除数据,返回影响行数。支持副表备份、删除标记、直接删除。
db.Tran(fn) //批处理事务。自动Begin/Commit/Rollback
=================================================================================
查询示例:
csql := c.NewCiySQL("user")
csql.Where("name like", "王").Where("age>", 18).Order("id desc").Limit(1, 10)
list, total, err := c.DB.Get(csql) //total为全部行数
for _, v := range list {
id := c.Getint(v, "id")
}
增删改示例:
=================================================================================
*/
package zciyon
import (
"database/sql"
"fmt"
"runtime"
"strings"
"time"
)
// CiyMysql bundles a database/sql connection pool with the state shared by
// the helper methods in this file.
type CiyMysql struct {
// Db is the connection pool opened by NewCiyMysql; statements run on it
// whenever Tx is nil.
Db *sql.DB
// Tx is the transaction started by Tran. While it is non-nil, Execute, Get,
// Getraw, Getone and Get1 run their statements on it instead of Db.
Tx *sql.Tx
// Error collects "<origin>:<message>" strings appended by addError; Err
// returns the newest one and ClearError empties the list.
Error []string
// UserID is initialised to 0 by NewCiyMysql and is not read anywhere in this
// file.
UserID int
Health int // health score, lower is better; starts at 50 and is rewritten by MonitorHealth
}
// NewCiyMysql opens a single, stand-alone MySQL connection pool and verifies
// it with a ping.
//
// dsn is handed unchanged to sql.Open under the driver name "mysql"; that
// driver has to be registered elsewhere in the program. Non-positive tuning
// values fall back to defaults: maxopenconn <= 0 becomes NumCPU*2+1,
// maxidelconn < 1 becomes 2 and maxlifesec < 1 becomes six hours.
//
// On failure the returned *CiyMysql is nil; a pool that was opened but could
// not be pinged is closed before returning so it does not leak.
func NewCiyMysql(dsn string, maxopenconn int, maxidelconn int, maxlifesec int) (*CiyMysql, error) {
	db := &CiyMysql{
		Error:  make([]string, 0),
		Health: 50,
		UserID: 0,
	}
	var err error
	db.Db, err = sql.Open("mysql", dsn)
	if err != nil {
		// db is discarded on this path, so the error is only returned, not
		// recorded on the instance.
		return nil, err
	}
	if err = db.Db.Ping(); err != nil {
		// The caller never receives the pool, so release it here. The Close
		// error is ignored on purpose: the ping failure is the one to report.
		_ = db.Db.Close()
		return nil, err
	}
	if maxopenconn <= 0 {
		maxopenconn = runtime.NumCPU()*2 + 1
	}
	if maxidelconn < 1 {
		maxidelconn = 2
	}
	if maxlifesec < 1 {
		maxlifesec = 3600 * 6
	}
	db.Db.SetMaxOpenConns(maxopenconn) // upper bound on open connections
	db.Db.SetMaxIdleConns(maxidelconn) // connections kept idle in the pool
	db.Db.SetConnMaxLifetime(time.Duration(maxlifesec) * time.Second)
	return db, nil
}
// MonitorHealth starts a background goroutine that recomputes thos.Health
// every sec seconds from the pool statistics and logs it through Clog.
//
// Health = (open connections + 2 * connections that had to wait since the
// previous sample) * 100 / max open connections.
//
// sql.DBStats.WaitCount is a running total, so the difference between two
// samples is used; feeding the raw total into the formula would make Health
// grow without bound. When the pool has no connection limit
// (MaxOpenConnections <= 0) there is no denominator and Health is left at the
// neutral value 50 instead of dividing by zero.
//
// The goroutine has no stop mechanism, and Health is written without
// synchronisation.
func (thos *CiyMysql) MonitorHealth(sec float64) {
	go func() {
		lastWait := thos.Db.Stats().WaitCount
		for {
			Sleep(sec)
			stat := thos.Db.Stats()
			waited := stat.WaitCount - lastWait
			lastWait = stat.WaitCount
			if stat.MaxOpenConnections <= 0 {
				thos.Health = 50
			} else {
				thos.Health = (stat.OpenConnections + int(waited)*2) * 100 / stat.MaxOpenConnections
			}
			Clog("health:%d", thos.Health)
		}
	}()
}
// Close shuts down the underlying connection pool and reports the error, if
// any, returned by sql.DB.Close.
func (thos *CiyMysql) Close() error {
	closeErr := thos.Db.Close()
	return closeErr
}
// addError appends "<from>:<err message>" to the instance's error history.
// err must be non-nil.
func (thos *CiyMysql) addError(from string, err error) {
	entry := from + ":" + err.Error()
	thos.Error = append(thos.Error, entry)
}
// Err returns the most recently recorded error message, or "" when the
// history is empty.
func (thos *CiyMysql) Err() string {
	if n := len(thos.Error); n > 0 {
		return thos.Error[n-1]
	}
	return ""
}
// ClearError discards the recorded error history, leaving an empty (non-nil)
// slice.
func (thos *CiyMysql) ClearError() {
	thos.Error = []string{}
}
// Execute runs a statement with "?" placeholders bound to argdata, on the
// active transaction when one is open and on the pool otherwise.
//
// It returns, in this order: the last insert ID, the number of affected rows,
// and an error. The last insert ID is only fetched for INSERT statements and
// is 0 for everything else. The INSERT check ignores leading whitespace and
// letter case, so "INSERT ..." and "  insert ..." are both recognised.
//
// Every failure is also recorded through addError.
func (thos *CiyMysql) Execute(sqlstr string, argdata ...any) (int, int, error) {
	var (
		rett sql.Result
		err  error
	)
	if thos.Tx != nil {
		rett, err = thos.Tx.Exec(sqlstr, argdata...)
	} else {
		rett, err = thos.Db.Exec(sqlstr, argdata...)
	}
	if err != nil {
		thos.addError("Execute.Exec", err)
		return 0, 0, err
	}
	affect, err := rett.RowsAffected()
	if err != nil {
		thos.addError("Execute.RowsAffected", err)
		return 0, 0, err
	}
	var lastid int64
	stmt := strings.TrimSpace(sqlstr)
	if len(stmt) >= 6 && strings.EqualFold(stmt[:6], "insert") {
		lastid, err = rett.LastInsertId()
		if err != nil {
			thos.addError("Execute.LastInsertId", err)
			return 0, 0, err
		}
	}
	return int(lastid), int(affect), nil
}
// Update builds and runs "update <table> set ... <where>" from csql and
// updata, returning the number of affected rows.
//
// Each updata entry becomes "`field`=?" with the value bound as a parameter.
// A value of type []string is treated as a raw SQL expression: its first
// element is written into the statement verbatim. The "id" key is skipped.
//
// The call is rejected (and the reason recorded through addError) when csql
// carries raw SQL, when updata is empty or holds nothing but "id", when a raw
// expression slice is empty, or when csql has no where clause.
func (thos *CiyMysql) Update(csql *CiySQL, updata map[string]any) (int, error) {
	reject := func(e error) (int, error) {
		thos.addError("Update.Init", e)
		return 0, e
	}
	if csql.rawsql != "" {
		return reject(fmt.Errorf("forbid use rawsql"))
	}
	if len(updata) == 0 {
		return reject(fmt.Errorf("updata empty"))
	}
	if csql.sqlwhere == "" {
		return reject(fmt.Errorf("no where"))
	}
	sets := make([]string, 0, len(updata))
	args := make([]any, 0, len(updata)+len(csql.tsmt))
	for field, data := range updata {
		if field == "id" {
			continue
		}
		if arr, ok := data.([]string); ok {
			if len(arr) == 0 {
				// Indexing arr[0] would panic; report it as a bad argument.
				return reject(fmt.Errorf("empty raw expression for field %s", field))
			}
			sets = append(sets, "`"+field+"`="+arr[0])
			continue
		}
		sets = append(sets, "`"+field+"`=?")
		args = append(args, data)
	}
	if len(sets) == 0 {
		// Only "id" was supplied: nothing to set, and "set " alone is invalid SQL.
		return reject(fmt.Errorf("updata empty"))
	}
	// SET parameters come first, then the where-clause parameters.
	args = append(args, csql.tsmt...)
	sqlstr := "update " + csql.table + " set " + strings.Join(sets, ",") + csql.Buildwhere()
	_, lastaffect, err := thos.Execute(sqlstr, args...)
	if err != nil {
		thos.addError("Update.Execute", err)
		return 0, err
	}
	return lastaffect, nil
}
// Insert builds and runs "insert into <table> (...) values (...)" from csql
// and updata, returning the last insert ID.
//
// Each updata entry becomes a back-quoted column with a "?" placeholder. A
// value of type []string is treated as a raw SQL expression: its first
// element is written into the values list verbatim.
//
// The call is rejected (and the reason recorded through addError) when csql
// carries raw SQL, when updata is empty, or when a raw expression slice is
// empty.
func (thos *CiyMysql) Insert(csql *CiySQL, updata map[string]any) (int, error) {
	reject := func(e error) (int, error) {
		thos.addError("Insert.Init", e)
		return 0, e
	}
	if csql.rawsql != "" {
		return reject(fmt.Errorf("forbid use rawsql"))
	}
	if len(updata) == 0 {
		return reject(fmt.Errorf("updata empty"))
	}
	columns := make([]string, 0, len(updata))
	values := make([]string, 0, len(updata))
	datas := make([]any, 0, len(updata))
	for field, data := range updata {
		arr, raw := data.([]string)
		if raw && len(arr) == 0 {
			// Indexing arr[0] would panic; report it as a bad argument.
			return reject(fmt.Errorf("empty raw expression for field %s", field))
		}
		columns = append(columns, "`"+field+"`")
		if raw {
			values = append(values, arr[0])
			continue
		}
		values = append(values, "?")
		datas = append(datas, data)
	}
	sqlstr := "insert into " + csql.table + " (" + strings.Join(columns, ",") + ") values (" + strings.Join(values, ",") + ")"
	lastid, _, err := thos.Execute(sqlstr, datas...)
	if err != nil {
		thos.addError("Insert.Execute", err)
		return 0, err
	}
	return lastid, nil
}
// Delete removes the rows selected by csql's where clause and returns the
// number of affected rows.
//
// The optional first element of argbackup selects the mode (default
// CIYDB_DELETE_BACKUP_NONE):
//   - CIYDB_DELETE_BACKUP_TABLE: copy the matching rows into "<table>_bak"
//     first, then delete them.
//   - CIYDB_DELETE_BACKUP_FIELD: do not delete; set the deltimes column to
//     Tostr(Tostamp()) instead.
//   - any other value: delete directly.
//
// The call is rejected when csql carries raw SQL or has no where clause.
// Every failure is also recorded through addError.
func (thos *CiyMysql) Delete(csql *CiySQL, argbackup ...int) (int, error) {
	if csql.rawsql != "" {
		e := fmt.Errorf("forbid use rawsql")
		thos.addError("Delete.Init", e)
		return 0, e
	}
	if csql.sqlwhere == "" {
		e := fmt.Errorf("no where")
		thos.addError("Delete.Init", e)
		return 0, e
	}
	var backup int = CIYDB_DELETE_BACKUP_NONE
	if len(argbackup) > 0 {
		backup = argbackup[0]
	}
	if backup == CIYDB_DELETE_BACKUP_TABLE {
		columns, err := thos.getcolumn(csql)
		if err != nil {
			thos.addError("Delete.getcolumn", err)
			return 0, err
		}
		columnstr := strings.Join(columns, ",")
		sqlstr := "insert into " + csql.table + "_bak (" + columnstr + ") select " + columnstr + " from " + csql.table + csql.Buildwhere()
		// The where clause carries "?" placeholders, so its parameters must be
		// bound here exactly as for the delete statement below.
		if _, _, err = thos.Execute(sqlstr, csql.tsmt...); err != nil {
			thos.addError("Delete.insertbak", err)
			return 0, err
		}
	}
	var sqlstr, from string
	if backup == CIYDB_DELETE_BACKUP_FIELD {
		sqlstr = "update " + csql.table + " set deltimes=" + Tostr(Tostamp()) + csql.Buildwhere()
		from = "Delete.deltimes"
	} else {
		sqlstr = "delete from " + csql.table + csql.Buildwhere()
		from = "Delete.delete"
	}
	_, lastaffect, err := thos.Execute(sqlstr, csql.tsmt...)
	if err != nil {
		thos.addError(from, err)
		return 0, err
	}
	return lastaffect, nil
}
// Tran runs fn inside a transaction stored in thos.Tx, so that the other
// helpers in this file issue their statements on it.
//
//   - fn returns nil: the transaction is committed; a commit failure is
//     returned.
//   - fn returns an error: the transaction is rolled back and fn's error is
//     returned.
//   - fn panics: the panic is recovered, the transaction is rolled back and
//     the panic value is returned as an error (combined with the rollback
//     error if the rollback fails too).
//
// thos.Tx is reset to nil before Tran returns. If fn itself clears thos.Tx,
// Tran neither commits nor rolls back. Every failure is also recorded through
// addError, and CiyVars.Func_commit / CiyVars.Func_rollback count the
// commit and rollback attempts.
func (thos *CiyMysql) Tran(fn func() error) (err error) {
	thos.Tx, err = thos.Db.Begin()
	if err != nil {
		thos.Tx = nil
		thos.addError("Tran.Begin", err)
		return err
	}
	defer func() {
		// Only the panic path is handled here; normal completion is settled
		// below, before the function returns.
		r := recover()
		if r == nil {
			return
		}
		err = fmt.Errorf("%v", r)
		thos.addError("Tran.catch", err)
		if thos.Tx == nil {
			return
		}
		errtx := thos.Tx.Rollback()
		CiyVars.Func_rollback++
		thos.Tx = nil
		if errtx != nil {
			thos.addError("Tran.catch.Rollback", errtx)
			err = fmt.Errorf("%v[%v]", errtx, err)
		}
	}()
	if err = fn(); err != nil {
		if thos.Tx != nil {
			errtx := thos.Tx.Rollback()
			CiyVars.Func_rollback++
			thos.Tx = nil
			if errtx != nil {
				thos.addError("Tran.Rollback", errtx)
			}
		}
		thos.addError("Tran", err)
		return err
	}
	if thos.Tx != nil {
		errtx := thos.Tx.Commit()
		CiyVars.Func_commit++
		thos.Tx = nil
		if errtx != nil {
			thos.addError("Tran.Commit", errtx)
			return errtx
		}
	}
	return nil
}
// getcolumn returns the column names of csql.table by running
// "select * from <table> where id=0" and reading the result-set metadata; the
// rows themselves are never read.
//
// The probe runs on the active transaction when one is open, like the other
// query helpers, so it does not need a second pooled connection while the
// transaction holds one. Errors are wrapped with %w so callers can still
// inspect the driver error.
func (thos *CiyMysql) getcolumn(csql *CiySQL) ([]string, error) {
	sqlstr := "select * from " + csql.table + " where id=0"
	var (
		rows *sql.Rows
		err  error
	)
	if thos.Tx != nil {
		rows, err = thos.Tx.Query(sqlstr)
	} else {
		rows, err = thos.Db.Query(sqlstr)
	}
	if err != nil {
		return nil, fmt.Errorf("query: %w", err)
	}
	defer rows.Close()
	columns, err := rows.Columns()
	if err != nil {
		return nil, fmt.Errorf("columns: %w", err)
	}
	return columns, nil
}
// calcolumn normalises csql.sqlcolumn in place before a select is built.
//
// An empty column list becomes "*". A list that starts with "!" is read as an
// exclusion list: the table's real columns are fetched through getcolumn and
// sqlcolumn is replaced by every column not named after the "!". Any other
// value is left untouched.
func (thos *CiyMysql) calcolumn(csql *CiySQL) error {
	if csql.sqlcolumn == "" {
		csql.sqlcolumn = "*"
	}
	if !strings.HasPrefix(csql.sqlcolumn, "!") {
		return nil
	}
	all, err := thos.getcolumn(csql)
	if err != nil {
		return err
	}
	csql.sqlcolumn = strings.TrimLeft(csql.sqlcolumn, "!")
	skip := make(map[string]bool)
	for _, name := range strings.Split(csql.sqlcolumn, ",") {
		skip[strings.TrimSpace(name)] = true
	}
	var kept []string
	for _, name := range all {
		if skip[name] {
			continue
		}
		kept = append(kept, name)
	}
	csql.sqlcolumn = strings.Join(kept, ",")
	return nil
}
// Get runs the select described by csql and returns every row as a
// map[column]value, together with a total row count.
//
// A failure returns a nil slice, -1 and the error; a query that matches
// nothing returns an empty, non-nil slice and a nil error. []byte values are
// converted to string and SQL NULL becomes "".
//
// Row count: when csql uses raw SQL the count is never queried and the
// returned count is argrowcount[0] if given, else -1. Otherwise, if no count
// is passed (or -1 is passed), an extra "count(*)" query built from csql
// supplies it; any other passed value is returned unchanged and the count
// query is skipped. If only the count query fails, the rows already read are
// returned alongside -1 and the error.
//
// Inside a transaction the statement gets " for update" appended and both the
// select and the count run on that transaction. The final SQL text is stored
// in csql.LastSQL. Every failure is also recorded through addError.
func (thos *CiyMysql) Get(csql *CiySQL, argrowcount ...int) ([]map[string]any, int, error) {
	rowcount := -1
	if len(argrowcount) > 0 {
		rowcount = argrowcount[0]
	}
	sqlstr := csql.rawsql
	if sqlstr == "" {
		if err := thos.calcolumn(csql); err != nil {
			thos.addError("Get.calcolumn", err)
			return nil, -1, err
		}
		sqlstr, _ = csql.Buildsql(csql.sqlcolumn)
		sqlstr += csql.sqllimit
		if thos.Tx != nil {
			sqlstr += " for update"
		}
	}
	csql.LastSQL = sqlstr
	var (
		rows *sql.Rows
		err  error
	)
	if thos.Tx != nil {
		rows, err = thos.Tx.Query(sqlstr, csql.tsmt...)
	} else {
		rows, err = thos.Db.Query(sqlstr, csql.tsmt...)
	}
	if err != nil {
		thos.addError("Get.Query", err)
		return nil, -1, err
	}
	defer rows.Close()
	columns, err := rows.Columns()
	if err != nil {
		thos.addError("Get.Columns", err)
		return nil, -1, err
	}
	// Scan every column into a generic cell so any result shape is accepted.
	values := make([]any, len(columns))
	scanArgs := make([]any, len(columns))
	for i := range values {
		scanArgs[i] = &values[i]
	}
	ret := make([]map[string]any, 0)
	for rows.Next() {
		if err := rows.Scan(scanArgs...); err != nil {
			thos.addError("Get.Scan", err)
			return nil, -1, err
		}
		record := make(map[string]any, len(columns))
		for i, cell := range values {
			switch v := cell.(type) {
			case nil:
				record[columns[i]] = ""
			case []byte:
				record[columns[i]] = string(v)
			default:
				record[columns[i]] = v
			}
		}
		ret = append(ret, record)
	}
	// Next also returns false when iteration breaks; without this check a
	// truncated result would be reported as complete.
	if err := rows.Err(); err != nil {
		thos.addError("Get.Rows", err)
		return nil, -1, err
	}
	if csql.rawsql != "" || rowcount != -1 {
		return ret, rowcount, nil
	}
	// Release the result set before issuing the count on the same connection.
	rows.Close()
	countsql, _ := csql.Buildsql("count(*)")
	var row *sql.Row
	if thos.Tx != nil {
		row = thos.Tx.QueryRow(countsql, csql.tsmt...)
	} else {
		row = thos.Db.QueryRow(countsql, csql.tsmt...)
	}
	if err := row.Err(); err != nil {
		thos.addError("Get.count.QueryRow", err)
		return ret, -1, err
	}
	var value any
	if err := row.Scan(&value); err != nil {
		thos.addError("Get.count.Scan", err)
		return ret, -1, err
	}
	return ret, Toint(value), nil
}
// Getraw runs sqlstr as-is (no parameters) and returns every row as a
// map[column]value. It uses the active transaction when one is open.
//
// A failure returns a nil slice and the error; a query that matches nothing
// returns an empty, non-nil slice. []byte values are converted to string and
// SQL NULL becomes "". Every failure is also recorded through addError under
// a "Getraw.*" label.
func (thos *CiyMysql) Getraw(sqlstr string) ([]map[string]any, error) {
	var (
		rows *sql.Rows
		err  error
	)
	if thos.Tx != nil {
		rows, err = thos.Tx.Query(sqlstr)
	} else {
		rows, err = thos.Db.Query(sqlstr)
	}
	if err != nil {
		thos.addError("Getraw.Query", err)
		return nil, err
	}
	defer rows.Close()
	columns, err := rows.Columns()
	if err != nil {
		thos.addError("Getraw.Columns", err)
		return nil, err
	}
	// Scan every column into a generic cell so any result shape is accepted.
	values := make([]any, len(columns))
	scanArgs := make([]any, len(columns))
	for i := range values {
		scanArgs[i] = &values[i]
	}
	ret := make([]map[string]any, 0)
	for rows.Next() {
		if err := rows.Scan(scanArgs...); err != nil {
			thos.addError("Getraw.Scan", err)
			return nil, err
		}
		record := make(map[string]any, len(columns))
		for i, cell := range values {
			switch v := cell.(type) {
			case nil:
				record[columns[i]] = ""
			case []byte:
				record[columns[i]] = string(v)
			default:
				record[columns[i]] = v
			}
		}
		ret = append(ret, record)
	}
	// Next also returns false when iteration breaks; surface that instead of
	// returning a silently truncated result.
	if err := rows.Err(); err != nil {
		thos.addError("Getraw.Rows", err)
		return nil, err
	}
	return ret, nil
}
// GetField reads "show full fields from <table>" for csql.table.
//
// It returns a map from column name to {"c": comment}, and a comma-separated
// list of the columns whose comment is non-empty and does not start with ",".
//
// Failures are recorded through addError rather than returned; whatever was
// collected up to that point is handed back. The query always runs on the
// pool, not on an active transaction.
func (thos *CiyMysql) GetField(csql *CiySQL) (map[string]map[string]any, string) {
	retfield := map[string]map[string]any{}
	var shown []string
	rowsfield, err := thos.Db.Query("show full fields from " + csql.table)
	if err != nil {
		thos.addError("GetField.Query", err)
		return retfield, ""
	}
	defer rowsfield.Close()
	columns, err := rowsfield.Columns()
	if err != nil {
		thos.addError("GetField.Columns", err)
		return retfield, ""
	}
	values := make([]any, len(columns))
	scanArgs := make([]any, len(columns))
	for i := range values {
		scanArgs[i] = &values[i]
	}
	// text converts a scanned cell without the panic an unchecked
	// col.([]byte) assertion would cause on a NULL or string value.
	text := func(v any) string {
		switch s := v.(type) {
		case []byte:
			return string(s)
		case string:
			return s
		}
		return ""
	}
	for rowsfield.Next() {
		if err := rowsfield.Scan(scanArgs...); err != nil {
			thos.addError("GetField.Scan", err)
			return retfield, strings.Join(shown, ",")
		}
		var field, comment string
		for i, col := range values {
			switch columns[i] {
			case "Field":
				field = text(col)
			case "Comment":
				comment = text(col)
			}
		}
		if field == "" {
			continue
		}
		if comment != "" && !strings.HasPrefix(comment, ",") {
			shown = append(shown, field)
		}
		retfield[field] = map[string]any{"c": comment}
	}
	if err := rowsfield.Err(); err != nil {
		thos.addError("GetField.Rows", err)
	}
	return retfield, strings.Join(shown, ",")
}
// Getdbcodes builds a code dictionary from the comment of one column, read
// through "show full fields from <table> where Field=?".
//
// The comment is searched for the first of the markers ",TINT", ",TBIN" and
// ",BOOL" (in that order). The text after the marker is split on ","; an
// entry "id.name" yields {"id": "id", "name": "name"}, a plain non-empty
// entry yields {"id": <position in the list, int>, "name": entry}. For a
// ",BOOL" comment the list is padded with {"id": "1", "name": "✔"} and
// {"id": "2", "name": "✘"} until it has two entries.
//
// A comment without any marker returns the list collected so far. Failures
// are recorded through addError rather than returned. The query always runs
// on the pool, not on an active transaction.
func (thos *CiyMysql) Getdbcodes(table, column string) []map[string]any {
	ret := make([]map[string]any, 0)
	rows, err := thos.Db.Query("show full fields from "+table+" where Field=?", column)
	if err != nil {
		thos.addError("Getdbcodes.Query", err)
		return ret
	}
	defer rows.Close()
	columns, err := rows.Columns()
	if err != nil {
		thos.addError("Getdbcodes.Columns", err)
		return ret
	}
	values := make([]any, len(columns))
	scanArgs := make([]any, len(columns))
	for i := range values {
		scanArgs[i] = &values[i]
	}
	for rows.Next() {
		if err := rows.Scan(scanArgs...); err != nil {
			thos.addError("Getdbcodes.Scan", err)
			return ret
		}
		var comment string
		for i, col := range values {
			if columns[i] != "Comment" {
				continue
			}
			// A checked conversion: an unchecked col.([]byte) assertion would
			// panic on a NULL or string value.
			switch s := col.(type) {
			case []byte:
				comment = string(s)
			case string:
				comment = s
			}
		}
		if comment == "" {
			continue
		}
		ind := -1
		for _, marker := range []string{",TINT", ",TBIN", ",BOOL"} {
			if ind = strings.Index(comment, marker); ind != -1 {
				break
			}
		}
		if ind == -1 {
			return ret
		}
		// Every marker is five bytes long, so the entries start at ind+5.
		exts := strings.Split(comment[ind+5:], ",")
		for i, ext := range exts {
			excos := strings.Split(ext, ".")
			if len(excos) > 1 {
				ret = append(ret, map[string]any{"id": excos[0], "name": excos[1]})
			} else if len(excos[0]) > 0 {
				ret = append(ret, map[string]any{"id": i, "name": excos[0]})
			}
		}
		if strings.Contains(comment, ",BOOL") {
			if len(ret) == 0 {
				ret = append(ret, map[string]any{"id": "1", "name": "✔"})
			}
			if len(ret) == 1 {
				ret = append(ret, map[string]any{"id": "2", "name": "✘"})
			}
		}
	}
	if err := rows.Err(); err != nil {
		thos.addError("Getdbcodes.Rows", err)
	}
	return ret
}
// Getone runs the select described by csql and returns its first row as a
// map[column]value.
//
// A failure returns (nil, err). A query that matches no row returns
// (nil, nil), so callers must check the map for nil.
// NOTE(review): the previous comment on this function said that "no data"
// returns an error, but the code has always returned a nil error in that
// case; that behaviour is kept so existing callers are unaffected.
//
// For non-raw SQL " limit 0,1" is appended, plus " for update" inside a
// transaction. []byte values are converted to string and SQL NULL becomes "".
// The final SQL text is stored in csql.LastSQL. Every failure is also
// recorded through addError.
func (thos *CiyMysql) Getone(csql *CiySQL) (map[string]any, error) {
	var err error
	sqlstr := csql.rawsql
	if sqlstr == "" {
		if err = thos.calcolumn(csql); err != nil {
			thos.addError("Getone.calcolumn", err)
			return nil, err
		}
		sqlstr, _ = csql.Buildsql(csql.sqlcolumn)
		sqlstr += " limit 0,1"
		if thos.Tx != nil {
			sqlstr += " for update"
		}
	}
	csql.LastSQL = sqlstr
	var rows *sql.Rows
	if thos.Tx != nil {
		rows, err = thos.Tx.Query(sqlstr, csql.tsmt...)
	} else {
		rows, err = thos.Db.Query(sqlstr, csql.tsmt...)
	}
	if err != nil {
		thos.addError("Getone.Query", err)
		return nil, err
	}
	defer rows.Close()
	columns, err := rows.Columns()
	if err != nil {
		thos.addError("Getone.Columns", err)
		return nil, err
	}
	values := make([]any, len(columns))
	scanArgs := make([]any, len(columns))
	for i := range values {
		scanArgs[i] = &values[i]
	}
	if !rows.Next() {
		// Next returns false both for "no rows" and for a failed fetch; only
		// the former may be reported as an empty result.
		if err = rows.Err(); err != nil {
			thos.addError("Getone.Next", err)
			return nil, err
		}
		return nil, nil
	}
	if err = rows.Scan(scanArgs...); err != nil {
		thos.addError("Getone.Scan", err)
		return nil, err
	}
	record := make(map[string]any, len(columns))
	for i, cell := range values {
		switch v := cell.(type) {
		case nil:
			record[columns[i]] = ""
		case []byte:
			record[columns[i]] = string(v)
		default:
			record[columns[i]] = v
		}
	}
	return record, nil
}
// Get1 returns the first column of the first row selected by csql.
//
// Both "no data" and any failure return ""; failures (including the scan
// error raised when no row exists) are recorded through addError, so callers
// tell the cases apart via the Error list. A NULL value also returns "".
//
// For non-raw SQL an empty, "*" or "!"-prefixed column list is replaced by
// "count(*)". When csql has no limit and the built column expression contains
// no "(", " limit 0,1" is appended; inside a transaction " for update" is
// appended as well. The final SQL text is stored in csql.LastSQL.
func (thos *CiyMysql) Get1(csql *CiySQL) any {
	query := csql.rawsql
	if query == "" {
		want := csql.sqlcolumn
		if want == "" || want == "*" || strings.HasPrefix(want, "!") {
			want = "count(*)"
		}
		built, column := csql.Buildsql(want)
		suffix := csql.sqllimit
		if suffix == "" && !strings.Contains(column, "(") {
			suffix = " limit 0,1"
		}
		if thos.Tx != nil {
			suffix += " for update"
		}
		query = built + suffix
	}
	csql.LastSQL = query

	var row *sql.Row
	if thos.Tx == nil {
		row = thos.Db.QueryRow(query, csql.tsmt...)
	} else {
		row = thos.Tx.QueryRow(query, csql.tsmt...)
	}
	if queryErr := row.Err(); queryErr != nil {
		thos.addError("Get1.QueryRow", queryErr)
		return ""
	}
	var value any
	if scanErr := row.Scan(&value); scanErr != nil {
		thos.addError("Get1.Scan", scanErr)
		return ""
	}
	if value != nil {
		return value
	}
	return ""
}