rmw_fastrtps_shared_cpp  master
Code shared on static and dynamic type support of rmw_fastrtps_cpp.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
custom_service_info.hpp
Go to the documentation of this file.
1 // Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
16 #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
17 
18 #include <atomic>
19 #include <condition_variable>
20 #include <list>
21 #include <mutex>
22 #include <unordered_set>
23 
24 #include "fastcdr/FastBuffer.h"
25 
26 #include "fastrtps/participant/Participant.h"
27 #include "fastrtps/publisher/Publisher.h"
28 #include "fastrtps/publisher/PublisherListener.h"
29 #include "fastrtps/subscriber/Subscriber.h"
30 #include "fastrtps/subscriber/SubscriberListener.h"
31 #include "fastrtps/subscriber/SampleInfo.h"
32 
34 
37 
38 class ServiceListener;
39 class ServicePubListener;
40 
41 typedef struct CustomServiceInfo
42 {
47  eprosima::fastrtps::Subscriber * request_subscriber_;
48  eprosima::fastrtps::Publisher * response_publisher_;
51  eprosima::fastrtps::Participant * participant_;
54 
55 typedef struct CustomServiceRequest
56 {
57  eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
58  eprosima::fastcdr::FastBuffer * buffer_;
59  eprosima::fastrtps::SampleInfo_t sample_info_ {};
60 
62  : buffer_(nullptr) {}
64 
65 class ServiceListener : public eprosima::fastrtps::SubscriberListener
66 {
67 public:
69  : info_(info), list_has_data_(false),
70  conditionMutex_(nullptr), conditionVariable_(nullptr)
71  {
72  (void)info_;
73  }
74 
75 
76  void
77  onNewDataMessage(eprosima::fastrtps::Subscriber * sub)
78  {
79  assert(sub);
80 
81  CustomServiceRequest request;
82  request.buffer_ = new eprosima::fastcdr::FastBuffer();
83 
85  data.is_cdr_buffer = true;
86  data.data = request.buffer_;
87  data.impl = nullptr; // not used when is_cdr_buffer is true
88  if (sub->takeNextData(&data, &request.sample_info_)) {
89  if (eprosima::fastrtps::rtps::ALIVE == request.sample_info_.sampleKind) {
90  request.sample_identity_ = request.sample_info_.sample_identity;
91  // Use response subscriber guid (on related_sample_identity) when present.
92  const eprosima::fastrtps::rtps::GUID_t & reader_guid =
93  request.sample_info_.related_sample_identity.writer_guid();
94  if (reader_guid != eprosima::fastrtps::rtps::GUID_t::unknown() ) {
95  request.sample_identity_.writer_guid() = reader_guid;
96  }
97 
98  std::lock_guard<std::mutex> lock(internalMutex_);
99 
100  if (conditionMutex_ != nullptr) {
101  std::unique_lock<std::mutex> clock(*conditionMutex_);
102  list.push_back(request);
103  // the change to list_has_data_ needs to be mutually exclusive with
104  // rmw_wait() which checks hasData() and decides if wait() needs to
105  // be called
106  list_has_data_.store(true);
107  clock.unlock();
108  conditionVariable_->notify_one();
109  } else {
110  list.push_back(request);
111  list_has_data_.store(true);
112  }
113  }
114  }
115  }
116 
119  {
120  std::lock_guard<std::mutex> lock(internalMutex_);
121  CustomServiceRequest request;
122 
123  if (conditionMutex_ != nullptr) {
124  std::unique_lock<std::mutex> clock(*conditionMutex_);
125  if (!list.empty()) {
126  request = list.front();
127  list.pop_front();
128  list_has_data_.store(!list.empty());
129  }
130  } else {
131  if (!list.empty()) {
132  request = list.front();
133  list.pop_front();
134  list_has_data_.store(!list.empty());
135  }
136  }
137 
138  return request;
139  }
140 
141  void
142  attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
143  {
144  std::lock_guard<std::mutex> lock(internalMutex_);
145  conditionMutex_ = conditionMutex;
146  conditionVariable_ = conditionVariable;
147  }
148 
149  void
151  {
152  std::lock_guard<std::mutex> lock(internalMutex_);
153  conditionMutex_ = nullptr;
154  conditionVariable_ = nullptr;
155  }
156 
157  bool
159  {
160  return list_has_data_.load();
161  }
162 
163 private:
164  CustomServiceInfo * info_;
165  std::mutex internalMutex_;
166  std::list<CustomServiceRequest> list RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
167  std::atomic_bool list_has_data_;
168  std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
169  std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
170 };
171 
172 class ServicePubListener : public eprosima::fastrtps::PublisherListener
173 {
174 public:
175  ServicePubListener() = default;
176 
177  template<class Rep, class Period>
179  const eprosima::fastrtps::rtps::GUID_t & guid,
180  const std::chrono::duration<Rep, Period> & rel_time)
181  {
182  auto guid_is_present = [this, guid]() RCPPUTILS_TSA_REQUIRES(mutex_)->bool
183  {
184  return subscriptions_.find(guid) != subscriptions_.end();
185  };
186 
187  std::unique_lock<std::mutex> lock(mutex_);
188  return cv_.wait_for(lock, rel_time, guid_is_present);
189  }
190 
192  eprosima::fastrtps::Publisher * pub,
193  eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
194  {
195  (void) pub;
196  std::lock_guard<std::mutex> lock(mutex_);
197  if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
198  subscriptions_.insert(matchingInfo.remoteEndpointGuid);
199  } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
200  subscriptions_.erase(matchingInfo.remoteEndpointGuid);
201  } else {
202  return;
203  }
204  cv_.notify_all();
205  }
206 
207 private:
208  using subscriptions_set_t =
209  std::unordered_set<eprosima::fastrtps::rtps::GUID_t,
211 
212  std::mutex mutex_;
213  subscriptions_set_t subscriptions_ RCPPUTILS_TSA_GUARDED_BY(mutex_);
215 };
216 
217 #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
CustomServiceRequest
Definition: custom_service_info.hpp:55
rmw_fastrtps_shared_cpp::SerializedData::impl
const void * impl
Definition: TypeSupport.hpp:38
std::list
std::unordered_set
rmw_fastrtps_shared_cpp::TypeSupport
Definition: TypeSupport.hpp:41
rmw_fastrtps_shared_cpp::SerializedData::is_cdr_buffer
bool is_cdr_buffer
Definition: TypeSupport.hpp:36
std::chrono::duration
CustomServiceInfo::typesupport_identifier_
const char * typesupport_identifier_
Definition: custom_service_info.hpp:52
std::lock_guard
TypeSupport.hpp
ServicePubListener::wait_for_subscription
bool wait_for_subscription(const eprosima::fastrtps::rtps::GUID_t &guid, const std::chrono::duration< Rep, Period > &rel_time)
Definition: custom_service_info.hpp:178
CustomServiceRequest::CustomServiceRequest
CustomServiceRequest()
Definition: custom_service_info.hpp:61
RCPPUTILS_TSA_REQUIRES
#define RCPPUTILS_TSA_REQUIRES(...)
CustomServiceRequest::sample_identity_
eprosima::fastrtps::rtps::SampleIdentity sample_identity_
Definition: custom_service_info.hpp:57
ServicePubListener::onPublicationMatched
void onPublicationMatched(eprosima::fastrtps::Publisher *pub, eprosima::fastrtps::rtps::MatchingInfo &matchingInfo)
Definition: custom_service_info.hpp:191
ServiceListener
Definition: custom_service_info.hpp:65
CustomServiceInfo::response_publisher_
eprosima::fastrtps::Publisher * response_publisher_
Definition: custom_service_info.hpp:48
rmw_fastrtps_shared_cpp::hash_fastrtps_guid
Definition: guid_utils.hpp:59
ServiceListener::hasData
bool hasData()
Definition: custom_service_info.hpp:158
CustomServiceRequest::sample_info_
eprosima::fastrtps::SampleInfo_t sample_info_
Definition: custom_service_info.hpp:59
ServicePubListener::ServicePubListener
ServicePubListener()=default
CustomServiceInfo::request_type_support_
rmw_fastrtps_shared_cpp::TypeSupport * request_type_support_
Definition: custom_service_info.hpp:43
CustomServiceInfo::response_type_support_impl_
const void * response_type_support_impl_
Definition: custom_service_info.hpp:46
CustomServiceInfo
Definition: custom_service_info.hpp:41
rmw_fastrtps_shared_cpp::SerializedData
Definition: TypeSupport.hpp:34
rmw_fastrtps_shared_cpp::SerializedData::data
void * data
Definition: TypeSupport.hpp:37
std::unique_lock
CustomServiceInfo::response_type_support_
rmw_fastrtps_shared_cpp::TypeSupport * response_type_support_
Definition: custom_service_info.hpp:45
CustomServiceInfo::request_type_support_impl_
const void * request_type_support_impl_
Definition: custom_service_info.hpp:44
guid_utils.hpp
CustomServiceInfo::listener_
ServiceListener * listener_
Definition: custom_service_info.hpp:49
CustomServiceRequest
struct CustomServiceRequest CustomServiceRequest
CustomServiceInfo::participant_
eprosima::fastrtps::Participant * participant_
Definition: custom_service_info.hpp:51
std::condition_variable::wait_for
T wait_for(T... args)
CustomServiceRequest::buffer_
eprosima::fastcdr::FastBuffer * buffer_
Definition: custom_service_info.hpp:58
ServiceListener::attachCondition
void attachCondition(std::mutex *conditionMutex, std::condition_variable *conditionVariable)
Definition: custom_service_info.hpp:142
thread_safety_annotations.hpp
std::condition_variable
ServiceListener::detachCondition
void detachCondition()
Definition: custom_service_info.hpp:150
CustomServiceInfo::pub_listener_
ServicePubListener * pub_listener_
Definition: custom_service_info.hpp:50
ServiceListener::onNewDataMessage
void onNewDataMessage(eprosima::fastrtps::Subscriber *sub)
Definition: custom_service_info.hpp:77
CustomServiceInfo
struct CustomServiceInfo CustomServiceInfo
CustomServiceInfo::request_subscriber_
eprosima::fastrtps::Subscriber * request_subscriber_
Definition: custom_service_info.hpp:47
ServiceListener::ServiceListener
ServiceListener(CustomServiceInfo *info)
Definition: custom_service_info.hpp:68
std::mutex
ServicePubListener
Definition: custom_service_info.hpp:172
std::condition_variable::notify_all
T notify_all(T... args)
ServiceListener::getRequest
CustomServiceRequest getRequest()
Definition: custom_service_info.hpp:118