15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ 16 #define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ 30 #include <unordered_map> 43 namespace intra_process_manager
62 PublisherBase::WeakPtr publisher,
63 mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
69 virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
71 uint64_t intra_process_publisher_id,
72 uint64_t & message_seq) = 0;
77 virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
79 uint64_t intra_process_publisher_id,
80 uint64_t message_sequence_number,
81 uint64_t requesting_subscriptions_intra_process_id,
94 template<
typename Allocator = std::allocator<
void>>
104 subscriptions_[id] = subscription;
105 subscription_ids_by_topic_[fixed_size_string(subscription->get_topic_name())].insert(
id);
111 subscriptions_.erase(intra_process_subscription_id);
112 for (
auto & pair : subscription_ids_by_topic_) {
113 pair.second.erase(intra_process_subscription_id);
117 for (
auto & publisher_pair : publishers_) {
118 for (
auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
119 sub_pair.second.erase(intra_process_subscription_id);
126 PublisherBase::WeakPtr publisher,
127 mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
130 publishers_[id].publisher = publisher;
135 publishers_[id].sequence_number.store(0);
137 publishers_[id].buffer = mrb;
138 publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
144 publishers_.erase(intra_process_publisher_id);
148 mapped_ring_buffer::MappedRingBufferBase::SharedPtr
150 uint64_t intra_process_publisher_id,
151 uint64_t & message_seq)
154 auto it = publishers_.find(intra_process_publisher_id);
155 if (it == publishers_.end()) {
156 throw std::runtime_error(
"get_publisher_info_for_id called with invalid publisher id");
158 PublisherInfo & info = it->second;
160 message_seq = info.sequence_number.fetch_add(1);
169 auto it = publishers_.find(intra_process_publisher_id);
170 if (it == publishers_.end()) {
171 throw std::runtime_error(
"store_intra_process_message called with invalid publisher id");
173 PublisherInfo & info = it->second;
174 auto publisher = info.publisher.lock();
180 auto & destined_subscriptions =
181 subscription_ids_by_topic_[fixed_size_string(publisher->get_topic_name())];
183 if (info.target_subscriptions_by_message_sequence.count(message_seq) == 0) {
184 info.target_subscriptions_by_message_sequence.emplace(
187 info.target_subscriptions_by_message_sequence[message_seq].clear();
190 destined_subscriptions.begin(), destined_subscriptions.end(),
193 info.target_subscriptions_by_message_sequence[message_seq],
195 info.target_subscriptions_by_message_sequence[message_seq].end()
200 mapped_ring_buffer::MappedRingBufferBase::SharedPtr
202 uint64_t intra_process_publisher_id,
203 uint64_t message_sequence_number,
204 uint64_t requesting_subscriptions_intra_process_id,
209 PublisherInfo * info;
211 auto it = publishers_.find(intra_process_publisher_id);
212 if (it == publishers_.end()) {
221 auto it = info->target_subscriptions_by_message_sequence.
find(message_sequence_number);
222 if (it == info->target_subscriptions_by_message_sequence.end()) {
226 target_subs = &it->second;
230 target_subs->
begin(), target_subs->
end(),
231 requesting_subscriptions_intra_process_id);
232 if (it == target_subs->
end()) {
236 target_subs->
erase(it);
238 size = target_subs->
size();
245 for (
auto & publisher_pair : publishers_) {
246 auto publisher = publisher_pair.second.publisher.lock();
250 if (*publisher.get() == id) {
260 auto publisher_it = publishers_.find(intra_process_publisher_id);
261 if (publisher_it == publishers_.end()) {
265 auto publisher = publisher_it->second.publisher.lock();
270 subscription_ids_by_topic_.find(fixed_size_string(publisher->get_topic_name()));
271 if (sub_map_it == subscription_ids_by_topic_.end()) {
275 return sub_map_it->second.size();
284 fixed_size_string(
const char * str)
const 288 if (size > ret.
size()) {
294 struct strcmp_wrapper
306 RebindAlloc<uint64_t> uint64_allocator;
310 uint64_t, SubscriptionBase::WeakPtr,
312 RebindAlloc<std::pair<const uint64_t, SubscriptionBase::WeakPtr>>>;
318 RebindAlloc<std::pair<const FixedSizeString, AllocSet>>>;
328 PublisherInfo() =
default;
330 PublisherBase::WeakPtr publisher;
332 mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
336 std::hash<uint64_t>, std::equal_to<uint64_t>,
337 RebindAlloc<std::pair<const uint64_t, AllocSet>>>;
338 TargetSubscriptionsMap target_subscriptions_by_message_sequence;
342 uint64_t, PublisherInfo,
343 std::hash<uint64_t>, std::equal_to<uint64_t>,
344 RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
352 IntraProcessManagerImplBase::SharedPtr
358 #endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ void add_publisher(uint64_t id, PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)
Definition: intra_process_manager_impl.hpp:124
mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)
Definition: intra_process_manager_impl.hpp:149
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)
Definition: intra_process_manager_impl.hpp:201
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)=0
This header provides the get_node_topics_interface() template function.
Definition: allocator_common.hpp:24
virtual bool matches_any_publishers(const rmw_gid_t *id) const =0
virtual void remove_publisher(uint64_t intra_process_publisher_id)=0
void add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription)
Definition: intra_process_manager_impl.hpp:102
virtual ~IntraProcessManagerImplBase()=default
IntraProcessManagerImplBase()=default
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)=0
virtual void add_publisher(uint64_t id, PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)=0
virtual size_t get_subscription_count(uint64_t intra_process_publisher_id) const =0
virtual void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)=0
Definition: intra_process_manager_impl.hpp:95
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
void remove_publisher(uint64_t intra_process_publisher_id)
Definition: intra_process_manager_impl.hpp:142
Definition: intra_process_manager_impl.hpp:46
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
virtual void add_subscription(uint64_t id, SubscriptionBase::SharedPtr subscription)=0
size_t get_subscription_count(uint64_t intra_process_publisher_id) const
Definition: intra_process_manager_impl.hpp:258
bool matches_any_publishers(const rmw_gid_t *id) const
Definition: intra_process_manager_impl.hpp:243
virtual void remove_subscription(uint64_t intra_process_subscription_id)=0
void remove_subscription(uint64_t intra_process_subscription_id)
Definition: intra_process_manager_impl.hpp:109
IntraProcessManagerImplBase::SharedPtr create_default_impl()
void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
Definition: intra_process_manager_impl.hpp:166