rclcpp
master
C++ ROS Client Library API
|
Go to the documentation of this file.
15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
29 #include "rcl/error_handling.h"
51 #include "tracetools/tracetools.h"
56 namespace node_interfaces
58 class NodeTopicsInterface;
63 typename CallbackMessageT,
103 rclcpp::node_interfaces::NodeBaseInterface * node_base,
104 const rosidl_message_type_support_t & type_support_handle,
105 const
std::
string & topic_name,
109 typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
115 options.template to_rcl_subscription_options<CallbackMessageT>(qos),
116 rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value),
117 any_callback_(callback),
119 message_memory_strategy_(message_memory_strategy)
121 if (options.event_callbacks.deadline_callback) {
123 options.event_callbacks.deadline_callback,
124 RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED);
126 if (options.event_callbacks.liveliness_callback) {
128 options.event_callbacks.liveliness_callback,
129 RCL_SUBSCRIPTION_LIVELINESS_CHANGED);
131 if (options.event_callbacks.incompatible_qos_callback) {
133 options.event_callbacks.incompatible_qos_callback,
134 RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
135 }
else if (options_.use_default_callbacks) {
142 RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
147 if (options.event_callbacks.message_lost_callback) {
149 options.event_callbacks.message_lost_callback,
150 RCL_SUBSCRIPTION_MESSAGE_LOST);
161 "intraprocess communication is not allowed with keep all history qos policy");
163 if (qos_profile.
depth == 0) {
165 "intraprocess communication is not allowed with 0 depth qos policy");
169 "intraprocess communication allowed only with volatile durability");
173 auto context = node_base->get_context();
174 subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
176 options.get_allocator(),
178 this->get_topic_name(),
182 rclcpp_subscription_init,
184 static_cast<const void *
>(subscription_intra_process_.get()));
188 auto ipm = context->get_sub_context<IntraProcessManager>();
189 uint64_t intra_process_subscription_id = ipm->
add_subscription(subscription_intra_process_);
193 if (subscription_topic_statistics !=
nullptr) {
194 this->subscription_topic_statistics_ =
std::move(subscription_topic_statistics);
198 rclcpp_subscription_init,
200 static_cast<const void *
>(
this));
202 rclcpp_subscription_callback_added,
203 static_cast<const void *
>(
this),
204 static_cast<const void *
>(&any_callback_));
208 #ifndef TRACETOOLS_DISABLED
209 any_callback_.register_callback_for_tracing();
246 return this->
take_type_erased(
static_cast<void *
>(&message_out), message_info_out);
256 return message_memory_strategy_->borrow_message();
262 return message_memory_strategy_->borrow_serialized_message();
275 auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
278 if (subscription_topic_statistics_) {
284 any_callback_.dispatch(typed_message, message_info);
286 if (subscription_topic_statistics_) {
287 const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
288 const auto time =
rclcpp::Time(nanos.time_since_epoch().count());
289 subscription_topic_statistics_->handle_message(*typed_message, time);
295 void * loaned_message,
298 auto typed_message =
static_cast<CallbackMessageT *
>(loaned_message);
301 typed_message, [](CallbackMessageT * msg) {(void) msg;});
302 any_callback_.dispatch(sptr, message_info);
312 auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
313 message_memory_strategy_->return_message(typed_message);
323 message_memory_strategy_->return_serialized_message(message);
329 return any_callback_.use_take_shared_method();
343 message_memory_strategy_;
349 typename MessageUniquePtr::deleter_type>;
355 #endif // RCLCPP__SUBSCRIPTION_HPP_
std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message() override
Borrow a new serialized message.
Definition: subscription.hpp:260
RMW_QOS_POLICY_HISTORY_KEEP_ALL
void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implementation detail.
void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info) override
Check if we need to handle the message, and execute the callback if we do.
Definition: subscription.hpp:266
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
rmw_qos_profile_t & get_rmw_qos_profile()
Return the rmw qos profile.
std::shared_ptr< rclcpp::topic_statistics::SubscriptionTopicStatistics< CallbackMessageT > > SubscriptionTopicStatisticsSharedPtr
Definition: subscription.hpp:80
std::shared_ptr< rcl_subscription_t > get_subscription_handle()
Structure containing optional configuration for Subscriptions.
Definition: subscription_options.hpp:87
Definition: subscription_base.hpp:60
enum rmw_qos_durability_policy_t durability
void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message) override
Return the borrowed serialized message.
Definition: subscription.hpp:321
void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo &info) const
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:69
uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager; returns the subscription's unique id.
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
Encapsulation of Quality of Service settings.
Definition: qos.hpp:110
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
bool use_take_shared_method() const
Definition: subscription.hpp:327
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
allocator::Deleter< MessageAllocator, rcl_interfaces::msg::ParameterEvent > MessageDeleter
Definition: subscription.hpp:76
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
allocator::AllocRebind< rcl_interfaces::msg::ParameterEvent, std::allocator< void > > MessageAllocatorTraits
Definition: subscription.hpp:74
Definition: subscription_intra_process.hpp:47
Definition: any_subscription_callback.hpp:101
Definition: qos_event.hpp:69
enum rmw_qos_history_policy_t history
bool take(CallbackMessageT &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
Definition: subscription.hpp:244
Pure virtual interface class for the NodeTopics part of the Node API.
Definition: node_topics_interface.hpp:41
bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
Additional meta data about messages taken from subscriptions.
Definition: message_info.hpp:26
bool resolve_use_intra_process(const OptionsT &options, const NodeBaseT &node_base)
Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
Definition: resolve_use_intra_process.hpp:31
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:41
void handle_loaned_message(void *loaned_message, const rclcpp::MessageInfo &message_info) override
Definition: subscription.hpp:294
bool matches_any_intra_process_publishers(const rmw_gid_t *sender_gid) const
This class performs intra process communication between nodes.
Definition: intra_process_manager.hpp:91
void post_init_setup(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rclcpp::QoS &qos, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options)
Called after construction to continue setup that requires shared_from_this().
Definition: subscription.hpp:215
void return_message(std::shared_ptr< void > &message) override
Return the borrowed message.
Definition: subscription.hpp:310
void add_event_handler(const EventCallbackT &callback, const rcl_subscription_event_type_t event_type)
Definition: subscription_base.hpp:282
RMW_QOS_POLICY_DURABILITY_VOLATILE
rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
std::shared_ptr< void > create_message() override
Borrow a new message.
Definition: subscription.hpp:250
rclcpp::IntraProcessBufferType resolve_intra_process_buffer_type(const rclcpp::IntraProcessBufferType buffer_type, const rclcpp::AnySubscriptionCallback< CallbackMessageT, AllocatorT > &any_subscription_callback)
Return the buffer type, resolving the "CallbackDefault" type to an actual type if needed.
Definition: resolve_intra_process_buffer_type.hpp:32
typename MessageAllocatorTraits::allocator_type MessageAllocator
Definition: subscription.hpp:75