驱动

全局变量

  1. package sql
  2. var drivers = make(map[string]driver.Driver)�

驱动的接口

数据库驱动必须实现这个接口,Driver可以通过name得到一个数据库的连接(实现Conn接口)。通过这个Conn可以对数据库进行各种操作。name的格式由使用的驱动种类决定。

  1. type Driver interface {
  2. Open(name string) (Conn, error)
  3. }

DB

概览

SQL - 图1

结构体

  1. type DB struct {
  2. connector driver.Connector// 用于获取driver.Conn 可以由驱动层实现,否则用sql.dsnConnector
  3. numClosed uint64 // 是一个原子计数器,代表总的关闭连接数量
  4. mu sync.Mutex
  5. freeConn []*driverConn //空闲连接池
  6. connRequests map[uint64]chan connRequest // 无可用连接时,处于 Pending 状态的连接请求
  7. nextRequest uint64
  8. numOpen int // 打开和准备打开的连接总数
  9. openerCh chan struct{} // 用来传信号的管道 表示需要多少新连接
  10. resetterCh chan *driverConn // 用来传需要重置 Session 的 driverConn
  11. closed bool
  12. dep map[finalCloser]depSet // 依赖记录
  13. lastPut map[*driverConn]string
  14. maxIdle int
  15. maxOpen int
  16. maxLifetime time.Duration // 连接的生命后期
  17. cleanerCh chan struct{} // 传信号 表示需要清理freeConn空闲池中已经关掉的driverConn
  18. stop func()
  19. }

连接数据库

连接数据库需要connector,如果数据库驱动实现了DriverContext接口则可以通过OpenConnector方法获得一个connector(数据库驱动自定义)。如果没有,那么sql包提供了一个实现此接口的dsnConnector结构体。
使用dsnConnector每获取一个Conn则驱动就要解析一次name,并无权使用context。
使用数据库驱动自定义的connector则驱动只解析一次name,并有权使用context。

  1. func Open(driverName, dataSourceName string) (*DB, error) {
  2. driversMu.RLock()
  3. driveri, ok := drivers[driverName]
  4. driversMu.RUnlock()
  5. if !ok {
  6. return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
  7. }
  8. if driverCtx, ok := driveri.(driver.DriverContext); ok {
  9. connector, err := driverCtx.OpenConnector(dataSourceName)
  10. if err != nil {
  11. return nil, err
  12. }
  13. return OpenDB(connector), nil
  14. }
  15. return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
  16. }

driverContext定义:

  1. type DriverContext interface {
  2. OpenConnector(name string) (Connector, error)
  3. }

执行部分:

  1. func OpenDB(c driver.Connector) *DB {
  2. ctx, cancel := context.WithCancel(context.Background())
  3. db := &DB{
  4. connector: c,
  5. openerCh: make(chan struct{}, connectionRequestQueueSize),
  6. resetterCh: make(chan *driverConn, 50),
  7. lastPut: make(map[*driverConn]string),
  8. connRequests: make(map[uint64]chan connRequest),
  9. stop: cancel,
  10. }
  11. go db.connectionOpener(ctx) // 新建连接的goroutine,db.openerCh传来东西就为DB创建一个连接
  12. go db.connectionResetter(ctx) // 重置session的goroutine
  13. return db
  14. }

DB连接管理

如果DB刚刚打开,此时没有连接创建,需要时调用DB.conn函数开始创建连接。任何时候若有连接创建失败,或者连接关闭,会自动调用maybeOpenNewConnections,发送尽可能多的新建信号,收到后openNewConnection就会新建连接直到DB连接数量达到饱和。建立的连接会被putConnDBLocked拿去优先满足connRequest请求,其次放入freeConn。

监听openerCh管道发来的新建连接信号

