rclcpp  beta1
C++ ROS Client Library API
intra_process_manager_impl.hpp
Go to the documentation of this file.
1 // Copyright 2015 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
16 #define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
17 
18 #include <algorithm>
19 #include <atomic>
20 #include <cstring>
21 #include <functional>
22 #include <limits>
23 #include <map>
24 #include <memory>
25 #include <mutex>
26 #include <set>
27 #include <string>
28 #include <unordered_map>
29 #include <utility>
30 
31 #include "rclcpp/macros.hpp"
33 #include "rclcpp/publisher.hpp"
34 #include "rclcpp/subscription.hpp"
36 
37 namespace rclcpp
38 {
39 namespace intra_process_manager
40 {
41 
43 {
44 public:
46 
47  IntraProcessManagerImplBase() = default;
48  ~IntraProcessManagerImplBase() = default;
49 
// Registers `subscription` under the intra-process id `id`.
50  virtual void
51  add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription) = 0;
52 
// Removes the subscription registered under the given intra-process id.
53  virtual void
54  remove_subscription(uint64_t intra_process_subscription_id) = 0;
55 
// Registers a publisher under `id` together with the ring buffer `mrb`.
// `size` is the buffer size supplied by the caller; the implementation in this
// file uses it to reserve per-sequence-number bookkeeping.
56  virtual void add_publisher(uint64_t id,
57  publisher::PublisherBase::WeakPtr publisher,
58  mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
59  size_t size) = 0;
60 
// Removes the publisher registered under the given intra-process id.
61  virtual void
62  remove_publisher(uint64_t intra_process_publisher_id) = 0;
63 
// NOTE(review): the line carrying this function's name is absent from this
// listing; the index at the end of the page names it get_publisher_info_for_id.
// Writes a message sequence number to `message_seq` and returns a ring buffer
// pointer for the given publisher.
64  virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
66  uint64_t intra_process_publisher_id,
67  uint64_t & message_seq) = 0;
68 
// Records bookkeeping for message `message_seq` of the given publisher.
69  virtual void
70  store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq) = 0;
71 
// Called on behalf of a subscription that wants the message identified by
// (publisher id, sequence number). Returns a ring buffer pointer, which the
// implementation in this file leaves null when the request cannot be satisfied;
// `size` is an output parameter.
72  virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
73  take_intra_process_message(uint64_t intra_process_publisher_id,
74  uint64_t message_sequence_number,
75  uint64_t requesting_subscriptions_intra_process_id,
76  size_t & size) = 0;
77 
// Reports whether `id` matches any registered publisher.
78  virtual bool
79  matches_any_publishers(const rmw_gid_t * id) const = 0;
80 
81 private:
83 };
84 
85 template<typename Allocator = std::allocator<void>>
87 {
88 public:
// Defaulted construction and destruction: all members manage their own storage.
89  IntraProcessManagerImpl() = default;
90  ~IntraProcessManagerImpl() = default;
91 
92  void
93  add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)
94  {
95  subscriptions_[id] = subscription;
96  // subscription->get_topic_name() -> const char * can be used as the key,
97  // since subscriptions_ shares the ownership of subscription
98  subscription_ids_by_topic_[subscription->get_topic_name()].insert(id);
99  }
100 
101  void
102  remove_subscription(uint64_t intra_process_subscription_id)
103  {
104  subscriptions_.erase(intra_process_subscription_id);
105  for (auto & pair : subscription_ids_by_topic_) {
106  pair.second.erase(intra_process_subscription_id);
107  }
108  // Iterate over all publisher infos and all stored subscription id's and
109  // remove references to this subscription's id.
110  for (auto & publisher_pair : publishers_) {
111  for (auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
112  sub_pair.second.erase(intra_process_subscription_id);
113  }
114  }
115  }
116 
117  void add_publisher(uint64_t id,
118  publisher::PublisherBase::WeakPtr publisher,
119  mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
120  size_t size)
121  {
122  publishers_[id].publisher = publisher;
123  // As long as the size of the ring buffer is less than the max sequence number, we're safe.
124  if (size > std::numeric_limits<uint64_t>::max()) {
125  throw std::invalid_argument("the calculated buffer size is too large");
126  }
127  publishers_[id].sequence_number.store(0);
128 
129  publishers_[id].buffer = mrb;
130  publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
131  }
132 
133  void
134  remove_publisher(uint64_t intra_process_publisher_id)
135  {
136  publishers_.erase(intra_process_publisher_id);
137  }
138 
// Hands out the next message sequence number of a publisher together with the
// ring buffer stored for it.
// NOTE(review): the line carrying the function name is absent from this listing;
// the index at the end of the page and the error message below both identify it
// as get_publisher_info_for_id.
//   intra_process_publisher_id: key looked up in publishers_.
//   message_seq: output; receives the counter value from before the increment.
// Returns the publisher's ring buffer.
// Throws std::runtime_error when no publisher is stored under the id.
139 // return message_seq and mrb
140  mapped_ring_buffer::MappedRingBufferBase::SharedPtr
142  uint64_t intra_process_publisher_id,
143  uint64_t & message_seq)
144  {
// Held for the whole call.
145  std::lock_guard<std::mutex> lock(runtime_mutex_);
146  auto it = publishers_.find(intra_process_publisher_id);
147  if (it == publishers_.end()) {
148  throw std::runtime_error("get_publisher_info_for_id called with invalid publisher id");
149  }
150  PublisherInfo & info = it->second;
151  // Calculate the next message sequence number.
// fetch_add returns the previous value, so the first number handed out after
// the counter was stored as 0 is 0.
152  message_seq = info.sequence_number.fetch_add(1);
153 
154  return info.buffer;
155  }
156 
157  void
158  store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
159  {
160  std::lock_guard<std::mutex> lock(runtime_mutex_);
161  auto it = publishers_.find(intra_process_publisher_id);
162  if (it == publishers_.end()) {
163  throw std::runtime_error("store_intra_process_message called with invalid publisher id");
164  }
165  PublisherInfo & info = it->second;
166  auto publisher = info.publisher.lock();
167  if (!publisher) {
168  throw std::runtime_error("publisher has unexpectedly gone out of scope");
169  }
170 
171  // Figure out what subscriptions should receive the message.
172  auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()];
173  // Store the list for later comparison.
174  if (info.target_subscriptions_by_message_sequence.count(message_seq) == 0) {
175  info.target_subscriptions_by_message_sequence.emplace(
176  message_seq, AllocSet(std::less<uint64_t>(), uint64_allocator));
177  } else {
178  info.target_subscriptions_by_message_sequence[message_seq].clear();
179  }
180  std::copy(
181  destined_subscriptions.begin(), destined_subscriptions.end(),
182  // Memory allocation occurs in info.target_subscriptions_by_message_sequence[message_seq]
184  info.target_subscriptions_by_message_sequence[message_seq],
185  // This ends up only being a hint to std::set, could also be .begin().
186  info.target_subscriptions_by_message_sequence[message_seq].end()
187  )
188  );
189  }
190 
191  mapped_ring_buffer::MappedRingBufferBase::SharedPtr
192  take_intra_process_message(uint64_t intra_process_publisher_id,
193  uint64_t message_sequence_number,
194  uint64_t requesting_subscriptions_intra_process_id,
195  size_t & size
196  )
197  {
198  std::lock_guard<std::mutex> lock(runtime_mutex_);
199  PublisherInfo * info;
200  {
201  auto it = publishers_.find(intra_process_publisher_id);
202  if (it == publishers_.end()) {
203  // Publisher is either invalid or no longer exists.
204  return 0;
205  }
206  info = &it->second;
207  }
208  // Figure out how many subscriptions are left.
209  AllocSet * target_subs;
210  {
211  auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number);
212  if (it == info->target_subscriptions_by_message_sequence.end()) {
213  // Message is no longer being stored by this publisher.
214  return 0;
215  }
216  target_subs = &it->second;
217  }
218  {
219  auto it = std::find(
220  target_subs->begin(), target_subs->end(),
221  requesting_subscriptions_intra_process_id);
222  if (it == target_subs->end()) {
223  // This publisher id/message seq pair was not intended for this subscription.
224  return 0;
225  }
226  target_subs->erase(it);
227  }
228  size = target_subs->size();
229  return info->buffer;
230  }
231 
// Reports whether `id` compares equal to any publisher that is still alive.
// NOTE(review): the line carrying the function name and parameter list is absent
// from this listing; the index at the end of the page gives it as
// matches_any_publishers(const rmw_gid_t * id) const.
// Returns true on the first match, false if no live publisher matches.
232  bool
234  {
235  for (auto & publisher_pair : publishers_) {
// publishers_ stores weak references; skip publishers that have expired.
236  auto publisher = publisher_pair.second.publisher.lock();
237  if (!publisher) {
238  continue;
239  }
// The comparison is delegated to the publisher's operator== taking `id`.
240  if (*publisher.get() == id) {
241  return true;
242  }
243  }
244  return false;
245  }
246 
247 private:
249 
// The class's Allocator template argument rebound to allocate objects of type T.
250  template<typename T>
251  using RebindAlloc = typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
252 
// Allocator instance passed to AllocSet objects created in store_intra_process_message.
253  RebindAlloc<uint64_t> uint64_allocator;
254 
// Ordered set of intra-process ids using the rebound allocator.
255  using AllocSet = std::set<uint64_t, std::less<uint64_t>, RebindAlloc<uint64_t>>;
// Intra-process subscription id -> weak reference to the subscription.
// NOTE(review): one line of this alias's template argument list is absent from
// this listing (the renderer numbering jumps from 256 to 258).
256  using SubscriptionMap = std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr,
258  RebindAlloc<std::pair<const uint64_t, subscription::SubscriptionBase::WeakPtr>>>;
259 
/// Ordering for C-string keys that compares the characters, not the pointer values.
/**
 * Usable as the comparator of an ordered container keyed by const char *.
 * It does not derive from std::binary_function: that base was deprecated in
 * C++11 and removed in C++17, and ordered containers do not need its typedefs.
 */
