diff --git a/examples/input-artifact-oss.yaml b/examples/input-artifact-oss.yaml new file mode 100644 index 000000000000..bae3f4acb773 --- /dev/null +++ b/examples/input-artifact-oss.yaml @@ -0,0 +1,37 @@ +# This example demonstrates the loading of a hard-wired input artifact from an OSS compliant +# store. OSS guards access to buckets using an access key and +# secret key, which will be stored as regular Kubernetes secrets, and referenced in the +# workflow using secret selectors. To create the secret required for this example, first +# run the following command: +# $ kubectl create secret generic my-oss-credentials --from-literal=accessKey= --from-literal=secretKey= +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: input-artifact-oss- +spec: + entrypoint: input-artifact-oss-example + templates: + - name: input-artifact-oss-example + inputs: + artifacts: + - name: my-art + path: /my-artifact + oss: + endpoint: http://oss-cn-hangzhou-zmf.aliyuncs.com + bucket: test-bucket-name + key: test/mydirectory/ # this is path in the bucket + # accessKeySecret and secretKeySecret are secret selectors. + # It references the k8s secret named 'my-oss-credentials'. + # This secret is expected to have have the keys 'accessKey' + # and 'secretKey', containing the base64 encoded credentials + # to the bucket. + accessKeySecret: + name: my-oss-credentials + key: accessKey + secretKeySecret: + name: my-oss-credentials + key: secretKey + container: + image: debian:latest + command: [sh, -c] + args: ["ls -l /my-artifact"] diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index 895f90fd2f58..b81244254caf 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -629,6 +629,9 @@ type ArtifactLocation struct { // Raw contains raw artifact location details Raw *RawArtifact `json:"raw,omitempty" protobuf:"bytes,7,opt,name=raw"` + + // OSS contains OSS artifact location details + OSS *OSSArtifact `json:"oss,omitempty" protobuf:"bytes,8,opt,name=oss"` } type ArtifactRepositoryRef struct { @@ -1193,6 +1196,38 @@ func (h *HTTPArtifact) HasLocation() bool { return h != nil && h.URL != "" } +// OSSBucket contains the access information required for interfacing with an OSS bucket +type OSSBucket struct { + // Endpoint is the hostname of the bucket endpoint + Endpoint string `json:"endpoint"` + + // Bucket is the name of the bucket + Bucket string `json:"bucket"` + + // AccessKeySecret is the secret selector to the bucket's access key + AccessKeySecret apiv1.SecretKeySelector `json:"accessKeySecret"` + + // SecretKeySecret is the secret selector to the bucket's secret key + SecretKeySecret apiv1.SecretKeySelector `json:"secretKeySecret"` +} + +// OSSArtifact is the location of an OSS artifact +type OSSArtifact struct { + OSSBucket `json:",inline"` + + // Key is the path in the bucket where the artifact resides + Key string `json:"key"` +} + +func (o *OSSArtifact) String() string { + protocol := "https" + return fmt.Sprintf("%s://%s/%s/%s", protocol, o.Endpoint, o.Bucket, o.Key) +} + +func (o *OSSArtifact) HasLocation() bool { + return o != nil && o.Bucket != "" && o.Endpoint != "" && o.Key != "" +} + // ExecutorConfig holds configurations of an executor container. type ExecutorConfig struct { // ServiceAccountName specifies the service account name of the executor container. @@ -1418,7 +1453,8 @@ func (a *Artifact) HasLocation() bool { a.HTTP.HasLocation() || a.Artifactory.HasLocation() || a.Raw.HasLocation() || - a.HDFS.HasLocation() + a.HDFS.HasLocation() || + a.OSS.HasLocation() } // GetTemplateByName retrieves a defined template by its name diff --git a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go index 67fb9eedefb3..4e773577a838 100644 --- a/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go @@ -133,6 +133,11 @@ func (in *ArtifactLocation) DeepCopyInto(out *ArtifactLocation) { *out = new(RawArtifact) **out = **in } + if in.OSS != nil { + in, out := &in.OSS, &out.OSS + *out = new(OSSArtifact) + (*in).DeepCopyInto(*out) + } return } @@ -782,6 +787,41 @@ func (in *NoneStrategy) DeepCopy() *NoneStrategy { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OSSArtifact) DeepCopyInto(out *OSSArtifact) { + *out = *in + in.OSSBucket.DeepCopyInto(&out.OSSBucket) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OSSArtifact. +func (in *OSSArtifact) DeepCopy() *OSSArtifact { + if in == nil { + return nil + } + out := new(OSSArtifact) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OSSBucket) DeepCopyInto(out *OSSBucket) { + *out = *in + in.AccessKeySecret.DeepCopyInto(&out.AccessKeySecret) + in.SecretKeySecret.DeepCopyInto(&out.SecretKeySecret) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OSSBucket. +func (in *OSSBucket) DeepCopy() *OSSBucket { + if in == nil { + return nil + } + out := new(OSSBucket) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Outputs) DeepCopyInto(out *Outputs) { *out = *in diff --git a/workflow/artifacts/artifacts.go b/workflow/artifacts/artifacts.go index 5a5f9a715eb6..0a41eb1e21fa 100644 --- a/workflow/artifacts/artifacts.go +++ b/workflow/artifacts/artifacts.go @@ -2,6 +2,7 @@ package executor import ( "fmt" + "github.com/argoproj/argo/workflow/artifacts/oss" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo/workflow/artifacts/artifactory" @@ -107,5 +108,30 @@ func NewDriver(art *wfv1.Artifact, ri resource.Interface) (ArtifactDriver, error return &raw.RawArtifactDriver{}, nil } + if art.OSS != nil { + var accessKey string + var secretKey string + + if art.OSS.AccessKeySecret.Name != "" { + accessKeyBytes, err := ri.GetSecret(art.OSS.AccessKeySecret.Name, art.OSS.AccessKeySecret.Key) + if err != nil { + return nil, err + } + accessKey = string(accessKeyBytes) + secretKeyBytes, err := ri.GetSecret(art.OSS.SecretKeySecret.Name, art.OSS.SecretKeySecret.Key) + if err != nil { + return nil, err + } + secretKey = string(secretKeyBytes) + } + + driver := oss.OSSArtifactDriver{ + Endpoint: art.OSS.Endpoint, + AccessKey: accessKey, + SecretKey: secretKey, + } + return &driver, nil + } + return nil, ErrUnsupportedDriver } diff --git a/workflow/artifacts/oss/oss.go b/workflow/artifacts/oss/oss.go new file mode 100644 index 000000000000..50802691c21b --- /dev/null +++ b/workflow/artifacts/oss/oss.go @@ -0,0 +1,78 @@ +package oss + +import ( + "time" + + log "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/aliyun/aliyun-oss-go-sdk/oss" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" +) + +// OSSArtifactDriver is a driver for OSS +type OSSArtifactDriver struct { + Endpoint string + AccessKey string + SecretKey string +} + +func (ossDriver *OSSArtifactDriver) newOSSClient() (*oss.Client, error) { + client, err := oss.New(ossDriver.Endpoint, ossDriver.AccessKey, ossDriver.SecretKey) + if err != nil { + log.Warnf("Failed to create new OSS client: %v", err) + return nil, err + } + return client, err +} + +// Downloads artifacts from OSS compliant storage, e.g., downloading an artifact into local path +func (ossDriver *OSSArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error { + err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1}, + func() (bool, error) { + log.Infof("OSS Load path: %s, key: %s", path, inputArtifact.OSS.Key) + osscli, err := ossDriver.newOSSClient() + if err != nil { + log.Warnf("Failed to create new OSS client: %v", err) + return false, nil + } + bucketName := inputArtifact.OSS.Bucket + bucket, err := osscli.Bucket(bucketName) + if err != nil { + return false, err + } + objectName := inputArtifact.OSS.Key + err = bucket.GetObjectToFile(objectName, path) + if err != nil { + return false, err + } + return true, nil + }) + return err +} + +// Saves an artifact to OSS compliant storage, e.g., uploading a local file to OSS bucket +func (ossDriver *OSSArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error { + err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1}, + func() (bool, error) { + log.Infof("OSS Save path: %s, key: %s", path, outputArtifact.OSS.Key) + osscli, err := ossDriver.newOSSClient() + if err != nil { + log.Warnf("Failed to create new OSS client: %v", err) + return false, nil + } + bucketName := outputArtifact.OSS.Bucket + bucket, err := osscli.Bucket(bucketName) + if err != nil { + return false, err + } + objectName := outputArtifact.OSS.Key + err = bucket.PutObjectFromFile(objectName, path) + if err != nil { + return false, err + } + return true, nil + }) + return err +} diff --git a/workflow/config/config.go b/workflow/config/config.go index 77541530eb6e..a22fa2fbdb31 100644 --- a/workflow/config/config.go +++ b/workflow/config/config.go @@ -91,6 +91,8 @@ type ArtifactRepository struct { Artifactory *ArtifactoryArtifactRepository `json:"artifactory,omitempty"` // HDFS stores artifacts in HDFS HDFS *HDFSArtifactRepository `json:"hdfs,omitempty"` + // OSS stores artifact in a OSS-compliant object store + OSS *OSSArtifactRepository `json:"oss,omitempty"` } func (a *ArtifactRepository) IsArchiveLogs() bool { @@ -150,6 +152,14 @@ type S3ArtifactRepository struct { KeyPrefix string `json:"keyPrefix,omitempty"` } +// OSSArtifactRepository defines the controller configuration for an OSS artifact repository +type OSSArtifactRepository struct { + wfv1.OSSBucket `json:",inline"` + + // KeyFormat is defines the format of how to store keys. Can reference workflow variables + KeyFormat string `json:"keyFormat,omitempty"` +} + // ArtifactoryArtifactRepository defines the controller configuration for an artifactory artifact repository type ArtifactoryArtifactRepository struct { wfv1.ArtifactoryAuth `json:",inline"` diff --git a/workflow/controller/workflowpod.go b/workflow/controller/workflowpod.go index cd0e8d15dc2f..1ce2c560e9a5 100644 --- a/workflow/controller/workflowpod.go +++ b/workflow/controller/workflowpod.go @@ -832,7 +832,7 @@ func (woc *wfOperationCtx) addArchiveLocation(pod *apiv1.Pod, tmpl *wfv1.Templat var needLocation bool if tmpl.ArchiveLocation != nil { - if tmpl.ArchiveLocation.S3 != nil || tmpl.ArchiveLocation.Artifactory != nil || tmpl.ArchiveLocation.HDFS != nil { + if tmpl.ArchiveLocation.S3 != nil || tmpl.ArchiveLocation.Artifactory != nil || tmpl.ArchiveLocation.HDFS != nil || tmpl.ArchiveLocation.OSS != nil { // User explicitly set the location. nothing else to do. return nil } @@ -888,6 +888,17 @@ func (woc *wfOperationCtx) addArchiveLocation(pod *apiv1.Pod, tmpl *wfv1.Templat Path: hdfsLocation.PathFormat, Force: hdfsLocation.Force, } + } else if ossLocation := woc.artifactRepository.OSS; ossLocation != nil { + woc.log.Debugf("Setting OSS artifact repository information") + artLocationKey := ossLocation.KeyFormat + tmpl.ArchiveLocation.OSS = &wfv1.OSSArtifact{ + OSSBucket: wfv1.OSSBucket{ + Endpoint: ossLocation.Endpoint, + AccessKeySecret: ossLocation.AccessKeySecret, + SecretKeySecret: ossLocation.SecretKeySecret, + }, + Key: artLocationKey, + } } else { return errors.Errorf(errors.CodeBadRequest, "controller is not configured with a default archive location") } @@ -1078,6 +1089,9 @@ func createArchiveLocationSecret(tmpl *wfv1.Template, volMap map[string]apiv1.Vo createSecretVal(volMap, gitRepo.UsernameSecret, uniqueKeyMap) createSecretVal(volMap, gitRepo.PasswordSecret, uniqueKeyMap) createSecretVal(volMap, gitRepo.SSHPrivateKeySecret, uniqueKeyMap) + } else if ossRepo := tmpl.ArchiveLocation.OSS; ossRepo != nil { + createSecretVal(volMap, &ossRepo.AccessKeySecret, uniqueKeyMap) + createSecretVal(volMap, &ossRepo.SecretKeySecret, uniqueKeyMap) } } @@ -1095,6 +1109,9 @@ func createSecretVolume(volMap map[string]apiv1.Volume, art wfv1.Artifact, keyMa } else if art.HDFS != nil { createSecretVal(volMap, art.HDFS.KrbCCacheSecret, keyMap) createSecretVal(volMap, art.HDFS.KrbKeytabSecret, keyMap) + } else if art.OSS != nil { + createSecretVal(volMap, &art.OSS.AccessKeySecret, keyMap) + createSecretVal(volMap, &art.OSS.SecretKeySecret, keyMap) } } diff --git a/workflow/executor/executor.go b/workflow/executor/executor.go index 78907331358d..0d634bcd7768 100644 --- a/workflow/executor/executor.go +++ b/workflow/executor/executor.go @@ -268,6 +268,10 @@ func (we *WorkflowExecutor) saveArtifact(mainCtrID string, art *wfv1.Artifact) e shallowCopy := *we.Template.ArchiveLocation.HDFS art.HDFS = &shallowCopy art.HDFS.Path = path.Join(art.HDFS.Path, fileName) + } else if we.Template.ArchiveLocation.OSS != nil { + shallowCopy := *we.Template.ArchiveLocation.OSS + art.OSS = &shallowCopy + art.OSS.Key = path.Join(art.OSS.Key, fileName) } else { return errors.Errorf(errors.CodeBadRequest, "Unable to determine path to store %s. Archive location provided no information", art.Name) }