Skip to content

Commit

Permalink
Statefulset support: displaying pods, logs, describe, restart (#883)
Browse files Browse the repository at this point in the history
  • Loading branch information
laszlocph authored Nov 17, 2024
1 parent e61a643 commit 2cc48f5
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 56 deletions.
30 changes: 25 additions & 5 deletions cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,22 @@ func podLogs(
messages chan *streaming.WSMessage,
runningLogStreams *runningLogStreams,
) {
var matchLabels map[string]string
deployment, err := kubeEnv.Client.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, meta_v1.GetOptions{})
if err != nil {
logrus.Errorf("could not get deployments: %v", err)
return
if strings.Contains(err.Error(), "not found") {
statefulset, err := kubeEnv.Client.AppsV1().StatefulSets(namespace).Get(context.TODO(), deploymentName, meta_v1.GetOptions{})
if err != nil {
logrus.Errorf("could not get statefulset: %v", err)
return
}
matchLabels = statefulset.Spec.Selector.MatchLabels
} else {
logrus.Errorf("could not get deployments: %v", err)
return
}
} else {
matchLabels = deployment.Spec.Selector.MatchLabels
}

podsInNamespace, err := kubeEnv.Client.CoreV1().Pods(namespace).List(context.TODO(), meta_v1.ListOptions{})
Expand All @@ -415,7 +427,7 @@ func podLogs(
}

for _, pod := range podsInNamespace.Items {
if labelsMatchSelectors(pod.ObjectMeta.Labels, deployment.Spec.Selector.MatchLabels) {
if labelsMatchSelectors(pod.ObjectMeta.Labels, matchLabels) {
containers := agent.PodContainers(pod.Spec)
for _, container := range containers {
go streamPodLogs(kubeEnv, namespace, pod.Name, container.Name, deploymentName, messages, runningLogStreams)
Expand Down Expand Up @@ -568,8 +580,16 @@ func restartDeployment(kubeEnv *agent.KubeEnv, namespace, name string) {
data := fmt.Sprintf(`{"spec": {"template": {"metadata": {"annotations": {"kubectl.kubernetes.io/restartedAt": "%s"}}}}}`, time.Now().Format(time.RFC3339))
_, err := kubeEnv.Client.AppsV1().Deployments(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(data), meta_v1.PatchOptions{})
if err != nil {
logrus.Errorf("could not patch deployment %s in %s: %s", name, namespace, err)
return
if strings.Contains(err.Error(), "not found") {
_, err := kubeEnv.Client.AppsV1().StatefulSets(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, []byte(data), meta_v1.PatchOptions{})
if err != nil {
logrus.Errorf("could not patch statefulset: %v", err)
return
}
} else {
logrus.Errorf("could not patch deployment %s in %s: %s", name, namespace, err)
return
}
}
}

Expand Down
38 changes: 32 additions & 6 deletions pkg/agent/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ func (e *KubeEnv) Services(repo string) ([]*api.Stack, error) {
}
e.Perf.WithLabelValues("gimlet_agent_deployments").Observe(float64(time.Since(t0).Seconds()))

t0 = time.Now()
s, err := e.Client.AppsV1().StatefulSets(e.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("could not get statefulsets: %s", err)
}
e.Perf.WithLabelValues("gimlet_agent_statefulsets").Observe(float64(time.Since(t0).Seconds()))

t0 = time.Now()
i, err := e.Client.NetworkingV1().Ingresses(e.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
Expand All @@ -87,11 +94,10 @@ func (e *KubeEnv) Services(repo string) ([]*api.Stack, error) {
t0 = time.Now()
var stacks []*api.Stack
for _, service := range annotatedServices {
deployment, err := e.deploymentForService(service, d.Items)
if err != nil {
return nil, fmt.Errorf("could not get deployment for service: %s", err)
deployment := e.deploymentForService(service, d.Items)
if deployment == nil {
deployment = e.statefulsetForService(service, s.Items)
}

if deployment != nil {
deployment.Pods = []*api.Pod{}
for _, pod := range pods.Items {
Expand Down Expand Up @@ -272,7 +278,7 @@ func (e *KubeEnv) annotatedServices(repo string) ([]v1.Service, error) {
return services, nil
}

func (e *KubeEnv) deploymentForService(service v1.Service, deployments []appsv1.Deployment) (*api.Deployment, error) {
func (e *KubeEnv) deploymentForService(service v1.Service, deployments []appsv1.Deployment) *api.Deployment {
var deployment *api.Deployment

for _, d := range deployments {
Expand All @@ -289,7 +295,27 @@ func (e *KubeEnv) deploymentForService(service v1.Service, deployments []appsv1.
}
}

return deployment, nil
return deployment
}

func (e *KubeEnv) statefulsetForService(service v1.Service, statefulsets []appsv1.StatefulSet) *api.Deployment {
var statefulset *api.Deployment

for _, s := range statefulsets {
if SelectorsMatch(s.Spec.Selector.MatchLabels, service.Spec.Selector) {
var branch, sha string
if hash, ok := s.GetAnnotations()[AnnotationGitSha]; ok {
sha = hash
}
if b, ok := s.GetAnnotations()[AnnotationGitBranch]; ok {
branch = b
}

statefulset = &api.Deployment{Name: s.Name, Namespace: s.Namespace, Branch: branch, SHA: sha}
}
}

return statefulset
}

func logs(e *KubeEnv, pod v1.Pod) string {
Expand Down
149 changes: 107 additions & 42 deletions pkg/agent/podController.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/gimlet-io/gimlet/pkg/dashboard/api"
v1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
)
Expand Down Expand Up @@ -33,26 +34,38 @@ func PodController(kubeEnv *KubeEnv, gimletHost string, agentKey string) *Contro
return err
}

allStatefulsets, err := kubeEnv.Client.AppsV1().StatefulSets(kubeEnv.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
}

createdPod := obj.(*v1.Pod)
for _, svc := range integratedServices {
for _, deployment := range allDeployments.Items {
if SelectorsMatch(deployment.Spec.Selector.MatchLabels, svc.Spec.Selector) {
if HasLabels(deployment.Spec.Selector.MatchLabels, createdPod.GetObjectMeta().GetLabels()) &&
createdPod.Namespace == deployment.Namespace {
update := &api.StackUpdate{
Event: EventPodCreated,
Env: kubeEnv.Name,
Repo: svc.GetAnnotations()[AnnotationGitRepository],
Subject: objectMeta.Namespace + "/" + objectMeta.Name,
Svc: svc.Namespace + "/" + svc.Name,

Status: string(createdPod.Status.Phase),
Deployment: deployment.Namespace + "/" + deployment.Name,
ImChannelId: svc.GetAnnotations()[AnnotationOwnerIm],
}
sendUpdate(gimletHost, agentKey, kubeEnv.Name, update)
}
}
matchAndSendCreatedEvent(
deployment.Spec.Selector.MatchLabels,
deployment.Namespace,
deployment.Name,
svc,
createdPod,
kubeEnv,
objectMeta,
gimletHost,
agentKey,
)
}
for _, statefulset := range allStatefulsets.Items {
matchAndSendCreatedEvent(
statefulset.Spec.Selector.MatchLabels,
statefulset.Namespace,
statefulset.Name,
svc,
createdPod,
kubeEnv,
objectMeta,
gimletHost,
agentKey,
)
}
}
case "update":
Expand All @@ -66,38 +79,42 @@ func PodController(kubeEnv *KubeEnv, gimletHost string, agentKey string) *Contro
return err
}

allStatefulsets, err := kubeEnv.Client.AppsV1().StatefulSets(kubeEnv.Namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return err
}

if obj == nil {
return nil
}

updatedPod := obj.(*v1.Pod)
for _, svc := range integratedServices {
for _, deployment := range allDeployments.Items {
if SelectorsMatch(deployment.Spec.Selector.MatchLabels, svc.Spec.Selector) {
if HasLabels(deployment.Spec.Selector.MatchLabels, updatedPod.GetObjectMeta().GetLabels()) &&
updatedPod.Namespace == deployment.Namespace {
podStatus := podStatus(*updatedPod)
podLogs := ""
if "CrashLoopBackOff" == podStatus {
podLogs = logs(kubeEnv, *updatedPod)
}

update := &api.StackUpdate{
Event: EventPodUpdated,
Env: kubeEnv.Name,
Repo: svc.GetAnnotations()[AnnotationGitRepository],
Subject: objectMeta.Namespace + "/" + objectMeta.Name,
Svc: svc.Namespace + "/" + svc.Name,

Status: podStatus,
Deployment: deployment.Namespace + "/" + deployment.Name,
ErrorCause: podErrorCause(*updatedPod),
Logs: podLogs,
ImChannelId: svc.GetAnnotations()[AnnotationOwnerIm],
}
sendUpdate(gimletHost, agentKey, kubeEnv.Name, update)
}
}
newFunction(
deployment.Spec.Selector.MatchLabels,
deployment.Namespace,
deployment.Name,
svc,
updatedPod,
kubeEnv,
objectMeta,
gimletHost,
agentKey,
)
}
for _, statefulset := range allStatefulsets.Items {
newFunction(
statefulset.Spec.Selector.MatchLabels,
statefulset.Namespace,
statefulset.Name,
svc,
updatedPod,
kubeEnv,
objectMeta,
gimletHost,
agentKey,
)
}
}
case "delete":
Expand All @@ -113,6 +130,54 @@ func PodController(kubeEnv *KubeEnv, gimletHost string, agentKey string) *Contro
return podController
}

func newFunction(matchLabels map[string]string, namespace string, name string, svc v1.Service, updatedPod *v1.Pod, kubeEnv *KubeEnv, objectMeta metav1.ObjectMeta, gimletHost string, agentKey string) {
if SelectorsMatch(matchLabels, svc.Spec.Selector) {
if HasLabels(matchLabels, updatedPod.GetObjectMeta().GetLabels()) &&
updatedPod.Namespace == namespace {
podStatus := podStatus(*updatedPod)
podLogs := ""
if "CrashLoopBackOff" == podStatus {
podLogs = logs(kubeEnv, *updatedPod)
}

update := &api.StackUpdate{
Event: EventPodUpdated,
Env: kubeEnv.Name,
Repo: svc.GetAnnotations()[AnnotationGitRepository],
Subject: objectMeta.Namespace + "/" + objectMeta.Name,
Svc: svc.Namespace + "/" + svc.Name,

Status: podStatus,
Deployment: namespace + "/" + name,
ErrorCause: podErrorCause(*updatedPod),
Logs: podLogs,
ImChannelId: svc.GetAnnotations()[AnnotationOwnerIm],
}
sendUpdate(gimletHost, agentKey, kubeEnv.Name, update)
}
}
}

func matchAndSendCreatedEvent(matchLabels map[string]string, namespace string, name string, svc v1.Service, createdPod *v1.Pod, kubeEnv *KubeEnv, objectMeta metav1.ObjectMeta, gimletHost string, agentKey string) {
if SelectorsMatch(matchLabels, svc.Spec.Selector) {
if HasLabels(matchLabels, createdPod.GetObjectMeta().GetLabels()) &&
createdPod.Namespace == namespace {
update := &api.StackUpdate{
Event: EventPodCreated,
Env: kubeEnv.Name,
Repo: svc.GetAnnotations()[AnnotationGitRepository],
Subject: objectMeta.Namespace + "/" + objectMeta.Name,
Svc: svc.Namespace + "/" + svc.Name,

Status: string(createdPod.Status.Phase),
Deployment: namespace + "/" + name,
ImChannelId: svc.GetAnnotations()[AnnotationOwnerIm],
}
sendUpdate(gimletHost, agentKey, kubeEnv.Name, update)
}
}
}

// hasLabels determines if all the selectors are present as labels
func HasLabels(selector map[string]string, labels map[string]string) bool {
for selectorLabel, selectorValue := range selector {
Expand Down
4 changes: 2 additions & 2 deletions web/src/components/serviceDetail/serviceDetail.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ function ServiceDetail(props) {

let hostPort = "<host-port>"
let appPort = "<app-port>"
if (config) {
if (config && config.values) {
appPort = config.values.containerPort ?? 80;

if (appPort < 99) {
Expand Down Expand Up @@ -258,7 +258,7 @@ function ServiceDetail(props) {
</Menu>
</div>
</h3>
{deployment && config && <DeployIndicator deploy={config.values.deploy} owner={owner} repo={repoName} branch={deployment.branch} />}
{deployment && config && <DeployIndicator deploy={config.values && config.values.deploy} owner={owner} repo={repoName} branch={deployment.branch} />}
{pullRequests && pullRequests.length !== 0 &&
<PullRequests items={pullRequests} />
}
Expand Down
2 changes: 1 addition & 1 deletion web/src/components/serviceDetail/simpleServiceDetail.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ function SimpleServiceDetail(props) {

let hostPort = "<host-port>"
let appPort = "<app-port>"
if (config) {
if (config && config.values) {
appPort = config.values.containerPort ?? 80;

if (appPort < 99) {
Expand Down

0 comments on commit 2cc48f5

Please sign in to comment.