service_user.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. from datetime import datetime
  2. from typing import Optional, Tuple
  3. from models.user_data import UserModel
  4. from stores.user_store import UserStore
  5. class UserService:
  6. def __init__(self):
  7. self._user_store = UserStore()
  8. def login(self, keyword: str, password: str) -> Tuple[Optional[UserModel], str]:
  9. # 验证必填参数
  10. if not keyword or not password:
  11. return None, "用户名/邮箱/手机号和密码不能为空"
  12. # 查询用户
  13. user = self._user_store.query_user(keyword)
  14. if not user:
  15. return None, "用户不存在"
  16. # 验证密码
  17. if not user.verify_password(password):
  18. return None, "密码错误"
  19. # 更新最后登录时间
  20. try:
  21. self._user_store.update_last_login(user.id)
  22. user.update_last_login()
  23. return user, "登录成功"
  24. except Exception as e:
  25. return None, f"登录失败:{str(e)}"
  26. def update_user_info(self, user_id: int, email: Optional[str] = None,
  27. phone: Optional[str] = None, name: Optional[str] = None,
  28. update_by: Optional[str] = None) -> Tuple[bool, str]:
  29. # 查询用户是否存在
  30. user = self._user_store.query_user_by_id(user_id)
  31. if not user:
  32. return False, "用户不存在"
  33. # 检查邮箱是否已被其他用户使用
  34. if email and email != user.email and self._user_store.check_email_exists(email, user_id):
  35. return False, "邮箱已被使用"
  36. # 检查手机号是否已被其他用户使用
  37. if phone and phone != user.phone and self._user_store.check_phone_exists(phone, user_id):
  38. return False, "手机号已被使用"
  39. # 更新用户信息
  40. user.email = email if email is not None else user.email
  41. user.phone = phone if phone is not None else user.phone
  42. user.name = name if name is not None else user.name
  43. user.update_by = update_by
  44. try:
  45. self._user_store.update_user(user)
  46. return True, "更新成功"
  47. except Exception as e:
  48. return False, f"更新失败:{str(e)}"
  49. def update_password(self, user_id: int, old_password: str,
  50. new_password: str) -> Tuple[bool, str]:
  51. # 验证必填参数
  52. if not old_password or not new_password:
  53. return False, "旧密码和新密码不能为空"
  54. # 查询用户
  55. user = self._user_store.query_user_by_id(user_id)
  56. if not user:
  57. return False, "用户不存在"
  58. # 验证旧密码
  59. if not user.verify_password(old_password):
  60. return False, "旧密码错误"
  61. # 设置新密码
  62. user.set_password_hash(new_password)
  63. try:
  64. self._user_store.update_user_password(user_id, user.get_password())
  65. return True, "密码更新成功"
  66. except Exception as e:
  67. return False, f"密码更新失败:{str(e)}"
  68. def delete_user(self, user_id: int, delete_by: str = "") -> Tuple[bool, str]:
  69. # 查询用户是否存在
  70. user = self._user_store.query_user_by_id(user_id)
  71. if not user:
  72. return False, "用户不存在"
  73. try:
  74. self._user_store.delete_user(user_id, delete_by)
  75. return True, "删除成功"
  76. except Exception as e:
  77. return False, f"删除失败:{str(e)}"
  78. def get_user_by_id(self, user_id: int) -> Tuple[Optional[dict], str]:
  79. user = self._user_store.query_user_by_id(user_id)
  80. if not user:
  81. return None, "用户不存在"
  82. return user.to_dict(), "获取成功"
  83. def get_users_paginated(self, page: int = 1, per_page: int = 10,
  84. keyword: Optional[str] = None) -> Tuple[list, int, str]:
  85. try:
  86. users, total = self._user_store.query_user_all_paginated(page, per_page, keyword)
  87. return [user.to_dict() for user in users], total, "获取成功"
  88. except Exception as e:
  89. return [], 0, f"获取失败:{str(e)}"
  90. def default_user(self):
  91. user = UserModel(
  92. username="admin",
  93. email="admin@example.com",
  94. phone="1234567890",
  95. name="Administrator",
  96. create_by="admin",
  97. update_by="admin"
  98. )
  99. user.set_password_hash("admin")
  100. self._user_store.insert_user(user)
  101. # def register_user(self, username: str, password: str, email: Optional[str] = None,
  102. # phone: Optional[str] = None, name: Optional[str] = None,
  103. # create_by: Optional[str] = None) -> Tuple[bool, str]:
  104. # # 验证必填参数
  105. # if not username or not password:
  106. # return False, "用户名和密码不能为空"
  107. # # 检查用户名是否已存在
  108. # if self._user_store.check_username_exists(username):
  109. # return False, "用户名已存在"
  110. # # 检查邮箱是否已存在
  111. # if email and self._user_store.check_email_exists(email):
  112. # return False, "邮箱已被使用"
  113. # # 检查手机号是否已存在
  114. # if phone and self._user_store.check_phone_exists(phone):
  115. # return False, "手机号已被使用"
  116. # # 创建新用户
  117. # user = UserModel(
  118. # username=username,
  119. # email=email,
  120. # phone=phone,
  121. # name=name,
  122. # create_by=create_by,
  123. # update_by=create_by
  124. # )
  125. # user.set_password_hash(password)
  126. # try:
  127. # self._user_store.insert_user(user)
  128. # return True, "注册成功"
  129. # except Exception as e:
  130. # return False, f"注册失败:{str(e)}"