32 #ifndef TF2_ROS__MESSAGE_FILTER_H_
33 #define TF2_ROS__MESSAGE_FILTER_H_
35 #include <message_filters/connection.h>
36 #include <message_filters/message_traits.h>
37 #include <message_filters/simple_filter.h>
38 #include <tf2/buffer_core_interface.h>
43 #include <builtin_interfaces/msg/time.hpp>
44 #include <rclcpp/rclcpp.hpp>
56 #include <type_traits>
59 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \
60 RCUTILS_LOG_DEBUG_NAMED( \
61 "tf2_ros_message_filter", \
62 std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), \
63 getTargetFramesString().c_str(), __VA_ARGS__)
65 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \
66 RCUTILS_LOG_WARN_NAMED( \
67 "tf2_ros_message_filter", \
68 std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), \
69 getTargetFramesString().c_str(), __VA_ARGS__)
74 namespace filter_failure_reasons
97 static std::string get_filter_failure_reason_string(
103 "the timestamp on the message is earlier than all the data in the transform cache";
105 return "the frame id of the message is empty";
107 return "did not find a valid transform, this usually happens at startup ...";
109 return "discarding message because the queue is full";
124 virtual void clear() = 0;
127 virtual void setTolerance(
const rclcpp::Duration & tolerance) = 0;
146 template<
class M,
class BufferT = tf2_ros::Buffer>
151 typedef message_filters::MessageEvent<M const>
MEvent;
162 template<
typename TimeRepT =
int64_t,
typename TimeT = std::nano>
164 BufferT & buffer,
const std::string & target_frame, uint32_t queue_size,
165 const rclcpp::Node::SharedPtr & node,
168 :
MessageFilter(buffer, target_frame, queue_size, node->get_node_logging_interface(),
169 node->get_node_clock_interface(), buffer_timeout)
173 "Buffer type must implement tf2::BufferCoreInterface");
176 "Buffer type must implement tf2_ros::AsyncBufferInterface");
189 template<
typename TimeRepT =
int64_t,
typename TimeT = std::nano>
191 BufferT & buffer,
const std::string & target_frame, uint32_t queue_size,
192 const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr & node_logging,
193 const rclcpp::node_interfaces::NodeClockInterface::SharedPtr & node_clock,
196 : node_logging_(node_logging),
197 node_clock_(node_clock),
199 queue_size_(queue_size),
200 buffer_timeout_(buffer_timeout)
216 template<
class F,
typename TimeRepT =
int64_t,
typename TimeT = std::nano>
218 F & f, BufferT & buffer,
const std::string & target_frame, uint32_t queue_size,
219 const rclcpp::Node::SharedPtr & node,
222 :
MessageFilter(f, buffer, target_frame, queue_size, node->get_node_logging_interface(),
223 node->get_node_clock_interface(), buffer_timeout)
238 template<
class F,
typename TimeRepT =
int64_t,
typename TimeT = std::nano>
240 F & f, BufferT & buffer,
const std::string & target_frame, uint32_t queue_size,
241 const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr & node_logging,
242 const rclcpp::node_interfaces::NodeClockInterface::SharedPtr & node_clock,
245 : node_logging_(node_logging),
246 node_clock_(node_clock),
248 queue_size_(queue_size),
249 buffer_timeout_(buffer_timeout)
262 message_connection_.disconnect();
263 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage,
this);
271 message_connection_.disconnect();
275 "Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, "
276 "Messages received: %llu, Total dropped: %llu",
277 static_cast<uint64_t
>(successful_transform_count_),
278 static_cast<uint64_t
>(failed_out_the_back_count_),
279 static_cast<uint64_t
>(transform_message_count_),
280 static_cast<uint64_t
>(incoming_message_count_),
281 static_cast<uint64_t
>(dropped_message_count_));
290 frames.push_back(target_frame);
301 target_frames_.
resize(target_frames.size());
303 target_frames.begin(), target_frames.end(),
304 target_frames_.
begin(), this->stripSlash);
305 expected_success_count_ = target_frames_.
size() * (time_tolerance_.nanoseconds() ? 2 : 1);
308 for (V_string::iterator it = target_frames_.
begin(); it != target_frames_.
end(); ++it) {
311 target_frames_string_ = ss.
str();
320 return target_frames_string_;
329 time_tolerance_ = tolerance;
330 expected_success_count_ = target_frames_.
size() * (time_tolerance_.nanoseconds() ? 2 : 1);
344 warned_about_empty_frame_id_ =
false;
349 if (target_frames_.
empty()) {
353 namespace mt = message_filters::message_traits;
354 const MConstPtr & message = evt.getMessage();
355 std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
356 rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
358 if (frame_id.
empty()) {
366 info.handles.
reserve(expected_success_count_);
372 target_frames_copy = target_frames_;
375 V_string::iterator it = target_frames_copy.
begin();
376 V_string::iterator end = target_frames_copy.end();
377 for (; it != end; ++it) {
380 next_handle_index_, tf2::timeFromSec(stamp.seconds()), target_frame);
381 info.handles.push_back(next_handle_index_++);
383 if (time_tolerance_.nanoseconds()) {
386 tf2::timeFromSec((stamp + time_tolerance_).seconds()),
388 info.handles.push_back(next_handle_index_++);
398 if (queue_size_ != 0 && messages_.
size() + 1 > queue_size_) {
399 ++dropped_message_count_;
400 const MessageInfo & front = messages_.
front();
402 "Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)",
404 (mt::FrameId<M>::value(*front.event.getMessage())).c_str(),
405 mt::TimeStamp<M>::value(*front.event.getMessage()).seconds());
418 "Added message in frame %s at time %.3f, count now %d",
419 frame_id.
c_str(), stamp.seconds(), messages_.
size());
420 ++incoming_message_count_;
422 for (
const auto & param : wait_params) {
423 const auto & handle = std::get<0>(param);
424 const auto & stamp = std::get<1>(param);
425 const auto & target_frame = std::get<2>(param);
426 buffer_.waitForTransform(
431 std::bind(&MessageFilter::transformReadyCallback,
this, std::placeholders::_1, handle));
442 builtin_interfaces::msg::Time t = node_clock_->get_clock()->now();
451 message_filters::Connection registerFailureCallback(
const FailureCallback & callback)
453 message_connection_failure.disconnect();
454 message_connection_failure = this->registerCallback(callback,
this);
460 queue_size_ = new_queue_size;
471 successful_transform_count_ = 0;
472 failed_out_the_back_count_ = 0;
473 transform_message_count_ = 0;
474 incoming_message_count_ = 0;
475 dropped_message_count_ = 0;
476 time_tolerance_ = rclcpp::Duration(0, 0);
477 warned_about_empty_frame_id_ =
false;
478 expected_success_count_ = 1;
483 namespace mt = message_filters::message_traits;
486 bool event_found =
false;
493 typename L_MessageInfo::iterator msg_it = messages_.
begin();
494 typename L_MessageInfo::iterator msg_end = messages_.
end();
496 for (; msg_it != msg_end; ++msg_it) {
497 MessageInfo & info = *msg_it;
498 auto handle_it =
std::find(info.handles.begin(), info.handles.end(), handle);
499 if (handle_it != info.handles.end()) {
501 ++info.success_count;
502 if (info.success_count >= expected_success_count_) {
503 saved_event = msg_it->event;
504 messages_.
erase(msg_it);
516 bool can_transform =
true;
517 const MConstPtr & message = saved_event.getMessage();
518 std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
519 rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
521 bool transform_available =
true;
526 transform_available =
false;
530 if (transform_available) {
533 typename V_string::iterator it = target_frames_.
begin();
534 typename V_string::iterator
end = target_frames_.
end();
535 for (; it !=
end; ++it) {
537 if (!buffer_.canTransform(target, frame_id, tf2::timeFromSec(stamp.seconds()), NULL)) {
538 can_transform =
false;
542 if (time_tolerance_.nanoseconds()) {
543 if (!buffer_.canTransform(
545 tf2::timeFromSec((stamp + time_tolerance_).seconds()), NULL))
547 can_transform =
false;
553 can_transform =
false;
558 "Message ready in frame %s at time %.3f, count now %d",
559 frame_id.
c_str(), stamp.seconds(), messages_.
size());
561 ++successful_transform_count_;
562 messageReady(saved_event);
564 ++dropped_message_count_;
567 "Discarding message in frame %s at time %.3f, count now %d",
568 frame_id.
c_str(), stamp.seconds(), messages_.
size());
569 messageDropped(saved_event, error);
576 void incomingMessage(
const message_filters::MessageEvent<M const> & evt)
583 if (!next_failure_warning_.nanoseconds()) {
584 next_failure_warning_ = node_clock_->get_clock()->now() + rclcpp::Duration(15, 0);
587 if (node_clock_->get_clock()->now() >= next_failure_warning_) {
588 if (incoming_message_count_ - messages_.
size() == 0) {
592 double dropped_pct =
static_cast<double>(dropped_message_count_) /
593 static_cast<double>(incoming_message_count_ - messages_.
size());
594 if (dropped_pct > 0.95) {
596 "Dropped %.2f%% of messages so far. Please turn the "
597 "[tf2_ros_message_filter.message_notifier] rosconsole logger to DEBUG for more "
600 next_failure_warning_ = node_clock_->get_clock()->now() + rclcpp::Duration(60, 0);
602 if (
static_cast<double>(failed_out_the_back_count_) /
603 static_cast<double>(dropped_message_count_) > 0.5)
606 " The majority of dropped messages were due to messages growing older than the TF "
607 "cache time. The last message's timestamp was: %f, and the last frame_id was: %s",
608 last_out_the_back_stamp_.seconds(), last_out_the_back_frame_.
c_str());
616 struct CBQueueCallback :
public ros::CallbackInterface
627 virtual CallResult call()
630 filter_->signalMessage(event_);
632 filter_->signalFailure(event_, reason_);
650 if (callback_queue_) {
651 ros::CallbackInterfacePtr cb(
new CBQueueCallback(
this, evt,
false, reason));
652 callback_queue_->addCallback(cb, (uint64_t)
this);
656 signalFailure(evt, reason);
660 void messageReady(
const MEvent & evt)
664 if (callback_queue_) {
665 ros::CallbackInterfacePtr cb(
new CBQueueCallback(
667 callback_queue_->addCallback(cb, (uint64_t)
this);
671 this->signalMessage(evt);
677 namespace mt = message_filters::message_traits;
678 const MConstPtr & message = evt.getMessage();
679 std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
680 rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
682 node_logging_->get_logger(),
683 "Message Filter dropping message: frame '%s' at time %.3f for reason '%s'",
684 frame_id.
c_str(), stamp.seconds(), get_filter_failure_reason_string(reason).c_str());
689 if (!in.
empty() && (in[0] ==
'/')) {
699 const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_;
701 const rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_;
710 uint32_t queue_size_;
712 uint64_t next_handle_index_ = 0;
716 : success_count(0) {}
720 uint64_t success_count;
723 L_MessageInfo messages_;
727 uint64_t expected_success_count_;
729 bool warned_about_empty_frame_id_;
731 uint64_t successful_transform_count_;
732 uint64_t failed_out_the_back_count_;
733 uint64_t transform_message_count_;
734 uint64_t incoming_message_count_;
735 uint64_t dropped_message_count_;
737 rclcpp::Time last_out_the_back_stamp_;
740 rclcpp::Time next_failure_warning_;
744 rclcpp::Duration time_tolerance_ = rclcpp::Duration(0, 0);
746 message_filters::Connection message_connection_;
747 message_filters::Connection message_connection_failure;
750 tf2::Duration buffer_timeout_;
754 #endif // TF2_ROS__MESSAGE_FILTER_H_