rclcpp  master
C++ ROS Client Library API
allocator_memory_strategy.hpp
Go to the documentation of this file.
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
16 #define RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
17 
18 #include <memory>
19 #include <vector>
20 
21 #include "rcl/allocator.h"
22 
25 #include "rclcpp/node.hpp"
27 
28 #include "rcutils/logging_macros.h"
29 
30 #include "rmw/types.h"
31 
32 namespace rclcpp
33 {
34 namespace memory_strategies
35 {
36 namespace allocator_memory_strategy
37 {
38 
40 
45 template<typename Alloc = std::allocator<void>>
47 {
48 public:
50 
51  using VoidAllocTraits = typename allocator::AllocRebind<void *, Alloc>;
52  using VoidAlloc = typename VoidAllocTraits::allocator_type;
53 
54  explicit AllocatorMemoryStrategy(std::shared_ptr<Alloc> allocator)
55  {
56  allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
57  }
58 
60  {
61  allocator_ = std::make_shared<VoidAlloc>();
62  }
63 
64  void add_guard_condition(const rcl_guard_condition_t * guard_condition)
65  {
66  for (const auto & existing_guard_condition : guard_conditions_) {
67  if (existing_guard_condition == guard_condition) {
68  return;
69  }
70  }
71  guard_conditions_.push_back(guard_condition);
72  }
73 
74  void remove_guard_condition(const rcl_guard_condition_t * guard_condition)
75  {
76  for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
77  if (*it == guard_condition) {
78  guard_conditions_.erase(it);
79  break;
80  }
81  }
82  }
83 
85  {
86  subscription_handles_.clear();
87  service_handles_.clear();
88  client_handles_.clear();
89  timer_handles_.clear();
90  waitable_handles_.clear();
91  }
92 
93  virtual void remove_null_handles(rcl_wait_set_t * wait_set)
94  {
95  // TODO(jacobperron): Check if wait set sizes are what we expect them to be?
96  // e.g. wait_set->size_of_clients == client_handles_.size()
97 
98  // Important to use subscription_handles_.size() instead of wait set's size since
99  // there may be more subscriptions in the wait set due to Waitables added to the end.
100  // The same logic applies for other entities.
101  for (size_t i = 0; i < subscription_handles_.size(); ++i) {
102  if (!wait_set->subscriptions[i]) {
103  subscription_handles_[i].reset();
104  }
105  }
106  for (size_t i = 0; i < service_handles_.size(); ++i) {
107  if (!wait_set->services[i]) {
108  service_handles_[i].reset();
109  }
110  }
111  for (size_t i = 0; i < client_handles_.size(); ++i) {
112  if (!wait_set->clients[i]) {
113  client_handles_[i].reset();
114  }
115  }
116  for (size_t i = 0; i < timer_handles_.size(); ++i) {
117  if (!wait_set->timers[i]) {
118  timer_handles_[i].reset();
119  }
120  }
121  for (size_t i = 0; i < waitable_handles_.size(); ++i) {
122  if (!waitable_handles_[i]->is_ready(wait_set)) {
123  waitable_handles_[i].reset();
124  }
125  }
126 
127  subscription_handles_.erase(
128  std::remove(subscription_handles_.begin(), subscription_handles_.end(), nullptr),
129  subscription_handles_.end()
130  );
131 
132  service_handles_.erase(
133  std::remove(service_handles_.begin(), service_handles_.end(), nullptr),
134  service_handles_.end()
135  );
136 
137  client_handles_.erase(
138  std::remove(client_handles_.begin(), client_handles_.end(), nullptr),
139  client_handles_.end()
140  );
141 
142  timer_handles_.erase(
143  std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr),
144  timer_handles_.end()
145  );
146 
147  waitable_handles_.erase(
148  std::remove(waitable_handles_.begin(), waitable_handles_.end(), nullptr),
149  waitable_handles_.end()
150  );
151  }
152 
153  bool collect_entities(const WeakNodeList & weak_nodes)
154  {
155  bool has_invalid_weak_nodes = false;
156  for (auto & weak_node : weak_nodes) {
157  auto node = weak_node.lock();
158  if (!node) {
159  has_invalid_weak_nodes = true;
160  continue;
161  }
162  for (auto & weak_group : node->get_callback_groups()) {
163  auto group = weak_group.lock();
164  if (!group || !group->can_be_taken_from().load()) {
165  continue;
166  }
167  group->find_subscription_ptrs_if(
168  [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) {
169  subscription_handles_.push_back(subscription->get_subscription_handle());
170  return false;
171  });
172 
173  group->find_service_ptrs_if([this](const rclcpp::ServiceBase::SharedPtr & service) {
174  service_handles_.push_back(service->get_service_handle());
175  return false;
176  });
177  group->find_client_ptrs_if([this](const rclcpp::ClientBase::SharedPtr & client) {
178  client_handles_.push_back(client->get_client_handle());
179  return false;
180  });
181  group->find_timer_ptrs_if([this](const rclcpp::TimerBase::SharedPtr & timer) {
182  timer_handles_.push_back(timer->get_timer_handle());
183  return false;
184  });
185  group->find_waitable_ptrs_if([this](const rclcpp::Waitable::SharedPtr & waitable) {
186  waitable_handles_.push_back(waitable);
187  return false;
188  });
189  }
190  }
191  return has_invalid_weak_nodes;
192  }
193 
195  {
196  for (auto subscription : subscription_handles_) {
197  if (rcl_wait_set_add_subscription(wait_set, subscription.get(), NULL) != RCL_RET_OK) {
199  "rclcpp",
200  "Couldn't add subscription to wait set: %s", rcl_get_error_string().str);
201  return false;
202  }
203  }
204 
205  for (auto client : client_handles_) {
206  if (rcl_wait_set_add_client(wait_set, client.get(), NULL) != RCL_RET_OK) {
208  "rclcpp",
209  "Couldn't add client to wait set: %s", rcl_get_error_string().str);
210  return false;
211  }
212  }
213 
214  for (auto service : service_handles_) {
215  if (rcl_wait_set_add_service(wait_set, service.get(), NULL) != RCL_RET_OK) {
217  "rclcpp",
218  "Couldn't add service to wait set: %s", rcl_get_error_string().str);
219  return false;
220  }
221  }
222 
223  for (auto timer : timer_handles_) {
224  if (rcl_wait_set_add_timer(wait_set, timer.get(), NULL) != RCL_RET_OK) {
226  "rclcpp",
227  "Couldn't add timer to wait set: %s", rcl_get_error_string().str);
228  return false;
229  }
230  }
231 
232  for (auto guard_condition : guard_conditions_) {
233  if (rcl_wait_set_add_guard_condition(wait_set, guard_condition, NULL) != RCL_RET_OK) {
235  "rclcpp",
236  "Couldn't add guard_condition to wait set: %s",
237  rcl_get_error_string().str);
238  return false;
239  }
240  }
241 
242  for (auto waitable : waitable_handles_) {
243  if (!waitable->add_to_wait_set(wait_set)) {
245  "rclcpp",
246  "Couldn't add waitable to wait set: %s", rcl_get_error_string().str);
247  return false;
248  }
249  }
250  return true;
251  }
252 
253  virtual void
255  executor::AnyExecutable & any_exec,
256  const WeakNodeList & weak_nodes)
257  {
258  auto it = subscription_handles_.begin();
259  while (it != subscription_handles_.end()) {
260  auto subscription = get_subscription_by_handle(*it, weak_nodes);
261  if (subscription) {
262  // Find the group for this handle and see if it can be serviced
263  auto group = get_group_by_subscription(subscription, weak_nodes);
264  if (!group) {
265  // Group was not found, meaning the subscription is not valid...
266  // Remove it from the ready list and continue looking
267  it = subscription_handles_.erase(it);
268  continue;
269  }
270  if (!group->can_be_taken_from().load()) {
271  // Group is mutually exclusive and is being used, so skip it for now
272  // Leave it to be checked next time, but continue searching
273  ++it;
274  continue;
275  }
276  // Otherwise it is safe to set and return the any_exec
277  any_exec.subscription = subscription;
278  any_exec.callback_group = group;
279  any_exec.node_base = get_node_by_group(group, weak_nodes);
280  subscription_handles_.erase(it);
281  return;
282  }
283  // Else, the subscription is no longer valid, remove it and continue
284  it = subscription_handles_.erase(it);
285  }
286  }
287 
288  virtual void
290  executor::AnyExecutable & any_exec,
291  const WeakNodeList & weak_nodes)
292  {
293  auto it = service_handles_.begin();
294  while (it != service_handles_.end()) {
295  auto service = get_service_by_handle(*it, weak_nodes);
296  if (service) {
297  // Find the group for this handle and see if it can be serviced
298  auto group = get_group_by_service(service, weak_nodes);
299  if (!group) {
300  // Group was not found, meaning the service is not valid...
301  // Remove it from the ready list and continue looking
302  it = service_handles_.erase(it);
303  continue;
304  }
305  if (!group->can_be_taken_from().load()) {
306  // Group is mutually exclusive and is being used, so skip it for now
307  // Leave it to be checked next time, but continue searching
308  ++it;
309  continue;
310  }
311  // Otherwise it is safe to set and return the any_exec
312  any_exec.service = service;
313  any_exec.callback_group = group;
314  any_exec.node_base = get_node_by_group(group, weak_nodes);
315  service_handles_.erase(it);
316  return;
317  }
318  // Else, the service is no longer valid, remove it and continue
319  it = service_handles_.erase(it);
320  }
321  }
322 
323  virtual void
324  get_next_client(executor::AnyExecutable & any_exec, const WeakNodeList & weak_nodes)
325  {
326  auto it = client_handles_.begin();
327  while (it != client_handles_.end()) {
328  auto client = get_client_by_handle(*it, weak_nodes);
329  if (client) {
330  // Find the group for this handle and see if it can be serviced
331  auto group = get_group_by_client(client, weak_nodes);
332  if (!group) {
333  // Group was not found, meaning the service is not valid...
334  // Remove it from the ready list and continue looking
335  it = client_handles_.erase(it);
336  continue;
337  }
338  if (!group->can_be_taken_from().load()) {
339  // Group is mutually exclusive and is being used, so skip it for now
340  // Leave it to be checked next time, but continue searching
341  ++it;
342  continue;
343  }
344  // Otherwise it is safe to set and return the any_exec
345  any_exec.client = client;
346  any_exec.callback_group = group;
347  any_exec.node_base = get_node_by_group(group, weak_nodes);
348  client_handles_.erase(it);
349  return;
350  }
351  // Else, the service is no longer valid, remove it and continue
352  it = client_handles_.erase(it);
353  }
354  }
355 
356  virtual void
358  executor::AnyExecutable & any_exec,
359  const WeakNodeList & weak_nodes)
360  {
361  auto it = timer_handles_.begin();
362  while (it != timer_handles_.end()) {
363  auto timer = get_timer_by_handle(*it, weak_nodes);
364  if (timer) {
365  // Find the group for this handle and see if it can be serviced
366  auto group = get_group_by_timer(timer, weak_nodes);
367  if (!group) {
368  // Group was not found, meaning the timer is not valid...
369  // Remove it from the ready list and continue looking
370  it = timer_handles_.erase(it);
371  continue;
372  }
373  if (!group->can_be_taken_from().load()) {
374  // Group is mutually exclusive and is being used, so skip it for now
375  // Leave it to be checked next time, but continue searching
376  ++it;
377  continue;
378  }
379  // Otherwise it is safe to set and return the any_exec
380  any_exec.timer = timer;
381  any_exec.callback_group = group;
382  any_exec.node_base = get_node_by_group(group, weak_nodes);
383  timer_handles_.erase(it);
384  return;
385  }
386  // Else, the service is no longer valid, remove it and continue
387  it = timer_handles_.erase(it);
388  }
389  }
390 
391  virtual void
393  {
394  auto it = waitable_handles_.begin();
395  while (it != waitable_handles_.end()) {
396  auto waitable = *it;
397  if (waitable) {
398  // Find the group for this handle and see if it can be serviced
399  auto group = get_group_by_waitable(waitable, weak_nodes);
400  if (!group) {
401  // Group was not found, meaning the waitable is not valid...
402  // Remove it from the ready list and continue looking
403  it = waitable_handles_.erase(it);
404  continue;
405  }
406  if (!group->can_be_taken_from().load()) {
407  // Group is mutually exclusive and is being used, so skip it for now
408  // Leave it to be checked next time, but continue searching
409  ++it;
410  continue;
411  }
412  // Otherwise it is safe to set and return the any_exec
413  any_exec.waitable = waitable;
414  any_exec.callback_group = group;
415  any_exec.node_base = get_node_by_group(group, weak_nodes);
416  waitable_handles_.erase(it);
417  return;
418  }
419  // Else, the waitable is no longer valid, remove it and continue
420  it = waitable_handles_.erase(it);
421  }
422  }
423 
425  {
426  return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(*allocator_.get());
427  }
428 
430  {
431  size_t number_of_subscriptions = subscription_handles_.size();
432  for (auto waitable : waitable_handles_) {
433  number_of_subscriptions += waitable->get_number_of_ready_subscriptions();
434  }
435  return number_of_subscriptions;
436  }
437 
439  {
440  size_t number_of_services = service_handles_.size();
441  for (auto waitable : waitable_handles_) {
442  number_of_services += waitable->get_number_of_ready_services();
443  }
444  return number_of_services;
445  }
446 
447  size_t number_of_ready_events() const
448  {
449  size_t number_of_events = 0;
450  for (auto waitable : waitable_handles_) {
451  number_of_events += waitable->get_number_of_ready_events();
452  }
453  return number_of_events;
454  }
455 
456  size_t number_of_ready_clients() const
457  {
458  size_t number_of_clients = client_handles_.size();
459  for (auto waitable : waitable_handles_) {
460  number_of_clients += waitable->get_number_of_ready_clients();
461  }
462  return number_of_clients;
463  }
464 
466  {
467  size_t number_of_guard_conditions = guard_conditions_.size();
468  for (auto waitable : waitable_handles_) {
469  number_of_guard_conditions += waitable->get_number_of_ready_guard_conditions();
470  }
472  }
473 
474  size_t number_of_ready_timers() const
475  {
476  size_t number_of_timers = timer_handles_.size();
477  for (auto waitable : waitable_handles_) {
478  number_of_timers += waitable->get_number_of_ready_timers();
479  }
480  return number_of_timers;
481  }
482 
483  size_t number_of_waitables() const
484  {
485  return waitable_handles_.size();
486  }
487 
488 private:
489  template<typename T>
490  using VectorRebind =
492 
494 
499  VectorRebind<std::shared_ptr<Waitable>> waitable_handles_;
500 
501  std::shared_ptr<VoidAlloc> allocator_;
502 };
503 
504 } // namespace allocator_memory_strategy
505 } // namespace memory_strategies
506 } // namespace rclcpp
507 
508 #endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
#define rcl_get_error_string
const rcl_service_t ** services
rclcpp::ServiceBase::SharedPtr service
Definition: any_executable.hpp:46
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base
Definition: any_executable.hpp:51
Delegate for handling memory allocations while the Executor is executing.
Definition: memory_strategy.hpp:41
rclcpp::Waitable::SharedPtr waitable
Definition: any_executable.hpp:48
size_t number_of_guard_conditions() const
Definition: allocator_memory_strategy.hpp:465
rclcpp::TimerBase::SharedPtr timer
Definition: any_executable.hpp:45
Definition: any_executable.hpp:35
virtual rcl_allocator_t get_allocator()
Definition: allocator_memory_strategy.hpp:424
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
rclcpp::ClientBase::SharedPtr client
Definition: any_executable.hpp:47
size_t number_of_ready_subscriptions() const
Definition: allocator_memory_strategy.hpp:429
size_t number_of_ready_services() const
Definition: allocator_memory_strategy.hpp:438
const rcl_client_t ** clients
AllocatorMemoryStrategy()
Definition: allocator_memory_strategy.hpp:59
bool collect_entities(const WeakNodeList &weak_nodes)
Definition: allocator_memory_strategy.hpp:153
T remove(T... args)
typename VoidAllocTraits::allocator_type VoidAlloc
Definition: allocator_memory_strategy.hpp:52
const rcl_timer_t ** timers
virtual void get_next_service(executor::AnyExecutable &any_exec, const WeakNodeList &weak_nodes)
Definition: allocator_memory_strategy.hpp:289
#define RCL_RET_OK
rclcpp::SubscriptionBase::SharedPtr subscription
Definition: any_executable.hpp:44
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
virtual void get_next_waitable(executor::AnyExecutable &any_exec, const WeakNodeList &weak_nodes)
Definition: allocator_memory_strategy.hpp:392
static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group, const WeakNodeList &weak_nodes)
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
virtual void get_next_client(executor::AnyExecutable &any_exec, const WeakNodeList &weak_nodes)
Definition: allocator_memory_strategy.hpp:324
virtual void get_next_subscription(executor::AnyExecutable &any_exec, const WeakNodeList &weak_nodes)
Definition: allocator_memory_strategy.hpp:254
void add_guard_condition(const rcl_guard_condition_t *guard_condition)
Definition: allocator_memory_strategy.hpp:64
static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_waitable(rclcpp::Waitable::SharedPtr waitable, const WeakNodeList &weak_nodes)
static rclcpp::TimerBase::SharedPtr get_timer_by_handle(std::shared_ptr< const rcl_timer_t > timer_handle, const WeakNodeList &weak_nodes)
rcl_ret_t rcl_wait_set_add_timer(rcl_wait_set_t *wait_set, const rcl_timer_t *timer, size_t *index)
static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_client(rclcpp::ClientBase::SharedPtr client, const WeakNodeList &weak_nodes)
size_t number_of_ready_clients() const
Definition: allocator_memory_strategy.hpp:456
typename allocator::AllocRebind< void *, Alloc > VoidAllocTraits
Definition: allocator_memory_strategy.hpp:51
rcl_ret_t rcl_wait_set_add_service(rcl_wait_set_t *wait_set, const rcl_service_t *service, size_t *index)
#define RCUTILS_LOG_ERROR_NAMED(name,...)
rclcpp::callback_group::CallbackGroup::SharedPtr callback_group
Definition: any_executable.hpp:50
T get(T... args)
virtual void remove_null_handles(rcl_wait_set_t *wait_set)
Definition: allocator_memory_strategy.hpp:93
static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_service(rclcpp::ServiceBase::SharedPtr service, const WeakNodeList &weak_nodes)
void remove_guard_condition(const rcl_guard_condition_t *guard_condition)
Definition: allocator_memory_strategy.hpp:74
Set the data type used in the intra-process buffer as std::shared_ptr<MessageT>
void clear_handles()
Definition: allocator_memory_strategy.hpp:84
static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_subscription(rclcpp::SubscriptionBase::SharedPtr subscription, const WeakNodeList &weak_nodes)
static rclcpp::SubscriptionBase::SharedPtr get_subscription_by_handle(std::shared_ptr< const rcl_subscription_t > subscriber_handle, const WeakNodeList &weak_nodes)
static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_timer(rclcpp::TimerBase::SharedPtr timer, const WeakNodeList &weak_nodes)
rcl_ret_t rcl_wait_set_add_subscription(rcl_wait_set_t *wait_set, const rcl_subscription_t *subscription, size_t *index)
size_t number_of_waitables() const
Definition: allocator_memory_strategy.hpp:483
rcl_ret_t rcl_wait_set_add_client(rcl_wait_set_t *wait_set, const rcl_client_t *client, size_t *index)
virtual void get_next_timer(executor::AnyExecutable &any_exec, const WeakNodeList &weak_nodes)
Definition: allocator_memory_strategy.hpp:357
static rclcpp::ServiceBase::SharedPtr get_service_by_handle(std::shared_ptr< const rcl_service_t > service_handle, const WeakNodeList &weak_nodes)
static rclcpp::ClientBase::SharedPtr get_client_by_handle(std::shared_ptr< const rcl_client_t > client_handle, const WeakNodeList &weak_nodes)
size_t number_of_ready_timers() const
Definition: allocator_memory_strategy.hpp:474
size_t number_of_ready_events() const
Definition: allocator_memory_strategy.hpp:447
bool add_handles_to_wait_set(rcl_wait_set_t *wait_set)
Definition: allocator_memory_strategy.hpp:194
Delegate for handling memory allocations while the Executor is executing.
Definition: allocator_memory_strategy.hpp:46
const rcl_subscription_t ** subscriptions
rcl_ret_t rcl_wait_set_add_guard_condition(rcl_wait_set_t *wait_set, const rcl_guard_condition_t *guard_condition, size_t *index)