DB打开时就为此函数开始一个goroutine,当openerCh传来一个struct{}{},就新建一个连接。任何时候连接创建失败、连接关闭,就会调用openNewConnection,然后connectionOpener才会收到信号。

  1. func (db *DB) connectionOpener(ctx context.Context) {
  2. for {
  3. select {
  4. case <-ctx.Done():
  5. return
  6. case <-db.openerCh:
  7. db.openNewConnection(ctx)
  8. }
  9. }
  10. }

新建一个新连接

  1. func (db *DB) openNewConnection(ctx context.Context) {
  2. ci, err := db.connector.Connect(ctx) // 建立一个driver.Conn
  3. db.mu.Lock()
  4. defer db.mu.Unlock()
  5. if db.closed {
  6. if err == nil { // DB已关闭,但driver.Conn建立成功
  7. ci.Close()
  8. }
  9. db.numOpen-- // maybeOpenNewConnctions往db.openerCh发送信号前已经执行db.numOpen++,此处减回去
  10. return
  11. }
  12. if err != nil { // driver.Conn建立失败
  13. db.numOpen--
  14. db.putConnDBLocked(nil, err)
  15. db.maybeOpenNewConnections() // 发送尽可能多的新建连接信号
  16. return
  17. }
  18. dc := &driverConn{ // 创建driverConn实例
  19. db: db,
  20. createdAt: nowFunc(),
  21. ci: ci,
  22. }
  23. if db.putConnDBLocked(dc, err) { // 把driverConn放入DB中
  24. db.addDepLocked(dc, dc) // 添加driverConn依赖记录
  25. } else {
  26. db.numOpen--
  27. ci.Close()
  28. }
  29. }

把连接放入DB

优先用来满足一个connRequest,若无请求,则放入空闲池

  1. func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
  2. if db.closed {
  3. return false
  4. }
  5. if db.maxOpen > 0 && db.numOpen > db.maxOpen { // 连接已达到上限
  6. return false
  7. }
  8. if c := len(db.connRequests); c > 0 { // 即使dc==nil也传进connRequest,因为后面管道接收时会判断
  9. var req chan connRequest
  10. var reqKey uint64
  11. for reqKey, req = range db.connRequests { // 取一个connRequest管道
  12. break
  13. }
  14. delete(db.connRequests, reqKey) // 删除该连接请求
  15. if err == nil {
  16. dc.inUse = true
  17. }
  18. req <- connRequest{ // 满足这个connRequest管道的请求
  19. conn: dc,
  20. err: err,
  21. }
  22. return true
  23. } else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) { //dc==nil不能放入freeConn
  24. db.freeConn = append(db.freeConn, dc)
  25. db.startCleanerLocked() // 只会开启一次,开启后会直到DB.closed||DB.numOpen==0才关闭
  26. return true
  27. }
  28. return false
  29. }

发送尽可能多的新建连接的信号

  1. func (db *DB) maybeOpenNewConnections() {
  2. numRequests := len(db.connRequests)
  3. if db.maxOpen > 0 {
  4. numCanOpen := db.maxOpen - db.numOpen
  5. if numRequests > numCanOpen {
  6. numRequests = numCanOpen // 剩余多少空间,就发送多少信号
  7. }
  8. }
  9. for numRequests > 0 {
  10. db.numOpen++ // 乐观地加1,后面若创建失败会减去
  11. numRequests--
  12. if db.closed {
  13. return
  14. }
  15. db.openerCh <- struct{}{} // 发送信号
  16. }
  17. }

从DB获取连接(driverConn)

