15 #ifndef RCLCPP__SUBSCRIPTION_HPP_ 16 #define RCLCPP__SUBSCRIPTION_HPP_ 30 #include "rcl_interfaces/msg/intra_process_message.hpp" 44 namespace node_interfaces
46 class NodeTopicsInterface;
66 const rosidl_message_type_support_t & type_support_handle,
122 rcl_interfaces::msg::IntraProcessMessage & ipm,
125 const rosidl_message_type_support_t &
139 rosidl_message_type_support_t type_support_;
145 typename CallbackMessageT,
171 const rosidl_message_type_support_t & ts,
172 const
std::
string & topic_name,
175 typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, Alloc>::SharedPtr
176 memory_strategy = message_memory_strategy::MessageMemoryStrategy<CallbackMessageT,
177 Alloc>::create_default())
182 subscription_options,
183 rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value),
184 any_callback_(callback),
185 message_memory_strategy_(memory_strategy),
186 get_intra_process_message_callback_(
nullptr),
187 matches_any_intra_process_publishers_(
nullptr)
197 Alloc>::SharedPtr message_memory_strategy)
199 message_memory_strategy_ = message_memory_strategy;
218 if (matches_any_intra_process_publishers_) {
219 if (matches_any_intra_process_publishers_(&message_info.
publisher_gid)) {
226 any_callback_.
dispatch(typed_message, message_info);
243 rcl_interfaces::msg::IntraProcessMessage & ipm,
246 if (!get_intra_process_message_callback_) {
254 get_intra_process_message_callback_(
256 ipm.message_sequence,
257 intra_process_subscription_id_,
274 uint64_t intra_process_subscription_id,
284 intra_process_topic_name.
c_str(),
285 &intra_process_options);
292 intra_process_topic_name,
300 intra_process_subscription_id_ = intra_process_subscription_id;
301 get_intra_process_message_callback_ = get_message_callback;
302 matches_any_intra_process_publishers_ = matches_any_publisher_callback;
309 if (!get_intra_process_message_callback_) {
320 message_memory_strategy_;
324 uint64_t intra_process_subscription_id_;
329 #endif // RCLCPP__SUBSCRIPTION_HPP_ SubscriptionBase(std::shared_ptr< rcl_node_t > node_handle, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rcl_subscription_options_t &subscription_options, bool is_serialized=false)
Default constructor.
std::shared_ptr< rcl_node_t > node_handle_
Definition: subscription.hpp:134
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:147
allocator::AllocRebind< CallbackMessageT, Alloc > MessageAllocTraits
Definition: subscription.hpp:152
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage &ipm, const rmw_message_info_t &message_info)
Definition: subscription.hpp:242
const char * get_topic_name() const
Get the topic that this subscription is subscribed on.
virtual void return_serialized_message(std::shared_ptr< rcl_serialized_message_t > &serialized_msg)
Definition: message_memory_strategy.hpp:123
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: subscription.hpp:153
const rosidl_message_type_support_t & get_message_type_support_handle() const
bool is_serialized() const
Definition: allocator_common.hpp:24
std::function< void(uint64_t, uint64_t, uint64_t, MessageUniquePtr &)> GetMessageCallbackType
Definition: subscription.hpp:269
virtual std::shared_ptr< void > create_message()=0
Borrow a new message.
virtual void return_message(std::shared_ptr< MessageT > &msg)
Release ownership of the message, which will deallocate it if it has no more owners.
Definition: message_memory_strategy.hpp:118
rcl_ret_t rcl_subscription_init(rcl_subscription_t *subscription, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_subscription_options_t *options)
void dispatch_intra_process(MessageUniquePtr &message, const rmw_message_info_t &message_info)
Definition: any_subscription_callback.hpp:179
std::shared_ptr< void > create_message()
Borrow a new message.
Definition: subscription.hpp:202
std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
virtual const std::shared_ptr< rcl_subscription_t > get_intra_process_subscription_handle() const
Definition: any_subscription_callback.hpp:34
void throw_from_rcl_error(rcl_ret_t ret, const std::string &prefix="", const rcl_error_state_t *error_state=nullptr, void(*reset_error)()=rcl_reset_error)
Throw a C++ std::exception which was created based on an rcl error.
const std::shared_ptr< rcl_subscription_t > get_intra_process_subscription_handle() const
Implemenation detail.
Definition: subscription.hpp:307
allocator::Deleter< MessageAlloc, CallbackMessageT > MessageDeleter
Definition: subscription.hpp:154
virtual std::shared_ptr< rcl_serialized_message_t > create_serialized_message()=0
Borrow a new serialized message.
void set_message_memory_strategy(typename message_memory_strategy::MessageMemoryStrategy< CallbackMessageT, Alloc >::SharedPtr message_memory_strategy)
Support dynamically setting the message memory strategy.
Definition: subscription.hpp:195
void return_serialized_message(std::shared_ptr< rcl_serialized_message_t > &message)
Return the message borrowed in create_serialized_message.
Definition: subscription.hpp:237
virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage &ipm, const rmw_message_info_t &message_info)=0
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
const rosidl_message_type_support_t * get_intra_process_message_msg_type_support()
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
Definition: subscription.hpp:51
std::function< bool(const rmw_gid_t *)> MatchesAnyPublishersCallbackType
Definition: subscription.hpp:270
Pure virtual interface class for the NodeTopics part of the Node API.
Definition: node_topics_interface.hpp:38
virtual void handle_message(std::shared_ptr< void > &message, const rmw_message_info_t &message_info)=0
Check if we need to handle the message, and execute the callback if we do.
T static_pointer_cast(T... args)
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:38
virtual void return_serialized_message(std::shared_ptr< rcl_serialized_message_t > &message)=0
Return the message borrowed in create_serialized_message.
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
std::shared_ptr< rcl_serialized_message_t > create_serialized_message()
Borrow a new serialized message.
Definition: subscription.hpp:211
#define RCL_RET_TOPIC_NAME_INVALID
const char * rcl_node_get_name(const rcl_node_t *node)
virtual std::shared_ptr< rcl_serialized_message_t > borrow_serialized_message(size_t capacity)
Definition: message_memory_strategy.hpp:85
const char * rcl_node_get_namespace(const rcl_node_t *node)
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
std::shared_ptr< rcl_subscription_t > get_subscription_handle()
virtual std::shared_ptr< MessageT > borrow_message()
By default, dynamically allocate a new message.
Definition: message_memory_strategy.hpp:80
std::shared_ptr< rcl_subscription_t > subscription_handle_
Definition: subscription.hpp:133
virtual ~SubscriptionBase()
Default destructor.
void setup_intra_process(uint64_t intra_process_subscription_id, GetMessageCallbackType get_message_callback, MatchesAnyPublishersCallbackType matches_any_publisher_callback, const rcl_subscription_options_t &intra_process_options)
Implemenation detail.
Definition: subscription.hpp:273
void handle_message(std::shared_ptr< void > &message, const rmw_message_info_t &message_info)
Check if we need to handle the message, and execute the callback if we do.
Definition: subscription.hpp:216
void dispatch(std::shared_ptr< MessageT > message, const rmw_message_info_t &message_info)
Definition: any_subscription_callback.hpp:154
std::shared_ptr< rcl_subscription_t > intra_process_subscription_handle_
Definition: subscription.hpp:132
void return_message(std::shared_ptr< void > &message)
Return the loaned message.
Definition: subscription.hpp:231
virtual void return_message(std::shared_ptr< void > &message)=0
Return the message borrowed in create_message.