Skip to content

Commit

Permalink
Introduce FakeSharedWebTransport
Browse files Browse the repository at this point in the history
Summary: This is a handy class to test a client/server application where both endpoints use WebTransport.  The read streams for one become the write streams for the other and vice versa.

Differential Revision: D60542454

fbshipit-source-id: 59be0c0f6bc07d0d8b4b6779291c883362aa6612
  • Loading branch information
afrind authored and facebook-github-bot committed Aug 5, 2024
1 parent 2299f96 commit f288d8b
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 0 deletions.
16 changes: 16 additions & 0 deletions proxygen/lib/http/webtransport/WebTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,20 @@ class WebTransport {
folly::Optional<uint32_t> error = folly::none) = 0;
};

// WebTransportHandler is a virtual interface for handling events that come
// from web transport that are not tied to an existing stream
//
// * New streams
// * Datagrams
// * The end of of session
class WebTransportHandler {
public:
virtual ~WebTransportHandler() = default;

virtual void onNewUniStream(WebTransport::StreamReadHandle* readHandle) = 0;
virtual void onNewBidiStream(WebTransport::BidiStreamHandle bidiHandle) = 0;
virtual void onDatagram(std::unique_ptr<folly::IOBuf> datagram) = 0;
virtual void onSessionEnd(folly::Optional<uint32_t> error) = 0;
};

} // namespace proxygen
245 changes: 245 additions & 0 deletions proxygen/lib/http/webtransport/test/FakeSharedWebTransport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
* All rights reserved.
*
* This source code is licensed under the BSD-style license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <folly/ExceptionWrapper.h>
#include <folly/portability/GMock.h>
#include <proxygen/lib/http/webtransport/WebTransport.h>

