rclcpp  master
C++ ROS Client Library API
intra_process_manager.hpp
Go to the documentation of this file.
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__INTRA_PROCESS_MANAGER_HPP_
17 
18 #include <rmw/types.h>
19 
20 #include <algorithm>
21 #include <atomic>
22 #include <cstdint>
23 #include <exception>
24 #include <map>
25 #include <memory>
26 #include <mutex>
27 #include <unordered_map>
28 #include <set>
29 
33 #include "rclcpp/macros.hpp"
34 #include "rclcpp/publisher.hpp"
35 #include "rclcpp/subscription.hpp"
37 
38 namespace rclcpp
39 {
40 namespace intra_process_manager
41 {
42 
44 
123 {
124 private:
126 
127 public:
129 
// Construct a manager from an implementation object (`state`).
// When the argument is omitted, the result of create_default_impl() is used.
// `explicit` prevents implicit conversion from the implementation pointer.
131  explicit IntraProcessManager(
132  IntraProcessManagerImplBase::SharedPtr state = create_default_impl());
133 
// Virtual destructor: allows derived types to be destroyed through an
// IntraProcessManager pointer.
135  virtual ~IntraProcessManager();
136 
138 
// Register a subscription with the manager.
// Returns the subscription's unique id (the value later passed to
// remove_subscription()).
151  uint64_t
152  add_subscription(SubscriptionBase::SharedPtr subscription);
153 
155 
// Unregister a subscription using the subscription's unique id.
161  void
162  remove_subscription(uint64_t intra_process_subscription_id);
163 
165 
// add_publisher: register a publisher with the manager and return the
// publisher's unique id.
//
// Parameters:
//   publisher   - shared pointer to the publisher being registered.
//   buffer_size - requested buffer size; when 0 (the default) the publisher's
//                 own queue size (publisher->get_queue_size()) is used instead.
//
// NOTE: this listing omits source lines 189, 195 and 197 (the function-name
// line and parts of the expression that constructs `mrb`).
187  template<typename MessageT, typename Alloc>
188  uint64_t
190  typename Publisher<MessageT, Alloc>::SharedPtr publisher,
191  size_t buffer_size = 0)
192  {
// Obtain a fresh id for this publisher.
193  auto id = IntraProcessManager::get_next_unique_id();
// Fall back to the publisher's queue size when no explicit size was given.
194  size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size();
// `mrb` is created via make_shared(size, publisher->get_allocator());
// presumably a MappedRingBuffer of MessageT - its declaration is on lines
// missing from this listing.
196  MessageT,
198  >::make_shared(size, publisher->get_allocator());
// Hand the id, publisher, buffer and size over to the implementation object.
199  impl_->add_publisher(id, publisher, mrb, size);
200  return id;
201  }
202 
204 
// Unregister a publisher using the publisher's unique id.
210  void
211  remove_publisher(uint64_t intra_process_publisher_id);
212 
214 
// store_intra_process_message: store a message in the manager and return the
// message sequence number.
//
// Parameters (the second one is on a line missing from this listing):
//   intra_process_publisher_id - id of the publisher the message belongs to.
//   message                    - std::unique_ptr<MessageT, Deleter>& holding
//                                the message to store.
//
// Throws std::runtime_error when the typed buffer pointer is null.
//
// NOTE: this listing omits source lines 246, 248, 250 and 253 (the Deleter
// template parameter, the function-name line, the `message` parameter and the
// `TypedMRB` alias).
244  template<
245  typename MessageT, typename Alloc = std::allocator<void>,
247  uint64_t
249  uint64_t intra_process_publisher_id,
251  {
252  using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
254  uint64_t message_seq = 0;
// Look up the publisher's buffer. `message_seq` is passed along and then used
// as the key below; presumably the implementation assigns it through a
// reference parameter - confirm against IntraProcessManagerImplBase.
255  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
256  intra_process_publisher_id, message_seq);
// NOTE(review): std::static_pointer_cast performs no runtime type check, so
// `typed_buffer` is null only when `buffer` itself is null. The check below
// therefore cannot detect a wrong message type, despite the exception text.
257  typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
258  if (!typed_buffer) {
259  throw std::runtime_error("Typecast failed due to incorrect message type");
260  }
261 
262  // Insert the message into the ring buffer using the message_seq to identify it.
263  bool did_replace = typed_buffer->push_and_replace(message_seq, message);
264  // TODO(wjwwood): do something when a message was displaced. log debug?
265  (void)did_replace; // Avoid unused variable warning.
266 
// Tell the implementation object which publisher/sequence pair was stored.
267  impl_->store_intra_process_message(intra_process_publisher_id, message_seq);
268 
269  // Return the message sequence which is sent to the subscription.
270  return message_seq;
271  }
272 
274 
// take_intra_process_message: take an intra process message.
//
// Parameters (the last one is on a line missing from this listing):
//   intra_process_publisher_id                - id of the publisher that stored
//                                               the message.
//   message_sequence_number                   - key of the message in the
//                                               publisher's buffer.
//   requesting_subscriptions_intra_process_id - id of the subscription asking
//                                               for the message.
//   message                                   - std::unique_ptr<MessageT, Deleter>&
//                                               output; left as nullptr when no
//                                               buffer is returned by the
//                                               implementation.
//
// NOTE: this listing omits source lines 312, 316 and 319 (the function-name
// line, the `message` parameter and the `TypedMRB` alias).
308  template<
309  typename MessageT, typename Alloc = std::allocator<void>,
310  typename Deleter = std::default_delete<MessageT>>
311  void
313  uint64_t intra_process_publisher_id,
314  uint64_t message_sequence_number,
315  uint64_t requesting_subscriptions_intra_process_id,
317  {
318  using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
// Reset the output first so the early return below leaves it null.
320  message = nullptr;
321 
322  size_t target_subs_size = 0;
// take_mutex_ is held from here until the function returns.
323  std::lock_guard<std::mutex> lock(take_mutex_);
// `target_subs_size` is passed to the implementation and read afterwards;
// presumably it is filled in with the number of subscriptions still waiting
// for this message - confirm against IntraProcessManagerImplBase.
324  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
325  intra_process_publisher_id,
326  message_sequence_number,
327  requesting_subscriptions_intra_process_id,
328  target_subs_size
329  );
// static_pointer_cast does no runtime check: the result is null only when
// `buffer` is null, in which case nothing is taken.
330  typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
331  if (!typed_buffer) {
332  return;
333  }
334  // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
335  if (target_subs_size) {
336  // There are more subscriptions to serve, return a copy.
337  typed_buffer->get_copy_at_key(message_sequence_number, message);
338  } else {
339  // This is the last one to be returned, transfer ownership.
340  typed_buffer->pop_at_key(message_sequence_number, message);
341  }
342  }
343 
// Return true if the given rmw_gid_t matches any stored Publishers.
346  bool
347  matches_any_publishers(const rmw_gid_t * id) const;
348 
349 private:
// Return the next id to hand out; add_publisher() calls this to obtain the id
// it assigns to a newly registered publisher.
351  static uint64_t
352  get_next_unique_id();
353 
// Implementation object that the template methods above forward their
// bookkeeping calls to.
354  IntraProcessManagerImplBase::SharedPtr impl_;
// Locked for the duration of take_intra_process_message().
355  std::mutex take_mutex_;
356 };
357 
358 } // namespace intra_process_manager
359 } // namespace rclcpp
360 
361 #endif // RCLCPP__INTRA_PROCESS_MANAGER_HPP_
std::shared_ptr< MessageAlloc > get_allocator() const
Definition: publisher.hpp:308
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: publisher.hpp:159
void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
Definition: allocator_common.hpp:24
Ring buffer container of unique_ptr's of T, which can be accessed by a key.
Definition: mapped_ring_buffer.hpp:59
uint64_t add_publisher(typename Publisher< MessageT, Alloc >::SharedPtr publisher, size_t buffer_size=0)
Register a publisher with the manager, returns the publisher unique id.
Definition: intra_process_manager.hpp:189
uint64_t store_intra_process_message(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > &message)
Store a message in the manager, and return the message sequence number.
Definition: intra_process_manager.hpp:248
void take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, std::unique_ptr< MessageT, Deleter > &message)
Take an intra process message.
Definition: intra_process_manager.hpp:312
size_t get_queue_size() const
Get the queue size for this publisher.
IntraProcessManager(IntraProcessManagerImplBase::SharedPtr state=create_default_impl())
void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
A publisher publishes messages of any type to a topic.
Definition: publisher.hpp:155
T static_pointer_cast(T... args)
This class facilitates intra process communication between nodes.
Definition: intra_process_manager.hpp:122
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
uint64_t add_subscription(SubscriptionBase::SharedPtr subscription)
Register a subscription with the manager, returns the subscription's unique id.
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
IntraProcessManagerImplBase::SharedPtr create_default_impl()