music-assistant-server

9.4 KBPY
helpers.py
9.4 KB254 lines • python
1"""Some helpers for Filesystem based Musicproviders."""
2
3from __future__ import annotations
4
5import os
6import re
7from dataclasses import dataclass
8
9from music_assistant.helpers.compare import compare_strings
10
# Entry names that sorted_scandir() skips; the comparison is an exact,
# case-sensitive match on the entry's name (not on its path).
# NOTE(review): "Recently-Snaphot" looks like a misspelling of "Recently-Snapshot" -
# confirm the real folder name before touching it, because matching is exact.
IGNORE_DIRS = ("recycle", "Recently-Snaphot", "#recycle", "System Volume Information", "lost+found")
12
13
@dataclass
class FileSystemItem:
    """Describe a single filesystem entry (a file or a directory).

    Fields:
    - filename: Bare name of the entry, without any directory part.
    - relative_path: Path of the entry relative to the provider's base path.
    - absolute_path: Full path of the entry.
    - is_dir: True for a directory, False for a file.
    - checksum: Change marker for the entry; ``from_dir_entry`` stores the
      last-modified time (whole seconds, as string) for files, None for directories.
    - file_size: Size in bytes, or None if unknown (or not a file).
    - created_at: Creation timestamp (Unix epoch seconds), or None for directories.

    Derived, read-only properties: ext, name, parent_path, parent_name and
    relative_parent_path.
    """

    filename: str
    relative_path: str
    absolute_path: str
    is_dir: bool
    checksum: str | None = None
    file_size: int | None = None
    created_at: int | None = None  # Unix epoch seconds

    @property
    def ext(self) -> str | None:
        """Return the lowercased extension, or None if the filename has no dot."""
        _stem, dot, suffix = self.filename.rpartition(".")
        # lowercased so that extension comparisons are case insensitive
        return suffix.lower() if dot else None

    @property
    def name(self) -> str:
        """Return the filename with its last extension removed."""
        stem, dot, _suffix = self.filename.rpartition(".")
        return stem if dot else self.filename

    @property
    def parent_path(self) -> str:
        """Return the absolute path of the directory holding this item."""
        head, _tail = os.path.split(self.absolute_path)
        return head

    @property
    def parent_name(self) -> str:
        """Return the name (last path component) of the parent directory."""
        _head, tail = os.path.split(self.parent_path)
        return tail

    @property
    def relative_parent_path(self) -> str:
        """Return the parent directory as a path relative to the provider base."""
        head, _tail = os.path.split(self.relative_path)
        return head

    @classmethod
    def from_dir_entry(cls, entry: os.DirEntry[str], base_path: str) -> FileSystemItem:
        """Build a FileSystemItem from an os.DirEntry. Blocking, NOT async friendly.

        Directories get no checksum, size or creation time. Symlinks are not
        followed, neither for the directory test nor for the stat call.

        :raises OSError: If the file cannot be stat'd (e.g., invalid filename encoding).
        """
        is_directory = entry.is_dir(follow_symlinks=False)
        checksum: str | None = None
        file_size: int | None = None
        created_at: int | None = None
        if not is_directory:
            # stat() may raise OSError for names with an invalid encoding
            # (e.g., emojis on SMB mounts); that is left to the caller to handle.
            info = entry.stat(follow_symlinks=False)
            # st_birthtime exists on macOS/Windows; the fallback st_ctime is the
            # metadata change time on Linux, not a true creation time.
            created_at = int(getattr(info, "st_birthtime", info.st_ctime))
            checksum = str(int(info.st_mtime))
            file_size = info.st_size
        return cls(
            filename=entry.name,
            relative_path=get_relative_path(base_path, entry.path),
            absolute_path=entry.path,
            is_dir=is_directory,
            checksum=checksum,
            file_size=file_size,
            created_at=created_at,
        )
