import datetime
from peewee import *
import logging
logger = logging.getLogger("peewee")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
db = MySQLDatabase('peewee', host='127.0.0.1', user='root', passwd='anwma');
class BaseModel(Model):
add_time = DateTimeField(default=datetime.datetime.now, verbose_name="添加时间")
class Meta:
database = db # 这里是数据库链接,为了方便建立多个表,可以把这个部分提炼出来形成一个新的类
class Person(BaseModel):
name = CharField(verbose_name='姓名', max_length=10, null=False, index=True)
passwd = CharField(verbose_name='密码', max_length=20, null=False, default='123456')
email = CharField(verbose_name='邮件', max_length=50, null=False, unique=True)
gender = IntegerField(verbose_name='性别', null=False, default=1)
birthday = DateField(verbose_name='生日', null=True, default=None)
is_admin = BooleanField(verbose_name='是否是管理员', default=True)
class Meta:
table_name = 'person' # 这里可以自定义表名
if __name__ == "__main__":
db.connect()
db.create_tables([Person])
1. 主键约束
import datetime
from peewee import *
import logging
logger = logging.getLogger("peewee")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
db = MySQLDatabase('peewee', host='127.0.0.1', user='root', passwd='anwma');
class BaseModel(Model):
add_time = DateTimeField(default=datetime.datetime.now, verbose_name="添加时间")
class Meta:
database = db # 这里是数据库链接,为了方便建立多个表,可以把这个部分提炼出来形成一个新的类
class Person(BaseModel):
first = CharField()
last = CharField()
class Meta:
primary_key = CompositeKey('first', 'last')
class Pet(BaseModel):
owner_first = CharField()
owner_last = CharField()
pet_name = CharField()
class Meta:
constraints = [SQL('FOREIGN KEY(owner_first,owner_last) REFERENCES person(first,last)')]
class Blog(BaseModel):
pass
class Tag(BaseModel):
pass
# 复合主键
class BlogToTag(BaseModel):
"""A simple "through" table for many-to-many relationship."""
blog = ForeignKeyField(Blog)
tag = ForeignKeyField(Tag)
class Meta:
primary_key = CompositeKey('blog', 'tag')
if __name__ == "__main__":
db.connect()
db.create_tables([Person, Pet, Blog, Tag, BlogToTag])
2. 数据插入
p_id = Person.insert({'name': 'bobby'}).execute() # 插入一条数据,返回主键
print(p_id) # 打印出新插入的数据的id
data_source = [
{'field1': 'val1-1', 'field2': 'val1-2'},
{'field1': 'val2-1', 'field2': 'val2-2'},
# ...
]
for data_dict in data_source:
Model.create(**data_dict)
# 若是有很多数据需要插入,例如几万条数据,为了性能,这时就需要使用insert_many(),如下:
data = [
{'facid': 9, 'name': 'Spa', 'membercost': 20, 'guestcost': 30, 'initialoutlay': 100000, 'monthlymaintenance': 800},
{'facid': 10, 'name': 'Squash Court 2', 'membercost': 3.5, 'guestcost': 17.5, 'initialoutlay': 5000,
'monthlymaintenance': 80}
]
query = Facility.insert_many(data) # 插入了多个
3. 单条查询
User.get(User.id == 1)
User.get_by_id(1) # Same as above.
User[1] # Also same as above
g = Person.select().where(Person.name == 'Grandma L.').get() # where 是查询一个集合,select是查询字段
g = Person.get(Person.name == 'fff.') # get是得到第一个
g = Person.select().where(Person.age > 23).get()
# select 代表sql语句中select后面的语句表示要展示的字段
# where代表where条件语句 得到一个数据集合 用for循环遍历
# get()代表找第一个
person.created = Person.get_or_create(
first_name=first_name,
last_name=last_name,
defaults={'dob': 'dob', 'favorite_color': 'green'}
)
4. 复合条件查询
query1 = Person.select().where((Person.name == "fff0") | (Person.name == "sss1"))
query2 = Person.select().where((Person.name == "fff") & (Person.is_relative == True))
5. 模糊查询
query = Facility.select().where(Facility.name.contains('tennis'))
6. in查询
query = Facility.select().where(Facility.facid.in_([1,5]))
7. 字典展示
query = User.select().dicts()
for row in query:
print(row)
8. 排序、limit、去重
query = (Person.select(Person.name).order_by(Person.name).limit(10).distinct())
# 几乎和sql一模一样
Person.select().order_by(Person.birthday.desc()) # 日期排序
Tweet.select().order_by(-Tweet.created_date)
# 查询整张表的数据条数
total_num = Person.select().count()
9. 聚合函数
# SELECT MAX(birthday) FROM person;
query = Person.select(fn.MAX(Person.birthday))
# SELECT name, is_relative FROM person WHERE birthday = (SELECT MAX(birthday) FROM person)
MemberAlias = Member.alias() # 如果一个查询中用了两个表,需要这个Alias作为影子
subq = MemberAlias.select(fn.MAX(MemberAlias.joindate))
query = (Member.select(Person.is_relative, Person.name).where(Person.birthday == subq))
10. 分页&计数
# paginate 两个参数:page_number 和items_per_page
for tweet in Tweet.select().order_by(Tweet.id).paginate(2,10):
print(tweet.message)
# 返回查到了多少条记录
Tweet.select().where(Tweet.id > 50).count()
11. 执行原生sql
query = MyModel.raw('SELECT * FROM my_table WHERE data = %s', user_data)
query = MyModel.select().where(SQL('Some SQL expression %s' % user_data))
12. 查询多条
# 查询Person整张表的数据
persons = Person.select()
# 遍历数据
for p in persons:
print(p.name, p.birthday, p.is_relative)
# 获取is_relative为True的数据
# 我们可以在select()后面添加where()当做查询条件
persons = Person.select().where(Person.is_relative == True)
for p in person:
print(p.name, p.birthday, p.is_relative)
# 打印sql
persons = Person.select().where(Person.is_relative == True)
# 打印出的结果为:
# SELECT `t1`.`id`, `t1`.`name`, `t1`.`is_relative` FROM `Person` AS `t1` WHERE (`t1`.`is_relative` = %s)',[True])
print(persons.sql())
13. limit和offset
# 相当于sql语句:select * from person order by create_time desc limit 5
person = Person.select().order_by(Person.create_time.asc()).limit(5)
# 相当于sql语句中:select * from person order by create_time desc limit 2,5
person = Person.select().order_by(Person.create_time.asc()).limit(5).offset(2)
14. 表连接
import datetime
from peewee import *
db = SqliteDatabase(':memory:')
class BaseModel(Model):
class Meta:
database = db
class User(BaseModel):
username = TextField()
class Tweet(BaseModel):
content = TextField()
timestamp = DateTimeField(default=datetime.datetime.now)
user = ForeignKeyField(User, backref='tweets')
class Favorite(BaseModel):
user = ForeignKeyField(User, backref='favorites')
tweet = ForeignKeyField(Tweet, backref='favorites')
# 数据准备
def populate_test_data():
db.create_tables([User, Tweet, Favorite])
data = (
('huey', ('meow', 'hiss', 'purr')),
('mickey', ('woof', 'whine')),
('zaizee', ())
)
for username, tweets in data:
user = User.create(username=username)
for tweet in tweets:
Tweet.create(user=user, content=tweet)
# Populate a few favorites for our users,such that:
favorite_data = (
('huey', ['whine']),
('mickey', ['purr']),
('zaizee', ['meow', 'purr'])
)
for username, favorites in favorite_data:
user = User.get(User.username == username)
for content in favorites:
tweet = Tweet.get(Tweet.content == content)
Favorite.create(user=user, tweet=tweet)
query = Tweet.select().join(User).where(User.username == 'huey')
# 等价于
query = (Tweet
.select()
.join(User, on=(Tweet.user == User.id))
.where(User.username == 'huey')
)
huey.tweets.sql()
15. 外键
# 15 外键
class Pet(peewee.Model):
name = peewee.CharField()
owner = peewee.ForeignKeyField(Person, related_name="pets", backref='petties')
# backref 是反查的字段,如果有 related_name 用 related_name 反查,如果没有直接用 petties 反查
# e.g. [i.name for i in Person.get(name="aaa").petties]
class Meta:
database = db
class Category(Model):
name = CharField()
parent = ForeignKeyField('self', null=True, backref='children')
# 注意自关联永远是null = True
# 插入数据
g2 = Person.get(tablesinfo.Person.is_relative == False)
d2 = Pet.create(name="dog2", owner=g2)
# 正查
dog1 = Pet.get(name="dog1")
dog1.owner.name
# 反查
aaa = Person.get(name="aaa").pets
# pets为 related_name 字段, 如果没写用backref字段
for a in aaa:
print(i.name)
g1 = Person.select().join(Pet).where(Pet.name == "dog2")
# SELECT DISTINCT m.firstname, m.surname FROM members AS m2 INNER JOIN
# members AS m ON (m.memid = m2.recommendedby) ORDER BY m.surname, m.firstname;
MA = Member.alias()
query = (Member
.select(Member.firstname, Member.surname)
.join(MA, on=(MA.recommendedby == Member.memid)) # join中用on表示链接方法
.order_by(Member.surname, Member.firstname)
)
User.select().join(Tweet).join(Comment)
Artist.select().join(Album).switch(Artist).join(Genre) # 如果连一表多次
16. 操作符
http://doc.peewee-orm.com/en/latest/peewee/query
17. 避免N+1查询
N+1查询指的是当应用提交异常查询获取结果,然后在取得结构数据集的每一行时,应用至少再次查询一次(也可能看做是嵌套循环)。
大多数情况下,n查询可以通过使用SQL join或子查询来避免。数据库本身可能做了嵌套循环,但是它比在你的应用代码本身里做这些n查询更高效,后者通常会导致与数据库再次潜在通讯,没有利用数据库本身关联和执行子查询时会进行切片等优化工作。
query = (Tweet
.select(Tweet,User) # Note that we are selecting both models.
.join(User) # Use an INNER join because every tweet has an author
.order_by(Tweet.id.desc()) #Get the most recent tweets
.limit(10)
)
for tweet in query:
print(tweet.user.username, '-', tweet.message)
没有用join
时,得到tweet.user.username
会触发一次查询去解析外键tweet.user
从而得到相关联的user
.
由于我们在User
上关联并选择,peewee
自动为我们解析外键。
列出所有用户和他们的tweets
.