版本 2.7<=, <=3.8
https://pypi.org/project/pymodm/
https://pymodm.readthedocs.io/en/stable/api/index.html#model-fields

github
https://github.com/mongodb/pymodm/

https://github.com/mongodb/pymodm/blob/master/test

example

models.py

  1. from pymodm.connection import connect
  2. connect("mongodb://root:example@192.168.204.129:27017/pintai?authSource=admin", alias="hwmg")
  3. import datetime as dt
  4. from pathlib import Path
  5. from pymodm.errors import ValidationError
  6. from pymongo.write_concern import WriteConcern
  7. from pymodm import MongoModel, fields
  8. def must_be_exists(value):
  9. if not Path(value).exists():
  10. raise ValidationError('指定的路径不存在')
  11. class Magazine(MongoModel):
  12. name = fields.CharField(required=True, min_length=5, max_length=20)
  13. uptime = fields.DateTimeField(default=dt.datetime.utcnow)
  14. # path = fields.CharField(required=True, validators=[must_be_exists])
  15. class Meta:
  16. write_concern = WriteConcern(j =True)
  17. connection_alias = 'hwmg'
  18. collection_name = 'MAGAZINE'

app.py

  1. from typing import Dict
  2. from models import Magazine
  3. from pathlib import Path
  4. from pymodm.errors import ValidationError
  5. def main():
  6. # magazine = Magazine(name='安天安全周期第1期', path=r'/home/magazine/001.pdf')
  7. # if magazine.is_valid():
  8. # magazine.save()
  9. m1 = Magazine(name='安天')
  10. # if m1.is_valid():
  11. # m1.save()
  12. # m1.save()
  13. try:
  14. m1.full_clean()
  15. except ValidationError as ve:
  16. if isinstance(ve.message, dict):
  17. # print(ve.message)
  18. for k, v in ve.message.items():
  19. print(k,'=' ,v)
  20. if __name__ == "__main__":
  21. main()

3

hwmg

  1. import pymongo
  2. import datetime as dt
  3. from datetime import timedelta
  4. from flask import request
  5. from pathlib import Path
  6. from app.models import Magazine
  7. from pymodm.errors import ValidationError
  8. from commonUtil.file_operation import filename_check, genDir, getSize
  9. # from module_server.resource_node.log_func import LOGGER
  10. from config.config import SECURITY_INFO_FILE_UPLOAD, MAX_ALLOW_UPLOAD_FILE_SIZE, ALLOW_UPLOAD_FILE_SUFFIX
  11. # 安全资讯文档上传存放路径 /home/hwmg/log/download/security_info
  12. genDir(SECURITY_INFO_FILE_UPLOAD)
  13. def get_sec_info_list(info_data):
  14. """获取查询结果集成员, 生成列表
  15. Args:
  16. info_data: _description_
  17. """
  18. info_list = list()
  19. for info in info_data:
  20. item = {
  21. "id": str(info.pk),
  22. "name": info.name,
  23. "uploadTime": info.uptime.timestamp(),
  24. "url": f"download/security_info/{info.name}"
  25. }
  26. info_list.append(item)
  27. return info_list
  28. def sec_info_get(json_data):
  29. """获取安全资讯信息
  30. Args:
  31. json_data: _description_
  32. """
  33. page = int(json_data.get("page", 1))
  34. limit = int(json_data.get("limit", 10))
  35. name = json_data.get("name")
  36. upload_time = json_data.get("uploadTime")
  37. tt = dt.datetime.now() # 查询时间段结束时间, to_time 默认当前
  38. ft = tt - timedelta(days=7) # 查询时间段开始时间, from_time 默认7天前
  39. payload = dict()
  40. if upload_time:
  41. from_ts, to_ts = upload_time.split(",")
  42. ft = dt.datetime.fromtimestamp(int(from_ts))
  43. tt = dt.datetime.fromtimestamp(int(to_ts))
  44. find_qs = {"uptime": {"$gte": ft, "$lte": tt}} # 查询条件,时间段-上传时间
  45. if name:
  46. find_qs.update({"name": {"$regex": name}})
  47. # 获取满足查询条件的所有结果, 实现排序
  48. result = Magazine.objects.raw(find_qs).order_by([("uptime", pymongo.DESCENDING)])
  49. if result.count() < 1:
  50. payload['total'] = 0
  51. payload['data'] = []
  52. else:
  53. payload['total'] = result.count()
  54. payload['data'] = get_sec_info_list(result.limit(limit).skip((page - 1) * limit))
  55. return dict(status=1, message="查询成功", payload = payload)
  56. def sec_info_post(json_dat):
  57. """上传安全资讯
  58. Args:
  59. json_dat: _description_
  60. """
  61. sec_info_file = request.files.get("file")
  62. if not sec_info_file:
  63. return dict(status=-1, message="上传文件未找到")
  64. sec_info_filename = filename_check(sec_info_file.filename)
  65. sec_info_file_type = Path(sec_info_filename).suffix
  66. if sec_info_file_type.lower() not in ALLOW_UPLOAD_FILE_SUFFIX:
  67. return dict(status=-1, message="不支持的文件类型")
  68. if sec_info_filename == sec_info_file_type:
  69. return dict(status=-1, message="文件名称检查异常")
  70. try:
  71. Magazine.objects.get({"name":sec_info_filename})
  72. except Magazine.DoesNotExist:
  73. sec_info_file_save_path = SECURITY_INFO_FILE_UPLOAD / sec_info_filename
  74. sec_info_file.save(str(sec_info_file_save_path))
  75. if getSize(str(sec_info_file_save_path)) > 1024 * 1024 * MAX_ALLOW_UPLOAD_FILE_SIZE:
  76. return dict(status=-1, message="文件大小超过限制 50M")
  77. # 数据验证部分返回错误消息
  78. try:
  79. Magazine(name=sec_info_filename, path=str(sec_info_file_save_path)).save()
  80. return dict(status=1, message="上传成功")
  81. except ValidationError as ve:
  82. for k, v in ve.message.items():
  83. if k == "name":
  84. return dict(status=-1, message=f"文件名长度不合法: {v}")
  85. else:
  86. return dict(status=-1, message=f"{v}")
  87. else:
  88. return dict(status=-1, message="文件已存在")