rmw_fastrtps_shared_cpp  master
Code shared on static and dynamic type support of rmw_fastrtps_cpp.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
custom_client_info.hpp
Go to the documentation of this file.
1 // Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_
16 #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_
17 
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>
#include <set>
#include <string>
#include <utility>

#include "fastcdr/FastBuffer.h"

#include "fastdds/dds/core/status/PublicationMatchedStatus.hpp"
#include "fastdds/dds/core/status/SubscriptionMatchedStatus.hpp"
#include "fastdds/dds/publisher/DataWriter.hpp"
#include "fastdds/dds/publisher/DataWriterListener.hpp"
#include "fastdds/dds/subscriber/DataReader.hpp"
#include "fastdds/dds/subscriber/DataReaderListener.hpp"
#include "fastdds/dds/subscriber/SampleInfo.hpp"
#include "fastdds/dds/topic/TypeSupport.hpp"

#include "fastdds/rtps/common/Guid.h"
#include "fastdds/rtps/common/InstanceHandle.h"
#include "fastdds/rtps/common/SampleIdentity.h"

#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
45 
46 class ClientListener;
47 class ClientPubListener;
48 
49 typedef struct CustomClientInfo
50 {
51  eprosima::fastdds::dds::TypeSupport request_type_support_{nullptr};
52  const void * request_type_support_impl_{nullptr};
53  eprosima::fastdds::dds::TypeSupport response_type_support_{nullptr};
54  const void * response_type_support_impl_{nullptr};
55  eprosima::fastdds::dds::DataReader * response_reader_{nullptr};
56  eprosima::fastdds::dds::DataWriter * request_writer_{nullptr};
57 
60 
62  eprosima::fastrtps::rtps::GUID_t writer_guid_;
63  eprosima::fastrtps::rtps::GUID_t reader_guid_;
64 
65  const char * typesupport_identifier_{nullptr};
70 
71 typedef struct CustomClientResponse
72 {
73  eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
75  eprosima::fastdds::dds::SampleInfo sample_info_ {};
77 
78 class ClientListener : public eprosima::fastdds::dds::DataReaderListener
79 {
80 public:
82  : info_(info), list_has_data_(false),
83  conditionMutex_(nullptr), conditionVariable_(nullptr) {}
84 
85 
86  void
87  on_data_available(eprosima::fastdds::dds::DataReader * reader)
88  {
89  assert(reader);
90 
91  CustomClientResponse response;
92  // Todo(sloretz) eliminate heap allocation pending eprosima/Fast-CDR#19
93  response.buffer_.reset(new eprosima::fastcdr::FastBuffer());
94 
96  data.is_cdr_buffer = true;
97  data.data = response.buffer_.get();
98  data.impl = nullptr; // not used when is_cdr_buffer is true
99  if (reader->take_next_sample(&data, &response.sample_info_) == ReturnCode_t::RETCODE_OK) {
100  if (response.sample_info_.valid_data) {
101  response.sample_identity_ = response.sample_info_.related_sample_identity;
102 
103  if (response.sample_identity_.writer_guid() == info_->reader_guid_ ||
104  response.sample_identity_.writer_guid() == info_->writer_guid_)
105  {
106  std::lock_guard<std::mutex> lock(internalMutex_);
107 
108  if (conditionMutex_ != nullptr) {
109  std::unique_lock<std::mutex> clock(*conditionMutex_);
110  list.emplace_back(std::move(response));
111  // the change to list_has_data_ needs to be mutually exclusive with
112  // rmw_wait() which checks hasData() and decides if wait() needs to
113  // be called
114  list_has_data_.store(true);
115  clock.unlock();
116  conditionVariable_->notify_one();
117  } else {
118  list.emplace_back(std::move(response));
119  list_has_data_.store(true);
120  }
121  }
122  }
123  }
124  }
125 
126  bool
128  {
129  std::lock_guard<std::mutex> lock(internalMutex_);
130 
131  if (conditionMutex_ != nullptr) {
132  std::unique_lock<std::mutex> clock(*conditionMutex_);
133  return popResponse(response);
134  }
135  return popResponse(response);
136  }
137 
138  void
139  attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
140  {
141  std::lock_guard<std::mutex> lock(internalMutex_);
142  conditionMutex_ = conditionMutex;
143  conditionVariable_ = conditionVariable;
144  }
145 
146  void
148  {
149  std::lock_guard<std::mutex> lock(internalMutex_);
150  conditionMutex_ = nullptr;
151  conditionVariable_ = nullptr;
152  }
153 
154  bool
156  {
157  return list_has_data_.load();
158  }
159 
161  eprosima::fastdds::dds::DataReader * /* reader */,
162  const eprosima::fastdds::dds::SubscriptionMatchedStatus & info) final
163  {
164  if (info_ == nullptr) {
165  return;
166  }
167  if (info.current_count_change == 1) {
168  publishers_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle));
169  } else if (info.current_count_change == -1) {
170  publishers_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle));
171  } else {
172  return;
173  }
174  info_->response_subscriber_matched_count_.store(publishers_.size());
175  }
176 
177 private:
178  bool popResponse(CustomClientResponse & response) RCPPUTILS_TSA_REQUIRES(internalMutex_)
179  {
180  if (!list.empty()) {
181  response = std::move(list.front());
182  list.pop_front();
183  list_has_data_.store(!list.empty());
184  return true;
185  }
186  return false;
187  };
188 
189  CustomClientInfo * info_;
190  std::mutex internalMutex_;
191  std::list<CustomClientResponse> list RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
192  std::atomic_bool list_has_data_;
193  std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
194  std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
196 };
197 
198 class ClientPubListener : public eprosima::fastdds::dds::DataWriterListener
199 {
200 public:
202  : info_(info)
203  {
204  }
205 
207  eprosima::fastdds::dds::DataWriter * /* writer */,
208  const eprosima::fastdds::dds::PublicationMatchedStatus & info) final
209  {
210  if (info_ == nullptr) {
211  return;
212  }
213  if (info.current_count_change == 1) {
214  subscriptions_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle));
215  } else if (info.current_count_change == -1) {
216  subscriptions_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_subscription_handle));
217  } else {
218  return;
219  }
220  info_->request_publisher_matched_count_.store(subscriptions_.size());
221  }
222 
223 private:
224  CustomClientInfo * info_;
226 };
227 
228 #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_CLIENT_INFO_HPP_
rmw_fastrtps_shared_cpp::SerializedData::impl
const void * impl
Definition: TypeSupport.hpp:41
ClientListener::hasData
bool hasData()
Definition: custom_client_info.hpp:155
CustomClientResponse::buffer_
std::unique_ptr< eprosima::fastcdr::FastBuffer > buffer_
Definition: custom_client_info.hpp:74
std::string
std::list
std::move
T move(T... args)
CustomClientInfo::reader_guid_
eprosima::fastrtps::rtps::GUID_t reader_guid_
Definition: custom_client_info.hpp:63
CustomClientResponse::sample_identity_
eprosima::fastrtps::rtps::SampleIdentity sample_identity_
Definition: custom_client_info.hpp:73
CustomClientResponse
struct CustomClientResponse CustomClientResponse
CustomClientInfo::request_publisher_matched_count_
std::atomic_size_t request_publisher_matched_count_
Definition: custom_client_info.hpp:68
std::set::size
T size(T... args)
rmw_fastrtps_shared_cpp::SerializedData::is_cdr_buffer
bool is_cdr_buffer
Definition: TypeSupport.hpp:39
ClientListener::attachCondition
void attachCondition(std::mutex *conditionMutex, std::condition_variable *conditionVariable)
Definition: custom_client_info.hpp:139
std::unique_ptr::get
T get(T... args)
std::lock_guard
TypeSupport.hpp
RCPPUTILS_TSA_REQUIRES
#define RCPPUTILS_TSA_REQUIRES(...)
ClientListener::detachCondition
void detachCondition()
Definition: custom_client_info.hpp:147
std::unique_ptr::reset
T reset(T... args)
CustomClientInfo::typesupport_identifier_
const char * typesupport_identifier_
Definition: custom_client_info.hpp:65
ClientPubListener
Definition: custom_client_info.hpp:198
CustomClientInfo::response_topic_
std::string response_topic_
Definition: custom_client_info.hpp:59
CustomClientInfo::response_type_support_impl_
const void * response_type_support_impl_
Definition: custom_client_info.hpp:54
rmw_fastrtps_shared_cpp::SerializedData
Definition: TypeSupport.hpp:37
rmw_fastrtps_shared_cpp::SerializedData::data
void * data
Definition: TypeSupport.hpp:40
ClientPubListener::on_publication_matched
void on_publication_matched(eprosima::fastdds::dds::DataWriter *, const eprosima::fastdds::dds::PublicationMatchedStatus &info) final
Definition: custom_client_info.hpp:206
std::unique_lock
ClientListener::on_data_available
void on_data_available(eprosima::fastdds::dds::DataReader *reader)
Definition: custom_client_info.hpp:87
CustomClientInfo::request_writer_
eprosima::fastdds::dds::DataWriter * request_writer_
Definition: custom_client_info.hpp:56
std::set::erase
T erase(T... args)
CustomClientInfo::response_subscriber_matched_count_
std::atomic_size_t response_subscriber_matched_count_
Definition: custom_client_info.hpp:67
ClientListener
Definition: custom_client_info.hpp:78
CustomClientInfo
Definition: custom_client_info.hpp:49
CustomClientInfo::writer_guid_
eprosima::fastrtps::rtps::GUID_t writer_guid_
Definition: custom_client_info.hpp:62
CustomClientInfo::request_topic_
std::string request_topic_
Definition: custom_client_info.hpp:58
CustomClientInfo::listener_
ClientListener * listener_
Definition: custom_client_info.hpp:61
CustomClientResponse
Definition: custom_client_info.hpp:71
ClientListener::on_subscription_matched
void on_subscription_matched(eprosima::fastdds::dds::DataReader *, const eprosima::fastdds::dds::SubscriptionMatchedStatus &info) final
Definition: custom_client_info.hpp:160
CustomClientInfo
struct CustomClientInfo CustomClientInfo
CustomClientInfo::request_type_support_
eprosima::fastdds::dds::TypeSupport request_type_support_
Definition: custom_client_info.hpp:51
std::set::insert
T insert(T... args)
ClientListener::ClientListener
ClientListener(CustomClientInfo *info)
Definition: custom_client_info.hpp:81
thread_safety_annotations.hpp
std::condition_variable
CustomClientResponse::sample_info_
eprosima::fastdds::dds::SampleInfo sample_info_
Definition: custom_client_info.hpp:75
CustomClientInfo::pub_listener_
ClientPubListener * pub_listener_
Definition: custom_client_info.hpp:66
std::mutex
CustomClientInfo::response_type_support_
eprosima::fastdds::dds::TypeSupport response_type_support_
Definition: custom_client_info.hpp:53
CustomClientInfo::request_type_support_impl_
const void * request_type_support_impl_
Definition: custom_client_info.hpp:52
std::unique_ptr< eprosima::fastcdr::FastBuffer >
ClientListener::getResponse
bool getResponse(CustomClientResponse &response)
Definition: custom_client_info.hpp:127
CustomClientInfo::response_reader_
eprosima::fastdds::dds::DataReader * response_reader_
Definition: custom_client_info.hpp:55
std::set< eprosima::fastrtps::rtps::GUID_t >
ClientPubListener::ClientPubListener
ClientPubListener(CustomClientInfo *info)
Definition: custom_client_info.hpp:201