Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: better reporting of airbyte chart errors #112

Merged
merged 6 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions internal/cmd/local/k8s/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"strings"
"time"

v1 "k8s.io/api/apps/v1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -28,7 +28,7 @@ var DefaultPersistentVolumeSize = resource.MustParse("500Mi")
// Client primarily for testing purposes
type Client interface {
// DeploymentList returns a list of all the services within the namespace
DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error)
DeploymentList(ctx context.Context, namespace string) (*appsv1.DeploymentList, error)
// DeploymentRestart will force a restart of the deployment name in the provided namespace.
// This is a blocking call, it should only return once the deployment has completed.
DeploymentRestart(ctx context.Context, namespace, name string) error
Expand Down Expand Up @@ -76,6 +76,7 @@ type Client interface {

LogsGet(ctx context.Context, namespace string, name string) (string, error)
StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error)
PodList(ctx context.Context, namespace string) (*corev1.PodList, error)
}

var _ Client = (*DefaultK8sClient)(nil)
Expand All @@ -85,7 +86,7 @@ type DefaultK8sClient struct {
ClientSet kubernetes.Interface
}

func (d *DefaultK8sClient) DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error) {
func (d *DefaultK8sClient) DeploymentList(ctx context.Context, namespace string) (*appsv1.DeploymentList, error) {
return d.ClientSet.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{})
}

Expand Down Expand Up @@ -329,8 +330,12 @@ func (d *DefaultK8sClient) LogsGet(ctx context.Context, namespace string, name s

func (d *DefaultK8sClient) StreamPodLogs(ctx context.Context, namespace string, podName string, since time.Time) (io.ReadCloser, error) {
req := d.ClientSet.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
Follow: true,
Follow: true,
SinceTime: &metav1.Time{Time: since},
})
return req.Stream(ctx)
}

func (d *DefaultK8sClient) PodList(ctx context.Context, namespace string) (*corev1.PodList, error) {
return d.ClientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
}
8 changes: 8 additions & 0 deletions internal/cmd/local/k8s/k8stest/k8stest.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type MockClient struct {
FnEventsWatch func(ctx context.Context, namespace string) (watch.Interface, error)
FnLogsGet func(ctx context.Context, namespace string, name string) (string, error)
FnStreamPodLogs func(ctx context.Context, namespace, podName string, since time.Time) (io.ReadCloser, error)
FnPodList func(ctx context.Context, namespace string) (*corev1.PodList, error)
}

func (m *MockClient) DeploymentList(ctx context.Context, namespace string) (*v1.DeploymentList, error) {
Expand Down Expand Up @@ -176,3 +177,10 @@ func (m *MockClient) StreamPodLogs(ctx context.Context, namespace string, podNam
}
return m.FnStreamPodLogs(ctx, namespace, podName, since)
}

func (m *MockClient) PodList(ctx context.Context, namespace string) (*corev1.PodList, error) {
if m.FnPodList == nil {
return nil, nil
}
return m.FnPodList(ctx, namespace)
}
68 changes: 37 additions & 31 deletions internal/cmd/local/local/install.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package local

