rclcpp  master
C++ ROS Client Library API
subscription_factory.hpp
Go to the documentation of this file.
1 // Copyright 2016 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_FACTORY_HPP_
16 #define RCLCPP__SUBSCRIPTION_FACTORY_HPP_
17 
18 #include <functional>
19 #include <memory>
20 #include <string>
21 #include <utility>
22 
23 #include "rcl/subscription.h"
24 
25 #include "rosidl_typesupport_cpp/message_type_support.hpp"
26 
27 #include "rclcpp/subscription.hpp"
31 
32 namespace rclcpp
33 {
34 
36 
// Body of the factory struct. Its declaration line (listing line 46) is not present in
// this extract; the cross-reference index at the end of this page names the type
// SubscriptionFactory, and create_subscription_factory() below instantiates it under
// that name.
47 {
48  // Creates a Subscription<MessageT> object and returns it as a SubscriptionBase.
    // The callable returns a SubscriptionBase::SharedPtr. It receives the topic name and
    // a non-const reference to the rcl subscription options (the implementation assigned
    // in create_subscription_factory() writes the `allocator` field of those options).
49  using SubscriptionFactoryFunction = std::function<
50  rclcpp::subscription::SubscriptionBase::SharedPtr(
    // NOTE(review): listing line 51 (the first parameter) is missing here. The index at
    // the end of this page shows it as
    // `rclcpp::node_interfaces::NodeBaseInterface * node_base` - confirm against the header.
52  const std::string & topic_name,
53  rcl_subscription_options_t & subscription_options)>;
54 
    // NOTE(review): listing line 55 is missing; per the index at the end of this page it
    // declares the member `SubscriptionFactoryFunction create_typed_subscription`.
56 
57  // Sets up intra process communication for an already created subscription.
    // The callable returns nothing and receives the intra process manager, the
    // subscription (as SubscriptionBase::SharedPtr) and the read-only rcl options of
    // that subscription.
58  using SetupIntraProcessFunction = std::function<void(
59  rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
60  rclcpp::subscription::SubscriptionBase::SharedPtr subscription,
61  const rcl_subscription_options_t & subscription_options)>;
62 
    // NOTE(review): listing line 63 is missing; per the index at the end of this page it
    // declares the member `SetupIntraProcessFunction setup_intra_process`.
64 };
65 
// Builds and returns a SubscriptionFactory whose two callables are populated for the
// given MessageT / CallbackT / Alloc / SubscriptionT combination:
//   - the subscription-creating callable (returns a SubscriptionBase::SharedPtr), and
//   - `setup_intra_process`, which wires a subscription to the intra process manager.
//
// Parameters visible in this listing:
//   callback       user callback, perfectly forwarded into an AnySubscriptionCallback.
//   msg_mem_strat  message memory strategy, passed through to the created subscription.
//   allocator      allocator handed to the AnySubscriptionCallback constructor.
//
// NOTE(review): listing lines 68, 69 and 71 (return type, function name and the type of
// `msg_mem_strat`) are missing from this extract. The index at the end of this page gives
// the signature as
//   SubscriptionFactory create_subscription_factory(CallbackT && callback,
//     typename rclcpp::message_memory_strategy::MessageMemoryStrategy<MessageT, Alloc>::SharedPtr
//     msg_mem_strat, std::shared_ptr<Alloc> allocator)
// - confirm against the real header.
67 template<typename MessageT, typename CallbackT, typename Alloc, typename SubscriptionT>
70  CallbackT && callback,
72  msg_mem_strat,
73  std::shared_ptr<Alloc> allocator)
74 {
75  SubscriptionFactory factory;
76 
    // Wrap the user callback; it is copied into the creation lambda below.
    // NOTE(review): listing line 77 is missing (presumably the declaration that brings
    // AnySubscriptionCallback into scope - confirm).
78  AnySubscriptionCallback<MessageT, Alloc> any_subscription_callback(allocator);
79  any_subscription_callback.set(std::forward<CallbackT>(callback));
80 
    // Message allocator shared (via shared_ptr copies) by both lambdas below.
    // It is default-constructed here; the `allocator` argument is not passed to it.
81  auto message_alloc =
82  std::make_shared<typename subscription::Subscription<MessageT, Alloc>::MessageAlloc>();
83 
84  // factory function that creates a MessageT specific SubscriptionT
    // NOTE(review): listing line 85 (the assignment target of this lambda) and line 87
    // (the first lambda parameter; `node_base` is used in the body) are missing here.
    // NOTE(review): `allocator` is captured but no visible line of this lambda uses it -
    // check the missing listing lines before removing the capture.
86  [allocator, msg_mem_strat, any_subscription_callback, message_alloc](
88  const std::string & topic_name,
89  rcl_subscription_options_t & subscription_options
90  ) -> rclcpp::subscription::SubscriptionBase::SharedPtr
91  {
    // Overwrites the allocator in the caller's options object (hence the non-const
    // reference) with one derived from the shared message allocator.
92  subscription_options.allocator =
93  rclcpp::allocator::get_rcl_allocator<MessageT>(*message_alloc.get());
94 
97 
    // NOTE(review): listing lines 95, 96 and 98 are missing; they must introduce `sub`
    // and open the call whose arguments follow (node handle, topic name, options,
    // callback wrapper, memory strategy).
99  node_base->get_shared_rcl_node_handle(),
100  topic_name,
101  subscription_options,
102  any_subscription_callback,
103  msg_mem_strat);
    // Hand the result back as the base type expected by the factory function alias.
104  auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
105  return sub_base_ptr;
106  };
107 
108  // function that will setup intra process communications for the subscription
109  factory.setup_intra_process =
110  [message_alloc](
111  rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
112  rclcpp::subscription::SubscriptionBase::SharedPtr subscription,
113  const rcl_subscription_options_t & subscription_options)
114  {
    // The callbacks created below capture only this weak handle to the manager and
    // lock it on each call. The subscription is registered with the manager first,
    // which yields the id passed to the typed subscription at the end.
115  rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm;
116  uint64_t intra_process_subscription_id = ipm->add_subscription(subscription);
117 
    // Intra process options start from the rcl defaults; only the allocator, the QoS
    // (copied from the subscription's own options) and ignore_local_publications
    // (forced to false) are set explicitly.
118  auto intra_process_options = rcl_subscription_get_default_options();
119  intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator<MessageT>(
120  *message_alloc.get());
121  intra_process_options.qos = subscription_options.qos;
122  intra_process_options.ignore_local_publications = false;
123 
124  // function that will be called to take a MessageT from the intra process manager
    // Throws std::runtime_error if the manager can no longer be locked.
    // NOTE(review): listing line 130 (the last parameter, used below as `message`) is
    // missing; the index at the end of this page lists a MessageUniquePtr type that
    // presumably belongs here - confirm.
125  auto take_intra_process_message_func =
126  [weak_ipm](
127  uint64_t publisher_id,
128  uint64_t message_sequence,
129  uint64_t subscription_id,
131  {
132  auto ipm = weak_ipm.lock();
133  if (!ipm) {
134  // TODO(wjwwood): should this just return silently? Or return with a logged warning?
135  throw std::runtime_error(
136  "intra process take called after destruction of intra process manager");
137  }
138  ipm->take_intra_process_message<MessageT, Alloc>(
139  publisher_id, message_sequence, subscription_id, message);
140  };
141 
142  // function that is called to see if the publisher id matches any local publishers
    // Returns the manager's matches_any_publishers() result for the given gid; throws
    // std::runtime_error if the manager can no longer be locked.
143  auto matches_any_publisher_func =
144  [weak_ipm](const rmw_gid_t * sender_gid) -> bool
145  {
146  auto ipm = weak_ipm.lock();
147  if (!ipm) {
148  throw std::runtime_error(
149  "intra process publisher check called "
150  "after destruction of intra process manager");
151  }
152  return ipm->matches_any_publishers(sender_gid);
153  };
154 
    // Downcast to the concrete subscription type to reach setup_intra_process().
    // NOTE(review): the cast result is dereferenced without a null check; if
    // `subscription` is not a SubscriptionT this dereferences a null pointer.
155  auto typed_sub_ptr = std::dynamic_pointer_cast<SubscriptionT>(subscription);
156  typed_sub_ptr->setup_intra_process(
157  intra_process_subscription_id,
158  take_intra_process_message_func,
159  matches_any_publisher_func,
160  intra_process_options
161  );
162  };
163  // end definition of factory function to setup intra process
164 
165  // return the factory now that it is populated
166  return factory;
167 }
168 
169 } // namespace rclcpp
170 
171 #endif // RCLCPP__SUBSCRIPTION_FACTORY_HPP_
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:33
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:124
Definition: any_subscription_callback.hpp:37
std::unique_ptr< MessageT, MessageDeleter > MessageUniquePtr
Definition: subscription.hpp:132
std::function< void(rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm, rclcpp::subscription::SubscriptionBase::SharedPtr subscription, const rcl_subscription_options_t &subscription_options)> SetupIntraProcessFunction
Definition: subscription_factory.hpp:61
Definition: allocator_common.hpp:24
SubscriptionFactory create_subscription_factory(CallbackT &&callback, typename rclcpp::message_memory_strategy::MessageMemoryStrategy< MessageT, Alloc >::SharedPtr msg_mem_strat, std::shared_ptr< Alloc > allocator)
Return a SubscriptionFactory with functions for creating a SubscriptionT<MessageT, Alloc>.
Definition: subscription_factory.hpp:69
rcl_subscription_options_t rcl_subscription_get_default_options(void)
SubscriptionFactoryFunction create_typed_subscription
Definition: subscription_factory.hpp:55
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
Factory with functions used to create a Subscription<MessageT>.
Definition: subscription_factory.hpp:46
Definition: subscription.hpp:53
SetupIntraProcessFunction setup_intra_process
Definition: subscription_factory.hpp:63
std::function< rclcpp::subscription::SubscriptionBase::SharedPtr(rclcpp::node_interfaces::NodeBaseInterface *node_base, const std::string &topic_name, rcl_subscription_options_t &subscription_options)> SubscriptionFactoryFunction
Definition: subscription_factory.hpp:53