不同数据库的驱动一定不同,对于一种语言来说,不可能在标准库中实现所有的数据库驱动,那么一种语言该如何实现与数据库相关的标准库部分呢?Golang 通过定义一系列与数据库驱动相关的 Interface 实现了数据库驱动的逻辑与具体实现的解藕,十分优雅的解决了这个问题。
同时,Golang 标准库的 database/sql 包以空闲连接池的方式实现了连接的复用,从而达到了提高性能的效果。

Driver

Interface

sql-第 4 页.drawio.svg
图 1: Interface design

数据库接口设计如图 1 所示。Connector 通过 Driver 方法获取数据库驱动,通过 Connect 方法与数据库建立连接,并且可以猜测 Connector 是基于 Driver 获取连接的,通过连接可以开启一个事务或者创建一个 SQL 语句,根据 SQL 语句的种类选择应该使用 Exec 方法获得 Result 接口还是使用 Query 方法获得 Rows 接口。全部接口定义如下

  1. type Connector interface {
  2. Connect(context.Context) (Conn, error)
  3. Driver() Driver
  4. }
  5. type Driver interface {
  6. Open(name string) (Conn, error)
  7. }
  8. type Conn interface {
  9. Prepare(query string) (Stmt, error)
  10. Close() error
  11. Begin() (Tx, error)
  12. }
  13. type Tx interface {
  14. Commit() error
  15. Rollback() error
  16. }
  17. type Stmt interface {
  18. Close() error
  19. NumInput() int
  20. Exec(args []Value) (Result, error)
  21. Query(args []Value) (Rows, error)
  22. }
  23. type Result interface {
  24. LastInsertId() (int64, error)
  25. RowsAffected() (int64, error)
  26. }
  27. type Rows interface {
  28. Columns() []string
  29. Close() error
  30. Next(dest []Value) error
  31. }

SQL

Register

对于驱动实现者来说,只需根据 Golang 标准库定义的接口来实现驱动,然后通过 Register 函数将驱动注册即可,注册详情如下

  1. func Register(name string, driver driver.Driver) {
  2. driversMu.Lock()
  3. defer driversMu.Unlock()
  4. if driver == nil {
  5. panic("sql: Register driver is nil")
  6. }
  7. if _, dup := drivers[name]; dup {
  8. panic("sql: Register called twice for driver " + name)
  9. }
  10. drivers[name] = driver
  11. }

L10 将传入的驱动名称和驱动实例保存到 drivers,drivers 是 database/sql 中定义的一个全局变量。

  1. var (
  2. driversMu sync.RWMutex
  3. drivers = make(map[string]driver.Driver)
  4. )

L2 创建了一个读写锁,因为 Golang 中的 map 类型不是并发安全的,所以在访问 drivers 的时候需要加锁来保证其安全性。

Connect Manager

database-connect_manager.drawio.svg
图 2: Connect Manager

连接管理如图 2 所示,其中核心结构是 DB,DB 中有 3 个主要字段,connector 负责底层连接的创建;connRequests 负责连接请求的管理;freeConn 负责空闲连接的存储。
在对数据库进行操作之前必须要获取连接,获取连接的途径有 3 个,从空闲连接池获取;从 connRequests 中的信道获取连接请求,再通过连接请求获取;通过 connector 的 Connect 方法获取底层连接后构建连接。
连接被使用之后,要么形成新的连接请求发送到 connRequests 中的信道,要么作为空闲连接放入空闲连接池。

下面让我们通过阅读源码来了解数据库连接管理的详细过程,首先从 DB 的结构开始。

  1. type DB struct {
  2. // Atomic access only. At top of struct to prevent mis-alignment
  3. // on 32-bit platforms. Of type time.Duration.
  4. waitDuration int64 // Total time waited for new connections.
  5. connector driver.Connector
  6. // numClosed is an atomic counter which represents a total number of
  7. // closed connections. Stmt.openStmt checks it before cleaning closed
  8. // connections in Stmt.css.
  9. numClosed uint64
  10. mu sync.Mutex // protects following fields
  11. freeConn []*driverConn // free connections ordered by returnedAt oldest to newest
  12. connRequests map[uint64]chan connRequest
  13. nextRequest uint64 // Next key to use in connRequests.
  14. numOpen int // number of opened and pending open connections
  15. // Used to signal the need for new connections
  16. // a goroutine running connectionOpener() reads on this chan and
  17. // maybeOpenNewConnections sends on the chan (one send per needed connection)
  18. // It is closed during db.Close(). The close tells the connectionOpener
  19. // goroutine to exit.
  20. openerCh chan struct{}
  21. closed bool
  22. dep map[finalCloser]depSet
  23. lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
  24. maxIdleCount int // zero means defaultMaxIdleConns; negative means 0
  25. maxOpen int // <= 0 means unlimited
  26. maxLifetime time.Duration // maximum amount of time a connection may be reused
  27. maxIdleTime time.Duration // maximum amount of time a connection may be idle before being closed
  28. cleanerCh chan struct{}
  29. waitCount int64 // Total number of connections waited for.
  30. maxIdleClosed int64 // Total number of connections closed due to idle count.
  31. maxIdleTimeClosed int64 // Total number of connections closed due to idle time.
  32. maxLifetimeClosed int64 // Total number of connections closed due to max connection lifetime limit.
  33. stop func() // stop cancels the connection opener.
  34. }

