rclcpp  master
C++ ROS Client Library API
subscription.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <functional>
22 #include <iostream>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 
27 #include "rcl/error_handling.h"
28 #include "rcl/subscription.h"
29 
30 #include "rcl_interfaces/msg/intra_process_message.hpp"
31 
32 #include "rclcpp/exceptions.hpp"
33 #include "rclcpp/macros.hpp"
39 
40 namespace rclcpp
41 {
42 
43 namespace node_interfaces
44 {
45 class NodeTopicsInterface;
46 } // namespace node_interfaces
47 
48 namespace subscription
49 {
50 
54 {
55 public:
57 
58 
59 
67  std::shared_ptr<rcl_node_t> node_handle,
68  const rosidl_message_type_support_t & type_support_handle,
69  const std::string & topic_name,
70  const rcl_subscription_options_t & subscription_options);
71 
74  virtual ~SubscriptionBase();
75 
78  const char *
79  get_topic_name() const;
80 
82  const rcl_subscription_t *
83  get_subscription_handle() const;
84 
86  virtual const rcl_subscription_t *
87  get_intra_process_subscription_handle() const;
88 
90 
91  virtual std::shared_ptr<void>
92  create_message() = 0;
94 
98  virtual void
99  handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info) = 0;
100 
102 
103  virtual void
104  return_message(std::shared_ptr<void> & message) = 0;
105 
106  virtual void
107  handle_intra_process_message(
108  rcl_interfaces::msg::IntraProcessMessage & ipm,
109  const rmw_message_info_t & message_info) = 0;
110 
111 protected:
112  rcl_subscription_t intra_process_subscription_handle_ = rcl_get_zero_initialized_subscription();
114  std::shared_ptr<rcl_node_t> node_handle_;
115 
116 private:
117  RCLCPP_DISABLE_COPY(SubscriptionBase)
118 };
119 
121 
123 template<typename MessageT, typename Alloc = std::allocator<void>>
125 {
127 
128 public:
130  using MessageAlloc = typename MessageAllocTraits::allocator_type;
132  using MessageUniquePtr = std::unique_ptr<MessageT, MessageDeleter>;
133 
135 
136 
137 
147  std::shared_ptr<rcl_node_t> node_handle,
148  const std::string & topic_name,
149  const rcl_subscription_options_t & subscription_options,
150  AnySubscriptionCallback<MessageT, Alloc> callback,
151  typename message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
152  memory_strategy = message_memory_strategy::MessageMemoryStrategy<MessageT,
153  Alloc>::create_default())
155  node_handle,
156  *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
157  topic_name,
158  subscription_options),
159  any_callback_(callback),
160  message_memory_strategy_(memory_strategy),
161  get_intra_process_message_callback_(nullptr),
162  matches_any_intra_process_publishers_(nullptr)
163  {}
164 
166 
172  Alloc>::SharedPtr message_memory_strategy)
173  {
174  message_memory_strategy_ = message_memory_strategy;
175  }
176  std::shared_ptr<void> create_message()
177  {
178  /* The default message memory strategy provides a dynamically allocated message on each call to
179  * create_message, though alternative memory strategies that re-use a preallocated message may be
180  * used (see rclcpp/strategies/message_pool_memory_strategy.hpp).
181  */
182  return message_memory_strategy_->borrow_message();
183  }
184 
185  void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
186  {
187  if (matches_any_intra_process_publishers_) {
188  if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) {
189  // In this case, the message will be delivered via intra process and
190  // we should ignore this copy of the message.
191  return;
192  }
193  }
194  auto typed_message = std::static_pointer_cast<MessageT>(message);
195  any_callback_.dispatch(typed_message, message_info);
196  }
197 
199 
200  void return_message(std::shared_ptr<void> & message)
201  {
202  auto typed_message = std::static_pointer_cast<MessageT>(message);
203  message_memory_strategy_->return_message(typed_message);
204  }
205 
207  rcl_interfaces::msg::IntraProcessMessage & ipm,
208  const rmw_message_info_t & message_info)
209  {
210  if (!get_intra_process_message_callback_) {
211  // throw std::runtime_error(
212  // "handle_intra_process_message called before setup_intra_process");
213  // TODO(wjwwood): for now, this could mean that intra process was just not enabled.
214  // However, this can only really happen if this node has it disabled, but the other doesn't.
215  return;
216  }
217  MessageUniquePtr msg;
218  get_intra_process_message_callback_(
219  ipm.publisher_id,
220  ipm.message_sequence,
221  intra_process_subscription_id_,
222  msg);
223  if (!msg) {
224  // This either occurred because the publisher no longer exists or the
225  // message requested is no longer being stored.
226  // TODO(wjwwood): should we notify someone of this? log error, log warning?
227  return;
228  }
229  any_callback_.dispatch_intra_process(msg, message_info);
230  }
231 
232  using GetMessageCallbackType =
233  std::function<void(uint64_t, uint64_t, uint64_t, MessageUniquePtr &)>;
234  using MatchesAnyPublishersCallbackType = std::function<bool(const rmw_gid_t *)>;
235 
238  uint64_t intra_process_subscription_id,
239  GetMessageCallbackType get_message_callback,
240  MatchesAnyPublishersCallbackType matches_any_publisher_callback,
241  const rcl_subscription_options_t & intra_process_options)
242  {
243  std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra";
245  &intra_process_subscription_handle_,
246  node_handle_.get(),
248  intra_process_topic_name.c_str(),
249  &intra_process_options);
250  if (ret != RCL_RET_OK) {
251  if (ret == RCL_RET_TOPIC_NAME_INVALID) {
252  auto rcl_node_handle = node_handle_.get();
253  // this will throw on any validation problem
254  rcl_reset_error();
256  intra_process_topic_name,
257  rcl_node_get_name(rcl_node_handle),
258  rcl_node_get_namespace(rcl_node_handle));
259  }
260 
261  rclcpp::exceptions::throw_from_rcl_error(ret, "could not create intra process subscription");
262  }
263 
264  intra_process_subscription_id_ = intra_process_subscription_id;
265  get_intra_process_message_callback_ = get_message_callback;
266  matches_any_intra_process_publishers_ = matches_any_publisher_callback;
267  }
268 
270  const rcl_subscription_t *
272  {
273  if (!get_intra_process_message_callback_) {
274  return nullptr;
275  }
276  return &intra_process_subscription_handle_;
277  }
278 
279 private:
281 
284  message_memory_strategy_;
285 
286  GetMessageCallbackType get_intra_process_message_callback_;
287  MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_;
288  uint64_t intra_process_subscription_id_;
289 };
290 
291 } // namespace subscription
292 } // namespace rclcpp
293 
294 #endif // RCLCPP__SUBSCRIPTION_HPP_
allocator::Deleter< MessageAlloc, MessageT > MessageDeleter
Definition: subscription.hpp:131
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: subscription.hpp:130
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:33
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:124
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
#define rcl_reset_error
rmw_ret_t rcl_ret_t
Definition: any_subscription_callback.hpp:37
std::shared_ptr< void > create_message()
Borrow a new message.
Definition: subscription.hpp:176
std::unique_ptr< MessageT, MessageDeleter > MessageUniquePtr
Definition: subscription.hpp:132
Definition: allocator_common.hpp:24
Definition: parameter.hpp:235
std::function< void(uint64_t, uint64_t, uint64_t, MessageUniquePtr &)> GetMessageCallbackType
Definition: subscription.hpp:233
void setup_intra_process(uint64_t intra_process_subscription_id, GetMessageCallbackType get_message_callback, MatchesAnyPublishersCallbackType matches_any_publisher_callback, const rcl_subscription_options_t &intra_process_options)
Implemenation detail.
Definition: subscription.hpp:237
rcl_ret_t rcl_subscription_init(rcl_subscription_t *subscription, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_subscription_options_t *options)
std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
#define RCL_RET_OK
void throw_from_rcl_error(rcl_ret_t ret, const std::string &prefix="", const rcl_error_state_t *error_state=nullptr, void(*reset_error)()=rcl_reset_error)
Throw a C++ std::exception which was created based on an rcl error.
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
const rcl_subscription_t * get_intra_process_subscription_handle() const
Implemenation detail.
Definition: subscription.hpp:271
const rosidl_message_type_support_t * get_intra_process_message_msg_type_support()
void set_message_memory_strategy(typename message_memory_strategy::MessageMemoryStrategy< MessageT, Alloc >::SharedPtr message_memory_strategy)
Support dynamically setting the message memory strategy.
Definition: subscription.hpp:170
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
void return_message(std::shared_ptr< void > &message)
Return the loaned message.
Definition: subscription.hpp:200
void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage &ipm, const rmw_message_info_t &message_info)
Definition: subscription.hpp:206
Definition: subscription.hpp:53
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
rmw_gid_t publisher_gid
Pure virtual interface class for the NodeTopics part of the Node API.
Definition: node_topics_interface.hpp:38
allocator::AllocRebind< MessageT, Alloc > MessageAllocTraits
Definition: subscription.hpp:129
std::shared_ptr< rcl_node_t > node_handle_
Definition: subscription.hpp:114
#define RCL_RET_TOPIC_NAME_INVALID
const char * rcl_node_get_name(const rcl_node_t *node)
const char * rcl_node_get_namespace(const rcl_node_t *node)
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
std::function< bool(const rmw_gid_t *)> MatchesAnyPublishersCallbackType
Definition: subscription.hpp:234
void handle_message(std::shared_ptr< void > &message, const rmw_message_info_t &message_info)
Check if we need to handle the message, and execute the callback if we do.
Definition: subscription.hpp:185
rcl_subscription_t rcl_get_zero_initialized_subscription(void)