rmw_fastrtps_shared_cpp  master
Code shared by the static and dynamic type support implementations of rmw_fastrtps_cpp.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
custom_client_info.hpp
Go to the documentation of this file.
1 // Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_
16 #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_
17 
18 #include <atomic>
19 #include <condition_variable>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <set>
24 #include <utility>
25 
26 #include "fastcdr/FastBuffer.h"
27 
28 #include "fastrtps/subscriber/SampleInfo.h"
29 #include "fastrtps/subscriber/Subscriber.h"
30 #include "fastrtps/subscriber/SubscriberListener.h"
31 #include "fastrtps/participant/Participant.h"
32 #include "fastrtps/publisher/Publisher.h"
33 #include "fastrtps/publisher/PublisherListener.h"
34 
36 
38 
39 class ClientListener;
40 class ClientPubListener;
41 
// Per-client state: pointers to the Fast-RTPS subscriber, publisher and
// participant used by a client, plus the two GUIDs used to filter responses.
// NOTE(review): the embedded line numbers jump (43->48, 49->51, 53->59), so this
// listing does not show every member nor the closing line of the struct; the
// symbol index at the end of this page lists the remaining members (type
// supports, listeners, typesupport identifier and the two matched counters).
42 typedef struct CustomClientInfo
43 {
   // Subscriber for the response side of the client.
48  eprosima::fastrtps::Subscriber * response_subscriber_;
   // Publisher for the request side of the client.
49  eprosima::fastrtps::Publisher * request_publisher_;
   // ClientListener::onNewDataMessage accepts a response only when the writer GUID
   // of its related sample identity equals writer_guid_ or reader_guid_.
51  eprosima::fastrtps::rtps::GUID_t writer_guid_;
52  eprosima::fastrtps::rtps::GUID_t reader_guid_;
   // Participant associated with this client (not used within this header's visible code).
53  eprosima::fastrtps::Participant * participant_;
59 
// One response taken from the response subscriber and queued by ClientListener.
// NOTE(review): original line 63 (the `buffer_` member, listed in the symbol index
// as std::unique_ptr<eprosima::fastcdr::FastBuffer>) and the closing line of the
// struct are not shown in this listing.
60 typedef struct CustomClientResponse
61 {
   // Set from sample_info_.related_sample_identity in ClientListener::onNewDataMessage.
62  eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
   // Sample metadata filled in by Subscriber::takeNextData; value-initialized by default.
64  eprosima::fastrtps::SampleInfo_t sample_info_ {};
66 
// Subscriber listener for a client's response subscriber.
//
// It queues incoming responses in an internal list, exposes them through
// getResponse()/hasData(), optionally notifies an attached condition variable when
// a response is queued, and keeps info_->response_subscriber_matched_count_ equal
// to the number of currently matched remote endpoints.
//
// NOTE(review): this listing drops several original lines (70, 84, 116, 136, 144,
// 149, 185 by the embedded numbering), so some signatures/declarations are not
// visible here; the symbol index at the end of this page gives their signatures.
67 class ClientListener : public eprosima::fastrtps::SubscriberListener
68 {
69 public:
   // Constructor initializer list (the signature on original line 70 is not shown;
   // the index lists it as ClientListener(CustomClientInfo * info)).
   // Starts with an empty "has data" flag and no condition attached.
71  : info_(info), list_has_data_(false),
72  conditionMutex_(nullptr), conditionVariable_(nullptr) {}
73 
74 
   // Called with the subscriber that has new data. Takes the next sample into a
   // freshly allocated FastBuffer and, if the sample is ALIVE and its related sample
   // identity was written by one of this client's GUIDs, appends it to the list.
   // Samples that fail either check are dropped when `response` goes out of scope.
75  void
76  onNewDataMessage(eprosima::fastrtps::Subscriber * sub)
77  {
78  assert(sub);
79 
80  CustomClientResponse response;
81  // Todo(sloretz) eliminate heap allocation pending eprosima/Fast-CDR#19
82  response.buffer_.reset(new eprosima::fastcdr::FastBuffer());
83 
   // NOTE(review): the declaration of `data` (original line 84) is not shown in this
   // listing; presumably an rmw_fastrtps_shared_cpp::SerializedData, whose fields in
   // the symbol index match the three assignments below - confirm against the header.
85  data.is_cdr_buffer = true;
86  data.data = response.buffer_.get();
87  data.impl = nullptr; // not used when is_cdr_buffer is true
88  if (sub->takeNextData(&data, &response.sample_info_)) {
89  if (eprosima::fastrtps::rtps::ALIVE == response.sample_info_.sampleKind) {
90  response.sample_identity_ = response.sample_info_.related_sample_identity;
91 
   // Only keep responses whose related request was written by this client.
92  if (response.sample_identity_.writer_guid() == info_->reader_guid_ ||
93  response.sample_identity_.writer_guid() == info_->writer_guid_)
94  {
   // internalMutex_ guards list, conditionMutex_ and conditionVariable_.
95  std::lock_guard<std::mutex> lock(internalMutex_);
96 
97  if (conditionMutex_ != nullptr) {
   // A condition is attached: enqueue and set the flag while also holding the
   // attached mutex, then release it before notifying one waiter.
98  std::unique_lock<std::mutex> clock(*conditionMutex_);
99  list.emplace_back(std::move(response));
100  // the change to list_has_data_ needs to be mutually exclusive with
101  // rmw_wait() which checks hasData() and decides if wait() needs to
102  // be called
103  list_has_data_.store(true);
104  clock.unlock();
105  conditionVariable_->notify_one();
106  } else {
   // No condition attached: just enqueue and raise the flag.
107  list.emplace_back(std::move(response));
108  list_has_data_.store(true);
109  }
110  }
111  }
112  }
113  }
114 
   // Moves the oldest queued response into `response` (signature on original line 116
   // is not shown; the index lists it as bool getResponse(CustomClientResponse & response)).
   // Returns true if a response was available, false if the list was empty.
115  bool
117  {
118  std::lock_guard<std::mutex> lock(internalMutex_);
119 
120  if (conditionMutex_ != nullptr) {
   // Hold the attached mutex as well, so the list_has_data_ update made by
   // popResponse() happens under the same mutex used in onNewDataMessage().
121  std::unique_lock<std::mutex> clock(*conditionMutex_);
122  return popResponse(response);
123  }
124  return popResponse(response);
125  }
126 
   // Registers the mutex/condition variable pair that onNewDataMessage() locks and
   // notifies when a response is queued. The pointers are stored, not owned.
127  void
128  attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
129  {
130  std::lock_guard<std::mutex> lock(internalMutex_);
131  conditionMutex_ = conditionMutex;
132  conditionVariable_ = conditionVariable;
133  }
134 
   // Clears the attached mutex/condition variable (signature on original line 136 is
   // not shown; the index lists it as void detachCondition()).
135  void
137  {
138  std::lock_guard<std::mutex> lock(internalMutex_);
139  conditionMutex_ = nullptr;
140  conditionVariable_ = nullptr;
141  }
142 
   // Returns the atomic "list is non-empty" flag without taking any mutex
   // (signature on original line 144 is not shown; the index lists it as bool hasData()).
143  bool
145  {
146  return list_has_data_.load();
147  }
148 
   // Matching callback (name/return type on original line 149 are not shown; the index
   // lists it as void onSubscriptionMatched(...)). Adds or removes the remote endpoint
   // GUID in publishers_ and publishes the resulting set size through
   // info_->response_subscriber_matched_count_. Any other status is ignored.
   // No mutex is taken in the visible body.
150  eprosima::fastrtps::Subscriber * sub,
151  eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
152  {
153  (void)sub;
154  if (info_ == nullptr) {
155  return;
156  }
157  if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
158  publishers_.insert(matchingInfo.remoteEndpointGuid);
159  } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
160  publishers_.erase(matchingInfo.remoteEndpointGuid);
161  } else {
162  return;
163  }
164  info_->response_subscriber_matched_count_.store(publishers_.size());
165  }
166 
167 private:
   // Pops the front of the list into `response` and refreshes list_has_data_.
   // Annotated as requiring internalMutex_ to be held by the caller.
   // Returns false, leaving `response` untouched, when the list is empty.
168  bool popResponse(CustomClientResponse & response) RCPPUTILS_TSA_REQUIRES(internalMutex_)
169  {
170  if (!list.empty()) {
171  response = std::move(list.front());
172  list.pop_front();
173  list_has_data_.store(!list.empty());
174  return true;
175  }
176  return false;
177  };
178 
   // Client state this listener reads GUIDs from and writes the matched count to.
179  CustomClientInfo * info_;
   // Guards list, conditionMutex_ and conditionVariable_.
180  std::mutex internalMutex_;
   // FIFO of accepted responses: appended at the back, popped from the front.
181  std::list<CustomClientResponse> list RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
   // Mirrors "list is non-empty"; atomic so hasData() can read it without a lock.
182  std::atomic_bool list_has_data_;
   // Optional externally provided mutex/condition variable; nullptr when detached.
183  std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
184  std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
   // NOTE(review): the declaration of publishers_ (original line 185) is not shown in
   // this listing; presumably a std::set of GUID_t, per the symbol index.
186 };
187 
// Publisher listener for a client's request publisher. It tracks the set of matched
// remote endpoints and stores its size in info_->request_publisher_matched_count_.
//
// NOTE(review): original lines 191, 196 and 216 are not shown in this listing
// (constructor signature, callback name/return type, and the subscriptions_
// member); the symbol index at the end of this page gives the two signatures.
188 class ClientPubListener : public eprosima::fastrtps::PublisherListener
189 {
190 public:
   // Constructor initializer list; the index lists the signature as
   // ClientPubListener(CustomClientInfo * info). Only stores the pointer.
192  : info_(info)
193  {
194  }
195 
   // Matching callback; the index lists it as void onPublicationMatched(...).
   // Inserts the remote endpoint GUID on MATCHED_MATCHING, erases it on
   // REMOVED_MATCHING, ignores any other status, then stores the set size in
   // info_->request_publisher_matched_count_. Does nothing when info_ is null.
   // No mutex is taken in the visible body.
197  eprosima::fastrtps::Publisher * pub,
198  eprosima::fastrtps::rtps::MatchingInfo & matchingInfo)
199  {
200  (void) pub;
201  if (info_ == nullptr) {
202  return;
203  }
204  if (eprosima::fastrtps::rtps::MATCHED_MATCHING == matchingInfo.status) {
205  subscriptions_.insert(matchingInfo.remoteEndpointGuid);
206  } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == matchingInfo.status) {
207  subscriptions_.erase(matchingInfo.remoteEndpointGuid);
208  } else {
209  return;
210  }
211  info_->request_publisher_matched_count_.store(subscriptions_.size());
212  }
213 
214 private:
   // Client state whose request_publisher_matched_count_ this listener updates.
