rclcpp  master
C++ ROS Client Library API
subscription.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_HPP_
16 #define RCLCPP__SUBSCRIPTION_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <functional>
22 #include <iostream>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 
27 #include "rcl/error_handling.h"
28 #include "rcl/subscription.h"
29 
30 #include "rcl_interfaces/msg/intra_process_message.hpp"
31 
33 #include "rclcpp/exceptions.hpp"
35 #include "rclcpp/macros.hpp"
40 
41 namespace rclcpp
42 {
43 
44 namespace node_interfaces
45 {
46 class NodeTopicsInterface;
47 } // namespace node_interfaces
48 
52 {
53 public:
55 
56 
57 
65  std::shared_ptr<rcl_node_t> node_handle,
66  const rosidl_message_type_support_t & type_support_handle,
67  const std::string & topic_name,
68  const rcl_subscription_options_t & subscription_options,
69  bool is_serialized = false);
70 
73  virtual ~SubscriptionBase();
74 
77  const char *
78  get_topic_name() const;
79 
83 
87 
91 
93 
94  virtual std::shared_ptr<void>
95  create_message() = 0;
96 
98 
101 
103 
107  virtual void
108  handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info) = 0;
109 
111 
112  virtual void
113  return_message(std::shared_ptr<void> & message) = 0;
114 
116 
117  virtual void
119 
120  virtual void
122  rcl_interfaces::msg::IntraProcessMessage & ipm,
123  const rmw_message_info_t & message_info) = 0;
124 
125  const rosidl_message_type_support_t &
127 
128  bool
129  is_serialized() const;
130 
131 protected:
135 
136 private:
138 
139  rosidl_message_type_support_t type_support_;
140  bool is_serialized_;
141 };
142 
144 template<
145  typename CallbackMessageT,
146  typename Alloc = std::allocator<void>>
148 {
150 
151 public:
153  using MessageAlloc = typename MessageAllocTraits::allocator_type;
156 
158 
159 
160 
170  std::shared_ptr<rcl_node_t> node_handle,
171  const rosidl_message_type_support_t & ts,
172  const std::string & topic_name,
173  const rcl_subscription_options_t & subscription_options,
174  AnySubscriptionCallback<CallbackMessageT, Alloc> callback,
175  typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT, Alloc>::SharedPtr
176  memory_strategy = message_memory_strategy::MessageMemoryStrategy<CallbackMessageT,
177  Alloc>::create_default())
179  node_handle,
180  ts,
181  topic_name,
182  subscription_options,
183  rclcpp::subscription_traits::is_serialized_subscription_argument<CallbackMessageT>::value),
184  any_callback_(callback),
185  message_memory_strategy_(memory_strategy),
186  get_intra_process_message_callback_(nullptr),
187  matches_any_intra_process_publishers_(nullptr)
188  {}
189 
191 
196  typename message_memory_strategy::MessageMemoryStrategy<CallbackMessageT,
197  Alloc>::SharedPtr message_memory_strategy)
198  {
199  message_memory_strategy_ = message_memory_strategy;
200  }
201 
203  {
204  /* The default message memory strategy provides a dynamically allocated message on each call to
205  * create_message, though alternative memory strategies that re-use a preallocated message may be
206  * used (see rclcpp/strategies/message_pool_memory_strategy.hpp).
207  */
208  return message_memory_strategy_->borrow_message();
209  }
210 
212  {
213  return message_memory_strategy_->borrow_serialized_message();
214  }
215 
216  void handle_message(std::shared_ptr<void> & message, const rmw_message_info_t & message_info)
217  {
218  if (matches_any_intra_process_publishers_) {
219  if (matches_any_intra_process_publishers_(&message_info.publisher_gid)) {
220  // In this case, the message will be delivered via intra process and
221  // we should ignore this copy of the message.
222  return;
223  }
224  }
225  auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
226  any_callback_.dispatch(typed_message, message_info);
227  }
228 
230 
232  {
233  auto typed_message = std::static_pointer_cast<CallbackMessageT>(message);
234  message_memory_strategy_->return_message(typed_message);
235  }
236 
238  {
239  message_memory_strategy_->return_serialized_message(message);
240  }
241 
243  rcl_interfaces::msg::IntraProcessMessage & ipm,
244  const rmw_message_info_t & message_info)
245  {
246  if (!get_intra_process_message_callback_) {
247  // throw std::runtime_error(
248  // "handle_intra_process_message called before setup_intra_process");
249  // TODO(wjwwood): for now, this could mean that intra process was just not enabled.
250  // However, this can only really happen if this node has it disabled, but the other doesn't.
251  return;
252  }
253  MessageUniquePtr msg;
254  get_intra_process_message_callback_(
255  ipm.publisher_id,
256  ipm.message_sequence,
257  intra_process_subscription_id_,
258  msg);
259  if (!msg) {
260  // This either occurred because the publisher no longer exists or the
261  // message requested is no longer being stored.
262  // TODO(wjwwood): should we notify someone of this? log error, log warning?
263  return;
264  }
265  any_callback_.dispatch_intra_process(msg, message_info);
266  }
267 
268  using GetMessageCallbackType =
271 
274  uint64_t intra_process_subscription_id,
275  GetMessageCallbackType get_message_callback,
276  MatchesAnyPublishersCallbackType matches_any_publisher_callback,
277  const rcl_subscription_options_t & intra_process_options)
278  {
279  std::string intra_process_topic_name = std::string(get_topic_name()) + "/_intra";
282  node_handle_.get(),
284  intra_process_topic_name.c_str(),
285  &intra_process_options);
286  if (ret != RCL_RET_OK) {
287  if (ret == RCL_RET_TOPIC_NAME_INVALID) {
288  auto rcl_node_handle = node_handle_.get();
289  // this will throw on any validation problem
290  rcl_reset_error();
292  intra_process_topic_name,
293  rcl_node_get_name(rcl_node_handle),
294  rcl_node_get_namespace(rcl_node_handle));
295  }
296 
297  rclcpp::exceptions::throw_from_rcl_error(ret, "could not create intra process subscription");
298  }
299 
300  intra_process_subscription_id_ = intra_process_subscription_id;
301  get_intra_process_message_callback_ = get_message_callback;
302  matches_any_intra_process_publishers_ = matches_any_publisher_callback;
303  }
304 
308  {
309  if (!get_intra_process_message_callback_) {
310  return nullptr;
311  }
313  }
314 
315 private:
317 
320  message_memory_strategy_;
321 
322  GetMessageCallbackType get_intra_process_message_callback_;
323  MatchesAnyPublishersCallbackType matches_any_intra_process_publishers_;
324  uint64_t intra_process_subscription_id_;
325 };
326 
327 } // namespace rclcpp
328 
329 #endif // RCLCPP__SUBSCRIPTION_HPP_
SubscriptionBase(std::shared_ptr< rcl_node_t > node_handle, const rosidl_message_type_support_t &type_support_handle, const std::string &topic_name, const rcl_subscription_options_t &subscription_options, bool is_serialized=false)
Default constructor.
std::shared_ptr< rcl_node_t > node_handle_
Definition: subscription.hpp:134
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:147
allocator::AllocRebind< CallbackMessageT, Alloc > MessageAllocTraits
Definition: subscription.hpp:152
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage &ipm, const rmw_message_info_t &message_info)
Definition: subscription.hpp:242
const char * get_topic_name() const
Get the topic that this subscription is subscribed on.
#define rcl_reset_error
rmw_ret_t rcl_ret_t
virtual void return_serialized_message(std::shared_ptr< rcl_serialized_message_t > &serialized_msg)
Definition: message_memory_strategy.hpp:123
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: subscription.hpp:153
const rosidl_message_type_support_t & get_message_type_support_handle() const
bool is_serialized() const
Definition: allocator_common.hpp:24
std::function< void(uint64_t, uint64_t, uint64_t, MessageUniquePtr &)> GetMessageCallbackType
Definition: subscription.hpp:269
virtual std::shared_ptr< void > create_message()=0
Borrow a new message.
virtual void return_message(std::shared_ptr< MessageT > &msg)
Release ownership of the message, which will deallocate it if it has no more owners.
Definition: message_memory_strategy.hpp:118
rcl_ret_t rcl_subscription_init(rcl_subscription_t *subscription, const rcl_node_t *node, const rosidl_message_type_support_t *type_support, const char *topic_name, const rcl_subscription_options_t *options)
void dispatch_intra_process(MessageUniquePtr &message, const rmw_message_info_t &message_info)
Definition: any_subscription_callback.hpp:179
std::shared_ptr< void > create_message()
Borrow a new message.
Definition: subscription.hpp:202
std::string expand_topic_or_service_name(const std::string &name, const std::string &node_name, const std::string &namespace_, bool is_service=false)
Expand a topic or service name and throw if it is not valid.
virtual const std::shared_ptr< rcl_subscription_t > get_intra_process_subscription_handle() const
#define RCL_RET_OK
Definition: any_subscription_callback.hpp:34
void throw_from_rcl_error(rcl_ret_t ret, const std::string &prefix="", const rcl_error_state_t *error_state=nullptr, void(*reset_error)()=rcl_reset_error)
Throw a C++ std::exception which was created based on an rcl error.
const std::shared_ptr< rcl_subscription_t > get_intra_process_subscription_handle() const
Implemenation detail.
Definition: subscription.hpp:307
allocator::Deleter< MessageAlloc, CallbackMessageT > MessageDeleter
Definition: subscription.hpp:154
virtual std::shared_ptr< rcl_serialized_message_t > create_serialized_message()=0
Borrow a new serialized message.
void set_message_memory_strategy(typename message_memory_strategy::MessageMemoryStrategy< CallbackMessageT, Alloc >::SharedPtr message_memory_strategy)
Support dynamically setting the message memory strategy.
Definition: subscription.hpp:195
void return_serialized_message(std::shared_ptr< rcl_serialized_message_t > &message)
Return the message borrowed in create_serialized_message.
Definition: subscription.hpp:237
virtual void handle_intra_process_message(rcl_interfaces::msg::IntraProcessMessage &ipm, const rmw_message_info_t &message_info)=0
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
const rosidl_message_type_support_t * get_intra_process_message_msg_type_support()
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
Definition: subscription.hpp:51
std::function< bool(const rmw_gid_t *)> MatchesAnyPublishersCallbackType
Definition: subscription.hpp:270
Pure virtual interface class for the NodeTopics part of the Node API.
Definition: node_topics_interface.hpp:38
virtual void handle_message(std::shared_ptr< void > &message, const rmw_message_info_t &message_info)=0
Check if we need to handle the message, and execute the callback if we do.
T static_pointer_cast(T... args)
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:38
virtual void return_serialized_message(std::shared_ptr< rcl_serialized_message_t > &message)=0
Return the message borrowed in create_serialized_message.
T get(T... args)
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
rmw_gid_t publisher_gid
std::shared_ptr< rcl_serialized_message_t > create_serialized_message()
Borrow a new serialized message.
Definition: subscription.hpp:211
T c_str(T... args)
#define RCL_RET_TOPIC_NAME_INVALID
const char * rcl_node_get_name(const rcl_node_t *node)
virtual std::shared_ptr< rcl_serialized_message_t > borrow_serialized_message(size_t capacity)
Definition: message_memory_strategy.hpp:85
const char * rcl_node_get_namespace(const rcl_node_t *node)
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
std::shared_ptr< rcl_subscription_t > get_subscription_handle()
virtual std::shared_ptr< MessageT > borrow_message()
By default, dynamically allocate a new message.
Definition: message_memory_strategy.hpp:80
std::shared_ptr< rcl_subscription_t > subscription_handle_
Definition: subscription.hpp:133
virtual ~SubscriptionBase()
Default destructor.
void setup_intra_process(uint64_t intra_process_subscription_id, GetMessageCallbackType get_message_callback, MatchesAnyPublishersCallbackType matches_any_publisher_callback, const rcl_subscription_options_t &intra_process_options)
Implemenation detail.
Definition: subscription.hpp:273
void handle_message(std::shared_ptr< void > &message, const rmw_message_info_t &message_info)
Check if we need to handle the message, and execute the callback if we do.
Definition: subscription.hpp:216
void dispatch(std::shared_ptr< MessageT > message, const rmw_message_info_t &message_info)
Definition: any_subscription_callback.hpp:154
std::shared_ptr< rcl_subscription_t > intra_process_subscription_handle_
Definition: subscription.hpp:132
void return_message(std::shared_ptr< void > &message)
Return the loaned message.
Definition: subscription.hpp:231
virtual void return_message(std::shared_ptr< void > &message)=0
Return the message borrowed in create_message.