mirror of https://github.com/ncarlier/webhookd
298 lines
6.5 KiB
Go
298 lines
6.5 KiB
Go
package hook
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"log/slog"
|
|
"os"
|
|
"os/exec"
|
|
"path"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/ncarlier/webhookd/pkg/helper"
|
|
"github.com/ncarlier/webhookd/pkg/logger"
|
|
)
|
|
|
|
var hookID uint64
|
|
|
|
// Job a hook job
|
|
type Job struct {
|
|
id uint64
|
|
name string
|
|
script string
|
|
method string
|
|
payload string
|
|
args []string
|
|
MessageChan chan []byte
|
|
timeout int
|
|
start time.Time
|
|
status Status
|
|
logFilename string
|
|
err error
|
|
exitCode int
|
|
mutex sync.Mutex
|
|
}
|
|
|
|
// NewHookJob creates new hook job
|
|
func NewHookJob(request *Request) (*Job, error) {
|
|
job := &Job{
|
|
id: atomic.AddUint64(&hookID, 1),
|
|
name: request.Name,
|
|
script: request.Script,
|
|
method: request.Method,
|
|
payload: request.Payload,
|
|
args: request.Args,
|
|
timeout: request.Timeout,
|
|
MessageChan: make(chan []byte),
|
|
status: Idle,
|
|
}
|
|
job.logFilename = path.Join(request.OutputDir, fmt.Sprintf("%s_%d_%s.txt", helper.ToSnake(job.name), job.id, time.Now().Format("20060102_1504")))
|
|
return job, nil
|
|
}
|
|
|
|
// ID returns job ID
|
|
func (job *Job) ID() uint64 {
|
|
return job.id
|
|
}
|
|
|
|
// Name returns job name
|
|
func (job *Job) Name() string {
|
|
return job.name
|
|
}
|
|
|
|
// Err returns job error
|
|
func (job *Job) Err() error {
|
|
return job.err
|
|
}
|
|
|
|
// Meta returns job meta
|
|
func (job *Job) Meta() []string {
|
|
return []string{
|
|
"hook_id=" + strconv.FormatUint(job.id, 10),
|
|
"hook_name=" + job.name,
|
|
"hook_method=" + job.method,
|
|
}
|
|
}
|
|
|
|
// Terminate set job as terminated
|
|
func (job *Job) Terminate(err error) error {
|
|
job.mutex.Lock()
|
|
defer job.mutex.Unlock()
|
|
job.status = Success
|
|
|
|
if err != nil {
|
|
if exiterr, ok := err.(*exec.ExitError); ok {
|
|
job.exitCode = exiterr.ExitCode()
|
|
}
|
|
job.status = Error
|
|
job.err = err
|
|
slog.Error(
|
|
"hook executed",
|
|
"hook", job.Name(),
|
|
"id", job.ID(),
|
|
"status", "error",
|
|
"exitCode", job.exitCode,
|
|
"err", err,
|
|
"took", time.Since(job.start).Milliseconds(),
|
|
)
|
|
return err
|
|
}
|
|
slog.Info(
|
|
"hook executed",
|
|
"hook", job.Name(),
|
|
"id", job.ID(),
|
|
"status", "success",
|
|
"took", time.Since(job.start).Milliseconds(),
|
|
)
|
|
return nil
|
|
}
|
|
|
|
// IsTerminated ask if the job is terminated
|
|
func (job *Job) IsTerminated() bool {
|
|
job.mutex.Lock()
|
|
defer job.mutex.Unlock()
|
|
return job.status == Success || job.status == Error
|
|
}
|
|
|
|
// Status get job status
|
|
func (job *Job) Status() Status {
|
|
return job.status
|
|
}
|
|
|
|
// StatusLabel return job status as string
|
|
func (job *Job) StatusLabel() string {
|
|
switch job.status {
|
|
case Error:
|
|
return "error"
|
|
case Success:
|
|
return "success"
|
|
case Running:
|
|
return "running"
|
|
default:
|
|
return "idle"
|
|
}
|
|
}
|
|
|
|
// ExitCode of the underlying process job
|
|
// Can be 0 if the process is not over
|
|
func (job *Job) ExitCode() int {
|
|
return job.exitCode
|
|
}
|
|
|
|
// SendMessage send message to the message channel
|
|
func (job *Job) SendMessage(message string) {
|
|
job.MessageChan <- []byte(message)
|
|
}
|
|
|
|
// OpenLogFile open job log file
|
|
func (job *Job) OpenLogFile() (*os.File, error) {
|
|
return os.Open(job.logFilename)
|
|
}
|
|
|
|
// Logs returns job logs filtered with the prefix
|
|
func (job *Job) Logs(prefixFilter string) string {
|
|
file, err := job.OpenLogFile()
|
|
if err != nil {
|
|
return err.Error()
|
|
}
|
|
defer file.Close()
|
|
|
|
var result bytes.Buffer
|
|
scanner := bufio.NewScanner(file)
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
if strings.HasPrefix(line, prefixFilter) {
|
|
line = strings.TrimPrefix(line, prefixFilter)
|
|
line = strings.TrimLeft(line, " ")
|
|
result.WriteString(line + "\n")
|
|
}
|
|
}
|
|
if err := scanner.Err(); err != nil {
|
|
return err.Error()
|
|
}
|
|
return result.String()
|
|
}
|
|
|
|
// Close job message chan
|
|
func (job *Job) Close() {
|
|
close(job.MessageChan)
|
|
}
|
|
|
|
// Run hook job
|
|
func (job *Job) Run() error {
|
|
if job.status != Idle {
|
|
return fmt.Errorf("unable to run job: status=%s", job.StatusLabel())
|
|
}
|
|
job.status = Running
|
|
job.start = time.Now()
|
|
slog.Info("executing hook...", "hook", job.name, "id", job.id)
|
|
|
|
binary, err := exec.LookPath(job.script)
|
|
if err != nil {
|
|
return job.Terminate(err)
|
|
}
|
|
|
|
// Exec script with parameter...
|
|
cmd := exec.Command(binary, job.payload)
|
|
// with env variables and hook arguments...
|
|
cmd.Env = append(os.Environ(), job.args...)
|
|
// and hook meta...
|
|
cmd.Env = append(cmd.Env, job.Meta()...)
|
|
// using a process group...
|
|
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
|
|
|
|
// Open the log file for writing
|
|
logFile, err := os.Create(job.logFilename)
|
|
if err != nil {
|
|
return job.Terminate(err)
|
|
}
|
|
defer logFile.Close()
|
|
slog.Debug("hook details", "hook", job.name, "id", job.id, "script", job.script, "args", job.args, "output", logFile.Name())
|
|
|
|
wLogFile := bufio.NewWriter(logFile)
|
|
defer wLogFile.Flush()
|
|
|
|
// Combine cmd stdout and stderr
|
|
outReader, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return job.Terminate(err)
|
|
}
|
|
errReader, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
return job.Terminate(err)
|
|
}
|
|
cmdReader := io.MultiReader(outReader, errReader)
|
|
|
|
// Start the script...
|
|
err = cmd.Start()
|
|
if err != nil {
|
|
return job.Terminate(err)
|
|
}
|
|
|
|
// Create wait group to wait for command output completion
|
|
var wg sync.WaitGroup
|
|
wg.Add(1)
|
|
|
|
// Write script output to log file and the work message channel
|
|
go func(reader io.Reader) {
|
|
r := bufio.NewReader(reader)
|
|
for {
|
|
line, err := r.ReadString('\n')
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
slog.Error("error while reading hook std[out/err]", "hook", job.name, "id", job.id, "err", err)
|
|
break
|
|
}
|
|
line, _ = strings.CutSuffix(line, "\r")
|
|
// writing to the work channel
|
|
if !job.IsTerminated() {
|
|
job.MessageChan <- []byte(line)
|
|
} else {
|
|
slog.Error("hook execution done ; unable to write more data into the channel", "hook", job.name, "id", job.id, "line", line)
|
|
break
|
|
}
|
|
// write to stdout if configured
|
|
logger.LogIf(
|
|
logger.HookOutputEnabled,
|
|
slog.LevelInfo+1,
|
|
line,
|
|
"hook", job.name,
|
|
"id", job.id,
|
|
)
|
|
// writing to outfile
|
|
if _, err := wLogFile.WriteString(line + "\n"); err != nil {
|
|
slog.Error("error while writing into the log file", "filename", logFile.Name(), "err", err)
|
|
break
|
|
}
|
|
}
|
|
wg.Done()
|
|
}(cmdReader)
|
|
|
|
// Start timeout timer
|
|
timer := time.AfterFunc(time.Duration(job.timeout)*time.Second, func() {
|
|
slog.Warn("hook has timed out: killing process...", "hook", job.name, "id", job.id, "timeout", job.timeout, "pid", cmd.Process.Pid)
|
|
syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
|
|
})
|
|
|
|
// Wait for command output completion
|
|
wg.Wait()
|
|
|
|
// Wait for command completion
|
|
err = cmd.Wait()
|
|
|
|
// Stop timeout timer
|
|
timer.Stop()
|
|
|
|
// Mark work as terminated
|
|
return job.Terminate(err)
|
|
}
|