Skip to content

Commit 6fb4924

Browse files
RiegerRieger
Rieger
authored and
Rieger
committed
websocket client adapter
Signed-off-by: Rieger <wr10136@devqtccrt05.saccap.int>
1 parent cee23e6 commit 6fb4924

22 files changed

+990
-8
lines changed

CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ option(CSP_USE_LD_CLASSIC_MAC "On macOS, link with ld_classic" OFF)
7171
# Extension options
7272
option(CSP_BUILD_KAFKA_ADAPTER "Build kafka adapter" ON)
7373
option(CSP_BUILD_PARQUET_ADAPTER "Build parquet adapter" ON)
74+
option(CSP_BUILD_WS_CLIENT_ADAPTER "Build ws client adapter" ON)
7475

7576
# Normalize build type for downstream comparisons
7677
string(TOLOWER "${CMAKE_BUILD_TYPE}" CMAKE_BUILD_TYPE_LOWER)

conda/dev-environment-unix.yml

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ dependencies:
1818
- isort
1919
- libarrow=15
2020
- librdkafka
21+
- libboost-headers
2122
- lz4-c
2223
- mamba
2324
- mdformat
@@ -47,4 +48,5 @@ dependencies:
4748
- twine
4849
- unzip
4950
- wheel
51+
- websocketpp
5052
- zip

cpp/csp/adapters/CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,8 @@ if(CSP_BUILD_PARQUET_ADAPTER)
77
add_subdirectory(parquet)
88
endif()
99