DB 实例可以通过 Open 函数进行创建,最终是调用 OpenDB 函数创建的 DB 实例,调用 OpenDB 还需要获得 Connector 接口作为参数。Connector 可以通过两种方式获取,L9 ~ L15 表明如果设计驱动的同时实现了 DriverContext 接口,那么直接调用 DriverContext 接口中的 OpenConnector 方法获取 Connector;L17 表明如果驱动没有实现 DriverContext 接口,则使用标准库中默认的 dsnConnector 类型作为 Connector 接口。这样设计既允许自己定义 Connector 获取的方法,也提供了默认的 Connector,提高了灵活性,在一些场景可考虑使用。

  1. func Open(driverName, dataSourceName string) (*DB, error) {
  2. driversMu.RLock()
  3. driveri, ok := drivers[driverName]
  4. driversMu.RUnlock()
  5. if !ok {
  6. return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
  7. }
  8. if driverCtx, ok := driveri.(driver.DriverContext); ok {
  9. connector, err := driverCtx.OpenConnector(dataSourceName)
  10. if err != nil {
  11. return nil, err
  12. }
  13. return OpenDB(connector), nil
  14. }
  15. return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
  16. }

OpenDB 详细内容如下

  1. func OpenDB(c driver.Connector) *DB {
  2. ctx, cancel := context.WithCancel(context.Background())
  3. db := &DB{
  4. connector: c,
  5. openerCh: make(chan struct{}, connectionRequestQueueSize),
  6. lastPut: make(map[*driverConn]string),
  7. connRequests: make(map[uint64]chan connRequest),
  8. stop: cancel,
  9. }
  10. go db.connectionOpener(ctx)
  11. return db
  12. }

[1] L3 ~ L9: 创建 DB 实例;
[2] L11: 开启一个创建连接的协程。

协程详情如下

  1. func (db *DB) connectionOpener(ctx context.Context) {
  2. for {
  3. select {
  4. case <-ctx.Done():
  5. return
  6. case <-db.openerCh:
  7. db.openNewConnection(ctx)
  8. }
  9. }
  10. }

协程的主要内容是监听一个信号,并且在收到信号后调用 oepnNewConnection 方法打开一个新的连接,那么问题来了,什么时候会发送信号到 openerCh 中呢?通过详细阅读可以发现,在maybeOpenNewConnections 方法中会向 openerCh 发送信号。

  1. func (db *DB) maybeOpenNewConnections() {
  2. numRequests := len(db.connRequests)
  3. if db.maxOpen > 0 {
  4. numCanOpen := db.maxOpen - db.numOpen
  5. if numRequests > numCanOpen {
  6. numRequests = numCanOpen
  7. }
  8. }
  9. for numRequests > 0 {
  10. db.numOpen++ // optimistically
  11. numRequests--
  12. if db.closed {
  13. return
  14. }
  15. db.openerCh <- struct{}{}
  16. }
  17. }

[1] L2 ~ L8: 根据 maxOpen 和 numOpen 计算出还能打开的连接数量,如果现在连接请求数量大于还能打开的连接数量,将还能打开的连接数量赋值给连接请求数量;
[2] L9 ~ L16: 存在多少连接请求就向 openerCh 中发送多少次信号。

