rclcpp  master
C++ ROS Client Library API
thread_safe_synchronization.hpp
Go to the documentation of this file.
1 // Copyright 2020 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__WAIT_SET_POLICIES__THREAD_SAFE_SYNCHRONIZATION_HPP_
16 #define RCLCPP__WAIT_SET_POLICIES__THREAD_SAFE_SYNCHRONIZATION_HPP_
17 
18 #include <chrono>
19 #include <functional>
20 #include <memory>
21 #include <utility>
22 
23 #include "rclcpp/client.hpp"
24 #include "rclcpp/exceptions.hpp"
26 #include "rclcpp/macros.hpp"
27 #include "rclcpp/service.hpp"
30 #include "rclcpp/timer.hpp"
32 #include "rclcpp/wait_result.hpp"
36 #include "rclcpp/waitable.hpp"
37 
38 namespace rclcpp
39 {
40 namespace wait_set_policies
41 {
42 
44 
68 {
69 protected:
70  explicit ThreadSafeSynchronization(rclcpp::Context::SharedPtr context)
71  : extra_guard_conditions_{{std::make_shared<rclcpp::GuardCondition>(context)}},
72  wprw_lock_([this]() {this->interrupt_waiting_wait_set();})
73  {}
74  ~ThreadSafeSynchronization() = default;
75 
77 
83  {
85  }
86 
88 
91  void
93  {
94  extra_guard_conditions_[0]->trigger();
95  }
96 
98  void
101  const rclcpp::SubscriptionWaitSetMask & mask,
104  > add_subscription_function)
105  {
108  add_subscription_function(std::move(subscription), mask);
109  }
110 
112  void
115  const rclcpp::SubscriptionWaitSetMask & mask,
118  > remove_subscription_function)
119  {
122  remove_subscription_function(std::move(subscription), mask);
123  }
124 
126  void
128  std::shared_ptr<rclcpp::GuardCondition> && guard_condition,
129  std::function<void(std::shared_ptr<rclcpp::GuardCondition>&&)> add_guard_condition_function)
130  {
133  add_guard_condition_function(std::move(guard_condition));
134  }
135 
137  void
139  std::shared_ptr<rclcpp::GuardCondition> && guard_condition,
140  std::function<void(std::shared_ptr<rclcpp::GuardCondition>&&)> remove_guard_condition_function)
141  {
144  remove_guard_condition_function(std::move(guard_condition));
145  }
146 
148  void
151  std::function<void(std::shared_ptr<rclcpp::TimerBase>&&)> add_timer_function)
152  {
155  add_timer_function(std::move(timer));
156  }
157 
159  void
162  std::function<void(std::shared_ptr<rclcpp::TimerBase>&&)> remove_timer_function)
163  {
166  remove_timer_function(std::move(timer));
167  }
168 
170  void
173  std::function<void(std::shared_ptr<rclcpp::ClientBase>&&)> add_client_function)
174  {
177  add_client_function(std::move(client));
178  }
179 
181  void
184  std::function<void(std::shared_ptr<rclcpp::ClientBase>&&)> remove_client_function)
185  {
188  remove_client_function(std::move(client));
189  }
190 
192  void
195  std::function<void(std::shared_ptr<rclcpp::ServiceBase>&&)> add_service_function)
196  {
199  add_service_function(std::move(service));
200  }
201 
203  void
206  std::function<void(std::shared_ptr<rclcpp::ServiceBase>&&)> remove_service_function)
207  {
210  remove_service_function(std::move(service));
211  }
212 
214  void
217  std::shared_ptr<void> && associated_entity,
220  > add_waitable_function)
221  {
224  add_waitable_function(std::move(waitable), std::move(associated_entity));
225  }
226 
228  void
231  std::function<void(std::shared_ptr<rclcpp::Waitable>&&)> remove_waitable_function)
232  {
235  remove_waitable_function(std::move(waitable));
236  }
237 
239  void
240  sync_prune_deleted_entities(std::function<void()> prune_deleted_entities_function)
241  {
244  prune_deleted_entities_function();
245  }
246 
248  template<class WaitResultT>
249  WaitResultT
251  std::chrono::nanoseconds time_to_wait_ns,
252  std::function<void()> rebuild_rcl_wait_set,
253  std::function<rcl_wait_set_t & ()> get_rcl_wait_set,
254  std::function<WaitResultT(WaitResultKind wait_result_kind)> create_wait_result)
255  {
256  // Assumption: this function assumes that some measure has been taken to
257  // ensure none of the entities being waited on by the wait set are allowed
258  // to go out of scope and therefore be deleted.
259  // In the case of the StaticStorage policy, this is ensured because it
260  // retains shared ownership of all entites for the duration of its own life.
261  // In the case of the DynamicStorage policy, this is ensured by the function
262  // which calls this function, by acquiring shared ownership of the entites
263  // for the duration of this function.
264 
265  // Setup looping predicate.
266  auto start = std::chrono::steady_clock::now();
267  std::function<bool()> should_loop = this->create_loop_predicate(time_to_wait_ns, start);
268 
269  // Wait until exit condition is met.
270  do {
271  {
272  // We have to prevent the entity sets from being mutated while building
273  // the rcl wait set.
276  // Rebuild the wait set.
277  // This will resize the wait set if needed, due to e.g. adding or removing
278  // entities since the last wait, but this should never occur in static
279  // storage wait sets since they cannot be changed after construction.
280  // This will also clear the wait set and re-add all the entities, which
281  // prepares it to be waited on again.
282  rebuild_rcl_wait_set();
283  }
284 
285  rcl_wait_set_t & rcl_wait_set = get_rcl_wait_set();
286 
287  // Wait unconditionally until timeout condition occurs since we assume
288  // there are no conditions that would require the wait to stop and reset,
289  // like asynchronously adding or removing an entity, i.e. explicitly
290  // providing no thread-safety.
291 
292  // Calculate how much time there is left to wait, unless blocking indefinitely.
293  auto time_left_to_wait_ns = this->calculate_time_left_to_wait(time_to_wait_ns, start);
294 
295  // Then wait for entities to become ready.
296 
297  // It is ok to wait while not having the lock acquired, because the state
298  // in the rcl wait set will not be updated until this method calls
299  // rebuild_rcl_wait_set().
300  rcl_ret_t ret = rcl_wait(&rcl_wait_set, time_left_to_wait_ns.count());
301  if (RCL_RET_OK == ret) {
302  // Something has become ready in the wait set, first check if it was
303  // the guard condition added by this class and/or a user defined guard condition.
304  const rcl_guard_condition_t * interrupt_guard_condition_ptr =
305  &(extra_guard_conditions_[0]->get_rcl_guard_condition());
306  bool was_interrupted_by_this_class = false;
307  bool any_user_guard_conditions_triggered = false;
308  for (size_t index = 0; index < rcl_wait_set.size_of_guard_conditions; ++index) {
309  const rcl_guard_condition_t * current = rcl_wait_set.guard_conditions[index];
310  if (nullptr != current) {
311  // Something is ready.
312  if (rcl_wait_set.guard_conditions[index] == interrupt_guard_condition_ptr) {
313  // This means that this class triggered a guard condition to interrupt this wait.
314  was_interrupted_by_this_class = true;
315  } else {
316  // This means it was a user guard condition.
317  any_user_guard_conditions_triggered = true;
318  }
319  }
320  }
321 
322  if (!was_interrupted_by_this_class || any_user_guard_conditions_triggered) {
323  // In this case we know:
324  // - something was ready
325  // - it was either:
326  // - not interrupted by this class, or
327  // - maybe it was, but there were also user defined guard conditions.
328  //
329  // We cannot ignore user defined guard conditions, but we can ignore
330  // other kinds of user defined entities, because they will still be
331  // ready next time we wait, whereas guard conditions are cleared.
332  // Therefore we need to create a WaitResult and return it.
333 
334  // The WaitResult will call sync_wait_result_acquire() and
335  // sync_wait_result_release() to ensure thread-safety by preventing
336  // the mutation of the entity sets while introspecting after waiting.
337  return create_wait_result(WaitResultKind::Ready);
338  }
339  // If we get here the we interrupted the wait set and there were no user
340  // guard conditions that needed to be handled.
341  // So we will loop and it will re-acquire the lock and rebuild the
342  // rcl wait set.
343  } else if (RCL_RET_TIMEOUT == ret) {
344  // The wait set timed out, exit the loop.
345  break;
346  } else if (RCL_RET_WAIT_SET_EMPTY == ret) {
347  // Wait set was empty, return Empty.
348  return create_wait_result(WaitResultKind::Empty);
349  } else {
350  // Some other error case, throw.
352  }
353  } while (should_loop());
354 
355  // Wait did not result in ready items, return timeout.
356  return create_wait_result(WaitResultKind::Timeout);
357  }
358 
359  void
361  {
363  }
364 
365  void
367  {
369  }
370 
371 protected:
374 };
375 
376 } // namespace wait_set_policies
377 } // namespace rclcpp
378 
379 #endif // RCLCPP__WAIT_SET_POLICIES__THREAD_SAFE_SYNCHRONIZATION_HPP_
rclcpp::wait_set_policies::ThreadSafeSynchronization::ThreadSafeSynchronization
ThreadSafeSynchronization(rclcpp::Context::SharedPtr context)
Definition: thread_safe_synchronization.hpp:70
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_remove_subscription
void sync_remove_subscription(std::shared_ptr< rclcpp::SubscriptionBase > &&subscription, const rclcpp::SubscriptionWaitSetMask &mask, std::function< void(std::shared_ptr< rclcpp::SubscriptionBase > &&, const rclcpp::SubscriptionWaitSetMask &) > remove_subscription_function)
Remove guard condition.
Definition: thread_safe_synchronization.hpp:113
rclcpp::wait_set_policies::ThreadSafeSynchronization::~ThreadSafeSynchronization
~ThreadSafeSynchronization()=default
synchronization_policy_common.hpp
exceptions.hpp
client.hpp
std::shared_ptr< rclcpp::SubscriptionBase >
rclcpp::wait_set_policies::detail::SynchronizationPolicyCommon::create_loop_predicate
std::function< bool()> create_loop_predicate(std::chrono::nanoseconds time_to_wait_ns, std::chrono::steady_clock::time_point start)
Definition: synchronization_policy_common.hpp:36
rclcpp::Ready
@ Ready
Definition: wait_result_kind.hpp:26
rclcpp::WaitResultKind
WaitResultKind
Represents the various kinds of results from waiting on a wait set.
Definition: wait_result_kind.hpp:24
std::move
T move(T... args)
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_remove_waitable
void sync_remove_waitable(std::shared_ptr< rclcpp::Waitable > &&waitable, std::function< void(std::shared_ptr< rclcpp::Waitable > &&)> remove_waitable_function)
Remove waitable.
Definition: thread_safe_synchronization.hpp:229
write_preferring_read_write_lock.hpp
rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock::get_read_mutex
ReadMutex & get_read_mutex()
Return read mutex which can be used with standard constructs like std::lock_guard.
rcl_ret_t
rmw_ret_t rcl_ret_t
rclcpp::wait_set_policies::ThreadSafeSynchronization::wprw_lock_
rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock wprw_lock_
Definition: thread_safe_synchronization.hpp:373
rclcpp::SubscriptionWaitSetMask
Options used to determine what parts of a subscription get added to or removed from a wait set.
Definition: subscription_wait_set_mask.hpp:24
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_add_waitable
void sync_add_waitable(std::shared_ptr< rclcpp::Waitable > &&waitable, std::shared_ptr< void > &&associated_entity, std::function< void(std::shared_ptr< rclcpp::Waitable > &&, std::shared_ptr< void > &&) > add_waitable_function)
Add waitable.
Definition: thread_safe_synchronization.hpp:215
rcl_wait_set_t::size_of_guard_conditions
size_t size_of_guard_conditions
rclcpp::wait_set_policies::detail::SynchronizationPolicyCommon::calculate_time_left_to_wait
std::chrono::nanoseconds calculate_time_left_to_wait(std::chrono::nanoseconds original_time_to_wait_ns, std::chrono::steady_clock::time_point start)
Definition: synchronization_policy_common.hpp:51
std::chrono::nanoseconds
std::lock_guard
rcl_guard_condition_t
std::function
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_wait
WaitResultT sync_wait(std::chrono::nanoseconds time_to_wait_ns, std::function< void()> rebuild_rcl_wait_set, std::function< rcl_wait_set_t &()> get_rcl_wait_set, std::function< WaitResultT(WaitResultKind wait_result_kind)> create_wait_result)
Implements wait.
Definition: thread_safe_synchronization.hpp:250
subscription_base.hpp
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_wait_result_release
void sync_wait_result_release()
Definition: thread_safe_synchronization.hpp:366
guard_condition.hpp
rclcpp
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_add_subscription
void sync_add_subscription(std::shared_ptr< rclcpp::SubscriptionBase > &&subscription, const rclcpp::SubscriptionWaitSetMask &mask, std::function< void(std::shared_ptr< rclcpp::SubscriptionBase > &&, const rclcpp::SubscriptionWaitSetMask &) > add_subscription_function)
Add subscription.
Definition: thread_safe_synchronization.hpp:99
timer.hpp
rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock::get_write_mutex
WriteMutex & get_write_mutex()
Return write mutex which can be used with standard constructs like std::lock_guard.
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_add_client
void sync_add_client(std::shared_ptr< rclcpp::ClientBase > &&client, std::function< void(std::shared_ptr< rclcpp::ClientBase > &&)> add_client_function)
Add client.
Definition: thread_safe_synchronization.hpp:171
rclcpp::wait_set_policies::ThreadSafeSynchronization
WaitSet policy that provides thread-safe synchronization for the wait set.
Definition: thread_safe_synchronization.hpp:67
rcl_wait_set_t::guard_conditions
const rcl_guard_condition_t ** guard_conditions
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_remove_service
void sync_remove_service(std::shared_ptr< rclcpp::ServiceBase > &&service, std::function< void(std::shared_ptr< rclcpp::ServiceBase > &&)> remove_service_function)
Remove service.
Definition: thread_safe_synchronization.hpp:204
macros.hpp
rclcpp::wait_set_policies::ThreadSafeSynchronization::extra_guard_conditions_
std::array< std::shared_ptr< rclcpp::GuardCondition >, 1 > extra_guard_conditions_
Definition: thread_safe_synchronization.hpp:372
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_add_guard_condition
void sync_add_guard_condition(std::shared_ptr< rclcpp::GuardCondition > &&guard_condition, std::function< void(std::shared_ptr< rclcpp::GuardCondition > &&)> add_guard_condition_function)
Add guard condition.
Definition: thread_safe_synchronization.hpp:127
subscription_wait_set_mask.hpp
std::array
rclcpp::Timeout
@ Timeout
Definition: wait_result_kind.hpp:27
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_wait_result_acquire
void sync_wait_result_acquire()
Definition: thread_safe_synchronization.hpp:360
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_remove_guard_condition
void sync_remove_guard_condition(std::shared_ptr< rclcpp::GuardCondition > &&guard_condition, std::function< void(std::shared_ptr< rclcpp::GuardCondition > &&)> remove_guard_condition_function)
Remove guard condition.
Definition: thread_safe_synchronization.hpp:138
RCL_RET_WAIT_SET_EMPTY
#define RCL_RET_WAIT_SET_EMPTY
rcl_wait_set_t
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_remove_client
void sync_remove_client(std::shared_ptr< rclcpp::ClientBase > &&client, std::function< void(std::shared_ptr< rclcpp::ClientBase > &&)> remove_client_function)
Remove client.
Definition: thread_safe_synchronization.hpp:182
rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock::ReadMutex::unlock
void unlock()
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_remove_timer
void sync_remove_timer(std::shared_ptr< rclcpp::TimerBase > &&timer, std::function< void(std::shared_ptr< rclcpp::TimerBase > &&)> remove_timer_function)
Remove timer.
Definition: thread_safe_synchronization.hpp:160
rclcpp::wait_set_policies::detail::SynchronizationPolicyCommon
Common structure for synchronization policies.
Definition: synchronization_policy_common.hpp:29
rclcpp::wait_set_policies::ThreadSafeSynchronization::get_extra_guard_conditions
const std::array< std::shared_ptr< rclcpp::GuardCondition >, 1 > & get_extra_guard_conditions()
Return any "extra" guard conditions needed to implement the synchronization policy.
Definition: thread_safe_synchronization.hpp:82
rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock::ReadMutex::lock
void lock()
rclcpp::wait_set_policies::ThreadSafeSynchronization::interrupt_waiting_wait_set
void interrupt_waiting_wait_set()
Interrupt any waiting wait set.
Definition: thread_safe_synchronization.hpp:92
wait_result_kind.hpp
visibility_control.hpp
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_prune_deleted_entities
void sync_prune_deleted_entities(std::function< void()> prune_deleted_entities_function)
Prune deleted entities.
Definition: thread_safe_synchronization.hpp:240
rclcpp::Empty
@ Empty
Definition: wait_result_kind.hpp:28
rclcpp::exceptions::throw_from_rcl_error
void throw_from_rcl_error(rcl_ret_t ret, const std::string &prefix="", const rcl_error_state_t *error_state=nullptr, void(*reset_error)()=rcl_reset_error)
Throw a C++ std::exception which was created based on an rcl error.
RCL_RET_TIMEOUT
#define RCL_RET_TIMEOUT
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_add_service
void sync_add_service(std::shared_ptr< rclcpp::ServiceBase > &&service, std::function< void(std::shared_ptr< rclcpp::ServiceBase > &&)> add_service_function)
Add service.
Definition: thread_safe_synchronization.hpp:193
wait_result.hpp
RCL_RET_OK
#define RCL_RET_OK
service.hpp
rclcpp::wait_set_policies::detail::WritePreferringReadWriteLock
Writer-perferring read-write lock.
Definition: write_preferring_read_write_lock.hpp:170
rclcpp::wait_set_policies::ThreadSafeSynchronization::sync_add_timer
void sync_add_timer(std::shared_ptr< rclcpp::TimerBase > &&timer, std::function< void(std::shared_ptr< rclcpp::TimerBase > &&)> add_timer_function)
Add timer.
Definition: thread_safe_synchronization.hpp:149
rcl_wait
rcl_ret_t rcl_wait(rcl_wait_set_t *wait_set, int64_t timeout)
waitable.hpp
std::chrono::steady_clock::now
T now(T... args)