# lico/core/contrib/models.py
from collections.abc import Container
from typing import Callable, Dict, Iterable, Optional

from django.db.models import Model as BaseModel
from django.db.models.manager import BaseManager
from django.db.models.query import QuerySet as BaseQuerySet


class ToDictMixin:
    r"""Mixin that serialises a Django model instance into a plain dict.

    ``as_dict`` copies the instance's concrete, non-relation fields into a
    dict and, unless disabled, adds the related objects that themselves
    provide ``as_dict``.

    Examples (output captured from a running system)::

        # lico-core-license: lico/core/license/views.py
        license = License.objects.get()
        result = license.as_dict(
            include=[
                'license_key', 'customer_name', 'cluster_type',
                'license_code', 'expire_time', 'license_numbers',
                'status'
            ]
        )
        # result:
        # {
        #     "license_key": "b\"l\\xaa\\x",
        #     "cluster_type": "host",
        #     "license_code": "Lenovo HPC AI LiCO Software",
        #     "expire_time": "2032-06-30",
        #     "license_numbers": 64,
        #     "status": "valid",
        #     "rest_days": 4146
        # }

        from lico.core.accounting.models import BillGroup, Deposit
        query = Deposit.objects.all()

        query[0].as_dict()
        # {'id': 1, 'user': 'test', 'credits': -240.0,
        #  'apply_time': 1615432804, 'approved_time': 1615432804,
        #  'billing_type': '', 'billing_id': None, 'balance': -107.0,
        #  'bill_group': {
        #      'id': 1, 'name': 'test', 'balance': 500.0, 'charged': 0.0,
        #      'used_time': 0, 'used_credits': 0.0, 'description': '',
        #      'charge_rate': 2.0, 'cr_minute': 0.0,
        #      'cr_display_type': 'hour',
        #      'last_operation_time': 1615442820,
        #      'gres_charge_rate': {}, 'gcr_minute': {},
        #      'gcr_display_type': {}, 'memory_charge_rate': 4.0,
        #      'mcr_minute': 0.0, 'mcr_display_type': 'hour',
        #      'storage_charge_rate': 1.0}}

        query[0].as_dict(include=['bill_group'])
        # {'bill_group': {... same nested dict as in the call above ...}}

        query[0].as_dict(exclude=['bill_group'])
        # {'id': 1, 'user': 'test', 'credits': -240.0,
        #  'apply_time': 1615432804, 'approved_time': 1615432804,
        #  'billing_type': '', 'billing_id': None, 'balance': -107.0}
    """

    # Field names ``as_dict`` always leaves out when it is called without
    # ``include``; merged with the ``exclude`` argument.
    as_dict_exclude = ()

    # Type-hint reference:
    # https://sikasjc.github.io/2018/07/14/type-hint-in-python/
    def as_dict(
        self,
        inspect_related=True,
        include: Optional[Iterable[str]] = None,
        exclude: Optional[Iterable[str]] = None,
        related_field_options: Optional[Dict] = None,
        **on_finished_options
    ):
        """Return this instance as a dict.

        Args:
            inspect_related: When true, related objects are added through
                ``inspect_related_fields``.
            include: If given, only these names are serialised; ``exclude``
                and ``as_dict_exclude`` are then ignored.
            exclude: Names to skip, in addition to ``as_dict_exclude``.
                Only used when ``include`` is None.
            related_field_options: Maps a relation's cache name to the
                keyword arguments passed to the related object's
                ``as_dict``.
            **on_finished_options: Forwarded to ``as_dict_on_finished``.

        Returns:
            The dict of serialised values.
        """
        if include is not None:
            # ``include`` is typed as an Iterable: a generator/iterator
            # would be consumed by the first membership test, so it is
            # materialised once. Real containers are used unchanged.
            if not isinstance(include, Container):
                include = frozenset(include)

            def is_excluded(field):
                return field not in include
        else:
            excluded = set(self.as_dict_exclude)
            if exclude is not None:
                excluded |= set(exclude)

            def is_excluded(field):
                return field in excluded

        if related_field_options is None:
            related_field_options = {}

        result = {}
        for field in self._meta.concrete_fields:
            # Relations are handled separately by inspect_related_fields.
            if is_excluded(field.name) or field.is_relation:
                continue
            if hasattr(field, 'dict_from_object'):
                # A field may supply its own dict representation.
                result[field.name] = field.dict_from_object(self)
            else:
                result[field.name] = field.value_from_object(self)

        if inspect_related:
            self.inspect_related_fields(
                result, is_excluded, related_field_options
            )

        self.as_dict_on_finished(result, is_excluded, **on_finished_options)
        return result

    def as_dict_on_finished(
        self, result: Dict, is_exlucded: Callable, **kwargs
    ):
        """Hook called at the end of ``as_dict``; does nothing by default.

        Receives the dict about to be returned, the exclusion predicate and
        the extra keyword arguments given to ``as_dict``.

        NOTE: the spelling of ``is_exlucded`` is kept as-is so the
        signature stays unchanged for existing code.
        """
        pass

    def inspect_related_fields(
        self,
        result: Dict,
        is_excluded: Callable,
        related_field_options: Dict
    ):
        """Add every non-excluded relation of this instance to ``result``.

        Each relation is dispatched on its cardinality flags to the
        matching ``_on_inspect_*`` method. A relation without an entry in
        ``related_field_options`` is serialised with
        ``inspect_related=False``.
        """
        related_fields = (
            rf for rf in self._meta.get_fields()
            if rf.is_relation and not is_excluded(rf.get_cache_name())
        )
        for rf in related_fields:
            option = related_field_options.get(
                rf.get_cache_name(), {'inspect_related': False}
            )
            if rf.one_to_many:
                self._on_inspect_one_to_many_field(rf, result, option)
            if rf.many_to_one:
                self._on_inspect_many_to_one_field(rf, result, option)
            if rf.one_to_one:
                self._on_inspect_one_to_one_field(rf, result, option)
            if rf.many_to_many:
                self._on_inspect_many_to_many_field(rf, result, option)

    def _serialize_related_collection(self, rf, result: Dict, option: Dict):
        """Store the list of dicts for a to-many relation in ``result``."""
        cache_name = rf.get_cache_name()
        result[cache_name] = [
            related_object.as_dict(**option)
            for related_object in getattr(self, cache_name).iterator()
            if hasattr(related_object, 'as_dict')
        ]

    def _serialize_related_object(self, rf, result: Dict, option: Dict):
        """Store the dict (or None) for a to-one relation in ``result``.

        A related object without ``as_dict`` leaves ``result`` untouched.
        """
        cache_name = rf.get_cache_name()
        related_object = getattr(self, cache_name, None)
        if related_object is None:
            result[cache_name] = None
        elif hasattr(related_object, 'as_dict'):
            result[cache_name] = related_object.as_dict(**option)

    def _on_inspect_one_to_many_field(self, rf, result: Dict, option: Dict):
        """Serialise a one-to-many relation as a list of dicts."""
        self._serialize_related_collection(rf, result, option)

    def _on_inspect_one_to_one_field(self, rf, result: Dict, option: Dict):
        """Serialise a one-to-one relation as a dict, or None if unset."""
        self._serialize_related_object(rf, result, option)

    def _on_inspect_many_to_one_field(self, rf, result: Dict, option: Dict):
        """Serialise a many-to-one relation as a dict, or None if unset."""
        self._serialize_related_object(rf, result, option)

    def _on_inspect_many_to_many_field(self, rf, result: Dict, option: Dict):
        """Serialise a many-to-many relation as a list of dicts."""
        self._serialize_related_collection(rf, result, option)


