32 #ifndef TF2_ROS_MESSAGE_FILTER_H
33 #define TF2_ROS_MESSAGE_FILTER_H
41 #include <message_filters/connection.h>
42 #include <message_filters/message_traits.h>
43 #include <message_filters/simple_filter.h>
44 #include <rclcpp/rclcpp.hpp>
45 #include <tf2/buffer_core_interface.h>
50 #define TF2_ROS_MESSAGEFILTER_DEBUG(fmt, ...) \
51 RCUTILS_LOG_DEBUG_NAMED("tf2_ros_message_filter", \
52 std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), \
53 getTargetFramesString().c_str(), __VA_ARGS__)
55 #define TF2_ROS_MESSAGEFILTER_WARN(fmt, ...) \
56 RCUTILS_LOG_WARN_NAMED("tf2_ros_message_filter", \
57 std::string(std::string("MessageFilter [target=%s]: ") + std::string(fmt)).c_str(), \
58 getTargetFramesString().c_str(), __VA_ARGS__)
63 namespace filter_failure_reasons
88 return "EmptyFrameID";
90 return "Invalid Failure Reason";
102 virtual void clear() = 0;
105 virtual void setTolerance(
const rclcpp::Duration & tolerance) = 0;
124 template<
class M,
class BufferT = tf2_ros::Buffer>
129 typedef message_filters::MessageEvent<M const>
MEvent;
147 template<
typename TimeRepT =
int64_t,
typename TimeT = std::nano>
149 BufferT & buffer,
const std::string & target_frame, uint32_t queue_size,
150 const rclcpp::Node::SharedPtr & node,
152 :
MessageFilter(buffer, target_frame, queue_size, node->get_node_logging_interface(),
153 node->get_node_clock_interface(), buffer_timeout)
156 "Buffer type must implement tf2::BufferCoreInterface");
158 "Buffer type must implement tf2_ros::AsyncBufferInterface");
171 template<
typename TimeRepT =
int64_t,
typename TimeT = std::nano>
173 BufferT & buffer,
const std::string & target_frame, uint32_t queue_size,
174 const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr & node_logging,
175 const rclcpp::node_interfaces::NodeClockInterface::SharedPtr & node_clock,
177 : node_logging_(node_logging),
178 node_clock_(node_clock),
180 queue_size_(queue_size),
181 buffer_timeout_(buffer_timeout)
197 template<
class F,
typename TimeRepT =
int64_t,
typename TimeT = std::nano>
199 F & f, BufferT & buffer,
const std::string & target_frame, uint32_t queue_size,
200 const rclcpp::Node::SharedPtr & node,
202 :
MessageFilter(f, buffer, target_frame, queue_size, node->get_node_logging_interface(),
203 node->get_node_clock_interface(), buffer_timeout)
218 template<
class F,
typename TimeRepT =
int64_t,
typename TimeT = std::nano>
220 F & f, BufferT & buffer,
const std::string & target_frame, uint32_t queue_size,
221 const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr & node_logging,
222 const rclcpp::node_interfaces::NodeClockInterface::SharedPtr & node_clock,
224 : node_logging_(node_logging),
225 node_clock_(node_clock),
227 queue_size_(queue_size),
228 buffer_timeout_(buffer_timeout)
241 message_connection_.disconnect();
242 message_connection_ = f.registerCallback(&MessageFilter::incomingMessage,
this);
250 message_connection_.disconnect();
254 "Successful Transforms: %llu, Discarded due to age: %llu, Transform messages received: %llu, Messages received: %llu, Total dropped: %llu",
255 static_cast<long long unsigned int>(successful_transform_count_),
256 static_cast<long long unsigned int>(failed_out_the_back_count_),
257 static_cast<long long unsigned int>(transform_message_count_),
258 static_cast<long long unsigned int>(incoming_message_count_),
259 static_cast<long long unsigned int>(dropped_message_count_));
268 frames.push_back(target_frame);
279 target_frames_.
resize(target_frames.size());
281 target_frames_.
begin(), this->stripSlash);
282 expected_success_count_ = target_frames_.
size() * (time_tolerance_.nanoseconds() ? 2 : 1);
285 for (V_string::iterator it = target_frames_.
begin(); it != target_frames_.
end(); ++it) {
288 target_frames_string_ = ss.
str();
297 return target_frames_string_;
306 time_tolerance_ = tolerance;
307 expected_success_count_ = target_frames_.
size() * (time_tolerance_.nanoseconds() ? 2 : 1);
322 warned_about_empty_frame_id_ =
false;
327 if (target_frames_.
empty()) {
331 namespace mt = message_filters::message_traits;
332 const MConstPtr & message = evt.getMessage();
333 std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
334 rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
336 if (frame_id.
empty()) {
343 info.handles.reserve(expected_success_count_);
349 target_frames_copy = target_frames_;
352 V_string::iterator it = target_frames_copy.
begin();
353 V_string::iterator end = target_frames_copy.end();
354 for (; it != end; ++it) {
356 auto future = buffer_.waitForTransform(
359 tf2::timeFromSec(stamp.seconds()),
361 std::bind(&MessageFilter::transformReadyCallback,
this, std::placeholders::_1, next_handle_index_));
365 if (status == std::future_status::ready) {
368 ++info.success_count;
371 info.handles.push_back(next_handle_index_++);
380 if (time_tolerance_.nanoseconds()) {
381 future = buffer_.waitForTransform(
384 tf2::timeFromSec((stamp + time_tolerance_).seconds()),
386 std::bind(&MessageFilter::transformReadyCallback,
this, std::placeholders::_1, next_handle_index_));
389 if (status == std::future_status::ready) {
392 ++info.success_count;
395 info.handles.push_back(next_handle_index_++);
409 if (info.success_count == expected_success_count_) {
417 if (queue_size_ != 0 && message_count_ + 1 > queue_size_) {
419 ++dropped_message_count_;
420 const MessageInfo & front = messages_.
front();
422 "Removed oldest message because buffer is full, count now %d (frame_id=%s, stamp=%f)",
424 (mt::FrameId<M>::value(*front.event.getMessage())).c_str(),
425 mt::TimeStamp<M>::value(*front.event.getMessage()).seconds());
440 frame_id.
c_str(), stamp.seconds(), message_count_);
441 ++incoming_message_count_;
451 builtin_interfaces::msg::Time t = node_clock_->get_clock()->now();
460 message_filters::Connection registerFailureCallback(
const FailureCallback & callback)
462 message_connection_failure.disconnect();
463 message_connection_failure = this->registerCallback(callback,
this);
469 queue_size_ = new_queue_size;
481 successful_transform_count_ = 0;
482 failed_out_the_back_count_ = 0;
483 transform_message_count_ = 0;
484 incoming_message_count_ = 0;
485 dropped_message_count_ = 0;
486 time_tolerance_ = rclcpp::Duration(0, 0);
487 warned_about_empty_frame_id_ =
false;
488 expected_success_count_ = 1;
493 namespace mt = message_filters::message_traits;
496 typename L_MessageInfo::iterator msg_it = messages_.
begin();
497 typename L_MessageInfo::iterator msg_end = messages_.
end();
505 for (; msg_it != msg_end; ++msg_it) {
506 MessageInfo & info = *msg_it;
507 auto handle_it =
std::find(info.handles.begin(), info.handles.end(), handle);
508 if (handle_it != info.handles.end()) {
510 ++info.success_count;
511 if (info.success_count >= expected_success_count_) {
512 saved_event = msg_it->event;
513 messages_.
erase(msg_it);
523 if (msg_it == msg_end) {
527 bool can_transform =
true;
528 const MConstPtr & message = saved_event.getMessage();
529 std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
530 rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
532 bool transform_available =
true;
537 transform_available =
false;
540 if (transform_available) {
543 typename V_string::iterator it = target_frames_.
begin();
544 typename V_string::iterator
end = target_frames_.
end();
545 for (; it !=
end; ++it) {
547 if (!buffer_.canTransform(target, frame_id, tf2::timeFromSec(stamp.seconds()), NULL)) {
548 can_transform =
false;
552 if (time_tolerance_.nanoseconds()) {
553 if (!buffer_.canTransform(target, frame_id,
554 tf2::timeFromSec((stamp + time_tolerance_).seconds()), NULL))
556 can_transform =
false;
562 can_transform =
false;
567 frame_id.
c_str(), stamp.seconds(), message_count_ - 1);
569 ++successful_transform_count_;
570 messageReady(saved_event);
572 ++dropped_message_count_;
575 frame_id.
c_str(), stamp.seconds(), message_count_ - 1);
583 void incomingMessage(
const message_filters::MessageEvent<M const> & evt)
590 if (!next_failure_warning_.nanoseconds()) {
591 next_failure_warning_ = node_clock_->get_clock()->now() + rclcpp::Duration(15, 0);
594 if (node_clock_->get_clock()->now() >= next_failure_warning_) {
595 if (incoming_message_count_ - message_count_ == 0) {
599 double dropped_pct =
static_cast<double>(dropped_message_count_) /
600 static_cast<double>(incoming_message_count_ - message_count_);
601 if (dropped_pct > 0.95) {
603 "Dropped %.2f%% of messages so far. Please turn the [%s.message_notifier] rosconsole logger to DEBUG for more information.", dropped_pct * 100,
604 "tf2_ros_message_filter");
605 next_failure_warning_ = node_clock_->get_clock()->now() + rclcpp::Duration(60, 0);
607 if (
static_cast<double>(failed_out_the_back_count_) /
static_cast<double>(dropped_message_count_) > 0.5) {
609 " The majority of dropped messages were due to messages growing older than the TF cache time. The last message's timestamp was: %f, and the last frame_id was: %s",
610 last_out_the_back_stamp_.seconds(), last_out_the_back_frame_.
c_str());
618 struct CBQueueCallback :
public ros::CallbackInterface
628 virtual CallResult call()
632 filter_->signalMessage(event_);
636 filter_->signalFailure(event_, reason_);
654 if (callback_queue_) {
655 ros::CallbackInterfacePtr cb(
new CBQueueCallback(
this, evt,
false, reason));
656 callback_queue_->addCallback(cb, (uint64_t)
this);
661 signalFailure(evt, reason);
665 void messageReady(
const MEvent & evt)
669 if (callback_queue_) {
671 callback_queue_->addCallback(cb, (uint64_t)
this);
676 this->signalMessage(evt);
682 namespace mt = message_filters::message_traits;
683 const MConstPtr & message = evt.getMessage();
684 std::string frame_id = stripSlash(mt::FrameId<M>::value(*message));
685 rclcpp::Time stamp = mt::TimeStamp<M>::value(*message);
687 node_logging_->get_logger(),
688 "Message Filter dropping message: frame '%s' at time %.3f for reason '%s'",
689 frame_id.
c_str(), stamp.seconds(), get_filter_failure_reason_string(reason).c_str());
694 if (!in.
empty() && (in [0] ==
'/')) {
704 const rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_;
706 const rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_;
715 uint32_t queue_size_;
717 uint64_t next_handle_index_ = 0;
721 : success_count(0) {}
725 uint64_t success_count;
728 L_MessageInfo messages_;
731 uint64_t message_count_;
734 uint64_t expected_success_count_;
736 bool warned_about_empty_frame_id_;
738 uint64_t successful_transform_count_;
739 uint64_t failed_out_the_back_count_;
740 uint64_t transform_message_count_;
741 uint64_t incoming_message_count_;
742 uint64_t dropped_message_count_;
744 rclcpp::Time last_out_the_back_stamp_;
747 rclcpp::Time next_failure_warning_;
750 rclcpp::Duration time_tolerance_ = rclcpp::Duration(0, 0);
752 message_filters::Connection message_connection_;
753 message_filters::Connection message_connection_failure;
756 tf2::Duration buffer_timeout_;