123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404 |
- import calendar
- from datetime import datetime
- import utils
- from models.process_data import ProcessData
- from models.process_result_data import ProcessResultData, InstrumentData
- from stores.data_store_interface import IDataStore
class DataSend:
    """Send tender-notice e-mails and monthly award-report e-mails.

    Records and address books are read through the injected ``IDataStore``;
    mail delivery, configuration lookup, logging and Excel export are
    delegated to the project ``utils`` module.

    NOTE(review): values read from the store are interpolated into HTML
    without escaping. If any of them can contain untrusted markup, escape
    them before rendering -- confirm what the store guarantees.
    """

    # Per-instance state. Declared here for readers/type checkers and assigned
    # in __init__ so that no mutable list is shared between instances.
    _error_arr: list
    _email_area_arr: list
    _email_area_virtual_arr: list

    # Administrative suffix characters removed from an area name before it is
    # matched against the configured areas.
    _AREA_SUFFIX_TABLE = str.maketrans("", "", "省市区县")

    # Upper bound on chained virtual-area lookups in _get_email_by_area.
    _MAX_VIRTUAL_LOOKUPS = 3

    # Stylesheet of the single-tender notification e-mail.
    _ITEM_EMAIL_CSS = """
        body {
            background-color: #f4f4f9;
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
        }
        h1 {
            text-align: center;
            color: #333;
        }
        .container {
            max-width: 600px;
            margin: 0 auto;
            background-color: #fff;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
        }
        .button-container {
            text-align: center;
            margin-top: 20px;
        }
        .button {
            display: inline-block;
            padding: 10px 20px;
            font-size: 16px;
            color: #fff!important;
            background-color: #007bff;
            text-decoration: none;
            border-radius: 5px;
            transition: background-color 0.3s;
        }
        .button:hover {
            background-color: #0056b3;
        }
        .system {
            color: #aaa;
        }
    """

    # Stylesheet of the monthly report e-mail.
    _REPORT_EMAIL_CSS = """
        body {
            background-color: #f4f4f9;
            font-family: Arial, sans-serif;
            margin: 0;
            padding: 20px;
        }
        h1 {
            text-align: center;
            color: #333;
        }
        .container {
            max-width: 1200px;
            margin: 0 auto;
            background-color: #fff;
            padding: 20px;
            border-radius: 8px;
            box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
        }
        .system {
            color: #aaa;
            font-size: 80%;
        }
        .table-container {
            overflow-x: auto;
            width: 100%;
        }
        .table {
            width: 1200px;
            background-color: #ffffff;
            border: 1px solid #dddddd;
            border-radius: 8px;
            margin-bottom: 20px;
            padding: 20px;
            box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1);
            border-collapse: collapse;
        }
        .table th, .table td {
            padding: 5px;
            border-bottom: 1px solid #dddddd;
            word-wrap: break-word;
            text-align: center;
            font-size:12px;
        }
        .table th:not(:first-child), .table td:not(:first-child) ,.table tr.instrument-row th,.table tr.instrument-row td{
            border-left: 1px solid #dddddd;
        }

        .table th {
            padding: 10px;
            background-color: #f8f9fa;
            font-weight: bold;
            font-size:14px;
        }
        .table tr:last-child td {
            border-bottom: none;
        }
        .table td a{
            color: #007bff;
        }
    """

    # Opening markup and header rows of the report table. The instrument
    # column widths are written as real CSS declarations ("width:180px"); a
    # bare "180px" inside a style attribute is not a declaration and has no
    # effect.
    _REPORT_TABLE_HEAD = """
        <div class="table-container">
        <table class="table">
        <tr>
            <th style="width:200px" rowspan="2">项目名称</th>
            <th style="width:100px" rowspan="2">地区</th>
            <th colspan="6" style="width:800px">中标设备</th>
            <th style="width:100px" rowspan="2">公告日期</th>
        </tr>
        <tr class='instrument-row'>
            <th style="width:180px">仪器名称</th>
            <th style="width:200px">仪器厂商</th>
            <th style="width:130px">仪器型号</th>
            <th style="width:80px">数量</th>
            <th style="width:140px">单价(元)</th>
            <th>中标单位</th>
        </tr>
    """

    @property
    def store(self) -> IDataStore:
        """The data store this sender reads from and writes to."""
        return self._store

    def __init__(self, store: IDataStore):
        """Keep ``store`` and load both area/e-mail address books from it."""
        self._store = store
        self._error_arr = []
        self._email_area_arr = self.store.query_all_emails()
        self._email_area_virtual_arr = self.store.query_all_virtual_emails()

    def send(self) -> None:
        """Send every pending item, then report areas that had no e-mail."""
        self._error_arr = []
        pending = self.store.query_to_send()
        utils.get_logger().info(f"开始发送邮件,数量为 {len(pending)}")
        for item in pending:
            self._send_item(item)
        if self._error_arr:
            self._send_email_no_found()

    def send_report_current_month(self):
        """Send the award report covering the current calendar month."""
        start_date, end_date = self._get_first_and_last_day_of_current_month()
        self._send_reports(start_date, end_date)

    def send_report_prev_month(self):
        """Send the award report covering the previous calendar month."""
        start_date, end_date = self._get_first_and_last_day_of_prev_month()
        self._send_reports(start_date, end_date)

    def _send_reports(self, start_date, end_date):
        """Build the report for ``[start_date, end_date]`` and mail it.

        The report goes to the address returned by
        ``store.query_master_email()``; nothing is sent when that is empty.
        """
        # Format outside the f-string: reusing the enclosing quote character
        # inside an f-string expression is a syntax error before Python 3.12.
        start_text = start_date.strftime("%Y-%m-%d")
        end_text = end_date.strftime("%Y-%m-%d")
        utils.get_logger().info(
            f"开始发送中标报告邮件,开始日期:{start_text},结束日期:{end_text}"
        )
        email = self.store.query_master_email()
        if not email:
            utils.get_logger().error("没有找到master email")
            return
        items = self.store.query_to_report_by_date(start_date, end_date)
        title_prev = utils.get_config_value("email.report_title_prev", "【中标报告】")
        title = f"{start_date.month}月中标结果报告"
        body = self._build_report_email_html(title, items)
        attach_path = self._gen_report_exlecl(title, items)
        # The fourth argument is presumably an "HTML body" flag (the
        # plain-text warning mail passes False) -- confirm in utils.
        sent = utils.send_email(email, f"{title_prev} {title}", body, True, attach_path)
        if sent:
            utils.get_logger().info("发送中标报告邮件成功")

    def _send_item(self, item: ProcessData) -> None:
        """Mail one item to the address configured for its city.

        The item is marked as sent in the store only when ``utils.send_email``
        returns a truthy value. When no address matches, the city is
        remembered so ``send`` can report it afterwards.
        """
        utils.get_logger().info(f"开始发送邮件,地区为:{item.city} ,URL为 {item.url}")
        email = self._get_email_by_area(item.city)
        if not email:
            utils.get_logger().error(f"{item.city} 下没有找到email")
            # Only real names are collected: a None city would break the
            # "、".join(...) in _send_email_no_found.
            if item.city and item.city not in self._error_arr:
                self._error_arr.append(item.city)
            return
        title_prev = utils.get_config_value("email.title_prev", "【招标信息】")
        body = self._build_email_html(item)
        sent = utils.send_email(
            email, f"{title_prev} {item.title}", body, True, item.attach_path
        )
        if sent:
            self.store.set_send(item.no)

    def _get_email_by_area(
        self, area: str, count: int = 0, virtual_area: str | None = None
    ) -> str | None:
        """Return the e-mail configured for ``area``, or ``None``.

        The area is first stripped of the characters 省/市/区/县 and matched
        as a substring of each configured area. If nothing matches, the
        virtual-area book is consulted and the lookup is retried with the
        name found there (bounded by ``_MAX_VIRTUAL_LOOKUPS``). When such a
        retry succeeds, the aliases that led to it are appended to the matched
        entry in the store and the address book is reloaded.

        Args:
            area: Area name to look up; ``None``/empty yields ``None``.
            count: Number of virtual lookups already performed (internal).
            virtual_area: Comma-joined aliases collected so far (internal).
        """
        if not area:
            return None
        area_str = area.translate(self._AREA_SUFFIX_TABLE)
        if not area_str:
            # An empty string is a substring of every configured area and
            # would silently select the first entry.
            return None

        email = None
        for area_item in self._email_area_arr:
            if area_str in area_item.area:
                email = area_item.email
                if virtual_area:
                    new_area = f"{area_item.area},{virtual_area}"
                    self.store.update_area_email_area_by_name(area_item.name, new_area)
                    # Safe to rebind while iterating: the loop ends right after.
                    self._email_area_arr = self.store.query_all_emails()
                break

        if not email and count < self._MAX_VIRTUAL_LOOKUPS:
            area_name = self._get_email_by_area_virtual(area_str)
            if area_name:
                virtual_area = (
                    f"{area_str},{virtual_area}" if virtual_area else area_str
                )
                email = self._get_email_by_area(area_name, count + 1, virtual_area)
        return email

    def _get_email_by_area_virtual(self, area: str) -> str | None:
        """Return the name of the first virtual entry whose area contains ``area``."""
        for area_item in self._email_area_virtual_arr:
            if area in area_item.area:
                return area_item.name
        return None

    @staticmethod
    def _build_email_html(item: ProcessData, other: str = "") -> str:
        """Render the HTML body of a single tender notification.

        Falsy fields are rendered as empty text; ``other`` is placed in an
        ``<h3>`` below the detail button.
        """
        return f"""
        <html>
        <head>
        <style>{DataSend._ITEM_EMAIL_CSS}</style>
        </head>
        <body>
        <div class="container">
            <h1>{item.title}</h1>
            <p><strong>招标编号:</strong> {item.no or ''}</p>
            <p><strong>项目区域:</strong> {item.provice or ''}{item.city or ''}</p>
            <p><strong>相关设备:</strong> {item.devices or ''}</p>
            <p><strong>开标时间:</strong> {item.date or ''}</p>
            <p><strong>开标地点:</strong> {item.address or ''}</p>
            <p><strong>发布日期:</strong> {item.release_date or ''}</p>
            <p><strong>标书摘要:</strong> {item.summary or ''}</p>
            <div class="button-container">
                <a href="{item.url}" class="button">查看详情</a>
            </div>
            <div>
                <h3>{other}</h3>
            </div>
            <p class="system">本邮件由系统自动发送,请勿回复。</p>
        </div>
        </body>
        </html>
        """

    def _build_report_email_html(self, title, items: list[ProcessResultData]) -> str:
        """Render the full HTML document of the monthly report e-mail."""
        body = self._build_report_email_body(items)
        return f"""
        <html>
        <head>
        <style>{self._REPORT_EMAIL_CSS}</style>
        </head>
        <body>
        <div class="container">
            <h1>{title}</h1>
            {body}
            <p class="system">本邮件由系统自动发送,请勿回复。</p>
        </div>
        </body>
        </html>
        """

    def _build_report_email_body(self, items: list[ProcessResultData]) -> str:
        """Render the report table, or ``""`` when there are no items."""
        if not items:
            return ""
        parts = [self._REPORT_TABLE_HEAD]
        parts.extend(self._gen_report_body_item(item) for item in items)
        parts.append("</table></div>")
        return "".join(parts)

    def _gen_report_body_item(self, item: ProcessResultData):
        """Render the table rows of one report item.

        The project cells span all instrument rows; the first instrument
        shares the project row and each further instrument gets its own row.
        Returns ``""`` for a falsy item.
        """
        if not item:
            return ""
        instruments = item.instruments or []
        row_count = len(instruments) if instruments else 1
        first_instrument = instruments[0] if instruments else None
        rows = [
            f"""
            <tr>
                <td rowspan="{row_count}"><a title="点击查看详情" href="{item.url}">{item.title}</a></td>
                <td rowspan="{row_count}">{item.provice or ''}{item.city or ''}</td>
                {self._gen_report_body_item_instrument(first_instrument)}
                <td rowspan="{row_count}">{item.date or ''}</td>
            </tr>
            """
        ]
        for instrument in instruments[1:]:
            rows.append(
                f"""
            <tr class="instrument-row">
                {self._gen_report_body_item_instrument(instrument)}
            </tr>
            """
            )
        return "".join(rows)

    @staticmethod
    def _gen_report_body_item_instrument(instrument):
        """Render the six instrument cells, or one placeholder cell if absent."""
        if not instrument:
            return '<td colspan="6">无设备信息</td>'
        values = (
            instrument.name,
            instrument.manufacturer,
            instrument.model,
            instrument.quantity,
            instrument.unit_price,
            instrument.company,
        )
        return "\n".join(f"<td>{value if value else ''}</td>" for value in values)

    def _gen_report_exlecl(self, title, items: list[ProcessResultData]) -> str:
        """Flatten ``items`` into rows and hand them to ``utils.save_reort_excel``.

        Project columns are filled only on an item's first row; rows for its
        further instruments carry instrument columns only. Returns whatever
        ``utils.save_reort_excel`` returns (used by the caller as the
        attachment path).
        """
        rows = []
        for item in items or []:
            if not item:
                # Mirror _gen_report_body_item, which also skips falsy items.
                continue
            if item.instruments:
                for index, instrument in enumerate(item.instruments):
                    owner = item if index == 0 else None
                    rows.append(self._gen_report_row_data(owner, instrument))
            else:
                rows.append(self._gen_report_row_data(item, None))
        return utils.save_reort_excel(rows, title)

    @staticmethod
    def _gen_report_row_data(
        data: ProcessResultData | None, instrument: InstrumentData | None
    ):
        """Build one spreadsheet row; missing or falsy values become ``""``.

        Either argument may be ``None``: rows for an item's additional
        instruments have no ``data``, and items without instruments have no
        ``instrument``.
        """

        def field(source, attr):
            # Tolerates a missing source object as well as a falsy attribute.
            value = getattr(source, attr) if source else None
            return value if value else ""

        return {
            "项目编号": field(data, "no"),
            "项目名称": field(data, "title"),
            "公告日期": field(data, "date"),
            "招标省份": field(data, "provice"),
            "招标城市": field(data, "city"),
            "中标单位名称": field(instrument, "company"),
            "仪器名称": field(instrument, "name"),
            "仪器厂商": field(instrument, "manufacturer"),
            "仪器型号": field(instrument, "model"),
            "数量": field(instrument, "quantity"),
            "单价": field(instrument, "unit_price"),
            "公告摘要": field(data, "summary"),
            "URL": field(data, "url"),
        }

    def _send_email_no_found(self) -> None:
        """Mail the list of unmatched areas to the configured error address.

        Does nothing beyond logging when ``email.error_email`` is not set.
        """
        email = utils.get_config_value("email.error_email")
        utils.get_logger().info(f"开始发送区域邮箱未匹配邮件: {email}")
        if not email:
            return
        title = "Warning: 相关地区没有匹配到邮箱,请及时添加相关配置"
        content = "以下区域中没有配置邮箱:\n\n "
        content += "、".join(self._error_arr)
        content += "\n\n请及时添加相关配置。"
        utils.send_email(email, title, content, False, None)

    @staticmethod
    def _month_bounds(year: int, month: int):
        """Return ``(first 00:00:00, last 23:59:59)`` of the given month."""
        _, last_day = calendar.monthrange(year, month)
        return (
            datetime(year, month, 1, 0, 0, 0),
            datetime(year, month, last_day, 23, 59, 59),
        )

    @staticmethod
    def _get_first_and_last_day_of_current_month(today: datetime | None = None):
        """Return the bounds of the month containing ``today``.

        Args:
            today: Reference date; defaults to ``datetime.today()``.
        """
        today = today or datetime.today()
        return DataSend._month_bounds(today.year, today.month)

    @staticmethod
    def _get_first_and_last_day_of_prev_month(today: datetime | None = None):
        """Return the bounds of the month before the one containing ``today``.

        Args:
            today: Reference date; defaults to ``datetime.today()``.
        """
        today = today or datetime.today()
        if today.month == 1:
            # January rolls back to December of the previous year.
            return DataSend._month_bounds(today.year - 1, 12)
        return DataSend._month_bounds(today.year, today.month - 1)
|