32 #ifndef TF2_ROS_MESSAGE_FILTER_H 
   33 #define TF2_ROS_MESSAGE_FILTER_H 
   41 #include <message_filters/connection.h> 
   42 #include <message_filters/message_traits.h> 
   43 #include <message_filters/simple_filter.h> 
   44 #include <rclcpp/rclcpp.hpp> 
   45 #include <tf2/buffer_core_interface.h> 
   50 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \ 
   51   RCUTILS_LOG_DEBUG_NAMED("tf2_ros_message_filter", \ 
   52     std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), \ 
   53     getTargetFramesString().c_str(), __VA_ARGS__) 
   55 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \ 
   56   RCUTILS_LOG_WARN_NAMED("tf2_ros_message_filter", \ 
   57     std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), \ 
   58     getTargetFramesString().c_str(), __VA_ARGS__) 
   63 namespace filter_failure_reasons
 
   88       return "EmptyFrameID";
 
   90       return "Invalid Failure Reason";
 
  102   virtual void clear() = 0;
 
  105   virtual void setTolerance(
const rclcpp::Duration & tolerance) = 0;
 
  124 template<
class M, 
class BufferT = tf2_ros::Buffer>
 
  129   typedef message_filters::MessageEvent<M const> 
MEvent;
 
  147   template<
typename TimeRepT = 
int64_t, 
typename TimeT = std::nano>
 
  149     BufferT & buffer, 
const std::string & target_frame, uint32_t queue_size,
 
  150     const rclcpp::Node::SharedPtr & node,
 
  152   : 
MessageFilter(buffer, target_frame, queue_size, node->get_node_logging_interface(),
 
  153       node->get_node_clock_interface(), buffer_timeout)
 
  156                   "Buffer type must implement tf2::BufferCoreInterface");
 
  158                   "Buffer type must implement tf2_ros::AsyncBufferInterface");
 
  171   template<
typename TimeRepT = 
int64_t, 
typename TimeT = std::nano>
 
  173     BufferT & buffer, 
const std::string & target_frame, uint32_t queue_size,
 
  174     const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr & node_logging,
 
  175     const rclcpp::node_interfaces::NodeClockInterface::SharedPtr & node_clock,
 
  177   : node_logging_(node_logging),
 
  178     node_clock_(node_clock),
 
  180     queue_size_(queue_size),
 
  181     buffer_timeout_(buffer_timeout)
 
  197   template<
class F, 
typename TimeRepT = 
int64_t, 
typename TimeT = std::nano>
 
  199     F & f, BufferT & buffer, 
const std::string & target_frame, uint32_t queue_size,
 
  200     const rclcpp::Node::SharedPtr & node,
 
  202   : 
MessageFilter(f, buffer, target_frame, queue_size, node->get_node_logging_interface(),
 
  203       node->get_node_clock_interface(), buffer_timeout)
 
  218   template<
class F, 
typename TimeRepT = 
int64_t, 
typename TimeT = std::nano>
 
  220     F & f, BufferT & buffer, 
const std::string & target_frame, uint32_t queue_size,
 
  221     const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr & node_logging,
 
  222     const rclcpp::node_interfaces::NodeClockInterface::SharedPtr & node_clock,
 
  224   : node_logging_(node_logging),
 
  225     node_clock_(node_clock),
 
  227     queue_size_(queue_size),
 
  228     buffer_timeout_(buffer_timeout)
 
  241     message_connection_.disconnect();
 
  242     message_connection_ = f.registerCallback(&MessageFilter::incomingMessage, 
this);
 
  250     message_connection_.disconnect();
 
  254       "Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
 
  255       static_cast<long long unsigned int>(successful_transform_count_),
 
  256       static_cast<long long unsigned int>(failed_out_the_back_count_),
 
  257       static_cast<long long unsigned int>(transform_message_count_),
 
  258       static_cast<long long unsigned int>(incoming_message_count_),
 
  259       static_cast<long long unsigned int>(dropped_message_count_));
 
  268     frames.push_back(target_frame);
 
  279     target_frames_.
resize(target_frames.size());
 
  281       target_frames_.
begin(), this->stripSlash);
 
  282     expected_success_count_ = target_frames_.
