Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

websocket client adapter #152

Merged
merged 1 commit into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ option(CSP_USE_LD_CLASSIC_MAC "On macOS, link with ld_classic" OFF)
# Extension options
option(CSP_BUILD_KAFKA_ADAPTER "Build kafka adapter" ON)
option(CSP_BUILD_PARQUET_ADAPTER "Build parquet adapter" ON)
option(CSP_BUILD_WS_CLIENT_ADAPTER "Build ws client adapter" ON)

# Normalize build type for downstream comparisons
string(TOLOWER "${CMAKE_BUILD_TYPE}" CMAKE_BUILD_TYPE_LOWER)
Expand Down
815 changes: 525 additions & 290 deletions NOTICE

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions conda/dev-environment-unix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ dependencies:
- isort
- libarrow=15
- librdkafka
- libboost-headers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wrieg123 alphabetize!

- lz4-c
- mamba
- mdformat
Expand Down Expand Up @@ -47,4 +48,5 @@ dependencies:
- twine
- unzip
- wheel
- websocketpp
- zip
4 changes: 4 additions & 0 deletions cpp/csp/adapters/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@ if(CSP_BUILD_PARQUET_ADAPTER)
add_subdirectory(parquet)
endif()

if(CSP_BUILD_WS_CLIENT_ADAPTER)
add_subdirectory(websocket)
endif()

add_subdirectory(utils)
41 changes: 41 additions & 0 deletions cpp/csp/adapters/websocket/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
csp_autogen( csp.adapters.websocket_types websocket_types WEBSOCKET_HEADER WEBSOCKET_SOURCE )

set(WS_CLIENT_HEADER_FILES
ClientAdapterManager.h
ClientInputAdapter.h
ClientOutputAdapter.h
ClientHeaderUpdateAdapter.h
WebsocketEndpoint.h
${WEBSOCKET_HEADER}
)

set(WS_CLIENT_SOURCE_FILES
ClientAdapterManager.cpp
ClientInputAdapter.cpp
ClientOutputAdapter.cpp
ClientHeaderUpdateAdapter.cpp
WebsocketEndpoint.cpp
${WS_CLIENT_HEADER_FILES}
${WEBSOCKET_SOURCE}
)

add_library(csp_websocket_client_adapter STATIC ${WS_CLIENT_SOURCE_FILES})
set_target_properties(csp_websocket_client_adapter PROPERTIES PUBLIC_HEADER "${WS_CLIENT_SOURCE_FILES}")

find_package(websocketpp REQUIRED)
#set(OPENSSL_USE_STATIC_LIBS TRUE)
find_package(OpenSSL REQUIRED)

