rclcpp  master
C++ ROS Client Library API
intra_process_manager.hpp
Go to the documentation of this file.
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__INTRA_PROCESS_MANAGER_HPP_
17 
18 #include <rmw/types.h>
19 
20 #include <algorithm>
21 #include <atomic>
22 #include <cstdint>
23 #include <exception>
24 #include <map>
25 #include <memory>
26 #include <mutex>
27 #include <unordered_map>
28 #include <set>
29 
33 #include "rclcpp/macros.hpp"
34 #include "rclcpp/publisher.hpp"
35 #include "rclcpp/subscription.hpp"
37 
38 namespace rclcpp
39 {
40 namespace intra_process_manager
41 {
42 
44 
123 {
124 private:
126 
127 public:
129 
// Construct the manager from an implementation object.
// `state` defaults to the result of create_default_impl(). The constructor body is not
// part of this header; presumably `state` ends up in impl_ — TODO confirm.
131  explicit IntraProcessManager(
132  IntraProcessManagerImplBase::SharedPtr state = create_default_impl());
133 
// Virtual destructor; only declared here, without a body.
135  virtual ~IntraProcessManager();
136 
138 
// Register a subscription with the manager.
// Takes a shared pointer to the subscription's base class and returns the uint64_t id
// under which the subscription is known to the manager (the same kind of id that
// remove_subscription() accepts).
151  uint64_t
152  add_subscription(subscription::SubscriptionBase::SharedPtr subscription);
153 
155 
// Unregister a subscription, identified by its intra process subscription id.
161  void
162  remove_subscription(uint64_t intra_process_subscription_id);
163 
165 
// Register a publisher with the manager and return the id assigned to it.
//
// NOTE(review): the declarator line (method name and the `publisher` parameter) and the
// line completing the MappedRingBuffer expression are absent from this listing.
//
// buffer_size: number of slots for the publisher's ring buffer; when 0 (the default) the
// publisher's own queue size is used instead.
187  template<typename MessageT, typename Alloc>
188  uint64_t
190  size_t buffer_size = 0)
191  {
// Obtain the id that is registered with the implementation and handed back to the caller.
192  auto id = IntraProcessManager::get_next_unique_id();
// An explicit, positive buffer_size wins; otherwise fall back to the publisher's queue size.
193  size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size();
// Build the ring buffer for MessageT with the chosen size and the publisher's allocator.
194  auto mrb = mapped_ring_buffer::MappedRingBuffer<MessageT,
196  size, publisher->get_allocator());
// Hand id, publisher, buffer and size over to the implementation object.
197  impl_->add_publisher(id, publisher, mrb, size);
198  return id;
199  }
200 
202 
// Unregister a publisher, identified by its intra process publisher id.
208  void
209  remove_publisher(uint64_t intra_process_publisher_id);
210 
212 
// Store a message in the manager, and return the message sequence number.
//
// NOTE(review): the declarator line (method name), the `message` parameter line and the
// line defining TypedMRB are absent from this listing. TypedMRB is presumably an alias
// for the MappedRingBuffer specialization built from MessageT and MRBMessageAlloc —
// TODO confirm against the real header.
//
// intra_process_publisher_id: id of the publisher the message belongs to.
// Returns the sequence number under which the message was inserted.
// Throws std::runtime_error when no typed buffer is available (see note below).
242  template<typename MessageT, typename Alloc = std::allocator<void>,
243  typename Deleter = std::default_delete<MessageT>>
244  uint64_t
246  uint64_t intra_process_publisher_id,
248  {
// Rebind the user supplied allocator type to one that allocates MessageT.
249  using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
// message_seq is passed to get_publisher_info_for_id() and used afterwards as the key
// for the message; presumably the call fills it in — TODO confirm.
251  uint64_t message_seq = 0;
252  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
253  intra_process_publisher_id, message_seq);
// NOTE(review): std::static_pointer_cast performs no runtime type check; the result is
// null only when `buffer` itself is null. The branch below therefore fires when no
// buffer was returned, not on a message type mismatch as the exception text suggests.
254  typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
255  if (!typed_buffer) {
256  throw std::runtime_error("Typecast failed due to incorrect message type");
257  }
258 
259  // Insert the message into the ring buffer using the message_seq to identify it.
260  bool did_replace = typed_buffer->push_and_replace(message_seq, message);
261  // TODO(wjwwood): do something when a message was displaced. log debug?
262  (void)did_replace; // Avoid unused variable warning.
263 
// Let the implementation object record the publisher id / sequence number pair.
264  impl_->store_intra_process_message(intra_process_publisher_id, message_seq);
265 
266  // Return the message sequence which is sent to the subscription.
267  return message_seq;
268  }
269 
271 
// Take an intra process message.
//
// NOTE(review): the declarator line (method name), the `message` parameter line and the
// line defining TypedMRB are absent from this listing. TypedMRB is presumably an alias
// for the MappedRingBuffer specialization built from MessageT and MRBMessageAlloc —
// TODO confirm against the real header.
//
// intra_process_publisher_id: id of the publisher that stored the message.
// message_sequence_number: key of the message inside the publisher's ring buffer.
// requesting_subscriptions_intra_process_id: id of the subscription asking for it.
// message: reset to nullptr first; left as nullptr when no buffer is returned.
305  template<typename MessageT, typename Alloc = std::allocator<void>,
306  typename Deleter = std::default_delete<MessageT>>
307  void
309  uint64_t intra_process_publisher_id,
310  uint64_t message_sequence_number,
311  uint64_t requesting_subscriptions_intra_process_id,
313  {
// Rebind the user supplied allocator type to one that allocates MessageT.
314  using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
// Start from "nothing taken" so every early exit leaves the caller with a null message.
316  message = nullptr;
317 
// target_subs_size is handed to the implementation call below and tested afterwards;
// presumably it is filled in with the number of subscriptions still to be served —
// TODO confirm.
318  size_t target_subs_size = 0;
// take_mutex_ is held from here until the function returns.
319  std::lock_guard<std::mutex> lock(take_mutex_);
320  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
321  intra_process_publisher_id,
322  message_sequence_number,
323  requesting_subscriptions_intra_process_id,
324  target_subs_size
325  );
// NOTE(review): std::static_pointer_cast performs no runtime type check; typed_buffer is
// null only when `buffer` itself is null, in which case the function returns silently.
326  typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
327  if (!typed_buffer) {
328  return;
329  }
330  // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
331  if (target_subs_size) {
332  // There are more subscriptions to serve, return a copy.
333  typed_buffer->get_copy_at_key(message_sequence_number, message);
334  } else {
335  // This is the last one to be returned, transfer ownership.
336  typed_buffer->pop_at_key(message_sequence_number, message);
337  }
338  }
339 
// Return true if the given rmw_gid_t matches any stored publisher.
// Const member function taking a pointer to a const gid.
342  bool
343  matches_any_publishers(const rmw_gid_t * id) const;
344 
345 private:
// Return the next id to hand out; add_publisher() calls this to obtain the id it
// registers and returns. Static and only declared here, without a body.
347  static uint64_t
348  get_next_unique_id();
349 
// Implementation object that the template methods of this class delegate to.
350  IntraProcessManagerImplBase::SharedPtr impl_;
// Locked by take_intra_process_message() until that function returns.
351  std::mutex take_mutex_;
352 };
353 
354 } // namespace intra_process_manager
355 } // namespace rclcpp
356 
357 #endif // RCLCPP__INTRA_PROCESS_MANAGER_HPP_
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
Definition: allocator_common.hpp:24
Ring buffer container of unique_ptr's of T, which can be accessed by a key.
Definition: mapped_ring_buffer.hpp:59
uint64_t add_subscription(subscription::SubscriptionBase::SharedPtr subscription)
Register a subscription with the manager, returns the subscription's unique id.
uint64_t store_intra_process_message(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > &message)
Store a message in the manager, and return the message sequence number.
Definition: intra_process_manager.hpp:245
void take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, std::unique_ptr< MessageT, Deleter > &message)
Take an intra process message.
Definition: intra_process_manager.hpp:308
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: publisher.hpp:162
IntraProcessManager(IntraProcessManagerImplBase::SharedPtr state=create_default_impl())
void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
uint64_t add_publisher(typename publisher::Publisher< MessageT, Alloc >::SharedPtr publisher, size_t buffer_size=0)
Register a publisher with the manager, returns the publisher's unique id.
Definition: intra_process_manager.hpp:189
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
std::shared_ptr< MessageAlloc > get_allocator() const
Definition: publisher.hpp:285
T static_pointer_cast(T... args)
size_t get_queue_size() const
Get the queue size for this publisher.
This class facilitates intra process communication between nodes.
Definition: intra_process_manager.hpp:122
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
IntraProcessManagerImplBase::SharedPtr create_default_impl()
A publisher publishes messages of any type to a topic.
Definition: publisher.hpp:158