size() * (time_tolerance_.nanoseconds() ? 2 : 1);
 
  285     for (V_string::iterator it = target_frames_.
begin(); it != target_frames_.
end(); ++it) {
 
  288     target_frames_string_ = ss.
str();
 
  297     return target_frames_string_;
 
  306     time_tolerance_ = tolerance;
 
  307     expected_success_count_ = target_frames_.
size() * (time_tolerance_.nanoseconds() ? 2 : 1);
 
  322     warned_about_empty_frame_id_ = 
false;
 
  327     if (target_frames_.
empty()) {
 
  331     namespace mt = message_filters::message_traits;
 
  332     const MConstPtr & message = evt.getMessage();
 
  333     std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
 
  334     rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
 
  336     if (frame_id.
empty()) {
 
  343     info.handles.reserve(expected_success_count_);
 
  349         target_frames_copy = target_frames_;
 
  352       V_string::iterator it = target_frames_copy.
begin();
 
  353       V_string::iterator end = target_frames_copy.end();
 
  354       for (; it != end; ++it) {
 
  356         auto future = buffer_.waitForTransform(
 
  359             tf2::timeFromSec(stamp.seconds()),
 
  361             std::bind(&MessageFilter::transformReadyCallback, 
this, std::placeholders::_1, next_handle_index_));
 
  365           if (status == std::future_status::ready) {
 
  368             ++info.success_count;
 
  371             info.handles.push_back(next_handle_index_++);
 
  380         if (time_tolerance_.nanoseconds()) {
 
  381           future = buffer_.waitForTransform(
 
  384               tf2::timeFromSec((stamp + time_tolerance_).seconds()),
 
  386               std::bind(&MessageFilter::transformReadyCallback, 
this, std::placeholders::_1, next_handle_index_));
 
  389             if (status == std::future_status::ready) {
 
  392               ++info.success_count;
 
  395               info.handles.push_back(next_handle_index_++);
 
  409     if (info.success_count == expected_success_count_) {
 
  417       if (queue_size_ != 0 && message_count_ + 1 > queue_size_) {
 
  419         ++dropped_message_count_;
 
  420         const MessageInfo & front = messages_.
front();
 
  422           "Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)",
 
  424           (mt::FrameId<M>::value(*front.event.getMessage())).c_str(),
 
  425           mt::TimeStamp<M>::value(*front.event.getMessage()).seconds());
 
  440       frame_id.
c_str(), stamp.seconds(), message_count_);
 
  441     ++incoming_message_count_;
 
  451     builtin_interfaces::msg::Time t = node_clock_->get_clock()->now();
 
  460   message_filters::Connection registerFailureCallback(
const FailureCallback & callback)
 
  462     message_connection_failure.disconnect();
 
  463     message_connection_failure = this->registerCallback(callback, 
this);
 
  469     queue_size_ = new_queue_size;
 
  481     successful_transform_count_ = 0;
 
  482     failed_out_the_back_count_ = 0;
 
  483     transform_message_count_ = 0;
 
  484     incoming_message_count_ = 0;
 
  485     dropped_message_count_ = 0;
 
  486     time_tolerance_ = rclcpp::Duration(0, 0);
 
  487     warned_about_empty_frame_id_ = 
false;
 
  488     expected_success_count_ = 1;
 
  493     namespace mt = message_filters::message_traits;
 
  496     typename L_MessageInfo::iterator msg_it = messages_.
begin();
 
  497     typename L_MessageInfo::iterator msg_end = messages_.
end();
 
  505       for (; msg_it != msg_end; ++msg_it) {
 
  506         MessageInfo & info = *msg_it;
 
  507         auto handle_it = 
std::find(info.handles.begin(), info.handles.end(), handle);
 
  508         if (handle_it != info.handles.end()) {
 
  510           ++info.success_count;
 
  511           if (info.success_count >= expected_success_count_) {
 
  512             saved_event = msg_it->event;
 
  513             messages_.
erase(msg_it);
 
  523     if (msg_it == msg_end) {
 
  527     bool can_transform = 
true;
 
  528     const MConstPtr & message = saved_event.getMessage();
 
  529     std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
 
  530     rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
 
  532     bool transform_available = 
true;
 
  537       transform_available = 
false;
 
  540     if (transform_available) {
 
  543       typename V_string::iterator it = target_frames_.
begin();
 
  544       typename V_string::iterator 
end = target_frames_.
end();
 
  545       for (; it != 
end; ++it) {
 
  547         if (!buffer_.canTransform(target, frame_id, tf2::timeFromSec(stamp.seconds()), NULL)) {
 
  548           can_transform = 
false;
 
  552         if (time_tolerance_.nanoseconds()) {
 
  553           if (!buffer_.canTransform(target, frame_id,
 
  554             tf2::timeFromSec((stamp + time_tolerance_).seconds()), NULL))
 
  556             can_transform = 
false;
 
  562       can_transform = 
false;
 
  567         frame_id.
c_str(), stamp.seconds(), message_count_ - 1);
 
  569       ++successful_transform_count_;
 
  570       messageReady(saved_event);
 
  572       ++dropped_message_count_;
 
  575         frame_id.
c_str(), stamp.seconds(), message_count_ - 1);
 
  583   void incomingMessage(
const message_filters::MessageEvent<M const> & evt)
 
  590     if (!next_failure_warning_.nanoseconds()) {
 
  591       next_failure_warning_ = node_clock_->get_clock()->now() + rclcpp::Duration(15, 0);
 
  594     if (node_clock_->get_clock()->now() >= next_failure_warning_) {
 
  595       if (incoming_message_count_ - message_count_ == 0) {
 
  599       double dropped_pct = 
static_cast<double>(dropped_message_count_) /
 
  600         static_cast<double>(incoming_message_count_ - message_count_);
 
  601       if (dropped_pct > 0.95) {
 
  603           "Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct * 100,
 
  604           "tf2_ros_message_filter");
 
  605         next_failure_warning_ = node_clock_->get_clock()->now() + rclcpp::Duration(60, 0);
 
  607         if (
static_cast<double>(failed_out_the_back_count_) / 
static_cast<double>(dropped_message_count_) > 0.5) {
 
  609             "  The majority of dropped messages were due to messages growing older than the TF cache time.  The last message's timestamp was: %f, and the last frame_id was: %s",
 
  610             last_out_the_back_stamp_.seconds(), last_out_the_back_frame_.
c_str());
 
  618   struct CBQueueCallback : 
public ros::CallbackInterface
 
  628     virtual CallResult call()
 
  632         filter_->signalMessage(event_);
 
  636         filter_->signalFailure(event_, reason_);
 
  654     if (callback_queue_) {
 
  655       ros::CallbackInterfacePtr cb(
new CBQueueCallback(
this, evt, 
false, reason));
 
  656       callback_queue_->addCallback(cb, (uint64_t)
this);
 
  661       signalFailure(evt, reason);
 
  665   void messageReady(
const MEvent & evt)
 
  669     if (callback_queue_) {
 
  671       callback_queue_->addCallback(cb, (uint64_t)
this);
 
  676       this->signalMessage(evt);
 
  682     namespace mt = message_filters::message_traits;
 
  683     const MConstPtr & message = evt.getMessage();
 
  684     std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
 
  685     rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
 
  687       node_logging_->get_logger(),
 
  688       "Message Filter dropping message: frame '%s' at time %.3f for reason '%s'",
 
  689       frame_id.
c_str(), stamp.seconds(), get_filter_failure_reason_string(reason).c_str());
 
  694     if (!in.
empty() && (in [0] == 
'/')) {
 
  704   const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_;
 
  706   const rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_;
 
  715   uint32_t queue_size_;
 
  717   uint64_t next_handle_index_ = 0;
 
  721     : success_count(0) {}
 
  725     uint64_t success_count;
 
  728   L_MessageInfo messages_;
 
  731   uint64_t message_count_;
 
  734   uint64_t expected_success_count_;
 
  736   bool warned_about_empty_frame_id_;
 
  738   uint64_t successful_transform_count_;
 
  739   uint64_t failed_out_the_back_count_;
 
  740   uint64_t transform_message_count_;
 
  741   uint64_t incoming_message_count_;
 
  742   uint64_t dropped_message_count_;
 
  744   rclcpp::Time last_out_the_back_stamp_;
 
  747   rclcpp::Time next_failure_warning_;
 
  750   rclcpp::Duration time_tolerance_ = rclcpp::Duration(0, 0);
 
  752   message_filters::Connection message_connection_;
 
  753   message_filters::Connection message_connection_failure;
 
  756   tf2::Duration buffer_timeout_;