增加可配置的定时任务功能

This commit is contained in:
dushixiang
2021-02-28 23:56:25 +08:00
parent 5ea00d8e8a
commit 1b8ecefcfe
10 changed files with 862 additions and 83 deletions

86
pkg/api/job.go Normal file
View File

@ -0,0 +1,86 @@
package api
import (
"github.com/labstack/echo/v4"
"next-terminal/pkg/model"
"strconv"
"strings"
)
func JobCreateEndpoint(c echo.Context) error {
var item model.Job
if err := c.Bind(&item); err != nil {
return err
}
if err := model.CreateNewJob(&item); err != nil {
return err
}
return Success(c, "")
}
func JobPagingEndpoint(c echo.Context) error {
pageIndex, _ := strconv.Atoi(c.QueryParam("pageIndex"))
pageSize, _ := strconv.Atoi(c.QueryParam("pageSize"))
name := c.QueryParam("name")
status := c.QueryParam("status")
items, total, err := model.FindPageJob(pageIndex, pageSize, name, status)
if err != nil {
return err
}
return Success(c, H{
"total": total,
"items": items,
})
}
func JobUpdateEndpoint(c echo.Context) error {
id := c.Param("id")
var item model.Job
if err := c.Bind(&item); err != nil {
return err
}
if err := model.UpdateJobById(&item, id); err != nil {
return err
}
return Success(c, nil)
}
func JobChangeStatusEndpoint(c echo.Context) error {
id := c.Param("id")
status := c.QueryParam("status")
if err := model.ChangeJobStatusById(id, status); err != nil {
return err
}
return Success(c, "")
}
func JobDeleteEndpoint(c echo.Context) error {
ids := c.Param("id")
split := strings.Split(ids, ",")
for i := range split {
jobId := split[i]
if err := model.DeleteJobById(jobId); err != nil {
return err
}
}
return Success(c, nil)
}
func JobGetEndpoint(c echo.Context) error {
id := c.Param("id")
item, err := model.FindJobById(id)
if err != nil {
return err
}
return Success(c, item)
}

View File

@ -144,6 +144,16 @@ func SetupRoutes() *echo.Echo {
e.GET("/overview/counter", OverviewCounterEndPoint)
e.GET("/overview/sessions", OverviewSessionPoint)
jobs := e.Group("/jobs", Admin)
{
jobs.POST("", JobCreateEndpoint)
jobs.GET("/paging", JobPagingEndpoint)
jobs.PUT("/:id", JobUpdateEndpoint)
jobs.POST("/:id/change-status", JobChangeStatusEndpoint)
jobs.DELETE("/:id", JobDeleteEndpoint)
jobs.GET("/:id", JobGetEndpoint)
}
return e
}

View File

@ -2,6 +2,7 @@ package global
import (
"github.com/patrickmn/go-cache"
"github.com/robfig/cron/v3"
"gorm.io/gorm"
"next-terminal/pkg/config"
)
@ -13,3 +14,5 @@ var Cache *cache.Cache
var Config *config.Config
var Store *TunStore
var Cron *cron.Cron

View File

