rclcpp  master
C++ ROS Client Library API
publisher.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__PUBLISHER_HPP_
16 #define RCLCPP__PUBLISHER_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <functional>
22 #include <iostream>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 #include <utility>
27 
28 #include "rcl/error_handling.h"
29 #include "rcl/publisher.h"
30 
31 #include "rcl_interfaces/msg/intra_process_message.hpp"
32 
36 #include "rclcpp/macros.hpp"
40 
41 namespace rclcpp
42 {
43 
45 template<typename MessageT, typename Alloc = std::allocator<void>>
46 class Publisher : public PublisherBase
47 {
48 public:
50  using MessageAlloc = typename MessageAllocTraits::allocator_type;
54 
56 
58  rclcpp::node_interfaces::NodeBaseInterface * node_base,
59  const std::string & topic,
60  const rcl_publisher_options_t & publisher_options,
61  const PublisherEventCallbacks & event_callbacks,
62  const std::shared_ptr<MessageAlloc> & allocator)
63  : PublisherBase(
64  node_base,
65  topic,
66  *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
67  publisher_options),
68  message_allocator_(allocator)
69  {
71 
72  if (event_callbacks.deadline_callback) {
73  this->add_event_handler(event_callbacks.deadline_callback,
75  }
76  if (event_callbacks.liveliness_callback) {
77  this->add_event_handler(event_callbacks.liveliness_callback,
79  }
80  }
81 
82  virtual ~Publisher()
83  {}
84 
// Implementation utility that creates a typed mapped ring buffer.
// The visible code calls make_shared(size, this->get_allocator()) on a template
// instantiation whose first argument is MessageT and returns the result as a
// mapped_ring_buffer::MappedRingBufferBase::SharedPtr.
// NOTE(review): listing lines 88 and 90 are absent from this extract, so the template
// being instantiated and its remaining argument cannot be seen here - confirm against
// the real header before relying on this description.
85  mapped_ring_buffer::MappedRingBufferBase::SharedPtr
86  make_mapped_ring_buffer(size_t size) const override
87  {
89  MessageT,
91  >::make_shared(size, this->get_allocator());
92  }
93 
95 
99  virtual void
101  {
103  this->do_inter_process_publish(msg.get());
104  return;
105  }
106  // If an interprocess subscription exist, then the unique_ptr is promoted
107  // to a shared_ptr and published.
108  // This allows doing the intraprocess publish first and then doing the
109  // interprocess publish, resulting in lower publish-to-subscribe latency.
110  // It's not possible to do that with an unique_ptr,
111  // as do_intra_process_publish takes the ownership of the message.
112  uint64_t message_seq;
113  bool inter_process_publish_needed =
115  MessageSharedPtr shared_msg;
116  if (inter_process_publish_needed) {
117  shared_msg = std::move(msg);
118  message_seq =
120  } else {
121  message_seq =
123  }
124  this->do_intra_process_publish(message_seq);
125  if (inter_process_publish_needed) {
126  this->do_inter_process_publish(shared_msg.get());
127  }
128  }
129 
130 // Skip the deprecated attribute on Windows, as it raises a warning in template specializations.
131 #if !defined(_WIN32)
132  [[deprecated(
133  "publishing an unique_ptr is prefered when using intra process communication."
134  " If using a shared_ptr, use publish(*msg).")]]
135 #endif
136  virtual void
138  {
139  publish(*msg);
140  }
141 
// Publishes `msg`, which is received by const reference.
// Visible flow: an early-return branch hands &msg directly to do_inter_process_publish();
// otherwise storage for one message is obtained from message_allocator_, `msg` is
// copy-constructed into it, and the resulting MessageUniquePtr is moved into publish().
// NOTE(review): listing line 146 (the condition opening the early-return branch) is absent
// from this extract; the neighbouring comments suggest it tests whether intra-process
// communication is unused - confirm against the real header.
142  virtual void
143  publish(const MessageT & msg)
144  {
145  // Avoid allocating when not using intra process.
147  // In this case we're not using intra process.
148  return this->do_inter_process_publish(&msg);
149  }
150  // Otherwise we have to allocate memory in a unique_ptr and pass it along.
151  // As the message is not const, a copy should be made.
152  // A shared_ptr<const MessageT> could also be constructed here.
153  auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
     // NOTE(review): if construct() throws, the storage returned by allocate() is not
     // released here, because unique_msg only takes ownership on the line after it.
154  MessageAllocTraits::construct(*message_allocator_.get(), ptr, msg);
155  MessageUniquePtr unique_msg(ptr, message_deleter_);
156  this->publish(std::move(unique_msg));
157  }
158 
159 // Skip deprecated attribute in windows, as it raise a warning in template specialization.
160 #if !defined(_WIN32)
161  [[deprecated(
162  "Use publish(*msg). Check against nullptr before calling if necessary.")]]
163 #endif
164  virtual void
165  publish(const MessageT * msg)
166  {
167  if (!msg) {
168  throw std::runtime_error("msg argument is nullptr");
169  }
170  return this->publish(*msg);
171  }
172 
173  void
174  publish(const rcl_serialized_message_t & serialized_msg)
175  {
176  return this->do_serialized_publish(&serialized_msg);
177  }
178 
179 // Skip deprecated attribute in windows, as it raise a warning in template specialization.
180 #if !defined(_WIN32)
181  [[deprecated(
182  "Use publish(*serialized_msg). Check against nullptr before calling if necessary.")]]
183 #endif
184  void
185  publish(const rcl_serialized_message_t * serialized_msg)
186  {
187  return this->do_serialized_publish(serialized_msg);
188  }
189 
190 // Skip the deprecated attribute on Windows, as it raises a warning in template specializations.
191 #if !defined(_WIN32)
192  [[deprecated(
193  "Use publish(*serialized_msg). Check against nullptr before calling if necessary.")]]
194 #endif
195  void
197  {
198  return this->do_serialized_publish(serialized_msg.get());
199  }
200 
202  {
203  return message_allocator_;
204  }
205 
206 protected:
// Sends `msg` through publisher_handle_ with rcl_publish().
// When the status is RCL_RET_PUBLISHER_INVALID the rcl error is reset and, if `context`
// is non-null and fails rcl_context_is_valid(), the function returns silently. Any
// status other than RCL_RET_OK that reaches the final check is handed to
// rclcpp::exceptions::throw_from_rcl_error().
// NOTE(review): listing lines 213-214 are absent from this extract; they presumably
// declare `context` and open the extra block that is closed at listing line 219 -
// confirm against the real header.
207  void
208  do_inter_process_publish(const MessageT * msg)
209  {
210  auto status = rcl_publish(&publisher_handle_, msg, nullptr);
211  if (RCL_RET_PUBLISHER_INVALID == status) {
212  rcl_reset_error(); // next call will reset error message if not context
215  if (nullptr != context && !rcl_context_is_valid(context)) {
216  // publisher is invalid due to context being shutdown
217  return;
218  }
219  }
220  }
     // Reached for every status that did not take the silent return above.
221  if (RCL_RET_OK != status) {
222  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
223  }
224  }
225 
226  void
228  {
230  // TODO(Karsten1987): support serialized message passed by intraprocess
231  throw std::runtime_error("storing serialized messages in intra process is not supported yet");
232  }
233  auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg, nullptr);
234  if (RCL_RET_OK != status) {
235  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
236  }
237  }
238 
// Publishes the intra-process notification for a stored message: an
// rcl_interfaces::msg::IntraProcessMessage is filled with intra_process_publisher_id_ and
// `message_seq`, then sent with rcl_publish() on intra_process_publisher_handle_.
// When the status is RCL_RET_PUBLISHER_INVALID the rcl error is reset and, if `context`
// is non-null and fails rcl_context_is_valid(), the function returns silently. Any
// status other than RCL_RET_OK that reaches the final check is handed to
// rclcpp::exceptions::throw_from_rcl_error().
// NOTE(review): listing lines 248-249 are absent from this extract; they presumably
// declare `context` and open the extra block that is closed at listing line 254 -
// confirm against the real header.
239  void
240  do_intra_process_publish(uint64_t message_seq)
241  {
242  rcl_interfaces::msg::IntraProcessMessage ipm;
243  ipm.publisher_id = intra_process_publisher_id_;
244  ipm.message_sequence = message_seq;
245  auto status = rcl_publish(&intra_process_publisher_handle_, &ipm, nullptr);
246  if (RCL_RET_PUBLISHER_INVALID == status) {
247  rcl_reset_error(); // next call will reset error message if not context
250  if (nullptr != context && !rcl_context_is_valid(context)) {
251  // publisher is invalid due to context being shutdown
252  return;
253  }
254  }
255  }
     // Reached for every status that did not take the silent return above.
