Skip to content
This repository has been archived by the owner on Aug 12, 2024. It is now read-only.

Commit

Permalink
Fix bundle upload tests
Browse files Browse the repository at this point in the history
This commit introduces a new condition to indicate
whether the bundle has been uploaded successfully
by the uploadmgr. Once the upload is successful,
the reconciler further starts unpacking the contents.

Signed-off-by: Varsha Prasad Narsing <[email protected]>
  • Loading branch information
varshaprasad96 authored and ncdc committed Jan 16, 2024
1 parent 0e86b39 commit 6e22aac
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 71 deletions.
4 changes: 4 additions & 0 deletions api/v1alpha2/bundledeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ const (
TypeHasValidBundle = "HasValidBundle"
TypeHealthy = "Healthy"
TypeInstalled = "Installed"
// TypeUploadStatus indicates the status of the bundle content upload by the uploadmgr.
TypeUploadStatus = "UploadStatus"

ReasonBundleLoadFailed = "BundleLoadFailed"
ReasonCreateDynamicWatchFailed = "CreateDynamicWatchFailed"
Expand All @@ -45,6 +47,8 @@ const (
ReasonReconcileFailed = "ReconcileFailed"
ReasonUnhealthy = "Unhealthy"
ReasonUpgradeFailed = "UpgradeFailed"
ReasonUploadSuccessful = "UploadSuccessful"
ReasonUploadFailed = "UploadFailed"
)

// BundleDeploymentSpec defines the desired state of BundleDeployment
Expand Down
1 change: 0 additions & 1 deletion internal/rukpakctl/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ func (pf *ServicePortForwarder) Start(ctx context.Context) error {
path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", pf.serviceNamespace, podName)
host := strings.TrimLeft(pf.cfg.Host, "htps:/")
serverURL := url.URL{Scheme: "https", Path: path, Host: host}

roundTripper, upgrader, err := spdy.RoundTripperFor(pf.cfg)
if err != nil {
return err
Expand Down
17 changes: 17 additions & 0 deletions internal/source/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"net/http"

"github.com/nlepage/go-tarfs"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

rukpakv1alpha2 "github.com/operator-framework/rukpak/api/v1alpha2"
)
Expand All @@ -25,6 +27,12 @@ func (b *Upload) Unpack(ctx context.Context, bundle *rukpakv1alpha2.BundleDeploy
return nil, fmt.Errorf("cannot unpack source type %q with %q unpacker", bundle.Spec.Source.Type, rukpakv1alpha2.SourceTypeUpload)
}

// Proceed with fetching contents from a web server, only if the bundle upload was successful.
// If upload is a failure, we have "TypeUploadState" explicitly set to false.
if !isBundleContentUploaded(bundle) {
return &Result{State: StatePending, Message: "pending unpacking contents from uploaded bundle"}, nil
}

url := fmt.Sprintf("%s/uploads/%s.tgz", b.baseDownloadURL, bundle.Name)
action := fmt.Sprintf("%s %s", http.MethodGet, url)

Expand Down Expand Up @@ -57,3 +65,12 @@ func (b *Upload) Unpack(ctx context.Context, bundle *rukpakv1alpha2.BundleDeploy

return &Result{Bundle: bundleFS, ResolvedSource: bundle.Spec.Source.DeepCopy(), State: StateUnpacked, Message: message}, nil
}

func isBundleContentUploaded(bd *rukpakv1alpha2.BundleDeployment) bool {
if bd == nil {
return false
}

condition := meta.FindStatusCondition(bd.Status.Conditions, rukpakv1alpha2.TypeUploadStatus)
return condition != nil && condition.Status == metav1.ConditionTrue
}
29 changes: 27 additions & 2 deletions internal/uploadmgr/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -95,9 +96,33 @@ func newPutHandler(cl client.Client, storageDir string) http.Handler {
})
return cl.Status().Update(r.Context(), bundledeployment)
}); err != nil {
http.Error(w, err.Error(), int(getCode(err)))
errs := []error{}
errs = append(errs, err)

meta.SetStatusCondition(&bundledeployment.Status.Conditions, metav1.Condition{
Type: rukpakv1alpha2.TypeUploadStatus,
Status: metav1.ConditionFalse,
Reason: rukpakv1alpha2.ReasonBundleLoadFailed,
Message: err.Error(),
})
if statusUpdateErr := cl.Status().Update(r.Context(), bundledeployment); statusUpdateErr != nil {
errs = append(errs, statusUpdateErr)
}
http.Error(w, utilerrors.NewAggregate(errs).Error(), int(getCode(err)))
return
}
meta.SetStatusCondition(&bundledeployment.Status.Conditions, metav1.Condition{
Type: rukpakv1alpha2.TypeUploadStatus,
Status: metav1.ConditionTrue,
Reason: rukpakv1alpha2.ReasonUploadSuccessful,
Message: "successfully uploaded bundle contents.",
})
if statusUpdateErr := cl.Status().Update(r.Context(), bundledeployment); statusUpdateErr != nil {
// Though this would not be the http error returned from uploading, it
// is required to error, as BundleDeployment reconciler is waiting for
// was a successful upload status.
http.Error(w, statusUpdateErr.Error(), int(getCode(statusUpdateErr)))
}
w.WriteHeader(http.StatusCreated)
})
}
Expand All @@ -108,7 +133,7 @@ func isBundleDeploymentUnpacked(bd *rukpakv1alpha2.BundleDeployment) bool {
}

