Other small fixes and improvements included. Change-Id: I878c4dce66e62e9128cbec658fc98e1ef71e8047 Signed-off-by: Ruslan Aliev <raliev@mirantis.com>
419 lines
14 KiB
419 lines
14 KiB
/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
ctrl "sigs.k8s.io/controller-runtime"
armadav1 "opendev.org/airship/armada-operator/api/v1"
// ArmadaChartReconciler reconciles a ArmadaChart object
type ArmadaChartReconciler struct {
ControllerName string
Scheme *runtime.Scheme
httpClient *retryablehttp.Client
// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
func (r *ArmadaChartReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
start := time.Now()
log := ctrl.LoggerFrom(ctx)
log.Info("reconciling has started")
// Retrieve the custom resource
var ac armadav1.ArmadaChart
if err := r.Get(ctx, req.NamespacedName, &ac); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
// Add our finalizer if it does not exist
if !controllerutil.ContainsFinalizer(&ac, armadav1.ArmadaChartFinalizer) {
patch := client.MergeFrom(ac.DeepCopy())
controllerutil.AddFinalizer(&ac, armadav1.ArmadaChartFinalizer)
if err := r.Patch(ctx, &ac, patch); err != nil {
log.Error(err, "unable to register finalizer")
return ctrl.Result{}, err
// Examine if the object is under deletion
if !ac.ObjectMeta.DeletionTimestamp.IsZero() {
log.Info("object is under deletion, uninstalling corresponding helm release")
return r.reconcileDelete(ctx, &ac)
// Perform reconciliation
ac, result, err := r.reconcile(ctx, ac)
if updateStatusErr := r.patchStatus(ctx, &ac); updateStatusErr != nil {
log.Error(updateStatusErr, "unable to update status after reconciliation")
return ctrl.Result{Requeue: false}, updateStatusErr
// Log reconciliation duration
durationMsg := fmt.Sprintf("reconciliation finished in %s", time.Since(start).String())
if result.RequeueAfter > 0 {
durationMsg = fmt.Sprintf("%s, next run in %s", durationMsg, result.RequeueAfter.String())
return result, err
func (r *ArmadaChartReconciler) reconcile(ctx context.Context, ac armadav1.ArmadaChart) (armadav1.ArmadaChart, ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
// Observe HelmRelease generation.
if ac.Status.ObservedGeneration != ac.Generation {
ac.Status.ObservedGeneration = ac.Generation
ac = armadav1.ArmadaChartProgressing(ac)
if updateStatusErr := r.patchStatus(ctx, &ac); updateStatusErr != nil {
log.Error(updateStatusErr, "unable to update status after generation update")
return ac, ctrl.Result{Requeue: true}, updateStatusErr
// Prepare values
var vals map[string]interface{}
if ac.Spec.Values != nil {
var vals_err error
vals, vals_err = chartutil.ReadValues(ac.Spec.Values.Raw)
if vals_err != nil {
return armadav1.ArmadaChartNotReady(ac, "InitFailed", vals_err.Error()), ctrl.Result{}, vals_err
// Load chart from artifact
chrt, err := r.loadHelmChart(ctx, ac, ac.Spec.Source.Location)
if err != nil {
return armadav1.ArmadaChartNotReady(ac, "ArtifactFailed", err.Error()), ctrl.Result{}, err
reconAc, reconErr := r.reconcileChart(ctx, *ac.DeepCopy(), chrt, vals)
return reconAc, ctrl.Result{}, reconErr
func (r *ArmadaChartReconciler) reconcileChart(ctx context.Context,
ac armadav1.ArmadaChart, chrt *chart.Chart, vals chartutil.Values) (armadav1.ArmadaChart, error) {
log := ctrl.LoggerFrom(ctx)
gettr, err := r.buildRESTClientGetter(ctx, ac)
if err != nil {
return armadav1.ArmadaChartNotReady(ac, "InitFailed", err.Error()), err
run, err := runner.NewRunner(gettr, ac.Namespace, log)
if err != nil {
return armadav1.ArmadaChartNotReady(ac, "InitFailed", "failed to initialize Helm action runner"), err
// Determine last release revision.
rel, observeLastReleaseErr := run.ObserveLastRelease(ac)
if observeLastReleaseErr != nil {
err = fmt.Errorf("failed to get last release revision: %w", observeLastReleaseErr)
return armadav1.ArmadaChartNotReady(ac, "GetLastReleaseFailed", "failed to get last release revision"), err
testRel := func() (armadav1.ArmadaChart, error) {
if ac.Spec.Test.Enabled && !ac.Status.Tested {
log.Info("performing tests")
rel, err = run.Test(ac)
if err != nil {
return armadav1.ArmadaChartNotReady(ac, "TestFailed", err.Error()), err
return armadav1.ArmadaChartReady(ac), err
if rel == nil {
log.Info("helm install has started")
rel, err = run.Install(ctx, ac, chrt, vals)
} else {
if rel.Info.Status == release.StatusDeployed && !isUpdateRequired(ctx, rel, chrt, vals) {
log.Info("no updates found, skipping upgrade")
return testRel()
if rel.Info.Status.IsPending() {
log.Info("warning: release in pending state, unlocking")
rel.SetStatus(release.StatusFailed, fmt.Sprintf("release unlocked from stale state"))
} else {
for _, delRes := range ac.Spec.Upgrade.PreUpgrade.Delete {
log.Info(fmt.Sprintf("deleting all %ss in %s ns with labels %v", delRes.Type, ac.Spec.Namespace, delRes.Labels))
switch delRes.Type {
case "", "job":
err = r.DeleteAllOf(ctx, &batchv1.Job{}, client.MatchingLabels(delRes.Labels), client.InNamespace(ac.Spec.Namespace))
if err != nil {
return armadav1.ArmadaChartNotReady(ac, "DeleteFailed", err.Error()), err
case "pod":
err = r.DeleteAllOf(ctx, &corev1.Pod{}, client.MatchingLabels(delRes.Labels), client.InNamespace(ac.Spec.Namespace))
if err != nil {
return armadav1.ArmadaChartNotReady(ac, "DeleteFailed", err.Error()), err
case "cronjob":
err = r.DeleteAllOf(ctx, &batchv1.CronJob{}, client.MatchingLabels(delRes.Labels), client.InNamespace(ac.Spec.Namespace))
if err != nil {
return armadav1.ArmadaChartNotReady(ac, "DeleteFailed", err.Error()), err
log.Info("helm upgrade has started")
rel, err = run.Upgrade(ctx, ac, chrt, vals)
if err != nil {
err = fmt.Errorf("failed to install/upgrade helm release: %s", err.Error())
return armadav1.ArmadaChartNotReady(ac, "InstallUpgradeFailed", err.Error()), err
if ac.Spec.Wait.Timeout > 0 && len(ac.Spec.Wait.Labels) > 0 {
log.Info("preparing to wait resources")
resCfg, err := gettr.ToRESTConfig()
if err != nil {
return armadav1.ArmadaChartNotReady(ac, "WaitFailed", err.Error()), err
err = r.waitRelease(ctx, resCfg, ac)
if err != nil {
return armadav1.ArmadaChartNotReady(ac, "WaitFailed", err.Error()), err
return testRel()
// waitRelease waits for the resources described by hr.Spec.Wait.
//
// For each entry of hr.Spec.Wait.ArmadaChartWaitResources it builds a
// comma-separated "key=value" label selector from the entry's labels and calls
// waitutil.WaitOptions.Wait for the pluralised resource type in
// hr.Spec.Namespace, with a timeout of hr.Spec.Wait.Timeout seconds. It
// returns the first error reported by Wait.
func (r *ArmadaChartReconciler) waitRelease(ctx context.Context, restCfg *rest.Config, hr armadav1.ArmadaChart) error {
log := ctrl.LoggerFrom(ctx)
// A nil resource list falls back to waiting on jobs and pods.
if hr.Spec.Wait.ArmadaChartWaitResources == nil {
log.Info(fmt.Sprintf("there are no explicitly defined resources to wait: %s, using default ones", hr.Name))
hr.Spec.Wait.ArmadaChartWaitResources = []armadav1.ArmadaChartWaitResource{{Type: "job"}, {Type: "pod"}}
// An explicitly empty (non-nil) list is only logged here.
// NOTE(review): confirm whether the function is meant to return at this point.
if len(hr.Spec.Wait.ArmadaChartWaitResources) == 0 {
log.Info(fmt.Sprintf("there are none resources to wait: %s", hr.Name))
// Ensure the chart-level label map can be ranged over below.
if hr.Spec.Wait.Labels == nil {
hr.Spec.Wait.Labels = make(map[string]string)
for _, res := range hr.Spec.Wait.ArmadaChartWaitResources {
log.Info(fmt.Sprintf("processing wait resource %v", res))
// Allocate the per-resource map so the writes below cannot hit a nil map.
if res.Labels == nil || len(res.Labels) == 0 {
res.Labels = make(map[string]string)
// Copy the chart-level wait labels into the resource's labels; on a key
// clash the chart-level value overwrites the resource's value.
// NOTE(review): confirm whether this merge runs for every resource or only
// for resources that have no labels of their own.
for kk, vv := range hr.Spec.Wait.Labels {
res.Labels[kk] = vv
// NOTE(review): the message says "continuing" — confirm whether a resource
// without selectors is skipped or waited on with an empty selector.
if len(res.Labels) == 0 {
log.Info("no selectors applied, continuing...")
log.Info(fmt.Sprintf("Resolved `wait.resources` list: %v", res))
// Join the labels as "k1=v1,k2=v2"; map iteration order is unspecified, so
// the order of the terms may differ between calls.
var labelSelector string
for k, v := range res.Labels {
if len(labelSelector) > 0 {
labelSelector = fmt.Sprintf("%s,%s=%s", labelSelector, k, v)
} else {
labelSelector = fmt.Sprintf("%s=%s", k, v)
opts := waitutil.WaitOptions{
RestConfig: restCfg,
Namespace: hr.Spec.Namespace,
LabelSelector: labelSelector,
// The resource type is pluralised by appending "s" (job -> jobs).
ResourceType: fmt.Sprintf("%ss", res.Type),
// hr.Spec.Wait.Timeout is interpreted as a number of seconds.
Timeout: time.Second * time.Duration(hr.Spec.Wait.Timeout),
MinReady: res.MinReady,
Logger: log,
err := opts.Wait(ctx)
if err != nil {
return err
log.Info("all resources are ready")
return nil
// loadHelmChart attempts to download the artifact from the provided source,
// loads it into a chart.Chart, and removes the downloaded artifact.
// It returns the loaded chart.Chart on success, or an error.
func (r *ArmadaChartReconciler) loadHelmChart(ctx context.Context, hr armadav1.ArmadaChart, source string) (*chart.Chart, error) {
log := ctrl.LoggerFrom(ctx)
f, err := os.CreateTemp("", fmt.Sprintf("%s-%s-*.tgz", hr.GetNamespace(), hr.GetName()))
if err != nil {
return nil, err
defer f.Close()
defer os.Remove(f.Name())
req, err := retryablehttp.NewRequest(http.MethodGet, source, nil)
if err != nil {
return nil, fmt.Errorf("failed to create a new request: %w", err)
resp, err := r.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to download artifact, error: %w", err)
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("artifact '%s' download failed (status code: %s)", source, resp.Status)
if _, err := io.Copy(f, resp.Body); err != nil {
return nil, err
log.Info(fmt.Sprintf("helm chart downloaded to %s", f.Name()))
return loader.Load(f.Name())
// buildRESTClientGetter returns the RESTClientGetter created by
// kube.NewInClusterMemoryRESTClientGetter from a list of kube options.
// The context argument is ignored.
// NOTE(review): the entries of opts are not visible here; confirm which
// options are set (and how hr is used) before changing this function.
func (r *ArmadaChartReconciler) buildRESTClientGetter(_ context.Context, hr armadav1.ArmadaChart) (genericclioptions.RESTClientGetter, error) {
opts := []kube.Option{
return kube.NewInClusterMemoryRESTClientGetter(opts...)
func (r *ArmadaChartReconciler) patchStatus(ctx context.Context, ac *armadav1.ArmadaChart) error {
latest := &armadav1.ArmadaChart{}
if err := r.Client.Get(ctx, client.ObjectKeyFromObject(ac), latest); err != nil {
return err
patch := client.MergeFrom(latest.DeepCopy())
latest.Status = ac.Status
return r.Client.Status().Patch(ctx, latest, patch, client.FieldOwner(r.ControllerName))
// reconcileDelete deletes the Helm Release of the ArmadaChart,
// and uninstalls the Helm release if the resource has not been suspended.
// It only performs a Helm uninstall if the ServiceAccount to be impersonated
// exists.
func (r *ArmadaChartReconciler) reconcileDelete(ctx context.Context, ac *armadav1.ArmadaChart) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
getter, err := r.buildRESTClientGetter(ctx, *ac)
if err != nil {
return ctrl.Result{}, err
run, err := runner.NewRunner(getter, ac.Spec.Namespace, ctrl.LoggerFrom(ctx))
if err != nil {
return ctrl.Result{}, err
if err := run.Uninstall(*ac); err != nil && !errors.Is(err, driver.ErrReleaseNotFound) {
return ctrl.Result{}, err
log.Info("uninstalled Helm release for deleted resource")
// Remove our finalizer from the list and update it.
controllerutil.RemoveFinalizer(ac, armadav1.ArmadaChartFinalizer)
if err := r.Update(ctx, ac); err != nil {
return ctrl.Result{}, err
return ctrl.Result{}, nil
// SetupWithManager sets up the controller with the Manager.
func (r *ArmadaChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
httpClient := retryablehttp.NewClient()
httpClient.RetryWaitMin = 5 * time.Second
httpClient.RetryWaitMax = 30 * time.Second
httpClient.RetryMax = 3
httpClient.Logger = nil
r.httpClient = httpClient
r.ControllerName = "armada-controller"
return ctrl.NewControllerManagedBy(mgr).
For(&armadav1.ArmadaChart{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})).Complete(r)
func isUpdateRequired(ctx context.Context, release *release.Release, chrt *chart.Chart, vals chartutil.Values) bool {
log := ctrl.LoggerFrom(ctx)
switch {
case !cmp.Equal(release.Chart.Templates, chrt.Templates, cmpopts.EquateEmpty()):
log.Info("There are chart template diffs found")
log.Info(cmp.Diff(release.Chart.Templates, chrt.Templates))
return true
//case !cmp.Equal(release.Chart.Values, chrt.Values, cmpopts.EquateEmpty()):
// log.Info("There are CHART DEF VALUES diffs")
// log.Info(cmp.Diff(release.Chart.Values, chrt.Values, cmpopts.EquateEmpty()))
// return true
case !cmp.Equal(release.Config, vals.AsMap(), cmpopts.EquateEmpty()):
log.Info("There are chart values diffs found")
log.Info(cmp.Diff(release.Config, vals.AsMap(), cmpopts.EquateEmpty()))
return true
return false