rclcpp  beta1
C++ ROS Client Library API
allocator_memory_strategy.hpp
Go to the documentation of this file.
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
16 #define RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
17 
18 #include <memory>
19 #include <vector>
20 
21 #include "rcl/allocator.h"
22 
25 #include "rclcpp/node.hpp"
27 
28 #include "rmw/types.h"
29 
30 namespace rclcpp
31 {
32 namespace memory_strategies
33 {
34 namespace allocator_memory_strategy
35 {
36 
38 
43 template<typename Alloc = std::allocator<void>>
45 {
46 public:
48 
49  using ExecAllocTraits = allocator::AllocRebind<executor::AnyExecutable, Alloc>;
50  using ExecAlloc = typename ExecAllocTraits::allocator_type;
51  using ExecDeleter = allocator::Deleter<ExecAlloc, executor::AnyExecutable>;
52  using VoidAllocTraits = typename allocator::AllocRebind<void *, Alloc>;
53  using VoidAlloc = typename VoidAllocTraits::allocator_type;
54 
55  explicit AllocatorMemoryStrategy(std::shared_ptr<Alloc> allocator)
56  {
57  executable_allocator_ = std::make_shared<ExecAlloc>(*allocator.get());
58  allocator_ = std::make_shared<VoidAlloc>(*allocator.get());
59  }
60 
62  {
63  executable_allocator_ = std::make_shared<ExecAlloc>();
64  allocator_ = std::make_shared<VoidAlloc>();
65  }
66 
67  void add_guard_condition(const rcl_guard_condition_t * guard_condition)
68  {
69  for (const auto & existing_guard_condition : guard_conditions_) {
70  if (existing_guard_condition == guard_condition) {
71  return;
72  }
73  }
74  guard_conditions_.push_back(guard_condition);
75  }
76 
77  void remove_guard_condition(const rcl_guard_condition_t * guard_condition)
78  {
79  for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
80  if (*it == guard_condition) {
81  guard_conditions_.erase(it);
82  break;
83  }
84  }
85  }
86 
88  {
89  subscription_handles_.clear();
90  service_handles_.clear();
91  client_handles_.clear();
92  timer_handles_.clear();
93  }
94 
95  virtual void remove_null_handles(rcl_wait_set_t * wait_set)
96  {
97  for (size_t i = 0; i < wait_set->size_of_subscriptions; ++i) {
98  if (!wait_set->subscriptions[i]) {
99  subscription_handles_[i] = nullptr;
100  }
101  }
102  for (size_t i = 0; i < wait_set->size_of_services; ++i) {
103  if (!wait_set->services[i]) {
104  service_handles_[i] = nullptr;
105  }
106  }
107  for (size_t i = 0; i < wait_set->size_of_clients; ++i) {
108  if (!wait_set->clients[i]) {
109  client_handles_[i] = nullptr;
110  }
111  }
112  for (size_t i = 0; i < wait_set->size_of_timers; ++i) {
113  if (!wait_set->timers[i]) {
114  timer_handles_[i] = nullptr;
115  }
116  }
117 
118  subscription_handles_.erase(
119  std::remove(subscription_handles_.begin(), subscription_handles_.end(), nullptr),
120  subscription_handles_.end()
121  );
122 
123  service_handles_.erase(
124  std::remove(service_handles_.begin(), service_handles_.end(), nullptr),
125  service_handles_.end()
126  );
127 
128  client_handles_.erase(
129  std::remove(client_handles_.begin(), client_handles_.end(), nullptr),
130  client_handles_.end()
131  );
132 
133  timer_handles_.erase(
134  std::remove(timer_handles_.begin(), timer_handles_.end(), nullptr),
135  timer_handles_.end()
136  );
137  }
138 
139  bool collect_entities(const WeakNodeVector & weak_nodes)
140  {
141  bool has_invalid_weak_nodes = false;
142  for (auto & weak_node : weak_nodes) {
143  auto node = weak_node.lock();
144  if (!node) {
145  has_invalid_weak_nodes = true;
146  continue;
147  }
148  for (auto & weak_group : node->get_callback_groups()) {
149  auto group = weak_group.lock();
150  if (!group || !group->can_be_taken_from().load()) {
151  continue;
152  }
153  for (auto & weak_subscription : group->get_subscription_ptrs()) {
154  auto subscription = weak_subscription.lock();
155  if (subscription) {
156  subscription_handles_.push_back(subscription->get_subscription_handle());
157  if (subscription->get_intra_process_subscription_handle()) {
158  subscription_handles_.push_back(
159  subscription->get_intra_process_subscription_handle());
160  }
161  }
162  }
163  for (auto & weak_service : group->get_service_ptrs()) {
164  auto service = weak_service.lock();
165  if (service) {
166  service_handles_.push_back(service->get_service_handle());
167  }
168  }
169  for (auto & weak_client : group->get_client_ptrs()) {
170  auto client = weak_client.lock();
171  if (client) {
172  client_handles_.push_back(client->get_client_handle());
173  }
174  }
175  for (auto & weak_timer : group->get_timer_ptrs()) {
176  auto timer = weak_timer.lock();
177  if (timer) {
178  timer_handles_.push_back(timer->get_timer_handle());
179  }
180  }
181  }
182  }
183  return has_invalid_weak_nodes;
184  }
185 
187  {
188  for (auto subscription : subscription_handles_) {
189  if (rcl_wait_set_add_subscription(wait_set, subscription) != RCL_RET_OK) {
190  fprintf(stderr, "Couldn't add subscription to waitset: %s\n", rcl_get_error_string_safe());
191  return false;
192  }
193  }
194 
195  for (auto client : client_handles_) {
196  if (rcl_wait_set_add_client(wait_set, client) != RCL_RET_OK) {
197  fprintf(stderr, "Couldn't add client to waitset: %s\n", rcl_get_error_string_safe());
198  return false;
199  }
200  }
201 
202  for (auto service : service_handles_) {
203  if (rcl_wait_set_add_service(wait_set, service) != RCL_RET_OK) {
204  fprintf(stderr, "Couldn't add service to waitset: %s\n", rcl_get_error_string_safe());
205  return false;
206  }
207  }
208 
209  for (auto timer : timer_handles_) {
210  if (rcl_wait_set_add_timer(wait_set, timer) != RCL_RET_OK) {
211  fprintf(stderr, "Couldn't add timer to waitset: %s\n", rcl_get_error_string_safe());
212  return false;
213  }
214  }
215 
216  for (auto guard_condition : guard_conditions_) {
217  if (rcl_wait_set_add_guard_condition(wait_set, guard_condition) != RCL_RET_OK) {
218  fprintf(stderr, "Couldn't add guard_condition to waitset: %s\n",
220  return false;
221  }
222  }
223  return true;
224  }
225 
227  // \return Shared pointer to the fresh executable.
228  executor::AnyExecutable::SharedPtr instantiate_next_executable()
229  {
230  return std::allocate_shared<executor::AnyExecutable>(*executable_allocator_.get());
231  }
232 
233  virtual void
234  get_next_subscription(executor::AnyExecutable::SharedPtr any_exec,
235  const WeakNodeVector & weak_nodes)
236  {
237  auto it = subscription_handles_.begin();
238  while (it != subscription_handles_.end()) {
239  auto subscription = get_subscription_by_handle(*it, weak_nodes);
240  if (subscription) {
241  // Figure out if this is for intra-process or not.
242  bool is_intra_process = false;
243  if (subscription->get_intra_process_subscription_handle()) {
244  is_intra_process = subscription->get_intra_process_subscription_handle() == *it;
245  }
246  // Find the group for this handle and see if it can be serviced
247  auto group = get_group_by_subscription(subscription, weak_nodes);
248  if (!group) {
249  // Group was not found, meaning the subscription is not valid...
250  // Remove it from the ready list and continue looking
251  subscription_handles_.erase(it);
252  continue;
253  }
254  if (!group->can_be_taken_from().load()) {
255  // Group is mutually exclusive and is being used, so skip it for now
256  // Leave it to be checked next time, but continue searching
257  ++it;
258  continue;
259  }
260  // Otherwise it is safe to set and return the any_exec
261  if (is_intra_process) {
262  any_exec->subscription_intra_process = subscription;
263  } else {
264  any_exec->subscription = subscription;
265  }
266  any_exec->callback_group = group;
267  any_exec->node_base = get_node_by_group(group, weak_nodes);
268  subscription_handles_.erase(it);
269  return;
270  }
271  // Else, the subscription is no longer valid, remove it and continue
272  subscription_handles_.erase(it);
273  }
274  }
275 
276  virtual void
277  get_next_service(executor::AnyExecutable::SharedPtr any_exec,
278  const WeakNodeVector & weak_nodes)
279  {
280  auto it = service_handles_.begin();
281  while (it != service_handles_.end()) {
282  auto service = get_service_by_handle(*it, weak_nodes);
283  if (service) {
284  // Find the group for this handle and see if it can be serviced
285  auto group = get_group_by_service(service, weak_nodes);
286  if (!group) {
287  // Group was not found, meaning the service is not valid...
288  // Remove it from the ready list and continue looking
289  service_handles_.erase(it);
290  continue;
291  }
292  if (!group->can_be_taken_from().load()) {
293  // Group is mutually exclusive and is being used, so skip it for now
294  // Leave it to be checked next time, but continue searching
295  ++it;
296  continue;
297  }
298  // Otherwise it is safe to set and return the any_exec
299  any_exec->service = service;
300  any_exec->callback_group = group;
301  any_exec->node_base = get_node_by_group(group, weak_nodes);
302  service_handles_.erase(it);
303  return;
304  }
305  // Else, the service is no longer valid, remove it and continue
306  service_handles_.erase(it);
307  }
308  }
309 
310  virtual void
311  get_next_client(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector & weak_nodes)
312  {
313  auto it = client_handles_.begin();
314  while (it != client_handles_.end()) {
315  auto client = get_client_by_handle(*it, weak_nodes);
316  if (client) {
317  // Find the group for this handle and see if it can be serviced
318  auto group = get_group_by_client(client, weak_nodes);
319  if (!group) {
320  // Group was not found, meaning the service is not valid...
321  // Remove it from the ready list and continue looking
322  client_handles_.erase(it);
323  continue;
324  }
325  if (!group->can_be_taken_from().load()) {
326  // Group is mutually exclusive and is being used, so skip it for now
327  // Leave it to be checked next time, but continue searching
328  ++it;
329  continue;
330  }
331  // Otherwise it is safe to set and return the any_exec
332  any_exec->client = client;
333  any_exec->callback_group = group;
334  any_exec->node_base = get_node_by_group(group, weak_nodes);
335  client_handles_.erase(it);
336  return;
337  }
338  // Else, the service is no longer valid, remove it and continue
339  client_handles_.erase(it);
340  }
341  }
342 
344  {
345  return rclcpp::allocator::get_rcl_allocator<void *, VoidAlloc>(*allocator_.get());
346  }
347 
349  {
350  return subscription_handles_.size();
351  }
352 
354  {
355  return service_handles_.size();
356  }
357 
358  size_t number_of_ready_clients() const
359  {
360  return client_handles_.size();
361  }
362 
364  {
365  return guard_conditions_.size();
366  }
367 
368  size_t number_of_ready_timers() const
369  {
370  return timer_handles_.size();
371  }
372 
373 private:
374  template<typename T>
375  using VectorRebind =
377 
379 
380  VectorRebind<const rcl_subscription_t *> subscription_handles_;
381  VectorRebind<const rcl_service_t *> service_handles_;
382  VectorRebind<const rcl_client_t *> client_handles_;
383  VectorRebind<const rcl_timer_t *> timer_handles_;
384 
385  std::shared_ptr<ExecAlloc> executable_allocator_;
386  std::shared_ptr<VoidAlloc> allocator_;
387 };
388 
389 } // namespace allocator_memory_strategy
390 } // namespace memory_strategies
391 } // namespace rclcpp
392 
393 #endif // RCLCPP__STRATEGIES__ALLOCATOR_MEMORY_STRATEGY_HPP_
static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_client(rclcpp::client::ClientBase::SharedPtr client, const WeakNodeVector &weak_nodes)
const rcl_service_t ** services
virtual void get_next_service(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector &weak_nodes)
Definition: allocator_memory_strategy.hpp:277
virtual void remove_null_handles(rcl_wait_set_t *wait_set)
Definition: allocator_memory_strategy.hpp:95
Delegate for handling memory allocations while the Executor is executing.
Definition: memory_strategy.hpp:40
AllocatorMemoryStrategy()
Definition: allocator_memory_strategy.hpp:61
executor::AnyExecutable::SharedPtr instantiate_next_executable()
Provide a newly initialized AnyExecutable object.
Definition: allocator_memory_strategy.hpp:228
size_t size_of_clients
rcl_ret_t rcl_wait_set_add_subscription(rcl_wait_set_t *wait_set, const rcl_subscription_t *subscription)
size_t size_of_subscriptions
size_t size_of_services
Definition: allocator_common.hpp:24
const rcl_client_t ** clients
T remove(T... args)
const rcl_timer_t ** timers
static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group, const WeakNodeVector &weak_nodes)
#define RCL_RET_OK
size_t number_of_guard_conditions() const
Definition: allocator_memory_strategy.hpp:363
bool collect_entities(const WeakNodeVector &weak_nodes)
Definition: allocator_memory_strategy.hpp:139
Delegate for handling memory allocations while the Executor is executing.
Definition: allocator_memory_strategy.hpp:44
size_t number_of_ready_clients() const
Definition: allocator_memory_strategy.hpp:358
rcl_ret_t rcl_wait_set_add_guard_condition(rcl_wait_set_t *wait_set, const rcl_guard_condition_t *guard_condition)
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
size_t size_of_timers
void remove_guard_condition(const rcl_guard_condition_t *guard_condition)
Definition: allocator_memory_strategy.hpp:77
void clear_handles()
Definition: allocator_memory_strategy.hpp:87
virtual void get_next_client(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector &weak_nodes)
Definition: allocator_memory_strategy.hpp:311
#define rcl_get_error_string_safe
size_t number_of_ready_timers() const
Definition: allocator_memory_strategy.hpp:368
rcl_ret_t rcl_wait_set_add_client(rcl_wait_set_t *wait_set, const rcl_client_t *client)
T get(T... args)
typename ExecAllocTraits::allocator_type ExecAlloc
Definition: allocator_memory_strategy.hpp:50
static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_service(rclcpp::service::ServiceBase::SharedPtr service, const WeakNodeVector &weak_nodes)
rcl_ret_t rcl_wait_set_add_service(rcl_wait_set_t *wait_set, const rcl_service_t *service)
static rclcpp::subscription::SubscriptionBase::SharedPtr get_subscription_by_handle(const rcl_subscription_t *subscriber_handle, const WeakNodeVector &weak_nodes)
static rclcpp::client::ClientBase::SharedPtr get_client_by_handle(const rcl_client_t *client_handle, const WeakNodeVector &weak_nodes)
static rclcpp::service::ServiceBase::SharedPtr get_service_by_handle(const rcl_service_t *service_handle, const WeakNodeVector &weak_nodes)
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
virtual rcl_allocator_t get_allocator()
Definition: allocator_memory_strategy.hpp:343
virtual void get_next_subscription(executor::AnyExecutable::SharedPtr any_exec, const WeakNodeVector &weak_nodes)
Definition: allocator_memory_strategy.hpp:234
allocator::Deleter< ExecAlloc, executor::AnyExecutable > ExecDeleter
Definition: allocator_memory_strategy.hpp:51
static rclcpp::callback_group::CallbackGroup::SharedPtr get_group_by_subscription(rclcpp::subscription::SubscriptionBase::SharedPtr subscription, const WeakNodeVector &weak_nodes)
void add_guard_condition(const rcl_guard_condition_t *guard_condition)
Definition: allocator_memory_strategy.hpp:67
allocator::AllocRebind< executor::AnyExecutable, Alloc > ExecAllocTraits
Definition: allocator_memory_strategy.hpp:49
typename allocator::AllocRebind< void *, Alloc > VoidAllocTraits
Definition: allocator_memory_strategy.hpp:52
bool add_handles_to_waitset(rcl_wait_set_t *wait_set)
Definition: allocator_memory_strategy.hpp:186
rcl_ret_t rcl_wait_set_add_timer(rcl_wait_set_t *wait_set, const rcl_timer_t *timer)
size_t number_of_ready_subscriptions() const
Definition: allocator_memory_strategy.hpp:348
typename VoidAllocTraits::allocator_type VoidAlloc
Definition: allocator_memory_strategy.hpp:53
const rcl_subscription_t ** subscriptions
size_t number_of_ready_services() const
Definition: allocator_memory_strategy.hpp:353