-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
libp2phttp.Host implements RoundTripper #2840
Changes from all commits
3f6f8cb
d1ac979
00c651b
95bae21
94493f1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -406,11 +406,14 @@ type PeerMetadataGetter interface { | |
} | ||
|
||
type streamRoundTripper struct { | ||
server peer.ID | ||
addrsAdded sync.Once | ||
serverAddrs []ma.Multiaddr | ||
h host.Host | ||
httpHost *Host | ||
server peer.ID | ||
// if true, we won't add the server's addresses to the peerstore. This | ||
// should only be set when creating the struct. | ||
skipAddAddrs bool | ||
addrsAdded sync.Once | ||
serverAddrs []ma.Multiaddr | ||
h host.Host | ||
httpHost *Host | ||
} | ||
|
||
// streamReadCloser wraps an io.ReadCloser and closes the underlying stream when | ||
|
@@ -438,12 +441,14 @@ func (rt *streamRoundTripper) GetPeerMetadata() (PeerMeta, error) { | |
// RoundTrip implements http.RoundTripper. | ||
func (rt *streamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { | ||
// Add the addresses we learned about for this server | ||
rt.addrsAdded.Do(func() { | ||
if len(rt.serverAddrs) > 0 { | ||
rt.h.Peerstore().AddAddrs(rt.server, rt.serverAddrs, peerstore.TempAddrTTL) | ||
} | ||
rt.serverAddrs = nil // may as well cleanup | ||
}) | ||
if !rt.skipAddAddrs { | ||
rt.addrsAdded.Do(func() { | ||
if len(rt.serverAddrs) > 0 { | ||
rt.h.Peerstore().AddAddrs(rt.server, rt.serverAddrs, peerstore.TempAddrTTL) | ||
} | ||
rt.serverAddrs = nil // may as well cleanup | ||
}) | ||
} | ||
|
||
s, err := rt.h.NewStream(r.Context(), rt.server, ProtocolIDForMultistreamSelect) | ||
if err != nil { | ||
|
@@ -620,6 +625,85 @@ func (h *Host) NamespacedClient(p protocol.ID, server peer.AddrInfo, opts ...Rou | |
|
||
return http.Client{Transport: nrt}, nil | ||
} | ||
func (h *Host) initDefaultRT() { | ||
h.createDefaultClientRoundTripper.Do(func() { | ||
if h.DefaultClientRoundTripper == nil { | ||
tr, ok := http.DefaultTransport.(*http.Transport) | ||
if ok { | ||
h.DefaultClientRoundTripper = tr | ||
} else { | ||
h.DefaultClientRoundTripper = &http.Transport{} | ||
} | ||
} | ||
}) | ||
} | ||
|
||
// RoundTrip implements http.RoundTripper for the HTTP Host. | ||
// This allows you to use the Host as a Transport for an http.Client. | ||
// See the example for idomatic usage. | ||
func (h *Host) RoundTrip(r *http.Request) (*http.Response, error) { | ||
switch r.URL.Scheme { | ||
case "http", "https": | ||
h.initDefaultRT() | ||
return h.DefaultClientRoundTripper.RoundTrip(r) | ||
case "multiaddr": | ||
break | ||
default: | ||
return nil, fmt.Errorf("unsupported scheme %s", r.URL.Scheme) | ||
} | ||
|
||
addr, err := ma.NewMultiaddr(r.URL.String()[len("multiaddr:"):]) | ||
if err != nil { | ||
return nil, err | ||
} | ||
addr, isHTTP := normalizeHTTPMultiaddr(addr) | ||
if isHTTP { | ||
parsed := parseMultiaddr(addr) | ||
scheme := "http" | ||
if parsed.useHTTPS { | ||
scheme = "https" | ||
} | ||
h.initDefaultRT() | ||
rt := h.DefaultClientRoundTripper | ||
if parsed.sni != parsed.host { | ||
// We have a different host and SNI (e.g. using an IP address but specifying a SNI) | ||
Comment on lines
+668
to
+669
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this work with Do we need something like: https://github.com/libp2p/go-libp2p/blob/master/p2p/transport/websocket/websocket.go#L123 here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes |
||
// We need to make our own transport to support this. | ||
// | ||
// TODO: if we end up using this code path a lot, we could maintain | ||
// a pool of these transports. For now though, it's here for | ||
// completeness, but I don't expect us to hit it often. | ||
rt = rt.Clone() | ||
rt.TLSClientConfig.ServerName = parsed.sni | ||
} | ||
|
||
// TODO add http-path support | ||
url := url.URL{ | ||
Scheme: scheme, | ||
Host: parsed.host + ":" + parsed.port, | ||
} | ||
|
||
r.URL = &url | ||
return rt.RoundTrip(r) | ||
} | ||
|
||
if h.StreamHost == nil { | ||
return nil, fmt.Errorf("can not do HTTP over streams. Missing StreamHost") | ||
} | ||
|
||
addr, pid := peer.SplitAddr(addr) | ||
if pid == "" { | ||
return nil, fmt.Errorf("no peer ID in multiaddr") | ||
} | ||
h.StreamHost.Peerstore().AddAddrs(pid, []ma.Multiaddr{addr}, peerstore.TempAddrTTL) | ||
|
||
srt := streamRoundTripper{ | ||
server: pid, | ||
skipAddAddrs: true, | ||
httpHost: h, | ||
h: h.StreamHost, | ||
} | ||
return srt.RoundTrip(r) | ||
} | ||
|
||
// NewConstrainedRoundTripper returns an http.RoundTripper that can fulfill and HTTP | ||
// request to the given server. It may use an HTTP transport or a stream based | ||
|
@@ -672,11 +756,7 @@ func (h *Host) NewConstrainedRoundTripper(server peer.AddrInfo, opts ...RoundTri | |
scheme = "https" | ||
} | ||
|
||
h.createDefaultClientRoundTripper.Do(func() { | ||
if h.DefaultClientRoundTripper == nil { | ||
h.DefaultClientRoundTripper = &http.Transport{} | ||
} | ||
}) | ||
h.initDefaultRT() | ||
rt := h.DefaultClientRoundTripper | ||
ownRoundtripper := false | ||
if parsed.sni != parsed.host { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -755,3 +755,71 @@ func TestResponseWriterShouldNotHaveCancelledContext(t *testing.T) { | |
|
||
require.False(t, <-closeNotifyCh) | ||
} | ||
|
||
func TestHTTPHostAsRoundTripper(t *testing.T) { | ||
serverHost, err := libp2p.New( | ||
libp2p.ListenAddrStrings("/ip4/127.0.0.1/udp/0/quic-v1"), | ||
) | ||
require.NoError(t, err) | ||
|
||
serverHttpHost := libp2phttp.Host{ | ||
InsecureAllowHTTP: true, | ||
StreamHost: serverHost, | ||
ListenAddrs: []ma.Multiaddr{ma.StringCast("/ip4/127.0.0.1/tcp/0/http")}, | ||
} | ||
|
||
serverHttpHost.SetHTTPHandlerAtPath("/hello", "/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
w.Write([]byte("hello")) | ||
})) | ||
|
||
// Uncomment when we get the http-path changes in go-multiaddr | ||
// // Different protocol.ID and mounted at a different path | ||
// serverHttpHost.SetHTTPHandlerAtPath("/hello-again", "/hello", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
// w.Write([]byte("hello")) | ||
// })) | ||
|
||
go serverHttpHost.Serve() | ||
defer serverHttpHost.Close() | ||
|
||
testCases := []string{ | ||
// Version that has an http-path. Will uncomment when we get the http-path changes in go-multiaddr | ||
// "multiaddr:" + serverHost.Addrs()[0].String() + "/http-path/hello", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! ❤️ |
||
} | ||
for _, a := range serverHttpHost.Addrs() { | ||
if _, err := a.ValueForProtocol(ma.P_HTTP); err == nil { | ||
testCases = append(testCases, "multiaddr:"+a.String()) | ||
serverPort, err := a.ValueForProtocol(ma.P_TCP) | ||
require.NoError(t, err) | ||
testCases = append(testCases, "http://127.0.0.1:"+serverPort) | ||
} else { | ||
testCases = append(testCases, "multiaddr:"+a.String()+"/p2p/"+serverHost.ID().String()) | ||
} | ||
} | ||
|
||
clientStreamHost, err := libp2p.New() | ||
require.NoError(t, err) | ||
defer clientStreamHost.Close() | ||
|
||
clientHttpHost := libp2phttp.Host{StreamHost: clientStreamHost} | ||
client := http.Client{Transport: &clientHttpHost} | ||
for _, tc := range testCases { | ||
t.Run(tc, func(t *testing.T) { | ||
resp, err := client.Get(tc) | ||
require.NoError(t, err) | ||
defer resp.Body.Close() | ||
body, err := io.ReadAll(resp.Body) | ||
require.NoError(t, err) | ||
require.Equal(t, "hello", string(body)) | ||
}) | ||
} | ||
} | ||
|
||
func TestHTTPHostAsRoundTripperFailsWhenNoStreamHostPresent(t *testing.T) { | ||
clientHttpHost := libp2phttp.Host{} | ||
client := http.Client{Transport: &clientHttpHost} | ||
|
||
_, err := client.Get("multiaddr:/ip4/127.0.0.1/udp/1111/quic-v1") | ||
// Fails because we don't have a stream host available to make the request | ||
require.Error(t, err) | ||
require.ErrorContains(t, err, "Missing StreamHost") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you elaborate on why we need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It really doesn't matter, but it saves us the synchronization overhead of the
addrsAdded sync.Once
call.