rclcpp  master
C++ ROS Client Library API
executor.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXECUTOR_HPP_
16 #define RCLCPP__EXECUTOR_HPP_
17 
18 #include <algorithm>
19 #include <cassert>
20 #include <chrono>
21 #include <cstdlib>
22 #include <iostream>
23 #include <list>
24 #include <map>
25 #include <memory>
26 #include <mutex>
27 #include <string>
28 #include <vector>
29 
30 #include "rcl/guard_condition.h"
31 #include "rcl/wait.h"
32 
33 #include "rclcpp/context.hpp"
41 #include "rclcpp/utilities.hpp"
43 #include "rclcpp/scope_exit.hpp"
44 
45 namespace rclcpp
46 {
47 
48 typedef std::map<rclcpp::CallbackGroup::WeakPtr,
49  rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
51 
52 // Forward declaration is used in convenience method signature.
53 class Node;
54 
56 
65 class Executor
66 {
67 public:
69 
70 
71 
75  explicit Executor(const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions());
76 
79  virtual ~Executor();
80 
82  // It is up to the implementation of Executor to implement spin.
83  virtual void
84  spin() = 0;
85 
87 
104  virtual void
106  rclcpp::CallbackGroup::SharedPtr group_ptr,
107  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
108  bool notify = true);
109 
111 
125 
127 
139 
141 
154 
156 
175  virtual void
177  rclcpp::CallbackGroup::SharedPtr group_ptr,
178  bool notify = true);
179 
181 
200  virtual void
201  add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);
202 
204 
208  virtual void
209  add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true);
210 
212 
228  virtual void
229  remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);
230 
232 
236  virtual void
237  remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true);
238 
240 
246  template<typename RepT = int64_t, typename T = std::milli>
247  void
249  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
251  {
253  node,
254  std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
255  );
256  }
257 
259  template<typename NodeT = rclcpp::Node, typename RepT = int64_t, typename T = std::milli>
260  void
264  {
266  node->get_node_base_interface(),
267  std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
268  );
269  }
270 
272 
276  void
277  spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);
278 
281  void
283 
285 
296  virtual void
298 
300 
313  virtual void
314  spin_all(std::chrono::nanoseconds max_duration);
315 
317  virtual void
319 
321 
330  template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
333  const FutureT & future,
335  {
336  // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
337  // inside a callback executed by an executor.
338 
339  // Check the future before entering the while loop.
340  // If the future is already complete, don't try to spin.
341  std::future_status status = future.wait_for(std::chrono::seconds(0));
342  if (status == std::future_status::ready) {
344  }
345 
346  auto end_time = std::chrono::steady_clock::now();
347  std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
348  timeout);
349  if (timeout_ns > std::chrono::nanoseconds::zero()) {
350  end_time += timeout_ns;
351  }
352  std::chrono::nanoseconds timeout_left = timeout_ns;
353 
354  if (spinning.exchange(true)) {
355  throw std::runtime_error("spin_until_future_complete() called while already spinning");
356  }
357  RCLCPP_SCOPE_EXIT(this->spinning.store(false); );
358  while (rclcpp::ok(this->context_) && spinning.load()) {
359  // Do one item of work.
360  spin_once_impl(timeout_left);
361 
362  // Check if the future is set, return SUCCESS if it is.
363  status = future.wait_for(std::chrono::seconds(0));
364  if (status == std::future_status::ready) {
366  }
367  // If the original timeout is < 0, then this is blocking, never TIMEOUT.
368  if (timeout_ns < std::chrono::nanoseconds::zero()) {
369  continue;
370  }
371  // Otherwise check if we still have time to wait, return TIMEOUT if not.
372  auto now = std::chrono::steady_clock::now();
373  if (now >= end_time) {
375  }
376  // Subtract the elapsed time from the original timeout.
377  timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
378  }
379 
380  // The future did not complete before ok() returned false, return INTERRUPTED.
382  }
383 
385 
390  void
391  cancel();
392 
394 
401  void
402  set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
403 
404 protected:
406  void
408  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
409  std::chrono::nanoseconds timeout);
410 
412  void
413  spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive);
414 
416 
422  void
424 
426  static void
428  rclcpp::SubscriptionBase::SharedPtr subscription);
429 
431  static void
432  execute_timer(rclcpp::TimerBase::SharedPtr timer);
433 
435  static void
436  execute_service(rclcpp::ServiceBase::SharedPtr service);
437 
439  static void
440  execute_client(rclcpp::ClientBase::SharedPtr client);
441 
446  void
448 
450  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
452  const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
453  rclcpp::CallbackGroup::SharedPtr group);
454 
456 
461  bool
462  has_node(
463  const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
464  const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) const;
465 
467  rclcpp::CallbackGroup::SharedPtr
468  get_group_by_timer(rclcpp::TimerBase::SharedPtr timer);
469 
471 
475  virtual void
477  rclcpp::CallbackGroup::SharedPtr group_ptr,
478  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
479  WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
480  bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
481 
483 
487  virtual void
489  rclcpp::CallbackGroup::SharedPtr group_ptr,
490  WeakCallbackGroupsToNodesMap & weak_groups_to_nodes,
491  bool notify = true) RCPPUTILS_TSA_REQUIRES(mutex_);
492 
494  bool
495  get_next_ready_executable(AnyExecutable & any_executable);
496 
498  bool
500  AnyExecutable & any_executable,
501  const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes);
502 
504  bool
506  AnyExecutable & any_executable,
507  std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
508 
510 
521  virtual void
523 
525  std::atomic_bool spinning;
526 
528  rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
529 
531 
533  rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
534 
535  // Mutex to protect the subsequent memory_strategy_.
536  mutable std::mutex mutex_;
537 
539  memory_strategy::MemoryStrategy::SharedPtr
540  memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_);
541 
543  std::shared_ptr<rclcpp::Context> context_;
544 
546 
548  virtual void
549  spin_once_impl(std::chrono::nanoseconds timeout);
550 
551  typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
552  const rcl_guard_condition_t *,
553  std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
555 
558  weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
559 
562  weak_groups_associated_with_executor_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
563 
566  weak_groups_to_nodes_associated_with_executor_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
567 
570  weak_groups_to_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
571 
573  std::list<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>
574  weak_nodes_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
575 
578 };
579 
580 } // namespace rclcpp
581 
582 #endif // RCLCPP__EXECUTOR_HPP_
rclcpp::Executor::spin_some
virtual void spin_some(std::chrono::nanoseconds max_duration=std::chrono::nanoseconds(0))
Collect work once and execute all available work, optionally within a duration.
rclcpp::GuardCondition
A condition that can be waited on in a single wait set and asynchronously triggered.
Definition: guard_condition.hpp:31
rclcpp::Executor::spin_node_once_nanoseconds
void spin_node_once_nanoseconds(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::nanoseconds timeout)
rclcpp::FutureReturnCode
FutureReturnCode
Return codes to be used with spin_until_future_complete.
Definition: future_return_code.hpp:33
scope_exit.hpp
rclcpp::Executor::spin_node_once
void spin_node_once(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::duration< RepT, T > timeout=std::chrono::duration< RepT, T >(-1))
Add a node to executor, execute the next available unit of work, and remove the node.
Definition: executor.hpp:248
std::shared_ptr
guard_condition.h
rclcpp::Executor::remove_callback_group_from_map
virtual void remove_callback_group_from_map(rclcpp::CallbackGroup::SharedPtr group_ptr, WeakCallbackGroupsToNodesMap &weak_groups_to_nodes, bool notify=true) RCPPUTILS_TSA_REQUIRES(mutex_)
Remove a callback group from the executor.
RCLCPP_DISABLE_COPY
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
rclcpp::Executor::interrupt_guard_condition_
rcl_guard_condition_t interrupt_guard_condition_
Guard condition for signaling the rmw layer to wake up for special events.
Definition: executor.hpp:528
rclcpp::Executor::spin_once
virtual void spin_once(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
rclcpp::Executor::execute_client
static void execute_client(rclcpp::ClientBase::SharedPtr client)
rclcpp::Executor::spin
virtual void spin()=0
Do work periodically as it becomes available to us. Blocking call, may block indefinitely.
std::vector< rclcpp::CallbackGroup::WeakPtr >
rclcpp::Executor::add_callback_groups_from_nodes_associated_to_executor
virtual void add_callback_groups_from_nodes_associated_to_executor() RCPPUTILS_TSA_REQUIRES(mutex_)
Add all callback groups that can be automatically added from associated nodes.
wait.h
std::chrono::duration
context.hpp
rclcpp::WeakCallbackGroupsToNodesMap
std::map< rclcpp::CallbackGroup::WeakPtr, rclcpp::node_interfaces::NodeBaseInterface::WeakPtr, std::owner_less< rclcpp::CallbackGroup::WeakPtr > > WeakCallbackGroupsToNodesMap
Definition: executor.hpp:50
rclcpp::Executor::execute_any_executable
void execute_any_executable(AnyExecutable &any_exec)
Find the next available executable and do the work associated with it.
rclcpp::Executor
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:65
std::owner_less
std::chrono::nanoseconds::zero
T zero(T... args)
rcl_guard_condition_t
rclcpp::OnShutdownCallbackHandle
Definition: context.hpp:51
guard_condition.hpp
rclcpp::Executor::get_manually_added_callback_groups
virtual std::vector< rclcpp::CallbackGroup::WeakPtr > get_manually_added_callback_groups()
Get callback groups that belong to executor.
rclcpp::Executor::cancel
void cancel()
Cancel any running spin* function, causing it to return.
rclcpp::ok
bool ok(rclcpp::Context::SharedPtr context=nullptr)
Check rclcpp's status.
rclcpp
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
RCLCPP_PUBLIC
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
rclcpp::Executor::RCPPUTILS_TSA_GUARDED_BY
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_ RCPPUTILS_TSA_GUARDED_BY(mutex_)
maps nodes to guard conditions
rclcpp::Executor::get_group_by_timer
rclcpp::CallbackGroup::SharedPtr get_group_by_timer(rclcpp::TimerBase::SharedPtr timer)
rclcpp::Executor::execute_timer
static void execute_timer(rclcpp::TimerBase::SharedPtr timer)
rclcpp::Executor::RCPPUTILS_TSA_PT_GUARDED_BY
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_ RCPPUTILS_TSA_PT_GUARDED_BY(mutex_)
The memory strategy: an interface for handling user-defined memory allocation strategies.
rclcpp::Executor::get_next_ready_executable_from_map
bool get_next_ready_executable_from_map(AnyExecutable &any_executable, const WeakCallbackGroupsToNodesMap &weak_groups_to_nodes)
rclcpp::Node
Node is the single point of entry for creating publishers and subscribers.
Definition: node.hpp:77
rclcpp::Executor::execute_service
static void execute_service(rclcpp::ServiceBase::SharedPtr service)
rclcpp::ExecutorOptions
Options to be passed to the executor constructor.
Definition: executor_options.hpp:28
rclcpp::FutureReturnCode::SUCCESS
@ SUCCESS
rclcpp::Executor::shutdown_callback_handle_
rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_
shutdown callback handle registered to Context
Definition: executor.hpp:577
rclcpp::Executor::remove_node
virtual void remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Remove a node from the executor.
node_base_interface.hpp
future_return_code.hpp
rclcpp::Executor::spin_until_future_complete
FutureReturnCode spin_until_future_complete(const FutureT &future, std::chrono::duration< TimeRepT, TimeT > timeout=std::chrono::duration< TimeRepT, TimeT >(-1))
Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
Definition: executor.hpp:332
rclcpp::FutureReturnCode::TIMEOUT
@ TIMEOUT
rclcpp::Executor::wait_for_work
void wait_for_work(std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
rclcpp::FutureReturnCode::INTERRUPTED
@ INTERRUPTED
rclcpp::Executor::~Executor
virtual ~Executor()
Default destructor.
rcl_wait_set_t
rclcpp::AnyExecutable
Definition: any_executable.hpp:33
rclcpp::Executor::spin_node_once
void spin_node_once(std::shared_ptr< NodeT > node, std::chrono::duration< RepT, T > timeout=std::chrono::duration< RepT, T >(-1))
Convenience function which takes Node and forwards NodeBaseInterface.
Definition: executor.hpp:261
std::runtime_error
RCLCPP_SCOPE_EXIT
#define RCLCPP_SCOPE_EXIT(code)
Definition: scope_exit.hpp:49
rclcpp::Executor::add_node
virtual void add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a node to the executor.
std::map
rclcpp::Executor::get_next_executable
bool get_next_executable(AnyExecutable &any_executable, std::chrono::nanoseconds timeout=std::chrono::nanoseconds(-1))
rclcpp::Executor::get_all_callback_groups
virtual std::vector< rclcpp::CallbackGroup::WeakPtr > get_all_callback_groups()
Get callback groups that belong to executor.
rclcpp::Executor::Executor
Executor(const rclcpp::ExecutorOptions &options=rclcpp::ExecutorOptions())
Default constructor.
rclcpp::Executor::get_node_by_group
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr get_node_by_group(const WeakCallbackGroupsToNodesMap &weak_groups_to_nodes, rclcpp::CallbackGroup::SharedPtr group)
rclcpp::Executor::set_memory_strategy
void set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy)
Support dynamic switching of the memory strategy.
rclcpp::Executor::get_next_ready_executable
bool get_next_ready_executable(AnyExecutable &any_executable)
executor_options.hpp
rclcpp::Context
Context which encapsulates shared state between nodes and other similar entities.
Definition: context.hpp:68
default_context.hpp
rclcpp::Executor::wait_set_
rcl_wait_set_t wait_set_
Wait set for managing entities that the rmw layer waits on.
Definition: executor.hpp:533
rclcpp::Executor::add_callback_group
virtual void add_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify=true)
Add a callback group to an executor.
visibility_control.hpp
std
rclcpp::Executor::has_node
bool has_node(const rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, const WeakCallbackGroupsToNodesMap &weak_groups_to_nodes) const
Return true if the node has been added to this executor.
rclcpp::Executor::spin_some_impl
void spin_some_impl(std::chrono::nanoseconds max_duration, bool exhaustive)
rclcpp::Executor::get_automatically_added_callback_groups_from_nodes
virtual std::vector< rclcpp::CallbackGroup::WeakPtr > get_automatically_added_callback_groups_from_nodes()
Get callback groups that belong to executor.
rclcpp::Executor::remove_callback_group
virtual void remove_callback_group(rclcpp::CallbackGroup::SharedPtr group_ptr, bool notify=true)
Remove a callback group from the executor.
rclcpp::Executor::mutex_
std::mutex mutex_
Definition: executor.hpp:536
RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
rclcpp::Executor::shutdown_guard_condition_
std::shared_ptr< rclcpp::GuardCondition > shutdown_guard_condition_
Definition: executor.hpp:530
rclcpp::CallbackGroup
Definition: callback_group.hpp:49
rclcpp::Executor::spin_node_some
void spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node)
Add a node, complete all immediately available work, and remove the node.
rclcpp::Executor::add_callback_group_to_map
virtual void add_callback_group_to_map(rclcpp::CallbackGroup::SharedPtr group_ptr, rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, WeakCallbackGroupsToNodesMap &weak_groups_to_nodes, bool notify=true) RCPPUTILS_TSA_REQUIRES(mutex_)
Add a callback group to an executor.
rclcpp::Executor::spinning
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins.
Definition: executor.hpp:525
rclcpp::Executor::spin_all
virtual void spin_all(std::chrono::nanoseconds max_duration)
Collect and execute work repeatedly within a duration or until no more work is available.
rclcpp::Executor::context_
std::shared_ptr< rclcpp::Context > context_
The context associated with this executor.
Definition: executor.hpp:543
rclcpp::Executor::spin_once_impl
virtual void spin_once_impl(std::chrono::nanoseconds timeout)
memory_strategies.hpp
utilities.hpp
memory_strategy.hpp
rclcpp::Executor::execute_subscription
static void execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription)
std::chrono::steady_clock::now
T now(T... args)