rclcpp  master
C++ ROS Client Library API
subscription_intra_process.hpp
Go to the documentation of this file.
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
16 #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
17 
#include <rmw/rmw.h>

#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>

#include "rcl/error_handling.h"

#include "rclcpp/waitable.hpp"
#include "tracetools/tracetools.h"
34 
35 namespace rclcpp
36 {
37 namespace experimental
38 {
39 
40 template<
41  typename MessageT,
42  typename Alloc = std::allocator<void>,
44  typename CallbackMessageT = MessageT>
46 {
47 public:
49 
50  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
51  using MessageAlloc = typename MessageAllocTraits::allocator_type;
52  using ConstMessageSharedPtr = std::shared_ptr<const MessageT>;
53  using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
54 
55  using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer<
56  MessageT,
57  Alloc,
58  Deleter
59  >::UniquePtr;
60 
62  AnySubscriptionCallback<CallbackMessageT, Alloc> callback,
63  std::shared_ptr<Alloc> allocator,
64  rclcpp::Context::SharedPtr context,
65  const std::string & topic_name,
66  rmw_qos_profile_t qos_profile,
67  rclcpp::IntraProcessBufferType buffer_type)
68  : SubscriptionIntraProcessBase(topic_name, qos_profile),
69  any_callback_(callback)
70  {
72  throw std::runtime_error("SubscriptionIntraProcess wrong callback type");
73  }
74 
75  // Create the intra-process buffer.
76  buffer_ = rclcpp::experimental::create_intra_process_buffer<MessageT, Alloc, Deleter>(
77  buffer_type,
78  qos_profile,
79  allocator);
80 
81  // Create the guard condition.
82  rcl_guard_condition_options_t guard_condition_options =
84 
87  &gc_, context->get_rcl_context().get(), guard_condition_options);
88 
89  if (RCL_RET_OK != ret) {
90  throw std::runtime_error("SubscriptionIntraProcess init error initializing guard condition");
91  }
92 
93  TRACEPOINT(
94  rclcpp_subscription_callback_added,
95  (const void *)this,
96  (const void *)&any_callback_);
97  // The callback object gets copied, so if registration is done too early/before this point
98  // (e.g. in `AnySubscriptionCallback::set()`), its address won't match any address used later
99  // in subsequent tracepoints.
100  any_callback_.register_callback_for_tracing();
101  }
102 
103  bool
105  {
106  (void)wait_set;
107  return buffer_->has_data();
108  }
109 
110  void execute()
111  {
112  execute_impl<CallbackMessageT>();
113  }
114 
115  void
117  {
118  buffer_->add_shared(std::move(message));
119  trigger_guard_condition();
120  }
121 
122  void
124  {
125  buffer_->add_unique(std::move(message));
126  trigger_guard_condition();
127  }
128 
129  bool
131  {
132  return buffer_->use_take_shared_method();
133  }
134 
135 private:
136  void
137  trigger_guard_condition()
138  {
140  (void)ret;
141  }
142 
143  template<typename T>
145  execute_impl()
146  {
147  throw std::runtime_error("Subscription intra-process can't handle serialized messages");
148  }
149 
150  template<class T>
152  execute_impl()
153  {
154  rmw_message_info_t msg_info;
155  msg_info.from_intra_process = true;
156 
157  if (any_callback_.use_take_shared_method()) {
158  ConstMessageSharedPtr msg = buffer_->consume_shared();
159  any_callback_.dispatch_intra_process(msg, msg_info);
160  } else {
161  MessageUniquePtr msg = buffer_->consume_unique();
162  any_callback_.dispatch_intra_process(std::move(msg), msg_info);
163  }
164  }
165 
167  BufferUniquePtr buffer_;
168 };
169 
170 } // namespace experimental
171 } // namespace rclcpp
172 
173 #endif // RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_HPP_
bool use_take_shared_method() const
Definition: any_subscription_callback.hpp:230
rcl_guard_condition_options_t rcl_guard_condition_get_default_options(void)
Definition: subscription_intra_process_base.hpp:36
rmw_ret_t rcl_ret_t
void provide_intra_process_message(ConstMessageSharedPtr message)
Definition: subscription_intra_process.hpp:116
Context which encapsulates shared state between nodes and other similar entities. ...
Definition: context.hpp:53
void dispatch_intra_process(ConstMessageSharedPtr message, const rmw_message_info_t &message_info)
Definition: any_subscription_callback.hpp:183
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
Definition: subscription_intra_process.hpp:45
rcl_guard_condition_t gc_
Definition: subscription_intra_process_base.hpp:75
rcl_guard_condition_t rcl_get_zero_initialized_guard_condition(void)
#define RCL_RET_OK
allocator::AllocRebind< MessageT, Alloc > MessageAllocTraits
Definition: subscription_intra_process.hpp:50
rcl_ret_t rcl_trigger_guard_condition(rcl_guard_condition_t *guard_condition)
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
Definition: any_subscription_callback.hpp:36
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: subscription_intra_process.hpp:51
void execute()
Execute any entities of the Waitable that are ready.
Definition: subscription_intra_process.hpp:110
bool is_ready(rcl_wait_set_t *wait_set)
Check if the Waitable is ready.
Definition: subscription_intra_process.hpp:104
void provide_intra_process_message(MessageUniquePtr message)
Definition: subscription_intra_process.hpp:123
T move(T... args)
typename rclcpp::experimental::buffers::IntraProcessBuffer< MessageT, Alloc, Deleter >::UniquePtr BufferUniquePtr
Definition: subscription_intra_process.hpp:59
rcl_ret_t rcl_guard_condition_init(rcl_guard_condition_t *guard_condition, rcl_context_t *context, const rcl_guard_condition_options_t options)
bool use_take_shared_method() const
Definition: subscription_intra_process.hpp:130
IntraProcessBufferType
Definition: intra_process_buffer_type.hpp:23
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101