# lico/core/contrib/models.py
from typing import Callable, Dict, Iterable, Optional
from django.db.models import Model as BaseModel
from django.db.models.manager import BaseManager
from django.db.models.query import QuerySet as BaseQuerySet
class ToDictMixin:
    """Mixin that serializes a model instance into a plain ``dict``.

    The mixin only relies on the ``_meta`` API of the host class
    (``_meta.concrete_fields`` and ``_meta.get_fields()``), so it is meant
    to be combined with a Django model.

    Examples (taken from existing callers)::

        license = License.objects.get()
        result = license.as_dict(
            include=[
                'license_key', 'customer_name',
                'cluster_type', 'license_code', 'expire_time',
                'license_numbers', 'status'
            ]
        )
        # {"cluster_type": "host", "expire_time": "2032-06-30",
        #  "license_numbers": 64, "status": "valid", ...}

        deposit = Deposit.objects.all()[0]
        deposit.as_dict()
        # {'id': 1, 'user': 'test', 'credits': -240.0, ...,
        #  'bill_group': {'id': 1, 'name': 'test', 'balance': 500.0, ...}}
        deposit.as_dict(include=['bill_group'])
        # {'bill_group': {'id': 1, 'name': 'test', 'balance': 500.0, ...}}
        deposit.as_dict(exclude=['bill_group'])
        # {'id': 1, 'user': 'test', 'credits': -240.0, ...}
    """

    # Field names that are always left out of ``as_dict`` output when no
    # explicit ``include`` list is given. Subclasses may override this.
    as_dict_exclude = ()

    @staticmethod
    def _field_name_set(names):
        """Return ``names`` as a set of field names.

        A bare string is treated as one field name instead of being split
        into characters, and any other iterable (including a one-shot
        iterator) is materialized exactly once.
        """
        if isinstance(names, str):
            return {names}
        return set(names)

    def as_dict(
            self, inspect_related=True,
            include: Optional[Iterable[str]] = None,
            exclude: Optional[Iterable[str]] = None,
            related_field_options: Optional[Dict] = None,
            **on_finished_options
    ):
        """Serialize this instance into a dict keyed by field name.

        :param inspect_related: when true, relation fields are serialized
            as well through ``inspect_related_fields``.
        :param include: if given, only these names are emitted;
            ``exclude`` and ``as_dict_exclude`` are then ignored.
        :param exclude: names to omit, in addition to ``as_dict_exclude``.
        :param related_field_options: maps a relation's cache name to the
            keyword arguments passed to the related object's ``as_dict``.
            Relations without an entry use ``{'inspect_related': False}``.
        :param on_finished_options: forwarded to ``as_dict_on_finished``.
        :return: the resulting dict.
        """
        if include is not None:
            # Materialize once: ``include`` may be a one-shot iterator and
            # is consulted once per field.
            included = self._field_name_set(include)

            def is_excluded(field):
                return field not in included
        else:
            excluded = self._field_name_set(self.as_dict_exclude)
            if exclude is not None:
                excluded |= self._field_name_set(exclude)

            def is_excluded(field):
                return field in excluded

        if related_field_options is None:
            related_field_options = {}

        result = {}
        for field in self._meta.concrete_fields:
            # Relation fields are handled by inspect_related_fields below.
            if is_excluded(field.name) or field.is_relation:
                continue
            # A field may provide its own dict representation.
            if hasattr(field, 'dict_from_object'):
                result[field.name] = field.dict_from_object(self)
            else:
                result[field.name] = field.value_from_object(self)

        if inspect_related:
            self.inspect_related_fields(
                result, is_excluded, related_field_options
            )

        self.as_dict_on_finished(result, is_excluded, **on_finished_options)
        return result

    def as_dict_on_finished(
            self, result: Dict, is_exlucded: Callable, **kwargs
    ):
        """Hook called at the end of ``as_dict``; does nothing by default.

        Subclasses may override it to adjust ``result`` in place.

        :param result: the dict built by ``as_dict``.
        :param is_exlucded: predicate telling whether a field name is
            excluded (the misspelled name is kept for compatibility).
        :param kwargs: the extra keyword arguments given to ``as_dict``.
        """

    def inspect_related_fields(
            self, result: Dict, is_excluded: Callable,
            related_field_options: Dict
    ):
        """Add every non-excluded relation field to ``result``.

        :param result: dict to update in place.
        :param is_excluded: predicate applied to the relation's cache name.
        :param related_field_options: per-relation ``as_dict`` options,
            keyed by cache name.
        """
        for rf in self._meta.get_fields():
            if not rf.is_relation:
                continue
            cache_name = rf.get_cache_name()
            if is_excluded(cache_name):
                continue
            # By default related objects do not follow their own relations.
            option = related_field_options.get(
                cache_name, {'inspect_related': False}
            )
            if rf.one_to_many:
                self._on_inspect_one_to_many_field(rf, result, option)
            if rf.many_to_one:
                self._on_inspect_many_to_one_field(rf, result, option)
            if rf.one_to_one:
                self._on_inspect_one_to_one_field(rf, result, option)
            if rf.many_to_many:
                self._on_inspect_many_to_many_field(rf, result, option)

    def _inspect_single_related(self, rf, result: Dict, option: Dict):
        """Store the dict form of a single related object, or ``None``."""
        cache_name = rf.get_cache_name()
        related_object = getattr(self, cache_name, None)
        if related_object is None:
            result[cache_name] = None
        elif hasattr(related_object, 'as_dict'):
            # Objects without ``as_dict`` are left out of the result.
            result[cache_name] = related_object.as_dict(**option)

    def _inspect_multiple_related(self, rf, result: Dict, option: Dict):
        """Store a list with the dict form of every related object."""
        cache_name = rf.get_cache_name()
        result[cache_name] = [
            related_object.as_dict(**option)
            for related_object in getattr(self, cache_name).iterator()
            if hasattr(related_object, 'as_dict')
        ]

    def _on_inspect_one_to_many_field(self, rf, result: Dict, option: Dict):
        """Serialize a one-to-many relation as a list of dicts."""
        self._inspect_multiple_related(rf, result, option)

    def _on_inspect_one_to_one_field(self, rf, result: Dict, option: Dict):
        """Serialize a one-to-one relation as a dict or ``None``."""
        self._inspect_single_related(rf, result, option)

    def _on_inspect_many_to_one_field(self, rf, result: Dict, option: Dict):
        """Serialize a many-to-one relation as a dict or ``None``."""
        self._inspect_single_related(rf, result, option)

    def _on_inspect_many_to_many_field(self, rf, result: Dict, option: Dict):
        """Serialize a many-to-many relation as a list of dicts."""
        self._inspect_multiple_related(rf, result, option)
class QuerySet(BaseQuerySet):
    """QuerySet adding dict serialization and a case-insensitive search."""

    def as_dict(self, *args, **kwargs):
        """Return a list with ``obj.as_dict(*args, **kwargs)`` per row.

        Rows are read through ``iterator()``.
        """
        return [
            obj.as_dict(*args, **kwargs)
            for obj in self.iterator()
        ]

    def ci_search(self, **kwargs):  # pragma: no cover
        """Filter rows where any given column contains its keyword.

        Each ``column=keyword`` pair becomes a
        ``<table>.<column> LIKE %keyword% COLLATE utf8_general_ci``
        condition; the conditions are combined with ``OR``.

        :param kwargs: column name -> keyword to search for.
        :return: the filtered queryset, or an unfiltered clone when no
            keyword is given.
        """
        if not kwargs:
            # An empty WHERE fragment would produce invalid SQL.
            return self.all()

        db_table_name = self.model._meta.db_table
        conditions = []
        params = []
        for prop, keyword in kwargs.items():
            # NOTE(review): ``prop`` is still interpolated as an identifier;
            # it must come from trusted code, never from user input.
            conditions.append(
                f"{db_table_name}.{prop} LIKE %s COLLATE utf8_general_ci"
            )
            # The keyword is bound as a query parameter so that quotes in
            # it cannot break out of the SQL string literal.
            params.append(f"%{keyword}%")
        return self.extra(where=[" OR ".join(conditions)], params=params)
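# Reference copy of Django's ``BaseManager.from_queryset`` (a note only, it
# is not executed). It returns a new manager class that carries the given
# queryset class and its methods:
#
#     @classmethod
#     def from_queryset(cls, queryset_class, class_name=None):
#         if class_name is None:
#             class_name = '%sFrom%s' % (cls.__name__, queryset_class.__name__)
#         return type(class_name, (cls,), {
#             '_queryset_class': queryset_class,
#             **cls._get_queryset_methods(queryset_class),
#         })
#         # returns a new class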
class Manager(BaseManager.from_queryset(QuerySet)):
    """Manager class generated from the custom ``QuerySet`` of this module."""
class Model(BaseModel, ToDictMixin):
    """Abstract base model combining ``ToDictMixin`` with ``Manager``."""

    class Meta:
        # Abstract: this class only serves as a base for other models.
        abstract = True

    # Default manager, built from the custom QuerySet.
    objects = Manager()