rclcpp  master
C++ ROS Client Library API
subscription.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <chrono>
22 #include <functional>
23 #include <iostream>
24 #include <memory>
25 #include <sstream>
26 #include <string>
27 #include <utility>
28 
29 #include "rcl/error_handling.h"
30 #include "rcl/subscription.h"
31 
35 #include "rclcpp/exceptions.hpp"
39 #include "rclcpp/logging.hpp"
40 #include "rclcpp/macros.hpp"
41 #include "rclcpp/message_info.hpp"
49 #include "rclcpp/waitable.hpp"
51 #include "tracetools/tracetools.h"
52 
53 namespace rclcpp
54 {
55 
56 namespace node_interfaces
57 {
58 class NodeTopicsInterface;
59 } // namespace node_interfaces
60 
62 template<
63  typename CallbackMessageT,
64  typename AllocatorT = std::allocator<void>,
65  typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
66  CallbackMessageT,
67  AllocatorT
68  >>
70 {
72 
73 public:
75  using MessageAllocator = typename MessageAllocatorTraits::allocator_type;
81 
83 
84 
85 
// Constructor parameter list, member-initializer list and body.
// NOTE(review): the line naming the constructor and the line opening the first
// (presumably SubscriptionBase) initializer are absent from this listing.
//
// Visible behaviour:
//  - passes node_base, type_support_handle, topic_name, the rcl options derived
//    from `qos`, and the is_serialized_subscription_argument trait value to the
//    first initializer;
//  - stores `callback`, `options` and `message_memory_strategy` in members;
//  - registers one event handler per QoS event callback set in `options`;
//  - when intra-process is resolved as enabled, validates the QoS and registers
//    a SubscriptionIntraProcess with the IntraProcessManager;
//  - stores `subscription_topic_statistics` when it is non-null;
//  - emits the tracing tracepoints.
//
// Throws std::invalid_argument when intra-process is enabled and the QoS uses
// keep-all history, a depth of 0, or a durability other than volatile.
103  rclcpp::node_interfaces::NodeBaseInterface * node_base,
104  const rosidl_message_type_support_t & type_support_handle,
105  const std::string & topic_name,
106  const rclcpp::QoS & qos,
107  AnySubscriptionCallback<CallbackMessageT, AllocatorT> callback,
108  const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
109  typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
110  SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr)
112  node_base,
113  type_support_handle,
114  topic_name,
115  options.template to_rcl_subscription_options<CallbackMessageT>(qos),
116  rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value),
117  any_callback_(callback),
118  options_(options),
119  message_memory_strategy_(message_memory_strategy)
120  {
// Register each user-supplied QoS event callback under its rcl event type.
121  if (options.event_callbacks.deadline_callback) {
122  this->add_event_handler(
123  options.event_callbacks.deadline_callback,
124  RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED);
125  }
126  if (options.event_callbacks.liveliness_callback) {
127  this->add_event_handler(
128  options.event_callbacks.liveliness_callback,
129  RCL_SUBSCRIPTION_LIVELINESS_CHANGED);
130  }
131  if (options.event_callbacks.incompatible_qos_callback) {
132  this->add_event_handler(
133  options.event_callbacks.incompatible_qos_callback,
134  RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
135  } else if (options_.use_default_callbacks) {
136  // Register default callback when not specified
137  try {
138  this->add_event_handler(
// NOTE(review): the lambda body (listing line 140) is missing from this
// extract; presumably it forwards to default_incompatible_qos_callback(info).
// Confirm against the real header.
139  [this](QOSRequestedIncompatibleQoSInfo & info) {
141  },
142  RCL_SUBSCRIPTION_REQUESTED_INCOMPATIBLE_QOS);
143  } catch (UnsupportedEventTypeException & /*exc*/) {
// The exception is swallowed on purpose, so a failure to install the default
// handler does not abort construction.
144  // pass
145  }
146  }
147  if (options.event_callbacks.message_lost_callback) {
148  this->add_event_handler(
149  options.event_callbacks.message_lost_callback,
150  RCL_SUBSCRIPTION_MESSAGE_LOST);
151  }
152 
153  // Setup intra process publishing if requested.
154  if (rclcpp::detail::resolve_use_intra_process(options, *node_base)) {
156 
157  // Check if the QoS is compatible with intra-process.
// NOTE(review): `qos_profile` is declared on listing line 158, which is
// missing here; presumably it is the rmw profile of get_actual_qos(). Verify.
159  if (qos_profile.history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) {
160  throw std::invalid_argument(
161  "intraprocess communication is not allowed with keep all history qos policy");
162  }
163  if (qos_profile.depth == 0) {
164  throw std::invalid_argument(
165  "intraprocess communication is not allowed with 0 depth qos policy");
166  }
167  if (qos_profile.durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) {
168  throw std::invalid_argument(
169  "intraprocess communication allowed only with volatile durability");
170  }
171 
172  // First create a SubscriptionIntraProcess which will be given to the intra-process manager.
173  auto context = node_base->get_context();
174  subscription_intra_process_ = std::make_shared<SubscriptionIntraProcessT>(
175  callback,
176  options.get_allocator(),
177  context,
178  this->get_topic_name(), // important to get like this, as it has the fully-qualified name
179  qos_profile,
180  resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback));
// This tracepoint pairs the rcl subscription handle with the intra-process
// subscription object.
181  TRACEPOINT(
182  rclcpp_subscription_init,
183  static_cast<const void *>(get_subscription_handle().get()),
184  static_cast<const void *>(subscription_intra_process_.get()));
185 
186  // Add it to the intra process manager.
188  auto ipm = context->get_sub_context<IntraProcessManager>();
189  uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process_);
190  this->setup_intra_process(intra_process_subscription_id, ipm);
191  }
192 
// Adopt the statistics collector only when one was supplied; otherwise the
// member keeps its nullptr default.
193  if (subscription_topic_statistics != nullptr) {
194  this->subscription_topic_statistics_ = std::move(subscription_topic_statistics);
195  }
196 
// The same rclcpp_subscription_init tracepoint is emitted again, this time
// pairing the rcl subscription handle with this object.
197  TRACEPOINT(
198  rclcpp_subscription_init,
199  static_cast<const void *>(get_subscription_handle().get()),
200  static_cast<const void *>(this));
201  TRACEPOINT(
202  rclcpp_subscription_callback_added,
203  static_cast<const void *>(this),
204  static_cast<const void *>(&any_callback_));
205  // The callback object gets copied, so if registration is done too early/before this point
206  // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later
207  // in subsequent tracepoints.
208 #ifndef TRACETOOLS_DISABLED
209  any_callback_.register_callback_for_tracing();
210 #endif
211  }
212 
// post_init_setup: per the listing's index, called after construction to
// continue setup that requires shared_from_this().
// NOTE(review): the line with the function name and the node_base / options
// parameter lines are missing from this extract.
// The visible body only casts its three parameters to void and does nothing
// else with them.
214  void
217  const rclcpp::QoS & qos,
219  {
220  (void)node_base;
221  (void)qos;
222  (void)options;
223  }
224 
226 
// Take the next message from the inter-process subscription.
//
// message_out: receives the message; its address is passed on as void *.
// message_info_out: receives the meta data accompanying the message.
// Returns whatever take_type_erased() returns for these arguments.
243  bool
244  take(CallbackMessageT & message_out, rclcpp::MessageInfo & message_info_out)
245  {
// Type-erase the typed reference so the non-templated take_type_erased() can
// do the work.
246  return this->take_type_erased(static_cast<void *>(&message_out), message_info_out);
247  }
248 
// Borrow a new message from the configured message memory strategy.
// NOTE(review): the return-type line is missing from this extract; the
// listing's index gives it as std::shared_ptr<void>.
250  create_message() override
251  {
252  /* The default message memory strategy provides a dynamically allocated message on each call to
253  * create_message, though alternative memory strategies that re-use a preallocated message may be
254  * used (see rclcpp/strategies/message_pool_memory_strategy.hpp).
255  */
256  return message_memory_strategy_->borrow_message();
257  }
258 
// Body of create_serialized_message(); the signature lines are missing from
// this extract. The listing's index gives the signature as
// std::shared_ptr<rclcpp::SerializedMessage> create_serialized_message() override.
// Borrows a new serialized message from the message memory strategy.
261  {
262  return message_memory_strategy_->borrow_serialized_message();
263  }
264 
// handle_message: check whether the message needs handling and, if so, run the
// user callback and optionally feed topic statistics.
// NOTE(review): the line with the function name (listing line 266) is missing
// from this extract.
//
// message: type-erased message, cast to CallbackMessageT before dispatch.
// message_info: meta data forwarded to the callback.
265  void
267  std::shared_ptr<void> & message,
268  const rclcpp::MessageInfo & message_info) override
269  {
// NOTE(review): the guarding `if` (listing line 270) is missing; presumably it
// tests matches_any_intra_process_publishers() on the publisher gid. Verify.
271  // In this case, the message will be delivered via intra process and
272  // we should ignore this copy of the message.
273  return;
274  }
275  auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
276 
// NOTE(review): the declaration of `now` (line 277) and its assignment
// (line 281) are missing; presumably a std::chrono::system_clock time point
// taken before the callback runs. Verify.
278  if (subscription_topic_statistics_) {
279  // get current time before executing callback to
280  // exclude callback duration from topic statistics result.
282  }
283 
284  any_callback_.dispatch(typed_message, message_info);
285 
// Report the message to the statistics collector with the timestamp held in
// `now`, converted to a nanosecond count.
286  if (subscription_topic_statistics_) {
287  const auto nanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
288  const auto time = rclcpp::Time(nanos.time_since_epoch().count());
289  subscription_topic_statistics_->handle_message(*typed_message, time);
290  }
291  }
292 
// handle_loaned_message: dispatch a loaned message to the user callback.
// NOTE(review): the line with the function name (listing line 294) is missing
// from this extract.
//
// loaned_message: type-erased pointer, cast to CallbackMessageT *.
// message_info: meta data forwarded to the callback.
293  void
295  void * loaned_message,
296  const rclcpp::MessageInfo & message_info) override
297  {
298  auto typed_message = static_cast<CallbackMessageT *>(loaned_message);
299  // message is loaned, so we have to make sure that the deleter does not deallocate the message
// NOTE(review): the line declaring `sptr` (listing line 300) is missing;
// presumably a std::shared_ptr<CallbackMessageT> built from the arguments
// below. The lambda is a deleter that deliberately does nothing.
301  typed_message, [](CallbackMessageT * msg) {(void) msg;});
302  any_callback_.dispatch(sptr, message_info);
303  }
304 
306 
// return_message: hand a borrowed message back to the message memory strategy.
// NOTE(review): the signature line is missing from this extract; the listing's
// index gives it as return_message(std::shared_ptr<void> & message) override.
309  void
311  {
// Restore the concrete message type before returning it to the strategy.
312  auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
313  message_memory_strategy_->return_message(typed_message);
314  }
315 
317 
// return_serialized_message: hand a borrowed serialized message back to the
// message memory strategy.
// NOTE(review): the signature line is missing from this extract; the listing's
// index gives it as
// return_serialized_message(std::shared_ptr<rclcpp::SerializedMessage> & message) override.
320  void
322  {
323  message_memory_strategy_->return_serialized_message(message);
324  }
325 
// use_take_shared_method: returns the answer of the stored callback's
// use_take_shared_method().
// NOTE(review): the signature line is missing from this extract; the listing's
// index gives it as bool use_take_shared_method() const.
326  bool
328  {
329  return any_callback_.use_take_shared_method();
330  }
331 
332 private:
334 
337 
343  message_memory_strategy_;
345  SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics_{nullptr};
346  using SubscriptionIntraProcessT = rclcpp::experimental::SubscriptionIntraProcess<
347  CallbackMessageT,
348  AllocatorT,
349  typename MessageUniquePtr::deleter_type>;
350  std::shared_ptr<SubscriptionIntraProcessT> subscription_intra_process_;
351 };
352 
353 } // namespace rclcpp
354 
355 #endif // RCLCPP__SUBSCRIPTION_HPP_
rclcpp::Subscription::create_serialized_message
std::shared_ptr< rclcpp::SerializedMessage > create_serialized_message() override
Borrow a new serialized message.
Definition: subscription.hpp:260
resolve_intra_process_buffer_type.hpp
RMW_QOS_POLICY_HISTORY_KEEP_ALL
RMW_QOS_POLICY_HISTORY_KEEP_ALL
rclcpp::SubscriptionBase::setup_intra_process
void setup_intra_process(uint64_t intra_process_subscription_id, IntraProcessManagerWeakPtr weak_ipm)
Implementation detail.
exceptions.hpp
std::shared_ptr< const rcl_interfaces::msg::ParameterEvent >
rclcpp::Subscription::handle_message
void handle_message(std::shared_ptr< void > &message, const rclcpp::MessageInfo &message_info) override
Check if we need to handle the message, and execute the callback if we do.
Definition: subscription.hpp:266
message_info.hpp
std::move
T move(T... args)
RCLCPP_DISABLE_COPY
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
rmw.h
rmw_message_info_t::publisher_gid
rmw_gid_t publisher_gid
rclcpp::QoS::get_rmw_qos_profile
rmw_qos_profile_t & get_rmw_qos_profile()
Return the rmw qos profile.
rclcpp::Subscription::SubscriptionTopicStatisticsSharedPtr
std::shared_ptr< rclcpp::topic_statistics::SubscriptionTopicStatistics< CallbackMessageT > > SubscriptionTopicStatisticsSharedPtr
Definition: subscription.hpp:80
error_handling.h
rclcpp::Time
Definition: time.hpp:31
rclcpp::SubscriptionBase::get_subscription_handle
std::shared_ptr< rcl_subscription_t > get_subscription_handle()
rclcpp::SubscriptionOptionsWithAllocator
Structure containing optional configuration for Subscriptions.
Definition: subscription_options.hpp:87
rclcpp::SubscriptionBase
Definition: subscription_base.hpp:60
rmw_qos_profile_t::durability
enum rmw_qos_durability_policy_t durability
rclcpp::Subscription::return_serialized_message
void return_serialized_message(std::shared_ptr< rclcpp::SerializedMessage > &message) override
Return the borrowed serialized message.
Definition: subscription.hpp:321
subscription.h
subscription_base.hpp
rclcpp::SubscriptionBase::default_incompatible_qos_callback
void default_incompatible_qos_callback(QOSRequestedIncompatibleQoSInfo &info) const
rclcpp
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
rclcpp::Subscription
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:69
rclcpp::experimental::IntraProcessManager::add_subscription
uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager; returns the subscription's unique id.
rclcpp::node_interfaces::NodeBaseInterface
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
rclcpp::QoS
Encapsulation of Quality of Service settings.
Definition: qos.hpp:110
RCLCPP_SMART_PTR_DEFINITIONS
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
subscription_traits.hpp
macros.hpp
rclcpp::Subscription::use_take_shared_method
bool use_take_shared_method() const
Definition: subscription.hpp:327
resolve_use_intra_process.hpp
logging.hpp
node_base_interface.hpp
rclcpp::allocator::AllocRebind
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
rclcpp::Subscription< rcl_interfaces::msg::ParameterEvent >::MessageDeleter
allocator::Deleter< MessageAllocator, rcl_interfaces::msg::ParameterEvent > MessageDeleter
Definition: subscription.hpp:76
rclcpp::MessageInfo::get_rmw_message_info
const rmw_message_info_t & get_rmw_message_info() const
Return the message info as the underlying rmw message info type.
expand_topic_or_service_name.hpp
rclcpp::Subscription< rcl_interfaces::msg::ParameterEvent >::MessageAllocatorTraits
allocator::AllocRebind< rcl_interfaces::msg::ParameterEvent, std::allocator< void > > MessageAllocatorTraits
Definition: subscription.hpp:74
std::chrono::time_point
rclcpp::experimental::SubscriptionIntraProcess
Definition: subscription_intra_process.hpp:47
rclcpp::AnySubscriptionCallback
Definition: any_subscription_callback.hpp:101
rclcpp::UnsupportedEventTypeException
Definition: qos_event.hpp:69
rmw_qos_profile_t::history
enum rmw_qos_history_policy_t history
std::invalid_argument
rclcpp::Subscription::take
bool take(CallbackMessageT &message_out, rclcpp::MessageInfo &message_info_out)
Take the next message from the inter-process subscription.
Definition: subscription.hpp:244
intra_process_manager.hpp
any_subscription_callback.hpp
rclcpp::node_interfaces::NodeTopicsInterface
Pure virtual interface class for the NodeTopics part of the Node API.
Definition: node_topics_interface.hpp:41
rclcpp::SubscriptionBase::take_type_erased
bool take_type_erased(void *message_out, rclcpp::MessageInfo &message_info_out)
Take the next inter-process message from the subscription as a type erased pointer.
subscription_options.hpp
rclcpp::MessageInfo
Additional meta data about messages taken from subscriptions.
Definition: message_info.hpp:26
rclcpp::detail::resolve_use_intra_process
bool resolve_use_intra_process(const OptionsT &options, const NodeBaseT &node_base)
Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
Definition: resolve_use_intra_process.hpp:31
type_support_decl.hpp
visibility_control.hpp
rclcpp::allocator::Deleter
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
std
rclcpp::message_memory_strategy::MessageMemoryStrategy
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:41
rmw_qos_incompatible_event_status_t
rclcpp::Subscription::handle_loaned_message
void handle_loaned_message(void *loaned_message, const rclcpp::MessageInfo &message_info) override
Definition: subscription.hpp:294
rmw_qos_profile_t
std::allocator
rclcpp::SubscriptionBase::matches_any_intra_process_publishers
bool matches_any_intra_process_publishers(const rmw_gid_t *sender_gid) const
message_memory_strategy.hpp
subscription_intra_process.hpp
rclcpp::experimental::IntraProcessManager
This class performs intra process communication between nodes.
Definition: intra_process_manager.hpp:91
rclcpp::Subscription::post_init_setup
void post_init_setup(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rclcpp::QoS &qos, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options)
Called after construction to continue setup that requires shared_from_this().
Definition: subscription.hpp:215
rclcpp::Subscription::return_message
void return_message(std::shared_ptr< void > &message) override
Return the borrowed message.
Definition: subscription.hpp:310
std::unique_ptr
rclcpp::SubscriptionBase::add_event_handler
void add_event_handler(const EventCallbackT &callback, const rcl_subscription_event_type_t event_type)
Definition: subscription_base.hpp:282
waitable.hpp
RMW_QOS_POLICY_DURABILITY_VOLATILE
RMW_QOS_POLICY_DURABILITY_VOLATILE
subscription_topic_statistics.hpp
rclcpp::SubscriptionBase::get_actual_qos
rclcpp::QoS get_actual_qos() const
Get the actual QoS settings, after the defaults have been determined.
rmw_qos_profile_t::depth
size_t depth
rclcpp::Subscription::create_message
std::shared_ptr< void > create_message() override
Borrow a new message.
Definition: subscription.hpp:250
rclcpp::detail::resolve_intra_process_buffer_type
rclcpp::IntraProcessBufferType resolve_intra_process_buffer_type(const rclcpp::IntraProcessBufferType buffer_type, const rclcpp::AnySubscriptionCallback< CallbackMessageT, AllocatorT > &any_subscription_callback)
Return the buffer type, resolving the "CallbackDefault" type to an actual type if needed.
Definition: resolve_intra_process_buffer_type.hpp:32
rclcpp::Subscription< rcl_interfaces::msg::ParameterEvent >::MessageAllocator
typename MessageAllocatorTraits::allocator_type MessageAllocator
Definition: subscription.hpp:75
std::chrono::system_clock::now
T now(T... args)