rclcpp  master
C++ ROS Client Library API
intra_process_manager_impl.hpp
Go to the documentation of this file.
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
16 #define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
17 
18 #include <algorithm>
19 #include <atomic>
20 #include <cstring>
21 #include <functional>
22 #include <limits>
23 #include <map>
24 #include <memory>
25 #include <mutex>
26 #include <set>
27 #include <string>
28 #include <unordered_map>
29 #include <utility>
30 
31 #include "rclcpp/macros.hpp"
33 #include "rclcpp/publisher.hpp"
34 #include "rclcpp/subscription.hpp"
36 
37 namespace rclcpp
38 {
39 namespace intra_process_manager
40 {
41 
43 {
44 public:
46 
47  IntraProcessManagerImplBase() = default;
48  virtual ~IntraProcessManagerImplBase() = default;
49 
50  virtual void
51  add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription) = 0;
52 
53  virtual void
54  remove_subscription(uint64_t intra_process_subscription_id) = 0;
55 
56  virtual void add_publisher(
57  uint64_t id,
58  PublisherBase::WeakPtr publisher,
59  mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
60  size_t size) = 0;
61 
62  virtual void
63  remove_publisher(uint64_t intra_process_publisher_id) = 0;
64 
65  virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
67  uint64_t intra_process_publisher_id,
68  uint64_t & message_seq) = 0;
69 
70  virtual void
71  store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) = 0;
72 
73  virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
75  uint64_t intra_process_publisher_id,
76  uint64_t message_sequence_number,
77  uint64_t requesting_subscriptions_intra_process_id,
78  size_t & size) = 0;
79 
80  virtual bool
81  matches_any_publishers(const rmw_gid_t * id) const = 0;
82 
83 private:
85 };
86 
87 template<typename Allocator = std::allocator<void>>
89 {
90 public:
91  IntraProcessManagerImpl() = default;
92  ~IntraProcessManagerImpl() = default;
93 
94  void
95  add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription)
96  {
97  subscriptions_[id] = subscription;
98  // subscription->get_topic_name() -> const char * can be used as the key,
99  // since subscriptions_ shares the ownership of subscription
100  subscription_ids_by_topic_[subscription->get_topic_name()].insert(id);
101  }
102 
103  void
104  remove_subscription(uint64_t intra_process_subscription_id)
105  {
106  subscriptions_.erase(intra_process_subscription_id);
107  for (auto & pair : subscription_ids_by_topic_) {
108  pair.second.erase(intra_process_subscription_id);
109  }
110  // Iterate over all publisher infos and all stored subscription id's and
111  // remove references to this subscription's id.
112  for (auto & publisher_pair : publishers_) {
113  for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
114  sub_pair.second.erase(intra_process_subscription_id);
115  }
116  }
117  }
118 
120  uint64_t id,
121  PublisherBase::WeakPtr publisher,
122  mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
123  size_t size)
124  {
125  publishers_[id].publisher = publisher;
126  // As long as the size of the ring buffer is less than the max sequence number, we're safe.
127  if (size > std::numeric_limits<uint64_t>::max()) {
128  throw std::invalid_argument("the calculated buffer size is too large");
129  }
130  publishers_[id].sequence_number.store(0);
131 
132  publishers_[id].buffer = mrb;
133  publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
134  }
135 
136  void
137  remove_publisher(uint64_t intra_process_publisher_id)
138  {
139  publishers_.erase(intra_process_publisher_id);
140  }
141 
142  // return message_seq and mrb
143  mapped_ring_buffer::MappedRingBufferBase::SharedPtr
145  uint64_t intra_process_publisher_id,
146  uint64_t & message_seq)
147  {
148  std::lock_guard<std::mutex> lock(runtime_mutex_);
149  auto it = publishers_.find(intra_process_publisher_id);
150  if (it == publishers_.end()) {
151  throw std::runtime_error("get_publisher_info_for_id called with invalid publisher id");
152  }
153  PublisherInfo & info = it->second;
154  // Calculate the next message sequence number.
155  message_seq = info.sequence_number.fetch_add(1);
156 
157  return info.buffer;
158  }
159 
160  void
161  store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
162  {
163  std::lock_guard<std::mutex> lock(runtime_mutex_);
164  auto it = publishers_.find(intra_process_publisher_id);
165  if (it == publishers_.end()) {
166  throw std::runtime_error("store_intra_process_message called with invalid publisher id");
167  }
168  PublisherInfo & info = it->second;
169  auto publisher = info.publisher.lock();
170  if (!publisher) {
171  throw std::runtime_error("publisher has unexpectedly gone out of scope");
172  }
173 
174  // Figure out what subscriptions should receive the message.
175  auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()];
176  // Store the list for later comparison.
177  if (info.target_subscriptions_by_message_sequence.count(message_seq) == 0) {
178  info.target_subscriptions_by_message_sequence.emplace(
179  message_seq, AllocSet(std::less<uint64_t>(), uint64_allocator));
180  } else {
181  info.target_subscriptions_by_message_sequence[message_seq].clear();
182  }
183  std::copy(
184  destined_subscriptions.begin(), destined_subscriptions.end(),
185  // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq]
187  info.target_subscriptions_by_message_sequence[message_seq],
188  // This ends up only being a hint to std::set, could also be .begin().
189  info.target_subscriptions_by_message_sequence[message_seq].end()
190  )
191  );
192  }
193 
194  mapped_ring_buffer::MappedRingBufferBase::SharedPtr
196  uint64_t intra_process_publisher_id,
197  uint64_t message_sequence_number,
198  uint64_t requesting_subscriptions_intra_process_id,
199  size_t & size
200  )
201  {
202  std::lock_guard<std::mutex> lock(runtime_mutex_);
203  PublisherInfo * info;
204  {
205  auto it = publishers_.find(intra_process_publisher_id);
206  if (it == publishers_.end()) {
207  // Publisher is either invalid or no longer exists.
208  return 0;
209  }
210  info = &it->second;
211  }
212  // Figure out how many subscriptions are left.
213  AllocSet * target_subs;
214  {
215  auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number);
216  if (it == info->target_subscriptions_by_message_sequence.end()) {
217  // Message is no longer being stored by this publisher.
218  return 0;
219  }
220  target_subs = &it->second;
221  }
222  {
223  auto it = std::find(
224  target_subs->begin(), target_subs->end(),
225  requesting_subscriptions_intra_process_id);
226  if (it == target_subs->end()) {
227  // This publisher id/message seq pair was not intended for this subscription.
228  return 0;
229  }
230  target_subs->erase(it);
231  }
232  size = target_subs->size();
233  return info->buffer;
234  }
235 
236  bool
238  {
239  for (auto & publisher_pair : publishers_) {
240  auto publisher = publisher_pair.second.publisher.lock();
241  if (!publisher) {
242  continue;
243  }
244  if (*publisher.get() == id) {
245  return true;
246  }
247  }
248  return false;
249  }
250 
251 private:
253 
254  template<typename T>
255  using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
256 
257  RebindAlloc<uint64_t> uint64_allocator;
258 
259  using AllocSet = std::set<uint64_t, std::less<uint64_t>, RebindAlloc<uint64_t>>;
261  uint64_t, SubscriptionBase::WeakPtr,
263  RebindAlloc<std::pair<const uint64_t, SubscriptionBase::WeakPtr>>>;
264 
// Strict-weak-ordering comparator for C strings, ordering by content rather
// than by pointer value.
//
// A null pointer is ordered before every non-null string and two null pointers
// compare equivalent, so the comparator never hands a null pointer to
// std::strcmp.
struct strcmp_wrapper
{
  // Returns true when lhs sorts strictly before rhs.
  bool
  operator()(const char * lhs, const char * rhs) const noexcept
  {
    if (lhs == nullptr || rhs == nullptr) {
      return lhs == nullptr && rhs != nullptr;
    }
    return std::strcmp(lhs, rhs) < 0;
  }
};
273  using IDTopicMap = std::map<
274  const char *,
275  AllocSet,
276  strcmp_wrapper,
277  RebindAlloc<std::pair<const char * const, AllocSet>>>;
278 
279  SubscriptionMap subscriptions_;
280 
281  IDTopicMap subscription_ids_by_topic_;
282 
283  struct PublisherInfo
284  {
285  RCLCPP_DISABLE_COPY(PublisherInfo)
286 
287  PublisherInfo() = default;
288 
289  PublisherBase::WeakPtr publisher;
290  std::atomic<uint64_t> sequence_number;
291  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
292 
293  using TargetSubscriptionsMap = std::unordered_map<
294  uint64_t, AllocSet,
295  std::hash<uint64_t>, std::equal_to<uint64_t>,
296  RebindAlloc<std::pair<const uint64_t, AllocSet>>>;
297  TargetSubscriptionsMap target_subscriptions_by_message_sequence;
298  };
299 
301  uint64_t, PublisherInfo,
302  std::hash<uint64_t>, std::equal_to<uint64_t>,
303  RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
304 
305  PublisherMap publishers_;
306 
307  std::mutex runtime_mutex_;
308 };
309 
311 IntraProcessManagerImplBase::SharedPtr
313 
314 } // namespace intra_process_manager
315 } // namespace rclcpp
316 
317 #endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
void add_publisher(uint64_t id, PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)
Definition: intra_process_manager_impl.hpp:119
mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)
Definition: intra_process_manager_impl.hpp:144
T copy(T... args)
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)
Definition: intra_process_manager_impl.hpp:195
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)=0
Definition: allocator_common.hpp:24
virtual bool matches_any_publishers(const rmw_gid_t *id) const =0
T end(T... args)
virtual void remove_publisher(uint64_t intra_process_publisher_id)=0
void add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription)
Definition: intra_process_manager_impl.hpp:95
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)=0
T strcmp(T... args)
virtual void add_publisher(uint64_t id, PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)=0
T erase(T... args)
virtual void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)=0
Definition: intra_process_manager_impl.hpp:88
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
void remove_publisher(uint64_t intra_process_publisher_id)
Definition: intra_process_manager_impl.hpp:137
T find(T... args)
T size(T... args)
Definition: intra_process_manager_impl.hpp:42
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
virtual void add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription)=0
T begin(T... args)
bool matches_any_publishers(const rmw_gid_t *id) const
Definition: intra_process_manager_impl.hpp:237
virtual void remove_subscription(uint64_t intra_process_subscription_id)=0
T inserter(T... args)
void remove_subscription(uint64_t intra_process_subscription_id)
Definition: intra_process_manager_impl.hpp:104
IntraProcessManagerImplBase::SharedPtr create_default_impl()
void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
Definition: intra_process_manager_impl.hpp:161