可以发现协程的唤醒与连接请求个数有关,那么连接请求究竟是为了解决什么问题而设计的呢?让我们带着疑问回归 openNewConnection 函数的阅读。

  1. func (db *DB) openNewConnection(ctx context.Context) {
  2. // maybeOpenNewConnections has already executed db.numOpen++ before it sent
  3. // on db.openerCh. This function must execute db.numOpen-- if the
  4. // connection fails or is closed before returning.
  5. ci, err := db.connector.Connect(ctx)
  6. db.mu.Lock()
  7. defer db.mu.Unlock()
  8. if db.closed {
  9. if err == nil {
  10. ci.Close()
  11. }
  12. db.numOpen--
  13. return
  14. }
  15. if err != nil {
  16. db.numOpen--
  17. db.putConnDBLocked(nil, err)
  18. db.maybeOpenNewConnections()
  19. return
  20. }
  21. dc := &driverConn{
  22. db: db,
  23. createdAt: nowFunc(),
  24. returnedAt: nowFunc(),
  25. ci: ci,
  26. }
  27. if db.putConnDBLocked(dc, err) {
  28. db.addDepLocked(dc, dc)
  29. } else {
  30. db.numOpen--
  31. ci.Close()
  32. }
  33. }

[1] L5: 通过 Connector 打开一个连接;
[2] L6 ~ L14: 如果 DB 已经关闭并且成功打开了连接,将连接关闭,并且将打开的连接数量减 1 并返回;
[3] L15 ~ L20: 如果数据库没有关闭并且打开连接出现了错误,将打开的连接数量减 1,并将空连接和详细错误加入到 DB,并调用 maybeOpenNewConnections 进行错误处理;
[4] L21 ~ L32: 如果数据库没有错误并且成功打开了连接,创建 dc 实例,并将新打开的连接加入其中,再将 dc 加入 DB 中,如果加入失败,将打开的连接数量减 1,关闭连接。

可以发现在 DB 未关闭的情况下最终都会调用 putConnDBLocked 方法,并且方法执行失败一定会将打开连接数量减 1,因此可以猜测在协程接收到信号之前一定进行了连接数量加 1 的操作(猜想可以通过 maybeOpenNewConnections 方法得到验证),并且 putConnDBLocked 方法一定是核心操作。

  1. func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
  2. if db.closed {
  3. return false
  4. }
  5. if db.maxOpen > 0 && db.numOpen > db.maxOpen {
  6. return false
  7. }
  8. if c := len(db.connRequests); c > 0 {
  9. var req chan connRequest
  10. var reqKey uint64
  11. for reqKey, req = range db.connRequests {
  12. break
  13. }
  14. delete(db.connRequests, reqKey) // Remove from pending requests.
  15. if err == nil {
  16. dc.inUse = true
  17. }
  18. req <- connRequest{
  19. conn: dc,
  20. err: err,
  21. }
  22. return true
  23. } else if err == nil && !db.closed {
  24. if db.maxIdleConnsLocked() > len(db.freeConn) {
  25. db.freeConn = append(db.freeConn, dc)
  26. db.startCleanerLocked()
  27. return true
  28. }
  29. db.maxIdleClosed++
  30. }
  31. return false
  32. }

[1] L8 ~ L22: 在 connRequests 中随机选取一个 chan connRequest,将连接和错误传入其中;
[2] L23 ~ L30: 如果最大空闲连接大于当前空闲连接池中的连接数量,将连接加入空闲连接池中,并且开启定时清理空闲连接池中无效连接的协程。