@ -1,81 +1,23 @@
package handle
import (
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"log"
"next-terminal/pkg/global"
"next-terminal/pkg/guacd"
"next-terminal/pkg/model"
"next-terminal/pkg/utils"
"os"
"strconv"
"time"
)
func RunTicker() {
c := cron.New(cron.WithSeconds()) //精确到秒
// 每隔一小时删除一次未使用的会话信息
_, _ = global.Cron.AddJob("0 0 0/1 * * ?", model.DelUnUsedSessionJob{})
// 每隔一小时检测一次资产状态
//_, _ = global.Cron.AddJob("0 0 0/1 * * ?", model.CheckAssetStatusJob{})
// 每日凌晨删除超过时长限制的会话
//_, _ = global.Cron.AddJob("0 0 0 * * ?", model.DelTimeoutSessionJob{})
_, _ = c.AddFunc("0 0 0/1 * * ?", func() {
// 定时任务,每隔一小时删除一次未使用的会话信息
sessions, _ := model.FindSessionByStatusIn([]string{model.NoConnect, model.Connecting})
if sessions != nil && len(sessions) > 0 {
now := time.Now()
for i := range sessions {
if now.Sub(sessions[i].ConnectedTime.Time) > time.Hour*1 {
_ = model.DeleteSessionById(sessions[i].ID)
s := sessions[i].Username + "@" + sessions[i].IP + ":" + strconv.Itoa(sessions[i].Port)
logrus.Infof("会话「%v」ID「%v」超过1小时未打开已删除。", s, sessions[i].ID)
}
}
}
// 每隔一小时检测一次资产是否存活
assets, _ := model.FindAllAsset()
if assets != nil && len(assets) > 0 {
for i := range assets {
asset := assets[i]
active := utils.Tcping(asset.IP, asset.Port)
model.UpdateAssetActiveById(active, asset.ID)
logrus.Infof("资产「%v」ID「%v」存活状态检测完成存活「%v」。", asset.Name, asset.ID, active)
}
}
})
_, err := c.AddFunc("0 0 0 * * ?", func() {
// 定时任务 每日凌晨检查超过时长限制的会话
property, err := model.FindPropertyByName("session-saved-limit")
if err != nil {
return
}
if property.Value == "" || property.Value == "-" {
return
}
limit, err := strconv.Atoi(property.Value)
if err != nil {
return
}
sessions, err := model.FindOutTimeSessions(limit)
if err != nil {
return
}
if sessions != nil && len(sessions) > 0 {
var sessionIds []string
for i := range sessions {
sessionIds = append(sessionIds, sessions[i].ID)
}
err := model.DeleteSessionByIds(sessionIds)
if err != nil {
logrus.Errorf("删除离线会话失败 %v", err)
}
}
})
if err != nil {
log.Fatal(err)
}
c.Start()
global.Cron.Start()
}
func RunDataFix() {

229
pkg/model/job.go Normal file
View File

@ -0,0 +1,229 @@
package model
import (
"errors"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"next-terminal/pkg/global"
"next-terminal/pkg/utils"
"strconv"
"time"
)
const (
JobStatusRunning = "running"
JobStatusNotRunning = "not-running"
FuncCheckAssetStatusJob = "check-asset-status-job"
FuncDelUnUsedSessionJob = "del-unused-session-job"
FuncDelTimeoutSessionJob = "del-timeout-session-job"
)
type Job struct {
ID string `gorm:"primary_key" json:"id"`
JobId int `json:"jobId"`
Name string `json:"name"`
Func string `json:"func"`
Cron string `json:"cron"`
Status string `json:"status"`
Created utils.JsonTime `json:"created"`
Updated utils.JsonTime `json:"updated"`
}
func (r *Job) TableName() string {
return "jobs"
}
func FindPageJob(pageIndex, pageSize int, name, status string) (o []Job, total int64, err error) {
job := Job{}
db := global.DB.Table(job.TableName())
dbCounter := global.DB.Table(job.TableName())
if len(name) > 0 {
db = db.Where("name like ?", "%"+name+"%")
dbCounter = dbCounter.Where("name like ?", "%"+name+"%")
}
if len(status) > 0 {
db = db.Where("status = ?", status)
dbCounter = dbCounter.Where("status = ?", status)
}
err = dbCounter.Count(&total).Error
if err != nil {
return nil, 0, err
}
err = db.Order("created desc").Find(&o).Offset((pageIndex - 1) * pageSize).Limit(pageSize).Error
if o == nil {
o = make([]Job, 0)
}
return
}
func FindJobByFunc(function string) (o []Job, err error) {
db := global.DB
err = db.Where("func = ?", function).Find(&o).Error
return
}
func CreateNewJob(o *Job) (err error) {
if o.Status == JobStatusRunning {
j, err := getJob(o.ID, o.Func)
if err != nil {
return err
}
jobId, err := global.Cron.AddJob(o.Cron, j)
if err != nil {
return err
}
o.JobId = int(jobId)
}
return global.DB.Create(o).Error
}
func UpdateJobById(o *Job, id string) (err error) {
if o.Status == JobStatusRunning {
return errors.New("请先停止定时任务后再修改")
}
return global.DB.Where("id = ?", id).Updates(o).Error
}
func UpdateJonUpdatedById(id string) (err error) {
err = global.DB.Where("id = ?", id).Update("updated = ?", utils.NowJsonTime()).Error
return
}
func ChangeJobStatusById(id, status string) (err error) {
var job Job
err = global.DB.Where("id = ?", id).First(&job).Error
if err != nil {
return err
}
if status == JobStatusNotRunning {
j, err := getJob(job.ID, job.Func)
if err != nil {
return err
}
entryID, err := global.Cron.AddJob(job.Cron, j)
if err != nil {
return err
}
job.JobId = int(entryID)
return global.DB.Where("id = ?", id).Update("status = ?", JobStatusRunning).Error
} else {
global.Cron.Remove(cron.EntryID(job.JobId))
return global.DB.Where("id = ?", id).Update("status = ?", JobStatusNotRunning).Error
}
}
func FindJobById(id string) (o Job, err error) {
err = global.DB.Where("id = ?").First(&o).Error
return
}
func DeleteJobById(id string) error {
job, err := FindJobById(id)
if err != nil {
return err
}
if job.Status == JobStatusRunning {
if err := ChangeJobStatusById(JobStatusNotRunning, id); err != nil {
return err
}
}
return global.DB.Where("id = ?").Delete(Job{}).Error
}
func getJob(id, function string) (job cron.Job, err error) {
switch function {
case FuncCheckAssetStatusJob:
job = CheckAssetStatusJob{ID: id}
case FuncDelUnUsedSessionJob:
job = DelUnUsedSessionJob{ID: id}
case FuncDelTimeoutSessionJob:
job = DelTimeoutSessionJob{ID: id}
default:
return nil, errors.New("未识别的任务")
}
return job, err
}
type CheckAssetStatusJob struct {
ID string
}
func (r CheckAssetStatusJob) Run() {
assets, _ := FindAllAsset()
if assets != nil && len(assets) > 0 {
for i := range assets {
asset := assets[i]
active := utils.Tcping(asset.IP, asset.Port)
UpdateAssetActiveById(active, asset.ID)
logrus.Infof("资产「%v」ID「%v」存活状态检测完成存活「%v」。", asset.Name, asset.ID, active)
}
}
if r.ID != "" {
_ = UpdateJonUpdatedById(r.ID)
}
}
type DelUnUsedSessionJob struct {
ID string
}
func (r DelUnUsedSessionJob) Run() {
sessions, _ := FindSessionByStatusIn([]string{NoConnect, Connecting})
if sessions != nil && len(sessions) > 0 {
now := time.Now()
for i := range sessions {
if now.Sub(sessions[i].ConnectedTime.Time) > time.Hour*1 {
_ = DeleteSessionById(sessions[i].ID)
s := sessions[i].Username + "@" + sessions[i].IP + ":" + strconv.Itoa(sessions[i].Port)
logrus.Infof("会话「%v」ID「%v」超过1小时未打开已删除。", s, sessions[i].ID)
}
}
}
if r.ID != "" {
_ = UpdateJonUpdatedById(r.ID)
}
}
type DelTimeoutSessionJob struct {
ID string
}
func (r DelTimeoutSessionJob) Run() {
property, err := FindPropertyByName("session-saved-limit")
if err != nil {
return
}
if property.Value == "" || property.Value == "-" {
return
}
limit, err := strconv.Atoi(property.Value)
if err != nil {
return
}
sessions, err := FindOutTimeSessions(limit)
if err != nil {
return
}
if sessions != nil && len(sessions) > 0 {
var sessionIds []string
for i := range sessions {
sessionIds = append(sessionIds, sessions[i].ID)
}
err := DeleteSessionByIds(sessionIds)
if err != nil {
logrus.Errorf("删除离线会话失败 %v", err)
}
}
if r.ID != "" {
_ = UpdateJonUpdatedById(r.ID)
}
}

View File

@ -21,6 +21,7 @@ type User struct {
Enabled bool `json:"enabled"`
Created utils.JsonTime `json:"created"`
Type string `json:"type"`
Mail string `json:"mail"`
}
type UserVo struct {