rclcpp  beta1
C++ ROS Client Library API
intra_process_manager.hpp
Go to the documentation of this file.
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__INTRA_PROCESS_MANAGER_HPP_
17 
18 #include <rmw/types.h>
19 
20 #include <algorithm>
21 #include <atomic>
22 #include <cstdint>
23 #include <exception>
24 #include <map>
25 #include <memory>
26 #include <unordered_map>
27 #include <set>
28 
32 #include "rclcpp/macros.hpp"
33 #include "rclcpp/publisher.hpp"
34 #include "rclcpp/subscription.hpp"
36 
37 namespace rclcpp
38 {
39 namespace intra_process_manager
40 {
41 
43 
122 {
123 private:
125 
126 public:
128 
130  explicit IntraProcessManager(
131  IntraProcessManagerImplBase::SharedPtr state = create_default_impl());
132 
134  virtual ~IntraProcessManager();
135 
137 
150  uint64_t
151  add_subscription(subscription::SubscriptionBase::SharedPtr subscription);
152 
154 
160  void
161  remove_subscription(uint64_t intra_process_subscription_id);
162 
164 
186  template<typename MessageT, typename Alloc>
187  uint64_t
189  size_t buffer_size = 0)
190  {
191  auto id = IntraProcessManager::get_next_unique_id();
192  size_t size = buffer_size > 0 ? buffer_size : publisher->get_queue_size();
193  auto mrb = mapped_ring_buffer::MappedRingBuffer<MessageT,
195  size, publisher->get_allocator());
196  impl_->add_publisher(id, publisher, mrb, size);
197  return id;
198  }
199 
201 
207  void
208  remove_publisher(uint64_t intra_process_publisher_id);
209 
211 
241  template<typename MessageT, typename Alloc = std::allocator<void>,
242  typename Deleter = std::default_delete<MessageT>>
243  uint64_t
245  uint64_t intra_process_publisher_id,
247  {
248  using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
250  uint64_t message_seq = 0;
251  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
252  intra_process_publisher_id, message_seq);
253  typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
254  if (!typed_buffer) {
255  throw std::runtime_error("Typecast failed due to incorrect message type");
256  }
257 
258  // Insert the message into the ring buffer using the message_seq to identify it.
259  bool did_replace = typed_buffer->push_and_replace(message_seq, message);
260  // TODO(wjwwood): do something when a message was displaced. log debug?
261  (void)did_replace; // Avoid unused variable warning.
262 
263  impl_->store_intra_process_message(intra_process_publisher_id, message_seq);
264 
265  // Return the message sequence which is sent to the subscription.
266  return message_seq;
267  }
268 
270 
304  template<typename MessageT, typename Alloc = std::allocator<void>,
305  typename Deleter = std::default_delete<MessageT>>
306  void
308  uint64_t intra_process_publisher_id,
309  uint64_t message_sequence_number,
310  uint64_t requesting_subscriptions_intra_process_id,
312  {
313  using MRBMessageAlloc = typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
315  message = nullptr;
316 
317  size_t target_subs_size = 0;
318  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
319  intra_process_publisher_id,
320  message_sequence_number,
321  requesting_subscriptions_intra_process_id,
322  target_subs_size
323  );
324  typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
325  if (!typed_buffer) {
326  return;
327  }
328  // Return a copy or the unique_ptr (ownership) depending on how many subscriptions are left.
329  if (target_subs_size) {
330  // There are more subscriptions to serve, return a copy.
331  typed_buffer->get_copy_at_key(message_sequence_number, message);
332  } else {
333  // This is the last one to be returned, transfer ownership.
334  typed_buffer->pop_at_key(message_sequence_number, message);
335  }
336  }
337 
340  bool
341  matches_any_publishers(const rmw_gid_t * id) const;
342 
343 private:
345  static uint64_t
346  get_next_unique_id();
347 
348  IntraProcessManagerImplBase::SharedPtr impl_;
349 };
350 
351 } // namespace intra_process_manager
352 } // namespace rclcpp
353 
354 #endif // RCLCPP__INTRA_PROCESS_MANAGER_HPP_
void take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, std::unique_ptr< MessageT, Deleter > &message)
Take an intra process message.
Definition: intra_process_manager.hpp:307
bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
This class facilitates intra process communication between nodes.
Definition: intra_process_manager.hpp:121
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
IntraProcessManager(IntraProcessManagerImplBase::SharedPtr state=create_default_impl())
Definition: allocator_common.hpp:24
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: publisher.hpp:150
Ring buffer container of unique_ptr&#39;s of T, which can be accessed by a key.
Definition: mapped_ring_buffer.hpp:59
uint64_t store_intra_process_message(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > &message)
Store a message in the manager, and return the message sequence number.
Definition: intra_process_manager.hpp:244
uint64_t add_publisher(typename publisher::Publisher< MessageT, Alloc >::SharedPtr publisher, size_t buffer_size=0)
Register a publisher with the manager, returns the publisher unique id.
Definition: intra_process_manager.hpp:188
void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher&#39;s unique id.
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
T static_pointer_cast(T... args)
void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription&#39;s unique id.
uint64_t add_subscription(subscription::SubscriptionBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
A publisher publishes messages of any type to a topic.
Definition: publisher.hpp:146
std::shared_ptr< MessageAlloc > get_allocator() const
Definition: publisher.hpp:264
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
IntraProcessManagerImplBase::SharedPtr create_default_impl()
size_t get_queue_size() const
Get the queue size for this publisher.