完成计划任务功能

This commit is contained in:
dushixiang
2021-03-05 15:14:37 +08:00
parent 2d06cd373f
commit f81aedcac0
11 changed files with 375 additions and 112 deletions

View File

@ -1,12 +1,15 @@
package model
import (
"encoding/json"
"errors"
"fmt"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"next-terminal/pkg/global"
"next-terminal/pkg/term"
"next-terminal/pkg/utils"
"strings"
"time"
)
@ -15,18 +18,24 @@ const (
JobStatusNotRunning = "not-running"
FuncCheckAssetStatusJob = "check-asset-status-job"
FuncShellJob = "shell-job"
JobModeAll = "all"
JobModeCustom = "custom"
)
type Job struct {
ID string `gorm:"primary_key" json:"id"`
CronJobId int `json:"cronJobId"`
Name string `json:"name"`
Func string `json:"func"`
Cron string `json:"cron"`
Status string `json:"status"`
Metadata string `json:"metadata"`
Created utils.JsonTime `json:"created"`
Updated utils.JsonTime `json:"updated"`
ID string `gorm:"primary_key" json:"id"`
CronJobId int `json:"cronJobId"`
Name string `json:"name"`
Func string `json:"func"`
Cron string `json:"cron"`
Mode string `json:"mode"`
ResourceIds string `json:"resourceIds"`
Status string `json:"status"`
Metadata string `json:"metadata"`
Created utils.JsonTime `json:"created"`
Updated utils.JsonTime `json:"updated"`
}
func (r *Job) TableName() string {
@ -69,7 +78,7 @@ func FindJobByFunc(function string) (o []Job, err error) {
func CreateNewJob(o *Job) (err error) {
if o.Status == JobStatusRunning {
j, err := getJob(o.ID, o.Func)
j, err := getJob(o)
if err != nil {
return err
}
@ -103,7 +112,7 @@ func ChangeJobStatusById(id, status string) (err error) {
return err
}
if status == JobStatusRunning {
j, err := getJob(job.ID, job.Func)
j, err := getJob(&job)
if err != nil {
return err
}
@ -111,10 +120,12 @@ func ChangeJobStatusById(id, status string) (err error) {
if err != nil {
return err
}
job.CronJobId = int(entryID)
return global.DB.Updates(Job{ID: id, Status: JobStatusRunning}).Error
logrus.Debugf("开启计划任务「%v」,运行中计划任务数量「%v」", job.Name, len(global.Cron.Entries()))
return global.DB.Updates(Job{ID: id, Status: JobStatusRunning, CronJobId: int(entryID)}).Error
} else {
global.Cron.Remove(cron.EntryID(job.CronJobId))
logrus.Debugf("关闭计划任务「%v」,运行中计划任务数量「%v」", job.Name, len(global.Cron.Entries()))
return global.DB.Updates(Job{ID: id, Status: JobStatusNotRunning}).Error
}
}
@ -124,7 +135,7 @@ func ExecJobById(id string) (err error) {
if err != nil {
return err
}
j, err := getJob(id, job.Func)
j, err := getJob(&job)
if err != nil {
return err
}
@ -150,10 +161,12 @@ func DeleteJobById(id string) error {
return global.DB.Where("id = ?", id).Delete(Job{}).Error
}
func getJob(id, function string) (job cron.Job, err error) {
switch function {
func getJob(j *Job) (job cron.Job, err error) {
switch j.Func {
case FuncCheckAssetStatusJob:
job = CheckAssetStatusJob{ID: id}
job = CheckAssetStatusJob{ID: j.ID, Mode: j.Mode, ResourceIds: j.ResourceIds, Metadata: j.Metadata}
case FuncShellJob:
job = ShellJob{ID: j.ID, Mode: j.Mode, ResourceIds: j.ResourceIds, Metadata: j.Metadata}
default:
return nil, errors.New("未识别的任务")
}
@ -161,44 +174,176 @@ func getJob(id, function string) (job cron.Job, err error) {
}
type CheckAssetStatusJob struct {
ID string
ID string
Mode string
ResourceIds string
Metadata string
}
func (r CheckAssetStatusJob) Run() {
assets, _ := FindAllAsset()
if assets != nil && len(assets) > 0 {
msgChan := make(chan string)
for i := range assets {
asset := assets[i]
go func() {
t1 := time.Now()
active := utils.Tcping(asset.IP, asset.Port)
elapsed := time.Since(t1)
msg := fmt.Sprintf("资产「%v」存活状态检测完成存活「%v」耗时「%v」", asset.Name, active, elapsed)
UpdateAssetActiveById(active, asset.ID)
logrus.Infof(msg)
msgChan <- msg
}()
}
if r.ID != "" {
var message = ""
for i := 0; i < len(assets); i++ {
message += <-msgChan + "\n"
}
_ = UpdateJonUpdatedById(r.ID)
jobLog := JobLog{
ID: utils.UUID(),
JobId: r.ID,
Timestamp: utils.NowJsonTime(),
Message: message,
}
_ = CreateNewJobLog(&jobLog)
}
if r.ID == "" {
return
}
var assets []Asset
if r.Mode == JobModeAll {
assets, _ = FindAllAsset()
} else {
assets, _ = FindAssetByIds(strings.Split(r.ResourceIds, ","))
}
if assets == nil || len(assets) == 0 {
return
}
msgChan := make(chan string)
for i := range assets {
asset := assets[i]
go func() {
t1 := time.Now()
active := utils.Tcping(asset.IP, asset.Port)
elapsed := time.Since(t1)
msg := fmt.Sprintf("资产「%v」存活状态检测完成存活「%v」耗时「%v」", asset.Name, active, elapsed)
UpdateAssetActiveById(active, asset.ID)
logrus.Infof(msg)
msgChan <- msg
}()
}
var message = ""
for i := 0; i < len(assets); i++ {
message += <-msgChan + "\n"
}
_ = UpdateJonUpdatedById(r.ID)
jobLog := JobLog{
ID: utils.UUID(),
JobId: r.ID,
Timestamp: utils.NowJsonTime(),
Message: message,
}
_ = CreateNewJobLog(&jobLog)
}
type ShellJob struct {
ID string
Mode string
ResourceIds string
Metadata string
}
type MetadataShell struct {
Shell string
}
func (r ShellJob) Run() {
if r.ID == "" {
return
}
var assets []Asset
if r.Mode == JobModeAll {
assets, _ = FindAssetByProtocol("ssh")
} else {
assets, _ = FindAssetByProtocolAndIds("ssh", strings.Split(r.ResourceIds, ","))
}
if assets == nil || len(assets) == 0 {
return
}
var metadataShell MetadataShell
err := json.Unmarshal([]byte(r.Metadata), &metadataShell)
if err != nil {
logrus.Errorf("JSON数据解析失败 %v", err)
return
}
msgChan := make(chan string)
for i := range assets {
asset, err := FindAssetById(assets[i].ID)
if err != nil {
msgChan <- fmt.Sprintf("资产「%v」Shell执行失败查询数据异常「%v」", assets[i].Name, err.Error())
return
}
var (
username = asset.Username
password = asset.Password
privateKey = asset.PrivateKey
passphrase = asset.Passphrase
ip = asset.IP
port = asset.Port
)
if asset.AccountType == "credential" {
credential, err := FindCredentialById(asset.CredentialId)
if err != nil {
msgChan <- fmt.Sprintf("资产「%v」Shell执行失败查询授权凭证数据异常「%v」", assets[i].Name, err.Error())
return
}
if credential.Type == Custom {
username = credential.Username
password = credential.Password
} else {
username = credential.Username
privateKey = credential.PrivateKey
passphrase = credential.Passphrase
}
}
go func() {
t1 := time.Now()
result, err := ExecCommandBySSH(metadataShell.Shell, ip, port, username, password, privateKey, passphrase)
elapsed := time.Since(t1)
var msg string
if err != nil {
msg = fmt.Sprintf("资产「%v」Shell执行失败返回值「%v」耗时「%v」", asset.Name, err.Error(), elapsed)
logrus.Infof(msg)
} else {
msg = fmt.Sprintf("资产「%v」Shell执行成功返回值「%v」耗时「%v」", asset.Name, result, elapsed)
logrus.Infof(msg)
}
msgChan <- msg
}()
}
var message = ""
for i := 0; i < len(assets); i++ {
message += <-msgChan + "\n"
}
_ = UpdateJonUpdatedById(r.ID)
jobLog := JobLog{
ID: utils.UUID(),
JobId: r.ID,
Timestamp: utils.NowJsonTime(),
Message: message,
}
_ = CreateNewJobLog(&jobLog)
}
func ExecCommandBySSH(cmd, ip string, port int, username, password, privateKey, passphrase string) (result string, err error) {
sshClient, err := term.NewSshClient(ip, port, username, password, privateKey, passphrase)
if err != nil {
return "", err
}
session, err := sshClient.NewSession()
if err != nil {
return "", err
}
defer session.Close()
//执行远程命令
combo, err := session.CombinedOutput(cmd)
if err != nil {
return "", err
}
return string(combo), nil
}