如果可能,优先选择空闲池的连接。
如果空闲池空了,且打开连接达到上限,那么创建一个connRequest管道加入等待map,当有连接放回DB会封装成connRequest传入管道。
如果空闲池空了,连接没达到上限,使用DB.connector创建一个连接(driver.Conn),封装成driverConn,并返回。
代码:

  1. func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
  2. db.mu.Lock()
  3. if db.closed {
  4. db.mu.Unlock()
  5. return nil, errDBClosed
  6. }
  7. // 检查context是否过期
  8. select {
  9. default:
  10. case <-ctx.Done():
  11. db.mu.Unlock()
  12. return nil, ctx.Err()
  13. }
  14. lifetime := db.maxLifetime
  15. // 如果可能,优先选择空闲池的连接
  16. numFree := len(db.freeConn)
  17. if strategy == cachedOrNewConn && numFree > 0 {
  18. conn := db.freeConn[0] // 取第一个连接
  19. copy(db.freeConn, db.freeConn[1:]) // 从空闲池删除此连接
  20. db.freeConn = db.freeConn[:numFree-1]
  21. conn.inUse = true // 重置inUse
  22. db.mu.Unlock()
  23. if conn.expired(lifetime) { // 连接过期则关闭该连接,并返回一个ErrBadConn
  24. conn.Close()
  25. return nil, driver.ErrBadConn
  26. }
  27. // 锁会使检查lastErr操作等到resetSession执行完后再执行,lastErr记录resetSession产生的错误
  28. conn.Lock()
  29. err := conn.lastErr
  30. conn.Unlock()
  31. if err == driver.ErrBadConn {
  32. conn.Close()
  33. return nil, driver.ErrBadConn
  34. }
  35. return conn, nil
  36. }
  37. // 如果空闲池空了,且打开连接达到限制
  38. if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
  39. req := make(chan connRequest, 1) // 初始化一个管道
  40. reqKey := db.nextRequestKeyLocked()
  41. db.connRequests[reqKey] = req // 加入connRequests连接请求
  42. db.mu.Unlock()
  43. select {
  44. case <-ctx.Done(): //context作为定时器
  45. db.mu.Lock()
  46. delete(db.connRequests, reqKey) // 移除连接请求管道
  47. db.mu.Unlock()
  48. select {
  49. default:
  50. case ret, ok := <-req: // 连接请求传来connRequest
  51. if ok {
  52. db.putConn(ret.conn, ret.err, false) // 放回空闲池
  53. }
  54. }
  55. return nil, ctx.Err()
  56. case ret, ok := <-req: // 满足连接请求
  57. if !ok {
  58. return nil, errDBClosed
  59. }
  60. if ret.err == nil && ret.conn.expired(lifetime) { // 检查连接是否过期
  61. ret.conn.Close()
  62. return nil, driver.ErrBadConn
  63. }
  64. if ret.conn == nil { // 可能传来conn==nil
  65. return nil, ret.err
  66. }
  67. ret.conn.Lock()
  68. err := ret.conn.lastErr
  69. ret.conn.Unlock()
  70. if err == driver.ErrBadConn {
  71. ret.conn.Close()
  72. return nil, driver.ErrBadConn
  73. }
  74. return ret.conn, ret.err
  75. }
  76. }
  77. db.numOpen++ // 乐观地
  78. db.mu.Unlock()
  79. ci, err := db.connector.Connect(ctx)
  80. if err != nil {
  81. db.mu.Lock()
  82. db.numOpen-- // 纠正之前的乐观
  83. db.maybeOpenNewConnections()
  84. db.mu.Unlock()
  85. return nil, err
  86. }
  87. db.mu.Lock()
  88. dc := &driverConn{ // 创建driverConn实例
  89. db: db,
  90. createdAt: nowFunc(),
  91. ci: ci,
  92. inUse: true,
  93. }
  94. db.addDepLocked(dc, dc) // 添加driverConn与自身的依赖
  95. db.mu.Unlock()
  96. return dc, nil
  97. }

依赖

DB.dep map[finalCloser]depSet

driverConn与自身依赖
不属于事物的Stmt与自身依赖
不属于事物的Stmt与子Stmt,具有依赖关系
Stmt与查询得到的Rows,具有依赖关系

finalCloser被interface{}依赖,直到fianlCloser的interface{}全部移除,fianlCloser才自动调用finalClose方法最终关闭。
SQL - 图2

