Skip to content

Commit f0aa29d

Browse files
Riegerwrieg123
Rieger
authored andcommitted
websocket client adapter
Signed-off-by: Rieger <wr10136@devqtccrt05.saccap.int>
1 parent cee23e6 commit f0aa29d

23 files changed

+1501
-299
lines changed

CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ option(CSP_USE_LD_CLASSIC_MAC "On macOS, link with ld_classic" OFF)
7171
# Extension options
7272
option(CSP_BUILD_KAFKA_ADAPTER "Build kafka adapter" ON)
7373
option(CSP_BUILD_PARQUET_ADAPTER "Build parquet adapter" ON)
74+
option(CSP_BUILD_WS_CLIENT_ADAPTER "Build ws client adapter" ON)
7475

7576
# Normalize build type for downstream comparisons
7677
string(TOLOWER "${CMAKE_BUILD_TYPE}" CMAKE_BUILD_TYPE_LOWER)

NOTICE

+525-290
Large diffs are not rendered by default.

conda/dev-environment-unix.yml

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ dependencies:
1818
- isort
1919
- libarrow=15
2020
- librdkafka
21+
- libboost-headers
2122
- lz4-c
2223
- mamba
2324
- mdformat
@@ -47,4 +48,5 @@ dependencies:
4748
- twine
4849
- unzip
4950
- wheel
51+
- websocketpp
5052
- zip

cpp/csp/adapters/CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,8 @@ if(CSP_BUILD_PARQUET_ADAPTER)
77
add_subdirectory(parquet)
88
endif()
99

