15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_HPP_ 16 #define RCLCPP__INTRA_PROCESS_MANAGER_HPP_ 26 #include <unordered_map> 39 namespace intra_process_manager
186 template<
typename MessageT,
typename Alloc>
189 size_t buffer_size = 0)
191 auto id = IntraProcessManager::get_next_unique_id();
192 size_t size = buffer_size > 0 ? buffer_size : publisher->
get_queue_size();
196 impl_->add_publisher(
id, publisher, mrb, size);
241 template<
typename MessageT,
typename Alloc = std::allocator<
void>,
242 typename Deleter = std::default_delete<MessageT>>
245 uint64_t intra_process_publisher_id,
246 std::unique_ptr<MessageT, Deleter> & message)
248 using MRBMessageAlloc =
typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
250 uint64_t message_seq = 0;
251 mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->get_publisher_info_for_id(
252 intra_process_publisher_id, message_seq);
253 typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
255 throw std::runtime_error(
"Typecast failed due to incorrect message type");
259 bool did_replace = typed_buffer->push_and_replace(message_seq, message);
263 impl_->store_intra_process_message(intra_process_publisher_id, message_seq);
304 template<
typename MessageT,
typename Alloc = std::allocator<
void>,
305 typename Deleter = std::default_delete<MessageT>>
308 uint64_t intra_process_publisher_id,
309 uint64_t message_sequence_number,
310 uint64_t requesting_subscriptions_intra_process_id,
311 std::unique_ptr<MessageT, Deleter> & message)
313 using MRBMessageAlloc =
typename std::allocator_traits<Alloc>::template rebind_alloc<MessageT>;
317 size_t target_subs_size = 0;
318 mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer = impl_->take_intra_process_message(
319 intra_process_publisher_id,
320 message_sequence_number,
321 requesting_subscriptions_intra_process_id,
324 typename TypedMRB::SharedPtr typed_buffer = std::static_pointer_cast<TypedMRB>(buffer);
329 if (target_subs_size) {
331 typed_buffer->get_copy_at_key(message_sequence_number, message);
334 typed_buffer->pop_at_key(message_sequence_number, message);
346 get_next_unique_id();
348 IntraProcessManagerImplBase::SharedPtr impl_;
354 #endif // RCLCPP__INTRA_PROCESS_MANAGER_HPP_
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
void remove_subscription(uint64_t intra_process_subscription_id)
Unregister a subscription using the subscription's unique id.
Definition: allocator_common.hpp:24
Ring buffer container of unique_ptrs to T, which can be accessed by a key.
Definition: mapped_ring_buffer.hpp:59
uint64_t add_subscription(subscription::SubscriptionBase::SharedPtr subscription)
Register a subscription with the manager; returns the subscription's unique id.
uint64_t store_intra_process_message(uint64_t intra_process_publisher_id, std::unique_ptr< MessageT, Deleter > &message)
Store a message in the manager, and return the message sequence number.
Definition: intra_process_manager.hpp:244
void take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, std::unique_ptr< MessageT, Deleter > &message)
Take an intra process message.
Definition: intra_process_manager.hpp:307
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: publisher.hpp:150
IntraProcessManager(IntraProcessManagerImplBase::SharedPtr state=create_default_impl())
void remove_publisher(uint64_t intra_process_publisher_id)
Unregister a publisher using the publisher's unique id.
uint64_t add_publisher(typename publisher::Publisher< MessageT, Alloc >::SharedPtr publisher, size_t buffer_size=0)
Register a publisher with the manager; returns the publisher's unique id.
Definition: intra_process_manager.hpp:188
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
bool matches_any_publishers(const rmw_gid_t *id) const
Return true if the given rmw_gid_t matches any stored Publishers.
std::shared_ptr< MessageAlloc > get_allocator() const
Definition: publisher.hpp:273
size_t get_queue_size() const
Get the queue size for this publisher.
This class facilitates intra process communication between nodes.
Definition: intra_process_manager.hpp:121
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
virtual ~IntraProcessManager()
IntraProcessManagerImplBase::SharedPtr create_default_impl()
A publisher publishes messages of any type to a topic.
Definition: publisher.hpp:146