/
/
/
1#include "bag_recorder_backend/bag_recorder_node.hpp"
2
3#include <chrono>
4#include <filesystem>
5#include <iomanip>
6#include <rclcpp/serialization.hpp>
7#include <rosbag2_storage/serialized_bag_message.hpp>
8#include <sstream>
9
10namespace bag_recorder_backend {
11
12BagRecorderNode::BagRecorderNode(const rclcpp::NodeOptions& options)
13 : Node("bag_recorder_node", options), recording_active_(false), should_stop_(false), output_directory_("/tmp/rosbags") {
14 declare_parameters();
15 load_parameters();
16
17 // Initialize status
18 status_.is_recording = false;
19 status_.recorded_messages = 0;
20 status_.recording_duration = 0.0;
21
22 // Start status update thread
23 status_update_thread_ = std::thread(&BagRecorderNode::update_status, this);
24
25 RCLCPP_INFO(this->get_logger(), "Bag Recorder Node initialized");
26}
27
28BagRecorderNode::~BagRecorderNode() {
29 should_stop_ = true;
30
31 if (recording_active_) {
32 stop_recording();
33 }
34
35 if (status_update_thread_.joinable()) {
36 status_update_thread_.join();
37 }
38}
39
40void BagRecorderNode::declare_parameters() {
41 this->declare_parameter("output_directory", "/tmp/rosbags");
42 this->declare_parameter("default_storage_id", "sqlite3");
43 this->declare_parameter("default_serialization_format", "cdr");
44 this->declare_parameter("max_bagfile_size", 0);
45 this->declare_parameter("max_bagfile_duration", 0.0);
46}
47
48void BagRecorderNode::load_parameters() {
49 output_directory_ = this->get_parameter("output_directory").as_string();
50
51 // Create output directory if it doesn't exist
52 std::filesystem::create_directories(output_directory_);
53}
54
55bool BagRecorderNode::start_recording(const RecordingConfig& config) {
56 std::lock_guard<std::mutex> lock(writer_mutex_);
57
58 if (recording_active_) {
59 RCLCPP_WARN(this->get_logger(), "Recording already active");
60 return false;
61 }
62
63 try {
64 current_config_ = config;
65
66 // Ensure unique output path to avoid database conflicts
67 current_config_.output_path = ensure_unique_output_path(config.output_path);
68
69 setup_writer(current_config_);
70 create_subscriptions(current_config_.topics);
71
72 recording_active_ = true;
73 recording_start_time_ = this->now();
74
75 std::lock_guard<std::mutex> status_lock(status_mutex_);
76 status_.is_recording = true;
77 status_.current_bag_path = current_config_.output_path;
78 status_.recorded_messages = 0;
79 status_.recording_duration = 0.0;
80 status_.active_topics = current_config_.topics;
81
82 RCLCPP_INFO(this->get_logger(), "Recording started: %s", current_config_.output_path.c_str());
83 return true;
84
85 } catch (const std::exception& e) {
86 RCLCPP_ERROR(this->get_logger(), "Failed to start recording: %s", e.what());
87 cleanup_writer();
88 return false;
89 }
90}
91
92bool BagRecorderNode::stop_recording() {
93 std::lock_guard<std::mutex> lock(writer_mutex_);
94
95 if (!recording_active_) {
96 RCLCPP_WARN(this->get_logger(), "No active recording to stop");
97 return false;
98 }
99
100 try {
101 recording_active_ = false;
102 remove_subscriptions();
103 cleanup_writer();
104
105 std::lock_guard<std::mutex> status_lock(status_mutex_);
106 status_.is_recording = false;
107 status_.active_topics.clear();
108
109 RCLCPP_INFO(this->get_logger(), "Recording stopped");
110 return true;
111
112 } catch (const std::exception& e) {
113 RCLCPP_ERROR(this->get_logger(), "Error stopping recording: %s", e.what());
114 return false;
115 }
116}
117
118bool BagRecorderNode::is_recording() const { return recording_active_; }
119
120RecordingStatus BagRecorderNode::get_status() const {
121 std::lock_guard<std::mutex> lock(status_mutex_);
122 return status_;
123}
124
125std::vector<std::string> BagRecorderNode::get_available_topics() const {
126 auto topic_names_and_types = this->get_topic_names_and_types();
127 std::vector<std::string> topics;
128
129 for (const auto& topic_info : topic_names_and_types) {
130 topics.push_back(topic_info.first);
131 }
132
133 return topics;
134}
135
136std::vector<std::string> BagRecorderNode::get_topic_types() const {
137 auto topic_names_and_types = this->get_topic_names_and_types();
138 std::vector<std::string> types;
139
140 for (const auto& topic_info : topic_names_and_types) {
141 for (const auto& type : topic_info.second) {
142 types.push_back(type);
143 }
144 }
145
146 return types;
147}
148
149void BagRecorderNode::set_output_directory(const std::string& directory) {
150 output_directory_ = directory;
151 std::filesystem::create_directories(output_directory_);
152}
153
154std::string BagRecorderNode::get_output_directory() const { return output_directory_; }
155
156void BagRecorderNode::setup_writer(const RecordingConfig& config) {
157 // Configure storage options
158 storage_options_.uri = config.output_path;
159 storage_options_.storage_id = config.storage_id;
160 storage_options_.max_bagfile_size = config.max_bagfile_size;
161 storage_options_.max_bagfile_duration = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::duration<double>(config.max_bagfile_duration)).count();
162
163 // Configure converter options
164 converter_options_.input_serialization_format = config.serialization_format;
165 converter_options_.output_serialization_format = config.serialization_format;
166
167 // Create and open writer
168 writer_ = std::make_unique<rosbag2_cpp::Writer>();
169 writer_->open(storage_options_, converter_options_);
170
171 // Create topic metadata for all topics
172 discover_topics();
173 for (const auto& topic_name : config.topics) {
174 if (topic_types_.find(topic_name) != topic_types_.end()) {
175 rosbag2_storage::TopicMetadata topic_metadata;
176 topic_metadata.name = topic_name;
177 topic_metadata.type = topic_types_[topic_name];
178 topic_metadata.serialization_format = config.serialization_format;
179
180 writer_->create_topic(topic_metadata);
181 }
182 }
183}
184
185void BagRecorderNode::cleanup_writer() {
186 if (writer_) {
187 writer_->close();
188 writer_.reset();
189 }
190}
191
192void BagRecorderNode::discover_topics() {
193 auto topic_names_and_types = this->get_topic_names_and_types();
194 topic_types_.clear();
195
196 for (const auto& topic_info : topic_names_and_types) {
197 if (!topic_info.second.empty()) {
198 topic_types_[topic_info.first] = topic_info.second[0];
199 }
200 }
201}
202
203void BagRecorderNode::create_subscriptions(const std::vector<std::string>& topics) {
204 remove_subscriptions(); // Clean up any existing subscriptions
205
206 for (const auto& topic_name : topics) {
207 if (topic_types_.find(topic_name) != topic_types_.end()) {
208 auto subscription = this->create_generic_subscription(
209 topic_name, topic_types_[topic_name], rclcpp::QoS(10),
210 [this, topic_name](std::shared_ptr<rclcpp::SerializedMessage> msg) { this->generic_callback(msg, topic_name, topic_types_[topic_name]); });
211
212 active_subscriptions_.push_back(subscription);
213 }
214 }
215}
216
217void BagRecorderNode::remove_subscriptions() { active_subscriptions_.clear(); }
218
219void BagRecorderNode::generic_callback(std::shared_ptr<rclcpp::SerializedMessage> msg, const std::string& topic_name, const std::string& type_name) {
220 if (!recording_active_ || !writer_) {
221 return;
222 }
223
224 std::lock_guard<std::mutex> lock(writer_mutex_);
225
226 try {
227 // Create bag message
228 auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
229 bag_message->serialized_data =
230 std::shared_ptr<rcutils_uint8_array_t>(new rcutils_uint8_array_t(msg->get_rcl_serialized_message()), [](rcutils_uint8_array_t* data) { delete data; });
231 bag_message->time_stamp = this->now().nanoseconds();
232 bag_message->topic_name = topic_name;
233
234 // Write to bag
235 writer_->write(bag_message);
236
237 // Update statistics
238 std::lock_guard<std::mutex> status_lock(status_mutex_);
239 status_.recorded_messages++;
240
241 } catch (const std::exception& e) {
242 RCLCPP_ERROR(this->get_logger(), "Error writing message: %s", e.what());
243 }
244}
245
246void BagRecorderNode::update_status() {
247 while (!should_stop_) {
248 if (recording_active_) {
249 auto current_time = this->now();
250 double duration = (current_time - recording_start_time_).seconds();
251
252 std::lock_guard<std::mutex> lock(status_mutex_);
253 status_.recording_duration = duration;
254 }
255
256 std::this_thread::sleep_for(std::chrono::milliseconds(100));
257 }
258}
259
260std::string BagRecorderNode::ensure_unique_output_path(const std::string& requested_path) {
261 // If path is empty or default, generate timestamped path
262 if (requested_path.empty() || requested_path == "/tmp/rosbags") {
263 auto now = std::chrono::system_clock::now();
264 auto time_t = std::chrono::system_clock::to_time_t(now);
265 auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()) % 1000;
266
267 std::stringstream ss;
268 ss << "/tmp/rosbags/recording_" << std::put_time(std::localtime(&time_t), "%Y%m%d_%H%M%S") << "_" << std::setfill('0') << std::setw(3) << millis.count();
269
270 return ss.str();
271 }
272
273 // For custom paths, check if directory exists
274 if (std::filesystem::exists(requested_path)) {
275 // If directory exists and contains bag files, create a timestamped variant
276 if (std::filesystem::is_directory(requested_path) && !std::filesystem::is_empty(requested_path)) {
277 RCLCPP_WARN(this->get_logger(), "Output directory %s already exists and is not empty. Creating timestamped variant.", requested_path.c_str());
278
279 auto now = std::chrono::system_clock::now();
280 auto time_t = std::chrono::system_clock::to_time_t(now);
281 auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()) % 1000;
282
283 std::stringstream ss;
284 ss << requested_path << "_" << std::put_time(std::localtime(&time_t), "%Y%m%d_%H%M%S") << "_" << std::setfill('0') << std::setw(3) << millis.count();
285
286 return ss.str();
287 }
288 }
289
290 return requested_path;
291}
292
293} // namespace bag_recorder_backend