| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- from datetime import datetime, timedelta
- from apscheduler.triggers.cron import CronTrigger
# Cron trigger override: parses 6/7-field expressions that start with a seconds field.
class VbCronTrigger(CronTrigger):
    """``CronTrigger`` subclass with a ``from_crontab`` that accepts 6 or 7 fields.

    The expected field order is::

        second minute hour day-of-month month day-of-week [year]

    In addition to what is forwarded verbatim to ``CronTrigger``, the parser
    recognises the ``?``, ``L``, ``W`` and ``#`` markers (these look like
    Quartz-style special characters).
    """

    @classmethod
    def from_crontab(cls, expr: str, timezone=None):
        """Create a trigger from a whitespace-separated cron expression.

        :param expr: expression with 6 fields (``second minute hour day month
            day-of-week``) or 7 fields (the same plus a trailing ``year``).
        :param timezone: passed through unchanged to the ``CronTrigger``
            constructor.
        :return: a new instance of ``cls``.
        :raises ValueError: if the expression does not have 6 or 7 fields, if
            the number in an ``nW`` day field or around ``#`` is not an
            integer, or if ``n`` in ``nW`` is not a valid day of the current
            month.

        How the special markers are handled:

        * ``?`` in the day-of-month field  -> ``day`` is left unset.
        * ``L`` in the day-of-week field   -> ``day`` becomes ``"last <rest>"``
          (only reached when day-of-month does not contain ``?``).
        * ``nW`` in the day-of-month field -> ``day`` becomes the weekday
          nearest to day ``n``.  It is resolved once, when the trigger is
          built, against the current month; it is not re-evaluated for later
          months.
        * ``L`` in the day-of-month field  -> replaced by ``last``.
        * ``d#n`` in the day-of-week field -> ``day_of_week = d - 1`` and
          ``week = n``.
        """
        values = expr.split()
        if len(values) not in (6, 7):
            raise ValueError('Wrong number of fields; got {}, expected 6 or 7'.format(len(values)))

        second, minute, hour, day_field, month, dow_field = values[:6]
        year = values[6] if len(values) == 7 else None

        # --- day of month -------------------------------------------------
        if '?' in day_field:
            day = None
        elif 'L' in dow_field:
            # NOTE(review): for a numeric field such as "6L" this yields
            # "last 6"; APScheduler's "last x" form appears to expect a weekday
            # name (mon..sun) -- confirm which inputs the callers produce.
            day = f"last {dow_field.replace('L', '')}"
        elif 'W' in day_field:
            day = cls.__find_recent_workday(int(day_field.split('W')[0]))
        else:
            day = day_field.replace('L', 'last')

        # --- week / day of week -------------------------------------------
        # NOTE(review): except for the "d#n" form, the sixth field is forwarded
        # as APScheduler's ``week`` argument and ``day_of_week`` stays unset.
        # Left as-is because the intended weekday numbering cannot be
        # determined from this file -- verify against the producers of `expr`.
        if '?' in dow_field or 'L' in dow_field:
            week = None
        elif '#' in dow_field:
            week = int(dow_field.split('#')[1])
        else:
            week = dow_field

        if '#' in dow_field:
            # Shift by one so that 1 maps to APScheduler's index 0.
            day_of_week = int(dow_field.split('#')[0]) - 1
        else:
            day_of_week = None

        return cls(
            second=second,
            minute=minute,
            hour=hour,
            day=day,
            month=month,
            week=week,
            day_of_week=day_of_week,
            year=year,
            timezone=timezone,
        )

    @classmethod
    def __find_recent_workday(cls, day: int, reference=None):
        """Return the day-of-month of the weekday (Mon-Fri) nearest to ``day``.

        The search never leaves the month of ``reference``:

        * Monday-Friday           -> ``day`` itself
        * Saturday                -> the Friday before, or Monday the 3rd when
          ``day`` is the 1st
        * Sunday                  -> the Monday after, or the Friday before
          when ``day`` is the last day of the month

        :param day: day of month to start from (1-based).
        :param reference: optional ``datetime`` whose year and month are used;
            defaults to ``datetime.now()`` (naive local time).
        :return: the resolved day of month as an ``int``.
        :raises ValueError: if ``day`` is not a valid day of that month.
        """
        import calendar  # local import: only needed for the month length

        today = datetime.now() if reference is None else reference
        # datetime() validates `day` against the month and raises ValueError.
        target = datetime(today.year, today.month, day)
        weekday = target.weekday()  # 0 = Monday ... 6 = Sunday

        if weekday < 5:
            return target.day

        if weekday == 5:  # Saturday
            # Stepping back from the 1st would land in the previous month.
            return day + 2 if day == 1 else day - 1

        # Sunday
        last_day = calendar.monthrange(today.year, today.month)[1]
        # Stepping forward from the last day would land in the next month.
        return day - 2 if day == last_day else day + 1
|