lico.core.contrib.models.py

  1. from typing import Callable, Dict, Iterable, Optional
  2. from django.db.models import Model as BaseModel
  3. from django.db.models.manager import BaseManager
  4. from django.db.models.query import QuerySet as BaseQuerySet
  5. class ToDictMixin:
  6. as_dict_exclude = ()
  7. """
  8. # workspace\lenovo_git\lico-core\lico-core-license\lico\core\license\views.py
  9. license = License.objects.get()
  10. result = license.as_dict(
  11. include=[
  12. 'license_key', 'customer_name',
  13. 'cluster_type', 'license_code', 'expire_time',
  14. 'license_numbers', 'status'
  15. ]
  16. )
  17. result: {
  18. "license_key": "b\"l\\xaa\\x",
  19. "cluster_type": "host",
  20. "license_code": "Lenovo HPC AI LiCO Software",
  21. "expire_time": "2032-06-30",
  22. "license_numbers": 64,
  23. "status": "valid",
  24. "rest_days": 4146
  25. }
  26. >>> from lico.core.accounting.models import BillGroup, Deposit
  27. >>> query = Deposit.objects.all()
  28. >>> query[0].as_dict()
  29. {'id': 1, 'user': 'test', 'credits': -240.0, 'apply_time': 1615432804, 'approved_time': 1615432804, 'billing_type': '', 'billing_id': None, 'balance': -107.0, 'bill_group': {'id': 1, 'name': 'test', 'balance': 500.0, 'charged': 0.0, 'used_time': 0, 'used_credits': 0.0, 'description': '', 'charge_rate': 2.0, 'cr_minute': 0.0, 'cr_display_type': 'hour', 'last_operation_time': 1615442820, 'gres_charge_rate': {}, 'gcr_minute': {}, 'gcr_display_type': {}, 'memory_charge_rate': 4.0, 'mcr_minute': 0.0, 'mcr_display_type': 'hour', 'storage_charge_rate': 1.0}}
  30. >>> query[0].as_dict(include = ['bill_group'])
  31. {'bill_group': {'id': 1, 'name': 'test', 'balance': 500.0, 'charged': 0.0, 'used_time': 0, 'used_credits': 0.0, 'description': '', 'charge_rate': 2.0, 'cr_minute': 0.0, 'cr_display_type': 'hour', 'last_operation_time': 1615442820, 'gres_charge_rate': {}, 'gcr_minute': {}, 'gcr_display_type': {}, 'memory_charge_rate': 4.0, 'mcr_minute': 0.0, 'mcr_display_type': 'hour', 'storage_charge_rate': 1.0}}
  32. >>> query[0].as_dict(exclude = ['bill_group'])
  33. {'id': 1, 'user': 'test', 'credits': -240.0, 'apply_time': 1615432804, 'approved_time': 1615432804, 'billing_type': '', 'billing_id': None, 'balance': -107.0}
  34. """
  35. def as_dict(
  36. self, inspect_related=True,
  37. include: Optional[Iterable[str]] = None,
  38. exclude: Optional[Iterable[str]] = None,
  39. related_field_options: Optional[Dict] = None, ## https://sikasjc.github.io/2018/07/14/type-hint-in-python/
  40. **on_finished_options
  41. ):
  42. if include is not None:
  43. def is_excluded(field):
  44. return field not in include
  45. else:
  46. if exclude is None:
  47. exclude = set(self.as_dict_exclude)
  48. else:
  49. exclude = set(exclude) | set(self.as_dict_exclude)
  50. def is_excluded(field):
  51. return field in exclude
  52. if related_field_options is None:
  53. related_field_options = {}
  54. result = {}
  55. for field in self._meta.concrete_fields:
  56. if is_excluded(field.name) or field.is_relation:
  57. continue
  58. if hasattr(field, 'dict_from_object'):
  59. result[field.name] = field.dict_from_object(self)
  60. else:
  61. result[field.name] = field.value_from_object(self)
  62. if inspect_related:
  63. self.inspect_related_fields(
  64. result, is_excluded, related_field_options
  65. )
  66. self.as_dict_on_finished(result, is_excluded, **on_finished_options)
  67. return result
  68. def as_dict_on_finished(
  69. self, result: Dict, is_exlucded: Callable, **kwargs
  70. ):
  71. pass
  72. def inspect_related_fields(
  73. self, result: Dict, is_excluded: Callable,
  74. related_field_options: Dict
  75. ):
  76. related_fields = (
  77. rf for rf in self._meta.get_fields()
  78. if rf.is_relation and not is_excluded(rf.get_cache_name())
  79. )
  80. for rf in related_fields:
  81. option = related_field_options.get(
  82. rf.get_cache_name(), dict(inspect_related=False)
  83. )
  84. if rf.one_to_many:
  85. self._on_inspect_one_to_many_field(rf, result, option)
  86. if rf.many_to_one:
  87. self._on_inspect_many_to_one_field(rf, result, option)
  88. if rf.one_to_one:
  89. self._on_inspect_one_to_one_field(rf, result, option)
  90. if rf.many_to_many:
  91. self._on_inspect_many_to_many_field(rf, result, option)
  92. def _on_inspect_one_to_many_field(self, rf, result: Dict, option: Dict):
  93. cache_name = rf.get_cache_name()
  94. result[cache_name] = [
  95. related_object.as_dict(**option)
  96. for related_object in getattr(self, cache_name).iterator()
  97. if hasattr(related_object, 'as_dict')
  98. ]
  99. def _on_inspect_one_to_one_field(self, rf, result: Dict, option: Dict):
  100. cache_name = rf.get_cache_name()
  101. related_object = getattr(self, cache_name, None)
  102. if related_object is None:
  103. result[cache_name] = None
  104. if hasattr(related_object, 'as_dict'):
  105. result[cache_name] = related_object.as_dict(**option)
  106. def _on_inspect_many_to_one_field(self, rf, result: Dict, option: Dict):
  107. cache_name = rf.get_cache_name()
  108. related_object = getattr(self, cache_name, None)
  109. if related_object is None:
  110. result[cache_name] = None
  111. if hasattr(related_object, 'as_dict'):
  112. result[cache_name] = related_object.as_dict(**option)
  113. def _on_inspect_many_to_many_field(self, rf, result: Dict, option: Dict):
  114. cache_name = rf.get_cache_name()
  115. result[cache_name] = [
  116. related_object.as_dict(**option)
  117. for related_object in getattr(self, cache_name).iterator()
  118. if hasattr(related_object, 'as_dict')
  119. ]
  120. class QuerySet(BaseQuerySet):
  121. def as_dict(self, *args, **kwargs):
  122. return [
  123. obj.as_dict(*args, **kwargs)
  124. for obj in self.iterator()
  125. ]
  126. def ci_search(self, **kwargs): # pragma: no cover
  127. # case insensitive search
  128. db_table_name = self.model._meta.db_table
  129. where_clause = ""
  130. for prop, keyword in kwargs.items():
  131. if where_clause:
  132. where_clause += " OR "
  133. where_clause += f"{db_table_name}.{prop} " \
  134. f"LIKE '%%{keyword}%%' COLLATE utf8_general_ci"
  135. return self.extra(where=[where_clause])
  136. """
  137. @classmethod
  138. def from_queryset(cls, queryset_class, class_name=None):
  139. if class_name is None:
  140. class_name = '%sFrom%s' % (cls.__name__, queryset_class.__name__)
  141. return type(class_name, (cls,), {
  142. '_queryset_class': queryset_class,
  143. **cls._get_queryset_methods(queryset_class),
  144. })
  145. ## return a new_class
  146. """
  147. class Manager(BaseManager.from_queryset(QuerySet)):
  148. pass
  149. class Model(BaseModel, ToDictMixin):
  150. objects = Manager()
  151. class Meta:
  152. abstract = True