215  CustomClientInfo * info_;
   // NOTE(review): the declaration of subscriptions_ (original line 216) is not shown
   // in this listing; presumably a std::set of GUID_t - confirm against the header.
217 };
218 
219 #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_
CustomClientInfo::response_type_support_
rmw_fastrtps_shared_cpp::TypeSupport * response_type_support_
Definition: custom_client_info.hpp:46
rmw_fastrtps_shared_cpp::SerializedData::impl
const void * impl
Definition: TypeSupport.hpp:38
ClientListener::hasData
bool hasData()
Definition: custom_client_info.hpp:144
CustomClientResponse::buffer_
std::unique_ptr< eprosima::fastcdr::FastBuffer > buffer_
Definition: custom_client_info.hpp:63
std::list
std::move
T move(T... args)
CustomClientInfo::reader_guid_
eprosima::fastrtps::rtps::GUID_t reader_guid_
Definition: custom_client_info.hpp:52
CustomClientInfo::request_type_support_
rmw_fastrtps_shared_cpp::TypeSupport * request_type_support_
Definition: custom_client_info.hpp:44
CustomClientResponse::sample_identity_
eprosima::fastrtps::rtps::SampleIdentity sample_identity_
Definition: custom_client_info.hpp:62
CustomClientResponse
struct CustomClientResponse CustomClientResponse
CustomClientInfo::request_publisher_matched_count_
std::atomic_size_t request_publisher_matched_count_
Definition: custom_client_info.hpp:57
std::set::size
T size(T... args)
rmw_fastrtps_shared_cpp::TypeSupport
Definition: TypeSupport.hpp:41
rmw_fastrtps_shared_cpp::SerializedData::is_cdr_buffer
bool is_cdr_buffer
Definition: TypeSupport.hpp:36
ClientListener::attachCondition
void attachCondition(std::mutex *conditionMutex, std::condition_variable *conditionVariable)
Definition: custom_client_info.hpp:128
std::unique_ptr::get
T get(T... args)
std::lock_guard
CustomClientInfo::request_publisher_
eprosima::fastrtps::Publisher * request_publisher_
Definition: custom_client_info.hpp:49
TypeSupport.hpp
RCPPUTILS_TSA_REQUIRES
#define RCPPUTILS_TSA_REQUIRES(...)
ClientListener::detachCondition
void detachCondition()
Definition: custom_client_info.hpp:136
ClientPubListener::onPublicationMatched
void onPublicationMatched(eprosima::fastrtps::Publisher *pub, eprosima::fastrtps::rtps::MatchingInfo &matchingInfo)
Definition: custom_client_info.hpp:196
std::unique_ptr::reset
T reset(T... args)
CustomClientInfo::typesupport_identifier_
const char * typesupport_identifier_
Definition: custom_client_info.hpp:54
ClientPubListener
Definition: custom_client_info.hpp:188
CustomClientInfo::response_type_support_impl_
const void * response_type_support_impl_
Definition: custom_client_info.hpp:47
rmw_fastrtps_shared_cpp::SerializedData
Definition: TypeSupport.hpp:34
rmw_fastrtps_shared_cpp::SerializedData::data
void * data
Definition: TypeSupport.hpp:37
std::unique_lock
ClientListener::onSubscriptionMatched
void onSubscriptionMatched(eprosima::fastrtps::Subscriber *sub, eprosima::fastrtps::rtps::MatchingInfo &matchingInfo)
Definition: custom_client_info.hpp:149
std::set::erase
T erase(T... args)
CustomClientInfo::response_subscriber_matched_count_
std::atomic_size_t response_subscriber_matched_count_
Definition: custom_client_info.hpp:56
ClientListener
Definition: custom_client_info.hpp:67
CustomClientInfo
Definition: custom_client_info.hpp:42
CustomClientInfo::writer_guid_
eprosima::fastrtps::rtps::GUID_t writer_guid_
Definition: custom_client_info.hpp:51
CustomClientInfo::listener_
ClientListener * listener_
Definition: custom_client_info.hpp:50
CustomClientResponse
Definition: custom_client_info.hpp:60
CustomClientInfo::participant_
eprosima::fastrtps::Participant * participant_
Definition: custom_client_info.hpp:53
CustomClientInfo
struct CustomClientInfo CustomClientInfo
ClientListener::onNewDataMessage
void onNewDataMessage(eprosima::fastrtps::Subscriber *sub)
Definition: custom_client_info.hpp:76
std::set::insert
T insert(T... args)
ClientListener::ClientListener
ClientListener(CustomClientInfo *info)
Definition: custom_client_info.hpp:70
thread_safety_annotations.hpp
std::condition_variable
CustomClientInfo::pub_listener_
ClientPubListener * pub_listener_
Definition: custom_client_info.hpp:55
std::mutex
CustomClientInfo::request_type_support_impl_
const void * request_type_support_impl_
Definition: custom_client_info.hpp:45
CustomClientResponse::sample_info_
eprosima::fastrtps::SampleInfo_t sample_info_
Definition: custom_client_info.hpp:64
std::unique_ptr< eprosima::fastcdr::FastBuffer >
CustomClientInfo::response_subscriber_
eprosima::fastrtps::Subscriber * response_subscriber_
Definition: custom_client_info.hpp:48
ClientListener::getResponse
bool getResponse(CustomClientResponse &response)
Definition: custom_client_info.hpp:116
std::set< eprosima::fastrtps::rtps::GUID_t >
ClientPubListener::ClientPubListener
ClientPubListener(CustomClientInfo *info)
Definition: custom_client_info.hpp:191