15 #ifndef RCLCPP__SUBSCRIPTION_HPP_ 16 #define RCLCPP__SUBSCRIPTION_HPP_ 30 #include "rcl_interfaces/msg/intra_process_message.hpp" 43 namespace node_interfaces
45 class NodeTopicsInterface;
48 namespace subscription
67 std::shared_ptr<rcl_node_t> node_handle,
68 const rosidl_message_type_support_t & type_support_handle,
69 const std::string & topic_name,
74 virtual ~SubscriptionBase();
79 get_topic_name()
const;
83 get_subscription_handle()
const;
87 get_intra_process_subscription_handle()
const;
91 virtual std::shared_ptr<void>
99 handle_message(std::shared_ptr<void> & message,
const rmw_message_info_t & message_info) = 0;
104 return_message(std::shared_ptr<void> & message) = 0;
107 handle_intra_process_message(
108 rcl_interfaces::msg::IntraProcessMessage & ipm,
123 template<
typename MessageT,
typename Alloc = std::allocator<
void>>
148 const
std::
string & topic_name,
151 typename message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
152 memory_strategy = message_memory_strategy::MessageMemoryStrategy<MessageT,
153 Alloc>::create_default())
156 *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
158 subscription_options),
159 any_callback_(callback),
160 message_memory_strategy_(memory_strategy),
161 get_intra_process_message_callback_(
nullptr),
162 matches_any_intra_process_publishers_(
nullptr)
172 Alloc>::SharedPtr message_memory_strategy)
174 message_memory_strategy_ = message_memory_strategy;
182 return message_memory_strategy_->borrow_message();
187 if (matches_any_intra_process_publishers_) {
188 if (matches_any_intra_process_publishers_(&message_info.
publisher_gid)) {
194 auto typed_message = std::static_pointer_cast<MessageT>(message);
195 any_callback_.dispatch(typed_message, message_info);
202 auto typed_message = std::static_pointer_cast<MessageT>(message);
203 message_memory_strategy_->return_message(typed_message);
207 rcl_interfaces::msg::IntraProcessMessage & ipm,
210 if (!get_intra_process_message_callback_) {
218 get_intra_process_message_callback_(
220 ipm.message_sequence,
221 intra_process_subscription_id_,
229 any_callback_.dispatch_intra_process(msg, message_info);
233 std::function<void(uint64_t, uint64_t, uint64_t, MessageUniquePtr &)>;
238 uint64_t intra_process_subscription_id,
243 std::string intra_process_topic_name = std::string(get_topic_name()) +
"/_intra";
245 &intra_process_subscription_handle_,
248 intra_process_topic_name.c_str(),
249 &intra_process_options);
252 auto rcl_node_handle = node_handle_.get();
256 intra_process_topic_name,
264 intra_process_subscription_id_ = intra_process_subscription_id;
265 get_intra_process_message_callback_ = get_message_callback;
266 matches_any_intra_process_publishers_ = matches_any_publisher_callback;
273 if (!get_intra_process_message_callback_) {
276 return &intra_process_subscription_handle_;
284 message_memory_strategy_;
288 uint64_t intra_process_subscription_id_;
294 #endif // RCLCPP__SUBSCRIPTION_HPP_ allocator::Deleter< MessageAlloc, MessageT > MessageDeleter
Definition: subscription.hpp:131
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: subscription.hpp:130
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:33
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:124
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
Definition: any_subscription_callback.hpp:37
std::shared_ptr< void > create_message()
Borrow a new message.
Definition: subscription.hpp:176
std::unique_ptr< MessageT, MessageDeleter > MessageUniquePtr
Definition: subscription.hpp:132
Definition: allocator_common.hpp:24
Definition: parameter.hpp:235
std::function< void(uint64_t, uint64_t, uint64_t, MessageUniquePtr &)> GetMessageCallbackType
Definition: subscription.hpp:233
void setup_intra_process(uint64_t intra_process_subscription_id, GetMessageCallbackType get_message_callback, MatchesAnyPublishersCallbackType matches_any_publisher_callback, const rcl_subscription_options_t &intra_process_options)
Implemenation detail.
Definition: subscription.hpp:237
rcl_ret_t rcl_subscription_init(rcl_subscription_t *subscription, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_subscription_options_t *options)
std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
void throw_from_rcl_error(rcl_ret_t ret, const std::string &prefix="", const rcl_error_state_t *error_state=nullptr, void(*reset_error)()=rcl_reset_error)
Throw a C++ std::exception which was created based on an rcl error.
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
const rcl_subscription_t * get_intra_process_subscription_handle() const
Implemenation detail.
Definition: subscription.hpp:271
const rosidl_message_type_support_t * get_intra_process_message_msg_type_support()
void set_message_memory_strategy(typename message_memory_strategy::MessageMemoryStrategy< MessageT, Alloc >::SharedPtr message_memory_strategy)
Support dynamically setting the message memory strategy.
Definition: subscription.hpp:170
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
void return_message(std::shared_ptr< void > &message)
Return the loaned message.
Definition: subscription.hpp:200
void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage &ipm, const rmw_message_info_t &message_info)
Definition: subscription.hpp:206
Definition: subscription.hpp:53
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
Pure virtual interface class for the NodeTopics part of the Node API.
Definition: node_topics_interface.hpp:38
allocator::AllocRebind< MessageT, Alloc > MessageAllocTraits
Definition: subscription.hpp:129
std::shared_ptr< rcl_node_t > node_handle_
Definition: subscription.hpp:114
#define RCL_RET_TOPIC_NAME_INVALID
const char * rcl_node_get_name(const rcl_node_t *node)
const char * rcl_node_get_namespace(const rcl_node_t *node)
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
std::function< bool(const rmw_gid_t *)> MatchesAnyPublishersCallbackType
Definition: subscription.hpp:234
void handle_message(std::shared_ptr< void > &message, const rmw_message_info_t &message_info)
Check if we need to handle the message, and execute the callback if we do.
Definition: subscription.hpp:185
rcl_subscription_t rcl_get_zero_initialized_subscription(void)