condition := meta.FindStatusCondition(bd.Status.Conditions, rukpakv1alpha2.TypeUnpacked)
return condition.Status == metav1.ConditionTrue
return condition != nil && condition.Status == metav1.ConditionTrue
}

func getCode(err error) int32 {
Expand Down
14 changes: 7 additions & 7 deletions test/e2e/crdvalidator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var _ = Describe("crdvalidator", func() {
crd = testutil.NewTestingCRD("", testutil.DefaultGroup,
[]apiextensionsv1.CustomResourceDefinitionVersion{
{
Name: "v1alpha2",
Name: "v1alpha1",
Served: true,
Storage: true,
Schema: &apiextensionsv1.CustomResourceValidation{
Expand Down Expand Up @@ -108,7 +108,7 @@ var _ = Describe("crdvalidator", func() {
BeforeEach(func() {
crd = testutil.NewTestingCRD("", testutil.DefaultGroup,
[]apiextensionsv1.CustomResourceDefinitionVersion{{
Name: "v1alpha2",
Name: "v1alpha1",
Served: true,
Storage: true,
Schema: &apiextensionsv1.CustomResourceValidation{
Expand All @@ -130,7 +130,7 @@ var _ = Describe("crdvalidator", func() {
}).Should(Succeed(), "should be able to create a safe crd but was not")

// Build up a CR to create out of unstructured.Unstructured
sampleCR := testutil.NewTestingCR(testutil.DefaultCrName, testutil.DefaultGroup, "v1alpha2", crd.Spec.Names.Singular)
sampleCR := testutil.NewTestingCR(testutil.DefaultCrName, testutil.DefaultGroup, "v1alpha1", crd.Spec.Names.Singular)
Eventually(func() error {
return c.Create(ctx, sampleCR)
}).Should(Succeed(), "should be able to create a cr for the sample crd but was not")
Expand All @@ -148,7 +148,7 @@ var _ = Describe("crdvalidator", func() {
return err.Error()
}

// Update the v1alpha2 schema to invalidate existing CR created in BeforeEach()
// Update the v1alpha1 schema to invalidate existing CR created in BeforeEach()
crd.Spec.Versions[0].Schema.OpenAPIV3Schema.Required = []string{"sampleProperty"}

err := c.Update(ctx, crd)
Expand All @@ -169,7 +169,7 @@ var _ = Describe("crdvalidator", func() {
crd.Labels = map[string]string{}
Expect(c.Update(ctx, crd)).To(Succeed())

// Update the v1alpha2 schema to invalidate existing CR created in BeforeEach()
// Update the v1alpha1 schema to invalidate existing CR created in BeforeEach()
crd.Spec.Versions[0].Schema.OpenAPIV3Schema.Required = []string{"sampleProperty"}

return c.Update(ctx, crd)
Expand All @@ -186,7 +186,7 @@ var _ = Describe("crdvalidator", func() {
crd.Annotations = map[string]string{annotation.ValidationKey: annotation.Disabled}
Expect(c.Update(ctx, crd)).To(Succeed())

// Update the v1alpha2 schema to invalidate existing CR created in BeforeEach()
// Update the v1alpha1 schema to invalidate existing CR created in BeforeEach()
crd.Spec.Versions[0].Schema.OpenAPIV3Schema.Required = []string{"sampleProperty"}

return c.Update(ctx, crd)
Expand All @@ -210,7 +210,7 @@ var _ = Describe("crdvalidator", func() {
crd = testutil.NewTestingCRD("", testutil.DefaultGroup,
[]apiextensionsv1.CustomResourceDefinitionVersion{
{
Name: "v1alpha2",
Name: "v1alpha1",
Served: true,
Storage: true,
Schema: &apiextensionsv1.CustomResourceValidation{
Expand Down
122 changes: 61 additions & 61 deletions test/e2e/plain_provisioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,67 +912,6 @@ var _ = Describe("plain provisioner bundle", func() {
})
})

When("the bundle deployment is uploaded", func() {
var (
bundledeployment *rukpakv1alpha2.BundleDeployment
ctx context.Context
)

BeforeEach(func() {
ctx = context.Background()

bundleFS := os.DirFS(filepath.Join(testdataDir, "bundles/plain-v0/valid"))
bundledeployment = &rukpakv1alpha2.BundleDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("valid-upload-%s", rand.String(8)),
},
Spec: rukpakv1alpha2.BundleDeploymentSpec{
ProvisionerClassName: plain.ProvisionerID,
Source: rukpakv1alpha2.BundleSource{
Type: rukpakv1alpha2.SourceTypeUpload,
Upload: &rukpakv1alpha2.UploadSource{},
},
},
}
err := c.Create(ctx, bundledeployment)
Expect(err).ToNot(HaveOccurred())

rootCAs, err := rukpakctl.GetClusterCA(ctx, c, types.NamespacedName{Namespace: defaultSystemNamespace, Name: "rukpak-ca"})
Expect(err).ToNot(HaveOccurred())

bu := rukpakctl.BundleUploader{
UploadServiceName: defaultUploadServiceName,
UploadServiceNamespace: defaultSystemNamespace,
Cfg: cfg,
RootCAs: rootCAs,
}
uploadCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_, err = bu.Upload(uploadCtx, bundledeployment.Name, bundleFS)
Expect(err).ToNot(HaveOccurred())
})

AfterEach(func() {
err := c.Delete(ctx, bundledeployment)
Expect(client.IgnoreNotFound(err)).To(Succeed())
})

It("can unpack the bundle successfully", func() {
Eventually(func() (*metav1.Condition, error) {
if err := c.Get(ctx, client.ObjectKeyFromObject(bundledeployment), bundledeployment); err != nil {
return nil, err
}
return meta.FindStatusCondition(bundledeployment.Status.Conditions, rukpakv1alpha2.TypeInstalled), nil
}).Should(And(
Not(BeNil()),
WithTransform(func(c *metav1.Condition) string { return c.Type }, Equal(rukpakv1alpha2.TypeInstalled)),
WithTransform(func(c *metav1.Condition) metav1.ConditionStatus { return c.Status }, Equal(metav1.ConditionTrue)),
WithTransform(func(c *metav1.Condition) string { return c.Reason }, Equal(rukpakv1alpha2.ReasonInstallationSucceeded)),
WithTransform(func(c *metav1.Condition) string { return c.Message }, ContainSubstring("Instantiated bundle")),
))
})
})

When("the bundle is backed by an invalid configmap", func() {
var (
bundledeployment *rukpakv1alpha2.BundleDeployment
Expand Down Expand Up @@ -1050,6 +989,67 @@ var _ = Describe("plain provisioner bundle", func() {
})
})

When("the bundle deployment is uploaded", func() {
var (
bundledeployment *rukpakv1alpha2.BundleDeployment
ctx context.Context
)

BeforeEach(func() {
ctx = context.Background()

bundleFS := os.DirFS(filepath.Join(testdataDir, "bundles/plain-v0/valid"))
bundledeployment = &rukpakv1alpha2.BundleDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("valid-upload-%s", rand.String(8)),
},
Spec: rukpakv1alpha2.BundleDeploymentSpec{
ProvisionerClassName: plain.ProvisionerID,
Source: rukpakv1alpha2.BundleSource{
Type: rukpakv1alpha2.SourceTypeUpload,
Upload: &rukpakv1alpha2.UploadSource{},
},
},
}
err := c.Create(ctx, bundledeployment)
Expect(err).ToNot(HaveOccurred())

rootCAs, err := rukpakctl.GetClusterCA(ctx, c, types.NamespacedName{Namespace: defaultSystemNamespace, Name: "rukpak-ca"})
Expect(err).ToNot(HaveOccurred())

bu := rukpakctl.BundleUploader{
UploadServiceName: defaultUploadServiceName,
UploadServiceNamespace: defaultSystemNamespace,
Cfg: cfg,
RootCAs: rootCAs,
}
uploadCtx, cancel := context.WithTimeout(ctx, time.Second*5)
defer cancel()
_, err = bu.Upload(uploadCtx, bundledeployment.Name, bundleFS)
Expect(err).ToNot(HaveOccurred())
})

AfterEach(func() {
err := c.Delete(ctx, bundledeployment)
Expect(client.IgnoreNotFound(err)).To(Succeed())
})

It("can unpack the bundle successfully", func() {
Eventually(func() (*metav1.Condition, error) {
if err := c.Get(ctx, client.ObjectKeyFromObject(bundledeployment), bundledeployment); err != nil {
return nil, err
}
return meta.FindStatusCondition(bundledeployment.Status.Conditions, rukpakv1alpha2.TypeInstalled), nil
}).Should(And(
Not(BeNil()),
WithTransform(func(c *metav1.Condition) string { return c.Type }, Equal(rukpakv1alpha2.TypeInstalled)),
WithTransform(func(c *metav1.Condition) metav1.ConditionStatus { return c.Status }, Equal(metav1.ConditionTrue)),
WithTransform(func(c *metav1.Condition) string { return c.Reason }, Equal(rukpakv1alpha2.ReasonInstallationSucceeded)),
WithTransform(func(c *metav1.Condition) string { return c.Message }, ContainSubstring("Instantiated bundle")),
))
})
})

When("the bundle deployment is backed by an invalid upload", func() {
var (
bundledeployment *rukpakv1alpha2.BundleDeployment
Expand Down

0 comments on commit 6e22aac

Please sign in to comment.