15 #ifndef RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ 16 #define RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ 28 #include <unordered_map> 39 namespace intra_process_manager
51 add_subscription(uint64_t
id, subscription::SubscriptionBase::SharedPtr subscription) = 0;
57 publisher::PublisherBase::WeakPtr publisher,
58 mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
64 virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
66 uint64_t intra_process_publisher_id,
67 uint64_t & message_seq) = 0;
72 virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr
74 uint64_t message_sequence_number,
75 uint64_t requesting_subscriptions_intra_process_id,
85 template<
typename Allocator = std::allocator<
void>>
// add_subscription (body excerpt; the signature and braces are not part of
// this listing).
// Record the subscription under its intra-process id. SubscriptionMap's
// mapped type is a WeakPtr, so the manager does not keep the subscription alive.
95 subscriptions_[id] = subscription;
// Index the id by the subscription's topic name; operator[] creates the
// per-topic id set on first use.
98 subscription_ids_by_topic_[subscription->get_topic_name()].insert(
id);
// remove_subscription (body excerpt; closing braces are not part of this
// listing).
// NOTE(review): no lock on runtime_mutex_ is visible in this excerpt -- confirm
// whether callers are expected to serialize against the publish/take paths.
// Forget the id -> subscription association.
104 subscriptions_.erase(intra_process_subscription_id);
// Drop the id from every per-topic id set (the topic is not looked up, so all
// sets are visited).
105 for (
auto & pair : subscription_ids_by_topic_) {
106 pair.second.erase(intra_process_subscription_id);
// Drop the id from every publisher's per-message target set so the removed
// subscription is no longer counted as a pending taker of any stored message.
110 for (
auto & publisher_pair : publishers_) {
111 for (
auto & sub_pair : publisher_pair.second.target_subscriptions_by_message_sequence) {
112 sub_pair.second.erase(intra_process_subscription_id);
// add_publisher (excerpt; the leading `id` parameter, the trailing `size`
// parameter and the braces are not part of this listing).
// Throws std::invalid_argument when the size guard below fires.
118 publisher::PublisherBase::WeakPtr publisher,
119 mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb,
// operator[] default-constructs the PublisherInfo entry for `id` if it does
// not exist yet, then the weak publisher handle is stored in it.
122 publishers_[id].publisher = publisher;
// NOTE(review): if `size` is a size_t no wider than 64 bits this comparison can
// never be true -- confirm the intended bound. Also note the entry for `id` has
// already been created above when this throws.
124 if (size > std::numeric_limits<uint64_t>::max()) {
125 throw std::invalid_argument(
"the calculated buffer size is too large");
// Message sequence numbers for this publisher start at zero.
127 publishers_[id].sequence_number.store(0);
// Keep a shared reference to the publisher's ring buffer.
129 publishers_[id].buffer = mrb;
// Pre-size the hash table that maps message sequence -> target subscription ids.
130 publishers_[id].target_subscriptions_by_message_sequence.reserve(size);
// remove_publisher (body excerpt): erase the PublisherInfo stored for this id,
// which releases its ring-buffer reference and its per-message target sets.
// Erasing an id that is not present is a no-op for unordered_map::erase.
136 publishers_.erase(intra_process_publisher_id);
// get_publisher_info_for_id (excerpt; the name line, braces and return
// statement are not part of this listing).
// Looks up the publisher and hands out the next message sequence number via
// the `message_seq` out-parameter. Throws std::runtime_error for an unknown id.
// Presumably returns the publisher's ring buffer (info.buffer) -- the return
// statement is not visible here.
140 mapped_ring_buffer::MappedRingBufferBase::SharedPtr
142 uint64_t intra_process_publisher_id,
143 uint64_t & message_seq)
// Hold runtime_mutex_ for the remainder of the function.
145 std::lock_guard<std::mutex> lock(runtime_mutex_);
146 auto it = publishers_.find(intra_process_publisher_id);
147 if (it == publishers_.end()) {
148 throw std::runtime_error(
"get_publisher_info_for_id called with invalid publisher id");
150 PublisherInfo & info = it->second;
// fetch_add returns the value held before the increment, so the caller gets
// the current sequence number and the counter advances by one.
152 message_seq = info.sequence_number.fetch_add(1);
// store_intra_process_message (body excerpt; the signature, several call
// lines and the braces are not part of this listing).
// Records which subscription ids are expected to take message `message_seq`
// from the given publisher. Throws std::runtime_error if the publisher id is
// unknown or the publisher object has expired.
// Hold runtime_mutex_ for the remainder of the function.
160 std::lock_guard<std::mutex> lock(runtime_mutex_);
161 auto it = publishers_.find(intra_process_publisher_id);
162 if (it == publishers_.end()) {
163 throw std::runtime_error(
"store_intra_process_message called with invalid publisher id");
165 PublisherInfo & info = it->second;
// Promote the weak handle; the condition guarding the throw below is on a line
// missing from this listing (presumably a null check on `publisher`).
166 auto publisher = info.publisher.lock();
168 throw std::runtime_error(
"publisher has unexpectedly gone out of scope");
// Ids of subscriptions on the publisher's topic. operator[] inserts an empty
// set when the topic has no registered subscriptions.
172 auto & destined_subscriptions = subscription_ids_by_topic_[publisher->get_topic_name()];
// Ensure a target set exists for this sequence number, built with the
// manager's uint64_t allocator.
174 if (info.target_subscriptions_by_message_sequence.count(message_seq) == 0) {
175 info.target_subscriptions_by_message_sequence.emplace(
176 message_seq, AllocSet(std::less<uint64_t>(), uint64_allocator));
// Presumably the else-branch (the branch line is missing): an existing set for
// this sequence number is emptied before being refilled.
178 info.target_subscriptions_by_message_sequence[message_seq].clear();
// Arguments of a call whose name line is missing; it appears to copy every
// destined subscription id into the target set for this message -- TODO
// confirm against the full source.
// NOTE(review): the same key is looked up several times here; a single
// reference to the mapped set would avoid the repeated hashing.
181 destined_subscriptions.begin(), destined_subscriptions.end(),
184 info.target_subscriptions_by_message_sequence[message_seq],
186 info.target_subscriptions_by_message_sequence[message_seq].end()
// take_intra_process_message (excerpt; the name line, the `size` parameter,
// the early-return bodies, the final return and the braces are not part of
// this listing).
// Marks the message as taken by the requesting subscription and reports via
// `size` how many target subscriptions remain for it.
191 mapped_ring_buffer::MappedRingBufferBase::SharedPtr
193 uint64_t message_sequence_number,
194 uint64_t requesting_subscriptions_intra_process_id,
// Hold runtime_mutex_ for the remainder of the function.
198 std::lock_guard<std::mutex> lock(runtime_mutex_);
// Step 1: locate the publisher. The handling of an unknown id and the
// assignment to `info` are on lines missing from this listing.
199 PublisherInfo * info;
201 auto it = publishers_.find(intra_process_publisher_id);
202 if (it == publishers_.end()) {
// Step 2: locate the set of subscriptions still expected to take this message.
209 AllocSet * target_subs;
211 auto it = info->target_subscriptions_by_message_sequence.find(message_sequence_number);
212 if (it == info->target_subscriptions_by_message_sequence.end()) {
216 target_subs = &it->second;
// Step 3: search the set for the requesting subscription. The call's name line
// is missing; the begin/end/value argument shape suggests a range search such
// as std::find.
// NOTE(review): if so, this is a linear scan of a std::set; the member
// target_subs->find(...) would do the same lookup in logarithmic time.
220 target_subs->begin(), target_subs->end(),
221 requesting_subscriptions_intra_process_id);
222 if (it == target_subs->end()) {
// The subscription was a target: remove it so it cannot take the message twice.
226 target_subs->erase(it);
// Report how many target subscriptions are left after this take.
228 size = target_subs->size();
// matches_any_publishers (body excerpt; the signature, the lines between the
// lock() and the comparison, the return statements and the braces are not part
// of this listing).
// Scans every registered publisher looking for one that compares equal to `id`.
235 for (
auto & publisher_pair : publishers_) {
// Promote the weak handle; lines missing from this listing presumably handle
// the case where the publisher has already expired -- TODO confirm.
236 auto publisher = publisher_pair.second.publisher.lock();
// Compare the publisher object itself (not the pointer) with `id`; this relies
// on an operator== declared for the publisher type elsewhere.
240 if (*publisher.get() == id) {
// Allocator of the class's `Allocator` family rebound to allocate objects of
// type T (the `template<typename T>` line is not part of this listing).
251 using RebindAlloc =
typename std::allocator_traits<Allocator>::template rebind_alloc<T>;
// Allocator instance for uint64_t; passed to AllocSet's constructor when a new
// target set is created.
253 RebindAlloc<uint64_t> uint64_allocator;
// Ordered set of uint64_t ids using the rebound allocator.
255 using AllocSet = std::set<uint64_t, std::less<uint64_t>, RebindAlloc<uint64_t>>;
// Hash map from subscription id to a weak pointer to the subscription, using
// the rebound allocator for its nodes.
256 using SubscriptionMap = std::unordered_map<uint64_t, subscription::SubscriptionBase::WeakPtr,
257 std::hash<uint64_t>, std::equal_to<uint64_t>,
258 RebindAlloc<std::pair<const uint64_t, subscription::SubscriptionBase::WeakPtr>>>;
// Comparator that orders C strings by their contents rather than by pointer
// value, suitable as the Compare argument of an ordered container.
//
// std::binary_function was deprecated in C++11 and removed in C++17, so the
// comparator no longer derives from it. The member typedefs that base class
// used to provide are declared directly so existing users keep compiling.
struct strcmp_wrapper
{
  using first_argument_type = const char *;
  using second_argument_type = const char *;
  using result_type = bool;

  // Returns true when lhs sorts strictly before rhs according to std::strcmp.
  // Both pointers must reference null-terminated strings; null is not checked.
  bool
  operator()(const char * lhs, const char * rhs) const
  {
    return std::strcmp(lhs, rhs) < 0;
  }
};
// Ordered map used to group subscription ids by topic. The key, mapped-type
// and comparator arguments are on lines missing from this listing; the
// allocator's value type below indicates std::string keys and AllocSet values.
268 using IDTopicMap = std::map<
272 RebindAlloc<std::pair<const std::string, AllocSet>>>;
// Subscription id -> weak pointer to the subscription.
274 SubscriptionMap subscriptions_;
// Topic name -> set of ids of the subscriptions registered on that topic.
276 IDTopicMap subscription_ids_by_topic_;
// PublisherInfo members (excerpt; the struct header and closing brace are not
// part of this listing). One instance is stored per registered publisher.
282 PublisherInfo() =
default;
// Weak handle to the publisher; promoted with lock() wherever it is used.
284 publisher::PublisherBase::WeakPtr publisher;
// Next message sequence number to hand out; reset to 0 on registration and
// advanced with fetch_add(1).
285 std::atomic<uint64_t> sequence_number;
// Shared reference to the ring buffer supplied when the publisher was added.
286 mapped_ring_buffer::MappedRingBufferBase::SharedPtr buffer;
// Hash map from message sequence number to the set of subscription ids that
// have not yet taken that message.
288 using TargetSubscriptionsMap = std::unordered_map<uint64_t, AllocSet,
289 std::hash<uint64_t>, std::equal_to<uint64_t>,
290 RebindAlloc<std::pair<const uint64_t, AllocSet>>>;
291 TargetSubscriptionsMap target_subscriptions_by_message_sequence;
// Hash map from intra-process publisher id to its PublisherInfo, using the
// rebound allocator for its nodes.
294 using PublisherMap = std::unordered_map<uint64_t, PublisherInfo,
295 std::hash<uint64_t>, std::equal_to<uint64_t>,
296 RebindAlloc<std::pair<const uint64_t, PublisherInfo>>>;
// All currently registered publishers, keyed by intra-process publisher id.
298 PublisherMap publishers_;
// Locked by get_publisher_info_for_id, store_intra_process_message and
// take_intra_process_message for the duration of each call.
300 std::mutex runtime_mutex_;
304 IntraProcessManagerImplBase::SharedPtr
310 #endif // RCLCPP__INTRA_PROCESS_MANAGER_IMPL_HPP_ mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)
Definition: intra_process_manager_impl.hpp:141
#define RCLCPP_DISABLE_COPY(...)
Definition: macros.hpp:26
mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)
Definition: intra_process_manager_impl.hpp:192
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr get_publisher_info_for_id(uint64_t intra_process_publisher_id, uint64_t &message_seq)=0
Definition: allocator_common.hpp:24
virtual bool matches_any_publishers(const rmw_gid_t *id) const =0
virtual void remove_publisher(uint64_t intra_process_publisher_id)=0
void add_publisher(uint64_t id, publisher::PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)
Definition: intra_process_manager_impl.hpp:117
IntraProcessManagerImplBase()=default
virtual mapped_ring_buffer::MappedRingBufferBase::SharedPtr take_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_sequence_number, uint64_t requesting_subscriptions_intra_process_id, size_t &size)=0
void add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)
Definition: intra_process_manager_impl.hpp:93
virtual void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)=0
virtual void add_publisher(uint64_t id, publisher::PublisherBase::WeakPtr publisher, mapped_ring_buffer::MappedRingBufferBase::SharedPtr mrb, size_t size)=0
Definition: intra_process_manager_impl.hpp:86
#define RCLCPP_SMART_PTR_DEFINITIONS_NOT_COPYABLE(...)
Definition: macros.hpp:51
void remove_publisher(uint64_t intra_process_publisher_id)
Definition: intra_process_manager_impl.hpp:134
Definition: intra_process_manager_impl.hpp:42
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
~IntraProcessManagerImplBase()=default
bool matches_any_publishers(const rmw_gid_t *id) const
Definition: intra_process_manager_impl.hpp:233
virtual void remove_subscription(uint64_t intra_process_subscription_id)=0
void remove_subscription(uint64_t intra_process_subscription_id)
Definition: intra_process_manager_impl.hpp:102
IntraProcessManagerImplBase::SharedPtr create_default_impl()
virtual void add_subscription(uint64_t id, subscription::SubscriptionBase::SharedPtr subscription)=0
void store_intra_process_message(uint64_t intra_process_publisher_id, uint64_t message_seq)
Definition: intra_process_manager_impl.hpp:158