Skip to content

Commit

Permalink
Allow to use an HTTP proxy
Browse files Browse the repository at this point in the history
Abstract SSL or plain mode of a ClientSession
  • Loading branch information
philippeVerney committed Feb 20, 2024
1 parent 1333d49 commit b1088bd
Show file tree
Hide file tree
Showing 15 changed files with 465 additions and 338 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
- dependencies
- The following compilers are known to work (used in CI)
- gcc from version 4.8
- visual studio from version 2017
- visual studio from version 2019
# Prepare the dependencies
Download (build and install if necessary) third party libraries:
- BOOST : All versions from version 1.66 should be ok but you may experience some [min/max build issues](https://github.com/boostorg/beast/issues/1980) using version 1.72 or 1.73.
Expand Down
67 changes: 36 additions & 31 deletions cmake/swigEtp1_2Include.i.in
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,12 @@ typedef long long time_t;
%nspace ETP_NS::TransactionHandlers;
%nspace ETP_NS::DataspaceHandlers;
%nspace ETP_NS::AbstractSession;
%nspace ETP_NS::PlainClientSession;
%nspace ETP_NS::ClientSession;
%nspace ETP_NS::InitializationParameters;
#ifdef WITH_FESAPI
%nspace ETP_NS::FesapiHdfProxyFactory;
#endif
#ifdef WITH_ETP_SSL
%nspace ETP_NS::SslClientSession;
#endif


%nspace Energistics::Etp::v12::Datatypes::SupportedDataObject;
%nspace Energistics::Etp::v12::Datatypes::Uuid;
%nspace Energistics::Etp::v12::Datatypes::Version;
Expand Down Expand Up @@ -1057,10 +1054,7 @@ namespace Energistics {
%shared_ptr(ETP_NS::TransactionHandlers)
%shared_ptr(ETP_NS::DataspaceHandlers)
%shared_ptr(ETP_NS::AbstractSession)
%shared_ptr(ETP_NS::PlainClientSession)
#ifdef WITH_ETP_SSL
%shared_ptr(ETP_NS::SslClientSession)
#endif
%shared_ptr(ETP_NS::ClientSession)

%feature("director") ETP_NS::CoreHandlers;
%feature("director") ETP_NS::DiscoveryHandlers;
Expand Down Expand Up @@ -1874,8 +1868,8 @@ namespace ETP_NS

/******************* CLIENT ***************************/

%nodefaultctor PlainClientSession;
class PlainClientSession : public AbstractSession
%nodefaultctor ClientSession;
class ClientSession : public AbstractSession
{
public:
/**
Expand All @@ -1885,27 +1879,43 @@ namespace ETP_NS
bool run();
};

#ifdef WITH_ETP_SSL
%nodefaultctor SslClientSession;
class SslClientSession : public AbstractSession
class InitializationParameters
{
public:
/**
* Run the websocket and then the ETP session.
* Everything related to this session (including the completion handlers) will operate on the same unique thread in a single event loop.
* @param instanceUuid The UUID of the client instance.
* @param etpServerUrl Must follow the syntax ws://<host>:<port>/<path> or wss://<host>:<port>/<path> or simply <host>:<port>/<path>
* where port is optional and is defaulted to 80 if scheme is "ws" or if no scheme is provided.
* In "wss" schema cases, port is defaulted to 443.
* @param proxyUrl The proxy URL. It must follow the syntax http://<host>:<port> or simply <host>:<port>.
* Leave it empty if your connection to eptServerUrl is direct and does not pass throughr any proxy.
*/
bool run();
};
#endif

class InitializationParameters
{
public:
InitializationParameters(const std::string& instanceUuid, const std::string & etpUrl);
InitializationParameters(const std::string& instanceUuid,
const std::string& etpServerUrl, const std::string& proxyUrl = "");
InitializationParameters(const std::string& instanceUuid, const std::string & host, unsigned short port, const std::string & urlPath = "");
virtual ~InitializationParameters();

void setMaxWebSocketMessagePayloadSize(int64_t value);
uint64_t getMaxWebSocketMessagePayloadSize() const;

void setPreferredMaxFrameSize(uint64_t value);
uint64_t getPreferredMaxFrameSize() const;

void setAdditionalHandshakeHeaderFields(const std::map<std::string, std::string>& extraHandshakeHeaderFields);
const std::map<std::string, std::string>& getAdditionalHandshakeHeaderFields() const;

void setAdditionalCertificates(const std::string& extraCertificates);
const std::string& getAdditionalCertificates() const;

const std::string& getEtpServerHost() const;
uint16_t getEtpServerPort() const;
const std::string& getEtpServerUrlPath() const;

const std::string& getProxyHost() const;
uint16_t getProxyPort() const;

void setForceTls(bool force);
bool isTlsForced();

virtual std::string getApplicationName() const;
virtual std::string getApplicationVersion() const;
Expand All @@ -1921,13 +1931,8 @@ namespace ETP_NS

namespace ClientSessionLaunchers
{
std::shared_ptr<ETP_NS::PlainClientSession> createWsClientSession(InitializationParameters* initializationParams, const std::string & authorization,
const std::map<std::string, std::string>& additionalHandshakeHeaderFields = {}, std::size_t preferredMaxFrameSize = 4096);

#ifdef WITH_ETP_SSL
std::shared_ptr<ETP_NS::SslClientSession> createWssClientSession(InitializationParameters* initializationParams, const std::string & authorization,
const std::map<std::string, std::string>& additionalHandshakeHeaderFields = {}, std::size_t preferredMaxFrameSize = 4096, const std::string & additionalCertificates = "");
#endif
std::shared_ptr<ETP_NS::ClientSession> createClientSession(InitializationParameters* initializationParams,
const std::string & etpServerAuthorization, const std::string& proxyAuthorization = "");
}

#ifdef WITH_FESAPI
Expand Down
173 changes: 173 additions & 0 deletions src/etp/AbstractClientSessionCRTP.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*-----------------------------------------------------------------------
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"; you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-----------------------------------------------------------------------*/
#pragma once

#include "ClientSession.h"

#include <thread>

#include <boost/asio/connect.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid_io.hpp>

#include "InitializationParameters.h"

namespace ETP_NS
{
// Echoes back all received WebSocket messages.
// This uses the Curiously Recurring Template Pattern so that the same code works with both SSL streams and regular sockets.
template<class Derived>
class AbstractClientSessionCRTP : public ETP_NS::ClientSession
{
public:

virtual ~AbstractClientSessionCRTP() = default;

void on_connect(boost::system::error_code ec) {
if (ec) {
std::cerr << "on_connect : " << ec.message() << std::endl;
}

#if BOOST_VERSION < 107000
// Perform the websocket handshake
derived().ws().async_handshake_ex(responseType,
host + ":" + port, target,
[&](websocket::request_type& m)
{
m.insert(boost::beast::http::field::sec_websocket_protocol, "etp12.energistics.org");
m.insert(boost::beast::http::field::authorization, authorization);
m.insert("etp-encoding", "binary");
for (const auto& mapEntry : additionalHandshakeHeaderFields_) {
m.insert(mapEntry.first, mapEntry.second);
}
},
std::bind(
&AbstractClientSessionCRTP::on_handshake,
std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
std::placeholders::_1));
#else
derived().ws().set_option(websocket::stream_base::decorator(
[&](websocket::request_type& m)
{
m.insert(boost::beast::http::field::sec_websocket_protocol, "etp12.energistics.org");
m.insert(boost::beast::http::field::authorization, authorization);
m.insert("etp-encoding", "binary");
for (const auto& mapEntry : additionalHandshakeHeaderFields_) {
m.insert(mapEntry.first, mapEntry.second);
}
})
);
// Perform the websocket handshake
derived().ws().async_handshake(responseType,
host + ":" + port, target,
std::bind(
&AbstractClientSessionCRTP::on_handshake,
std::static_pointer_cast<AbstractClientSessionCRTP>(shared_from_this()),
std::placeholders::_1));
#endif
}

/**
* Close the Websocket Session
* The ETP session had to be closed before.
*/
FETPAPI_DLL_IMPORT_OR_EXPORT void do_close() {
derived().ws().async_close(websocket::close_code::normal,
std::bind(
&AbstractSession::on_close,
shared_from_this(),
std::placeholders::_1));
}

FETPAPI_DLL_IMPORT_OR_EXPORT void do_read()
{
if (webSocketSessionClosed) {
fesapi_log("CLOSED : NOTHING MORE TO DO");
return;
}

// Read a message into our buffer
derived().ws().async_read(
receivedBuffer,
std::bind(
&AbstractSession::on_read,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2));
}

void on_handshake(boost::system::error_code ec)
{
if (ec) {
std::cerr << "on_handshake : " << ec.message() << std::endl;
std::cerr << "Sometimes some ETP server require a trailing slash at the end of their URL. Did you also check your optional \"data-partition-id\" additional Header Field?" << std::endl;
return;
}

if (!responseType.count(boost::beast::http::field::sec_websocket_protocol) ||
responseType[boost::beast::http::field::sec_websocket_protocol] != "etp12.energistics.org")
std::cerr << "The client MUST specify the Sec-Websocket-Protocol header value of etp12.energistics.org, and the server MUST reply with the same" << std::endl;

successfulConnection = true;
webSocketSessionClosed = false;

send(requestSession, 0, 0x02);
do_read();
}

void setMaxWebSocketMessagePayloadSize(int64_t value) final {
maxWebSocketMessagePayloadSize = value;
derived().ws().read_message_max(value);
}

protected:
using ClientSession::ClientSession;

// Access the derived class, this is part of the Curiously Recurring Template Pattern idiom.
Derived& derived() { return static_cast<Derived&>(*this); }

void do_write() {
const std::lock_guard<std::mutex> specificProtocolHandlersLock(specificProtocolHandlersMutex);
if (sendingQueue.empty()) {
fesapi_log("The sending queue is empty.");
return;
}

bool previousSentMessageCompleted = specificProtocolHandlers.find(std::get<0>(sendingQueue.front())) == specificProtocolHandlers.end();

if (!previousSentMessageCompleted) {
fesapi_log("Cannot send Message id :", std::to_string(std::get<0>(sendingQueue.front())), "because the previous message has not finished to be sent.");
}
else {
fesapi_log("Sending Message id :", std::to_string(std::get<0>(sendingQueue.front())));

derived().ws().async_write(
boost::asio::buffer(std::get<1>(sendingQueue.front())),
std::bind(
&AbstractSession::on_write,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2));

// Register the handler to respond to the sent message
specificProtocolHandlers[std::get<0>(sendingQueue.front())] = std::get<2>(sendingQueue.front());
}
}
};
}
Loading

0 comments on commit b1088bd

Please sign in to comment.