rclcpp  master
C++ ROS Client Library API
intra_process_manager.hpp
Go to the documentation of this file.
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
17 
18 #include <rmw/types.h>
19 
20 #include <shared_mutex>
21 
22 #include <algorithm>
23 #include <atomic>
24 #include <cstdint>
25 #include <exception>
26 #include <map>
27 #include <memory>
28 #include <string>
29 #include <unordered_map>
30 #include <utility>
31 #include <vector>
32 
36 #include "rclcpp/logger.hpp"
37 #include "rclcpp/logging.hpp"
38 #include "rclcpp/macros.hpp"
41 
42 namespace rclcpp
43 {
44 
45 namespace experimental
46 {
47 
49 
92 {
93 private:
95 
96 public:
98 
101 
103  virtual ~IntraProcessManager();
104 
106 
116  uint64_t
117  add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
118 
120 
126  void
127  remove_subscription(uint64_t intra_process_subscription_id);
128 
130 
140  uint64_t
142 
144 
150  void
151  remove_publisher(uint64_t intra_process_publisher_id);
152 
154 
176  template<
177  typename MessageT,
178  typename Alloc = std::allocator<void>,
180  void
182  uint64_t intra_process_publisher_id,
185  {
186  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
187  using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
188 
190 
191  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
192  if (publisher_it == pub_to_subs_.end()) {
193  // Publisher is either invalid or no longer exists.
194  RCLCPP_WARN(
195  rclcpp::get_logger("rclcpp"),
196  "Calling do_intra_process_publish for invalid or no longer existing publisher id");
197  return;
198  }
199  const auto & sub_ids = publisher_it->second;
200 
201  if (sub_ids.take_ownership_subscriptions.empty()) {
202  // None of the buffers require ownership, so we promote the pointer
203  std::shared_ptr<MessageT> msg = std::move(message);
204 
205  this->template add_shared_msg_to_buffers<MessageT>(msg, sub_ids.take_shared_subscriptions);
206  } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
207  sub_ids.take_shared_subscriptions.size() <= 1)
208  {
209  // There is at maximum 1 buffer that does not require ownership.
210  // So we this case is equivalent to all the buffers requiring ownership
211 
212  // Merge the two vector of ids into a unique one
213  std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
214  concatenated_vector.insert(
215  concatenated_vector.end(),
216  sub_ids.take_ownership_subscriptions.begin(),
217  sub_ids.take_ownership_subscriptions.end());
218 
219  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
220  std::move(message),
221  concatenated_vector,
222  allocator);
223  } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
224  sub_ids.take_shared_subscriptions.size() > 1)
225  {
226  // Construct a new shared pointer from the message
227  // for the buffers that do not require ownership
228  auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
229 
230  this->template add_shared_msg_to_buffers<MessageT>(shared_msg,
231  sub_ids.take_shared_subscriptions);
232  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
233  std::move(message),
234  sub_ids.take_ownership_subscriptions,
235  allocator);
236  }
237  }
238 
239  template<
240  typename MessageT,
241  typename Alloc = std::allocator<void>,
242  typename Deleter = std::default_delete<MessageT>>
245  uint64_t intra_process_publisher_id,
248  {
249  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
250  using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
251 
253 
254  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
255  if (publisher_it == pub_to_subs_.end()) {
256  // Publisher is either invalid or no longer exists.
257  RCLCPP_WARN(
258  rclcpp::get_logger("rclcpp"),
259  "Calling do_intra_process_publish for invalid or no longer existing publisher id");
260  return nullptr;
261  }
262  const auto & sub_ids = publisher_it->second;
263 
264  if (sub_ids.take_ownership_subscriptions.empty()) {
265  // If there are no owning, just convert to shared.
266  std::shared_ptr<MessageT> shared_msg = std::move(message);
267  if (!sub_ids.take_shared_subscriptions.empty()) {
268  this->template add_shared_msg_to_buffers<MessageT>(shared_msg,
269  sub_ids.take_shared_subscriptions);
270  }
271  return shared_msg;
272  } else {
273  // Construct a new shared pointer from the message for the buffers that
274  // do not require ownership and to return.
275  auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
276 
277  if (!sub_ids.take_shared_subscriptions.empty()) {
278  this->template add_shared_msg_to_buffers<MessageT>(
279  shared_msg,
280  sub_ids.take_shared_subscriptions);
281  }
282  if (!sub_ids.take_ownership_subscriptions.empty()) {
283  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
284  std::move(message),
285  sub_ids.take_ownership_subscriptions,
286  allocator);
287  }
288 
289  return shared_msg;
290  }
291  }
292 
295  bool
296  matches_any_publishers(const rmw_gid_t * id) const;
297 
300  size_t
301  get_subscription_count(uint64_t intra_process_publisher_id) const;
302 
304  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
305  get_subscription_intra_process(uint64_t intra_process_subscription_id);
306 
307 private:
308  struct SubscriptionInfo
309  {
310  SubscriptionInfo() = default;
311 
312  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription;
313  rmw_qos_profile_t qos;
314  const char * topic_name;
315  bool use_take_shared_method;
316  };
317 
318  struct PublisherInfo
319  {
320  PublisherInfo() = default;
321 
322  rclcpp::PublisherBase::WeakPtr publisher;
323  rmw_qos_profile_t qos;
324  const char * topic_name;
325  };
326 
327  struct SplittedSubscriptions
328  {
329  std::vector<uint64_t> take_shared_subscriptions;
330  std::vector<uint64_t> take_ownership_subscriptions;
331  };
332 
333  using SubscriptionMap =
335 
336  using PublisherMap =
338 
341 
343  static
344  uint64_t
345  get_next_unique_id();
346 
348  void
349  insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method);
350 
352  bool
353  can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info) const;
354 
355  template<typename MessageT>
356  void
357  add_shared_msg_to_buffers(
359  std::vector<uint64_t> subscription_ids)
360  {
361  for (auto id : subscription_ids) {
362  auto subscription_it = subscriptions_.find(id);
363  if (subscription_it == subscriptions_.end()) {
364  throw std::runtime_error("subscription has unexpectedly gone out of scope");
365  }
366  auto subscription_base = subscription_it->second.subscription;
367 
368  auto subscription = std::static_pointer_cast<
370  >(subscription_base);
371 
372  subscription->provide_intra_process_message(message);
373  }
374  }
375 
376  template<
377  typename MessageT,
378  typename Alloc = std::allocator<void>,
379  typename Deleter = std::default_delete<MessageT>>
380  void
381  add_owned_msg_to_buffers(
383  std::vector<uint64_t> subscription_ids,
385  {
386  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
387  using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
388 
389  for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
390  auto subscription_it = subscriptions_.find(*it);
391  if (subscription_it == subscriptions_.end()) {
392  throw std::runtime_error("subscription has unexpectedly gone out of scope");
393  }
394  auto subscription_base = subscription_it->second.subscription;
395 
396  auto subscription = std::static_pointer_cast<
398  >(subscription_base);
399 
400  if (std::next(it) == subscription_ids.end()) {
401  // If this is the last subscription, give up ownership
402  subscription->provide_intra_process_message(std::move(message));
403  } else {
404  // Copy the message since we have additional subscriptions to serve
405  MessageUniquePtr copy_message;
406  Deleter deleter = message.get_deleter();
407  auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
408  MessageAllocTraits::construct(*allocator.get(), ptr, *message);
409  copy_message = MessageUniquePtr(ptr, deleter);
410 
411  subscription->provide_intra_process_message(std::move(copy_message));
412  }
413  }
414  }
415 
416  PublisherToSubscriptionIdsMap pub_to_subs_;
417  SubscriptionMap subscriptions_;
418  PublisherMap publishers_;
419 
420  mutable std::shared_timed_mutex mutex_;
421 };
422 
423 } // namespace experimental
424 } // namespace rclcpp
425 
426 #endif // RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
void do_intra_process_publish(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, std::shared_ptr< typename allocator::AllocRebind< MessageT, Alloc >::allocator_type > allocator)
Publishes an intra-process message, passed as a unique pointer.
Definition: intra_process_manager.hpp:181
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
void provide_intra_process_message(ConstMessageSharedPtr message)
Definition: subscription_intra_process.hpp:116
Logger get_logger(const std::string &name)
Return a named logger.
T get_deleter(T... args)
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher&#39;s unique id.
T end(T... args)
bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
Definition: subscription_intra_process.hpp:45
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
T next(T... args)
This class performs intra process communication between nodes.
Definition: intra_process_manager.hpp:91
T static_pointer_cast(T... args)
uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
Register a publisher with the manager, returns the publisher unique id.
T move(T... args)
T insert(T... args)
T find(T... args)
uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr get_subscription_intra_process(uint64_t intra_process_subscription_id)
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
Set the data type used in the intra-process buffer as std::shared_ptr<MessageT>
T begin(T... args)
std::shared_ptr< const MessageT > do_intra_process_publish_and_return_shared(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, std::shared_ptr< typename allocator::AllocRebind< MessageT, Alloc >::allocator_type > allocator)
Definition: intra_process_manager.hpp:244
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
#define RCLCPP_WARN(logger,...)
Definition: logging.hpp:976
void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription&#39;s unique id.
size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions that are matched with a given publisher id...