rclcpp  master
C++ ROS Client Library API
intra_process_manager.hpp
Go to the documentation of this file.
1 // Copyright 2019 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
17 
18 #include <rmw/types.h>
19 
20 #include <shared_mutex>
21 
22 #include <algorithm>
23 #include <atomic>
24 #include <cstdint>
25 #include <exception>
26 #include <map>
27 #include <memory>
28 #include <string>
29 #include <unordered_map>
30 #include <utility>
31 #include <vector>
32 
36 #include "rclcpp/logger.hpp"
37 #include "rclcpp/logging.hpp"
38 #include "rclcpp/macros.hpp"
41 
42 namespace rclcpp
43 {
44 
45 namespace experimental
46 {
47 
49 
92 {
93 private:
95 
96 public:
98 
101 
103  virtual ~IntraProcessManager();
104 
106 
116  uint64_t
117  add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
118 
120 
126  void
127  remove_subscription(uint64_t intra_process_subscription_id);
128 
130 
140  uint64_t
141  add_publisher(rclcpp::PublisherBase::SharedPtr publisher);
142 
144 
150  void
151  remove_publisher(uint64_t intra_process_publisher_id);
152 
154 
176  template<
177  typename MessageT,
178  typename Alloc = std::allocator<void>,
180  void
182  uint64_t intra_process_publisher_id,
185  {
186  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
187  using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
188 
190 
191  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
192  if (publisher_it == pub_to_subs_.end()) {
193  // Publisher is either invalid or no longer exists.
194  RCLCPP_WARN(
195  rclcpp::get_logger("rclcpp"),
196  "Calling do_intra_process_publish for invalid or no longer existing publisher id");
197  return;
198  }
199  const auto & sub_ids = publisher_it->second;
200 
201  if (sub_ids.take_ownership_subscriptions.empty()) {
202  // None of the buffers require ownership, so we promote the pointer
203  std::shared_ptr<MessageT> msg = std::move(message);
204 
205  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
206  msg, sub_ids.take_shared_subscriptions);
207  } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
208  sub_ids.take_shared_subscriptions.size() <= 1)
209  {
210  // There is at maximum 1 buffer that does not require ownership.
211  // So this case is equivalent to all the buffers requiring ownership
212 
213  // Merge the two vector of ids into a unique one
214  std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
215  concatenated_vector.insert(
216  concatenated_vector.end(),
217  sub_ids.take_ownership_subscriptions.begin(),
218  sub_ids.take_ownership_subscriptions.end());
219 
220  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
221  std::move(message),
222  concatenated_vector,
223  allocator);
224  } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
225  sub_ids.take_shared_subscriptions.size() > 1)
226  {
227  // Construct a new shared pointer from the message
228  // for the buffers that do not require ownership
229  auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
230 
231  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
232  shared_msg, sub_ids.take_shared_subscriptions);
233  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
234  std::move(message), sub_ids.take_ownership_subscriptions, allocator);
235  }
236  }
237 
238  template<
239  typename MessageT,
240  typename Alloc = std::allocator<void>,
244  uint64_t intra_process_publisher_id,
247  {
248  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
249  using MessageAllocatorT = typename MessageAllocTraits::allocator_type;
250 
252 
253  auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
254  if (publisher_it == pub_to_subs_.end()) {
255  // Publisher is either invalid or no longer exists.
256  RCLCPP_WARN(
257  rclcpp::get_logger("rclcpp"),
258  "Calling do_intra_process_publish for invalid or no longer existing publisher id");
259  return nullptr;
260  }
261  const auto & sub_ids = publisher_it->second;
262 
263  if (sub_ids.take_ownership_subscriptions.empty()) {
264  // If there are no owning, just convert to shared.
265  std::shared_ptr<MessageT> shared_msg = std::move(message);
266  if (!sub_ids.take_shared_subscriptions.empty()) {
267  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
268  shared_msg, sub_ids.take_shared_subscriptions);
269  }
270  return shared_msg;
271  } else {
272  // Construct a new shared pointer from the message for the buffers that
273  // do not require ownership and to return.
274  auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
275 
276  if (!sub_ids.take_shared_subscriptions.empty()) {
277  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
278  shared_msg,
279  sub_ids.take_shared_subscriptions);
280  }
281  if (!sub_ids.take_ownership_subscriptions.empty()) {
282  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
283  std::move(message),
284  sub_ids.take_ownership_subscriptions,
285  allocator);
286  }
287 
288  return shared_msg;
289  }
290  }
291 
294  bool
295  matches_any_publishers(const rmw_gid_t * id) const;
296 
299  size_t
300  get_subscription_count(uint64_t intra_process_publisher_id) const;
301 
303  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
304  get_subscription_intra_process(uint64_t intra_process_subscription_id);
305 
306 private:
307  struct SubscriptionInfo
308  {
309  SubscriptionInfo() = default;
310 
311  rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr subscription;
312  rmw_qos_profile_t qos;
313  const char * topic_name;
314  bool use_take_shared_method;
315  };
316 
317  struct PublisherInfo
318  {
319  PublisherInfo() = default;
320 
321  rclcpp::PublisherBase::WeakPtr publisher;
322  rmw_qos_profile_t qos;
323  const char * topic_name;
324  };
325 
326  struct SplittedSubscriptions
327  {
328  std::vector<uint64_t> take_shared_subscriptions;
329  std::vector<uint64_t> take_ownership_subscriptions;
330  };
331 
332  using SubscriptionMap =
334 
335  using PublisherMap =
337 
338  using PublisherToSubscriptionIdsMap =
340 
342  static
343  uint64_t
344  get_next_unique_id();
345 
347  void
348  insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool use_take_shared_method);
349 
351  bool
352  can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info) const;
353 
354  template<
355  typename MessageT,
356  typename Alloc,
357  typename Deleter>
358  void
359  add_shared_msg_to_buffers(
361  std::vector<uint64_t> subscription_ids)
362  {
363  for (auto id : subscription_ids) {
364  auto subscription_it = subscriptions_.find(id);
365  if (subscription_it == subscriptions_.end()) {
366  throw std::runtime_error("subscription has unexpectedly gone out of scope");
367  }
368  auto subscription_base = subscription_it->second.subscription.lock();
369  if (subscription_base) {
370  auto subscription = std::dynamic_pointer_cast<
372  >(subscription_base);
373  if (nullptr == subscription) {
374  throw std::runtime_error(
375  "failed to dynamic cast SubscriptionIntraProcessBase to "
376  "SubscriptionIntraProcess<MessageT, Alloc, Deleter>, which "
377  "can happen when the publisher and subscription use different "
378  "allocator types, which is not supported");
379  }
380 
381  subscription->provide_intra_process_message(message);
382  } else {
383  subscriptions_.erase(id);
384  }
385  }
386  }
387 
388  template<
389  typename MessageT,
390  typename Alloc = std::allocator<void>,
392  void
393  add_owned_msg_to_buffers(
395  std::vector<uint64_t> subscription_ids,
396  std::shared_ptr<typename allocator::AllocRebind<MessageT, Alloc>::allocator_type> allocator)
397  {
398  using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
399  using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
400 
401  for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
402  auto subscription_it = subscriptions_.find(*it);
403  if (subscription_it == subscriptions_.end()) {
404  throw std::runtime_error("subscription has unexpectedly gone out of scope");
405  }
406  auto subscription_base = subscription_it->second.subscription.lock();
407  if (subscription_base) {
408  auto subscription = std::dynamic_pointer_cast<
410  >(subscription_base);
411  if (nullptr == subscription) {
412  throw std::runtime_error(
413  "failed to dynamic cast SubscriptionIntraProcessBase to "
414  "SubscriptionIntraProcess<MessageT, Alloc, Deleter>, which "
415  "can happen when the publisher and subscription use different "
416  "allocator types, which is not supported");
417  }
418 
419  if (std::next(it) == subscription_ids.end()) {
420  // If this is the last subscription, give up ownership
421  subscription->provide_intra_process_message(std::move(message));
422  } else {
423  // Copy the message since we have additional subscriptions to serve
424  MessageUniquePtr copy_message;
425  Deleter deleter = message.get_deleter();
426  auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
427  MessageAllocTraits::construct(*allocator.get(), ptr, *message);
428  copy_message = MessageUniquePtr(ptr, deleter);
429 
430  subscription->provide_intra_process_message(std::move(copy_message));
431  }
432  } else {
433  subscriptions_.erase(subscription_it);
434  }
435  }
436  }
437 
438  PublisherToSubscriptionIdsMap pub_to_subs_;
439  SubscriptionMap subscriptions_;
440  PublisherMap publishers_;
441 
442  mutable std::shared_timed_mutex mutex_;
443 };
444 
445 } // namespace experimental
446 } // namespace rclcpp
447 
448 #endif // RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
RCLCPP_WARN
#define RCLCPP_WARN(logger,...)
Definition: logging.hpp:963
std::unique_ptr::get_deleter
T get_deleter(T... args)
std::shared_ptr
rclcpp::experimental::IntraProcessManager::do_intra_process_publish_and_return_shared
std::shared_ptr< const MessageT > do_intra_process_publish_and_return_shared(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, std::shared_ptr< typename allocator::AllocRebind< MessageT, Alloc >::allocator_type > allocator)
Definition: intra_process_manager.hpp:243
std::move
T move(T... args)
RCLCPP_DISABLE_COPY
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
std::vector< uint64_t >
std::unordered_map::find
T find(T... args)
rclcpp::experimental::IntraProcessManager::remove_subscription
void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
rclcpp::get_logger
Logger get_logger(const std::string &name)
Return a named logger.
rclcpp::experimental::IntraProcessManager::get_subscription_count
size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions that are matched with a given publisher id.
rclcpp::experimental::IntraProcessManager::IntraProcessManager
IntraProcessManager()
std::shared_timed_mutex
rclcpp::experimental::IntraProcessManager::add_publisher
uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
Register a publisher with the manager, returns the publisher unique id.
rclcpp
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
subscription_intra_process_base.hpp
rclcpp::experimental::IntraProcessManager::do_intra_process_publish
void do_intra_process_publish(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, std::shared_ptr< typename allocator::AllocRebind< MessageT, Alloc >::allocator_type > allocator)
Publishes an intra-process message, passed as a unique pointer.
Definition: intra_process_manager.hpp:181
RCLCPP_PUBLIC
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
rclcpp::experimental::IntraProcessManager::add_subscription
uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
allocator_deleter.hpp
RCLCPP_SMART_PTR_DEFINITIONS
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
macros.hpp
logging.hpp
rclcpp::allocator::AllocRebind
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
std::dynamic_pointer_cast
T dynamic_pointer_cast(T... args)
std::unordered_map::erase
T erase(T... args)
rclcpp::experimental::SubscriptionIntraProcess
Definition: subscription_intra_process.hpp:47
std::runtime_error
rclcpp::experimental::IntraProcessManager::matches_any_publishers
bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
publisher_base.hpp
rclcpp::experimental::IntraProcessManager::~IntraProcessManager
virtual ~IntraProcessManager()
rmw_gid_t
std::vector::begin
T begin(T... args)
visibility_control.hpp
rclcpp::allocator::Deleter
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
std::vector::insert
T insert(T... args)
logger.hpp
rmw_qos_profile_t
std::allocator
std::unordered_map::end
T end(T... args)
subscription_intra_process.hpp
rclcpp::experimental::IntraProcessManager
This class performs intra process communication between nodes.
Definition: intra_process_manager.hpp:91
types.h
std::unique_ptr
std::unordered_map< uint64_t, SubscriptionInfo >
std::default_delete
rclcpp::experimental::IntraProcessManager::remove_publisher
void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
std::shared_lock
rclcpp::experimental::IntraProcessManager::get_subscription_intra_process
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr get_subscription_intra_process(uint64_t intra_process_subscription_id)
std::next
T next(T... args)