Skip to content

Commit

Permalink
capicxx-dbus-runtime 3.1.12.3
Browse files Browse the repository at this point in the history
  • Loading branch information
juergengehring committed Jan 25, 2018
1 parent f072e9b commit c989150
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 21 deletions.
4 changes: 4 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
Changes
=======
v3.1.12.3
- Fixed data race in generated StubDefault when using attributes
- use eventfd instead of pipe in Watch

v3.1.12.2
- Fixed hang-up in proxy destruction when async method call was done and proxy is not available
- DBus connections are now released even if they were not created with the Factory interface
Expand Down
8 changes: 7 additions & 1 deletion include/CommonAPI/DBus/DBusMainLoopContext.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ class DBusQueueWatch : public Watch {
void processQueueEntry(std::shared_ptr<QueueEntry> _queueEntry);

private:
#ifdef _WIN32
int pipeFileDescriptors_[2];
#else
int eventFd_;
#endif

pollfd pollFileDescriptor_;

Expand All @@ -133,9 +137,11 @@ class DBusQueueWatch : public Watch {

std::weak_ptr<DBusConnection> connection_;

const int pipeValue_;
#ifdef _WIN32
HANDLE wsaEvent_;
const int pipeValue_;
#else
const std::uint64_t eventFdValue_;
#endif

};
Expand Down
24 changes: 20 additions & 4 deletions include/CommonAPI/DBus/DBusStubAdapterHelper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,8 @@ class DBusGetAttributeStubDispatcher: public virtual StubDispatcher<StubClass_>
public:
typedef typename StubClass_::RemoteEventHandlerType RemoteEventHandlerType;
typedef const AttributeType_& (StubClass_::*GetStubFunctor)(std::shared_ptr<CommonAPI::ClientId>);
typedef typename StubClass_::StubAdapterType StubAdapterType;
typedef typename CommonAPI::Stub<StubAdapterType, typename StubClass_::RemoteEventType> StubType;

DBusGetAttributeStubDispatcher(GetStubFunctor _getStubFunctor, const char *_signature, AttributeDepl_ *_depl = nullptr):
getStubFunctor_(_getStubFunctor),
Expand All @@ -891,7 +893,13 @@ class DBusGetAttributeStubDispatcher: public virtual StubDispatcher<StubClass_>

std::shared_ptr<DBusClientId> clientId = std::make_shared<DBusClientId>(std::string(dbusMessage.getSender()));
auto varDepl = CommonAPI::DBus::VariantDeployment<AttributeDepl_>(true, depl_); // presuming FreeDesktop variant deployment, as support for "legacy" service only
_output << CommonAPI::Deployable<CommonAPI::Variant<AttributeType_>, CommonAPI::DBus::VariantDeployment<AttributeDepl_>>((stub.get()->*getStubFunctor_)(clientId), &varDepl);

auto stubAdapter = stub->StubType::getStubAdapter();
stubAdapter->lockAttributes();
auto deployable = CommonAPI::Deployable<CommonAPI::Variant<AttributeType_>, CommonAPI::DBus::VariantDeployment<AttributeDepl_>>((stub.get()->*getStubFunctor_)(clientId), &varDepl);
stubAdapter->unlockAttributes();

_output << deployable;
_output.flush();
}

Expand All @@ -902,8 +910,12 @@ class DBusGetAttributeStubDispatcher: public virtual StubDispatcher<StubClass_>

std::shared_ptr<DBusClientId> clientId = std::make_shared<DBusClientId>(std::string(dbusMessage.getSender()));

dbusOutputStream << CommonAPI::Deployable<AttributeType_, AttributeDepl_>((stub.get()->*getStubFunctor_)(clientId), depl_);

auto stubAdapter = stub->StubType::getStubAdapter();
stubAdapter->lockAttributes();
auto deployable = CommonAPI::Deployable<AttributeType_, AttributeDepl_>((stub.get()->*getStubFunctor_)(clientId), depl_);
stubAdapter->unlockAttributes();

dbusOutputStream << deployable;
dbusOutputStream.flush();
if (std::shared_ptr<DBusProxyConnection> connection = connection_.lock()) {
bool isSuccessful = connection->sendDBusMessage(dbusMessageReply);
Expand Down Expand Up @@ -1047,7 +1059,11 @@ class DBusSetObservableAttributeStubDispatcher: public virtual DBusSetAttributeS
RemoteEventHandlerType* _remoteEventHandler,
const std::shared_ptr<StubClass_> _stub) {
(void)_remoteEventHandler;
(_stub->StubType::getStubAdapter().get()->*fireChangedFunctor_)(this->getAttributeValue(_client, _stub));

auto stubAdapter = _stub->StubType::getStubAdapter();
stubAdapter->lockAttributes();
(stubAdapter.get()->*fireChangedFunctor_)(this->getAttributeValue(_client, _stub));
stubAdapter->unlockAttributes();
}

