Skip to content

Commit

Permalink
feat(pipelineloop): Support logging to Object store. Built as an exte…
Browse files Browse the repository at this point in the history
…nsion for zap. (kubeflow#806)

* Object store logging as an extension for zap.

* Do not add object store logger if it is not enabled.
  • Loading branch information
ScrapCodes authored Jan 11, 2022
1 parent ed7673c commit 50c172f
Show file tree
Hide file tree
Showing 6 changed files with 281 additions and 3 deletions.
4 changes: 2 additions & 2 deletions tekton-catalog/pipeline-loops/config/201-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get"]
resourceNames: ["config-leader-election", "config-logging", "config-observability"]
resourceNames: ["config-leader-election", "config-logging", "config-observability", "object-store-config"]
- apiGroups: ["policy"]
resources: ["podsecuritypolicies"]
resourceNames: ["tekton-pipelines"]
Expand All @@ -51,7 +51,7 @@ rules:
- apiGroups: [""]
resources: ["configmaps"]
verbs: ["get"]
resourceNames: ["config-logging", "config-observability", "config-leader-election"]
resourceNames: ["config-logging", "config-observability", "config-leader-election", "object-store-config"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["list", "watch"]
Expand Down
34 changes: 34 additions & 0 deletions tekton-catalog/pipeline-loops/config/203-object-store-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Copyright 2020 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: object-store-config
namespace: tekton-pipelines
labels:
app.kubernetes.io/instance: default
app.kubernetes.io/part-of: tekton-pipelines-loops
data:
enable: "false"
defaultBucketName: "pipelineloop-default"
ibmStyleCredentials: "false"
region: "us-south"
accessKey: "<access key>"
secretKey: "<secret key>"
# Below are IBM cloud specific credentials, available if the flag ibmStyleCredentials is true.
apiKey: "<APIKEY-dummy-1231231231-123abcdefgh>"
serviceInstanceID: "crn:v1:bluemix:public:cloud-object-storage:global:a/ID-dummy-1231231231-123abcdefgh:dummy-values::"
serviceEndpoint: "https://s3.us-south.cloud-object-storage.appdomain.cloud"
authEndpoint: "https://iam.cloud.ibm.com/identity/token"
1 change: 1 addition & 0 deletions tekton-catalog/pipeline-loops/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops
go 1.13

require (
github.com/IBM/ibm-cos-sdk-go v1.8.0
github.com/google/go-cmp v0.5.6
github.com/hashicorp/go-multierror v1.1.1
github.com/tektoncd/pipeline v0.30.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ package pipelinelooprun

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"

"github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop"
pipelineloopv1alpha1 "github.com/kubeflow/kfp-tekton/tekton-catalog/pipeline-loops/pkg/apis/pipelineloop/v1alpha1"
Expand All @@ -28,7 +32,10 @@ import (
pipelineruninformer "github.com/tektoncd/pipeline/pkg/client/injection/informers/pipeline/v1beta1/pipelinerun"
runreconciler "github.com/tektoncd/pipeline/pkg/client/injection/reconciler/pipeline/v1alpha1/run"
pipelinecontroller "github.com/tektoncd/pipeline/pkg/controller"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -37,7 +44,7 @@ import (
// NewController instantiates a new controller.Impl from knative.dev/pkg/controller
func NewController(namespace string) func(context.Context, configmap.Watcher) *controller.Impl {
return func(ctx context.Context, cmw configmap.Watcher) *controller.Impl {

kubeclientset := kubeclient.Get(ctx)
logger := logging.FromContext(ctx)
pipelineclientset := pipelineclient.Get(ctx)
pipelineloopclientset := pipelineloopclient.Get(ctx)
Expand All @@ -46,12 +53,50 @@ func NewController(namespace string) func(context.Context, configmap.Watcher) *c
pipelineRunInformer := pipelineruninformer.Get(ctx)

c := &Reconciler{
KubeClientSet: kubeclientset,
pipelineClientSet: pipelineclientset,
pipelineloopClientSet: pipelineloopclientset,
runLister: runInformer.Lister(),
pipelineLoopLister: pipelineLoopInformer.Lister(),
pipelineRunLister: pipelineRunInformer.Lister(),
}
objectStoreLogger := Logger{
MaxSize: 1024 * 100, // TODO make it configurable via a configmap.
}
err := objectStoreLogger.LoadDefaults(ctx, kubeclientset)
if err == nil && objectStoreLogger.LogConfig.Enable {
logger.Info("Loading object store logger...")
w := zapcore.NewMultiWriteSyncer(
zapcore.AddSync(os.Stdout),
zapcore.AddSync(&objectStoreLogger),
)
core := zapcore.NewCore(
zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
w,
zap.InfoLevel,
)
logger := zap.New(core)
logger.Info("First log msg with object store logger.")
ctx = logging.WithLogger(ctx, logger.Sugar())

// set up SIGHUP to send logs to object store before shutdown.
signal.Ignore(syscall.SIGHUP, syscall.SIGTERM, syscall.SIGINT)
c := make(chan os.Signal, 3)
signal.Notify(c, syscall.SIGTERM)
signal.Notify(c, syscall.SIGINT)
signal.Notify(c, syscall.SIGHUP)

go func() {
for {
<-c
err = objectStoreLogger.Close()
fmt.Printf("Synced with object store... %v", err)
os.Exit(0)
}
}()
} else {
logger.Errorf("Object store logging unavailable, %v ", err)
}

impl := runreconciler.NewImpl(ctx, c, func(impl *controller.Impl) controller.Options {
return controller.Options{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pipelinelooprun

import (
"bytes"
"context"
"fmt"
"io"
"strconv"
"sync"
"time"

"github.com/IBM/ibm-cos-sdk-go/aws"
"github.com/IBM/ibm-cos-sdk-go/aws/credentials"
"github.com/IBM/ibm-cos-sdk-go/aws/credentials/ibmiam"
"github.com/IBM/ibm-cos-sdk-go/aws/session"
"github.com/IBM/ibm-cos-sdk-go/service/s3"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/system"
)

type ObjectStoreLogConfig struct {
Enable bool
defaultBucketName string
accessKey string
secretKey string
ibmStyleCredentials bool
apiKey string
serviceInstanceID string
region string
serviceEndpoint string
authEndpoint string
client *s3.S3
}

type Logger struct {
buffer *bytes.Buffer
// When buffer reaches the size of MaxSize, it tries to sync with object store.
MaxSize int64
// Whether to compress before syncing the buffer.
Compress bool
// Current size of the buffer.
size int64
// Sync irrespective of buffer size after elapsing this interval.
SyncInterval time.Duration
mu sync.Mutex
LogConfig *ObjectStoreLogConfig
}

// ensure we always implement io.WriteCloser
var _ io.WriteCloser = (*Logger)(nil)

func (l *Logger) Write(p []byte) (n int, err error) {
l.mu.Lock()
defer l.mu.Unlock()
writeLen := int64(len(p))
if l.size+writeLen >= l.MaxSize {
if err := l.syncBuffer(); err != nil {
return 0, err
}
}
if n, err = l.buffer.Write(p); err != nil {
return n, err
}
l.size = l.size + int64(n)
return n, nil
}

func (l *Logger) syncBuffer() error {
fmt.Printf("Syncing buffer size : %d, MaxSize: %d \n", l.size, l.MaxSize)
err := l.LogConfig.writeToObjectStore(l.LogConfig.defaultBucketName,
time.Now().Format(time.RFC3339Nano), l.buffer.Bytes())
if err != nil {
return err
}
l.buffer.Reset()
l.size = 0
return nil
}

func (l *Logger) Close() error {
l.mu.Lock()
defer l.mu.Unlock()
return l.syncBuffer()
}

func (o *ObjectStoreLogConfig) load(ctx context.Context, kubeClientSet kubernetes.Interface) error {
configMap, err := kubeClientSet.CoreV1().ConfigMaps(system.Namespace()).
Get(ctx, "object-store-config", metaV1.GetOptions{})
if err != nil {
return err
}
if o.Enable, err = strconv.ParseBool(configMap.Data["enable"]); err != nil || !o.Enable {
return err
}

if o.ibmStyleCredentials, err = strconv.ParseBool(configMap.Data["ibmStyleCredentials"]); err != nil {
return err
}

o.apiKey = configMap.Data["apiKey"]
o.accessKey = configMap.Data["accessKey"]
o.secretKey = configMap.Data["secretKey"]
o.serviceInstanceID = configMap.Data["serviceInstanceID"]
o.region = configMap.Data["region"]
o.serviceEndpoint = configMap.Data["serviceEndpoint"]
o.authEndpoint = configMap.Data["authEndpoint"]
o.defaultBucketName = configMap.Data["defaultBucketName"]
ibmCredentials := ibmiam.NewStaticCredentials(aws.NewConfig(), o.authEndpoint, o.apiKey, o.serviceInstanceID)
s3Credentials := credentials.NewStaticCredentials(o.accessKey, o.secretKey, "")
var creds *credentials.Credentials
if o.ibmStyleCredentials {
creds = ibmCredentials
} else {
creds = s3Credentials
}
// Create client config
var conf = aws.NewConfig().
WithRegion(o.region).
WithEndpoint(o.serviceEndpoint).
WithCredentials(creds).
WithS3ForcePathStyle(true)

var sess = session.Must(session.NewSession())
o.client = s3.New(sess, conf)
input := &s3.CreateBucketInput{
Bucket: aws.String(o.defaultBucketName),
}
_, err = o.client.CreateBucket(input)
if err != nil {
fmt.Printf("This error might be harmless, as the default bucket may already exist, %v\n",
err.Error())
}
return nil
}

func (o *ObjectStoreLogConfig) CreateNewBucket(bucketName string) error {
if !o.Enable || bucketName == o.defaultBucketName {
return nil
}
input := &s3.CreateBucketInput{
Bucket: aws.String(bucketName),
}
_, err := o.client.CreateBucket(input)
return err
}

func (o *ObjectStoreLogConfig) writeToObjectStore(bucketName string, key string, content []byte) error {
if !o.Enable {
return nil
}
input := s3.PutObjectInput{
Bucket: aws.String(bucketName),
Key: aws.String(key),
Body: bytes.NewReader(content),
}

_, err := o.client.PutObject(&input)
// fmt.Printf("Response from object store: %v\n", obj)
return err
}

func (l *Logger) LoadDefaults(ctx context.Context, kubeClientSet kubernetes.Interface) error {

if l.LogConfig == nil {
l.LogConfig = &ObjectStoreLogConfig{}
err := l.LogConfig.load(ctx, kubeClientSet)
if err != nil {
return err
}
if !l.LogConfig.Enable {
return fmt.Errorf("Object store logging is disabled. " +
"Please edit `object-store-config` configMap to setup logging.\n")
}
}
if l.buffer == nil {
l.buffer = new(bytes.Buffer)
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/apis"
"knative.dev/pkg/logging"
pkgreconciler "knative.dev/pkg/reconciler"
Expand Down Expand Up @@ -86,6 +87,7 @@ const (

// Reconciler implements controller.Reconciler for Configuration resources.
type Reconciler struct {
KubeClientSet kubernetes.Interface
pipelineClientSet clientset.Interface
pipelineloopClientSet pipelineloopclientset.Interface
runLister listersalpha.RunLister
Expand Down

0 comments on commit 50c172f

Please sign in to comment.