From 0ec271bade87c3006a24ff1f128246eb0eb82837 Mon Sep 17 00:00:00 2001 From: Gus Eggert Date: Mon, 12 Sep 2022 16:55:42 -0400 Subject: [PATCH] feat: use GET for FindProviders This is so the result is cacheable using a CDN, which is required by StoreTheIndex to scale to current traffic levels from Hydra. --- gen/proto/proto_edelweiss.go | 450 ++++++++++++++++++++++++++--------- gen/routing.go | 1 + go.mod | 2 +- go.sum | 4 +- test/clientserver_test.go | 33 ++- 5 files changed, 361 insertions(+), 129 deletions(-) diff --git a/gen/proto/proto_edelweiss.go b/gen/proto/proto_edelweiss.go index 6b0dd7e..2750ef5 100644 --- a/gen/proto/proto_edelweiss.go +++ b/gen/proto/proto_edelweiss.go @@ -5,21 +5,21 @@ package proto import ( pd6 "bytes" pd7 "context" - pd9 "errors" + pd10 "errors" pd2 "fmt" pd16 "github.com/ipfs/go-cid" pd5 "github.com/ipfs/go-log/v2" - pd13 "github.com/ipld/edelweiss/services" + pd14 "github.com/ipld/edelweiss/services" pd1 "github.com/ipld/edelweiss/values" - pd11 "github.com/ipld/go-ipld-prime" - pd8 "github.com/ipld/go-ipld-prime/codec/dagjson" + pd12 "github.com/ipld/go-ipld-prime" + pd8 "github.com/ipld/go-ipld-prime/codec/dagcbor" + pd9 "github.com/ipld/go-ipld-prime/codec/dagjson" pd3 "github.com/ipld/go-ipld-prime/datamodel" pd17 "github.com/ipld/go-ipld-prime/linking/cid" - pd10 "io" - pd15 "io/ioutil" + pd11 "io" pd4 "net/http" - pd12 "net/url" - pd14 "sync" + pd13 "net/url" + pd15 "sync" ) // -- protocol type DelegatedRouting_IdentifyArg -- @@ -1094,8 +1094,8 @@ type DelegatedRouting_ClientOption func(*client_DelegatedRouting) error type client_DelegatedRouting struct { httpClient *pd4.Client - endpoint *pd12.URL - ulk pd14.Mutex + endpoint *pd13.URL + ulk pd15.Mutex unsupported map[string]bool // cache of methods not supported by server } @@ -1107,7 +1107,7 @@ func DelegatedRouting_Client_WithHTTPClient(hc *pd4.Client) DelegatedRouting_Cli } func New_DelegatedRouting_Client(endpoint string, opts ...DelegatedRouting_ClientOption) (*client_DelegatedRouting, error) { - u, err := pd12.Parse(endpoint) + u, err := pd13.Parse(endpoint) if err != nil { return nil, err } @@ -1155,21 +1155,27 @@ func (c *client_DelegatedRouting) Identify_Async(ctx pd7.Context, req *Delegated notSupported := c.unsupported["Identify"] c.ulk.Unlock() if notSupported { - return nil, pd13.ErrSchema + return nil, pd14.ErrSchema } envelope := &AnonInductive4{ Identify: req, } - buf, err := pd11.Encode(envelope, pd8.Encode) + buf, err := pd12.Encode(envelope, pd8.Encode) // XXX: apply binary encoding on top? + if err != nil { return nil, pd2.Errorf("serializing DAG-JSON request: %w", err) } // encode request in URL u := *c.endpoint - httpReq, err := pd4.NewRequestWithContext(ctx, "POST", u.String(), pd6.NewReader(buf)) + + q := pd13.Values{} + q.Set("q", string(buf)) + u.RawQuery = q.Encode() + httpReq, err := pd4.NewRequestWithContext(ctx, "GET", u.String(), nil) + if err != nil { return nil, err } @@ -1191,7 +1197,7 @@ func (c *client_DelegatedRouting) Identify_Async(ctx pd7.Context, req *Delegated c.ulk.Lock() c.unsupported["Identify"] = true c.ulk.Unlock() - return nil, pd13.ErrSchema + return nil, pd14.ErrSchema } // HTTP codes other than 200 correspond to service implementation rejecting the call when it is received // for reasons unrelated to protocol schema @@ -1199,7 +1205,7 @@ func (c *client_DelegatedRouting) Identify_Async(ctx pd7.Context, req *Delegated resp.Body.Close() if resp.Header != nil { if errValues, ok := resp.Header["Error"]; ok && len(errValues) == 1 { - err = pd13.ErrService{Cause: pd2.Errorf("%s", errValues[0])} + err = pd14.ErrService{Cause: pd2.Errorf("%s", errValues[0])} } else { err = pd2.Errorf("service rejected the call, no cause provided") } @@ -1214,10 +1220,10 @@ func (c *client_DelegatedRouting) Identify_Async(ctx pd7.Context, req *Delegated return ch, nil } -func process_DelegatedRouting_Identify_AsyncResult(ctx pd7.Context, ch chan<- DelegatedRouting_Identify_AsyncResult, r pd10.ReadCloser) { +func process_DelegatedRouting_Identify_AsyncResult(ctx pd7.Context, ch chan<- DelegatedRouting_Identify_AsyncResult, r pd11.ReadCloser) { defer close(ch) defer r.Close() - opt := pd8.DecodeOptions{ + opt := pd9.DecodeOptions{ ParseLinks: true, ParseBytes: true, DontParseBeyondEnd: true, @@ -1225,24 +1231,24 @@ func process_DelegatedRouting_Identify_AsyncResult(ctx pd7.Context, ch chan<- De for { var out DelegatedRouting_Identify_AsyncResult - n, err := pd11.DecodeStreaming(r, opt.Decode) + n, err := pd12.DecodeStreaming(r, opt.Decode) - if pd9.Is(err, pd10.EOF) || pd9.Is(err, pd10.ErrUnexpectedEOF) || pd9.Is(err, pd7.DeadlineExceeded) || pd9.Is(err, pd7.Canceled) { + if pd10.Is(err, pd11.EOF) || pd10.Is(err, pd11.ErrUnexpectedEOF) || pd10.Is(err, pd7.DeadlineExceeded) || pd10.Is(err, pd7.Canceled) { return } if err != nil { - out = DelegatedRouting_Identify_AsyncResult{Err: pd13.ErrProto{Cause: err}} // IPLD decode error + out = DelegatedRouting_Identify_AsyncResult{Err: pd14.ErrProto{Cause: err}} // IPLD decode error } else { var x [1]byte if k, err := r.Read(x[:]); k != 1 || x[0] != '\n' { - out = DelegatedRouting_Identify_AsyncResult{Err: pd13.ErrProto{Cause: pd2.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error + out = DelegatedRouting_Identify_AsyncResult{Err: pd14.ErrProto{Cause: pd2.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error } else { env := &AnonInductive5{} if err = env.Parse(n); err != nil { - out = DelegatedRouting_Identify_AsyncResult{Err: pd13.ErrProto{Cause: err}} // schema decode error + out = DelegatedRouting_Identify_AsyncResult{Err: pd14.ErrProto{Cause: err}} // schema decode error } else if env.Error != nil { - out = DelegatedRouting_Identify_AsyncResult{Err: pd13.ErrService{Cause: pd9.New(string(env.Error.Code))}} // service-level error + out = DelegatedRouting_Identify_AsyncResult{Err: pd14.ErrService{Cause: pd10.New(string(env.Error.Code))}} // service-level error } else if env.Identify != nil { out = DelegatedRouting_Identify_AsyncResult{Resp: env.Identify} } else { @@ -1294,21 +1300,27 @@ func (c *client_DelegatedRouting) FindProviders_Async(ctx pd7.Context, req *Find notSupported := c.unsupported["FindProviders"] c.ulk.Unlock() if notSupported { - return nil, pd13.ErrSchema + return nil, pd14.ErrSchema } envelope := &AnonInductive4{ FindProviders: req, } - buf, err := pd11.Encode(envelope, pd8.Encode) + buf, err := pd12.Encode(envelope, pd8.Encode) // XXX: apply binary encoding on top? + if err != nil { return nil, pd2.Errorf("serializing DAG-JSON request: %w", err) } // encode request in URL u := *c.endpoint - httpReq, err := pd4.NewRequestWithContext(ctx, "POST", u.String(), pd6.NewReader(buf)) + + q := pd13.Values{} + q.Set("q", string(buf)) + u.RawQuery = q.Encode() + httpReq, err := pd4.NewRequestWithContext(ctx, "GET", u.String(), nil) + if err != nil { return nil, err } @@ -1330,7 +1342,7 @@ func (c *client_DelegatedRouting) FindProviders_Async(ctx pd7.Context, req *Find c.ulk.Lock() c.unsupported["FindProviders"] = true c.ulk.Unlock() - return nil, pd13.ErrSchema + return nil, pd14.ErrSchema } // HTTP codes other than 200 correspond to service implementation rejecting the call when it is received // for reasons unrelated to protocol schema @@ -1338,7 +1350,7 @@ func (c *client_DelegatedRouting) FindProviders_Async(ctx pd7.Context, req *Find resp.Body.Close() if resp.Header != nil { if errValues, ok := resp.Header["Error"]; ok && len(errValues) == 1 { - err = pd13.ErrService{Cause: pd2.Errorf("%s", errValues[0])} + err = pd14.ErrService{Cause: pd2.Errorf("%s", errValues[0])} } else { err = pd2.Errorf("service rejected the call, no cause provided") } @@ -1353,10 +1365,10 @@ func (c *client_DelegatedRouting) FindProviders_Async(ctx pd7.Context, req *Find return ch, nil } -func process_DelegatedRouting_FindProviders_AsyncResult(ctx pd7.Context, ch chan<- DelegatedRouting_FindProviders_AsyncResult, r pd10.ReadCloser) { +func process_DelegatedRouting_FindProviders_AsyncResult(ctx pd7.Context, ch chan<- DelegatedRouting_FindProviders_AsyncResult, r pd11.ReadCloser) { defer close(ch) defer r.Close() - opt := pd8.DecodeOptions{ + opt := pd9.DecodeOptions{ ParseLinks: true, ParseBytes: true, DontParseBeyondEnd: true, @@ -1364,24 +1376,24 @@ func process_DelegatedRouting_FindProviders_AsyncResult(ctx pd7.Context, ch chan for { var out DelegatedRouting_FindProviders_AsyncResult - n, err := pd11.DecodeStreaming(r, opt.Decode) + n, err := pd12.DecodeStreaming(r, opt.Decode) - if pd9.Is(err, pd10.EOF) || pd9.Is(err, pd10.ErrUnexpectedEOF) || pd9.Is(err, pd7.DeadlineExceeded) || pd9.Is(err, pd7.Canceled) { + if pd10.Is(err, pd11.EOF) || pd10.Is(err, pd11.ErrUnexpectedEOF) || pd10.Is(err, pd7.DeadlineExceeded) || pd10.Is(err, pd7.Canceled) { return } if err != nil { - out = DelegatedRouting_FindProviders_AsyncResult{Err: pd13.ErrProto{Cause: err}} // IPLD decode error + out = DelegatedRouting_FindProviders_AsyncResult{Err: pd14.ErrProto{Cause: err}} // IPLD decode error } else { var x [1]byte if k, err := r.Read(x[:]); k != 1 || x[0] != '\n' { - out = DelegatedRouting_FindProviders_AsyncResult{Err: pd13.ErrProto{Cause: pd2.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error + out = DelegatedRouting_FindProviders_AsyncResult{Err: pd14.ErrProto{Cause: pd2.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error } else { env := &AnonInductive5{} if err = env.Parse(n); err != nil { - out = DelegatedRouting_FindProviders_AsyncResult{Err: pd13.ErrProto{Cause: err}} // schema decode error + out = DelegatedRouting_FindProviders_AsyncResult{Err: pd14.ErrProto{Cause: err}} // schema decode error } else if env.Error != nil { - out = DelegatedRouting_FindProviders_AsyncResult{Err: pd13.ErrService{Cause: pd9.New(string(env.Error.Code))}} // service-level error + out = DelegatedRouting_FindProviders_AsyncResult{Err: pd14.ErrService{Cause: pd10.New(string(env.Error.Code))}} // service-level error } else if env.FindProviders != nil { out = DelegatedRouting_FindProviders_AsyncResult{Resp: env.FindProviders} } else { @@ -1433,21 +1445,24 @@ func (c *client_DelegatedRouting) GetIPNS_Async(ctx pd7.Context, req *GetIPNSReq notSupported := c.unsupported["GetIPNS"] c.ulk.Unlock() if notSupported { - return nil, pd13.ErrSchema + return nil, pd14.ErrSchema } envelope := &AnonInductive4{ GetIPNS: req, } - buf, err := pd11.Encode(envelope, pd8.Encode) + buf, err := pd12.Encode(envelope, pd9.Encode) + if err != nil { return nil, pd2.Errorf("serializing DAG-JSON request: %w", err) } // encode request in URL u := *c.endpoint + httpReq, err := pd4.NewRequestWithContext(ctx, "POST", u.String(), pd6.NewReader(buf)) + if err != nil { return nil, err } @@ -1469,7 +1484,7 @@ func (c *client_DelegatedRouting) GetIPNS_Async(ctx pd7.Context, req *GetIPNSReq c.ulk.Lock() c.unsupported["GetIPNS"] = true c.ulk.Unlock() - return nil, pd13.ErrSchema + return nil, pd14.ErrSchema } // HTTP codes other than 200 correspond to service implementation rejecting the call when it is received // for reasons unrelated to protocol schema @@ -1477,7 +1492,7 @@ func (c *client_DelegatedRouting) GetIPNS_Async(ctx pd7.Context, req *GetIPNSReq resp.Body.Close() if resp.Header != nil { if errValues, ok := resp.Header["Error"]; ok && len(errValues) == 1 { - err = pd13.ErrService{Cause: pd2.Errorf("%s", errValues[0])} + err = pd14.ErrService{Cause: pd2.Errorf("%s", errValues[0])} } else { err = pd2.Errorf("service rejected the call, no cause provided") } @@ -1492,10 +1507,10 @@ func (c *client_DelegatedRouting) GetIPNS_Async(ctx pd7.Context, req *GetIPNSReq return ch, nil } -func process_DelegatedRouting_GetIPNS_AsyncResult(ctx pd7.Context, ch chan<- DelegatedRouting_GetIPNS_AsyncResult, r pd10.ReadCloser) { +func process_DelegatedRouting_GetIPNS_AsyncResult(ctx pd7.Context, ch chan<- DelegatedRouting_GetIPNS_AsyncResult, r pd11.ReadCloser) { defer close(ch) defer r.Close() - opt := pd8.DecodeOptions{ + opt := pd9.DecodeOptions{ ParseLinks: true, ParseBytes: true, DontParseBeyondEnd: true, @@ -1503,24 +1518,24 @@ func process_DelegatedRouting_GetIPNS_AsyncResult(ctx pd7.Context, ch chan<- Del for { var out DelegatedRouting_GetIPNS_AsyncResult - n, err := pd11.DecodeStreaming(r, opt.Decode) + n, err := pd12.DecodeStreaming(r, opt.Decode) - if pd9.Is(err, pd10.EOF) || pd9.Is(err, pd10.ErrUnexpectedEOF) || pd9.Is(err, pd7.DeadlineExceeded) || pd9.Is(err, pd7.Canceled) { + if pd10.Is(err, pd11.EOF) || pd10.Is(err, pd11.ErrUnexpectedEOF) || pd10.Is(err, pd7.DeadlineExceeded) || pd10.Is(err, pd7.Canceled) { return } if err != nil { - out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd13.ErrProto{Cause: err}} // IPLD decode error + out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd14.ErrProto{Cause: err}} // IPLD decode error } else { var x [1]byte if k, err := r.Read(x[:]); k != 1 || x[0] != '\n' { - out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd13.ErrProto{Cause: pd2.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error + out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd14.ErrProto{Cause: pd2.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error } else { env := &AnonInductive5{} if err = env.Parse(n); err != nil { - out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd13.ErrProto{Cause: err}} // schema decode error + out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd14.ErrProto{Cause: err}} // schema decode error } else if env.Error != nil { - out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd13.ErrService{Cause: pd9.New(string(env.Error.Code))}} // service-level error + out = DelegatedRouting_GetIPNS_AsyncResult{Err: pd14.ErrService{Cause: pd10.New(string(env.Error.Code))}} // service-level error } else if env.GetIPNS != nil { out = DelegatedRouting_GetIPNS_AsyncResult{Resp: env.GetIPNS} } else { @@ -1572,21 +1587,24 @@ func (c *client_DelegatedRouting) PutIPNS_Async(ctx pd7.Context, req *PutIPNSReq notSupported := c.unsupported["PutIPNS"] c.ulk.Unlock() if notSupported { - return nil, pd13.ErrSchema + return nil, pd14.ErrSchema } envelope := &AnonInductive4{ PutIPNS: req, } - buf, err := pd11.Encode(envelope, pd8.Encode) + buf, err := pd12.Encode(envelope, pd9.Encode) + if err != nil { return nil, pd2.Errorf("serializing DAG-JSON request: %w", err) } // encode request in URL u := *c.endpoint + httpReq, err := pd4.NewRequestWithContext(ctx, "POST", u.String(), pd6.NewReader(buf)) + if err != nil { return nil, err } @@ -1608,7 +1626,7 @@ func (c *client_DelegatedRouting) PutIPNS_Async(ctx pd7.Context, req *PutIPNSReq c.ulk.Lock() c.unsupported["PutIPNS"] = true c.ulk.Unlock() - return nil, pd13.ErrSchema + return nil, pd14.ErrSchema } // HTTP codes other than 200 correspond to service implementation rejecting the call when it is received // for reasons unrelated to protocol schema @@ -1616,7 +1634,7 @@ func (c *client_DelegatedRouting) PutIPNS_Async(ctx pd7.Context, req *PutIPNSReq resp.Body.Close() if resp.Header != nil { if errValues, ok := resp.Header["Error"]; ok && len(errValues) == 1 { - err = pd13.ErrService{Cause: pd2.Errorf("%s", errValues[0])} + err = pd14.ErrService{Cause: pd2.Errorf("%s", errValues[0])} } else { err = pd2.Errorf("service rejected the call, no cause provided") } @@ -1631,10 +1649,10 @@ func (c *client_DelegatedRouting) PutIPNS_Async(ctx pd7.Context, req *PutIPNSReq return ch, nil } -func process_DelegatedRouting_PutIPNS_AsyncResult(ctx pd7.Context, ch chan<- DelegatedRouting_PutIPNS_AsyncResult, r pd10.ReadCloser) { +func process_DelegatedRouting_PutIPNS_AsyncResult(ctx pd7.Context, ch chan<- DelegatedRouting_PutIPNS_AsyncResult, r pd11.ReadCloser) { defer close(ch) defer r.Close() - opt := pd8.DecodeOptions{ + opt := pd9.DecodeOptions{ ParseLinks: true, ParseBytes: true, DontParseBeyondEnd: true, @@ -1642,24 +1660,24 @@ func process_DelegatedRouting_PutIPNS_AsyncResult(ctx pd7.Context, ch chan<- Del for { var out DelegatedRouting_PutIPNS_AsyncResult - n, err := pd11.DecodeStreaming(r, opt.Decode) + n, err := pd12.DecodeStreaming(r, opt.Decode) - if pd9.Is(err, pd10.EOF) || pd9.Is(err, pd10.ErrUnexpectedEOF) || pd9.Is(err, pd7.DeadlineExceeded) || pd9.Is(err, pd7.Canceled) { + if pd10.Is(err, pd11.EOF) || pd10.Is(err, pd11.ErrUnexpectedEOF) || pd10.Is(err, pd7.DeadlineExceeded) || pd10.Is(err, pd7.Canceled) { return } if err != nil { - out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd13.ErrProto{Cause: err}} // IPLD decode error + out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd14.ErrProto{Cause: err}} // IPLD decode error } else { var x [1]byte if k, err := r.Read(x[:]); k != 1 || x[0] != '\n' { - out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd13.ErrProto{Cause: pd2.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error + out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd14.ErrProto{Cause: pd2.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error } else { env := &AnonInductive5{} if err = env.Parse(n); err != nil { - out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd13.ErrProto{Cause: err}} // schema decode error + out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd14.ErrProto{Cause: err}} // schema decode error } else if env.Error != nil { - out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd13.ErrService{Cause: pd9.New(string(env.Error.Code))}} // service-level error + out = DelegatedRouting_PutIPNS_AsyncResult{Err: pd14.ErrService{Cause: pd10.New(string(env.Error.Code))}} // service-level error } else if env.PutIPNS != nil { out = DelegatedRouting_PutIPNS_AsyncResult{Resp: env.PutIPNS} } else { @@ -1711,21 +1729,24 @@ func (c *client_DelegatedRouting) Provide_Async(ctx pd7.Context, req *ProvideReq notSupported := c.unsupported["Provide"] c.ulk.Unlock() if notSupported { - return nil, pd13.ErrSchema + return nil, pd14.ErrSchema } envelope := &AnonInductive4{ Provide: req, } - buf, err := pd11.Encode(envelope, pd8.Encode) + buf, err := pd12.Encode(envelope, pd9.Encode) + if err != nil { return nil, pd2.Errorf("serializing DAG-JSON request: %w", err) } // encode request in URL u := *c.endpoint + httpReq, err := pd4.NewRequestWithContext(ctx, "POST", u.String(), pd6.NewReader(buf)) + if err != nil { return nil, err } @@ -1747,7 +1768,7 @@ func (c *client_DelegatedRouting) Provide_Async(ctx pd7.Context, req *ProvideReq c.ulk.Lock() c.unsupported["Provide"] = true c.ulk.Unlock() - return nil, pd13.ErrSchema + return nil, pd14.ErrSchema } // HTTP codes other than 200 correspond to service implementation rejecting the call when it is received // for reasons unrelated to protocol schema @@ -1755,7 +1776,7 @@ func (c *client_DelegatedRouting) Provide_Async(ctx pd7.Context, req *ProvideReq resp.Body.Close() if resp.Header != nil { if errValues, ok := resp.Header["Error"]; ok && len(errValues) == 1 { - err = pd13.ErrService{Cause: pd2.Errorf("%s", errValues[0])} + err = pd14.ErrService{Cause: pd2.Errorf("%s", errValues[0])} } else { err = pd2.Errorf("service rejected the call, no cause provided") } @@ -1770,10 +1791,10 @@ func (c *client_DelegatedRouting) Provide_Async(ctx pd7.Context, req *ProvideReq return ch, nil } -func process_DelegatedRouting_Provide_AsyncResult(ctx pd7.Context, ch chan<- DelegatedRouting_Provide_AsyncResult, r pd10.ReadCloser) { +func process_DelegatedRouting_Provide_AsyncResult(ctx pd7.Context, ch chan<- DelegatedRouting_Provide_AsyncResult, r pd11.ReadCloser) { defer close(ch) defer r.Close() - opt := pd8.DecodeOptions{ + opt := pd9.DecodeOptions{ ParseLinks: true, ParseBytes: true, DontParseBeyondEnd: true, @@ -1781,24 +1802,24 @@ func process_DelegatedRouting_Provide_AsyncResult(ctx pd7.Context, ch chan<- Del for { var out DelegatedRouting_Provide_AsyncResult - n, err := pd11.DecodeStreaming(r, opt.Decode) + n, err := pd12.DecodeStreaming(r, opt.Decode) - if pd9.Is(err, pd10.EOF) || pd9.Is(err, pd10.ErrUnexpectedEOF) || pd9.Is(err, pd7.DeadlineExceeded) || pd9.Is(err, pd7.Canceled) { + if pd10.Is(err, pd11.EOF) || pd10.Is(err, pd11.ErrUnexpectedEOF) || pd10.Is(err, pd7.DeadlineExceeded) || pd10.Is(err, pd7.Canceled) { return } if err != nil { - out = DelegatedRouting_Provide_AsyncResult{Err: pd13.ErrProto{Cause: err}} // IPLD decode error + out = DelegatedRouting_Provide_AsyncResult{Err: pd14.ErrProto{Cause: err}} // IPLD decode error } else { var x [1]byte if k, err := r.Read(x[:]); k != 1 || x[0] != '\n' { - out = DelegatedRouting_Provide_AsyncResult{Err: pd13.ErrProto{Cause: pd2.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error + out = DelegatedRouting_Provide_AsyncResult{Err: pd14.ErrProto{Cause: pd2.Errorf("missing new line after result: err (%v), read (%d), char (%q)", err, k, string(x[:]))}} // Edelweiss decode error } else { env := &AnonInductive5{} if err = env.Parse(n); err != nil { - out = DelegatedRouting_Provide_AsyncResult{Err: pd13.ErrProto{Cause: err}} // schema decode error + out = DelegatedRouting_Provide_AsyncResult{Err: pd14.ErrProto{Cause: err}} // schema decode error } else if env.Error != nil { - out = DelegatedRouting_Provide_AsyncResult{Err: pd13.ErrService{Cause: pd9.New(string(env.Error.Code))}} // service-level error + out = DelegatedRouting_Provide_AsyncResult{Err: pd14.ErrService{Cause: pd10.New(string(env.Error.Code))}} // service-level error } else if env.Provide != nil { out = DelegatedRouting_Provide_AsyncResult{Resp: env.Provide} } else { @@ -1827,33 +1848,60 @@ type DelegatedRouting_Server interface { func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd4.HandlerFunc { return func(writer pd4.ResponseWriter, request *pd4.Request) { // parse request - msg, err := pd15.ReadAll(request.Body) - if err != nil { - logger_server_DelegatedRouting.Errorf("reading request body (%v)", err) - writer.WriteHeader(400) - return - } - n, err := pd11.Decode(msg, pd8.Decode) - if err != nil { - logger_server_DelegatedRouting.Errorf("received request not decodeable (%v)", err) - writer.WriteHeader(400) - return - } env := &AnonInductive4{} - if err = env.Parse(n); err != nil { - logger_server_DelegatedRouting.Errorf("parsing call envelope (%v)", err) + isReqCachable := false + switch request.Method { + case "POST": + isReqCachable = false + msg, err := pd11.ReadAll(request.Body) + if err != nil { + logger_server_DelegatedRouting.Errorf("reading request body (%v)", err) + writer.WriteHeader(400) + return + } + n, err := pd12.Decode(msg, pd9.Decode) + if err != nil { + logger_server_DelegatedRouting.Errorf("received request not decodeable (%v)", err) + writer.WriteHeader(400) + return + } + if err = env.Parse(n); err != nil { + logger_server_DelegatedRouting.Errorf("parsing call envelope (%v)", err) + writer.WriteHeader(400) + return + } + case "GET": + isReqCachable = true + msg := request.URL.Query().Get("q") + n, err := pd12.Decode([]byte(msg), pd8.Decode) + if err != nil { + logger_server_DelegatedRouting.Errorf("received url not decodeable (%v)", err) + writer.WriteHeader(400) + return + } + + if err = env.Parse(n); err != nil { + logger_server_DelegatedRouting.Errorf("parsing call envelope (%v)", err) + writer.WriteHeader(400) + return + } + default: + logger_server_DelegatedRouting.Errorf("http method not supported") writer.WriteHeader(400) return } + _ = isReqCachable writer.Header()["Content-Type"] = []string{ "application/vnd.ipfs.rpc+dag-json; version=1", } // demultiplex request + var err error switch { case env.FindProviders != nil: + ch, err := s.FindProviders(request.Context(), env.FindProviders) if err != nil { logger_server_DelegatedRouting.Errorf("service rejected request (%v)", err) @@ -1862,11 +1910,42 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd4.HandlerFunc { return } - writer.WriteHeader(200) - if f, ok := writer.(pd4.Flusher); ok { - f.Flush() - } + // if the request is cachable, collect all async results in a buffer, otherwise write them directly to http + var resultWriter pd11.Writer + if isReqCachable { + resultWriter = new(pd6.Buffer) + } else { + resultWriter = writer + writer.WriteHeader(200) + if f, ok := writer.(pd4.Flusher); ok { + f.Flush() + } + } + // if the request is cachable, compute an etag and send the collected results to http + if isReqCachable { + defer func() { + result := resultWriter.(*pd6.Buffer).Bytes() + etag, err := pd14.ETag(result) + if err != nil { + logger_server_DelegatedRouting.Errorf("etag generation (%v)", err) + writer.Header()["Error"] = []string{err.Error()} + writer.WriteHeader(500) + return + } + // if the request has an If-None-Match header, respond appropriately + ifNoneMatchValue := request.Header["If-None-Match"] + if len(ifNoneMatchValue) == 1 && ifNoneMatchValue[0] == etag { + writer.WriteHeader(304) + } else { + writer.Header()["ETag"] = []string{etag} + writer.Write(result) + if f, ok := writer.(pd4.Flusher); ok { + f.Flush() + } + } + }() + } for { select { case <-request.Context().Done(): @@ -1882,19 +1961,27 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd4.HandlerFunc { env = &AnonInductive5{FindProviders: resp.Resp} } var buf pd6.Buffer - if err = pd11.EncodeStreaming(&buf, env, pd8.Encode); err != nil { + if err = pd12.EncodeStreaming(&buf, env, pd9.Encode); err != nil { logger_server_DelegatedRouting.Errorf("cannot encode response (%v)", err) continue } buf.WriteByte("\n"[0]) - writer.Write(buf.Bytes()) - if f, ok := writer.(pd4.Flusher); ok { + resultWriter.Write(buf.Bytes()) + if f, ok := resultWriter.(pd4.Flusher); ok { f.Flush() } } } case env.GetIPNS != nil: + + if isReqCachable { + logger_server_DelegatedRouting.Errorf("non-cachable method called with http GET") + writer.Header()["Error"] = []string{"non-cachable method called with GET"} + writer.WriteHeader(500) + return + } + ch, err := s.GetIPNS(request.Context(), env.GetIPNS) if err != nil { logger_server_DelegatedRouting.Errorf("service rejected request (%v)", err) @@ -1903,11 +1990,42 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd4.HandlerFunc { return } - writer.WriteHeader(200) - if f, ok := writer.(pd4.Flusher); ok { - f.Flush() - } + // if the request is cachable, collect all async results in a buffer, otherwise write them directly to http + var resultWriter pd11.Writer + if isReqCachable { + resultWriter = new(pd6.Buffer) + } else { + resultWriter = writer + writer.WriteHeader(200) + if f, ok := writer.(pd4.Flusher); ok { + f.Flush() + } + } + // if the request is cachable, compute an etag and send the collected results to http + if isReqCachable { + defer func() { + result := resultWriter.(*pd6.Buffer).Bytes() + etag, err := pd14.ETag(result) + if err != nil { + logger_server_DelegatedRouting.Errorf("etag generation (%v)", err) + writer.Header()["Error"] = []string{err.Error()} + writer.WriteHeader(500) + return + } + // if the request has an If-None-Match header, respond appropriately + ifNoneMatchValue := request.Header["If-None-Match"] + if len(ifNoneMatchValue) == 1 && ifNoneMatchValue[0] == etag { + writer.WriteHeader(304) + } else { + writer.Header()["ETag"] = []string{etag} + writer.Write(result) + if f, ok := writer.(pd4.Flusher); ok { + f.Flush() + } + } + }() + } for { select { case <-request.Context().Done(): @@ -1923,19 +2041,27 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd4.HandlerFunc { env = &AnonInductive5{GetIPNS: resp.Resp} } var buf pd6.Buffer - if err = pd11.EncodeStreaming(&buf, env, pd8.Encode); err != nil { + if err = pd12.EncodeStreaming(&buf, env, pd9.Encode); err != nil { logger_server_DelegatedRouting.Errorf("cannot encode response (%v)", err) continue } buf.WriteByte("\n"[0]) - writer.Write(buf.Bytes()) - if f, ok := writer.(pd4.Flusher); ok { + resultWriter.Write(buf.Bytes()) + if f, ok := resultWriter.(pd4.Flusher); ok { f.Flush() } } } case env.PutIPNS != nil: + + if isReqCachable { + logger_server_DelegatedRouting.Errorf("non-cachable method called with http GET") + writer.Header()["Error"] = []string{"non-cachable method called with GET"} + writer.WriteHeader(500) + return + } + ch, err := s.PutIPNS(request.Context(), env.PutIPNS) if err != nil { logger_server_DelegatedRouting.Errorf("service rejected request (%v)", err) @@ -1944,11 +2070,42 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd4.HandlerFunc { return } - writer.WriteHeader(200) - if f, ok := writer.(pd4.Flusher); ok { - f.Flush() - } + // if the request is cachable, collect all async results in a buffer, otherwise write them directly to http + var resultWriter pd11.Writer + if isReqCachable { + resultWriter = new(pd6.Buffer) + } else { + resultWriter = writer + writer.WriteHeader(200) + if f, ok := writer.(pd4.Flusher); ok { + f.Flush() + } + } + // if the request is cachable, compute an etag and send the collected results to http + if isReqCachable { + defer func() { + result := resultWriter.(*pd6.Buffer).Bytes() + etag, err := pd14.ETag(result) + if err != nil { + logger_server_DelegatedRouting.Errorf("etag generation (%v)", err) + writer.Header()["Error"] = []string{err.Error()} + writer.WriteHeader(500) + return + } + // if the request has an If-None-Match header, respond appropriately + ifNoneMatchValue := request.Header["If-None-Match"] + if len(ifNoneMatchValue) == 1 && ifNoneMatchValue[0] == etag { + writer.WriteHeader(304) + } else { + writer.Header()["ETag"] = []string{etag} + writer.Write(result) + if f, ok := writer.(pd4.Flusher); ok { + f.Flush() + } + } + }() + } for { select { case <-request.Context().Done(): @@ -1964,19 +2121,27 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd4.HandlerFunc { env = &AnonInductive5{PutIPNS: resp.Resp} } var buf pd6.Buffer - if err = pd11.EncodeStreaming(&buf, env, pd8.Encode); err != nil { + if err = pd12.EncodeStreaming(&buf, env, pd9.Encode); err != nil { logger_server_DelegatedRouting.Errorf("cannot encode response (%v)", err) continue } buf.WriteByte("\n"[0]) - writer.Write(buf.Bytes()) - if f, ok := writer.(pd4.Flusher); ok { + resultWriter.Write(buf.Bytes()) + if f, ok := resultWriter.(pd4.Flusher); ok { f.Flush() } } } case env.Provide != nil: + + if isReqCachable { + logger_server_DelegatedRouting.Errorf("non-cachable method called with http GET") + writer.Header()["Error"] = []string{"non-cachable method called with GET"} + writer.WriteHeader(500) + return + } + ch, err := s.Provide(request.Context(), env.Provide) if err != nil { logger_server_DelegatedRouting.Errorf("service rejected request (%v)", err) @@ -1985,11 +2150,42 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd4.HandlerFunc { return } - writer.WriteHeader(200) - if f, ok := writer.(pd4.Flusher); ok { - f.Flush() - } + // if the request is cachable, collect all async results in a buffer, otherwise write them directly to http + var resultWriter pd11.Writer + if isReqCachable { + resultWriter = new(pd6.Buffer) + } else { + resultWriter = writer + writer.WriteHeader(200) + if f, ok := writer.(pd4.Flusher); ok { + f.Flush() + } + } + // if the request is cachable, compute an etag and send the collected results to http + if isReqCachable { + defer func() { + result := resultWriter.(*pd6.Buffer).Bytes() + etag, err := pd14.ETag(result) + if err != nil { + logger_server_DelegatedRouting.Errorf("etag generation (%v)", err) + writer.Header()["Error"] = []string{err.Error()} + writer.WriteHeader(500) + return + } + // if the request has an If-None-Match header, respond appropriately + ifNoneMatchValue := request.Header["If-None-Match"] + if len(ifNoneMatchValue) == 1 && ifNoneMatchValue[0] == etag { + writer.WriteHeader(304) + } else { + writer.Header()["ETag"] = []string{etag} + writer.Write(result) + if f, ok := writer.(pd4.Flusher); ok { + f.Flush() + } + } + }() + } for { select { case <-request.Context().Done(): @@ -2005,13 +2201,13 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd4.HandlerFunc { env = &AnonInductive5{Provide: resp.Resp} } var buf pd6.Buffer - if err = pd11.EncodeStreaming(&buf, env, pd8.Encode); err != nil { + if err = pd12.EncodeStreaming(&buf, env, pd9.Encode); err != nil { logger_server_DelegatedRouting.Errorf("cannot encode response (%v)", err) continue } buf.WriteByte("\n"[0]) - writer.Write(buf.Bytes()) - if f, ok := writer.(pd4.Flusher); ok { + resultWriter.Write(buf.Bytes()) + if f, ok := resultWriter.(pd4.Flusher); ok { f.Flush() } } @@ -2030,13 +2226,33 @@ func DelegatedRouting_AsyncHandler(s DelegatedRouting_Server) pd4.HandlerFunc { }, } var buf pd6.Buffer - if err = pd11.EncodeStreaming(&buf, env, pd8.Encode); err != nil { + if err = pd12.EncodeStreaming(&buf, env, pd9.Encode); err != nil { logger_server_DelegatedRouting.Errorf("cannot encode identify response (%v)", err) writer.WriteHeader(500) return } buf.WriteByte("\n"[0]) - writer.Write(buf.Bytes()) + + // compute etag, since Identify is cachable + result := buf.Bytes() + etag, err := pd14.ETag(result) + if err != nil { + logger_server_DelegatedRouting.Errorf("etag generation (%v)", err) + writer.Header()["Error"] = []string{err.Error()} + writer.WriteHeader(500) + return + } + // if the request has an If-None-Match header, respond appropriately + ifNoneMatchValue := request.Header["If-None-Match"] + if len(ifNoneMatchValue) == 1 && ifNoneMatchValue[0] == etag { + writer.WriteHeader(304) + } else { + writer.Header()["ETag"] = []string{etag} + writer.Write(result) + if f, ok := writer.(pd4.Flusher); ok { + f.Flush() + } + } default: logger_server_DelegatedRouting.Errorf("missing or unknown request") diff --git a/gen/routing.go b/gen/routing.go index 3f4dbb7..58980c0 100644 --- a/gen/routing.go +++ b/gen/routing.go @@ -24,6 +24,7 @@ var proto = defs.Defs{ Arg: defs.Ref{Name: "FindProvidersRequest"}, Return: defs.Ref{Name: "FindProvidersResponse"}, }, + Cachable: true, }, defs.Method{ Name: "GetIPNS", diff --git a/go.mod b/go.mod index 74344df..e580a1f 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/ipfs/go-cid v0.2.0 github.com/ipfs/go-ipns v0.1.2 github.com/ipfs/go-log/v2 v2.5.1 - github.com/ipld/edelweiss v0.1.6 + github.com/ipld/edelweiss v0.2.0 github.com/ipld/go-ipld-prime v0.17.1-0.20220627233435-adf99676901e github.com/libp2p/go-libp2p-core v0.16.1 github.com/libp2p/go-libp2p-record v0.1.3 diff --git a/go.sum b/go.sum index 30856e1..26c57d9 100644 --- a/go.sum +++ b/go.sum @@ -98,8 +98,8 @@ github.com/ipfs/go-log/v2 v2.0.5/go.mod h1:eZs4Xt4ZUJQFM3DlanGhy7TkwwawCZcSByscw github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g= github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY= github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI= -github.com/ipld/edelweiss v0.1.6 h1:LHx4wzsbQYC5UvN+7xwvwnGbq8kCsMbOSbfBYVCC1nY= -github.com/ipld/edelweiss v0.1.6/go.mod h1:IVSfo5e7vJrTKKRjR1lrtfgc2UbEMvvatNycfH9fRfY= +github.com/ipld/edelweiss v0.2.0 h1:KfAZBP8eeJtrLxLhi7r3N0cBCo7JmwSRhOJp3WSpNjk= +github.com/ipld/edelweiss v0.2.0/go.mod h1:FJAzJRCep4iI8FOFlRriN9n0b7OuX3T/S9++NpBDmA4= github.com/ipld/go-ipld-prime v0.9.0/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= github.com/ipld/go-ipld-prime v0.17.0/go.mod h1:aYcKm5TIvGfY8P3QBKz/2gKcLxzJ1zDaD+o0bOowhgs= github.com/ipld/go-ipld-prime v0.17.1-0.20220627233435-adf99676901e h1:p5qepdt1UEk6UadNwNBFDlm/uC+GwSmdVB4wqyt2JLA= diff --git a/test/clientserver_test.go b/test/clientserver_test.go index ae0e099..016778b 100644 --- a/test/clientserver_test.go +++ b/test/clientserver_test.go @@ -3,6 +3,7 @@ package test import ( "bytes" "context" + "errors" "fmt" "math" "net/http/httptest" @@ -225,21 +226,35 @@ func TestCancelContext(t *testing.T) { t.Fatal(err) } - par, err := c.FindProvidersAsync(ctx, cid) - if err != nil { - t.Fatal(err) - } + // FindProviders reads all results into a buffer before returning headers. + // This means that, unlike the other calls, the client will not return the result channel + // until the server reads all the results, which will never happen. + // So we make the FindProversAsync call asynchronously and cancel, + // which should result in a cancelation error. + + done := make(chan struct{}) + go func() { + par, err := c.FindProvidersAsync(ctx, cid) + if err != nil { + if !errors.Is(err, context.Canceled) { + panic(err) + } + } + select { + case <-par: + panic("got a result when no result was expected") + default: + } + close(done) + }() cancel() - o2, ok := <-par - if ok { - t.Fatal("FindProvidersAsync channel must be closed", "OUTPUT:", o2.Err) - } + <-done + } func TestClientServer(t *testing.T) { - var numIter []int = []int{1e2, 1e3, 1e4} avgLatency := make([]time.Duration, len(numIter)) deltaGo := make([]int, len(numIter))