/
/
/
1"""Some helpers for Filesystem based Musicproviders."""
2
3from __future__ import annotations
4
5import errno
6import logging
7import os
8import re
9from collections.abc import Iterator
10from dataclasses import dataclass
11
12from music_assistant.helpers.compare import compare_strings
13
# Module-level logger; used by sorted_scandir(), which does not take a logger argument.
logger = logging.getLogger(__name__)

# Entry names skipped by the directory scanners in this module (compared by exact name).
# NOTE(review): "Recently-Snaphot" looks like a misspelling of "Recently-Snapshot" -
# confirm the intended folder name before changing it, since the value is matched literally.
IGNORE_DIRS = ("recycle", "Recently-Snaphot", "#recycle", "System Volume Information", "lost+found")
17
18
@dataclass
class FileSystemItem:
    """A single file or directory found on the filesystem.

    Fields:

    - filename: Bare name of the entry (no directory part).
    - relative_path: Path of the entry relative to the filesystem provider's base path.
    - absolute_path: Full path of the entry.
    - is_dir: True when the entry is a directory, False when it is a file.
    - checksum: Checksum for this path (usually last modified time), None for a directory.
    - file_size: Size in bytes, or None if unknown (or not a file).
    - created_at: File creation timestamp (Unix epoch), or None for directories.

    The derived values ext, name, parent_path, parent_name and
    relative_parent_path are available as read-only properties.
    """

    filename: str
    relative_path: str
    absolute_path: str
    is_dir: bool
    checksum: str | None = None
    file_size: int | None = None
    created_at: int | None = None  # file creation timestamp (Unix epoch)

    @property
    def ext(self) -> str | None:
        """Return the lowercased text after the last dot, or None when there is no dot."""
        _, dot, suffix = self.filename.rpartition(".")
        if not dot:
            return None
        # lowercase so that comparisons against known extensions are case insensitive
        return suffix.lower()

    @property
    def name(self) -> str:
        """Return the filename with the part after the last dot removed."""
        stem, dot, _ = self.filename.rpartition(".")
        return stem if dot else self.filename

    @property
    def parent_path(self) -> str:
        """Return the absolute path of the directory containing this item."""
        head, _ = os.path.split(self.absolute_path)
        return head

    @property
    def parent_name(self) -> str:
        """Return the name of the directory containing this item."""
        _, tail = os.path.split(self.parent_path)
        return tail

    @property
    def relative_parent_path(self) -> str:
        """Return the relative path of the directory containing this item."""
        head, _ = os.path.split(self.relative_path)
        return head

    @classmethod
    def from_dir_entry(cls, entry: os.DirEntry[str], base_path: str) -> FileSystemItem:
        """Build a FileSystemItem from an os.DirEntry. NOT Async friendly.

        :param entry: Directory entry to convert.
        :param base_path: Base path used to derive the item's relative path.
        :raises OSError: If the file cannot be stat'd (e.g., invalid filename encoding).
        """
        if not entry.is_dir(follow_symlinks=False):
            # stat() can raise OSError for files with invalid encoding
            # (e.g., emojis on SMB mounts); that is left for the caller to handle.
            stat_result = entry.stat(follow_symlinks=False)
            # st_birthtime is available on macOS/Windows; elsewhere fall back to
            # st_ctime (on Linux that is the metadata change time, not creation time).
            change_time = stat_result.st_ctime
            birth_time = getattr(stat_result, "st_birthtime", change_time)
            return cls(
                filename=entry.name,
                relative_path=get_relative_path(base_path, entry.path),
                absolute_path=entry.path,
                is_dir=False,
                checksum=str(int(stat_result.st_mtime)),
                file_size=stat_result.st_size,
                created_at=int(birth_time),
            )
        # directories carry no checksum, size or creation time
        return cls(
            filename=entry.name,
            relative_path=get_relative_path(base_path, entry.path),
            absolute_path=entry.path,
            is_dir=True,
            checksum=None,
            file_size=None,
        )
