woodpecker/server/cron/cron.go

150 lines
3.8 KiB
Go

// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cron
import (
"context"
"fmt"
"time"
"github.com/gdgvda/cron"
"github.com/rs/zerolog/log"
"go.woodpecker-ci.org/woodpecker/v3/server"
"go.woodpecker-ci.org/woodpecker/v3/server/forge"
"go.woodpecker-ci.org/woodpecker/v3/server/model"
"go.woodpecker-ci.org/woodpecker/v3/server/pipeline"
"go.woodpecker-ci.org/woodpecker/v3/server/store"
)
const (
// Specifies the interval woodpecker checks for new crons to exec.
checkTime = time.Minute
// Specifies the batch size of crons to retrieve per check from database.
checkItems = 10
)
// Run starts the cron scheduler loop.
func Run(ctx context.Context, store store.Store) error {
for {
select {
case <-ctx.Done():
return nil
case <-time.After(checkTime):
go func() {
now := time.Now()
log.Trace().Msg("cron: fetch next crons")
crons, err := store.CronListNextExecute(now.Unix(), checkItems)
if err != nil {
log.Error().Err(err).Int64("now", now.Unix()).Msg("obtain cron list")
return
}
for _, cron := range crons {
if err := runCron(ctx, store, cron, now); err != nil {
log.Error().Err(err).Int64("cronID", cron.ID).Msg("run cron failed")
}
}
}()
}
}
}
// CalcNewNext parses a cron string and calculates the next exec time based on it.
func CalcNewNext(schedule string, now time.Time) (time.Time, error) {
// remove local timezone
now = now.UTC()
// TODO: allow the users / the admin to set a specific timezone
c, err := cron.ParseStandard(schedule)
if err != nil {
return time.Time{}, fmt.Errorf("cron parse schedule: %w", err)
}
return c.Next(now), nil
}
func runCron(ctx context.Context, store store.Store, cron *model.Cron, now time.Time) error {
log.Trace().Msgf("cron: run id[%d]", cron.ID)
newNext, err := CalcNewNext(cron.Schedule, now)
if err != nil {
return err
}
// try to get lock on cron
gotLock, err := store.CronGetLock(cron, newNext.Unix())
if err != nil {
return err
}
if !gotLock {
// another go routine caught it
return nil
}
repo, newPipeline, err := CreatePipeline(ctx, store, cron)
if err != nil {
return err
}
_, err = pipeline.Create(ctx, store, repo, newPipeline)
return err
}
func CreatePipeline(ctx context.Context, store store.Store, cron *model.Cron) (*model.Repo, *model.Pipeline, error) {
repo, err := store.GetRepo(cron.RepoID)
if err != nil {
return nil, nil, err
}
_forge, err := server.Config.Services.Manager.ForgeFromRepo(repo)
if err != nil {
return nil, nil, err
}
if cron.Branch == "" {
// fallback to the repos default branch
cron.Branch = repo.Branch
}
creator, err := store.GetUser(cron.CreatorID)
if err != nil {
return nil, nil, err
}
// If the forge has a refresh token, the current access token
// may be stale. Therefore, we should refresh prior to dispatching
// the pipeline.
forge.Refresh(ctx, _forge, store, creator)
commit, err := _forge.BranchHead(ctx, creator, repo, cron.Branch)
if err != nil {
return nil, nil, err
}
return repo, &model.Pipeline{
Event: model.EventCron,
Commit: commit.SHA,
Ref: "refs/heads/" + cron.Branch,
Branch: cron.Branch,
Message: cron.Name,
Timestamp: cron.NextExec,
Sender: cron.Name,
ForgeURL: commit.ForgeURL,
}, nil
}