15 #ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
16 #define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
20 #include <shared_mutex>
29 #include <unordered_map>
45 namespace experimental
117 add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
// Fragment of the publish routine that the error text below calls do_intra_process_publish.
// The start of its signature and several body lines are not part of this excerpt, so the notes
// here describe only the statements that are visible.
182 uint64_t intra_process_publisher_id,
187 using MessageAllocatorT =
typename MessageAllocTraits::allocator_type;
// Look up the subscription id lists stored for this publisher id.
191 auto publisher_it = pub_to_subs_.
find(intra_process_publisher_id);
192 if (publisher_it == pub_to_subs_.
end()) {
// Unknown publisher id: this text is the argument of an error-reporting call whose opening
// line is not visible here.
196 "Calling do_intra_process_publish for invalid or no longer existing publisher id");
// `sub_ids` carries two id collections: take_ownership_subscriptions and take_shared_subscriptions.
199 const auto & sub_ids = publisher_it->second;
// Case 1: no subscription takes ownership, so `msg` (defined on a line not shown) is handed to
// the take-shared subscriptions.
201 if (sub_ids.take_ownership_subscriptions.empty()) {
205 this->
template add_shared_msg_to_buffers<MessageT>(msg, sub_ids.take_shared_subscriptions);
206 }
else if (!sub_ids.take_ownership_subscriptions.empty() &&
207 sub_ids.take_shared_subscriptions.size() <= 1)
// Case 2: at least one owning subscription and at most one shared one. The `!empty()` test is
// always true at this point because case 1 already handled the empty list.
// The owning ids are appended to `concatenated_vector` (declared on a line not shown) before
// add_owned_msg_to_buffers is called; the call's arguments are not visible in this excerpt.
214 concatenated_vector.
insert(
215 concatenated_vector.
end(),
216 sub_ids.take_ownership_subscriptions.begin(),
217 sub_ids.take_ownership_subscriptions.end());
219 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
223 }
else if (!sub_ids.take_ownership_subscriptions.empty() &&
224 sub_ids.take_shared_subscriptions.size() > 1)
// Case 3: owning subscriptions plus more than one shared subscription.
// A copy of *message is allocated through `allocator` and held in a shared pointer; that copy
// goes to the shared subscriptions, while the original `message` is moved to the owning ones.
228 auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
230 this->
template add_shared_msg_to_buffers<MessageT>(
231 shared_msg, sub_ids.take_shared_subscriptions);
232 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
233 std::move(message), sub_ids.take_ownership_subscriptions, allocator);
// Fragment of a second publish routine. Its own name and return statement are on lines not shown;
// the error text below still says do_intra_process_publish.
243 uint64_t intra_process_publisher_id,
248 using MessageAllocatorT =
typename MessageAllocTraits::allocator_type;
// Look up the subscription id lists stored for this publisher id.
252 auto publisher_it = pub_to_subs_.
find(intra_process_publisher_id);
253 if (publisher_it == pub_to_subs_.
end()) {
// Unknown publisher id: this text is the argument of an error-reporting call whose opening
// line is not visible here.
257 "Calling do_intra_process_publish for invalid or no longer existing publisher id");
260 const auto & sub_ids = publisher_it->second;
// No owning subscriptions: `shared_msg` (defined on a line not shown) is passed to the shared
// subscriptions, and only when that list is non-empty.
262 if (sub_ids.take_ownership_subscriptions.empty()) {
265 if (!sub_ids.take_shared_subscriptions.empty()) {
266 this->
template add_shared_msg_to_buffers<MessageT>(
267 shared_msg, sub_ids.take_shared_subscriptions);
// NOTE(review): presumably the else-branch of the check above starts here; the `else` line is
// not visible, so confirm against the full file.
// A copy of *message is allocated through `allocator` and held in a shared pointer.
273 auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
// Each list is served only when it is non-empty.
275 if (!sub_ids.take_shared_subscriptions.empty()) {
276 this->
template add_shared_msg_to_buffers<MessageT>(
278 sub_ids.take_shared_subscriptions);
280 if (!sub_ids.take_ownership_subscriptions.empty()) {
281 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
283 sub_ids.take_ownership_subscriptions,
302 rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
// Record describing one subscription (braces and some lines of the struct are not visible in this excerpt).
306 struct SubscriptionInfo
// Defaulted constructor. The members below have no in-class initialisers, so after default-initialisation
// (`SubscriptionInfo info;`) topic_name and use_take_shared_method hold indeterminate values.
308 SubscriptionInfo() =
default;
// Shared pointer to the subscription object.
310 rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription;
// Raw, non-owning pointer to the topic name; this struct does not manage the string's lifetime.
312 const char * topic_name;
// NOTE(review): presumably selects the take_shared_subscriptions list over take_ownership_subscriptions
// for this subscription; confirm in insert_sub_id_for_pub.
313 bool use_take_shared_method;
// Members of PublisherInfo (the struct header line is not visible in this excerpt).
// Defaulted constructor; topic_name has no in-class initialiser, so default-initialisation
// (`PublisherInfo info;`) leaves it indeterminate.
318 PublisherInfo() =
default;
// Weak pointer to the publisher. NOTE(review): WeakPtr is presumably a std::weak_ptr alias, i.e. this
// record does not keep the publisher alive; confirm.
320 rclcpp::PublisherBase::WeakPtr publisher;
// Raw, non-owning pointer to the topic name; this struct does not manage the string's lifetime.
322 const char * topic_name;
325 struct SplittedSubscriptions
331 using SubscriptionMap =
337 using PublisherToSubscriptionIdsMap =
// Declaration only; the return type and the definition are not visible in this excerpt.
// NOTE(review): presumably yields the ids used as keys for publishers and subscriptions; confirm.
343 get_next_unique_id();
// Declaration only; the definition is not visible in this excerpt.
// NOTE(review): presumably records sub_id under pub_id in pub_to_subs_, with use_take_shared_method
// choosing between the take_shared_subscriptions and take_ownership_subscriptions lists; confirm.
347 insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id,
bool use_take_shared_method);
// Const member declaration; the return type and the definition are not visible in this excerpt.
// Both arguments are taken by value, so every call copies the two info records, including their
// smart-pointer members.
351 can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info)
const;
// Fragment of add_shared_msg_to_buffers<MessageT>: walks a collection of subscription ids and
// resolves each one in subscriptions_. The parameter list, the not-found handling, the cast
// target type and the final delivery call are on lines not shown in this excerpt.
353 template<
typename MessageT>
355 add_shared_msg_to_buffers(
// Each id is taken by value.
359 for (
auto id : subscription_ids) {
360 auto subscription_it = subscriptions_.
find(
id);
// Id not present in subscriptions_ (the reaction to this case is not visible here).
361 if (subscription_it == subscriptions_.
end()) {
// Copies the stored `subscription` handle out of the map entry.
364 auto subscription_base = subscription_it->second.subscription;
// Tail of a cast of subscription_base to a more specific type; the cast expression starts on
// lines not shown.
368 >(subscription_base);
// Fragment of add_owned_msg_to_buffers: walks a collection of subscription ids, resolves each one
// in subscriptions_, and hands a uniquely-owned message to the subscription. The template header,
// the leading parameters, the not-found handling and the logic that decides when a copy is made
// are on lines not shown in this excerpt.
379 add_owned_msg_to_buffers(
// Allocator rebound for MessageT, passed as a shared pointer; it is dereferenced below to allocate
// and construct message copies.
382 std::shared_ptr<
typename allocator::AllocRebind<MessageT, Alloc>::allocator_type> allocator)
384 using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
// Explicit iterator loop over the ids.
387 for (
auto it = subscription_ids.
begin(); it != subscription_ids.
end(); it++) {
388 auto subscription_it = subscriptions_.
find(*it);
// Id not present in subscriptions_ (the reaction to this case is not visible here).
389 if (subscription_it == subscriptions_.
end()) {
// Copies the stored `subscription` handle out of the map entry.
392 auto subscription_base = subscription_it->second.subscription;
// Tail of a cast of subscription_base to a more specific type; the cast expression starts on
// lines not shown.
396 >(subscription_base);
// Copy path: allocate storage for one MessageT, copy-construct it from *message, and wrap the
// pointer with `deleter` (declared on a line not shown).
// NOTE(review): the condition selecting this path over moving `message` is not visible; confirm.
403 MessageUniquePtr copy_message;
405 auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
406 MessageAllocTraits::construct(*allocator.get(), ptr, *message);
407 copy_message = MessageUniquePtr(ptr, deleter);
// Ownership of the copy is transferred to the subscription.
409 subscription->provide_intra_process_message(
std::move(copy_message));
// Looked up by intra-process publisher id; each entry holds the take_ownership_subscriptions and
// take_shared_subscriptions id collections.
414 PublisherToSubscriptionIdsMap pub_to_subs_;
// Looked up by subscription id; each entry exposes a `subscription` handle.
415 SubscriptionMap subscriptions_;
// NOTE(review): presumably keyed by publisher id and holding PublisherInfo records; the alias
// definition and its uses are not visible in this excerpt, so confirm.
416 PublisherMap publishers_;
424 #endif // RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_