Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support clients that don't enable side-band or side-band-64k #41

Merged
merged 8 commits into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions internal/integration/hiderefs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,13 @@ func TestHiderefsConfig(t *testing.T) {

// Send an empty pack, since we're using commits that are already in
// the repo.
packObjects := exec.CommandContext(ctx, "git", "-C", testRepo, "pack-objects", "--all-progress-implied", "--revs", "--stdout", "--thin", "--delta-base-offset", "--progress")
packObjects.Stderr = os.Stderr
pack, err := packObjects.StdoutPipe()
pack, err := os.Open("testdata/empty.pack")
require.NoError(t, err)
go packObjects.Run()
if _, err := io.Copy(srpIn, pack); err != nil {
t.Logf("error writing pack to spokes-receive-pack input: %v", err)
}

refStatus, unpackRes, _, err := readResult(bufSRPOut)
refStatus, unpackRes, _, err := readResult(t, bufSRPOut)
require.NoError(t, err)
assert.Equal(t, map[string]string{
createBranch: "ok",
Expand Down
120 changes: 120 additions & 0 deletions internal/integration/nosideband_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
//go:build integration

package integration

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestNoSideBand(t *testing.T) {
const (
defaultBranch = "refs/heads/main"
createBranch = "refs/heads/newbranch"

testCommit = "e589bdee50e39beac56220c4b7a716225f79e3cf"
)

wd, err := os.Getwd()
require.NoError(t, err)
origin := filepath.Join(wd, "testdata/remote/git-internals-fork.git")

testRepo := t.TempDir()
requireRun(t, "git", "init", "--bare", testRepo)
requireRun(t, "git", "-C", testRepo, "fetch", origin, defaultBranch+":"+defaultBranch)

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

srp := exec.CommandContext(ctx, "spokes-receive-pack", ".")
srp.Dir = testRepo
srp.Env = append(os.Environ(),
"GIT_SOCKSTAT_VAR_spokes_quarantine=bool:true",
"GIT_SOCKSTAT_VAR_quarantine_id=config-test-quarantine-id")
srp.Stderr = os.Stderr
srpIn, err := srp.StdinPipe()
require.NoError(t, err)
srpOut, err := srp.StdoutPipe()
require.NoError(t, err)

srpErr := make(chan error)
go func() { srpErr <- srp.Run() }()

bufSRPOut := bufio.NewReader(srpOut)

refs, _, err := readAdv(bufSRPOut)
require.NoError(t, err)
assert.Equal(t, refs, map[string]string{
defaultBranch: testCommit,
})

oldnew := fmt.Sprintf("%040d %s", 0, testCommit)
require.NoError(t, writePktlinef(srpIn,
"%s %s\x00report-status report-status-v2 push-options object-format=sha1\n", oldnew, createBranch))
_, err = srpIn.Write([]byte("0000"))
require.NoError(t, err)

require.NoError(t, writePktlinef(srpIn,
"anything i want to put in a push option\n"))
_, err = srpIn.Write([]byte("0000"))
require.NoError(t, err)

// Send an empty pack, since we're using commits that are already in
// the repo.
pack, err := os.Open("testdata/empty.pack")
require.NoError(t, err)
if _, err := io.Copy(srpIn, pack); err != nil {
t.Logf("error writing pack to spokes-receive-pack input: %v", err)
}

require.NoError(t, srpIn.Close())

lines, err := readResultNoSideBand(t, bufSRPOut)
require.NoError(t, err)
assert.Equal(t, []string{
"unpack ok\n",
"ok refs/heads/newbranch\n",
}, lines)
}

func readResultNoSideBand(t *testing.T, r io.Reader) ([]string, error) {
var lines []string

// Read all of the output so that we can include it with errors.
data, err := io.ReadAll(r)
if err != nil {
if len(data) > 0 {
t.Logf("got data, but there was an error: %v", err)
} else {
return nil, err
}
}

// Replace r.
r = bytes.NewReader(data)

for {
pkt, err := readPktline(r)
switch {
case err != nil:
return nil, fmt.Errorf("%w while parsing %q", err, string(data))

case pkt == nil:
return lines, nil

default:
lines = append(lines, string(pkt))
}
}
}
8 changes: 6 additions & 2 deletions internal/integration/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func readAdv(r io.Reader) (map[string]string, string, error) {
}
}

func readResult(r io.Reader) (map[string]string, string, [][]byte, error) {
func readResult(t *testing.T, r io.Reader) (map[string]string, string, [][]byte, error) {
var (
refStatus map[string]string
unpackRes string
Expand All @@ -61,7 +61,11 @@ func readResult(r io.Reader) (map[string]string, string, [][]byte, error) {
// Read all of the output so that we can include it with errors.
data, err := io.ReadAll(r)
if err != nil {
return nil, "", nil, err
if len(data) > 0 {
t.Logf("got data, but there was an error: %v", err)
} else {
return nil, "", nil, err
}
Comment on lines -64 to +68
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change works around a similar problem to the one described in hashicorp/go-plugin#116, where there's a race between reads from a pipe and exec.Cmd.Wait closing the pipe after the process exits.

}

// Replace r.
Expand Down
7 changes: 2 additions & 5 deletions internal/integration/pushoptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,18 +71,15 @@ func TestPushOptions(t *testing.T) {

// Send an empty pack, since we're using commits that are already in
// the repo.
packObjects := exec.CommandContext(ctx, "git", "-C", testRepo, "pack-objects", "--all-progress-implied", "--revs", "--stdout", "--thin", "--delta-base-offset", "--progress")
packObjects.Stderr = os.Stderr
pack, err := packObjects.StdoutPipe()
pack, err := os.Open("testdata/empty.pack")
require.NoError(t, err)
go packObjects.Run()
if _, err := io.Copy(srpIn, pack); err != nil {
t.Logf("error writing pack to spokes-receive-pack input: %v", err)
}

require.NoError(t, srpIn.Close())

refStatus, unpackRes, _, err := readResult(bufSRPOut)
refStatus, unpackRes, _, err := readResult(t, bufSRPOut)
require.NoError(t, err)
assert.Equal(t, map[string]string{
createBranch: "ok",
Expand Down
Binary file added internal/integration/testdata/empty.pack
Binary file not shown.
5 changes: 2 additions & 3 deletions internal/pktline/capabilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,8 @@ func (c Capabilities) Quiet() Capability {
func (c Capabilities) Atomic() Capability {
return c.caps[Atomic]
}
func (c Capabilities) HasPushOptions() bool {
_, ok := c.caps[PushOptions]
return ok
func (c Capabilities) PushOptions() Capability {
return c.caps[PushOptions]
}
func (c Capabilities) AllowTipSha1InWant() Capability {
return c.caps[AllowTipSha1InWant]
Expand Down
67 changes: 46 additions & 21 deletions internal/spokes/spokes.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (r *spokesReceivePack) execute(ctx context.Context) error {
return nil
}

if capabilities.HasPushOptions() {
if capabilities.IsDefined(pktline.PushOptions) {
// We don't use push-options here.
if err := r.dumpPushOptions(ctx); err != nil {
return err
Expand Down Expand Up @@ -190,12 +190,12 @@ func (r *spokesReceivePack) execute(ctx context.Context) error {
}

if capabilities.IsDefined(pktline.ReportStatusV2) || capabilities.IsDefined(pktline.ReportStatus) {
if err := r.report(ctx, unpackErr == nil, commands); err != nil {
if err := r.report(ctx, unpackErr == nil, commands, capabilities); err != nil {
return err
}
}

return nil
return unpackErr
}

func (r *spokesReceivePack) isFastForward(c *command, ctx context.Context) bool {
Expand Down Expand Up @@ -520,8 +520,11 @@ func (r *spokesReceivePack) readPack(ctx context.Context, commands []command, ca

// FIXME? add --pack_header similar to git's push_header_arg

// These options are always on in prod.
args = append(args, "--show-resolving-progress", "--report-end-of-input", "--fix-thin")
if useSideBand(capabilities) {
args = append(args, "--show-resolving-progress", "--report-end-of-input")
}

args = append(args, "--fix-thin")

if r.isFsckConfigEnabled() {
args = append(args, "--strict")
Expand Down Expand Up @@ -673,10 +676,7 @@ func (r *spokesReceivePack) getRefUpdateCommandLimit() (int, error) {
// startSidebandMultiplexer checks if a sideband capability has been required and, in that case, starts multiplexing the
// stderr of the command `cmd` into the indicated `output`
func startSidebandMultiplexer(stderr io.ReadCloser, output io.Writer, capabilities pktline.Capabilities) (*errgroup.Group, error) {
_, sbDefined := capabilities.Get(pktline.SideBand)
_, sb64kDefined := capabilities.Get(pktline.SideBand64k)

if !sbDefined && !sb64kDefined {
if !useSideBand(capabilities) {
// no sideband capability has been defined
return nil, nil
}
Expand All @@ -689,10 +689,7 @@ func startSidebandMultiplexer(stderr io.ReadCloser, output io.Writer, capabiliti
_ = stderr.Close()
}()
for {
var bufferSize = 999
if sb64kDefined {
bufferSize = 65519
}
bufferSize := sideBandBufSize(capabilities)
buf := make([]byte, bufferSize)

n, err := stderr.Read(buf[:])
Expand Down Expand Up @@ -828,38 +825,53 @@ func (r *spokesReceivePack) performCheckConnectivityOnObject(ctx context.Context
}

// report the success/failure of the push operation to the client
func (r *spokesReceivePack) report(_ context.Context, unpackOK bool, commands []command) error {
var buf bytes.Buffer
func writeReport(w io.Writer, unpackOK bool, commands []command) error {
if unpackOK {
if err := writePacketLine(&buf, []byte("unpack ok\n")); err != nil {
if err := writePacketLine(w, []byte("unpack ok\n")); err != nil {
return err
}
} else {
if err := writePacketLine(&buf, []byte("unpack index-pack failed\n")); err != nil {
if err := writePacketLine(w, []byte("unpack index-pack failed\n")); err != nil {
return err
}
}
for _, c := range commands {
if c.err != "" {
if err := writePacketf(&buf, "ng %s %s\n", c.refname, c.err); err != nil {
if err := writePacketf(w, "ng %s %s\n", c.refname, c.err); err != nil {
return err
}
} else {
if err := writePacketf(&buf, "%s %s\n", c.reportFF, c.refname); err != nil {
if err := writePacketf(w, "%s %s\n", c.reportFF, c.refname); err != nil {
return err
}
// FIXME? if statusV2, maybe also write option refname, option old-oid, option new-oid, option forced-update
}
}

if _, err := fmt.Fprint(&buf, "0000"); err != nil {
if _, err := fmt.Fprint(w, "0000"); err != nil {
return err
}

return nil
}

func (r *spokesReceivePack) report(_ context.Context, unpackOK bool, commands []command, capabilities pktline.Capabilities) error {
if !useSideBand(capabilities) {
return writeReport(r.output, unpackOK, commands)
}

var buf bytes.Buffer

if err := writeReport(&buf, unpackOK, commands); err != nil {
return err
}

output := buf.Bytes()

packetMax := sideBandBufSize(capabilities)

for len(output) > 0 {
n := 4096
n := packetMax - 5
if len(output) < n {
n = len(output)
}
Expand All @@ -868,9 +880,11 @@ func (r *spokesReceivePack) report(_ context.Context, unpackOK bool, commands []
}
output = output[n:]
}

if _, err := fmt.Fprintf(r.output, "0000"); err != nil {
return nil
}

return nil
}

Expand All @@ -884,3 +898,14 @@ func includeNonDeletes(commands []command) bool {
}
return false
}

func useSideBand(c pktline.Capabilities) bool {
return c.IsDefined(pktline.SideBand) || c.IsDefined(pktline.SideBand64k)
}

func sideBandBufSize(capabilities pktline.Capabilities) int {
if capabilities.IsDefined(pktline.SideBand64k) {
return 65519
}
return 999
}