Kaynağa Gözat

Update 优化定时任务模块

YueYunyun 2 yıl önce
ebeveyn
işleme
6c51e5cc15

+ 2 - 2
SERVER/IotAdmin/app/schedule/apis/job.go

@@ -1,7 +1,7 @@
 package apis
 
 import (
-	"IotAdmin/app/schedule/jobs"
+	cJobs "IotAdmin/common/jobs"
 	"IotAdmin/common/permission"
 	"IotAdmin/core/sdk"
 	"fmt"
@@ -214,7 +214,7 @@ func (e SysJobApi) GetJobKeys(c *gin.Context) {
 		e.Error(500, err, err.Error())
 		return
 	}
-	result := jobs.GetJobKeys()
+	result := cJobs.GetJobKeys()
 	e.OK(result, "获取成功")
 }
 

+ 37 - 49
SERVER/IotAdmin/app/schedule/jobs/clean-log.go

@@ -2,11 +2,10 @@ package jobs
 
 import (
 	"IotAdmin/common"
-	"IotAdmin/core/logger"
+	"IotAdmin/common/jobs"
 	iotLog "IotAdmin/iot/log"
 	"strconv"
 	"strings"
-	"time"
 )
 
 // CleanLog
@@ -14,63 +13,52 @@ import (
 type CleanLog struct {
 }
 
-func (t *CleanLog) Exec(arg interface{}) error {
-	jobLogger.Infof("【CleanLog】 EXEC START")
+func (t *CleanLog) Exec(job *jobs.JobCore) error {
+	job.Logger.Infof("【CleanLog】 EXEC START")
 	err := common.CleanSysLog()
 	if err != nil {
-		jobLogger.Errorf("CleanSysLog EXEC ERROR:%s", err.Error())
+		job.Logger.Errorf("CleanSysLog EXEC ERROR:%s", err.Error())
 		return err
 	}
-	err = CleanJobLog(7)
+
+	arr := strings.Split(job.Args, ",")
+	var p1, p2, p3 int
+	p1, err = strconv.Atoi(arr[0])
 	if err != nil {
-		jobLogger.Errorf("CleanJobLog EXEC ERROR:%s", err.Error())
+		job.Logger.Errorf("CleanLog P1:%s EXEC ERROR:%s", arr[0], err.Error())
 		return err
 	}
-	switch arg.(type) {
-	case string:
-		if arg.(string) != "" {
-			str := arg.(string)
-			arr := strings.Split(str, ",")
-			p1, err := strconv.Atoi(arr[0])
-			if err != nil {
-				jobLogger.Errorf("CleanLog EXEC ERROR:%s", err.Error())
-				return err
-			}
-			if len(arr) > 1 {
-				p2, err := strconv.Atoi(arr[1])
-				if err != nil {
-					jobLogger.Errorf("CleanLog EXEC ERROR:%s", err.Error())
-					return err
-				}
-				err = iotLog.Clean(p1, p2)
-				if err != nil {
-					jobLogger.Errorf("CleanLog EXEC ERROR:%s", err.Error())
-					return err
-				}
-			} else {
-				err = iotLog.Clean(p1, p1)
-				if err != nil {
-					jobLogger.Errorf("CleanLog EXEC ERROR:%s", err.Error())
-					return err
-				}
-			}
-		} else {
-			jobLogger.Errorf("CleanLog EXEC ERROR: arg is empty")
+	p2 = p1
+	p3 = p1
+	if len(arr) > 2 {
+		p2, err = strconv.Atoi(arr[1])
+		if err != nil {
+			job.Logger.Errorf("CleanLog P2:%s EXEC ERROR:%s", arr[1], err.Error())
+			return err
+		}
+		p3, err = strconv.Atoi(arr[2])
+		if err != nil {
+			job.Logger.Errorf("CleanLog P3:%s EXEC ERROR:%s", arr[2], err.Error())
+			return err
 		}
-		break
-	case int:
-		if arg.(int) != 0 {
-			p := arg.(int)
-			err := iotLog.Clean(p, p)
-			if err != nil {
-				logger.Error(time.Now().Format(timeFormat) + " [ERROR] JobCore CleanLog exec error:" + err.Error())
-				return err
-			}
+	} else if len(arr) > 1 {
+		p2, err = strconv.Atoi(arr[1])
+		if err != nil {
+			job.Logger.Errorf("CleanLog P2:%s EXEC ERROR:%s", arr[1], err.Error())
+			return err
 		}
-		break
-	default:
 	}
-	jobLogger.Infof("【CleanLog】 EXEC END\r\n")
+	err = iotLog.Clean(p1, p2)
+	if err != nil {
+		job.Logger.Errorf("CleanLog EXEC ERROR:%s", err.Error())
+		return err
+	}
+	err = jobs.CleanJobLog(p3)
+	if err != nil {
+		job.Logger.Errorf("CleanJobLog EXEC ERROR:%s", err.Error())
+		return err
+	}
+	job.Logger.Infof("【CleanLog】 EXEC END\r\n")
 
 	return nil
 }

+ 0 - 33
SERVER/IotAdmin/app/schedule/jobs/examples.go

@@ -1,33 +0,0 @@
-package jobs
-
-import (
-	"IotAdmin/core/logger"
-	"time"
-)
-
-// ExamplesOne
-// 新添加的job 必须按照以下格式定义,并实现Exec函数
-type ExamplesOne struct {
-}
-
-func (t *ExamplesOne) Exec(arg interface{}) error {
-	logger.Info(time.Now().Format(timeFormat) + " [INFO] JobCore ExamplesOne exec START")
-	// TODO: 这里需要注意 Examples 传入参数是 string 所以 arg.(string);请根据对应的类型进行转化;
-	switch arg.(type) {
-	case string:
-		if arg.(string) != "" {
-
-		} else {
-			logger.Errorf(time.Now().Format(timeFormat) + " [ERROR] JobCore ExamplesOne exec error: arg is empty")
-		}
-		break
-	case int:
-		break
-	}
-
-	return nil
-}
-
-func (t *ExamplesOne) GetName() string {
-	return "函数测试"
-}

+ 8 - 86
SERVER/IotAdmin/app/schedule/jobs/init.go

@@ -1,93 +1,15 @@
 package jobs
 
-import (
-	"IotAdmin/app/schedule/models"
-	"IotAdmin/core/sdk"
-	"IotAdmin/core/sdk/pkg/cronjob"
+import "IotAdmin/common/jobs"
 
-	"gorm.io/gorm"
-)
-
-var (
-	// jobList 定义的job
-	jobList map[string]JobExec
-)
-
-// InitJob 初始化job
-func InitJob() {
-	initJobList()
-	Setup(sdk.Runtime.GetDb())
-}
-
-// 需要将定义的struct 添加到字典中;
-// 字典 key 可以配置到 自动任务 调用目标 中;
-func initJobList() {
-	jobList = map[string]JobExec{
-		"CleanLog":    &CleanLog{},
-		"ExamplesOne": &ExamplesOne{},
-	}
-}
-
-// Setup 初始化
-func Setup(dbs map[string]*gorm.DB) {
-	jobLogger.Infof("STARTING...")
-	for k, db := range dbs {
-		sdk.Runtime.SetCrontab(k, cronjob.NewWithSecondsLogger(jobLogger))
-		setup(k, db)
-	}
-}
-
-func setup(key string, db *gorm.DB) {
-	crontab := sdk.Runtime.GetCrontabKey(key)
-	sysJob := models.SysJob{}
-	jobList := make([]models.SysJob, 0)
-	err := sysJob.GetList(db, &jobList)
-	if err != nil {
-		jobLogger.Errorf("INIT ERROR:%s", err.Error())
-	}
-	if len(jobList) == 0 {
-		jobLogger.Infof("TOTAL:0")
+func init() {
+	mp := map[string]jobs.JobExec{
+		"CleanLog": &CleanLog{},
+		// ...
 	}
-
-	_, err = sysJob.RemoveAllEntryID(db)
-	if err != nil {
-		jobLogger.Errorf("REMOVE ALL ENTRY ERROR ERROR: %s", err.Error())
-	}
-
-	for i := 0; i < len(jobList); i++ {
-		if jobList[i].JobType == 1 {
-			j := &HttpJob{}
-			j.InvokeTarget = jobList[i].InvokeTarget
-			j.CronExpression = jobList[i].CronExpression
-			j.JobId = jobList[i].JobId
-			j.Name = jobList[i].JobName
-
-			sysJob.EntryId, err = AddJob(crontab, j)
-		} else if jobList[i].JobType == 2 {
-			j := &ExecJob{}
-			j.InvokeTarget = jobList[i].InvokeTarget
-			j.CronExpression = jobList[i].CronExpression
-			j.JobId = jobList[i].JobId
-			j.Name = jobList[i].JobName
-			j.Args = jobList[i].Args
-			sysJob.EntryId, err = AddJob(crontab, j)
-		}
-		err = sysJob.Update(db, jobList[i].JobId)
-	}
-
-	// 其中任务
-	crontab.Start()
-	jobLogger.Infof("START SUCCESS")
-	// 关闭任务
-	defer crontab.Stop()
-	select {}
+	jobs.AddJobs(mp)
 }
 
-// GetJobKeys 获取job key列表
-func GetJobKeys() map[string]string {
-	var mp = make(map[string]string)
-	for k, v := range jobList {
-		mp[k] = v.GetName()
-	}
-	return mp
+func Start() {
+	jobs.InitJob()
 }

+ 4 - 0
SERVER/IotAdmin/app/schedule/models/job.go

@@ -15,6 +15,10 @@ type SysJob struct {
 	CronExpression string `json:"cronExpression" gorm:"type:varchar(255);comment:Cron表达式"`
 	InvokeTarget   string `json:"invokeTarget" gorm:"type:varchar(255);comment:调用目标"`
 	Args           string `json:"args" gorm:"type:varchar(255);comment:目标参数"`
+	ExecCount      int    `json:"execCount" gorm:"type:bigint(20);comment:执行次数"`
+	FailCount      int    `json:"failCount" gorm:"type:bigint(20);comment:失败次数"`
+	LastExecTime   int64  `json:"lastExecTime" gorm:"type:bigint(20);comment:上次执行时间"`
+	NextExecTime   int64  `json:"nextExecTime" gorm:"type:bigint(20);comment:下次执行时间"`
 	MisfirePolicy  int    `json:"misfirePolicy" gorm:"type:bigint(20);comment:执行策略"`
 	Concurrent     int    `json:"concurrent" gorm:"type:tinyint(4);comment:是否并发"`
 	Status         int    `json:"status" gorm:"type:tinyint(4);comment:状态"`

+ 8 - 4
SERVER/IotAdmin/app/schedule/service/dto/job.go

@@ -19,10 +19,14 @@ type SysJobGetPageReq struct {
 }
 
 type SysJobOrder struct {
-	JobGroup  string `form:"jobGroupOrder"  search:"type:order;column:job_group;table:sys_job"`
-	JobType   string `form:"jobTypeOrder"  search:"type:order;column:job_type;table:sys_job"`
-	CreatedAt string `form:"createdAtOrder"  search:"type:order;column:created_at;table:sys_job"`
-	Status    string `form:"statusOrder"  search:"type:order;column:status;table:sys_job"`
+	JobGroup     string `form:"jobGroupOrder"  search:"type:order;column:job_group;table:sys_job"`
+	JobType      string `form:"jobTypeOrder"  search:"type:order;column:job_type;table:sys_job"`
+	ExecCount    string `form:"execCountOrder"  search:"type:order;column:exec_count;table:sys_job"`
+	FailCount    string `form:"failCountOrder"  search:"type:order;column:fail_count;table:sys_job"`
+	LastExecTime string `form:"lastExecTimeOrder"  search:"type:order;column:last_exec_time;table:sys_job"`
+	NextExecTime string `form:"nextExecTimeOrder"  search:"type:order;column:next_exec_time;table:sys_job"`
+	CreatedAt    string `form:"createdAtOrder"  search:"type:order;column:created_at;table:sys_job"`
+	Status       string `form:"statusOrder"  search:"type:order;column:status;table:sys_job"`
 }
 
 func (m *SysJobGetPageReq) GetNeedSearch() interface{} {

+ 10 - 6
SERVER/IotAdmin/app/schedule/service/job.go

@@ -4,10 +4,10 @@ import (
 	"errors"
 	"time"
 
-	"IotAdmin/app/schedule/jobs"
 	"IotAdmin/app/schedule/models"
 	"IotAdmin/app/schedule/service/dto"
 	cDto "IotAdmin/common/dto"
+	cJobs "IotAdmin/common/jobs"
 	"IotAdmin/common/permission"
 	"IotAdmin/core/sdk/service"
 
@@ -129,23 +129,27 @@ func (e *SysJobService) StartJob(c *dto.SysJobGetReq) error {
 	}
 
 	if data.JobType == 1 {
-		var j = &jobs.HttpJob{}
+		var j = &cJobs.HttpJob{}
 		j.InvokeTarget = data.InvokeTarget
 		j.CronExpression = data.CronExpression
 		j.JobId = data.JobId
 		j.Name = data.JobName
-		data.EntryId, err = jobs.AddJob(e.Cron, j)
+		j.Logger = cJobs.GetJobLogger()
+		j.DbKey = "*"
+		data.EntryId, err = cJobs.AddJob(e.Cron, j)
 		if err != nil {
 			e.Log.Errorf("jobs AddJob[HttpJob] error: %s", err)
 		}
 	} else {
-		var j = &jobs.ExecJob{}
+		var j = &cJobs.ExecJob{}
 		j.InvokeTarget = data.InvokeTarget
 		j.CronExpression = data.CronExpression
 		j.JobId = data.JobId
 		j.Name = data.JobName
 		j.Args = data.Args
-		data.EntryId, err = jobs.AddJob(e.Cron, j)
+		j.Logger = cJobs.GetJobLogger()
+		j.DbKey = "*"
+		data.EntryId, err = cJobs.AddJob(e.Cron, j)
 		if err != nil {
 			e.Log.Errorf("jobs AddJob[ExecJob] error: %s", err)
 		}
@@ -174,7 +178,7 @@ func (e *SysJobService) StopJob(c *dto.SysJobGetReq) error {
 		err = errors.New("当前Job是已停用。")
 		return err
 	}
-	cn := jobs.Remove(e.Cron, data.EntryId)
+	cn := cJobs.Remove(e.Cron, data.EntryId)
 
 	select {
 	case res := <-cn:

+ 42 - 0
SERVER/IotAdmin/common/jobs/call.go

@@ -0,0 +1,42 @@
+package jobs
+
+import (
+	"IotAdmin/common/jobs/models"
+	"IotAdmin/core/sdk"
+	"IotAdmin/core/tools/utils"
+	"fmt"
+
+	"github.com/robfig/cron/v3"
+)
+
+func CallExec(e JobExec, jobCore *JobCore) error {
+	job := &models.SysJob{
+		JobId: jobCore.JobId,
+	}
+	db := sdk.Runtime.GetDbByKey(jobCore.DbKey)
+	if db == nil {
+		err := fmt.Errorf("【%s】 EXEC DB ERROR:%s %s", jobCore.Name, jobCore.DbKey, "db is nil")
+		return err
+	}
+	err := job.Get(db, job)
+	if err != nil {
+		err = fmt.Errorf("【%s】 EXEC DB ERROR: %s", jobCore.Name, err.Error())
+		return err
+	}
+	jobLogger.Infof("【%s】 EXEC START", jobCore.Name)
+	job.ExecCount++
+	job.LastExecTime = utils.NowLong()
+	err = e.Exec(jobCore)
+	if err != nil {
+		job.FailCount++
+		jobLogger.Errorf("【%s】 EXEC ERROR: %s", jobCore.Name, err.Error())
+	}
+	crontab := sdk.Runtime.GetCrontabKey(jobCore.DbKey)
+	entry := crontab.Entry(cron.EntryID(job.EntryId))
+	job.NextExecTime = utils.Time2Long(entry.Next)
+	jobLogger.Infof("【%s】 EXEC END\r\n", jobCore.Name)
+	if err2 := job.Update(db, job.JobId); err2 != nil {
+		err = fmt.Errorf("【%s】 EXEC DB ERROR: %s", jobCore.Name, err2.Error())
+	}
+	return err
+}

+ 106 - 0
SERVER/IotAdmin/common/jobs/init.go

@@ -0,0 +1,106 @@
+package jobs
+
+import (
+	"IotAdmin/common/jobs/models"
+	"IotAdmin/core/sdk"
+	"IotAdmin/core/sdk/pkg/cronjob"
+
+	"gorm.io/gorm"
+)
+
+func init() {
+	jobList = make(map[string]JobExec)
+}
+
+var (
+	// jobList 定义的job
+	jobList map[string]JobExec
+)
+
+// InitJob 初始化job
+func InitJob() {
+	initJobList()
+	Setup(sdk.Runtime.GetDb())
+}
+
+// 需要将定义的struct 添加到字典中;
+// 字典 key 可以配置到 自动任务 调用目标 中;
+func initJobList() {
+
+}
+
+func AddJobs(mp map[string]JobExec) {
+	for k, v := range mp {
+		jobList[k] = v
+	}
+}
+
+// Setup 初始化
+func Setup(dbs map[string]*gorm.DB) {
+	jobLogger.Info("STARTING...")
+	for k, db := range dbs {
+		sdk.Runtime.SetCrontab(k, cronjob.NewWithSecondsLogger(jobLogger))
+		setup(k, db)
+	}
+}
+
+func setup(key string, db *gorm.DB) {
+	crontab := sdk.Runtime.GetCrontabKey(key)
+	sysJob := models.SysJob{}
+	jobList := make([]models.SysJob, 0)
+	err := sysJob.GetList(db, &jobList)
+	if err != nil {
+		jobLogger.Errorf("INIT [%s] ERROR: %s", key, err.Error())
+	}
+	if len(jobList) == 0 {
+		jobLogger.Infof("TOTAL:0")
+	}
+	_, err = sysJob.RemoveAllEntryID(db)
+	if err != nil {
+		jobLogger.Errorf("REMOVE ALL ENTRY [%s] ERROR: %s", key, err.Error())
+	}
+
+	for i := 0; i < len(jobList); i++ {
+		if jobList[i].JobType == 1 {
+			j := &HttpJob{}
+			j.InvokeTarget = jobList[i].InvokeTarget
+			j.CronExpression = jobList[i].CronExpression
+			j.JobId = jobList[i].JobId
+			j.Name = jobList[i].JobName
+			j.DbKey = key
+			j.Logger = jobLogger
+			sysJob.EntryId, err = AddJob(crontab, j)
+		} else if jobList[i].JobType == 2 {
+			j := &ExecJob{}
+			j.InvokeTarget = jobList[i].InvokeTarget
+			j.CronExpression = jobList[i].CronExpression
+			j.JobId = jobList[i].JobId
+			j.Name = jobList[i].JobName
+			j.Args = jobList[i].Args
+			j.DbKey = key
+			j.Logger = jobLogger
+			sysJob.EntryId, err = AddJob(crontab, j)
+		}
+		jobLogger.Infof("ADD JOB: %s", jobList[i].JobName)
+		err = sysJob.Update(db, jobList[i].JobId)
+	}
+
+	// 启动任务
+	crontab.Start()
+	jobLogger.Infof("START [%s] SUCCESS", key)
+	// 关闭任务
+	defer func() {
+		jobLogger.Infof("STOP [%s]\r\n\r\n", key)
+		crontab.Stop()
+	}()
+	select {}
+}
+
+// GetJobKeys 获取job key列表
+func GetJobKeys() map[string]string {
+	var mp = make(map[string]string)
+	for k, v := range jobList {
+		mp[k] = v.GetName()
+	}
+	return mp
+}

+ 21 - 15
SERVER/IotAdmin/app/schedule/jobs/job_http.go → SERVER/IotAdmin/common/jobs/job_http.go

@@ -2,7 +2,6 @@ package jobs
 
 import (
 	"IotAdmin/core/sdk/pkg"
-	"fmt"
 	"time"
 
 	"github.com/robfig/cron/v3"
@@ -15,10 +14,22 @@ type HttpJob struct {
 
 // Run http 任务接口
 func (h *HttpJob) Run() {
-
 	startTime := time.Now()
+	err := CallExec(h, &h.JobCore)
+	// 结束时间
+	endTime := time.Now()
+	// 执行时间
+	latencyTime := endTime.Sub(startTime)
+	if err != nil {
+		jobLogger.Errorf("%s EXEC ERROR , SPEND :%v, ERROR: %s", h.Name, latencyTime, err.Error())
+		return
+	}
+	jobLogger.Infof("%s EXEC SUCCESS , SPEND :%v", h.Name, latencyTime)
+	return
+}
+
+func (h *HttpJob) Exec(jobCore *JobCore) (err error) {
 	var count = 0
-	var err error
 	var str string
 	/* 循环 */
 LOOP:
@@ -27,28 +38,23 @@ LOOP:
 		str, err = pkg.Get(h.InvokeTarget)
 		if err != nil {
 			// 如果失败暂停一段时间重试
-			jobLogger.Errorf("[%s] EXEC FAILED! ERROR: %s", h.Name, err.Error())
-			jobLogger.Infof("[%s]  Retry after the task fails %d seconds! %s ", h.Name, (count+1)*5, str)
-			time.Sleep(time.Duration(count+1) * 5 * time.Second)
+			jobLogger.Errorf("【%s】 EXEC FAILED! %s", h.Name, err.Error())
+			jobLogger.Infof("【%s】  RETRY AFTER  %d SECONDS [%s]", h.Name, (count+1)*15, str)
+			time.Sleep(time.Duration(count+1) * 15 * time.Second)
 			count = count + 1
 			goto LOOP
 		}
 	}
-	// 结束时间
-	endTime := time.Now()
-
-	// 执行时间
-	latencyTime := endTime.Sub(startTime)
-	//TODO: 待完善部分
-
-	jobLogger.Infof("%s EXEC SUCCESS , SPEND :%v", h.Name, latencyTime)
 	return
 }
 
+func (h *HttpJob) GetName() string {
+	return ""
+}
 func (h *HttpJob) addJob(c *cron.Cron) (int, error) {
 	id, err := c.AddJob(h.CronExpression, h)
 	if err != nil {
-		fmt.Println(time.Now().Format(timeFormat), " [ERROR] JobCore AddJob error", err)
+		jobLogger.Errorf("ADDJOB ERROR: %s", err.Error())
 		return 0, err
 	}
 	EntryId := int(id)

+ 2 - 7
SERVER/IotAdmin/app/schedule/jobs/job_sys.go → SERVER/IotAdmin/common/jobs/job_sys.go

@@ -1,7 +1,6 @@
 package jobs
 
 import (
-	"fmt"
 	"time"
 
 	"github.com/robfig/cron/v3"
@@ -18,19 +17,15 @@ func (e *ExecJob) Run() {
 		jobLogger.Warnf("[Job] ExecJob Run jobs nil [%s]", e.Name)
 		return
 	}
-	err := CallExec(obj.(JobExec), e.Args)
+	err := CallExec(obj.(JobExec), &e.JobCore)
 	if err != nil {
-		// 如果失败暂停一段时间重试
-		fmt.Println(time.Now().Format(timeFormat), " [ERROR] mission failed! ", err)
+		jobLogger.Errorf("[%s] EXEC FAILED! %s", e.Name, err.Error())
 	}
 	// 结束时间
 	endTime := time.Now()
 
 	// 执行时间
 	latencyTime := endTime.Sub(startTime)
-	//TODO: 待完善部分
-	//str := time.Now().Format(timeFormat) + " [INFO] JobCore " + string(e.EntryId) + "exec success , spend :" + latencyTime.String()
-	//ws.SendAll(str)
 	jobLogger.Infof("[%s] EXEC SUCCESS , SPEND :%v", e.Name, latencyTime)
 	return
 }

+ 0 - 0
SERVER/IotAdmin/app/schedule/jobs/jobbase.go → SERVER/IotAdmin/common/jobs/jobbase.go


+ 5 - 4
SERVER/IotAdmin/app/schedule/jobs/logger.go → SERVER/IotAdmin/common/jobs/logger.go

@@ -68,8 +68,9 @@ func CleanJobLog(days int) error {
 	path := getJobLogPath()
 	return cLog.DeleteLogsOlderThan(path, "log", days)
 }
+
 func (j *JobLogger) Info(msg string, args ...interface{}) {
-	msg = "[CRON]" + msg
+	msg = "[CRON] " + msg
 	if len(args) > 0 {
 		for _, v := range args {
 			msg += fmt.Sprintf(" %v", v)
@@ -78,12 +79,12 @@ func (j *JobLogger) Info(msg string, args ...interface{}) {
 	j.Helper.Info(msg)
 }
 func (j *JobLogger) Infof(msg string, args ...interface{}) {
-	msg = "[JobCore]" + msg
+	msg = "[JobCore] " + msg
 	j.Helper.Infof(msg, args...)
 }
 
 func (j *JobLogger) Error(err error, msg string, args ...interface{}) {
-	msg = "[CRON]" + msg
+	msg = "[CRON] " + msg
 	if len(args) > 0 {
 		for _, v := range args {
 			msg += fmt.Sprintf("%v ", v)
@@ -93,6 +94,6 @@ func (j *JobLogger) Error(err error, msg string, args ...interface{}) {
 }
 
 func (j *JobLogger) Errorf(msg string, args ...interface{}) {
-	msg = "[JobCore]" + msg
+	msg = "[JobCore] " + msg
 	j.Helper.Errorf(msg, args...)
 }

+ 62 - 0
SERVER/IotAdmin/common/jobs/models/job.go

@@ -0,0 +1,62 @@
+package models
+
+import (
+	"IotAdmin/common/models"
+
+	"gorm.io/gorm"
+)
+
+// SysJob 定时任务实体
+type SysJob struct {
+	JobId          int    `json:"jobId" gorm:"type:bigint(20);primaryKey;autoIncrement;comment:编码"`
+	JobName        string `json:"jobName" gorm:"type:varchar(255);comment:名称"`
+	JobGroup       string `json:"jobGroup" gorm:"type:varchar(255);comment:任务分组"`
+	JobType        int    `json:"jobType" gorm:"type:tinyint(4);comment:调用类型"`
+	CronExpression string `json:"cronExpression" gorm:"type:varchar(255);comment:Cron表达式"`
+	InvokeTarget   string `json:"invokeTarget" gorm:"type:varchar(255);comment:调用目标"`
+	Args           string `json:"args" gorm:"type:varchar(255);comment:目标参数"`
+	ExecCount      int    `json:"execCount" gorm:"type:bigint(20);comment:执行次数"`
+	FailCount      int    `json:"failCount" gorm:"type:bigint(20);comment:失败次数"`
+	LastExecTime   int64  `json:"lastExecTime" gorm:"type:bigint(20);comment:上次执行时间"`
+	NextExecTime   int64  `json:"nextExecTime" gorm:"type:bigint(20);comment:下次执行时间"`
+	MisfirePolicy  int    `json:"misfirePolicy" gorm:"type:bigint(20);comment:执行策略"`
+	Concurrent     int    `json:"concurrent" gorm:"type:tinyint(4);comment:是否并发"`
+	Status         int    `json:"status" gorm:"type:tinyint(4);comment:状态"`
+	EntryId        int    `json:"entryId" gorm:"type:smallint(6);comment:EntryId"`
+
+	models.ControlBy
+	models.ModelTime
+}
+
+func (*SysJob) TableName() string {
+	return "sys_job"
+}
+
+func (e *SysJob) Generate() models.ActiveRecord {
+	o := *e
+	return &o
+}
+
+func (e *SysJob) GetId() interface{} {
+	return e.JobId
+}
+
+func (e *SysJob) Get(tx *gorm.DB, job *SysJob) (err error) {
+	return tx.Model(e).First(job, e.JobId).Error
+}
+
+func (e *SysJob) GetList(tx *gorm.DB, list interface{}) (err error) {
+	return tx.Table(e.TableName()).Where("status = ?", 2).Find(list).Error
+}
+
+// Update 更新SysJob
+func (e *SysJob) Update(tx *gorm.DB, id interface{}) (err error) {
+	return tx.Table(e.TableName()).Where(id).Updates(&e).Error
+}
+
+func (e *SysJob) RemoveAllEntryID(tx *gorm.DB) (update SysJob, err error) {
+	if err = tx.Table(e.TableName()).Where("entry_id > ?", 0).Update("entry_id", 0).Error; err != nil {
+		return
+	}
+	return
+}

+ 6 - 6
SERVER/IotAdmin/app/schedule/jobs/type.go → SERVER/IotAdmin/common/jobs/type.go

@@ -1,6 +1,8 @@
 package jobs
 
-import "github.com/robfig/cron/v3"
+import (
+	"github.com/robfig/cron/v3"
+)
 
 type JobCore struct {
 	InvokeTarget   string
@@ -9,6 +11,8 @@ type JobCore struct {
 	EntryId        int
 	CronExpression string
 	Args           string
+	DbKey          string
+	Logger         *JobLogger
 }
 
 type Job interface {
@@ -17,10 +21,6 @@ type Job interface {
 }
 
 type JobExec interface {
-	Exec(arg interface{}) error
+	Exec(job *JobCore) error
 	GetName() string
 }
-
-func CallExec(e JobExec, arg interface{}) error {
-	return e.Exec(arg)
-}

+ 2 - 3
SERVER/IotAdmin/config/sql/db.sql

@@ -439,9 +439,8 @@ INSERT INTO sys_dict_data VALUES (54, 2, '表计主动上报', '2', 'iot_device_
 
 
 
-INSERT INTO sys_job VALUES (1, '接口测试', 'SYSTEM', 1, '0/5 * * * * ', 'http://localhost:6071', '', 1, 1, 1, 0, 1, 1, '2024-03-14 00:00:00.000', '2024-03-14 00:00:00.000', NULL);
-INSERT INTO sys_job VALUES (2, '函数测试', 'SYSTEM', 2, '0/5 * * * * ', 'ExamplesOne', '参数', 1, 1, 1, 0, 1, 1, '2024-03-14 00:00:00.000', '2024-03-14 00:00:00.000', NULL);
-INSERT INTO sys_job VALUES (3, '清除日志', 'SYSTEM', 2, '30 0 0 * * ?', 'CleanLog', '1', 1, 2, 2, 2, 1, 1, '2024-04-02 17:45:13.384', '2024-04-02 17:47:15.219', NULL);
+INSERT INTO sys_job VALUES (1, '接口测试', 'SYSTEM', 1, '0/5 * * * * ', 'http://localhost:6071', '', 0, 0, 0, 0, 1, 1, 1, 0, 1, 1, '2024-03-14 00:00:00.000', '2024-03-14 00:00:00.000', NULL);
+INSERT INTO sys_job VALUES (2, '清除日志', 'SYSTEM', 2, '30 0 0 * * ?', 'CleanLog', '1', 0, 0, 0, 0, 1, 2, 2, 2, 1, 1, '2024-04-02 17:45:13.384', '2024-04-02 17:47:15.219', NULL);
 
 
 INSERT INTO sys_post VALUES (1, '默认岗位', 'CEO', 0, '2','默认岗位', 1, 1, '2024-03-14 00:00:00.000', '2024-03-14 00:00:00.000', NULL);

+ 25 - 1
SERVER/IotAdmin/core/tools/utils/dateTime.go

@@ -1,6 +1,9 @@
 package utils
 
-import "time"
+import (
+	"strconv"
+	"time"
+)
 
 // TimeDiff 时间差计算,true表示超过时间
 func TimeDiff(tm, mat int64) bool {
@@ -10,3 +13,24 @@ func TimeDiff(tm, mat int64) bool {
 		return false
 	}
 }
+
+func NowLong() int64 {
+	return Time2Long(time.Now())
+}
+
+func Time2Long(time time.Time) int64 {
+	str := time.Format("20060102150405")
+	res, err := strconv.Atoi(str)
+	if err != nil {
+		return 0
+	}
+	return int64(res)
+}
+
+func TimeStr2Long(str string, format string) int64 {
+	t, err := time.Parse(format, str)
+	if err != nil {
+		return 0
+	}
+	return Time2Long(t)
+}

+ 1 - 3
SERVER/IotAdmin/server/api.go

@@ -17,7 +17,6 @@ import (
 	"IotAdmin/core/sdk/pkg"
 	"IotAdmin/iot"
 	iotDb "IotAdmin/iot/db"
-	iotLog "IotAdmin/iot/log"
 	"context"
 	"errors"
 	"fmt"
@@ -55,7 +54,6 @@ func init() {
 	queue.Register(global.UpdateMeterCalc, iotDb.UpdateMeterCalc)
 	go queue.Run()
 
-	iotLog.Clean(1, 1)
 	//3. 注册路由
 	// 在app/router目录下新建文件 放在init方法 参考system
 
@@ -77,7 +75,7 @@ func Init() {
 	// 数据采集服务
 	go iot.InitIotService()
 	// 定时任务
-	go jobs.InitJob()
+	go jobs.Start()
 
 	if *apiCheck {
 		var routers = sdk.Runtime.GetRouter()

+ 22 - 0
UI/IOTADMIN.VUE/src/views/schedule/job/index.vue

@@ -34,6 +34,10 @@ const opts = reactive<any>({
 		{ field: "cronExpression", name: "Cron表达式", width: "auto", isSort: false, visible: true },
 		{ field: "invokeTarget", name: "调用目标", width: "auto", isSort: false, visible: true },
 		{ field: "status", name: "状态", width: 100, isSort: true, visible: true },
+		{ field: "execCount", name: "执行次数", width: 100, isSort: true, visible: true },
+		{ field: "failCount", name: "失败次数", width: 100, isSort: true, visible: true },
+		{ field: "lastExecTime", name: "上次执行时间", width: 155, isSort: true, visible: true },
+		{ field: "nextExecTime", name: "下一次执行时间", width: 155, isSort: true, visible: true },
 		{ field: "createdAt", name: "创建时间", width: 185, isSort: true, visible: true },
 		{ field: "actions", name: `操作`, width: 150 }
 	],
@@ -352,6 +356,24 @@ onMounted(() => setTimeout(init, 100))
 			<template #createdAt="{ row }">
 				<span>{{ dayjs(row.createdAt).format("YYYY-MM-DD HH:mm:ss") }}</span>
 			</template>
+			<template #lastExecTime="{ row }">
+				<span>
+					{{
+						row.lastExecTime < 20240000000000
+							? "-"
+							: dayjs(row.lastExecTime + "", "YYYYMMDDHHmmss").format("YYYY-MM-DD HH:mm:ss")
+					}}
+				</span>
+			</template>
+			<template #nextExecTime="{ row }">
+				<span>
+					{{
+						row.nextExecTime < 20240000000000
+							? "-"
+							: dayjs(row.nextExecTime + "", "YYYYMMDDHHmmss").format("YYYY-MM-DD HH:mm:ss")
+					}}
+				</span>
+			</template>
 			<template #actions="{ row }">
 				<vb-tooltip content="修改" placement="top">
 					<el-button link type="primary" @click="handleUpdate(row)">