rclcpp  master
C++ ROS Client Library API
subscription.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <functional>
22 #include <iostream>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 #include <utility>
27 
28 
29 #include "rcl/error_handling.h"
30 #include "rcl/subscription.h"
31 
35 #include "rclcpp/exceptions.hpp"
39 #include "rclcpp/logging.hpp"
40 #include "rclcpp/macros.hpp"
48 #include "rclcpp/waitable.hpp"
49 #include "tracetools/tracetools.h"
50 
51 namespace rclcpp
52 {
53 
54 namespace node_interfaces
55 {
56 class NodeTopicsInterface;
57 } // namespace node_interfaces
58 
60 template<
61  typename CallbackMessageT,
62  typename AllocatorT = std::allocator<void>,
63  typename MessageMemoryStrategyT = rclcpp::message_memory_strategy::MessageMemoryStrategy<
64  CallbackMessageT,
65  AllocatorT
66  >>
68 {
70 
71 public:
73  using MessageAllocator = typename MessageAllocatorTraits::allocator_type;
77 
79 
80 
81 
94  rclcpp::node_interfaces::NodeBaseInterface * node_base,
95  const rosidl_message_type_support_t & type_support_handle,
96  const std::string & topic_name,
97  const rclcpp::QoS & qos,
98  AnySubscriptionCallback<CallbackMessageT, AllocatorT> callback,
99  const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
100  typename MessageMemoryStrategyT::SharedPtr message_memory_strategy)
102  node_base,
103  type_support_handle,
104  topic_name,
105  options.template to_rcl_subscription_options<CallbackMessageT>(qos),
106  rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value),
107  any_callback_(callback),
108  options_(options),
109  message_memory_strategy_(message_memory_strategy)
110  {
111  if (options.event_callbacks.deadline_callback) {
112  this->add_event_handler(
113  options.event_callbacks.deadline_callback,
115  }
116  if (options.event_callbacks.liveliness_callback) {
117  this->add_event_handler(
118  options.event_callbacks.liveliness_callback,
120  }
121 
122  // Set up intra-process communication for this subscription if requested.
123  if (rclcpp::detail::resolve_use_intra_process(options, *node_base)) {
125 
126  // Check if the QoS is compatible with intra-process.
127  if (qos.get_rmw_qos_profile().history == RMW_QOS_POLICY_HISTORY_KEEP_ALL) {
128  throw std::invalid_argument(
129  "intraprocess communication is not allowed with keep all history qos policy");
130  }
131  if (qos.get_rmw_qos_profile().depth == 0) {
132  throw std::invalid_argument(
133  "intraprocess communication is not allowed with 0 depth qos policy");
134  }
135  if (qos.get_rmw_qos_profile().durability != RMW_QOS_POLICY_DURABILITY_VOLATILE) {
136  throw std::invalid_argument(
137  "intraprocess communication allowed only with volatile durability");
138  }
139 
140  // First create a SubscriptionIntraProcess which will be given to the intra-process manager.
141  auto context = node_base->get_context();
142  auto subscription_intra_process = std::make_shared<
144  CallbackMessageT,
145  AllocatorT,
146  typename MessageUniquePtr::deleter_type
147  >>(
148  callback,
149  options.get_allocator(),
150  context,
151  this->get_topic_name(), // important to get like this, as it has the fully-qualified name
152  qos.get_rmw_qos_profile(),
153  resolve_intra_process_buffer_type(options.intra_process_buffer_type, callback)
154  );
155  TRACEPOINT(
156  rclcpp_subscription_init,
157  (const void *)get_subscription_handle().get(),
158  (const void *)subscription_intra_process.get());
159 
160  // Add it to the intra process manager.
162  auto ipm = context->get_sub_context<IntraProcessManager>();
163  uint64_t intra_process_subscription_id = ipm->add_subscription(subscription_intra_process);
164  this->setup_intra_process(intra_process_subscription_id, ipm);
165  }
166 
167  TRACEPOINT(
168  rclcpp_subscription_init,
169  (const void *)get_subscription_handle().get(),
170  (const void *)this);
171  TRACEPOINT(
172  rclcpp_subscription_callback_added,
173  (const void *)this,
174  (const void *)&any_callback_);
175  // The callback object gets copied, so if registration is done too early/before this point
176  // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later
177  // in subsequent tracepoints.
178  any_callback_.register_callback_for_tracing();
179  }
180 
184  const rclcpp::QoS & qos,
186  {
187  (void)node_base;
188  (void)qos;
189  (void)options;
190  }
191 
193 
198  typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT,
199  AllocatorT>::SharedPtr message_memory_strategy)
200  {
201  message_memory_strategy_ = message_memory_strategy;
202  }
203 
205  {
206  /* The default message memory strategy provides a dynamically allocated message on each call to
207  * create_message, though alternative memory strategies that re-use a preallocated message may be
208  * used (see rclcpp/strategies/message_pool_memory_strategy.hpp).
209  */
210  return message_memory_strategy_->borrow_message();
211  }
212 
214  {
215  return message_memory_strategy_->borrow_serialized_message();
216  }
217 
219  std::shared_ptr<void> & message, const rmw_message_info_t & message_info) override
220  {
221  if (matches_any_intra_process_publishers(&message_info.publisher_gid)) {
222  // In this case, the message will be delivered via intra process and
223  // we should ignore this copy of the message.
224  return;
225  }
226  auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
227  any_callback_.dispatch(typed_message, message_info);
228  }
229 
230  void
232  void * loaned_message, const rmw_message_info_t & message_info) override
233  {
234  auto typed_message = static_cast<CallbackMessageT *>(loaned_message);
235  // message is loaned, so we have to make sure that the deleter does not deallocate the message
237  typed_message, [](CallbackMessageT * msg) {(void) msg;});
238  any_callback_.dispatch(sptr, message_info);
239  }
240 
242 
243  void return_message(std::shared_ptr<void> & message) override
244  {
245  auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
246  message_memory_strategy_->return_message(typed_message);
247  }
248 
250  {
251  message_memory_strategy_->return_serialized_message(message);
252  }
253 
255  {
256  return any_callback_.use_take_shared_method();
257  }
258 
259 private:
261 
264 
270  message_memory_strategy_;
271 };
272 
273 } // namespace rclcpp
274 
275 #endif // RCLCPP__SUBSCRIPTION_HPP_
allocator::Deleter< MessageAllocator, CallbackMessageT > MessageDeleter
Definition: subscription.hpp:74
std::shared_ptr< rcl_serialized_message_t > create_serialized_message() override
Borrow a new serialized message.
Definition: subscription.hpp:213
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:40
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
Encapsulation of Quality of Service settings.
Definition: qos.hpp:55
RMW_QOS_POLICY_HISTORY_KEEP_ALL
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
allocator::AllocRebind< CallbackMessageT, AllocatorT > MessageAllocatorTraits
Definition: subscription.hpp:72
void handle_loaned_message(void *loaned_message, const rmw_message_info_t &message_info) override
Definition: subscription.hpp:231
virtual void return_message(std::shared_ptr< MessageT > &msg)
Release ownership of the message, which will deallocate it if it has no more owners.
Definition: message_memory_strategy.hpp:125
Definition: subscription_intra_process.hpp:45
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:67
void return_message(std::shared_ptr< void > &message) override
Return the borrowed message.
Definition: subscription.hpp:243
void post_init_setup(rclcpp::node_interfaces::NodeBaseInterface *node_base, const rclcpp::QoS &qos, const rclcpp::SubscriptionOptionsWithAllocator< AllocatorT > &options)
Called after construction to continue setup that requires shared_from_this().
Definition: subscription.hpp:182
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
RMW_QOS_POLICY_DURABILITY_VOLATILE
bool resolve_use_intra_process(const OptionsT &options, const NodeBaseT &node_base)
Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
Definition: resolve_use_intra_process.hpp:31
Definition: any_subscription_callback.hpp:36
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED
This class performs intra process communication between nodes.
Definition: intra_process_manager.hpp:91
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
std::shared_ptr< void > create_message() override
Borrow a new message.
Definition: subscription.hpp:204
T static_pointer_cast(T... args)
typename MessageAllocatorTraits::allocator_type MessageAllocator
Definition: subscription.hpp:73
rclcpp::IntraProcessBufferType resolve_intra_process_buffer_type(const rclcpp::IntraProcessBufferType buffer_type, const rclcpp::AnySubscriptionCallback< CallbackMessageT, AllocatorT > &any_subscription_callback)
Return the buffer type, resolving the "CallbackDefault" type to an actual type if needed...
Definition: resolve_intra_process_buffer_type.hpp:32
Structure containing optional configuration for Subscriptions.
Definition: subscription_options.hpp:58
uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager; returns the subscription's unique id.
rmw_gid_t publisher_gid
T make_shared(T... args)
Pure virtual interface class for the NodeTopics part of the Node API.
Definition: node_topics_interface.hpp:38
Definition: subscription_base.hpp:54
void return_serialized_message(std::shared_ptr< rcl_serialized_message_t > &message) override
Return the message borrowed in create_serialized_message.
Definition: subscription.hpp:249
void handle_message(std::shared_ptr< void > &message, const rmw_message_info_t &message_info) override
Check if we need to handle the message, and execute the callback if we do.
Definition: subscription.hpp:218
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
bool use_take_shared_method() const
Definition: subscription.hpp:254
RCL_SUBSCRIPTION_LIVELINESS_CHANGED
void set_message_memory_strategy(typename message_memory_strategy::MessageMemoryStrategy< CallbackMessageT, AllocatorT >::SharedPtr message_memory_strategy)
Support dynamically setting the message memory strategy.
Definition: subscription.hpp:197