DB查询

SQL - 图3

策略

按不同的策略进行多次查询,直到成功

  1. func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
  2. var rows *Rows
  3. var err error
  4. for i := 0; i < maxBadConnRetries; i++ { // 前maxBadConnRetries次使用cachedOrNewConn策略
  5. rows, err = db.query(ctx, query, args, cachedOrNewConn)
  6. if err != driver.ErrBadConn {
  7. break
  8. }
  9. }
  10. if err == driver.ErrBadConn { // 循环一直返回ErrBadConn,改用alwaysNewConn策略
  11. return db.query(ctx, query, args, alwaysNewConn)
  12. }
  13. return rows, err
  14. }

先从DB获取连接

  1. func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
  2. dc, err := db.conn(ctx, strategy) // 获取一个连接
  3. if err != nil {
  4. return nil, err
  5. }
  6. return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args) // 使用连接查询
  7. }

执行代码

  1. func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
  2. queryerCtx, ok := dc.ci.(driver.QueryerContext) // ci是否有QueryerContext方法
  3. var queryer driver.Queryer
  4. if !ok {
  5. queryer, ok = dc.ci.(driver.Queryer) // ci是否有Query方法
  6. }
  7. if ok {
  8. var nvdargs []driver.NamedValue
  9. var rowsi driver.Rows
  10. var err error
  11. withLock(dc, func() {
  12. nvdargs, err = driverArgsConnLocked(dc.ci, nil, args) // 参数转换
  13. if err != nil {
  14. return
  15. }
  16. rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs) //查询,得到driver.Rows
  17. })
  18. if err != driver.ErrSkip {
  19. if err != nil {
  20. releaseConn(err)
  21. return nil, err
  22. }
  23. rows := &Rows{ // 创建sql.Rows实例
  24. dc: dc,
  25. releaseConn: releaseConn,
  26. rowsi: rowsi,
  27. }
  28. rows.initContextClose(ctx, txctx) // 等到DB结束或事物结束,Rows才关闭
  29. return rows, nil
  30. }
  31. }
  32. var si driver.Stmt
  33. var err error
  34. withLock(dc, func() { // 调用driver实现的prepare方法,得到driver.Stmt
  35. si, err = ctxDriverPrepare(ctx, dc.ci, query)
  36. })
  37. if err != nil {
  38. releaseConn(err)
  39. return nil, err
  40. }
  41. ds := &driverStmt{Locker: dc, si: si} // 组合driverConn与driver.Stmt得到driverStmt实例
  42. rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...) // 得到driver.Rows
  43. if err != nil {
  44. ds.Close()
  45. releaseConn(err)
  46. return nil, err
  47. }
  48. rows := &Rows{ // driver.Conn传递给了sql.Rows,使用releaseConn释放连接
  49. dc: dc,
  50. releaseConn: releaseConn,
  51. rowsi: rowsi,
  52. closeStmt: ds,
  53. }
  54. rows.initContextClose(ctx, txctx)
  55. return rows, nil
  56. }

DB的exec

策略

  1. func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
  2. var res Result
  3. var err error
  4. for i := 0; i < maxBadConnRetries; i++ {
  5. res, err = db.exec(ctx, query, args, cachedOrNewConn) // 与QueryContext代码结构相同,使用exec
  6. if err != driver.ErrBadConn {
  7. break
  8. }
  9. }
  10. if err == driver.ErrBadConn {
  11. return db.exec(ctx, query, args, alwaysNewConn)
  12. }
  13. return res, err
  14. }

