merge upstream
Signed-off-by: Mecoli1219 <michaellai901026@gmail.com>
Mecoli1219 committed Feb 3, 2025
2 parents 0127d56 + 588a619 commit cb67237
Showing 120 changed files with 4,545 additions and 1,334 deletions.
4 changes: 2 additions & 2 deletions README.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion boilerplate/flyte/golang_support_tools/go.mod
@@ -246,7 +246,7 @@ require (
golang.org/x/exp v0.0.0-20240904232852-e7e105dedf7e // indirect
golang.org/x/exp/typeparams v0.0.0-20240314144324-c7f7c6466f7f // indirect
golang.org/x/mod v0.21.0 // indirect
-golang.org/x/net v0.28.0 // indirect
+golang.org/x/net v0.33.0 // indirect
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
4 changes: 2 additions & 2 deletions boilerplate/flyte/golang_support_tools/go.sum
@@ -675,8 +675,8 @@ golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
-golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
-golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
+golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
+golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.22.0 h1:BzDx2FehcG7jJwgWLELCdmLuxk2i+x9UDpSiss2u0ZA=
golang.org/x/oauth2 v0.22.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
2 changes: 1 addition & 1 deletion datacatalog/go.mod
@@ -119,7 +119,7 @@ require (
go.opentelemetry.io/proto/otlp v1.1.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20240325151524-a685a6edb6d8 // indirect
-golang.org/x/net v0.27.0 // indirect
+golang.org/x/net v0.33.0 // indirect
golang.org/x/oauth2 v0.18.0 // indirect
golang.org/x/sync v0.10.0 // indirect
golang.org/x/sys v0.28.0 // indirect
4 changes: 2 additions & 2 deletions datacatalog/go.sum
@@ -496,8 +496,8 @@ golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
-golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
-golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
+golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
+golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
3 changes: 3 additions & 0 deletions docs/deployment/agents/index.md
@@ -33,6 +33,8 @@ If you are using a managed deployment of Flyte, you will need to contact your de
- Configuring your Flyte deployment for the SnowFlake agent.
* - {ref}`OpenAI Batch <deployment-agent-setup-openai-batch>`
- Submit requests to OpenAI GPT models for asynchronous batch processing.
+* - {ref}`LinkedIn K8s Service Batch <deployment-agent-setup-k8sservice>`
+- Configuring your Flyte deployment for the K8s service agent.
```

```{toctree}
@@ -49,4 +51,5 @@ sagemaker_inference
sensor
snowflake
openai_batch
+k8sservice
```
179 changes: 179 additions & 0 deletions docs/deployment/agents/k8sservice.rst
@@ -0,0 +1,179 @@
.. _deployment-agent-setup-k8sservice:

Kubernetes (K8s) Service Agent
==============================

The Kubernetes (K8s) Data Service Agent lets machine learning (ML) users run non-training tasks, such as data loading, caching, and processing, concurrently with training jobs in Kubernetes clusters.
This capability is particularly valuable in deep learning applications, such as those involving Graph Neural Networks (GNNs).

This guide describes how to set up the K8s Data Service Agent within your Flyte deployment.

Spin up a cluster
-----------------

.. tabs::

  .. group-tab:: Flyte binary

    You can spin up a demo cluster using the following command:

    .. code-block:: bash

      flytectl demo start

    Or install Flyte using the :ref:`flyte-binary helm chart <deployment-deployment-cloud-simple>`.

  .. group-tab:: Flyte core

    If you've installed Flyte using the
    `flyte-core helm chart <https://github.com/flyteorg/flyte/tree/master/charts/flyte-core>`__, please ensure:

    * You have the correct kubeconfig and have selected the correct Kubernetes context.
    * You have configured the correct flytectl settings in ``~/.flyte/config.yaml``.

.. note::

  Add the Flyte chart repo to Helm if you're installing via the Helm charts.

  .. code-block:: bash

    helm repo add flyteorg https://flyteorg.github.io/flyte

Specify agent configuration
----------------------------

Enable the K8s service agent by adding the following config to the relevant YAML file(s):

.. code-block:: yaml

  tasks:
    task-plugins:
      enabled-plugins:
        - agent-service
      default-for-task-types:
        - dataservicetask: agent-service

.. code-block:: yaml

  plugins:
    agent-service:
      agents:
        k8sservice-agent:
          endpoint: <AGENT_ENDPOINT>
          insecure: true
      agentForTaskTypes:
      - dataservicetask: k8sservice-agent
      - sensor: k8sservice-agent

Substitute ``<AGENT_ENDPOINT>`` with the endpoint of your K8s service agent.
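
As a rough illustration of how the two mappings in this configuration relate, the Go sketch below checks that every task type listed under ``agentForTaskTypes`` points at an agent defined under ``agents``. The ``AgentServiceConfig`` and ``Agent`` types, the ``UnknownAgents`` helper, and the ``example-agent:8000`` endpoint are assumptions made for this example; they are not Flyte's own configuration structs.

```go
package main

import (
	"fmt"
	"sort"
)

// Agent holds the per-agent connection settings used in the YAML above.
// Hypothetical type for illustration only.
type Agent struct {
	Endpoint string
	Insecure bool
}

// AgentServiceConfig mirrors only the two keys shown in the YAML above.
// Hypothetical type for illustration only.
type AgentServiceConfig struct {
	Agents            map[string]Agent  // agent name -> connection settings
	AgentForTaskTypes map[string]string // task type -> agent name
}

// UnknownAgents returns, in sorted order, the task types that are mapped
// to an agent name with no entry under Agents.
func UnknownAgents(cfg AgentServiceConfig) []string {
	var missing []string
	for taskType, agentName := range cfg.AgentForTaskTypes {
		if _, ok := cfg.Agents[agentName]; !ok {
			missing = append(missing, taskType)
		}
	}
	sort.Strings(missing)
	return missing
}

func main() {
	cfg := AgentServiceConfig{
		Agents: map[string]Agent{
			// Placeholder endpoint; substitute your own agent endpoint.
			"k8sservice-agent": {Endpoint: "example-agent:8000", Insecure: true},
		},
		AgentForTaskTypes: map[string]string{
			"dataservicetask": "k8sservice-agent",
			"sensor":          "k8sservice-agent",
		},
	}
	fmt.Println("task types with unknown agents:", len(UnknownAgents(cfg)))
}
```

A check like this is only a sanity aid: a misspelled agent name in ``agentForTaskTypes`` is easy to miss by eye because the YAML itself stays valid.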

Set up the RBAC
---------------

The K8s Data Service Agent will create a StatefulSet and expose the Service endpoint for the StatefulSet pods.
RBAC needs to be set up to allow the K8s Data Service Agent to perform CRUD operations on the StatefulSet and Service.
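
To make the intent of the Role in this section concrete, the Go sketch below models its two rules and asks whether a given verb is granted on a given resource. It is a simplified approximation of RBAC matching (a ``*`` entry matches anything, subresources are treated as plain strings), and the ``PolicyRule`` type, the ``agentRules`` variable, and the ``Allows`` helper are names assumed for this example rather than Kubernetes client types.

```go
package main

import "fmt"

// PolicyRule is a simplified, hypothetical stand-in for one entry
// under "rules" in a Role manifest.
type PolicyRule struct {
	APIGroups []string
	Resources []string
	Verbs     []string
}

// agentRules copies the two rules of the flyte-flyteagent-role manifest.
var agentRules = []PolicyRule{
	{
		APIGroups: []string{"apps"},
		Resources: []string{"statefulsets", "statefulsets/status", "statefulsets/scale", "statefulsets/finalizers"},
		Verbs:     []string{"get", "list", "watch", "create", "update", "delete", "patch"},
	},
	{
		APIGroups: []string{""},
		Resources: []string{"pods", "configmaps", "serviceaccounts", "secrets", "pods/exec", "pods/log", "pods/status", "services"},
		Verbs:     []string{"*"},
	},
}

// matches reports whether want is listed in items, treating "*" as a wildcard.
func matches(items []string, want string) bool {
	for _, item := range items {
		if item == "*" || item == want {
			return true
		}
	}
	return false
}

// Allows reports whether any rule grants verb on resource in apiGroup.
func Allows(rules []PolicyRule, apiGroup, resource, verb string) bool {
	for _, r := range rules {
		if matches(r.APIGroups, apiGroup) && matches(r.Resources, resource) && matches(r.Verbs, verb) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(Allows(agentRules, "apps", "statefulsets", "create")) // true
	fmt.Println(Allows(agentRules, "", "services", "delete"))         // true
	fmt.Println(Allows(agentRules, "apps", "deployments", "create"))  // false
}
```

The last call shows that the Role is scoped to what the agent needs: it covers StatefulSets and Services but not, for example, Deployments.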

The role ``flyte-flyteagent-role`` is set up as follows:

.. code-block:: yaml

  # Example of the role/binding set up for the data service to create/update/delete resources in the sandbox flyte namespace
  apiVersion: rbac.authorization.k8s.io/v1
  kind: Role
  metadata:
    name: flyte-flyteagent-role
    namespace: flyte
    labels:
      app.kubernetes.io/name: flyteagent
      app.kubernetes.io/instance: flyte
  rules:
  - apiGroups:
    - apps
    resources:
    - statefulsets
    - statefulsets/status
    - statefulsets/scale
    - statefulsets/finalizers
    verbs:
    - get
    - list
    - watch
    - create
    - update
    - delete
    - patch
  - apiGroups:
    - ""
    resources:
    - pods
    - configmaps
    - serviceaccounts
    - secrets
    - pods/exec
    - pods/log
    - pods/status
    - services
    verbs:
    - '*'

The binding ``flyte-flyteagent-rolebinding`` for the role ``flyte-flyteagent-role`` is set up as follows:

.. code-block:: yaml

  # Example of the role/binding set up for the data service to create/update/delete resources in the sandbox flyte namespace
  apiVersion: rbac.authorization.k8s.io/v1
  kind: RoleBinding
  metadata:
    name: flyte-flyteagent-rolebinding
    namespace: flyte
    labels:
      app.kubernetes.io/name: flyteagent
      app.kubernetes.io/instance: flyte
  roleRef:
    apiGroup: rbac.authorization.k8s.io
    kind: Role
    name: flyte-flyteagent-role
  subjects:
  - kind: ServiceAccount
    name: flyteagent
    namespace: flyte

Upgrade the deployment
----------------------

.. tabs::

  .. group-tab:: Flyte binary

    .. tabs::

      .. group-tab:: Demo cluster

        .. code-block:: bash

          kubectl rollout restart deployment flyte-sandbox -n flyte

      .. group-tab:: Helm chart

        .. code-block:: bash

          helm upgrade <RELEASE_NAME> flyteorg/flyte-binary -n <YOUR_NAMESPACE> --values <YOUR_YAML_FILE>

        Replace ``<RELEASE_NAME>`` with the name of your release (e.g., ``flyte-backend``),
        ``<YOUR_NAMESPACE>`` with the name of your namespace (e.g., ``flyte``),
        and ``<YOUR_YAML_FILE>`` with the name of your YAML file.

  .. group-tab:: Flyte core

    .. code-block:: bash

      helm upgrade <RELEASE_NAME> flyteorg/flyte-core -n <YOUR_NAMESPACE> --values values-override.yaml

    Replace ``<RELEASE_NAME>`` with the name of your release (e.g., ``flyte``)
    and ``<YOUR_NAMESPACE>`` with the name of your namespace (e.g., ``flyte``).

Wait for the upgrade to complete. You can check the status of the deployment pods by running the following command:

.. code-block:: bash

  kubectl get pods -n flyte
41 changes: 21 additions & 20 deletions flyteadmin/dataproxy/service_test.go
@@ -11,6 +11,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
+"github.com/stretchr/testify/mock"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
@@ -33,8 +34,8 @@ func TestNewService(t *testing.T) {
dataStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)

-nodeExecutionManager := &mocks.MockNodeExecutionManager{}
-taskExecutionManager := &mocks.MockTaskExecutionManager{}
+nodeExecutionManager := &mocks.NodeExecutionInterface{}
+taskExecutionManager := &mocks.TaskExecutionInterface{}
s, err := NewService(config.DataProxyConfig{
Upload: config.DataProxyUploadConfig{},
}, nodeExecutionManager, dataStore, taskExecutionManager)
@@ -59,8 +60,8 @@ func Test_createStorageLocation(t *testing.T) {
func TestCreateUploadLocation(t *testing.T) {
dataStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
assert.NoError(t, err)
-nodeExecutionManager := &mocks.MockNodeExecutionManager{}
-taskExecutionManager := &mocks.MockTaskExecutionManager{}
+nodeExecutionManager := &mocks.NodeExecutionInterface{}
+taskExecutionManager := &mocks.TaskExecutionInterface{}
s, err := NewService(config.DataProxyConfig{}, nodeExecutionManager, dataStore, taskExecutionManager)
assert.NoError(t, err)
t.Run("No project/domain", func(t *testing.T) {
@@ -113,8 +114,8 @@ func TestCreateUploadLocationMore(t *testing.T) {
}

assert.NoError(t, err)
-nodeExecutionManager := &mocks.MockNodeExecutionManager{}
-taskExecutionManager := &mocks.MockTaskExecutionManager{}
+nodeExecutionManager := &mocks.NodeExecutionInterface{}
+taskExecutionManager := &mocks.TaskExecutionInterface{}
s, err := NewService(config.DataProxyConfig{}, nodeExecutionManager, &ds, taskExecutionManager)
assert.NoError(t, err)

@@ -171,15 +172,15 @@ func (t testMetadata) Exists() bool {

func TestCreateDownloadLink(t *testing.T) {
dataStore := commonMocks.GetMockStorageClient()
-nodeExecutionManager := &mocks.MockNodeExecutionManager{}
-nodeExecutionManager.SetGetNodeExecutionFunc(func(ctx context.Context, request *admin.NodeExecutionGetRequest) (*admin.NodeExecution, error) {
+nodeExecutionManager := &mocks.NodeExecutionInterface{}
+nodeExecutionManager.EXPECT().GetNodeExecution(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *admin.NodeExecutionGetRequest) (*admin.NodeExecution, error) {
return &admin.NodeExecution{
Closure: &admin.NodeExecutionClosure{
DeckUri: "s3://something/something",
},
}, nil
})
-taskExecutionManager := &mocks.MockTaskExecutionManager{}
+taskExecutionManager := &mocks.TaskExecutionInterface{}

s, err := NewService(config.DataProxyConfig{Download: config.DataProxyDownloadConfig{MaxExpiresIn: stdlibConfig.Duration{Duration: time.Hour}}}, nodeExecutionManager, dataStore, taskExecutionManager)
assert.NoError(t, err)
@@ -262,8 +263,8 @@ func TestCreateDownloadLink(t *testing.T) {

func TestCreateDownloadLocation(t *testing.T) {
dataStore := commonMocks.GetMockStorageClient()
-nodeExecutionManager := &mocks.MockNodeExecutionManager{}
-taskExecutionManager := &mocks.MockTaskExecutionManager{}
+nodeExecutionManager := &mocks.NodeExecutionInterface{}
+taskExecutionManager := &mocks.TaskExecutionInterface{}
s, err := NewService(config.DataProxyConfig{Download: config.DataProxyDownloadConfig{MaxExpiresIn: stdlibConfig.Duration{Duration: time.Hour}}}, nodeExecutionManager, dataStore, taskExecutionManager)
assert.NoError(t, err)

@@ -300,8 +301,8 @@ func TestCreateDownloadLocation(t *testing.T) {

func TestService_GetData(t *testing.T) {
dataStore := commonMocks.GetMockStorageClient()
-nodeExecutionManager := &mocks.MockNodeExecutionManager{}
-taskExecutionManager := &mocks.MockTaskExecutionManager{}
+nodeExecutionManager := &mocks.NodeExecutionInterface{}
+taskExecutionManager := &mocks.TaskExecutionInterface{}
s, err := NewService(config.DataProxyConfig{}, nodeExecutionManager, dataStore, taskExecutionManager)
assert.NoError(t, err)

@@ -340,15 +341,15 @@ func TestService_GetData(t *testing.T) {
},
}

-nodeExecutionManager.SetGetNodeExecutionDataFunc(
+nodeExecutionManager.EXPECT().GetNodeExecutionData(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, request *admin.NodeExecutionGetDataRequest) (*admin.NodeExecutionGetDataResponse, error) {
return &admin.NodeExecutionGetDataResponse{
FullInputs: inputsLM,
FullOutputs: outputsLM,
}, nil
},
)
-taskExecutionManager.SetListTaskExecutionsCallback(func(ctx context.Context, request *admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) {
+taskExecutionManager.EXPECT().ListTaskExecutions(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) {
return &admin.TaskExecutionList{
TaskExecutions: []*admin.TaskExecution{
{
@@ -374,7 +375,7 @@ func TestService_GetData(t *testing.T) {
},
}, nil
})
-taskExecutionManager.SetGetTaskExecutionDataCallback(func(ctx context.Context, request *admin.TaskExecutionGetDataRequest) (*admin.TaskExecutionGetDataResponse, error) {
+taskExecutionManager.EXPECT().GetTaskExecutionData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *admin.TaskExecutionGetDataRequest) (*admin.TaskExecutionGetDataResponse, error) {
return &admin.TaskExecutionGetDataResponse{
FullInputs: inputsLM,
FullOutputs: outputsLM,
@@ -441,13 +442,13 @@ func TestService_GetData(t *testing.T) {

func TestService_Error(t *testing.T) {
dataStore := commonMocks.GetMockStorageClient()
-nodeExecutionManager := &mocks.MockNodeExecutionManager{}
-taskExecutionManager := &mocks.MockTaskExecutionManager{}
+nodeExecutionManager := &mocks.NodeExecutionInterface{}
+taskExecutionManager := &mocks.TaskExecutionInterface{}
s, err := NewService(config.DataProxyConfig{}, nodeExecutionManager, dataStore, taskExecutionManager)
assert.NoError(t, err)

t.Run("get a working set of urls without retry attempt", func(t *testing.T) {
-taskExecutionManager.SetListTaskExecutionsCallback(func(ctx context.Context, request *admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) {
+taskExecutionManager.EXPECT().ListTaskExecutions(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) {
return nil, errors.NewFlyteAdminErrorf(1, "not found")
})
nodeExecID := &core.NodeExecutionIdentifier{
@@ -463,7 +464,7 @@ func TestService_Error(t *testing.T) {
})

t.Run("get a working set of urls without retry attempt", func(t *testing.T) {
-taskExecutionManager.SetListTaskExecutionsCallback(func(ctx context.Context, request *admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) {
+taskExecutionManager.EXPECT().ListTaskExecutions(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, request *admin.TaskExecutionListRequest) (*admin.TaskExecutionList, error) {
return &admin.TaskExecutionList{
TaskExecutions: nil,
Token: "",
2 changes: 1 addition & 1 deletion flyteadmin/go.mod
@@ -52,7 +52,7 @@ require (
github.com/wI2L/jsondiff v0.6.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.47.0
go.opentelemetry.io/otel v1.24.0
-golang.org/x/net v0.27.0
+golang.org/x/net v0.33.0
golang.org/x/oauth2 v0.18.0
golang.org/x/sync v0.10.0
golang.org/x/time v0.5.0
4 changes: 2 additions & 2 deletions flyteadmin/go.sum
@@ -1543,8 +1543,8 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
-golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys=
-golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE=
+golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
+golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181003184128-c57b0facaced/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=