woodpecker/pipeline/backend/kubernetes/kubernetes.go

439 lines
12 KiB
Go

// Copyright 2022 Woodpecker Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubernetes
import (
"context"
std_errs "errors"
"fmt"
"io"
"maps"
"os"
"runtime"
"slices"
"time"
"github.com/rs/zerolog/log"
"github.com/urfave/cli/v3"
"gopkg.in/yaml.v3"
v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp" // To authenticate to GCP K8s clusters
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"go.woodpecker-ci.org/woodpecker/v3/pipeline/backend/types"
)
const (
EngineName = "kubernetes"
// TODO: 5 seconds is against best practice, k3s didn't work otherwise
defaultResyncDuration = 5 * time.Second
)
var defaultDeleteOptions = newDefaultDeleteOptions()
type kube struct {
client kubernetes.Interface
config *config
goos string
}
type config struct {
Namespace string
StorageClass string
VolumeSize string
StorageRwx bool
PodLabels map[string]string
PodLabelsAllowFromStep bool
PodAnnotations map[string]string
PodAnnotationsAllowFromStep bool
PodNodeSelector map[string]string
ImagePullSecretNames []string
SecurityContext SecurityContextConfig
NativeSecretsAllowFromStep bool
}
type SecurityContextConfig struct {
RunAsNonRoot bool
FSGroup *int64
}
func newDefaultDeleteOptions() meta_v1.DeleteOptions {
gracePeriodSeconds := int64(0) // immediately
propagationPolicy := meta_v1.DeletePropagationBackground
return meta_v1.DeleteOptions{
GracePeriodSeconds: &gracePeriodSeconds,
PropagationPolicy: &propagationPolicy,
}
}
func configFromCliContext(ctx context.Context) (*config, error) {
if ctx != nil {
if c, ok := ctx.Value(types.CliCommand).(*cli.Command); ok {
config := config{
Namespace: c.String("backend-k8s-namespace"),
StorageClass: c.String("backend-k8s-storage-class"),
VolumeSize: c.String("backend-k8s-volume-size"),
StorageRwx: c.Bool("backend-k8s-storage-rwx"),
PodLabels: make(map[string]string), // just init empty map to prevent nil panic
PodLabelsAllowFromStep: c.Bool("backend-k8s-pod-labels-allow-from-step"),
PodAnnotations: make(map[string]string), // just init empty map to prevent nil panic
PodAnnotationsAllowFromStep: c.Bool("backend-k8s-pod-annotations-allow-from-step"),
PodNodeSelector: make(map[string]string), // just init empty map to prevent nil panic
ImagePullSecretNames: c.StringSlice("backend-k8s-pod-image-pull-secret-names"),
SecurityContext: SecurityContextConfig{
RunAsNonRoot: c.Bool("backend-k8s-secctx-nonroot"), // cspell:words secctx nonroot
FSGroup: newInt64(defaultFSGroup),
},
NativeSecretsAllowFromStep: c.Bool("backend-k8s-allow-native-secrets"),
}
// Unmarshal label and annotation settings here to ensure they're valid on startup
if labels := c.String("backend-k8s-pod-labels"); labels != "" {
if err := yaml.Unmarshal([]byte(labels), &config.PodLabels); err != nil {
log.Error().Err(err).Msgf("could not unmarshal pod labels '%s'", c.String("backend-k8s-pod-labels"))
return nil, err
}
}
if annotations := c.String("backend-k8s-pod-annotations"); annotations != "" {
if err := yaml.Unmarshal([]byte(c.String("backend-k8s-pod-annotations")), &config.PodAnnotations); err != nil {
log.Error().Err(err).Msgf("could not unmarshal pod annotations '%s'", c.String("backend-k8s-pod-annotations"))
return nil, err
}
}
if nodeSelector := c.String("backend-k8s-pod-node-selector"); nodeSelector != "" {
if err := yaml.Unmarshal([]byte(nodeSelector), &config.PodNodeSelector); err != nil {
log.Error().Err(err).Msgf("could not unmarshal pod node selector '%s'", nodeSelector)
return nil, err
}
}
return &config, nil
}
}
return nil, types.ErrNoCliContextFound
}
// New returns a new Kubernetes Backend.
func New() types.Backend {
return &kube{}
}
func (e *kube) Name() string {
return EngineName
}
func (e *kube) IsAvailable(context.Context) bool {
host := os.Getenv("KUBERNETES_SERVICE_HOST")
return len(host) > 0
}
func (e *kube) Flags() []cli.Flag {
return Flags
}
func (e *kube) Load(ctx context.Context) (*types.BackendInfo, error) {
config, err := configFromCliContext(ctx)
if err != nil {
return nil, err
}
e.config = config
var kubeClient kubernetes.Interface
_, err = rest.InClusterConfig()
if err != nil {
kubeClient, err = getClientOutOfCluster()
} else {
kubeClient, err = getClientInsideOfCluster()
}
if err != nil {
return nil, err
}
e.client = kubeClient
// TODO(2693): use info resp of kubeClient to define platform var
e.goos = runtime.GOOS
return &types.BackendInfo{
Platform: runtime.GOOS + "/" + runtime.GOARCH,
}, nil
}
func (e *kube) getConfig() *config {
if e.config == nil {
return nil
}
c := *e.config
c.PodLabels = maps.Clone(e.config.PodLabels)
c.PodAnnotations = maps.Clone(e.config.PodAnnotations)
c.PodNodeSelector = maps.Clone(e.config.PodNodeSelector)
c.ImagePullSecretNames = slices.Clone(e.config.ImagePullSecretNames)
return &c
}
// SetupWorkflow sets up the pipeline environment.
func (e *kube) SetupWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msgf("Setting up Kubernetes primitives")
for _, vol := range conf.Volumes {
_, err := startVolume(ctx, e, vol.Name)
if err != nil {
return err
}
}
var extraHosts []types.HostAlias
for _, stage := range conf.Stages {
for _, step := range stage.Steps {
if step.Type == types.StepTypeService {
svc, err := startService(ctx, e, step)
if err != nil {
return err
}
hostAlias := types.HostAlias{Name: step.Networks[0].Aliases[0], IP: svc.Spec.ClusterIP}
extraHosts = append(extraHosts, hostAlias)
}
}
}
log.Trace().Msgf("adding extra hosts: %v", extraHosts)
for _, stage := range conf.Stages {
for _, step := range stage.Steps {
step.ExtraHosts = extraHosts
}
}
return nil
}
// StartStep starts the pipeline step.
func (e *kube) StartStep(ctx context.Context, step *types.Step, taskUUID string) error {
options, err := parseBackendOptions(step)
if err != nil {
log.Error().Err(err).Msg("could not parse backend options")
}
if needsRegistrySecret(step) {
err = startRegistrySecret(ctx, e, step)
if err != nil {
return err
}
}
log.Trace().Str("taskUUID", taskUUID).Msgf("starting step: %s", step.Name)
_, err = startPod(ctx, e, step, options)
return err
}
// WaitStep waits for the pipeline step to complete and returns
// the completion results.
func (e *kube) WaitStep(ctx context.Context, step *types.Step, taskUUID string) (*types.State, error) {
podName, err := stepToPodName(step)
if err != nil {
return nil, err
}
log.Trace().Str("taskUUID", taskUUID).Msgf("waiting for pod: %s", podName)
finished := make(chan bool)
podUpdated := func(_, newPod any) {
pod, ok := newPod.(*v1.Pod)
if !ok {
log.Error().Msgf("could not parse pod: %v", newPod)
return
}
if pod.Name == podName {
if isImagePullBackOffState(pod) || isInvalidImageName(pod) {
finished <- true
}
switch pod.Status.Phase {
case v1.PodSucceeded, v1.PodFailed, v1.PodUnknown:
finished <- true
}
}
}
si := informers.NewSharedInformerFactoryWithOptions(e.client, defaultResyncDuration, informers.WithNamespace(e.config.Namespace))
if _, err := si.Core().V1().Pods().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: podUpdated,
},
); err != nil {
return nil, err
}
stop := make(chan struct{})
si.Start(stop)
defer close(stop)
// TODO: Cancel on ctx.Done
<-finished
pod, err := e.client.CoreV1().Pods(e.config.Namespace).Get(ctx, podName, meta_v1.GetOptions{})
if err != nil {
return nil, err
}
if isImagePullBackOffState(pod) || isInvalidImageName(pod) {
return nil, fmt.Errorf("could not pull image for pod %s", podName)
}
if len(pod.Status.ContainerStatuses) == 0 {
return nil, fmt.Errorf("no container statuses found for pod %s", podName)
}
cs := pod.Status.ContainerStatuses[0]
if cs.State.Terminated == nil {
err := fmt.Errorf("no terminated state found for container %s/%s", podName, cs.Name)
log.Error().Str("taskUUID", taskUUID).Str("pod", podName).Str("container", cs.Name).Interface("state", cs.State).Msg(err.Error())
return nil, err
}
bs := &types.State{
ExitCode: int(cs.State.Terminated.ExitCode),
Exited: true,
OOMKilled: false,
}
return bs, nil
}
// TailStep tails the pipeline step logs.
func (e *kube) TailStep(ctx context.Context, step *types.Step, taskUUID string) (io.ReadCloser, error) {
podName, err := stepToPodName(step)
if err != nil {
return nil, err
}
log.Trace().Str("taskUUID", taskUUID).Msgf("tail logs of pod: %s", podName)
up := make(chan bool)
podUpdated := func(_, newPod any) {
pod, ok := newPod.(*v1.Pod)
if !ok {
log.Error().Msgf("could not parse pod: %v", newPod)
return
}
if pod.Name == podName {
if isImagePullBackOffState(pod) || isInvalidImageName(pod) {
up <- true
}
switch pod.Status.Phase {
case v1.PodRunning, v1.PodSucceeded, v1.PodFailed:
up <- true
}
}
}
si := informers.NewSharedInformerFactoryWithOptions(e.client, defaultResyncDuration, informers.WithNamespace(e.config.Namespace))
if _, err := si.Core().V1().Pods().Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
UpdateFunc: podUpdated,
},
); err != nil {
return nil, err
}
stop := make(chan struct{})
si.Start(stop)
defer close(stop)
<-up
opts := &v1.PodLogOptions{
Follow: true,
Container: podName,
}
logs, err := e.client.CoreV1().RESTClient().Get().
Namespace(e.config.Namespace).
Name(podName).
Resource("pods").
SubResource("log").
VersionedParams(opts, scheme.ParameterCodec).
Stream(ctx)
if err != nil {
return nil, err
}
rc, wc := io.Pipe()
go func() {
defer logs.Close()
defer wc.Close()
_, err = io.Copy(wc, logs)
if err != nil {
return
}
}()
return rc, nil
}
func (e *kube) DestroyStep(ctx context.Context, step *types.Step, taskUUID string) error {
var errs []error
log.Trace().Str("taskUUID", taskUUID).Msgf("Stopping step: %s", step.Name)
if needsRegistrySecret(step) {
err := stopRegistrySecret(ctx, e, step, defaultDeleteOptions)
if err != nil {
errs = append(errs, err)
}
}
err := stopPod(ctx, e, step, defaultDeleteOptions)
if err != nil {
errs = append(errs, err)
}
return std_errs.Join(errs...)
}
// DestroyWorkflow destroys the pipeline environment.
func (e *kube) DestroyWorkflow(ctx context.Context, conf *types.Config, taskUUID string) error {
log.Trace().Str("taskUUID", taskUUID).Msg("deleting Kubernetes primitives")
for _, stage := range conf.Stages {
for _, step := range stage.Steps {
err := stopPod(ctx, e, step, defaultDeleteOptions)
if err != nil {
return err
}
if step.Type == types.StepTypeService {
err := stopService(ctx, e, step, defaultDeleteOptions)
if err != nil {
return err
}
}
}
}
for _, vol := range conf.Volumes {
err := stopVolume(ctx, e, vol.Name, defaultDeleteOptions)
if err != nil {
return err
}
}
return nil
}