官网总教程
所有代码基于sqlalchemy,官网有极其详细的英文教程
创建表
sqlalchemy官网教程:Column的写法
sqlalchemy官网教程:用ORM方式创建表
from sqlalchemy import create_enginefrom sqlalchemy import Column, Integer,String, DateTimefrom sqlalchemy.orm import declarative_baseBase = declarative_base()class Test1(Base):__tablename__ = "test1"id = Column(Integer, primary_key=True)name = Column(String, nullable=False)lastuse = Column("上次使用", DateTime)hash = Column(String(16), nullable=False)engine = create_engine("sqlite:///aa.sqlite3", echo=True)Test1.metadata.create_all(engine)
写入新记录
python官网:日期时间的模式字符串
pandas官网:sql相关快捷函数
pandas函数to_sql
pandas函数to_dict
sqlalchemy官网:批量insert
from sqlalchemy import select, update, insert# 注意这里的日期格式是sqlite标准日期格式,时分秒后面可以再加微秒,如12:31:54.800689。微秒的模式符为%fdata_init = pd.DataFrame({"id": [4, 5, 6],"name": ["name1", "name2", "name3"],"上次使用": pd.to_datetime(["2022-02-03 15:00:00", "2022-02-01 16:00:00", "2022-01-03 09:00:23"],format="%Y-%m-%d %H:%M:%S",),})#========data_init.to_sql("test1", engine, if_exists="append", index=False)# 或者with engine.connect() as cursor:cursor.execute(insert(Test1).values(data_init.to_dict("split")["data"]))
更新记录
注意需要先确保这些记录的ID在数据库中已存在。
from sqlalchemy import select, update, insertdf = df.to_dict("records")with engine.connect() as cursor:for line in df:cursor.execute(update(Test1).where(Test1.id == line["id"]).values(line))
删除记录
from sqlalchemy import select, update, insertIDs = df["id"].to_list()with engine.connect() as cursor:for item in IDs:cursor.execute(delete(Test1).where(Test1.id == item))
读取数据
pandas官网:read_sql_table
pandas官网:read_sql_query
通过engine的输出可以看出read_sql_query()确实只调用了一句SELECT ... FROM ...,而read_sql_table()进行了更多的操作。因此从性能的角度优选read_sql_query()。
df = pd.read_sql_table("test1", engine, columns=["id", "hash"])df = pd.read_sql_query(select(Test1.id,Test1.hash),engine)
