rclcpp  master
C++ ROS Client Library API
subscription_factory.hpp
Go to the documentation of this file.
1 // Copyright 2016 Open Source Robotics Foundation, Inc.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef RCLCPP__SUBSCRIPTION_FACTORY_HPP_
16 #define RCLCPP__SUBSCRIPTION_FACTORY_HPP_
17 
18 #include <functional>
19 #include <memory>
20 #include <string>
21 #include <utility>
22 
23 #include "rcl/subscription.h"
24 
25 #include "rosidl_typesupport_cpp/message_type_support.hpp"
26 
27 #include "rclcpp/subscription.hpp"
31 
32 namespace rclcpp
33 {
34 
36 
47 {
48  // Creates a Subscription<MessageT> object and returns it as a SubscriptionBase.
50  rclcpp::SubscriptionBase::SharedPtr(
52  const std::string & topic_name,
53  rcl_subscription_options_t & subscription_options)>;
54 
56 
57  // Function that takes a MessageT from the intra process manager
59  void(
60  rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
61  rclcpp::SubscriptionBase::SharedPtr subscription,
62  const rcl_subscription_options_t & subscription_options)>;
63 
65 };
66 
68 template<typename MessageT, typename CallbackT, typename Alloc, typename SubscriptionT>
71  CallbackT && callback,
73  msg_mem_strat,
74  std::shared_ptr<Alloc> allocator)
75 {
76  SubscriptionFactory factory;
77 
79  AnySubscriptionCallback<MessageT, Alloc> any_subscription_callback(allocator);
80  any_subscription_callback.set(std::forward<CallbackT>(callback));
81 
82  auto message_alloc =
83  std::make_shared<typename Subscription<MessageT, Alloc>::MessageAlloc>();
84 
85  // factory function that creates a MessageT specific SubscriptionT
87  [allocator, msg_mem_strat, any_subscription_callback, message_alloc](
89  const std::string & topic_name,
90  rcl_subscription_options_t & subscription_options
91  ) -> rclcpp::SubscriptionBase::SharedPtr
92  {
93  subscription_options.allocator =
94  rclcpp::allocator::get_rcl_allocator<MessageT>(*message_alloc.get());
95 
98 
100  node_base->get_shared_rcl_node_handle(),
101  topic_name,
102  subscription_options,
103  any_subscription_callback,
104  msg_mem_strat);
105  auto sub_base_ptr = std::dynamic_pointer_cast<SubscriptionBase>(sub);
106  return sub_base_ptr;
107  };
108 
109  // function that will setup intra process communications for the subscription
110  factory.setup_intra_process =
111  [message_alloc](
112  rclcpp::intra_process_manager::IntraProcessManager::SharedPtr ipm,
113  rclcpp::SubscriptionBase::SharedPtr subscription,
114  const rcl_subscription_options_t & subscription_options)
115  {
116  rclcpp::intra_process_manager::IntraProcessManager::WeakPtr weak_ipm = ipm;
117  uint64_t intra_process_subscription_id = ipm->add_subscription(subscription);
118 
119  auto intra_process_options = rcl_subscription_get_default_options();
120  intra_process_options.allocator = rclcpp::allocator::get_rcl_allocator<MessageT>(
121  *message_alloc.get());
122  intra_process_options.qos = subscription_options.qos;
123  intra_process_options.ignore_local_publications = false;
124 
125  // function that will be called to take a MessageT from the intra process manager
126  auto take_intra_process_message_func =
127  [weak_ipm](
128  uint64_t publisher_id,
129  uint64_t message_sequence,
130  uint64_t subscription_id,
132  {
133  auto ipm = weak_ipm.lock();
134  if (!ipm) {
135  // TODO(wjwwood): should this just return silently? Or return with a logged warning?
136  throw std::runtime_error(
137  "intra process take called after destruction of intra process manager");
138  }
139  ipm->take_intra_process_message<MessageT, Alloc>(
140  publisher_id, message_sequence, subscription_id, message);
141  };
142 
143  // function that is called to see if the publisher id matches any local publishers
144  auto matches_any_publisher_func =
145  [weak_ipm](const rmw_gid_t * sender_gid) -> bool
146  {
147  auto ipm = weak_ipm.lock();
148  if (!ipm) {
149  throw std::runtime_error(
150  "intra process publisher check called "
151  "after destruction of intra process manager");
152  }
153  return ipm->matches_any_publishers(sender_gid);
154  };
155 
156  auto typed_sub_ptr = std::dynamic_pointer_cast<SubscriptionT>(subscription);
157  typed_sub_ptr->setup_intra_process(
158  intra_process_subscription_id,
159  take_intra_process_message_func,
160  matches_any_publisher_func,
161  intra_process_options
162  );
163  };
164  // end definition of factory function to setup intra process
165 
166  // return the factory now that it is populated
167  return factory;
168 }
169 
170 } // namespace rclcpp
171 
172 #endif // RCLCPP__SUBSCRIPTION_FACTORY_HPP_
Subscription implementation, templated on the type of message this subscription receives.
Definition: subscription.hpp:123
SetupIntraProcessFunction setup_intra_process
Definition: subscription_factory.hpp:64
Definition: allocator_common.hpp:24
Definition: any_subscription_callback.hpp:34
SubscriptionFactory create_subscription_factory(CallbackT &&callback, typename rclcpp::message_memory_strategy::MessageMemoryStrategy< MessageT, Alloc >::SharedPtr msg_mem_strat, std::shared_ptr< Alloc > allocator)
Return a SubscriptionFactory with functions for creating a SubscriptionT<MessageT, Alloc>.
Definition: subscription_factory.hpp:70
rcl_subscription_options_t rcl_subscription_get_default_options(void)
Definition: subscription.hpp:50
T dynamic_pointer_cast(T... args)
Default allocation strategy for messages received by subscriptions.
Definition: message_memory_strategy.hpp:33
Pure virtual interface class for the NodeBase part of the Node API.
Definition: node_base_interface.hpp:36
SubscriptionFactoryFunction create_typed_subscription
Definition: subscription_factory.hpp:55
Factory with functions used to create a Subscription<MessageT>.
Definition: subscription_factory.hpp:46
void set(CallbackT callback)
Definition: any_subscription_callback.hpp:79