10+
if(CSP_BUILD_WS_CLIENT_ADAPTER)
11+
add_subdirectory(websocket)
12+
endif()
13+
1014
add_subdirectory(utils)
+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
csp_autogen( csp.adapters.websocket_types websocket_types WEBSOCKET_HEADER WEBSOCKET_SOURCE )
2+
3+
set(WS_CLIENT_HEADER_FILES
4+
ClientAdapterManager.h
5+
ClientInputAdapter.h
6+
ClientOutputAdapter.h
7+
ClientHeaderUpdateAdapter.h
8+
WebsocketEndpoint.h
9+
${WEBSOCKET_HEADER}
10+
)
11+
12+
set(WS_CLIENT_SOURCE_FILES
13+
ClientAdapterManager.cpp
14+
ClientInputAdapter.cpp
15+
ClientOutputAdapter.cpp
16+
ClientHeaderUpdateAdapter.cpp
17+
WebsocketEndpoint.cpp
18+
${WS_CLIENT_HEADER_FILES}
19+
${WEBSOCKET_SOURCE}
20+
)
21+
22+
add_library(csp_websocket_client_adapter STATIC ${WS_CLIENT_SOURCE_FILES})
23+
set_target_properties(csp_websocket_client_adapter PROPERTIES PUBLIC_HEADER "${WS_CLIENT_SOURCE_FILES}")
24+
25+
find_package(websocketpp REQUIRED)
26+
#set(OPENSSL_USE_STATIC_LIBS TRUE)
27+
find_package(OpenSSL REQUIRED)
28+
29+
target_link_libraries(csp_websocket_client_adapter
30+
PRIVATE
31+
csp_adapter_utils
32+
${OPENSSL_SSL_LIBRARY}
33+
${OPENSSL_CRYPTO_LIBRARY}
34+
websocketpp::websocketpp
35+
)
36+
37+
install(TARGETS csp_websocket_client_adapter
38+
PUBLIC_HEADER DESTINATION include/csp/adapters/websocket
39+
RUNTIME DESTINATION bin/
40+
LIBRARY DESTINATION lib/
41+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
#include <csp/adapters/websocket/ClientAdapterManager.h>
2+
3+
#include <csp/core/Platform.h>
4+
#include <chrono>
5+
#include <iomanip>
6+
#include <iostream>
7+
8+
9+
using websocketpp::lib::placeholders::_1;
10+
using websocketpp::lib::placeholders::_2;
11+
using websocketpp::lib::bind;
12+
13+
namespace csp {
14+
15+
INIT_CSP_ENUM( adapters::websocket::ClientStatusType,
16+
"ACTIVE",
17+
"GENERIC_ERROR",
18+
"CONNECTION_FAILED",
19+
"CLOSED",
20+
"MESSAGE_SEND_FAIL",
21+
);
22+
23+
}
24+
25+
// With TLS
26+
namespace csp::adapters::websocket {
27+
28+
ClientAdapterManager::ClientAdapterManager( Engine* engine, const Dictionary & properties
29+
) : AdapterManager( engine ),
30+
m_active(false),
31+
m_shouldRun(false),
32+
m_endpoint(nullptr),
33+
m_inputAdapter(nullptr),
34+
m_outputAdapter(nullptr),
35+
m_updateAdapter(nullptr),
36+
m_thread(nullptr),
37+
m_properties(properties)
38+
{
39+
if (m_properties.get<bool>("use_tls")) {
40+
m_endpoint = new WebsocketEndpointTLS(properties);
41+
} else {
42+
m_endpoint = new WebsocketEndpointNoTLS(properties);
43+
}
44+
45+
if (m_inputAdapter != nullptr)
46+
{
47+
m_endpoint->setOnMessageCb(std::move([this](std::string msg) {
48+
PushBatch batch( m_engine -> rootEngine() );
49+
m_inputAdapter->processMessage(msg, &batch);
50+
}));
51+
}
52+
m_endpoint->setOnOpenCb(std::move([this](){
53+
m_active = true;
54+
pushStatus(StatusLevel::INFO, ClientStatusType::ACTIVE, "Connected successfully");
55+
}));
56+
m_endpoint->setOnFailCb(std::move([this](){
57+
m_active = false;
58+
pushStatus(StatusLevel::ERROR, ClientStatusType::CONNECTION_FAILED, "Connection failed, will try to reconnect");
59+
}));
60+
m_endpoint->setOnCloseCb(std::move([this](){
61+
m_active = false;
62+
pushStatus(StatusLevel::INFO, ClientStatusType::CLOSED, "Connection closed");
63+
}));
64+
m_endpoint->setOnSendFailCb(std::move([this](const std::string& s){
65+
std::stringstream ss;
66+
ss << "Failed to send: " << s;
67+
pushStatus(StatusLevel::ERROR, ClientStatusType::MESSAGE_SEND_FAIL, ss.str());
68+
}));
69+
70+
};
71+
72+
ClientAdapterManager::~ClientAdapterManager()
73+
{ };
74+
75+
void ClientAdapterManager::start( DateTime starttime, DateTime endtime )
76+
{
77+
AdapterManager::start( starttime, endtime );
78+
// start the bg thread
79+
m_shouldRun = true;
80+
m_thread = std::make_unique<std::thread>( [ this ](){
81+
while (m_shouldRun)
82+
{
83+
m_endpoint->run();
84+
m_active=false;
85+
if(m_shouldRun) sleep( m_properties.get<TimeDelta>("reconnect_interval") );
86+
}
87+
});
88+
};
89+
90+
void ClientAdapterManager::stop() {
91+
AdapterManager::stop();
92+
93+
m_shouldRun=false;
94+
if(m_active) {
95+
m_endpoint->close();
96+
}
97+
98+
if(m_thread) {
99+
m_thread->join();
100+
}
101+
};
102+
103+
PushInputAdapter* ClientAdapterManager::getInputAdapter(CspTypePtr & type, PushMode pushMode, const Dictionary & properties)
104+
{
105+
if (m_inputAdapter == nullptr)
106+
{
107+
m_inputAdapter = m_engine->createOwnedObject<ClientInputAdapter>(
108+
// m_engine,
109+
type,
110+
pushMode,
111+
properties
112+
);
113+
}
114+
return m_inputAdapter;
115+
};
116+
117+
OutputAdapter* ClientAdapterManager::getOutputAdapter()
118+
{
119+
if (m_outputAdapter == nullptr)
120+
{
121+
m_outputAdapter = m_engine->createOwnedObject<ClientOutputAdapter>(m_endpoint);
122+
}
123+
124+
return m_outputAdapter;
125+
}
126+
127+
OutputAdapter * ClientAdapterManager::getHeaderUpdateAdapter()
128+
{
129+
if (m_updateAdapter == nullptr)
130+
{
131+
m_updateAdapter = m_engine->createOwnedObject<ClientHeaderUpdateOutputAdapter>(
132+
m_endpoint->getProperties()
133+
);
134+
}
135+
136+
return m_updateAdapter;
137+
}
138+
139+
DateTime ClientAdapterManager::processNextSimTimeSlice( DateTime time )
140+
{
141+
// no sim data
142+
return DateTime::NONE();
143+
}
144+
145+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H
2+
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H
3+
4+
#include <csp/core/Enum.h>
5+
#include <csp/core/Hash.h>
6+
#include <csp/engine/AdapterManager.h>
7+
#include <csp/engine/Dictionary.h>
8+
#include <csp/engine/PushInputAdapter.h>
9+
#include <thread>
10+
11+
#include <csp/adapters/websocket/ClientInputAdapter.h>
12+
#include <csp/adapters/websocket/ClientOutputAdapter.h>
13+
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
14+
#include <csp/adapters/websocket/WebsocketEndpoint.h>
15+
16+
namespace csp::adapters::websocket {
17+
18+
using namespace csp;
19+
20+
struct WebsocketClientStatusTypeTraits
21+
{
22+
enum _enum : unsigned char
23+
{
24+
ACTIVE = 0,
25+
GENERIC_ERROR = 1,
26+
CONNECTION_FAILED = 2,
27+
CLOSED = 3,
28+
MESSAGE_SEND_FAIL = 4,
29+
30+
NUM_TYPES
31+
};
32+
33+
protected:
34+
_enum m_value;
35+
};
36+
37+
using ClientStatusType = Enum<WebsocketClientStatusTypeTraits>;
38+
39+
class ClientAdapterManager final : public AdapterManager
40+
{
41+
42+
43+
public:
44+
ClientAdapterManager(
45+
Engine * engine,
46+
const Dictionary & properties
47+
);
48+
~ClientAdapterManager();
49+
50+
const char * name() const override { return "WebsocketClientAdapterManager"; }
51+
52+
void start( DateTime starttime, DateTime endtime ) override;
53+
54+
void stop() override;
55+
56+
PushInputAdapter * getInputAdapter( CspTypePtr & type, PushMode pushMode, const Dictionary & properties );
57+
OutputAdapter * getOutputAdapter();
58+
OutputAdapter * getHeaderUpdateAdapter();
59+
60+
DateTime processNextSimTimeSlice( DateTime time ) override;
61+
62+
private:
63+
// need some client info
64+
65+
bool m_active;
66+
bool m_shouldRun;
67+
WebsocketEndpointBase* m_endpoint;
68+
ClientInputAdapter* m_inputAdapter;
69+
ClientOutputAdapter* m_outputAdapter;
70+
ClientHeaderUpdateOutputAdapter* m_updateAdapter;
71+
std::unique_ptr<std::thread> m_thread;
72+
Dictionary m_properties;
73+
};
74+
75+
}
76+
77+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
2+
3+
namespace csp::adapters::websocket {
4+
5+
ClientHeaderUpdateOutputAdapter::ClientHeaderUpdateOutputAdapter(
6+
Engine * engine,
7+
Dictionary& properties
8+
) : OutputAdapter( engine ), m_properties(properties)
9+
{
10+
11+
};
12+
13+
void ClientHeaderUpdateOutputAdapter::executeImpl()
14+
{
15+
DictionaryPtr headers = m_properties.get<DictionaryPtr>("headers");
16+
for(auto update : input() -> lastValueTyped<std::vector<WebsocketHeaderUpdate::Ptr>>())
17+
{
18+
if(update->key_isSet() && update->value_isSet()) headers->update(update->key(), update->value());
19+
}
20+
};
21+
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H
2+
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H
3+
4+
#include <websocketpp/config/asio_client.hpp>
5+
#include <websocketpp/client.hpp>
6+
#include <csp/engine/Dictionary.h>
7+
#include <csp/engine/OutputAdapter.h>
8+
#include <csp/adapters/utils/MessageWriter.h>
9+
#include <csp/adapters/websocket/csp_autogen/websocket_types.h>
10+
11+
namespace csp::adapters::websocket
12+
{
13+
using namespace csp::autogen;
14+
15+
class ClientHeaderUpdateOutputAdapter final: public OutputAdapter
16+
{
17+
public:
18+
ClientHeaderUpdateOutputAdapter(
19+
Engine * engine,
20+
Dictionary& properties
21+
);
22+
23+
void executeImpl() override;
24+
25+
const char * name() const override { return "WebsocketClientHeaderUpdateAdapter"; }
26+
27+
private:
28+
Dictionary& m_properties;
29+
30+
};
31+
32+
}
33+
34+
35+
#endif

0 commit comments

Comments
 (0)