rclcpp  master
C++ ROS Client Library API
subscription_intra_process.hpp
Go to the documentation of this file.
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
17 
18 #include <rmw/rmw.h>
19 
20 #include <functional>
21 #include <memory>
22 #include <string>
23 #include <utility>
24 
25 #include "rcl/error_handling.h"
26 
32 #include "rclcpp/waitable.hpp"
33 #include "tracetools/tracetools.h"
34 
35 namespace rclcpp
36 {
37 namespace experimental
38 {
39 
40 template<
41  typename MessageT,
42  typename Alloc = std::allocator<void>,
44  typename CallbackMessageT = MessageT>
46 {
47 public:
49 
// Convenience aliases used throughout the class.
// Allocator traits of Alloc rebound to MessageT, and the allocator type taken from them.
50  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
51  using MessageAlloc = typename MessageAllocTraits::allocator_type;
// Shared (read-only) and unique (owning) handles to a message.
// NOTE(review): `Deleter` is not declared on any line visible in this listing; presumably it is
// a template parameter on one of the dropped lines -- confirm against the original header.
52  using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
53  using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
54 
// The `UniquePtr` member type of the intra-process buffer instantiated for this
// message / allocator / deleter combination; this is the type of `buffer_`.
55  using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
56  MessageT,
57  Alloc,
58  Deleter
59  >::UniquePtr;
60 
// Constructor. The line carrying the constructor name (original line 61) is missing from this
// listing; what follows is its parameter list, initializer list and body.
//
// Parameters:
//   callback    - subscription callback; copied into `any_callback_`.
//   allocator   - allocator forwarded to the intra-process buffer factory.
//   context     - its rcl context is used to initialise the guard condition `gc_`.
//   topic_name  - forwarded to the SubscriptionIntraProcessBase constructor.
//   qos_profile - forwarded to the base constructor and to the buffer factory.
//   buffer_type - forwarded to the buffer factory.
//
// Throws std::runtime_error on the "wrong callback type" path and when
// rcl_guard_condition_init does not return RCL_RET_OK.
62  AnySubscriptionCallback<CallbackMessageT, Alloc> callback,
63  std::shared_ptr<Alloc> allocator,
64  rclcpp::Context::SharedPtr context,
65  const std::string & topic_name,
66  rmw_qos_profile_t qos_profile,
67  rclcpp::IntraProcessBufferType buffer_type)
68  : SubscriptionIntraProcessBase(topic_name, qos_profile),
69  any_callback_(callback)
70  {
// NOTE(review): the condition guarding this throw (original line 71) is missing from this
// listing, so the exact check that rejects the callback cannot be stated from here.
72  throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
73  }
74 
75  // Create the intra-process buffer.
76  buffer_ = rclcpp::experimental::create_intra_process_buffer<MessageT, Alloc, Deleter>(
77  buffer_type,
78  qos_profile,
79  allocator);
80 
81  // Create the guard condition.
82  rcl_guard_condition_options_t guard_condition_options =
83  rcl_guard_condition_get_default_options();
84 
// `gc_` is zero-initialised first and then initialised against the rcl context held by `context`.
// NOTE(review): no matching rcl_guard_condition_fini is visible anywhere in this class --
// confirm that `gc_` is finalised elsewhere (e.g. in the base class).
85  gc_ = rcl_get_zero_initialized_guard_condition();
86  rcl_ret_t ret = rcl_guard_condition_init(
87  &gc_, context->get_rcl_context().get(), guard_condition_options);
88 
89  if (RCL_RET_OK != ret) {
90  throw std::runtime_error("SubscriptionIntraProcess init error initializing guard condition");
91  }
92 
// Emit a tracepoint that pairs this object's address with the address of the stored callback copy.
93  TRACEPOINT(
94  rclcpp_subscription_callback_added,
95  (const void *)this,
96  (const void *)&any_callback_);
97  // The callback object gets copied, so if registration is done too early/before this point
98  // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later
99  // in subsequent tracepoints.
100 #ifndef TRACETOOLS_DISABLED
101  any_callback_.register_callback_for_tracing();
102 #endif
103  }
104 
// Readiness check: returns whatever `buffer_->has_data()` reports.
// The signature line (original line 106) is missing from this listing; the body shows a
// `wait_set` parameter that is explicitly unused.
105  bool
107  {
108  (void)wait_set;
109  return buffer_->has_data();
110  }
111 
// Runs the subscription by forwarding to execute_impl, instantiated with CallbackMessageT;
// which execute_impl overload is selected depends on that type.
112  void execute()
113  {
114  execute_impl<CallbackMessageT>();
115  }
116 
// Hands a message to the subscription: stores it through the buffer's shared-message entry
// point (`add_shared`) and then triggers the guard condition.
// The signature line (original line 118) is missing from this listing; the body shows a
// `message` parameter that is moved into the buffer.
117  void
119  {
120  buffer_->add_shared(std::move(message));
121  trigger_guard_condition();
122  }
123 
// Hands a message to the subscription: stores it through the buffer's unique-message entry
// point (`add_unique`) and then triggers the guard condition.
// The signature line (original line 125) is missing from this listing; the body shows a
// `message` parameter that is moved into the buffer.
124  void
126  {
127  buffer_->add_unique(std::move(message));
128  trigger_guard_condition();
129  }
130 
// Reports the buffer's answer to `use_take_shared_method()`.
// The signature line (original line 132) is missing from this listing.
// NOTE(review): this asks `buffer_`, while execute_impl asks `any_callback_` the same-named
// question -- presumably intentional, but confirm the two are meant to be able to differ.
131  bool
133  {
134  return buffer_->use_take_shared_method();
135  }
136 
137 private:
// Triggers the guard condition `gc_` via rcl_trigger_guard_condition.
// The return code is deliberately discarded (cast to void), so a failed trigger is not
// reported to the caller.
138  void
139  trigger_guard_condition()
140  {
141  rcl_ret_t ret = rcl_trigger_guard_condition(&gc_);
142  (void)ret;
143  }
144 
// execute_impl overload that unconditionally throws std::runtime_error.
// The return-type line (original line 146) is missing from this listing; presumably it is the
// constraint that selects this overload for serialized-message types -- confirm against the
// original header.
145  template<typename T>
147  execute_impl()
148  {
149  throw std::runtime_error("Subscription intra-process can't handle serialized messages");
150  }
151 
// execute_impl overload that takes one message out of the buffer and dispatches it to the
// stored callback. The return-type line (original line 153) is missing from this listing.
152  template<class T>
154  execute_impl()
155  {
// Message info handed to the callback: zeroed publisher gid, flagged as intra-process.
// NOTE(review): `msg_info` is declared without an initializer and only these two fields are
// assigned; if rmw_message_info_t has further fields they are left indeterminate -- confirm
// the struct layout or value-initialise it.
156  rmw_message_info_t msg_info;
157  msg_info.publisher_gid = {0, {0}};
158  msg_info.from_intra_process = true;
159 
// The callback decides whether the message is consumed from the buffer as a shared pointer
// or as a unique pointer (ownership moved into the dispatch call).
160  if (any_callback_.use_take_shared_method()) {
161  ConstMessageSharedPtr msg = buffer_->consume_shared();
162  any_callback_.dispatch_intra_process(msg, msg_info);
163  } else {
164  MessageUniquePtr msg = buffer_->consume_unique();
165  any_callback_.dispatch_intra_process(std::move(msg), msg_info);
166  }
167  }
168 
// Copy of the callback passed to the constructor; invoked from execute_impl.
169  AnySubscriptionCallback<CallbackMessageT, Alloc> any_callback_;
// Intra-process message buffer created in the constructor; filled by
// provide_intra_process_message and drained by execute_impl.
170  BufferUniquePtr buffer_;
171 };
172 
173 } // namespace experimental
174 } // namespace rclcpp
175 
176 #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
std::is_same
rclcpp::experimental::SubscriptionIntraProcess::provide_intra_process_message
void provide_intra_process_message(ConstMessageSharedPtr message)
Definition: subscription_intra_process.hpp:118
std::shared_ptr
std::move
T move(T... args)
rmw_message_info_t
rmw.h
rmw_message_info_t::publisher_gid
rmw_gid_t publisher_gid
rclcpp::experimental::SubscriptionIntraProcess::BufferUniquePtr
typename rclcpp::experimental::buffers::IntraProcessBuffer< MessageT, Alloc, Deleter >::UniquePtr BufferUniquePtr
Definition: subscription_intra_process.hpp:59
rclcpp::experimental::SubscriptionIntraProcess::provide_intra_process_message
void provide_intra_process_message(MessageUniquePtr message)
Definition: subscription_intra_process.hpp:125
rclcpp::experimental::SubscriptionIntraProcess::use_take_shared_method
bool use_take_shared_method() const
Definition: subscription_intra_process.hpp:132
rcl_guard_condition_options_t
rclcpp::experimental::SubscriptionIntraProcessBase
Definition: subscription_intra_process_base.hpp:36
rclcpp::experimental::SubscriptionIntraProcess::MessageAllocTraits
allocator::AllocRebind< MessageT, Alloc > MessageAllocTraits
Definition: subscription_intra_process.hpp:50
rclcpp
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
subscription_intra_process_base.hpp
create_intra_process_buffer.hpp
RCLCPP_SMART_PTR_DEFINITIONS
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
rclcpp::experimental::SubscriptionIntraProcess::MessageUniquePtr
std::unique_ptr< MessageT, Deleter > MessageUniquePtr
Definition: subscription_intra_process.hpp:53
rclcpp::experimental::SubscriptionIntraProcess::MessageAlloc
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: subscription_intra_process.hpp:51
std::enable_if
intra_process_buffer.hpp
rclcpp::allocator::AllocRebind
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
rcl_wait_set_t
rclcpp::experimental::SubscriptionIntraProcess
Definition: subscription_intra_process.hpp:45
rclcpp::AnySubscriptionCallback
Definition: any_subscription_callback.hpp:37
std::runtime_error
any_subscription_callback.hpp
rclcpp::Context
Context which encapsulates shared state between nodes and other similar entities.
Definition: context.hpp:56
type_support_decl.hpp
rclcpp::allocator::Deleter
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
std
rclcpp::experimental::SubscriptionIntraProcess::execute
void execute()
Execute any entities of the Waitable that are ready.
Definition: subscription_intra_process.hpp:112
rmw_qos_profile_t
rclcpp::IntraProcessBufferType
IntraProcessBufferType
Definition: intra_process_buffer_type.hpp:23
std::allocator
rclcpp::experimental::SubscriptionIntraProcessBase::gc_
rcl_guard_condition_t gc_
Definition: subscription_intra_process_base.hpp:75
std::unique_ptr
rmw_message_info_t::from_intra_process
bool from_intra_process
waitable.hpp
std::default_delete
rclcpp::experimental::SubscriptionIntraProcess::is_ready
bool is_ready(rcl_wait_set_t *wait_set)
Check if the Waitable is ready.
Definition: subscription_intra_process.hpp:106
rclcpp::experimental::SubscriptionIntraProcess::ConstMessageSharedPtr
std::shared_ptr< const MessageT > ConstMessageSharedPtr
Definition: subscription_intra_process.hpp:52