那么问题来了,chan connRequest 只有发送端,接受端在哪里呢?通过继续阅读可以发现,接收端在 DB 的 conn 方法中。还记得文章一开始介绍的那些接口吗,DB 实现了其中的大部分,包括 Conn、Stmt、TX 等,实现这些接口的方法中都有调用 conn 方法,即无论是创建执行语句的 Prepare、开启事务的 Begin、执行非查询语句的 Exec、还是执行查询语句的 Query,都需要先执行 conn 方法。

  1. func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
  2. db.mu.Lock()
  3. if db.closed {
  4. db.mu.Unlock()
  5. return nil, errDBClosed
  6. }
  7. // Check if the context is expired.
  8. select {
  9. default:
  10. case <-ctx.Done():
  11. db.mu.Unlock()
  12. return nil, ctx.Err()
  13. }
  14. lifetime := db.maxLifetime
  15. // Prefer a free connection, if possible.
  16. last := len(db.freeConn) - 1
  17. if strategy == cachedOrNewConn && last >= 0 {
  18. // Reuse the lowest idle time connection so we can close
  19. // connections which remain idle as soon as possible.
  20. conn := db.freeConn[last]
  21. db.freeConn = db.freeConn[:last]
  22. conn.inUse = true
  23. if conn.expired(lifetime) {
  24. db.maxLifetimeClosed++
  25. db.mu.Unlock()
  26. conn.Close()
  27. return nil, driver.ErrBadConn
  28. }
  29. db.mu.Unlock()
  30. // Reset the session if required.
  31. if err := conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
  32. conn.Close()
  33. return nil, err
  34. }
  35. return conn, nil
  36. }
  37. // Out of free connections or we were asked not to use one. If we're not
  38. // allowed to open any more connections, make a request and wait.
  39. if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
  40. // Make the connRequest channel. It's buffered so that the
  41. // connectionOpener doesn't block while waiting for the req to be read.
  42. req := make(chan connRequest, 1)
  43. reqKey := db.nextRequestKeyLocked()
  44. db.connRequests[reqKey] = req
  45. db.waitCount++
  46. db.mu.Unlock()
  47. waitStart := nowFunc()
  48. // Timeout the connection request with the context.
  49. select {
  50. case <-ctx.Done():
  51. // Remove the connection request and ensure no value has been sent
  52. // on it after removing.
  53. db.mu.Lock()
  54. delete(db.connRequests, reqKey)
  55. db.mu.Unlock()
  56. atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
  57. select {
  58. default:
  59. case ret, ok := <-req:
  60. if ok && ret.conn != nil {
  61. db.putConn(ret.conn, ret.err, false)
  62. }
  63. }
  64. return nil, ctx.Err()
  65. case ret, ok := <-req:
  66. atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
  67. if !ok {
  68. return nil, errDBClosed
  69. }
  70. // Only check if the connection is expired if the strategy is cachedOrNewConns.
  71. // If we require a new connection, just re-use the connection without looking
  72. // at the expiry time. If it is expired, it will be checked when it is placed
  73. // back into the connection pool.
  74. // This prioritizes giving a valid connection to a client over the exact connection
  75. // lifetime, which could expire exactly after this point anyway.
  76. if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) {
  77. db.mu.Lock()
  78. db.maxLifetimeClosed++
  79. db.mu.Unlock()
  80. ret.conn.Close()
  81. return nil, driver.ErrBadConn
  82. }
  83. if ret.conn == nil {
  84. return nil, ret.err
  85. }
  86. // Reset the session if required.
  87. if err := ret.conn.resetSession(ctx); errors.Is(err, driver.ErrBadConn) {
  88. ret.conn.Close()
  89. return nil, err
  90. }
  91. return ret.conn, ret.err
  92. }
  93. }
  94. db.numOpen++ // optimistically
  95. db.mu.Unlock()
  96. ci, err := db.connector.Connect(ctx)
  97. if err != nil {
  98. db.mu.Lock()
  99. db.numOpen-- // correct for earlier optimism
  100. db.maybeOpenNewConnections()
  101. db.mu.Unlock()
  102. return nil, err
  103. }
  104. db.mu.Lock()
  105. dc := &driverConn{
  106. db: db,
  107. createdAt: nowFunc(),
  108. returnedAt: nowFunc(),
  109. ci: ci,
  110. inUse: true,
  111. }
  112. db.addDepLocked(dc, dc)
  113. db.mu.Unlock()
  114. return dc, nil
  115. }

[1] L18 ~ L39: 取出空闲连接池中最后一个连接,即剩余空闲时间最少的连接,将其设为正在使用状态,判断连接是否过期,如果过期,则将由于生命周期限制而关闭的连接总数加 1,之后关闭连接,并返回空连接和错误,如果未过期,判断是否需要重设 session,如果需要则重设,最后返回连接;
[2] L43 ~ L103: 如果空闲连接池为空或者使用了 alwaysNewConn 策略,同时已打开连接数量大于等于最大连接数量,那么将创建连接请求的 channel,并且会监听关闭信号和连接请求 channel。收到关闭信号后会将连接放入空闲连接池;收到连接请求后会对连接请求中的连接进行检验,如果有问题返回 nil,没有问题返回连接。这样就明白了连接请求的设计本质上是为了保证 maxOpen 的有效性;
[3] L107 ~ L125: 通过 Connector 创建连接,如果连接失败,调用 maybeOpenNewConnections 尝试再次打开连接。

