15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_    16 #define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_    28 #include <unordered_map>    39 namespace intra_process_manager
    51   add_subscription(uint64_t 
id, subscription::SubscriptionBase::SharedPtr subscription) = 0;
    57     publisher::PublisherBase::WeakPtr publisher,
    58     mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
    64   virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
    66     uint64_t intra_process_publisher_id,
    67     uint64_t & message_seq) = 0;
    72   virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
    74     uint64_t message_sequence_number,
    75     uint64_t requesting_subscriptions_intra_process_id,
    85 template<
typename Allocator = std::allocator<
void>>
    95     subscriptions_[id] = subscription;
    98     subscription_ids_by_topic_[subscription->get_topic_name()].insert(
id);
   104     subscriptions_.erase(intra_process_subscription_id);
   105     for (
auto & pair : subscription_ids_by_topic_) {
   106       pair.second.erase(intra_process_subscription_id);
   110     for (
auto & publisher_pair : publishers_) {
   111       for (
auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
   112         sub_pair.second.erase(intra_process_subscription_id);
   118     publisher::PublisherBase::WeakPtr publisher,
   119     mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
   122     publishers_[id].publisher = publisher;
   127     publishers_[id].sequence_number.store(0);
   129     publishers_[id].buffer = mrb;
   130     publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
   136     publishers_.erase(intra_process_publisher_id);
   140   mapped_ring_buffer::MappedRingBufferBase::SharedPtr
   142     uint64_t intra_process_publisher_id,
   143     uint64_t & message_seq)
   146     auto it = publishers_.find(intra_process_publisher_id);
   147     if (it == publishers_.end()) {
   148       throw std::runtime_error(
"get_publisher_info_for_id called with invalid publisher id");
   150     PublisherInfo & info = it->second;
   152     message_seq = info.sequence_number.fetch_add(1);
   161     auto it = publishers_.find(intra_process_publisher_id);
   162     if (it == publishers_.end()) {
   163       throw std::runtime_error(
"store_intra_process_message called with invalid publisher id");
   165     PublisherInfo & info = it->second;
   166     auto publisher = info.publisher.lock();
   172     auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()];
   174     if (info.target_subscriptions_by_message_sequence.count(message_seq) == 0) {
   175       info.target_subscriptions_by_message_sequence.emplace(
   178       info.target_subscriptions_by_message_sequence[message_seq].clear();
   181       destined_subscriptions.begin(), destined_subscriptions.end(),
   184         info.target_subscriptions_by_message_sequence[message_seq],
   186         info.target_subscriptions_by_message_sequence[message_seq].end()
   191   mapped_ring_buffer::MappedRingBufferBase::SharedPtr
   193     uint64_t message_sequence_number,
   194     uint64_t requesting_subscriptions_intra_process_id,
   199     PublisherInfo * info;
   201       auto it = publishers_.find(intra_process_publisher_id);
   202       if (it == publishers_.end()) {
   211       auto it = info->target_subscriptions_by_message_sequence.
find(message_sequence_number);
   212       if (it == info->target_subscriptions_by_message_sequence.end()) {
   216       target_subs = &it->second;
   220         target_subs->
begin(), target_subs->
end(),
   221         requesting_subscriptions_intra_process_id);
   222       if (it == target_subs->
end()) {
   226       target_subs->
erase(it);
   228     size = target_subs->
size();
   235     for (
auto & publisher_pair : publishers_) {
   236       auto publisher = publisher_pair.second.publisher.lock();
   240       if (*publisher.get() == id) {
   253   RebindAlloc<uint64_t> uint64_allocator;
   258       RebindAlloc<std::pair<const uint64_t, subscription::SubscriptionBase::WeakPtr>>>;
   263     operator()(
const char * lhs, 
const char * rhs)
 const   272       RebindAlloc<std::pair<const std::string, AllocSet>>>;
   282     PublisherInfo() = 
default;
   284     publisher::PublisherBase::WeakPtr publisher;
   286     mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
   289         std::hash<uint64_t>, std::equal_to<uint64_t>,
   290         RebindAlloc<std::pair<const uint64_t, AllocSet>>>;
   291     TargetSubscriptionsMap target_subscriptions_by_message_sequence;
   295       std::hash<uint64_t>, std::equal_to<uint64_t>,
   296       RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
   304 IntraProcessManagerImplBase::SharedPtr
   310 #endif  // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ 
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
 
~IntraProcessManagerImplBase()=default
 
void add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)
Definition: intra_process_manager_impl.hpp:93
 
Definition: allocator_common.hpp:24
 
virtual void add_publisher(uint64_t id, publisher::PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)=0
 
Definition: intra_process_manager_impl.hpp:42
 
virtual void remove_publisher(uint64_t intra_process_publisher_id)=0
 
virtual void remove_subscription(uint64_t intra_process_subscription_id)=0
 
mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)
Definition: intra_process_manager_impl.hpp:141
 
void remove_subscription(uint64_t intra_process_subscription_id)
Definition: intra_process_manager_impl.hpp:102
 
virtual bool matches_any_publishers(const rmw_gid_t *id) const =0
 
mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)
Definition: intra_process_manager_impl.hpp:192
 
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)=0
 
bool matches_any_publishers(const rmw_gid_t *id) const
Definition: intra_process_manager_impl.hpp:233
 
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
 
void add_publisher(uint64_t id, publisher::PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)
Definition: intra_process_manager_impl.hpp:117
 
virtual void add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)=0
 
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
 
void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
Definition: intra_process_manager_impl.hpp:158
 
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)=0
 
Definition: intra_process_manager_impl.hpp:86
 
IntraProcessManagerImplBase::SharedPtr create_default_impl()
 
IntraProcessManagerImplBase()=default
 
virtual void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)=0
 
void remove_publisher(uint64_t intra_process_publisher_id)
Definition: intra_process_manager_impl.hpp:134