Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework log streaming and related functions #1802

Merged
merged 49 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
a0cf979
fix logging
anbraten Jun 2, 2023
c4ddf20
store to db
anbraten Jun 2, 2023
60cec1d
Merge branch 'master' into fix-log
6543 Jun 2, 2023
5789aee
Merge branch 'master' into fix-log
6543 Jun 3, 2023
bd17843
Merge branch 'master' into fix-log
6543 Jun 3, 2023
bc96707
Merge remote-tracking branch 'upstream/master' into fix-log
anbraten Jun 3, 2023
5e4d5bd
Merge branch 'fix-log' of github.com:anbraten/woodpecker into fix-log
anbraten Jun 3, 2023
7a30a53
update model and add migration
anbraten Jun 4, 2023
bba6e02
refactor log code
anbraten Jun 4, 2023
7a64b24
Merge branch 'master' into fix-log
anbraten Jun 4, 2023
d35daa7
update ui
anbraten Jun 4, 2023
120a0e7
Merge branch 'fix-log' of github.com:anbraten/woodpecker into fix-log
anbraten Jun 4, 2023
8a101de
adjust api endpoints and ui
anbraten Jun 4, 2023
474724d
rename
anbraten Jun 4, 2023
dd3e1fd
finish store & bump proto version ...
6543 Jun 4, 2023
ea13eb0
new entity will get new table so we dont need to rename on migration
6543 Jun 4, 2023
b22c4ee
cleanup + fixing
6543 Jun 4, 2023
7b1890a
fix
6543 Jun 4, 2023
faa70be
fix
6543 Jun 4, 2023
ae3c9fc
steps get an UUID from pipeline.Compiler
6543 Jun 4, 2023
a07234f
workflow need a uuid to atm ... show they have one in future too?
6543 Jun 4, 2023
fae1e52
for logs:
6543 Jun 4, 2023
716503d
gen cods
6543 Jun 4, 2023
5d4f34a
document issue
6543 Jun 4, 2023
553b67e
Merge branch 'master' into fix-log
6543 Jun 4, 2023
46ca1ae
clean unused
6543 Jun 4, 2023
0d0bdef
adjust cli
6543 Jun 4, 2023
92d09b5
Merge branch 'master' into fix-log
6543 Jun 4, 2023
b578311
adjust log streaming code so use stepID
6543 Jun 4, 2023
0db1ac9
update swagger doc
6543 Jun 5, 2023
88c91a2
add check back
6543 Jun 5, 2023
2d5c497
fix
6543 Jun 5, 2023
6412306
fix tests
6543 Jun 5, 2023
08d75c6
migration
6543 Jun 5, 2023
ff498e1
migration timeout
6543 Jun 5, 2023
5c290f9
migration takes a long time
6543 Jun 5, 2023
762e5de
make cli work
6543 Jun 5, 2023
fcee178
fix webui log display
6543 Jun 5, 2023
e4d806d
a todo got resolved
6543 Jun 5, 2023
0b5c32e
cleanup code
6543 Jun 5, 2023
0f82a4f
fix web logstream
6543 Jun 5, 2023
1be3243
clean
6543 Jun 5, 2023
dab2e5d
Update server/store/datastore/log.go
6543 Jun 5, 2023
21b4989
Update server/logging/logging.go
6543 Jun 5, 2023
e0ff996
Merge branch 'master' into fix-log
6543 Jun 5, 2023
9eb0273
move migration into new pull: #1828
6543 Jun 5, 2023
7ae0308
OrderBy_ID
6543 Jun 5, 2023
6c06e4b
Merge branch 'master' into fix-log
6543 Jun 6, 2023
d31fe9a
Merge branch 'master' into fix-log
6543 Jun 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions agent/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ func (r *Runner) createLogger(_ context.Context, logger zerolog.Logger, uploads

loglogger.Debug().Msg("log stream opened")

limitedPart := io.LimitReader(part, maxLogsUpload)
logStream := rpc.NewLineWriter(r.client, work.ID, step.Alias, secrets...)
if _, err := io.Copy(logStream, limitedPart); err != nil {
// TODO: use step-id instead of work-id
logStream := rpc.NewLineWriter(r.client, work.ID, secrets...)
if _, err := io.Copy(logStream, part); err != nil {
log.Error().Err(err).Msg("copy limited logStream part")
}

Expand Down
15 changes: 8 additions & 7 deletions agent/rpc/client_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package rpc
import (
"context"
"encoding/json"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -278,14 +279,14 @@ func (c *client) Update(ctx context.Context, id string, state rpc.State) (err er
}

// Log writes the pipeline log entry.
func (c *client) Log(ctx context.Context, id string, line *rpc.Line) (err error) {
func (c *client) Log(ctx context.Context, logEntry *rpc.LogEntry) (err error) {
req := new(proto.LogRequest)
req.Id = id
req.Line = new(proto.Line)
req.Line.Out = line.Out
req.Line.Pos = int32(line.Pos)
req.Line.Step = line.Step
req.Line.Time = line.Time
req.Id = strconv.FormatInt(logEntry.StepID, 10)
req.LogEntry = new(proto.LogEntry)
req.LogEntry.StepId = logEntry.StepID
req.LogEntry.Data = logEntry.Data
req.LogEntry.Line = int32(logEntry.Line)
req.LogEntry.Time = logEntry.Time
for {
_, err = c.client.Log(ctx, req)
if err == nil {
Expand Down
8 changes: 0 additions & 8 deletions agent/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ import (
"github.com/woodpecker-ci/woodpecker/shared/utils"
)

// TODO: Implement log streaming.
// Until now we need to limit the size of the logs and files that we upload.
// The maximum grpc payload size is 4194304. So we need to set these limits below the maximum.
const (
maxLogsUpload = 2000000 // this is per step
maxFileUpload = 1000000
)

type Runner struct {
client rpc.Peer
filter rpc.Filter
Expand Down
50 changes: 17 additions & 33 deletions cli/exec/line.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,60 +19,44 @@ import (
"os"
"strings"
"time"
)

// Identifies the type of line in the logs.
const (
LineStdout int = iota
LineStderr
LineExitCode
LineMetadata
LineProgress
"github.com/woodpecker-ci/woodpecker/pipeline/rpc"
)

// Line is a line of console output.
type Line struct {
Step string `json:"step,omitempty"`
Time int64 `json:"time,omitempty"`
Type int `json:"type,omitempty"`
Pos int `json:"pos,omitempty"`
Out string `json:"out,omitempty"`
}

// LineWriter sends logs to the client.
type LineWriter struct {
name string
num int
now time.Time
rep *strings.Replacer
lines []*Line
stepID int64
num int
now time.Time
rep *strings.Replacer
lines []*rpc.LogEntry
}

// NewLineWriter returns a new line reader.
func NewLineWriter(name string) *LineWriter {
func NewLineWriter(stepID int64) *LineWriter {
w := new(LineWriter)
w.name = name
w.stepID = stepID
w.num = 0
w.now = time.Now().UTC()

return w
}

func (w *LineWriter) Write(p []byte) (n int, err error) {
out := string(p)
data := string(p)
if w.rep != nil {
out = w.rep.Replace(out)
data = w.rep.Replace(data)
}

line := &Line{
Out: out,
Step: w.name,
Pos: w.num,
Time: int64(time.Since(w.now).Seconds()),
Type: LineStdout,
line := &rpc.LogEntry{
Data: data,
StepID: w.stepID,
Line: w.num,
Time: int64(time.Since(w.now).Seconds()),
Type: rpc.LogEntryStdout,
}

fmt.Fprintf(os.Stderr, "[%s:L%d:%ds] %s", w.name, w.num, int64(time.Since(w.now).Seconds()), out)
fmt.Fprintf(os.Stderr, "[%d:L%d:%ds] %s", w.stepID, w.num, int64(time.Since(w.now).Seconds()), data)

w.num++

Expand Down
79 changes: 38 additions & 41 deletions pipeline/rpc/line.go → pipeline/rpc/log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,70 +28,67 @@ import (

// Identifies the type of line in the logs.
const (
LineStdout int = iota
LineStderr
LineExitCode
LineMetadata
LineProgress
LogEntryStdout int = iota
LogEntryStderr
LogEntryExitCode
LogEntryMetadata
LogEntryProgress
)

// Line is a line of console output.
type Line struct {
Step string `json:"step,omitempty"`
Time int64 `json:"time,omitempty"`
Type int `json:"type,omitempty"`
Pos int `json:"pos,omitempty"`
Out string `json:"out,omitempty"`
type LogEntry struct {
StepID int64 `json:"step_id,omitempty"`
Time int64 `json:"time,omitempty"`
Type int `json:"type,omitempty"`
Line int `json:"line,omitempty"`
Data string `json:"data,omitempty"`
}

func (l *Line) String() string {
func (l *LogEntry) String() string {
switch l.Type {
case LineExitCode:
return fmt.Sprintf("[%s] exit code %s", l.Step, l.Out)
case LogEntryExitCode:
return fmt.Sprintf("[%d] exit code %s", l.StepID, l.Data)
default:
return fmt.Sprintf("[%s:L%v:%vs] %s", l.Step, l.Pos, l.Time, l.Out)
return fmt.Sprintf("[%d:L%v:%vs] %s", l.StepID, l.Line, l.Time, l.Data)
}
}

// LineWriter sends logs to the client.
type LineWriter struct {
peer Peer
id string
name string
num int
now time.Time
rep *strings.Replacer
lines []*Line
peer Peer
stepID int64
num int
now time.Time
rep *strings.Replacer
lines []*LogEntry
}

// NewLineWriter returns a new line reader.
func NewLineWriter(peer Peer, id, name string, secret ...string) *LineWriter {
func NewLineWriter(peer Peer, stepID int64, secret ...string) *LineWriter {
return &LineWriter{
peer: peer,
id: id,
name: name,
now: time.Now().UTC(),
rep: shared.NewSecretsReplacer(secret),
lines: nil,
peer: peer,
stepID: stepID,
now: time.Now().UTC(),
rep: shared.NewSecretsReplacer(secret),
lines: nil,
}
}

func (w *LineWriter) Write(p []byte) (n int, err error) {
out := string(p)
data := string(p)
if w.rep != nil {
out = w.rep.Replace(out)
data = w.rep.Replace(data)
}
log.Trace().Str("name", w.name).Str("ID", w.id).Msgf("grpc write line: %s", out)
log.Trace().Int64("step-id", w.stepID).Msgf("grpc write line: %s", data)

line := &Line{
Out: out,
Step: w.name,
Pos: w.num,
Time: int64(time.Since(w.now).Seconds()),
Type: LineStdout,
line := &LogEntry{
Data: data,
StepID: w.stepID,
Time: int64(time.Since(w.now).Seconds()),
Type: LogEntryStdout,
}
if err := w.peer.Log(context.Background(), w.id, line); err != nil {
log.Error().Err(err).Msgf("fail to write pipeline log to peer '%s'", w.id)
if err := w.peer.Log(context.Background(), line); err != nil {
log.Error().Err(err).Msgf("fail to write pipeline log to peer '%d'", w.stepID)
}
w.num++

Expand All @@ -111,7 +108,7 @@ func (w *LineWriter) Write(p []byte) (n int, err error) {
}

// Lines returns the line history
func (w *LineWriter) Lines() []*Line {
func (w *LineWriter) Lines() []*LogEntry {
return w.lines
}

Expand Down
12 changes: 6 additions & 6 deletions pipeline/rpc/line_test.go → pipeline/rpc/log_entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"testing"
)

func TestLine(t *testing.T) {
line := Line{
Step: "redis",
Time: 60,
Pos: 1,
Out: "starting redis server",
func TestLogEntry(t *testing.T) {
line := LogEntry{
StepID: int64(123),
Time: 60,
Line: 1,
Data: "starting redis server",
}
got, want := line.String(), "[redis:L1:60s] starting redis server"
if got != want {
Expand Down
2 changes: 1 addition & 1 deletion pipeline/rpc/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type Peer interface {
Update(c context.Context, id string, state State) error

// Log writes the pipeline log entry.
Log(c context.Context, id string, line *Line) error
Log(c context.Context, logEntry *LogEntry) error

// RegisterAgent register our agent to the server
RegisterAgent(ctx context.Context, platform, backend, version string, capacity int) (int64, error)
Expand Down
2 changes: 1 addition & 1 deletion pipeline/rpc/proto/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ package proto

// Version is the version of the woodpecker.proto file,
// !IMPORTANT! increased by 1 each time it get changed !IMPORTANT!
const Version int32 = 2
const Version int32 = 3
Loading