Skip to content

Commit

Permalink
Merge pull request #200 from ipni/feat/streaming-delegated
Browse files Browse the repository at this point in the history
Add ndjson semantics to delegated routing endpoint
  • Loading branch information
willscott authored Oct 30, 2024
2 parents 5ea99e8 + 785d17e commit d2a426e
Show file tree
Hide file tree
Showing 3 changed files with 310 additions and 56 deletions.
159 changes: 104 additions & 55 deletions delegated_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ const (
)

type findFunc func(ctx context.Context, method, source string, req *url.URL, encrypted bool) (int, []byte)
type findStreamFunc func(ctx context.Context, method string, req *url.URL, encrypted bool) (int, chan model.ProviderResult)

func NewDelegatedTranslator(backend findFunc) (http.Handler, error) {
finder := delegatedTranslator{backend}
func NewDelegatedTranslator(backend findFunc, streamingBackend findStreamFunc) (http.Handler, error) {
finder := delegatedTranslator{backend, streamingBackend}
m := http.NewServeMux()
m.HandleFunc("/providers", finder.provide)
m.HandleFunc("/encrypted/providers", finder.provide)
Expand All @@ -34,7 +35,8 @@ func NewDelegatedTranslator(backend findFunc) (http.Handler, error) {
}

type delegatedTranslator struct {
be findFunc
be findFunc
sbe findStreamFunc
}

func (dt *delegatedTranslator) provide(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -81,6 +83,48 @@ func (dt *delegatedTranslator) find(w http.ResponseWriter, r *http.Request, encr

// Translate URL by mapping `/providers/{CID}` to `/cid/{CID}`.
uri := r.URL.JoinPath("../../cid", cidUrlParam)

acc, err := getAccepts(r)
if err != nil {
http.Error(w, "invalid Accept header", http.StatusBadRequest)
return
}

switch {
case acc.ndjson:
rcode, respChan := dt.sbe(r.Context(), findMethodDelegated, uri, encrypted)
if rcode != http.StatusOK {
http.Error(w, "", rcode)
return
}
out := &drResp{}
hasWritten := false
encoder := json.NewEncoder(w)

for rcrd := range respChan {
if !hasWritten {
w.Header().Set("Content-Type", mediaTypeNDJson)
w.Header().Set("Connection", "Keep-Alive")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(200)
hasWritten = true
}
prov := drProvFromResult(rcrd)
// if new
if out.append(prov) {
if err := encoder.Encode(prov); err != nil {
return
}
}
}
if len(out.seenProviders) == 0 {
// no response.
w.WriteHeader(http.StatusNotFound)
}
return
default:
}

rcode, resp := dt.be(r.Context(), http.MethodGet, findMethodDelegated, uri, encrypted)
if rcode != http.StatusOK {
http.Error(w, "", rcode)
Expand All @@ -105,63 +149,14 @@ func (dt *delegatedTranslator) find(w http.ResponseWriter, r *http.Request, encr

res := parsed.MultihashResults[0]

out := drResp{}
out := &drResp{}

// Records returned from IPNI via Delegated Routing don't have ContextID in them. Becuase of that,
// some records that are valid from the IPNI point of view might look like duplicates from the Delegated Routing point of view.
// To make the Delegated Routing output nicer, deduplicate identical records.
uniqueProviders := map[uint32]struct{}{}
appendIfUnique := func(drp *drProvider) {
capacity := len(drp.ID) + len(drp.Schema)
for _, proto := range drp.Protocols {
capacity += len(proto)
}
for _, meta := range drp.Metadata {
capacity += len(meta)
}
drpb := make([]byte, 0, capacity)
drpb = append(drpb, []byte(drp.ID)...)
for _, proto := range drp.Protocols {
drpb = append(drpb, []byte(proto)...)
}
drpb = append(drpb, []byte(drp.Schema)...)
for _, meta := range drp.Metadata {
drpb = append(drpb, meta...)
}
key := crc32.ChecksumIEEE(drpb)
if _, ok := uniqueProviders[key]; ok {
return
}
uniqueProviders[key] = struct{}{}
out.Providers = append(out.Providers, *drp)
}

for _, p := range res.ProviderResults {
md := metadata.Default.New()
err := md.UnmarshalBinary(p.Metadata)
if err != nil {
appendIfUnique(&drProvider{
Schema: peerSchema,
ID: p.Provider.ID,
Addrs: p.Provider.Addrs,
})
} else {
provider := &drProvider{
Schema: peerSchema,
ID: p.Provider.ID,
Addrs: p.Provider.Addrs,
Metadata: make(map[string][]byte),
}

for _, proto := range md.Protocols() {
pl := md.Get(proto)
plb, _ := pl.MarshalBinary()
provider.Protocols = append(provider.Protocols, proto.String())
provider.Metadata[proto.String()] = plb
}

appendIfUnique(provider)
}
out.append(drProvFromResult(p))
}

outBytes, err := json.Marshal(out)
Expand All @@ -174,7 +169,34 @@ func (dt *delegatedTranslator) find(w http.ResponseWriter, r *http.Request, encr
}

type drResp struct {
Providers []drProvider
Providers []drProvider
seenProviders map[uint32]struct{}
}

func (dr *drResp) append(drp *drProvider) bool {
capacity := len(drp.ID) + len(drp.Schema)
for _, proto := range drp.Protocols {
capacity += len(proto)
}
for _, meta := range drp.Metadata {
capacity += len(meta)
}
drpb := make([]byte, 0, capacity)
drpb = append(drpb, []byte(drp.ID)...)
for _, proto := range drp.Protocols {
drpb = append(drpb, []byte(proto)...)
}
drpb = append(drpb, []byte(drp.Schema)...)
for _, meta := range drp.Metadata {
drpb = append(drpb, meta...)
}
key := crc32.ChecksumIEEE(drpb)
if _, ok := dr.seenProviders[key]; ok {
return false
}
dr.seenProviders[key] = struct{}{}
dr.Providers = append(dr.Providers, *drp)
return true
}

type drProvider struct {
Expand All @@ -185,6 +207,33 @@ type drProvider struct {
Metadata map[string][]byte
}

func drProvFromResult(p model.ProviderResult) *drProvider {
md := metadata.Default.New()
err := md.UnmarshalBinary(p.Metadata)
if err != nil {
return &drProvider{
Schema: peerSchema,
ID: p.Provider.ID,
Addrs: p.Provider.Addrs,
}
} else {
provider := &drProvider{
Schema: peerSchema,
ID: p.Provider.ID,
Addrs: p.Provider.Addrs,
Metadata: make(map[string][]byte),
}

for _, proto := range md.Protocols() {
pl := md.Get(proto)
plb, _ := pl.MarshalBinary()
provider.Protocols = append(provider.Protocols, proto.String())
provider.Metadata[proto.String()] = plb
}
return provider
}
}

func (dp drProvider) MarshalJSON() ([]byte, error) {
m := map[string]interface{}{}
if dp.Metadata != nil {
Expand Down
Loading

0 comments on commit d2a426e

Please sign in to comment.