import (
"bufio"
"context"
"fmt"
"net/http"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -162,7 +160,6 @@ func (c *Command) Install(ctx context.Context, opts InstallOpts) error {

if opts.Migrate {
c.spinner.UpdateText("Migrating airbyte data")
//if err := c.tel.Wrap(ctx, telemetry.Migrate, func() error { return opts.Docker.MigrateComposeDB(ctx, "airbyte_db") }); err != nil {
if err := c.tel.Wrap(ctx, telemetry.Migrate, func() error { return migrate.FromDockerVolume(ctx, opts.Docker.Client, "airbyte_db") }); err != nil {
pterm.Error.Println("Failed to migrate data from previous Airbyte installation")
return fmt.Errorf("unable to migrate data from previous airbyte installation: %w", err)
Expand Down Expand Up @@ -275,7 +272,7 @@ func (c *Command) Install(ctx context.Context, opts InstallOpts) error {
namespace: airbyteNamespace,
valuesYAML: valuesYAML,
}); err != nil {
return fmt.Errorf("unable to install airbyte chart: %w", err)
return c.diagnoseAirbyteChartFailure(ctx, err)
}

if err := c.handleChart(ctx, chartRequest{
Expand Down Expand Up @@ -331,6 +328,35 @@ func (c *Command) Install(ctx context.Context, opts InstallOpts) error {
return nil
}

func (c *Command) diagnoseAirbyteChartFailure(ctx context.Context, chartErr error) error {

if podList, err := c.k8s.PodList(ctx, airbyteNamespace); err == nil {

errors := []string{}
for _, pod := range podList.Items {
if pod.Status.Phase == corev1.PodFailed {
msg := "unknown"

logs, err := c.k8s.LogsGet(ctx, airbyteNamespace, pod.Name)
if err != nil {
msg = "unknown: failed to get pod logs."
}
m, err := getLastLogError(strings.NewReader(logs))
if err != nil {
msg = "unknown: failed to find error log."
}
if m != "" {
msg = m
}

errors = append(errors, fmt.Sprintf("pod %s: %s", pod.Name, msg))
}
}
return fmt.Errorf("unable to install airbyte chart:\n%s", strings.Join(errors, "\n"))
}
return fmt.Errorf("unable to install airbyte chart: %w", chartErr)
}

func (c *Command) handleIngress(ctx context.Context, hosts []string) error {
c.spinner.UpdateText("Checking for existing Ingress")

Expand Down Expand Up @@ -380,43 +406,23 @@ func (c *Command) watchEvents(ctx context.Context) {
}
}

// 2024-09-10 20:16:24 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [273....
var javaLogRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \x1b\[(?:1;)?\d+m(?P<level>[A-Z]+)\x1b\[m (?P<msg>\S+ - .*)`)

func (c *Command) streamPodLogs(ctx context.Context, namespace, podName, prefix string, since time.Time) error {
r, err := c.k8s.StreamPodLogs(ctx, namespace, podName, since)
if err != nil {
return err
}
defer r.Close()

level := pterm.Debug
scanner := bufio.NewScanner(r)

for scanner.Scan() {

// skip java stacktrace noise
if strings.HasPrefix(scanner.Text(), "\tat ") || strings.HasPrefix(scanner.Text(), "\t... ") {
continue
}

m := javaLogRx.FindSubmatch(scanner.Bytes())
var msg string

if m != nil {
msg = string(m[2])
if string(m[1]) == "ERROR" {
level = pterm.Error
} else {
level = pterm.Debug
}
s := newLogScanner(r)
for s.Scan() {
if s.line.level == "ERROR" {
pterm.Error.Printfln("%s: %s", prefix, s.line.msg)
} else {
msg = scanner.Text()
pterm.Debug.Printfln("%s: %s", prefix, s.line.msg)
}

level.Printfln("%s: %s", prefix, msg)
}
return scanner.Err()

return s.Err()
}

func (c *Command) watchBootloaderLogs(ctx context.Context) {
Expand Down
79 changes: 79 additions & 0 deletions internal/cmd/local/local/log_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package local

import (
"bufio"
"io"
"regexp"
"strings"
)

// 2024-09-10 20:16:24 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [273....
var logRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \x1b\[(?:1;)?\d+m(?P<level>[A-Z]+)\x1b\[m (?P<msg>\S+ - .*)`)

type logLine struct {
msg string
level string
}

type logScanner struct {
scanner *bufio.Scanner
line logLine
}

func newLogScanner(r io.Reader) *logScanner {
return &logScanner{
scanner: bufio.NewScanner(r),
line: logLine{
msg: "",
level: "DEBUG",
},
}
}

func (j *logScanner) Scan() bool {
for {
if ok := j.scanner.Scan(); !ok {
return false
}

// skip java stacktrace noise
if strings.HasPrefix(j.scanner.Text(), "\tat ") || strings.HasPrefix(j.scanner.Text(), "\t... ") {
continue
}

m := logRx.FindSubmatch(j.scanner.Bytes())

if m != nil {
j.line.msg = string(m[2])
j.line.level = string(m[1])
} else {
// Some logs aren't from java (e.g. temporal) or they have a different format,
// or the log covers multiple lines (e.g. java stack trace). In that case, use the full line
// and reuse the level of the previous line.
j.line.msg = j.scanner.Text()
}
return true
}
}

func (j *logScanner) Err() error {
return j.scanner.Err()
}

func getLastLogError(r io.Reader) (string, error) {
lines := []logLine{}
s := newLogScanner(r)
for s.Scan() {
lines = append(lines, s.line)
}
if s.Err() != nil {
return "", s.Err()
}

for i := len(lines) - 1; i >= 0; i-- {
if lines[i].level == "ERROR" {
return lines[i].msg, nil
}
}
return "", nil
}
62 changes: 62 additions & 0 deletions internal/cmd/local/local/log_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package local

import (
"strings"
"testing"
)

var testLogs = strings.TrimSpace(`
2024-09-12 15:56:25 INFO i.a.d.c.DatabaseAvailabilityCheck(check):49 - Database is not ready yet. Please wait a moment, it might still be initializing...
2024-09-12 15:56:30 WARN i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [54bd6014, L:/127.0.0.1:52991 - R:localhost/127.0.0.1:8125] An exception has been observed post termination, use DEBUG level to see the full stack: java.net.PortUnreachableException: recvAddress(..) failed: Connection refused
2024-09-12 15:56:31 ERROR i.a.b.Application(main):25 - Unable to bootstrap Airbyte environment.
io.airbyte.db.init.DatabaseInitializationException: Database availability check failed.
at io.airbyte.db.init.DatabaseInitializer.initialize(DatabaseInitializer.java:54) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?]
at io.airbyte.bootloader.Bootloader.initializeDatabases(Bootloader.java:229) ~[io.airbyte-airbyte-bootloader-0.64.3.jar:?]
at io.airbyte.bootloader.Bootloader.load(Bootloader.java:104) ~[io.airbyte-airbyte-bootloader-0.64.3.jar:?]
at io.airbyte.bootloader.Application.main(Application.java:22) [io.airbyte-airbyte-bootloader-0.64.3.jar:?]
Caused by: io.airbyte.db.check.DatabaseCheckException: Unable to connect to the database.
at io.airbyte.db.check.DatabaseAvailabilityCheck.check(DatabaseAvailabilityCheck.java:40) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?]
at io.airbyte.db.init.DatabaseInitializer.initialize(DatabaseInitializer.java:45) ~[io.airbyte.airbyte-db-db-lib-0.64.3.jar:?]
... 3 more
2024-09-12 15:56:31 INFO i.m.r.Micronaut(lambda$start$0):118 - Embedded Application shutting down
2024-09-12T15:56:33.125352208Z Thread-4 INFO Loading mask data from '/seed/specs_secrets_mask.yaml
`)

func TestJavaLogScanner(t *testing.T) {
s := newLogScanner(strings.NewReader(testLogs))

expectLogLine := func(level, msg string) {
s.Scan()

if s.line.level != level {
t.Errorf("expected level %q but got %q", level, s.line.level)
}
if s.line.msg != msg {
t.Errorf("expected msg %q but got %q", msg, s.line.msg)
}
if s.Err() != nil {
t.Errorf("unexpected error %v", s.Err())
}
}

expectLogLine("INFO", "i.a.d.c.DatabaseAvailabilityCheck(check):49 - Database is not ready yet. Please wait a moment, it might still be initializing...")
expectLogLine("WARN", "i.m.s.r.u.Loggers$Slf4JLogger(warn):299 - [54bd6014, L:/127.0.0.1:52991 - R:localhost/127.0.0.1:8125] An exception has been observed post termination, use DEBUG level to see the full stack: java.net.PortUnreachableException: recvAddress(..) failed: Connection refused")
expectLogLine("ERROR", "i.a.b.Application(main):25 - Unable to bootstrap Airbyte environment.")
expectLogLine("ERROR", "io.airbyte.db.init.DatabaseInitializationException: Database availability check failed.")
expectLogLine("ERROR", "Caused by: io.airbyte.db.check.DatabaseCheckException: Unable to connect to the database.")
expectLogLine("INFO", "i.m.r.Micronaut(lambda$start$0):118 - Embedded Application shutting down")
expectLogLine("INFO", "2024-09-12T15:56:33.125352208Z Thread-4 INFO Loading mask data from '/seed/specs_secrets_mask.yaml")
}

func TestLastErrorLog(t *testing.T) {
l, err := getLastLogError(strings.NewReader(testLogs))
if err != nil {
t.Errorf("unexpected error %s", err)
}
expect := "Caused by: io.airbyte.db.check.DatabaseCheckException: Unable to connect to the database."
if l != expect {
t.Errorf("expected %q but got %q", expect, l)
}
}