// Source listing: terminal/server/api/scheduled_task_api.go (Go; listed as 340 lines, 8.0 KiB).

package api
import (
"context"
"encoding/json"
"strconv"
"strings"
"next-terminal/server/common"
"next-terminal/server/common/maps"
"next-terminal/server/log"
"next-terminal/server/model"
"next-terminal/server/repository"
"next-terminal/server/service"
"next-terminal/server/utils"
"github.com/labstack/echo/v4"
"github.com/robfig/cron/v3"
)
// ScheduledTaskApi groups the echo handlers for scheduled tasks and their
// execution logs. It carries no state; every method uses a value receiver.
type ScheduledTaskApi struct{}
// ScheduledTaskDTO is the JSON shape of a scheduled task as exchanged with
// API clients. JobToDTO and DTOToJob translate between it and model.Job.
type ScheduledTaskDTO struct {
	// ID is the job identifier (model.Job.ID).
	ID string `json:"id"`
	// EntryId mirrors model.Job.CronJobId.
	EntryId int `json:"entryId"`
	// Name mirrors model.Job.Name.
	Name string `json:"name"`
	// Spec is the cron expression (model.Job.Cron).
	Spec string `json:"spec"`
	// Type is the job function name (model.Job.Func), e.g. "asset-exec-command".
	Type string `json:"type"`
	// AssetIdList is model.Job.ResourceIds split on ",".
	AssetIdList []string `json:"assetIdList"`
	// Mode mirrors model.Job.Mode.
	Mode string `json:"mode"`
	// Script is derived from model.Job.Metadata; for "asset-exec-command" jobs
	// it is the "shell" value stored inside the metadata JSON.
	Script string `json:"script"`
	// Enabled is true when model.Job.Status equals "enabled".
	Enabled bool `json:"enabled"`
	// CreatedAt mirrors model.Job.Created.
	CreatedAt common.JsonTime `json:"createdAt"`
	// UpdatedAt mirrors model.Job.Updated.
	UpdatedAt common.JsonTime `json:"updatedAt"`
	// LastExecAt is set only when model.Job.LastExecAt is non-zero; it is
	// omitted from the JSON output when nil.
	LastExecAt *common.JsonTime `json:"lastExecAt,omitempty"`
}
// JobLogDTO is the JSON shape of one job execution log entry.
type JobLogDTO struct {
	// ID is the log entry identifier (model.JobLog.ID).
	ID string `json:"id"`
	// JobId identifies the job this entry belongs to.
	JobId string `json:"jobId"`
	// JobType is supplied by the caller of JobLogToDTO; GetLogsEndpoint passes
	// the owning job's Func.
	JobType string `json:"jobType"`
	// Results holds the decoded JSON array stored in model.JobLog.Results.
	Results []interface{} `json:"results"`
	// CreatedAt mirrors model.JobLog.Timestamp.
	CreatedAt common.JsonTime `json:"createdAt"`
}
// JobLogToDTO converts a stored job log into its API representation.
//
// jobType is copied verbatim into the DTO. log.Results is decoded as a JSON
// array; when it is empty, cannot be decoded, or decodes to nothing, the DTO
// carries an empty (non-nil) Results slice.
func JobLogToDTO(log model.JobLog, jobType string) JobLogDTO {
	dto := JobLogDTO{
		ID:        log.ID,
		JobId:     log.JobId,
		JobType:   jobType,
		Results:   []interface{}{},
		CreatedAt: log.Timestamp,
	}
	if log.Results == "" {
		return dto
	}

	var decoded []interface{}
	// Decoding is best-effort: the error is intentionally dropped and a failed
	// decode simply leaves the empty default in place.
	_ = json.Unmarshal([]byte(log.Results), &decoded)
	if decoded != nil {
		dto.Results = decoded
	}
	return dto
}
// JobToDTO converts a persisted job into the DTO returned to API clients.
//
// ResourceIds is split on "," into AssetIdList (left nil when the string is
// empty). For "asset-exec-command" jobs with non-empty metadata, Script is the
// "shell" value of the metadata JSON; if that JSON cannot be decoded, or for
// any other job type, Script is the raw metadata string. LastExecAt is only
// populated when the job has a non-zero last execution time.
func JobToDTO(job model.Job) ScheduledTaskDTO {
	var assetIDs []string
	if job.ResourceIds != "" {
		assetIDs = strings.Split(job.ResourceIds, ",")
	}

	// Default to the raw metadata; only shell jobs get unwrapped.
	script := job.Metadata
	if job.Func == "asset-exec-command" && job.Metadata != "" {
		var payload struct {
			Shell string `json:"shell"`
		}
		if json.Unmarshal([]byte(job.Metadata), &payload) == nil {
			script = payload.Shell
		}
	}

	var lastExecAt *common.JsonTime
	if !job.LastExecAt.IsZero() {
		lastExecAt = &job.LastExecAt
	}

	return ScheduledTaskDTO{
		ID:          job.ID,
		EntryId:     job.CronJobId,
		Name:        job.Name,
		Spec:        job.Cron,
		Type:        job.Func,
		AssetIdList: assetIDs,
		Mode:        job.Mode,
		Script:      script,
		Enabled:     job.Status == "enabled",
		CreatedAt:   job.Created,
		UpdatedAt:   job.Updated,
		LastExecAt:  lastExecAt,
	}
}
// DTOToJob converts a client-supplied DTO into a model.Job.
//
// Status is "enabled" or "disabled" according to dto.Enabled. AssetIdList is
// joined with ",". Metadata is dto.Script as-is, except for
// "asset-exec-command" jobs with a non-empty script, where the script is
// wrapped as the JSON object {"shell": <script>}. Created, Updated and
// LastExecAt are not set here.
func DTOToJob(dto ScheduledTaskDTO) model.Job {
	job := model.Job{
		ID:          dto.ID,
		CronJobId:   dto.EntryId,
		Name:        dto.Name,
		Cron:        dto.Spec,
		Func:        dto.Type,
		ResourceIds: strings.Join(dto.AssetIdList, ","),
		Mode:        dto.Mode,
		Metadata:    dto.Script,
		Status:      "disabled",
	}
	if dto.Enabled {
		job.Status = "enabled"
	}
	if dto.Script != "" && dto.Type == "asset-exec-command" {
		// The marshal error is discarded, matching the established behaviour.
		encoded, _ := json.Marshal(map[string]string{"shell": dto.Script})
		job.Metadata = string(encoded)
	}
	return job
}
// AllEndpoint loads every job from JobRepository and responds with them as a
// list of ScheduledTaskDTO. The list is never nil, even when there are no jobs.
// A repository error is returned unchanged.
func (api ScheduledTaskApi) AllEndpoint(c echo.Context) error {
	jobs, err := repository.JobRepository.FindAll(context.TODO())
	if err != nil {
		return err
	}

	// Allocating up front keeps the slice non-nil for the empty case.
	dtos := make([]ScheduledTaskDTO, 0, len(jobs))
	for _, job := range jobs {
		dtos = append(dtos, JobToDTO(job))
	}
	return Success(c, dtos)
}
// PagingEndpoint returns one page of jobs, filtered and sorted by the query
// parameters pageIndex, pageSize, name, status, order and field, which are
// forwarded to JobRepository.Find. The response is a map with "total" and
// "items"; "items" is never nil.
func (api ScheduledTaskApi) PagingEndpoint(c echo.Context) error {
	// Conversion errors are discarded: a missing or non-numeric value becomes 0
	// and is passed to the repository as-is.
	pageIndex, _ := strconv.Atoi(c.QueryParam("pageIndex"))
	pageSize, _ := strconv.Atoi(c.QueryParam("pageSize"))

	jobs, total, err := repository.JobRepository.Find(
		context.TODO(),
		pageIndex,
		pageSize,
		c.QueryParam("name"),
		c.QueryParam("status"),
		c.QueryParam("order"),
		c.QueryParam("field"),
	)
	if err != nil {
		return err
	}

	dtos := make([]ScheduledTaskDTO, 0, len(jobs))
	for _, job := range jobs {
		dtos = append(dtos, JobToDTO(job))
	}
	return Success(c, maps.Map{
		"total": total,
		"items": dtos,
	})
}
// CreateEndpoint binds a ScheduledTaskDTO from the request, stores it as a new
// job and responds with the generated job ID.
//
// Any ID sent by the client is replaced with a fresh UUID. Created and Updated
// are stamped with the same instant. Bind and repository errors are returned
// unchanged.
func (api ScheduledTaskApi) CreateEndpoint(c echo.Context) error {
	var dto ScheduledTaskDTO
	if err := c.Bind(&dto); err != nil {
		return err
	}
	log.Info("Create ScheduledTask", log.Any("dto", dto))

	// DTOToJob always sets Status to "enabled" or "disabled", so no
	// empty-status fallback is needed here.
	job := DTOToJob(dto)
	job.ID = utils.UUID()

	// Read the clock once so a new job's Created and Updated are identical.
	now := common.NowJsonTime()
	job.Created = now
	job.Updated = now

	if err := repository.JobRepository.Create(context.TODO(), &job); err != nil {
		return err
	}
	return Success(c, job.ID)
}
// UpdateEndpoint binds a ScheduledTaskDTO from the request and passes the
// resulting job to JobRepository.UpdateById. The job ID always comes from the
// "id" path parameter, overriding any ID in the body, and Updated is set to
// the current time. Bind and repository errors are returned unchanged.
func (api ScheduledTaskApi) UpdateEndpoint(c echo.Context) error {
	jobID := c.Param("id")

	var payload ScheduledTaskDTO
	if err := c.Bind(&payload); err != nil {
		return err
	}

	job := DTOToJob(payload)
	job.ID = jobID
	job.Updated = common.NowJsonTime()

	err := repository.JobRepository.UpdateById(context.TODO(), &job)
	if err != nil {
		return err
	}
	return Success(c, nil)
}
// DeleteEndpoint deletes the job named by the "id" path parameter through
// JobRepository.DeleteJobById and responds with an empty success payload.
func (api ScheduledTaskApi) DeleteEndpoint(c echo.Context) error {
	jobID := c.Param("id")
	err := repository.JobRepository.DeleteJobById(context.TODO(), jobID)
	if err != nil {
		return err
	}
	return Success(c, nil)
}
// GetEndpoint looks up the job named by the "id" path parameter, logs its DTO
// form and responds with that DTO. A repository error is returned unchanged.
func (api ScheduledTaskApi) GetEndpoint(c echo.Context) error {
	job, err := repository.JobRepository.FindById(context.TODO(), c.Param("id"))
	if err != nil {
		return err
	}

	result := JobToDTO(job)
	log.Info("Get ScheduledTask", log.Any("dto", result))
	return Success(c, result)
}
// ChangeStatusEndpoint sets the status of the job named by the "id" path
// parameter. The status becomes "enabled" only when the "enabled" query
// parameter is exactly "true"; any other value, or none, yields "disabled".
// Only ID and Status are populated on the job passed to UpdateById.
func (api ScheduledTaskApi) ChangeStatusEndpoint(c echo.Context) error {
	jobID := c.Param("id")

	status := "disabled"
	if c.QueryParam("enabled") == "true" {
		status = "enabled"
	}

	update := model.Job{ID: jobID, Status: status}
	err := repository.JobRepository.UpdateById(context.TODO(), &update)
	if err != nil {
		return err
	}
	return Success(c, nil)
}
// ExecEndpoint triggers the job named by the "id" path parameter on demand.
//
// It loads the job, stores the current time as its LastExecAt, and then
// dispatches on job.Func: "asset-exec-command" jobs are run through
// service.ShellJob; for every other type only a JobLog entry with a fixed
// message is written. Errors from the lookup or from the LastExecAt update are
// returned unchanged.
func (api ScheduledTaskApi) ExecEndpoint(c echo.Context) error {
	ctx := context.TODO()
	jobID := c.Param("id")

	job, err := repository.JobRepository.FindById(ctx, jobID)
	if err != nil {
		return err
	}

	// The execution time is persisted before the job body is invoked.
	triggeredAt := common.NowJsonTime()
	stamp := model.Job{ID: jobID, LastExecAt: triggeredAt}
	if err := repository.JobRepository.UpdateById(ctx, &stamp); err != nil {
		return err
	}

	if job.Func == "asset-exec-command" {
		runner := service.ShellJob{
			ID:          job.ID,
			Mode:        job.Mode,
			ResourceIds: job.ResourceIds,
			Metadata:    job.Metadata,
		}
		runner.Run()
		return Success(c, nil)
	}

	// Other job types: just record a log entry. A failure to write it is
	// deliberately ignored and does not fail the request.
	entry := &model.JobLog{
		ID:        utils.UUID(),
		JobId:     jobID,
		Timestamp: triggeredAt,
		Message:   "任务执行成功",
	}
	_ = repository.JobLogRepository.Create(ctx, entry)
	return Success(c, nil)
}
// GetLogsEndpoint returns one page of execution logs for the job named by the
// "id" path parameter.
//
// pageIndex and pageSize come from the query string; a value that is missing,
// non-numeric or 0 falls back to page 1 and size 10 respectively. The job is
// loaded first so that each log DTO can carry the job's Func as its JobType.
// The response is a map with "total" and "items"; "items" is never nil.
func (api ScheduledTaskApi) GetLogsEndpoint(c echo.Context) error {
	ctx := context.TODO()
	jobID := c.Param("id")

	page, _ := strconv.Atoi(c.QueryParam("pageIndex"))
	if page == 0 {
		page = 1
	}
	size, _ := strconv.Atoi(c.QueryParam("pageSize"))
	if size == 0 {
		size = 10
	}

	job, err := repository.JobRepository.FindById(ctx, jobID)
	if err != nil {
		return err
	}
	logs, total, err := repository.JobLogRepository.FindByJobId(ctx, jobID, page, size)
	if err != nil {
		return err
	}

	dtos := make([]JobLogDTO, 0, len(logs))
	for _, entry := range logs {
		dtos = append(dtos, JobLogToDTO(entry, job.Func))
	}
	return Success(c, maps.Map{
		"total": total,
		"items": dtos,
	})
}
// DeleteLogsEndpoint deletes the logs of the job named by the "id" path
// parameter through JobLogRepository.DeleteByJobId and responds with an empty
// success payload.
func (api ScheduledTaskApi) DeleteLogsEndpoint(c echo.Context) error {
	jobID := c.Param("id")
	err := repository.JobLogRepository.DeleteByJobId(context.TODO(), jobID)
	if err != nil {
		return err
	}
	return Success(c, nil)
}
// NextTenRunsEndpoint previews the upcoming trigger times of a cron expression.
//
// The request body supplies "spec", parsed as a five-field expression (minute,
// hour, day-of-month, month, day-of-week). The response is a list of up to ten
// consecutive run times after the current time, formatted as
// "2006-01-02 15:04:05". A blank or unparsable spec produces a successful
// response with no run times rather than an error.
func (api ScheduledTaskApi) NextTenRunsEndpoint(c echo.Context) error {
	var req struct {
		Spec string `json:"spec"`
	}
	if err := c.Bind(&req); err != nil {
		return err
	}

	var results []string
	if req.Spec == "" {
		return Success(c, results)
	}

	parser := cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow)
	schedule, err := parser.Parse(req.Spec)
	if err != nil {
		return Success(c, results)
	}

	// Each run time becomes the reference point for the next lookup; asking
	// for Next of the same instant every iteration would repeat one value.
	cursor := common.NowJsonTime().Time
	for i := 0; i < 10; i++ {
		cursor = schedule.Next(cursor)
		if cursor.IsZero() {
			// Next reports the zero time when the schedule can never fire
			// again (e.g. an impossible date), so stop instead of listing it.
			break
		}
		results = append(results, cursor.Format("2006-01-02 15:04:05"))
	}
	return Success(c, results)
}