Skip to content

Commit 7f9f923

Browse files
author
Rieger
committed
websocket client adapter
Signed-off-by: Rieger <wr10136@devqtccrt05.saccap.int> remove ts Signed-off-by: Rieger <wr10136@devqtccrt05.saccap.int>
1 parent a85bb5f commit 7f9f923

20 files changed

+952
-7
lines changed

CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ option(CSP_USE_LD_CLASSIC_MAC "On macOS, link with ld_classic" OFF)
7171
# Extension options
7272
option(CSP_BUILD_KAFKA_ADAPTER "Build kafka adapter" ON)
7373
option(CSP_BUILD_PARQUET_ADAPTER "Build parquet adapter" ON)
74+
option(CSP_BUILD_WS_CLIENT_ADAPTER "Build ws client adapter" ON)
7475

7576
# Normalize build type for downstream comparisons
7677
string(TOLOWER "${CMAKE_BUILD_TYPE}" CMAKE_BUILD_TYPE_LOWER)

conda/dev-environment-linux.yml

+1
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,5 @@ dependencies:
4646
- twine
4747
- unzip
4848
- wheel
49+
- websocketpp
4950
- zip

cpp/csp/adapters/CMakeLists.txt

+4
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,8 @@ if(CSP_BUILD_PARQUET_ADAPTER)
77
add_subdirectory(parquet)
88
endif()
99