先从DB获取连接

  1. func (db *DB) exec(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (Result, error) {
  2. dc, err := db.conn(ctx, strategy) // 获得连接
  3. if err != nil {
  4. return nil, err
  5. }
  6. return db.execDC(ctx, dc, dc.releaseConn, query, args) // 使用连接进行exec
  7. }

执行代码

  1. func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []interface{}) (res Result, err error) {
  2. defer func() { // 此处与queryDC不同,queryDC的连接传给Rows,由Rows释放。
  3. release(err)
  4. }()
  5. execerCtx, ok := dc.ci.(driver.ExecerContext)
  6. var execer driver.Execer
  7. if !ok {
  8. execer, ok = dc.ci.(driver.Execer)
  9. }
  10. if ok {
  11. var nvdargs []driver.NamedValue
  12. var resi driver.Result
  13. withLock(dc, func() {
  14. nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
  15. if err != nil {
  16. return
  17. }
  18. resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
  19. })
  20. if err != driver.ErrSkip {
  21. if err != nil {
  22. return nil, err
  23. }
  24. return driverResult{dc, resi}, nil
  25. }
  26. }
  27. var si driver.Stmt
  28. withLock(dc, func() {
  29. si, err = ctxDriverPrepare(ctx, dc.ci, query)
  30. })
  31. if err != nil {
  32. return nil, err
  33. }
  34. ds := &driverStmt{Locker: dc, si: si}
  35. defer ds.Close() // 此处也与queryDC不同,查询时driverStmt等Rows关闭再关闭
  36. return resultFromStatement(ctx, dc.ci, ds, args...)
  37. }

事务

结构定义

  1. type Tx struct {
  2. db *DB
  3. closemu sync.RWMutex // 关闭事物加互斥锁,使用连接进行查询等操作加读锁。防止操作过程中事物被关掉
  4. dc *driverConn // 由该事物独享,提交或回滚时才使用putConn返还给DB
  5. txi driver.Tx
  6. releaseConn func(error)
  7. // 提交或者回滚时从0->1,所有查询等操作将失败并返回ErrTxDone。使用原子操作检查该值
  8. done int32
  9. // 记录该事物上准备的sql.Stmt,提交或回滚时会关闭全部
  10. stmts struct {
  11. sync.Mutex
  12. v []*Stmt
  13. }
  14. cancel func()
  15. ctx context.Context
  16. }

结构图:
SQL - 图4

开始事务

流程图:
SQL - 图5

与前面 DB 查询、DB 的 exec 的代码结构相同:策略选择,从 DB 得到一个连接,然后执行代码。

执行代码:

  1. func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
  2. var txi driver.Tx
  3. withLock(dc, func() {
  4. txi, err = ctxDriverBegin(ctx, opts, dc.ci) // 获取driver.Tx
  5. })
  6. if err != nil {
  7. release(err) // driver.Tx获取失败,释放连接
  8. return nil, err
  9. }
  10. ctx, cancel := context.WithCancel(ctx) // 为该事物分配一个子context
  11. tx = &Tx{ // 创建事物(sql.Tx)实例
  12. db: db,
  13. dc: dc,
  14. releaseConn: release,
  15. txi: txi,
  16. cancel: cancel,
  17. ctx: ctx,
  18. }
  19. go tx.awaitDone() // 等待事物结束
  20. return tx, nil
  21. }

事务查询

  • 取事务唯一的连接

执行代码:

  1. func (tx *Tx) grabConn(ctx context.Context) (*driverConn, releaseConn, error) {
  2. select {
  3. default:
  4. case <-ctx.Done():
  5. return nil, nil, ctx.Err()
  6. }
  7. //使用这个连接过程中加锁,是为了防止查询或者exec过程中事物关闭
  8. tx.closemu.RLock()
  9. if tx.isDone() {
  10. tx.closemu.RUnlock()
  11. return nil, nil, ErrTxDone
  12. }
  13. if hookTxGrabConn != nil { // test hook
  14. hookTxGrabConn()
  15. }
  16. return tx.dc, tx.closemuRUnlockRelease, nil // 返回的releaseConn不是把连接放回DB,而是解开锁,保证操作执行完事物才关闭
  17. }
  • 用此连接,调用 DB 的 queryDC 方法。

