rclcpp  master
C++ ROS Client Library API
intra_process_manager.hpp
Go to the documentation of this file.
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
17 
18 #include <rmw/types.h>
19 
20 #include <shared_mutex>
21 
22 #include <algorithm>
23 #include <atomic>
24 #include <cstdint>
25 #include <exception>
26 #include <map>
27 #include <memory>
28 #include <string>
29 #include <unordered_map>
30 #include <utility>
31 #include <vector>
32 
36 #include "rclcpp/logger.hpp"
37 #include "rclcpp/logging.hpp"
38 #include "rclcpp/macros.hpp"
41 
42 namespace rclcpp
43 {
44 
45 namespace experimental
46 {
47 
49 
92 {
93 private:
95 
96 public:
98 
101 
103  virtual ~IntraProcessManager();
104 
106 
116  uint64_t
117  add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
118 
120 
126  void
127  remove_subscription(uint64_t intra_process_subscription_id);
128 
130 
140  uint64_t
141  add_publisher(rclcpp::PublisherBase::SharedPtr publisher);
142 
144 
150  void
151  remove_publisher(uint64_t intra_process_publisher_id);
152 
154 
176  template<
177  typename MessageT,
178  typename Alloc = std::allocator<void>,
180  void
182  uint64_t intra_process_publisher_id,
185  {
186  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
187  using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
188 
190 
191  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
192  if (publisher_it == pub_to_subs_.end()) {
193  // Publisher is either invalid or no longer exists.
194  RCLCPP_WARN(
195  rclcpp::get_logger("rclcpp"),
196  "Calling do_intra_process_publish for invalid or no longer existing publisher id");
197  return;
198  }
199  const auto & sub_ids = publisher_it->second;
200 
201  if (sub_ids.take_ownership_subscriptions.empty()) {
202  // None of the buffers require ownership, so we promote the pointer
203  std::shared_ptr<MessageT> msg = std::move(message);
204 
205  this->template add_shared_msg_to_buffers<MessageT>(msg, sub_ids.take_shared_subscriptions);
206  } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
207  sub_ids.take_shared_subscriptions.size() <= 1)
208  {
209  // There is at maximum 1 buffer that does not require ownership.
210  // So we this case is equivalent to all the buffers requiring ownership
211 
212  // Merge the two vector of ids into a unique one
213  std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
214  concatenated_vector.insert(
215  concatenated_vector.end(),
216  sub_ids.take_ownership_subscriptions.begin(),
217  sub_ids.take_ownership_subscriptions.end());
218 
219  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
220  std::move(message),
221  concatenated_vector,
222  allocator);
223  } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
224  sub_ids.take_shared_subscriptions.size() > 1)
225  {
226  // Construct a new shared pointer from the message
227  // for the buffers that do not require ownership
228  auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
229 
230  this->template add_shared_msg_to_buffers<MessageT>(
231  shared_msg, sub_ids.take_shared_subscriptions);
232  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
233  std::move(message), sub_ids.take_ownership_subscriptions, allocator);
234  }
235  }
236 
237  template<
238  typename MessageT,
239  typename Alloc = std::allocator<void>,
243  uint64_t intra_process_publisher_id,
246  {
247  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
248  using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
249 
251 
252  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
253  if (publisher_it == pub_to_subs_.end()) {
254  // Publisher is either invalid or no longer exists.
255  RCLCPP_WARN(
256  rclcpp::get_logger("rclcpp"),
257  "Calling do_intra_process_publish for invalid or no longer existing publisher id");
258  return nullptr;
259  }
260  const auto & sub_ids = publisher_it->second;
261 
262  if (sub_ids.take_ownership_subscriptions.empty()) {
263  // If there are no owning, just convert to shared.
264  std::shared_ptr<MessageT> shared_msg = std::move(message);
265  if (!sub_ids.take_shared_subscriptions.empty()) {
266  this->template add_shared_msg_to_buffers<MessageT>(
267  shared_msg, sub_ids.take_shared_subscriptions);
268  }
269  return shared_msg;
270  } else {
271  // Construct a new shared pointer from the message for the buffers that
272  // do not require ownership and to return.
273  auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
274 
275  if (!sub_ids.take_shared_subscriptions.empty()) {
276  this->template add_shared_msg_to_buffers<MessageT>(
277  shared_msg,
278  sub_ids.take_shared_subscriptions);
279  }
280  if (!sub_ids.take_ownership_subscriptions.empty()) {
281  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
282  std::move(message),
283  sub_ids.take_ownership_subscriptions,
284  allocator);
285  }
286 
287  return shared_msg;
288  }
289  }
290 
293  bool
294  matches_any_publishers(const rmw_gid_t * id) const;
295 
298  size_t
299  get_subscription_count(uint64_t intra_process_publisher_id) const;
300 
302  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
303  get_subscription_intra_process(uint64_t intra_process_subscription_id);
304 
305 private:
306  struct SubscriptionInfo
307  {
308  SubscriptionInfo() = default;
309 
310  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription;
311  rmw_qos_profile_t qos;
312  const char * topic_name;
313  bool use_take_shared_method;
314  };
315 
316  struct PublisherInfo
317  {
318  PublisherInfo() = default;
319 
320  rclcpp::PublisherBase::WeakPtr publisher;
321  rmw_qos_profile_t qos;
322  const char * topic_name;
323  };
324 
325  struct SplittedSubscriptions
326  {
327  std::vector<uint64_t> take_shared_subscriptions;
328  std::vector<uint64_t> take_ownership_subscriptions;
329  };
330 
331  using SubscriptionMap =
333 
334  using PublisherMap =
336 
337  using PublisherToSubscriptionIdsMap =
339 
341  static
342  uint64_t
343  get_next_unique_id();
344 
346  void
347  insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method);
348 
350  bool
351  can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info) const;
352 
353  template<typename MessageT>
354  void
355  add_shared_msg_to_buffers(
357  std::vector<uint64_t> subscription_ids)
358  {
359  for (auto id : subscription_ids) {
360  auto subscription_it = subscriptions_.find(id);
361  if (subscription_it == subscriptions_.end()) {
362  throw std::runtime_error("subscription has unexpectedly gone out of scope");
363  }
364  auto subscription_base = subscription_it->second.subscription;
365 
366  auto subscription = std::static_pointer_cast<
368  >(subscription_base);
369 
370  subscription->provide_intra_process_message(message);
371  }
372  }
373 
374  template<
375  typename MessageT,
376  typename Alloc = std::allocator<void>,
378  void
379  add_owned_msg_to_buffers(
381  std::vector<uint64_t> subscription_ids,
382  std::shared_ptr<typename allocator::AllocRebind<MessageT, Alloc>::allocator_type> allocator)
383  {
384  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
385  using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
386 
387  for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
388  auto subscription_it = subscriptions_.find(*it);
389  if (subscription_it == subscriptions_.end()) {
390  throw std::runtime_error("subscription has unexpectedly gone out of scope");
391  }
392  auto subscription_base = subscription_it->second.subscription;
393 
394  auto subscription = std::static_pointer_cast<
396  >(subscription_base);
397 
398  if (std::next(it) == subscription_ids.end()) {
399  // If this is the last subscription, give up ownership
400  subscription->provide_intra_process_message(std::move(message));
401  } else {
402  // Copy the message since we have additional subscriptions to serve
403  MessageUniquePtr copy_message;
404  Deleter deleter = message.get_deleter();
405  auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
406  MessageAllocTraits::construct(*allocator.get(), ptr, *message);
407  copy_message = MessageUniquePtr(ptr, deleter);
408 
409  subscription->provide_intra_process_message(std::move(copy_message));
410  }
411  }
412  }
413 
414  PublisherToSubscriptionIdsMap pub_to_subs_;
415  SubscriptionMap subscriptions_;
416  PublisherMap publishers_;
417 
418  mutable std::shared_timed_mutex mutex_;
419 };
420 
421 } // namespace experimental
422 } // namespace rclcpp
423 
424 #endif // RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
RCLCPP_WARN
#define RCLCPP_WARN(logger,...)
Definition: logging.hpp:977
std::unique_ptr::get_deleter
T get_deleter(T... args)
rclcpp::experimental::SubscriptionIntraProcess::provide_intra_process_message
void provide_intra_process_message(ConstMessageSharedPtr message)
Definition: subscription_intra_process.hpp:118
std::shared_ptr
rclcpp::experimental::IntraProcessManager::do_intra_process_publish_and_return_shared
std::shared_ptr< const MessageT > do_intra_process_publish_and_return_shared(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, std::shared_ptr< typename allocator::AllocRebind< MessageT, Alloc >::allocator_type > allocator)
Definition: intra_process_manager.hpp:242
std::move
T move(T... args)
RCLCPP_DISABLE_COPY
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
std::vector< uint64_t >
std::unordered_map::find
T find(T... args)
rclcpp::experimental::IntraProcessManager::remove_subscription
void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
rclcpp::get_logger
Logger get_logger(const std::string &name)
Return a named logger.
rclcpp::experimental::IntraProcessManager::get_subscription_count
size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions that are matched with a given publisher id.
rclcpp::experimental::IntraProcessManager::IntraProcessManager
IntraProcessManager()
std::shared_timed_mutex
rclcpp::experimental::IntraProcessManager::add_publisher
uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
Register a publisher with the manager, returns the publisher unique id.
rclcpp
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
subscription_intra_process_base.hpp
rclcpp::experimental::IntraProcessManager::do_intra_process_publish
void do_intra_process_publish(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, std::shared_ptr< typename allocator::AllocRebind< MessageT, Alloc >::allocator_type > allocator)
Publishes an intra-process message, passed as a unique pointer.
Definition: intra_process_manager.hpp:181
RCLCPP_PUBLIC
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
rclcpp::experimental::IntraProcessManager::add_subscription
uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
allocator_deleter.hpp
RCLCPP_SMART_PTR_DEFINITIONS
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
macros.hpp
logging.hpp
rclcpp::allocator::AllocRebind
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
std::static_pointer_cast
T static_pointer_cast(T... args)
rclcpp::experimental::SubscriptionIntraProcess
Definition: subscription_intra_process.hpp:45
std::runtime_error
rclcpp::experimental::IntraProcessManager::matches_any_publishers
bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
publisher_base.hpp
rclcpp::experimental::IntraProcessManager::~IntraProcessManager
virtual ~IntraProcessManager()
rmw_gid_t
std::vector::begin
T begin(T... args)
visibility_control.hpp
rclcpp::allocator::Deleter
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
std::vector::insert
T insert(T... args)
logger.hpp
rmw_qos_profile_t
std::allocator
std::unordered_map::end
T end(T... args)
subscription_intra_process.hpp
rclcpp::experimental::IntraProcessManager
This class performs intra process communication between nodes.
Definition: intra_process_manager.hpp:91
types.h
std::unique_ptr
std::unordered_map< uint64_t, SubscriptionInfo >
std::default_delete
rclcpp::experimental::IntraProcessManager::remove_publisher
void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
std::shared_lock
rclcpp::experimental::IntraProcessManager::get_subscription_intra_process
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr get_subscription_intra_process(uint64_t intra_process_subscription_id)
std::next
T next(T... args)