/
/
/
1"""Some helpers for Filesystem based Musicproviders."""
2
3from __future__ import annotations
4
5import os
6import re
7from dataclasses import dataclass
8
9from music_assistant.helpers.compare import compare_strings
10
# Entry names that are skipped when listing a directory. The comparison is an
# exact, case-sensitive match against the bare name of the entry.
# NOTE(review): "Recently-Snaphot" looks like a misspelling of
# "Recently-Snapshot" - confirm the real folder name before changing it.
IGNORE_DIRS = (
    "recycle",
    "Recently-Snaphot",
    "#recycle",
    "System Volume Information",
    "lost+found",
)
12
13
@dataclass
class FileSystemItem:
    """Describe a single entry (file or directory) found on the filesystem.

    Stored fields:

    - filename: Bare name of the entry (no directory part).
    - relative_path: Path of the entry relative to the filesystem provider's base path.
    - absolute_path: Full path of the entry.
    - is_dir: True when the entry is a directory, False when it is a file.
    - checksum: Change marker for the path (usually last modified time), None for a directory.
    - file_size: Size in bytes, None when unknown or when the entry is not a file.
    - created_at: File creation timestamp (Unix epoch), None for a directory.

    Derived (read-only) properties: ext, name, parent_path, parent_name
    and relative_parent_path.
    """

    filename: str
    relative_path: str
    absolute_path: str
    is_dir: bool
    checksum: str | None = None
    file_size: int | None = None
    created_at: int | None = None  # file creation timestamp (Unix epoch)

    @property
    def ext(self) -> str | None:
        """Return the lowercased extension, or None if the filename has no dot."""
        _stem, dot, suffix = self.filename.rpartition(".")
        if not dot:
            return None
        # lowercased so that extension comparisons are case insensitive
        return suffix.lower()

    @property
    def name(self) -> str:
        """Return the filename with its last extension removed."""
        stem, dot, _suffix = self.filename.rpartition(".")
        return stem if dot else self.filename

    @property
    def parent_path(self) -> str:
        """Return the absolute path of the directory that holds this item."""
        head, _tail = os.path.split(self.absolute_path)
        return head

    @property
    def parent_name(self) -> str:
        """Return the bare name of the directory that holds this item."""
        _head, tail = os.path.split(self.parent_path)
        return tail

    @property
    def relative_parent_path(self) -> str:
        """Return the relative path of the directory that holds this item."""
        head, _tail = os.path.split(self.relative_path)
        return head

    @classmethod
    def from_dir_entry(cls, entry: os.DirEntry[str], base_path: str) -> FileSystemItem:
        """Build a FileSystemItem from an os.DirEntry. Blocking, NOT async friendly.

        :raises OSError: If the file cannot be stat'd (e.g., invalid filename encoding).
        """
        is_directory = entry.is_dir(follow_symlinks=False)
        rel_path = get_relative_path(base_path, entry.path)
        if is_directory:
            # directories keep checksum, file_size and created_at at None
            return cls(
                filename=entry.name,
                relative_path=rel_path,
                absolute_path=entry.path,
                is_dir=True,
            )
        # stat() can raise OSError for names with an invalid encoding
        # (e.g., emojis on SMB mounts); handling that is left to the caller
        file_stat = entry.stat(follow_symlinks=False)
        # st_birthtime exists on macOS/Windows; elsewhere fall back to st_ctime
        # (which on Linux is the metadata change time, not the creation time)
        ctime_fallback = file_stat.st_ctime
        birth_time = getattr(file_stat, "st_birthtime", ctime_fallback)
        return cls(
            filename=entry.name,
            relative_path=rel_path,
            absolute_path=entry.path,
            is_dir=False,
            checksum=str(int(file_stat.st_mtime)),
            file_size=file_stat.st_size,
            created_at=int(birth_time),
        )