执行代码:

  1. func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
  2. dc, release, err := tx.grabConn(ctx) // 事物唯一的连接
  3. if err != nil {
  4. return nil, err
  5. }
  6. return tx.db.queryDC(ctx, tx.ctx, dc, release, query, args) // 使用这个连接进行查询
  7. }

事务继承Stmt

事务可以继承已经存在的 Stmt(该Stmt不属于事务或 sql.Conn),得到一个子 Stmt。好处是子 Stmt 的 query 内容与父 Stmt 的相同。

  1. // 为该事物创建一个Stmt,且此Stmt继承自一个已经存在的Stmt,两者的query字段相同
  2. func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt {
  3. dc, release, err := tx.grabConn(ctx)
  4. if err != nil {
  5. return &Stmt{stickyErr: err}
  6. }
  7. defer release(nil) // 返回后解开连接的读锁
  8. if tx.db != stmt.db {
  9. return &Stmt{stickyErr: errors.New("sql: Tx.Stmt: statement from different database used")}
  10. }
  11. var si driver.Stmt
  12. var parentStmt *Stmt
  13. stmt.mu.Lock()
  14. if stmt.closed || stmt.cg != nil { // Stmt已经关闭或者属于某个事物,需要重新准备driver.Stmt,也没有父子Stmt的关系
  15. stmt.mu.Unlock()
  16. withLock(dc, func() {
  17. si, err = ctxDriverPrepare(ctx, dc.ci, stmt.query)
  18. })
  19. if err != nil {
  20. return &Stmt{stickyErr: err}
  21. }
  22. } else {
  23. stmt.removeClosedStmtLocked() // 移除该Stmt.css中driverConn已关闭的记录
  24. for _, v := range stmt.css { // 找一下有没有在这个driverConn上准备的driverStmt记录
  25. if v.dc == dc {
  26. si = v.ds.si
  27. break
  28. }
  29. }
  30. stmt.mu.Unlock()
  31. if si == nil { // 没找到在这个driverConn上已经准备好的driverStmt
  32. var ds *driverStmt
  33. withLock(dc, func() {
  34. ds, err = stmt.prepareOnConnLocked(ctx, dc) // 通过driverConn的prepare方法得到driverStmt,并将这个connStmt{dc, ds}记录到stmt.css
  35. })
  36. if err != nil {
  37. return &Stmt{stickyErr: err}
  38. }
  39. si = ds.si
  40. }
  41. parentStmt = stmt
  42. }
  43. txs := &Stmt{ // 创建Stmt实例
  44. db: tx.db,
  45. cg: tx,
  46. cgds: &driverStmt{
  47. Locker: dc,
  48. si: si,
  49. },
  50. parentStmt: parentStmt,
  51. query: stmt.query,
  52. }
  53. if parentStmt != nil {
  54. tx.db.addDep(parentStmt, txs) // parentStmt一对多Stmt,记入DB依赖记录
  55. }
  56. tx.stmts.Lock()
  57. tx.stmts.v = append(tx.stmts.v, txs) // tx记录所有属于它的Stmt
  58. tx.stmts.Unlock()
  59. return txs
  60. }

提交

1.调用接口,让数据库驱动完成commit
2.tx.closePrepared 关闭属于事务的所有Stmt
3.tx.close 释放连接并把dc、txi字段设置为nil

  1. func (tx *Tx) Commit() error {
  2. // 首先检查context,是为了保证tx.done与COMMIT操作的一致性。
  3. // 若改变次序,则可能tx.done变化后,DB结束,context也结束,事物未提交。
  4. select {
  5. default:
  6. case <-tx.ctx.Done():
  7. if atomic.LoadInt32(&tx.done) == 1 {
  8. return ErrTxDone
  9. }
  10. return tx.ctx.Err()
  11. }
  12. if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
  13. return ErrTxDone
  14. }
  15. var err error
  16. withLock(tx.dc, func() {
  17. err = tx.txi.Commit() // 调用驱动接口的Commit
  18. })
  19. if err != driver.ErrBadConn { // 如果连接没有问题,则应该关闭该连接上准备的所有Stmt
  20. tx.closePrepared()
  21. }
  22. tx.close(err)
  23. return err
  24. }

