rclcpp  master
C++ ROS Client Library API
subscription.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <functional>
22 #include <iostream>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 #include <utility>
27 
28 
29 #include "rcl/error_handling.h"
30 #include "rcl/subscription.h"
31 
32 #include "rcl_interfaces/msg/intra_process_message.hpp"
33 
35 #include "rclcpp/exceptions.hpp"
38 #include "rclcpp/logging.hpp"
39 #include "rclcpp/macros.hpp"
45 #include "rclcpp/waitable.hpp"
46 
47 namespace rclcpp
48 {
49 
50 namespace node_interfaces
51 {
52 class NodeTopicsInterface;
53 } // namespace node_interfaces
54 
56 template<
57  typename CallbackMessageT,
58  typename Alloc = std::allocator<void>>
60 {
62 
63 public:
65  using MessageAlloc = typename MessageAllocTraits::allocator_type;
69 
71 
72 
73 
84  std::shared_ptr<rcl_node_t> node_handle,
85  const rosidl_message_type_support_t & type_support_handle,
86  const std::string & topic_name,
87  const rcl_subscription_options_t & subscription_options,
88  AnySubscriptionCallback<CallbackMessageT, Alloc> callback,
89  const SubscriptionEventCallbacks & event_callbacks,
90  typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, Alloc>::SharedPtr
91  memory_strategy = message_memory_strategy::MessageMemoryStrategy<CallbackMessageT,
92  Alloc>::create_default())
94  node_handle,
95  type_support_handle,
96  topic_name,
97  subscription_options,
98  rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value),
99  any_callback_(callback),
100  message_memory_strategy_(memory_strategy)
101  {
102  if (event_callbacks.deadline_callback) {
103  this->add_event_handler(event_callbacks.deadline_callback,
105  }
106  if (event_callbacks.liveliness_callback) {
107  this->add_event_handler(event_callbacks.liveliness_callback,
109  }
110  }
111 
113 
118  typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT,
119  Alloc>::SharedPtr message_memory_strategy)
120  {
121  message_memory_strategy_ = message_memory_strategy;
122  }
123 
125  {
126  /* The default message memory strategy provides a dynamically allocated message on each call to
127  * create_message, though alternative memory strategies that re-use a preallocated message may be
128  * used (see rclcpp/strategies/message_pool_memory_strategy.hpp).
129  */
130  return message_memory_strategy_->borrow_message();
131  }
132 
134  {
135  return message_memory_strategy_->borrow_serialized_message();
136  }
137 
138  void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
139  {
140  if (matches_any_intra_process_publishers(&message_info.publisher_gid)) {
141  // In this case, the message will be delivered via intra process and
142  // we should ignore this copy of the message.
143  return;
144  }
145  auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
146  any_callback_.dispatch(typed_message, message_info);
147  }
148 
150 
152  {
153  auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
154  message_memory_strategy_->return_message(typed_message);
155  }
156 
158  {
159  message_memory_strategy_->return_serialized_message(message);
160  }
161 
163  rcl_interfaces::msg::IntraProcessMessage & ipm,
164  const rmw_message_info_t & message_info)
165  {
166  if (!use_intra_process_) {
167  // throw std::runtime_error(
168  // "handle_intra_process_message called before setup_intra_process");
169  // TODO(wjwwood): for now, this could mean that intra process was just not enabled.
170  // However, this can only really happen if this node has it disabled, but the other doesn't.
171  return;
172  }
173  if (any_callback_.use_take_shared_method()) {
175  take_intra_process_message(
176  ipm.publisher_id,
177  ipm.message_sequence,
178  intra_process_subscription_id_,
179  msg);
180  if (!msg) {
181  // This can happen when having two nodes in different process both using intraprocess
182  // communication. It could happen too if the publisher no longer exists or the requested
183  // message is not longer being stored.
184  // TODO(ivanpauno): Print a warn message in the last two cases described above,
185  // but not in the first one.
186  return;
187  }
188  any_callback_.dispatch_intra_process(msg, message_info);
189  } else {
190  MessageUniquePtr msg;
191  take_intra_process_message(
192  ipm.publisher_id,
193  ipm.message_sequence,
194  intra_process_subscription_id_,
195  msg);
196  if (!msg) {
197  // This can happen when having two nodes in different process both using intraprocess
198  // communication. It could happen too if the publisher no longer exists or the requested
199  // message is not longer being stored.
200  // TODO(ivanpauno): Print a warn message in the last two cases described above,
201  // but not in the first one.
202  return;
203  }
204  any_callback_.dispatch_intra_process(std::move(msg), message_info);
205  }
206  }
207 
211  {
212  if (!use_intra_process_) {
213  return nullptr;
214  }
215  return intra_process_subscription_handle_;
216  }
217 
218 private:
219  void
220  take_intra_process_message(
221  uint64_t publisher_id,
222  uint64_t message_sequence,
223  uint64_t subscription_id,
224  MessageUniquePtr & message)
225  {
226  auto ipm = weak_ipm_.lock();
227  if (!ipm) {
228  throw std::runtime_error(
229  "intra process take called after destruction of intra process manager");
230  }
231  ipm->template take_intra_process_message<CallbackMessageT, Alloc>(
232  publisher_id, message_sequence, subscription_id, message);
233  }
234 
235  void
236  take_intra_process_message(
237  uint64_t publisher_id,
238  uint64_t message_sequence,
239  uint64_t subscription_id,
240  ConstMessageSharedPtr & message)
241  {
242  auto ipm = weak_ipm_.lock();
243  if (!ipm) {
244  throw std::runtime_error(
245  "intra process take called after destruction of intra process manager");
246  }
247  ipm->template take_intra_process_message<CallbackMessageT, Alloc>(
248  publisher_id, message_sequence, subscription_id, message);
249  }
250 
251  bool
252  matches_any_intra_process_publishers(const rmw_gid_t * sender_gid)
253  {
254  if (!use_intra_process_) {
255  return false;
256  }
257  auto ipm = weak_ipm_.lock();
258  if (!ipm) {
259  throw std::runtime_error(
260  "intra process publisher check called "
261  "after destruction of intra process manager");
262  }
263  return ipm->matches_any_publishers(sender_gid);
264  }
265 
267 
270  message_memory_strategy_;
271 };
272 
273 } // namespace rclcpp
274 
275 #endif // RCLCPP__SUBSCRIPTION_HPP_
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:40
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
std::shared_ptr< rcl_serialized_message_t > create_serialized_message()
Borrow a new serialized message.
Definition: subscription.hpp:133
void return_message(std::shared_ptr< void > &message)
Return the loaned message.
Definition: subscription.hpp:151
This header provides the get_node_topics_interface() template function.
Definition: allocator_common.hpp:24
void handle_message(std::shared_ptr< void > &message, const rmw_message_info_t &message_info)
Check if we need to handle the message, and execute the callback if we do.
Definition: subscription.hpp:138
allocator::Deleter< MessageAlloc, CallbackMessageT > MessageDeleter
Definition: subscription.hpp:66
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:59
void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage &ipm, const rmw_message_info_t &message_info)
Definition: subscription.hpp:162
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: subscription.hpp:65
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
Definition: any_subscription_callback.hpp:34
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
std::shared_ptr< void > create_message()
Borrow a new message.
Definition: subscription.hpp:124
RCL_SUBSCRIPTION_REQUESTED_DEADLINE_MISSED
allocator::AllocRebind< CallbackMessageT, Alloc > MessageAllocTraits
Definition: subscription.hpp:64
void return_serialized_message(std::shared_ptr< rcl_serialized_message_t > &message)
Return the message borrowed in create_serialized_message.
Definition: subscription.hpp:157
T static_pointer_cast(T... args)
T move(T... args)
const std::shared_ptr< rcl_subscription_t > get_intra_process_subscription_handle() const
Implemenation detail.
Definition: subscription.hpp:210
rmw_gid_t publisher_gid
Pure virtual interface class for the NodeTopics part of the Node API.
Definition: node_topics_interface.hpp:38
Definition: subscription_base.hpp:53
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
void set_message_memory_strategy(typename message_memory_strategy::MessageMemoryStrategy< CallbackMessageT, Alloc >::SharedPtr message_memory_strategy)
Support dynamically setting the message memory strategy.
Definition: subscription.hpp:117
RCL_SUBSCRIPTION_LIVELINESS_CHANGED
Contains callbacks for non-message events that a Subscription can receive from the middleware...
Definition: qos_event.hpp:49