target_link_libraries(csp_websocket_client_adapter
PRIVATE
csp_adapter_utils
${OPENSSL_SSL_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
websocketpp::websocketpp
)

install(TARGETS csp_websocket_client_adapter
PUBLIC_HEADER DESTINATION include/csp/adapters/websocket
RUNTIME DESTINATION bin/
LIBRARY DESTINATION lib/
)
130 changes: 130 additions & 0 deletions cpp/csp/adapters/websocket/ClientAdapterManager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#include <csp/adapters/websocket/ClientAdapterManager.h>

#include <csp/core/Platform.h>
#include <chrono>
#include <iomanip>
#include <iostream>

namespace csp {

INIT_CSP_ENUM( adapters::websocket::ClientStatusType,
"ACTIVE",
"GENERIC_ERROR",
"CONNECTION_FAILED",
"CLOSED",
"MESSAGE_SEND_FAIL",
);

}

// With TLS
namespace csp::adapters::websocket {

ClientAdapterManager::ClientAdapterManager( Engine* engine, const Dictionary & properties )
: AdapterManager( engine ),
m_active( false ),
m_shouldRun( false ),
m_endpoint( nullptr ),
m_inputAdapter( nullptr ),
m_outputAdapter( nullptr ),
m_updateAdapter( nullptr ),
m_thread( nullptr ),
m_properties( properties )
{
if( m_properties.get<bool>( "use_tls" ) )
{
m_endpoint = new WebsocketEndpointTLS( properties );
}
else
{
m_endpoint = new WebsocketEndpointNoTLS( properties );
}

if( m_inputAdapter != nullptr )
{
m_endpoint -> setOnMessageCb( [ this ]( std::string msg ) {
PushBatch batch( m_engine -> rootEngine() );
m_inputAdapter -> processMessage( msg, &batch );
});
}
m_endpoint -> setOnOpenCb( [ this ]() {
m_active = true;
pushStatus( StatusLevel::INFO, ClientStatusType::ACTIVE, "Connected successfully" );
});
m_endpoint -> setOnFailCb( [ this ]() {
m_active = false;
pushStatus( StatusLevel::ERROR, ClientStatusType::CONNECTION_FAILED, "Connection failed, will try to reconnect" );
});
m_endpoint -> setOnCloseCb( [ this ]() {
m_active = false;
pushStatus( StatusLevel::INFO, ClientStatusType::CLOSED, "Connection closed" );
});
m_endpoint -> setOnSendFailCb( [ this ]( const std::string& s ) {
std::stringstream ss;
ss << "Failed to send: " << s;
pushStatus( StatusLevel::ERROR, ClientStatusType::MESSAGE_SEND_FAIL, ss.str() );
});

};

ClientAdapterManager::~ClientAdapterManager()
{ };

void ClientAdapterManager::start( DateTime starttime, DateTime endtime )
{
AdapterManager::start( starttime, endtime );
// start the bg thread
m_shouldRun = true;
m_thread = std::make_unique<std::thread>( [ this ]() {
while( m_shouldRun )
{
m_endpoint -> run();
m_active = false;
if( m_shouldRun ) sleep( m_properties.get<TimeDelta>( "reconnect_interval" ) );
}
});
};

void ClientAdapterManager::stop() {
AdapterManager::stop();

m_shouldRun=false;
if( m_active ) m_endpoint->close();
if( m_thread ) m_thread->join();
};

PushInputAdapter* ClientAdapterManager::getInputAdapter(CspTypePtr & type, PushMode pushMode, const Dictionary & properties)
{
if (m_inputAdapter == nullptr)
{
m_inputAdapter = m_engine -> createOwnedObject<ClientInputAdapter>(
// m_engine,
type,
pushMode,
properties
);
}
return m_inputAdapter;
};

OutputAdapter* ClientAdapterManager::getOutputAdapter()
{
if (m_outputAdapter == nullptr) m_outputAdapter = m_engine -> createOwnedObject<ClientOutputAdapter>(m_endpoint);

return m_outputAdapter;
}

OutputAdapter * ClientAdapterManager::getHeaderUpdateAdapter()
{
if (m_updateAdapter == nullptr) m_updateAdapter = m_engine -> createOwnedObject<ClientHeaderUpdateOutputAdapter>( m_endpoint -> getProperties() );

return m_updateAdapter;
}

DateTime ClientAdapterManager::processNextSimTimeSlice( DateTime time )
{
// no sim data
return DateTime::NONE();
}

}
77 changes: 77 additions & 0 deletions cpp/csp/adapters/websocket/ClientAdapterManager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H

#include <csp/core/Enum.h>
#include <csp/core/Hash.h>
#include <csp/engine/AdapterManager.h>
#include <csp/engine/Dictionary.h>
#include <csp/engine/PushInputAdapter.h>
#include <thread>

#include <csp/adapters/websocket/ClientInputAdapter.h>
#include <csp/adapters/websocket/ClientOutputAdapter.h>
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
#include <csp/adapters/websocket/WebsocketEndpoint.h>

namespace csp::adapters::websocket {

using namespace csp;

struct WebsocketClientStatusTypeTraits
{
enum _enum : unsigned char
{
ACTIVE = 0,
GENERIC_ERROR = 1,
CONNECTION_FAILED = 2,
CLOSED = 3,
MESSAGE_SEND_FAIL = 4,

NUM_TYPES
};

protected:
_enum m_value;
};

using ClientStatusType = Enum<WebsocketClientStatusTypeTraits>;

class ClientAdapterManager final : public AdapterManager
{


public:
ClientAdapterManager(
Engine * engine,
const Dictionary & properties
);
~ClientAdapterManager();

const char * name() const override { return "WebsocketClientAdapterManager"; }

void start( DateTime starttime, DateTime endtime ) override;

void stop() override;

PushInputAdapter * getInputAdapter( CspTypePtr & type, PushMode pushMode, const Dictionary & properties );
OutputAdapter * getOutputAdapter();
OutputAdapter * getHeaderUpdateAdapter();

DateTime processNextSimTimeSlice( DateTime time ) override;

private:
// need some client info

bool m_active;
bool m_shouldRun;
WebsocketEndpointBase* m_endpoint;
ClientInputAdapter* m_inputAdapter;
ClientOutputAdapter* m_outputAdapter;
ClientHeaderUpdateOutputAdapter* m_updateAdapter;
std::unique_ptr<std::thread> m_thread;
Dictionary m_properties;
};

}

#endif
20 changes: 20 additions & 0 deletions cpp/csp/adapters/websocket/ClientHeaderUpdateAdapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>

namespace csp::adapters::websocket {

ClientHeaderUpdateOutputAdapter::ClientHeaderUpdateOutputAdapter(
Engine * engine,
Dictionary& properties
) : OutputAdapter( engine ), m_properties( properties )
{ };

void ClientHeaderUpdateOutputAdapter::executeImpl()
{
DictionaryPtr headers = m_properties.get<DictionaryPtr>("headers");
for( auto& update : input() -> lastValueTyped<std::vector<WebsocketHeaderUpdate::Ptr>>() )
{
if( update -> key_isSet() && update -> value_isSet() ) headers->update( update->key(), update->value() );
}
};

}
35 changes: 35 additions & 0 deletions cpp/csp/adapters/websocket/ClientHeaderUpdateAdapter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H

#include <websocketpp/config/asio_client.hpp>
#include <websocketpp/client.hpp>
#include <csp/engine/Dictionary.h>
#include <csp/engine/OutputAdapter.h>
#include <csp/adapters/utils/MessageWriter.h>
#include <csp/adapters/websocket/csp_autogen/websocket_types.h>

namespace csp::adapters::websocket
{
using namespace csp::autogen;

class ClientHeaderUpdateOutputAdapter final: public OutputAdapter
{
public:
ClientHeaderUpdateOutputAdapter(
Engine * engine,
Dictionary& properties
);

void executeImpl() override;

const char * name() const override { return "WebsocketClientHeaderUpdateAdapter"; }

private:
Dictionary& m_properties;

};

}


#endif
42 changes: 42 additions & 0 deletions cpp/csp/adapters/websocket/ClientInputAdapter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#include <csp/adapters/websocket/ClientInputAdapter.h>

namespace csp::adapters::websocket
{

ClientInputAdapter::ClientInputAdapter(
Engine * engine,
CspTypePtr & type,
PushMode pushMode,
const Dictionary & properties
) : PushInputAdapter(engine, type, pushMode)
{
if( type -> type() != CspType::Type::STRUCT &&
type -> type() != CspType::Type::STRING )
CSP_THROW( RuntimeException, "Unsupported type: " << type -> type() );

if( properties.exists( "meta_field_map" ) )
{
const Dictionary & metaFieldMap = *properties.get<DictionaryPtr>( "meta_field_map" );

if( !metaFieldMap.empty() && type -> type() != CspType::Type::STRUCT )
CSP_THROW( ValueError, "meta_field_map is not supported on non-struct types" );
}

m_converter = adapters::utils::MessageStructConverterCache::instance().create( type, properties );
};

void ClientInputAdapter::processMessage( std::string payload, PushBatch* batch )
{

if( type() -> type() == CspType::Type::STRUCT )
{
auto tick = m_converter -> asStruct( &payload, payload.length() );
pushTick( std::move(tick), batch );
} else if ( type() -> type() == CspType::Type::STRING )
{
pushTick( std::move(payload), batch );
}

}

}
Loading