Skip to content
This repository has been archived by the owner on Oct 21, 2024. It is now read-only.

Commit

Permalink
feat: dynamic baseline flow (#24)
Browse files Browse the repository at this point in the history
* adding IsBaline field in flow premitive

* infering the baseline flow ID from base cluster topology namespace value

* gomod2nix updated

* echo services and deployments in Validate cluster resources endpoint in test

* edited the expected deployments name in the test

* renaming baseline flow ID and refactor traffic logic to adapt it to this flow ID change

* adding baseline_prefix argument in the trace-route GET /route calls

* adding baseline value in the gateway lua script

* replace prod concept with baseline in comments
  • Loading branch information
leoporoli authored Sep 16, 2024
1 parent 0469705 commit 45f1826
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 64 deletions.
11 changes: 5 additions & 6 deletions .github/workflows/ci-e2e-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ jobs:
if [ "${services}" != "cartservice frontend postgres productcatalogservice" ]; then exit 1; fi
deployments=$(curl http://localhost:8080/tenant/${tenant_id}/cluster-resources | jq -r '.deployments[].metadata.name' | tr " " "\n" | sort -g | tr "\n" " " | xargs)
echo "Deployments: $deployments"
if [ "${deployments}" != "cartservice-prod frontend-prod postgres-prod productcatalogservice-prod" ]; then exit 1; fi
if [ "${deployments}" != "cartservice-baseline frontend-baseline postgres-baseline productcatalogservice-baseline" ]; then exit 1; fi
- name: Validate topology endpoint
run: |
tenant_id=${{ steps.tenant.outputs.id }}
Expand All @@ -87,7 +86,7 @@ jobs:
tenant_id=${{ steps.tenant.outputs.id }}
deployments=$(curl http://localhost:8080/tenant/${tenant_id}/cluster-resources | jq -r '.deployments[].metadata.name' | tr " " "\n" | sort -g | tr "\n" " " | xargs)
echo "Deployments: $deployments"
if [ "${deployments}" != "cartservice-prod frontend-${flow_id} frontend-prod postgres-prod productcatalogservice-prod" ]; then exit 1; fi
if [ "${deployments}" != "cartservice-baseline frontend-baseline frontend-${flow_id} postgres-baseline productcatalogservice-baseline" ]; then exit 1; fi
KARDINAL_CLI_DEV_MODE=TRUE kardinal flow ls | grep ${flow_id}
KARDINAL_CLI_DEV_MODE=TRUE kardinal flow delete ${flow_id}
Expand All @@ -99,7 +98,7 @@ jobs:
tenant_id=${{ steps.tenant.outputs.id }}
deployments=$(curl http://localhost:8080/tenant/${tenant_id}/cluster-resources | jq -r '.deployments[].metadata.name' | tr " " "\n" | sort -g | tr "\n" " " | xargs)
echo "Deployments: $deployments"
if [ "${deployments}" != "cartservice-prod frontend-${flow_id} frontend-prod postgres-prod productcatalogservice-${flow_id} productcatalogservice-prod" ]; then exit 1; fi
if [ "${deployments}" != "cartservice-baseline frontend-baseline frontend-${flow_id} postgres-baseline productcatalogservice-baseline productcatalogservice-${flow_id}" ]; then exit 1; fi
KARDINAL_CLI_DEV_MODE=TRUE kardinal flow ls | grep ${flow_id}
KARDINAL_CLI_DEV_MODE=TRUE kardinal flow delete ${flow_id}
Expand All @@ -121,8 +120,8 @@ jobs:
- name: Delete base topology and dev flows
run: |
KARDINAL_CLI_DEV_MODE=TRUE kardinal flow delete prod
if KARDINAL_CLI_DEV_MODE=TRUE kardinal flow ls | grep prod; then echo "Topologies not deleted"; exit 1; fi
KARDINAL_CLI_DEV_MODE=TRUE kardinal flow delete baseline
if KARDINAL_CLI_DEV_MODE=TRUE kardinal flow ls | grep baseline; then echo "Topologies not deleted"; exit 1; fi
tenant_id=${{ steps.tenant.outputs.id }}
deployments=$(curl http://localhost:8080/tenant/${tenant_id}/cluster-resources | jq -r '.deployments[].metadata.name' | tr " " "\n" | sort -g | tr "\n" " " | xargs)
if [ "${deployments}" != "" ]; then echo "Deployments list not empty"; exit 1; fi
43 changes: 23 additions & 20 deletions kontrol-service/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

const (
prodFlowId = "prod"
defaultBaselineFlowId = "baseline"
)

// optional code omitted
Expand Down Expand Up @@ -67,27 +67,27 @@ func (sv *Server) GetTenantUuidFlows(_ context.Context, request api.GetTenantUui

finalTopology := flow.MergeClusterTopologies(*clusterTopology, lo.Values(allFlows))
flowHostMapping := finalTopology.GetFlowHostMapping()

resp := lo.MapToSlice(flowHostMapping, func(flowId string, flowUrls []string) apitypes.Flow {
isBaselineFlow := flowId == prodFlowId
// the baseline flow ID uses the base cluster topology namespace name
isBaselineFlow := flowId == clusterTopology.Namespace
return apitypes.Flow{FlowId: flowId, FlowUrls: flowUrls, IsBaseline: &isBaselineFlow}
})
return api.GetTenantUuidFlows200JSONResponse(resp), nil
}

func (sv *Server) PostTenantUuidDeploy(_ context.Context, request api.PostTenantUuidDeployRequestObject) (api.PostTenantUuidDeployResponseObject, error) {
logrus.Infof("deploying prod cluster for tenant '%s'", request.Uuid)
logrus.Infof("deploying baseline cluster for tenant '%s'", request.Uuid)
sv.analyticsWrapper.TrackEvent(EVENT_DEPLOY, request.Uuid)
serviceConfigs := *request.Body.ServiceConfigs
ingressConfigs := *request.Body.IngressConfigs
namespace := *request.Body.Namespace

if namespace == "" {
namespace = prodFlowId
namespace = defaultBaselineFlowId
}

flowId := prodFlowId
err, urls := applyProdOnlyFlow(sv, request.Uuid, serviceConfigs, ingressConfigs, namespace, flowId)
flowId := namespace
urls, err := applyProdOnlyFlow(sv, request.Uuid, serviceConfigs, ingressConfigs, namespace, flowId)
if err != nil {
errMsg := fmt.Sprintf("An error occurred deploying flow '%v'", flowId)
errResp := api.ErrorJSONResponse{
Expand All @@ -105,14 +105,15 @@ func (sv *Server) DeleteTenantUuidFlowFlowId(_ context.Context, request api.Dele
logrus.Infof("deleting dev flow for tenant '%s'", request.Uuid)
sv.analyticsWrapper.TrackEvent(EVENT_FLOW_DELETE, request.Uuid)

_, allFlows, _, _, _, err := getTenantTopologies(sv, request.Uuid)
baseClusterTopology, allFlows, _, _, _, err := getTenantTopologies(sv, request.Uuid)
if err != nil {
resourceType := "tenant"
missing := api.NotFoundJSONResponse{ResourceType: resourceType, Id: request.Uuid}
return api.DeleteTenantUuidFlowFlowId404JSONResponse{NotFoundJSONResponse: missing}, nil
}

if request.FlowId == prodFlowId {
// the baseline flow ID uses the base cluster topology namespace name
if request.FlowId == baseClusterTopology.Namespace {
// We received a request to delete the base topology, so we do that + the flows
err = deleteTenantTopologies(sv, request.Uuid)
if err != nil {
Expand Down Expand Up @@ -393,48 +394,49 @@ func (sv *Server) PostTenantUuidTemplatesCreate(_ context.Context, request api.P
}

// ============================================================================================================
func applyProdOnlyFlow(sv *Server, tenantUuidStr string, serviceConfigs []apitypes.ServiceConfig, ingressConfigs []apitypes.IngressConfig, namespace string, flowID string) (error, []string) {
// apply the baseline flow which can be also called prod flow which was the first name used
func applyProdOnlyFlow(sv *Server, tenantUuidStr string, serviceConfigs []apitypes.ServiceConfig, ingressConfigs []apitypes.IngressConfig, namespace string, flowID string) ([]string, error) {
clusterTopology, err := engine.GenerateProdOnlyCluster(flowID, serviceConfigs, ingressConfigs, namespace)
if err != nil {
return err, []string{}
return []string{}, err
}

tenant, err := sv.db.GetOrCreateTenant(tenantUuidStr)
if err != nil {
logrus.Errorf("an error occured while getting the tenant %s\n: '%v'", tenantUuidStr, err.Error())
return err, nil
return nil, err
}

clusterTopologyJson, err := json.Marshal(clusterTopology)
if err != nil {
logrus.Errorf("an error occured while encoding the cluster topology for tenant %s, error was \n: '%v'", tenantUuidStr, err.Error())
return err, nil
return nil, err
}
tenant.BaseClusterTopology = clusterTopologyJson

serviceConfigsJson, err := json.Marshal(serviceConfigs)
if err != nil {
logrus.Errorf("an error occured while encoding the service configs for tenant %s, error was \n: '%v'", tenantUuidStr, err.Error())
return err, nil
return nil, err
}
tenant.ServiceConfigs = serviceConfigsJson

ingressConfigsJson, err := json.Marshal(ingressConfigs)
if err != nil {
logrus.Errorf("an error occured while encoding the ingress configs for tenant %s, error was \n: '%v'", tenantUuidStr, err.Error())
return err, nil
return nil, err
}
tenant.IngressConfigs = ingressConfigsJson

err = sv.db.SaveTenant(tenant)
if err != nil {
logrus.Errorf("an error occured while saving tenant %s. erro was \n: '%v'", tenant.TenantId, err.Error())
return err, nil
return nil, err
}

flowHostMapping := clusterTopology.GetFlowHostMapping()

return nil, flowHostMapping[flowID]
return flowHostMapping[flowID], nil
}

// ============================================================================================================
Expand All @@ -459,7 +461,8 @@ func applyProdDevFlow(sv *Server, tenantUuidStr string, patches []flow_spec.Serv
}
serviceConfigs = template.ApplyTemplateOverrides(serviceConfigs, templateSpec)

baselineFlowID := prodFlowId
// the baseline flow ID uses the base cluster topology namespace name
baselineFlowID := baseClusterTopologyMaybeWithTemplateOverrides.Namespace

baseClusterTopologyWithTemplateOverridesPtr, err := engine.GenerateProdOnlyCluster(baselineFlowID, serviceConfigs, ingressConfigs, baseTopology.Namespace)
if err != nil {
Expand Down Expand Up @@ -546,8 +549,8 @@ func getTenantTopologies(sv *Server, tenantUuidStr string) (*resolved.ClusterTop
return nil, nil, nil, nil, nil, err
}
} else {
baseClusterTopology.FlowID = prodFlowId
baseClusterTopology.Namespace = prodFlowId
baseClusterTopology.FlowID = defaultBaselineFlowId
baseClusterTopology.Namespace = defaultBaselineFlowId
}

var serviceConfigs []apitypes.ServiceConfig
Expand Down
5 changes: 3 additions & 2 deletions kontrol-service/engine/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
"kardinal.kontrol-service/types/flow_spec"
)

// GenerateProdOnlyCluster create the baseline cluster which can be also called prod cluster which was the first name used
func GenerateProdOnlyCluster(flowID string, serviceConfigs []apitypes.ServiceConfig, ingressConfigs []apitypes.IngressConfig, namespace string) (*resolved.ClusterTopology, error) {
clusterTopology, err := generateClusterTopology(serviceConfigs, ingressConfigs, namespace, flowID)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occured generating the cluster topology from the service configs")
return nil, stacktrace.Propagate(err, "An error occurred generating the cluster topology from the service configs")
}

return clusterTopology, nil
Expand Down Expand Up @@ -58,7 +59,7 @@ func GenerateProdDevCluster(baseClusterTopologyMaybeWithTemplateOverrides *resol

clusterTopology, err := flow.CreateDevFlow(pluginRunner, *baseClusterTopologyMaybeWithTemplateOverrides, *baseTopology, flowPatch)
if err != nil {
return nil, stacktrace.Propagate(err, "An error occured generating the cluster topology from the service configs")
return nil, stacktrace.Propagate(err, "An error occurred generating the cluster topology from the service configs")
}

return clusterTopology, nil
Expand Down
14 changes: 5 additions & 9 deletions kontrol-service/engine/flow/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,27 +74,23 @@ function determine_destination(request_handle, trace_id, hostname)
"outbound|8080||trace-router.default.svc.cluster.local",
{
[":method"] = "GET",
[":path"] = "/route?trace_id=" .. trace_id .. "&hostname=" .. hostname,
[":path"] = "/route?trace_id=" .. trace_id .. "&hostname=" .. hostname .. "&baseline_prefix=%s",
[":authority"] = "trace-router.default.svc.cluster.local"
},
"",
5000
)
if not headers or headers[":status"] ~= "200" then
request_handle:logWarn("Failed to determine destination, falling back to prod")
return hostname .. "-prod" -- Fallback to prod
request_handle:logWarn("Failed to determine destination, falling back to baseline")
return hostname .. "-%s" -- Fallback to baseline
end
return body
end
`

luaFilterType = "type.googleapis.com/envoy.extensions.filters.http.lua.v3.Lua"

// this is related to the prod flowID and prod default namespace
// TODO find a way to centralize this value for all of these concepts (Service.version, flowID and default namespace)
prodVersion = "prod"
)

func generateLuaTraceHeaderPriorities() string {
Expand All @@ -109,6 +105,6 @@ func generateLuaTraceHeaderPriorities() string {
sb.WriteString("}")
return sb.String()
}
func getOutgoingRequestTraceIDFilter() string {
return fmt.Sprintf(outgoingRequestTraceIDFilterTemplate, generateLuaTraceHeaderPriorities())
func getOutgoingRequestTraceIDFilter(baselineHostName string) string {
return fmt.Sprintf(outgoingRequestTraceIDFilterTemplate, generateLuaTraceHeaderPriorities(), baselineHostName, baselineHostName)
}
13 changes: 8 additions & 5 deletions kontrol-service/engine/flow/dev_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ func CreateDevFlow(pluginRunner *plugins.PluginRunner, baseClusterTopologyMaybeW
}
}

// Replace "prod" version services with baseTopology versions
// the baseline topology flow ID and flow version are equal to the namespace these three should use same value
baselineFlowVersion := baseTopology.Namespace
// Replace "baseline" version services with baseTopology versions
for i, service := range topologyRef.Services {
if service.Version == prodVersion {

if service.Version == baselineFlowVersion {
prodService, err := baseTopology.GetService(service.ServiceID)
if err != nil {
return nil, fmt.Errorf("failed to get prod service %s: %v", service.ServiceID, err)
Expand All @@ -73,7 +76,7 @@ func CreateDevFlow(pluginRunner *plugins.PluginRunner, baseClusterTopologyMaybeW
// postgres is marked as shared, we mark its parent "cartservice" as shared
// cartservice then happens in the loop and we try again (currently we don't as we check if version isn't shared)
for _, service := range topology.Services {
if service.IsShared && service.Version != prodVersion && service.Version != constants.SharedVersionVersionString {
if service.IsShared && service.Version != baselineFlowVersion && service.Version != constants.SharedVersionVersionString {
logrus.Infof("Marking service '%v' as shared, current version '%v'", service.ServiceID, service.Version)
originalVersion := service.Version
service.Version = constants.SharedVersionVersionString
Expand All @@ -91,14 +94,14 @@ func CreateDevFlow(pluginRunner *plugins.PluginRunner, baseClusterTopologyMaybeW

// Update service dependencies
for i, dependency := range topologyRef.ServiceDependencies {
if dependency.Service.Version == prodVersion {
if dependency.Service.Version == baselineFlowVersion {
prodService, err := baseTopology.GetService(dependency.Service.ServiceID)
if err != nil {
return nil, fmt.Errorf("failed to get prod service %s for dependency: %v", dependency.Service.ServiceID, err)
}
topologyRef.ServiceDependencies[i].Service = prodService
}
if dependency.DependsOnService.Version == prodVersion {
if dependency.DependsOnService.Version == baselineFlowVersion {
prodDependsOnService, err := baseTopology.GetService(dependency.DependsOnService.ServiceID)
if err != nil {
return nil, fmt.Errorf("failed to get prod service %s for dependsOn: %v", dependency.DependsOnService.ServiceID, err)
Expand Down
Loading

0 comments on commit 45f1826

Please sign in to comment.