10+
if(CSP_BUILD_WS_CLIENT_ADAPTER)
11+
add_subdirectory(websocket)
12+
endif()
13+
1014
add_subdirectory(utils)
+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
csp_autogen( csp.adapters.websocket_types websocket_types WEBSOCKET_HEADER WEBSOCKET_SOURCE )
2+
3+
set(WS_CLIENT_HEADER_FILES
4+
ClientAdapterManager.h
5+
ClientInputAdapter.h
6+
ClientOutputAdapter.h
7+
ClientHeaderUpdateAdapter.h
8+
WebsocketEndpoint.h
9+
${WEBSOCKET_HEADER}
10+
)
11+
12+
set(WS_CLIENT_SOURCE_FILES
13+
ClientAdapterManager.cpp
14+
ClientInputAdapter.cpp
15+
ClientOutputAdapter.cpp
16+
ClientHeaderUpdateAdapter.cpp
17+
WebsocketEndpoint.cpp
18+
${WS_CLIENT_HEADER_FILES}
19+
${WEBSOCKET_SOURCE}
20+
)
21+
22+
add_library(csp_websocket_client_adapter STATIC ${WS_CLIENT_SOURCE_FILES})
23+
set_target_properties(csp_websocket_client_adapter PROPERTIES PUBLIC_HEADER "${WS_CLIENT_SOURCE_FILES}")
24+
25+
find_package(websocketpp REQUIRED)
26+
#set(OPENSSL_USE_STATIC_LIBS TRUE)
27+
find_package(OpenSSL REQUIRED)
28+
29+
target_link_libraries(csp_websocket_client_adapter
30+
PRIVATE
31+
csp_adapter_utils
32+
${OPENSSL_SSL_LIBRARY}
33+
${OPENSSL_CRYPTO_LIBRARY}
34+
websocketpp::websocketpp
35+
)
36+
37+
install(TARGETS csp_websocket_client_adapter
38+
PUBLIC_HEADER DESTINATION include/csp/adapters/websocket
39+
RUNTIME DESTINATION bin/
40+
LIBRARY DESTINATION lib/
41+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
#include <csp/adapters/websocket/ClientAdapterManager.h>
2+
3+
#include <csp/core/Platform.h>
4+
#include <chrono>
5+
#include <iomanip>
6+
#include <iostream>
7+
8+
namespace csp {
9+
10+
INIT_CSP_ENUM( adapters::websocket::ClientStatusType,
11+
"ACTIVE",
12+
"GENERIC_ERROR",
13+
"CONNECTION_FAILED",
14+
"CLOSED",
15+
"MESSAGE_SEND_FAIL",
16+
);
17+
18+
}
19+
20+
// With TLS
21+
namespace csp::adapters::websocket {
22+
23+
ClientAdapterManager::ClientAdapterManager( Engine* engine, const Dictionary & properties )
24+
: AdapterManager( engine ),
25+
m_active( false ),
26+
m_shouldRun( false ),
27+
m_endpoint( nullptr ),
28+
m_inputAdapter( nullptr ),
29+
m_outputAdapter( nullptr ),
30+
m_updateAdapter( nullptr ),
31+
m_thread( nullptr ),
32+
m_properties( properties )
33+
{
34+
if( m_properties.get<bool>( "use_tls" ) )
35+
{
36+
m_endpoint = new WebsocketEndpointTLS( properties );
37+
}
38+
else
39+
{
40+
m_endpoint = new WebsocketEndpointNoTLS( properties );
41+
}
42+
43+
if( m_inputAdapter != nullptr )
44+
{
45+
m_endpoint -> setOnMessageCb( [ this ]( std::string msg ) {
46+
PushBatch batch( m_engine -> rootEngine() );
47+
m_inputAdapter -> processMessage( msg, &batch );
48+
});
49+
}
50+
m_endpoint -> setOnOpenCb( [ this ]() {
51+
m_active = true;
52+
pushStatus( StatusLevel::INFO, ClientStatusType::ACTIVE, "Connected successfully" );
53+
});
54+
m_endpoint -> setOnFailCb( [ this ]() {
55+
m_active = false;
56+
pushStatus( StatusLevel::ERROR, ClientStatusType::CONNECTION_FAILED, "Connection failed, will try to reconnect" );
57+
});
58+
m_endpoint -> setOnCloseCb( [ this ]() {
59+
m_active = false;
60+
pushStatus( StatusLevel::INFO, ClientStatusType::CLOSED, "Connection closed" );
61+
});
62+
m_endpoint -> setOnSendFailCb( [ this ]( const std::string& s ) {
63+
std::stringstream ss;
64+
ss << "Failed to send: " << s;
65+
pushStatus( StatusLevel::ERROR, ClientStatusType::MESSAGE_SEND_FAIL, ss.str() );
66+
});
67+
68+
};
69+
70+
ClientAdapterManager::~ClientAdapterManager()
71+
{ };
72+
73+
void ClientAdapterManager::start( DateTime starttime, DateTime endtime )
74+
{
75+
AdapterManager::start( starttime, endtime );
76+
// start the bg thread
77+
m_shouldRun = true;
78+
m_thread = std::make_unique<std::thread>( [ this ]() {
79+
while( m_shouldRun )
80+
{
81+
m_endpoint -> run();
82+
m_active = false;
83+
if( m_shouldRun ) sleep( m_properties.get<TimeDelta>( "reconnect_interval" ) );
84+
}
85+
});
86+
};
87+
88+
void ClientAdapterManager::stop() {
89+
AdapterManager::stop();
90+
91+
m_shouldRun=false;
92+
if( m_active ) m_endpoint->close();
93+
if( m_thread ) m_thread->join();
94+
};
95+
96+
PushInputAdapter* ClientAdapterManager::getInputAdapter(CspTypePtr & type, PushMode pushMode, const Dictionary & properties)
97+
{
98+
if (m_inputAdapter == nullptr)
99+
{
100+
m_inputAdapter = m_engine -> createOwnedObject<ClientInputAdapter>(
101+
// m_engine,
102+
type,
103+
pushMode,
104+
properties
105+
);
106+
}
107+
return m_inputAdapter;
108+
};
109+
110+
OutputAdapter* ClientAdapterManager::getOutputAdapter()
111+
{
112+
if (m_outputAdapter == nullptr) m_outputAdapter = m_engine -> createOwnedObject<ClientOutputAdapter>(m_endpoint);
113+
114+
return m_outputAdapter;
115+
}
116+
117+
OutputAdapter * ClientAdapterManager::getHeaderUpdateAdapter()
118+
{
119+
if (m_updateAdapter == nullptr) m_updateAdapter = m_engine -> createOwnedObject<ClientHeaderUpdateOutputAdapter>( m_endpoint -> getProperties() );
120+
121+
return m_updateAdapter;
122+
}
123+
124+
DateTime ClientAdapterManager::processNextSimTimeSlice( DateTime time )
125+
{
126+
// no sim data
127+
return DateTime::NONE();
128+
}
129+
130+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H
2+
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H
3+
4+
#include <csp/core/Enum.h>
5+
#include <csp/core/Hash.h>
6+
#include <csp/engine/AdapterManager.h>
7+
#include <csp/engine/Dictionary.h>
8+
#include <csp/engine/PushInputAdapter.h>
9+
#include <thread>
10+
11+
#include <csp/adapters/websocket/ClientInputAdapter.h>
12+
#include <csp/adapters/websocket/ClientOutputAdapter.h>
13+
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
14+
#include <csp/adapters/websocket/WebsocketEndpoint.h>
15+
16+
namespace csp::adapters::websocket {
17+
18+
using namespace csp;
19+
20+
struct WebsocketClientStatusTypeTraits
21+
{
22+
enum _enum : unsigned char
23+
{
24+
ACTIVE = 0,
25+
GENERIC_ERROR = 1,
26+
CONNECTION_FAILED = 2,
27+
CLOSED = 3,
28+
MESSAGE_SEND_FAIL = 4,
29+
30+
NUM_TYPES
31+
};
32+
33+
protected:
34+
_enum m_value;
35+
};
36+
37+
using ClientStatusType = Enum<WebsocketClientStatusTypeTraits>;
38+
39+
class ClientAdapterManager final : public AdapterManager
40+
{
41+
42+
43+
public:
44+
ClientAdapterManager(
45+
Engine * engine,
46+
const Dictionary & properties
47+
);
48+
~ClientAdapterManager();
49+
50+
const char * name() const override { return "WebsocketClientAdapterManager"; }
51+
52+
void start( DateTime starttime, DateTime endtime ) override;
53+
54+
void stop() override;
55+
56+
PushInputAdapter * getInputAdapter( CspTypePtr & type, PushMode pushMode, const Dictionary & properties );
57+
OutputAdapter * getOutputAdapter();
58+
OutputAdapter * getHeaderUpdateAdapter();
59+
60+
DateTime processNextSimTimeSlice( DateTime time ) override;
61+
62+
private:
63+
// need some client info
64+
65+
bool m_active;
66+
bool m_shouldRun;
67+
WebsocketEndpointBase* m_endpoint;
68+
ClientInputAdapter* m_inputAdapter;
69+
ClientOutputAdapter* m_outputAdapter;
70+
ClientHeaderUpdateOutputAdapter* m_updateAdapter;
71+
std::unique_ptr<std::thread> m_thread;
72+
Dictionary m_properties;
73+
};
74+
75+
}
76+
77+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
2+
3+
namespace csp::adapters::websocket {
4+
5+
ClientHeaderUpdateOutputAdapter::ClientHeaderUpdateOutputAdapter(
6+
Engine * engine,
7+
Dictionary& properties
8+
) : OutputAdapter( engine ), m_properties( properties )
9+
{ };
10+
11+
void ClientHeaderUpdateOutputAdapter::executeImpl()
12+
{
13+
DictionaryPtr headers = m_properties.get<DictionaryPtr>("headers");
14+
for( auto& update : input() -> lastValueTyped<std::vector<WebsocketHeaderUpdate::Ptr>>() )
15+
{
16+
if( update -> key_isSet() && update -> value_isSet() ) headers->update( update->key(), update->value() );
17+
}
18+
};
19+
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H
2+
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H
3+
4+
#include <websocketpp/config/asio_client.hpp>
5+
#include <websocketpp/client.hpp>
6+
#include <csp/engine/Dictionary.h>
7+
#include <csp/engine/OutputAdapter.h>
8+
#include <csp/adapters/utils/MessageWriter.h>
9+
#include <csp/adapters/websocket/csp_autogen/websocket_types.h>
10+
11+
namespace csp::adapters::websocket
12+
{
13+
using namespace csp::autogen;
14+
15+
class ClientHeaderUpdateOutputAdapter final: public OutputAdapter
16+
{
17+
public:
18+
ClientHeaderUpdateOutputAdapter(
19+
Engine * engine,
20+
Dictionary& properties
21+
);
22+
23+
void executeImpl() override;
24+
25+
const char * name() const override { return "WebsocketClientHeaderUpdateAdapter"; }
26+
27+
private:
28+
Dictionary& m_properties;
29+
30+
};
31+
32+
}
33+
34+
35+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#include <csp/adapters/websocket/ClientInputAdapter.h>
2+
3+
namespace csp::adapters::websocket
4+
{
5+
6+
ClientInputAdapter::ClientInputAdapter(
7+
Engine * engine,
8+
CspTypePtr & type,
9+
PushMode pushMode,
10+
const Dictionary & properties
11+
) : PushInputAdapter(engine, type, pushMode)
12+
{
13+
if( type -> type() != CspType::Type::STRUCT &&
14+
type -> type() != CspType::Type::STRING )
15+
CSP_THROW( RuntimeException, "Unsupported type: " << type -> type() );
16+
17+
if( properties.exists( "meta_field_map" ) )
18+
{
19+
const Dictionary & metaFieldMap = *properties.get<DictionaryPtr>( "meta_field_map" );
20+
21+
if( !metaFieldMap.empty() && type -> type() != CspType::Type::STRUCT )
22+
CSP_THROW( ValueError, "meta_field_map is not supported on non-struct types" );
23+
}
24+
25+
m_converter = adapters::utils::MessageStructConverterCache::instance().create( type, properties );
26+
};
27+
28+
void ClientInputAdapter::processMessage( std::string payload, PushBatch* batch )
29+
{
30+
31+
if( type() -> type() == CspType::Type::STRUCT )
32+
{
33+
auto tick = m_converter -> asStruct( &payload, payload.length() );
34+
pushTick( std::move(tick), batch );
35+
} else if ( type() -> type() == CspType::Type::STRING )
36+
{
37+
pushTick( std::move(payload), batch );
38+
}
39+
40+
}
41+
42+
}

0 commit comments

Comments
 (0)