95
96
def get_artist_dir(
    artist_name: str,
    album_dir: str | None,
) -> str | None:
    """Find the (Album)Artist directory among the ancestors of an album directory.

    Starting at the parent of ``album_dir``, three consecutive ancestor
    directories are compared against ``artist_name``. The match found highest
    up in the tree wins; None is returned when ``album_dir`` is empty or no
    ancestor matches.
    """
    if not album_dir:
        return None
    best_match: str | None = None
    candidate = os.path.dirname(album_dir)
    # three levels are inspected so that a disc and/or album sublevel
    # between the track and the artist directory is tolerated
    levels_left = 3
    while levels_left > 0:
        levels_left -= 1
        leaf = candidate.rpartition(os.sep)[2]
        if compare_strings(artist_name, leaf, False):
            # do not stop at the first hit: an album may carry the same name
            # as its artist, in which case the directory further up is the
            # actual artist directory
            best_match = candidate
        candidate = os.path.dirname(candidate)
    return best_match
116
117
def tokenize(input_str: str, delimiters: str) -> list[str]:
    """Split an album name or path into its non-empty tokens.

    :param input_str: The text to split.
    :param delimiters: Regular expression matching the separators, e.g. ``"[-_ ]"``.
    :return: The pieces of ``input_str`` found between delimiter matches, in
        order, with empty pieces dropped.
    """
    tokens: list[str] = []
    cursor = 0
    # Slice the text between consecutive delimiter matches instead of
    # substituting a marker string and splitting on it: a marker such as
    # "^^^" corrupts tokens whenever the input itself contains "^".
    for match in re.finditer(delimiters, input_str):
        if match.start() > cursor:
            tokens.append(input_str[cursor : match.start()])
        cursor = match.end()
    if cursor < len(input_str):
        tokens.append(input_str[cursor:])
    return tokens
122
123
def _dir_contains_album_name(id3_album_name: str, directory_name: str) -> bool:
    """Check if a directory name contains an album name.

    Both strings are tokenized and compared twice, using a different set of
    delimiters on each pass. A pass reports a match when either:

    - every token of the album name also occurs among the directory tokens, or
    - the joined album tokens compare equal (via ``compare_strings``) to the
      joined leading directory tokens of the same count.

    The first pass treats the literal dash as a separator. The second pass
    catches the edge case where the dash is part of the album's name rather
    than a separator. For example, an album like 'Aphex Twin - Selected
    Ambient Works 85-92' would be correctly handled.

    Args:
        id3_album_name (str): The album name to search for.
        directory_name (str): The directory name to search in.

    Returns:
        bool: True if the directory name contains the album name, False otherwise.
    """
    for delims in ("[-_ ]", "[_ ]"):
        tokenized_album_name = tokenize(id3_album_name, delims)
        if not tokenized_album_name:
            # An album name made up solely of delimiters (e.g. "-") yields no
            # tokens. all() over an empty sequence is True, which would make
            # every directory match, so this pass cannot decide anything.
            continue
        tokenized_dirname = tokenize(directory_name, delims)

        # Exact match, potentially just on the album name
        # in case artist's name is not included in id3_album_name
        if all(token in tokenized_dirname for token in tokenized_album_name):
            return True

        # Compare the album name against the same number of leading tokens
        # of the directory name, with all delimiters removed on both sides.
        album_token_count = len(tokenized_album_name)
        if album_token_count <= len(tokenized_dirname) and compare_strings(
            "".join(tokenized_album_name),
            "".join(tokenized_dirname[:album_token_count]),
            False,
        ):
            return True
    return False
158
159
def get_album_dir(track_dir: str, album_name: str) -> str | None:
    """Locate the album directory for a track.

    The track's own directory is tried first and then its parent (to allow
    for a disc sublevel). The first directory whose name matches
    ``album_name`` is returned, or None when neither of them matches.
    """

    def _looks_like_album(folder: str) -> bool:
        """Return True if the bare directory name matches the album name."""
        after_dash = folder.split(" - ")[-1]
        if compare_strings(album_name, folder, False):
            # literal match
            return True
        if compare_strings(album_name, after_dash, False):
            # directory named "ArtistName - AlbumName"
            return True
        if compare_strings(album_name, after_dash.split("(")[0], False):
            # directory named "ArtistName - AlbumName (Version)"
            return True

        if album_name and any(sep in folder for sep in ("-", " ", "_")):
            # tokenized matching through _dir_contains_album_name(), first on
            # the part after "Artist - " (if the album name has one), then on
            # the album name as a whole
            _artist, dash, remainder = album_name.partition(" - ")
            without_artist = remainder if dash else None
            if without_artist and _dir_contains_album_name(without_artist, folder):
                return True
            if _dir_contains_album_name(album_name, folder):
                return True

        base_title = album_name.split("(")[0]
        if compare_strings(base_title, folder, False):
            # album name written as "AlbumName (Version)"
            return True
        if compare_strings(base_title, after_dash, False):
            # "ArtistName - AlbumName" directory with a "(Version)" album name
            return True
        # Plain substring containment; the length check is there to limit
        # false positives for short album names.
        return len(album_name) > 8 and album_name in folder

    current = track_dir
    for _ in range(2):
        if _looks_like_album(current.rpartition(os.sep)[2]):
            return current
        current = os.path.dirname(current)
    return None