const FireChangedFunctor fireChangedFunctor_;
Expand Down
51 changes: 35 additions & 16 deletions src/CommonAPI/DBus/DBusMainLoopContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#else
#include <poll.h>
#include <unistd.h>
#include <sys/eventfd.h>
#endif

#include <fcntl.h>
Expand Down Expand Up @@ -177,7 +178,14 @@ void DBusWatch::addDependentDispatchSource(DispatchSource* dispatchSource) {
dependentDispatchSources_.push_back(dispatchSource);
}

DBusQueueWatch::DBusQueueWatch(std::shared_ptr<DBusConnection> _connection) : pipeValue_(4) {
DBusQueueWatch::DBusQueueWatch(std::shared_ptr<DBusConnection> _connection) :
#ifdef _WIN32
pipeValue_(4)
#else
eventFd_(0),
eventFdValue_(1)
#endif
{
#ifdef _WIN32
WSADATA wsaData;
int iResult;
Expand Down Expand Up @@ -301,12 +309,14 @@ DBusQueueWatch::DBusQueueWatch(std::shared_ptr<DBusConnection> _connection) : pi
closesocket(ListenSocket);
WSACleanup();
}
pollFileDescriptor_.fd = pipeFileDescriptors_[0];
#else
if(pipe2(pipeFileDescriptors_, O_NONBLOCK) == -1) {
eventFd_ = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
if (eventFd_ == -1) {
std::perror(__func__);
}
pollFileDescriptor_.fd = eventFd_;
#endif
pollFileDescriptor_.fd = pipeFileDescriptors_[0];
pollFileDescriptor_.events = POLLIN;

connection_ = _connection;
Expand All @@ -326,8 +336,7 @@ DBusQueueWatch::~DBusQueueWatch() {
closesocket(pipeFileDescriptors_[0]);
WSACleanup();
#else
close(pipeFileDescriptors_[0]);
close(pipeFileDescriptors_[1]);
close(eventFd_);
#endif

std::unique_lock<std::mutex> itsLock(queueMutex_);
Expand Down Expand Up @@ -374,8 +383,10 @@ void DBusQueueWatch::removeDependentDispatchSource(CommonAPI::DispatchSource* _d
}

void DBusQueueWatch::pushQueue(std::shared_ptr<QueueEntry> _queueEntry) {
std::unique_lock<std::mutex> itsLock(queueMutex_);
queue_.push(_queueEntry);
{
std::unique_lock<std::mutex> itsLock(queueMutex_);
queue_.push(_queueEntry);
}

#ifdef _WIN32
// Send an initial buffer
Expand All @@ -390,15 +401,17 @@ void DBusQueueWatch::pushQueue(std::shared_ptr<QueueEntry> _queueEntry) {
}
}
#else
if(write(pipeFileDescriptors_[1], &pipeValue_, sizeof(pipeValue_)) == -1) {
std::perror(__func__);
while (write(eventFd_, &eventFdValue_, sizeof(eventFdValue_)) == -1) {
if (errno != EAGAIN && errno != EINTR) {
std::perror(__func__);
break;
}
std::this_thread::yield();
}
#endif
}

void DBusQueueWatch::popQueue() {
std::unique_lock<std::mutex> itsLock(queueMutex_);

#ifdef _WIN32
// Receive until the peer closes the connection
int iResult;
Expand All @@ -416,13 +429,19 @@ void DBusQueueWatch::popQueue() {
printf("recv failed with error: %d\n", WSAGetLastError());
}
#else
int readValue = 0;
if(read(pipeFileDescriptors_[0], &readValue, sizeof(readValue)) == -1) {
std::perror(__func__);
std::uint64_t readValue(0);
while (read(eventFd_, &readValue, sizeof(readValue)) == -1) {
if (errno != EAGAIN && errno != EINTR) {
std::perror(__func__);
break;
}
std::this_thread::yield();
}
#endif

queue_.pop();
{
std::unique_lock<std::mutex> itsLock(queueMutex_);
queue_.pop();
}
}

std::shared_ptr<QueueEntry> DBusQueueWatch::frontQueue() {
Expand Down

0 comments on commit c989150

Please sign in to comment.