回滚

正常的回滚是不会丢弃driverConn的,若是DB关闭造成context结束从而事务context结束,就会丢弃driverConn。这是通过函数awaitDone实现的,awaitDone会一直阻塞,直到context结束,开始回滚,且discardConn为true。

  1. func (tx *Tx) rollback(discardConn bool) error {
  2. if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) { // 事物状态改成完成
  3. return ErrTxDone
  4. }
  5. var err error
  6. withLock(tx.dc, func() {
  7. err = tx.txi.Rollback() // 调用驱动接口,进行回滚
  8. })
  9. if err != driver.ErrBadConn {
  10. tx.closePrepared() // 关闭事物的所有Stmt
  11. }
  12. if discardConn { // 选择是否丢弃连接
  13. err = driver.ErrBadConn
  14. }
  15. tx.close(err) // 释放事物使用的连接
  16. return err
  17. }

�Stmt

结构体

Stmt可以安全地被并发使用

  1. type Stmt struct {
  2. db *DB
  3. query string
  4. stickyErr error // 如果不为空,所有操作将返回这个Err
  5. closemu sync.RWMutex
  6. // 如果cg==nil,Stmt会在DB中随机抢一个连接。然后检查css决定是否还有在连接上prepare
  7. cg stmtConnGrabber // 若属于一个事物,则cg!=nil
  8. cgds *driverStmt
  9. parentStmt *Stmt
  10. mu sync.Mutex // 保护下面的字段
  11. closed bool
  12. css []connStmt
  13. lastNumClosed uint64
  14. }

Stmt两种结构图:cg==nil 与 cg!=nil
SQL - 图6

�生成Stmt

生成流程图:
SQL - 图7

生成代码:
获取连接的过程的代码结构与DB查询、DB的exec、DB开始事务一致

  1. func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
  2. var ds *driverStmt
  3. var err error
  4. defer func() {
  5. release(err) // 得到Stmt后释放连接
  6. }()
  7. withLock(dc, func() {
  8. ds, err = dc.prepareLocked(ctx, cg, query) // 得到driverStmt
  9. })
  10. if err != nil {
  11. return nil, err
  12. }
  13. stmt := &Stmt{ // 创建Stmt实例
  14. db: db,
  15. query: query,
  16. cg: cg,
  17. cgds: ds,
  18. }
  19. // 当cg == nil,即该Stmt不属于某个事物。则需要记录driverStmt与对应建立他们的driverConn
  20. if cg == nil {
  21. stmt.css = []connStmt{{dc, ds}}
  22. stmt.lastNumClosed = atomic.LoadUint64(&db.numClosed)
  23. db.addDep(stmt, stmt) // stmt创建需要在DB中记录依赖
  24. }
  25. return stmt, nil
  26. }

Stmt查询

Stmt查询流程图:
SQL - 图8

查询参数可以是任何占位符。
主要步骤是:
1.得到一个dc与在该dc上建立的ds。
2.查询得到driver.Rows。
3.封装成sql.Rows。
4.添加Stmt与Rows依赖关系。
5.开启goroutine等Rows结束

Stmt获取连接及在该dc上建立的ds

如果Stmt.cg !=nil ,则通过grabConn得到dc,Stmt.cgds作为ds返回
如果Stmt.cg ==nil : 先清理Stmt.css,然后向DB随机要到一个连接,在css中查找该连接是否储存在其中,若是则避免在连接上准备,直接返回连接与其ds。不幸运地,css当中没找到这个连接,需要在连接上准备,然后在css中记录,最后返回dc与ds。