struct strcmp_wrapper
{
  /// Return true if lhs sorts before rhs according to std::strcmp.
  bool
  operator()(const char * lhs, const char * rhs) const
  {
    return std::strcmp(lhs, rhs) < 0;
  }
};
268  using IDTopicMap = std::map<
269  const char *,
270  AllocSet,
271  strcmp_wrapper,
272  RebindAlloc<std::pair<const std::string, AllocSet>>>;
273 
// Registered subscriptions, keyed by intra-process id.
274  SubscriptionMap subscriptions_;
275 
// Ids of the registered subscriptions, grouped by topic name.
276  IDTopicMap subscription_ids_by_topic_;
277 
278  struct PublisherInfo
279  {
280  RCLCPP_DISABLE_COPY(PublisherInfo)
281 
282  PublisherInfo() = default;
283 
284  publisher::PublisherBase::WeakPtr publisher;
285  std::atomic<uint64_t> sequence_number;
286  mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
287 
288  using TargetSubscriptionsMap = std::unordered_map<uint64_t, AllocSet,
289  std::hash<uint64_t>, std::equal_to<uint64_t>,
290  RebindAlloc<std::pair<const uint64_t, AllocSet>>>;
291  TargetSubscriptionsMap target_subscriptions_by_message_sequence;
292  };
293 
// Intra-process publisher id -> bookkeeping for that publisher.
294  using PublisherMap = std::unordered_map<uint64_t, PublisherInfo,
295  std::hash<uint64_t>, std::equal_to<uint64_t>,
296  RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
297 
// Registered publishers.
298  PublisherMap publishers_;
299 
// Locked by get_publisher_info_for_id, store_intra_process_message and
// take_intra_process_message; the add_*/remove_* functions in this class do not
// lock it.
300  std::mutex runtime_mutex_;
301 };
302 
304 IntraProcessManagerImplBase::SharedPtr
306 
307 } // namespace intra_process_manager
308 } // namespace rclcpp
309 
310 #endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
T copy(T... args)
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
void add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)
Definition: intra_process_manager_impl.hpp:93
Definition: allocator_common.hpp:24
virtual void add_publisher(uint64_t id, publisher::PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)=0
Definition: intra_process_manager_impl.hpp:42
T end(T... args)
virtual void remove_publisher(uint64_t intra_process_publisher_id)=0
virtual void remove_subscription(uint64_t intra_process_subscription_id)=0
mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)
Definition: intra_process_manager_impl.hpp:141
void remove_subscription(uint64_t intra_process_subscription_id)
Definition: intra_process_manager_impl.hpp:102
T strcmp(T... args)
T erase(T... args)
virtual bool matches_any_publishers(const rmw_gid_t *id) const =0
mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)
Definition: intra_process_manager_impl.hpp:192
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)=0
bool matches_any_publishers(const rmw_gid_t *id) const
Definition: intra_process_manager_impl.hpp:233
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
void add_publisher(uint64_t id, publisher::PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)
Definition: intra_process_manager_impl.hpp:117
T find(T... args)
T size(T... args)
virtual void add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)=0
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
Definition: intra_process_manager_impl.hpp:158
T begin(T... args)
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)=0
Definition: intra_process_manager_impl.hpp:86
T inserter(T... args)
IntraProcessManagerImplBase::SharedPtr create_default_impl()
virtual void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)=0
void remove_publisher(uint64_t intra_process_publisher_id)
Definition: intra_process_manager_impl.hpp:134