diff --git a/src/modules/uORB/Subscription.hpp b/src/modules/uORB/Subscription.hpp index 22b69942b6c3..6c98d3871806 100644 --- a/src/modules/uORB/Subscription.hpp +++ b/src/modules/uORB/Subscription.hpp @@ -48,6 +48,8 @@ namespace uORB { +class SubscriptionCallback; + // Base subscription wrapper class class Subscription { @@ -107,6 +109,10 @@ class Subscription protected: + friend class SubscriptionCallback; + + DeviceNode *get_node() { return _node; } + bool subscribe(); void unsubscribe(); diff --git a/src/modules/uORB/SubscriptionCallback.hpp b/src/modules/uORB/SubscriptionCallback.hpp new file mode 100644 index 000000000000..01722bc6819e --- /dev/null +++ b/src/modules/uORB/SubscriptionCallback.hpp @@ -0,0 +1,136 @@ +/**************************************************************************** + * + * Copyright (c) 2019 PX4 Development Team. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name PX4 nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +/** + * @file SubscriptionCallback.hpp + * + */ + +#pragma once + +#include +#include +#include + +namespace uORB +{ + +// Subscription wrapper class with callbacks on new publications +class SubscriptionCallback : public SubscriptionInterval, public ListNode +{ +public: + /** + * Constructor + * + * @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic. + * @param instance The instance for multi sub. + */ + SubscriptionCallback(const orb_metadata *meta, uint8_t interval_ms = 0, uint8_t instance = 0) : + SubscriptionInterval(meta, interval_ms, instance) + { + } + + virtual ~SubscriptionCallback() + { + unregister_callback(); + }; + + bool register_callback() + { + bool ret = false; + + if (_subscription.get_node() && _subscription.get_node()->register_callback(this)) { + // registered + ret = true; + + } else { + // force topic creation by subscribing with old API + int fd = orb_subscribe_multi(_subscription.get_topic(), _subscription.get_instance()); + + // try to register callback again + if (_subscription.forceInit()) { + if (_subscription.get_node() && _subscription.get_node()->register_callback(this)) { + ret = true; + } + } + + orb_unsubscribe(fd); + } + + + return ret; + } + + void unregister_callback() + { + if (_subscription.get_node()) { + _subscription.get_node()->unregister_callback(this); + } + } + + virtual void call() = 0; + +}; + +// Subscription with callback that schedules a WorkItem +class SubscriptionCallbackWorkItem : public SubscriptionCallback +{ +public: + /** + * Constructor + * + * @param work_item The WorkItem that will be scheduled immediately on new publications. + * @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic. + * @param instance The instance for multi sub. + */ + SubscriptionCallbackWorkItem(px4::WorkItem *work_item, const orb_metadata *meta, uint8_t instance = 0) : + SubscriptionCallback(meta, 0, instance), // interval 0 + _work_item(work_item) + { + } + + virtual ~SubscriptionCallbackWorkItem() = default; + + void call() override + { + // schedule immediately if no interval, otherwise check time elapsed + if ((_interval == 0) || (hrt_elapsed_time(&_last_update) >= _interval)) { + _work_item->ScheduleNow(); + } + } + +private: + px4::WorkItem *_work_item; +}; + +} // namespace uORB diff --git a/src/modules/uORB/SubscriptionInterval.hpp b/src/modules/uORB/SubscriptionInterval.hpp new file mode 100644 index 000000000000..253c0570d2ce --- /dev/null +++ b/src/modules/uORB/SubscriptionInterval.hpp @@ -0,0 +1,133 @@ +/**************************************************************************** + * + * Copyright (c) 2019 PX4 Development Team. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name PX4 nor the names of its contributors may be + * used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS + * OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN + * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * + ****************************************************************************/ + +/** + * @file SubscriptionInterval.hpp + * + */ + +#pragma once + +#include +#include + +#include "uORBDeviceNode.hpp" +#include "uORBManager.hpp" +#include "uORBUtils.hpp" + +#include "Subscription.hpp" + +namespace uORB +{ + +// Base subscription wrapper class +class SubscriptionInterval +{ +public: + + /** + * Constructor + * + * @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic. + * @param interval The requested maximum update interval in microseconds. + * @param instance The instance for multi sub. + */ + SubscriptionInterval(const orb_metadata *meta, uint32_t interval_us = 0, uint8_t instance = 0) : + _subscription{meta, instance}, + _interval(interval_us) + {} + + SubscriptionInterval() : _subscription{nullptr} {} + + ~SubscriptionInterval() = default; + + bool published() { return _subscription.published(); } + + /** + * Check if there is a new update. + * */ + bool updated() + { + if (published() && (hrt_elapsed_time(&_last_update) >= _interval)) { + return _subscription.updated(); + } + + return false; + } + + /** + * Copy the struct if updated. + * @param dst The destination pointer where the struct will be copied. + * @return true only if topic was updated and copied successfully. + */ + bool update(void *dst) + { + if (updated()) { + return copy(dst); + } + + return false; + } + + /** + * Copy the struct + * @param dst The destination pointer where the struct will be copied. + * @return true only if topic was copied successfully. + */ + bool copy(void *dst) + { + if (_subscription.copy(dst)) { + _last_update = hrt_absolute_time(); + return true; + } + + return false; + } + + bool valid() const { return _subscription.valid(); } + + uint8_t get_instance() const { return _subscription.get_instance(); } + orb_id_t get_topic() const { return _subscription.get_topic(); } + uint32_t get_interval() const { return _interval; } + + void set_interval(uint32_t interval) { _interval = interval; } + +protected: + + Subscription _subscription; + uint64_t _last_update{0}; // last update in microseconds + uint32_t _interval{0}; // maximum update interval in microseconds + +}; + +} // namespace uORB diff --git a/src/modules/uORB/uORBDeviceNode.cpp b/src/modules/uORB/uORBDeviceNode.cpp index 484ae4ab2459..d6aad0439842 100644 --- a/src/modules/uORB/uORBDeviceNode.cpp +++ b/src/modules/uORB/uORBDeviceNode.cpp @@ -37,6 +37,8 @@ #include "uORBUtils.hpp" #include "uORBManager.hpp" +#include "SubscriptionCallback.hpp" + #ifdef ORB_COMMUNICATOR #include "uORBCommunicator.hpp" #endif /* ORB_COMMUNICATOR */ @@ -317,6 +319,11 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen) _published = true; + // callbacks + for (auto item : _callbacks) { + item->call(); + } + ATOMIC_LEAVE; /* notify any poll waiters */ @@ -673,3 +680,33 @@ int uORB::DeviceNode::update_queue_size(unsigned int queue_size) _queue_size = queue_size; return PX4_OK; } + +bool +uORB::DeviceNode::register_callback(uORB::SubscriptionCallback *callback_sub) +{ + if (callback_sub != nullptr) { + ATOMIC_ENTER; + + // prevent duplicate registrations + for (auto existing_callbacks : _callbacks) { + if (callback_sub == existing_callbacks) { + ATOMIC_LEAVE; + return true; + } + } + + _callbacks.add(callback_sub); + ATOMIC_LEAVE; + return true; + } + + return false; +} + +void +uORB::DeviceNode::unregister_callback(uORB::SubscriptionCallback *callback_sub) +{ + ATOMIC_ENTER; + _callbacks.remove(callback_sub); + ATOMIC_LEAVE; +} diff --git a/src/modules/uORB/uORBDeviceNode.hpp b/src/modules/uORB/uORBDeviceNode.hpp index ee0ec759dc1e..ef0a7b08db3b 100644 --- a/src/modules/uORB/uORBDeviceNode.hpp +++ b/src/modules/uORB/uORBDeviceNode.hpp @@ -45,6 +45,7 @@ namespace uORB class DeviceNode; class DeviceMaster; class Manager; +class SubscriptionCallback; } /** @@ -231,6 +232,12 @@ class uORB::DeviceNode : public cdev::CDev, public ListNode */ uint64_t copy_and_get_timestamp(void *dst, unsigned &generation); + // add item to list of work items to schedule on node update + bool register_callback(SubscriptionCallback *callback_sub); + + // remove item from list of work items + void unregister_callback(SubscriptionCallback *callback_sub); + protected: pollevent_t poll_state(cdev::file_t *filp) override; @@ -269,6 +276,7 @@ class uORB::DeviceNode : public cdev::CDev, public ListNode uint8_t *_data{nullptr}; /**< allocated object buffer */ hrt_abstime _last_update{0}; /**< time the object was last updated */ volatile unsigned _generation{0}; /**< object generation count */ + List _callbacks; uint8_t _priority; /**< priority of the topic */ bool _published{false}; /**< has ever data been published */ uint8_t _queue_size; /**< maximum number of elements in the queue */ diff --git a/src/platforms/common/px4_work_queue/WorkItem.cpp b/src/platforms/common/px4_work_queue/WorkItem.cpp index ec28e2e236d5..cbf8541b20fb 100644 --- a/src/platforms/common/px4_work_queue/WorkItem.cpp +++ b/src/platforms/common/px4_work_queue/WorkItem.cpp @@ -49,8 +49,16 @@ WorkItem::WorkItem(const wq_config_t &config) } } +WorkItem::~WorkItem() +{ + Deinit(); +} + bool WorkItem::Init(const wq_config_t &config) { + // clear any existing first + Deinit(); + px4::WorkQueue *wq = WorkQueueFindOrCreate(config); if (wq == nullptr) { @@ -64,4 +72,16 @@ bool WorkItem::Init(const wq_config_t &config) return false; } +void WorkItem::Deinit() +{ + // remove any currently queued work + if (_wq != nullptr) { + // prevent additional insertions + px4::WorkQueue *wq_temp = _wq; + _wq = nullptr; + + wq_temp->Remove(this); + } +} + } // namespace px4 diff --git a/src/platforms/common/px4_work_queue/WorkItem.hpp b/src/platforms/common/px4_work_queue/WorkItem.hpp index a71cd05c3848..2135d0367c62 100644 --- a/src/platforms/common/px4_work_queue/WorkItem.hpp +++ b/src/platforms/common/px4_work_queue/WorkItem.hpp @@ -51,7 +51,7 @@ class WorkItem : public IntrusiveQueueNode explicit WorkItem(const wq_config_t &config); WorkItem() = delete; - virtual ~WorkItem() = default; + virtual ~WorkItem(); inline void ScheduleNow() { if (_wq != nullptr) _wq->Add(this); } @@ -60,6 +60,7 @@ class WorkItem : public IntrusiveQueueNode protected: bool Init(const wq_config_t &config); + void Deinit(); private: diff --git a/src/platforms/common/px4_work_queue/WorkQueue.cpp b/src/platforms/common/px4_work_queue/WorkQueue.cpp index 965747475618..e9fd51be4d42 100644 --- a/src/platforms/common/px4_work_queue/WorkQueue.cpp +++ b/src/platforms/common/px4_work_queue/WorkQueue.cpp @@ -74,6 +74,8 @@ WorkQueue::~WorkQueue() void WorkQueue::Add(WorkItem *item) { + // TODO: prevent additions when shutting down + work_lock(); _q.push(item); work_unlock(); @@ -82,6 +84,24 @@ void WorkQueue::Add(WorkItem *item) px4_sem_post(&_process_lock); } +void WorkQueue::Remove(WorkItem *item) +{ + work_lock(); + _q.remove(item); + work_unlock(); +} + +void WorkQueue::Clear() +{ + work_lock(); + + while (!_q.empty()) { + _q.pop(); + } + + work_unlock(); +} + void WorkQueue::Run() { while (!should_exit()) { diff --git a/src/platforms/common/px4_work_queue/WorkQueue.hpp b/src/platforms/common/px4_work_queue/WorkQueue.hpp index dcfc57c2b688..ab4ad1f1b758 100644 --- a/src/platforms/common/px4_work_queue/WorkQueue.hpp +++ b/src/platforms/common/px4_work_queue/WorkQueue.hpp @@ -58,10 +58,9 @@ class WorkQueue : public ListNode const char *get_name() { return _config.name; } void Add(WorkItem *item); + void Remove(WorkItem *item); - // TODO: need helpers to handle clean shutdown - remove and clear? - //void remove(WorkItem *item); - //void clear(); + void Clear(); void Run(); diff --git a/src/platforms/common/px4_work_queue/WorkQueueManager.cpp b/src/platforms/common/px4_work_queue/WorkQueueManager.cpp index 9b15bb2c774d..f9d092396f60 100644 --- a/src/platforms/common/px4_work_queue/WorkQueueManager.cpp +++ b/src/platforms/common/px4_work_queue/WorkQueueManager.cpp @@ -198,13 +198,18 @@ static void WorkQueueManagerRun() } // stack size +#ifndef __PX4_QURT const size_t stacksize = math::max(PTHREAD_STACK_MIN, PX4_STACK_ADJUSTED(wq->stacksize)); +#else + const size_t stacksize = math::max(8 * 1024, PX4_STACK_ADJUSTED(wq->stacksize)); +#endif int ret_setstacksize = pthread_attr_setstacksize(&attr, stacksize); if (ret_setstacksize != 0) { PX4_ERR("setting stack size for %s failed (%i)", wq->name, ret_setstacksize); } +#ifndef __PX4_QURT // schedule policy FIFO int ret_setschedpolicy = pthread_attr_setschedpolicy(&attr, SCHED_FIFO); @@ -212,6 +217,8 @@ static void WorkQueueManagerRun() PX4_ERR("failed to set sched policy SCHED_FIFO (%i)", ret_setschedpolicy); } +#endif // ! QuRT + // priority param.sched_priority = sched_get_priority_max(SCHED_FIFO) + wq->relative_priority; int ret_setschedparam = pthread_attr_setschedparam(&attr, ¶m); diff --git a/src/platforms/common/px4_work_queue/WorkQueueManager.hpp b/src/platforms/common/px4_work_queue/WorkQueueManager.hpp index 79007be96067..d53813f8eeb4 100644 --- a/src/platforms/common/px4_work_queue/WorkQueueManager.hpp +++ b/src/platforms/common/px4_work_queue/WorkQueueManager.hpp @@ -48,7 +48,7 @@ struct wq_config_t { namespace wq_configurations { -static constexpr wq_config_t rate_ctrl{"wq:rate_ctrl", 1500, 0}; // PX4 inner loop highest priority +static constexpr wq_config_t rate_ctrl{"wq:rate_ctrl", 1600, 0}; // PX4 inner loop highest priority static constexpr wq_config_t SPI1{"wq:SPI1", 1250, -1}; static constexpr wq_config_t SPI2{"wq:SPI2", 1250, -2}; @@ -62,7 +62,9 @@ static constexpr wq_config_t I2C2{"wq:I2C2", 1250, -8}; static constexpr wq_config_t I2C3{"wq:I2C3", 1250, -9}; static constexpr wq_config_t I2C4{"wq:I2C4", 1250, -10}; -static constexpr wq_config_t hp_default{"wq:hp_default", 1500, -11}; +static constexpr wq_config_t att_pos_ctrl{"wq:att_pos_ctrl", 7000, -11}; // PX4 att/pos controllers, highest priority after sensors + +static constexpr wq_config_t hp_default{"wq:hp_default", 1500, -12}; static constexpr wq_config_t lp_default{"wq:lp_default", 1500, -50}; static constexpr wq_config_t test1{"wq:test1", 800, 0};