官网总教程
所有代码基于sqlalchemy,官网有极其详细的英文教程
创建表
sqlalchemy官网教程:Column的写法
sqlalchemy官网教程:用ORM方式创建表
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer,String, DateTime
from sqlalchemy.orm import declarative_base
Base = declarative_base()
class Test1(Base):
__tablename__ = "test1"
id = Column(Integer, primary_key=True)
name = Column(String, nullable=False)
lastuse = Column("上次使用", DateTime)
hash = Column(String(16), nullable=False)
engine = create_engine("sqlite:///aa.sqlite3", echo=True)
Test1.metadata.create_all(engine)
写入新记录
python官网:日期时间的模式字符串
pandas官网:sql相关快捷函数
pandas函数to_sql
pandas函数to_dict
sqlalchemy官网:批量insert
from sqlalchemy import select, update, insert
# 注意这里的日期格式是sqlite标准日期格式,时分秒后面可以再加微秒,如12:31:54.800689。微秒的模式符为%f
data_init = pd.DataFrame(
{
"id": [4, 5, 6],
"name": ["name1", "name2", "name3"],
"上次使用": pd.to_datetime(
["2022-02-03 15:00:00", "2022-02-01 16:00:00", "2022-01-03 09:00:23"],
format="%Y-%m-%d %H:%M:%S",
),
}
)
#========
data_init.to_sql("test1", engine, if_exists="append", index=False)
# 或者
with engine.connect() as cursor:
cursor.execute(insert(Test1).values(data_init.to_dict("split")["data"]))
更新记录
注意需要先确保这些记录的ID在数据库中已存在。
from sqlalchemy import select, update, insert
df = df.to_dict("records")
with engine.connect() as cursor:
for line in df:
cursor.execute(update(Test1).where(Test1.id == line["id"]).values(line))
删除记录
from sqlalchemy import select, update, insert
IDs = df["id"].to_list()
with engine.connect() as cursor:
for item in IDs:
cursor.execute(delete(Test1).where(Test1.id == item))
读取数据
pandas官网:read_sql_table
pandas官网:read_sql_query
通过engine的输出可以看出read_sql_query()
确实只调用了一句SELECT ... FROM ...
,而read_sql_table()
进行了更多的操作。因此从性能的角度优选read_sql_query()
。
df = pd.read_sql_table("test1", engine, columns=["id", "hash"])
df = pd.read_sql_query(select(Test1.id,Test1.hash),engine)