From bd6c4c1f87f517b7ed5d08cae4aacb6d47e43339 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 26 Apr 2023 15:53:05 +0200
Subject: [PATCH] feat: Write pod cost to CRD (#1134)

* alternative to adding the Kubernetes annotation directly to the pod
  resource in case allowing PATCH verb is a security concern
* a separate Kubernetes operator with more privileges would watch the CR
  and update the pod resource with the annotation
* separate ci job for the CR integration tests
---
 .../integration-tests-rollingupdate-cr.yml         |  68 ++++
 build.sbt                                          |   4 +
 docs/src/main/paradox/rolling-updates.md           |  59 +++-
 .../rollingupdate-kubernetes/build.sbt             |  18 +-
 .../kubernetes/akka-cluster-cr.yml                 | 126 +++++++
 .../kubernetes/akka-cluster.yml                    |   2 +
 .../src/main/resources/logback.xml                 |   6 +-
 .../rollingupdate-kubernetes/test-cr.sh            |  12 +
 .../rollingupdate-kubernetes-cr-test.sh            |  71 ++++
 .../scripts/rollingupdate-kubernetes-test.sh       |   4 +
 project/Dependencies.scala                         |   5 +-
 .../pod-cost-example.yml                           |  16 +
 rolling-update-kubernetes/pod-cost.yml             |  49 +++
 .../KubernetesApiIntegrationTest.scala             | 170 ++++++++++
 .../src/main/resources/reference.conf              |  18 +
 .../kubernetes/ApiRequests.scala                   |  42 ---
 .../kubernetes/KubernetesApi.scala                 | 106 ++++++
 .../kubernetes/KubernetesApiImpl.scala             | 314 ++++++++++++++++++
 .../kubernetes/KubernetesJsonSupport.scala         |  47 +++
 .../kubernetes/KubernetesSettings.scala            |  33 +-
 .../kubernetes/PodDeletionCost.scala               |  73 +++-
 .../kubernetes/PodDeletionCostAnnotator.scala      | 205 +++++++-----
 .../PodDeletionCostAnnotatorCrSpec.scala           | 307 +++++++++++++++++
 .../PodDeletionCostAnnotatorSpec.scala             |  49 ++-
 24 files changed, 1631 insertions(+), 173 deletions(-)
 create mode 100644 .github/workflows/integration-tests-rollingupdate-cr.yml
 create mode 100644 integration-test/rollingupdate-kubernetes/kubernetes/akka-cluster-cr.yml
 create mode 100755 integration-test/rollingupdate-kubernetes/test-cr.sh
 create mode 100755 integration-test/scripts/rollingupdate-kubernetes-cr-test.sh
 create mode 100644 rolling-update-kubernetes/pod-cost-example.yml
 create mode 100644 rolling-update-kubernetes/pod-cost.yml
 create mode 100644 rolling-update-kubernetes/src/it/scala/akka/rollingupdate/kubernetes/KubernetesApiIntegrationTest.scala
 delete mode 100644 rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/ApiRequests.scala
 create mode 100644 rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApi.scala
 create mode 100644 rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApiImpl.scala
 create mode 100644 rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesJsonSupport.scala
 create mode 100644 rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala

diff --git a/.github/workflows/integration-tests-rollingupdate-cr.yml b/.github/workflows/integration-tests-rollingupdate-cr.yml
new file mode 100644
index 000000000..7b5fd7e62
--- /dev/null
+++ b/.github/workflows/integration-tests-rollingupdate-cr.yml
@@ -0,0 +1,68 @@
+name: Integration test for Rolling Update CR Kubernetes
+
+on:
+  pull_request:
+  push:
+    branches:
+      - main
+      - release-*
+    tags-ignore: [ v.* ]
+  schedule:
+    - cron: '0 2 * * *' # every day 2am
+  workflow_dispatch:
+
+permissions:
+  contents: read
+
+jobs:
+  integration-test:
+    name: Integration Tests for Rolling Update CR Kubernetes
+    runs-on: ubuntu-22.04
+    strategy:
+      fail-fast: false
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v3.1.0
+        with:
+ fetch-depth: 0 + + - name: Checkout GitHub merge + if: github.event.pull_request + run: |- + git fetch origin pull/${{ github.event.pull_request.number }}/merge:scratch + git checkout scratch + + - name: Cache Coursier cache + uses: coursier/cache-action@v6.4.0 + + - name: Set up JDK 11 + uses: coursier/setup-action@v1.3.0 + with: + jvm: temurin:1.11.0 + + - name: Setup Minikube + # https://github.com/manusa/actions-setup-minikube/releases + # v2.7.1 + uses: manusa/actions-setup-minikube@4582844dcacbf482729f8d7ef696f515d2141bb9 + with: + minikube version: 'v1.21.0' + kubernetes version: 'v1.22.0' + driver: docker + start args: '--addons=ingress' + + - name: Run Integration Tests + timeout-minutes: 15 + run: |- + echo 'Creating namespace' + kubectl create namespace rolling + echo 'Creating resources' + kubectl apply -f ./rolling-update-kubernetes/pod-cost.yml + echo 'Adding proxy port' + kubectl proxy --port=8080 & + echo 'Running tests' + sbt "rolling-update-kubernetes/IntegrationTest/test" + ./integration-test/rollingupdate-kubernetes/test-cr.sh + + - name: Print logs on failure + if: ${{ failure() }} + run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \; diff --git a/build.sbt b/build.sbt index fa1c239a2..5afd8925b 100644 --- a/build.sbt +++ b/build.sbt @@ -167,6 +167,10 @@ lazy val `rolling-update-kubernetes` = project libraryDependencies := Dependencies.RollingUpdateKubernetes, mimaPreviousArtifacts := Set.empty ) + .settings( + Defaults.itSettings + ) + .configs(IntegrationTest) .dependsOn(`akka-management-pki`) lazy val `lease-kubernetes` = project diff --git a/docs/src/main/paradox/rolling-updates.md b/docs/src/main/paradox/rolling-updates.md index 0726c34e8..7b88c90bf 100644 --- a/docs/src/main/paradox/rolling-updates.md +++ b/docs/src/main/paradox/rolling-updates.md @@ -2,7 +2,7 @@ Rolling updates allow you to update an application by gradually replacing old nodes with new ones. This ensures that the application remains available throughout the update process, with minimal disruption to clients. -#### Graceful shutdown +## Graceful shutdown Akka Cluster can handle hard failures using a downing provider such as Lightbend's @extref:[Split Brain Resolver](akka:split-brain-resolver.html). However, this should not be relied upon for regular rolling redeploys. Features such as `ClusterSingleton`s and `ClusterSharding` @@ -19,12 +19,12 @@ Upon receiving a `SIGTERM` Coordinated Shutdown will: `ClusterSingleton`s to be migrated if this was the oldest node. Finally, the node is removed from the Akka Cluster membership. -#### Number of nodes to redeploy at once +## Number of nodes to redeploy at once Akka bootstrap requires a `stable-period` where service discovery returns a stable set of contact points. When doing rolling updates it is best to wait for a node (or group of nodes) to finish joining the cluster before adding and removing other nodes. -#### Cluster Singletons +## Cluster Singletons `ClusterSingleton`s run on the oldest node in the cluster. To avoid singletons moving during every node deployment it is advised to start a rolling redeploy starting at the newest node. Then `ClusterSingleton`s only move once. Cluster Sharding uses a singleton internally so this is important even if not using singletons directly. 
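On Kubernetes this can be controlled through the deployment's rolling update strategy. A minimal sketch, using the same values as the integration test deployment later in this patch, so that only one pod is replaced at a time:

```
strategy:
  rollingUpdate:
    maxSurge: 1
    maxUnavailable: 0
  type: RollingUpdate
```

With `maxSurge: 1` and `maxUnavailable: 0`, Kubernetes starts one new pod, waits for it to become ready, and only then stops the next old pod.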
@@ -89,11 +89,11 @@ Additionally, the pod annotator needs to know which namespace the pod belongs
 to from the service account secret, in `/var/run/secrets/kubernetes.io/serviceaccount/namespace`, but can be overridden
 by setting `akka.rollingupdate.kubernetes.namespace` or by providing `KUBERNETES_NAMESPACE` environment variable.
 
-##### Role based access control
+#### Role based access control
 
 @@@ warning
 
-This extension uses the Kubernetes API to set the `pod-deletion-cost` annotation on its own pod. To be able to do that, it requires special permission to be able to `patch` the pod configuration. Each pod only needs access to the namespace they are in.
+This extension uses the Kubernetes API to set the `pod-deletion-cost` annotation on its own pod. To be able to do that, it requires special permission to be able to `patch` the pod configuration. Each pod only needs access to the namespace it is in. If this is a security concern in your environment you may instead use @ref:[Alternative with Custom Resource Definition](#alternative-with-custom-resource-definition).
 
 @@@
 
@@ -130,5 +130,54 @@ This RBAC example covers only the permissions needed for this `PodDeletionCost`
 
 @@@
 
+#### Alternative with Custom Resource Definition
+
+If allowing "patch" in RBAC as described above is a security concern in your environment, you can instead use an
+intermediate Custom Resource Definition (CRD). Instead of updating the `controller.kubernetes.io/pod-deletion-cost`
+annotation directly, the extension updates a `PodCost` custom resource, and an operator that you provide reconciles
+that resource and updates the pod-deletion-cost annotation of the pod.
+
+@@@ note
+
+You would have to write the Kubernetes operator that watches the `PodCost` resource and updates the
+`controller.kubernetes.io/pod-deletion-cost` annotation of the corresponding pod resource. This operator
+is not provided by Akka.
+
+@@@
+
+Enable updates of the custom resource with this configuration:
+
+```
+akka.rollingupdate.kubernetes.custom-resource.enabled = true
+```
+
+The `PodCost` CRD:
+
+@@snip [pod-cost.yml](/rolling-update-kubernetes/pod-cost.yml) {}
+
+The RBAC for the application to update the `PodCost` CR, instead of requiring "patch" access to the "pods" resources:
+
+```
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: podcost-access
+rules:
+  - apiGroups: ["akka.io"]
+    resources: ["podcosts"]
+    verbs: ["get", "create", "update", "delete", "list"]
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: podcost-access
+subjects:
+  - kind: User
+    name: system:serviceaccount:<namespace>:default
+roleRef:
+  kind: Role
+  name: podcost-access
+  apiGroup: rbac.authorization.k8s.io
+```
diff --git a/integration-test/rollingupdate-kubernetes/build.sbt b/integration-test/rollingupdate-kubernetes/build.sbt
index 9479b7396..ed36e253e 100644
--- a/integration-test/rollingupdate-kubernetes/build.sbt
+++ b/integration-test/rollingupdate-kubernetes/build.sbt
@@ -1,20 +1,8 @@
-import com.typesafe.sbt.packager.docker._
-enablePlugins(JavaServerAppPackaging)
+enablePlugins(JavaAppPackaging, DockerPlugin)
 
 version := "1.3.3.7" // we hard-code the version here, it could be anything really
 
-dockerCommands :=
-  dockerCommands.value.flatMap {
-    case ExecCmd("ENTRYPOINT", args @ _*) => Seq(Cmd("ENTRYPOINT", args.mkString(" ")))
-    case v => Seq(v)
-  }
-
 dockerExposedPorts := Seq(8080, 8558, 2552)
-dockerBaseImage := "openjdk:8-jre-alpine"
-
-dockerCommands ++= Seq(
-  Cmd("USER", "root"),
-  Cmd("RUN", "/sbin/apk", "add", "--no-cache", "bash", "bind-tools", "busybox-extras", "curl", "strace"),
-  Cmd("RUN", "chgrp -R 0 . && chmod -R g=u .")
-)
+dockerBaseImage := "docker.io/library/adoptopenjdk:11-jre-hotspot"
+dockerUpdateLatest := true
diff --git a/integration-test/rollingupdate-kubernetes/kubernetes/akka-cluster-cr.yml b/integration-test/rollingupdate-kubernetes/kubernetes/akka-cluster-cr.yml
new file mode 100644
index 000000000..2bdfc051e
--- /dev/null
+++ b/integration-test/rollingupdate-kubernetes/kubernetes/akka-cluster-cr.yml
@@ -0,0 +1,126 @@
+#deployment
+apiVersion: apps/v1
+kind: Deployment
+metadata:
+  labels:
+    app: akka-rollingupdate-demo
+  name: akka-rollingupdate-demo
+spec:
+  replicas: 3
+  selector:
+    matchLabels:
+      app: akka-rollingupdate-demo
+  strategy:
+    rollingUpdate:
+      maxSurge: 1
+      maxUnavailable: 0
+    type: RollingUpdate
+
+  template:
+    metadata:
+      labels:
+        app: akka-rollingupdate-demo
+        actorSystemName: akka-rollingupdate-demo
+    spec:
+      containers:
+        - name: akka-rollingupdate-demo
+          image: integration-test-rollingupdate-kubernetes:1.3.3.7
+          # Remove for a real project, the image is picked up locally for the integration test
+          imagePullPolicy: Never
+          resources:
+            limits:
+              memory: "256Mi"
+            requests:
+              memory: "256Mi"
+              cpu: "300m"
+          #health
+          livenessProbe:
+            httpGet:
+              path: /alive
+              port: management
+          readinessProbe:
+            httpGet:
+              path: /ready
+              port: management
+          #health
+          ports:
+            # akka-management bootstrap
+            - name: management
+              containerPort: 8558
+              protocol: TCP
+            - name: http
+              containerPort: 8080
+              protocol: TCP
+          env:
+            - name: KUBERNETES_NAMESPACE
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+            # The pod deletion cost will use this var to identify the pod to be annotated (in case that applies)
+            - name: KUBERNETES_POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
+            - name: REQUIRED_CONTACT_POINT_NR
+              value: "3"
+            - name: JAVA_TOOL_OPTIONS
+              value: "-XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75 -Dakka.rollingupdate.kubernetes.custom-resource.enabled=on"
+#deployment
+---
+#rbac-reader
+#
+# Create a role, `pod-reader`, that can list pods and
+# bind the default service account in the namespace
+# that the binding is deployed to, to that role.
+#
+
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: pod-reader
+rules:
+- apiGroups: [""] # "" indicates the core API group
+  resources: ["pods"]
+  verbs: ["get", "watch", "list"] # "patch" is not needed here; the pod is annotated by the separate operator via the PodCost CR
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: pod-reader
+subjects:
+  # Uses the default service account.
+  # Consider creating a dedicated service account to run your
+  # Akka Cluster services and binding the role to that one.
+- kind: ServiceAccount
+  name: default
+roleRef:
+  kind: Role
+  name: pod-reader
+  apiGroup: rbac.authorization.k8s.io
+#rbac-reader
+---
+#rbac-podcost-cr
+#
+# Create a role, `podcost-access`, that can update the PodCost CR
+#
+kind: Role
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: podcost-access
+rules:
+  - apiGroups: ["akka.io"]
+    resources: ["podcosts"]
+    verbs: ["get", "create", "update", "delete", "list"]
+---
+kind: RoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: podcost-access
+subjects:
+  - kind: User
+    name: system:serviceaccount:akka-rollingupdate-demo-cr-ns:default
+roleRef:
+  kind: Role
+  name: podcost-access
+  apiGroup: rbac.authorization.k8s.io
+#rbac-podcost-cr
diff --git a/integration-test/rollingupdate-kubernetes/kubernetes/akka-cluster.yml b/integration-test/rollingupdate-kubernetes/kubernetes/akka-cluster.yml
index 8e0aca0cb..13194cdba 100644
--- a/integration-test/rollingupdate-kubernetes/kubernetes/akka-cluster.yml
+++ b/integration-test/rollingupdate-kubernetes/kubernetes/akka-cluster.yml
@@ -57,6 +57,8 @@ spec:
               fieldPath: metadata.name
         - name: REQUIRED_CONTACT_POINT_NR
           value: "3"
+        - name: JAVA_TOOL_OPTIONS
+          value: "-XX:InitialRAMPercentage=75 -XX:MaxRAMPercentage=75"
 #deployment
 ---
 #rbac-reader
diff --git a/integration-test/rollingupdate-kubernetes/src/main/resources/logback.xml b/integration-test/rollingupdate-kubernetes/src/main/resources/logback.xml
index 073d142cf..b04430b17 100644
--- a/integration-test/rollingupdate-kubernetes/src/main/resources/logback.xml
+++ b/integration-test/rollingupdate-kubernetes/src/main/resources/logback.xml
@@ -8,10 +8,12 @@ - + + + - \ No newline at end of file +
diff --git a/integration-test/rollingupdate-kubernetes/test-cr.sh b/integration-test/rollingupdate-kubernetes/test-cr.sh
new file mode 100755
index 000000000..1cd8db1e9
--- /dev/null
+++ b/integration-test/rollingupdate-kubernetes/test-cr.sh
@@ -0,0 +1,12 @@
+#!/bin/bash
+
+set -exu
+
+export NAMESPACE=akka-rollingupdate-demo-cr-ns
+export APP_NAME=akka-rollingupdate-demo
+export PROJECT_NAME=integration-test-rollingupdate-kubernetes
+export CRD=rolling-update-kubernetes/pod-cost.yml
+export DEPLOYMENT=integration-test/rollingupdate-kubernetes/kubernetes/akka-cluster-cr.yml
+
+integration-test/scripts/rollingupdate-kubernetes-cr-test.sh
+
diff --git a/integration-test/scripts/rollingupdate-kubernetes-cr-test.sh b/integration-test/scripts/rollingupdate-kubernetes-cr-test.sh
new file mode 100755
index 000000000..37d518e74
--- /dev/null
+++ b/integration-test/scripts/rollingupdate-kubernetes-cr-test.sh
@@ -0,0 +1,71 @@
+#!/bin/bash -e
+
+echo "Running rollingupdate-kubernetes-cr-test.sh with deployment: $DEPLOYMENT"
+
+eval $(minikube -p minikube docker-env)
+sbt $PROJECT_NAME/docker:publishLocal
+
+docker images | head
+
+kubectl create namespace $NAMESPACE || true
+kubectl apply -f $CRD
+kubectl -n $NAMESPACE delete deployment akka-rollingupdate-demo || true
+kubectl -n $NAMESPACE apply -f $DEPLOYMENT
+
+for i in {1..10}
+do
+  echo "Waiting for pods to get ready..."
+  kubectl get pods -n $NAMESPACE
+  [ `kubectl get pods -n $NAMESPACE | grep Running | wc -l` -eq 3 ] && break
+  sleep 4
+done
+
+if [ $i -eq 10 ]
+then
+  echo "Pods did not get ready"
+  kubectl -n $NAMESPACE describe deployment akka-rollingupdate-demo
+  exit 1
+fi
+
+max_tries=10
+try_count=0
+
+# Loop until the CR lists exactly the running pods, or the maximum number of tries is reached
+while true
+do
+  # Get the list of pods matching the namespace and app name, and are in the Running state
+  pod_list=$(kubectl get pods -n $NAMESPACE | grep $APP_NAME | grep Running | awk '{ print $1 }' | sort)
+
+  # Get the pods in the CR
+  cr_pod_list=$(kubectl describe podcosts.akka.io akka-rollingupdate-demo -n $NAMESPACE | grep "Pod Name" | awk '{print $3}' | sort)
+
+  if [ "$pod_list" = "$cr_pod_list" ]
+  then
+    echo "Found expected pods in CR: $cr_pod_list"
+    break
+  else
+    echo "Expected $pod_list, but didn't find expected pods in CR: $cr_pod_list"
+  fi
+
+  try_count=$((try_count + 1))
+  if [ $try_count -ge $max_tries ]
+  then
+    echo "Exceeded max retries, aborting"
+    exit 1
+  fi
+
+  # Wait for 10 seconds before trying again
+  sleep 10
+done
diff --git a/integration-test/scripts/rollingupdate-kubernetes-test.sh b/integration-test/scripts/rollingupdate-kubernetes-test.sh
index b8e0abe6b..251589809 100755
--- a/integration-test/scripts/rollingupdate-kubernetes-test.sh
+++ b/integration-test/scripts/rollingupdate-kubernetes-test.sh
@@ -1,11 +1,14 @@
 #!/bin/bash -e
 
+echo "Running rollingupdate-kubernetes-test.sh with deployment: $DEPLOYMENT"
+
 eval $(minikube -p minikube docker-env)
 sbt $PROJECT_NAME/docker:publishLocal
 
 docker images | head
 
 kubectl create namespace $NAMESPACE || true
+kubectl -n $NAMESPACE delete deployment akka-rollingupdate-demo || true
 kubectl -n $NAMESPACE apply -f $DEPLOYMENT
 
 for i in {1..10}
@@ -19,6 +22,7 @@ done
 if [ $i -eq 10 ]
 then
   echo "Pods did not get ready"
+  kubectl -n $NAMESPACE describe deployment akka-rollingupdate-demo
   exit -1
 fi
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 17ebfd3c4..d37e125ba 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -159,9 +159,10 @@ object Dependencies {
       "com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
       "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
       "com.typesafe.akka" %% "akka-testkit" % AkkaVersion % Test,
-      "org.scalatest" %% "scalatest" % ScalaTestVersion % Test,
+      "org.scalatest" %% "scalatest" % ScalaTestVersion % "it,test",
       "com.github.tomakehurst" % "wiremock-jre8" % "2.34.0" % Test,
-      "org.scalatestplus" %% "junit-4-13" % ScalaTestPlusJUnitVersion % "test"
+      "org.scalatestplus" %% "junit-4-13" % ScalaTestPlusJUnitVersion % "test",
+      "com.typesafe.akka" %% "akka-testkit" % AkkaVersion % "it,test"
     )
 
   val LeaseKubernetes = Seq(
diff --git a/rolling-update-kubernetes/pod-cost-example.yml b/rolling-update-kubernetes/pod-cost-example.yml
new file mode 100644
index 000000000..03e2c7374
--- /dev/null
+++ b/rolling-update-kubernetes/pod-cost-example.yml
@@ -0,0 +1,16 @@
+apiVersion: akka.io/v1
+kind: PodCost
+metadata:
+  name: sampleservice
+spec:
+  pods:
+    - address: akka://SampleService@10.42.0.13:2552
+      cost: 10000
+      podName: "pod1"
+      time: 1681909802669
+      uid: 3870636288020406585
+    - address: akka://SampleService@10.42.0.14:2552
+      cost: 9900
+      podName: "pod2"
+      time: 1681909802716
+      uid: 1681910047053
diff --git a/rolling-update-kubernetes/pod-cost.yml b/rolling-update-kubernetes/pod-cost.yml
new file mode 100644
index 000000000..df4220489
--- /dev/null
+++ b/rolling-update-kubernetes/pod-cost.yml
@@ -0,0 +1,49 @@
+apiVersion: apiextensions.k8s.io/v1
+kind: CustomResourceDefinition
+metadata:
+  # name must match the spec fields below, and be in the form: <plural>.<group>
+  name: podcosts.akka.io
+spec:
+  group: akka.io
+  versions:
+    - name: v1
+      storage: true
+      served: true
+      schema:
+        openAPIV3Schema:
+          type: object
+          properties:
+            spec:
+              type: object
+              properties:
+                version:
+                  type: string
+                pods:
+                  type: array
+                  items:
+                    type: object
+                    properties:
+                      # the name of the pod that should be updated with the pod-deletion-cost annotation
+                      podName:
+                        type: string
+                      # the value of the controller.kubernetes.io/pod-deletion-cost annotation
+                      cost:
+                        type: integer
+                      # address, uid and time are used for cleanup of removed members
+                      address:
+                        type: string
+                      # address, uid and time are used for cleanup of removed members
+                      uid:
+                        type: integer
+                      # address, uid and time are used for cleanup of removed members
+                      time:
+                        type: integer
+  scope: Namespaced
+  names:
+    # kind is normally the CamelCased singular type. Your resource manifests use this.
+    kind: PodCost
+    listKind: PodCostList
+    # singular name to be used as an alias on the CLI and for display
+    singular: podcost
+    # plural name to be used in the URL: /apis/<group>/<version>/<plural>
+    plural: podcosts
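The CRD above must be installed before the application can create `PodCost` resources. A minimal sketch of installing it and inspecting the resulting CR, using the same commands as the CI workflow and the integration test script in this patch (`rolling` is the namespace used by the integration test):

```
kubectl create namespace rolling
kubectl apply -f rolling-update-kubernetes/pod-cost.yml
# once the application is running, list the entries written by the extension
kubectl describe podcosts.akka.io -n rolling
```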
diff --git a/rolling-update-kubernetes/src/it/scala/akka/rollingupdate/kubernetes/KubernetesApiIntegrationTest.scala b/rolling-update-kubernetes/src/it/scala/akka/rollingupdate/kubernetes/KubernetesApiIntegrationTest.scala
new file mode 100644
index 000000000..e3b2136a6
--- /dev/null
+++ b/rolling-update-kubernetes/src/it/scala/akka/rollingupdate/kubernetes/KubernetesApiIntegrationTest.scala
@@ -0,0 +1,170 @@
+package akka.rollingupdate.kubernetes
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.Done
+import akka.actor.ActorSystem
+import akka.cluster.Cluster
+import akka.testkit.TestKit
+import com.typesafe.config.ConfigFactory
+import org.scalatest.concurrent.Eventually
+import org.scalatest.concurrent.ScalaFutures
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.CancelAfterFailure
+
+/**
+ * This test requires an API server available on localhost:8080, the PodCost CRD created, and a namespace called `rolling`.
+ *
+ * One way of doing this is to have a kubectl proxy open:
+ *
+ * `kubectl proxy --port=8080`
+ */
+class KubernetesApiIntegrationTest
+    extends TestKit(
+      ActorSystem(
+        "KubernetesApiIntegrationSpec",
+        ConfigFactory.parseString("""
+          akka.loglevel = DEBUG
+          akka.actor.provider = cluster
+          akka.remote.artery.canonical.port = 0
+          akka.remote.artery.canonical.hostname = 127.0.0.1
+          """)
+      ))
+    with AnyWordSpecLike
+    with Matchers
+    with ScalaFutures
+    with BeforeAndAfterAll
+    with CancelAfterFailure
+    with Eventually {
+
+  implicit val patience: PatienceConfig = PatienceConfig(testKitSettings.DefaultTimeout.duration)
+
+  private val cluster = Cluster(system)
+
+  private val settings = new KubernetesSettings(
+    "",
+    "",
+    "localhost",
+    8080,
+    namespace = Some("rolling"),
+    "",
+    podName = "pod1",
+    secure = false,
+    apiServiceRequestTimeout = 1.second,
+    new CustomResourceSettings(
+      enabled = true,
+      crName = None,
+      cleanupAfter = 60.seconds
+    )
+  )
+
+  private val underTest =
+    new KubernetesApiImpl(system, settings, settings.namespace.get, apiToken = "", clientHttpsConnectionContext = None)
+  private val crName = KubernetesApi.makeDNS1039Compatible(system.name)
+  private val podName1 = "pod1"
+  private val podName2 = "pod2"
+  private var currentVersion = ""
+
+  override protected def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  override protected def beforeAll(): Unit = {
+    // do some operation to check the proxy is up
+    eventually {
+      Await.result(underTest.removePodCostResource(crName), 2.second) shouldEqual Done
+    }
+  }
+
+  "Kubernetes PodCost resource" should {
+    "be able to be created" in {
+      val podCostResource = underTest.readOrCreatePodCostResource(crName).futureValue
+      podCostResource.version shouldNot equal("")
+      podCostResource.version shouldNot equal(null)
+      podCostResource.pods shouldEqual Nil
+      currentVersion = podCostResource.version
+    }
+
+    "be able to read back with same version" in {
+      val podCostResource = underTest.readOrCreatePodCostResource(crName).futureValue
+      podCostResource.version shouldEqual currentVersion
+    }
+
+    "be able to update an empty resource" in {
+      val podCost = PodCost(
+        podName1,
+        1,
+        cluster.selfUniqueAddress.address.toString,
+        cluster.selfUniqueAddress.longUid,
System.currentTimeMillis()) + val podCostResource = underTest.updatePodCostResource(crName, currentVersion, Vector(podCost)).futureValue + val success: PodCostResource = podCostResource match { + case Right(r) => r + case Left(_) => fail("There shouldn't be anyone else updating the resource.") + } + success.version shouldNot equal(currentVersion) + currentVersion = success.version + success.pods shouldEqual Vector(podCost) + } + + "be able to update a resource if resource version is correct" in { + val podCost = PodCost( + podName1, + 2, + cluster.selfUniqueAddress.address.toString, + cluster.selfUniqueAddress.longUid, + System.currentTimeMillis()) + val podCostResource = underTest.updatePodCostResource(crName, currentVersion, Vector(podCost)).futureValue + val success: PodCostResource = podCostResource match { + case Right(r) => r + case Left(_) => fail("There shouldn't be anyone else updating the resource.") + } + success.version shouldNot equal(currentVersion) + currentVersion = success.version + success.pods shouldEqual Vector(podCost) + } + + "not be able to update a resource if resource version is incorrect" in { + val podCost = PodCost( + podName1, + 3, + cluster.selfUniqueAddress.address.toString, + cluster.selfUniqueAddress.longUid, + System.currentTimeMillis()) + val podCostResource = underTest.updatePodCostResource(crName, version = "10", Vector(podCost)).futureValue + val failure: PodCostResource = podCostResource match { + case Right(_) => fail("Expected update failure (we've used an invalid version!).") + case Left(r) => r + } + failure.version shouldEqual currentVersion + currentVersion = failure.version + failure.pods.head.cost shouldNot equal(podCost.cost) + failure.pods.head.time shouldNot equal(podCost.time) + } + + "be able to add more to the resource" in { + val podCost2 = PodCost( + podName2, + 4, + cluster.selfUniqueAddress.address.toString, + cluster.selfUniqueAddress.longUid, + System.currentTimeMillis()) + val podCostResource1 = underTest.readOrCreatePodCostResource(crName).futureValue + val podCostResource2 = + underTest.updatePodCostResource(crName, currentVersion, podCostResource1.pods :+ podCost2).futureValue + val success: PodCostResource = podCostResource2 match { + case Right(r) => r + case Left(_) => fail("There shouldn't be anyone else updating the resource.") + } + success.version shouldNot equal(currentVersion) + currentVersion = success.version + success.pods.last shouldEqual podCost2 + success.pods.size shouldEqual podCostResource1.pods.size + 1 + } + } + +} diff --git a/rolling-update-kubernetes/src/main/resources/reference.conf b/rolling-update-kubernetes/src/main/resources/reference.conf index d83203b26..02afd824e 100644 --- a/rolling-update-kubernetes/src/main/resources/reference.conf +++ b/rolling-update-kubernetes/src/main/resources/reference.conf @@ -17,6 +17,8 @@ akka.rollingupdate.kubernetes { api-service-port = 8080 api-service-port = ${?KUBERNETES_SERVICE_PORT} + api-service-request-timeout = 2s + # Namespace file path. The namespace is to create the lock in. Can be overridden by "namespace" # # If this path doesn't exist, the namespace will default to "default". @@ -44,4 +46,20 @@ akka.rollingupdate.kubernetes { # Fixed time delay between retries when pod annotation fails retry-delay = 5s } + + # An alternative if allowing PATCH of the pod resource is a security concern is to use a custom resource. 
+ # Instead of updating the "controller.kubernetes.io/pod-deletion-cost" annotation directly it will + # update a PodCost custom resource and then you would have an operator that reconciles that and updates the + # pod-deletion-cost annotation of the pod resource. + custom-resource { + # When enabled the PodCost custom resource is updated instead of directly updating + # the "controller.kubernetes.io/pod-deletion-cost" annotation. + enabled = off + # The name of the custom resource instance (CR). If undefined, it will use the ActorSystem name. + # It's recommended to use a separate CR for each Akka Cluster, but it's possible to share the + # CR in case it is preferred to have only one CR per namespace. + cr-name = "" + # Remove old entries that don't exist in the cluster membership after this duration. + cleanup-after = 60s + } } diff --git a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/ApiRequests.scala b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/ApiRequests.scala deleted file mode 100644 index cc3708914..000000000 --- a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/ApiRequests.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Copyright (C) 2017-2023 Lightbend Inc. - */ - -package akka.rollingupdate.kubernetes - -import akka.annotation.InternalApi -import akka.http.scaladsl.model.HttpMethods.PATCH -import akka.http.scaladsl.model.headers.Authorization -import akka.http.scaladsl.model.headers.OAuth2BearerToken -import akka.http.scaladsl.model.HttpEntity -import akka.http.scaladsl.model.HttpRequest -import akka.http.scaladsl.model.MediaTypes -import akka.http.scaladsl.model.Uri -import akka.util.ByteString - -import scala.collection.immutable - -/** - * INTERNAL API - */ -@InternalApi private[kubernetes] object ApiRequests { - - def podDeletionCost(settings: KubernetesSettings, apiToken: String, namespace: String, cost: Int): HttpRequest = { - val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace / "pods" / settings.podName - val scheme = if (settings.secure) "https" else "http" - val uri = Uri.from(scheme, host = settings.apiServiceHost, port = settings.apiServicePort).withPath(path) - val headers = if (settings.secure) immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil - - HttpRequest( - method = PATCH, - uri = uri, - headers = headers, - entity = HttpEntity( - MediaTypes.`application/merge-patch+json`, - ByteString( - s"""{"metadata": {"annotations": {"controller.kubernetes.io/pod-deletion-cost": "$cost" }}}""" - )) - ) - } - -} diff --git a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApi.scala b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApi.scala new file mode 100644 index 000000000..0a0a207cb --- /dev/null +++ b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApi.scala @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2017-2023 Lightbend Inc. 
+ */
+
+package akka.rollingupdate.kubernetes
+
+import java.text.Normalizer
+
+import scala.collection.immutable
+import scala.concurrent.Future
+
+import akka.Done
+import akka.actor.AddressFromURIString
+import akka.annotation.InternalApi
+import akka.cluster.UniqueAddress
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] final case class PodCostResource(version: String, pods: immutable.Seq[PodCost])
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[akka] final case class PodCost(podName: String, cost: Int, address: String, uid: Long, time: Long) {
+  @transient
+  lazy val uniqueAddress: UniqueAddress = UniqueAddress(AddressFromURIString(address), uid)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[akka] sealed class PodCostException(message: String) extends RuntimeException(message)
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[akka] final class PodCostTimeoutException(message: String) extends PodCostException(message)
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[akka] final class PodCostClientException(message: String) extends PodCostException(message)
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[akka] object KubernetesApi {
+
+  /**
+   * Removes from the leading and trailing positions the specified characters.
+   */
+  private def trim(name: String, characters: List[Char]): String =
+    name.dropWhile(characters.contains(_)).reverse.dropWhile(characters.contains(_)).reverse
+
+  /**
+   * Make a name compatible with the DNS 1039 standard: like a single domain name segment.
+   * Regex to follow: [a-z]([-a-z0-9]*[a-z0-9])
+   * Validates that the resulting name is at most 63 characters, otherwise throws `IllegalArgumentException`.
+   */
+  def makeDNS1039Compatible(name: String): String = {
+    val normalized =
+      Normalizer.normalize(name, Normalizer.Form.NFKD).toLowerCase.replaceAll("[_.]", "-").replaceAll("[^-a-z0-9]", "")
+    if (normalized.length > 63)
+      throw new IllegalArgumentException(s"Too long resource name [$normalized]. At most 63 characters are accepted. " +
+        "A custom resource name can be defined in configuration `akka.rollingupdate.kubernetes.custom-resource.cr-name`.")
+    trim(normalized, List('-'))
+  }
+}
+
+/**
+ * INTERNAL API
+ */
+private[akka] trait KubernetesApi {
+
+  def namespace: String
+
+  def updatePodDeletionCostAnnotation(podName: String, cost: Int): Future[Done]
+
+  /**
+   * Reads a PodCost from the API server. If it doesn't exist it tries to create it.
+   * The creation can fail due to another instance creating it at the same time; in this case
+   * the read is retried.
+   */
+  def readOrCreatePodCostResource(crName: String): Future[PodCostResource]
+
+  /**
+   * Update the named resource.
+   *
+   * Must call [[readOrCreatePodCostResource]] first to get a resource version.
+   *
+   * Can return one of three things:
+   *   - Future failure e.g. timed out waiting for k8s api server to respond
+   *   - Left - Update failed due to version not matching current in the k8s api server.
+   *     In this case the resource is returned so the version can be used for subsequent calls
+   *   - Right - Success
+   *
+   * Any subsequent updates should also use the latest version or re-read with [[readOrCreatePodCostResource]]
+   */
+  def updatePodCostResource(
+      crName: String,
+      version: String,
+      pods: immutable.Seq[PodCost]): Future[Either[PodCostResource, PodCostResource]]
+
+}
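The `version` acts as an optimistic-concurrency token for `updatePodCostResource`. A minimal sketch of the intended read-update-retry loop against this trait (these are INTERNAL APIs; `upsertPodCost` and `attempt` are hypothetical names, and a real caller such as the annotator bounds retries with a timer rather than recursing indefinitely):

```
import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, Future }

// Hypothetical sketch, not part of this patch: upsert one PodCost entry
// using the read-then-conditional-update protocol described above.
def upsertPodCost(api: KubernetesApi, crName: String, entry: PodCost)(
    implicit ec: ExecutionContext): Future[PodCostResource] = {

  def attempt(current: PodCostResource): Future[PodCostResource] = {
    // replace any previous entry for this pod and try a conditional update
    val newPods: immutable.Seq[PodCost] =
      current.pods.filterNot(_.podName == entry.podName) :+ entry
    api.updatePodCostResource(crName, current.version, newPods).flatMap {
      case Right(updated) => Future.successful(updated) // success, remember updated.version
      case Left(conflict) => attempt(conflict)          // stale version: retry with the one read back
    }
  }

  // must read (or create) first to obtain a resource version
  api.readOrCreatePodCostResource(crName).flatMap(attempt)
}
```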
diff --git a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApiImpl.scala b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApiImpl.scala
new file mode 100644
index 000000000..adc2e75c4
--- /dev/null
+++ b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesApiImpl.scala
@@ -0,0 +1,314 @@
+/*
+ * Copyright (C) 2017-2023 Lightbend Inc.
+ */
+
+package akka.rollingupdate.kubernetes
+
+import scala.collection.immutable
+import scala.concurrent.Future
+
+import akka.Done
+import akka.actor.ActorSystem
+import akka.annotation.InternalApi
+import akka.event.Logging
+import akka.http.scaladsl.Http
+import akka.http.scaladsl.HttpsConnectionContext
+import akka.http.scaladsl.marshalling.Marshal
+import akka.http.scaladsl.model.HttpEntity
+import akka.http.scaladsl.model.HttpMethods.PATCH
+import akka.http.scaladsl.model.HttpRequest
+import akka.http.scaladsl.model.MediaTypes
+import akka.http.scaladsl.model.StatusCodes.ClientError
+import akka.http.scaladsl.model.Uri
+import akka.http.scaladsl.model._
+import akka.http.scaladsl.model.headers.Authorization
+import akka.http.scaladsl.model.headers.OAuth2BearerToken
+import akka.http.scaladsl.unmarshalling.Unmarshal
+import akka.pattern.after
+import akka.util.ByteString
+
+/**
+ * INTERNAL API
+ */
+@InternalApi private[akka] class KubernetesApiImpl(
+    system: ActorSystem,
+    settings: KubernetesSettings,
+    override val namespace: String,
+    apiToken: String,
+    clientHttpsConnectionContext: Option[HttpsConnectionContext])
+    extends KubernetesApi
+    with KubernetesJsonSupport {
+
+  import system.dispatcher
+
+  private implicit val sys: ActorSystem = system
+  private val log = Logging(system, classOf[KubernetesApiImpl])
+  private val http = Http()(system)
+
+  private val scheme = if (settings.secure) "https" else "http"
+  private lazy val headers = if (settings.secure) immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
+
+  log.debug("kubernetes access namespace: {}. Secure: {}", namespace, settings.secure)
+
+  override def updatePodDeletionCostAnnotation(podName: String, cost: Int): Future[Done] = {
+    val path = Uri.Path.Empty / "api" / "v1" / "namespaces" / namespace / "pods" / podName
+    val scheme = if (settings.secure) "https" else "http"
+    val uri = Uri.from(scheme, host = settings.apiServiceHost, port = settings.apiServicePort).withPath(path)
+    val headers = if (settings.secure) immutable.Seq(Authorization(OAuth2BearerToken(apiToken))) else Nil
+
+    val httpRequest = HttpRequest(
+      method = PATCH,
+      uri = uri,
+      headers = headers,
+      entity = HttpEntity(
+        MediaTypes.`application/merge-patch+json`,
+        ByteString(
+          s"""{"metadata": {"annotations": {"controller.kubernetes.io/pod-deletion-cost": "$cost" }}}"""
+        ))
+    )
+    val httpResponse = makeRequest(
+      httpRequest,
+      s"Timed out updating pod-deletion-cost annotation for pod: [$podName] with cost: [$cost]. Namespace: [$namespace]")
+    httpResponse.map {
+      case HttpResponse(status, _, e, _) if status.isSuccess() =>
+        e.discardBytes()
+        Done
+      case HttpResponse(s @ ClientError(_), _, e, _) =>
+        e.discardBytes()
+        throw new PodCostClientException(s.toString())
+      case HttpResponse(status, _, e, _) =>
+        e.discardBytes()
+        throw new PodCostException(s"Request failed with status=$status")
+    }
+  }
+
+  /*
+  PATH: to get all: /apis/akka.io/v1/namespaces/<namespace>/podcosts
+  PATH: to get a specific one: /apis/akka.io/v1/namespaces/<namespace>/podcosts/<podcost-name>
+  curl -v -X POST localhost:8080/apis/akka.io/v1/namespaces/<namespace>/podcosts/ -H "Content-Type: application/yaml" --data-binary "@pod-cost-example.yml"
+
+  responds with either:
+  409 Conflict Already Exists
+
+  OR
+
+  201 Created if it works
+  */
+  override def readOrCreatePodCostResource(crName: String): Future[PodCostResource] = {
+    val maxTries = 5
+
+    def loop(tries: Int = 0): Future[PodCostResource] = {
+      log.debug("Trying to create PodCost {}", tries)
+      for {
+        oldResource <- getPodCostResource(crName)
+        lr <- oldResource match {
+          case Some(found) =>
+            log.debug("{} already exists. Returning {}", crName, found)
+            Future.successful(found)
+          case None =>
+            log.info("PodCost {} does not exist, creating", crName)
+            createPodCostResource(crName).flatMap {
+              case Some(created) => Future.successful(created)
+              case None =>
+                if (tries < maxTries) loop(tries + 1)
+                else Future.failed(new PodCostException(s"Unable to create or read PodCost after $maxTries tries"))
+            }
+        }
+      } yield lr
+    }
+
+    loop()
+  }
+
+  /*
+curl -v -X PUT localhost:8080/apis/akka.io/v1/namespaces/<namespace>/podcosts/<podcost-name> --data-binary "@pod-cost-example.yml" -H "Content-Type: application/yaml"
+PUTs must contain resourceVersions. Response:
+409: Resource version is out of date
+200 if it is updated
+   */
+  /**
+   * Update the named resource.
+   *
+   * Must call [[readOrCreatePodCostResource]] first to get a resource version.
+   *
+   * Can return one of three things:
+   *   - Future.Failure, e.g. timed out waiting for k8s api server to respond
+   *   - Future.success[Left(resource)]: the update failed due to version not matching current in the k8s api server.
+   *     In this case the current resource is returned so the version can be used for subsequent calls
+   *   - Future.success[Right(resource)]: Returns the PodCostResource that contains the new version.
+   *     The new version should be used for any subsequent calls
+   */
+  override def updatePodCostResource(
+      crName: String,
+      version: String,
+      pods: immutable.Seq[PodCost]): Future[Either[PodCostResource, PodCostResource]] = {
+    val cr = PodCostCustomResource(Metadata(crName, Some(version)), Spec(pods))
+    for {
+      entity <- Marshal(cr).to[RequestEntity]
+      response <- {
+        log.debug("updating {} to {}", crName, cr)
+        makeRequest(
+          requestForPath(pathForPodCostResource(crName), method = HttpMethods.PUT, entity),
+          s"Timed out updating PodCost [$crName]. 
It is not known if the update happened" + ) + } + result <- response.status match { + case StatusCodes.OK => + Unmarshal(response.entity) + .to[PodCostCustomResource] + .map(updatedCr => { + log.debug("CR after update: {}", updatedCr) + Right(toPodCostResource(updatedCr)) + }) + case StatusCodes.Conflict => + getPodCostResource(crName).map { + case None => + throw new PodCostException(s"GET after PUT conflict did not return a PodCost [$crName]") + case Some(cr) => + log.debug("PodCostResource read after conflict: {}", cr) + Left(cr) + } + case StatusCodes.Unauthorized => + handleUnauthorized(response) + case unexpected => + Unmarshal(response.entity) + .to[String] + .map(body => + throw new PodCostException( + s"PUT for PodCost [$crName] returned unexpected status code $unexpected. Body: $body")) + } + } yield result + } + + private[akka] def removePodCostResource(crName: String): Future[Done] = { + for { + response <- makeRequest( + requestForPath(pathForPodCostResource(crName), HttpMethods.DELETE), + s"Timed out removing PodCost [$crName]. It is not known if the remove happened") + + result <- response.status match { + case StatusCodes.OK => + log.debug("PodCost deleted [{}]", crName) + response.discardEntityBytes() + Future.successful(Done) + case StatusCodes.NotFound => + log.debug("PodCost already deleted [{}]", crName) + response.discardEntityBytes() + Future.successful(Done) // already deleted + case StatusCodes.Unauthorized => + handleUnauthorized(response) + case unexpected => + Unmarshal(response.entity) + .to[String] + .map(body => + throw new PodCostException( + s"Unexpected status code when deleting PodCost. Status: $unexpected. Body: $body")) + } + } yield result + } + + private def getPodCostResource(crName: String): Future[Option[PodCostResource]] = { + val fResponse = makeRequest(requestForPath(pathForPodCostResource(crName)), s"Timed out reading PodCost [$crName]") + for { + response <- fResponse + entity <- response.entity.toStrict(settings.bodyReadTimeout) + lr <- response.status match { + case StatusCodes.OK => + // it exists, parse it + log.debug("Resource [{}] exists: {}", crName, entity) + Unmarshal(entity).to[PodCostCustomResource].map(cr => Some(toPodCostResource(cr))) + case StatusCodes.NotFound => + response.discardEntityBytes() + log.debug("Resource [{}] does not exist", crName) + Future.successful(None) + case StatusCodes.Unauthorized => + handleUnauthorized(response) + case unexpected => + Unmarshal(response.entity) + .to[String] + .map(body => + throw new PodCostException( + s"Unexpected response from API server when retrieving PodCost StatusCode: $unexpected. Body: $body")) + } + } yield lr + } + + private def handleUnauthorized(response: HttpResponse) = { + Unmarshal(response.entity) + .to[String] + .map(body => + throw new PodCostException( + "Unauthorized to communicate with Kubernetes API server. See " + + "https://doc.akka.io/docs/akka-management/current/rolling-updates.html#role-based-access-control " + + s"for setting up access control. 
Body: $body")) + } + + private def pathForPodCostResource(crName: String): Uri.Path = + Uri.Path.Empty / "apis" / "akka.io" / "v1" / "namespaces" / namespace / "podcosts" / crName + .replaceAll("[^\\d\\w\\-\\.]", "") + .toLowerCase + + private def requestForPath( + path: Uri.Path, + method: HttpMethod = HttpMethods.GET, + entity: RequestEntity = HttpEntity.Empty): HttpRequest = { + val uri = Uri.from(scheme = scheme, host = settings.apiServiceHost, port = settings.apiServicePort).withPath(path) + HttpRequest(uri = uri, headers = headers, method = method, entity = entity) + } + + private def makeRequest(request: HttpRequest, timeoutMsg: String): Future[HttpResponse] = { + val response = { + clientHttpsConnectionContext match { + case None => http.singleRequest(request) + case Some(httpsConnectionContext) => http.singleRequest(request, httpsConnectionContext) + } + } + + // make sure we always consume response body (in case of timeout) + val strictResponse = response.flatMap(_.toStrict(settings.bodyReadTimeout)) + + val timeout = after(settings.apiServiceRequestTimeout, using = system.scheduler)( + Future.failed(new PodCostTimeoutException(s"$timeoutMsg. Is the API server up?"))) + + Future.firstCompletedOf(Seq(strictResponse, timeout)) + } + + private def toPodCostResource(cr: PodCostCustomResource) = { + log.debug("Converting {}", cr) + require( + cr.metadata.resourceVersion.isDefined, + s"PodCostCustomResource returned from Kubernetes without a resourceVersion: $cr") + PodCostResource(cr.metadata.resourceVersion.get, cr.spec.pods) + } + + private def createPodCostResource(crName: String): Future[Option[PodCostResource]] = { + val cr = PodCostCustomResource(Metadata(crName, None), Spec(pods = Vector.empty)) + for { + entity <- Marshal(cr).to[RequestEntity] + response <- makeRequest( + requestForPath(pathForPodCostResource(crName), HttpMethods.POST, entity = entity), + s"Timed out creating PodCost $crName") + responseEntity <- response.entity.toStrict(settings.bodyReadTimeout) + resource <- response.status match { + case StatusCodes.Created => + log.debug("PodCost resource created") + Unmarshal(responseEntity).to[PodCostCustomResource].map(cr => Some(toPodCostResource(cr))) + case StatusCodes.Conflict => + log.debug("creation of PodCost resource failed as already exists. Will attempt to read again") + entity.discardBytes() + // someone else has created it + Future.successful(None) + case StatusCodes.Unauthorized => + handleUnauthorized(response) + case unexpected => + responseEntity + .toStrict(settings.bodyReadTimeout) + .flatMap(e => Unmarshal(e).to[String]) + .map(body => + throw new PodCostException( + s"Unexpected response from API server when creating PodCost StatusCode: $unexpected. Body: $body")) + } + } yield resource + } + +} diff --git a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesJsonSupport.scala b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesJsonSupport.scala new file mode 100644 index 000000000..0b54e0505 --- /dev/null +++ b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesJsonSupport.scala @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2017-2023 Lightbend Inc. 
+ */ + +package akka.rollingupdate.kubernetes + +import scala.collection.immutable + +import akka.annotation.InternalApi +import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +import spray.json.DefaultJsonProtocol +import spray.json.JsonFormat +import spray.json.RootJsonFormat + +/** + * INTERNAL API + */ +@InternalApi +case class PodCostCustomResource( + metadata: Metadata, + spec: Spec, + kind: String = "PodCost", + apiVersion: String = "akka.io/v1") + +/** + * INTERNAL API + */ +@InternalApi +case class Metadata(name: String, resourceVersion: Option[String]) + +/** + * INTERNAL API + */ +@InternalApi +case class Spec(pods: immutable.Seq[PodCost]) + +/** + * INTERNAL API + */ +@InternalApi +trait KubernetesJsonSupport extends SprayJsonSupport with DefaultJsonProtocol { + implicit val metadataFormat: JsonFormat[Metadata] = jsonFormat2(Metadata.apply) + implicit val podCostFormat: JsonFormat[PodCost] = jsonFormat5(PodCost.apply) + implicit val specFormat: JsonFormat[Spec] = jsonFormat1(Spec.apply) + implicit val podCostCustomResourceFormat: RootJsonFormat[PodCostCustomResource] = jsonFormat4( + PodCostCustomResource.apply) +} diff --git a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesSettings.scala b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesSettings.scala index 4f86ac056..f00fdfb2c 100644 --- a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesSettings.scala +++ b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/KubernetesSettings.scala @@ -4,6 +4,8 @@ package akka.rollingupdate.kubernetes +import scala.concurrent.duration._ +import akka.util.JavaDurationConverters._ import akka.annotation.InternalApi import com.typesafe.config.Config @@ -24,6 +26,17 @@ private[kubernetes] object KubernetesSettings { } def apply(config: Config): KubernetesSettings = { + val crName = config.getString("custom-resource.cr-name") match { + case "" => None + case name => Some(name) + } + + val customResourceSettings = new CustomResourceSettings( + enabled = config.getBoolean("custom-resource.enabled"), + crName = crName, + cleanupAfter = config.getDuration("custom-resource.cleanup-after").asScala + ) + new KubernetesSettings( config.getString("api-ca-path"), config.getString("api-token-path"), @@ -32,7 +45,9 @@ private[kubernetes] object KubernetesSettings { config.optDefinedValue("namespace"), config.getString("namespace-path"), config.getString("pod-name"), - config.getBoolean("secure-api-server") + config.getBoolean("secure-api-server"), + config.getDuration("api-service-request-timeout").asScala, + customResourceSettings ) } } @@ -49,4 +64,18 @@ private[kubernetes] class KubernetesSettings( val namespace: Option[String], val namespacePath: String, val podName: String, - val secure: Boolean) + val secure: Boolean, + val apiServiceRequestTimeout: FiniteDuration, + val customResourceSettings: CustomResourceSettings, + val bodyReadTimeout: FiniteDuration = 1.second +) + +/** + * INTERNAL API + */ +@InternalApi +private[kubernetes] class CustomResourceSettings( + val enabled: Boolean, + val crName: Option[String], + val cleanupAfter: FiniteDuration +) diff --git a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/PodDeletionCost.scala b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/PodDeletionCost.scala index f71775bb0..5825a62da 100644 --- 
a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/PodDeletionCost.scala
+++ b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/PodDeletionCost.scala
@@ -4,26 +4,35 @@
 
 package akka.rollingupdate.kubernetes
 
+import java.nio.file.Files
+import java.nio.file.Paths
+import java.security.KeyStore
+import java.security.SecureRandom
+import java.util.concurrent.atomic.AtomicReference
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.util.control.NonFatal
+
 import akka.actor.ActorSystem
 import akka.actor.ClassicActorSystemProvider
 import akka.actor.ExtendedActorSystem
 import akka.actor.Extension
 import akka.actor.ExtensionId
 import akka.actor.ExtensionIdProvider
-import akka.actor.Props
 import akka.annotation.InternalApi
 import akka.dispatch.Dispatchers.DefaultBlockingDispatcherId
 import akka.event.Logging
+import akka.http.scaladsl.ConnectionContext
+import akka.http.scaladsl.HttpsConnectionContext
+import akka.pki.kubernetes.PemManagersProvider
 import akka.rollingupdate.kubernetes.PodDeletionCost.Internal.BootstrapStep
 import akka.rollingupdate.kubernetes.PodDeletionCost.Internal.Initializing
 import akka.rollingupdate.kubernetes.PodDeletionCost.Internal.NotRunning
-
-import java.nio.file.Files
-import java.nio.file.Paths
-import java.util.concurrent.atomic.AtomicReference
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.util.control.NonFatal
+import javax.net.ssl.KeyManager
+import javax.net.ssl.KeyManagerFactory
+import javax.net.ssl.SSLContext
+import javax.net.ssl.TrustManager
 
 final class PodDeletionCost(implicit system: ExtendedActorSystem) extends Extension {
 
@@ -43,8 +52,6 @@ final class PodDeletionCost(implicit system: ExtendedActorSystem) extends Extens
           s"Be sure to provide the pod name with `$configPath.pod-name` " +
           "or by setting ENV variable `KUBERNETES_POD_NAME`.")
     } else if (startStep.compareAndSet(NotRunning, Initializing)) {
-      log.debug("Starting PodDeletionCost for podName={} with settings={}", k8sSettings.podName, costSettings)
-
       implicit val blockingDispatcher: ExecutionContext = system.dispatchers.lookup(DefaultBlockingDispatcherId)
       val props = for {
         apiToken: String <- Future { readConfigVarFromFilesystem(k8sSettings.apiTokenPath, "api-token").getOrElse("") }
@@ -53,14 +60,56 @@ final class PodDeletionCost(implicit system: ExtendedActorSystem) extends Extens
             .orElse(readConfigVarFromFilesystem(k8sSettings.namespacePath, "namespace"))
             .getOrElse("default")
         }
-      } yield Props(classOf[PodDeletionCostAnnotator], k8sSettings, apiToken, podNamespace, costSettings)
+        httpsContext <- Future(clientHttpsConnectionContext())
+      } yield {
+        val kubernetesApi = new KubernetesApiImpl(system, k8sSettings, podNamespace, apiToken, httpsContext)
+        val crName =
+          if (k8sSettings.customResourceSettings.enabled) {
+            val name =
+              k8sSettings.customResourceSettings.crName.getOrElse(KubernetesApi.makeDNS1039Compatible(system.name))
+            log.info(
+              "Starting PodDeletionCost for podName [{}], [{}] oldest will be written to CR [{}].",
+              k8sSettings.podName,
+              costSettings.annotatedPodsNr,
+              name)
+            Some(name)
+          } else {
+            log.info(
+              "Starting PodDeletionCost for podName [{}], [{}] oldest will be annotated.",
+              k8sSettings.podName,
+              costSettings.annotatedPodsNr)
+            None
+          }
+        PodDeletionCostAnnotator.props(k8sSettings, costSettings, kubernetesApi, crName)
+      }
+
       props.foreach(system.systemActorOf(_, "podDeletionCostAnnotator"))
     } else
-      log.warning("PodDeletionCost extension already 
initiated, yet start() method was called again. Ignoring.") } /** - * This uses blocking IO, and so should only be used to read configuration at startup. + * This uses blocking IO, and so should only be used at startup from blocking dispatcher. + */ + private def clientHttpsConnectionContext(): Option[HttpsConnectionContext] = { + if (k8sSettings.secure) { + val certificates = PemManagersProvider.loadCertificates(k8sSettings.apiCaPath) + val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) + val keyStore = KeyStore.getInstance("PKCS12") + keyStore.load(null) + factory.init(keyStore, Array.empty) + val km: Array[KeyManager] = factory.getKeyManagers + val tm: Array[TrustManager] = + PemManagersProvider.buildTrustManagers(certificates) + val random: SecureRandom = new SecureRandom + val sslContext = SSLContext.getInstance("TLSv1.2") + sslContext.init(km, tm, random) + Some(ConnectionContext.httpsClient(sslContext)) + } else + None + } + + /** + * This uses blocking IO, and so should only be used to read configuration at startup from blocking dispatcher. */ private def readConfigVarFromFilesystem(path: String, name: String): Option[String] = { val file = Paths.get(path) diff --git a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala index 3c5207d96..105493db7 100644 --- a/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala +++ b/rolling-update-kubernetes/src/main/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotator.scala @@ -4,85 +4,60 @@ package akka.rollingupdate.kubernetes +import java.util.concurrent.ThreadLocalRandom +import java.util.concurrent.TimeUnit + +import scala.collection.immutable +import scala.collection.immutable.SortedSet +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.duration.DurationLong +import scala.concurrent.duration.FiniteDuration +import scala.util.control.NonFatal + +import akka.Done import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.ActorSystem +import akka.actor.Props +import akka.actor.Status import akka.actor.Timers import akka.annotation.InternalApi import akka.cluster.Cluster import akka.cluster.ClusterEvent import akka.cluster.Member +import akka.cluster.UniqueAddress import akka.event.Logging.InfoLevel import akka.event.Logging.WarningLevel -import akka.http.scaladsl.ConnectionContext -import akka.http.scaladsl.Http -import akka.http.scaladsl.HttpsConnectionContext -import akka.http.scaladsl.model.StatusCodes.ClientError -import akka.http.scaladsl.model._ import akka.pattern.pipe -import akka.pki.kubernetes.PemManagersProvider import akka.rollingupdate.OlderCostsMore -import akka.rollingupdate.kubernetes.PodDeletionCostAnnotator.GiveUp -import akka.rollingupdate.kubernetes.PodDeletionCostAnnotator.PodAnnotated -import akka.rollingupdate.kubernetes.PodDeletionCostAnnotator.RetryAnnotate -import akka.rollingupdate.kubernetes.PodDeletionCostAnnotator.RetryTimerId -import akka.rollingupdate.kubernetes.PodDeletionCostAnnotator.ScheduleRetry -import akka.rollingupdate.kubernetes.PodDeletionCostAnnotator.toResult import com.typesafe.config.Config -import java.security.KeyStore -import java.security.SecureRandom -import java.util.concurrent.TimeUnit -import javax.net.ssl.KeyManager -import javax.net.ssl.KeyManagerFactory -import javax.net.ssl.SSLContext 
-import javax.net.ssl.TrustManager -import scala.collection.immutable -import scala.collection.immutable.SortedSet -import scala.concurrent.ExecutionContextExecutor -import scala.concurrent.Future -import scala.concurrent.duration.DurationLong -import scala.concurrent.duration.FiniteDuration -import scala.util.control.NonFatal - /** * INTERNAL API * - * Actor responsible to annotate the hosting pod with the pod-deletion-cost. + * Actor responsible to annotate the hosting pod with the pod-deletion-cost or + * update the PodCost CR depending on configuration. * It will automatically retry upon a fixed-configurable delay if the annotation fails. */ @InternalApi private[kubernetes] final class PodDeletionCostAnnotator( settings: KubernetesSettings, - apiToken: String, - podNamespace: String, - costSettings: PodDeletionCostSettings) + costSettings: PodDeletionCostSettings, + kubernetesApi: KubernetesApi, + crName: Option[String]) extends Actor with ActorLogging with Timers { + import PodDeletionCostAnnotator._ + + private val podName = settings.podName + private val resourceLogDescription = if (crName.isDefined) "PodCost CR" else "pod-deletion-cost annotation" + private val cluster = Cluster(context.system) - private val http = Http()(context.system) Cluster(context.system).subscribe(context.self, classOf[ClusterEvent.MemberUp], classOf[ClusterEvent.MemberRemoved]) - private lazy val sslContext = { - val certificates = PemManagersProvider.loadCertificates(settings.apiCaPath) - val factory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm) - val keyStore = KeyStore.getInstance("PKCS12") - keyStore.load(null) - factory.init(keyStore, Array.empty) - val km: Array[KeyManager] = factory.getKeyManagers - val tm: Array[TrustManager] = - PemManagersProvider.buildTrustManagers(certificates) - val random: SecureRandom = new SecureRandom - val sslContext = SSLContext.getInstance("TLSv1.2") - sslContext.init(km, tm, random) - sslContext - } - private val clientSslContext: Option[HttpsConnectionContext] = - if (settings.secure) Some(ConnectionContext.httpsClient(sslContext)) else None - - implicit val dispatcher: ExecutionContextExecutor = context.system.dispatcher - def receive = idle(0, SortedSet.empty(Member.ageOrdering), 0) + def receive: Receive = idle(0, SortedSet.empty(Member.ageOrdering), 0) private def idle(deletionCost: Int, membersByAgeDesc: SortedSet[Member], retryNr: Int): Receive = { case cs @ ClusterEvent.CurrentClusterState(members, _, _, _, _) => @@ -98,8 +73,8 @@ import scala.util.control.NonFatal updateIfNewCost(deletionCost, membersByAgeDesc - m, retryNr) case PodAnnotated => - log.debug("Annotation updated successfully to {}", deletionCost) - // cancelling an eventual retry in case the annotation succeeded in the meantime + log.debug("{} updated successfully to [{}]", resourceLogDescription, deletionCost) + // cancelling an eventual retry in case the operation succeeded in the meantime timers.cancel(RetryTimerId) context.become(idle(deletionCost, membersByAgeDesc, 0)) @@ -107,17 +82,31 @@ import scala.util.control.NonFatal val ll = if (retryNr < 3) InfoLevel else WarningLevel log.log( ll, - s"Failed to update annotation: [$ex]. Scheduled retry with fixed delay of ${costSettings.retryDelay}, retry number $retryNr.") + s"Failed to update $resourceLogDescription: [$ex]. 
Scheduled retry with fixed delay of ${costSettings.retryDelay}, retry number $retryNr.") - timers.startSingleTimer(RetryTimerId, RetryAnnotate, costSettings.retryDelay) + val retryDelay = + if (crName.isDefined) + // add some random delay to minimize risk of conflicts + costSettings.retryDelay + (costSettings.retryDelay * ThreadLocalRandom.current().nextDouble(0.1)) + .asInstanceOf[FiniteDuration] + else + costSettings.retryDelay + timers.startSingleTimer(RetryTimerId, RetryAnnotate, retryDelay) context.become(underRetryBackoff(membersByAgeDesc, retryNr)) case GiveUp(er: String) => log.error( - "There was a client error when trying to set pod-deletion-cost annotation. " + + "There was a client error when trying to set {}. " + "Not retrying, check configuration. Error: {}", + resourceLogDescription, er) + case Status.Failure(exc) => + throw new IllegalStateException( + "Unexpected failure, Future failure should have been recovered " + + "to message before pipeTo self. This is a bug.", + exc) + case msg => log.debug("Ignoring message {}", msg) } @@ -133,6 +122,12 @@ import scala.util.control.NonFatal case RetryAnnotate => updateIfNewCost(Int.MinValue, membersByAgeDesc, retryNr + 1) + case Status.Failure(exc) => + throw new IllegalStateException( + "Unexpected failure, Future failure should have been recovered " + + "to message before pipeTo self. This is a bug.", + exc) + case msg => log.debug("Under retry backoff, ignoring message {}", msg) } @@ -149,18 +144,27 @@ import scala.util.control.NonFatal if (newCost != existingCost) { log.info( - "Updating pod-deletion-cost annotation for pod: [{}] with cost: [{}]. Namespace: [{}]", - settings.podName, + "Updating {} for pod: [{}] with cost: [{}]. Namespace: [{}]", + resourceLogDescription, + podName, newCost, - podNamespace + kubernetesApi.namespace ) - val request = ApiRequests.podDeletionCost(settings, apiToken, podNamespace, newCost) - val response = - clientSslContext.map(http.singleRequest(request, _)).getOrElse(http.singleRequest(request)) - toResult(response)(context.system).pipeTo(self) + implicit val dispatcher: ExecutionContext = context.system.dispatcher + updatePodCost( + kubernetesApi, + crName, + podName, + newCost, + cluster.selfUniqueAddress, + membersByAgeDesc, + settings.customResourceSettings.cleanupAfter)(context.system).pipeTo(self) + context.become(idle(newCost, membersByAgeDesc, retryNr)) - } else context.become(idle(existingCost, membersByAgeDesc, retryNr)) + } else { + context.become(idle(existingCost, membersByAgeDesc, retryNr)) + } } } @@ -197,22 +201,73 @@ import scala.util.control.NonFatal case class ScheduleRetry(cause: String) extends RequestResult case class GiveUp(cause: String) extends RequestResult - private def toResult(futResponse: Future[HttpResponse])(implicit system: ActorSystem): Future[RequestResult] = { + def props( + settings: KubernetesSettings, + costSettings: PodDeletionCostSettings, + kubernetesApi: KubernetesApi, + crName: Option[String] + ): Props = + Props(new PodDeletionCostAnnotator(settings, costSettings, kubernetesApi, crName)) + + private def updatePodCost( + kubernetesApi: KubernetesApi, + crNameOpt: Option[String], + podName: String, + newCost: Int, + selfUniqueAddress: UniqueAddress, + membersByAgeDesc: immutable.SortedSet[Member], + cleanupAfter: FiniteDuration)(implicit system: ActorSystem): Future[RequestResult] = { + import system.dispatcher + crNameOpt match { + case Some(crName) => + val response = + kubernetesApi.readOrCreatePodCostResource(crName).flatMap { cr => + val now = 
System.currentTimeMillis() + val newPodCost = + PodCost(podName, newCost, selfUniqueAddress.address.toString, selfUniqueAddress.longUid, now) + val newPods = cr.pods.filterNot { podCost => + // remove entry that is to be added for this podName + podCost.podName == podName || + // remove entries that don't exist in the cluster membership any more + (podCost.uniqueAddress.address.system == selfUniqueAddress.address.system && // only same cluster + now - podCost.time > cleanupAfter.toMillis && // in case new member hasn't been seen yet + !membersByAgeDesc.exists(_.uniqueAddress == podCost.uniqueAddress) // removed, not in cluster membership + ) + } :+ newPodCost + kubernetesApi.updatePodCostResource(crName, cr.version, newPods) + } + updatePodCostResourceResult(response) + case None => + val response = kubernetesApi.updatePodDeletionCostAnnotation(podName, newCost) + updatePodDeletionCostAnnotationResult(response) + } + } + + private def updatePodDeletionCostAnnotationResult(futResponse: Future[Done])( + implicit system: ActorSystem): Future[RequestResult] = { import system.dispatcher futResponse .map { - case HttpResponse(status, _, e, _) if status.isSuccess() => - e.discardBytes() - PodAnnotated - case HttpResponse(s @ ClientError(_), _, e, _) => - e.discardBytes() - GiveUp(s.toString()) - case HttpResponse(status, _, e, _) => - e.discardBytes() - ScheduleRetry(s"Request failed with status=$status") + case Done => PodAnnotated } .recover { - case NonFatal(e) => ScheduleRetry(e.getMessage) + case e: PodCostClientException => GiveUp(e.getMessage) + case NonFatal(e) => ScheduleRetry(e.getMessage) } } + + private def updatePodCostResourceResult(futResponse: Future[Either[PodCostResource, PodCostResource]])( + implicit system: ActorSystem): Future[RequestResult] = { + import system.dispatcher + futResponse + .map { + case Right(_) => PodAnnotated + case Left(_) => ScheduleRetry("Request failed with conflict") + } + .recover { + case e: PodCostClientException => GiveUp(e.getMessage) + case NonFatal(e) => ScheduleRetry(e.getMessage) + } + + } } diff --git a/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala b/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala new file mode 100644 index 000000000..318b7ceb3 --- /dev/null +++ b/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotatorCrSpec.scala @@ -0,0 +1,307 @@ +/* + * Copyright (C) 2017-2023 Lightbend Inc. 
+ */ + +package akka.rollingupdate.kubernetes + +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.immutable +import scala.concurrent.Future +import scala.concurrent.duration._ + +import akka.Done +import akka.actor.ActorSystem +import akka.actor.Address +import akka.cluster.Cluster +import akka.cluster.ClusterEvent.MemberUp +import akka.cluster.Member +import akka.cluster.MemberStatus +import akka.cluster.UniqueAddress +import akka.testkit.EventFilter +import akka.testkit.ImplicitSender +import akka.testkit.TestKit +import akka.testkit.TestProbe +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfterAll +import org.scalatest.BeforeAndAfterEach +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.Millis +import org.scalatest.time.Seconds +import org.scalatest.time.Span +import org.scalatest.wordspec.AnyWordSpecLike + +object PodDeletionCostAnnotatorCrSpec { + val config = ConfigFactory.parseString(""" + akka.loggers = ["akka.testkit.TestEventListener"] + akka.actor.provider = cluster + akka.rollingupdate.kubernetes.pod-deletion-cost.retry-delay = 1s + + akka.remote.artery.canonical.port = 0 + akka.remote.artery.canonical.hostname = 127.0.0.1 + + akka.cluster.jmx.multi-mbeans-in-same-jvm = on + akka.coordinated-shutdown.terminate-actor-system = off + akka.coordinated-shutdown.run-by-actor-system-terminate = off + akka.test.filter-leeway = 10s + """) + + private[akka] trait TestCallCount { + val callCount = new AtomicInteger() + + def getCallCount(): Int = callCount.get() + } + + private[akka] class TestKubernetesApi extends KubernetesApi { + private var version = 1 + private var podCosts = Vector.empty[PodCost] + + override def namespace: String = "namespace-test" + + override def updatePodDeletionCostAnnotation(podName: String, cost: Int): Future[Done] = + Future.successful(Done) + + override def readOrCreatePodCostResource(crName: String): Future[PodCostResource] = this.synchronized { + Future.successful(PodCostResource(version.toString, podCosts)) + } + + override def updatePodCostResource( + crName: String, + v: String, + pods: immutable.Seq[PodCost]): Future[Either[PodCostResource, PodCostResource]] = this.synchronized { + + podCosts = pods.toVector + version = v.toInt + 1 + + Future.successful(Right(PodCostResource(version.toString, podCosts))) + } + + def getPodCosts(): Vector[PodCost] = this.synchronized { + podCosts + } + } +} + +class PodDeletionCostAnnotatorCrSpec + extends TestKit( + ActorSystem( + "PodDeletionCostAnnotatorCrSpec", + PodDeletionCostAnnotatorCrSpec.config + )) + with ImplicitSender + with AnyWordSpecLike + with Matchers + with BeforeAndAfterAll + with BeforeAndAfterEach + with Eventually { + import PodDeletionCostAnnotatorCrSpec._ + + private val namespace = "namespace-test" + private val podName1 = "pod-test-1" + private val podName2 = "pod-test-2" + private lazy val system2 = ActorSystem(system.name, system.settings.config) + + private def settings(podName: String) = { + new KubernetesSettings( + apiCaPath = "", + apiTokenPath = "", + apiServiceHost = "localhost", + apiServicePort = 0, + namespace = Some(namespace), + namespacePath = "", + podName = podName, + secure = false, + apiServiceRequestTimeout = 2.seconds, + customResourceSettings = new CustomResourceSettings(enabled = false, crName = None, 60.seconds) + ) + } + + private def annotatorProps(pod: String, kubernetesApi: KubernetesApi) = + PodDeletionCostAnnotator.props( + settings(pod), + 
PodDeletionCostSettings(system.settings.config.getConfig("akka.rollingupdate.kubernetes")), + kubernetesApi, + crName = Some("poddeletioncostannotatorcrspec") + ) + + override implicit val patienceConfig: PatienceConfig = + PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis)) + + override protected def afterAll(): Unit = { + super.shutdown() + TestKit.shutdownActorSystem(system2) + } + + "The pod-deletion-cost annotator, when under normal behavior" should { + + "have a single node cluster running first" in { + val probe = TestProbe() + Cluster(system).join(Cluster(system).selfMember.address) + probe.awaitAssert({ + Cluster(system).selfMember.status == MemberStatus.Up + }, 3.seconds) + } + + "correctly add the cluster node to the PodCost resource" in { + val kubernetesApi = new TestKubernetesApi + expectLogInfo(pattern = ".*Updating PodCost CR.*") { + system.actorOf(annotatorProps(podName1, kubernetesApi)) + } + eventually { + val podCosts = kubernetesApi.getPodCosts() + podCosts.size shouldBe 1 + podCosts.head.podName shouldBe podName1 + podCosts.head.cost shouldBe 10000 + } + } + + "give up when failing with non-transient error" in { + // e.g. return code 404 + val kubernetesApi = new TestKubernetesApi { + override def readOrCreatePodCostResource(crName: String): Future[PodCostResource] = { + Future.failed(new PodCostClientException("test 404")) + } + } + + expectLogError(pattern = ".*Not retrying, check configuration.*") { + system.actorOf(annotatorProps(podName1, kubernetesApi)) + } + } + + "retry when failing with transient error" in { + val kubernetesApi = new TestKubernetesApi with TestCallCount { + + override def updatePodCostResource( + crName: String, + v: String, + pods: immutable.Seq[PodCost]): Future[Either[PodCostResource, PodCostResource]] = { + // first call fails + if (callCount.incrementAndGet() == 1) + Future.failed(new PodCostException("test 500")) + else + super.updatePodCostResource(crName, v, pods) + } + } + + system.actorOf(annotatorProps(podName1, kubernetesApi)) + eventually { + kubernetesApi.getCallCount() shouldBe 2 + val podCosts = kubernetesApi.getPodCosts() + podCosts.size shouldBe 1 + podCosts.head.podName shouldBe podName1 + podCosts.head.cost shouldBe 10000 + } + } + + "retry when conflicting update" in { + val kubernetesApi = new TestKubernetesApi with TestCallCount { + + override def updatePodCostResource( + crName: String, + v: String, + pods: immutable.Seq[PodCost]): Future[Either[PodCostResource, PodCostResource]] = this.synchronized { + // conflict for first call + if (callCount.incrementAndGet() == 1) + readOrCreatePodCostResource(crName).map { existing => + Left(existing) + }(system.dispatcher) + else + super.updatePodCostResource(crName, v, pods) + } + } + + system.actorOf(annotatorProps(podName1, kubernetesApi)) + eventually { + kubernetesApi.getCallCount() shouldBe 2 + val podCosts = kubernetesApi.getPodCosts() + podCosts.size shouldBe 1 + podCosts.head.podName shouldBe podName1 + podCosts.head.cost shouldBe 10000 + } + } + + "annotate a second node correctly" in { + val kubernetesApi = new TestKubernetesApi + + val probe = TestProbe() + Cluster(system2).join(Cluster(system).selfMember.address) + probe.awaitAssert({ + Cluster(system2).selfMember.status == MemberStatus.Up + }, 3.seconds) + + system2.actorOf(annotatorProps(podName2, kubernetesApi)) + eventually { + val podCosts = kubernetesApi.getPodCosts().sortBy(_.cost) + podCosts.head.podName shouldBe podName2 + podCosts.head.cost shouldBe 9900 + } + } + + } + + "The 
pod-deletion-cost annotator, when under retry backoff" should { + + "have a single node cluster running first" in { + val probe = TestProbe() + Cluster(system).join(Cluster(system).selfMember.address) + probe.awaitAssert({ + Cluster(system).selfMember.status == MemberStatus.Up + }, 3.seconds) + } + + "not annotate until backoff delay expires" in { + val kubernetesApi = new TestKubernetesApi with TestCallCount { + + override def updatePodCostResource( + crName: String, + v: String, + pods: immutable.Seq[PodCost]): Future[Either[PodCostResource, PodCostResource]] = { + // first 3 call fails + if (callCount.incrementAndGet() <= 3) + Future.failed(new PodCostException("test 500")) + else + super.updatePodCostResource(crName, v, pods) + } + } + + val underTest = expectLogInfo(".*Failed to update PodCost CR:.*") { + system.actorOf(annotatorProps(podName1, kubernetesApi)) + } + + val dummyNewMember = + MemberUp( + Member( + UniqueAddress(Address("akka", system.name, Cluster(system).selfAddress.host.get, 2553), 2L), + Cluster(system).selfRoles, + Cluster(system).selfMember.appVersion + ).copyUp(upNumber = 2)) + underTest ! dummyNewMember + underTest ! dummyNewMember + underTest ! dummyNewMember + + // no other interactions should have occurred while on backoff regardless of updates to the cluster + kubernetesApi.getCallCount() shouldBe 1 + Thread.sleep(100) + kubernetesApi.getCallCount() shouldBe 1 + + eventually { + kubernetesApi.getCallCount() shouldBe 4 + val podCosts = kubernetesApi.getPodCosts() + podCosts.size shouldBe 1 // dummyNewMember is not added by pod1 + podCosts.head.podName shouldBe podName1 + podCosts.head.cost shouldBe 10000 + } + } + } + + def expectLogInfo[T](pattern: String = null)(block: => T): T = + EventFilter.info(pattern = pattern, occurrences = 1).intercept(block)(system) + + def expectLogError[T](pattern: String = null, occurrences: Int = 1)(block: => T): T = + EventFilter.error(pattern = pattern, occurrences = occurrences).intercept(block)(system) + + def expectLogWarning[T](pattern: String = null, occurrences: Int = 1)(block: => T): T = + EventFilter.warning(pattern = pattern, occurrences = occurrences).intercept(block)(system) + +} diff --git a/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala b/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala index 23a7a87e2..4d40b57a2 100644 --- a/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala +++ b/rolling-update-kubernetes/src/test/scala/akka/rollingupdate/kubernetes/PodDeletionCostAnnotatorSpec.scala @@ -4,20 +4,20 @@ package akka.rollingupdate.kubernetes +import scala.collection.JavaConverters._ +import scala.concurrent.duration._ + import akka.actor.ActorSystem import akka.actor.Address -import akka.actor.Props import akka.cluster.Cluster import akka.cluster.ClusterEvent.MemberUp import akka.cluster.Member import akka.cluster.MemberStatus -import akka.cluster.MemberStatus.Up import akka.cluster.UniqueAddress import akka.testkit.EventFilter import akka.testkit.ImplicitSender import akka.testkit.TestKit import akka.testkit.TestProbe -import akka.util.Version import com.github.tomakehurst.wiremock.WireMockServer import com.github.tomakehurst.wiremock.client.MappingBuilder import com.github.tomakehurst.wiremock.client.WireMock @@ -40,9 +40,6 @@ import org.scalatest.time.Seconds import org.scalatest.time.Span import org.scalatest.wordspec.AnyWordSpecLike -import 
scala.collection.JavaConverters._ -import scala.concurrent.duration._ - object PodDeletionCostAnnotatorSpec { val config = ConfigFactory.parseString(""" akka.loggers = ["akka.testkit.TestEventListener"] @@ -62,7 +59,7 @@ object PodDeletionCostAnnotatorSpec { class PodDeletionCostAnnotatorSpec extends TestKit( ActorSystem( - "MySpec", + "PodDeletionCostAnnotatorSpec", PodDeletionCostAnnotatorSpec.config )) with ImplicitSender @@ -79,7 +76,7 @@ class PodDeletionCostAnnotatorSpec private val namespace = "namespace-test" private val podName1 = "pod-test-1" private val podName2 = "pod-test-2" - private lazy val system2 = ActorSystem("MySpec", PodDeletionCostAnnotatorSpec.config) + private lazy val system2 = ActorSystem(system.name, system.settings.config) private def settings(podName: String) = { new KubernetesSettings( @@ -90,16 +87,27 @@ class PodDeletionCostAnnotatorSpec namespace = Some(namespace), namespacePath = "", podName = podName, - secure = false) + secure = false, + apiServiceRequestTimeout = 2.seconds, + customResourceSettings = new CustomResourceSettings(enabled = false, crName = None, 60.seconds) + ) } - private def annotatorProps(pod: String) = Props( - classOf[PodDeletionCostAnnotator], - settings(pod), - "apiToken", - namespace, - PodDeletionCostSettings(system.settings.config.getConfig("akka.rollingupdate.kubernetes")) - ) + private val kubernetesApi = + new KubernetesApiImpl( + system, + settings(podName1), + namespace, + apiToken = "apiToken", + clientHttpsConnectionContext = None) + + private def annotatorProps(pod: String) = + PodDeletionCostAnnotator.props( + settings(pod), + PodDeletionCostSettings(system.settings.config.getConfig("akka.rollingupdate.kubernetes")), + kubernetesApi, + crName = None + ) override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = Span(5, Seconds), interval = Span(100, Millis)) @@ -232,13 +240,18 @@ class PodDeletionCostAnnotatorSpec assertState(scenarioName, "FAILING") - val underTest = expectLogWarning(".*Failed to update annotation:.*") { + val underTest = expectLogWarning(".*Failed to update pod-deletion-cost annotation:.*") { system.actorOf(annotatorProps(podName1)) } wireMockServer.resetRequests() val dummyNewMember = - MemberUp(Member(UniqueAddress(Address("", ""), 2L), Set("dc-default"), Version("v1")).copy(Up)) + MemberUp( + Member( + UniqueAddress(Address("akka", system.name, Cluster(system).selfAddress.host.get, 2553), 2L), + Cluster(system).selfRoles, + Cluster(system).selfMember.appVersion + ).copyUp(upNumber = 2)) underTest ! dummyNewMember underTest ! dummyNewMember underTest ! dummyNewMember