那么问题来了,使用 conn 方法获取连接并使用完毕之后对于连接是如何处理的,我们可以通过 Exec 方法进行查看,Exec 方法最终会调用 exec 方法,因此直接查看 exec 即可

  1. func (db *DB) exec(ctx context.Context, query string, args []any, strategy connReuseStrategy) (Result, error) {
  2. dc, err := db.conn(ctx, strategy)
  3. if err != nil {
  4. return nil, err
  5. }
  6. return db.execDC(ctx, dc, dc.releaseConn, query, args)
  7. }

conn 方法之前已经分析过了,看起来第 6 行的 releaseConn 方法是连接的后续处理方法

  1. func (dc *driverConn) releaseConn(err error) {
  2. dc.db.putConn(dc, err, true)
  3. }

继续查看 putConn 方法

  1. func (db *DB) putConn(dc *driverConn, err error, resetSession bool) {
  2. if !errors.Is(err, driver.ErrBadConn) {
  3. if !dc.validateConnection(resetSession) {
  4. err = driver.ErrBadConn
  5. }
  6. }
  7. db.mu.Lock()
  8. if !dc.inUse {
  9. db.mu.Unlock()
  10. if debugGetPut {
  11. fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
  12. }
  13. panic("sql: connection returned that was never out")
  14. }
  15. if !errors.Is(err, driver.ErrBadConn) && dc.expired(db.maxLifetime) {
  16. db.maxLifetimeClosed++
  17. err = driver.ErrBadConn
  18. }
  19. if debugGetPut {
  20. db.lastPut[dc] = stack()
  21. }
  22. dc.inUse = false
  23. dc.returnedAt = nowFunc()
  24. for _, fn := range dc.onPut {
  25. fn()
  26. }
  27. dc.onPut = nil
  28. if errors.Is(err, driver.ErrBadConn) {
  29. // Don't reuse bad connections.
  30. // Since the conn is considered bad and is being discarded, treat it
  31. // as closed. Don't decrement the open count here, finalClose will
  32. // take care of that.
  33. db.maybeOpenNewConnections()
  34. db.mu.Unlock()
  35. dc.Close()
  36. return
  37. }
  38. if putConnHook != nil {
  39. putConnHook(db, dc)
  40. }
  41. added := db.putConnDBLocked(dc, nil)
  42. db.mu.Unlock()
  43. if !added {
  44. dc.Close()
  45. return
  46. }
  47. }

可以发现第 44 行调用了 putConnDBLocked 方法,结合之前 putConnDBLocked 方法的分析可知,连接使用完之后,如果连接请求集合不为空,则作为连接请求被使用;如果连接请求为空并且空闲连接池的容量还没有达到上限,则加入空闲连接池中。

Usage

database-Connector_acquisition.drawio.svg
图 3: Connector acquisition

首先通过 Open 函数创建 DB 实例,创建过程中最重要的是 Connector 的获取,因为获取新连接都需要通过 Connector 实现,Connector 的获取详情如图 3 所示,然后使用 Exec 或 Query 方法执行 SQL 语句即可。

  1. func Open(driverName, dataSourceName string) (*DB, error) {
  2. driversMu.RLock()
  3. driveri, ok := drivers[driverName]
  4. driversMu.RUnlock()
  5. if !ok {
  6. return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
  7. }
  8. if driverCtx, ok := driveri.(driver.DriverContext); ok {
  9. connector, err := driverCtx.OpenConnector(dataSourceName)
  10. if err != nil {
  11. return nil, err
  12. }
  13. return OpenDB(connector), nil
  14. }
  15. return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
  16. }