代码:

  1. func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
  2. if err = s.stickyErr; err != nil { //Stmt增删查改操作都会调用connStmt,若stickyErr != nil所有操作都返回err
  3. return
  4. }
  5. s.mu.Lock()
  6. if s.closed {
  7. s.mu.Unlock()
  8. err = errors.New("sql: statement is closed")
  9. return
  10. }
  11. // 在事物当中,只使用建立 Stmt 的这一个连接
  12. if s.cg != nil {
  13. s.mu.Unlock()
  14. dc, releaseConn, err = s.cg.grabConn(ctx) // 由Tx或Conn实现
  15. if err != nil {
  16. return
  17. }
  18. return dc, releaseConn, s.cgds, nil // s.cgds是在dc上准备的
  19. }
  20. // 当cg == nil 执行下面语句
  21. s.removeClosedStmtLocked() // 移除Stmt.css中driverConn关闭的connStmt
  22. s.mu.Unlock()
  23. dc, err = s.db.conn(ctx, strategy) // 向db要连接
  24. if err != nil {
  25. return nil, nil, nil, err
  26. }
  27. s.mu.Lock()
  28. for _, v := range s.css {
  29. if v.dc == dc { // 幸运情况,如果向DB随机要的dc恰好存储在css中,避免再准备
  30. s.mu.Unlock()
  31. return dc, dc.releaseConn, v.ds, nil
  32. }
  33. }
  34. s.mu.Unlock()
  35. // 不幸运情况,需要在连接上准备ds。会在当前Stmt.css中记录一个connStmt
  36. withLock(dc, func() {
  37. ds, err = s.prepareOnConnLocked(ctx, dc)
  38. })
  39. if err != nil {
  40. dc.releaseConn(err) // 将连接返回给DB
  41. return nil, nil, nil, err
  42. }
  43. return dc, dc.releaseConn, ds, nil
  44. }

查询代码:

  1. func (s *Stmt) QueryContext(ctx context.Context, args ...interface{}) (*Rows, error) {
  2. s.closemu.RLock()
  3. defer s.closemu.RUnlock()
  4. var rowsi driver.Rows
  5. strategy := cachedOrNewConn
  6. for i := 0; i < maxBadConnRetries+1; i++ {
  7. if i == maxBadConnRetries { // 若到了最后一次,策略使用alwaysNewConn
  8. strategy = alwaysNewConn
  9. }
  10. dc, releaseConn, ds, err := s.connStmt(ctx, strategy) // 得到ds与对应dc
  11. if err != nil {
  12. if err == driver.ErrBadConn {
  13. continue
  14. }
  15. return nil, err
  16. }
  17. rowsi, err = rowsiFromStatement(ctx, dc.ci, ds, args...) // 通过driverStmt得到driver.Rows
  18. if err == nil {
  19. rows := &Rows{ // 把连接传给Rows
  20. dc: dc,
  21. rowsi: rowsi,
  22. }
  23. // 添加依赖要在initContextClose前面,否则可能在添加之前移除
  24. s.db.addDep(s, rows)
  25. // 释放连接的设定也要在initContextClose前面,否则可能在设定好之前调用它
  26. rows.releaseConn = func(err error) { // 释放连接,同时删除Stmt与Rows依赖
  27. releaseConn(err)
  28. s.db.removeDep(s, rows)
  29. }
  30. var txctx context.Context
  31. if s.cg != nil { // 若属于一个事物,则txctx!=nil
  32. txctx = s.cg.txCtx()
  33. }
  34. rows.initContextClose(ctx, txctx)
  35. return rows, nil
  36. }
  37. releaseConn(err)
  38. if err != driver.ErrBadConn {
  39. return nil, err
  40. }
  41. }
  42. return nil, driver.ErrBadConn
  43. }

Stmt的exec过程与查询过程类似,只是返回的不是Rows而是Result。