django中ORM有三个类:
Manager:模型管理器,其中Manager类实现了和数据库之间的操作,Manager类中从QuerySet类中拷贝了许多表级方法,通过这些方法实现数据库的查询,并且将查询到的结果封装成一个Queryset对象返回,然后通过Queryset对象中的表级方法可以获取到我们需要的数据,如果我们不满足于现有的表级方法,可以通过重写Manager实现。
Queryset:查询结果集,同数据库的所有查询以及更新交互都是通过QuerySet来完成的。
Model:模型类,实现与数据库表的字段映射。
自定义Queryset
重写Queryset的update和delete方法可以帮助我们记录数据库的更新和删除
class BaseQuerySet(QuerySet):
def update(self, **kwargs):
# 重写QuerySet的update方法 记录数据库更新日志
log_list = []
for model in self:
update_log = dict()
name = model.__module__.split('.')[-1]
update_log['table_name'] = name
update_log[name + '_pk'] = model.pk
update_before = {} # 更新之前
update_after = {} # 更新之后
for key, value in kwargs.items():
if not hasattr(model, key):
continue
model_value = getattr(model, key)
if isinstance(model_value, decimal.Decimal): # 取出的类型有可能mongodb无法存储
update_before[key] = float(model_value)
else:
update_before[key] = model_value
if isinstance(value, models.Model): # 判断key是否为外键
value = value.pk
update_after[key] = value
if key == 'update_operator' or key == 'update_operator_id':
update_log['update_operator'] = value
if [i for i in log_handler.log.important_table if i.match(name)]:
update_log['update_before_all_info'] = str(model_to_dict(model))
update_log['update_before_info'] = str(update_before)
update_log['update_after_info'] = str(update_after)
update_log['operating'] = 'update'
log_list.append(update_log)
result = super(BaseQuerySet, self).update(**kwargs)
log_handler.log.record_business_log(log_list)
return result
def delete(self):
log_list = []
for info in self:
delete_log = dict()
name = info.__module__.split('.')[-1]
delete_log['table_name'] = name
delete_log[name + '_pk'] = info.pk
delete_info = dict()
for k, v in info.__dict__.items():
if k == '_state':
continue
v = mysql_conversion_mongo_utils.conversion(v)
delete_info[k] = v
delete_log['delete_info'] = delete_info
delete_log['delete_datetime'] = datetime.now()
delete_log['operating'] = 'delete'
log_list.append(delete_log)
log_handler.log.record_business_log(log_list=log_list)
return super(BaseQuerySet, self).delete()
自定义Manager
可以自定义Manager类来实现自己的方法,简化数据库的DML操作
class Manager(DjangoManager.from_queryset(BaseQuerySet)):
"""
继承 BaseQuerySet
将QuerySet的方法当参数传给了 Manager
"""
pass
class BaseManager(Manager):
"""
通用的orm管理器
"""
def get_one_object(self, db_select=None, **kwargs):
"""
base manager 获取一条数据
:param db_select: 为数据库分离做准备
:param kwargs: 查询条件 k=v
:return:
"""
try:
k_obj = self.db_manager(db_select).get(**kwargs)
except (ObjectDoesNotExist, MultipleObjectsReturned) as e:
k_obj = None
except Exception as e:
print(e)
raise EtError(type=EtDBError.read_error, value='获取一条数据连接数据库有误')
return k_obj
def create_one_object(self, create_info=None, db_select=None, **kwargs):
"""
base manager 添加一个对象 判断传来的参数是否为该管理器对应模型类的字段
:param db_select: 为数据库分离做准备
:param kwargs: k=v
:return:
"""
create_info = create_info if type(create_info) == dict else dict()
res_attr_list = self.__get_all_attr_list()
kwg_copy = copy.deepcopy(create_info)
# 如果不是管理器对应模型类的字段则删除
for kw in create_info:
if kw not in res_attr_list:
kwg_copy.pop(kw)
# 创建对象f
try:
k_obj = self.db_manager(db_select).model(**kwg_copy)
k_obj.save(force_insert=True)
except Exception as e:
print(e)
k_obj = None
return k_obj
def update_object(self, filters=None, excludes=None, updates=None, db_select=None, **kwargs):
"""
base manager 根据条件更新数据库的参数
update info
:param filters: 查询条件
:param db_select: 为数据库分离做准备
:param exclude: 查询条件
:param update: 更新的内容
:return:
"""
filters = filters if type(filters) == dict else dict()
excludes = excludes if type(excludes) == dict else dict()
updates = updates if type(updates) == dict else dict()
if len(filters) == 0 and len(excludes) == 0:
return None
if len(updates) == 0:
return None
try:
k_objects = self.db_manager(db_select).filter(**filters).exclude(**excludes).update(**updates)
except Exception as e:
print(e)
k_objects = None
return k_objects
def get_filter_list(self, filters=None, excludes=None, order_by=('-pk',), db_select=None, **kwargs):
"""
base manager 批量获取数据
:param filters: 查询条件
:param excludes: 取反条件
:param db_select: 为数据库分离做准备
:param order_by: 排序提交
:param kwargs:
:return:
"""
filters = filters if type(filters) == dict else dict()
excludes = excludes if type(excludes) == dict else dict()
try:
select_related = kwargs.pop('select_related')
except KeyError as e:
select_related = (None,)
try:
only = kwargs.pop('only')
k_object_list = self.db_manager(db_select).filter(**filters).exclude(**excludes).order_by(*order_by).select_related(*select_related).only(*only)
except KeyError as e:
k_object_list = self.db_manager(db_select).filter(**filters).exclude(**excludes).order_by(*order_by).select_related(*select_related)
except Exception as e:
print(e)
k_object_list = None
return k_object_list
def get_filter_values(self, q_objs=None, filters=None, e_q_objs=None, excludes=None, order_by=None, values=None, db_select=None):
"""
wangge
只查询某些字段,获取到的是QuerySet,跨表查询用的是 Inner Join
:param q_objs: 元组形式的查询条件
:param filters: 字典形式的查询条件
:param e_q_objs: 元组形式的排除查询条件
:param excludes: 排除条件
:param order_by: 排序条件
:param values: 只查询的某些字段
:param db_select:
:return: 返回值是QuerySet,元素是字典,查询报错返回 None
"""
filters = filters if isinstance(filters, dict) else dict()
excludes = excludes if isinstance(excludes, dict) else dict()
q_objs = tuple(q_objs) if isinstance(q_objs, (tuple, list, set)) else tuple()
e_q_objs = tuple(e_q_objs) if isinstance(e_q_objs, (tuple, list, set)) else tuple()
order_by = tuple(order_by) if isinstance(order_by, (tuple, list, set)) else tuple()
values = tuple(values) if isinstance(values, (tuple, list, set)) else tuple()
try:
dict_list = self.db_manager(db_select).filter(*q_objs, **filters).exclude(*e_q_objs, **excludes).order_by(*order_by).values(*values)
except Exception as e:
print(e)
dict_list = None
return dict_list
def get_filter_and_fuzzy_queries_list(self, filters=None, q_objs=(), e_q_objs=(), excludes=None, order_by=None, db_select=None, **kwargs):
"""
过滤出查询结果后,对结果进行模糊查询,再将结果返回
:param filters: 过滤条件 --> 字典
:param excludes: 排除条件 --> 字典
:param q_objs: 查询条件 --> Q对象
:param e_q_objs: 排除条件 --> Q对象
:param order_by: 排序条件
:param db_select: 数据库选择
:param kwargs: 预留参数
:return:
"""
filters = filters if type(filters) == dict else dict()
excludes = excludes if isinstance(excludes, dict) else dict()
if not order_by:
order_by = ('-pk',)
try:
k_object_list = self.db_manager(db_select).filter(**filters).filter(*q_objs).exclude(*e_q_objs, **excludes).order_by(*order_by)
except Exception as e:
print(e)
k_object_list = None
return k_object_list
def get_count(self, condition=None, db_select=None):
"""
manager 获取count
:param db_select: 为数据库分离做准备
:param condition:
:return:
"""
condition = condition if type(condition) == dict else dict()
try:
count = self.db_manager(db_select).filter(**condition).count()
except Exception as e:
count = None
print(e)
return count
def delete_mul_obj(self, filters, excludes, db_select, **kwargs):
"""
删除数据库
:param filters:
:param excludes:
:param db_select:
:param kwargs:
:return:
"""
filters = filters if type(filters) == dict else dict()
excludes = excludes if type(excludes) == dict else dict()
if len(filters) == 0 and len(excludes) == 0:
return None
try:
res = self.db_manager(db_select).filter(**filters).exclude(**excludes).delete()
except Exception as e:
print(e)
res = None
return res
def del_info(self, condition, db_select=None, **kwargs):
"""
删除一些临时数据
:param condition:
:param db_select:
:param kwargs:
:return:
"""
condition = condition if type(condition) == dict else dict()
if len(condition) == 0:
return 0
try:
result = self.db_manager(db_select).filter(**condition).delete()
result = result[0]
except Exception as e:
print(e)
result = 0
return result
class BaseModel(models.Model):
objects = BaseManager()
def __getitem__(self, item):
if hasattr(self, item):
return getattr(self, item)
else:
return None
class Meta:
abstract = True