rclcpp  beta1
C++ ROS Client Library API
subscription.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <functional>
22 #include <iostream>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 
27 #include "rcl/error_handling.h"
28 #include "rcl/subscription.h"
29 
30 #include "rcl_interfaces/msg/intra_process_message.hpp"
31 
32 #include "rclcpp/macros.hpp"
37 
38 namespace rclcpp
39 {
40 
41 namespace node_interfaces
42 {
43 class NodeTopicsInterface;
44 } // namespace node_interfaces
45 
46 namespace subscription
47 {
48 
52 {
53 public:
55 
56 
57 
65  std::shared_ptr<rcl_node_t> node_handle,
66  const rosidl_message_type_support_t & type_support_handle,
67  const std::string & topic_name,
68  const rcl_subscription_options_t & subscription_options);
69 
72  virtual ~SubscriptionBase();
73 
76  const char *
77  get_topic_name() const;
78 
80  const rcl_subscription_t *
81  get_subscription_handle() const;
82 
84  virtual const rcl_subscription_t *
85  get_intra_process_subscription_handle() const;
86 
88 
89  virtual std::shared_ptr<void>
90  create_message() = 0;
92 
96  virtual void
97  handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info) = 0;
98 
100 
101  virtual void
102  return_message(std::shared_ptr<void> & message) = 0;
103 
104  virtual void
105  handle_intra_process_message(
106  rcl_interfaces::msg::IntraProcessMessage & ipm,
107  const rmw_message_info_t & message_info) = 0;
108 
109 protected:
110  rcl_subscription_t intra_process_subscription_handle_ = rcl_get_zero_initialized_subscription();
113 
114 private:
115  RCLCPP_DISABLE_COPY(SubscriptionBase)
116 };
117 
119 
121 template<typename MessageT, typename Alloc = std::allocator<void>>
123 {
125 
126 public:
128  using MessageAlloc = typename MessageAllocTraits::allocator_type;
131 
133 
134 
135 
145  std::shared_ptr<rcl_node_t> node_handle,
146  const std::string & topic_name,
147  const rcl_subscription_options_t & subscription_options,
148  AnySubscriptionCallback<MessageT, Alloc> callback,
149  typename message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
150  memory_strategy = message_memory_strategy::MessageMemoryStrategy<MessageT,
151  Alloc>::create_default())
153  node_handle,
154  *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
155  topic_name,
156  subscription_options),
157  any_callback_(callback),
158  message_memory_strategy_(memory_strategy),
159  get_intra_process_message_callback_(nullptr),
160  matches_any_intra_process_publishers_(nullptr)
161  {}
162 
164 
170  Alloc>::SharedPtr message_memory_strategy)
171  {
172  message_memory_strategy_ = message_memory_strategy;
173  }
175  {
176  /* The default message memory strategy provides a dynamically allocated message on each call to
177  * create_message, though alternative memory strategies that re-use a preallocated message may be
178  * used (see rclcpp/strategies/message_pool_memory_strategy.hpp).
179  */
180  return message_memory_strategy_->borrow_message();
181  }
182 
183  void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
184  {
185  if (matches_any_intra_process_publishers_) {
186  if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) {
187  // In this case, the message will be delivered via intra process and
188  // we should ignore this copy of the message.
189  return;
190  }
191  }
192  auto typed_message = std::static_pointer_cast<MessageT>(message);
193  any_callback_.dispatch(typed_message, message_info);
194  }
195 
197 
199  {
200  auto typed_message = std::static_pointer_cast<MessageT>(message);
201  message_memory_strategy_->return_message(typed_message);
202  }
203 
205  rcl_interfaces::msg::IntraProcessMessage & ipm,
206  const rmw_message_info_t & message_info)
207  {
208  if (!get_intra_process_message_callback_) {
209  // throw std::runtime_error(
210  // "handle_intra_process_message called before setup_intra_process");
211  // TODO(wjwwood): for now, this could mean that intra process was just not enabled.
212  // However, this can only really happen if this node has it disabled, but the other doesn't.
213  return;
214  }
215  MessageUniquePtr msg;
216  get_intra_process_message_callback_(
217  ipm.publisher_id,
218  ipm.message_sequence,
219  intra_process_subscription_id_,
220  msg);
221  if (!msg) {
222  // This either occurred because the publisher no longer exists or the
223  // message requested is no longer being stored.
224  // TODO(wjwwood): should we notify someone of this? log error, log warning?
225  return;
226  }
227  any_callback_.dispatch_intra_process(msg, message_info);
228  }
229 
230  using GetMessageCallbackType =
233 
236  uint64_t intra_process_subscription_id,
237  GetMessageCallbackType get_message_callback,
238  MatchesAnyPublishersCallbackType matches_any_publisher_callback,
239  const rcl_subscription_options_t & intra_process_options)
240  {
242  &intra_process_subscription_handle_, node_handle_.get(),
244  (std::string(get_topic_name()) + "__intra").c_str(),
245  &intra_process_options) != RCL_RET_OK)
246  {
247  // *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
248  throw std::runtime_error(
249  std::string("could not create intra process subscription: ") + rcl_get_error_string_safe());
250  // *INDENT-ON*
251  }
252 
253  intra_process_subscription_id_ = intra_process_subscription_id;
254  get_intra_process_message_callback_ = get_message_callback;
255  matches_any_intra_process_publishers_ = matches_any_publisher_callback;
256  }
257 
259  const rcl_subscription_t *
261  {
262  if (!get_intra_process_message_callback_) {
263  return nullptr;
264  }
265  return &intra_process_subscription_handle_;
266  }
267 
268 private:
270 
273  message_memory_strategy_;
274 
275  GetMessageCallbackType get_intra_process_message_callback_;
276  MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_;
277  uint64_t intra_process_subscription_id_;
278 };
279 
280 } // namespace subscription
281 } // namespace rclcpp
282 
283 #endif // RCLCPP__SUBSCRIPTION_HPP_
const rcl_subscription_t * get_intra_process_subscription_handle() const
Implemenation detail.
Definition: subscription.hpp:260
Definition: any_subscription_callback.hpp:37
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
allocator::AllocRebind< MessageT, Alloc > MessageAllocTraits
Definition: subscription.hpp:127
Definition: allocator_common.hpp:24
rcl_ret_t rcl_subscription_init(rcl_subscription_t *subscription, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_subscription_options_t *options)
void return_message(std::shared_ptr< void > &message)
Return the loaned message.
Definition: subscription.hpp:198
allocator::Deleter< MessageAlloc, MessageT > MessageDeleter
Definition: subscription.hpp:129
#define RCL_RET_OK
Definition: subscription.hpp:51
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:122
void set_message_memory_strategy(typename message_memory_strategy::MessageMemoryStrategy< MessageT, Alloc >::SharedPtr message_memory_strategy)
Support dynamically setting the message memory strategy.
Definition: subscription.hpp:168
void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage &ipm, const rmw_message_info_t &message_info)
Definition: subscription.hpp:204
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
const rosidl_message_type_support_t * get_intra_process_message_msg_type_support()
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: subscription.hpp:128
std::shared_ptr< rcl_node_t > node_handle_
Definition: subscription.hpp:112
Pure virtual interface class for the NodeTopics part of the Node API.
Definition: node_topics_interface.hpp:38
std::shared_ptr< void > create_message()
Borrow a new message.
Definition: subscription.hpp:174
void handle_message(std::shared_ptr< void > &message, const rmw_message_info_t &message_info)
Check if we need to handle the message, and execute the callback if we do.
Definition: subscription.hpp:183
T static_pointer_cast(T... args)
#define rcl_get_error_string_safe
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:33
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
rmw_gid_t publisher_gid
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
void setup_intra_process(uint64_t intra_process_subscription_id, GetMessageCallbackType get_message_callback, MatchesAnyPublishersCallbackType matches_any_publisher_callback, const rcl_subscription_options_t &intra_process_options)
Implemenation detail.
Definition: subscription.hpp:235
rcl_subscription_t rcl_get_zero_initialized_subscription(void)