/
/
/
1#include "bag_recorder_backend/bag_recorder_node.hpp"
2#include <rclcpp/serialization.hpp>
3#include <rosbag2_storage/serialized_bag_message.hpp>
4#include <filesystem>
5#include <chrono>
6#include <iomanip>
7#include <sstream>
8
9namespace bag_recorder_backend
10{
11
12BagRecorderNode::BagRecorderNode(const rclcpp::NodeOptions & options)
13: Node("bag_recorder_node", options),
14 recording_active_(false),
15 should_stop_(false),
16 output_directory_("/tmp/rosbags")
17{
18 declare_parameters();
19 load_parameters();
20
21 // Initialize status
22 status_.is_recording = false;
23 status_.recorded_messages = 0;
24 status_.recording_duration = 0.0;
25
26 // Start status update thread
27 status_update_thread_ = std::thread(&BagRecorderNode::update_status, this);
28
29 RCLCPP_INFO(this->get_logger(), "Bag Recorder Node initialized");
30}
31
32BagRecorderNode::~BagRecorderNode()
33{
34 should_stop_ = true;
35
36 if (recording_active_) {
37 stop_recording();
38 }
39
40 if (status_update_thread_.joinable()) {
41 status_update_thread_.join();
42 }
43}
44
45void BagRecorderNode::declare_parameters()
46{
47 this->declare_parameter("output_directory", "/tmp/rosbags");
48 this->declare_parameter("default_storage_id", "sqlite3");
49 this->declare_parameter("default_serialization_format", "cdr");
50 this->declare_parameter("max_bagfile_size", 0);
51 this->declare_parameter("max_bagfile_duration", 0.0);
52}
53
54void BagRecorderNode::load_parameters()
55{
56 output_directory_ = this->get_parameter("output_directory").as_string();
57
58 // Create output directory if it doesn't exist
59 std::filesystem::create_directories(output_directory_);
60}
61
62bool BagRecorderNode::start_recording(const RecordingConfig & config)
63{
64 std::lock_guard<std::mutex> lock(writer_mutex_);
65
66 if (recording_active_) {
67 RCLCPP_WARN(this->get_logger(), "Recording already active");
68 return false;
69 }
70
71 try {
72 current_config_ = config;
73
74 // Ensure unique output path to avoid database conflicts
75 current_config_.output_path = ensure_unique_output_path(config.output_path);
76
77 setup_writer(current_config_);
78 create_subscriptions(current_config_.topics);
79
80 recording_active_ = true;
81 recording_start_time_ = this->now();
82
83 std::lock_guard<std::mutex> status_lock(status_mutex_);
84 status_.is_recording = true;
85 status_.current_bag_path = current_config_.output_path;
86 status_.recorded_messages = 0;
87 status_.recording_duration = 0.0;
88 status_.active_topics = current_config_.topics;
89
90 RCLCPP_INFO(this->get_logger(), "Recording started: %s", current_config_.output_path.c_str());
91 return true;
92
93 } catch (const std::exception & e) {
94 RCLCPP_ERROR(this->get_logger(), "Failed to start recording: %s", e.what());
95 cleanup_writer();
96 return false;
97 }
98}
99
100bool BagRecorderNode::stop_recording()
101{
102 std::lock_guard<std::mutex> lock(writer_mutex_);
103
104 if (!recording_active_) {
105 RCLCPP_WARN(this->get_logger(), "No active recording to stop");
106 return false;
107 }
108
109 try {
110 recording_active_ = false;
111 remove_subscriptions();
112 cleanup_writer();
113
114 std::lock_guard<std::mutex> status_lock(status_mutex_);
115 status_.is_recording = false;
116 status_.active_topics.clear();
117
118 RCLCPP_INFO(this->get_logger(), "Recording stopped");
119 return true;
120
121 } catch (const std::exception & e) {
122 RCLCPP_ERROR(this->get_logger(), "Error stopping recording: %s", e.what());
123 return false;
124 }
125}
126
127bool BagRecorderNode::is_recording() const
128{
129 return recording_active_;
130}
131
132RecordingStatus BagRecorderNode::get_status() const
133{
134 std::lock_guard<std::mutex> lock(status_mutex_);
135 return status_;
136}
137
138std::vector<std::string> BagRecorderNode::get_available_topics() const
139{
140 auto topic_names_and_types = this->get_topic_names_and_types();
141 std::vector<std::string> topics;
142
143 for (const auto & topic_info : topic_names_and_types) {
144 topics.push_back(topic_info.first);
145 }
146
147 return topics;
148}
149
150std::vector<std::string> BagRecorderNode::get_topic_types() const
151{
152 auto topic_names_and_types = this->get_topic_names_and_types();
153 std::vector<std::string> types;
154
155 for (const auto & topic_info : topic_names_and_types) {
156 for (const auto & type : topic_info.second) {
157 types.push_back(type);
158 }
159 }
160
161 return types;
162}
163
164void BagRecorderNode::set_output_directory(const std::string & directory)
165{
166 output_directory_ = directory;
167 std::filesystem::create_directories(output_directory_);
168}
169
170std::string BagRecorderNode::get_output_directory() const
171{
172 return output_directory_;
173}
174
175void BagRecorderNode::setup_writer(const RecordingConfig & config)
176{
177 // Configure storage options
178 storage_options_.uri = config.output_path;
179 storage_options_.storage_id = config.storage_id;
180 storage_options_.max_bagfile_size = config.max_bagfile_size;
181 storage_options_.max_bagfile_duration =
182 std::chrono::duration_cast<std::chrono::nanoseconds>(
183 std::chrono::duration<double>(config.max_bagfile_duration)).count();
184
185 // Configure converter options
186 converter_options_.input_serialization_format = config.serialization_format;
187 converter_options_.output_serialization_format = config.serialization_format;
188
189 // Create and open writer
190 writer_ = std::make_unique<rosbag2_cpp::Writer>();
191 writer_->open(storage_options_, converter_options_);
192
193 // Create topic metadata for all topics
194 discover_topics();
195 for (const auto & topic_name : config.topics) {
196 if (topic_types_.find(topic_name) != topic_types_.end()) {
197 rosbag2_storage::TopicMetadata topic_metadata;
198 topic_metadata.name = topic_name;
199 topic_metadata.type = topic_types_[topic_name];
200 topic_metadata.serialization_format = config.serialization_format;
201
202 writer_->create_topic(topic_metadata);
203 }
204 }
205}
206
207void BagRecorderNode::cleanup_writer()
208{
209 if (writer_) {
210 writer_->close();
211 writer_.reset();
212 }
213}
214
215void BagRecorderNode::discover_topics()
216{
217 auto topic_names_and_types = this->get_topic_names_and_types();
218 topic_types_.clear();
219
220 for (const auto & topic_info : topic_names_and_types) {
221 if (!topic_info.second.empty()) {
222 topic_types_[topic_info.first] = topic_info.second[0];
223 }
224 }
225}
226
227void BagRecorderNode::create_subscriptions(const std::vector<std::string> & topics)
228{
229 remove_subscriptions(); // Clean up any existing subscriptions
230
231 for (const auto & topic_name : topics) {
232 if (topic_types_.find(topic_name) != topic_types_.end()) {
233 auto subscription = this->create_generic_subscription(
234 topic_name,
235 topic_types_[topic_name],
236 rclcpp::QoS(10),
237 [this, topic_name](std::shared_ptr<rclcpp::SerializedMessage> msg) {
238 this->generic_callback(msg, topic_name, topic_types_[topic_name]);
239 }
240 );
241
242 active_subscriptions_.push_back(subscription);
243 }
244 }
245}
246
247void BagRecorderNode::remove_subscriptions()
248{
249 active_subscriptions_.clear();
250}
251
252void BagRecorderNode::generic_callback(
253 std::shared_ptr<rclcpp::SerializedMessage> msg,
254 const std::string & topic_name,
255 const std::string & type_name)
256{
257 if (!recording_active_ || !writer_) {
258 return;
259 }
260
261 std::lock_guard<std::mutex> lock(writer_mutex_);
262
263 try {
264 // Create bag message
265 auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
266 bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
267 new rcutils_uint8_array_t(msg->get_rcl_serialized_message()),
268 [](rcutils_uint8_array_t * data) {
269 delete data;
270 }
271 );
272 bag_message->time_stamp = this->now().nanoseconds();
273 bag_message->topic_name = topic_name;
274
275 // Write to bag
276 writer_->write(bag_message);
277
278 // Update statistics
279 std::lock_guard<std::mutex> status_lock(status_mutex_);
280 status_.recorded_messages++;
281
282 } catch (const std::exception & e) {
283 RCLCPP_ERROR(this->get_logger(), "Error writing message: %s", e.what());
284 }
285}
286
287void BagRecorderNode::update_status()
288{
289 while (!should_stop_) {
290 if (recording_active_) {
291 auto current_time = this->now();
292 double duration = (current_time - recording_start_time_).seconds();
293
294 std::lock_guard<std::mutex> lock(status_mutex_);
295 status_.recording_duration = duration;
296 }
297
298 std::this_thread::sleep_for(std::chrono::milliseconds(100));
299 }
300}
301
302std::string BagRecorderNode::ensure_unique_output_path(const std::string & requested_path)
303{
304 // If path is empty or default, generate timestamped path
305 if (requested_path.empty() || requested_path == "/tmp/rosbags") {
306 auto now = std::chrono::system_clock::now();
307 auto time_t = std::chrono::system_clock::to_time_t(now);
308 auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(
309 now.time_since_epoch()) % 1000;
310
311 std::stringstream ss;
312 ss << "/tmp/rosbags/recording_"
313 << std::put_time(std::localtime(&time_t), "%Y%m%d_%H%M%S")
314 << "_" << std::setfill('0') << std::setw(3) << millis.count();
315
316 return ss.str();
317 }
318
319 // For custom paths, check if directory exists
320 if (std::filesystem::exists(requested_path)) {
321 // If directory exists and contains bag files, create a timestamped variant
322 if (std::filesystem::is_directory(requested_path) && !std::filesystem::is_empty(requested_path)) {
323 RCLCPP_WARN(this->get_logger(),
324 "Output directory %s already exists and is not empty. Creating timestamped variant.",
325 requested_path.c_str());
326
327 auto now = std::chrono::system_clock::now();
328 auto time_t = std::chrono::system_clock::to_time_t(now);
329 auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(
330 now.time_since_epoch()) % 1000;
331
332 std::stringstream ss;
333 ss << requested_path << "_"
334 << std::put_time(std::localtime(&time_t), "%Y%m%d_%H%M%S")
335 << "_" << std::setfill('0') << std::setw(3) << millis.count();
336
337 return ss.str();
338 }
339 }
340
341 return requested_path;
342}
343
344} // namespace bag_recorder_backend