1. import datetime
  2. from peewee import *
  3. import logging
  4. logger = logging.getLogger("peewee")
  5. logger.setLevel(logging.DEBUG)
  6. logger.addHandler(logging.StreamHandler())
  7. db = MySQLDatabase('peewee', host='127.0.0.1', user='root', passwd='anwma');
  8. class BaseModel(Model):
  9. add_time = DateTimeField(default=datetime.datetime.now, verbose_name="添加时间")
  10. class Meta:
  11. database = db # 这里是数据库链接,为了方便建立多个表,可以把这个部分提炼出来形成一个新的类
  12. class Person(BaseModel):
  13. name = CharField(verbose_name='姓名', max_length=10, null=False, index=True)
  14. passwd = CharField(verbose_name='密码', max_length=20, null=False, default='123456')
  15. email = CharField(verbose_name='邮件', max_length=50, null=False, unique=True)
  16. gender = IntegerField(verbose_name='性别', null=False, default=1)
  17. birthday = DateField(verbose_name='生日', null=True, default=None)
  18. is_admin = BooleanField(verbose_name='是否是管理员', default=True)
  19. class Meta:
  20. table_name = 'person' # 这里可以自定义表名
  21. if __name__ == "__main__":
  22. db.connect()
  23. db.create_tables([Person])

1. 主键约束

  1. import datetime
  2. from peewee import *
  3. import logging
  4. logger = logging.getLogger("peewee")
  5. logger.setLevel(logging.DEBUG)
  6. logger.addHandler(logging.StreamHandler())
  7. db = MySQLDatabase('peewee', host='127.0.0.1', user='root', passwd='anwma');
  8. class BaseModel(Model):
  9. add_time = DateTimeField(default=datetime.datetime.now, verbose_name="添加时间")
  10. class Meta:
  11. database = db # 这里是数据库链接,为了方便建立多个表,可以把这个部分提炼出来形成一个新的类
  12. class Person(BaseModel):
  13. first = CharField()
  14. last = CharField()
  15. class Meta:
  16. primary_key = CompositeKey('first', 'last')
  17. class Pet(BaseModel):
  18. owner_first = CharField()
  19. owner_last = CharField()
  20. pet_name = CharField()
  21. class Meta:
  22. constraints = [SQL('FOREIGN KEY(owner_first,owner_last) REFERENCES person(first,last)')]
  23. class Blog(BaseModel):
  24. pass
  25. class Tag(BaseModel):
  26. pass
  27. # 复合主键
  28. class BlogToTag(BaseModel):
  29. """A simple "through" table for many-to-many relationship."""
  30. blog = ForeignKeyField(Blog)
  31. tag = ForeignKeyField(Tag)
  32. class Meta:
  33. primary_key = CompositeKey('blog', 'tag')
  34. if __name__ == "__main__":
  35. db.connect()
  36. db.create_tables([Person, Pet, Blog, Tag, BlogToTag])

2. 数据插入

  1. p_id = Person.insert({'name': 'bobby'}).execute() # 插入一条数据,返回主键
  2. print(p_id) # 打印出新插入的数据的id
  3. data_source = [
  4. {'field1': 'val1-1', 'field2': 'val1-2'},
  5. {'field1': 'val2-1', 'field2': 'val2-2'},
  6. # ...
  7. ]
  8. for data_dict in data_source:
  9. Model.create(**data_dict)
  10. # 若是有很多数据需要插入,例如几万条数据,为了性能,这时就需要使用insert_many(),如下:
  11. data = [
  12. {'facid': 9, 'name': 'Spa', 'membercost': 20, 'guestcost': 30, 'initialoutlay': 100000, 'monthlymaintenance': 800},
  13. {'facid': 10, 'name': 'Squash Court 2', 'membercost': 3.5, 'guestcost': 17.5, 'initialoutlay': 5000,
  14. 'monthlymaintenance': 80}
  15. ]
  16. query = Facility.insert_many(data) # 插入了多个