10+
if(CSP_BUILD_WS_CLIENT_ADAPTER)
11+
add_subdirectory(websockets)
12+
endif()
13+
1014
add_subdirectory(utils)
+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
set(WS_CLIENT_HEADER_FILES
2+
ClientAdapterManager.h
3+
ClientInputAdapter.h
4+
ClientOutputAdapter.h
5+
ClientHeaderUpdateAdapter.h
6+
WebsocketEndpoint.h
7+
)
8+
9+
set(WS_CLIENT_SOURCE_FILES
10+
ClientAdapterManager.cpp
11+
ClientInputAdapter.cpp
12+
ClientOutputAdapter.cpp
13+
ClientHeaderUpdateAdapter.cpp
14+
WebsocketEndpoint.cpp
15+
${WS_CLIENT_HEADER_FILES}
16+
)
17+
18+
add_library(csp_websocket_client_adapter STATIC ${WS_CLIENT_SOURCE_FILES})
19+
set_target_properties(csp_websocket_client_adapter PROPERTIES PUBLIC_HEADER "${WS_CLIENT_SOURCE_FILES}")
20+
21+
find_package(websocketpp REQUIRED)
22+
set(OPENSSL_USE_STATIC_LIBS TRUE)
23+
find_package(OpenSSL REQUIRED)
24+
25+
target_link_libraries(csp_websocket_client_adapter
26+
PRIVATE
27+
csp_adapter_utils
28+
${OPENSSL_SSL_LIBRARY}
29+
${OPENSSL_CRYPTO_LIBRARY}
30+
websocketpp::websocketpp
31+
)
32+
33+
install(TARGETS csp_websocket_client_adapter
34+
PUBLIC_HEADER DESTINATION include/csp/adapters/websockets
35+
RUNTIME DESTINATION bin/
36+
LIBRARY DESTINATION lib/
37+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
#include <csp/adapters/websockets/ClientAdapterManager.h>
2+
3+
#include <csp/core/Platform.h>
4+
#include <chrono>
5+
#include <iomanip>
6+
#include <iostream>
7+
8+
9+
using websocketpp::lib::placeholders::_1;
10+
using websocketpp::lib::placeholders::_2;
11+
using websocketpp::lib::bind;
12+
13+
namespace csp{
14+
15+
INIT_CSP_ENUM( csp::adapters::websockets::ClientStatusType,
16+
"ACTIVE",
17+
"GENERIC_ERROR",
18+
"CONNECTION_FAILED",
19+
"CLOSED",
20+
);
21+
22+
}
23+
24+
// With TLS
25+
namespace csp::adapters::websockets {
26+
27+
ClientAdapterManager::ClientAdapterManager( csp::Engine* engine, const Dictionary & properties
28+
) : csp::AdapterManager( engine ),
29+
m_active(false),
30+
m_shouldRun(false),
31+
m_endpoint(nullptr),
32+
m_inputAdapter(nullptr),
33+
m_outputAdapter(nullptr),
34+
m_updateAdapter(nullptr),
35+
m_thread(nullptr),
36+
m_threadActive(false),
37+
m_properties(properties)
38+
{
39+
if (m_properties.get<bool>("use_tls")) {
40+
m_endpoint = new WebsocketEndpointTLS(properties);
41+
} else {
42+
m_endpoint = new WebsocketEndpointNoTLS(properties);
43+
}
44+
45+
m_endpoint->setOnMessageCb(std::move([this](std::string msg) {
46+
if (m_inputAdapter != nullptr)
47+
{
48+
csp::PushBatch batch( m_engine -> rootEngine() );
49+
m_inputAdapter->processMessage(msg, &batch);
50+
}
51+
}));
52+
m_endpoint->setOnOpenCb(std::move([this](){
53+
m_active = true;
54+
pushStatus(StatusLevel::INFO, ClientStatusType::ACTIVE, "Connected successfully");
55+
}));
56+
m_endpoint->setOnFailCb(std::move([this](){
57+
m_active = false;
58+
pushStatus(StatusLevel::ERROR, ClientStatusType::CONNECTION_FAILED, "Connection failed, will try to reconnect");
59+
}));
60+
m_endpoint->setOnCloseCb(std::move([this](){
61+
m_active = false;
62+
pushStatus(StatusLevel::INFO, ClientStatusType::CLOSED, "Connection closed");
63+
}));
64+
65+
};
66+
67+
ClientAdapterManager::~ClientAdapterManager()
68+
{
69+
if (m_threadActive) {
70+
m_thread->join();
71+
}
72+
};
73+
74+
void ClientAdapterManager::start( DateTime starttime, DateTime endtime )
75+
{
76+
AdapterManager::start( starttime, endtime );
77+
// start the bg thread
78+
m_threadActive = true;
79+
m_shouldRun = true;
80+
m_thread = std::make_unique<std::thread>( [ this ](){
81+
while (m_shouldRun)
82+
{
83+
m_endpoint->run();
84+
m_active=false;
85+
// pushStatus(StatusLevel::ERROR, ClientStatusType::GENERIC_ERROR, "Reconnecting...");
86+
if(m_shouldRun) std::this_thread::sleep_for( std::chrono::seconds(m_properties.get<binding_int_t>("reconnect_seconds")) );
87+
}
88+
});
89+
};
90+
91+
void ClientAdapterManager::stop() {
92+
AdapterManager::stop();
93+
94+
m_shouldRun=false;
95+
if(m_active) {
96+
m_endpoint->close();
97+
}
98+
99+
if(m_threadActive) {
100+
m_thread->join();
101+
m_threadActive=false;
102+
}
103+
};
104+
105+
PushInputAdapter* ClientAdapterManager::getInputAdapter(CspTypePtr & type, PushMode pushMode, const Dictionary & properties)
106+
{
107+
if (m_inputAdapter == nullptr)
108+
{
109+
m_inputAdapter = m_engine->createOwnedObject<ClientInputAdapter>(
110+
// m_engine,
111+
type,
112+
pushMode,
113+
properties
114+
);
115+
}
116+
return m_inputAdapter;
117+
};
118+
119+
OutputAdapter* ClientAdapterManager::getOutputAdapter()
120+
{
121+
if (m_outputAdapter == nullptr)
122+
{
123+
m_outputAdapter = m_engine->createOwnedObject<ClientOutputAdapter>(m_endpoint);
124+
}
125+
126+
return m_outputAdapter;
127+
}
128+
129+
OutputAdapter * ClientAdapterManager::getHeaderUpdateAdapter()
130+
{
131+
if (m_updateAdapter == nullptr)
132+
{
133+
m_updateAdapter = m_engine->createOwnedObject<ClientHeaderUpdateAdapter>(
134+
m_endpoint->getProperties()
135+
);
136+
}
137+
138+
return m_updateAdapter;
139+
}
140+
141+
DateTime ClientAdapterManager::processNextSimTimeSlice( DateTime time )
142+
{
143+
// no sim data
144+
return DateTime::NONE();
145+
}
146+
147+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H
2+
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H
3+
4+
#include <csp/core/Enum.h>
5+
#include <csp/core/Hash.h>
6+
#include <csp/engine/AdapterManager.h>
7+
#include <csp/engine/Dictionary.h>
8+
#include <csp/engine/PushInputAdapter.h>
9+
#include <thread>
10+
11+
#include <csp/adapters/websockets/ClientInputAdapter.h>
12+
#include <csp/adapters/websockets/ClientOutputAdapter.h>
13+
#include <csp/adapters/websockets/ClientHeaderUpdateAdapter.h>
14+
#include <csp/adapters/websockets/WebsocketEndpoint.h>
15+
16+
namespace csp::adapters::websockets {
17+
18+
using namespace csp;
19+
20+
struct WebsocketClientStatusTypeTraits
21+
{
22+
enum _enum : unsigned char
23+
{
24+
ACTIVE = 0,
25+
GENERIC_ERROR = 1,
26+
CONNECTION_FAILED = 2,
27+
CLOSED = 3,
28+
29+
NUM_TYPES
30+
};
31+
32+
protected:
33+
_enum m_value;
34+
};
35+
36+
using ClientStatusType = csp::Enum<WebsocketClientStatusTypeTraits>;
37+
38+
class ClientAdapterManager final : public csp::AdapterManager
39+
{
40+
41+
42+
public:
43+
ClientAdapterManager(
44+
csp::Engine * engine,
45+
const csp::Dictionary & properties
46+
);
47+
~ClientAdapterManager();
48+
49+
const char * name() const override { return "ClientAdapterManager"; }
50+
51+
void start( DateTime starttime, DateTime endtime ) override;
52+
53+
void stop() override;
54+
55+
PushInputAdapter * getInputAdapter( CspTypePtr & type, PushMode pushMode, const Dictionary & properties );
56+
OutputAdapter * getOutputAdapter();
57+
OutputAdapter * getHeaderUpdateAdapter();
58+
59+
DateTime processNextSimTimeSlice( DateTime time ) override;
60+
61+
private:
62+
// need some client info
63+
64+
bool m_active;
65+
bool m_shouldRun;
66+
WebsocketEndpointBase* m_endpoint;
67+
ClientInputAdapter* m_inputAdapter;
68+
ClientOutputAdapter* m_outputAdapter;
69+
ClientHeaderUpdateAdapter* m_updateAdapter;
70+
std::unique_ptr<std::thread> m_thread;
71+
bool m_threadActive;
72+
Dictionary m_properties;
73+
};
74+
75+
}
76+
77+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#include <csp/adapters/websockets/ClientHeaderUpdateAdapter.h>
2+
#include <iostream>
3+
4+
namespace csp::adapters::websockets {
5+
6+
ClientHeaderUpdateAdapter::ClientHeaderUpdateAdapter(
7+
csp::Engine * engine,
8+
csp::Dictionary& properties
9+
) : csp::OutputAdapter( engine ), m_properties(properties)
10+
{
11+
12+
};
13+
14+
void ClientHeaderUpdateAdapter::executeImpl()
15+
{
16+
std::vector<std::string> value = input() -> lastValueTyped<std::vector<std::string>>();
17+
csp::DictionaryPtr headers = m_properties.get<csp::DictionaryPtr>("headers");
18+
// this is hacky but it works...
19+
for( size_t i = 0; i < value.size(); i+=2)
20+
{
21+
const std::string key = value[i];
22+
const std::string v= value[i+1];
23+
headers->update(key, v);
24+
}
25+
};
26+
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H
2+
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H
3+
4+
#include <websocketpp/config/asio_client.hpp>
5+
#include <websocketpp/client.hpp>
6+
#include <csp/engine/Dictionary.h>
7+
#include <csp/engine/OutputAdapter.h>
8+
#include <csp/adapters/utils/MessageWriter.h>
9+
10+
namespace csp::adapters::websockets
11+
{
12+
13+
class ClientHeaderUpdateAdapter final: public csp::OutputAdapter
14+
{
15+
public:
16+
ClientHeaderUpdateAdapter(
17+
csp::Engine * engine,
18+
csp::Dictionary& properties
19+
);
20+
21+
void executeImpl() override;
22+
23+
const char * name() const override { return "ClientHeaderUpdateAdapter"; }
24+
25+
private:
26+
csp::Dictionary& m_properties;
27+
28+
};
29+
30+
}
31+
32+
33+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#include <csp/adapters/websockets/ClientInputAdapter.h>
2+
3+
namespace csp::adapters::websockets
4+
{
5+
6+
ClientInputAdapter::ClientInputAdapter(
7+
csp::Engine * engine,
8+
csp::CspTypePtr & type,
9+
csp::PushMode pushMode,
10+
const csp::Dictionary & properties
11+
) : csp::PushInputAdapter(engine, type, pushMode)
12+
{
13+
// TODO: should I support bytes?
14+
if( type -> type() != csp::CspType::Type::STRUCT &&
15+
type -> type() != csp::CspType::Type::STRING )
16+
CSP_THROW( csp::RuntimeException, "Unsupported type: " << type -> type() );
17+
18+
if( properties.exists( "meta_field_map" ) )
19+
{
20+
// const CspStructType & structType = static_cast<const CspStructType &>( *type );
21+
const csp::Dictionary & metaFieldMap = *properties.get<csp::DictionaryPtr>( "meta_field_map" );
22+
23+
if( !metaFieldMap.empty() && type -> type() != csp::CspType::Type::STRUCT )
24+
CSP_THROW( csp::ValueError, "meta_field_map is not supported on non-struct types" );
25+
}
26+
27+
m_converter = csp::adapters::utils::MessageStructConverterCache::instance().create( type, properties );
28+
};
29+
30+
void ClientInputAdapter::processMessage( std::string payload, csp::PushBatch* batch )
31+
{
32+
33+
if( type() -> type() == csp::CspType::Type::STRUCT )
34+
{
35+
auto tick = m_converter -> asStruct( &payload, payload.length() );
36+
pushTick( std::move(tick), batch );
37+
} else if ( type() -> type() == csp::CspType::Type::STRING )
38+
{
39+
pushTick( std::move(payload), batch );
40+
}
41+
42+
}
43+
44+
}

0 commit comments

Comments
 (0)