Skip to content

Commit

Permalink
transport: support external proxy
Browse files Browse the repository at this point in the history
  • Loading branch information
fcharlie committed Dec 20, 2024
1 parent f51f869 commit 857f37d
Show file tree
Hide file tree
Showing 16 changed files with 128 additions and 60 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ SOURCE_DIR := $(abspath $(dir $(lastword ${MAKEFILE_LIST})))
BUILD_DIR := ${SOURCE_DIR}/_build
BUILD_TIME := $(shell date +'%Y-%m-%dT%H:%M:%S%z')
BUILD_COMMIT := $(shell git rev-parse --short HEAD 2>/dev/null || echo 'none')
BUILD_VERSION := $(shell cat VERSION || echo '0.16.0')
BUILD_VERSION := $(shell cat VERSION || echo '0.16.1')
GO_PACKAGES := $(shell go list ./... | grep -v '^${PKG}/mock/' | grep -v '^${PKG}/proto/')
GO_LDFLAGS := -ldflags '-X ${PKG}/pkg/version.version=${BUILD_VERSION} -X ${PKG}/pkg/version.buildTime=${BUILD_TIME} -X ${PKG}/pkg/version.buildCommit=${BUILD_COMMIT}'

Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.16.0
0.16.1
2 changes: 1 addition & 1 deletion bali.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "zeta"
summary = "HugeSCM - A next generation cloud-based version control system"
description = "HugeSCM - A next generation cloud-based version control system"
package-name = "alipay-linkc-zeta"
version = "0.16.0"
version = "0.16.1"
license = "MIT"
prefix = "/usr/local"
packager = "江二"
Expand Down
2 changes: 1 addition & 1 deletion cmd/zeta-mc/crate.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "zeta-mc"
description = "zeta-mc - Migrate Git repository to zeta"
destination = "bin"
version = "0.16.0"
version = "0.16.1"
goflags = [
"-ldflags",
"-X github.com/antgroup/hugescm/pkg/version.version=$BUILD_VERSION -X github.com/antgroup/hugescm/pkg/version.buildTime=$BUILD_TIME -X github.com/antgroup/hugescm/pkg/version.buildCommit=$BUILD_COMMIT",
Expand Down
2 changes: 1 addition & 1 deletion cmd/zeta/crate.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name = "zeta"
description = "HugeSCM - A next generation cloud-based version control system"
destination = "bin"
version = "0.16.0"
version = "0.16.1"
goflags = [
"-ldflags",
"-X github.com/antgroup/hugescm/pkg/version.version=$BUILD_VERSION -X github.com/antgroup/hugescm/pkg/version.buildTime=$BUILD_TIME -X github.com/antgroup/hugescm/pkg/version.buildCommit=$BUILD_COMMIT",
Expand Down
6 changes: 4 additions & 2 deletions modules/zeta/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ func (h *HTTP) Overwrite(o *HTTP) {
}

type Transport struct {
MaxEntries int `toml:"maxEntries,omitempty"`
LargeSizeRaw Size `toml:"largeSize,omitempty"`
MaxEntries int `toml:"maxEntries,omitempty"`
LargeSizeRaw Size `toml:"largeSize,omitempty"`
ExternalProxy string `toml:"externalProxy,omitempty"` // externalProxy
}

const (
Expand All @@ -163,6 +164,7 @@ func (t *Transport) Overwrite(o *Transport) {
if o.MaxEntries > 0 {
t.MaxEntries = o.MaxEntries
}
t.ExternalProxy = overwrite(t.ExternalProxy, o.ExternalProxy)
}

type Diff struct {
Expand Down
24 changes: 22 additions & 2 deletions pkg/transport/http/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,35 @@ type Downloader interface {
type downloader struct {
*http.Client
userAgent string
proxyURL string
language string
termEnv string
verbose bool
}

func NewDownloader(verbose bool, insecure bool) Downloader {
func proxyFromURL(externalProxyURL string) func(req *http.Request) (*url.URL, error) {
if len(externalProxyURL) != 0 {
// cfg := &httpproxy.Config{
// HTTPProxy: externalProxyURL,
// HTTPSProxy: externalProxyURL,
// }
// proxyFuncValue := cfg.ProxyFunc()
// return func(req *http.Request) (*url.URL, error) {
// fmt.Fprintf(os.Stderr, "proxy: %s\n", req.URL)
// return proxyFuncValue(req.URL)
// }
if proxyURL, err := url.Parse(externalProxyURL); err == nil {
return http.ProxyURL(proxyURL)
}
}
return proxy.ProxyFromEnvironment
}

func NewDownloader(verbose bool, insecure bool, proxyURL string) Downloader {
return &downloader{
Client: &http.Client{
Transport: &http.Transport{
Proxy: proxy.ProxyFromEnvironment,
Proxy: proxyFromURL(proxyURL),
DialContext: dialer.DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
Expand All @@ -136,6 +155,7 @@ func NewDownloader(verbose bool, insecure bool) Downloader {
},
userAgent: "Zeta/" + version.GetVersion(),
language: tr.Language(),
proxyURL: proxyURL,
termEnv: os.Getenv("TERM"),
verbose: verbose,
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/transport/http/external_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package http

import (
"context"
"fmt"
"io"
"os"
"testing"

"github.com/antgroup/hugescm/pkg/transport"
)

func TestProxy(t *testing.T) {
dl := NewDownloader(true, false, "socks5://127.0.0.1:13659")
sr, err := dl.Download(context.Background(), &transport.Representation{
Href: "https://github.com/zed-industries/zed/releases/download/v0.166.1/zed-remote-server-macos-x86_64.gz",
}, 0)
if err != nil {
fmt.Fprintf(os.Stderr, "download error: %v\n", err)
return
}
defer sr.Close()
if _, err := io.Copy(io.Discard, sr); err != nil {
fmt.Fprintf(os.Stderr, "download error: %v\n", err)
}
}
68 changes: 34 additions & 34 deletions pkg/zeta/cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,21 @@ func catShowError(oid string, err error) error {
return err
}

func (r *Repository) fetchMissingBlob(ctx context.Context, oid plumbing.Hash) error {
if r.odb.Exists(oid, false) {
func (r *Repository) fetchMissingBlob(ctx context.Context, o *promiseObject) error {
if r.odb.Exists(o.oid, false) {
return nil
}
if !r.promisorEnabled() {
return plumbing.NoSuchObject(oid)
return plumbing.NoSuchObject(o.oid)
}
return r.promiseMissingFetch(ctx, oid)
return r.promiseMissingFetch(ctx, o)
}

func (r *Repository) catMissingObject(ctx context.Context, oid plumbing.Hash) (*object.Blob, error) {
if err := r.fetchMissingBlob(ctx, oid); err != nil {
func (r *Repository) catMissingObject(ctx context.Context, o *promiseObject) (*object.Blob, error) {
if err := r.fetchMissingBlob(ctx, o); err != nil {
return nil, err
}
return r.odb.Blob(ctx, oid)
return r.odb.Blob(ctx, o.oid)
}

func objectSize(a object.Encoder) int {
Expand All @@ -87,37 +87,37 @@ func objectSize(a object.Encoder) int {
return b.Len()
}

func (r *Repository) printSize(ctx context.Context, opts *CatOptions, oid plumbing.Hash) error {
func (r *Repository) printSize(ctx context.Context, opts *CatOptions, o *promiseObject) error {
var a any
var err error
if a, err = r.odb.Object(ctx, oid); err == nil {
if a, err = r.odb.Object(ctx, o.oid); err == nil {
if v, ok := a.(object.Encoder); !ok {
return opts.Println(objectSize(v))
}
// unreachable
return nil
}
if !plumbing.IsNoSuchObject(err) {
fmt.Fprintf(os.Stderr, "cat-file: resolve object '%s' error: %v\n", oid, err)
fmt.Fprintf(os.Stderr, "cat-file: resolve object '%s' error: %v\n", o.oid, err)
return err
}
var b *object.Blob
if b, err = r.catMissingObject(ctx, oid); err != nil {
return catShowError(oid.String(), err)
if b, err = r.catMissingObject(ctx, o); err != nil {
return catShowError(o.oid.String(), err)
}
defer b.Close()
return opts.Println(b.Size)
}

func (r *Repository) printType(ctx context.Context, opts *CatOptions, oid plumbing.Hash) error {
a, err := r.odb.Object(ctx, oid)
func (r *Repository) printType(ctx context.Context, opts *CatOptions, o *promiseObject) error {
a, err := r.odb.Object(ctx, o.oid)
if plumbing.IsNoSuchObject(err) {
if err := r.fetchMissingBlob(ctx, oid); err == nil {
if err := r.fetchMissingBlob(ctx, o); err == nil {
return opts.Println("blob")
}
}
if err != nil {
return catShowError(oid.String(), err)
return catShowError(o.oid.String(), err)
}
switch a.(type) {
case *object.Commit:
Expand All @@ -136,11 +136,11 @@ const (
binaryTruncated = "*** Binary truncated ***"
)

func (r *Repository) catBlob(ctx context.Context, opts *CatOptions, oid plumbing.Hash) error {
if oid == backend.BLANK_BLOB_HASH {
func (r *Repository) catBlob(ctx context.Context, opts *CatOptions, o *promiseObject) error {
if o.oid == backend.BLANK_BLOB_HASH {
return nil // empty blob, skip
}
b, err := r.catMissingObject(ctx, oid)
b, err := r.catMissingObject(ctx, o)
if err != nil {
return err
}
Expand Down Expand Up @@ -191,7 +191,7 @@ func (r *Repository) catFragments(ctx context.Context, opts *CatOptions, ff *obj
}()
readers := make([]io.Reader, 0, len(ff.Entries))
for _, e := range ff.Entries {
o, err := r.catMissingObject(ctx, e.Hash)
o, err := r.catMissingObject(ctx, &promiseObject{oid: e.Hash, size: int64(e.Size)})
if err != nil {
return err
}
Expand All @@ -216,19 +216,19 @@ func (r *Repository) catFragments(ctx context.Context, opts *CatOptions, ff *obj
return nil
}

func (r *Repository) catObject(ctx context.Context, opts *CatOptions, oid plumbing.Hash) error {
func (r *Repository) catObject(ctx context.Context, opts *CatOptions, o *promiseObject) error {
if opts.PrintSize {
return r.printSize(ctx, opts, oid)
return r.printSize(ctx, opts, o)
}
if opts.Type {
return r.printType(ctx, opts, oid)
return r.printType(ctx, opts, o)
}
a, err := r.odb.Object(ctx, oid)
a, err := r.odb.Object(ctx, o.oid)
if plumbing.IsNoSuchObject(err) {
return catShowError(oid.String(), r.catBlob(ctx, opts, oid))
return catShowError(o.oid.String(), r.catBlob(ctx, opts, o))
}
if err != nil {
return catShowError(oid.String(), err)
return catShowError(o.oid.String(), err)
}
if opts.Verify {
if w, ok := a.(object.Encoder); ok {
Expand Down Expand Up @@ -269,7 +269,7 @@ func (r *Repository) catBranchOrTag(ctx context.Context, opts *CatOptions, branc
return catShowError(branchOrTag, err)
}
r.DbgPrint("resolve object '%s'", oid)
return r.catObject(ctx, opts, oid)
return r.catObject(ctx, opts, &promiseObject{oid: oid})
}

func (r *Repository) Cat(ctx context.Context, opts *CatOptions) error {
Expand All @@ -292,17 +292,17 @@ func (r *Repository) Cat(ctx context.Context, opts *CatOptions) error {
case *object.Tree:
if len(v) == 0 {
// self
return r.catObject(ctx, opts, a.Hash)
return r.catObject(ctx, opts, &promiseObject{oid: a.Hash})
}
e, err := a.FindEntry(ctx, v)
if err != nil {
return catShowError(v, err)
}
return r.catObject(ctx, opts, e.Hash)
return r.catObject(ctx, opts, &promiseObject{oid: e.Hash, size: e.Size})
case *object.Commit:
if len(v) == 0 {
// root tree
return r.catObject(ctx, opts, a.Tree)
return r.catObject(ctx, opts, &promiseObject{oid: a.Tree})
}
root, err := r.odb.Tree(ctx, a.Tree)
if err != nil {
Expand All @@ -312,15 +312,15 @@ func (r *Repository) Cat(ctx context.Context, opts *CatOptions) error {
if err != nil {
return catShowError(v, err)
}
return r.catObject(ctx, opts, e.Hash)
return r.catObject(ctx, opts, &promiseObject{oid: e.Hash, size: e.Size})
case *object.Tag:
cc, err := r.odb.ParseRevExhaustive(ctx, a.Hash)
if err != nil {
return catShowError(v, err)
}
if len(v) == 0 {
// root tree
return r.catObject(ctx, opts, cc.Tree)
return r.catObject(ctx, opts, &promiseObject{oid: cc.Tree})
}
root, err := r.odb.Tree(ctx, cc.Tree)
if err != nil {
Expand All @@ -330,8 +330,8 @@ func (r *Repository) Cat(ctx context.Context, opts *CatOptions) error {
if err != nil {
return catShowError(v, err)
}
return r.catObject(ctx, opts, e.Hash)
return r.catObject(ctx, opts, &promiseObject{oid: e.Hash, size: e.Size})
default:
}
return r.catObject(ctx, opts, oid)
return r.catObject(ctx, opts, &promiseObject{oid: oid})
}
2 changes: 1 addition & 1 deletion pkg/zeta/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type DoFetchOptions struct {
}

func (opts *DoFetchOptions) ReferenceName() plumbing.ReferenceName {
if len(opts.Name) == 0 {
if len(opts.Name) == 0 || opts.Name == string(plumbing.HEAD) {
return plumbing.HEAD
}
if strings.HasPrefix(opts.Name, plumbing.ReferencePrefix) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/zeta/merge_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (r *Repository) readMissingText(ctx context.Context, oid plumbing.Hash, tex
case err == nil:
// nothing
case plumbing.IsNoSuchObject(err):
if err = r.promiseMissingFetch(ctx, oid); err != nil {
if err = r.promiseMissingFetch(ctx, &promiseObject{oid: oid}); err != nil {
return "", "", err
}
if br, err = r.odb.Blob(ctx, oid); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/zeta/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
ENV_ZETA_SSL_NO_VERIFY = "ZETA_SSL_NO_VERIFY"
ENV_ZETA_TRANSPORT_MAX_ENTRIES = "ZETA_TRANSPORT_MAX_ENTRIES"
ENV_ZETA_TRANSPORT_LARGE_SIZE = "ZETA_TRANSPORT_LARGE_SIZE"
ENV_ZETA_TRANSPORT_EXTERNAL_PROXY = "ZETA_TRANSPORT_EXTERNAL_PROXY"
)

var (
Expand Down
18 changes: 15 additions & 3 deletions pkg/zeta/promisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,16 @@ func (r *Repository) promiseFetch(ctx context.Context, rev string, fetchMissing
return oid, nil
}

func (r *Repository) promiseMissingFetch(ctx context.Context, oid plumbing.Hash) (err error) {
type promiseObject struct {
oid plumbing.Hash
size int64
}

func (o *promiseObject) entry() *odb.Entry {
return &odb.Entry{Hash: o.oid, Size: o.size}
}

func (r *Repository) promiseMissingFetch(ctx context.Context, o *promiseObject) (err error) {
t, err := r.newTransport(ctx, transport.DOWNLOAD)
if err != nil {
return err
Expand All @@ -157,8 +166,11 @@ func (r *Repository) promiseMissingFetch(ctx context.Context, oid plumbing.Hash)
if r.quiet {
mode = odb.NO_BAR
}
if o.size >= r.largeSize() {
return r.transfer(ctx, t, []*odb.Entry{o.entry()})
}
// Fetch missing object
return r.odb.DoTransfer(ctx, oid, func(offset int64) (transport.SizeReader, error) {
return t.GetObject(ctx, oid, offset)
return r.odb.DoTransfer(ctx, o.oid, func(offset int64) (transport.SizeReader, error) {
return t.GetObject(ctx, o.oid, offset)
}, odb.NewSingleBar, mode)
}
7 changes: 7 additions & 0 deletions pkg/zeta/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,13 @@ func (r *Repository) largeSize() int64 {
return r.Transport.LargeSize()
}

func (r *Repository) externalProxy() string {
if externalProxy, ok := r.getFromValueOrEnv("transport.externalProxy", ENV_ZETA_TRANSPORT_EXTERNAL_PROXY); ok && len(externalProxy) > 0 {
return externalProxy
}
return r.Transport.ExternalProxy
}

func (r *Repository) NewCommitter() *object.Signature {
return &object.Signature{
Name: r.committerName(),
Expand Down
Loading

0 comments on commit 857f37d

Please sign in to comment.