rclcpp  master
C++ ROS Client Library API
wait_set_template.hpp
Go to the documentation of this file.
1 // Copyright 2020 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__WAIT_SET_TEMPLATE_HPP_
16 #define RCLCPP__WAIT_SET_TEMPLATE_HPP_
17 
18 #include <chrono>
19 #include <memory>
20 #include <utility>
21 
22 #include "rcl/wait.h"
23 
24 #include "rclcpp/client.hpp"
25 #include "rclcpp/context.hpp"
28 #include "rclcpp/macros.hpp"
29 #include "rclcpp/scope_exit.hpp"
30 #include "rclcpp/service.hpp"
33 #include "rclcpp/timer.hpp"
35 #include "rclcpp/wait_result.hpp"
36 #include "rclcpp/waitable.hpp"
37 
38 namespace rclcpp
39 {
40 
42 
46 template<class SynchronizationPolicy, class StoragePolicy>
47 class WaitSetTemplate final : private SynchronizationPolicy, private StoragePolicy
48 {
49 public:
51 
52  using typename StoragePolicy::SubscriptionEntry;
53  using typename StoragePolicy::WaitableEntry;
54 
56 
70  explicit
72  const typename StoragePolicy::SubscriptionsIterable & subscriptions = {},
73  const typename StoragePolicy::GuardConditionsIterable & guard_conditions = {},
74  const typename StoragePolicy::TimersIterable & timers = {},
75  const typename StoragePolicy::ClientsIterable & clients = {},
76  const typename StoragePolicy::ServicesIterable & services = {},
77  const typename StoragePolicy::WaitablesIterable & waitables = {},
78  rclcpp::Context::SharedPtr context =
80  : SynchronizationPolicy(context),
81  StoragePolicy(
82  subscriptions,
83  guard_conditions,
84  // this method comes from the SynchronizationPolicy
85  this->get_extra_guard_conditions(),
86  timers,
87  clients,
88  services,
89  waitables,
90  context)
91  {}
92 
94 
99  const rcl_wait_set_t &
101  {
102  // this method comes from the StoragePolicy
103  return this->storage_get_rcl_wait_set();
104  }
105 
107 
129  void
133  {
134  if (nullptr == subscription) {
135  throw std::invalid_argument("subscription is nullptr");
136  }
137  // this method comes from the SynchronizationPolicy
138  this->sync_add_subscription(
139  std::move(subscription),
140  mask,
141  [this](
142  std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
143  const rclcpp::SubscriptionWaitSetMask & mask)
144  {
145  // These methods comes from the StoragePolicy, and may not exist for
146  // fixed sized storage policies.
147  // It will throw if the subscription has already been added.
148  if (mask.include_subscription) {
149  auto local_subscription = inner_subscription;
150  bool already_in_use =
151  local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), true);
152  if (already_in_use) {
153  throw std::runtime_error("subscription already associated with a wait set");
154  }
155  this->storage_add_subscription(std::move(local_subscription));
156  }
157  if (mask.include_events) {
158  for (auto event : inner_subscription->get_event_handlers()) {
159  auto local_subscription = inner_subscription;
160  bool already_in_use =
161  local_subscription->exchange_in_use_by_wait_set_state(event.get(), true);
162  if (already_in_use) {
163  throw std::runtime_error("subscription event already associated with a wait set");
164  }
165  this->storage_add_waitable(std::move(event), std::move(local_subscription));
166  }
167  }
169  auto local_subscription = inner_subscription;
170  auto waitable = inner_subscription->get_intra_process_waitable();
171  if (nullptr != waitable) {
172  bool already_in_use = local_subscription->exchange_in_use_by_wait_set_state(
173  waitable.get(),
174  true);
175  if (already_in_use) {
176  throw std::runtime_error(
177  "subscription intra-process waitable already associated with a wait set");
178  }
179  this->storage_add_waitable(
180  std::move(inner_subscription->get_intra_process_waitable()),
181  std::move(local_subscription));
182  }
183  }
184  });
185  }
186 
188 
203  void
207  {
208  if (nullptr == subscription) {
209  throw std::invalid_argument("subscription is nullptr");
210  }
211  // this method comes from the SynchronizationPolicy
212  this->sync_remove_subscription(
213  std::move(subscription),
214  mask,
215  [this](
216  std::shared_ptr<rclcpp::SubscriptionBase> && inner_subscription,
217  const rclcpp::SubscriptionWaitSetMask & mask)
218  {
219  // This method comes from the StoragePolicy, and it may not exist for
220  // fixed sized storage policies.
221  // It will throw if the subscription is not in the wait set.
222  if (mask.include_subscription) {
223  auto local_subscription = inner_subscription;
224  local_subscription->exchange_in_use_by_wait_set_state(local_subscription.get(), false);
225  this->storage_remove_subscription(std::move(local_subscription));
226  }
227  if (mask.include_events) {
228  for (auto event : inner_subscription->get_event_handlers()) {
229  auto local_subscription = inner_subscription;
230  local_subscription->exchange_in_use_by_wait_set_state(event.get(), false);
231  this->storage_remove_waitable(std::move(event));
232  }
233  }
235  auto local_waitable = inner_subscription->get_intra_process_waitable();
236  inner_subscription->exchange_in_use_by_wait_set_state(local_waitable.get(), false);
237  if (nullptr != local_waitable) {
238  // This is the case when intra process is disabled for the subscription.
239  this->storage_remove_waitable(std::move(local_waitable));
240  }
241  }
242  });
243  }
244 
246 
272  void
274  {
275  if (nullptr == guard_condition) {
276  throw std::invalid_argument("guard_condition is nullptr");
277  }
278  // this method comes from the SynchronizationPolicy
279  this->sync_add_guard_condition(
280  std::move(guard_condition),
281  [this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
282  bool already_in_use = inner_guard_condition->exchange_in_use_by_wait_set_state(true);
283  if (already_in_use) {
284  throw std::runtime_error("guard condition already in use by another wait set");
285  }
286  // This method comes from the StoragePolicy, and it may not exist for
287  // fixed sized storage policies.
288  // It will throw if the guard condition has already been added.
289  this->storage_add_guard_condition(std::move(inner_guard_condition));
290  });
291  }
292 
294 
312  void
314  {
315  if (nullptr == guard_condition) {
316  throw std::invalid_argument("guard_condition is nullptr");
317  }
318  // this method comes from the SynchronizationPolicy
319  this->sync_remove_guard_condition(
320  std::move(guard_condition),
321  [this](std::shared_ptr<rclcpp::GuardCondition> && inner_guard_condition) {
322  inner_guard_condition->exchange_in_use_by_wait_set_state(false);
323  // This method comes from the StoragePolicy, and it may not exist for
324  // fixed sized storage policies.
325  // It will throw if the guard condition is not in the wait set.
326  this->storage_remove_guard_condition(std::move(inner_guard_condition));
327  });
328  }
329 
331 
340  void
342  {
343  if (nullptr == timer) {
344  throw std::invalid_argument("timer is nullptr");
345  }
346  // this method comes from the SynchronizationPolicy
347  this->sync_add_timer(
348  std::move(timer),
349  [this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
350  bool already_in_use = inner_timer->exchange_in_use_by_wait_set_state(true);
351  if (already_in_use) {
352  throw std::runtime_error("timer already in use by another wait set");
353  }
354  // This method comes from the StoragePolicy, and it may not exist for
355  // fixed sized storage policies.
356  // It will throw if the timer has already been added.
357  this->storage_add_timer(std::move(inner_timer));
358  });
359  }
360 
362 
370  void
372  {
373  if (nullptr == timer) {
374  throw std::invalid_argument("timer is nullptr");
375  }
376  // this method comes from the SynchronizationPolicy
377  this->sync_remove_timer(
378  std::move(timer),
379  [this](std::shared_ptr<rclcpp::TimerBase> && inner_timer) {
380  inner_timer->exchange_in_use_by_wait_set_state(false);
381  // This method comes from the StoragePolicy, and it may not exist for
382  // fixed sized storage policies.
383  // It will throw if the timer is not in the wait set.
384  this->storage_remove_timer(std::move(inner_timer));
385  });
386  }
387 
389 
398  void
400  {
401  if (nullptr == client) {
402  throw std::invalid_argument("client is nullptr");
403  }
404  // this method comes from the SynchronizationPolicy
405  this->sync_add_client(
406  std::move(client),
407  [this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
408  bool already_in_use = inner_client->exchange_in_use_by_wait_set_state(true);
409  if (already_in_use) {
410  throw std::runtime_error("client already in use by another wait set");
411  }
412  // This method comes from the StoragePolicy, and it may not exist for
413  // fixed sized storage policies.
414  // It will throw if the client has already been added.
415  this->storage_add_client(std::move(inner_client));
416  });
417  }
418 
420 
428  void
430  {
431  if (nullptr == client) {
432  throw std::invalid_argument("client is nullptr");
433  }
434  // this method comes from the SynchronizationPolicy
435  this->sync_remove_client(
436  std::move(client),
437  [this](std::shared_ptr<rclcpp::ClientBase> && inner_client) {
438  inner_client->exchange_in_use_by_wait_set_state(false);
439  // This method comes from the StoragePolicy, and it may not exist for
440  // fixed sized storage policies.
441  // It will throw if the client is not in the wait set.
442  this->storage_remove_client(std::move(inner_client));
443  });
444  }
445 
447 
456  void
458  {
459  if (nullptr == service) {
460  throw std::invalid_argument("service is nullptr");
461  }
462  // this method comes from the SynchronizationPolicy
463  this->sync_add_service(
464  std::move(service),
465  [this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
466  bool already_in_use = inner_service->exchange_in_use_by_wait_set_state(true);
467  if (already_in_use) {
468  throw std::runtime_error("service already in use by another wait set");
469  }
470  // This method comes from the StoragePolicy, and it may not exist for
471  // fixed sized storage policies.
472  // It will throw if the service has already been added.
473  this->storage_add_service(std::move(inner_service));
474  });
475  }
476 
478 
486  void
488  {
489  if (nullptr == service) {
490  throw std::invalid_argument("service is nullptr");
491  }
492  // this method comes from the SynchronizationPolicy
493  this->sync_remove_service(
494  std::move(service),
495  [this](std::shared_ptr<rclcpp::ServiceBase> && inner_service) {
496  inner_service->exchange_in_use_by_wait_set_state(false);
497  // This method comes from the StoragePolicy, and it may not exist for
498  // fixed sized storage policies.
499  // It will throw if the service is not in the wait set.
500  this->storage_remove_service(std::move(inner_service));
501  });
502  }
503 
505 
528  void
531  std::shared_ptr<void> associated_entity = nullptr)
532  {
533  if (nullptr == waitable) {
534  throw std::invalid_argument("waitable is nullptr");
535  }
536  // this method comes from the SynchronizationPolicy
537  this->sync_add_waitable(
538  std::move(waitable),
539  std::move(associated_entity),
540  [this](
541  std::shared_ptr<rclcpp::Waitable> && inner_waitable,
542  std::shared_ptr<void> && associated_entity)
543  {
544  bool already_in_use = inner_waitable->exchange_in_use_by_wait_set_state(true);
545  if (already_in_use) {
546  throw std::runtime_error("waitable already in use by another wait set");
547  }
548  // This method comes from the StoragePolicy, and it may not exist for
549  // fixed sized storage policies.
550  // It will throw if the waitable has already been added.
551  this->storage_add_waitable(std::move(inner_waitable), std::move(associated_entity));
552  });
553  }
554 
556 
564  void
566  {
567  if (nullptr == waitable) {
568  throw std::invalid_argument("waitable is nullptr");
569  }
570  // this method comes from the SynchronizationPolicy
571  this->sync_remove_waitable(
572  std::move(waitable),
573  [this](std::shared_ptr<rclcpp::Waitable> && inner_waitable) {
574  inner_waitable->exchange_in_use_by_wait_set_state(false);
575  // This method comes from the StoragePolicy, and it may not exist for
576  // fixed sized storage policies.
577  // It will throw if the waitable is not in the wait set.
578  this->storage_remove_waitable(std::move(inner_waitable));
579  });
580  }
581 
583 
598  void
600  {
601  // this method comes from the SynchronizationPolicy
602  this->sync_prune_deleted_entities(
603  [this]() {
604  // This method comes from the StoragePolicy, and it may not exist for
605  // fixed sized storage policies.
606  this->storage_prune_deleted_entities();
607  });
608  }
609 
611 
651  template<class Rep = int64_t, class Period = std::milli>
655  {
656  auto time_to_wait_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(time_to_wait);
657 
658  // ensure the ownership of the entities in the wait set is shared for the duration of wait
659  this->storage_acquire_ownerships();
660  RCLCPP_SCOPE_EXIT({this->storage_release_ownerships();});
661 
662  // this method comes from the SynchronizationPolicy
663  return this->template sync_wait<WaitResult<WaitSetTemplate>>(
664  // pass along the time_to_wait duration as nanoseconds
665  time_to_wait_ns,
666  // this method provides the ability to rebuild the wait set, if needed
667  [this]() {
668  // This method comes from the StoragePolicy
669  this->storage_rebuild_rcl_wait_set(
670  // This method comes from the SynchronizationPolicy
671  this->get_extra_guard_conditions()
672  );
673  },
674  // this method provides access to the rcl wait set
675  [this]() -> rcl_wait_set_t & {
676  // This method comes from the StoragePolicy
677  return this->storage_get_rcl_wait_set();
678  },
679  // this method provides a way to create the WaitResult
680  [this](WaitResultKind wait_result_kind) -> WaitResult<WaitSetTemplate> {
681  // convert the result into a WaitResult
682  switch (wait_result_kind) {
689  default:
690  auto msg = "unknown WaitResultKind with value: " + std::to_string(wait_result_kind);
691  throw std::runtime_error(msg);
692  }
693  }
694  );
695  }
696 
697 private:
698  // Add WaitResult type as a friend so it can call private methods for
699  // acquiring and releasing resources as the WaitResult is initialized and
700  // destructed, respectively.
702 
704 
709  void
710  wait_result_acquire()
711  {
712  if (wait_result_holding_) {
713  throw std::runtime_error("wait_result_acquire() called while already holding");
714  }
715  wait_result_holding_ = true;
716  // this method comes from the SynchronizationPolicy
717  this->sync_wait_result_acquire();
718  // this method comes from the StoragePolicy
719  this->storage_acquire_ownerships();
720  }
721 
723 
728  void
729  wait_result_release()
730  {
731  if (!wait_result_holding_) {
732  throw std::runtime_error("wait_result_release() called while not holding");
733  }
734  wait_result_holding_ = false;
735  // this method comes from the StoragePolicy
736  this->storage_release_ownerships();
737  // this method comes from the SynchronizationPolicy
738  this->sync_wait_result_release();
739  }
740 
741  bool wait_result_holding_ = false;
742 };
743 
744 } // namespace rclcpp
745 
746 #endif // RCLCPP__WAIT_SET_TEMPLATE_HPP_
rclcpp::WaitSetTemplate::wait
RCUTILS_WARN_UNUSED WaitResult< WaitSetTemplate > wait(std::chrono::duration< Rep, Period > time_to_wait=std::chrono::duration< Rep, Period >(-1))
Wait for any of the entities in the wait set to be ready, or a period of time to pass.
Definition: wait_set_template.hpp:654
scope_exit.hpp
client.hpp
std::shared_ptr< rclcpp::SubscriptionBase >
rclcpp::Ready
@ Ready
Definition: wait_result_kind.hpp:26
rclcpp::WaitResultKind
WaitResultKind
Represents the various kinds of results from waiting on a wait set.
Definition: wait_result_kind.hpp:24
std::move
T move(T... args)
rclcpp::WaitSetTemplate::add_service
void add_service(std::shared_ptr< rclcpp::ServiceBase > service)
Add a service to this wait set.
Definition: wait_set_template.hpp:457
rclcpp::WaitSetTemplate
Encapsulates sets of waitable items which can be waited on as a group.
Definition: wait_set_template.hpp:47
rclcpp::SubscriptionWaitSetMask
Options used to determine what parts of a subscription get added to or removed from a wait set.
Definition: subscription_wait_set_mask.hpp:24
std::chrono::duration
rclcpp::WaitSetTemplate::prune_deleted_entities
void prune_deleted_entities()
Remove any destroyed entities from the wait set.
Definition: wait_set_template.hpp:599
context.hpp
subscription_base.hpp
guard_condition.hpp
rclcpp::contexts::get_global_default_context
DefaultContext::SharedPtr get_global_default_context()
Definition: default_context.hpp:49
rclcpp::WaitSetTemplate::WaitSetTemplate
WaitSetTemplate(const typename StoragePolicy::SubscriptionsIterable &subscriptions={}, const typename StoragePolicy::GuardConditionsIterable &guard_conditions={}, const typename StoragePolicy::TimersIterable &timers={}, const typename StoragePolicy::ClientsIterable &clients={}, const typename StoragePolicy::ServicesIterable &services={}, const typename StoragePolicy::WaitablesIterable &waitables={}, rclcpp::Context::SharedPtr context=rclcpp::contexts::get_global_default_context())
Construct a wait set with optional initial waitable entities and optional custom context.
Definition: wait_set_template.hpp:71
rclcpp::SubscriptionWaitSetMask::include_events
bool include_events
If true, include any events attached to the subscription.
Definition: subscription_wait_set_mask.hpp:30
rclcpp
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
rclcpp::SubscriptionWaitSetMask::include_intra_process_waitable
bool include_intra_process_waitable
If true, include the waitable used to handle intra process communication.
Definition: subscription_wait_set_mask.hpp:32
timer.hpp
rclcpp::WaitSetTemplate::remove_timer
void remove_timer(std::shared_ptr< rclcpp::TimerBase > timer)
Remove a timer from this wait set.
Definition: wait_set_template.hpp:371
macros.hpp
rclcpp::WaitSetTemplate::remove_subscription
void remove_subscription(std::shared_ptr< rclcpp::SubscriptionBase > subscription, rclcpp::SubscriptionWaitSetMask mask={})
Remove a subscription from this wait set.
Definition: wait_set_template.hpp:204
rclcpp::WaitSetTemplate::add_timer
void add_timer(std::shared_ptr< rclcpp::TimerBase > timer)
Add a timer to this wait set.
Definition: wait_set_template.hpp:341
rclcpp::WaitSetTemplate::add_guard_condition
void add_guard_condition(std::shared_ptr< rclcpp::GuardCondition > guard_condition)
Add a guard condition to this wait set.
Definition: wait_set_template.hpp:273
rclcpp::WaitSetTemplate::get_rcl_wait_set
const rcl_wait_set_t & get_rcl_wait_set() const
Return the internal rcl wait set object.
Definition: wait_set_template.hpp:100
std::to_string
T to_string(T... args)
subscription_wait_set_mask.hpp
rclcpp::Timeout
@ Timeout
Definition: wait_result_kind.hpp:27
rcl_wait_set_t
rclcpp::WaitSetTemplate::remove_service
void remove_service(std::shared_ptr< rclcpp::ServiceBase > service)
Remove a service from this wait set.
Definition: wait_set_template.hpp:487
std::runtime_error
std::invalid_argument
rclcpp::WaitSetTemplate::remove_client
void remove_client(std::shared_ptr< rclcpp::ClientBase > client)
Remove a client from this wait set.
Definition: wait_set_template.hpp:429
RCLCPP_SCOPE_EXIT
#define RCLCPP_SCOPE_EXIT(code)
Definition: scope_exit.hpp:49
RCUTILS_WARN_UNUSED
#define RCUTILS_WARN_UNUSED
rclcpp::WaitSetTemplate::add_waitable
void add_waitable(std::shared_ptr< rclcpp::Waitable > waitable, std::shared_ptr< void > associated_entity=nullptr)
Add a waitable to this wait set.
Definition: wait_set_template.hpp:529
default_context.hpp
visibility_control.hpp
rclcpp::Empty
@ Empty
Definition: wait_result_kind.hpp:28
rclcpp::WaitSetTemplate::add_client
void add_client(std::shared_ptr< rclcpp::ClientBase > client)
Add a client to this wait set.
Definition: wait_set_template.hpp:399
rclcpp::WaitSetTemplate::remove_waitable
void remove_waitable(std::shared_ptr< rclcpp::Waitable > waitable)
Remove a waitable from this wait set.
Definition: wait_set_template.hpp:565
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
rclcpp::WaitSetTemplate::add_subscription
void add_subscription(std::shared_ptr< rclcpp::SubscriptionBase > subscription, rclcpp::SubscriptionWaitSetMask mask={})
Add a subscription to this wait set.
Definition: wait_set_template.hpp:130
rclcpp::WaitSetTemplate::remove_guard_condition
void remove_guard_condition(std::shared_ptr< rclcpp::GuardCondition > guard_condition)
Remove a guard condition from this wait set.
Definition: wait_set_template.hpp:313
wait_result.hpp
service.hpp
rclcpp::SubscriptionWaitSetMask::include_subscription
bool include_subscription
If true, include the actual subscription.
Definition: subscription_wait_set_mask.hpp:28
waitable.hpp
rclcpp::WaitResult
Interface for introspecting a wait set after waiting on it.
Definition: wait_result.hpp:54