rclcpp  master
C++ ROS Client Library API
publisher.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__PUBLISHER_HPP_
16 #define RCLCPP__PUBLISHER_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <functional>
22 #include <iostream>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 #include <utility>
27 
28 #include "rcl/error_handling.h"
29 #include "rcl/publisher.h"
30 
36 #include "rclcpp/macros.hpp"
42 
43 namespace rclcpp
44 {
45 
46 template<typename MessageT, typename AllocatorT>
47 class LoanedMessage;
48 
50 template<typename MessageT, typename AllocatorT = std::allocator<void>>
51 class Publisher : public PublisherBase
52 {
53 public:
55  using MessageAllocator = typename MessageAllocatorTraits::allocator_type;
59 
61 
63  rclcpp::node_interfaces::NodeBaseInterface * node_base,
64  const std::string & topic,
65  const rclcpp::QoS & qos,
66  const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
67  : PublisherBase(
68  node_base,
69  topic,
70  *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
71  options.template to_rcl_publisher_options<MessageT>(qos)),
72  options_(options),
74  {
76 
78  this->add_event_handler(
81  }
83  this->add_event_handler(
86  }
87 
88  // Setup continues in the post construction method, post_init_setup().
89  }
90 
92  virtual
93  void
96  const std::string & topic,
97  const rclcpp::QoS & qos,
99  {
100  // Topic is unused for now.
101  (void)topic;
102  (void)options;
103 
104  // If needed, setup intra process communication.
106  auto context = node_base->get_context();
107  // Get the intra process manager instance for this context.
108  auto ipm = context->get_sub_context<rclcpp::experimental::IntraProcessManager>();
109  // Register the publisher with the intra process manager.
111  throw std::invalid_argument(
112  "intraprocess communication is not allowed with keep all history qos policy");
113  }
114  if (qos.get_rmw_qos_profile().depth == 0) {
115  throw std::invalid_argument(
116  "intraprocess communication is not allowed with a zero qos history depth value");
117  }
119  throw std::invalid_argument(
120  "intraprocess communication allowed only with volatile durability");
121  }
122  uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this());
123  this->setup_intra_process(
124  intra_process_publisher_id,
125  ipm);
126  }
127  }
128 
129  virtual ~Publisher()
130  {}
131 
133 
149  {
151  }
152 
154 
158  virtual void
160  {
162  this->do_inter_process_publish(*msg);
163  return;
164  }
165  // If an interprocess subscription exist, then the unique_ptr is promoted
166  // to a shared_ptr and published.
167  // This allows doing the intraprocess publish first and then doing the
168  // interprocess publish, resulting in lower publish-to-subscribe latency.
169  // It's not possible to do that with an unique_ptr,
170  // as do_intra_process_publish takes the ownership of the message.
171  bool inter_process_publish_needed =
173 
174  if (inter_process_publish_needed) {
175  auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
176  this->do_inter_process_publish(*shared_msg);
177  } else {
179  }
180  }
181 
182  virtual void
183  publish(const MessageT & msg)
184  {
185  // Avoid allocating when not using intra process.
187  // In this case we're not using intra process.
188  return this->do_inter_process_publish(msg);
189  }
190  // Otherwise we have to allocate memory in a unique_ptr and pass it along.
191  // As the message is not const, a copy should be made.
192  // A shared_ptr<const MessageT> could also be constructed here.
193  auto ptr = MessageAllocatorTraits::allocate(*message_allocator_.get(), 1);
194  MessageAllocatorTraits::construct(*message_allocator_.get(), ptr, msg);
195  MessageUniquePtr unique_msg(ptr, message_deleter_);
196  this->publish(std::move(unique_msg));
197  }
198 
199  void
200  publish(const rcl_serialized_message_t & serialized_msg)
201  {
202  return this->do_serialized_publish(&serialized_msg);
203  }
204 
206 
213  void
215  {
216  if (!loaned_msg.is_valid()) {
217  throw std::runtime_error("loaned message is not valid");
218  }
220  // TODO(Karsten1987): support loaned message passed by intraprocess
221  throw std::runtime_error("storing loaned messages in intra process is not supported yet");
222  }
223 
224  // verify that publisher supports loaned messages
225  // TODO(Karsten1987): This case separation has to be done in rclcpp
226  // otherwise we have to ensure that every middleware implements
227  // `rmw_publish_loaned_message` explicitly the same way as `rmw_publish`
228  // by taking a copy of the ros message.
229  if (this->can_loan_messages()) {
230  // we release the ownership from the rclpp::LoanedMessage instance
231  // and let the middleware clean up the memory.
232  this->do_loaned_message_publish(loaned_msg.release());
233  } else {
234  // we don't release the ownership, let the middleware copy the ros message
235  // and thus the destructor of rclcpp::LoanedMessage cleans up the memory.
236  this->do_inter_process_publish(loaned_msg.get());
237  }
238  }
239 
242  {
243  return message_allocator_;
244  }
245 
246 protected:
247  void
248  do_inter_process_publish(const MessageT & msg)
249  {
250  auto status = rcl_publish(&publisher_handle_, &msg, nullptr);
251 
252  if (RCL_RET_PUBLISHER_INVALID == status) {
253  rcl_reset_error(); // next call will reset error message if not context
256  if (nullptr != context && !rcl_context_is_valid(context)) {
257  // publisher is invalid due to context being shutdown
258  return;
259  }
260  }
261  }
262  if (RCL_RET_OK != status) {
263  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
264  }
265  }
266 
267  void
269  {
271  // TODO(Karsten1987): support serialized message passed by intraprocess
272  throw std::runtime_error("storing serialized messages in intra process is not supported yet");
273  }
274  auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr);
275  if (RCL_RET_OK != status) {
276  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
277  }
278  }
279 
280  void
282  {
283  auto status = rcl_publish_loaned_message(&publisher_handle_, msg, nullptr);
284 
285  if (RCL_RET_PUBLISHER_INVALID == status) {
286  rcl_reset_error(); // next call will reset error message if not context
289  if (nullptr != context && !rcl_context_is_valid(context)) {
290  // publisher is invalid due to context being shutdown
291  return;
292  }
293  }
294  }
295  if (RCL_RET_OK != status) {
296  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
297  }
298  }
299 
300  void
302  {
303  auto ipm = weak_ipm_.lock();
304  if (!ipm) {
305  throw std::runtime_error(
306  "intra process publish called after destruction of intra process manager");
307  }
308  if (!msg) {
309  throw std::runtime_error("cannot publish msg which is a null pointer");
310  }
311 
312  ipm->template do_intra_process_publish<MessageT, AllocatorT>(
314  std::move(msg),
316  }
317 
320  {
321  auto ipm = weak_ipm_.lock();
322  if (!ipm) {
323  throw std::runtime_error(
324  "intra process publish called after destruction of intra process manager");
325  }
326  if (!msg) {
327  throw std::runtime_error("cannot publish msg which is a null pointer");
328  }
329 
330  return ipm->template do_intra_process_publish_and_return_shared<MessageT, AllocatorT>(
332  std::move(msg),
334  }
335 
337 
342 
344 
346 };
347 
348 } // namespace rclcpp
349 
350 #endif // RCLCPP__PUBLISHER_HPP_
uint64_t intra_process_publisher_id_
Definition: publisher_base.hpp:221
void do_serialized_publish(const rcl_serialized_message_t *serialized_msg)
Definition: publisher.hpp:268
rcl_ret_t rcl_publish_serialized_message(const rcl_publisher_t *publisher, const rcl_serialized_message_t *serialized_message, rmw_publisher_allocation_t *allocation)
rcl_ret_t rcl_publish(const rcl_publisher_t *publisher, const void *ros_message, rmw_publisher_allocation_t *allocation)
virtual ~Publisher()
Definition: publisher.hpp:129
size_t get_intra_process_subscription_count() const
Get intraprocess subscription count.
typename MessageAllocatorTraits::allocator_type MessageAllocator
Definition: publisher.hpp:55
virtual void publish(std::unique_ptr< MessageT, MessageDeleter > msg)
Send a message to the topic for this publisher.
Definition: publisher.hpp:159
Encapsulation of Quality of Service settings.
Definition: qos.hpp:55
std::shared_ptr< const MessageT > do_intra_process_publish_and_return_shared(std::unique_ptr< MessageT, MessageDeleter > msg)
Definition: publisher.hpp:319
#define rcl_reset_error
PublisherEventCallbacks event_callbacks
Callbacks for various events related to publishers.
Definition: publisher_options.hpp:45
void do_inter_process_publish(const MessageT &msg)
Definition: publisher.hpp:248
RMW_QOS_POLICY_HISTORY_KEEP_ALL
Definition: publisher_base.hpp:55
virtual rclcpp::Context::SharedPtr get_context()=0
Return the context of the node.
void publish(rclcpp::LoanedMessage< MessageT, AllocatorT > &&loaned_msg)
Publish an instance of a LoanedMessage.
Definition: publisher.hpp:214
rmw_qos_profile_t & get_rmw_qos_profile()
Return the rmw qos profile.
This header provides the get_node_base_interface() template function.
Definition: allocator_common.hpp:24
std::shared_ptr< MessageAllocator > message_allocator_
Definition: publisher.hpp:343
QOSLivelinessLostCallbackType liveliness_callback
Definition: qos_event.hpp:45
bool intra_process_is_enabled_
Definition: publisher_base.hpp:219
void do_intra_process_publish(std::unique_ptr< MessageT, MessageDeleter > msg)
Definition: publisher.hpp:301
allocator::Deleter< MessageAllocator, rcl_interfaces::msg::ParameterEvent > MessageDeleter
Definition: publisher.hpp:56
enum rmw_qos_durability_policy_t durability
#define RCL_RET_OK
size_t get_subscription_count() const
Get subscription count.
void set_allocator_for_deleter(D *deleter, Alloc *alloc)
Definition: allocator_deleter.hpp:72
void throw_from_rcl_error(rcl_ret_t ret, const std::string &prefix="", const rcl_error_state_t *error_state=nullptr, void(*reset_error)()=rcl_reset_error)
Throw a C++ std::exception which was created based on an rcl error.
bool can_loan_messages() const
Check if publisher instance can loan messages.
rcl_ret_t rcl_publish_loaned_message(const rcl_publisher_t *publisher, void *ros_message, rmw_publisher_allocation_t *allocation)
std::shared_ptr< MessageAllocator > get_allocator() const
Definition: publisher.hpp:241
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
RMW_QOS_POLICY_DURABILITY_VOLATILE
bool resolve_use_intra_process(const OptionsT &options, const NodeBaseT &node_base)
Return whether or not intra process is enabled, resolving "NodeDefault" if needed.
Definition: resolve_use_intra_process.hpp:31
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
bool rcl_context_is_valid(rcl_context_t *context)
#define RCL_RET_PUBLISHER_INVALID
rclcpp::LoanedMessage< MessageT, AllocatorT > borrow_loaned_message()
Borrow a loaned ROS message from the middleware.
Definition: publisher.hpp:148
This class performs intra process communication between nodes.
Definition: intra_process_manager.hpp:91
enum rmw_qos_history_policy_t history
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
virtual void publish(const MessageT &msg)
Definition: publisher.hpp:183
T lock(T... args)
A publisher publishes messages of any type to a topic.
Definition: publisher.hpp:51
bool rcl_publisher_is_valid_except_context(const rcl_publisher_t *publisher)
virtual void post_init_setup(rclcpp::node_interfaces::NodeBaseInterface *node_base, const std::string &topic, const rclcpp::QoS &qos, const rclcpp::PublisherOptionsWithAllocator< AllocatorT > &options)
Called post construction, so that construction may continue after shared_from_this() works...
Definition: publisher.hpp:94
T move(T... args)
Structure containing optional configuration for Publishers.
Definition: publisher_options.hpp:57
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED
T get(T... args)
QOSDeadlineOfferedCallbackType deadline_callback
Definition: qos_event.hpp:44
IntraProcessManagerWeakPtr weak_ipm_
Definition: publisher_base.hpp:220
allocator::AllocRebind< rcl_interfaces::msg::ParameterEvent, std::allocator< void > > MessageAllocatorTraits
Definition: publisher.hpp:54
void do_loaned_message_publish(MessageT *msg)
Definition: publisher.hpp:281
MessageDeleter message_deleter_
Definition: publisher.hpp:345
void setup_intra_process(uint64_t intra_process_publisher_id, IntraProcessManagerSharedPtr ipm)
Implementation utility function used to setup intra process publishing after creation.
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
const rclcpp::PublisherOptionsWithAllocator< AllocatorT > options_
Copy of original options passed during construction.
Definition: publisher.hpp:341
RCL_PUBLISHER_LIVELINESS_LOST
void publish(const rcl_serialized_message_t &serialized_msg)
Definition: publisher.hpp:200
Definition: loaned_message.hpp:32
void add_event_handler(const EventCallbackT &callback, const rcl_publisher_event_type_t event_type)
Definition: publisher_base.hpp:199
rcl_context_t * rcl_publisher_get_context(const rcl_publisher_t *publisher)
rcl_publisher_t publisher_handle_
Definition: publisher_base.hpp:213