256  if (RCL_RET_OK != status) {
257  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message");
258  }
259  }
260 
261  uint64_t
263  uint64_t publisher_id,
265  {
266  auto ipm = weak_ipm_.lock();
267  if (!ipm) {
268  throw std::runtime_error(
269  "intra process publish called after destruction of intra process manager");
270  }
271  if (!msg) {
272  throw std::runtime_error("cannot publisher msg which is a null pointer");
273  }
274  uint64_t message_seq =
275  ipm->template store_intra_process_message<MessageT, Alloc>(publisher_id, msg);
276  return message_seq;
277  }
278 
279  uint64_t
281  uint64_t publisher_id,
283  {
284  auto ipm = weak_ipm_.lock();
285  if (!ipm) {
286  throw std::runtime_error(
287  "intra process publish called after destruction of intra process manager");
288  }
289  if (!msg) {
290  throw std::runtime_error("cannot publisher msg which is a null pointer");
291  }
292  uint64_t message_seq =
293  ipm->template store_intra_process_message<MessageT, Alloc>(publisher_id, std::move(msg));
294  return message_seq;
295  }
296 
298 
300 };
301 
302 } // namespace rclcpp
303 
304 #endif // RCLCPP__PUBLISHER_HPP_
uint64_t intra_process_publisher_id_
Definition: publisher_base.hpp:224
std::shared_ptr< MessageAlloc > get_allocator() const
Definition: publisher.hpp:201
rcl_ret_t rcl_publish_serialized_message(const rcl_publisher_t *publisher, const rcl_serialized_message_t *serialized_message, rmw_publisher_allocation_t *allocation)
rcl_ret_t rcl_publish(const rcl_publisher_t *publisher, const void *ros_message, rmw_publisher_allocation_t *allocation)
size_t get_intra_process_subscription_count() const
Get intraprocess subscription count.
#define rcl_reset_error
void publish(const rcl_serialized_message_t &serialized_msg)
Definition: publisher.hpp:174
MessageDeleter message_deleter_
Definition: publisher.hpp:299
Contains callbacks for various types of events a Publisher can receive from the middleware.
Definition: qos_event.hpp:42
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: publisher.hpp:50
Definition: publisher_base.hpp:55
void do_intra_process_publish(uint64_t message_seq)
Definition: publisher.hpp:240
This header provides the get_node_topics_interface() template function.
Definition: allocator_common.hpp:24
virtual ~Publisher()
Definition: publisher.hpp:82
bool intra_process_is_enabled_
Definition: publisher_base.hpp:222
Ring buffer container of shared_ptr's or unique_ptr's of T, which can be accessed by a key...
Definition: mapped_ring_buffer.hpp:60
rcl_publisher_t intra_process_publisher_handle_
Definition: publisher_base.hpp:216
#define RCL_RET_OK
void do_serialized_publish(const rcl_serialized_message_t *serialized_msg)
Definition: publisher.hpp:227
size_t get_subscription_count() const
Get subscription count.
allocator::Deleter< MessageAlloc, rcl_interfaces::msg::ParameterEvent > MessageDeleter
Definition: publisher.hpp:51
void set_allocator_for_deleter(D *deleter, Alloc *alloc)
Definition: allocator_deleter.hpp:72
void throw_from_rcl_error(rcl_ret_t ret, const std::string &prefix="", const rcl_error_state_t *error_state=nullptr, void(*reset_error)()=rcl_reset_error)
Throw a C++ std::exception which was created based on an rcl error.
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
bool rcl_context_is_valid(rcl_context_t *context)
virtual void publish(std::unique_ptr< MessageT, MessageDeleter > msg)
Send a message to the topic for this publisher.
Definition: publisher.hpp:100
#define RCL_RET_PUBLISHER_INVALID
void publish(std::shared_ptr< const rcl_serialized_message_t > serialized_msg)
Definition: publisher.hpp:196
T lock(T... args)
A publisher publishes messages of any type to a topic.
Definition: publisher.hpp:46
bool rcl_publisher_is_valid_except_context(const rcl_publisher_t *publisher)
virtual void publish(const MessageT *msg)
Definition: publisher.hpp:165
T move(T... args)
void do_inter_process_publish(const MessageT *msg)
Definition: publisher.hpp:208
std::shared_ptr< MessageAlloc > message_allocator_
Definition: publisher.hpp:297
RCL_PUBLISHER_OFFERED_DEADLINE_MISSED
T get(T... args)
virtual void publish(const MessageT &msg)
Definition: publisher.hpp:143
IntraProcessManagerWeakPtr weak_ipm_
Definition: publisher_base.hpp:223
void publish(const rcl_serialized_message_t *serialized_msg)
Definition: publisher.hpp:185
uint64_t store_intra_process_message(uint64_t publisher_id, std::unique_ptr< MessageT, MessageDeleter > msg)
Definition: publisher.hpp:280
allocator::AllocRebind< rcl_interfaces::msg::ParameterEvent, std::allocator< void > > MessageAllocTraits
Definition: publisher.hpp:49
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
mapped_ring_buffer::MappedRingBufferBase::SharedPtr make_mapped_ring_buffer(size_t size) const override
Implementation utility function that creates a typed mapped ring buffer.
Definition: publisher.hpp:86
RCL_PUBLISHER_LIVELINESS_LOST
virtual void publish(const std::shared_ptr< const MessageT > &msg)
Definition: publisher.hpp:137
void add_event_handler(const EventCallbackT &callback, const rcl_publisher_event_type_t event_type)
Definition: publisher_base.hpp:201
rcl_context_t * rcl_publisher_get_context(const rcl_publisher_t *publisher)
uint64_t store_intra_process_message(uint64_t publisher_id, std::shared_ptr< const MessageT > msg)
Definition: publisher.hpp:262
rcl_publisher_t publisher_handle_
Definition: publisher_base.hpp:215