201
202
def get_relative_path(base_path: str, path: str) -> str:
    """Return the relative path string for a path.

    :param base_path: Prefix to remove from ``path``.
    :param path: The path to make relative.
    :return: ``path`` without the leading ``base_path`` (when it starts with
        it) and without one leading "/" and/or "\\" separator.
    """
    if path.startswith(base_path):
        # Cut off the prefix only. Splitting on base_path would also split on
        # later occurrences of it (e.g. "/music/artist/music/track.mp3") and
        # drop everything after the second one; it also fails outright for an
        # empty base_path.
        path = path[len(base_path) :]
    for sep in ("/", "\\"):
        if path.startswith(sep):
            path = path[1:]
    return path
211
212
def get_absolute_path(base_path: str, path: str) -> str:
    """Return the absolute path string for a path.

    A path that already starts with ``base_path`` is handed back untouched;
    any other path is joined onto ``base_path``.
    """
    already_prefixed = path.startswith(base_path)
    return path if already_prefixed else os.path.join(base_path, path)
218
219
def sorted_scandir(base_path: str, sub_path: str, sort: bool = False) -> list[FileSystemItem]:
    """
    Implement os.scandir that returns (optionally) sorted entries.

    Entries that are neither a directory nor a regular file (symlinks are not
    followed), entries named in IGNORE_DIRS, entries whose name starts with a
    dot and entries that cannot be stat'd are left out.

    Not async friendly!

    :param base_path: Base path used to compute the relative path of each item.
    :param sub_path: Directory to list; joined onto ``base_path`` unless it
        already starts with it.
    :param sort: Sort the result naturally by name (without extension), so
        that e.g. "track 2" comes before "track 10".
    :return: One FileSystemItem per accepted entry.
    :raises OSError: If the directory itself cannot be scanned.
    """

    def nat_key(name: str) -> tuple[int | str, ...]:
        """Sort key for natural sorting."""
        # The capturing group makes re.split alternate text and digit runs.
        # isdecimal() is true for the same characters that \d matches and that
        # int() accepts; isdigit() is also true for characters like "²",
        # which \d leaves in the text runs and int() rejects with ValueError.
        return tuple(int(s) if s.isdecimal() else s for s in re.split(r"(\d+)", name))

    # Prefix test (same rule as get_absolute_path): a relative sub_path that
    # merely contains base_path somewhere in the middle still has to be joined.
    if not sub_path.startswith(base_path):
        sub_path = os.path.join(base_path, sub_path)
    items: list[FileSystemItem] = []
    # the context manager closes the scandir iterator even if iteration is
    # interrupted by an exception
    with os.scandir(sub_path) as entries:
        for entry in entries:
            # filter out invalid dirs and hidden files
            if not (entry.is_dir(follow_symlinks=False) or entry.is_file(follow_symlinks=False)):
                continue
            if entry.name in IGNORE_DIRS or entry.name.startswith("."):
                continue
            try:
                items.append(FileSystemItem.from_dir_entry(entry, base_path))
            except OSError:
                # Skip files that cannot be stat'd (e.g., invalid encoding on SMB mounts)
                # This typically happens with emoji or special unicode characters
                continue

    if sort:
        return sorted(
            items,
            # sort by (natural) name
            key=lambda x: nat_key(x.name),
        )
    return items
254