Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for Alibaba Cloud OSS artifact #1919

Merged
merged 32 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2925d8d
oss artifact configure
merlintang Jan 4, 2020
7f12f52
update for the oss configue
merlintang Jan 8, 2020
7b9015b
update with upstream
merlintang Jan 8, 2020
2ced7ad
update for passing the test
merlintang Jan 9, 2020
5327b24
update the codegen
merlintang Jan 10, 2020
7245cb8
triger ci
merlintang Jan 11, 2020
7e06961
test
merlintang Jan 11, 2020
970f130
update the openapi
merlintang Jan 11, 2020
8da9a34
Revert "update the openapi"
merlintang Jan 13, 2020
b5a9083
update the oss artifact yaml
merlintang Jan 14, 2020
b90167a
update the oss artifact yaml
merlintang Jan 14, 2020
ece451e
Merge branch 'oss-artifact' of github.com:merlintang/argo into oss-ar…
merlintang Feb 4, 2020
27621b9
update with codegen
merlintang Feb 4, 2020
6e69f74
update for the ci
merlintang Feb 4, 2020
5441cbe
Merge remote-tracking branch 'upstream/master' into oss-artifact
merlintang Feb 5, 2020
6c407d7
update for the golang
merlintang Feb 5, 2020
9175f3b
fix the goformat
merlintang Feb 6, 2020
a6fc3c3
Merge remote-tracking branch 'upstream/master' into oss-artifact
merlintang Feb 26, 2020
a7b2f7d
update the link
merlintang Feb 26, 2020
1ebed7f
make go happy
merlintang Feb 26, 2020
ecffead
gofrmat
merlintang Feb 26, 2020
c318f13
update with goimports
merlintang Feb 26, 2020
1852b37
update goimports
merlintang Feb 26, 2020
f4340c5
update go imports
merlintang Feb 26, 2020
4fb1b9f
Merge remote-tracking branch 'upstream/master' into oss-artifact
merlintang Feb 27, 2020
2dbc6e0
Delete Gopkg.toml
alexec Feb 28, 2020
3e476af
Delete Gopkg.lock
alexec Feb 28, 2020
b88461d
Update workflow/artifacts/oss/oss.go
alexec Feb 28, 2020
73cfb87
This is a blank commit
merlintang Mar 3, 2020
e084ac0
triger the ci
merlintang Mar 4, 2020
896cebd
update with comments
merlintang Mar 4, 2020
527d492
goformat
merlintang Mar 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions examples/input-artifact-oss.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# This example demonstrates the loading of a hard-wired input artifact from an OSS compliant
# store. OSS guards access to buckets using an access key and
# secret key, which will be stored as regular Kubernetes secrets, and referenced in the
# workflow using secret selectors. To create the secret required for this example, first
# run the following command:
# $ kubectl create secret generic my-oss-credentials --from-literal=accessKey=<YOUR-OSS-ACCESS-ID> --from-literal=secretKey=<YOUR-OSS-SECRET-KEY>
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: input-artifact-oss-
spec:
entrypoint: input-artifact-oss-example
templates:
- name: input-artifact-oss-example
inputs:
artifacts:
- name: my-art
path: /my-artifact
oss:
endpoint: http://oss-cn-hangzhou-zmf.aliyuncs.com
bucket: test-bucket-name
key: test/mydirectory/ # this is path in the bucket
# accessKeySecret and secretKeySecret are secret selectors.
# It references the k8s secret named 'my-oss-credentials'.
# This secret is expected to have have the keys 'accessKey'
# and 'secretKey', containing the base64 encoded credentials
# to the bucket.
accessKeySecret:
name: my-oss-credentials
key: accessKey
secretKeySecret:
name: my-oss-credentials
key: secretKey
container:
image: debian:latest
command: [sh, -c]
args: ["ls -l /my-artifact"]
38 changes: 37 additions & 1 deletion pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,9 @@ type ArtifactLocation struct {

// Raw contains raw artifact location details
Raw *RawArtifact `json:"raw,omitempty" protobuf:"bytes,7,opt,name=raw"`

// OSS contains OSS artifact location details
OSS *OSSArtifact `json:"oss,omitempty" protobuf:"bytes,8,opt,name=oss"`
}

