rclcpp  master
C++ ROS Client Library API
publisher.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__PUBLISHER_HPP_
16 #define RCLCPP__PUBLISHER_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <functional>
22 #include <iostream>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 #include <utility>
27 
28 #include "rcl/error_handling.h"
29 #include "rcl/publisher.h"
30 
36 #include "rclcpp/macros.hpp"
42 
43 #include "tracetools/tracetools.h"
44 
45 namespace rclcpp
46 {
47 
48 template<typename MessageT, typename AllocatorT>
49 class LoanedMessage;
50 
52 template<typename MessageT, typename AllocatorT = std::allocator<void>>
53 class Publisher : public PublisherBase
54 {
55 public:
57  using MessageAllocator = typename MessageAllocatorTraits::allocator_type;
61 
63 
64 
65 
76  rclcpp::node_interfaces::NodeBaseInterface * node_base,
77  const std::string & topic,
78  const rclcpp::QoS & qos,
79  const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
80  : PublisherBase(
81  node_base,
82  topic,
83  *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
84  options.template to_rcl_publisher_options<MessageT>(qos)),
85  options_(options),
87  {
89 
90  if (options_.event_callbacks.deadline_callback) {
91  this->add_event_handler(
92  options_.event_callbacks.deadline_callback,
93  RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
94  }
95  if (options_.event_callbacks.liveliness_callback) {
96  this->add_event_handler(
97  options_.event_callbacks.liveliness_callback,
98  RCL_PUBLISHER_LIVELINESS_LOST);
99  }
100  if (options_.event_callbacks.incompatible_qos_callback) {
101  this->add_event_handler(
102  options_.event_callbacks.incompatible_qos_callback,
103  RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
104  } else if (options_.use_default_callbacks) {
105  // Register default callback when not specified
106  try {
107  this->add_event_handler(
108  [this](QOSOfferedIncompatibleQoSInfo & info) {
110  },
111  RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
112  } catch (UnsupportedEventTypeException & /*exc*/) {
113  // pass
114  }
115  }
116  // Setup continues in the post construction method, post_init_setup().
117  }
118 
120  virtual
121  void
124  const std::string & topic,
125  const rclcpp::QoS & qos,
127  {
128  // Topic is unused for now.
129  (void)topic;
130  (void)options;
131 
132  // If needed, setup intra process communication.
134  auto context = node_base->get_context();
135  // Get the intra process manager instance for this context.
136  auto ipm = context->get_sub_context<rclcpp::experimental::IntraProcessManager>();
137  // Register the publisher with the intra process manager.
139  throw std::invalid_argument(
140  "intraprocess communication is not allowed with keep all history qos policy");
141  }
142  if (qos.get_rmw_qos_profile().depth == 0) {
143  throw std::invalid_argument(
144  "intraprocess communication is not allowed with a zero qos history depth value");
145  }
147  throw std::invalid_argument(
148  "intraprocess communication allowed only with volatile durability");
149  }
150  uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this());
151  this->setup_intra_process(
152  intra_process_publisher_id,
153  ipm);
154  }
155  }
156 
157  virtual ~Publisher()
158  {}
159 
161 
177  {
179  }
180 
182 
186  virtual void
188  {
190  this->do_inter_process_publish(*msg);
191  return;
192  }
193  // If an interprocess subscription exist, then the unique_ptr is promoted
194  // to a shared_ptr and published.
195  // This allows doing the intraprocess publish first and then doing the
196  // interprocess publish, resulting in lower publish-to-subscribe latency.
197  // It's not possible to do that with an unique_ptr,
198  // as do_intra_process_publish takes the ownership of the message.
199  bool inter_process_publish_needed =
201 
202  if (inter_process_publish_needed) {
203  auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
204  this->do_inter_process_publish(*shared_msg);
205  } else {
207  }
208  }
209 
210  virtual void
211  publish(const MessageT & msg)
212  {
213  // Avoid allocating when not using intra process.
215  // In this case we're not using intra process.
216  return this->do_inter_process_publish(msg);
217  }
218  // Otherwise we have to allocate memory in a unique_ptr and pass it along.
219  // As the message is not const, a copy should be made.
220  // A shared_ptr<const MessageT> could also be constructed here.
221  auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
222  MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
223  MessageUniquePtr unique_msg(ptr, message_deleter_);
224  this->publish(std::move(unique_msg));
225  }
226 
227  void
228  publish(const rcl_serialized_message_t & serialized_msg)
229  {
230  return this->do_serialized_publish(&serialized_msg);
231  }
232 
233  void
234  publish(const SerializedMessage & serialized_msg)
235  {
236  return this->do_serialized_publish(&serialized_msg.get_rcl_serialized_message());
237  }
238 
240 
247  void
249  {
250  if (!loaned_msg.is_valid()) {
251  throw std::runtime_error("loaned message is not valid");
252  }
254  // TODO(Karsten1987): support loaned message passed by intraprocess
255  throw std::runtime_error("storing loaned messages in intra process is not supported yet");
256  }
257 
258  // verify that publisher supports loaned messages
259  // TODO(Karsten1987): This case separation has to be done in rclcpp
260  // otherwise we have to ensure that every middleware implements
261  // `rmw_publish_loaned_message` explicitly the same way as `rmw_publish`
262  // by taking a copy of the ros message.
263  if (this->can_loan_messages()) {
264  // we release the ownership from the rclpp::LoanedMessage instance
265  // and let the middleware clean up the memory.
266  this->do_loaned_message_publish(std::move(loaned_msg.release()));
267  } else {
268  // we don't release the ownership, let the middleware copy the ros message
269  // and thus the destructor of rclcpp::LoanedMessage cleans up the memory.
270  this->do_inter_process_publish(loaned_msg.get());
271  }
272  }
273 
276  {
277  return message_allocator_;
278  }
279 
280 protected:
281  void
282  do_inter_process_publish(const MessageT & msg)
283  {
284  TRACEPOINT(
285  rclcpp_publish,
286  static_cast<const void *>(publisher_handle_.get()),
287  static_cast<const void *>(&msg));
288  auto status = rcl_publish(publisher_handle_.get(), &msg, nullptr);
289 
290  if (RCL_RET_PUBLISHER_INVALID == status) {
291  rcl_reset_error(); // next call will reset error message if not context
294  if (nullptr != context && !rcl_context_is_valid(context)) {
295  // publisher is invalid due to context being shutdown
296  return;
297  }
298  }
299  }
300  if (RCL_RET_OK != status) {
301  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
302  }
303  }
304 
305  void
307  {
309  // TODO(Karsten1987): support serialized message passed by intraprocess
310  throw std::runtime_error("storing serialized messages in intra process is not supported yet");
311  }
312  auto status = rcl_publish_serialized_message(publisher_handle_.get(), serialized_msg, nullptr);
313  if (RCL_RET_OK != status) {
314  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
315  }
316  }
317 
318  void
319  do_loaned_message_publish(std::unique_ptr<MessageT, std::function<void(MessageT *)>> msg)
320  {
321  auto status = rcl_publish_loaned_message(publisher_handle_.get(), msg.get(), nullptr);
322 
323  if (RCL_RET_PUBLISHER_INVALID == status) {
324  rcl_reset_error(); // next call will reset error message if not context
327  if (nullptr != context && !rcl_context_is_valid(context)) {
328  // publisher is invalid due to context being shutdown
329  return;
330  }
331  }
332  }
333  if (RCL_RET_OK != status) {
334  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
335  }
336  }
337 
338  void
340  {
341  auto ipm = weak_ipm_.lock();
342  if (!ipm) {
343  throw std::runtime_error(
344  "intra process publish called after destruction of intra process manager");
345  }
346  if (!msg) {
347  throw std::runtime_error("cannot publish msg which is a null pointer");
348  }
349 
350  ipm->template do_intra_process_publish<MessageT, AllocatorT>(
352  std::move(msg),
354  }
355 
358  {
359  auto ipm = weak_ipm_.lock();
360  if (!ipm) {
361  throw std::runtime_error(
362  "intra process publish called after destruction of intra process manager");
363  }
364  if (!msg) {
365  throw std::runtime_error("cannot publish msg which is a null pointer");
366  }
367 
368  return ipm->template do_intra_process_publish_and_return_shared<MessageT, AllocatorT>(
370  std::move(msg),
372  }
373 
375 
380 
382 
384 };
385 
386 } // namespace rclcpp
387 
388 #endif // RCLCPP__PUBLISHER_HPP_
rmw_serialized_message_t
rclcpp::PublisherBase::weak_ipm_
IntraProcessManagerWeakPtr weak_ipm_
Definition: publisher_base.hpp:234
rcl_publisher_get_context
rcl_context_t * rcl_publisher_get_context(const rcl_publisher_t *publisher)
RMW_QOS_POLICY_HISTORY_KEEP_ALL
RMW_QOS_POLICY_HISTORY_KEEP_ALL
rclcpp::Publisher::do_intra_process_publish_and_return_shared
std::shared_ptr< const MessageT > do_intra_process_publish_and_return_shared(std::unique_ptr< MessageT, MessageDeleter > msg)
Definition: publisher.hpp:357
RCL_RET_PUBLISHER_INVALID
#define RCL_RET_PUBLISHER_INVALID
std::weak_ptr::lock
T lock(T... args)
rcl_publisher_is_valid_except_context
bool rcl_publisher_is_valid_except_context(const rcl_publisher_t *publisher)
rclcpp::Publisher::options_
const rclcpp::PublisherOptionsWithAllocator< AllocatorT > options_
Copy of original options passed during construction.
Definition: publisher.hpp:379
std::string
rclcpp::Publisher
A publisher publishes messages of any type to a topic.
Definition: publisher.hpp:53
std::shared_ptr
std::move
T move(T... args)
rmw.h
rclcpp::node_interfaces::NodeBaseInterface::get_context
virtual rclcpp::Context::SharedPtr get_context()=0
Return the context of the node.
rclcpp::Publisher::do_intra_process_publish
void do_intra_process_publish(std::unique_ptr< MessageT, MessageDeleter > msg)
Definition: publisher.hpp:339
rclcpp::PublisherBase::default_incompatible_qos_callback
void default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo &info) const
rclcpp::SerializedMessage::get_rcl_serialized_message
rcl_serialized_message_t & get_rcl_serialized_message()
Get the underlying rcl_serialized_t handle.
rclcpp::QoS::get_rmw_qos_profile
rmw_qos_profile_t & get_rmw_qos_profile()
Return the rmw qos profile.
rclcpp::Publisher::do_inter_process_publish
void do_inter_process_publish(const MessageT &msg)
Definition: publisher.hpp:282
rclcpp::Publisher::get_allocator
std::shared_ptr< MessageAllocator > get_allocator() const
Definition: publisher.hpp:275
error_handling.h
rclcpp::Publisher::borrow_loaned_message
rclcpp::LoanedMessage< MessageT, AllocatorT > borrow_loaned_message()
Borrow a loaned ROS message from the middleware.
Definition: publisher.hpp:176
std::shared_ptr::get
T get(T... args)
loaned_message.hpp
rclcpp::LoanedMessage
Definition: loaned_message.hpp:32
rmw_qos_profile_t::durability
enum rmw_qos_durability_policy_t durability
rclcpp::Publisher::publish
virtual void publish(const MessageT &msg)
Definition: publisher.hpp:211
rclcpp::Publisher::do_serialized_publish
void do_serialized_publish(const rcl_serialized_message_t *serialized_msg)
Definition: publisher.hpp:306
rclcpp::PublisherBase::publisher_handle_
std::shared_ptr< rcl_publisher_t > publisher_handle_
Definition: publisher_base.hpp:227
std::function
publisher.h
rclcpp
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
rclcpp::Publisher::message_allocator_
std::shared_ptr< MessageAllocator > message_allocator_
Definition: publisher.hpp:381
rclcpp::Publisher< statistics_msgs::msg::MetricsMessage >::MessageAllocator
typename MessageAllocatorTraits::allocator_type MessageAllocator
Definition: publisher.hpp:57
rclcpp::node_interfaces::NodeBaseInterface
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
rcl_publish_serialized_message
rcl_ret_t rcl_publish_serialized_message(const rcl_publisher_t *publisher, const rcl_serialized_message_t *serialized_message, rmw_publisher_allocation_t *allocation)
rclcpp::QoS
Encapsulation of Quality of Service settings.
Definition: qos.hpp:110
allocator_deleter.hpp
RCLCPP_SMART_PTR_DEFINITIONS
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
rclcpp::PublisherBase::setup_intra_process
void setup_intra_process(uint64_t intra_process_publisher_id, IntraProcessManagerSharedPtr ipm)
Implementation utility function used to setup intra process publishing after creation.
macros.hpp
std::enable_shared_from_this< PublisherBase >::shared_from_this
T shared_from_this(T... args)
rclcpp::allocator::set_allocator_for_deleter
void set_allocator_for_deleter(D *deleter, Alloc *alloc)
Definition: allocator_deleter.hpp:72
allocator_common.hpp
resolve_use_intra_process.hpp
publisher_options.hpp
rclcpp::Publisher::publish
void publish(rclcpp::LoanedMessage< MessageT, AllocatorT > &&loaned_msg)
Publish an instance of a LoanedMessage.
Definition: publisher.hpp:248
node_base_interface.hpp
rclcpp::allocator::AllocRebind
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
rclcpp::Publisher< statistics_msgs::msg::MetricsMessage >::MessageAllocatorTraits
allocator::AllocRebind< statistics_msgs::msg::MetricsMessage, std::allocator< void > > MessageAllocatorTraits
Definition: publisher.hpp:56
rclcpp::PublisherBase
Definition: publisher_base.hpp:56
rcl_publish
rcl_ret_t rcl_publish(const rcl_publisher_t *publisher, const void *ros_message, rmw_publisher_allocation_t *allocation)
std::runtime_error
rclcpp::UnsupportedEventTypeException
Definition: qos_event.hpp:69
rmw_qos_profile_t::history
enum rmw_qos_history_policy_t history
std::invalid_argument
rclcpp::PublisherBase::add_event_handler
void add_event_handler(const EventCallbackT &callback, const rcl_publisher_event_type_t event_type)
Definition: publisher_base.hpp:209
publisher_base.hpp
intra_process_manager.hpp
rclcpp::Publisher::message_deleter_
MessageDeleter message_deleter_
Definition: publisher.hpp:383
rclcpp::PublisherBase::intra_process_is_enabled_
bool intra_process_is_enabled_
Definition: publisher_base.hpp:233
rclcpp::Publisher::publish
void publish(const rcl_serialized_message_t &serialized_msg)
Definition: publisher.hpp:228
rclcpp::PublisherOptionsWithAllocator
Structure containing optional configuration for Publishers.
Definition: publisher_options.hpp:65
rclcpp::detail::resolve_use_intra_process
bool resolve_use_intra_process(const OptionsT &options, const NodeBaseT &node_base)
Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
Definition: resolve_use_intra_process.hpp:31
type_support_decl.hpp
rclcpp::Publisher::do_loaned_message_publish
void do_loaned_message_publish(std::unique_ptr< MessageT, std::function< void(MessageT *)>> msg)
Definition: publisher.hpp:319
rclcpp::Publisher::~Publisher
virtual ~Publisher()
Definition: publisher.hpp:157
rcl_context_t
visibility_control.hpp
rclcpp::allocator::Deleter
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
std
rclcpp::PublisherBase::can_loan_messages
bool can_loan_messages() const
Check if publisher instance can loan messages.
rclcpp::PublisherBase::intra_process_publisher_id_
uint64_t intra_process_publisher_id_
Definition: publisher_base.hpp:235
rmw_qos_incompatible_event_status_t
rclcpp::SerializedMessage
Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks.
Definition: serialized_message.hpp:27
rclcpp::PublisherBase::get_subscription_count
size_t get_subscription_count() const
Get subscription count.
rclcpp::exceptions::throw_from_rcl_error
void throw_from_rcl_error(rcl_ret_t ret, const std::string &prefix="", const rcl_error_state_t *error_state=nullptr, void(*reset_error)()=rcl_reset_error)
Throw a C++ std::exception which was created based on an rcl error.
rclcpp::experimental::IntraProcessManager
This class performs intra process communication between nodes.
Definition: intra_process_manager.hpp:91
rclcpp::Publisher::post_init_setup
virtual void post_init_setup(rclcpp::node_interfaces::NodeBaseInterface *node_base, const std::string &topic, const rclcpp::QoS &qos, const rclcpp::PublisherOptionsWithAllocator< AllocatorT > &options)
Called post construction, so that construction may continue after shared_from_this() works.
Definition: publisher.hpp:122
rcl_publish_loaned_message
rcl_ret_t rcl_publish_loaned_message(const rcl_publisher_t *publisher, void *ros_message, rmw_publisher_allocation_t *allocation)
RCL_RET_OK
#define RCL_RET_OK
std::unique_ptr
rclcpp::Publisher::publish
void publish(const SerializedMessage &serialized_msg)
Definition: publisher.hpp:234
RMW_QOS_POLICY_DURABILITY_VOLATILE
RMW_QOS_POLICY_DURABILITY_VOLATILE
rclcpp::PublisherBase::get_intra_process_subscription_count
size_t get_intra_process_subscription_count() const
Get intraprocess subscription count.
rclcpp::Publisher::publish
virtual void publish(std::unique_ptr< MessageT, MessageDeleter > msg)
Send a message to the topic for this publisher.
Definition: publisher.hpp:187
rmw_qos_profile_t::depth
size_t depth
rcl_context_is_valid
bool rcl_context_is_valid(const rcl_context_t *context)
rclcpp::Publisher< statistics_msgs::msg::MetricsMessage >::MessageDeleter
allocator::Deleter< MessageAllocator, statistics_msgs::msg::MetricsMessage > MessageDeleter
Definition: publisher.hpp:58