15 #ifndef RCLCPP__SUBSCRIPTION_FACTORY_HPP_ 16 #define RCLCPP__SUBSCRIPTION_FACTORY_HPP_ 25 #include "rosidl_typesupport_cpp/message_type_support.hpp" 50 rclcpp::subscription::SubscriptionBase::SharedPtr(
52 const std::string & topic_name,
59 rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
60 rclcpp::subscription::SubscriptionBase::SharedPtr subscription,
67 template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename SubscriptionT>
70 CallbackT && callback,
73 std::shared_ptr<Alloc> allocator)
78 AnySubscriptionCallback<MessageT, Alloc> any_subscription_callback(allocator);
79 any_subscription_callback.set(std::forward<CallbackT>(callback));
82 std::make_shared<typename subscription::Subscription<MessageT, Alloc>::MessageAlloc>();
86 [allocator, msg_mem_strat, any_subscription_callback, message_alloc](
88 const std::string & topic_name,
90 ) -> rclcpp::subscription::SubscriptionBase::SharedPtr
92 subscription_options.allocator =
93 rclcpp::allocator::get_rcl_allocator<MessageT>(*message_alloc.get());
99 node_base->get_shared_rcl_node_handle(),
101 subscription_options,
102 any_subscription_callback,
111 rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
112 rclcpp::subscription::SubscriptionBase::SharedPtr subscription,
115 rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm;
116 uint64_t intra_process_subscription_id = ipm->add_subscription(subscription);
119 intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator<MessageT>(
120 *message_alloc.get());
121 intra_process_options.qos = subscription_options.qos;
122 intra_process_options.ignore_local_publications =
false;
125 auto take_intra_process_message_func =
127 uint64_t publisher_id,
128 uint64_t message_sequence,
129 uint64_t subscription_id,
132 auto ipm = weak_ipm.lock();
135 throw std::runtime_error(
136 "intra process take called after destruction of intra process manager");
138 ipm->take_intra_process_message<MessageT, Alloc>(
139 publisher_id, message_sequence, subscription_id, message);
143 auto matches_any_publisher_func =
144 [weak_ipm](
const rmw_gid_t * sender_gid) ->
bool 146 auto ipm = weak_ipm.lock();
148 throw std::runtime_error(
149 "intra process publisher check called " 150 "after destruction of intra process manager");
152 return ipm->matches_any_publishers(sender_gid);
155 auto typed_sub_ptr = std::dynamic_pointer_cast<SubscriptionT>(subscription);
156 typed_sub_ptr->setup_intra_process(
157 intra_process_subscription_id,
158 take_intra_process_message_func,
159 matches_any_publisher_func,
160 intra_process_options
171 #endif // RCLCPP__SUBSCRIPTION_FACTORY_HPP_
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:33
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:124
Definition: any_subscription_callback.hpp:37
std::unique_ptr< MessageT, MessageDeleter > MessageUniquePtr
Definition: subscription.hpp:132
std::function< void(rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm, rclcpp::subscription::SubscriptionBase::SharedPtr subscription, const rcl_subscription_options_t &subscription_options)> SetupIntraProcessFunction
Definition: subscription_factory.hpp:61
Definition: allocator_common.hpp:24
SubscriptionFactory create_subscription_factory(CallbackT &&callback, typename rclcpp::message_memory_strategy::MessageMemoryStrategy< MessageT, Alloc >::SharedPtr msg_mem_strat, std::shared_ptr< Alloc > allocator)
Return a SubscriptionFactory with functions for creating a SubscriptionT<MessageT, Alloc>.
Definition: subscription_factory.hpp:69
rcl_subscription_options_t rcl_subscription_get_default_options(void)
SubscriptionFactoryFunction create_typed_subscription
Definition: subscription_factory.hpp:55
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
Factory with functions used to create a Subscription<MessageT>.
Definition: subscription_factory.hpp:46
Definition: subscription.hpp:53
SetupIntraProcessFunction setup_intra_process
Definition: subscription_factory.hpp:63
std::function< rclcpp::subscription::SubscriptionBase::SharedPtr(rclcpp::node_interfaces::NodeBaseInterface *node_base, const std::string &topic_name, rcl_subscription_options_t &subscription_options)> SubscriptionFactoryFunction
Definition: subscription_factory.hpp:53