15 #ifndef RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_ 16 #define RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_ 20 #include <shared_mutex> 29 #include <unordered_map> 45 namespace experimental
117 add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription);
182 uint64_t intra_process_publisher_id,
187 using MessageAllocatorT =
typename MessageAllocTraits::allocator_type;
191 auto publisher_it = pub_to_subs_.
find(intra_process_publisher_id);
192 if (publisher_it == pub_to_subs_.
end()) {
196 "Calling do_intra_process_publish for invalid or no longer existing publisher id");
199 const auto & sub_ids = publisher_it->second;
201 if (sub_ids.take_ownership_subscriptions.empty()) {
205 this->
template add_shared_msg_to_buffers<MessageT>(msg, sub_ids.take_shared_subscriptions);
206 }
else if (!sub_ids.take_ownership_subscriptions.empty() &&
207 sub_ids.take_shared_subscriptions.size() <= 1)
214 concatenated_vector.
insert(
215 concatenated_vector.
end(),
216 sub_ids.take_ownership_subscriptions.begin(),
217 sub_ids.take_ownership_subscriptions.end());
219 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
223 }
else if (!sub_ids.take_ownership_subscriptions.empty() &&
224 sub_ids.take_shared_subscriptions.size() > 1)
228 auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
230 this->
template add_shared_msg_to_buffers<MessageT>(shared_msg,
231 sub_ids.take_shared_subscriptions);
232 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
234 sub_ids.take_ownership_subscriptions,
245 uint64_t intra_process_publisher_id,
250 using MessageAllocatorT =
typename MessageAllocTraits::allocator_type;
254 auto publisher_it = pub_to_subs_.
find(intra_process_publisher_id);
255 if (publisher_it == pub_to_subs_.
end()) {
259 "Calling do_intra_process_publish for invalid or no longer existing publisher id");
262 const auto & sub_ids = publisher_it->second;
264 if (sub_ids.take_ownership_subscriptions.empty()) {
267 if (!sub_ids.take_shared_subscriptions.empty()) {
268 this->
template add_shared_msg_to_buffers<MessageT>(shared_msg,
269 sub_ids.take_shared_subscriptions);
275 auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);
277 if (!sub_ids.take_shared_subscriptions.empty()) {
278 this->
template add_shared_msg_to_buffers<MessageT>(
280 sub_ids.take_shared_subscriptions);
282 if (!sub_ids.take_ownership_subscriptions.empty()) {
283 this->
template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
285 sub_ids.take_ownership_subscriptions,
304 rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr
308 struct SubscriptionInfo
310 SubscriptionInfo() =
default;
312 rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription;
314 const char * topic_name;
315 bool use_take_shared_method;
320 PublisherInfo() =
default;
322 rclcpp::PublisherBase::WeakPtr publisher;
324 const char * topic_name;
327 struct SplittedSubscriptions
345 get_next_unique_id();
349 insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id,
bool use_take_shared_method);
353 can_communicate(PublisherInfo pub_info, SubscriptionInfo sub_info)
const;
355 template<
typename MessageT>
357 add_shared_msg_to_buffers(
361 for (
auto id : subscription_ids) {
362 auto subscription_it = subscriptions_.
find(
id);
363 if (subscription_it == subscriptions_.
end()) {
366 auto subscription_base = subscription_it->second.subscription;
370 >(subscription_base);
381 add_owned_msg_to_buffers(
389 for (
auto it = subscription_ids.
begin(); it != subscription_ids.
end(); it++) {
390 auto subscription_it = subscriptions_.
find(*it);
391 if (subscription_it == subscriptions_.
end()) {
394 auto subscription_base = subscription_it->second.subscription;
398 >(subscription_base);
405 MessageUniquePtr copy_message;
407 auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);
408 MessageAllocTraits::construct(*allocator.get(), ptr, *message);
409 copy_message = MessageUniquePtr(ptr, deleter);
411 subscription->provide_intra_process_message(
std::move(copy_message));
426 #endif // RCLCPP__EXPERIMENTAL__INTRA_PROCESS_MANAGER_HPP_
void do_intra_process_publish(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, std::shared_ptr< typename allocator::AllocRebind< MessageT, Alloc >::allocator_type > allocator)
Publishes an intra-process message, passed as a unique pointer.
Definition: intra_process_manager.hpp:181
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
void provide_intra_process_message(ConstMessageSharedPtr message)
Definition: subscription_intra_process.hpp:116
Logger get_logger(const std::string &name)
Return a named logger.
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
Definition: subscription_intra_process.hpp:45
virtual ~IntraProcessManager()
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
This class performs intra-process communication between nodes.
Definition: intra_process_manager.hpp:91
T static_pointer_cast(T... args)
uint64_t add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
Register a publisher with the manager, returns the publisher unique id.
uint64_t add_subscription(rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription)
Register a subscription with the manager, returns subscriptions unique id.
rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr get_subscription_intra_process(uint64_t intra_process_subscription_id)
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
Set the data type used in the intra-process buffer as std::shared_ptr<MessageT>
std::shared_ptr< const MessageT > do_intra_process_publish_and_return_shared(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > message, std::shared_ptr< typename allocator::AllocRebind< MessageT, Alloc >::allocator_type > allocator)
Definition: intra_process_manager.hpp:244
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
#define RCLCPP_WARN(logger,...)
Definition: logging.hpp:976
void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Return the number of intra-process subscriptions that are matched with a given publisher id.