15 #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
16 #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
19 #include <condition_variable>
22 #include <unordered_set>
23 #include <unordered_map>
25 #include "fastcdr/FastBuffer.h"
27 #include "fastdds/dds/core/status/PublicationMatchedStatus.hpp"
28 #include "fastdds/dds/core/status/SubscriptionMatchedStatus.hpp"
29 #include "fastdds/dds/publisher/DataWriter.hpp"
30 #include "fastdds/dds/publisher/DataWriterListener.hpp"
31 #include "fastdds/dds/subscriber/DataReader.hpp"
32 #include "fastdds/dds/subscriber/DataReaderListener.hpp"
33 #include "fastdds/dds/subscriber/SampleInfo.hpp"
34 #include "fastdds/dds/topic/TypeSupport.hpp"
36 #include "fastdds/rtps/common/Guid.h"
37 #include "fastdds/rtps/common/InstanceHandle.h"
38 #include "fastdds/rtps/common/SampleIdentity.h"
88 eprosima::fastrtps::rtps::GUID_t,
99 eprosima::fastdds::dds::DataWriter * ,
100 const eprosima::fastdds::dds::PublicationMatchedStatus & info)
final
// Keep subscriptions_ in sync with the reported match status: a count change
// of +1 records the GUID derived from the last matched subscription handle.
103 if (info.current_count_change == 1) {
104 subscriptions_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle));
105 }
else if (info.current_count_change == -1) {
// A count change of -1 forgets that subscription and, when it has an entry in
// clients_endpoints_, removes the pairing stored under both GUIDs.
106 eprosima::fastrtps::rtps::GUID_t erase_endpoint_guid =
107 eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle);
108 subscriptions_.erase(erase_endpoint_guid);
109 auto endpoint = clients_endpoints_.find(erase_endpoint_guid);
110 if (endpoint != clients_endpoints_.end()) {
// Erase the partner entry first: endpoint->second is read from the very
// element that the second erase removes.
111 clients_endpoints_.erase(endpoint->second);
112 clients_endpoints_.erase(erase_endpoint_guid);
120 template<
class Rep,
class Period>
123 const eprosima::fastrtps::rtps::GUID_t & guid,
// Predicate result: the GUID counts as present when it is found in subscriptions_.
128 return subscriptions_.find(guid) != subscriptions_.end();
// Wait on cv_ with rel_time as the timeout and guid_is_present as the predicate;
// the call's result is returned unchanged.
// NOTE(review): cv_ is presumably a std::condition_variable — confirm at its declaration.
132 return cv_.
wait_for(lock, rel_time, guid_is_present);
137 const eprosima::fastrtps::rtps::GUID_t & guid)
// Branch taken when clients_endpoints_ holds no entry keyed by guid.
142 if (clients_endpoints_.find(guid) == clients_endpoints_.end()) {
// If endpointGuid has an entry in clients_endpoints_, drop the pairing stored
// under both GUIDs. The partner (endpoint->second) is erased first because that
// value lives in the element removed by the second erase.
157 auto endpoint = clients_endpoints_.find(endpointGuid);
158 if (endpoint != clients_endpoints_.end()) {
159 clients_endpoints_.erase(endpoint->second);
160 clients_endpoints_.erase(endpointGuid);
165 const eprosima::fastrtps::rtps::GUID_t & readerGuid,
166 const eprosima::fastrtps::rtps::GUID_t & writerGuid)
// Store the pairing in both directions so it can be looked up from either GUID.
// emplace leaves an already-present key untouched.
169 clients_endpoints_.emplace(readerGuid, writerGuid);
170 clients_endpoints_.emplace(writerGuid, readerGuid);
// Both containers are annotated as guarded by mutex_ via RCPPUTILS_TSA_GUARDED_BY.
// GUIDs inserted on a match and erased on an unmatch.
175 subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
// Reader/writer GUID pairings, stored under both GUIDs.
176 clients_endpoints_map_t clients_endpoints_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
// Initial state: keeps the supplied info, flags no pending data, and has no
// condition mutex / condition variable attached (both pointers null).
184 : info_(info), list_has_data_(false),
185 conditionMutex_(nullptr), conditionVariable_(nullptr)
191 eprosima::fastdds::dds::DataReader * ,
192 const eprosima::fastdds::dds::SubscriptionMatchedStatus & info)
final
// Only a count change of -1 is acted on in the visible code; the last
// publication handle is converted to a GUID and passed as a call argument.
// NOTE(review): the callee receiving that GUID is not visible in this excerpt.
194 if (info.current_count_change == -1) {
196 eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle));
// The request's buffer is allocated with a raw new.
// NOTE(review): no release is visible in this excerpt — confirm the buffer is
// freed on every path, including when take_next_sample does not return RETCODE_OK.
206 request.
buffer_ =
new eprosima::fastcdr::FastBuffer();
// Continue only when a sample was actually taken from the reader.
212 if (reader->take_next_sample(&data, &request.
sample_info_) == ReturnCode_t::RETCODE_OK) {
// The writer GUID of the related sample identity is used as the reader GUID
// when it is not GUID_t::unknown().
216 const eprosima::fastrtps::rtps::GUID_t & reader_guid =
217 request.
sample_info_.related_sample_identity.writer_guid();
218 if (reader_guid != eprosima::fastrtps::rtps::GUID_t::unknown() ) {
223 const eprosima::fastrtps::rtps::GUID_t & writer_guid =
// Both branches enqueue the request and set list_has_data_; the branch with an
// attached condition additionally notifies conditionVariable_.
229 if (conditionMutex_ !=
nullptr) {
231 list.push_back(request);
235 list_has_data_.store(
true);
237 conditionVariable_->notify_one();
239 list.push_back(request);
240 list_has_data_.store(
true);
// Both branches copy the front element of list into request and then refresh
// list_has_data_ from whether list is non-empty at that point.
252 if (conditionMutex_ !=
nullptr) {
255 request = list.front();
257 list_has_data_.store(!list.empty());
261 request = list.front();
263 list_has_data_.store(!list.empty());
// Remember the caller-supplied mutex and condition variable pointers.
274 conditionMutex_ = conditionMutex;
275 conditionVariable_ = conditionVariable;
// Reset both condition pointers to null.
282 conditionMutex_ =
nullptr;
283 conditionVariable_ =
nullptr;
// Report the current value of the atomic list_has_data_ flag.
289 return list_has_data_.load();
// Atomic flag: set to true after a request is enqueued and refreshed from
// !list.empty() when one is fetched.
296 std::atomic_bool list_has_data_;
// Optional condition mutex (null when none is attached); annotated as guarded
// by internalMutex_.
297 std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
301 #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_