From 4cc80f26b9c13f9c68714a12a99a2a581aaea0d0 Mon Sep 17 00:00:00 2001 From: Matt Burke Date: Thu, 6 Apr 2023 09:16:36 -0400 Subject: [PATCH 1/8] If unpack does not succeed, exit with an error --- internal/spokes/spokes.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/spokes/spokes.go b/internal/spokes/spokes.go index 1c0af1a..1267fb5 100644 --- a/internal/spokes/spokes.go +++ b/internal/spokes/spokes.go @@ -195,7 +195,7 @@ func (r *spokesReceivePack) execute(ctx context.Context) error { } } - return nil + return unpackErr } func (r *spokesReceivePack) isFastForward(c *command, ctx context.Context) bool { From 9e788b525a58ba8f2d0cabb76045d7e59b4d69a5 Mon Sep 17 00:00:00 2001 From: Matt Burke Date: Thu, 6 Apr 2023 09:51:29 -0400 Subject: [PATCH 2/8] Revert a change to pktline from #40 --- internal/pktline/capabilities.go | 5 ++--- internal/spokes/spokes.go | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/internal/pktline/capabilities.go b/internal/pktline/capabilities.go index 8b3fedc..9fe0fb9 100644 --- a/internal/pktline/capabilities.go +++ b/internal/pktline/capabilities.go @@ -152,9 +152,8 @@ func (c Capabilities) Quiet() Capability { func (c Capabilities) Atomic() Capability { return c.caps[Atomic] } -func (c Capabilities) HasPushOptions() bool { - _, ok := c.caps[PushOptions] - return ok +func (c Capabilities) PushOptions() Capability { + return c.caps[PushOptions] } func (c Capabilities) AllowTipSha1InWant() Capability { return c.caps[AllowTipSha1InWant] diff --git a/internal/spokes/spokes.go b/internal/spokes/spokes.go index 1267fb5..6817929 100644 --- a/internal/spokes/spokes.go +++ b/internal/spokes/spokes.go @@ -141,7 +141,7 @@ func (r *spokesReceivePack) execute(ctx context.Context) error { return nil } - if capabilities.HasPushOptions() { + if capabilities.IsDefined(pktline.PushOptions) { // We don't use push-options here. if err := r.dumpPushOptions(ctx); err != nil { return err From 3d157b55e0edb1d68d3b8a32b013b2ff929585d5 Mon Sep 17 00:00:00 2001 From: Matt Burke Date: Thu, 6 Apr 2023 09:55:16 -0400 Subject: [PATCH 3/8] Extract a couple of methods for use later --- internal/spokes/spokes.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/internal/spokes/spokes.go b/internal/spokes/spokes.go index 6817929..5e552bd 100644 --- a/internal/spokes/spokes.go +++ b/internal/spokes/spokes.go @@ -673,10 +673,7 @@ func (r *spokesReceivePack) getRefUpdateCommandLimit() (int, error) { // startSidebandMultiplexer checks if a sideband capability has been required and, in that case, starts multiplexing the // stderr of the command `cmd` into the indicated `output` func startSidebandMultiplexer(stderr io.ReadCloser, output io.Writer, capabilities pktline.Capabilities) (*errgroup.Group, error) { - _, sbDefined := capabilities.Get(pktline.SideBand) - _, sb64kDefined := capabilities.Get(pktline.SideBand64k) - - if !sbDefined && !sb64kDefined { + if !useSideBand(capabilities) { // no sideband capability has been defined return nil, nil } @@ -689,10 +686,7 @@ func startSidebandMultiplexer(stderr io.ReadCloser, output io.Writer, capabiliti _ = stderr.Close() }() for { - var bufferSize = 999 - if sb64kDefined { - bufferSize = 65519 - } + bufferSize := sideBandBufSize(capabilities) buf := make([]byte, bufferSize) n, err := stderr.Read(buf[:]) @@ -884,3 +878,14 @@ func includeNonDeletes(commands []command) bool { } return false } + +func useSideBand(c pktline.Capabilities) bool { + return c.IsDefined(pktline.SideBand) || c.IsDefined(pktline.SideBand64k) +} + +func sideBandBufSize(capabilities pktline.Capabilities) int { + if capabilities.IsDefined(pktline.SideBand64k) { + return 65519 + } + return 999 +} From eaf8dafecb26d722b95bd30e114b09171b297a90 Mon Sep 17 00:00:00 2001 From: Matt Burke Date: Thu, 6 Apr 2023 09:56:02 -0400 Subject: [PATCH 4/8] Add a test that demonstrates a protocol error When the client doesn't request sideband information, the report should be sent directly, not in a sideband message, and there should be no other sideband messages (i.e. from index-pack). --- internal/integration/nosideband_test.go | 119 ++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 internal/integration/nosideband_test.go diff --git a/internal/integration/nosideband_test.go b/internal/integration/nosideband_test.go new file mode 100644 index 0000000..0b69a67 --- /dev/null +++ b/internal/integration/nosideband_test.go @@ -0,0 +1,119 @@ +//go:build integration + +package integration + +import ( + "bufio" + "bytes" + "context" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNoSideBand(t *testing.T) { + const ( + defaultBranch = "refs/heads/main" + createBranch = "refs/heads/newbranch" + + testCommit = "e589bdee50e39beac56220c4b7a716225f79e3cf" + ) + + wd, err := os.Getwd() + require.NoError(t, err) + origin := filepath.Join(wd, "testdata/remote/git-internals-fork.git") + + testRepo := t.TempDir() + requireRun(t, "git", "init", "--bare", testRepo) + requireRun(t, "git", "-C", testRepo, "fetch", origin, defaultBranch+":"+defaultBranch) + + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + srp := exec.CommandContext(ctx, "spokes-receive-pack", ".") + srp.Dir = testRepo + srp.Env = append(os.Environ(), + "GIT_SOCKSTAT_VAR_spokes_quarantine=bool:true", + "GIT_SOCKSTAT_VAR_quarantine_id=config-test-quarantine-id") + srp.Stderr = os.Stderr + srpIn, err := srp.StdinPipe() + require.NoError(t, err) + srpOut, err := srp.StdoutPipe() + require.NoError(t, err) + + srpErr := make(chan error) + go func() { srpErr <- srp.Run() }() + + bufSRPOut := bufio.NewReader(srpOut) + + refs, _, err := readAdv(bufSRPOut) + require.NoError(t, err) + assert.Equal(t, refs, map[string]string{ + defaultBranch: testCommit, + }) + + oldnew := fmt.Sprintf("%040d %s", 0, testCommit) + require.NoError(t, writePktlinef(srpIn, + "%s %s\x00report-status report-status-v2 push-options object-format=sha1\n", oldnew, createBranch)) + _, err = srpIn.Write([]byte("0000")) + require.NoError(t, err) + + require.NoError(t, writePktlinef(srpIn, + "anything i want to put in a push option\n")) + _, err = srpIn.Write([]byte("0000")) + require.NoError(t, err) + + // Send an empty pack, since we're using commits that are already in + // the repo. + packObjects := exec.CommandContext(ctx, "git", "-C", testRepo, "pack-objects", "--all-progress-implied", "--revs", "--stdout", "--thin", "--delta-base-offset", "--progress") + packObjects.Stderr = os.Stderr + pack, err := packObjects.StdoutPipe() + require.NoError(t, err) + go packObjects.Run() + if _, err := io.Copy(srpIn, pack); err != nil { + t.Logf("error writing pack to spokes-receive-pack input: %v", err) + } + + require.NoError(t, srpIn.Close()) + + lines, err := readResultNoSideBand(bufSRPOut) + require.NoError(t, err) + assert.Equal(t, []string{ + "unpack ok\n", + "ok refs/heads/newbranch\n", + }, lines) +} + +func readResultNoSideBand(r io.Reader) ([]string, error) { + var lines []string + + // Read all of the output so that we can include it with errors. + data, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + // Replace r. + r = bytes.NewReader(data) + + for { + pkt, err := readPktline(r) + switch { + case err != nil: + return nil, fmt.Errorf("%w while parsing %q", err, string(data)) + + case pkt == nil: + return lines, nil + + default: + lines = append(lines, string(pkt)) + } + } +} From eff59d6a55647a6f7677b47a03020ce4bc932cab Mon Sep 17 00:00:00 2001 From: Matt Burke Date: Thu, 6 Apr 2023 09:59:13 -0400 Subject: [PATCH 5/8] Make index-pack quieter if we're not going to use its output --- internal/spokes/spokes.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/spokes/spokes.go b/internal/spokes/spokes.go index 5e552bd..581ae04 100644 --- a/internal/spokes/spokes.go +++ b/internal/spokes/spokes.go @@ -520,8 +520,11 @@ func (r *spokesReceivePack) readPack(ctx context.Context, commands []command, ca // FIXME? add --pack_header similar to git's push_header_arg - // These options are always on in prod. - args = append(args, "--show-resolving-progress", "--report-end-of-input", "--fix-thin") + if useSideBand(capabilities) { + args = append(args, "--show-resolving-progress", "--report-end-of-input") + } + + args = append(args, "--fix-thin") if r.isFsckConfigEnabled() { args = append(args, "--strict") From 1488c8291f119eaee270e82e91b201338c16f991 Mon Sep 17 00:00:00 2001 From: Matt Burke Date: Thu, 6 Apr 2023 09:59:42 -0400 Subject: [PATCH 6/8] When sideband is off, send the report directly Most git clients enable the side-band or side-band-64k capability. When they do this, the report ("unpack ok" and per-ref status) will be sent nested in a sideband 1 packet. However, git clients aren't required to do this, and when they don't the report is supposed to be written without getting nested. So that's what this change does. --- internal/spokes/spokes.go | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/internal/spokes/spokes.go b/internal/spokes/spokes.go index 581ae04..f60cb08 100644 --- a/internal/spokes/spokes.go +++ b/internal/spokes/spokes.go @@ -190,7 +190,7 @@ func (r *spokesReceivePack) execute(ctx context.Context) error { } if capabilities.IsDefined(pktline.ReportStatusV2) || capabilities.IsDefined(pktline.ReportStatus) { - if err := r.report(ctx, unpackErr == nil, commands); err != nil { + if err := r.report(ctx, unpackErr == nil, commands, capabilities); err != nil { return err } } @@ -825,38 +825,53 @@ func (r *spokesReceivePack) performCheckConnectivityOnObject(ctx context.Context } // report the success/failure of the push operation to the client -func (r *spokesReceivePack) report(_ context.Context, unpackOK bool, commands []command) error { - var buf bytes.Buffer +func writeReport(w io.Writer, unpackOK bool, commands []command) error { if unpackOK { - if err := writePacketLine(&buf, []byte("unpack ok\n")); err != nil { + if err := writePacketLine(w, []byte("unpack ok\n")); err != nil { return err } } else { - if err := writePacketLine(&buf, []byte("unpack index-pack failed\n")); err != nil { + if err := writePacketLine(w, []byte("unpack index-pack failed\n")); err != nil { return err } } for _, c := range commands { if c.err != "" { - if err := writePacketf(&buf, "ng %s %s\n", c.refname, c.err); err != nil { + if err := writePacketf(w, "ng %s %s\n", c.refname, c.err); err != nil { return err } } else { - if err := writePacketf(&buf, "%s %s\n", c.reportFF, c.refname); err != nil { + if err := writePacketf(w, "%s %s\n", c.reportFF, c.refname); err != nil { return err } // FIXME? if statusV2, maybe also write option refname, option old-oid, option new-oid, option forced-update } } - if _, err := fmt.Fprint(&buf, "0000"); err != nil { + if _, err := fmt.Fprint(w, "0000"); err != nil { + return err + } + + return nil +} + +func (r *spokesReceivePack) report(_ context.Context, unpackOK bool, commands []command, capabilities pktline.Capabilities) error { + if !useSideBand(capabilities) { + return writeReport(r.output, unpackOK, commands) + } + + var buf bytes.Buffer + + if err := writeReport(&buf, unpackOK, commands); err != nil { return err } output := buf.Bytes() + packetMax := sideBandBufSize(capabilities) + for len(output) > 0 { - n := 4096 + n := packetMax - 5 if len(output) < n { n = len(output) } @@ -865,9 +880,11 @@ func (r *spokesReceivePack) report(_ context.Context, unpackOK bool, commands [] } output = output[n:] } + if _, err := fmt.Fprintf(r.output, "0000"); err != nil { return nil } + return nil } From 2e77fa787e8dca139d9f8b278f4efda8874eed9a Mon Sep 17 00:00:00 2001 From: Matt Burke Date: Thu, 6 Apr 2023 10:06:05 -0400 Subject: [PATCH 7/8] Use a file for the empty pack The empty pack is the same for all of these tests, so we can generate it once, store it in the repo, and avoid the complexity of shelling out to git during the tests. Here's how I generated the file: $ git pack-objects --revs --stdout > internal/integration/testdata/empty.pack 32 bytes 4 files changed, 3 insertions(+), 12 deletions(-) create mode 100644 internal/integration/testdata/empty.pack diff --git a/internal/integration/hiderefs_test.go b/internal/integration/hiderefs_test.go index 4e8b9a4..fd483d4 100644 --- a/internal/integration/hiderefs_test.go +++ b/internal/integration/hiderefs_test.go @@ -110,11 +110,8 @@ func TestHiderefsConfig(t *testing.T) { // Send an empty pack, since we're using commits that are already in // the repo. - packObjects := exec.CommandContext(ctx, "git", "-C", testRepo, "pack-objects", "--all-progress-implied", "--revs", "--stdout", "--thin", "--delta-base-offset", "--progress") - packObjects.Stderr = os.Stderr - pack, err := packObjects.StdoutPipe() + pack, err := os.Open("testdata/empty.pack") require.NoError(t, err) - go packObjects.Run() if _, err := io.Copy(srpIn, pack); err != nil { t.Logf("error writing pack to spokes-receive-pack input: %v", err) } diff --git a/internal/integration/nosideband_test.go b/internal/integration/nosideband_test.go index 0b69a67..5a2a9b5 100644 --- a/internal/integration/nosideband_test.go +++ b/internal/integration/nosideband_test.go @@ -72,11 +72,8 @@ func TestNoSideBand(t *testing.T) { // Send an empty pack, since we're using commits that are already in // the repo. - packObjects := exec.CommandContext(ctx, "git", "-C", testRepo, "pack-objects", "--all-progress-implied", "--revs", "--stdout", "--thin", "--delta-base-offset", "--progress") - packObjects.Stderr = os.Stderr - pack, err := packObjects.StdoutPipe() + pack, err := os.Open("testdata/empty.pack") require.NoError(t, err) - go packObjects.Run() if _, err := io.Copy(srpIn, pack); err != nil { t.Logf("error writing pack to spokes-receive-pack input: %v", err) } diff --git a/internal/integration/pushoptions_test.go b/internal/integration/pushoptions_test.go index 1f35ebf..d4c1985 100644 --- a/internal/integration/pushoptions_test.go +++ b/internal/integration/pushoptions_test.go @@ -71,11 +71,8 @@ func TestPushOptions(t *testing.T) { // Send an empty pack, since we're using commits that are already in // the repo. - packObjects := exec.CommandContext(ctx, "git", "-C", testRepo, "pack-objects", "--all-progress-implied", "--revs", "--stdout", "--thin", "--delta-base-offset", "--progress") - packObjects.Stderr = os.Stderr - pack, err := packObjects.StdoutPipe() + pack, err := os.Open("testdata/empty.pack") require.NoError(t, err) - go packObjects.Run() if _, err := io.Copy(srpIn, pack); err != nil { t.Logf("error writing pack to spokes-receive-pack input: %v", err) } diff --git a/internal/integration/testdata/empty.pack b/internal/integration/testdata/empty.pack new file mode 100644 index 0000000000000000000000000000000000000000..dc270159c10266fdd2c21f1d16288f0da18ca30f GIT binary patch literal 32 mcmWG=boORoU|<4b2Bx_jP1ZM7yxJ Date: Thu, 6 Apr 2023 10:26:20 -0400 Subject: [PATCH 8/8] Don't fail is there was some data When reading the output from spokes-receive-pack, sometimes we get "file already closed" instead of EOF at the end of the output. In that case, when there is some data, just log the error and proceed to parsing the data. This is an attempt to fix or at least learn more about errors like the one in https://github.com/github/spokes-receive-pack/actions/runs/4629813916/jobs/8190557968. --- internal/integration/hiderefs_test.go | 2 +- internal/integration/nosideband_test.go | 10 +++++++--- internal/integration/parse.go | 8 ++++++-- internal/integration/pushoptions_test.go | 2 +- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/internal/integration/hiderefs_test.go b/internal/integration/hiderefs_test.go index fd483d4..52ebe67 100644 --- a/internal/integration/hiderefs_test.go +++ b/internal/integration/hiderefs_test.go @@ -116,7 +116,7 @@ func TestHiderefsConfig(t *testing.T) { t.Logf("error writing pack to spokes-receive-pack input: %v", err) } - refStatus, unpackRes, _, err := readResult(bufSRPOut) + refStatus, unpackRes, _, err := readResult(t, bufSRPOut) require.NoError(t, err) assert.Equal(t, map[string]string{ createBranch: "ok", diff --git a/internal/integration/nosideband_test.go b/internal/integration/nosideband_test.go index 5a2a9b5..5b70ab6 100644 --- a/internal/integration/nosideband_test.go +++ b/internal/integration/nosideband_test.go @@ -80,7 +80,7 @@ func TestNoSideBand(t *testing.T) { require.NoError(t, srpIn.Close()) - lines, err := readResultNoSideBand(bufSRPOut) + lines, err := readResultNoSideBand(t, bufSRPOut) require.NoError(t, err) assert.Equal(t, []string{ "unpack ok\n", @@ -88,13 +88,17 @@ func TestNoSideBand(t *testing.T) { }, lines) } -func readResultNoSideBand(r io.Reader) ([]string, error) { +func readResultNoSideBand(t *testing.T, r io.Reader) ([]string, error) { var lines []string // Read all of the output so that we can include it with errors. data, err := io.ReadAll(r) if err != nil { - return nil, err + if len(data) > 0 { + t.Logf("got data, but there was an error: %v", err) + } else { + return nil, err + } } // Replace r. diff --git a/internal/integration/parse.go b/internal/integration/parse.go index 96d56b0..d2e1180 100644 --- a/internal/integration/parse.go +++ b/internal/integration/parse.go @@ -51,7 +51,7 @@ func readAdv(r io.Reader) (map[string]string, string, error) { } } -func readResult(r io.Reader) (map[string]string, string, [][]byte, error) { +func readResult(t *testing.T, r io.Reader) (map[string]string, string, [][]byte, error) { var ( refStatus map[string]string unpackRes string @@ -61,7 +61,11 @@ func readResult(r io.Reader) (map[string]string, string, [][]byte, error) { // Read all of the output so that we can include it with errors. data, err := io.ReadAll(r) if err != nil { - return nil, "", nil, err + if len(data) > 0 { + t.Logf("got data, but there was an error: %v", err) + } else { + return nil, "", nil, err + } } // Replace r. diff --git a/internal/integration/pushoptions_test.go b/internal/integration/pushoptions_test.go index d4c1985..21d73e5 100644 --- a/internal/integration/pushoptions_test.go +++ b/internal/integration/pushoptions_test.go @@ -79,7 +79,7 @@ func TestPushOptions(t *testing.T) { require.NoError(t, srpIn.Close()) - refStatus, unpackRes, _, err := readResult(bufSRPOut) + refStatus, unpackRes, _, err := readResult(t, bufSRPOut) require.NoError(t, err) assert.Equal(t, map[string]string{ createBranch: "ok",