From 08c66b55fd879561784602804d029f0a3e085859 Mon Sep 17 00:00:00 2001 From: Barak Amar Date: Sun, 16 Aug 2020 21:35:28 +0300 Subject: [PATCH] basic gcs storage adapter no expiry or multipart upload (#485) * basic gcs storage adapter no expiry or multipart upload * fix adapter receiver name * code review changes Former-commit-id: bd4255f9154e6d1ea372aa7d3c04a888a71b1d03 --- block/gcs/adapter.go | 438 +++++++++++++++++++++++++++++++++++++++++++ block/gcs/stats.go | 31 +++ block/gcs/stream.go | 157 ++++++++++++++++ block/namespace.go | 3 + config/config.go | 49 ++++- swagger.yml | 2 +- 6 files changed, 678 insertions(+), 2 deletions(-) create mode 100644 block/gcs/adapter.go create mode 100644 block/gcs/stats.go create mode 100644 block/gcs/stream.go diff --git a/block/gcs/adapter.go b/block/gcs/adapter.go new file mode 100644 index 00000000000..1ef4a86a1b8 --- /dev/null +++ b/block/gcs/adapter.go @@ -0,0 +1,438 @@ +package gcs + +import ( + "context" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + v4 "github.com/aws/aws-sdk-go/aws/signer/v4" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/treeverse/lakefs/block" + "github.com/treeverse/lakefs/logging" +) + +const ( + BlockstoreType = "gcs" + + DefaultStreamingChunkSize = 2 << 19 // 1MiB by default per chunk + DefaultStreamingChunkTimeout = time.Second * 1 // if we haven't read DefaultStreamingChunkSize by this duration, write whatever we have as a chunk +) + +var ( + ErrGCS = errors.New("gcs error") + ErrMissingETag = fmt.Errorf("%w: missing ETag", ErrGCS) + ErrInventoryNotImplemented = errors.New("inventory feature not implemented for gcs storage adapter") +) + +func resolveNamespace(obj block.ObjectPointer) (block.QualifiedKey, error) { + qualifiedKey, err := block.ResolveNamespace(obj.StorageNamespace, obj.Identifier) + if err != nil { + return qualifiedKey, err + } + if qualifiedKey.StorageType != block.StorageTypeGCS { + return qualifiedKey, block.ErrInvalidNamespace + } + return qualifiedKey, nil +} + +type Adapter struct { + s3 s3iface.S3API + httpClient *http.Client + ctx context.Context + uploadIDTranslator block.UploadIDTranslator + streamingChunkSize int + streamingChunkTimeout time.Duration +} + +func WithHTTPClient(c *http.Client) func(a *Adapter) { + return func(a *Adapter) { + a.httpClient = c + } +} + +func WithStreamingChunkSize(sz int) func(a *Adapter) { + return func(a *Adapter) { + a.streamingChunkSize = sz + } +} + +func WithStreamingChunkTimeout(d time.Duration) func(a *Adapter) { + return func(a *Adapter) { + a.streamingChunkTimeout = d + } +} + +func WithContext(ctx context.Context) func(a *Adapter) { + return func(a *Adapter) { + a.ctx = ctx + } +} + +func WithTranslator(t block.UploadIDTranslator) func(a *Adapter) { + return func(a *Adapter) { + a.uploadIDTranslator = t + } +} + +func NewAdapter(s3 s3iface.S3API, opts ...func(a *Adapter)) block.Adapter { + a := &Adapter{ + s3: s3, + httpClient: http.DefaultClient, + ctx: context.Background(), + uploadIDTranslator: &block.NoOpTranslator{}, + streamingChunkSize: DefaultStreamingChunkSize, + streamingChunkTimeout: DefaultStreamingChunkTimeout, + } + for _, opt := range opts { + opt(a) + } + return a +} + +func (s *Adapter) WithContext(ctx context.Context) block.Adapter { + return &Adapter{ + s3: s.s3, + httpClient: s.httpClient, + ctx: ctx, + uploadIDTranslator: s.uploadIDTranslator, + streamingChunkSize: 
s.streamingChunkSize,
+		streamingChunkTimeout: s.streamingChunkTimeout,
+	}
+}
+
+func (s *Adapter) log() logging.Logger {
+	return logging.FromContext(s.ctx)
+}
+
+// PutWithoutStream is a workaround: the streaming Put request failed when used to create symlink entries,
+// so this variant uploads the object body without chunked streaming.
+func (s *Adapter) PutWithoutStream(obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) error {
+	qualifiedKey, err := resolveNamespace(obj)
+	if err != nil {
+		return err
+	}
+	putObject := s3.PutObjectInput{
+		Body: aws.ReadSeekCloser(reader),
+		Bucket: aws.String(qualifiedKey.StorageNamespace),
+		Key: aws.String(qualifiedKey.Key),
+		StorageClass: opts.StorageClass,
+	}
+	_, err = s.s3.PutObject(&putObject)
+	return err
+}
+
+func (s *Adapter) Put(obj block.ObjectPointer, sizeBytes int64, reader io.Reader, opts block.PutOpts) error {
+	var err error
+	defer reportMetrics("Put", time.Now(), &sizeBytes, &err)
+
+	qualifiedKey, err := resolveNamespace(obj)
+	if err != nil {
+		return err
+	}
+	putObject := s3.PutObjectInput{
+		Bucket: aws.String(qualifiedKey.StorageNamespace),
+		Key: aws.String(qualifiedKey.Key),
+		StorageClass: opts.StorageClass,
+	}
+	sdkRequest, _ := s.s3.PutObjectRequest(&putObject)
+	_, err = s.streamRequestData(sdkRequest, sizeBytes, reader)
+	return err
+}
+
+func (s *Adapter) UploadPart(obj block.ObjectPointer, sizeBytes int64, reader io.Reader, uploadID string, partNumber int64) (string, error) {
+	var err error
+	defer reportMetrics("UploadPart", time.Now(), &sizeBytes, &err)
+	qualifiedKey, err := resolveNamespace(obj)
+	if err != nil {
+		return "", err
+	}
+	uploadID = s.uploadIDTranslator.TranslateUploadID(uploadID)
+	uploadPartObject := s3.UploadPartInput{
+		Bucket: aws.String(qualifiedKey.StorageNamespace),
+		Key: aws.String(qualifiedKey.Key),
+		PartNumber: aws.Int64(partNumber),
+		UploadId: aws.String(uploadID),
+	}
+	sdkRequest, _ := s.s3.UploadPartRequest(&uploadPartObject)
+	etag, err := s.streamRequestData(sdkRequest, sizeBytes, reader)
+	if err != nil {
+		return "", err
+	}
+	if etag == "" {
+		return "", ErrMissingETag
+	}
+	return etag, nil
+}
+
+func (s *Adapter) streamRequestData(sdkRequest *request.Request, sizeBytes int64, reader io.Reader) (string, error) {
+	sigTime := time.Now()
+	log := s.log().WithField("operation", "PutObject")
+
+	if err := sdkRequest.Build(); err != nil {
+		return "", err
+	}
+
+	req, err := http.NewRequest(sdkRequest.HTTPRequest.Method, sdkRequest.HTTPRequest.URL.String(), nil)
+	if err != nil {
+		return "", err
+	}
+	req.Header.Set("Content-Encoding", StreamingContentEncoding)
+	req.Header.Set("Transfer-Encoding", "chunked")
+	req.Header.Set("x-amz-content-sha256", StreamingSha256)
+	req.Header.Set("x-amz-decoded-content-length", fmt.Sprintf("%d", sizeBytes))
+	req.Header.Set("Expect", "100-Continue")
+
+	baseSigner := v4.NewSigner(sdkRequest.Config.Credentials)
+
+	_, err = baseSigner.Sign(req, nil, s3.ServiceName, aws.StringValue(sdkRequest.Config.Region), sigTime)
+	if err != nil {
+		log.WithError(err).Error("failed to sign request")
+		return "", err
+	}
+
+	sigSeed, err := v4.GetSignedRequestSignature(req)
+	if err != nil {
+		log.WithError(err).Error("failed to get seed signature")
+		return "", err
+	}
+
+	req.Body = ioutil.NopCloser(&StreamingReader{
+		Reader: reader,
+		Size: int(sizeBytes),
+		Time: sigTime,
+		StreamSigner: v4.NewStreamSigner(
+			aws.StringValue(sdkRequest.Config.Region),
+			s3.ServiceName,
+			sigSeed,
+			sdkRequest.Config.Credentials,
+		),
+		ChunkSize: s.streamingChunkSize,
+		ChunkTimeout: s.streamingChunkTimeout,
+	})
+	resp, err := 
s.httpClient.Do(req)
+	if err != nil {
+		log.WithError(err).
+			WithField("url", sdkRequest.HTTPRequest.URL.String()).
+			Error("error making request")
+		return "", err
+	}
+
+	defer func() {
+		_ = resp.Body.Close()
+	}()
+
+	if resp.StatusCode != http.StatusOK {
+		body, err := ioutil.ReadAll(resp.Body)
+		if err != nil {
+			err = fmt.Errorf("%w: %d %s (unknown)", ErrGCS, resp.StatusCode, resp.Status)
+		} else {
+			err = fmt.Errorf("%w: %s", ErrGCS, body)
+		}
+		log.WithError(err).
+			WithField("url", sdkRequest.HTTPRequest.URL.String()).
+			WithField("status_code", resp.StatusCode).
+			Error("bad GCS PutObject response")
+		return "", err
+	}
+	etag := resp.Header.Get("Etag")
+	// error in case etag is missing - note that an empty header value will cause the same error
+	if len(etag) == 0 {
+		return "", ErrMissingETag
+	}
+	return etag, nil
+}
+
+func (s *Adapter) Get(obj block.ObjectPointer, _ int64) (io.ReadCloser, error) {
+	var err error
+	var sizeBytes int64
+	defer reportMetrics("Get", time.Now(), &sizeBytes, &err)
+	qualifiedKey, err := resolveNamespace(obj)
+	if err != nil {
+		return nil, err
+	}
+	log := s.log().WithField("operation", "GetObject")
+	getObjectInput := s3.GetObjectInput{
+		Bucket: aws.String(qualifiedKey.StorageNamespace),
+		Key: aws.String(qualifiedKey.Key),
+	}
+	objectOutput, err := s.s3.GetObject(&getObjectInput)
+	if err != nil {
+		log.WithError(err).Error("failed to get GCS object")
+		return nil, err
+	}
+	sizeBytes = *objectOutput.ContentLength
+	return objectOutput.Body, nil
+}
+
+func (s *Adapter) GetRange(obj block.ObjectPointer, startPosition int64, endPosition int64) (io.ReadCloser, error) {
+	var err error
+	var sizeBytes int64
+	defer reportMetrics("GetRange", time.Now(), &sizeBytes, &err)
+	qualifiedKey, err := resolveNamespace(obj)
+	if err != nil {
+		return nil, err
+	}
+	log := s.log().WithField("operation", "GetObjectRange")
+	getObjectInput := s3.GetObjectInput{
+		Bucket: aws.String(qualifiedKey.StorageNamespace),
+		Key: aws.String(qualifiedKey.Key),
+		Range: aws.String(fmt.Sprintf("bytes=%d-%d", startPosition, endPosition)),
+	}
+	objectOutput, err := s.s3.GetObject(&getObjectInput)
+	if err != nil {
+		log.WithError(err).WithFields(logging.Fields{
+			"start_position": startPosition,
+			"end_position": endPosition,
+		}).Error("failed to get GCS object range")
+		return nil, err
+	}
+	sizeBytes = *objectOutput.ContentLength
+	return objectOutput.Body, nil
+}
+
+func (s *Adapter) GetProperties(obj block.ObjectPointer) (block.Properties, error) {
+	var err error
+	defer reportMetrics("GetProperties", time.Now(), nil, &err)
+	qualifiedKey, err := resolveNamespace(obj)
+	if err != nil {
+		return block.Properties{}, err
+	}
+	headObjectParams := &s3.HeadObjectInput{
+		Bucket: aws.String(qualifiedKey.StorageNamespace),
+		Key: aws.String(qualifiedKey.Key),
+	}
+	s3Props, err := s.s3.HeadObject(headObjectParams)
+	if err != nil {
+		return block.Properties{}, err
+	}
+	return block.Properties{StorageClass: s3Props.StorageClass}, nil
+}
+
+func (s *Adapter) Remove(obj block.ObjectPointer) error {
+	var err error
+	defer reportMetrics("Remove", time.Now(), nil, &err)
+	qualifiedKey, err := resolveNamespace(obj)
+	if err != nil {
+		return err
+	}
+	deleteObjectParams := &s3.DeleteObjectInput{
+		Bucket: aws.String(qualifiedKey.StorageNamespace),
+		Key: aws.String(qualifiedKey.Key),
+	}
+	_, err = s.s3.DeleteObject(deleteObjectParams)
+	if err != nil {
+		s.log().WithError(err).Error("failed to delete GCS object")
+		return err
+	}
+	err = 
s.s3.WaitUntilObjectNotExists(&s3.HeadObjectInput{ + Bucket: aws.String(qualifiedKey.StorageNamespace), + Key: aws.String(qualifiedKey.Key), + }) + return err +} + +func (s *Adapter) CreateMultiPartUpload(obj block.ObjectPointer, r *http.Request, opts block.CreateMultiPartUploadOpts) (string, error) { + var err error + defer reportMetrics("CreateMultiPartUpload", time.Now(), nil, &err) + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return "", err + } + input := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(qualifiedKey.StorageNamespace), + Key: aws.String(qualifiedKey.Key), + ContentType: aws.String(""), + StorageClass: opts.StorageClass, + } + resp, err := s.s3.CreateMultipartUpload(input) + if err != nil { + return "", err + } + uploadID := *resp.UploadId + uploadID = s.uploadIDTranslator.SetUploadID(uploadID) + s.log().WithFields(logging.Fields{ + "upload_id": *resp.UploadId, + "translated_upload_id": uploadID, + "qualified_ns": qualifiedKey.StorageNamespace, + "qualified_key": qualifiedKey.Key, + "key": obj.Identifier, + }).Debug("created multipart upload") + return uploadID, err +} + +func (s *Adapter) AbortMultiPartUpload(obj block.ObjectPointer, uploadID string) error { + var err error + defer reportMetrics("AbortMultiPartUpload", time.Now(), nil, &err) + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return err + } + uploadID = s.uploadIDTranslator.TranslateUploadID(uploadID) + input := &s3.AbortMultipartUploadInput{ + Bucket: aws.String(qualifiedKey.StorageNamespace), + Key: aws.String(qualifiedKey.Key), + UploadId: aws.String(uploadID), + } + _, err = s.s3.AbortMultipartUpload(input) + s.uploadIDTranslator.RemoveUploadID(uploadID) + s.log().WithFields(logging.Fields{ + "upload_id": uploadID, + "qualified_ns": qualifiedKey.StorageNamespace, + "qualified_key": qualifiedKey.Key, + "key": obj.Identifier, + }).Debug("aborted multipart upload") + return err +} + +func (s *Adapter) CompleteMultiPartUpload(obj block.ObjectPointer, uploadID string, multipartList *block.MultipartUploadCompletion) (*string, int64, error) { + var err error + defer reportMetrics("CompleteMultiPartUpload", time.Now(), nil, &err) + qualifiedKey, err := resolveNamespace(obj) + if err != nil { + return nil, 0, err + } + cmpu := &s3.CompletedMultipartUpload{Parts: multipartList.Part} + translatedUploadID := s.uploadIDTranslator.TranslateUploadID(uploadID) + input := &s3.CompleteMultipartUploadInput{ + Bucket: aws.String(qualifiedKey.StorageNamespace), + Key: aws.String(qualifiedKey.Key), + UploadId: aws.String(translatedUploadID), + MultipartUpload: cmpu, + } + lg := s.log().WithFields(logging.Fields{ + "upload_id": uploadID, + "translated_upload_id": translatedUploadID, + "qualified_ns": qualifiedKey.StorageNamespace, + "qualified_key": qualifiedKey.Key, + "key": obj.Identifier, + }) + resp, err := s.s3.CompleteMultipartUpload(input) + + if err != nil { + lg.WithError(err).Error("CompleteMultipartUpload failed") + return nil, -1, err + } + lg.Debug("completed multipart upload") + s.uploadIDTranslator.RemoveUploadID(translatedUploadID) + headInput := &s3.HeadObjectInput{Bucket: &qualifiedKey.StorageNamespace, Key: &qualifiedKey.Key} + headResp, err := s.s3.HeadObject(headInput) + if err != nil { + return nil, -1, err + } else { + return resp.ETag, *headResp.ContentLength, err + } +} + +func (s *Adapter) ValidateConfiguration(_ string) error { + return nil +} + +func (s *Adapter) GenerateInventory(_ context.Context, _ logging.Logger, _ string) (block.Inventory, error) { + 
return nil, ErrInventoryNotImplemented +} diff --git a/block/gcs/stats.go b/block/gcs/stats.go new file mode 100644 index 00000000000..00b6e0ad9d1 --- /dev/null +++ b/block/gcs/stats.go @@ -0,0 +1,31 @@ +package gcs + +import ( + "strconv" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var durationHistograms = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "gcs_operation_duration_seconds", + Help: "durations of outgoing gcs operations", + }, + []string{"operation", "error"}) + +var requestSizeHistograms = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "gcs_operation_size_bytes", + Help: "handled sizes of outgoing gcs operations", + Buckets: prometheus.ExponentialBuckets(1, 10, 10), + }, []string{"operation", "error"}) + +func reportMetrics(operation string, start time.Time, sizeBytes *int64, err *error) { + isErrStr := strconv.FormatBool(*err != nil) + durationHistograms.WithLabelValues(operation, isErrStr).Observe(time.Since(start).Seconds()) + if sizeBytes != nil { + requestSizeHistograms.WithLabelValues(operation, isErrStr).Observe(float64(*sizeBytes)) + } +} diff --git a/block/gcs/stream.go b/block/gcs/stream.go new file mode 100644 index 00000000000..69a04ae9979 --- /dev/null +++ b/block/gcs/stream.go @@ -0,0 +1,157 @@ +package gcs + +import ( + "bytes" + "encoding/hex" + "errors" + "fmt" + "io" + "time" + + "github.com/treeverse/lakefs/logging" + + v4 "github.com/aws/aws-sdk-go/aws/signer/v4" +) + +const ( + StreamingSha256 = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" + StreamingContentEncoding = "aws-chunked" +) + +type StreamingReader struct { + Reader io.Reader + Size int + StreamSigner *v4.StreamSigner + Time time.Time + ChunkSize int + ChunkTimeout time.Duration + + currentChunk io.Reader + totalRead int + eof bool +} + +func chunkBoundary(signature []byte, length int) []byte { + return []byte(fmt.Sprintf("%x;chunk-signature=%s\r\n", length, hex.EncodeToString(signature))) +} + +func isEOF(err error) bool { + return err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) +} + +func (s *StreamingReader) GetLastChunk() []byte { + res := make([]byte, 0) + sig, _ := s.StreamSigner.GetSignature([]byte{}, []byte{}, s.Time) + lastBoundary := chunkBoundary(sig, 0) + res = append(res, lastBoundary...) 
+ res = append(res, '\r', '\n') // additional \r\n after the last boundary + return res +} + +// ReadAllWithTimeout is taken from io.ReadAtLeast and adapted to support a timeout +func ReadAllWithTimeout(r io.Reader, buf []byte, timeout time.Duration) (n int, err error) { + desired := len(buf) + + lg := logging.Default().WithFields(logging.Fields{ + "timeout": timeout, + "desired_size": desired, + }) + + start := time.Now() + timedOut := false + for n < desired && err == nil { + var nn int + nn, err = r.Read(buf[n:]) + n += nn + + if time.Since(start) > timeout { + timedOut = true + break + } + } + if n >= desired { + err = nil + } else if n > 0 && err == io.EOF { + err = io.ErrUnexpectedEOF + } + if err == nil && timedOut { + err = ErrReaderTimeout + lg.WithField("n", n).Warn("duration passed, reader timed out") + } + return +} + +var ErrReaderTimeout = errors.New("reader timeout") + +func (s *StreamingReader) readNextChunk() error { + buf := make([]byte, s.ChunkSize) + n, err := ReadAllWithTimeout(s.Reader, buf, s.ChunkTimeout) + s.totalRead += n + buf = buf[:n] + if err != nil && !isEOF(err) && !errors.Is(err, ErrReaderTimeout) { + // actual error happened + return err + } + if n == 0 { + if s.currentChunk == nil { + s.currentChunk = bytes.NewBuffer(s.GetLastChunk()) + } + return io.EOF + } + sig, sigErr := s.StreamSigner.GetSignature([]byte{}, buf, s.Time) + if sigErr != nil { + return sigErr + } + + buf = append(buf, '\r', '\n') // additional \r\n after the content + boundary := chunkBoundary(sig, n) + if isEOF(err) || s.totalRead == s.Size { + // we're done with the upstream Reader, let's write one last chunk boundary. + buf = append(buf, s.GetLastChunk()...) + } + buf = append(boundary, buf...) + s.currentChunk = bytes.NewBuffer(buf) + if s.totalRead == s.Size { + err = io.EOF + } + if errors.Is(err, ErrReaderTimeout) { + return nil // a slow reader shouldn't fail. 
+ } + return err +} + +func (s *StreamingReader) Read(p []byte) (int, error) { + if s.eof { + return 0, io.EOF + } + + var nerr error + var currentN int + if s.currentChunk == nil { + nerr = s.readNextChunk() + if nerr != nil && !isEOF(nerr) { + return currentN, nerr + } + } + + for currentN < len(p) { + n, err := s.currentChunk.Read(p[currentN:]) + currentN += n + if err != nil && !isEOF(err) { + return currentN, err + } + if isEOF(err) { // we drained the current chunk + if isEOF(nerr) { // no more chunks to read + s.eof = true + return currentN, io.EOF + } + // otherwise, we read the entire chunk, let's fill it back up (if n < size of p) + nerr = s.readNextChunk() + if nerr != nil && !isEOF(nerr) { + return currentN, nerr // something bad happened when reading the next chunk + } + } + } + + return currentN, nil +} diff --git a/block/namespace.go b/block/namespace.go index 890259ff0cd..e36a30f7c8f 100644 --- a/block/namespace.go +++ b/block/namespace.go @@ -13,6 +13,7 @@ const ( StorageTypeMem = iota StorageTypeLocal StorageTypeS3 + StorageTypeGCS ) var ( @@ -34,6 +35,8 @@ func GetStorageType(namespaceURL *url.URL) (StorageType, error) { return StorageTypeMem, nil case "local": return StorageTypeLocal, nil + case "gcs": + return StorageTypeGCS, nil default: return st, fmt.Errorf("%s: %w", namespaceURL.Scheme, ErrInvalidNamespace) } diff --git a/config/config.go b/config/config.go index 8eaae087208..165e6de79d9 100644 --- a/config/config.go +++ b/config/config.go @@ -7,6 +7,8 @@ import ( "strings" "time" + "github.com/treeverse/lakefs/block/gcs" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" @@ -38,6 +40,10 @@ const ( DefaultBlockStoreS3StreamingChunkSize = 2 << 19 // 1MiB by default per chunk DefaultBlockStoreS3StreamingChunkTimeout = time.Second * 1 // or 1 seconds, whatever comes first + DefaultBlockStoreGCSS3Endpoint = "https://storage.googleapis.com" + DefaultBlockStoreGCSStreamingChunkSize = 2 << 19 // 1MiB by default per chunk + DefaultBlockStoreGCSStreamingChunkTimeout = time.Second * 1 // or 1 seconds, whatever comes first + DefaultAuthCacheEnabled = true DefaultAuthCacheSize = 1024 DefaultAuthCacheTTL = 20 * time.Second @@ -100,6 +106,10 @@ func setDefaults() { viper.SetDefault("gateways.s3.domain_name", DefaultS3GatewayDomainName) viper.SetDefault("gateways.s3.region", DefaultS3GatewayRegion) + viper.SetDefault("blockstore.gcs.s3_endpoint", DefaultBlockStoreGCSS3Endpoint) + viper.SetDefault("blockstore.gcs.streaming_chunk_size", DefaultBlockStoreGCSStreamingChunkSize) + viper.SetDefault("blockstore.gcs.streaming_chunk_timeout", DefaultBlockStoreGCSStreamingChunkTimeout) + viper.SetDefault("stats.enabled", DefaultStatsEnabled) viper.SetDefault("stats.address", DefaultStatsAddr) viper.SetDefault("stats.flush_interval", DefaultStatsFlushInterval) @@ -168,6 +178,25 @@ func (c *Config) GetAwsConfig() *aws.Config { return cfg } +func (c *Config) GetGCSAwsConfig() *aws.Config { + cfg := &aws.Config{ + Region: aws.String(viper.GetString("blockstore.gcs.s3_region")), + Logger: &LogrusAWSAdapter{log.WithField("sdk", "aws")}, + } + if viper.IsSet("blockstore.gcs.s3_profile") || viper.IsSet("blockstore.gcs.s3_credentials_file") { + cfg.Credentials = credentials.NewSharedCredentials( + viper.GetString("blockstore.gcs.s3_credentials_file"), + viper.GetString("blockstore.gcs.s3_profile")) + } + if viper.IsSet("blockstore.gcs.s3_credentials.access_key_id") { + cfg.Credentials = credentials.NewStaticCredentials( + 
viper.GetString("blockstore.gcs.s3_credentials.access_key_id"), + viper.GetString("blockstore.gcs.s3_credentials.access_secret_key"), + viper.GetString("blockstore.gcs.s3_credentials.session_token")) + } + return cfg +} + func GetAwsAccessKeyID(awsConfig *aws.Config) (string, error) { awsCredentials, err := awsConfig.Credentials.Get() if err != nil { @@ -215,6 +244,22 @@ func (c *Config) buildS3Adapter() (block.Adapter, error) { return adapter, nil } +func (c *Config) buildGCSAdapter() (block.Adapter, error) { + cfg := c.GetGCSAwsConfig() + s3Endpoint := viper.GetString("blockstore.gcs.s3_endpoint") + sess, err := session.NewSession(cfg) + if err != nil { + return nil, err + } + sess.ClientConfig(s3.ServiceName) + svc := s3.New(sess, aws.NewConfig().WithEndpoint(s3Endpoint)) + adapter := gcs.NewAdapter(svc, + gcs.WithStreamingChunkSize(viper.GetInt("blockstore.gcs.streaming_chunk_size")), + gcs.WithStreamingChunkTimeout(viper.GetDuration("blockstore.gcs.streaming_chunk_timeout"))) + log.WithFields(log.Fields{"type": "gcs"}).Info("initialized blockstore adapter") + return adapter, nil +} + func (c *Config) buildLocalAdapter() (block.Adapter, error) { localPath := viper.GetString("blockstore.local.path") location, err := homedir.Expand(localPath) @@ -247,9 +292,11 @@ func (c *Config) BuildBlockAdapter() (block.Adapter, error) { return mem.New(), nil case transient.BlockstoreType: return transient.New(), nil + case gcs.BlockstoreType: + return c.buildGCSAdapter() default: return nil, fmt.Errorf("%w '%s' please choose one of %s", - ErrInvalidBlockStoreType, blockstore, []string{local.BlockstoreType, s3a.BlockstoreType, mem.BlockstoreType, transient.BlockstoreType}) + ErrInvalidBlockStoreType, blockstore, []string{local.BlockstoreType, s3a.BlockstoreType, mem.BlockstoreType, transient.BlockstoreType, gcs.BlockstoreType}) } } diff --git a/swagger.yml b/swagger.yml index e83d0d30b73..00bc5dca916 100644 --- a/swagger.yml +++ b/swagger.yml @@ -90,7 +90,7 @@ definitions: type: string description: "Filesystem URI to store the underlying data in (i.e. 's3://my-bucket/some/path/')" example: "s3://example-bucket/" - pattern: '^(s3|mem|local|transient)://.*$' + pattern: '^(s3|gs|mem|local|transient)://.*$' default_branch: example: "master" type: string
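
For reference, below is a minimal sketch of wiring the new adapter up directly, mirroring buildGCSAdapter above: a regular AWS SDK S3 client is pointed at the GCS S3-compatible XML endpoint and handed to gcs.NewAdapter. The bucket name, region, and HMAC-style access/secret key values are placeholders (GCS interoperability credentials are assumed), not part of this change; lakeFS itself builds the adapter through config.BuildBlockAdapter from the blockstore.gcs.* settings.

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/treeverse/lakefs/block"
	"github.com/treeverse/lakefs/block/gcs"
)

func main() {
	// GCS is reached through its S3-compatible XML API, so a plain AWS SDK S3
	// client is created with placeholder HMAC keys and pointed at the GCS endpoint.
	sess, err := session.NewSession(&aws.Config{
		Region:      aws.String("us-east-1"), // a region is required by the signer
		Credentials: credentials.NewStaticCredentials("GOOG1EXAMPLE", "example-secret", ""),
	})
	if err != nil {
		log.Fatal(err)
	}
	svc := s3.New(sess, aws.NewConfig().WithEndpoint("https://storage.googleapis.com"))

	adapter := gcs.NewAdapter(svc,
		gcs.WithStreamingChunkSize(gcs.DefaultStreamingChunkSize),
		gcs.WithStreamingChunkTimeout(gcs.DefaultStreamingChunkTimeout))

	// Namespaces use the "gcs://" scheme registered in block/namespace.go.
	obj := block.ObjectPointer{
		StorageNamespace: "gcs://example-bucket/prefix",
		Identifier:       "data/object-1",
	}
	data := []byte("hello gcs adapter")
	if err := adapter.Put(obj, int64(len(data)), bytes.NewReader(data), block.PutOpts{}); err != nil {
		log.Fatal(err)
	}

	rc, err := adapter.Get(obj, int64(len(data)))
	if err != nil {
		log.Fatal(err)
	}
	defer func() { _ = rc.Close() }()
	body, err := ioutil.ReadAll(rc)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("read back %d bytes\n", len(body))
}

Put goes through the chunked aws-chunked upload path in streamRequestData; PutWithoutStream falls back to a plain PutObject for cases (such as symlink entries) where the streaming request fails.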