rclcpp  master
C++ ROS Client Library API
intra_process_manager_impl.hpp
Go to the documentation of this file.
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
16 #define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
17 
18 #include <algorithm>
19 #include <array>
20 #include <atomic>
21 #include <cstring>
22 #include <functional>
23 #include <limits>
24 #include <map>
25 #include <memory>
26 #include <mutex>
27 #include <set>
28 #include <stdexcept>
29 #include <string>
30 #include <unordered_map>
31 #include <utility>
32 
34 
35 #include "rclcpp/macros.hpp"
40 
41 namespace rclcpp
42 {
43 namespace intra_process_manager
44 {
45 
47 {
48 public:
50 
51  IntraProcessManagerImplBase() = default;
52  virtual ~IntraProcessManagerImplBase() = default;
53 
54  virtual void
55  add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription) = 0;
56 
57  virtual void
58  remove_subscription(uint64_t intra_process_subscription_id) = 0;
59 
60  virtual void add_publisher(
61  uint64_t id,
62  PublisherBase::WeakPtr publisher,
63  mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
64  size_t size) = 0;
65 
66  virtual void
67  remove_publisher(uint64_t intra_process_publisher_id) = 0;
68 
69  virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
71  uint64_t intra_process_publisher_id,
72  uint64_t & message_seq) = 0;
73 
74  virtual void
75  store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) = 0;
76 
77  virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
79  uint64_t intra_process_publisher_id,
80  uint64_t message_sequence_number,
81  uint64_t requesting_subscriptions_intra_process_id,
82  size_t & size) = 0;
83 
84  virtual bool
85  matches_any_publishers(const rmw_gid_t * id) const = 0;
86 
87  virtual size_t
88  get_subscription_count(uint64_t intra_process_publisher_id) const = 0;
89 
90 private:
92 };
93 
94 template<typename Allocator = std::allocator<void>>
96 {
97 public:
98  IntraProcessManagerImpl() = default;
99  ~IntraProcessManagerImpl() = default;
100 
101  void
102  add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription)
103  {
104  subscriptions_[id] = subscription;
105  subscription_ids_by_topic_[fixed_size_string(subscription->get_topic_name())].insert(id);
106  }
107 
108  void
109  remove_subscription(uint64_t intra_process_subscription_id)
110  {
111  subscriptions_.erase(intra_process_subscription_id);
112  for (auto & pair : subscription_ids_by_topic_) {
113  pair.second.erase(intra_process_subscription_id);
114  }
115  // Iterate over all publisher infos and all stored subscription id's and
116  // remove references to this subscription's id.
117  for (auto & publisher_pair : publishers_) {
118  for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
119  sub_pair.second.erase(intra_process_subscription_id);
120  }
121  }
122  }
123 
125  uint64_t id,
126  PublisherBase::WeakPtr publisher,
127  mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
128  size_t size)
129  {
130  publishers_[id].publisher = publisher;
131  // As long as the size of the ring buffer is less than the max sequence number, we're safe.
132  if (size > std::numeric_limits<uint64_t>::max()) {
133  throw std::invalid_argument("the calculated buffer size is too large");
134  }
135  publishers_[id].sequence_number.store(0);
136 
137  publishers_[id].buffer = mrb;
138  publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
139  }
140 
141  void
142  remove_publisher(uint64_t intra_process_publisher_id)
143  {
144  publishers_.erase(intra_process_publisher_id);
145  }
146 
147  // return message_seq and mrb
148  mapped_ring_buffer::MappedRingBufferBase::SharedPtr
150  uint64_t intra_process_publisher_id,
151  uint64_t & message_seq)
152  {
153  std::lock_guard<std::mutex> lock(runtime_mutex_);
154  auto it = publishers_.find(intra_process_publisher_id);
155  if (it == publishers_.end()) {
156  throw std::runtime_error("get_publisher_info_for_id called with invalid publisher id");
157  }
158  PublisherInfo & info = it->second;
159  // Calculate the next message sequence number.
160  message_seq = info.sequence_number.fetch_add(1);
161 
162  return info.buffer;
163  }
164 
165  void
166  store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
167  {
168  std::lock_guard<std::mutex> lock(runtime_mutex_);
169  auto it = publishers_.find(intra_process_publisher_id);
170  if (it == publishers_.end()) {
171  throw std::runtime_error("store_intra_process_message called with invalid publisher id");
172  }
173  PublisherInfo & info = it->second;
174  auto publisher = info.publisher.lock();
175  if (!publisher) {
176  throw std::runtime_error("publisher has unexpectedly gone out of scope");
177  }
178 
179  // Figure out what subscriptions should receive the message.
180  auto & destined_subscriptions =
181  subscription_ids_by_topic_[fixed_size_string(publisher->get_topic_name())];
182  // Store the list for later comparison.
183  if (info.target_subscriptions_by_message_sequence.count(message_seq) == 0) {
184  info.target_subscriptions_by_message_sequence.emplace(
185  message_seq, AllocSet(std::less<uint64_t>(), uint64_allocator));
186  } else {
187  info.target_subscriptions_by_message_sequence[message_seq].clear();
188  }
189  std::copy(
190  destined_subscriptions.begin(), destined_subscriptions.end(),
191  // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq]
193  info.target_subscriptions_by_message_sequence[message_seq],
194  // This ends up only being a hint to std::set, could also be .begin().
195  info.target_subscriptions_by_message_sequence[message_seq].end()
196  )
197  );
198  }
199 
200  mapped_ring_buffer::MappedRingBufferBase::SharedPtr
202  uint64_t intra_process_publisher_id,
203  uint64_t message_sequence_number,
204  uint64_t requesting_subscriptions_intra_process_id,
205  size_t & size
206  )
207  {
208  std::lock_guard<std::mutex> lock(runtime_mutex_);
209  PublisherInfo * info;
210  {
211  auto it = publishers_.find(intra_process_publisher_id);
212  if (it == publishers_.end()) {
213  // Publisher is either invalid or no longer exists.
214  return 0;
215  }
216  info = &it->second;
217  }
218  // Figure out how many subscriptions are left.
219  AllocSet * target_subs;
220  {
221  auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number);
222  if (it == info->target_subscriptions_by_message_sequence.end()) {
223  // Message is no longer being stored by this publisher.
224  return 0;
225  }
226  target_subs = &it->second;
227  }
228  {
229  auto it = std::find(
230  target_subs->begin(), target_subs->end(),
231  requesting_subscriptions_intra_process_id);
232  if (it == target_subs->end()) {
233  // This publisher id/message seq pair was not intended for this subscription.
234  return 0;
235  }
236  target_subs->erase(it);
237  }
238  size = target_subs->size();
239  return info->buffer;
240  }
241 
242  bool
244  {
245  for (auto & publisher_pair : publishers_) {
246  auto publisher = publisher_pair.second.publisher.lock();
247  if (!publisher) {
248  continue;
249  }
250  if (*publisher.get() == id) {
251  return true;
252  }
253  }
254  return false;
255  }
256 
257  size_t
258  get_subscription_count(uint64_t intra_process_publisher_id) const
259  {
260  auto publisher_it = publishers_.find(intra_process_publisher_id);
261  if (publisher_it == publishers_.end()) {
262  // Publisher is either invalid or no longer exists.
263  return 0;
264  }
265  auto publisher = publisher_it->second.publisher.lock();
266  if (!publisher) {
267  throw std::runtime_error("publisher has unexpectedly gone out of scope");
268  }
269  auto sub_map_it =
270  subscription_ids_by_topic_.find(fixed_size_string(publisher->get_topic_name()));
271  if (sub_map_it == subscription_ids_by_topic_.end()) {
272  // No intraprocess subscribers
273  return 0;
274  }
275  return sub_map_it->second.size();
276  }
277 
278 private:
280 
282 
284  fixed_size_string(const char * str) const
285  {
286  FixedSizeString ret;
287  size_t size = std::strlen(str) + 1;
288  if (size > ret.size()) {
289  throw std::runtime_error("failed to copy topic name");
290  }
291  std::memcpy(ret.data(), str, size);
292  return ret;
293  }
294  struct strcmp_wrapper
295  {
296  bool
297  operator()(const FixedSizeString lhs, const FixedSizeString rhs) const
298  {
299  return std::strcmp(lhs.data(), rhs.data()) < 0;
300  }
301  };
302 
303  template<typename T>
304  using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
305 
306  RebindAlloc<uint64_t> uint64_allocator;
307 
308  using AllocSet = std::set<uint64_t, std::less<uint64_t>, RebindAlloc<uint64_t>>;
310  uint64_t, SubscriptionBase::WeakPtr,
312  RebindAlloc<std::pair<const uint64_t, SubscriptionBase::WeakPtr>>>;
313 
314  using IDTopicMap = std::map<
316  AllocSet,
317  strcmp_wrapper,
318  RebindAlloc<std::pair<const FixedSizeString, AllocSet>>>;
319 
320  SubscriptionMap subscriptions_;
321 
322  IDTopicMap subscription_ids_by_topic_;
323 
324  struct PublisherInfo
325  {
326  RCLCPP_DISABLE_COPY(PublisherInfo)
327 
328  PublisherInfo() = default;
329 
330  PublisherBase::WeakPtr publisher;
331  std::atomic<uint64_t> sequence_number;
332  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
333 
334  using TargetSubscriptionsMap = std::unordered_map<
335  uint64_t, AllocSet,
336  std::hash<uint64_t>, std::equal_to<uint64_t>,
337  RebindAlloc<std::pair<const uint64_t, AllocSet>>>;
338  TargetSubscriptionsMap target_subscriptions_by_message_sequence;
339  };
340 
342  uint64_t, PublisherInfo,
343  std::hash<uint64_t>, std::equal_to<uint64_t>,
344  RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
345 
346  PublisherMap publishers_;
347 
348  std::mutex runtime_mutex_;
349 };
350 
352 IntraProcessManagerImplBase::SharedPtr
354 
355 } // namespace intra_process_manager
356 } // namespace rclcpp
357 
358 #endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
void add_publisher(uint64_t id, PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)
Definition: intra_process_manager_impl.hpp:124
mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)
Definition: intra_process_manager_impl.hpp:149
T copy(T... args)
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)
Definition: intra_process_manager_impl.hpp:201
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)=0
This header provides the get_node_topics_interface() template function.
Definition: allocator_common.hpp:24
virtual bool matches_any_publishers(const rmw_gid_t *id) const =0
T end(T... args)
virtual void remove_publisher(uint64_t intra_process_publisher_id)=0
void add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription)
Definition: intra_process_manager_impl.hpp:102
T data(T... args)
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)=0
T memcpy(T... args)
T strlen(T... args)
T strcmp(T... args)
virtual void add_publisher(uint64_t id, PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)=0
virtual size_t get_subscription_count(uint64_t intra_process_publisher_id) const =0
T erase(T... args)
virtual void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)=0
Definition: intra_process_manager_impl.hpp:95
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
void remove_publisher(uint64_t intra_process_publisher_id)
Definition: intra_process_manager_impl.hpp:142
T find(T... args)
T size(T... args)
Definition: intra_process_manager_impl.hpp:46
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
virtual void add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription)=0
T begin(T... args)
size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Definition: intra_process_manager_impl.hpp:258
bool matches_any_publishers(const rmw_gid_t *id) const
Definition: intra_process_manager_impl.hpp:243
virtual void remove_subscription(uint64_t intra_process_subscription_id)=0
T inserter(T... args)
void remove_subscription(uint64_t intra_process_subscription_id)
Definition: intra_process_manager_impl.hpp:109
IntraProcessManagerImplBase::SharedPtr create_default_impl()
void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
Definition: intra_process_manager_impl.hpp:166