rclcpp  master
C++ ROS Client Library API
executor.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXECUTOR_HPP_
16 #define RCLCPP__EXECUTOR_HPP_
17 
18 #include <algorithm>
19 #include <cassert>
20 #include <chrono>
21 #include <cstdlib>
22 #include <iostream>
23 #include <list>
24 #include <memory>
25 #include <string>
26 #include <vector>
27 
28 #include "rcl/guard_condition.h"
29 #include "rcl/wait.h"
30 
34 #include "rclcpp/utilities.hpp"
36 
37 namespace rclcpp
38 {
39 
40 // Forward declaration is used in convenience method signature.
41 namespace node
42 {
43 class Node;
44 } // namespace node
45 
46 namespace executor
47 {
48 
50 
56 
58 std::ostream &
59 operator<<(std::ostream & os, const FutureReturnCode & future_return_code);
60 
62 std::string
63 to_string(const FutureReturnCode & future_return_code);
64 
66 
70 {
71  memory_strategy::MemoryStrategy::SharedPtr memory_strategy;
72  size_t max_conditions = 0;
73 };
74 
75 static inline ExecutorArgs create_default_executor_arguments()
76 {
77  ExecutorArgs args;
79  args.max_conditions = 0;
80  return args;
81 }
82 
84 
93 class Executor
94 {
95 public:
97 
98 
99  // \param[in] ms The memory strategy to be used with this executor.
101  explicit Executor(const ExecutorArgs & args = create_default_executor_arguments());
102 
105  virtual ~Executor();
106 
108  // It is up to the implementation of Executor to implement spin.
109  virtual void
110  spin() = 0;
111 
113 
121  virtual void
122  add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);
123 
126  virtual void
127  add_node(std::shared_ptr<rclcpp::node::Node> node_ptr, bool notify = true);
128 
130 
137  virtual void
138  remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);
139 
142  virtual void
143  remove_node(std::shared_ptr<rclcpp::node::Node> node_ptr, bool notify = true);
144 
146 
152  template<typename T = std::milli>
153  void
154  spin_node_once(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
155  std::chrono::duration<int64_t, T> timeout = std::chrono::duration<int64_t, T>(-1))
156  {
157  return spin_node_once_nanoseconds(
158  node,
159  std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
160  );
161  }
162 
164  template<typename NodeT = rclcpp::node::Node, typename T = std::milli>
165  void
166  spin_node_once(std::shared_ptr<NodeT> node,
167  std::chrono::duration<int64_t, T> timeout = std::chrono::duration<int64_t, T>(-1))
168  {
169  return spin_node_once_nanoseconds(
170  node->get_node_base_interface(),
171  std::chrono::duration_cast<std::chrono::nanoseconds>(timeout)
172  );
173  }
174 
176 
180  void
181  spin_node_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node);
182 
185  void
186  spin_node_some(std::shared_ptr<rclcpp::node::Node> node);
187 
189 
196  virtual void
197  spin_some();
198 
200  virtual void
201  spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
202 
204 
213  template<typename ResponseT, typename TimeT = std::milli>
216  std::shared_future<ResponseT> & future,
217  std::chrono::duration<int64_t, TimeT> timeout = std::chrono::duration<int64_t, TimeT>(-1))
218  {
219  // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
220  // inside a callback executed by an executor.
221 
222  // Check the future before entering the while loop.
223  // If the future is already complete, don't try to spin.
224  std::future_status status = future.wait_for(std::chrono::seconds(0));
225  if (status == std::future_status::ready) {
226  return FutureReturnCode::SUCCESS;
227  }
228 
229  auto end_time = std::chrono::steady_clock::now();
230  std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
231  timeout);
232  if (timeout_ns > std::chrono::nanoseconds::zero()) {
233  end_time += timeout_ns;
234  }
235  std::chrono::nanoseconds timeout_left = timeout_ns;
236 
237  while (rclcpp::utilities::ok()) {
238  // Do one item of work.
239  spin_once(timeout_left);
240  // Check if the future is set, return SUCCESS if it is.
241  status = future.wait_for(std::chrono::seconds(0));
242  if (status == std::future_status::ready) {
243  return FutureReturnCode::SUCCESS;
244  }
245  // If the original timeout is < 0, then this is blocking, never TIMEOUT.
246  if (timeout_ns < std::chrono::nanoseconds::zero()) {
247  continue;
248  }
249  // Otherwise check if we still have time to wait, return TIMEOUT if not.
250  auto now = std::chrono::steady_clock::now();
251  if (now >= end_time) {
252  return FutureReturnCode::TIMEOUT;
253  }
254  // Subtract the elapsed time from the original timeout.
255  timeout_left = std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - now);
256  }
257 
258  // The future did not complete before ok() returned false, return INTERRUPTED.
259  return FutureReturnCode::INTERRUPTED;
260  }
261 
263  /* This function can be called asynchonously from any thread. */
265  void
266  cancel();
267 
269 
275  void
276  set_memory_strategy(memory_strategy::MemoryStrategy::SharedPtr memory_strategy);
277 
278 protected:
280  void
281  spin_node_once_nanoseconds(
282  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node,
283  std::chrono::nanoseconds timeout);
284 
286 
290  void
291  execute_any_executable(AnyExecutable::SharedPtr any_exec);
292 
294  static void
295  execute_subscription(
296  rclcpp::subscription::SubscriptionBase::SharedPtr subscription);
297 
299  static void
300  execute_intra_process_subscription(
301  rclcpp::subscription::SubscriptionBase::SharedPtr subscription);
302 
304  static void
305  execute_timer(rclcpp::timer::TimerBase::SharedPtr timer);
306 
308  static void
309  execute_service(rclcpp::service::ServiceBase::SharedPtr service);
310 
312  static void
313  execute_client(rclcpp::client::ClientBase::SharedPtr client);
314 
316  void
317  wait_for_work(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
318 
320  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr
321  get_node_by_group(rclcpp::callback_group::CallbackGroup::SharedPtr group);
322 
324  rclcpp::callback_group::CallbackGroup::SharedPtr
325  get_group_by_timer(rclcpp::timer::TimerBase::SharedPtr timer);
326 
328  void
329  get_next_timer(AnyExecutable::SharedPtr any_exec);
330 
332  AnyExecutable::SharedPtr
333  get_next_ready_executable();
334 
336  AnyExecutable::SharedPtr
337  get_next_executable(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
338 
340  std::atomic_bool spinning;
341 
344 
347 
349  memory_strategy::MemoryStrategy::SharedPtr memory_strategy_;
350 
351 private:
352  RCLCPP_DISABLE_COPY(Executor)
353 
354  std::vector<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr> weak_nodes_;
355 };
356 
357 } // namespace executor
358 } // namespace rclcpp
359 
360 #endif // RCLCPP__EXECUTOR_HPP_
bool ok()
Check rclcpp&#39;s status.
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
std::atomic_bool spinning
Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins...
Definition: executor.hpp:340
FutureReturnCode spin_until_future_complete(std::shared_future< ResponseT > &future, std::chrono::duration< int64_t, TimeT > timeout=std::chrono::duration< int64_t, TimeT >(-1))
Spin (blocking) until the future is complete, it times out waiting, or rclcpp is interrupted.
Definition: executor.hpp:215
void spin_node_once(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, std::chrono::duration< int64_t, T > timeout=std::chrono::duration< int64_t, T >(-1))
Add a node to executor, execute the next available unit of work, and remove the node.
Definition: executor.hpp:154
Definition: allocator_common.hpp:24
std::ostream & operator<<(std::ostream &os, const FutureReturnCode &future_return_code)
rcl_wait_set_t rcl_get_zero_initialized_wait_set(void)
memory_strategy::MemoryStrategy::SharedPtr create_default_strategy()
rcl_guard_condition_t rcl_get_zero_initialized_guard_condition(void)
std::string to_string(const FutureReturnCode &future_return_code)
void spin_node_once(std::shared_ptr< NodeT > node, std::chrono::duration< int64_t, T > timeout=std::chrono::duration< int64_t, T >(-1))
Convenience function which takes Node and forwards NodeBaseInterface.
Definition: executor.hpp:166
memory_strategy::MemoryStrategy::SharedPtr memory_strategy_
The memory strategy: an interface for handling user-defined memory allocation strategies.
Definition: executor.hpp:349
Coordinate the order and timing of available communication tasks.
Definition: executor.hpp:93
Node is the single point of entry for creating publishers and subscribers.
Definition: node.hpp:62
void spin_some(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Create a default single-threaded executor and execute any immediately available work.
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
FutureReturnCode
Return codes to be used with spin_until_future_complete.
Definition: executor.hpp:55
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
size_t max_conditions
Definition: executor.hpp:72
memory_strategy::MemoryStrategy::SharedPtr memory_strategy
Definition: executor.hpp:71
Definition: executor.hpp:69
void spin(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr)
Create a default single-threaded executor and spin the specified node.