rclcpp  master
C++ ROS Client Library API
intra_process_manager.hpp
Go to the documentation of this file.
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__INTRA_PROCESS_MANAGER_HPP_
17 
18 #include <rmw/types.h>
19 
20 #include <algorithm>
21 #include <atomic>
22 #include <cstdint>
23 #include <exception>
24 #include <map>
25 #include <memory>
26 #include <mutex>
27 #include <unordered_map>
28 #include <utility>
29 #include <set>
30 
34 #include "rclcpp/macros.hpp"
38 
39 namespace rclcpp
40 {
41 namespace intra_process_manager
42 {
43 
45 
124 {
125 private:
127 
128 public:
130 
// Construct the manager around an implementation object. When the caller
// passes nothing, the default argument create_default_impl() supplies one.
// `explicit` prevents implicit conversion from an impl SharedPtr.
132  explicit IntraProcessManager(
133  IntraProcessManagerImplBase::SharedPtr state = create_default_impl());
134 
// Virtual destructor; only the declaration appears in this listing.
136  virtual ~IntraProcessManager();
137 
139 
// Register a subscription with the manager.
// Returns the uint64_t id assigned to the subscription; this is the value
// remove_subscription() expects.
152  uint64_t
153  add_subscription(SubscriptionBase::SharedPtr subscription);
154 
156 
// Unregister a subscription, identified by its intra-process subscription id.
162  void
163  remove_subscription(uint64_t intra_process_subscription_id);
164 
166 
// Register a publisher with the manager and return the publisher's id
// (declaration of add_publisher; the line carrying the function name is
// absent from this listing).
// buffer_size defaults to 0 — NOTE(review): the meaning of 0 is not visible
// here; confirm against the implementation.
189  uint64_t
191  rclcpp::PublisherBase::SharedPtr publisher,
192  size_t buffer_size = 0);
193 
195 
// Unregister a publisher, identified by its intra-process publisher id.
201  void
202  remove_publisher(uint64_t intra_process_publisher_id);
203 
205 
// Store a message in the manager and return the message sequence number.
// This is the store_intra_process_message overload that takes
// `std::shared_ptr<const MessageT> message` (the listing omits the lines that
// carry the function name, the message parameter and the TypedMRB alias).
//
// Throws std::runtime_error when the buffer obtained for the publisher casts
// to a null pointer.
235  template<
236  typename MessageT, typename Alloc = std::allocator<void>>
237  uint64_t
239  uint64_t intra_process_publisher_id,
241  {
242  using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
244  uint64_t message_seq = 0;
// Ask the impl for this publisher's ring buffer. message_seq is handed to the
// same call and afterwards used as the key of the new message — presumably
// the impl fills it in; confirm against get_publisher_info_for_id.
245  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
246  intra_process_publisher_id, message_seq);
// NOTE(review): std::static_pointer_cast performs no runtime type check, so
// the guard below can only trip on a null buffer, not on a mismatched
// MessageT, despite the wording of the error message.
247  typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
248  if (!typed_buffer) {
249  throw std::runtime_error("Typecast failed due to incorrect message type");
250  }
251 
252  // Insert the message into the ring buffer using the message_seq to identify it.
253  bool did_replace = typed_buffer->push_and_replace(message_seq, message);
254  // TODO(wjwwood): do something when a message was displaced. log debug?
255  (void)did_replace; // Avoid unused variable warning.
256 
// Tell the impl which (publisher id, sequence number) pair was just stored.
257  impl_->store_intra_process_message(intra_process_publisher_id, message_seq);
258 
259  // Return the message sequence which is sent to the subscription.
260  return message_seq;
261  }
262 
// Store a message in the manager and return the message sequence number.
// This is the store_intra_process_message overload that takes
// `std::unique_ptr<MessageT, Deleter> message`; the message is moved into the
// ring buffer (the listing omits the lines that carry the Deleter template
// parameter, the function name, the message parameter and the TypedMRB alias).
//
// Throws std::runtime_error when the buffer obtained for the publisher casts
// to a null pointer.
263  template<
264  typename MessageT, typename Alloc = std::allocator<void>,
266  uint64_t
268  uint64_t intra_process_publisher_id,
270  {
271  using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
273  uint64_t message_seq = 0;
// Ask the impl for this publisher's ring buffer. message_seq is handed to the
// same call and afterwards used as the key of the new message — presumably
// the impl fills it in; confirm against get_publisher_info_for_id.
274  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
275  intra_process_publisher_id, message_seq);
// NOTE(review): std::static_pointer_cast performs no runtime type check, so
// the guard below can only trip on a null buffer, not on a mismatched
// MessageT, despite the wording of the error message.
276  typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
277  if (!typed_buffer) {
278  throw std::runtime_error("Typecast failed due to incorrect message type");
279  }
280 
281  // Insert the message into the ring buffer using the message_seq to identify it.
// std::move hands ownership of the unique_ptr to push_and_replace; `message`
// must not be used after this line.
282  bool did_replace = typed_buffer->push_and_replace(message_seq, std::move(message));
283  // TODO(wjwwood): do something when a message was displaced. log debug?
284  (void)did_replace; // Avoid unused variable warning.
285 
// Tell the impl which (publisher id, sequence number) pair was just stored.
286  impl_->store_intra_process_message(intra_process_publisher_id, message_seq);
287 
288  // Return the message sequence which is sent to the subscription.
289  return message_seq;
290  }
291 
293 
// Take an intra process message.
// This is the take_intra_process_message overload whose output parameter is
// `std::unique_ptr<MessageT, Deleter> & message` (the listing omits the lines
// that carry the function name, the message parameter and the TypedMRB alias).
//
// `message` is reset to nullptr first and is left null when the impl returns
// no buffer. Otherwise it is filled from the ring buffer entry identified by
// message_sequence_number.
327  template<
328  typename MessageT, typename Alloc = std::allocator<void>,
329  typename Deleter = std::default_delete<MessageT>>
330  void
332  uint64_t intra_process_publisher_id,
333  uint64_t message_sequence_number,
334  uint64_t requesting_subscriptions_intra_process_id,
336  {
337  using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
339  message = nullptr;
340 
341  size_t target_subs_size = 0;
// take_mutex_ is held until the function returns, so the impl lookup and the
// get/pop on the buffer below happen under one lock.
342  std::lock_guard<std::mutex> lock(take_mutex_);
// target_subs_size is handed to the impl and read back below — presumably an
// out-parameter for the number of subscriptions still waiting; confirm.
343  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
344  intra_process_publisher_id,
345  message_sequence_number,
346  requesting_subscriptions_intra_process_id,
347  target_subs_size
348  );
// static_pointer_cast of a null buffer yields null, so this is the
// "nothing to take" exit; `message` stays nullptr.
349  typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
350  if (!typed_buffer) {
351  return;
352  }
353  // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
354  if (target_subs_size) {
355  // There are more subscriptions to serve, return a copy.
356  typed_buffer->get(message_sequence_number, message);
357  } else {
358  // This is the last one to be returned, transfer ownership.
359  typed_buffer->pop(message_sequence_number, message);
360  }
361  }
362 
// Take an intra process message.
// This is the take_intra_process_message overload whose output parameter is
// `std::shared_ptr<const MessageT> & message` (the listing omits the lines
// that carry the function name, the message parameter and the TypedMRB alias).
//
// `message` is reset to nullptr first and is left null when the impl returns
// no buffer. Otherwise it is filled from the ring buffer entry identified by
// message_sequence_number.
363  template<
364  typename MessageT, typename Alloc = std::allocator<void>>
365  void
367  uint64_t intra_process_publisher_id,
368  uint64_t message_sequence_number,
369  uint64_t requesting_subscriptions_intra_process_id,
371  {
372  using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
374  message = nullptr;
375 
376  size_t target_subs_size = 0;
// take_mutex_ is held until the function returns, so the impl lookup and the
// get/pop on the buffer below happen under one lock.
377  std::lock_guard<std::mutex> lock(take_mutex_);
// target_subs_size is handed to the impl and read back below — presumably an
// out-parameter for the number of subscriptions still waiting; confirm.
378  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
379  intra_process_publisher_id,
380  message_sequence_number,
381  requesting_subscriptions_intra_process_id,
382  target_subs_size
383  );
// static_pointer_cast of a null buffer yields null, so this is the
// "nothing to take" exit; `message` stays nullptr.
384  typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
385  if (!typed_buffer) {
386  return;
387  }
388  // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
389  if (target_subs_size) {
390  // There are more subscriptions to serve, return a copy.
391  typed_buffer->get(message_sequence_number, message);
392  } else {
393  // This is the last one to be returned, transfer ownership.
394  typed_buffer->pop(message_sequence_number, message);
395  }
396  }
397 
// Return true if the given rmw_gid_t matches any stored publisher.
400  bool
401  matches_any_publishers(const rmw_gid_t * id) const;
402 
// Return the number of intra-process subscriptions to a topic, given the
// id of a publisher on that topic.
405  size_t
406  get_subscription_count(uint64_t intra_process_publisher_id) const;
407 
408 private:
// Private static helper returning a uint64_t id. NOTE(review): presumably the
// source of the ids handed out by add_publisher/add_subscription — its
// definition is not in this listing; confirm.
410  static uint64_t
411  get_next_unique_id();
412 
// Implementation object that the store/take templates call for publisher
// lookup and message bookkeeping.
413  IntraProcessManagerImplBase::SharedPtr impl_;
// Locked for the whole body of both take_intra_process_message overloads.
414  std::mutex take_mutex_;
415 };
416 
417 } // namespace intra_process_manager
418 } // namespace rclcpp
419 
420 #endif // RCLCPP__INTRA_PROCESS_MANAGER_HPP_
uint64_t store_intra_process_message(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message)
Definition: intra_process_manager.hpp:267
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
This header provides the get_node_topics_interface() template function.
Definition: allocator_common.hpp:24
void take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, std::shared_ptr< const MessageT > &message)
Definition: intra_process_manager.hpp:366
size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intraprocess subscriptions to a topic, given the publisher id.
Ring buffer container of shared_ptr's or unique_ptr's of T, which can be accessed by a key...
Definition: mapped_ring_buffer.hpp:60
uint64_t store_intra_process_message(uint64_t intra_process_publisher_id, std::shared_ptr< const MessageT > message)
Store a message in the manager, and return the message sequence number.
Definition: intra_process_manager.hpp:238
void take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, std::unique_ptr< MessageT, Deleter > &message)
Take an intra process message.
Definition: intra_process_manager.hpp:331
IntraProcessManager(IntraProcessManagerImplBase::SharedPtr state=create_default_impl())
void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
T static_pointer_cast(T... args)
T move(T... args)
This class facilitates intra process communication between nodes.
Definition: intra_process_manager.hpp:123
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
uint64_t add_subscription(SubscriptionBase::SharedPtr subscription)
Register a subscription with the manager; returns the subscription's unique id.
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
IntraProcessManagerImplBase::SharedPtr create_default_impl()
uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher, size_t buffer_size=0)
Register a publisher with the manager, returns the publisher unique id.