15 #ifndef RCLCPP__SUBSCRIPTION_HPP_ 16 #define RCLCPP__SUBSCRIPTION_HPP_ 49 #include "tracetools/tracetools.h" 54 namespace node_interfaces
56 class NodeTopicsInterface;
61 typename CallbackMessageT,
94 rclcpp::node_interfaces::NodeBaseInterface * node_base,
95 const rosidl_message_type_support_t & type_support_handle,
96 const
std::
string & topic_name,
100 typename MessageMemoryStrategyT::SharedPtr message_memory_strategy)
105 options.template to_rcl_subscription_options<CallbackMessageT>(qos),
106 rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value),
107 any_callback_(callback),
109 message_memory_strategy_(message_memory_strategy)
111 if (options.event_callbacks.deadline_callback) {
112 this->add_event_handler(
113 options.event_callbacks.deadline_callback,
116 if (options.event_callbacks.liveliness_callback) {
117 this->add_event_handler(
118 options.event_callbacks.liveliness_callback,
129 "intraprocess communication is not allowed with keep all history qos policy");
131 if (qos.get_rmw_qos_profile().depth == 0) {
133 "intraprocess communication is not allowed with 0 depth qos policy");
137 "intraprocess communication allowed only with volatile durability");
141 auto context = node_base->get_context();
146 typename MessageUniquePtr::deleter_type
149 options.get_allocator(),
151 this->get_topic_name(),
152 qos.get_rmw_qos_profile(),
156 rclcpp_subscription_init,
157 (
const void *)get_subscription_handle().
get(),
158 (
const void *)subscription_intra_process.get());
162 auto ipm = context->get_sub_context<IntraProcessManager>();
163 uint64_t intra_process_subscription_id = ipm->
add_subscription(subscription_intra_process);
164 this->setup_intra_process(intra_process_subscription_id, ipm);
168 rclcpp_subscription_init,
169 (
const void *)get_subscription_handle().
get(),
172 rclcpp_subscription_callback_added,
174 (
const void *)&any_callback_);
178 any_callback_.register_callback_for_tracing();
199 AllocatorT>::SharedPtr message_memory_strategy)
201 message_memory_strategy_ = message_memory_strategy;
210 return message_memory_strategy_->borrow_message();
215 return message_memory_strategy_->borrow_serialized_message();
221 if (matches_any_intra_process_publishers(&message_info.
publisher_gid)) {
227 any_callback_.dispatch(typed_message, message_info);
234 auto typed_message =
static_cast<CallbackMessageT *
>(loaned_message);
237 typed_message, [](CallbackMessageT * msg) {(void) msg;});
238 any_callback_.dispatch(sptr, message_info);
251 message_memory_strategy_->return_serialized_message(message);
256 return any_callback_.use_take_shared_method();
270 message_memory_strategy_;
275 #endif // RCLCPP__SUBSCRIPTION_HPP_ allocator::Deleter< MessageAllocator, CallbackMessageT > MessageDeleter
Definition: subscription.hpp:74
std::shared_ptr< rcl_serialized_message_t > create_serialized_message() override
Borrow a new serialized message.
Definition: subscription.hpp:213
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:40
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
Encapsulation of Quality of Service settings.
Definition: qos.hpp:55
RMW_QOS_POLICY_HISTORY_KEEP_ALL
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
allocator::AllocRebind< CallbackMessageT, AllocatorT > MessageAllocatorTraits
Definition: subscription.hpp:72
void handle_loaned_message(void *loaned_message, const rmw_message_info_t &message_info) override
Definition: subscription.hpp:231
virtual void return_message(std::shared_ptr< MessageT > &msg)
Release ownership of the message, which will deallocate it if it has no more owners.
Definition: message_memory_strategy.hpp:125
Definition: subscription_intra_process.hpp:45
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:67
void return_message(std::shared_ptr< void > &message) override
Return the borrowed message.
Definition: subscription.hpp:243
void post_init_setup(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rclcpp::QoS &qos, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options)
Called after construction to continue setup that requires shared_from_this().
Definition: subscription.hpp:182
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
RMW_QOS_POLICY_DURABILITY_VOLATILE
bool resolve_use_intra_process(const OptionsT &options, const NodeBaseT &node_base)
Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
Definition: resolve_use_intra_process.hpp:31
Definition: any_subscription_callback.hpp:36
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED
This class performs intra process communication between nodes.
Definition: intra_process_manager.hpp:91
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
std::shared_ptr< void > create_message() override
Borrow a new message.
Definition: subscription.hpp:204
T static_pointer_cast(T... args)
typename MessageAllocatorTraits::allocator_type MessageAllocator
Definition: subscription.hpp:73
rclcpp::IntraProcessBufferType resolve_intra_process_buffer_type(const rclcpp::IntraProcessBufferType buffer_type, const rclcpp::AnySubscriptionCallback< CallbackMessageT, AllocatorT > &any_subscription_callback)
Return the buffer type, resolving the "CallbackDefault" type to an actual type if needed...
Definition: resolve_intra_process_buffer_type.hpp:32
Structure containing optional configuration for Subscriptions.
Definition: subscription_options.hpp:58
uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
Pure virtual interface class for the NodeTopics part of the Node API.
Definition: node_topics_interface.hpp:38
Definition: subscription_base.hpp:54
void return_serialized_message(std::shared_ptr< rcl_serialized_message_t > &message) override
Return the message borrowed in create_serialized_message.
Definition: subscription.hpp:249
void handle_message(std::shared_ptr< void > &message, const rmw_message_info_t &message_info) override
Check if we need to handle the message, and execute the callback if we do.
Definition: subscription.hpp:218
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
bool use_take_shared_method() const
Definition: subscription.hpp:254
RCL_SUBSCRIPTION_LIVELINESS_CHANGED
void set_message_memory_strategy(typename message_memory_strategy::MessageMemoryStrategy< CallbackMessageT, AllocatorT >::SharedPtr message_memory_strategy)
Support dynamically setting the message memory strategy.
Definition: subscription.hpp:197