驱动
全局变量
package sql
var drivers = make(map[string]driver.Driver)�
驱动的接口
数据库驱动必须实现这个接口,Driver可以通过name得到一个数据库的连接(实现Conn接口)。通过这个Conn可以对数据库进行各种操作。name的格式由使用的驱动种类决定。
type Driver interface {
Open(name string) (Conn, error)
}
DB
概览
结构体
type DB struct {
connector driver.Connector// 用于获取driver.Conn 可以由驱动层实现,否则用sql.dsnConnector
numClosed uint64 // 是一个原子计数器,代表总的关闭连接数量
mu sync.Mutex
freeConn []*driverConn //空闲连接池
connRequests map[uint64]chan connRequest // 无可用连接时,处于 Pending 状态的连接请求
nextRequest uint64
numOpen int // 打开和准备打开的连接总数
openerCh chan struct{} // 用来传信号的管道 表示需要多少新连接
resetterCh chan *driverConn // 用来传需要重置 Session 的 driverConn
closed bool
dep map[finalCloser]depSet // 依赖记录
lastPut map[*driverConn]string
maxIdle int
maxOpen int
maxLifetime time.Duration // 连接的生命后期
cleanerCh chan struct{} // 传信号 表示需要清理freeConn空闲池中已经关掉的driverConn
stop func()
}
连接数据库
连接数据库需要connector,如果数据库驱动实现了DriverContext接口则可以通过OpenConnector方法获得一个connector(数据库驱动自定义)。如果没有,那么sql包提供了一个实现此接口的dsnConnector结构体。
使用dsnConnector每获取一个Conn则驱动就要解析一次name,并无权使用context。
使用数据库驱动自定义的connector则驱动只解析一次name,并有权使用context。
func Open(driverName, dataSourceName string) (*DB, error) {
driversMu.RLock()
driveri, ok := drivers[driverName]
driversMu.RUnlock()
if !ok {
return nil, fmt.Errorf("sql: unknown driver %q (forgotten import?)", driverName)
}
if driverCtx, ok := driveri.(driver.DriverContext); ok {
connector, err := driverCtx.OpenConnector(dataSourceName)
if err != nil {
return nil, err
}
return OpenDB(connector), nil
}
return OpenDB(dsnConnector{dsn: dataSourceName, driver: driveri}), nil
}
driverContext定义:
type DriverContext interface {
OpenConnector(name string) (Connector, error)
}
执行部分:
func OpenDB(c driver.Connector) *DB {
ctx, cancel := context.WithCancel(context.Background())
db := &DB{
connector: c,
openerCh: make(chan struct{}, connectionRequestQueueSize),
resetterCh: make(chan *driverConn, 50),
lastPut: make(map[*driverConn]string),
connRequests: make(map[uint64]chan connRequest),
stop: cancel,
}
go db.connectionOpener(ctx) // 新建连接的goroutine,db.openerCh传来东西就为DB创建一个连接
go db.connectionResetter(ctx) // 重置session的goroutine
return db
}
DB连接管理
如果DB刚刚打开,此时没有连接创建,需要时调用DB.conn函数开始创建连接。任何时候若有连接创建失败,或者连接关闭,会自动调用maybeOpenNewConnections,发送尽可能多的新建信号,收到后openNewConnection就会新建连接直到DB连接数量达到饱和。建立的连接会被putConnDBLocked拿去优先满足connRequest请求,其次放入freeConn。
监听openerCh管道发来的新建连接信号
DB打开时就为此函数开始一个goroutine,当openerCh传来一个struct{}{},就新建一个连接。任何时候连接创建失败、连接关闭,就会调用openNewConnection,然后connectionOpener才会收到信号。
func (db *DB) connectionOpener(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-db.openerCh:
db.openNewConnection(ctx)
}
}
}
新建一个新连接
func (db *DB) openNewConnection(ctx context.Context) {
ci, err := db.connector.Connect(ctx) // 建立一个driver.Conn
db.mu.Lock()
defer db.mu.Unlock()
if db.closed {
if err == nil { // DB已关闭,但driver.Conn建立成功
ci.Close()
}
db.numOpen-- // maybeOpenNewConnctions往db.openerCh发送信号前已经执行db.numOpen++,此处减回去
return
}
if err != nil { // driver.Conn建立失败
db.numOpen--
db.putConnDBLocked(nil, err)
db.maybeOpenNewConnections() // 发送尽可能多的新建连接信号
return
}
dc := &driverConn{ // 创建driverConn实例
db: db,
createdAt: nowFunc(),
ci: ci,
}
if db.putConnDBLocked(dc, err) { // 把driverConn放入DB中
db.addDepLocked(dc, dc) // 添加driverConn依赖记录
} else {
db.numOpen--
ci.Close()
}
}
把连接放入DB
优先用来满足一个connRequest,若无请求,则放入空闲池
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
if db.maxOpen > 0 && db.numOpen > db.maxOpen { // 连接已达到上限
return false
}
if c := len(db.connRequests); c > 0 { // 即使dc==nil也传进connRequest,因为后面管道接收时会判断
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests { // 取一个connRequest管道
break
}
delete(db.connRequests, reqKey) // 删除该连接请求
if err == nil {
dc.inUse = true
}
req <- connRequest{ // 满足这个connRequest管道的请求
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed && db.maxIdleConnsLocked() > len(db.freeConn) { //dc==nil不能放入freeConn
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked() // 只会开启一次,开启后会直到DB.closed||DB.numOpen==0才关闭
return true
}
return false
}
发送尽可能多的新建连接的信号
func (db *DB) maybeOpenNewConnections() {
numRequests := len(db.connRequests)
if db.maxOpen > 0 {
numCanOpen := db.maxOpen - db.numOpen
if numRequests > numCanOpen {
numRequests = numCanOpen // 剩余多少空间,就发送多少信号
}
}
for numRequests > 0 {
db.numOpen++ // 乐观地加1,后面若创建失败会减去
numRequests--
if db.closed {
return
}
db.openerCh <- struct{}{} // 发送信号
}
}
从DB获取连接(driverConn)
如果可能,优先选择空闲池的连接。
如果空闲池空了,且打开连接达到上限,那么创建一个connRequest管道加入等待map,当有连接放回DB会封装成connRequest传入管道。
如果空闲池空了,连接没达到上限,使用DB.connector创建一个连接(driver.Conn),封装成driverConn,并返回。
代码:
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// 检查context是否过期
select {
default:
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime
// 如果可能,优先选择空闲池的连接
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0] // 取第一个连接
copy(db.freeConn, db.freeConn[1:]) // 从空闲池删除此连接
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true // 重置inUse
db.mu.Unlock()
if conn.expired(lifetime) { // 连接过期则关闭该连接,并返回一个ErrBadConn
conn.Close()
return nil, driver.ErrBadConn
}
// 锁会使检查lastErr操作等到resetSession执行完后再执行,lastErr记录resetSession产生的错误
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
// 如果空闲池空了,且打开连接达到限制
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
req := make(chan connRequest, 1) // 初始化一个管道
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req // 加入connRequests连接请求
db.mu.Unlock()
select {
case <-ctx.Done(): //context作为定时器
db.mu.Lock()
delete(db.connRequests, reqKey) // 移除连接请求管道
db.mu.Unlock()
select {
default:
case ret, ok := <-req: // 连接请求传来connRequest
if ok {
db.putConn(ret.conn, ret.err, false) // 放回空闲池
}
}
return nil, ctx.Err()
case ret, ok := <-req: // 满足连接请求
if !ok {
return nil, errDBClosed
}
if ret.err == nil && ret.conn.expired(lifetime) { // 检查连接是否过期
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil { // 可能传来conn==nil
return nil, ret.err
}
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
}
db.numOpen++ // 乐观地
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // 纠正之前的乐观
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{ // 创建driverConn实例
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc) // 添加driverConn与自身的依赖
db.mu.Unlock()
return dc, nil
}
依赖
DB.dep map[finalCloser]depSet
driverConn与自身依赖
不属于事物的Stmt与自身依赖
不属于事物的Stmt与子Stmt,具有依赖关系
Stmt与查询得到的Rows,具有依赖关系
finalCloser被interface{}依赖,直到fianlCloser的interface{}全部移除,fianlCloser才自动调用finalClose方法最终关闭。
DB查询
策略
按不同的策略进行多次查询,直到成功
func (db *DB) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
var rows *Rows
var err error
for i := 0; i < maxBadConnRetries; i++ { // 前maxBadConnRetries次使用cachedOrNewConn策略
rows, err = db.query(ctx, query, args, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn { // 循环一直返回ErrBadConn,改用alwaysNewConn策略
return db.query(ctx, query, args, alwaysNewConn)
}
return rows, err
}
先从DB获取连接
func (db *DB) query(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (*Rows, error) {
dc, err := db.conn(ctx, strategy) // 获取一个连接
if err != nil {
return nil, err
}
return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args) // 使用连接查询
}
执行代码
func (db *DB) queryDC(ctx, txctx context.Context, dc *driverConn, releaseConn func(error), query string, args []interface{}) (*Rows, error) {
queryerCtx, ok := dc.ci.(driver.QueryerContext) // ci是否有QueryerContext方法
var queryer driver.Queryer
if !ok {
queryer, ok = dc.ci.(driver.Queryer) // ci是否有Query方法
}
if ok {
var nvdargs []driver.NamedValue
var rowsi driver.Rows
var err error
withLock(dc, func() {
nvdargs, err = driverArgsConnLocked(dc.ci, nil, args) // 参数转换
if err != nil {
return
}
rowsi, err = ctxDriverQuery(ctx, queryerCtx, queryer, query, nvdargs) //查询,得到driver.Rows
})
if err != driver.ErrSkip {
if err != nil {
releaseConn(err)
return nil, err
}
rows := &Rows{ // 创建sql.Rows实例
dc: dc,
releaseConn: releaseConn,
rowsi: rowsi,
}
rows.initContextClose(ctx, txctx) // 等到DB结束或事物结束,Rows才关闭
return rows, nil
}
}
var si driver.Stmt
var err error
withLock(dc, func() { // 调用driver实现的prepare方法,得到driver.Stmt
si, err = ctxDriverPrepare(ctx, dc.ci, query)
})
if err != nil {
releaseConn(err)
return nil, err
}
ds := &driverStmt{Locker: dc, si: si} // 组合driverConn与driver.Stmt得到driverStmt实例
rowsi, err := rowsiFromStatement(ctx, dc.ci, ds, args...) // 得到driver.Rows
if err != nil {
ds.Close()
releaseConn(err)
return nil, err
}
rows := &Rows{ // driver.Conn传递给了sql.Rows,使用releaseConn释放连接
dc: dc,
releaseConn: releaseConn,
rowsi: rowsi,
closeStmt: ds,
}
rows.initContextClose(ctx, txctx)
return rows, nil
}
DB的exec
策略
func (db *DB) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) {
var res Result
var err error
for i := 0; i < maxBadConnRetries; i++ {
res, err = db.exec(ctx, query, args, cachedOrNewConn) // 与QueryContext代码结构相同,使用exec
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
return db.exec(ctx, query, args, alwaysNewConn)
}
return res, err
}
先从DB获取连接
func (db *DB) exec(ctx context.Context, query string, args []interface{}, strategy connReuseStrategy) (Result, error) {
dc, err := db.conn(ctx, strategy) // 获得连接
if err != nil {
return nil, err
}
return db.execDC(ctx, dc, dc.releaseConn, query, args) // 使用连接进行exec
}
执行代码
func (db *DB) execDC(ctx context.Context, dc *driverConn, release func(error), query string, args []interface{}) (res Result, err error) {
defer func() { // 此处与queryDC不同,queryDC的连接传给Rows,由Rows释放。
release(err)
}()
execerCtx, ok := dc.ci.(driver.ExecerContext)
var execer driver.Execer
if !ok {
execer, ok = dc.ci.(driver.Execer)
}
if ok {
var nvdargs []driver.NamedValue
var resi driver.Result
withLock(dc, func() {
nvdargs, err = driverArgsConnLocked(dc.ci, nil, args)
if err != nil {
return
}
resi, err = ctxDriverExec(ctx, execerCtx, execer, query, nvdargs)
})
if err != driver.ErrSkip {
if err != nil {
return nil, err
}
return driverResult{dc, resi}, nil
}
}
var si driver.Stmt
withLock(dc, func() {
si, err = ctxDriverPrepare(ctx, dc.ci, query)
})
if err != nil {
return nil, err
}
ds := &driverStmt{Locker: dc, si: si}
defer ds.Close() // 此处也与queryDC不同,查询时driverStmt等Rows关闭再关闭
return resultFromStatement(ctx, dc.ci, ds, args...)
}
事务
结构定义
type Tx struct {
db *DB
closemu sync.RWMutex // 关闭事物加互斥锁,使用连接进行查询等操作加读锁。防止操作过程中事物被关掉
dc *driverConn // 由该事物独享,提交或回滚时才使用putConn返还给DB
txi driver.Tx
releaseConn func(error)
// 提交或者回滚时从0->1,所有查询等操作将失败并返回ErrTxDone。使用原子操作检查该值
done int32
// 记录该事物上准备的sql.Stmt,提交或回滚时会关闭全部
stmts struct {
sync.Mutex
v []*Stmt
}
cancel func()
ctx context.Context
}
结构图:
开始事务
流程图:
与前面 DB 查询、DB 的 exec 的代码结构相同:策略选择,从 DB 得到一个连接,然后执行代码。
执行代码:
func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
var txi driver.Tx
withLock(dc, func() {
txi, err = ctxDriverBegin(ctx, opts, dc.ci) // 获取driver.Tx
})
if err != nil {
release(err) // driver.Tx获取失败,释放连接
return nil, err
}
ctx, cancel := context.WithCancel(ctx) // 为该事物分配一个子context
tx = &Tx{ // 创建事物(sql.Tx)实例
db: db,
dc: dc,
releaseConn: release,
txi: txi,
cancel: cancel,
ctx: ctx,
}
go tx.awaitDone() // 等待事物结束
return tx, nil
}
事务查询
- 取事务唯一的连接
执行代码:
func (tx *Tx) grabConn(ctx context.Context) (*driverConn, releaseConn, error) {
select {
default:
case <-ctx.Done():
return nil, nil, ctx.Err()
}
//使用这个连接过程中加锁,是为了防止查询或者exec过程中事物关闭
tx.closemu.RLock()
if tx.isDone() {
tx.closemu.RUnlock()
return nil, nil, ErrTxDone
}
if hookTxGrabConn != nil { // test hook
hookTxGrabConn()
}
return tx.dc, tx.closemuRUnlockRelease, nil // 返回的releaseConn不是把连接放回DB,而是解开锁,保证操作执行完事物才关闭
}
- 用此连接,调用 DB 的 queryDC 方法。
执行代码:
func (tx *Tx) QueryContext(ctx context.Context, query string, args ...interface{}) (*Rows, error) {
dc, release, err := tx.grabConn(ctx) // 事物唯一的连接
if err != nil {
return nil, err
}
return tx.db.queryDC(ctx, tx.ctx, dc, release, query, args) // 使用这个连接进行查询
}
事务继承Stmt
事务可以继承已经存在的 Stmt(该Stmt不属于事务或 sql.Conn),得到一个子 Stmt。好处是子 Stmt 的 query 内容与父 Stmt 的相同。
// 为该事物创建一个Stmt,且此Stmt继承自一个已经存在的Stmt,两者的query字段相同
func (tx *Tx) StmtContext(ctx context.Context, stmt *Stmt) *Stmt {
dc, release, err := tx.grabConn(ctx)
if err != nil {
return &Stmt{stickyErr: err}
}
defer release(nil) // 返回后解开连接的读锁
if tx.db != stmt.db {
return &Stmt{stickyErr: errors.New("sql: Tx.Stmt: statement from different database used")}
}
var si driver.Stmt
var parentStmt *Stmt
stmt.mu.Lock()
if stmt.closed || stmt.cg != nil { // Stmt已经关闭或者属于某个事物,需要重新准备driver.Stmt,也没有父子Stmt的关系
stmt.mu.Unlock()
withLock(dc, func() {
si, err = ctxDriverPrepare(ctx, dc.ci, stmt.query)
})
if err != nil {
return &Stmt{stickyErr: err}
}
} else {
stmt.removeClosedStmtLocked() // 移除该Stmt.css中driverConn已关闭的记录
for _, v := range stmt.css { // 找一下有没有在这个driverConn上准备的driverStmt记录
if v.dc == dc {
si = v.ds.si
break
}
}
stmt.mu.Unlock()
if si == nil { // 没找到在这个driverConn上已经准备好的driverStmt
var ds *driverStmt
withLock(dc, func() {
ds, err = stmt.prepareOnConnLocked(ctx, dc) // 通过driverConn的prepare方法得到driverStmt,并将这个connStmt{dc, ds}记录到stmt.css
})
if err != nil {
return &Stmt{stickyErr: err}
}
si = ds.si
}
parentStmt = stmt
}
txs := &Stmt{ // 创建Stmt实例
db: tx.db,
cg: tx,
cgds: &driverStmt{
Locker: dc,
si: si,
},
parentStmt: parentStmt,
query: stmt.query,
}
if parentStmt != nil {
tx.db.addDep(parentStmt, txs) // parentStmt一对多Stmt,记入DB依赖记录
}
tx.stmts.Lock()
tx.stmts.v = append(tx.stmts.v, txs) // tx记录所有属于它的Stmt
tx.stmts.Unlock()
return txs
}
提交
1.调用接口,让数据库驱动完成commit
2.tx.closePrepared 关闭属于事务的所有Stmt
3.tx.close 释放连接并把dc、txi字段设置为nil
func (tx *Tx) Commit() error {
// 首先检查context,是为了保证tx.done与COMMIT操作的一致性。
// 若改变次序,则可能tx.done变化后,DB结束,context也结束,事物未提交。
select {
default:
case <-tx.ctx.Done():
if atomic.LoadInt32(&tx.done) == 1 {
return ErrTxDone
}
return tx.ctx.Err()
}
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) {
return ErrTxDone
}
var err error
withLock(tx.dc, func() {
err = tx.txi.Commit() // 调用驱动接口的Commit
})
if err != driver.ErrBadConn { // 如果连接没有问题,则应该关闭该连接上准备的所有Stmt
tx.closePrepared()
}
tx.close(err)
return err
}
回滚
正常的回滚是不会丢弃driverConn的,若是DB关闭造成context结束从而事务context结束,就会丢弃driverConn。这是通过函数awaitDone实现的,awaitDone会一直阻塞,直到context结束,开始回滚,且discardConn为true。
func (tx *Tx) rollback(discardConn bool) error {
if !atomic.CompareAndSwapInt32(&tx.done, 0, 1) { // 事物状态改成完成
return ErrTxDone
}
var err error
withLock(tx.dc, func() {
err = tx.txi.Rollback() // 调用驱动接口,进行回滚
})
if err != driver.ErrBadConn {
tx.closePrepared() // 关闭事物的所有Stmt
}
if discardConn { // 选择是否丢弃连接
err = driver.ErrBadConn
}
tx.close(err) // 释放事物使用的连接
return err
}
�Stmt
结构体
Stmt可以安全地被并发使用
type Stmt struct {
db *DB
query string
stickyErr error // 如果不为空,所有操作将返回这个Err
closemu sync.RWMutex
// 如果cg==nil,Stmt会在DB中随机抢一个连接。然后检查css决定是否还有在连接上prepare
cg stmtConnGrabber // 若属于一个事物,则cg!=nil
cgds *driverStmt
parentStmt *Stmt
mu sync.Mutex // 保护下面的字段
closed bool
css []connStmt
lastNumClosed uint64
}
Stmt两种结构图:cg==nil 与 cg!=nil
�生成Stmt
生成流程图:
生成代码:
获取连接的过程的代码结构与DB查询、DB的exec、DB开始事务一致
func (db *DB) prepareDC(ctx context.Context, dc *driverConn, release func(error), cg stmtConnGrabber, query string) (*Stmt, error) {
var ds *driverStmt
var err error
defer func() {
release(err) // 得到Stmt后释放连接
}()
withLock(dc, func() {
ds, err = dc.prepareLocked(ctx, cg, query) // 得到driverStmt
})
if err != nil {
return nil, err
}
stmt := &Stmt{ // 创建Stmt实例
db: db,
query: query,
cg: cg,
cgds: ds,
}
// 当cg == nil,即该Stmt不属于某个事物。则需要记录driverStmt与对应建立他们的driverConn
if cg == nil {
stmt.css = []connStmt{{dc, ds}}
stmt.lastNumClosed = atomic.LoadUint64(&db.numClosed)
db.addDep(stmt, stmt) // stmt创建需要在DB中记录依赖
}
return stmt, nil
}
Stmt查询
Stmt查询流程图:
查询参数可以是任何占位符。
主要步骤是:
1.得到一个dc与在该dc上建立的ds。
2.查询得到driver.Rows。
3.封装成sql.Rows。
4.添加Stmt与Rows依赖关系。
5.开启goroutine等Rows结束
Stmt获取连接及在该dc上建立的ds
如果Stmt.cg !=nil ,则通过grabConn得到dc,Stmt.cgds作为ds返回
如果Stmt.cg ==nil : 先清理Stmt.css,然后向DB随机要到一个连接,在css中查找该连接是否储存在其中,若是则避免在连接上准备,直接返回连接与其ds。不幸运地,css当中没找到这个连接,需要在连接上准备,然后在css中记录,最后返回dc与ds。
代码:
func (s *Stmt) connStmt(ctx context.Context, strategy connReuseStrategy) (dc *driverConn, releaseConn func(error), ds *driverStmt, err error) {
if err = s.stickyErr; err != nil { //Stmt增删查改操作都会调用connStmt,若stickyErr != nil所有操作都返回err
return
}
s.mu.Lock()
if s.closed {
s.mu.Unlock()
err = errors.New("sql: statement is closed")
return
}
// 在事物当中,只使用建立 Stmt 的这一个连接
if s.cg != nil {
s.mu.Unlock()
dc, releaseConn, err = s.cg.grabConn(ctx) // 由Tx或Conn实现
if err != nil {
return
}
return dc, releaseConn, s.cgds, nil // s.cgds是在dc上准备的
}
// 当cg == nil 执行下面语句
s.removeClosedStmtLocked() // 移除Stmt.css中driverConn关闭的connStmt
s.mu.Unlock()
dc, err = s.db.conn(ctx, strategy) // 向db要连接
if err != nil {
return nil, nil, nil, err
}
s.mu.Lock()
for _, v := range s.css {
if v.dc == dc { // 幸运情况,如果向DB随机要的dc恰好存储在css中,避免再准备
s.mu.Unlock()
return dc, dc.releaseConn, v.ds, nil
}
}
s.mu.Unlock()
// 不幸运情况,需要在连接上准备ds。会在当前Stmt.css中记录一个connStmt
withLock(dc, func() {
ds, err = s.prepareOnConnLocked(ctx, dc)
})
if err != nil {
dc.releaseConn(err) // 将连接返回给DB
return nil, nil, nil, err
}
return dc, dc.releaseConn, ds, nil
}
查询代码:
func (s *Stmt) QueryContext(ctx context.Context, args ...interface{}) (*Rows, error) {
s.closemu.RLock()
defer s.closemu.RUnlock()
var rowsi driver.Rows
strategy := cachedOrNewConn
for i := 0; i < maxBadConnRetries+1; i++ {
if i == maxBadConnRetries { // 若到了最后一次,策略使用alwaysNewConn
strategy = alwaysNewConn
}
dc, releaseConn, ds, err := s.connStmt(ctx, strategy) // 得到ds与对应dc
if err != nil {
if err == driver.ErrBadConn {
continue
}
return nil, err
}
rowsi, err = rowsiFromStatement(ctx, dc.ci, ds, args...) // 通过driverStmt得到driver.Rows
if err == nil {
rows := &Rows{ // 把连接传给Rows
dc: dc,
rowsi: rowsi,
}
// 添加依赖要在initContextClose前面,否则可能在添加之前移除
s.db.addDep(s, rows)
// 释放连接的设定也要在initContextClose前面,否则可能在设定好之前调用它
rows.releaseConn = func(err error) { // 释放连接,同时删除Stmt与Rows依赖
releaseConn(err)
s.db.removeDep(s, rows)
}
var txctx context.Context
if s.cg != nil { // 若属于一个事物,则txctx!=nil
txctx = s.cg.txCtx()
}
rows.initContextClose(ctx, txctx)
return rows, nil
}
releaseConn(err)
if err != driver.ErrBadConn {
return nil, err
}
}
return nil, driver.ErrBadConn
}
Stmt的exec过程与查询过程类似,只是返回的不是Rows而是Result。