15 #ifndef RCLCPP__SUBSCRIPTION_FACTORY_HPP_ 16 #define RCLCPP__SUBSCRIPTION_FACTORY_HPP_ 25 #include "rosidl_typesupport_cpp/message_type_support.hpp" 51 rclcpp::SubscriptionBase::SharedPtr(
61 rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
62 rclcpp::SubscriptionBase::SharedPtr subscription,
73 typename CallbackMessageT,
74 typename SubscriptionT>
77 CallbackT && callback,
79 CallbackMessageT, Alloc>::SharedPtr
87 any_subscription_callback.
set(std::forward<CallbackT>(callback));
90 std::make_shared<typename Subscription<CallbackMessageT, Alloc>::MessageAlloc>();
94 [allocator, msg_mem_strat, any_subscription_callback, message_alloc](
98 ) -> rclcpp::SubscriptionBase::SharedPtr
100 subscription_options.allocator =
101 rclcpp::allocator::get_rcl_allocator<CallbackMessageT>(*message_alloc.get());
107 node_base->get_shared_rcl_node_handle(),
108 *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
110 subscription_options,
111 any_subscription_callback,
120 rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
121 rclcpp::SubscriptionBase::SharedPtr subscription,
124 rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm;
125 uint64_t intra_process_subscription_id = ipm->add_subscription(subscription);
128 intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator<CallbackMessageT>(
129 *message_alloc.get());
130 intra_process_options.qos = subscription_options.qos;
131 intra_process_options.ignore_local_publications =
false;
134 auto take_intra_process_message_func =
136 uint64_t publisher_id,
137 uint64_t message_sequence,
138 uint64_t subscription_id,
141 auto ipm = weak_ipm.lock();
145 "intra process take called after destruction of intra process manager");
147 ipm->take_intra_process_message<CallbackMessageT, Alloc>(
148 publisher_id, message_sequence, subscription_id, message);
152 auto matches_any_publisher_func =
153 [weak_ipm](
const rmw_gid_t * sender_gid) ->
bool 155 auto ipm = weak_ipm.lock();
158 "intra process publisher check called " 159 "after destruction of intra process manager");
161 return ipm->matches_any_publishers(sender_gid);
165 typed_sub_ptr->setup_intra_process(
166 intra_process_subscription_id,
167 take_intra_process_message_func,
168 matches_any_publisher_func,
169 intra_process_options
180 #endif // RCLCPP__SUBSCRIPTION_FACTORY_HPP_
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:40
Definition: allocator_common.hpp:24
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:148
rcl_subscription_options_t rcl_subscription_get_default_options(void)
Definition: any_subscription_callback.hpp:34
SubscriptionFactoryFunction create_typed_subscription
Definition: subscription_factory.hpp:56
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
Factory with functions used to create a Subscription<MessageT>.
Definition: subscription_factory.hpp:47
T dynamic_pointer_cast(T... args)
SetupIntraProcessFunction setup_intra_process
Definition: subscription_factory.hpp:65
Definition: subscription.hpp:51
SubscriptionFactory create_subscription_factory(CallbackT &&callback, typename rclcpp::message_memory_strategy::MessageMemoryStrategy< CallbackMessageT, Alloc >::SharedPtr msg_mem_strat, std::shared_ptr< Alloc > allocator)
Return a SubscriptionFactory with functions for creating a SubscriptionT<MessageT, Alloc>.
Definition: subscription_factory.hpp:76
void set(CallbackT callback)
Definition: any_subscription_callback.hpp:79