rclcpp  master
C++ ROS Client Library API
publisher.hpp
Go to the documentation of this file.
1 // Copyright 2014 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__PUBLISHER_HPP_
16 #define RCLCPP__PUBLISHER_HPP_
17 
18 #include <rmw/error_handling.h>
19 #include <rmw/rmw.h>
20 
21 #include <functional>
22 #include <iostream>
23 #include <memory>
24 #include <sstream>
25 #include <string>
26 
27 #include "rcl/error_handling.h"
28 #include "rcl/publisher.h"
29 
30 #include "rcl_interfaces/msg/intra_process_message.hpp"
31 
34 #include "rclcpp/macros.hpp"
38 
39 namespace rclcpp
40 {
41 
42 // Forward declaration is used for friend statement.
43 namespace node_interfaces
44 {
45 class NodeTopicsInterface;
46 }
47 
49 {
50  friend ::rclcpp::node_interfaces::NodeTopicsInterface;
51 
52 public:
54 
55 
56 
67  const std::string & topic,
68  const rosidl_message_type_support_t & type_support,
69  const rcl_publisher_options_t & publisher_options);
70 
72  virtual ~PublisherBase();
73 
75 
77  const char *
78  get_topic_name() const;
79 
81 
83  size_t
84  get_queue_size() const;
85 
87 
89  const rmw_gid_t &
90  get_gid() const;
91 
93 
95  const rmw_gid_t &
96  get_intra_process_gid() const;
97 
99 
102  get_publisher_handle();
103 
105 
107  const rcl_publisher_t *
108  get_publisher_handle() const;
109 
111 
117  bool
118  operator==(const rmw_gid_t & gid) const;
119 
121 
127  bool
128  operator==(const rmw_gid_t * gid) const;
129 
131 
134  void
135  setup_intra_process(
136  uint64_t intra_process_publisher_id,
137  StoreMessageCallbackT callback,
138  const rcl_publisher_options_t & intra_process_options);
139 
140 protected:
142 
144  rcl_publisher_t intra_process_publisher_handle_ = rcl_get_zero_initialized_publisher();
145 
148 
151 };
152 
154 template<typename MessageT, typename Alloc = std::allocator<void>>
155 class Publisher : public PublisherBase
156 {
157 public:
159  using MessageAlloc = typename MessageAllocTraits::allocator_type;
162 
164 
166  rclcpp::node_interfaces::NodeBaseInterface * node_base,
167  const std::string & topic,
168  const rcl_publisher_options_t & publisher_options,
169  const std::shared_ptr<MessageAlloc> & allocator)
170  : PublisherBase(
171  node_base,
172  topic,
173  *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
174  publisher_options),
175  message_allocator_(allocator)
176  {
177  allocator::set_allocator_for_deleter(&message_deleter_, message_allocator_.get());
178  }
179 
180  virtual ~Publisher()
181  {}
182 
184 
188  virtual void
190  {
191  this->do_inter_process_publish(msg.get());
192  if (store_intra_process_message_) {
193  // Take the pointer from the unique_msg, release it and pass as a void *
194  // to the ipm. The ipm should then capture it again as a unique_ptr of
195  // the correct type.
196  // TODO(wjwwood):
197  // investigate how to transfer the custom deleter (if there is one)
198  // from the incoming unique_ptr through to the ipm's unique_ptr.
199  // See: http://stackoverflow.com/questions/11002641/dynamic-casting-for-unique-ptr
200  MessageT * msg_ptr = msg.get();
201  msg.release();
202  uint64_t message_seq =
203  store_intra_process_message_(intra_process_publisher_id_, msg_ptr, typeid(MessageT));
204  rcl_interfaces::msg::IntraProcessMessage ipm;
205  ipm.publisher_id = intra_process_publisher_id_;
206  ipm.message_sequence = message_seq;
207  auto status = rcl_publish(&intra_process_publisher_handle_, &ipm);
208  if (RCL_RET_PUBLISHER_INVALID == status) {
209  rcl_reset_error(); // next call will reset error message if not context
210  if (rcl_publisher_is_valid_except_context(&intra_process_publisher_handle_)) {
211  rcl_context_t * context = rcl_publisher_get_context(&intra_process_publisher_handle_);
212  if (nullptr != context && !rcl_context_is_valid(context)) {
213  // publisher is invalid due to context being shutdown
214  return;
215  }
216  }
217  }
218  if (RCL_RET_OK != status) {
219  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish intra process message");
220  }
221  } else {
222  // Always destroy the message, even if we don't consume it, for consistency.
223  msg.reset();
224  }
225  }
226 
227  virtual void
229  {
230  // Avoid allocating when not using intra process.
231  if (!store_intra_process_message_) {
232  // In this case we're not using intra process.
233  return this->do_inter_process_publish(msg.get());
234  }
235  // Otherwise we have to allocate memory in a unique_ptr and pass it along.
236  // TODO(wjwwood):
237  // The intra process manager should probably also be able to store
238  // shared_ptr's and do the "smart" thing based on other intra process
239  // subscriptions. For now call the other publish().
240  auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
241  MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get());
242  MessageUniquePtr unique_msg(ptr, message_deleter_);
243  return this->publish(unique_msg);
244  }
245 
246  virtual void
248  {
249  // Avoid allocating when not using intra process.
250  if (!store_intra_process_message_) {
251  // In this case we're not using intra process.
252  return this->do_inter_process_publish(msg.get());
253  }
254  // Otherwise we have to allocate memory in a unique_ptr and pass it along.
255  // TODO(wjwwood):
256  // The intra process manager should probably also be able to store
257  // shared_ptr's and do the "smart" thing based on other intra process
258  // subscriptions. For now call the other publish().
259  auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
260  MessageAllocTraits::construct(*message_allocator_.get(), ptr, *msg.get());
261  MessageUniquePtr unique_msg(ptr, message_deleter_);
262  return this->publish(unique_msg);
263  }
264 
265  virtual void
266  publish(const MessageT & msg)
267  {
268  // Avoid allocating when not using intra process.
269  if (!store_intra_process_message_) {
270  // In this case we're not using intra process.
271  return this->do_inter_process_publish(&msg);
272  }
273  // Otherwise we have to allocate memory in a unique_ptr and pass it along.
274  auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
275  MessageAllocTraits::construct(*message_allocator_.get(), ptr, msg);
276  MessageUniquePtr unique_msg(ptr, message_deleter_);
277  return this->publish(unique_msg);
278  }
279 
280  virtual void
281  publish(const MessageT * msg)
282  {
283  if (!msg) {
284  throw std::runtime_error("msg argument is nullptr");
285  }
286  return this->publish(*msg);
287  }
288 
289  void
290  publish(const rcl_serialized_message_t * serialized_msg)
291  {
292  if (store_intra_process_message_) {
293  // TODO(Karsten1987): support serialized message passed by intraprocess
294  throw std::runtime_error("storing serialized messages in intra process is not supported yet");
295  }
296  auto status = rcl_publish_serialized_message(&publisher_handle_, serialized_msg);
297  if (RCL_RET_OK != status) {
298  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish serialized message");
299  }
300  }
301 
302  void
304  {
305  return this->publish(serialized_msg.get());
306  }
307 
309  {
310  return message_allocator_;
311  }
312 
313 protected:
314  void
315  do_inter_process_publish(const MessageT * msg)
316  {
317  auto status = rcl_publish(&publisher_handle_, msg);
318  if (RCL_RET_PUBLISHER_INVALID == status) {
319  rcl_reset_error(); // next call will reset error message if not context
320  if (rcl_publisher_is_valid_except_context(&publisher_handle_)) {
321  rcl_context_t * context = rcl_publisher_get_context(&publisher_handle_);
322  if (nullptr != context && !rcl_context_is_valid(context)) {
323  // publisher is invalid due to context being shutdown
324  return;
325  }
326  }
327  }
328  if (RCL_RET_OK != status) {
329  rclcpp::exceptions::throw_from_rcl_error(status, "failed to publish message");
330  }
331  }
332 
334 
336 };
337 
338 } // namespace rclcpp
339 
340 #endif // RCLCPP__PUBLISHER_HPP_
uint64_t intra_process_publisher_id_
Definition: publisher.hpp:146
virtual void publish(std::shared_ptr< const MessageT > msg)
Definition: publisher.hpp:247
std::shared_ptr< MessageAlloc > get_allocator() const
Definition: publisher.hpp:308
#define rcl_reset_error
MessageDeleter message_deleter_
Definition: publisher.hpp:335
typename MessageAllocTraits::allocator_type MessageAlloc
Definition: publisher.hpp:159
Definition: publisher.hpp:48
Definition: allocator_common.hpp:24
virtual ~Publisher()
Definition: publisher.hpp:180
T release(T... args)
#define RCL_RET_OK
allocator::Deleter< MessageAlloc, rcl_interfaces::msg::ParameterEvent > MessageDeleter
Definition: publisher.hpp:160
void set_allocator_for_deleter(D *deleter, Alloc *alloc)
Definition: allocator_deleter.hpp:72
void throw_from_rcl_error(rcl_ret_t ret, const std::string &prefix="", const rcl_error_state_t *error_state=nullptr, void(*reset_error)()=rcl_reset_error)
Throw a C++ std::exception which was created based on an rcl error.
typename std::allocator_traits< Alloc >::template rebind_traits< T > AllocRebind
Definition: allocator_common.hpp:30
rcl_publisher_t rcl_get_zero_initialized_publisher(void)
#define RCLCPP_SMART_PTR_DEFINITIONS(...)
Definition: macros.hpp:36
bool rcl_context_is_valid(rcl_context_t *context)
rmw_gid_t rmw_gid_
Definition: publisher.hpp:149
#define RCL_RET_PUBLISHER_INVALID
void publish(std::shared_ptr< const rcl_serialized_message_t > serialized_msg)
Definition: publisher.hpp:303
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
A publisher publishes messages of any type to a topic.
Definition: publisher.hpp:155
bool rcl_publisher_is_valid_except_context(const rcl_publisher_t *publisher)
virtual void publish(const MessageT *msg)
Definition: publisher.hpp:281
T reset(T... args)
std::shared_ptr< rcl_node_t > rcl_node_handle_
Definition: publisher.hpp:141
void do_inter_process_publish(const MessageT *msg)
Definition: publisher.hpp:315
std::shared_ptr< MessageAlloc > message_allocator_
Definition: publisher.hpp:333
virtual void publish(std::unique_ptr< MessageT, MessageDeleter > &msg)
Send a message to the topic for this publisher.
Definition: publisher.hpp:189
T get(T... args)
virtual void publish(const MessageT &msg)
Definition: publisher.hpp:266
virtual void publish(const std::shared_ptr< MessageT > &msg)
Definition: publisher.hpp:228
StoreMessageCallbackT store_intra_process_message_
Definition: publisher.hpp:147
rcl_ret_t rcl_publish_serialized_message(const rcl_publisher_t *publisher, const rcl_serialized_message_t *serialized_message)
void publish(const rcl_serialized_message_t *serialized_msg)
Definition: publisher.hpp:290
#define RCLCPP_PUBLIC
Definition: visibility_control.hpp:50
allocator::AllocRebind< rcl_interfaces::msg::ParameterEvent, std::allocator< void > > MessageAllocTraits
Definition: publisher.hpp:158
rmw_gid_t intra_process_rmw_gid_
Definition: publisher.hpp:150
typename std::conditional< std::is_same< typename std::allocator_traits< Alloc >::template rebind_alloc< T >, typename std::allocator< void >::template rebind< T >::other >::value, std::default_delete< T >, AllocatorDeleter< Alloc > >::type Deleter
Definition: allocator_deleter.hpp:101
rcl_ret_t rcl_publish(const rcl_publisher_t *publisher, const void *ros_message)
rcl_context_t * rcl_publisher_get_context(const rcl_publisher_t *publisher)