123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- from datetime import datetime
- from typing import Optional, Tuple
- from models.user_data import UserModel
- from stores.user_store import UserStore
- class UserService:
- def __init__(self):
- self._user_store = UserStore()
-
- def login(self, keyword: str, password: str) -> Tuple[Optional[UserModel], str]:
- # 验证必填参数
- if not keyword or not password:
- return None, "用户名/邮箱/手机号和密码不能为空"
- # 查询用户
- user = self._user_store.query_user(keyword)
- if not user:
- return None, "用户不存在"
- # 验证密码
- if not user.verify_password(password):
- return None, "密码错误"
- # 更新最后登录时间
- try:
- self._user_store.update_last_login(user.id)
- user.update_last_login()
- return user, "登录成功"
- except Exception as e:
- return None, f"登录失败:{str(e)}"
- def update_user_info(self, user_id: int, email: Optional[str] = None,
- phone: Optional[str] = None, name: Optional[str] = None,
- update_by: Optional[str] = None) -> Tuple[bool, str]:
- # 查询用户是否存在
- user = self._user_store.query_user_by_id(user_id)
- if not user:
- return False, "用户不存在"
- # 检查邮箱是否已被其他用户使用
- if email and email != user.email and self._user_store.check_email_exists(email, user_id):
- return False, "邮箱已被使用"
- # 检查手机号是否已被其他用户使用
- if phone and phone != user.phone and self._user_store.check_phone_exists(phone, user_id):
- return False, "手机号已被使用"
- # 更新用户信息
- user.email = email if email is not None else user.email
- user.phone = phone if phone is not None else user.phone
- user.name = name if name is not None else user.name
- user.update_by = update_by
- try:
- self._user_store.update_user(user)
- return True, "更新成功"
- except Exception as e:
- return False, f"更新失败:{str(e)}"
- def update_password(self, user_id: int, old_password: str,
- new_password: str) -> Tuple[bool, str]:
- # 验证必填参数
- if not old_password or not new_password:
- return False, "旧密码和新密码不能为空"
- # 查询用户
- user = self._user_store.query_user_by_id(user_id)
- if not user:
- return False, "用户不存在"
- # 验证旧密码
- if not user.verify_password(old_password):
- return False, "旧密码错误"
- # 设置新密码
- user.set_password_hash(new_password)
- try:
- self._user_store.update_user_password(user_id, user.get_password())
- return True, "密码更新成功"
- except Exception as e:
- return False, f"密码更新失败:{str(e)}"
- def delete_user(self, user_id: int, delete_by: str = "") -> Tuple[bool, str]:
- # 查询用户是否存在
- user = self._user_store.query_user_by_id(user_id)
- if not user:
- return False, "用户不存在"
- try:
- self._user_store.delete_user(user_id, delete_by)
- return True, "删除成功"
- except Exception as e:
- return False, f"删除失败:{str(e)}"
- def get_user_by_id(self, user_id: int) -> Tuple[Optional[dict], str]:
- user = self._user_store.query_user_by_id(user_id)
- if not user:
- return None, "用户不存在"
- return user.to_dict(), "获取成功"
- def get_users_paginated(self, page: int = 1, per_page: int = 10,
- keyword: Optional[str] = None) -> Tuple[list, int, str]:
- try:
- users, total = self._user_store.query_user_all_paginated(page, per_page, keyword)
- return [user.to_dict() for user in users], total, "获取成功"
- except Exception as e:
- return [], 0, f"获取失败:{str(e)}"
- def default_user(self):
- user = UserModel(
- username="admin",
- email="admin@example.com",
- phone="1234567890",
- name="Administrator",
- create_by="admin",
- update_by="admin"
- )
- user.set_password_hash("admin")
- self._user_store.insert_user(user)
- # def register_user(self, username: str, password: str, email: Optional[str] = None,
- # phone: Optional[str] = None, name: Optional[str] = None,
- # create_by: Optional[str] = None) -> Tuple[bool, str]:
- # # 验证必填参数
- # if not username or not password:
- # return False, "用户名和密码不能为空"
- # # 检查用户名是否已存在
- # if self._user_store.check_username_exists(username):
- # return False, "用户名已存在"
- # # 检查邮箱是否已存在
- # if email and self._user_store.check_email_exists(email):
- # return False, "邮箱已被使用"
- # # 检查手机号是否已存在
- # if phone and self._user_store.check_phone_exists(phone):
- # return False, "手机号已被使用"
- # # 创建新用户
- # user = UserModel(
- # username=username,
- # email=email,
- # phone=phone,
- # name=name,
- # create_by=create_by,
- # update_by=create_by
- # )
- # user.set_password_hash(password)
- # try:
- # self._user_store.insert_user(user)
- # return True, "注册成功"
- # except Exception as e:
- # return False, f"注册失败:{str(e)}"
|