下面是一个使用 database/sql 操作 mysql 数据库的例子,数据库相关方法封装如下

  1. package mysql
  2. var (
  3. DBName = "test"
  4. TableName = "user"
  5. )
  6. type User struct {
  7. ID uint32
  8. Name string
  9. }
  10. const (
  11. mysqlUserCreateDatabase = iota
  12. mysqlUserCreateTable
  13. mysqlUserInsert
  14. mysqlUserSelectByID
  15. mysqlUserSelectAll
  16. )
  17. var (
  18. errInvalidInsert = errors.New("[user] invalid insert ")
  19. userSQLString = map[int]string{
  20. fmt.Sprintf(`CREATE DATABASE IF NOT EXISTS %s`, DBName),
  21. fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s(
  22. id INT UNSIGNED AUTO_INCREMENT PRIMARY KEY,
  23. name VARCHAR(56) NOT NULL UNIQUE COMMENT '用户名',
  24. )ENGINE=InnoDB CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`, DBName, TableName),
  25. fmt.Sprintf(`INSERT INTO %s.%s (name) VALUES (?)`, DBName, TableName),
  26. fmt.Sprintf(`SELECT id, name FROM %s.%s WHERE id = ? LIMIT 1`, DBName, TableName),
  27. fmt.Sprintf(`SELECT id, name FROM %s.%s`, DBName, TableName),
  28. }
  29. )
  30. func CreateDatabase(db *sql.DB) error {
  31. _, err := db.Exec(userSQLString[mysqlUserCreateDatabase])
  32. return err
  33. }
  34. func CreateTable(db *sql.DB) error {
  35. _, err := db.Exec(userSQLString[mysqlUserCreateTable])
  36. return err
  37. }
  38. func InsertUser(db *sql.DB, name string) error {
  39. result, err := db.Exec(userSQLString[mysqlUserInsert], name)
  40. if err != nil {
  41. return err
  42. }
  43. if rows, _ := result.RowsAffected(); rows == 0 {
  44. return errInvalidInsert
  45. }
  46. return nil
  47. }
  48. func TxInsertUser(tx *sql.Tx, name string) error {
  49. result, err := tx.Exec(userSQLString[mysqlUserInsert], name)
  50. if err != nil {
  51. return err
  52. }
  53. if rows, _ := result.RowsAffected(); rows == 0 {
  54. return errInvalidInsert
  55. }
  56. return nil
  57. }
  58. func SelectUserByID(db *sql.DB, id uint32) (*User, error) {
  59. var name string
  60. row := db.QueryRow(userSQLString[mysqlUserSelectByID], id)
  61. if err := row.Scan(&id, &name); err != nil {
  62. return nil, err
  63. }
  64. return &Account{
  65. ID: id,
  66. Name: name,
  67. }, nil
  68. }
  69. func TxSelectUserByID(tx *sql.Tx, id uint32) (*User, error) {
  70. var name string
  71. row := tx.QueryRow(userSQLString[mysqlUserSelectByID], id)
  72. if err := row.Scan(&id, &name); err != nil {
  73. return nil, err
  74. }
  75. return &User{
  76. ID: id,
  77. Name: name,
  78. }, nil
  79. }
  80. func ListUsers(db *sql.DB) ([]*User, error) {
  81. var (
  82. Users = make([]*User, 0)
  83. ID uint32
  84. Name string
  85. )
  86. rows, err := db.Query(userSQLString[mysqlUserSelectAll])
  87. if err != nil {
  88. return nil, err
  89. }
  90. defer rows.Close()
  91. for rows.Next() {
  92. if err := rows.Scan(&ID, &Name); err != nil {
  93. return nil, err
  94. }
  95. User := &User{
  96. ID: ID,
  97. Name: Name,
  98. }
  99. Users = append(Users, User)
  100. }
  101. return Users, nil
  102. }
  103. func TxListUsers(tx *sql.Tx) ([]*User, error) {
  104. var (
  105. Users = make([]*User, 0)
  106. ID uint32
  107. Name string
  108. )
  109. rows, err := tx.Query(userSQLString[mysqlUserSelectAll])
  110. if err != nil {
  111. return nil, err
  112. }
  113. defer rows.Close()
  114. for rows.Next() {
  115. if err := rows.Scan(&ID, &Name); err != nil {
  116. return nil, err
  117. }
  118. User := &User{
  119. ID: ID,
  120. Name: Name,
  121. }
  122. Users = append(Users, User)
  123. }
  124. return Users, nil
  125. }

使用方法前需要导入 mysql 驱动,在 mysql 驱动相关官网找到 dsn 格式用来初始化 DB 实例,最后调用封装好的方法即可。

  1. package main
  2. import (
  3. "fmt"
  4. "database/sql"
  5. _ "github.com/go-sql-driver/mysql"
  6. )
  7. func main() {
  8. dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=%s&parseTime=%t&loc=%s",
  9. username, password, host, port, database, charset, true, "Local")
  10. db := sql.Open("mysql", dsn)
  11. db.SetMaxOpenConns(maxOpenConns)
  12. db.SetMaxIdleConns(maxIdleConns)
  13. db.SetConnMaxLifetime(time.Duration(maxLifetime) * time.Second)
  14. if err := mysql.CreateDatabase(db); err != nil {
  15. panic(err)
  16. }
  17. }

[1] L15: 设置最大打开连接数量来控制最大并发量;
[2] L16: 设置最大空闲连接来控制空闲连接池大小;
[3] L17: 设置连接的最长存活时间。