
Execution doesn't terminate even after all tasks have stopped and logs have been received #36

jpadhye opened this issue Jun 2, 2020 · 5 comments

jpadhye commented Jun 2, 2020

Thanks for such a useful utility. It works wonderfully, but in a CentOS container the binary doesn't terminate even after all tasks have stopped and the logs have been received.

go version go1.13.6 linux/amd64

16:07:20 Log file: report.json
16:07:22 2020/06/02 16:04:31 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139239086
16:07:22 Completed 2.1 KiB/2.1 KiB (16.7 KiB/s) with 1 file(s) remaining
upload: ../../../tmp/container-meta.json to s3://stage-vis-scale-test/25/container-meta.json
16:07:22 Completed 16.3 KiB/16.3 KiB (264.6 KiB/s) with 1 file(s) remaining
upload: ./report.json to s3://stage-vis-scale-test/25/report.json 
16:07:22 Completed 33.3 KiB/33.3 KiB (314.6 KiB/s) with 1 file(s) remaining
upload: ./report.csv to s3://stage-vis-scale-test/25/report.csv   
16:07:24 2020/06/02 16:04:33 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:26 2020/06/02 16:04:35 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:28 2020/06/02 16:04:37 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:31 2020/06/02 16:04:39 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:33 2020/06/02 16:04:41 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:35 2020/06/02 16:04:43 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:37 2020/06/02 16:04:45 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:39 2020/06/02 16:04:48 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:41 2020/06/02 16:04:50 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:43 2020/06/02 16:04:52 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:45 2020/06/02 16:04:54 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:47 2020/06/02 16:04:56 All tasks have stopped
16:07:47 2020/06/02 16:04:56 Waiting for log stream run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f to exist...
16:07:47 2020/06/02 16:04:56 Found stream run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f after 92.378425ms
16:07:47 2020/06/02 16:04:56 Finding next sequence token for stream run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f
16:07:47 2020/06/02 16:04:56 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:48 2020/06/02 16:04:56 Putting log message "Container b5030d02-d6c5-4546-8f91-6c9912557732 exited with 0" to run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f
16:07:48 2020/06/02 16:04:56 Waiting for logs to finish
16:07:50 2020/06/02 16:04:58 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:52 2020/06/02 16:05:00 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:54 2020/06/02 16:05:02 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:56 2020/06/02 16:05:04 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:07:58 2020/06/02 16:05:07 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
...

16:10:43 2020/06/02 16:07:51 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:10:45 2020/06/02 16:07:53 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:10:46 Build timed out (after 5 minutes). Marking the build as failed.
16:10:47 2020/06/02 16:07:55 Printing events in stream "run_task_348479292/scale-test/0cf2e59d-c53f-451d-850e-21644e8c067f" after 1591139241076
16:10:47 scale-test/run-test.sh: line 26:   213 Terminated              ecs-run-task --file $WKSP_ROOT/scale-test/container_defn.json --region us-east-1 --cluster sse-jenkins --fargate --subnet subnet-008b63cc139fd3c68 --deregister --debug
16:10:47 make: *** [evt-stage-scale] Terminated
16:10:49 Completed 2.1 KiB/2.1 KiB (5.5 KiB/s) with 1 file(s) remaining
Finished: FAILURE

conzy commented Aug 3, 2020

I have the same issue with v1.3.0.

Even after the task has stopped, the CloudWatch log stream is polled indefinitely.

sherzberg (Contributor) commented:

We have a similar issue that just started happening a day ago, though it may have a separate root cause:

2020/12/29 22:15:55 Found stream run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324 after 44.049240354s
2020/12/29 22:15:57 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
2020/12/29 22:16:00 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
2020/12/29 22:16:02 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
2020/12/29 22:16:05 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
2020/12/29 22:16:07 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
2020/12/29 22:16:09 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
2020/12/29 22:16:26 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
...
2020/12/29 22:19:51 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
2020/12/29 22:19:53 All tasks have stopped
2020/12/29 22:19:53 Waiting for log stream run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324 to exist...
2020/12/29 22:19:53 Found stream run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324 after 83.78505ms
2020/12/29 22:19:53 Finding next sequence token for stream run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324
2020/12/29 22:19:53 Putting log message "Container 974f397b-c79e-448f-8e74-37ab15151bf3 exited with 0" to run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324
2020/12/29 22:19:53 Waiting for logs to finish
2020/12/29 22:19:53 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
2020/12/29 22:19:56 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
2020/12/29 22:19:58 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
2020/12/29 22:20:00 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
2020/12/29 22:20:02 Printing events in stream "run_task_261644790/app/564fc9dba1ee4b90b83bb6243fc99324" after 0
...

Log events are never found (the "after 0" value is never advanced either). The odd thing is that it seems to work for brand-new log groups for some time, then breaks again. This second loop of fetching logs appears to continue indefinitely, as in the reports above.


jason-teampay commented Dec 30, 2020

Yes, we're having the same issue. It started 12/29. I don't know what changed, but the container exit log appears and the tool just keeps printing the "Printing events in stream" message with "after 0". I'm wondering whether something changed in the AWS API or SDK behavior that led to the change in activity. We need a workaround ASAP.

We ended up forking and modifying cloudwatch.go:

package runner

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
)

const (
	defaultLogTimeout      = time.Minute * 60
	defaultLogPollInterval = time.Second * 2
)

// count is a global counter of how many times printEventsAfter has polled the
// log stream; it drives the workaround described in the note after runner.go.
var count int64

type cloudwatchLogsInterface interface {
	DescribeLogStreamsPages(input *cloudwatchlogs.DescribeLogStreamsInput,
		fn func(*cloudwatchlogs.DescribeLogStreamsOutput, bool) bool) error
	DescribeLogStreams(input *cloudwatchlogs.DescribeLogStreamsInput) (*cloudwatchlogs.DescribeLogStreamsOutput, error)
	PutLogEvents(input *cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
	FilterLogEventsPages(input *cloudwatchlogs.FilterLogEventsInput,
		fn func(*cloudwatchlogs.FilterLogEventsOutput, bool) bool) error
}

// logWaiter waits for a log stream to exist
type logWaiter struct {
	CloudWatchLogs cloudwatchLogsInterface

	LogGroupName  string
	LogStreamName string

	Interval time.Duration
	Timeout  time.Duration
}

// streamExists checks the log group for a specific log stream
func (lw *logWaiter) streamExists() (bool, error) {
	params := &cloudwatchlogs.DescribeLogStreamsInput{
		LogGroupName:        aws.String(lw.LogGroupName),
		LogStreamNamePrefix: aws.String(lw.LogStreamName),
		Descending:          aws.Bool(true),
	}

	var exists bool
	err := lw.CloudWatchLogs.DescribeLogStreamsPages(params,
		func(page *cloudwatchlogs.DescribeLogStreamsOutput, lastPage bool) bool {
			for _, stream := range page.LogStreams {
				// return early if we match the log stream
				if *stream.LogStreamName == lw.LogStreamName {
					exists = true
					return true
				}
			}
			return lastPage
		})

	return exists, err
}

// Wait waits for a log stream to exist
func (lw *logWaiter) Wait(ctx context.Context) error {
	log.Printf("Waiting for log stream %s to exist...", lw.LogStreamName)
	t := time.Now()

	pollInterval := lw.Interval
	if pollInterval == time.Duration(0) {
		pollInterval = defaultLogPollInterval
	}

	timeout := lw.Timeout
	if timeout == time.Duration(0) {
		timeout = defaultLogTimeout
	}

	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()
	done := make(chan bool)
	go func() {
		time.Sleep(timeout)
		done <- true
	}()

	for {
		exists, err := lw.streamExists()

		// handle rate-limiting errors which seem to occur during
		// excessive polling operations
		if isRateLimited(err) {
			time.Sleep(5 * time.Second)
			continue
		} else if err != nil {
			return err
		} else if exists {
			log.Printf("Found stream %s after %v", lw.LogStreamName, time.Now().Sub(t))
			return nil
		}

		select {
		case <-done:
			log.Printf("Timed out waiting for stream")
			return fmt.Errorf("Timed out waiting for stream %s", lw.LogStreamName)
		case <-ticker.C:
			continue
		}
	}
}

func isRateLimited(err error) bool {
	if aerr, ok := err.(awserr.Error); ok {
		if aerr.Code() == "Throttling" {
			return true
		}
	}
	return false
}

// logWatcher watches a given CloudWatch Logs stream and prints events as they appear
type logWatcher struct {
	CloudWatchLogs cloudwatchLogsInterface

	LogGroupName  string
	LogStreamName string
	Printer       func(event *cloudwatchlogs.FilteredLogEvent) bool

	Interval time.Duration
	Timeout  time.Duration

	mu   sync.Mutex
	stop chan struct{}
}

// Watch follows the log stream and prints events via a Printer
func (lw *logWatcher) Watch(ctx context.Context) error {
	lw.mu.Lock()
	lw.stop = make(chan struct{})
	lw.mu.Unlock()

	waiter := &logWaiter{
		CloudWatchLogs: lw.CloudWatchLogs,
		LogGroupName:   lw.LogGroupName,
		LogStreamName:  lw.LogStreamName,
		Interval:       lw.Interval,
		Timeout:        lw.Timeout,
	}

	if err := waiter.Wait(ctx); err != nil {
		return err
	}

	var after int64
	var err error

	pollInterval := lw.Interval
	if pollInterval == time.Duration(0) {
		pollInterval = time.Second * 2
	}

	for {
		select {
		case <-time.After(pollInterval):
			if after, err = lw.printEventsAfter(ctx, after); err != nil {
				return err
			}

		case <-lw.stop:
			return nil

		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// Stop watching a log stream
func (lw *logWatcher) Stop() error {
	lw.mu.Lock()
	defer lw.mu.Unlock()
	if lw.stop != nil {
		close(lw.stop)
		return nil
	}
	return errors.New("Log watcher not started")
}

// printEventsAfter prints events from a given stream after a given timestamp
func (lw *logWatcher) printEventsAfter(ctx context.Context, ts int64) (int64, error) {
	log.Printf("Printing events in stream %q after %d (%d)", lw.LogStreamName, ts, count)
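	// Workaround: in the broken state the container-finished message is never
	// seen, so force a clean exit once the stream has been polled 60 times.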
	count++
	if count >= 60 {
		log.Printf("Returning because count limit reached")
		os.Exit(0)
	}
	t := time.Now()
	//var count int64

	filterInput := &cloudwatchlogs.FilterLogEventsInput{
		LogGroupName:   aws.String(lw.LogGroupName),
		LogStreamNames: aws.StringSlice([]string{lw.LogStreamName}),
		StartTime:      aws.Int64(ts + 1),
	}

	err := lw.CloudWatchLogs.FilterLogEventsPages(filterInput,
		func(p *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) (shouldContinue bool) {
			for _, event := range p.Events {
				//count++
				if !lw.Printer(event) {
					log.Printf("Stopping log watcher via print function")
					lw.Stop()
				}
				if *event.Timestamp > ts {
					ts = *event.Timestamp
				}
			}
			return lastPage
		})
	if err != nil {
		log.Printf("Printed %d events in %v", count, time.Now().Sub(t))
	}

	return ts, err
}

// logWriter appends a line to a finished log stream
type logWriter struct {
	CloudWatchLogs cloudwatchLogsInterface

	LogGroupName  string
	LogStreamName string

	Interval time.Duration
	Timeout  time.Duration
}

func (lw *logWriter) nextSequenceToken() (*string, error) {
	log.Printf("Finding next sequence token for stream %s", lw.LogStreamName)

	streams, err := lw.CloudWatchLogs.DescribeLogStreams(&cloudwatchlogs.DescribeLogStreamsInput{
		LogGroupName:        aws.String(lw.LogGroupName),
		LogStreamNamePrefix: aws.String(lw.LogStreamName),
		Descending:          aws.Bool(true),
		Limit:               aws.Int64(1),
	})
	if err != nil {
		return nil, err
	} else if len(streams.LogStreams) == 0 {
		return nil, fmt.Errorf("failed to find stream %s in group %s", lw.LogStreamName, lw.LogGroupName)
	}

	return streams.LogStreams[0].UploadSequenceToken, nil
}

func (lw *logWriter) WriteString(ctx context.Context, msg string) error {
	waiter := &logWaiter{
		CloudWatchLogs: lw.CloudWatchLogs,
		LogGroupName:   lw.LogGroupName,
		LogStreamName:  lw.LogStreamName,
		Interval:       lw.Interval,
		Timeout:        lw.Timeout,
	}

	if err := waiter.Wait(ctx); err != nil {
		return err
	}

	sequence, err := lw.nextSequenceToken()
	if err != nil {
		return err
	}

	log.Printf("Putting log message %q to %s", msg, lw.LogStreamName)
	_, err = lw.CloudWatchLogs.PutLogEvents(&cloudwatchlogs.PutLogEventsInput{
		SequenceToken: sequence,
		LogGroupName:  aws.String(lw.LogGroupName),
		LogStreamName: aws.String(lw.LogStreamName),
		LogEvents: []*cloudwatchlogs.InputLogEvent{
			{
				Message:   aws.String(msg),
				Timestamp: aws.Int64(aws.TimeUnixMilli(time.Now())),
			},
		},
	})
	return err
}

func createLogGroup(sess *session.Session, logGroup string) error {
	cwl := cloudwatchlogs.New(sess)
	groups, err := cwl.DescribeLogGroups(&cloudwatchlogs.DescribeLogGroupsInput{
		Limit:              aws.Int64(1),
		LogGroupNamePrefix: aws.String(logGroup),
	})
	if err != nil {
		return err
	}
	if len(groups.LogGroups) == 0 {
		log.Printf("Creating log group %s", logGroup)
		_, err = cwl.CreateLogGroup(&cloudwatchlogs.CreateLogGroupInput{
			LogGroupName: aws.String(logGroup),
		})
		if err != nil {
			return err
		}
	} else {
		log.Printf("Log group %s exists", logGroup)
	}
	return nil
}

And runner.go:

package runner

import (
	"context"
	"errors"
	"fmt"
	"log"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
	"github.com/aws/aws-sdk-go/service/ecs"
	"github.com/buildkite/ecs-run-task/parser"
)

type Override struct {
	Service string
	Command []string
}

type Runner struct {
	Service            string
	TaskName           string
	TaskDefinitionFile string
	Cluster            string
	LogGroupName       string
	Region             string
	Config             *aws.Config
	Overrides          []Override
	Fargate            bool
	SecurityGroups     []string
	Subnets            []string
	Environment        []string
	Count              int64
	Deregister         bool
}

func New() *Runner {
	return &Runner{
		Region: os.Getenv("AWS_REGION"),
		Config: aws.NewConfig(),
	}
}

func (r *Runner) Run(ctx context.Context) error {
	taskDefinitionInput, err := parser.Parse(r.TaskDefinitionFile, os.Environ())
	if err != nil {
		return err
	}

	streamPrefix := r.TaskName
	if streamPrefix == "" {
		streamPrefix = fmt.Sprintf("run_task_%d", time.Now().Nanosecond())
	}

	sess := session.Must(session.NewSession(r.Config))

	if err := createLogGroup(sess, r.LogGroupName); err != nil {
		return err
	}

	log.Printf("Setting tasks to use log group %s", r.LogGroupName)
	for _, def := range taskDefinitionInput.ContainerDefinitions {
		def.LogConfiguration = &ecs.LogConfiguration{
			LogDriver: aws.String("awslogs"),
			Options: map[string]*string{
				"awslogs-group":         aws.String(r.LogGroupName),
				"awslogs-region":        aws.String(r.Region),
				"awslogs-stream-prefix": aws.String(streamPrefix),
			},
		}
	}

	svc := ecs.New(sess)

	log.Printf("Registering a task for %s", *taskDefinitionInput.Family)
	resp, err := svc.RegisterTaskDefinition(taskDefinitionInput)
	if err != nil {
		return err
	}

	taskDefinition := fmt.Sprintf("%s:%d",
		*resp.TaskDefinition.Family, *resp.TaskDefinition.Revision)

	defer func() {
		if !r.Deregister {
			return
		}

		log.Printf("Deregistering task %s", taskDefinition)
		_, err := svc.DeregisterTaskDefinition(&ecs.DeregisterTaskDefinitionInput{
			TaskDefinition: &taskDefinition,
		})
		if err != nil {
			log.Printf("Failed to deregister task %s: %s", taskDefinition, err.Error())
			return
		}
		log.Printf("Successfully deregistered task %s", taskDefinition)
	}()

	runTaskInput := &ecs.RunTaskInput{
		TaskDefinition: aws.String(taskDefinition),
		Cluster:        aws.String(r.Cluster),
		Count:          aws.Int64(r.Count),
		Overrides: &ecs.TaskOverride{
			ContainerOverrides: []*ecs.ContainerOverride{},
		},
	}
	if r.Fargate {
		runTaskInput.LaunchType = aws.String("FARGATE")
	}
	if len(r.Subnets) > 0 || len(r.SecurityGroups) > 0 {
		runTaskInput.NetworkConfiguration = &ecs.NetworkConfiguration{
			AwsvpcConfiguration: &ecs.AwsVpcConfiguration{
				Subnets:        awsStrings(r.Subnets),
				AssignPublicIp: aws.String("ENABLED"),
				SecurityGroups: awsStrings(r.SecurityGroups),
			},
		}
	}

	for _, override := range r.Overrides {
		if len(override.Command) > 0 {
			cmds := []*string{}

			if override.Service == "" {
				if len(taskDefinitionInput.ContainerDefinitions) != 1 {
					return fmt.Errorf("No service provided for override and can't determine default service with %d container definitions", len(taskDefinitionInput.ContainerDefinitions))
				}

				override.Service = *taskDefinitionInput.ContainerDefinitions[0].Name
				log.Printf("Assuming override applies to '%s'", override.Service)
			}

			for _, command := range override.Command {
				cmds = append(cmds, aws.String(command))
			}

			env, err := awsKeyValuePairForEnv(os.LookupEnv, r.Environment)
			if err != nil {
				return err
			}

			runTaskInput.Overrides.ContainerOverrides = append(
				runTaskInput.Overrides.ContainerOverrides,
				&ecs.ContainerOverride{
					Command:     cmds,
					Name:        aws.String(override.Service),
					Environment: env,
				},
			)
		}
	}

	log.Printf("Running task %s", taskDefinition)
	runResp, err := svc.RunTask(runTaskInput)
	if err != nil {
		return fmt.Errorf("Unable to run task: %s", err.Error())
	}

	cwl := cloudwatchlogs.New(sess)
	var wg sync.WaitGroup

	// spawn a log watcher for each container
	for _, task := range runResp.Tasks {
		for _, container := range task.Containers {
			containerId := path.Base(*container.ContainerArn)
			watcher := &logWatcher{
				LogGroupName:   r.LogGroupName,
				LogStreamName:  logStreamName(streamPrefix, container, task),
				CloudWatchLogs: cwl,

				// watch for the finish message to terminate the logger
				Printer: func(ev *cloudwatchlogs.FilteredLogEvent) bool {
					finishedPrefix := fmt.Sprintf(
						"Container %s exited with",
						containerId,
					)
					log.Printf("Count: %d", count)
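					// Workaround: also treat the container as finished once the
					// global poll counter passes 75.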
					if strings.HasPrefix(*ev.Message, finishedPrefix) || count >= 75 {
						log.Printf("Found container finished message for %s: %s",
							containerId, *ev.Message)
						return false
					}
					fmt.Println(*ev.Message)
					return true
				},
			}

			wg.Add(1)
			go func() {
				defer wg.Done()
				if err := watcher.Watch(ctx); err != nil {
					log.Printf("Log watcher returned error: %v", err)
				}
			}()
		}
	}

	var taskARNs []*string
	for _, task := range runResp.Tasks {
		log.Printf("Waiting until task %s has stopped", *task.TaskArn)
		taskARNs = append(taskARNs, task.TaskArn)
	}

	err = svc.WaitUntilTasksStopped(&ecs.DescribeTasksInput{
		Cluster: aws.String(r.Cluster),
		Tasks:   taskARNs,
	})
	if err != nil {
		return err
	}

	log.Printf("All tasks have stopped")

	output, err := svc.DescribeTasks(&ecs.DescribeTasksInput{
		Cluster: aws.String(r.Cluster),
		Tasks:   taskARNs,
	})
	if err != nil {
		return err
	}

	// Get the final state of each task and container and write to cloudwatch logs
	for _, task := range output.Tasks {
		for _, container := range task.Containers {
			lw := &logWriter{
				LogGroupName:   r.LogGroupName,
				LogStreamName:  logStreamName(streamPrefix, container, task),
				CloudWatchLogs: cwl,
			}
			if err := writeContainerFinishedMessage(ctx, lw, task, container); err != nil {
				return err
			}
		}
	}

	log.Printf("Waiting for logs to finish")
	wg.Wait()

	// Determine exit code based on the first non-zero exit code
	for _, task := range output.Tasks {
		for _, container := range task.Containers {
			if *container.ExitCode != 0 {
				return &exitError{
					fmt.Errorf(
						"container %s exited with %d",
						*container.Name,
						*container.ExitCode,
					),
					int(*container.ExitCode),
				}
			}
		}
	}

	return err
}

func logStreamName(logStreamPrefix string, container *ecs.Container, task *ecs.Task) string {
	return fmt.Sprintf(
		"%s/%s/%s",
		logStreamPrefix,
		*container.Name,
		path.Base(*task.TaskArn),
	)
}

func writeContainerFinishedMessage(ctx context.Context, w *logWriter, task *ecs.Task, container *ecs.Container) error {
	if *container.LastStatus != `STOPPED` {
		return fmt.Errorf("expected container to be STOPPED, got %s", *container.LastStatus)
	}
	if container.ExitCode == nil {
		return errors.New(*container.Reason)
	}
	return w.WriteString(ctx, fmt.Sprintf(
		"Container %s exited with %d",
		path.Base(*container.ContainerArn),
		*container.ExitCode,
	))
}

type exitError struct {
	error
	exitCode int
}

func (ee *exitError) ExitCode() int {
	return ee.exitCode
}

func awsStrings(ss []string) []*string {
	out := make([]*string, len(ss))
	for i := range ss {
		out[i] = &ss[i]
	}
	return out
}

func awsKeyValuePairForEnv(lookupEnv func(key string) (string, bool), wanted []string) ([]*ecs.KeyValuePair, error) {
	var kvp []*ecs.KeyValuePair
	for _, s := range wanted {
		parts := strings.SplitN(s, "=", 2)
		key := parts[0]
		var value string
		if len(parts) == 2 {
			value = parts[1]
		} else {
			v2, ok := lookupEnv(parts[0])
			if !ok {
				return nil, fmt.Errorf("missing environment variable %q", key)
			}
			value = v2
		}

		kvp = append(kvp, &ecs.KeyValuePair{
			Name:  &key,
			Value: &value,
		})
	}

	return kvp, nil
}

To work around the issue, we added a counter, since we know our tasks complete after roughly 55 polls.


sherzberg commented Jan 2, 2021

Strangely, I have only been able to reproduce this in us-east-1; us-west-2 and us-east-2 don't seem to show this behavior with the same image producing the exact same log set. After about 5000 log events, we get into the cycle I posted above.

I believe there is a bug in printEventsAfter's FilterLogEventsPages callback. I think we want ALL pages, so the return statement should be just return true, not return lastPage.

The documentation here, https://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_FilterLogEvents.html, states:

This operation can return empty results while there are more log events available through the token.

In my tests, when we get into the no-logs-found state I posted about above, the first page has 0 events and the lastPage argument is false. So the callback returns false, and with the current code base we never attempt to fetch the logs from the remaining pages.

I plan on writing a test that shows this behavior before submitting a PR unless someone beats me to it.
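
For illustration, here is a minimal sketch of that change, reusing the logWatcher type and the cloudwatchlogs calls from the cloudwatch.go excerpt above (this is only a sketch of the proposed fix, not an actual patch from this thread):

func (lw *logWatcher) printEventsAfter(ctx context.Context, ts int64) (int64, error) {
	log.Printf("Printing events in stream %q after %d", lw.LogStreamName, ts)

	filterInput := &cloudwatchlogs.FilterLogEventsInput{
		LogGroupName:   aws.String(lw.LogGroupName),
		LogStreamNames: aws.StringSlice([]string{lw.LogStreamName}),
		StartTime:      aws.Int64(ts + 1),
	}

	err := lw.CloudWatchLogs.FilterLogEventsPages(filterInput,
		func(p *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
			for _, event := range p.Events {
				if !lw.Printer(event) {
					log.Printf("Stopping log watcher via print function")
					lw.Stop()
				}
				if *event.Timestamp > ts {
					ts = *event.Timestamp
				}
			}
			// Keep requesting pages: FilterLogEvents may return an empty page
			// along with a next token, and the SDK stops paginating on its own
			// once no token remains.
			return true
		})

	return ts, err
}

Returning !lastPage would behave the same way here, since the SDK stops once the final page has been delivered; the important part is not to return false while more pages remain.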

vcastellm commented:

@sherzberg did you manage to find a solution?
