15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ 16 #define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ 28 #include <unordered_map> 39 namespace intra_process_manager
51 add_subscription(uint64_t
id, subscription::SubscriptionBase::SharedPtr subscription) = 0;
57 publisher::PublisherBase::WeakPtr publisher,
58 mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
64 virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
66 uint64_t intra_process_publisher_id,
67 uint64_t & message_seq) = 0;
72 virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
74 uint64_t message_sequence_number,
75 uint64_t requesting_subscriptions_intra_process_id,
85 template<
typename Allocator = std::allocator<
void>>
95 subscriptions_[id] = subscription;
98 subscription_ids_by_topic_[subscription->get_topic_name()].insert(
id);
104 subscriptions_.erase(intra_process_subscription_id);
105 for (
auto & pair : subscription_ids_by_topic_) {
106 pair.second.erase(intra_process_subscription_id);
110 for (
auto & publisher_pair : publishers_) {
111 for (
auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
112 sub_pair.second.erase(intra_process_subscription_id);
118 publisher::PublisherBase::WeakPtr publisher,
119 mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
122 publishers_[id].publisher = publisher;
127 publishers_[id].sequence_number.store(0);
129 publishers_[id].buffer = mrb;
130 publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
136 publishers_.erase(intra_process_publisher_id);
140 mapped_ring_buffer::MappedRingBufferBase::SharedPtr
142 uint64_t intra_process_publisher_id,
143 uint64_t & message_seq)
146 auto it = publishers_.find(intra_process_publisher_id);
147 if (it == publishers_.end()) {
148 throw std::runtime_error(
"get_publisher_info_for_id called with invalid publisher id");
150 PublisherInfo & info = it->second;
152 message_seq = info.sequence_number.fetch_add(1);
161 auto it = publishers_.find(intra_process_publisher_id);
162 if (it == publishers_.end()) {
163 throw std::runtime_error(
"store_intra_process_message called with invalid publisher id");
165 PublisherInfo & info = it->second;
166 auto publisher = info.publisher.lock();
172 auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()];
174 if (info.target_subscriptions_by_message_sequence.count(message_seq) == 0) {
175 info.target_subscriptions_by_message_sequence.emplace(
178 info.target_subscriptions_by_message_sequence[message_seq].clear();
181 destined_subscriptions.begin(), destined_subscriptions.end(),
184 info.target_subscriptions_by_message_sequence[message_seq],
186 info.target_subscriptions_by_message_sequence[message_seq].end()
191 mapped_ring_buffer::MappedRingBufferBase::SharedPtr
193 uint64_t message_sequence_number,
194 uint64_t requesting_subscriptions_intra_process_id,
199 PublisherInfo * info;
201 auto it = publishers_.find(intra_process_publisher_id);
202 if (it == publishers_.end()) {
211 auto it = info->target_subscriptions_by_message_sequence.
find(message_sequence_number);
212 if (it == info->target_subscriptions_by_message_sequence.end()) {
216 target_subs = &it->second;
220 target_subs->
begin(), target_subs->
end(),
221 requesting_subscriptions_intra_process_id);
222 if (it == target_subs->
end()) {
226 target_subs->
erase(it);
228 size = target_subs->
size();
235 for (
auto & publisher_pair : publishers_) {
236 auto publisher = publisher_pair.second.publisher.lock();
240 if (*publisher.get() == id) {
253 RebindAlloc<uint64_t> uint64_allocator;
258 RebindAlloc<std::pair<const uint64_t, subscription::SubscriptionBase::WeakPtr>>>;
263 operator()(
const char * lhs,
const char * rhs)
const 272 RebindAlloc<std::pair<const std::string, AllocSet>>>;
282 PublisherInfo() =
default;
284 publisher::PublisherBase::WeakPtr publisher;
286 mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
289 std::hash<uint64_t>, std::equal_to<uint64_t>,
290 RebindAlloc<std::pair<const uint64_t, AllocSet>>>;
291 TargetSubscriptionsMap target_subscriptions_by_message_sequence;
295 std::hash<uint64_t>, std::equal_to<uint64_t>,
296 RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
304 IntraProcessManagerImplBase::SharedPtr
310 #endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
~IntraProcessManagerImplBase()=default
void add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)
Definition: intra_process_manager_impl.hpp:93
Definition: allocator_common.hpp:24
virtual void add_publisher(uint64_t id, publisher::PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)=0
Definition: intra_process_manager_impl.hpp:42
virtual void remove_publisher(uint64_t intra_process_publisher_id)=0
virtual void remove_subscription(uint64_t intra_process_subscription_id)=0
mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)
Definition: intra_process_manager_impl.hpp:141
void remove_subscription(uint64_t intra_process_subscription_id)
Definition: intra_process_manager_impl.hpp:102
virtual bool matches_any_publishers(const rmw_gid_t *id) const =0
mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)
Definition: intra_process_manager_impl.hpp:192
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)=0
bool matches_any_publishers(const rmw_gid_t *id) const
Definition: intra_process_manager_impl.hpp:233
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
void add_publisher(uint64_t id, publisher::PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)
Definition: intra_process_manager_impl.hpp:117
virtual void add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)=0
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
Definition: intra_process_manager_impl.hpp:158
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)=0
Definition: intra_process_manager_impl.hpp:86
IntraProcessManagerImplBase::SharedPtr create_default_impl()
IntraProcessManagerImplBase()=default
virtual void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)=0
void remove_publisher(uint64_t intra_process_publisher_id)
Definition: intra_process_manager_impl.hpp:134