15 #ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
20 #include <shared_mutex>
29 #include <unordered_map>
45 namespace experimental
117 add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
182 uint64_t intra_process_publisher_id,
187 using MessageAllocatorT =
typename MessageAllocTraits::allocator_type;
191 auto publisher_it = pub_to_subs_.
find(intra_process_publisher_id);
192 if (publisher_it == pub_to_subs_.
end()) {
196 "Calling do_intra_process_publish for invalid or no longer existing publisher id");
199 const auto & sub_ids = publisher_it->second;
201 if (sub_ids.take_ownership_subscriptions.empty()) {
205 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
206 msg, sub_ids.take_shared_subscriptions);
207 }
else if (!sub_ids.take_ownership_subscriptions.empty() &&
208 sub_ids.take_shared_subscriptions.size() <= 1)
215 concatenated_vector.
insert(
216 concatenated_vector.
end(),
217 sub_ids.take_ownership_subscriptions.begin(),
218 sub_ids.take_ownership_subscriptions.end());
220 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
224 }
else if (!sub_ids.take_ownership_subscriptions.empty() &&
225 sub_ids.take_shared_subscriptions.size() > 1)
229 auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
231 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
232 shared_msg, sub_ids.take_shared_subscriptions);
233 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
234 std::move(message), sub_ids.take_ownership_subscriptions, allocator);
244 uint64_t intra_process_publisher_id,
249 using MessageAllocatorT =
typename MessageAllocTraits::allocator_type;
253 auto publisher_it = pub_to_subs_.
find(intra_process_publisher_id);
254 if (publisher_it == pub_to_subs_.
end()) {
258 "Calling do_intra_process_publish for invalid or no longer existing publisher id");
261 const auto & sub_ids = publisher_it->second;
263 if (sub_ids.take_ownership_subscriptions.empty()) {
266 if (!sub_ids.take_shared_subscriptions.empty()) {
267 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
268 shared_msg, sub_ids.take_shared_subscriptions);
274 auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
276 if (!sub_ids.take_shared_subscriptions.empty()) {
277 this->
template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
279 sub_ids.take_shared_subscriptions);
281 if (!sub_ids.take_ownership_subscriptions.empty()) {
282 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
284 sub_ids.take_ownership_subscriptions,
303 rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
307 struct SubscriptionInfo
309 SubscriptionInfo() =
default;
311 rclcpp::experimental::SubscriptionIntraProcessBase::WeakPtr subscription;
313 const char * topic_name;
314 bool use_take_shared_method;
319 PublisherInfo() =
default;
321 rclcpp::PublisherBase::WeakPtr publisher;
323 const char * topic_name;
326 struct SplittedSubscriptions
332 using SubscriptionMap =
338 using PublisherToSubscriptionIdsMap =
344 get_next_unique_id();
348 insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id,
bool use_take_shared_method);
352 can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info)
const;
359 add_shared_msg_to_buffers(
363 for (
auto id : subscription_ids) {
364 auto subscription_it = subscriptions_.
find(
id);
365 if (subscription_it == subscriptions_.
end()) {
368 auto subscription_base = subscription_it->second.subscription.lock();
369 if (subscription_base) {
372 >(subscription_base);
373 if (
nullptr == subscription) {
375 "failed to dynamic cast SubscriptionIntraProcessBase to "
376 "SubscriptionIntraProcess<MessageT, Alloc, Deleter>, which "
377 "can happen when the publisher and subscription use different "
378 "allocator types, which is not supported");
381 subscription->provide_intra_process_message(message);
383 subscriptions_.
erase(
id);
393 add_owned_msg_to_buffers(
396 std::shared_ptr<
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type> allocator)
398 using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
401 for (
auto it = subscription_ids.
begin(); it != subscription_ids.
end(); it++) {
402 auto subscription_it = subscriptions_.
find(*it);
403 if (subscription_it == subscriptions_.
end()) {
406 auto subscription_base = subscription_it->second.subscription.lock();
407 if (subscription_base) {
410 >(subscription_base);
411 if (
nullptr == subscription) {
413 "failed to dynamic cast SubscriptionIntraProcessBase to "
414 "SubscriptionIntraProcess<MessageT, Alloc, Deleter>, which "
415 "can happen when the publisher and subscription use different "
416 "allocator types, which is not supported");
421 subscription->provide_intra_process_message(
std::move(message));
424 MessageUniquePtr copy_message;
426 auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
427 MessageAllocTraits::construct(*allocator.get(), ptr, *message);
428 copy_message = MessageUniquePtr(ptr, deleter);
430 subscription->provide_intra_process_message(
std::move(copy_message));
433 subscriptions_.
erase(subscription_it);
438 PublisherToSubscriptionIdsMap pub_to_subs_;
439 SubscriptionMap subscriptions_;
440 PublisherMap publishers_;
448 #endif // RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_