100
101
def get_artist_dir(
    artist_name: str,
    album_dir: str | None,
) -> str | None:
    """Find the (Album)Artist directory among the ancestors of an album directory.

    The three directories above ``album_dir`` are inspected, which allows for a
    disc or album sublevel in between. When more than one of them matches the
    artist name, the one furthest up the tree is returned; this covers the edge
    case where an album carries the same name as its artist.

    :param artist_name: Name of the artist to look for.
    :param album_dir: Path of the album directory, or None/empty when unknown.
    :return: Path of the matching directory, or None when nothing matched.
    """
    if not album_dir:
        return None
    # collect the parent, grandparent and great-grandparent of the album dir
    ancestors: list[str] = []
    current = os.path.dirname(album_dir)
    for _ in range(3):
        ancestors.append(current)
        current = os.path.dirname(current)
    matches = [
        ancestor
        for ancestor in ancestors
        if compare_strings(artist_name, ancestor.rpartition(os.sep)[2], False)
    ]
    # the last match is the one highest up in the directory tree
    return matches[-1] if matches else None
121
122
def tokenize(input_str: str, delimiters: str) -> list[str]:
    """Split an album name or path into its non-empty tokens.

    The text between consecutive delimiter matches is collected directly from
    the match positions. No placeholder string is substituted into the input,
    so characters that are already part of the input can never be mistaken for
    a token boundary.

    :param input_str: The string to tokenize.
    :param delimiters: Regular expression matching the token separators.
    :return: The tokens in order of appearance, with empty tokens dropped.
    """
    tokens: list[str] = []
    cursor = 0
    for match in re.finditer(delimiters, input_str):
        piece = input_str[cursor : match.start()]
        if piece:
            tokens.append(piece)
        cursor = match.end()
    # whatever follows the last delimiter (or the whole string if none matched)
    remainder = input_str[cursor:]
    if remainder:
        tokens.append(remainder)
    return tokens
127
128
def _dir_contains_album_name(id3_album_name: str, directory_name: str) -> bool:
    """Check if a directory name contains an album name.

    Both strings are tokenized and compared in two passes that differ in the
    delimiters used. The first pass treats the literal dash as a separator
    (next to underscore and space). The second pass does not, to catch edge
    cases where the dash is part of the album's name rather than a separator.
    For example, an album like 'Aphex Twin - Selected Ambient Works 85-92'
    would be correctly handled.

    Within a pass the directory matches when either

    - every album token occurs among the directory tokens, or
    - the concatenated album tokens compare equal (via compare_strings) to the
      concatenation of the same number of leading directory tokens.

    A pass in which the album name yields no tokens at all (the name is empty
    or consists of delimiters only) is skipped: with nothing to compare, the
    "every token occurs" test would otherwise be trivially true and any
    directory would match.

    Args:
        id3_album_name (str): The album name to search for.
        directory_name (str): The directory name to search in.

    Returns:
        bool: True if the directory name contains the album name, False otherwise.
    """
    for delims in ("[-_ ]", "[_ ]"):
        album_tokens = tokenize(id3_album_name, delims)
        if not album_tokens:
            # all() over an empty sequence is True; do not treat that as a match
            continue
        dir_tokens = tokenize(directory_name, delims)

        # Exact token match, potentially just on the album name
        # in case the artist's name is not included in id3_album_name
        if all(token in dir_tokens for token in album_tokens):
            return True

        if len(album_tokens) <= len(dir_tokens) and compare_strings(
            "".join(album_tokens),
            "".join(dir_tokens[: len(album_tokens)]),
            False,
        ):
            return True
    return False
