rclcpp  master
C++ ROS Client Library API
subscription_topic_statistics.hpp
Go to the documentation of this file.
1 // Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
16 #define RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
17 
18 #include <memory>
19 #include <string>
20 #include <utility>
21 #include <vector>
22 
23 #include "libstatistics_collector/collector/generate_statistics_message.hpp"
24 #include "libstatistics_collector/moving_average_statistics/types.hpp"
25 #include "libstatistics_collector/topic_statistics_collector/constants.hpp"
26 #include "libstatistics_collector/topic_statistics_collector/received_message_age.hpp"
27 #include "libstatistics_collector/topic_statistics_collector/received_message_period.hpp"
28 
29 #include "rcl/time.h"
30 #include "rclcpp/time.hpp"
31 #include "rclcpp/publisher.hpp"
32 #include "rclcpp/timer.hpp"
33 
34 #include "statistics_msgs/msg/metrics_message.hpp"
35 
36 namespace rclcpp
37 {
38 namespace topic_statistics
39 {
40 
41 constexpr const char kDefaultPublishTopicName[]{"/statistics"};
43 
44 using libstatistics_collector::collector::GenerateStatisticMessage;
45 using statistics_msgs::msg::MetricsMessage;
46 using libstatistics_collector::moving_average_statistics::StatisticData;
47 
54 template<typename CallbackMessageT>
56 {
57  using TopicStatsCollector =
58  libstatistics_collector::topic_statistics_collector::TopicStatisticsCollector<
59  CallbackMessageT>;
60  using ReceivedMessageAge =
61  libstatistics_collector::topic_statistics_collector::ReceivedMessageAgeCollector<
62  CallbackMessageT>;
63  using ReceivedMessagePeriod =
64  libstatistics_collector::topic_statistics_collector::ReceivedMessagePeriodCollector<
65  CallbackMessageT>;
66 
67 public:
69 
81  const std::string & node_name,
83  : node_name_(node_name),
84  publisher_(std::move(publisher))
85  {
86  // TODO(dbbonnie): ros-tooling/aws-roadmap/issues/226, received message age
87 
88  if (nullptr == publisher_) {
89  throw std::invalid_argument("publisher pointer is nullptr");
90  }
91 
92  bring_up();
93  }
94 
96  {
97  tear_down();
98  }
99 
101 
107  virtual void handle_message(
108  const CallbackMessageT & received_message,
109  const rclcpp::Time now_nanoseconds) const
110  {
111  std::lock_guard<std::mutex> lock(mutex_);
112  for (const auto & collector : subscriber_statistics_collectors_) {
113  collector->OnMessageReceived(received_message, now_nanoseconds.nanoseconds());
114  }
115  }
116 
118 
121  void set_publisher_timer(rclcpp::TimerBase::SharedPtr publisher_timer)
122  {
123  publisher_timer_ = publisher_timer;
124  }
125 
127 
131  {
133  rclcpp::Time window_end{get_current_nanoseconds_since_epoch()};
134 
135  {
136  std::lock_guard<std::mutex> lock(mutex_);
137  for (auto & collector : subscriber_statistics_collectors_) {
138  const auto collected_stats = collector->GetStatisticsResults();
139  collector->ClearCurrentMeasurements();
140 
141  auto message = libstatistics_collector::collector::GenerateStatisticMessage(
142  node_name_,
143  collector->GetMetricName(),
144  collector->GetMetricUnit(),
145  window_start_,
146  window_end,
147  collected_stats);
148  msgs.push_back(message);
149  }
150  }
151 
152  for (auto & msg : msgs) {
153  publisher_->publish(msg);
154  }
155  window_start_ = window_end;
156  }
157 
158 protected:
160 
166  {
168  std::lock_guard<std::mutex> lock(mutex_);
169  for (const auto & collector : subscriber_statistics_collectors_) {
170  data.push_back(collector->GetStatisticsResults());
171  }
172  return data;
173  }
174 
175 private:
177 
180  void bring_up()
181  {
182  auto received_message_age = std::make_unique<ReceivedMessageAge>();
183  received_message_age->Start();
184  subscriber_statistics_collectors_.emplace_back(std::move(received_message_age));
185 
186  auto received_message_period = std::make_unique<ReceivedMessagePeriod>();
187  received_message_period->Start();
188  {
189  std::lock_guard<std::mutex> lock(mutex_);
190  subscriber_statistics_collectors_.emplace_back(std::move(received_message_period));
191  }
192 
193  window_start_ = rclcpp::Time(get_current_nanoseconds_since_epoch());
194  }
195 
197 
200  void tear_down()
201  {
202  {
203  std::lock_guard<std::mutex> lock(mutex_);
204  for (auto & collector : subscriber_statistics_collectors_) {
205  collector->Stop();
206  }
207 
208  subscriber_statistics_collectors_.clear();
209  }
210 
211  if (publisher_timer_) {
212  publisher_timer_->cancel();
213  publisher_timer_.reset();
214  }
215 
216  publisher_.reset();
217  }
218 
220 
223  int64_t get_current_nanoseconds_since_epoch() const
224  {
225  const auto now = std::chrono::system_clock::now();
226  return std::chrono::duration_cast<std::chrono::nanoseconds>(now.time_since_epoch()).count();
227  }
228 
230  mutable std::mutex mutex_;
232  std::vector<std::unique_ptr<TopicStatsCollector>> subscriber_statistics_collectors_{};
234  const std::string node_name_;
238  rclcpp::TimerBase::SharedPtr publisher_timer_;
240  rclcpp::Time window_start_;
241 };
242 } // namespace topic_statistics
243 } // namespace rclcpp
244 
245 #endif // RCLCPP__TOPIC_STATISTICS__SUBSCRIPTION_TOPIC_STATISTICS_HPP_
rclcpp::topic_statistics::SubscriptionTopicStatistics
Definition: subscription_topic_statistics.hpp:55
rclcpp::topic_statistics::kDefaultPublishTopicName
constexpr const char kDefaultPublishTopicName[]
Definition: subscription_topic_statistics.hpp:41
rclcpp::topic_statistics::SubscriptionTopicStatistics::set_publisher_timer
void set_publisher_timer(rclcpp::TimerBase::SharedPtr publisher_timer)
Set the timer used to publish statistics messages.
Definition: subscription_topic_statistics.hpp:121
rclcpp::Publisher
A publisher publishes messages of any type to a topic.
Definition: publisher.hpp:53
std::string
std::move
T move(T... args)
rclcpp::topic_statistics::kDefaultPublishingPeriod
constexpr const std::chrono::milliseconds kDefaultPublishingPeriod
Definition: subscription_topic_statistics.hpp:42
std::vector
time.h
std::chrono::milliseconds
rclcpp::Time
Definition: time.hpp:31
std::lock_guard
rclcpp::topic_statistics::SubscriptionTopicStatistics::get_current_collector_data
std::vector< StatisticData > get_current_collector_data() const
Return a vector of all the currently collected data.
Definition: subscription_topic_statistics.hpp:165
rclcpp
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
publisher.hpp
rclcpp::Time::nanoseconds
rcl_time_point_value_t nanoseconds() const
Get the nanoseconds since epoch.
timer.hpp
std::vector::push_back
T push_back(T... args)
rclcpp::topic_statistics::SubscriptionTopicStatistics::publish_message_and_reset_measurements
virtual void publish_message_and_reset_measurements()
Publish a populated MetricsStatisticsMessage.
Definition: subscription_topic_statistics.hpp:130
rclcpp::topic_statistics::SubscriptionTopicStatistics::SubscriptionTopicStatistics
SubscriptionTopicStatistics(const std::string &node_name, rclcpp::Publisher< statistics_msgs::msg::MetricsMessage >::SharedPtr publisher)
Construct a SubscriptionTopicStatistics object.
Definition: subscription_topic_statistics.hpp:80
std::invalid_argument
rclcpp::topic_statistics::SubscriptionTopicStatistics::handle_message
virtual void handle_message(const CallbackMessageT &received_message, const rclcpp::Time now_nanoseconds) const
Handle a message received by the subscription to collect statistics.
Definition: subscription_topic_statistics.hpp:107
std::vector::emplace_back
T emplace_back(T... args)
std
std::mutex
time.hpp
rclcpp::Publisher::publish
virtual void publish(std::unique_ptr< MessageT, MessageDeleter > msg)
Send a message to the topic for this publisher.
Definition: publisher.hpp:187
rclcpp::topic_statistics::SubscriptionTopicStatistics::~SubscriptionTopicStatistics
virtual ~SubscriptionTopicStatistics()
Definition: subscription_topic_statistics.hpp:95
std::chrono::system_clock::now
T now(T... args)