class QuerySet(BaseQuerySet):
    """QuerySet with dict serialisation and case-insensitive search."""

    def as_dict(self, *args, **kwargs):
        """Return ``obj.as_dict(*args, **kwargs)`` for every row, as a list."""
        return [
            obj.as_dict(*args, **kwargs)
            for obj in self.iterator()
        ]

    def ci_search(self, **kwargs):  # pragma: no cover
        """Case-insensitive substring search over one or more columns.

        Each ``column=keyword`` pair becomes
        ``<table>.<column> LIKE '%keyword%' COLLATE utf8_general_ci``; the
        conditions are combined with ``OR``.

        With no criteria an unfiltered clone is returned (an empty WHERE
        fragment would be invalid SQL).
        """
        if not kwargs:
            return self.all()

        db_table_name = self.model._meta.db_table
        clauses = []
        params = []
        for prop, keyword in kwargs.items():
            # The keyword is bound as a query parameter instead of being
            # formatted into the SQL text, so quotes or '%' in it cannot
            # alter the statement. LIKE wildcards inside the keyword keep
            # their meaning, as before.
            # NOTE(review): ``prop`` is still interpolated as an identifier
            # (identifiers cannot be bound) -- it must never come from
            # untrusted input.
            clauses.append(
                f"{db_table_name}.{prop} LIKE %s COLLATE utf8_general_ci"
            )
            params.append(f"%{keyword}%")

        return self.extra(where=[" OR ".join(clauses)], params=params)


# ``BaseManager.from_queryset(QuerySet)`` returns a new manager class (named
# '<Manager>From<QuerySet>' by default) whose ``_queryset_class`` is
# ``QuerySet`` and which carries that queryset's methods, so they can be
# called directly on ``Model.objects``.
class Manager(BaseManager.from_queryset(QuerySet)):
    """Default manager built from the ``QuerySet`` defined in this module."""
    pass


class Model(BaseModel, ToDictMixin):
    """Abstract base model with ``as_dict`` and the custom ``Manager``."""

    objects = Manager()

    class Meta:
        abstract = True