3. 单条查询

  1. User.get(User.id == 1)
  2. User.get_by_id(1) # Same as above.
  3. User[1] # Also same as above
  4. g = Person.select().where(Person.name == 'Grandma L.').get() # where 是查询一个集合,select是查询字段
  5. g = Person.get(Person.name == 'fff.') # get是得到第一个
  6. g = Person.select().where(Person.age > 23).get()
  7. # select 代表sql语句中select后面的语句表示要展示的字段
  8. # where代表where条件语句 得到一个数据集合 用for循环遍历
  9. # get()代表找第一个
  10. person.created = Person.get_or_create(
  11. first_name=first_name,
  12. last_name=last_name,
  13. defaults={'dob': 'dob', 'favorite_color': 'green'}
  14. )

4. 复合条件查询

  1. query1 = Person.select().where((Person.name == "fff0") | (Person.name == "sss1"))
  2. query2 = Person.select().where((Person.name == "fff") & (Person.is_relative == True))

5. 模糊查询

  1. query = Facility.select().where(Facility.name.contains('tennis'))

6. in查询

  1. query = Facility.select().where(Facility.facid.in_([1,5]))

7. 字典展示

  1. query = User.select().dicts()
  2. for row in query:
  3. print(row)

8. 排序、limit、去重

  1. query = (Person.select(Person.name).order_by(Person.name).limit(10).distinct())
  2. # 几乎和sql一模一样
  3. Person.select().order_by(Person.birthday.desc()) # 日期排序
  4. Tweet.select().order_by(-Tweet.created_date)
  5. # 查询整张表的数据条数
  6. total_num = Person.select().count()

9. 聚合函数

  1. # SELECT MAX(birthday) FROM person;
  2. query = Person.select(fn.MAX(Person.birthday))
  3. # SELECT name, is_relative FROM person WHERE birthday = (SELECT MAX(birthday) FROM person)
  4. MemberAlias = Member.alias() # 如果一个查询中用了两个表,需要这个Alias作为影子
  5. subq = MemberAlias.select(fn.MAX(MemberAlias.joindate))
  6. query = (Member.select(Person.is_relative, Person.name).where(Person.birthday == subq))

10. 分页&计数

  1. # paginate 两个参数:page_number 和items_per_page
  2. for tweet in Tweet.select().order_by(Tweet.id).paginate(2,10):
  3. print(tweet.message)
  4. # 返回查到了多少条记录
  5. Tweet.select().where(Tweet.id > 50).count()

11. 执行原生sql

  1. query = MyModel.raw('SELECT * FROM my_table WHERE data = %s', user_data)
  2. query = MyModel.select().where(SQL('Some SQL expression %s' % user_data))

12. 查询多条

  1. # 查询Person整张表的数据
  2. persons = Person.select()
  3. # 遍历数据
  4. for p in persons:
  5. print(p.name, p.birthday, p.is_relative)
  6. # 获取is_relative为True的数据
  7. # 我们可以在select()后面添加where()当做查询条件
  8. persons = Person.select().where(Person.is_relative == True)
  9. for p in person:
  10. print(p.name, p.birthday, p.is_relative)
  11. # 打印sql
  12. persons = Person.select().where(Person.is_relative == True)
  13. # 打印出的结果为:
  14. # SELECT `t1`.`id`, `t1`.`name`, `t1`.`is_relative` FROM `Person` AS `t1` WHERE (`t1`.`is_relative` = %s)',[True])
  15. print(persons.sql())

13. limit和offset

  1. # 相当于sql语句:select * from person order by create_time desc limit 5
  2. person = Person.select().order_by(Person.create_time.asc()).limit(5)
  3. # 相当于sql语句中:select * from person order by create_time desc limit 2,5
  4. person = Person.select().order_by(Person.create_time.asc()).limit(5).offset(2)

