rclcpp  master
C++ ROS Client Library API
allocator_memory_strategy.hpp
Go to the documentation of this file.
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
16 #define RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
17 
18 #include <memory>
19 #include <vector>
20 
21 #include "rcl/allocator.h"
22 
25 #include "rclcpp/node.hpp"
27 
28 #include "rcutils/logging_macros.h"
29 
30 #include "rmw/types.h"
31 
32 namespace rclcpp
33 {
34 namespace memory_strategies
35 {
36 namespace allocator_memory_strategy
37 {
38 
40 
45 template<typename Alloc = std::allocator<void>>
47 {
48 public:
50 
51  using VoidAllocTraits = typename allocator::AllocRebind<void *, Alloc>;
52  using VoidAlloc = typename VoidAllocTraits::allocator_type;
53 
54  explicit AllocatorMemoryStrategy(std::shared_ptr<Alloc> allocator)
55  {
56  allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
57  }
58 
60  {
61  allocator_ = std::make_shared<VoidAlloc>();
62  }
63 
64  void add_guard_condition(const rcl_guard_condition_t * guard_condition)
65  {
66  for (const auto & existing_guard_condition : guard_conditions_) {
67  if (existing_guard_condition == guard_condition) {
68  return;
69  }
70  }
71  guard_conditions_.push_back(guard_condition);
72  }
73 
74  void remove_guard_condition(const rcl_guard_condition_t * guard_condition)
75  {
76  for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
77  if (*it == guard_condition) {
78  guard_conditions_.erase(it);
79  break;
80  }
81  }
82  }
83 
85  {
86  subscription_handles_.clear();
87  service_handles_.clear();
88  client_handles_.clear();
89  timer_handles_.clear();
90  }
91 
92  virtual void remove_null_handles(rcl_wait_set_t * wait_set)
93  {
94  for (size_t i = 0; i < wait_set->size_of_subscriptions; ++i) {
95  if (!wait_set->subscriptions[i]) {
96  subscription_handles_[i].reset();
97  }
98  }
99  for (size_t i = 0; i < wait_set->size_of_services; ++i) {
100  if (!wait_set->services[i]) {
101  service_handles_[i].reset();
102  }
103  }
104  for (size_t i = 0; i < wait_set->size_of_clients; ++i) {
105  if (!wait_set->clients[i]) {
106  client_handles_[i].reset();
107  }
108  }
109  for (size_t i = 0; i < wait_set->size_of_timers; ++i) {
110  if (!wait_set->timers[i]) {
111  timer_handles_[i].reset();
112  }
113  }
114 
115  subscription_handles_.erase(
116  std::remove(subscription_handles_.begin(), subscription_handles_.end(), nullptr),
117  subscription_handles_.end()
118  );
119 
120  service_handles_.erase(
121  std::remove(service_handles_.begin(), service_handles_.end(), nullptr),
122  service_handles_.end()
123  );
124 
125  client_handles_.erase(
126  std::remove(client_handles_.begin(), client_handles_.end(), nullptr),
127  client_handles_.end()
128  );
129 
130  timer_handles_.erase(
131  std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr),
132  timer_handles_.end()
133  );
134  }
135 
136  bool collect_entities(const WeakNodeVector & weak_nodes)
137  {
138  bool has_invalid_weak_nodes = false;
139  for (auto & weak_node : weak_nodes) {
140  auto node = weak_node.lock();
141  if (!node) {
142  has_invalid_weak_nodes = true;
143  continue;
144  }
145  for (auto & weak_group : node->get_callback_groups()) {
146  auto group = weak_group.lock();
147  if (!group || !group->can_be_taken_from().load()) {
148  continue;
149  }
150  for (auto & weak_subscription : group->get_subscription_ptrs()) {
151  auto subscription = weak_subscription.lock();
152  if (subscription) {
153  subscription_handles_.push_back(subscription->get_subscription_handle());
154  if (subscription->get_intra_process_subscription_handle()) {
155  subscription_handles_.push_back(
156  subscription->get_intra_process_subscription_handle());
157  }
158  }
159  }
160  for (auto & weak_service : group->get_service_ptrs()) {
161  auto service = weak_service.lock();
162  if (service) {
163  service_handles_.push_back(service->get_service_handle());
164  }
165  }
166  for (auto & weak_client : group->get_client_ptrs()) {
167  auto client = weak_client.lock();
168  if (client) {
169  client_handles_.push_back(client->get_client_handle());
170  }
171  }
172  for (auto & weak_timer : group->get_timer_ptrs()) {
173  auto timer = weak_timer.lock();
174  if (timer) {
175  timer_handles_.push_back(timer->get_timer_handle());
176  }
177  }
178  }
179  }
180  return has_invalid_weak_nodes;
181  }
182 
184  {
185  for (auto subscription : subscription_handles_) {
186  if (rcl_wait_set_add_subscription(wait_set, subscription.get()) != RCL_RET_OK) {
188  "rclcpp",
189  "Couldn't add subscription to wait set: %s", rcl_get_error_string_safe());
190  return false;
191  }
192  }
193 
194  for (auto client : client_handles_) {
195  if (rcl_wait_set_add_client(wait_set, client.get()) != RCL_RET_OK) {
197  "rclcpp",
198  "Couldn't add client to wait set: %s", rcl_get_error_string_safe());
199  return false;
200  }
201  }
202 
203  for (auto service : service_handles_) {
204  if (rcl_wait_set_add_service(wait_set, service.get()) != RCL_RET_OK) {
206  "rclcpp",
207  "Couldn't add service to wait set: %s", rcl_get_error_string_safe());
208  return false;
209  }
210  }
211 
212  for (auto timer : timer_handles_) {
213  if (rcl_wait_set_add_timer(wait_set, timer.get()) != RCL_RET_OK) {
215  "rclcpp",
216  "Couldn't add timer to wait set: %s", rcl_get_error_string_safe());
217  return false;
218  }
219  }
220 
221  for (auto guard_condition : guard_conditions_) {
222  if (rcl_wait_set_add_guard_condition(wait_set, guard_condition) != RCL_RET_OK) {
224  "rclcpp",
225  "Couldn't add guard_condition to wait set: %s",
227  return false;
228  }
229  }
230  return true;
231  }
232 
233  virtual void
235  executor::AnyExecutable & any_exec,
236  const WeakNodeVector & weak_nodes)
237  {
238  auto it = subscription_handles_.begin();
239  while (it != subscription_handles_.end()) {
240  auto subscription = get_subscription_by_handle(*it, weak_nodes);
241  if (subscription) {
242  // Figure out if this is for intra-process or not.
243  bool is_intra_process = false;
244  if (subscription->get_intra_process_subscription_handle()) {
245  is_intra_process = subscription->get_intra_process_subscription_handle() == *it;
246  }
247  // Find the group for this handle and see if it can be serviced
248  auto group = get_group_by_subscription(subscription, weak_nodes);
249  if (!group) {
250  // Group was not found, meaning the subscription is not valid...
251  // Remove it from the ready list and continue looking
252  it = subscription_handles_.erase(it);
253  continue;
254  }
255  if (!group->can_be_taken_from().load()) {
256  // Group is mutually exclusive and is being used, so skip it for now
257  // Leave it to be checked next time, but continue searching
258  ++it;
259  continue;
260  }
261  // Otherwise it is safe to set and return the any_exec
262  if (is_intra_process) {
263  any_exec.subscription_intra_process = subscription;
264  } else {
265  any_exec.subscription = subscription;
266  }
267  any_exec.callback_group = group;
268  any_exec.node_base = get_node_by_group(group, weak_nodes);
269  subscription_handles_.erase(it);
270  return;
271  }
272  // Else, the subscription is no longer valid, remove it and continue
273  it = subscription_handles_.erase(it);
274  }
275  }
276 
277  virtual void
279  executor::AnyExecutable & any_exec,
280  const WeakNodeVector & weak_nodes)
281  {
282  auto it = service_handles_.begin();
283  while (it != service_handles_.end()) {
284  auto service = get_service_by_handle(*it, weak_nodes);
285  if (service) {
286  // Find the group for this handle and see if it can be serviced
287  auto group = get_group_by_service(service, weak_nodes);
288  if (!group) {
289  // Group was not found, meaning the service is not valid...
290  // Remove it from the ready list and continue looking
291  it = service_handles_.erase(it);
292  continue;
293  }
294  if (!group->can_be_taken_from().load()) {
295  // Group is mutually exclusive and is being used, so skip it for now
296  // Leave it to be checked next time, but continue searching
297  ++it;
298  continue;
299  }
300  // Otherwise it is safe to set and return the any_exec
301  any_exec.service = service;
302  any_exec.callback_group = group;
303  any_exec.node_base = get_node_by_group(group, weak_nodes);
304  service_handles_.erase(it);
305  return;
306  }
307  // Else, the service is no longer valid, remove it and continue
308  it = service_handles_.erase(it);
309  }
310  }
311 
312  virtual void
314  {
315  auto it = client_handles_.begin();
316  while (it != client_handles_.end()) {
317  auto client = get_client_by_handle(*it, weak_nodes);
318  if (client) {
319  // Find the group for this handle and see if it can be serviced
320  auto group = get_group_by_client(client, weak_nodes);
321  if (!group) {
322  // Group was not found, meaning the service is not valid...
323  // Remove it from the ready list and continue looking
324  it = client_handles_.erase(it);
325  continue;
326  }
327  if (!group->can_be_taken_from().load()) {
328  // Group is mutually exclusive and is being used, so skip it for now
329  // Leave it to be checked next time, but continue searching
330  ++it;
331  continue;
332  }
333  // Otherwise it is safe to set and return the any_exec
334  any_exec.client = client;
335  any_exec.callback_group = group;
336  any_exec.node_base = get_node_by_group(group, weak_nodes);
337  client_handles_.erase(it);
338  return;
339  }
340  // Else, the service is no longer valid, remove it and continue
341  it = client_handles_.erase(it);
342  }
343  }
344 
346  {
347  return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(*allocator_.get());
348  }
349 
351  {
352  return subscription_handles_.size();
353  }
354 
356  {
357  return service_handles_.size();
358  }
359 
360  size_t number_of_ready_clients() const
361  {
362  return client_handles_.size();
363  }
364 
366  {
367  return guard_conditions_.size();
368  }
369 
370  size_t number_of_ready_timers() const
371  {
372  return timer_handles_.size();
373  }
374 
375 private:
376  template<typename T>
377  using VectorRebind =
379 
380  VectorRebind<const rcl_guard_condition_t *> guard_conditions_;
381 
382  VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
383  VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
384  VectorRebind<std::shared_ptr<const rcl_client_t>> client_handles_;
385  VectorRebind<std::shared_ptr<const rcl_timer_t>> timer_handles_;
386 
387  std::shared_ptr<VoidAlloc> allocator_;
388 };
389 
390 } // namespace allocator_memory_strategy
391 } // namespace memory_strategies
392 } // namespace rclcpp
393 
394 #endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
const rcl_service_t ** services
virtual void remove_null_handles(rcl_wait_set_t *wait_set)
Definition: allocator_memory_strategy.hpp:92
Delegate for handling memory allocations while the Executor is executing.
Definition: memory_strategy.hpp:40
virtual void get_next_subscription(executor::AnyExecutable &any_exec, const WeakNodeVector &weak_nodes)
Definition: allocator_memory_strategy.hpp:234
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group
Definition: any_executable.hpp:49
AllocatorMemoryStrategy()
Definition: allocator_memory_strategy.hpp:59
rclcpp::SubscriptionBase::SharedPtr subscription
Definition: any_executable.hpp:43
rclcpp::ServiceBase::SharedPtr service
Definition: any_executable.hpp:46
size_t size_of_clients
rclcpp::ClientBase::SharedPtr client
Definition: any_executable.hpp:47
static rclcpp::ServiceBase::SharedPtr get_service_by_handle(std::shared_ptr< const rcl_service_t > service_handle, const WeakNodeVector &weak_nodes)
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base
Definition: any_executable.hpp:50
rcl_ret_t rcl_wait_set_add_subscription(rcl_wait_set_t *wait_set, const rcl_subscription_t *subscription)
size_t size_of_subscriptions
size_t size_of_services
Definition: allocator_common.hpp:24
const rcl_client_t ** clients
static rclcpp::SubscriptionBase::SharedPtr get_subscription_by_handle(std::shared_ptr< const rcl_subscription_t > subscriber_handle, const WeakNodeVector &weak_nodes)
T remove(T... args)
const rcl_timer_t ** timers
static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group, const WeakNodeVector &weak_nodes)
static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_client(rclcpp::ClientBase::SharedPtr client, const WeakNodeVector &weak_nodes)
#define RCL_RET_OK
size_t number_of_guard_conditions() const
Definition: allocator_memory_strategy.hpp:365
bool collect_entities(const WeakNodeVector &weak_nodes)
Definition: allocator_memory_strategy.hpp:136
Delegate for handling memory allocations while the Executor is executing.
Definition: allocator_memory_strategy.hpp:46
size_t number_of_ready_clients() const
Definition: allocator_memory_strategy.hpp:360
rcl_ret_t rcl_wait_set_add_guard_condition(rcl_wait_set_t *wait_set, const rcl_guard_condition_t *guard_condition)
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
size_t size_of_timers
void remove_guard_condition(const rcl_guard_condition_t *guard_condition)
Definition: allocator_memory_strategy.hpp:74
void clear_handles()
Definition: allocator_memory_strategy.hpp:84
rclcpp::SubscriptionBase::SharedPtr subscription_intra_process
Definition: any_executable.hpp:44
#define rcl_get_error_string_safe
size_t number_of_ready_timers() const
Definition: allocator_memory_strategy.hpp:370
rcl_ret_t rcl_wait_set_add_client(rcl_wait_set_t *wait_set, const rcl_client_t *client)
#define RCUTILS_LOG_ERROR_NAMED(name,...)
T get(T... args)
rcl_ret_t rcl_wait_set_add_service(rcl_wait_set_t *wait_set, const rcl_service_t *service)
virtual void get_next_service(executor::AnyExecutable &any_exec, const WeakNodeVector &weak_nodes)
Definition: allocator_memory_strategy.hpp:278
Definition: any_executable.hpp:34
static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_service(rclcpp::ServiceBase::SharedPtr service, const WeakNodeVector &weak_nodes)
virtual rcl_allocator_t get_allocator()
Definition: allocator_memory_strategy.hpp:345
bool add_handles_to_wait_set(rcl_wait_set_t *wait_set)
Definition: allocator_memory_strategy.hpp:183
void add_guard_condition(const rcl_guard_condition_t *guard_condition)
Definition: allocator_memory_strategy.hpp:64
virtual void get_next_client(executor::AnyExecutable &any_exec, const WeakNodeVector &weak_nodes)
Definition: allocator_memory_strategy.hpp:313
typename allocator::AllocRebind< void *, Alloc > VoidAllocTraits
Definition: allocator_memory_strategy.hpp:51
static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_subscription(rclcpp::SubscriptionBase::SharedPtr subscription, const WeakNodeVector &weak_nodes)
rcl_ret_t rcl_wait_set_add_timer(rcl_wait_set_t *wait_set, const rcl_timer_t *timer)
static rclcpp::ClientBase::SharedPtr get_client_by_handle(std::shared_ptr< const rcl_client_t > client_handle, const WeakNodeVector &weak_nodes)
size_t number_of_ready_subscriptions() const
Definition: allocator_memory_strategy.hpp:350
typename VoidAllocTraits::allocator_type VoidAlloc
Definition: allocator_memory_strategy.hpp:52
const rcl_subscription_t ** subscriptions
size_t number_of_ready_services() const
Definition: allocator_memory_strategy.hpp:355