type ArtifactRepositoryRef struct {
Expand Down Expand Up @@ -1193,6 +1196,38 @@ func (h *HTTPArtifact) HasLocation() bool {
return h != nil && h.URL != ""
}

// OSSBucket contains the access information required for interfacing with an OSS bucket
type OSSBucket struct {
// Endpoint is the hostname of the bucket endpoint
Endpoint string `json:"endpoint"`

// Bucket is the name of the bucket
Bucket string `json:"bucket"`

// AccessKeySecret is the secret selector to the bucket's access key
AccessKeySecret apiv1.SecretKeySelector `json:"accessKeySecret"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for your comments, it is updated.


// SecretKeySecret is the secret selector to the bucket's secret key
SecretKeySecret apiv1.SecretKeySelector `json:"secretKeySecret"`
}

// OSSArtifact is the location of an OSS artifact
type OSSArtifact struct {
OSSBucket `json:",inline"`

// Key is the path in the bucket where the artifact resides
Key string `json:"key"`
}

func (o *OSSArtifact) String() string {
protocol := "https"
return fmt.Sprintf("%s://%s/%s/%s", protocol, o.Endpoint, o.Bucket, o.Key)
}

func (o *OSSArtifact) HasLocation() bool {
return o != nil && o.Bucket != "" && o.Endpoint != "" && o.Key != ""
}

// ExecutorConfig holds configurations of an executor container.
type ExecutorConfig struct {
// ServiceAccountName specifies the service account name of the executor container.
Expand Down Expand Up @@ -1418,7 +1453,8 @@ func (a *Artifact) HasLocation() bool {
a.HTTP.HasLocation() ||
a.Artifactory.HasLocation() ||
a.Raw.HasLocation() ||
a.HDFS.HasLocation()
a.HDFS.HasLocation() ||
a.OSS.HasLocation()
}

// GetTemplateByName retrieves a defined template by its name
Expand Down
40 changes: 40 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions workflow/artifacts/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package executor

import (
"fmt"
"github.com/argoproj/argo/workflow/artifacts/oss"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/artifacts/artifactory"
Expand Down Expand Up @@ -107,5 +108,30 @@ func NewDriver(art *wfv1.Artifact, ri resource.Interface) (ArtifactDriver, error
return &raw.RawArtifactDriver{}, nil
}

if art.OSS != nil {
var accessKey string
var secretKey string

if art.OSS.AccessKeySecret.Name != "" {
accessKeyBytes, err := ri.GetSecret(art.OSS.AccessKeySecret.Name, art.OSS.AccessKeySecret.Key)
if err != nil {
return nil, err
}
accessKey = string(accessKeyBytes)
secretKeyBytes, err := ri.GetSecret(art.OSS.SecretKeySecret.Name, art.OSS.SecretKeySecret.Key)
if err != nil {
return nil, err
}
secretKey = string(secretKeyBytes)
}

driver := oss.OSSArtifactDriver{
Endpoint: art.OSS.Endpoint,
AccessKey: accessKey,
SecretKey: secretKey,
}
return &driver, nil
}

return nil, ErrUnsupportedDriver
}
78 changes: 78 additions & 0 deletions workflow/artifacts/oss/oss.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package oss

import (
alexec marked this conversation as resolved.
Show resolved Hide resolved
"time"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/aliyun/aliyun-oss-go-sdk/oss"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// OSSArtifactDriver is a driver for OSS
type OSSArtifactDriver struct {
Endpoint string
AccessKey string
SecretKey string
}

func (ossDriver *OSSArtifactDriver) newOSSClient() (*oss.Client, error) {
client, err := oss.New(ossDriver.Endpoint, ossDriver.AccessKey, ossDriver.SecretKey)
if err != nil {
log.Warnf("Failed to create new OSS client: %v", err)
return nil, err
}
return client, err
}

// Downloads artifacts from OSS compliant storage, e.g., downloading an artifact into local path
func (ossDriver *OSSArtifactDriver) Load(inputArtifact *wfv1.Artifact, path string) error {
err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1},
func() (bool, error) {
log.Infof("OSS Load path: %s, key: %s", path, inputArtifact.OSS.Key)
osscli, err := ossDriver.newOSSClient()
if err != nil {
log.Warnf("Failed to create new OSS client: %v", err)
return false, nil
}
bucketName := inputArtifact.OSS.Bucket
bucket, err := osscli.Bucket(bucketName)
if err != nil {
return false, err
}
objectName := inputArtifact.OSS.Key
err = bucket.GetObjectToFile(objectName, path)
if err != nil {
return false, err
}
return true, nil
})
return err
}

// Saves an artifact to OSS compliant storage, e.g., uploading a local file to OSS bucket
func (ossDriver *OSSArtifactDriver) Save(path string, outputArtifact *wfv1.Artifact) error {
err := wait.ExponentialBackoff(wait.Backoff{Duration: time.Second * 2, Factor: 2.0, Steps: 5, Jitter: 0.1},
func() (bool, error) {
log.Infof("OSS Save path: %s, key: %s", path, outputArtifact.OSS.Key)
osscli, err := ossDriver.newOSSClient()
if err != nil {
log.Warnf("Failed to create new OSS client: %v", err)
return false, nil
}
bucketName := outputArtifact.OSS.Bucket
bucket, err := osscli.Bucket(bucketName)
if err != nil {
return false, err
}
objectName := outputArtifact.OSS.Key
err = bucket.PutObjectFromFile(objectName, path)
if err != nil {
return false, err
}
return true, nil
})
return err
}
10 changes: 10 additions & 0 deletions workflow/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ type ArtifactRepository struct {
Artifactory *ArtifactoryArtifactRepository `json:"artifactory,omitempty"`
// HDFS stores artifacts in HDFS
HDFS *HDFSArtifactRepository `json:"hdfs,omitempty"`
// OSS stores artifact in a OSS-compliant object store
OSS *OSSArtifactRepository `json:"oss,omitempty"`
}

func (a *ArtifactRepository) IsArchiveLogs() bool {
Expand Down Expand Up @@ -150,6 +152,14 @@ type S3ArtifactRepository struct {
KeyPrefix string `json:"keyPrefix,omitempty"`
}

// OSSArtifactRepository defines the controller configuration for an OSS artifact repository
type OSSArtifactRepository struct {
wfv1.OSSBucket `json:",inline"`

// KeyFormat is defines the format of how to store keys. Can reference workflow variables
KeyFormat string `json:"keyFormat,omitempty"`
}

// ArtifactoryArtifactRepository defines the controller configuration for an artifactory artifact repository
type ArtifactoryArtifactRepository struct {
wfv1.ArtifactoryAuth `json:",inline"`
Expand Down
19 changes: 18 additions & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,7 +832,7 @@ func (woc *wfOperationCtx) addArchiveLocation(pod *apiv1.Pod, tmpl *wfv1.Templat
var needLocation bool

if tmpl.ArchiveLocation != nil {
if tmpl.ArchiveLocation.S3 != nil || tmpl.ArchiveLocation.Artifactory != nil || tmpl.ArchiveLocation.HDFS != nil {
if tmpl.ArchiveLocation.S3 != nil || tmpl.ArchiveLocation.Artifactory != nil || tmpl.ArchiveLocation.HDFS != nil || tmpl.ArchiveLocation.OSS != nil {
// User explicitly set the location. nothing else to do.
return nil
}
Expand Down Expand Up @@ -888,6 +888,17 @@ func (woc *wfOperationCtx) addArchiveLocation(pod *apiv1.Pod, tmpl *wfv1.Templat
Path: hdfsLocation.PathFormat,
Force: hdfsLocation.Force,
}
} else if ossLocation := woc.artifactRepository.OSS; ossLocation != nil {
woc.log.Debugf("Setting OSS artifact repository information")
artLocationKey := ossLocation.KeyFormat
tmpl.ArchiveLocation.OSS = &wfv1.OSSArtifact{
OSSBucket: wfv1.OSSBucket{
Endpoint: ossLocation.Endpoint,
AccessKeySecret: ossLocation.AccessKeySecret,
SecretKeySecret: ossLocation.SecretKeySecret,
},
Key: artLocationKey,
}
} else {
return errors.Errorf(errors.CodeBadRequest, "controller is not configured with a default archive location")
}
Expand Down Expand Up @@ -1078,6 +1089,9 @@ func createArchiveLocationSecret(tmpl *wfv1.Template, volMap map[string]apiv1.Vo
createSecretVal(volMap, gitRepo.UsernameSecret, uniqueKeyMap)
createSecretVal(volMap, gitRepo.PasswordSecret, uniqueKeyMap)
createSecretVal(volMap, gitRepo.SSHPrivateKeySecret, uniqueKeyMap)
} else if ossRepo := tmpl.ArchiveLocation.OSS; ossRepo != nil {
createSecretVal(volMap, &ossRepo.AccessKeySecret, uniqueKeyMap)
createSecretVal(volMap, &ossRepo.SecretKeySecret, uniqueKeyMap)
}
}

Expand All @@ -1095,6 +1109,9 @@ func createSecretVolume(volMap map[string]apiv1.Volume, art wfv1.Artifact, keyMa
} else if art.HDFS != nil {
createSecretVal(volMap, art.HDFS.KrbCCacheSecret, keyMap)
createSecretVal(volMap, art.HDFS.KrbKeytabSecret, keyMap)
} else if art.OSS != nil {
createSecretVal(volMap, &art.OSS.AccessKeySecret, keyMap)
createSecretVal(volMap, &art.OSS.SecretKeySecret, keyMap)
}
}

Expand Down
4 changes: 4 additions & 0 deletions workflow/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ func (we *WorkflowExecutor) saveArtifact(mainCtrID string, art *wfv1.Artifact) e
shallowCopy := *we.Template.ArchiveLocation.HDFS
art.HDFS = &shallowCopy
art.HDFS.Path = path.Join(art.HDFS.Path, fileName)
} else if we.Template.ArchiveLocation.OSS != nil {
shallowCopy := *we.Template.ArchiveLocation.OSS
art.OSS = &shallowCopy
art.OSS.Key = path.Join(art.OSS.Key, fileName)
} else {
return errors.Errorf(errors.CodeBadRequest, "Unable to determine path to store %s. Archive location provided no information", art.Name)
}
Expand Down