rclcpp  master
C++ ROS Client Library API
subscription_factory.hpp
Go to the documentation of this file.
1 // Copyright 2016 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_FACTORY_HPP_
16 #define RCLCPP__SUBSCRIPTION_FACTORY_HPP_
17 
18 #include <functional>
19 #include <memory>
20 #include <string>
21 #include <utility>
22 
23 #include "rcl/subscription.h"
24 
25 #include "rosidl_typesupport_cpp/message_type_support.hpp"
26 
27 #include "rclcpp/subscription.hpp"
32 
33 namespace rclcpp
34 {
35 
37 
// Factory with functions used to create a Subscription<MessageT>.
// It holds two callables: `create_typed_subscription` and `setup_intra_process`.
// NOTE(review): this listing skips source lines 47, 50, 52, 56, 59 and 65, so the
// struct header, the two alias names, one parameter of the first signature and the
// two member declarations are not visible here.
48 {
49  // Creates a Subscription<MessageT> object and returns it as a SubscriptionBase.
 // Signature shape: returns rclcpp::SubscriptionBase::SharedPtr. The visible parameters
 // are the topic name and a non-const reference to the rcl subscription options.
51  rclcpp::SubscriptionBase::SharedPtr(
53  const std::string & topic_name,
54  rcl_subscription_options_t & subscription_options)>;
55 
57 
58  // Function that takes a MessageT from the intra process manager
 // Signature shape: returns void. Parameters are a shared pointer to the
 // IntraProcessManager, a shared pointer to a SubscriptionBase, and a const reference
 // to the rcl subscription options.
60  void(
61  rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
62  rclcpp::SubscriptionBase::SharedPtr subscription,
63  const rcl_subscription_options_t & subscription_options)>;
64 
66 };
67 
// Return a SubscriptionFactory with functions for creating a SubscriptionT<MessageT, Alloc>.
//
// Parameters:
//   callback      - forwarded into an AnySubscriptionCallback<CallbackMessageT, Alloc>.
//   msg_mem_strat - message memory strategy; captured by the creation lambda and passed
//                   as the last argument when the subscription is built.
//   allocator     - used to construct the AnySubscriptionCallback and captured by the
//                   creation lambda.
// Returns: the populated SubscriptionFactory, by value.
//
// NOTE(review): this listing skips source lines 75-76, 78, 85, 93, 95, 104, 106 and 139,
// so the function's name/return-type line, parts of the parameter list, the assignment
// target of the first lambda, the statement that declares `sub`, and one lambda
// parameter are not visible here.
69 template<
70  typename MessageT,
71  typename CallbackT,
72  typename Alloc,
73  typename CallbackMessageT,
74  typename SubscriptionT>
77  CallbackT && callback,
79  CallbackMessageT, Alloc>::SharedPtr
80  msg_mem_strat,
81  std::shared_ptr<Alloc> allocator)
82 {
83  SubscriptionFactory factory;
84 
 // Wrap the user callback; std::forward keeps the value category of `callback`.
86  AnySubscriptionCallback<CallbackMessageT, Alloc> any_subscription_callback(allocator);
87  any_subscription_callback.set(std::forward<CallbackT>(callback));
88 
 // A single MessageAlloc instance, held by shared_ptr, is captured by value in both
 // lambdas below, so both derive their rcl allocator from the same object.
89  auto message_alloc =
90  std::make_shared<typename Subscription<CallbackMessageT, Alloc>::MessageAlloc>();
91 
92  // factory function that creates a MessageT specific SubscriptionT
94  [allocator, msg_mem_strat, any_subscription_callback, message_alloc](
96  const std::string & topic_name,
97  rcl_subscription_options_t & subscription_options
98  ) -> rclcpp::SubscriptionBase::SharedPtr
99  {
 // The options arrive by non-const reference, so this overwrite of the allocator
 // field is visible to the caller as well.
100  subscription_options.allocator =
101  rclcpp::allocator::get_rcl_allocator<CallbackMessageT>(*message_alloc.get());
102 
103  using rclcpp::Subscription;
105 
 // NOTE(review): the statement that declares `sub` (listing line 106) is not visible;
 // the lines below are the remaining arguments of that call. `node_base` is presumably
 // the lambda parameter on the elided listing line 95 - confirm.
107  node_base->get_shared_rcl_node_handle(),
108  *rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
109  topic_name,
110  subscription_options,
111  any_subscription_callback,
112  msg_mem_strat);
 // Convert to the SubscriptionBase pointer type named in the lambda's return type.
113  auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
114  return sub_base_ptr;
115  };
116 
117  // function that will setup intra process communications for the subscription
118  factory.setup_intra_process =
119  [message_alloc](
120  rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
121  rclcpp::SubscriptionBase::SharedPtr subscription,
122  const rcl_subscription_options_t & subscription_options)
123  {
 // The inner callbacks capture only this weak pointer; each one locks it and throws
 // if the manager no longer exists.
124  rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm;
 // Register the subscription with the manager and keep the id it returns.
125  uint64_t intra_process_subscription_id = ipm->add_subscription(subscription);
126 
 // Start from the rcl defaults, then set the allocator from message_alloc, copy the
 // QoS from the options passed in, and explicitly set ignore_local_publications to false.
127  auto intra_process_options = rcl_subscription_get_default_options();
128  intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator<CallbackMessageT>(
129  *message_alloc.get());
130  intra_process_options.qos = subscription_options.qos;
131  intra_process_options.ignore_local_publications = false;
132 
133  // function that will be called to take a MessageT from the intra process manager
134  auto take_intra_process_message_func =
135  [weak_ipm](
136  uint64_t publisher_id,
137  uint64_t message_sequence,
138  uint64_t subscription_id,
 // NOTE(review): the declaration of the `message` parameter (listing line 139) is not
 // visible here.
140  {
141  auto ipm = weak_ipm.lock();
142  if (!ipm) {
143  // TODO(wjwwood): should this just return silently? Or return with a logged warning?
 // NOTE(review): std::runtime_error needs <stdexcept>, which is not among the
 // visible includes (listing lines 28-31 are elided) - confirm it is included.
144  throw std::runtime_error(
145  "intra process take called after destruction of intra process manager");
146  }
147  ipm->take_intra_process_message<CallbackMessageT, Alloc>(
148  publisher_id, message_sequence, subscription_id, message);
149  };
150 
151  // function that is called to see if the publisher id matches any local publishers
152  auto matches_any_publisher_func =
153  [weak_ipm](const rmw_gid_t * sender_gid) -> bool
154  {
155  auto ipm = weak_ipm.lock();
156  if (!ipm) {
157  throw std::runtime_error(
158  "intra process publisher check called "
159  "after destruction of intra process manager");
160  }
161  return ipm->matches_any_publishers(sender_gid);
162  };
163 
 // NOTE(review): the cast result is dereferenced without a null check; if
 // `subscription` is not actually a SubscriptionT, dynamic_pointer_cast yields an
 // empty pointer and the call below dereferences it - confirm callers guarantee the type.
164  auto typed_sub_ptr = std::dynamic_pointer_cast<SubscriptionT>(subscription);
165  typed_sub_ptr->setup_intra_process(
166  intra_process_subscription_id,
167  take_intra_process_message_func,
168  matches_any_publisher_func,
169  intra_process_options
170  );
171  };
172  // end definition of factory function to setup intra process
173 
174  // return the factory now that it is populated
175  return factory;
176 }
177 
178 } // namespace rclcpp
179 
180 #endif // RCLCPP__SUBSCRIPTION_FACTORY_HPP_
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:147
SetupIntraProcessFunction setup_intra_process
Definition: subscription_factory.hpp:65
Definition: allocator_common.hpp:24
Definition: any_subscription_callback.hpp:34
rcl_subscription_options_t rcl_subscription_get_default_options(void)
Definition: subscription.hpp:51
T dynamic_pointer_cast(T... args)
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:38
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
SubscriptionFactoryFunction create_typed_subscription
Definition: subscription_factory.hpp:56
Factory with functions used to create a Subscription<MessageT>.
Definition: subscription_factory.hpp:47
void set(CallbackT callback)
Definition: any_subscription_callback.hpp:79
SubscriptionFactory create_subscription_factory(CallbackT &&callback, typename rclcpp::message_memory_strategy::MessageMemoryStrategy< CallbackMessageT, Alloc >::SharedPtr msg_mem_strat, std::shared_ptr< Alloc > allocator)
Return a SubscriptionFactory with functions for creating a SubscriptionT<MessageT, Alloc>.
Definition: subscription_factory.hpp:76