95
96
def get_artist_dir(
    artist_name: str,
    album_dir: str | None,
) -> str | None:
    """Find the (Album)Artist directory among the ancestors of an album directory.

    Starting at the parent of ``album_dir``, three directory levels are
    inspected, which tolerates up to two extra (disc or album) sublevels.
    Each level's name is compared to ``artist_name`` with compare_strings
    (third argument False).

    Returns the highest inspected level that matched, or None when
    ``album_dir`` is empty or no level matched.
    """
    if not album_dir:
        return None
    best_match: str | None = None
    candidate = os.path.dirname(album_dir)
    levels_left = 3
    while levels_left:
        folder_name = candidate.rpartition(os.sep)[2]
        if compare_strings(artist_name, folder_name, False):
            # Do not stop at the first hit: when an album carries the same
            # name as its artist, the real artist folder is one level higher.
            best_match = candidate
        candidate = os.path.dirname(candidate)
        levels_left -= 1
    return best_match
116
117
def tokenize(input_str: str, delimiters: str) -> list[str]:
    """Split an album name or path into its non-empty tokens.

    ``delimiters`` is a regular expression; every match of it in ``input_str``
    acts as a separator. Empty tokens (leading, trailing or between adjacent
    separators) are dropped.

    The text is sliced directly on the match positions instead of being
    routed through a placeholder string, so characters of the input can
    never be mistaken for a separator (e.g. a literal ``^`` in an album name).
    """
    tokens: list[str] = []
    cursor = 0  # index just past the previous separator
    for separator in re.finditer(delimiters, input_str):
        if separator.start() > cursor:
            tokens.append(input_str[cursor : separator.start()])
        cursor = separator.end()
    if cursor < len(input_str):
        # text after the final separator
        tokens.append(input_str[cursor:])
    return tokens
122
123
def _dir_contains_album_name(id3_album_name: str, directory_name: str) -> bool:
    """Tell whether a directory name appears to contain an album name.

    Both strings are tokenized and compared, in two passes. The first pass
    treats the literal dash as a separator (next to underscore and space);
    the second pass does not, which covers album names where the dash is
    part of the name itself, such as 'Aphex Twin - Selected Ambient Works
    85-92'.

    A pass succeeds when either
    - every album token also occurs among the directory tokens, or
    - the directory has at least as many tokens as the album and its first
      tokens, joined together, match the joined album tokens according to
      compare_strings (third argument False).

    Args:
        id3_album_name (str): The album name to search for.
        directory_name (str): The directory name to search in.

    Returns:
        bool: True if one of the passes succeeds, False otherwise.
    """
    for pattern in ("[-_ ]", "[_ ]"):
        album_tokens = tokenize(id3_album_name, pattern)
        dir_tokens = tokenize(directory_name, pattern)

        # All album tokens are present in the directory name (in any order);
        # this also covers a directory that additionally holds the artist name.
        if set(album_tokens).issubset(dir_tokens):
            return True

        if len(album_tokens) > len(dir_tokens):
            # the directory is too short to start with the album name
            continue

        joined_album = "".join(album_tokens)
        joined_dir_prefix = "".join(dir_tokens[: len(album_tokens)])
        if compare_strings(joined_album, joined_dir_prefix, False):
            return True
    return False
158
159
def get_album_dir(track_dir: str, album_name: str) -> str | None:
    """Locate the album directory for a track.

    ``track_dir`` itself and then its parent are examined (the second level
    accounts for a disc subfolder). The first level whose directory name is
    judged to belong to ``album_name`` is returned; None if neither does.
    String comparisons are delegated to compare_strings (third argument False).
    """
    candidate = track_dir
    for _level in range(2):
        folder = candidate.rpartition(os.sep)[2]
        # last part of an "ArtistName - AlbumName" styled directory name
        folder_tail = folder.split(" - ")[-1]
        # album name with a trailing "(Version)" part cut off
        album_base = album_name.split("(")[0]

        if (
            # literal match
            compare_strings(album_name, folder, False)
            # ArtistName - AlbumName format in the directory name
            or compare_strings(album_name, folder_tail, False)
            # ArtistName - AlbumName (Version) format in the directory name
            or compare_strings(album_name, folder_tail.split("(")[0], False)
        ):
            return candidate

        if album_name and any(sep in folder for sep in ("-", " ", "_")):
            # Token based matching with _dir_contains_album_name(): first on
            # the part after "Artist - " (if the album name has one), then
            # on the complete album name.
            _artist_part, _dash, title_part = album_name.partition(" - ")
            if title_part and _dir_contains_album_name(title_part, folder):
                return candidate
            if _dir_contains_album_name(album_name, folder):
                return candidate

        if (
            # AlbumName (Version) format in the album name
            compare_strings(album_base, folder, False)
            # same, against an ArtistName - AlbumName directory name
            or compare_strings(album_base, folder_tail, False)
            # directory name merely contains the album name; the length
            # check limits false positives
            or (len(album_name) > 8 and album_name in folder)
        ):
            return candidate

        candidate = os.path.dirname(candidate)
    return None