163
164
def get_album_dir(track_dir: str, album_name: str) -> str | None:
    """Locate the album directory for a track.

    The track's own directory is tried first and then its parent, which
    accounts for a disc sublevel. A directory is accepted as soon as one of
    the name comparisons below succeeds.

    :param track_dir: Directory that contains the track.
    :param album_name: Album name to match directory names against.
    :return: Path of the matching directory, or None when neither level matches.
    """
    candidate = track_dir
    for _ in range(2):
        folder = candidate.rpartition(os.sep)[2]
        folder_tail = folder.split(" - ")[-1]
        # in order: literal match, "ArtistName - AlbumName" and
        # "ArtistName - AlbumName (Version)" layouts of the directory name
        for folder_variant in (folder, folder_tail, folder_tail.split("(")[0]):
            if compare_strings(album_name, folder_variant, False):
                return candidate

        if album_name and any(sep in folder for sep in ("-", " ", "_")):
            # when the album name itself reads "Artist - Album", also try
            # the part after the first " - " on its own
            _, _, album_without_artist = album_name.partition(" - ")

            # attempt matching using tokenized version of path and album name
            # with _dir_contains_album_name()
            if album_without_artist and _dir_contains_album_name(album_without_artist, folder):
                return candidate

            if _dir_contains_album_name(album_name, folder):
                return candidate

        # account for "AlbumName (Version)" in the album name, against the
        # plain directory name and against "ArtistName - AlbumName"
        album_base = album_name.split("(", maxsplit=1)[0]
        for folder_variant in (folder, folder_tail):
            if compare_strings(album_base, folder_variant, False):
                return candidate

        # directory name contains the album name
        # (could potentially lead to false positives, hence the length check)
        if len(album_name) > 8 and album_name in folder:
            return candidate

        candidate = os.path.dirname(candidate)
    return None
206
207
def get_relative_path(base_path: str, path: str) -> str:
    """Return the relative path string for a path.

    When ``path`` starts with ``base_path`` that prefix is removed. After that
    one leading "/" and one leading backslash are stripped (in that order), so
    the result does not start with a path separator.

    :param base_path: Base path to strip from the front of ``path``.
    :param path: Path to convert; returned as-is (minus a leading separator)
        when it does not start with ``base_path``.
    :return: The path relative to ``base_path``.
    """
    if path.startswith(base_path):
        # Slice the prefix off rather than splitting on it: base_path may occur
        # again further down the path (e.g. "/music/rock/music/a.mp3"), and
        # splitting would cut the result short at that second occurrence.
        # Slicing also copes with an empty base_path.
        path = path[len(base_path) :]
    for sep in ("/", "\\"):
        if path.startswith(sep):
            path = path[1:]
    return path
216
217
def get_absolute_path(base_path: str, path: str) -> str:
    """Return the absolute path string for a path.

    A path that already starts with ``base_path`` is returned unchanged; any
    other path is joined onto ``base_path``.
    """
    has_base_prefix = path.startswith(base_path)
    return path if has_base_prefix else os.path.join(base_path, path)
223
224
def recursive_iter(
    path: str,
    base_path: str,
    supported_extensions: set[str],
    log: logging.Logger,
) -> Iterator[FileSystemItem]:
    """Recursively traverse directory entries yielding supported files.

    Entries whose name is listed in IGNORE_DIRS or starts with "." or "_" are
    skipped, as are entries that are neither a directory nor a file when
    symlinks are not followed. Directories are descended into; files are
    yielded when their extension is in ``supported_extensions``.

    OS errors never abort the scan: a directory that cannot be opened or an
    entry that cannot be inspected is logged and skipped.

    :param path: The directory path to scan.
    :param base_path: The root base path for constructing relative paths.
    :param supported_extensions: Set of file extensions to include (lowercase, no dot).
    :param log: Logger instance to use for warnings/debug messages.
    """
    try:
        scan_iter = os.scandir(path)
    except OSError as err:
        if err.errno == errno.EINVAL:
            log.warning(
                "Skipping directory '%s' - unsupported characters in path",
                path,
            )
        else:
            log.warning("Unable to scan directory %s: %s", path, err)
        return
    with scan_iter:
        for item in scan_iter:
            if item.name in IGNORE_DIRS or item.name.startswith((".", "_")):
                continue
            try:
                is_dir = item.is_dir(follow_symlinks=False)
                is_file = item.is_file(follow_symlinks=False)
            except OSError as err:
                if err.errno == errno.EINVAL:
                    log.warning(
                        "Skipping '%s' - unsupported characters in name",
                        item.name,
                    )
                else:
                    # leave a trace for other errors too, as the stat failure
                    # handler further down does, instead of skipping silently
                    log.debug("Skipping '%s' due to OS error: %s", item.path, err)
                continue
            if is_dir:
                yield from recursive_iter(item.path, base_path, supported_extensions, log)
                continue
            if not is_file:
                continue
            if "." not in item.name:
                continue
            ext = item.name.rsplit(".", 1)[1].lower()
            if ext not in supported_extensions:
                continue
            try:
                yield FileSystemItem.from_dir_entry(item, base_path)
            except OSError as err:
                if err.errno == errno.EINVAL:
                    log.warning(
                        "Skipping '%s' - unsupported characters in name",
                        item.name,
                    )
                else:
                    log.debug(
                        "Skipping file %s due to OS error: %s",
                        item.path,
                        err,
                    )
