15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ 16 #define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ 28 #include <unordered_map> 39 namespace intra_process_manager
58 PublisherBase::WeakPtr publisher,
59 mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
65 virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
67 uint64_t intra_process_publisher_id,
68 uint64_t & message_seq) = 0;
73 virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
75 uint64_t intra_process_publisher_id,
76 uint64_t message_sequence_number,
77 uint64_t requesting_subscriptions_intra_process_id,
87 template<
typename Allocator = std::allocator<
void>>
97 subscriptions_[id] = subscription;
100 subscription_ids_by_topic_[subscription->get_topic_name()].
insert(
id);
106 subscriptions_.
erase(intra_process_subscription_id);
107 for (
auto & pair : subscription_ids_by_topic_) {
108 pair.second.erase(intra_process_subscription_id);
112 for (
auto & publisher_pair : publishers_) {
113 for (
auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
114 sub_pair.second.erase(intra_process_subscription_id);
121 PublisherBase::WeakPtr publisher,
122 mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
125 publishers_[id].publisher = publisher;
130 publishers_[id].sequence_number.store(0);
132 publishers_[id].buffer = mrb;
133 publishers_[id].target_subscriptions_by_message_sequence.
reserve(size);
139 publishers_.
erase(intra_process_publisher_id);
143 mapped_ring_buffer::MappedRingBufferBase::SharedPtr
145 uint64_t intra_process_publisher_id,
146 uint64_t & message_seq)
149 auto it = publishers_.
find(intra_process_publisher_id);
150 if (it == publishers_.
end()) {
151 throw std::runtime_error(
"get_publisher_info_for_id called with invalid publisher id");
153 PublisherInfo & info = it->second;
155 message_seq = info.sequence_number.fetch_add(1);
164 auto it = publishers_.
find(intra_process_publisher_id);
165 if (it == publishers_.
end()) {
166 throw std::runtime_error(
"store_intra_process_message called with invalid publisher id");
168 PublisherInfo & info = it->second;
169 auto publisher = info.publisher.lock();
175 auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()];
177 if (info.target_subscriptions_by_message_sequence.count(message_seq) == 0) {
178 info.target_subscriptions_by_message_sequence.
emplace(
181 info.target_subscriptions_by_message_sequence[message_seq].clear();
184 destined_subscriptions.begin(), destined_subscriptions.end(),
187 info.target_subscriptions_by_message_sequence[message_seq],
189 info.target_subscriptions_by_message_sequence[message_seq].end()
194 mapped_ring_buffer::MappedRingBufferBase::SharedPtr
196 uint64_t intra_process_publisher_id,
197 uint64_t message_sequence_number,
198 uint64_t requesting_subscriptions_intra_process_id,
203 PublisherInfo * info;
205 auto it = publishers_.
find(intra_process_publisher_id);
206 if (it == publishers_.
end()) {
215 auto it = info->target_subscriptions_by_message_sequence.
find(message_sequence_number);
216 if (it == info->target_subscriptions_by_message_sequence.end()) {
220 target_subs = &it->second;
224 target_subs->
begin(), target_subs->
end(),
225 requesting_subscriptions_intra_process_id);
226 if (it == target_subs->
end()) {
230 target_subs->
erase(it);
232 size = target_subs->
size();
239 for (
auto & publisher_pair : publishers_) {
240 auto publisher = publisher_pair.second.publisher.lock();
244 if (*publisher.get() == id) {
257 RebindAlloc<uint64_t> uint64_allocator;
261 uint64_t, SubscriptionBase::WeakPtr,
263 RebindAlloc<std::pair<const uint64_t, SubscriptionBase::WeakPtr>>>;
268 operator()(
const char * lhs,
const char * rhs)
const 277 RebindAlloc<std::pair<const char * const, AllocSet>>>;
279 SubscriptionMap subscriptions_;
281 IDTopicMap subscription_ids_by_topic_;
287 PublisherInfo() =
default;
289 PublisherBase::WeakPtr publisher;
291 mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
296 RebindAlloc<std::pair<const uint64_t, AllocSet>>>;
297 TargetSubscriptionsMap target_subscriptions_by_message_sequence;
301 uint64_t, PublisherInfo,
303 RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
305 PublisherMap publishers_;
311 IntraProcessManagerImplBase::SharedPtr
317 #endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
virtual void add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription)=0
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
~IntraProcessManagerImplBase()=default
Definition: allocator_common.hpp:24
~IntraProcessManagerImpl()=default
Definition: intra_process_manager_impl.hpp:42
virtual void remove_publisher(uint64_t intra_process_publisher_id)=0
virtual void remove_subscription(uint64_t intra_process_subscription_id)=0
void add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription)
Definition: intra_process_manager_impl.hpp:95
mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)
Definition: intra_process_manager_impl.hpp:144
void add_publisher(uint64_t id, PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)
Definition: intra_process_manager_impl.hpp:119
void remove_subscription(uint64_t intra_process_subscription_id)
Definition: intra_process_manager_impl.hpp:104
virtual bool matches_any_publishers(const rmw_gid_t *id) const =0
mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)
Definition: intra_process_manager_impl.hpp:195
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)=0
bool matches_any_publishers(const rmw_gid_t *id) const
Definition: intra_process_manager_impl.hpp:237
virtual void add_publisher(uint64_t id, PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)=0
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
IntraProcessManagerImpl()=default
void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
Definition: intra_process_manager_impl.hpp:161
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)=0
Definition: intra_process_manager_impl.hpp:88
IntraProcessManagerImplBase::SharedPtr create_default_impl()
IntraProcessManagerImplBase()=default
virtual void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)=0
void remove_publisher(uint64_t intra_process_publisher_id)
Definition: intra_process_manager_impl.hpp:137