201
202
def get_relative_path(base_path: str, path: str) -> str:
    """Return ``path`` relative to ``base_path``.

    If ``path`` starts with ``base_path`` that prefix is removed once, from
    the front only; a later repetition of the base text inside the path
    (e.g. a folder ``music`` inside the base ``/music``) is left untouched.
    Afterwards one leading ``/`` and then one leading backslash are stripped.
    A path that does not start with ``base_path`` only gets the separator
    stripping. An empty ``base_path`` is accepted.

    NOTE: the prefix test is purely textual (``str.startswith``); it does not
    check that the prefix ends on a path separator.
    """
    if path.startswith(base_path):
        # Slice instead of str.split(): split cuts at every occurrence of the
        # base text and raises ValueError for an empty base.
        path = path[len(base_path) :]
    for sep in ("/", "\\"):
        if path.startswith(sep):
            path = path[1:]
    return path
211
212
def get_absolute_path(base_path: str, path: str) -> str:
    """Return ``path`` as a path below ``base_path``.

    A path that already starts with ``base_path`` is returned unchanged;
    anything else is joined onto ``base_path`` with os.path.join.
    """
    already_prefixed = path.startswith(base_path)
    return path if already_prefixed else os.path.join(base_path, path)
218
219
def sorted_scandir(base_path: str, sub_path: str, sort: bool = False) -> list[FileSystemItem]:
    """
    Implement os.scandir that returns (optionally) sorted entries.

    Not async friendly!

    ``sub_path`` may be given relative to ``base_path`` or already prefixed
    with it. Returned are the regular files and directories directly inside
    that folder, skipping symlinks, names listed in IGNORE_DIRS, names
    starting with a dot and entries that cannot be stat'd. With ``sort`` set,
    the items are ordered naturally by name without extension ("2" before
    "10").

    Errors raised by os.scandir itself (e.g. a missing directory) propagate.
    """

    def nat_key(name: str) -> tuple[int | str, ...]:
        """Sort key for natural sorting."""
        # re.split with a capturing group alternates text and digit runs, so
        # the digit runs sit exactly at the odd indexes. Selecting by position
        # (rather than str.isdigit) keeps int() away from characters such as
        # "²" that isdigit() accepts but int() rejects.
        parts = re.split(r"(\d+)", name)
        return tuple(int(part) if idx % 2 else part for idx, part in enumerate(parts))

    # Prefix test (not a substring test): a relative path like "Rock/music"
    # below the base "/music" must still be joined onto the base.
    sub_path = get_absolute_path(base_path, sub_path)
    items: list[FileSystemItem] = []
    # the context manager closes the directory handle even if an error
    # interrupts the iteration
    with os.scandir(sub_path) as entries:
        for entry in entries:
            # filter out invalid dirs and hidden files
            if not (entry.is_dir(follow_symlinks=False) or entry.is_file(follow_symlinks=False)):
                continue
            if entry.name in IGNORE_DIRS or entry.name.startswith("."):
                continue
            try:
                items.append(FileSystemItem.from_dir_entry(entry, base_path))
            except OSError:
                # Skip files that cannot be stat'd (e.g., invalid encoding on SMB mounts)
                # This typically happens with emoji or special unicode characters
                continue

    if sort:
        return sorted(
            items,
            # sort by (natural) name
            key=lambda x: nat_key(x.name),
        )
    return items
254