rmw_fastrtps_shared_cpp  master
Code shared by the static and dynamic type support implementations of rmw_fastrtps_cpp.
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
custom_subscriber_info.hpp
Go to the documentation of this file.
1 // Copyright 2016-2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
16 #define RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
17 
18 #include <atomic>
19 #include <condition_variable>
20 #include <mutex>
21 #include <set>
22 #include <utility>
23 
24 #include "fastrtps/subscriber/Subscriber.h"
25 #include "fastrtps/subscriber/SubscriberListener.h"
26 
28 
29 #include "rmw/impl/cpp/macros.hpp"
30 
33 
34 
35 class SubListener;
36 
38 {
39  virtual ~CustomSubscriberInfo() = default;
40 
41  eprosima::fastrtps::Subscriber * subscriber_;
44  const void * type_support_impl_;
47 
50  getListener() const final;
51 };
52 
53 class SubListener : public EventListenerInterface, public eprosima::fastrtps::SubscriberListener
54 {
55 public:
57  : data_(0),
58  deadline_changes_(false),
59  liveliness_changes_(false),
60  conditionMutex_(nullptr),
61  conditionVariable_(nullptr)
62  {
63  // Field is not used right now
64  (void)info;
65  }
66 
67  // SubscriberListener implementation
68  void
70  eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info) final
71  {
72  {
73  std::lock_guard<std::mutex> lock(internalMutex_);
74  if (eprosima::fastrtps::rtps::MATCHED_MATCHING == info.status) {
75  publishers_.insert(info.remoteEndpointGuid);
76  } else if (eprosima::fastrtps::rtps::REMOVED_MATCHING == info.status) {
77  publishers_.erase(info.remoteEndpointGuid);
78  }
79  }
80  data_taken(sub);
81  }
82 
83  void
84  onNewDataMessage(eprosima::fastrtps::Subscriber * sub) final
85  {
86  data_taken(sub);
87  }
88 
90  void
91  on_requested_deadline_missed(
92  eprosima::fastrtps::Subscriber *,
93  const eprosima::fastrtps::RequestedDeadlineMissedStatus &) final;
94 
96  void
97  on_liveliness_changed(
98  eprosima::fastrtps::Subscriber *,
99  const eprosima::fastrtps::LivelinessChangedStatus &) final;
100 
101  // EventListenerInterface implementation
103  bool
104  hasEvent(rmw_event_type_t event_type) const final;
105 
107  bool
108  takeNextEvent(rmw_event_type_t event_type, void * event_info) final;
109 
110  // SubListener API
111  void
112  attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
113  {
114  std::lock_guard<std::mutex> lock(internalMutex_);
115  conditionMutex_ = conditionMutex;
116  conditionVariable_ = conditionVariable;
117  }
118 
119  void
121  {
122  std::lock_guard<std::mutex> lock(internalMutex_);
123  conditionMutex_ = nullptr;
124  conditionVariable_ = nullptr;
125  }
126 
127  bool
128  hasData() const
129  {
130  return data_.load(std::memory_order_relaxed) > 0;
131  }
132 
133  void
134  data_taken(eprosima::fastrtps::Subscriber * sub)
135  {
136  // Make sure to call into Fast-RTPS before taking the lock to avoid an
137  // ABBA deadlock between internalMutex_ and mutexes inside of Fast-RTPS.
138 #if FASTRTPS_VERSION_MAJOR == 1 && FASTRTPS_VERSION_MINOR < 9
139  uint64_t unread_count = sub->getUnreadCount();
140 #else
141  uint64_t unread_count = sub->get_unread_count();
142 #endif
143 
144  std::lock_guard<std::mutex> lock(internalMutex_);
145  ConditionalScopedLock clock(conditionMutex_, conditionVariable_);
146  data_.store(unread_count, std::memory_order_relaxed);
147  }
148 
149  size_t publisherCount()
150  {
151  std::lock_guard<std::mutex> lock(internalMutex_);
152  return publishers_.size();
153  }
154 
155 private:
156  mutable std::mutex internalMutex_;
157 
158  std::atomic_size_t data_;
159 
160  std::atomic_bool deadline_changes_;
161  eprosima::fastrtps::RequestedDeadlineMissedStatus requested_deadline_missed_status_
162  RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
163 
164  std::atomic_bool liveliness_changes_;
165  eprosima::fastrtps::LivelinessChangedStatus liveliness_changed_status_
166  RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
167 
168  std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
169  std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
170 
172 };
173 
174 #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
CustomSubscriberInfo::typesupport_identifier_
const char * typesupport_identifier_
Definition: custom_subscriber_info.hpp:46
SubListener::onNewDataMessage
void onNewDataMessage(eprosima::fastrtps::Subscriber *sub) final
Definition: custom_subscriber_info.hpp:84
rmw_event_type_t
rmw_event_type_t
RCPPUTILS_TSA_GUARDED_BY
#define RCPPUTILS_TSA_GUARDED_BY(x)
rmw_fastrtps_shared_cpp::TypeSupport
Definition: TypeSupport.hpp:41
CustomSubscriberInfo::type_support_
rmw_fastrtps_shared_cpp::TypeSupport * type_support_
Definition: custom_subscriber_info.hpp:43
SubListener::publisherCount
size_t publisherCount()
Definition: custom_subscriber_info.hpp:149
std::lock_guard
RMW_FASTRTPS_SHARED_CPP_PUBLIC
#define RMW_FASTRTPS_SHARED_CPP_PUBLIC
Definition: visibility_control.h:50
SubListener::attachCondition
void attachCondition(std::mutex *conditionMutex, std::condition_variable *conditionVariable)
Connect a condition variable so a waiter can be notified of new data.
Definition: custom_subscriber_info.hpp:112
TypeSupport.hpp
CustomSubscriberInfo::subscription_gid_
rmw_gid_t subscription_gid_
Definition: custom_subscriber_info.hpp:45
CustomSubscriberInfo::listener_
SubListener * listener_
Definition: custom_subscriber_info.hpp:42
CustomSubscriberInfo::getListener
EventListenerInterface * getListener() const final
CustomSubscriberInfo::type_support_impl_
const void * type_support_impl_
Definition: custom_subscriber_info.hpp:44
CustomSubscriberInfo
Definition: custom_subscriber_info.hpp:37
SubListener::data_taken
void data_taken(eprosima::fastrtps::Subscriber *sub)
Definition: custom_subscriber_info.hpp:134
SubListener::detachCondition
void detachCondition()
Reset the condition mutex and condition variable set by attachCondition to nullptr.
Definition: custom_subscriber_info.hpp:120
EventListenerInterface::ConditionalScopedLock
Definition: custom_event_info.hpp:70
CustomSubscriberInfo::~CustomSubscriberInfo
virtual ~CustomSubscriberInfo()=default
SubListener::hasData
bool hasData() const
Definition: custom_subscriber_info.hpp:128
SubListener
Definition: custom_subscriber_info.hpp:53
rmw_gid_t
thread_safety_annotations.hpp
std::condition_variable
CustomSubscriberInfo::subscriber_
eprosima::fastrtps::Subscriber * subscriber_
Definition: custom_subscriber_info.hpp:41
custom_event_info.hpp
SubListener::SubListener
SubListener(CustomSubscriberInfo *info)
Definition: custom_subscriber_info.hpp:56
std::mutex
eprosima
Definition: qos.hpp:29
macros.hpp
std::set< eprosima::fastrtps::rtps::GUID_t >
CustomEventInfo
Definition: custom_event_info.hpp:98
EventListenerInterface
Definition: custom_event_info.hpp:39
SubListener::onSubscriptionMatched
void onSubscriptionMatched(eprosima::fastrtps::Subscriber *sub, eprosima::fastrtps::rtps::MatchingInfo &info) final
Definition: custom_subscriber_info.hpp:69