使用方法:

在Vscode中,会有代码提示,很容易看懂

  1. import mysqldb
  2. db = mysqldb.connect_db(mysqldb.conn_confing)
  3. # 执行sql,会有提示
  4. db.query("update table1 set col='' ")
  5. # 获取多个结果,返回 dict
  6. rows = db.fetch_rows(table='table')

源代码 mysqldb.py

  1. from pymysql import (connect, cursors, err, escape_sequence)
  2. conn_confing = {
  3. 'host': 'localhost',
  4. 'port': '3306',
  5. 'user': 'root',
  6. 'password': 'root',
  7. 'db': 'your_db',
  8. 'charset': 'utf-8'
  9. }
  10. def connect_db(conn_confing):
  11. # msyql dababase connection info
  12. dbconn = MYSQL(
  13. dbhost=conn_confing.get('host'),
  14. dbport=conn_confing.get('port'),
  15. dbuser=conn_confing.get('user'),
  16. dbpwd=conn_confing.get('password'),
  17. dbname=conn_confing.get('db'),
  18. dbcharset=conn_confing.get('charset')
  19. )
  20. return dbconn
  21. def connect_ssdc(conn_confing):
  22. """Connect to the database return SSDictCursor dbsession"""
  23. connection = connect(
  24. host=conn_confing.get('host'),
  25. port=int(conn_confing.get('port')) or 3306,
  26. user=conn_confing.get('user'),
  27. password=conn_confing.get('password'),
  28. db=conn_confing.get('db'),
  29. charset=conn_confing.get('charset'),
  30. cursorclass=cursors.SSDictCursor)
  31. return connection
  32. class MYSQL:
  33. """A Friendly pymysql Class, Provide CRUD functionality"""
  34. def __init__(self, dbhost, dbuser, dbpwd, dbname, dbcharset='utf-8', dbport=3306,):
  35. self.dbhost = dbhost
  36. self.dbport = int(dbport)
  37. self.dbuser = dbuser
  38. self.dbpwd = dbpwd
  39. self.dbname = dbname
  40. self.dbcharset = dbcharset
  41. self.connection = self.session()
  42. def session(self):
  43. """Connect to the database return dbsession"""
  44. connection = connect(
  45. host=self.dbhost,
  46. port=self.dbport,
  47. user=self.dbuser,
  48. password=self.dbpwd,
  49. db=self.dbname,
  50. # charset=self.dbcharset,
  51. cursorclass=cursors.DictCursor)
  52. return connection
  53. def insert(self, table, data):
  54. """mysql insert() function"""
  55. with self.connection.cursor() as cursor:
  56. params = self.join_field_value(data)
  57. sql = "INSERT IGNORE INTO {table} SET {params}".format(
  58. table=table, params=params)
  59. # print(sql)
  60. try:
  61. cursor.execute(sql, tuple(data.values()))
  62. last_id = cursor.lastrowid
  63. self.connection.commit()
  64. except Exception as err:
  65. print(err)
  66. return last_id
  67. def bulk_insert(self, table, data):
  68. assert isinstance(data, list) and data != [], "data_format_error"
  69. with self.connection.cursor() as cursor:
  70. params = []
  71. for param in data:
  72. params.append(escape_sequence(param.values(), 'utf-8'))
  73. values = ', '.join(params)
  74. fields = ', '.join('`{}`'.format(x) for x in param.keys())
  75. sql = u"INSERT IGNORE INTO {table} ({fields}) VALUES {values}".format(
  76. fields=fields, table=table, values=values)
  77. cursor.execute(sql)
  78. last_id = cursor.lastrowid
  79. self.connection.commit()
  80. return last_id
  81. def delete(self, table, condition=None, limit=None):
  82. """
  83. mysql delete() function
  84. sql.PreparedStatement method
  85. """
  86. with self.connection.cursor() as cursor:
  87. prepared = []
  88. if not condition:
  89. where = '1'
  90. elif isinstance(condition, dict):
  91. where = self.join_field_value(condition, ' AND ')
  92. prepared.extend(condition.values())
  93. else:
  94. where = condition
  95. limits = "LIMIT {limit}".format(limit=limit) if limit else ""
  96. sql = "DELETE FROM {table} WHERE {where} {limits}".format(
  97. table=table, where=where, limits=limits)
  98. if not prepared:
  99. result = cursor.execute(sql)
  100. else:
  101. result = cursor.execute(sql, tuple(prepared))
  102. self.connection.commit()
  103. return result
  104. def update(self, table, data, condition=None):
  105. """
  106. mysql update() function
  107. Use sql.PreparedStatement method
  108. """
  109. with self.connection.cursor() as cursor:
  110. prepared = []
  111. params = self.join_field_value(data)
  112. prepared.extend(data.values())
  113. if not condition:
  114. where = '1'
  115. elif isinstance(condition, dict):
  116. where = self.join_field_value(condition, ' AND ')
  117. prepared.extend(condition.values())
  118. else:
  119. where = condition
  120. sql = "UPDATE IGNORE {table} SET {params} WHERE {where}".format(
  121. table=table, params=params, where=where)
  122. # check PreparedStatement
  123. if not prepared:
  124. result = cursor.execute(sql)
  125. else:
  126. result = cursor.execute(sql, tuple(prepared))
  127. self.connection.commit()
  128. return result
  129. def count(self, table, condition=None):
  130. """
  131. count database record
  132. Use sql.PreparedStatement method
  133. """
  134. with self.connection.cursor() as cursor:
  135. prepared = []
  136. if not condition:
  137. where = '1'
  138. elif isinstance(condition, dict):
  139. where = self.join_field_value(condition, ' AND ')
  140. prepared.extend(condition.values())
  141. else:
  142. where = condition
  143. sql = "SELECT COUNT(*) as cnt FROM {table} WHERE {where}".format(
  144. table=table, where=where)
  145. if not prepared:
  146. cursor.execute(sql)
  147. else:
  148. cursor.execute(sql, tuple(prepared))
  149. self.connection.commit()
  150. return cursor.fetchone().get('cnt')
  151. def fetch_rows(self, table, fields=None, condition=None, order=None, limit=None, fetchone=False):
  152. """
  153. mysql select() function
  154. Use sql.PreparedStatement method
  155. """
  156. with self.connection.cursor() as cursor:
  157. prepared = []
  158. if not fields:
  159. fields = '*'
  160. elif isinstance(fields, tuple) or isinstance(fields, list):
  161. fields = '`{0}`'.format('`, `'.join(fields))
  162. else:
  163. fields = fields
  164. if not condition:
  165. where = '1'
  166. elif isinstance(condition, dict):
  167. where = self.join_field_value(condition, ' AND ')
  168. prepared.extend(condition.values())
  169. else:
  170. where = condition
  171. if not order:
  172. orderby = ''
  173. else:
  174. orderby = 'ORDER BY {order}'.format(order=order)
  175. limits = "LIMIT {limit}".format(limit=limit) if limit else ""
  176. sql = "SELECT {fields} FROM {table} WHERE {where} {orderby} {limits}".format(
  177. fields=fields, table=table, where=where, orderby=orderby, limits=limits)
  178. if not prepared:
  179. cursor.execute(sql)
  180. else:
  181. cursor.execute(sql, tuple(prepared))
  182. self.connection.commit()
  183. return cursor.fetchone() if fetchone else cursor.fetchall()
  184. def query(self, sql, fetchone=False, execute=False):
  185. """execute custom sql query"""
  186. with self.connection.cursor() as cursor:
  187. cursor.execute(sql)
  188. self.connection.commit()
  189. if execute:
  190. return
  191. return cursor.fetchone() if fetchone else cursor.fetchall()
  192. def join_field_value(self, data, glue=', '):
  193. sql = comma = ''
  194. for key in data.keys():
  195. sql += "{}`{}` = %s".format(comma, key)
  196. comma = glue
  197. return sql
  198. def close(self):
  199. if getattr(self, 'connection', 0):
  200. return self.connection.close()
  201. def __del__(self):
  202. """close mysql database connection"""
  203. self.close()