rmw_fastrtps_shared_cpp  master
Code shared on static and dynamic type support of rmw_fastrtps_cpp.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
custom_subscriber_info.hpp
Go to the documentation of this file.
1 // Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
16 #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
17 
18 #include <atomic>
19 #include <condition_variable>
20 #include <mutex>
21 #include <set>
22 #include <utility>
23 
24 #include "fastdds/dds/core/status/DeadlineMissedStatus.hpp"
25 #include "fastdds/dds/core/status/LivelinessChangedStatus.hpp"
26 #include "fastdds/dds/core/status/SubscriptionMatchedStatus.hpp"
27 #include "fastdds/dds/subscriber/DataReader.hpp"
28 #include "fastdds/dds/subscriber/DataReaderListener.hpp"
29 #include "fastdds/dds/topic/TypeSupport.hpp"
30 
31 #include "fastdds/rtps/common/Guid.h"
32 #include "fastdds/rtps/common/InstanceHandle.h"
33 
35 
36 #include "rmw/impl/cpp/macros.hpp"
37 
38 #include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"
39 
40 
41 class SubListener;
42 
43 struct CustomSubscriberInfo : public CustomEventInfo
44 {
45  virtual ~CustomSubscriberInfo() = default;
46 
47  eprosima::fastdds::dds::DataReader * data_reader_ {nullptr};
48  SubListener * listener_{nullptr};
49  eprosima::fastdds::dds::TypeSupport type_support_;
50  const void * type_support_impl_{nullptr};
51  rmw_gid_t subscription_gid_{};
52  const char * typesupport_identifier_{nullptr};
53 
54  RMW_FASTRTPS_SHARED_CPP_PUBLIC
55  EventListenerInterface *
56  getListener() const final;
57 };
58 
59 class SubListener : public EventListenerInterface, public eprosima::fastdds::dds::DataReaderListener
60 {
61 public:
62  explicit SubListener(CustomSubscriberInfo * info)
63  : data_(false),
64  deadline_changes_(false),
65  liveliness_changes_(false),
66  conditionMutex_(nullptr),
67  conditionVariable_(nullptr)
68  {
69  // Field is not used right now
70  (void)info;
71  }
72 
73  // DataReaderListener implementation
74  void
75  on_subscription_matched(
76  eprosima::fastdds::dds::DataReader * reader,
77  const eprosima::fastdds::dds::SubscriptionMatchedStatus & info) final
78  {
79  {
80  std::lock_guard<std::mutex> lock(internalMutex_);
81  if (info.current_count_change == 1) {
82  publishers_.insert(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle));
83  } else if (info.current_count_change == -1) {
84  publishers_.erase(eprosima::fastrtps::rtps::iHandle2GUID(info.last_publication_handle));
85  }
86  }
87  update_has_data(reader);
88  }
89 
90  void
91  on_data_available(eprosima::fastdds::dds::DataReader * reader) final
92  {
93  update_has_data(reader);
94  }
95 
96  RMW_FASTRTPS_SHARED_CPP_PUBLIC
97  void
98  on_requested_deadline_missed(
99  eprosima::fastdds::dds::DataReader *,
100  const eprosima::fastrtps::RequestedDeadlineMissedStatus &) final;
101 
102  RMW_FASTRTPS_SHARED_CPP_PUBLIC
103  void
104  on_liveliness_changed(
105  eprosima::fastdds::dds::DataReader *,
106  const eprosima::fastrtps::LivelinessChangedStatus &) final;
107 
108  // EventListenerInterface implementation
109  RMW_FASTRTPS_SHARED_CPP_PUBLIC
110  bool
111  hasEvent(rmw_event_type_t event_type) const final;
112 
113  RMW_FASTRTPS_SHARED_CPP_PUBLIC
114  bool
115  takeNextEvent(rmw_event_type_t event_type, void * event_info) final;
116 
117  // SubListener API
118  void
119  attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
120  {
121  std::lock_guard<std::mutex> lock(internalMutex_);
122  conditionMutex_ = conditionMutex;
123  conditionVariable_ = conditionVariable;
124  }
125 
126  void
127  detachCondition()
128  {
129  std::lock_guard<std::mutex> lock(internalMutex_);
130  conditionMutex_ = nullptr;
131  conditionVariable_ = nullptr;
132  }
133 
134  bool
135  hasData() const
136  {
137  return data_.load(std::memory_order_relaxed);
138  }
139 
140  void
141  update_has_data(eprosima::fastdds::dds::DataReader * reader)
142  {
143  // Make sure to call into Fast DDS before taking the lock to avoid an
144  // ABBA deadlock between internalMutex_ and mutexes inside of Fast DDS.
145  auto unread_count = reader->get_unread_count();
146  bool has_data = unread_count > 0;
147 
148  std::lock_guard<std::mutex> lock(internalMutex_);
149  ConditionalScopedLock clock(conditionMutex_, conditionVariable_);
150  data_.store(has_data, std::memory_order_relaxed);
151  }
152 
153  size_t publisherCount()
154  {
155  std::lock_guard<std::mutex> lock(internalMutex_);
156  return publishers_.size();
157  }
158 
159 private:
160  mutable std::mutex internalMutex_;
161 
162  std::atomic_bool data_;
163 
164  std::atomic_bool deadline_changes_;
165  eprosima::fastdds::dds::RequestedDeadlineMissedStatus requested_deadline_missed_status_
166  RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
167 
168  std::atomic_bool liveliness_changes_;
169  eprosima::fastdds::dds::LivelinessChangedStatus liveliness_changed_status_
170  RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
171 
172  std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
173  std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
174 
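175  std::set<eprosima::fastrtps::rtps::GUID_t> publishers_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
176 };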
177 
178 #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
CustomSubscriberInfo::typesupport_identifier_
const char * typesupport_identifier_
Definition: custom_subscriber_info.hpp:52
SubListener::on_data_available
void on_data_available(eprosima::fastdds::dds::DataReader *reader) final
Definition: custom_subscriber_info.hpp:91
CustomSubscriberInfo::data_reader_
eprosima::fastdds::dds::DataReader * data_reader_
Definition: custom_subscriber_info.hpp:47
rmw_event_type_t
rmw_event_type_t
RCPPUTILS_TSA_GUARDED_BY
#define RCPPUTILS_TSA_GUARDED_BY(x)
SubListener::publisherCount
size_t publisherCount()
Definition: custom_subscriber_info.hpp:153
std::lock_guard
RMW_FASTRTPS_SHARED_CPP_PUBLIC
#define RMW_FASTRTPS_SHARED_CPP_PUBLIC
Definition: visibility_control.h:50
SubListener::on_subscription_matched
void on_subscription_matched(eprosima::fastdds::dds::DataReader *reader, const eprosima::fastdds::dds::SubscriptionMatchedStatus &info) final
Definition: custom_subscriber_info.hpp:75
SubListener::attachCondition
void attachCondition(std::mutex *conditionMutex, std::condition_variable *conditionVariable)
Connect a condition variable so a waiter can be notified of new data.
Definition: custom_subscriber_info.hpp:119
CustomSubscriberInfo::subscription_gid_
rmw_gid_t subscription_gid_
Definition: custom_subscriber_info.hpp:51
CustomSubscriberInfo::listener_
SubListener * listener_
Definition: custom_subscriber_info.hpp:48
CustomSubscriberInfo::getListener
EventListenerInterface * getListener() const final
CustomSubscriberInfo::type_support_
eprosima::fastdds::dds::TypeSupport type_support_
Definition: custom_subscriber_info.hpp:49
CustomSubscriberInfo::type_support_impl_
const void * type_support_impl_
Definition: custom_subscriber_info.hpp:50
SubListener::update_has_data
void update_has_data(eprosima::fastdds::dds::DataReader *reader)
Definition: custom_subscriber_info.hpp:141
CustomSubscriberInfo
Definition: custom_subscriber_info.hpp:43
SubListener::detachCondition
void detachCondition()
Unset the information from attachCondition.
Definition: custom_subscriber_info.hpp:127
EventListenerInterface::ConditionalScopedLock
Definition: custom_event_info.hpp:63
CustomSubscriberInfo::~CustomSubscriberInfo
virtual ~CustomSubscriberInfo()=default
SubListener::hasData
bool hasData() const
Definition: custom_subscriber_info.hpp:135
SubListener
Definition: custom_subscriber_info.hpp:59
rmw_gid_t
thread_safety_annotations.hpp
std::condition_variable
custom_event_info.hpp
SubListener::SubListener
SubListener(CustomSubscriberInfo *info)
Definition: custom_subscriber_info.hpp:62
std::mutex
macros.hpp
std::set< eprosima::fastrtps::rtps::GUID_t >
CustomEventInfo
Definition: custom_event_info.hpp:91
EventListenerInterface
Definition: custom_event_info.hpp:32