rclcpp  master
C++ ROS Client Library API
publisher.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__PUBLISHER_HPP_
16 #define RCLCPP__PUBLISHER_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <functional>
22 #include <iostream>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 #include <utility>
27 
28 #include "rcl/error_handling.h"
29 #include "rcl/publisher.h"
30 
36 #include "rclcpp/macros.hpp"
42 
43 namespace rclcpp
44 {
45 
46 template<typename MessageT, typename AllocatorT>
47 class LoanedMessage;
48 
50 template<typename MessageT, typename AllocatorT = std::allocator<void>>
51 class Publisher : public PublisherBase
52 {
53 public:
55  using MessageAllocator = typename MessageAllocatorTraits::allocator_type;
59 
61 
62 
63 
74  rclcpp::node_interfaces::NodeBaseInterface * node_base,
75  const std::string & topic,
76  const rclcpp::QoS & qos,
77  const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
78  : PublisherBase(
79  node_base,
80  topic,
81  *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
82  options.template to_rcl_publisher_options<MessageT>(qos)),
83  options_(options),
85  {
87 
88  if (options_.event_callbacks.deadline_callback) {
89  this->add_event_handler(
90  options_.event_callbacks.deadline_callback,
91  RCL_PUBLISHER_OFFERED_DEADLINE_MISSED);
92  }
93  if (options_.event_callbacks.liveliness_callback) {
94  this->add_event_handler(
95  options_.event_callbacks.liveliness_callback,
96  RCL_PUBLISHER_LIVELINESS_LOST);
97  }
98  if (options_.event_callbacks.incompatible_qos_callback) {
99  this->add_event_handler(
100  options_.event_callbacks.incompatible_qos_callback,
101  RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
102  } else if (options_.use_default_callbacks) {
103  // Register default callback when not specified
104  try {
105  this->add_event_handler(
106  [this](QOSOfferedIncompatibleQoSInfo & info) {
108  },
109  RCL_PUBLISHER_OFFERED_INCOMPATIBLE_QOS);
110  } catch (UnsupportedEventTypeException & /*exc*/) {
111  // pass
112  }
113  }
114  // Setup continues in the post construction method, post_init_setup().
115  }
116 
118  virtual
119  void
122  const std::string & topic,
123  const rclcpp::QoS & qos,
125  {
126  // Topic is unused for now.
127  (void)topic;
128  (void)options;
129 
130  // If needed, setup intra process communication.
132  auto context = node_base->get_context();
133  // Get the intra process manager instance for this context.
134  auto ipm = context->get_sub_context<rclcpp::experimental::IntraProcessManager>();
135  // Register the publisher with the intra process manager.
137  throw std::invalid_argument(
138  "intraprocess communication is not allowed with keep all history qos policy");
139  }
140  if (qos.get_rmw_qos_profile().depth == 0) {
141  throw std::invalid_argument(
142  "intraprocess communication is not allowed with a zero qos history depth value");
143  }
145  throw std::invalid_argument(
146  "intraprocess communication allowed only with volatile durability");
147  }
148  uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this());
149  this->setup_intra_process(
150  intra_process_publisher_id,
151  ipm);
152  }
153  }
154 
155  virtual ~Publisher()
156  {}
157 
159 
175  {
177  }
178 
180 
184  virtual void
186  {
188  this->do_inter_process_publish(*msg);
189  return;
190  }
191  // If an interprocess subscription exist, then the unique_ptr is promoted
192  // to a shared_ptr and published.
193  // This allows doing the intraprocess publish first and then doing the
194  // interprocess publish, resulting in lower publish-to-subscribe latency.
195  // It's not possible to do that with an unique_ptr,
196  // as do_intra_process_publish takes the ownership of the message.
197  bool inter_process_publish_needed =
199 
200  if (inter_process_publish_needed) {
201  auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
202  this->do_inter_process_publish(*shared_msg);
203  } else {
205  }
206  }
207 
208  virtual void
209  publish(const MessageT & msg)
210  {
211  // Avoid allocating when not using intra process.
213  // In this case we're not using intra process.
214  return this->do_inter_process_publish(msg);
215  }
216  // Otherwise we have to allocate memory in a unique_ptr and pass it along.
217  // As the message is not const, a copy should be made.
218  // A shared_ptr<const MessageT> could also be constructed here.
219  auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
220  MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
221  MessageUniquePtr unique_msg(ptr, message_deleter_);
222  this->publish(std::move(unique_msg));
223  }
224 
225  void
226  publish(const rcl_serialized_message_t & serialized_msg)
227  {
228  return this->do_serialized_publish(&serialized_msg);
229  }
230 
231  void
232  publish(const SerializedMessage & serialized_msg)
233  {
234  return this->do_serialized_publish(&serialized_msg.get_rcl_serialized_message());
235  }
236 
238 
245  void
247  {
248  if (!loaned_msg.is_valid()) {
249  throw std::runtime_error("loaned message is not valid");
250  }
252  // TODO(Karsten1987): support loaned message passed by intraprocess
253  throw std::runtime_error("storing loaned messages in intra process is not supported yet");
254  }
255 
256  // verify that publisher supports loaned messages
257  // TODO(Karsten1987): This case separation has to be done in rclcpp
258  // otherwise we have to ensure that every middleware implements
259  // `rmw_publish_loaned_message` explicitly the same way as `rmw_publish`
260  // by taking a copy of the ros message.
261  if (this->can_loan_messages()) {
262  // we release the ownership from the rclpp::LoanedMessage instance
263  // and let the middleware clean up the memory.
264  this->do_loaned_message_publish(loaned_msg.release());
265  } else {
266  // we don't release the ownership, let the middleware copy the ros message
267  // and thus the destructor of rclcpp::LoanedMessage cleans up the memory.
268  this->do_inter_process_publish(loaned_msg.get());
269  }
270  }
271 
274  {
275  return message_allocator_;
276  }
277 
278 protected:
279  void
280  do_inter_process_publish(const MessageT & msg)
281  {
282  auto status = rcl_publish(publisher_handle_.get(), &msg, nullptr);
283 
284  if (RCL_RET_PUBLISHER_INVALID == status) {
285  rcl_reset_error(); // next call will reset error message if not context
286  if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
287  rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
288  if (nullptr != context && !rcl_context_is_valid(context)) {
289  // publisher is invalid due to context being shutdown
290  return;
291  }
292  }
293  }
294  if (RCL_RET_OK != status) {
295  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
296  }
297  }
298 
299  void
300  do_serialized_publish(const rcl_serialized_message_t * serialized_msg)
301  {
303  // TODO(Karsten1987): support serialized message passed by intraprocess
304  throw std::runtime_error("storing serialized messages in intra process is not supported yet");
305  }
306  auto status = rcl_publish_serialized_message(publisher_handle_.get(), serialized_msg, nullptr);
307  if (RCL_RET_OK != status) {
308  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
309  }
310  }
311 
312  void
314  {
315  auto status = rcl_publish_loaned_message(publisher_handle_.get(), msg, nullptr);
316 
317  if (RCL_RET_PUBLISHER_INVALID == status) {
318  rcl_reset_error(); // next call will reset error message if not context
319  if (rcl_publisher_is_valid_except_context(publisher_handle_.get())) {
320  rcl_context_t * context = rcl_publisher_get_context(publisher_handle_.get());
321  if (nullptr != context && !rcl_context_is_valid(context)) {
322  // publisher is invalid due to context being shutdown
323  return;
324  }
325  }
326  }
327  if (RCL_RET_OK != status) {
328  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
329  }
330  }
331 
332  void
334  {
335  auto ipm = weak_ipm_.lock();
336  if (!ipm) {
337  throw std::runtime_error(
338  "intra process publish called after destruction of intra process manager");
339  }
340  if (!msg) {
341  throw std::runtime_error("cannot publish msg which is a null pointer");
342  }
343 
344  ipm->template do_intra_process_publish<MessageT, AllocatorT>(
346  std::move(msg),
348  }
349 
352  {
353  auto ipm = weak_ipm_.lock();
354  if (!ipm) {
355  throw std::runtime_error(
356  "intra process publish called after destruction of intra process manager");
357  }
358  if (!msg) {
359  throw std::runtime_error("cannot publish msg which is a null pointer");
360  }
361 
362  return ipm->template do_intra_process_publish_and_return_shared<MessageT, AllocatorT>(
364  std::move(msg),
366  }
367 
369 
374 
376 
378 };
379 
380 } // namespace rclcpp
381 
382 #endif // RCLCPP__PUBLISHER_HPP_
rclcpp::PublisherBase::weak_ipm_
IntraProcessManagerWeakPtr weak_ipm_
Definition: publisher_base.hpp:224
RMW_QOS_POLICY_HISTORY_KEEP_ALL
RMW_QOS_POLICY_HISTORY_KEEP_ALL
rclcpp::Publisher::do_intra_process_publish_and_return_shared
std::shared_ptr< const MessageT > do_intra_process_publish_and_return_shared(std::unique_ptr< MessageT, MessageDeleter > msg)
Definition: publisher.hpp:351
std::weak_ptr::lock
T lock(T... args)
rclcpp::Publisher::options_
const rclcpp::PublisherOptionsWithAllocator< AllocatorT > options_
Copy of original options passed during construction.
Definition: publisher.hpp:373
std::string
rclcpp::Publisher
A publisher publishes messages of any type to a topic.
Definition: publisher.hpp:51
std::shared_ptr
std::move
T move(T... args)
rmw.h
rclcpp::node_interfaces::NodeBaseInterface::get_context
virtual rclcpp::Context::SharedPtr get_context()=0
Return the context of the node.
rclcpp::Publisher::do_intra_process_publish
void do_intra_process_publish(std::unique_ptr< MessageT, MessageDeleter > msg)
Definition: publisher.hpp:333
rclcpp::PublisherBase::default_incompatible_qos_callback
void default_incompatible_qos_callback(QOSOfferedIncompatibleQoSInfo &info) const
rclcpp::SerializedMessage::get_rcl_serialized_message
rcl_serialized_message_t & get_rcl_serialized_message()
Get the underlying rcl_serialized_t handle.
rclcpp::QoS::get_rmw_qos_profile
rmw_qos_profile_t & get_rmw_qos_profile()
Return the rmw qos profile.
rclcpp::Publisher::do_inter_process_publish
void do_inter_process_publish(const MessageT &msg)
Definition: publisher.hpp:280
rclcpp::Publisher::get_allocator
std::shared_ptr< MessageAllocator > get_allocator() const
Definition: publisher.hpp:273
error_handling.h
rclcpp::Publisher::borrow_loaned_message
rclcpp::LoanedMessage< MessageT, AllocatorT > borrow_loaned_message()
Borrow a loaned ROS message from the middleware.
Definition: publisher.hpp:174
std::shared_ptr::get
T get(T... args)
loaned_message.hpp
rclcpp::LoanedMessage
Definition: loaned_message.hpp:32
rclcpp::Publisher::do_loaned_message_publish
void do_loaned_message_publish(MessageT *msg)
Definition: publisher.hpp:313
rmw_qos_profile_t::durability
enum rmw_qos_durability_policy_t durability
rclcpp::Publisher::publish
virtual void publish(const MessageT &msg)
Definition: publisher.hpp:209
rclcpp::Publisher::do_serialized_publish
void do_serialized_publish(const rcl_serialized_message_t *serialized_msg)
Definition: publisher.hpp:300
rclcpp::PublisherBase::publisher_handle_
std::shared_ptr< rcl_publisher_t > publisher_handle_
Definition: publisher_base.hpp:217
rclcpp
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
rclcpp::Publisher::message_allocator_
std::shared_ptr< MessageAllocator > message_allocator_
Definition: publisher.hpp:375
rclcpp::Publisher< statistics_msgs::msg::MetricsMessage >::MessageAllocator
typename MessageAllocatorTraits::allocator_type MessageAllocator
Definition: publisher.hpp:55
rclcpp::node_interfaces::NodeBaseInterface
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
rclcpp::QoS
Encapsulation of Quality of Service settings.
Definition: qos.hpp:59
allocator_deleter.hpp
RCLCPP_SMART_PTR_DEFINITIONS
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
rclcpp::PublisherBase::setup_intra_process
void setup_intra_process(uint64_t intra_process_publisher_id, IntraProcessManagerSharedPtr ipm)
Implementation utility function used to setup intra process publishing after creation.
macros.hpp
std::enable_shared_from_this< PublisherBase >::shared_from_this
T shared_from_this(T... args)
rclcpp::allocator::set_allocator_for_deleter
void set_allocator_for_deleter(D *deleter, Alloc *alloc)
Definition: allocator_deleter.hpp:72
allocator_common.hpp
resolve_use_intra_process.hpp
publisher_options.hpp
rclcpp::Publisher::publish
void publish(rclcpp::LoanedMessage< MessageT, AllocatorT > &&loaned_msg)
Publish an instance of a LoanedMessage.
Definition: publisher.hpp:246
node_base_interface.hpp
rclcpp::allocator::AllocRebind
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
rclcpp::Publisher< statistics_msgs::msg::MetricsMessage >::MessageAllocatorTraits
allocator::AllocRebind< statistics_msgs::msg::MetricsMessage, std::allocator< void > > MessageAllocatorTraits
Definition: publisher.hpp:54
rclcpp::PublisherBase
Definition: publisher_base.hpp:55
std::runtime_error
rclcpp::UnsupportedEventTypeException
Definition: qos_event.hpp:64
rmw_qos_profile_t::history
enum rmw_qos_history_policy_t history
std::invalid_argument
rclcpp::PublisherBase::add_event_handler
void add_event_handler(const EventCallbackT &callback, const rcl_publisher_event_type_t event_type)
Definition: publisher_base.hpp:199
publisher_base.hpp
intra_process_manager.hpp
rclcpp::Publisher::message_deleter_
MessageDeleter message_deleter_
Definition: publisher.hpp:377
rclcpp::PublisherBase::intra_process_is_enabled_
bool intra_process_is_enabled_
Definition: publisher_base.hpp:223
rclcpp::Publisher::publish
void publish(const rcl_serialized_message_t &serialized_msg)
Definition: publisher.hpp:226
rclcpp::PublisherOptionsWithAllocator
Structure containing optional configuration for Publishers.
Definition: publisher_options.hpp:57
rclcpp::detail::resolve_use_intra_process
bool resolve_use_intra_process(const OptionsT &options, const NodeBaseT &node_base)
Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
Definition: resolve_use_intra_process.hpp:31
type_support_decl.hpp
rclcpp::Publisher::~Publisher
virtual ~Publisher()
Definition: publisher.hpp:155
rcl_context_t
visibility_control.hpp
rclcpp::allocator::Deleter
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
std
rclcpp::PublisherBase::can_loan_messages
bool can_loan_messages() const
Check if publisher instance can loan messages.
rclcpp::PublisherBase::intra_process_publisher_id_
uint64_t intra_process_publisher_id_
Definition: publisher_base.hpp:225
rmw_qos_incompatible_event_status_t
rclcpp::SerializedMessage
Object oriented version of rcl_serialized_message_t with destructor to avoid memory leaks.
Definition: serialized_message.hpp:27
rclcpp::PublisherBase::get_subscription_count
size_t get_subscription_count() const
Get subscription count.
rclcpp::exceptions::throw_from_rcl_error
void throw_from_rcl_error(rcl_ret_t ret, const std::string &prefix="", const rcl_error_state_t *error_state=nullptr, void(*reset_error)()=rcl_reset_error)
Throw a C++ std::exception which was created based on an rcl error.
rclcpp::experimental::IntraProcessManager
This class performs intra process communication between nodes.
Definition: intra_process_manager.hpp:91
rclcpp::Publisher::post_init_setup
virtual void post_init_setup(rclcpp::node_interfaces::NodeBaseInterface *node_base, const std::string &topic, const rclcpp::QoS &qos, const rclcpp::PublisherOptionsWithAllocator< AllocatorT > &options)
Called post construction, so that construction may continue after shared_from_this() works.
Definition: publisher.hpp:120
std::unique_ptr
rclcpp::Publisher::publish
void publish(const SerializedMessage &serialized_msg)
Definition: publisher.hpp:232
RMW_QOS_POLICY_DURABILITY_VOLATILE
RMW_QOS_POLICY_DURABILITY_VOLATILE
rclcpp::PublisherBase::get_intra_process_subscription_count
size_t get_intra_process_subscription_count() const
Get intraprocess subscription count.
rclcpp::Publisher::publish
virtual void publish(std::unique_ptr< MessageT, MessageDeleter > msg)
Send a message to the topic for this publisher.
Definition: publisher.hpp:185
rmw_qos_profile_t::depth
size_t depth
rclcpp::Publisher< statistics_msgs::msg::MetricsMessage >::MessageDeleter
allocator::Deleter< MessageAllocator, statistics_msgs::msg::MetricsMessage > MessageDeleter
Definition: publisher.hpp:56