diff --git a/MessageControl/MessageControl.cpp b/MessageControl/MessageControl.cpp index c452c5c57a..8119e0d297 100644 --- a/MessageControl/MessageControl.cpp +++ b/MessageControl/MessageControl.cpp @@ -21,291 +21,11 @@ #include "MessageOutput.h" namespace WPEFramework { -namespace Plugin { - SERVICE_REGISTRATION(MessageControl, 1, 0); - - Core::ProxyPoolType jsonExportDataFactory(2); - constexpr uint16_t MAX_CONNECTIONS = 5; - - class WebSocketExporter : public Messaging::IMessageOutput { - - public: - class ExportCommand : public Core::JSON::Container { - - public: - ExportCommand(const ExportCommand&) = delete; - ExportCommand& operator=(const ExportCommand&) = delete; - - ExportCommand() - : Core::JSON::Container() - , Filename() - , Identifier() - , Category() - , IncludingDate() - , Paused() - { - Add(_T("filename"), &Filename); - Add(_T("identifier"), &Identifier); - Add(_T("category"), &Category); - Add(_T("includingdate"), &IncludingDate); - Add(_T("paused"), &Paused); - } - - ~ExportCommand() override = default; - - public: - Core::JSON::Boolean Filename; - Core::JSON::Boolean Identifier; - Core::JSON::Boolean Category; - Core::JSON::Boolean IncludingDate; - Core::JSON::Boolean Paused; - }; - - public: - class MessageChannelOutput : public Messaging::JSONOutput { - - public: - MessageChannelOutput(const MessageChannelOutput&) = delete; - MessageChannelOutput& operator=(const MessageChannelOutput&) = delete; - - explicit MessageChannelOutput(PluginHost::Channel& channel) - : JSONOutput(channel) - { - } - ~MessageChannelOutput() = default; - - Core::ProxyType Process(const Core::ProxyType& element); - Core::ProxyType GetDataContainer() override - { - return jsonExportDataFactory.Element(); - } - }; - - public: - WebSocketExporter(const WebSocketExporter& copy) = delete; - WebSocketExporter& operator=(const WebSocketExporter&) = delete; - - explicit WebSocketExporter(const uint32_t maxConnections = MAX_CONNECTIONS) - : _messageChannelOutputs() - , _lock() - , _maxExportConnections(maxConnections) - { - } - - ~WebSocketExporter() = default; - - bool Activate(PluginHost::Channel& channel) - { - - bool accepted = false; - - _lock.Lock(); - - if ((_maxExportConnections != 0) && (_maxExportConnections - _messageChannelOutputs.size() > 0)) { - ASSERT(0 == _messageChannelOutputs.count(channel.Id())); - _messageChannelOutputs.emplace(std::make_pair(channel.Id(), new MessageChannelOutput(channel))); - accepted = true; - } - - _lock.Unlock(); - - return accepted; - } - - bool Deactivate(PluginHost::Channel& channel) - { - - bool deactivated = false; - - _lock.Lock(); - - if (_messageChannelOutputs.count(channel.Id() != 0)) { - _messageChannelOutputs.erase(channel.Id()); - deactivated = true; - } - - _lock.Unlock(); - - return deactivated; - } - - Core::ProxyType HandleExportCommand(const uint32_t ID, const Core::ProxyType& element); - void Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) - { - _lock.Lock(); - - for (auto& item : _messageChannelOutputs) { - item.second->Output(info, message); - } - - _lock.Unlock(); - } - - private: - std::unordered_map> _messageChannelOutputs; - mutable Core::CriticalSection _lock; - const uint32_t _maxExportConnections; - }; - - static Core::ProxyPoolType jsonExportCommandFactory(2); - - Core::ProxyType WebSocketExporter::HandleExportCommand(const uint32_t ID, - const Core::ProxyType& element) - { - - Core::ProxyType response; - - _lock.Lock(); - - auto index = _messageChannelOutputs.find(ID); - if (index != _messageChannelOutputs.end()) { - response = index->second->Process(element); - } - - _lock.Unlock(); - - return response; - } - - Core::ProxyType WebSocketExporter::MessageChannelOutput::Process(const Core::ProxyType& element) - { - Core::ProxyType inbound(element); - - ASSERT(inbound.IsValid() == true); - - ExtraOutputOptions options = OutputOptions(); - - if (inbound->Filename.IsSet() == true) { - if (inbound->Filename == true) { - options = static_cast(AsNumber(options) | AsNumber(ExtraOutputOptions::LINENUMBER)); - } else { - options = static_cast(AsNumber(options) & ~AsNumber(ExtraOutputOptions::LINENUMBER)); - } - } - - if (inbound->Identifier.IsSet() == true) { - if (inbound->Identifier == true) { - options = static_cast(AsNumber(options) | AsNumber(ExtraOutputOptions::MODULE)); - } else { - options = static_cast(AsNumber(options) & ~AsNumber(ExtraOutputOptions::MODULE)); - } - } - - if (inbound->Category.IsSet() == true) { - if (inbound->Category == true) { - options = static_cast(AsNumber(options) | AsNumber(ExtraOutputOptions::CATEGORY)); - } else { - options = static_cast(AsNumber(options) & ~AsNumber(ExtraOutputOptions::CATEGORY)); - } - } - - if (inbound->IncludingDate.IsSet() == true) { - if (inbound->IncludingDate == true) { - options = static_cast(AsNumber(options) | AsNumber(ExtraOutputOptions::INCLUDINGDATE)); - } else { - options = static_cast(AsNumber(options) & ~AsNumber(ExtraOutputOptions::INCLUDINGDATE)); - } - } - - OutputOptions(options); - - if (inbound->Paused.IsSet() == true) { - _paused = inbound->Paused; - } - - Core::ProxyType response(jsonExportCommandFactory.Element()); - - response->Filename = ((AsNumber(options) & AsNumber(ExtraOutputOptions::FILENAME)) != 0); - response->Identifier = ((AsNumber(options) & AsNumber(ExtraOutputOptions::MODULE)) != 0); - response->Category = ((AsNumber(options) & AsNumber(ExtraOutputOptions::CATEGORY)) != 0); - response->IncludingDate = ((AsNumber(options) & AsNumber(ExtraOutputOptions::INCLUDINGDATE)) != 0); - response->Paused = IsPaused(); - - return (Core::ProxyType(response)); - } - - MessageControl::Observer::Observer(MessageControl& parent) - : _adminLock() - , _parent(parent) - , _activationIds() - , _deactivationIds() - , _job(*this) - { - } - - void MessageControl::Observer::Activated(RPC::IRemoteConnection* connection) - { - ASSERT(connection != nullptr); - Core::SafeSyncType guard(_adminLock); - _activationIds.push_back(connection->Id()); - _job.Submit(); - } - void MessageControl::Observer::Deactivated(RPC::IRemoteConnection* connection) - { - ASSERT(connection != nullptr); - Core::SafeSyncType guard(_adminLock); - _deactivationIds.push_back(connection->Id()); - _job.Submit(); - } - - void MessageControl::Observer::Dispatch() - { - _adminLock.Lock(); - - while (_activationIds.size() > 0) { - _parent.Activated(_activationIds.back()); - _activationIds.pop_back(); - } - - while (_deactivationIds.size() > 0) { - _parent.Deactivated(_deactivationIds.back()); - _deactivationIds.pop_back(); - } - - _adminLock.Unlock(); - } - - MessageControl::MessageOutputNotification::MessageOutputNotification(MessageControl& parent) - : _parent(parent) - { - } - void MessageControl::MessageOutputNotification::ReceiveRawMessage(const Exchange::IMessageControl::MessageType type, const string& category, - const string& module, const string& fileName, - const uint16_t lineNumber, const uint64_t timestamp, - const string& message) - { - //yikes, recreating stuff from received pieces - Messaging::TextMessage textMessage(message); - Core::Messaging::Information info(static_cast(type), category, module, fileName, lineNumber, timestamp); - _parent._webSocketExporter->Output(info, &textMessage); - } - - MessageControl::ComNotificationSink::ComNotificationSink(MessageControl& parent) - : _parent(parent) - { - } - - void MessageControl::ComNotificationSink::CleanedUp(const Core::IUnknown*, const uint32_t) - { - } - void MessageControl::ComNotificationSink::Revoked(const Core::IUnknown* remote, const uint32_t interfaceId) - { - TRACE(Trace::Information, (_T("Revoking an interface: %d [%X] on object: [%s]"), interfaceId, interfaceId, typeid(*remote).name())); - - // Something happened to the other side - ASSERT(interfaceId != Exchange::ID_MESSAGE_CONTROL); - - if (interfaceId == Exchange::ID_MESSAGE_CONTROL_NOTIFICATION) { - auto result = remote->QueryInterface(); - ASSERT(result != nullptr); - - _parent.OnRevoke(result); - - result->Release(); - } - } + namespace Plugin { + SERVICE_REGISTRATION(MessageControl, 1, 0); + MessageControl::Config::NetworkNode::NetworkNode() : Core::JSON::Container() , Port(2200) @@ -314,6 +34,7 @@ namespace Plugin { Add(_T("port"), &Port); Add(_T("binding"), &Binding); } + MessageControl::Config::NetworkNode::NetworkNode(const NetworkNode& copy) : Core::JSON::Container() , Port(copy.Port) @@ -323,75 +44,61 @@ namespace Plugin { Add(_T("binding"), &Binding); } - MessageControl::Config::Config() - : Core::JSON::Container() - , Console(true) - , SysLog(true) - , FileName() - , Abbreviated(true) - , MaxExportConnections(1) - , Remote() - { - Add(_T("console"), &Console); - Add(_T("syslog"), &SysLog); - Add(_T("filepath"), &FileName); - Add(_T("abbreviated"), &Abbreviated); - Add(_T("maxexportconnections"), &MaxExportConnections); - Add(_T("remote"), &Remote); - } - MessageControl::MessageControl() - : _connectionId(0) - , _service(nullptr) + : _adminLock() + , _config() + , _outputDirector() + , _webSocketExporter() , _control(nullptr) , _observer(*this) - , _outputNotification(*this) - , _comSink(*this) - , _webSocketExporter(nullptr) - , _fullOutputFilePath(_T("")) - , _maxExportConnections(1) - { + , _connectionId(0) + , _service(nullptr) { } const string MessageControl::Initialize(PluginHost::IShell* service) { + string message; + ASSERT(service != nullptr); - ASSERT(_control == nullptr); - ASSERT(_service == nullptr); - ASSERT(_connectionId == 0); - string message; + _config.Clear(); + _config.FromString(service->ConfigLine()); _service = service; _service->AddRef(); - Config config; - config.FromString(service->ConfigLine()); - _fullOutputFilePath = service->VolatilePath() + config.FileName.Value(); - _maxExportConnections = config.MaxExportConnections.IsSet() ? config.MaxExportConnections.Value() : MAX_CONNECTIONS; + RegisterAll(); + + _service->Register(&_observer); + + // Lets see if we can create the Message catcher... + _control = _service->Root(_connectionId, RPC::CommunicationTimeOut, _T("MessageControlImplementation")); - _control = service->Root(_connectionId, RPC::CommunicationTimeOut, _T("MessageControlImplementation")); if (_control == nullptr) { message = _T("MessageControl plugin could not be instantiated."); - } else { - RegisterAll(); - - _service->Register(&_observer); - _service->Register(&_comSink); - - if (_control->Configure(service->Background(), - config.Abbreviated.Value(), - config.Console.Value(), - config.SysLog.Value(), - _fullOutputFilePath, - config.Remote.IsSet() ? config.Remote.Binding.Value() : _T(""), - config.Remote.IsSet() ? config.Remote.Port.Value() : 0) - != Core::ERROR_NONE) { - message = _T("MessageControl plugin could not be instantiated."); - } else { - _webSocketExporter.reset(new WebSocketExporter(_maxExportConnections)); - _control->RegisterOutputNotification(&_outputNotification); + } + else if (_control->Configure(&_observer) != Core::ERROR_NONE) { + message = _T("MessageControl plugin could not be _configured."); + } + else { + if ((service->Background() == false) && (((_config.SysLog.IsSet() == false) && (_config.Console.IsSet() == false)) || (_config.Console.Value() == true))) { + Announce(Core::Messaging::MetaData::MessageType::TRACING, std::make_shared(_config.Abbreviated.Value())); } + if ((service->Background() == true) && (((_config.SysLog.IsSet() == false) && (_config.Console.IsSet() == false)) || (_config.SysLog.Value() == true))) { + Announce(Core::Messaging::MetaData::MessageType::TRACING, std::make_shared(_config.Abbreviated.Value())); + } + if (_config.FileName.Value().empty() == false) { + _config.FileName = service->VolatilePath() + _config.FileName.Value(); + + Announce(Core::Messaging::MetaData::MessageType::TRACING, std::make_shared(_config.Abbreviated.Value(), _config.FileName.Value())); + } + if ((_config.Remote.Binding.Value().empty() == false) && (_config.Remote.Port.Value() != 0)) { + std::shared_ptr output = std::make_shared(Core::NodeId(_config.Remote.NodeId())); + Announce(Core::Messaging::MetaData::MessageType::TRACING, output); + Announce(Core::Messaging::MetaData::MessageType::LOGGING, output); + } + + _webSocketExporter.Initialize(service, _config.MaxExportConnections.Value()); } if(message.length() != 0) { @@ -403,35 +110,38 @@ namespace Plugin { void MessageControl::Deinitialize(PluginHost::IShell* service) { - ASSERT(service == _service); - - if (_control != nullptr) { - UnregisterAll(); - - service->Unregister(&_observer); - service->Unregister(&_comSink); - - _control->UnregisterOutputNotification(&_outputNotification); - _webSocketExporter.reset(); - - RPC::IRemoteConnection* connection(_service->RemoteConnection(_connectionId)); - - VARIABLE_IS_NOT_USED uint32_t result = _control->Release(); - _control = nullptr; - // It should have been the last reference we are releasing, - // so it should endup in a DESTRUCTION_SUCCEEDED, if not we - // are leaking... - ASSERT(result == Core::ERROR_DESTRUCTION_SUCCEEDED); - // The process can disappear in the meantime... - if (connection != nullptr) { - // But if it did not dissapear in the meantime, forcefully terminate it. Shoot to kill :-) - connection->Terminate(); - connection->Release(); - } - } + ASSERT (_service == service); + + UnregisterAll(); + + _service->Unregister(&_observer); + + _adminLock.Lock(); + _outputDirector.clear(); + _webSocketExporter.Deinitialize(); + + _adminLock.Unlock(); + + RPC::IRemoteConnection* connection(_service->RemoteConnection(_connectionId)); + + VARIABLE_IS_NOT_USED uint32_t result = _control->Release(); + _control = nullptr; + + // It should have been the last reference we are releasing, + // so it should endup in a DESTRUCTION_SUCCEEDED, if not we + // are leaking... + ASSERT(result == Core::ERROR_DESTRUCTION_SUCCEEDED); + + // The process can disappear in the meantime... + if (connection != nullptr) { + // But if it did not dissapear in the meantime, forcefully terminate it. Shoot to kill :-) + connection->Terminate(); + connection->Release(); + } _service->Release(); _service = nullptr; + _connectionId = 0; } @@ -441,48 +151,26 @@ namespace Plugin { return (string()); } - void MessageControl::Activated(const uint32_t id) - { - ASSERT(_control != nullptr); - _control->RegisterConnection(id); - } - void MessageControl::Deactivated(const uint32_t id) - { - ASSERT(_control != nullptr); - _control->UnregisterConnection(id); - if (_connectionId == id) { - ASSERT(_service != nullptr); - Core::IWorkerPool::Instance().Submit(PluginHost::IShell::Job::Create(_service, PluginHost::IShell::DEACTIVATED, PluginHost::IShell::FAILURE)); - } - } - - void MessageControl::OnRevoke(const Exchange::IMessageControl::INotification* remote) - { - if (_control != nullptr) { - _control->UnregisterOutputNotification(remote); - } - } - bool MessageControl::Attach(PluginHost::Channel& channel) { - TRACE(Trace::Information, (Core::Format(_T("Activating channel ID [%d]"), channel.Id()).c_str())); - return _webSocketExporter->Activate(channel); + TRACE(Trace::Information, (Core::Format(_T("Attaching channel ID [%d]"), channel.Id()).c_str())); + return (_webSocketExporter.Attach(channel.Id())); } void MessageControl::Detach(PluginHost::Channel& channel) { - TRACE(Trace::Information, (Core::Format(_T("Deactivating channel ID [%d]"), channel.Id()).c_str())); - _webSocketExporter->Deactivate(channel); + TRACE(Trace::Information, (Core::Format(_T("Detaching channel ID [%d]"), channel.Id()).c_str())); + _webSocketExporter.Detach(channel.Id()); } Core::ProxyType MessageControl::Inbound(const string&) { - return (Core::ProxyType(jsonExportCommandFactory.Element())); + return (_webSocketExporter.Command()); } Core::ProxyType MessageControl::Inbound(const uint32_t ID, const Core::ProxyType& element) { - return (Core::ProxyType(_webSocketExporter->HandleExportCommand(ID, element))); + return (Core::ProxyType(_webSocketExporter.Received(ID, element))); } } // namespace Plugin diff --git a/MessageControl/MessageControl.h b/MessageControl/MessageControl.h index e865f148c7..8511e996b7 100644 --- a/MessageControl/MessageControl.h +++ b/MessageControl/MessageControl.h @@ -20,73 +20,13 @@ #pragma once #include "Module.h" +#include "MessageOutput.h" namespace WPEFramework { namespace Plugin { - class WebSocketExporter; - class MessageControl : public PluginHost::JSONRPC, public PluginHost::IPluginExtended, public PluginHost::IWebSocket { private: - class Observer : public RPC::IRemoteConnection::INotification { - public: - explicit Observer(MessageControl& parent); - - void Activated(RPC::IRemoteConnection* connection) override; - void Deactivated(RPC::IRemoteConnection* connection) override; - - private: - friend class Core::ThreadPool::JobType; - - void Dispatch(); - - BEGIN_INTERFACE_MAP(Observer) - INTERFACE_ENTRY(RPC::IRemoteConnection::INotification) - END_INTERFACE_MAP - - Core::CriticalSection _adminLock; - MessageControl& _parent; - - std::list _activationIds; - std::list _deactivationIds; - Core::WorkerPool::JobType _job; - }; - - class MessageOutputNotification : public Exchange::IMessageControl::INotification { - public: - explicit MessageOutputNotification(MessageControl& parent); - - void ReceiveRawMessage(const Exchange::IMessageControl::MessageType type, const string& category, - const string& module, const string& fileName, - const uint16_t lineNumber, const uint64_t timestamp, - const string& message); - - private: - BEGIN_INTERFACE_MAP(MessageOutputNotification) - INTERFACE_ENTRY(Exchange::IMessageControl::INotification) - END_INTERFACE_MAP - - MessageControl& _parent; - }; - - class ComNotificationSink : public PluginHost::IShell::ICOMLink::INotification { - public: - ComNotificationSink() = delete; - explicit ComNotificationSink(MessageControl& parent); - - ~ComNotificationSink() override = default; - ComNotificationSink(const ComNotificationSink&) = delete; - ComNotificationSink& operator=(const ComNotificationSink&) = delete; - - BEGIN_INTERFACE_MAP(Notification) - INTERFACE_ENTRY(PluginHost::IShell::ICOMLink::INotification) - END_INTERFACE_MAP - - void CleanedUp(const Core::IUnknown*, const uint32_t) override; - void Revoked(const Core::IUnknown* remote, const uint32_t interfaceId) override; - - private: - MessageControl& _parent; - }; + using OutputMap = std::unordered_map>>; class Config : public Core::JSON::Container { private: @@ -96,13 +36,33 @@ namespace Plugin { NetworkNode(const NetworkNode& copy); ~NetworkNode() = default; + public: + Core::NodeId NodeId() const { + return (Core::NodeId(Binding.Value().c_str(), Port.Value())); + } + public: Core::JSON::DecUInt16 Port; Core::JSON::String Binding; }; public: - Config(); + Config() + : Core::JSON::Container() + , Console(false) + , SysLog(false) + , FileName() + , Abbreviated(true) + , MaxExportConnections(Publishers::WebSocketOutput::DefaultMaxConnections) + , Remote() + { + Add(_T("console"), &Console); + Add(_T("syslog"), &SysLog); + Add(_T("filepath"), &FileName); + Add(_T("abbreviated"), &Abbreviated); + Add(_T("maxexportconnections"), &MaxExportConnections); + Add(_T("remote"), &Remote); + } ~Config() = default; Config(const Config&) = delete; @@ -116,6 +76,136 @@ namespace Plugin { NetworkNode Remote; }; + class Observer + : public RPC::IRemoteConnection::INotification + , public Exchange::IMessageControl::ICallback { + private: + enum state { + ATTACHING, + DETACHING, + OBSERVING + }; + using ObservingMap = std::unordered_map; + + public: + Observer() = delete; + Observer(const Observer&) = delete; + Observer& operator= (const Observer&) = delete; + + explicit Observer(MessageControl& parent) + : _parent(parent) + , _adminLock() + , _observing() + , _job(*this) { + } + ~Observer() override { + _job.Revoke(); + } + + public: + // + // Exchange::IMessageControl::INotification + // ---------------------------------------------------------- + void Message(const Exchange::IMessageControl::MessageType type, const string& category, + const string& module, const string& fileName, + const uint16_t lineNumber, const uint64_t timestamp, + const string& message) override { + + //yikes, recreating stuff from received pieces + Messaging::TextMessage textMessage(message); + Core::Messaging::Information info(static_cast(type), category, module, fileName, lineNumber, timestamp); + _parent.Output(info, &textMessage); + } + + // + // RPC::IRemoteConnection::INotification + // ---------------------------------------------------------- + void Activated(RPC::IRemoteConnection* connection) override { + + uint32_t id = connection->Id(); + + _adminLock.Lock(); + + // Seems the ID is already in here, thats odd, and impossible :-) + ObservingMap::iterator index = _observing.find(id); + + if (index == _observing.end()) { + _observing.emplace(std::piecewise_construct, + std::make_tuple(id), + std::make_tuple(state::ATTACHING)); + } + else if (index->second == state::DETACHING) { + index->second = state::OBSERVING; + } + + _adminLock.Unlock(); + + _job.Submit(); + } + void Deactivated(RPC::IRemoteConnection* connection) override { + + uint32_t id = connection->Id(); + + _adminLock.Lock(); + + // Seems the ID is already in here, thats odd, and impossible :-) + ObservingMap::iterator index = _observing.find(id); + + if (index != _observing.end()) { + if (index->second == state::ATTACHING) { + _observing.erase(index); + } + else if (index->second == state::OBSERVING) { + _observing.emplace(std::piecewise_construct, + std::make_tuple(id), + std::make_tuple(state::DETACHING)); + } + } + + _adminLock.Unlock(); + + _job.Submit(); + } + + BEGIN_INTERFACE_MAP(Observer) + INTERFACE_ENTRY(RPC::IRemoteConnection::INotification) + INTERFACE_ENTRY(Exchange::IMessageControl::ICallback) + END_INTERFACE_MAP + + private: + friend class Core::ThreadPool::JobType; + + void Dispatch() + { + _adminLock.Lock(); + + ObservingMap::iterator index = _observing.begin(); + + while (index != _observing.end()) { + if (index->second == state::ATTACHING) { + index->second = state::OBSERVING; + _parent.Attach(index->first); + index++; + } + else if (index->second == state::DETACHING) { + _parent.Detach(index->first); + index = _observing.erase(index); + } + else { + index++; + } + } + + _adminLock.Unlock(); + } + + private: + MessageControl& _parent; + Core::CriticalSection _adminLock; + ObservingMap _observing; + Core::WorkerPool::JobType _job; + }; + public: MessageControl(const MessageControl&) = delete; MessageControl& operator=(const MessageControl&) = delete; @@ -124,11 +214,10 @@ namespace Plugin { ~MessageControl() override = default; BEGIN_INTERFACE_MAP(MessageControl) - INTERFACE_ENTRY(PluginHost::IPlugin) - INTERFACE_ENTRY(PluginHost::IDispatcher) - INTERFACE_ENTRY(PluginHost::IPluginExtended) - INTERFACE_ENTRY(PluginHost::IWebSocket) - INTERFACE_AGGREGATE(Exchange::IMessageControl, _control) + INTERFACE_ENTRY(PluginHost::IPlugin) + INTERFACE_ENTRY(PluginHost::IDispatcher) + INTERFACE_ENTRY(PluginHost::IPluginExtended) + INTERFACE_ENTRY(PluginHost::IWebSocket) END_INTERFACE_MAP public: @@ -148,20 +237,63 @@ namespace Plugin { uint32_t endpoint_status(const JsonData::MessageControl::StatusParamsData& params, JsonData::MessageControl::StatusResultData& response); private: - void OnRevoke(const Exchange::IMessageControl::INotification* remote); - void Activated(const uint32_t id); - void Deactivated(const uint32_t id); + void Announce(Core::Messaging::MetaData::MessageType type, const std::shared_ptr& output) { + + _adminLock.Lock(); + + OutputMap::iterator index = _outputDirector.find(type); + + if (index == _outputDirector.end()) { + index = _outputDirector.emplace(std::piecewise_construct, + std::make_tuple(type), + std::make_tuple()).first; + } + + index->second.emplace_back(output); + + _adminLock.Unlock(); + } + void Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) { + // Time to start sending it to all interested parties... + _adminLock.Lock(); + + OutputMap::iterator index = _outputDirector.find(info.MessageMetaData().Type()); + if (index != _outputDirector.end()) { + for (const auto& entry : index->second) { + entry->Output(info, message); + } + } + + _webSocketExporter.Output(info, message); + + _adminLock.Unlock(); + } + void Attach(const uint32_t id) { + _adminLock.Lock(); + _control->Attach(id); + _adminLock.Unlock(); + } + void Detach(const uint32_t id) { + _adminLock.Lock(); + _control->Detach(id); + _adminLock.Unlock(); + + if (id == _connectionId) { + ASSERT(_service != nullptr); + Core::IWorkerPool::Instance().Submit(PluginHost::IShell::Job::Create(_service, PluginHost::IShell::DEACTIVATED, PluginHost::IShell::FAILURE)); + } + } - uint32_t _connectionId; - PluginHost::IShell* _service; + private: + Core::CriticalSection _adminLock; + Config _config; + OutputMap _outputDirector; + Publishers::WebSocketOutput _webSocketExporter; Exchange::IMessageControl* _control; Core::Sink _observer; - Core::Sink _outputNotification; - Core::Sink _comSink; - std::unique_ptr _webSocketExporter; - Config _config; - string _fullOutputFilePath; - uint16_t _maxExportConnections; + uint32_t _connectionId; + PluginHost::IShell* _service; + }; } // namespace Plugin diff --git a/MessageControl/MessageControlImplementation.cpp b/MessageControl/MessageControlImplementation.cpp index 59a1a83c81..a29ec501b7 100644 --- a/MessageControl/MessageControlImplementation.cpp +++ b/MessageControl/MessageControlImplementation.cpp @@ -22,7 +22,9 @@ #include namespace WPEFramework { + namespace { + string DispatcherIdentifier() { string result; @@ -62,17 +64,14 @@ namespace Plugin { WorkerThread(MessageControlImplementation& parent) : Core::Thread() - , _parent(parent) - { + , _parent(parent) { } ~WorkerThread() override = default; private: uint32_t Worker() override { - if (Thread::IsRunning()) { - _parent.Dispatch(); - } + _parent.Dispatch(); return Core::infinite; } @@ -80,14 +79,19 @@ namespace Plugin { private: MessageControlImplementation& _parent; }; - public: + MessageControlImplementation(const MessageControlImplementation&) = delete; + MessageControlImplementation& operator=(const MessageControlImplementation&) = delete; + MessageControlImplementation() - : _dispatcherIdentifier(DispatcherIdentifier()) + : _adminLock() + , _callback(nullptr) + , _dispatcherIdentifier(DispatcherIdentifier()) , _dispatcherBasePath(DispatcherBasePath()) , _worker(*this) , _client(_dispatcherIdentifier, _dispatcherBasePath, DispatcherSocketPort()) - { + , _factory() + , _controls() { } ~MessageControlImplementation() override { @@ -96,53 +100,45 @@ namespace Plugin { _worker.Wait(Core::Thread::STOPPED, Core::infinite); _client.ClearInstances(); + + if (_callback != nullptr) { + _callback->Release(); + _callback = nullptr; + } } - MessageControlImplementation(const MessageControlImplementation&) = delete; - MessageControlImplementation& operator=(const MessageControlImplementation&) = delete; - public: - uint32_t Configure(const bool isBackground, const bool abbreviate, const bool outputToConsole, - const bool outputToSysLog, const string& outputFileName, const string& binding, const uint32_t port) override + uint32_t Configure(Exchange::IMessageControl::ICallback* callback) override { uint32_t result = Core::ERROR_NONE; - if ((!isBackground && !outputToConsole && !outputToSysLog) || (outputToConsole)) { - _outputDirector.AddOutput(Core::Messaging::MetaData::MessageType::TRACING, std::make_shared(abbreviate)); - } - if ((isBackground && !outputToConsole && !outputToSysLog) || (outputToSysLog)) { - _outputDirector.AddOutput(Core::Messaging::MetaData::MessageType::TRACING, std::make_shared(abbreviate)); - } - if (!outputFileName.empty()) { - _outputDirector.AddOutput(Core::Messaging::MetaData::MessageType::TRACING, std::make_shared(abbreviate, outputFileName)); - } - if (!binding.empty() && port != 0) { - auto udpOutput = std::make_shared(Core::NodeId(binding.c_str(), port)); - _outputDirector.AddOutput(Core::Messaging::MetaData::MessageType::TRACING, udpOutput); - _outputDirector.AddOutput(Core::Messaging::MetaData::MessageType::LOGGING, udpOutput); - } + _callback = callback; + _callback->AddRef(); _client.AddInstance(0); _client.AddFactory(Core::Messaging::MetaData::MessageType::TRACING, &_factory); _client.AddFactory(Core::Messaging::MetaData::MessageType::LOGGING, &_factory); + _worker.Run(); - //check if data is already available + // TODO: Seems this "creates" the doorbell, that shouldbe doneby the constructor, + // Something to check and fix. _client.SkipWaiting(); return result; } - void RegisterConnection(const uint32_t id) override - { + uint32_t Attach(const uint32_t id) override { _client.AddInstance(id); + return (Core::ERROR_NONE); } - void UnregisterConnection(const uint32_t id) override - { + + uint32_t Detach(const uint32_t id) override { _client.RemoveInstance(id); + return (Core::ERROR_NONE); } - uint32_t EnableMessage(const MessageType type, const string& moduleName, const string& categoryName, const bool enable) override + uint32_t Enable(const MessageType type, const string& moduleName, const string& categoryName, const bool enable) override { Core::Messaging::MetaData metaData(static_cast(type), categoryName, moduleName); _client.Enable(metaData, enable); @@ -162,7 +158,7 @@ namespace Plugin { * ERROR_UNAVAILABLE: No more data available * */ - uint32_t ActiveMessages(const bool initialize, MessageType& type, string& moduleName, string& categoryName, bool& enable) override + uint32_t Setting(const bool initialize, MessageType& type, string& moduleName, string& categoryName, bool& enable) override { uint32_t result = Core::ERROR_UNAVAILABLE; @@ -171,52 +167,52 @@ namespace Plugin { _controls = _client.Enabled(); } - if (_controls.Count() > 0) { - bool hasNext = _controls.Next(); - if (hasNext) { - type = static_cast(_controls.Current().first.Type()); - moduleName = _controls.Current().first.Module(); - categoryName = _controls.Current().first.Category(); - enable = _controls.Current().second; - result = Core::ERROR_NONE; - } + if (_controls.Next() == true) { + type = static_cast(_controls.Current().first.Type()); + moduleName = _controls.Current().first.Module(); + categoryName = _controls.Current().first.Category(); + enable = _controls.Current().second; + result = Core::ERROR_NONE; } return result; } - void RegisterOutputNotification(Exchange::IMessageControl::INotification* notification) override - { - _outputDirector.RegisterRawMessageNotification(notification); - } - void UnregisterOutputNotification(const Exchange::IMessageControl::INotification* notification) override - { - _outputDirector.UnregisterRawMessageNotification(notification); - } + BEGIN_INTERFACE_MAP(MessageControlImplementation) + INTERFACE_ENTRY(Exchange::IMessageControl) + END_INTERFACE_MAP + private: void Dispatch() { _client.WaitForUpdates(Core::infinite); + _client.PopMessagesAndCall([this](const Core::Messaging::Information& info, const Core::ProxyType& message) { - _outputDirector.Output(info, message.Origin()); - }); + string rawMessage; + message->ToString(rawMessage); + + _callback->Message(static_cast(info.MessageMetaData().Type()), + info.MessageMetaData().Category(), + info.MessageMetaData().Module(), + info.FileName(), + info.LineNumber(), + info.TimeStamp(), + rawMessage); + }); } - BEGIN_INTERFACE_MAP(MessageControlImplementation) - INTERFACE_ENTRY(Exchange::IMessageControl) - END_INTERFACE_MAP - private: - string _dispatcherIdentifier; - string _dispatcherBasePath; + Core::CriticalSection _adminLock; + IMessageControl::ICallback* _callback; + + const string _dispatcherIdentifier; + const string _dispatcherBasePath; + WorkerThread _worker; Messaging::MessageClient _client; - Messaging::TraceFactory _factory; - Messaging::MessageDirector _outputDirector; - Core::Messaging::ControlList::InformationIterator _controls; - Core::CriticalSection _adminLock; + Core::Messaging::ControlList::InformationIterator _controls; }; SERVICE_REGISTRATION(MessageControlImplementation, 1, 0); diff --git a/MessageControl/MessageControlJsonRpc.cpp b/MessageControl/MessageControlJsonRpc.cpp index 1b0f758ebb..234eb5c7b1 100644 --- a/MessageControl/MessageControlJsonRpc.cpp +++ b/MessageControl/MessageControlJsonRpc.cpp @@ -51,11 +51,8 @@ namespace Plugin { uint32_t MessageControl::endpoint_set(const MessageInfo& params) { uint32_t result = Core::ERROR_NONE; - if (_control != nullptr) { - auto state = params.State.Value() == StateType::ENABLED ? true : false; - result = _control->EnableMessage(static_cast(params.Type.Value()), params.Module.Value(), params.Category.Value(), state); - } - return result; + auto state = params.State.Value() == StateType::ENABLED ? true : false; + return(_control->Enable(static_cast(params.Type.Value()), params.Module.Value(), params.Category.Value(), state)); } // Method: status - Retrieves general information @@ -73,18 +70,18 @@ namespace Plugin { bool initialize = true; if (!params.IsSet()) { - response.Console = _config.Console.Value(); - response.Syslog = _config.SysLog.Value(); - response.FileNameOutput = _fullOutputFilePath; - response.Abbreviated = _config.Abbreviated.Value(); - response.Maxexportconnections = _maxExportConnections; + response.Console = _config.Console; + response.Syslog = _config.SysLog; + response.FileNameOutput = _config.FileName; + response.Abbreviated = _config.Abbreviated; + response.Maxexportconnections = _webSocketExporter.MaxConnections();; if (_config.Remote.IsSet()) { - response.Remote.Binding = _config.Remote.Binding.Value(); - response.Remote.Port = _config.Remote.Port.Value(); + response.Remote.Binding = _config.Remote.Binding; + response.Remote.Port = _config.Remote.Port; } } - while (_control->ActiveMessages(initialize, type, module, category, enabled) == Core::ERROR_NONE) { + while (_control->Setting(initialize, type, module, category, enabled) == Core::ERROR_NONE) { if (!params.IsSet()) { add = true; } else { diff --git a/MessageControl/MessageOutput.cpp b/MessageControl/MessageOutput.cpp index a5c9e449ec..5d3df5b43b 100644 --- a/MessageControl/MessageOutput.cpp +++ b/MessageControl/MessageOutput.cpp @@ -20,154 +20,86 @@ #include "MessageOutput.h" namespace WPEFramework { -namespace Messaging { +namespace Publishers { - TextOutput::TextOutput(const bool abbreviated) - : _abbreviated(abbreviated) + string Text::Convert(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) { - } - - void TextOutput::Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) - { - _output.str(""); - _output.clear(); + string deserializedMessage; + std::ostringstream output; - message->ToString(_deserializedMessage); + message->ToString(deserializedMessage); Core::Time now(info.TimeStamp()); if (_abbreviated == true) { string time(now.ToTimeOnly(true)); - _output << '[' << time.c_str() << ']' - << '[' << info.MessageMetaData().Module() << "]" - << '[' << info.MessageMetaData().Category() << "]: " - << _deserializedMessage << std::endl; + output << '[' << time.c_str() << ']' + << '[' << info.MessageMetaData().Module() << "]" + << '[' << info.MessageMetaData().Category() << "]: " + << deserializedMessage << std::endl; } else { string time(now.ToRFC1123(true)); - _output << '[' << time.c_str() << "]:[" << Core::FileNameOnly(info.FileName().c_str()) << ':' << info.LineNumber() << "] " - << info.MessageMetaData().Category() << ": " << _deserializedMessage << std::endl; + output << '[' << time.c_str() << "]:[" << Core::FileNameOnly(info.FileName().c_str()) << ':' << info.LineNumber() << "] " + << info.MessageMetaData().Category() << ": " << deserializedMessage << std::endl; } - HandleTextMessage(_output.str()); + return(output.str()); } - void ConsoleOutput::HandleTextMessage(const string& message) + void ConsoleOutput::Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) /* override */ { - std::cout << message; + std::cout << _convertor.Convert(info, message); } - void SyslogOutput::HandleTextMessage(const string& message) + void SyslogOutput::Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) /* override */ { #ifndef __WINDOWS__ - syslog(LOG_NOTICE, _T("%s"), message.c_str()); + syslog(LOG_NOTICE, _T("%s"), _convertor.Convert(info, message).c_str()); #else - printf(_T("%s"), message.c_str()); + printf(_T("%s"), _convertor.Convert(info, message).c_str()); #endif } - FileOutput::FileOutput(bool abbreviate, const string& filepath) - : TextOutput(abbreviate) - , _file(filepath) - { - _file.Create(); - - if (!_file.IsOpen()) { - TRACE(Trace::Error, (_T("Could not open file <%s>. Outputting warnings to file unavailable."), filepath)); - } - } - FileOutput::~FileOutput() - { - if (_file.IsOpen()) { - _file.Close(); - } - } - - void FileOutput::HandleTextMessage(const string& message) + void FileOutput::Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) { if (_file.IsOpen()) { - _file.Write(reinterpret_cast(message.c_str()), message.length()); + string line = _convertor.Convert(info, message); + _file.Write(reinterpret_cast(line.c_str()), static_cast(line.length())); } } - - JSONOutput::Data::Data() - : Core::JSON::Container() - , Time() - , Filename() - , Linenumber() - , Category() - , Module() - , Message() + + void JSON::Convert(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message, Data& data) { - Add(_T("time"), &Time); - Add(_T("filename"), &Filename); - Add(_T("linenumber"), &Linenumber); - Add(_T("category"), &Category); - Add(_T("module"), &Module); - Add(_T("message"), &Message); - } + ExtraOutputOptions options = _outputoptions; - JSONOutput::JSONOutput(WPEFramework::PluginHost::Channel& channel) - : _exportChannel(channel) - , _outputoptions(ExtraOutputOptions::ALL) - , _paused(false) - { - } - - bool JSONOutput::IsPaused() const - { - return _paused; - } - - JSONOutput::ExtraOutputOptions JSONOutput::OutputOptions() const - { - return _outputoptions; - } - - void JSONOutput::OutputOptions(const JSONOutput::ExtraOutputOptions outputoptions) - { - _outputoptions = outputoptions; - } - - void JSONOutput::Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) - { - if (!IsPaused()) { - ExtraOutputOptions options = _outputoptions; + if ((AsNumber(options) & AsNumber(ExtraOutputOptions::PAUSED)) == 0) { - WPEFramework::Core::ProxyType data = GetDataContainer(); - data->Clear(); if ((AsNumber(options) & AsNumber(ExtraOutputOptions::INCLUDINGDATE)) != 0) { - data->Time = WPEFramework::Core::Time::Now().ToRFC1123(true); + data.Time = Core::Time::Now().ToRFC1123(true); } else { - data->Time = WPEFramework::Core::Time::Now().ToTimeOnly(true); + data.Time = Core::Time::Now().ToTimeOnly(true); } if ((AsNumber(options) & AsNumber(ExtraOutputOptions::FILENAME)) != 0) { - data->Filename = info.FileName(); - if ((AsNumber(options) & AsNumber(ExtraOutputOptions::LINENUMBER)) != 0) { - data->Linenumber = info.LineNumber(); - } + data.Filename = info.FileName(); + } + if ((AsNumber(options) & AsNumber(ExtraOutputOptions::LINENUMBER)) != 0) { + data.Linenumber = info.LineNumber(); } if ((AsNumber(options) & AsNumber(ExtraOutputOptions::MODULE)) != 0) { - data->Module = info.MessageMetaData().Module(); + data.Module = info.MessageMetaData().Module(); } if ((AsNumber(options) & AsNumber(ExtraOutputOptions::CATEGORY)) != 0) { - data->Category = info.MessageMetaData().Category(); + data.Category = info.MessageMetaData().Category(); } string rawMessage; message->ToString(rawMessage); - data->Message = rawMessage; - - HandleJsonMessage(data); + data.Message = rawMessage; } } - void JSONOutput::HandleJsonMessage(const Core::ProxyType& jsondata) - { - _exportChannel.Submit(WPEFramework::Core::ProxyType(jsondata)); - } - //UDPOutput UDPOutput::Channel::Channel(const Core::NodeId& nodeId) : Core::SocketDatagram(false, nodeId.Origin(), nodeId, Core::Messaging::MessageUnit::DataSize, 0) @@ -224,51 +156,5 @@ namespace Messaging { { _output.Output(info, message); } - - //DIRECTOR - void MessageDirector::AddOutput(Core::Messaging::MetaData::MessageType type, std::shared_ptr output) - { - _outputs[type].push_back(std::move(output)); - } - - void MessageDirector::Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) - { - - for (const auto& output : _outputs[info.MessageMetaData().Type()]) { - output->Output(info, message); - } - - for (const auto& notification : _notifications) { - string rawMessage; - message->ToString(rawMessage); - - notification->ReceiveRawMessage(static_cast(info.MessageMetaData().Type()), - info.MessageMetaData().Category(), - info.MessageMetaData().Module(), - info.FileName(), - info.LineNumber(), - info.TimeStamp(), - rawMessage); - } - } - - void MessageDirector::RegisterRawMessageNotification(Exchange::IMessageControl::INotification* notification) - { - if (notification != nullptr) { - if (std::find(_notifications.begin(), _notifications.end(), notification) == _notifications.end()) { - notification->AddRef(); - _notifications.push_back(notification); - } - } - } - void MessageDirector::UnregisterRawMessageNotification(const Exchange::IMessageControl::INotification* notification) - { - auto it = std::find(_notifications.begin(), _notifications.end(), notification); - if (it != _notifications.end()) { - (*it)->Release(); - _notifications.erase(it); - } - } - } } \ No newline at end of file diff --git a/MessageControl/MessageOutput.h b/MessageControl/MessageOutput.h index e50f307cba..6f2075605f 100644 --- a/MessageControl/MessageOutput.h +++ b/MessageControl/MessageOutput.h @@ -20,78 +20,129 @@ #pragma once #include "Module.h" namespace WPEFramework { -namespace Messaging { +namespace Publishers { - class TextOutput : public Messaging::IMessageOutput { + class Text { public: - explicit TextOutput(const bool abbreviated); - ~TextOutput() override = default; + Text() = delete; + Text(const Text&) = delete; + Text& operator=(const Text&) = delete; - TextOutput(const TextOutput&) = delete; - TextOutput& operator=(const TextOutput&) = delete; + explicit Text(const bool abbreviated) + : _abbreviated(abbreviated) { + } + ~Text() = default; public: - void Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) override; - - private: - virtual void HandleTextMessage(const string& message) = 0; + string Convert(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message); private: - string _deserializedMessage; - std::ostringstream _output; bool _abbreviated; }; - class ConsoleOutput : public TextOutput { + class ConsoleOutput : public Messaging::IMessageOutput { public: - explicit ConsoleOutput(bool abbreviate) - : TextOutput(abbreviate) - { - } - ~ConsoleOutput() override = default; + ConsoleOutput() = delete; ConsoleOutput(const ConsoleOutput&) = delete; ConsoleOutput& operator=(const ConsoleOutput&) = delete; - void HandleTextMessage(const string& message) override; + explicit ConsoleOutput(const bool abbreviate) + : _convertor(abbreviate) { + } + ~ConsoleOutput() override = default; + + public: + void Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) override; + + private: + Text _convertor; }; - class SyslogOutput : public TextOutput { + class SyslogOutput : public Messaging::IMessageOutput { public: - explicit SyslogOutput(bool abbreviate) - : TextOutput(abbreviate) - { - } - ~SyslogOutput() override = default; + SyslogOutput() = delete; SyslogOutput(const SyslogOutput&) = delete; SyslogOutput& operator=(const SyslogOutput&) = delete; - void HandleTextMessage(const string& message) override; - }; + explicit SyslogOutput(const bool abbreviate) + : _convertor(abbreviate) { + } + ~SyslogOutput() override = default; + + public: + void Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) override; - class FileOutput : public TextOutput { + private: + Text _convertor; + }; + + class FileOutput : public Messaging::IMessageOutput { public: - explicit FileOutput(bool abbreviate, const string& filepath); - ~FileOutput() override; + FileOutput() = delete; FileOutput(const FileOutput&) = delete; FileOutput& operator=(const FileOutput&) = delete; - void HandleTextMessage(const string& message) override; + explicit FileOutput(const bool abbreviate, const string& filepath) + : _convertor(abbreviate) + , _file(filepath) { + _file.Create(); + + if (!_file.IsOpen()) { + TRACE(Trace::Error, (_T("Could not open file <%s>. Outputting warnings to file unavailable."), filepath)); + } + } + ~FileOutput() override { + if (_file.IsOpen()) { + _file.Close(); + } + } + + public: + void Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) override; private: Core::File _file; + Text _convertor; + }; - class JSONOutput : public IMessageOutput { + class JSON { + private: + enum class ExtraOutputOptions { + ABREVIATED = 0x00, + FILENAME = 0x01, + LINENUMBER = 0x03, // selecting LINENUMBER will automatically select FILENAME + MODULE = 0x04, + CATEGORY = 0x08, + INCLUDINGDATE = 0x10, + ALL = 0x1F, + PAUSED = 0x20 + }; + public: class Data : public Core::JSON::Container { - public: - Data(); - ~Data() override = default; - Data(const Data&) = delete; Data& operator=(const Data&) = delete; + Data() + : Core::JSON::Container() + , Time() + , Filename() + , Linenumber() + , Category() + , Module() + , Message() + { + Add(_T("time"), &Time); + Add(_T("filename"), &Filename); + Add(_T("linenumber"), &Linenumber); + Add(_T("category"), &Category); + Add(_T("module"), &Module); + Add(_T("message"), &Message); + } + ~Data() override = default; + public: Core::JSON::String Time; Core::JSON::String Filename; @@ -102,60 +153,112 @@ namespace Messaging { }; public: - JSONOutput(const JSONOutput&) = delete; - JSONOutput& operator=(const JSONOutput&) = delete; + JSON(const JSON&) = delete; + JSON& operator=(const JSON&) = delete; - enum class ExtraOutputOptions { - ABREVIATED = 0, - FILENAME = 1, - LINENUMBER = 3, // selecting LINENUMBER will automatically select FILENAME - MODULE = 4, - CATEGORY = 8, - INCLUDINGDATE = 16, - ALL = 31 - }; + JSON() + : _outputoptions(ExtraOutputOptions::ALL) { + } - explicit JSONOutput(WPEFramework::PluginHost::Channel& channel); - ~JSONOutput() = default; + ~JSON() = default; public: + bool FileName() const { + return ((AsNumber(_outputoptions) & AsNumber(ExtraOutputOptions::FILENAME)) != 0); + } + void FileName(const bool enabled) { + if (enabled == true) { + _outputoptions = static_cast(AsNumber(_outputoptions) | AsNumber(ExtraOutputOptions::FILENAME)); + } + else { + _outputoptions = static_cast(AsNumber(_outputoptions) & ~AsNumber(ExtraOutputOptions::FILENAME)); + } + } + bool LineNumber() const { + return ((AsNumber(_outputoptions) & AsNumber(ExtraOutputOptions::LINENUMBER)) != 0); + } + void LineNumber(const bool enabled) { + if (enabled == true) { + _outputoptions = static_cast(AsNumber(_outputoptions) | AsNumber(ExtraOutputOptions::LINENUMBER)); + } + else { + _outputoptions = static_cast(AsNumber(_outputoptions) & ~AsNumber(ExtraOutputOptions::LINENUMBER)); + } + } + bool Module() const { + return ((AsNumber(_outputoptions) & AsNumber(ExtraOutputOptions::MODULE)) != 0); + } + void Module(const bool enabled) { + if (enabled == true) { + _outputoptions = static_cast(AsNumber(_outputoptions) | AsNumber(ExtraOutputOptions::MODULE)); + } + else { + _outputoptions = static_cast(AsNumber(_outputoptions) & ~AsNumber(ExtraOutputOptions::MODULE)); + } + } + bool Category() const { + return ((AsNumber(_outputoptions) & AsNumber(ExtraOutputOptions::CATEGORY)) != 0); + } + void Category(const bool enabled) { + if (enabled == true) { + _outputoptions = static_cast(AsNumber(_outputoptions) | AsNumber(ExtraOutputOptions::CATEGORY)); + } + else { + _outputoptions = static_cast(AsNumber(_outputoptions) & ~AsNumber(ExtraOutputOptions::CATEGORY)); + } + } + bool Date() const { + return ((AsNumber(_outputoptions) & AsNumber(ExtraOutputOptions::INCLUDINGDATE)) != 0); + } + void Date(const bool enabled) { + if (enabled == true) { + _outputoptions = static_cast(AsNumber(_outputoptions) | AsNumber(ExtraOutputOptions::INCLUDINGDATE)); + } + else { + _outputoptions = static_cast(AsNumber(_outputoptions) & ~AsNumber(ExtraOutputOptions::INCLUDINGDATE)); + } + } + bool Paused() const { + return ((AsNumber(_outputoptions) & AsNumber(ExtraOutputOptions::PAUSED)) != 0); + } + void Paused(const bool enabled) { + if (enabled == true) { + _outputoptions = static_cast(AsNumber(_outputoptions) | AsNumber(ExtraOutputOptions::PAUSED)); + } + else { + _outputoptions = static_cast(AsNumber(_outputoptions) & ~AsNumber(ExtraOutputOptions::PAUSED)); + } + } + + void Convert(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message, Data& response); + + private: template static inline auto AsNumber(E t) -> typename std::underlying_type::type { return static_cast::type>(t); } - bool IsPaused() const; - ExtraOutputOptions OutputOptions() const; - void OutputOptions(const ExtraOutputOptions outputoptions); - void Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) override; - - virtual WPEFramework::Core::ProxyType Process(const WPEFramework::Core::ProxyType& element) = 0; - private: - virtual Core::ProxyType GetDataContainer() = 0; - void HandleJsonMessage(const Core::ProxyType& jsondata); - - protected: - WPEFramework::PluginHost::Channel& _exportChannel; std::atomic _outputoptions; - bool _paused; }; - class UDPOutput : public IMessageOutput { + class UDPOutput : public Messaging::IMessageOutput { private: class Channel : public Core::SocketDatagram { public: - Channel(const Core::NodeId& nodeId); - ~Channel() override; + Channel() = delete; Channel(const Channel&) = delete; Channel& operator=(const Channel&) = delete; + explicit Channel(const Core::NodeId& nodeId); + ~Channel() override; + void Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message); private: uint16_t SendData(uint8_t* dataFrame, const uint16_t maxSendSize) override; - //unused + // Unused uint16_t ReceiveData(uint8_t*, const uint16_t) override; void StateChange() override; @@ -165,34 +268,219 @@ namespace Messaging { }; public: - UDPOutput(const Core::NodeId& nodeId); - ~UDPOutput() = default; + UDPOutput() = delete; UDPOutput(const UDPOutput&) = delete; UDPOutput& operator=(const UDPOutput&) = delete; + explicit UDPOutput(const Core::NodeId& nodeId); + ~UDPOutput() = default; + void Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) override; private: Channel _output; }; - class MessageDirector { + class WebSocketOutput : public Messaging::IMessageOutput { + private: + + class ExportCommand : public Core::JSON::Container { + public: + ExportCommand(const ExportCommand&) = delete; + ExportCommand& operator=(const ExportCommand&) = delete; + + ExportCommand() + : Core::JSON::Container() + , Filename() + , Identifier() + , Category() + , IncludingDate() + , Paused() + { + Add(_T("filename"), &Filename); + Add(_T("identifier"), &Identifier); + Add(_T("category"), &Category); + Add(_T("includingdate"), &IncludingDate); + Add(_T("paused"), &Paused); + } + + ~ExportCommand() override = default; + + public: + Core::JSON::Boolean Filename; + Core::JSON::Boolean Identifier; + Core::JSON::Boolean Category; + Core::JSON::Boolean IncludingDate; + Core::JSON::Boolean Paused; + }; + using ChannelMap = std::unordered_map; + + public: + static constexpr uint16_t DefaultMaxConnections = 5; + + public: + WebSocketOutput(const WebSocketOutput& copy) = delete; + WebSocketOutput& operator=(const WebSocketOutput&) = delete; + + explicit WebSocketOutput() + : _lock() + , _server(nullptr) + , _channels() + , _maxExportConnections(0) + , _jsonExportDataFactory(2) + , _jsonExportCommandFactory(2) + { + } + ~WebSocketOutput() override = default; + public: - MessageDirector() = default; - ~MessageDirector() = default; - MessageDirector(const MessageDirector&) = delete; - MessageDirector& operator=(const MessageDirector&) = delete; + void Initialize(PluginHost::IShell* service, const uint32_t maxConnections = DefaultMaxConnections) { + _lock.Lock(); + _server = service; + _server->AddRef(); + _maxExportConnections = maxConnections; + _lock.Unlock(); + } + void Deinitialize() { + _lock.Lock(); + _server->Release(); + _server = nullptr; + _channels.clear(); + _maxExportConnections = 0; + _lock.Unlock(); + } + bool Attach(const uint32_t id) + { + bool accepted = false; + + _lock.Lock(); - void AddOutput(Core::Messaging::MetaData::MessageType type, std::shared_ptr output); - void Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message); + if (_channels.size() < _maxExportConnections) { + ChannelMap::iterator index = _channels.find(id); - void RegisterRawMessageNotification(Exchange::IMessageControl::INotification* notification); - void UnregisterRawMessageNotification(const Exchange::IMessageControl::INotification* notification); + ASSERT(index == _channels.end()); + + _channels.emplace(std::piecewise_construct, + std::forward_as_tuple(id), + std::forward_as_tuple()); + + accepted = true; + } + + _lock.Unlock(); + + return accepted; + } + + bool Detach(const uint32_t id) { + bool deactivated = false; + + _lock.Lock(); + + ChannelMap::iterator index = _channels.find(id); + + if (index != _channels.end()) { + _channels.erase(index); + deactivated = true; + } + + _lock.Unlock(); + + return deactivated; + } + + uint32_t MaxConnections() const { + return (_maxExportConnections); + } + + Core::ProxyType Received (const uint32_t id, const Core::ProxyType& element) { + Core::ProxyType info = Core::ProxyType(element); + + if (info.IsValid() == false) { + element.Release(); + } + else { + _lock.Lock(); + + ChannelMap::iterator index = _channels.find(id); + + if (index != _channels.end()) { + if (info->Filename.IsSet() == true) { + index->second.FileName(info->Filename == true); + } + if (info->Identifier.IsSet() == true) { + index->second.LineNumber(info->Identifier == true); + } + if (info->Category.IsSet() == true) { + index->second.Category(info->Category == true); + } + if (info->IncludingDate.IsSet() == true) { + index->second.Date(info->IncludingDate == true); + } + if (info->Paused.IsSet() == true) { + index->second.Paused(info->Paused == true); + } + + info->Clear(); + info->Filename = index->second.FileName(); + info->Identifier = index->second.LineNumber(); + info->Category = index->second.Category(); + info->IncludingDate = index->second.Date(); + info->Paused = index->second.Paused(); + } + + _lock.Unlock(); + } + + return (element); + } + + void Output(const Core::Messaging::Information& info, const Core::Messaging::IEvent* message) override { + + std::list>> cachedList; + PluginHost::IShell* server = nullptr; + + _lock.Lock(); + + if (_server != nullptr) { + + for (auto& item : _channels) { + if (item.second.Paused() == false) { + Core::ProxyType data = _jsonExportDataFactory.Element(); + item.second.Convert(info, message, *data); + cachedList.emplace_back(item.first, Core::ProxyType(data)); + } + } + + if (cachedList.empty() == false) { + server = _server; + server->AddRef(); + } + } + + _lock.Unlock(); + + if (server != nullptr) { + for (std::pair>& entry : cachedList) { + _server->Submit(entry.first, entry.second); + } + cachedList.clear(); + server->Release(); + } + } + + Core::ProxyType Command() { + return (Core::ProxyType(_jsonExportCommandFactory.Element())); + } private: - using Outputs = std::unordered_map>>; - Outputs _outputs; - std::list _notifications; + mutable Core::CriticalSection _lock; + PluginHost::IShell* _server; + ChannelMap _channels; + uint32_t _maxExportConnections; + Core::ProxyPoolType _jsonExportDataFactory; + Core::ProxyPoolType _jsonExportCommandFactory; }; + } } \ No newline at end of file