285
286
def sorted_scandir(base_path: str, sub_path: str, sort: bool = False) -> list[FileSystemItem]:
    """
    Implement os.scandir that returns (optionally) sorted entries.

    Not async friendly!

    Only files and directories are returned (symlinks are not followed);
    entries whose name is listed in IGNORE_DIRS or starts with "." are left
    out, and entries that cannot be inspected are logged and skipped.

    :param base_path: Root path; also used to construct relative paths.
    :param sub_path: Directory to list. Joined onto ``base_path`` unless it
        already starts with it.
    :param sort: When True, return the items in natural order of their name.
    :raises OSError: If the directory cannot be opened, except for EINVAL
        (unsupported characters in the path), which yields an empty list.
    """

    def nat_key(name: str) -> tuple[int | str, ...]:
        """Sort key for natural sorting."""
        # The capture group makes re.split alternate text and digit runs.
        # isdecimal() accepts the same characters as \d, so only those digit
        # runs are converted; characters such as a superscript two pass
        # isdigit() but cannot be converted by int().
        return tuple(int(s) if s.isdecimal() else s for s in re.split(r"(\d+)", name))

    # prefix test: a relative sub path that merely contains base_path somewhere
    # in the middle still has to be joined onto the base path
    if not sub_path.startswith(base_path):
        sub_path = os.path.join(base_path, sub_path)
    items: list[FileSystemItem] = []
    try:
        entries = os.scandir(sub_path)
    except OSError as err:
        if err.errno == errno.EINVAL:
            logger.warning(
                "Skipping directory '%s' - unsupported characters in path",
                sub_path,
            )
            return items
        raise
    with entries:
        for entry in entries:
            try:
                is_dir = entry.is_dir(follow_symlinks=False)
                is_file = entry.is_file(follow_symlinks=False)
            except OSError as err:
                if err.errno == errno.EINVAL:
                    logger.warning(
                        "Skipping '%s' - unsupported characters in name",
                        entry.name,
                    )
                else:
                    # leave a trace for other errors too instead of skipping silently
                    logger.debug("Skipping '%s' due to OS error: %s", entry.name, err)
                continue
            if not (is_dir or is_file):
                continue
            if entry.name in IGNORE_DIRS or entry.name.startswith("."):
                continue
            try:
                items.append(FileSystemItem.from_dir_entry(entry, base_path))
            except OSError as err:
                if err.errno == errno.EINVAL:
                    logger.warning(
                        "Skipping '%s' - unsupported characters in name",
                        entry.name,
                    )
                else:
                    logger.debug("Skipping '%s' due to OS error: %s", entry.name, err)
                continue

    if sort:
        return sorted(
            items,
            # sort by (natural) name
            key=lambda x: nat_key(x.name),
        )
    return items
346