namespace proxygen::test {

using GenericApiRet = folly::Expected<folly::Unit, WebTransport::ErrorCode>;

// WebTransport stream handle that is both a read and write handle. It's
// designed to be held by two FakeSharedWebTransport objects so that writes
// from one are delivered as reads to the other.
class FakeStreamHandle
: public WebTransport::StreamReadHandle
, public WebTransport::StreamWriteHandle {
public:
explicit FakeStreamHandle(uint64_t inId) : id(inId) {
}

~FakeStreamHandle() {
cs_.requestCancellation();
}

uint64_t getID() override {
return id;
}
folly::CancellationToken getCancelToken() override {
return cs_.getToken();
}
folly::SemiFuture<WebTransport::StreamData> readStreamData() override {
XCHECK(!promise_) << "One read at a time";
if (writeErr_) {
auto exwrapper =
folly::make_exception_wrapper<WebTransport::Exception>(*writeErr_);
return folly::makeFuture<WebTransport::StreamData>(exwrapper);
} else if (!buf_.empty() || fin_) {
return folly::makeFuture(WebTransport::StreamData({buf_.move(), fin_}));
} else {
// need a new promise
auto [promise, future] =
folly::makePromiseContract<WebTransport::StreamData>();
promise_ = std::move(promise);
return std::move(future);
}
}
GenericApiRet stopSending(uint32_t code) override {
stopSendingErrorCode_ = code;
cs_.requestCancellation();
return folly::unit;
}

using WriteStreamDataRet =
folly::Expected<folly::SemiFuture<folly::Unit>, WebTransport::ErrorCode>;
WriteStreamDataRet writeStreamData(std::unique_ptr<folly::IOBuf> data,
bool fin) override {
buf_.append(std::move(data));
fin_ = fin;
if (promise_) {
promise_->setValue(WebTransport::StreamData({buf_.move(), fin_}));
promise_.reset();
} else {
}
return folly::makeFuture(folly::unit);
}

GenericApiRet resetStream(uint32_t err) override {
if (promise_) {
promise_->setException(WebTransport::Exception(err));
} else {
writeErr_ = err;
}
return folly::unit;
}
GenericApiRet setPriority(uint8_t urgency,
uint64_t order,
bool inc) override {
pri.emplace(std::forward_as_tuple(urgency, order, inc));
return folly::unit;
}

uint64_t id{0};
folly::CancellationSource cs_;
folly::Optional<folly::Promise<WebTransport::StreamData>> promise_;
folly::IOBufQueue buf_{folly::IOBufQueue::cacheChainLength()};
bool fin_{false};
folly::Optional<std::tuple<uint8_t, uint64_t, bool>> pri;
folly::Optional<uint32_t> writeErr_;
};

// Implementation of WebTransport for testing two connected endpoints.
//
// Usage:
//
// auto [client, server] = FakeSharedWebTransport::makeSharedWebTransport();
//
// Each FakeSharedWebTransport also requires a WebTransportHandler for the peer
// to deliver new streams, datagrams, and end-of-session events.

class FakeSharedWebTransport : public WebTransport {
public:
static std::pair<std::unique_ptr<FakeSharedWebTransport>,
std::unique_ptr<FakeSharedWebTransport>>
makeSharedWebTransport() {
auto a = std::make_unique<FakeSharedWebTransport>();
auto b = std::make_unique<FakeSharedWebTransport>();
a->setPeer(b.get());
b->setPeer(a.get());
return {std::move(a), std::move(b)};
}
FakeSharedWebTransport() = default;
~FakeSharedWebTransport() override {
writeHandles.clear();
readHandles.clear();
}

void setPeer(FakeSharedWebTransport* peer) {
peer_ = peer;
}

void setPeerHandler(WebTransportHandler* peerHandler) {
peerHandler_ = peerHandler;
}

folly::Expected<StreamWriteHandle*, ErrorCode> createUniStream() override {
auto id = nextUniStreamId_;
nextUniStreamId_ += 4;
auto handle = std::make_shared<FakeStreamHandle>(id);
writeHandles.emplace(id, handle);
peer_->readHandles.emplace(id, handle);
peerHandler_->onNewUniStream(handle.get());
return handle.get();
}
folly::Expected<BidiStreamHandle, ErrorCode> createBidiStream() override {
auto id = nextBidiStreamId_;
nextBidiStreamId_ += 4;
auto readH = std::make_shared<FakeStreamHandle>(id);
auto writeH = std::make_shared<FakeStreamHandle>(id);
readHandles.emplace(id, readH);
writeHandles.emplace(id, writeH);
peer_->readHandles.emplace(id, writeH);
peer_->writeHandles.emplace(id, readH);
peerHandler_->onNewBidiStream({writeH.get(), readH.get()});
return BidiStreamHandle({readH.get(), writeH.get()});
}
using AwaitStreamCreditRet = folly::SemiFuture<folly::Unit>;
AwaitStreamCreditRet awaitUniStreamCredit() override {
return folly::makeFuture(folly::unit);
}
AwaitStreamCreditRet awaitBidiStreamCredit() override {
return folly::makeFuture(folly::unit);
}

using ReadStreamDataRet =
folly::Expected<folly::SemiFuture<StreamData>, WebTransport::ErrorCode>;
ReadStreamDataRet readStreamData(uint64_t id) override {
auto h = readHandles.find(id);
if (h == readHandles.end()) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
return h->second->readStreamData();
}

folly::Expected<folly::SemiFuture<folly::Unit>, ErrorCode> writeStreamData(
uint64_t id, std::unique_ptr<folly::IOBuf> data, bool fin) override {
auto h = writeHandles.find(id);
if (h == writeHandles.end()) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
return h->second->writeStreamData(std::move(data), fin);
}

folly::Expected<folly::Unit, ErrorCode> resetStream(uint64_t streamId,
uint32_t error) override {
auto h = writeHandles.find(streamId);
if (h == writeHandles.end()) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
return h->second->resetStream(error);
}

folly::Expected<folly::Unit, ErrorCode> setPriority(
uint64_t streamId,
uint8_t level,
uint64_t order,
bool incremental) override {
auto h = writeHandles.find(streamId);
if (h == writeHandles.end()) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
return h->second->setPriority(level, order, incremental);
}

folly::Expected<folly::Unit, ErrorCode> stopSending(uint64_t streamId,
uint32_t error) override {
auto h = readHandles.find(streamId);
if (h == readHandles.end()) {
return folly::makeUnexpected(WebTransport::ErrorCode::GENERIC_ERROR);
}
return h->second->stopSending(error);
}

folly::Expected<folly::Unit, ErrorCode> sendDatagram(
std::unique_ptr<folly::IOBuf> datagram) override {
peerHandler_->onDatagram(std::move(datagram));
return folly::unit;
}

// Close the WebTransport session, with an optional error
//
// Any pending futures will complete with a folly::OperationCancelled
// exception
folly::Expected<folly::Unit, ErrorCode> closeSession(
folly::Optional<uint32_t> error = folly::none) override {
for (auto& h : writeHandles) {
h.second->resetStream(std::numeric_limits<uint32_t>::max());
}
writeHandles.clear();
for (auto& h : readHandles) {
h.second->stopSending(std::numeric_limits<uint32_t>::max());
}
readHandles.clear();
peerHandler_->onSessionEnd(error);
return folly::unit;
}

std::map<uint64_t, std::shared_ptr<FakeStreamHandle>> writeHandles;
std::map<uint64_t, std::shared_ptr<FakeStreamHandle>> readHandles;

private:
uint64_t nextBidiStreamId_{0};
uint64_t nextUniStreamId_{2};
FakeSharedWebTransport* peer_{nullptr};
WebTransportHandler* peerHandler_{nullptr};
};

} // namespace proxygen::test

0 comments on commit f288d8b

Please sign in to comment.