14. 表连接

  1. import datetime
  2. from peewee import *
  3. db = SqliteDatabase(':memory:')
  4. class BaseModel(Model):
  5. class Meta:
  6. database = db
  7. class User(BaseModel):
  8. username = TextField()
  9. class Tweet(BaseModel):
  10. content = TextField()
  11. timestamp = DateTimeField(default=datetime.datetime.now)
  12. user = ForeignKeyField(User, backref='tweets')
  13. class Favorite(BaseModel):
  14. user = ForeignKeyField(User, backref='favorites')
  15. tweet = ForeignKeyField(Tweet, backref='favorites')
  16. # 数据准备
  17. def populate_test_data():
  18. db.create_tables([User, Tweet, Favorite])
  19. data = (
  20. ('huey', ('meow', 'hiss', 'purr')),
  21. ('mickey', ('woof', 'whine')),
  22. ('zaizee', ())
  23. )
  24. for username, tweets in data:
  25. user = User.create(username=username)
  26. for tweet in tweets:
  27. Tweet.create(user=user, content=tweet)
  28. # Populate a few favorites for our users,such that:
  29. favorite_data = (
  30. ('huey', ['whine']),
  31. ('mickey', ['purr']),
  32. ('zaizee', ['meow', 'purr'])
  33. )
  34. for username, favorites in favorite_data:
  35. user = User.get(User.username == username)
  36. for content in favorites:
  37. tweet = Tweet.get(Tweet.content == content)
  38. Favorite.create(user=user, tweet=tweet)
  39. query = Tweet.select().join(User).where(User.username == 'huey')
  40. # 等价于
  41. query = (Tweet
  42. .select()
  43. .join(User, on=(Tweet.user == User.id))
  44. .where(User.username == 'huey')
  45. )
  46. huey.tweets.sql()

15. 外键

  1. # 15 外键
  2. class Pet(peewee.Model):
  3. name = peewee.CharField()
  4. owner = peewee.ForeignKeyField(Person, related_name="pets", backref='petties')
  5. # backref 是反查的字段,如果有 related_name 用 related_name 反查,如果没有直接用 petties 反查
  6. # e.g. [i.name for i in Person.get(name="aaa").petties]
  7. class Meta:
  8. database = db
  9. class Category(Model):
  10. name = CharField()
  11. parent = ForeignKeyField('self', null=True, backref='children')
  12. # 注意自关联永远是null = True
  13. # 插入数据
  14. g2 = Person.get(tablesinfo.Person.is_relative == False)
  15. d2 = Pet.create(name="dog2", owner=g2)
  16. # 正查
  17. dog1 = Pet.get(name="dog1")
  18. dog1.owner.name
  19. # 反查
  20. aaa = Person.get(name="aaa").pets
  21. # pets为 related_name 字段, 如果没写用backref字段
  22. for a in aaa:
  23. print(i.name)
  24. g1 = Person.select().join(Pet).where(Pet.name == "dog2")
  25. # SELECT DISTINCT m.firstname, m.surname FROM members AS m2 INNER JOIN
  26. # members AS m ON (m.memid = m2.recommendedby) ORDER BY m.surname, m.firstname;
  27. MA = Member.alias()
  28. query = (Member
  29. .select(Member.firstname, Member.surname)
  30. .join(MA, on=(MA.recommendedby == Member.memid)) # join中用on表示链接方法
  31. .order_by(Member.surname, Member.firstname)
  32. )
  33. User.select().join(Tweet).join(Comment)
  34. Artist.select().join(Album).switch(Artist).join(Genre) # 如果连一表多次

16. 操作符

image.png
http://doc.peewee-orm.com/en/latest/peewee/query

17. 避免N+1查询

N+1查询指的是当应用提交异常查询获取结果,然后在取得结构数据集的每一行时,应用至少再次查询一次(也可能看做是嵌套循环)。
大多数情况下,n查询可以通过使用SQL join或子查询来避免。数据库本身可能做了嵌套循环,但是它比在你的应用代码本身里做这些n查询更高效,后者通常会导致与数据库再次潜在通讯,没有利用数据库本身关联和执行子查询时会进行切片等优化工作。

  1. query = (Tweet
  2. .select(Tweet,User) # Note that we are selecting both models.
  3. .join(User) # Use an INNER join because every tweet has an author
  4. .order_by(Tweet.id.desc()) #Get the most recent tweets
  5. .limit(10)
  6. )
  7. for tweet in query:
  8. print(tweet.user.username, '-', tweet.message)

没有用join时,得到tweet.user.username会触发一次查询去解析外键tweet.user从而得到相关联的user.
由于我们在User上关联并选择,peewee自动为我们解析外键。
列出所有用户和他们的tweets.