rclcpp  master
C++ ROS Client Library API
subscription_intra_process.hpp
Go to the documentation of this file.
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
17 
18 #include <rmw/rmw.h>
19 
20 #include <functional>
21 #include <map>
22 #include <memory>
23 #include <stdexcept>
24 #include <string>
25 #include <utility>
26 
27 #include "rcl/error_handling.h"
28 
34 #include "rclcpp/waitable.hpp"
35 #include "tracetools/tracetools.h"
36 
37 namespace rclcpp
38 {
39 namespace experimental
40 {
41 
42 template<
43  typename MessageT,
44  typename Alloc = std::allocator<void>,
46  typename CallbackMessageT = MessageT>
48 {
49 public:
51 
// Convenience aliases for the allocator, message-pointer and buffer types used by this class.
// NOTE(review): `Deleter` is used below, but its declaration (listing line 45, presumably a
// template parameter) is missing from this extract -- confirm against the original header.
52  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
53  using MessageAlloc = typename MessageAllocTraits::allocator_type;
// Messages are handed around either as a shared pointer to const or as an owning unique pointer.
54  using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
55  using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
56 
// Owning pointer type of the intra-process buffer held in buffer_.
57  using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
58  MessageT,
59  Alloc,
60  Deleter
61  >::UniquePtr;
62 
// Constructor (its name line, listing line 63, is missing from this extract).
//
// Visible behavior:
//  - forwards topic_name and qos_profile to SubscriptionIntraProcessBase and copies
//    `callback` into any_callback_;
//  - creates the intra-process buffer from buffer_type, qos_profile and allocator;
//  - initializes the guard condition gc_ against the rcl context obtained from `context`;
//  - emits the rclcpp_subscription_callback_added tracepoint and, unless
//    TRACETOOLS_DISABLED is defined, registers the stored callback for tracing.
//
// Throws std::runtime_error on a callback-type mismatch or when guard-condition
// initialization does not return RCL_RET_OK.
//
// NOTE(review): the condition guarding the "wrong callback type" throw (listing line 73) and
// the statements on listing lines 85 and 87-88 are not visible here. The cross-reference index
// mentions std::is_same, rcl_guard_condition_get_default_options,
// rcl_get_zero_initialized_guard_condition and rcl_guard_condition_init, which are presumably
// what those lines use -- confirm against the original header.
64  AnySubscriptionCallback<CallbackMessageT, Alloc> callback,
65  std::shared_ptr<Alloc> allocator,
66  rclcpp::Context::SharedPtr context,
67  const std::string & topic_name,
68  rmw_qos_profile_t qos_profile,
69  rclcpp::IntraProcessBufferType buffer_type)
70  : SubscriptionIntraProcessBase(topic_name, qos_profile),
71  any_callback_(callback)
72  {
74  throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
75  }
76 
77  // Create the intra-process buffer.
78  buffer_ = rclcpp::experimental::create_intra_process_buffer<MessageT, Alloc, Deleter>(
79  buffer_type,
80  qos_profile,
81  allocator);
82 
83  // Create the guard condition.
84  rcl_guard_condition_options_t guard_condition_options =
86 
89  &gc_, context->get_rcl_context().get(), guard_condition_options);
90 
91  if (RCL_RET_OK != ret) {
92  throw std::runtime_error("SubscriptionIntraProcess init error initializing guard condition");
93  }
94 
// The tracepoint records the address of the member copy (any_callback_), not of the
// `callback` parameter.
95  TRACEPOINT(
96  rclcpp_subscription_callback_added,
97  static_cast<const void *>(this),
98  static_cast<const void *>(&any_callback_));
99  // The callback object gets copied, so if registration is done too early/before this point
100  // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later
101  // in subsequent tracepoints.
102 #ifndef TRACETOOLS_DISABLED
103  any_callback_.register_callback_for_tracing();
104 #endif
105  }
106 
// Destructor body; the declarator (listing line 107) is missing from this extract, and the
// cross-reference index gives it as ~SubscriptionIntraProcess().
// The visible fragment reports "Failed to destroy guard condition: %s" under the "rclcpp"
// logger name instead of throwing.
// NOTE(review): the call whose failure is reported and the log macro itself (listing lines
// 109-110) plus the format argument (line 113) are not visible. The index mentions
// rcl_guard_condition_fini, RCUTILS_LOG_ERROR_NAMED and rcutils_get_error_string, presumably
// used here -- confirm against the original header.
108  {
111  "rclcpp",
112  "Failed to destroy guard condition: %s",
114  }
115  }
116 
// Reports whether this waitable has work: returns buffer_->has_data().
// The wait_set argument is intentionally unused (silenced with a void cast).
// The declarator (listing line 118) is missing from this extract; the cross-reference index
// gives it as `bool is_ready(rcl_wait_set_t *wait_set)`.
117  bool
119  {
120  (void) wait_set;
121  return buffer_->has_data();
122  }
123 
// Body of take_data(); the declarator (listing lines 124-125) is missing from this extract,
// and the cross-reference index gives it as `std::shared_ptr<void> take_data()`.
// Takes one message out of buffer_ and returns it type-erased as std::shared_ptr<void>.
// Only one of the two local pointers is filled, depending on what the callback asks for.
// NOTE(review): listing lines 136-137 (the object actually being cast) are not visible.
// execute_impl casts the same pointer back to
// std::pair<ConstMessageSharedPtr, MessageUniquePtr>, so presumably such a pair is built
// here -- confirm against the original header.
126  {
127  ConstMessageSharedPtr shared_msg;
128  MessageUniquePtr unique_msg;
129 
// Pick the consume flavor that matches the callback's preferred ownership.
130  if (any_callback_.use_take_shared_method()) {
131  shared_msg = buffer_->consume_shared();
132  } else {
133  unique_msg = buffer_->consume_unique();
134  }
135  return std::static_pointer_cast<void>(
138  shared_msg, std::move(unique_msg)))
139  );
140  }
141 
// Body of execute(); the declarator (listing lines 141-142) is missing from this extract, and
// the cross-reference index gives it as `void execute(std::shared_ptr<void> &data)`.
// Forwards `data` to execute_impl, instantiated with the callback message type.
143  {
144  execute_impl<CallbackMessageT>(data);
145  }
146 
// Shared-pointer overload: stores `message` in the buffer via add_shared(), then calls
// trigger_guard_condition().
// The declarator (listing line 148) is missing from this extract; the cross-reference index
// gives it as `void provide_intra_process_message(ConstMessageSharedPtr message)`.
147  void
149  {
150  buffer_->add_shared(std::move(message));
151  trigger_guard_condition();
152  }
153 
// Unique-pointer overload: moves `message` into the buffer via add_unique(), then calls
// trigger_guard_condition().
// The declarator (listing line 155) is missing from this extract; the cross-reference index
// gives it as `void provide_intra_process_message(MessageUniquePtr message)`.
154  void
156  {
157  buffer_->add_unique(std::move(message));
158  trigger_guard_condition();
159  }
160 
// Returns the buffer's answer to use_take_shared_method().
// Note that this asks buffer_, whereas take_data and execute_impl ask any_callback_.
// The declarator (listing line 162) is missing from this extract; the cross-reference index
// gives it as `bool use_take_shared_method() const`.
161  bool
163  {
164  return buffer_->use_take_shared_method();
165  }
166 
167 private:
// Private helper called after every message is added to the buffer.
// NOTE(review): the statement that produces `ret` (listing line 171) is missing from this
// extract. The cross-reference index mentions rcl_trigger_guard_condition, presumably called
// here on gc_ -- confirm against the original header.
// The status is explicitly discarded below, so a failed trigger is neither reported nor
// propagated to the caller.
168  void
169  trigger_guard_condition()
170  {
172  (void)ret;
173  }
174 
// execute_impl overload that rejects its input: it ignores `data` and always throws
// std::runtime_error.
// NOTE(review): the return-type line (listing line 176) is missing from this extract. Given
// the error text and the std::enable_if / std::is_same entries in the cross-reference index,
// it presumably selects this overload when T is the serialized-message type -- confirm
// against the original header.
175  template<typename T>
177  execute_impl(std::shared_ptr<void> & data)
178  {
179  (void)data;
180  throw std::runtime_error("Subscription intra-process can't handle serialized messages");
181  }
182 
// execute_impl overload that delivers a taken message to the stored callback.
//
// `data` is cast back to std::pair<ConstMessageSharedPtr, MessageUniquePtr>; the callback then
// receives either the shared message (pair's first) or the unique message moved out of the
// pair's second, together with a message-info struct marked as intra-process.
// Throws std::runtime_error("'data' is empty") when `data` is null.
//
// NOTE(review): the return-type line (listing line 184) is missing from this extract;
// presumably an enable_if counterpart of the throwing overload -- confirm against the
// original header.
183  template<class T>
185  execute_impl(std::shared_ptr<void> & data)
186  {
187  if (!data) {
188  throw std::runtime_error("'data' is empty");
189  }
190 
// NOTE(review): msg_info is declared without an initializer and only these two fields are
// assigned; if rmw_message_info_t has further members they stay indeterminate -- confirm.
191  rmw_message_info_t msg_info;
192  msg_info.publisher_gid = {0, {0}};
193  msg_info.from_intra_process = true;
194 
// The local below is named `shared_ptr`; it is a variable, not the std::shared_ptr template.
195  auto shared_ptr = std::static_pointer_cast<std::pair<ConstMessageSharedPtr, MessageUniquePtr>>(
196  data);
197 
// Same selector as in take_data, so the pair member read here is the one filled there.
198  if (any_callback_.use_take_shared_method()) {
199  ConstMessageSharedPtr shared_msg = shared_ptr->first;
200  any_callback_.dispatch_intra_process(shared_msg, msg_info);
201  } else {
202  MessageUniquePtr unique_msg = std::move(shared_ptr->second);
203  any_callback_.dispatch_intra_process(std::move(unique_msg), msg_info);
204  }
// Drops only this local reference; the caller's `data` is left untouched.
205  shared_ptr.reset();
206  }
207 
208  AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
209  BufferUniquePtr buffer_;
210 };
211 
212 } // namespace experimental
213 } // namespace rclcpp
214 
215 #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
std::is_same
rclcpp::experimental::SubscriptionIntraProcess::provide_intra_process_message
void provide_intra_process_message(ConstMessageSharedPtr message)
Definition: subscription_intra_process.hpp:148
std::shared_ptr
rcl_trigger_guard_condition
rcl_ret_t rcl_trigger_guard_condition(rcl_guard_condition_t *guard_condition)
std::move
T move(T... args)
rmw_message_info_t
rclcpp::experimental::SubscriptionIntraProcess::execute
void execute(std::shared_ptr< void > &data)
Execute data that is passed in.
Definition: subscription_intra_process.hpp:142
rmw.h
rmw_message_info_t::publisher_gid
rmw_gid_t publisher_gid
rcl_ret_t
rmw_ret_t rcl_ret_t
rclcpp::experimental::SubscriptionIntraProcess::BufferUniquePtr
typename rclcpp::experimental::buffers::IntraProcessBuffer< MessageT, Alloc, Deleter >::UniquePtr BufferUniquePtr
Definition: subscription_intra_process.hpp:61
std::pair
rclcpp::experimental::SubscriptionIntraProcess::provide_intra_process_message
void provide_intra_process_message(MessageUniquePtr message)
Definition: subscription_intra_process.hpp:155
std::make_shared
T make_shared(T... args)
rclcpp::experimental::SubscriptionIntraProcess::use_take_shared_method
bool use_take_shared_method() const
Definition: subscription_intra_process.hpp:162
rcl_guard_condition_options_t
rcutils_get_error_string
rcutils_error_string_t rcutils_get_error_string(void)
rclcpp::experimental::SubscriptionIntraProcessBase
Definition: subscription_intra_process_base.hpp:36
rclcpp::experimental::SubscriptionIntraProcess::MessageAllocTraits
allocator::AllocRebind< MessageT, Alloc > MessageAllocTraits
Definition: subscription_intra_process.hpp:52
rclcpp
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
subscription_intra_process_base.hpp
create_intra_process_buffer.hpp
RCLCPP_SMART_PTR_DEFINITIONS
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
rclcpp::experimental::SubscriptionIntraProcess::MessageUniquePtr
std::unique_ptr< MessageT, Deleter > MessageUniquePtr
Definition: subscription_intra_process.hpp:55
rclcpp::experimental::SubscriptionIntraProcess::~SubscriptionIntraProcess
~SubscriptionIntraProcess()
Definition: subscription_intra_process.hpp:107
RCUTILS_LOG_ERROR_NAMED
#define RCUTILS_LOG_ERROR_NAMED(name,...)
rclcpp::experimental::SubscriptionIntraProcess::MessageAlloc
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: subscription_intra_process.hpp:53
std::enable_if
rcl_guard_condition_get_default_options
rcl_guard_condition_options_t rcl_guard_condition_get_default_options(void)
intra_process_buffer.hpp
rclcpp::allocator::AllocRebind
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
rcl_guard_condition_init
rcl_ret_t rcl_guard_condition_init(rcl_guard_condition_t *guard_condition, rcl_context_t *context, const rcl_guard_condition_options_t options)
rcl_get_zero_initialized_guard_condition
rcl_guard_condition_t rcl_get_zero_initialized_guard_condition(void)
rcl_wait_set_t
rclcpp::experimental::SubscriptionIntraProcess
Definition: subscription_intra_process.hpp:47
rclcpp::AnySubscriptionCallback
Definition: any_subscription_callback.hpp:101
std::runtime_error
any_subscription_callback.hpp
rclcpp::Context
Context which encapsulates shared state between nodes and other similar entities.
Definition: context.hpp:68
rcl_guard_condition_fini
rcl_ret_t rcl_guard_condition_fini(rcl_guard_condition_t *guard_condition)
type_support_decl.hpp
rclcpp::allocator::Deleter
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
std
rmw_qos_profile_t
rclcpp::IntraProcessBufferType
IntraProcessBufferType
Definition: intra_process_buffer_type.hpp:23
std::allocator
rclcpp::experimental::SubscriptionIntraProcessBase::gc_
rcl_guard_condition_t gc_
Definition: subscription_intra_process_base.hpp:79
RCL_RET_OK
#define RCL_RET_OK
std::unique_ptr
rmw_message_info_t::from_intra_process
bool from_intra_process
waitable.hpp
std::default_delete
rclcpp::experimental::SubscriptionIntraProcess::take_data
std::shared_ptr< void > take_data()
Take the data so that it can be consumed with execute.
Definition: subscription_intra_process.hpp:125
rclcpp::experimental::SubscriptionIntraProcess::is_ready
bool is_ready(rcl_wait_set_t *wait_set)
Check if the Waitable is ready.
Definition: subscription_intra_process.hpp:118
rclcpp::experimental::SubscriptionIntraProcess::ConstMessageSharedPtr
std::shared_ptr< const MessageT > ConstMessageSharedPtr
Definition: subscription_intra_process.hpp:54