/
/
/
1"""Several helper/utils to compare objects."""
2
3from __future__ import annotations
4
5import re
6from difflib import SequenceMatcher
7
8import unidecode
9from music_assistant_models.enums import ExternalID, MediaType
10from music_assistant_models.media_items import (
11 Album,
12 Artist,
13 Audiobook,
14 ItemMapping,
15 MediaItem,
16 MediaItemMetadata,
17 MediaItemType,
18 Playlist,
19 Podcast,
20 Radio,
21 Track,
22)
23
# Version strings that carry no distinguishing information when matching items.
# compare_version() treats a (lowercased) version listed here as equivalent to
# "no version" when the other side has none, and also adds these entries to the
# list of words it filters out before comparing multi-word versions.
IGNORE_VERSIONS = (
    "explicit",  # explicit is matched separately
    "music from and inspired by the motion picture",
    "original soundtrack",
    "hi-res",  # quality is handled separately
)
30
31
32def compare_media_item(
33 base_item: MediaItemType | ItemMapping,
34 compare_item: MediaItemType | ItemMapping,
35 strict: bool = True,
36) -> bool | None:
37 """Compare two media items and return True if they match."""
38 if base_item.media_type == MediaType.ARTIST and compare_item.media_type == MediaType.ARTIST:
39 assert isinstance(base_item, Artist | ItemMapping) # for type checking
40 assert isinstance(compare_item, Artist | ItemMapping) # for type checking
41 return compare_artist(base_item, compare_item, strict)
42 if base_item.media_type == MediaType.ALBUM and compare_item.media_type == MediaType.ALBUM:
43 assert isinstance(base_item, Album | ItemMapping) # for type checking
44 assert isinstance(compare_item, Album | ItemMapping) # for type checking
45 return compare_album(base_item, compare_item, strict)
46 if base_item.media_type == MediaType.TRACK and compare_item.media_type == MediaType.TRACK:
47 assert isinstance(base_item, Track) # for type checking
48 assert isinstance(compare_item, Track) # for type checking
49 return compare_track(base_item, compare_item, strict)
50 if base_item.media_type == MediaType.PLAYLIST and compare_item.media_type == MediaType.PLAYLIST:
51 assert isinstance(base_item, Playlist | ItemMapping) # for type checking
52 assert isinstance(compare_item, Playlist | ItemMapping) # for type checking
53 return compare_playlist(base_item, compare_item, strict)
54 if base_item.media_type == MediaType.RADIO and compare_item.media_type == MediaType.RADIO:
55 assert isinstance(base_item, Radio | ItemMapping) # for type checking
56 assert isinstance(compare_item, Radio | ItemMapping) # for type checking
57 return compare_radio(base_item, compare_item, strict)
58 if (
59 base_item.media_type == MediaType.AUDIOBOOK
60 and compare_item.media_type == MediaType.AUDIOBOOK
61 ):
62 assert isinstance(base_item, Audiobook | ItemMapping) # for type checking
63 assert isinstance(compare_item, Audiobook | ItemMapping) # for type checking
64 return compare_audiobook(base_item, compare_item, strict)
65 if base_item.media_type == MediaType.PODCAST and compare_item.media_type == MediaType.PODCAST:
66 assert isinstance(base_item, Podcast | ItemMapping) # for type checking
67 assert isinstance(compare_item, Podcast | ItemMapping) # for type checking
68 return compare_podcast(base_item, compare_item, strict)
69 assert isinstance(base_item, ItemMapping) # for type checking
70 assert isinstance(compare_item, ItemMapping) # for type checking
71 return compare_item_mapping(base_item, compare_item, strict)
72
73
def compare_artist(
    base_item: Artist | ItemMapping,
    compare_item: Artist | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """Compare two artist items and return True if they match.

    Checks are tried in order: item ids, then the Discogs / MusicBrainz artist /
    TheAudioDB external ids (the first one that yields a verdict wins), and
    finally the artist name.

    :param strict: Forwarded to compare_strings for the name comparison.
    """
    # an exact item_id match settles it right away
    if compare_item_ids(base_item, compare_item):
        return True

    base_ext_ids = base_item.external_ids
    other_ext_ids = compare_item.external_ids
    # the first external id type that gives a definite (mis)match decides
    for id_type in (ExternalID.DISCOGS, ExternalID.MB_ARTIST, ExternalID.TADB):
        verdict = compare_external_ids(base_ext_ids, other_ext_ids, id_type)
        if verdict is not None:
            return verdict

    # nothing conclusive so far: fall back to the name
    return compare_strings(base_item.name, compare_item.name, strict=strict)
92
93
def compare_album(
    base_item: Album | ItemMapping,
    compare_item: Album | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """Compare two album items and return True if they match.

    Order of checks: item ids, external ids (first definite verdict wins),
    version, name (always compared strictly), and then - only when both items
    are full Album objects - year, explicitness and album artists.

    :param strict: When False, a version + name match is enough as soon as
        either side is an ItemMapping, and the artist lists only need a single
        matching artist. When True both items must be Album instances.
    """
    # an exact item_id match settles it right away
    if compare_item_ids(base_item, compare_item):
        return True

    base_ext_ids = base_item.external_ids
    other_ext_ids = compare_item.external_ids
    # the first external id type that gives a definite (mis)match decides
    for id_type in (
        ExternalID.DISCOGS,
        ExternalID.MB_ALBUM,
        ExternalID.TADB,
        ExternalID.ASIN,
        ExternalID.BARCODE,
    ):
        if (verdict := compare_external_ids(base_ext_ids, other_ext_ids, id_type)) is not None:
            return verdict

    # version and (strict) name must both agree
    versions_match = compare_version(base_item.version, compare_item.version)
    if not versions_match or not compare_strings(base_item.name, compare_item.name, strict=True):
        return False

    # an ItemMapping has no further details to compare in non strict mode
    involves_mapping = any(isinstance(item, ItemMapping) for item in (base_item, compare_item))
    if involves_mapping and not strict:
        return True

    # for strict matching we REQUIRE both items to be a real album object
    assert isinstance(base_item, Album)
    assert isinstance(compare_item, Album)

    # years only conflict when both are known and differ
    base_year, other_year = base_item.year, compare_item.year
    if base_year and other_year and base_year != other_year:
        return False
    # explicitness only conflicts when both sides have it set
    if compare_explicit(base_item.metadata, compare_item.metadata) is False:
        return False
    # album artist(s) have the final say
    return compare_artists(base_item.artists, compare_item.artists, not strict)
137
138
def compare_track(
    base_item: Track,
    compare_item: Track,
    strict: bool = True,
    track_albums: list[Album] | None = None,
) -> bool:
    """Compare two track items and return True if they match.

    Checks run in this order:
      1. item ids
      2. primary external ids (MusicBrainz recording/track, AcoustID) - the
         first definite verdict is returned
      3. secondary external ids, accepted only with a duration difference <= 8
      4. name (strict), artists, and in strict mode version and explicitness
      5. several album based confirmations (album + disc/track number,
         album + duration, extra ``track_albums``, albumless tracks)
      6. in non strict mode: a duration difference of at most 2

    Side effect: when a track has no explicit flag but its album is a full
    Album object, the album's explicit flag is copied onto the track metadata.

    :param strict: Require version, explicitness and an album based
        confirmation; also require all artists to match instead of any.
    :param track_albums: Additional albums of the base track to try against
        the album of ``compare_item``.
    """
    # an exact item_id match settles it right away
    if compare_item_ids(base_item, compare_item):
        return True

    base_ext_ids = base_item.external_ids
    other_ext_ids = compare_item.external_ids

    # primary/unique external ids: the first definite (mis)match decides
    for id_type in (
        ExternalID.MB_RECORDING,
        ExternalID.MB_TRACK,
        ExternalID.ACOUSTID,
    ):
        verdict = compare_external_ids(base_ext_ids, other_ext_ids, id_type)
        if verdict is not None:
            return verdict

    # secondary external ids (like ISRC) only give a 'soft-match',
    # so the duration is double checked before accepting it
    for id_type in (
        ExternalID.DISCOGS,
        ExternalID.TADB,
        ExternalID.ISRC,
        ExternalID.ASIN,
    ):
        soft_match = compare_external_ids(base_ext_ids, other_ext_ids, id_type) is True
        if soft_match and abs(base_item.duration - compare_item.duration) <= 8:
            return True

    # the name must match exactly
    if not compare_strings(base_item.name, compare_item.name, strict=True):
        return False
    # track artist(s) must match
    if not compare_artists(base_item.artists, compare_item.artists, any_match=not strict):
        return False
    # track version must match (strict mode only)
    if strict and not compare_version(base_item.version, compare_item.version):
        return False

    # inherit a missing explicit flag from the album (when it is a full Album)
    for track in (base_item, compare_item):
        if track.metadata.explicit is None and isinstance(track.album, Album):
            track.metadata.explicit = track.album.metadata.explicit
    # both tracks must agree on explicitness (strict mode only)
    if strict and compare_explicit(base_item.metadata, compare_item.metadata) is False:
        return False

    base_album = base_item.album
    other_album = compare_item.album

    # exact albumtrack match = 100% match
    if base_album and other_album and compare_album(base_album, other_album, False):
        positions_known = (
            base_item.disc_number
            and compare_item.disc_number
            and base_item.track_number
            and compare_item.track_number
        )
        if (
            positions_known
            and base_item.disc_number == compare_item.disc_number
            and base_item.track_number == compare_item.track_number
        ):
            return True

    # fallback: exact album match and (near-exact) track duration match
    if (
        base_album is not None
        and other_album is not None
        and (base_item.track_number == 0 or compare_item.track_number == 0)
        and compare_album(base_album, other_album, False)
        and abs(base_item.duration - compare_item.duration) <= 3
    ):
        return True

    # fallback: additional compare albums provided for base track
    if (
        other_album is not None
        and track_albums
        and abs(base_item.duration - compare_item.duration) <= 3
        and any(compare_album(candidate, other_album, False) for candidate in track_albums)
    ):
        return True

    # fallback edge case: albumless track with same duration
    albumless = base_album is None and other_album is None
    unnumbered = (
        base_item.disc_number == 0
        and compare_item.disc_number == 0
        and base_item.track_number == 0
        and compare_item.track_number == 0
    )
    if albumless and unnumbered and base_item.duration == compare_item.duration:
        return True

    if strict:
        # in strict mode, we require an exact album match so return False here
        return False

    # Last resort (non strict mode only): accept a (near) exact duration.
    # At this stage title, artist etc. are already known to match.
    return abs(base_item.duration - compare_item.duration) <= 2
248
249
def compare_playlist(
    base_item: Playlist | ItemMapping,
    compare_item: Playlist | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """Compare two Playlist items and return True if they match.

    The names must match, the owners must match when both items are full
    Playlist objects, and finally the item ids must match.

    :param strict: Forwarded to compare_strings for the name comparison.
    """
    # the name has to match first
    names_match = compare_strings(base_item.name, compare_item.name, strict=strict)
    if not names_match:
        return False
    # owners can only be checked when neither side is an ItemMapping
    if (
        isinstance(base_item, Playlist)
        and isinstance(compare_item, Playlist)
        and not compare_strings(base_item.owner, compare_item.owner)
    ):
        return False
    # a playlist is always unique - so the item id(s) have the final say
    return compare_item_ids(base_item, compare_item)
265
266
def compare_radio(
    base_item: Radio | ItemMapping,
    compare_item: Radio | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """Compare two Radio items and return True if they match.

    An item id match is sufficient; otherwise both the version and the name
    have to match.

    :param strict: Forwarded to compare_strings for the name comparison.
    """
    # an exact item_id match settles it right away
    if compare_item_ids(base_item, compare_item):
        return True
    # otherwise the version must agree before the name is even looked at
    versions_match = compare_version(base_item.version, compare_item.version)
    if versions_match:
        return compare_strings(base_item.name, compare_item.name, strict=strict)
    return False
281
282
def compare_audiobook(
    base_item: Audiobook | ItemMapping,
    compare_item: Audiobook | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """Compare two Audiobook items and return True if they match.

    Order of checks: item ids, ASIN / barcode external ids (first definite
    verdict wins), version, name (always strict), and then - only when both
    items are full Audiobook objects - publisher and authors.

    :param strict: When False, a version + name match is enough as soon as
        either side is an ItemMapping. When True both items must be Audiobook
        instances.
    """
    # an exact item_id match settles it right away
    if compare_item_ids(base_item, compare_item):
        return True

    base_ext_ids = base_item.external_ids
    other_ext_ids = compare_item.external_ids
    # the first external id type that gives a definite (mis)match decides
    for id_type in (ExternalID.ASIN, ExternalID.BARCODE):
        verdict = compare_external_ids(base_ext_ids, other_ext_ids, id_type)
        if verdict is not None:
            return verdict

    # version and (strict) name must both agree
    versions_match = compare_version(base_item.version, compare_item.version)
    if not versions_match or not compare_strings(base_item.name, compare_item.name, strict=True):
        return False

    # an ItemMapping has no further details to compare in non strict mode
    involves_mapping = any(isinstance(item, ItemMapping) for item in (base_item, compare_item))
    if involves_mapping and not strict:
        return True

    # for strict matching we REQUIRE both items to be a real Audiobook object
    assert isinstance(base_item, Audiobook)
    assert isinstance(compare_item, Audiobook)

    # publishers only conflict when both are known and differ
    if base_item.publisher and compare_item.publisher:
        if not compare_strings(base_item.publisher, compare_item.publisher, strict=True):
            return False

    # at least one author has to be shared (compared on normalized names)
    if not base_item.authors:
        return False
    other_authors = {create_safe_string(name) for name in compare_item.authors}
    return any(create_safe_string(author) in other_authors for author in base_item.authors)
328
329
def compare_podcast(
    base_item: Podcast | ItemMapping,
    compare_item: Podcast | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """Compare two Podcast items and return True if they match.

    Order of checks: item ids, ASIN / barcode external ids (first definite
    verdict wins), version, name (always strict), and then - only when both
    items are full Podcast objects - the publisher.

    :param strict: When False, a version + name match is enough as soon as
        either side is an ItemMapping. When True both items must be Podcast
        instances.
    """
    # an exact item_id match settles it right away
    if compare_item_ids(base_item, compare_item):
        return True

    base_ext_ids = base_item.external_ids
    other_ext_ids = compare_item.external_ids
    # the first external id type that gives a definite (mis)match decides
    for id_type in (ExternalID.ASIN, ExternalID.BARCODE):
        verdict = compare_external_ids(base_ext_ids, other_ext_ids, id_type)
        if verdict is not None:
            return verdict

    # version and (strict) name must both agree
    versions_match = compare_version(base_item.version, compare_item.version)
    if not versions_match or not compare_strings(base_item.name, compare_item.name, strict=True):
        return False

    # an ItemMapping has no further details to compare in non strict mode
    involves_mapping = any(isinstance(item, ItemMapping) for item in (base_item, compare_item))
    if involves_mapping and not strict:
        return True

    # for strict matching we REQUIRE both items to be a real Podcast object
    assert isinstance(base_item, Podcast)
    assert isinstance(compare_item, Podcast)

    # the publisher only matters when both sides have one
    if base_item.publisher and compare_item.publisher:
        return compare_strings(base_item.publisher, compare_item.publisher, strict=True)
    return True
368
369
def compare_item_mapping(
    base_item: ItemMapping,
    compare_item: ItemMapping,
    strict: bool = True,
) -> bool | None:
    """Compare two ItemMapping items and return True if they match.

    Checks are tried in order: item ids, every ExternalID type (the first one
    that yields a verdict wins), version, and finally the name.

    :param strict: Forwarded to compare_strings for the name comparison.
    """
    # an exact item_id match settles it right away
    if compare_item_ids(base_item, compare_item):
        return True

    base_ext_ids = base_item.external_ids
    other_ext_ids = compare_item.external_ids
    # ItemMapping is a minimized obj for all MediaItems, so every ExternalID
    # type is tried; the first definite (mis)match decides
    for id_type in ExternalID:
        verdict = compare_external_ids(base_ext_ids, other_ext_ids, id_type)
        if verdict is not None:
            return verdict

    # the version must agree before the name is looked at
    versions_match = compare_version(base_item.version, compare_item.version)
    if versions_match:
        return compare_strings(base_item.name, compare_item.name, strict=strict)
    return False
392
393
def compare_artists(
    base_items: list[Artist | ItemMapping],
    compare_items: list[Artist | ItemMapping],
    any_match: bool = True,
) -> bool:
    """Compare two lists of artists and return True if they match.

    An empty list on either side never matches. If the first artist of both
    lists matches, the lists are considered a match regardless of ``any_match``.

    :param any_match: When True, a single matching pair anywhere in the lists
        is enough. When False, both lists must have the same length and the
        number of matching pairs must equal that length.
    """
    if not (base_items and compare_items):
        return False
    # shortcut: the leading artist of both lists is the same
    if compare_artist(base_items[0], compare_items[0]):
        return True

    if any_match:
        # one matching pair is all that is needed
        return any(
            compare_artist(base_artist, other_artist)
            for base_artist in base_items
            for other_artist in compare_items
        )

    # count every matching pair across both lists
    match_count = sum(
        1
        for base_artist in base_items
        for other_artist in compare_items
        if compare_artist(base_artist, other_artist)
    )
    return len(base_items) == len(compare_items) == match_count
414
415
def compare_albums(
    base_items: list[Album | ItemMapping],
    compare_items: list[Album | ItemMapping],
    any_match: bool = True,
) -> bool:
    """Compare two lists of albums and return True if a match was found.

    :param any_match: When True, a single matching pair is enough. When False,
        the number of matching pairs must equal the number of base items.

    Note: when no pair matches, the result is ``len(base_items) == 0``, so an
    empty ``base_items`` list yields True.
    """
    if any_match:
        found = any(
            compare_album(base_album, other_album)
            for base_album in base_items
            for other_album in compare_items
        )
        if found:
            return True
        # no pair matched, so the match count is zero
        return len(base_items) == 0

    # count every matching pair across both lists
    match_count = sum(
        1
        for base_album in base_items
        for other_album in compare_items
        if compare_album(base_album, other_album)
    )
    return len(base_items) == match_count
430
431
def compare_item_ids(
    base_item: MediaItem | ItemMapping, compare_item: MediaItem | ItemMapping
) -> bool:
    """Compare item_id(s) of two media items.

    Returns True when the items share provider + item_id directly, when one
    item's provider mappings contain the other item's provider + item_id, or
    when both items have a provider mapping with the same provider domain and
    item_id (mappings flagged ``is_unique`` additionally require the same
    provider instance). Returns False when either item lacks a provider or an
    item_id, or when none of the above applies.
    """
    # both sides need a provider and an item_id before anything can match
    if not (base_item.provider and compare_item.provider):
        return False
    if not (base_item.item_id and compare_item.item_id):
        return False
    # direct hit: same provider and same item id
    base_key = (base_item.provider, base_item.item_id)
    if base_key == (compare_item.provider, compare_item.item_id):
        return True

    # an object without (or with a None) provider_mappings has nothing more to check
    base_has_mappings = getattr(base_item, "provider_mappings", None) is not None
    compare_has_mappings = getattr(compare_item, "provider_mappings", None) is not None

    # is the compare item listed in the base item's provider mappings?
    if base_has_mappings:
        assert isinstance(base_item, MediaItem)  # for type checking
        if any(
            mapping.provider_instance == compare_item.provider
            and mapping.item_id == compare_item.item_id
            for mapping in base_item.provider_mappings
        ):
            return True

    # is the base item listed in the compare item's provider mappings?
    if compare_has_mappings:
        assert isinstance(compare_item, MediaItem)  # for type checking
        if any(
            mapping.provider_instance == base_item.provider
            and mapping.item_id == base_item.item_id
            for mapping in compare_item.provider_mappings
        ):
            return True

    # do both items share a mapping on the same provider domain?
    if base_has_mappings and compare_has_mappings:
        assert isinstance(base_item, MediaItem)  # for type checking
        assert isinstance(compare_item, MediaItem)  # for type checking
        return any(
            left.provider_domain == right.provider_domain
            # unique mappings are only comparable within the same provider instance
            and not (
                (left.is_unique or right.is_unique)
                and left.provider_instance != right.provider_instance
            )
            and left.item_id == right.item_id
            for left in base_item.provider_mappings
            for right in compare_item.provider_mappings
        )
    return False
478
479
def compare_external_ids(
    external_ids_base: set[tuple[ExternalID, str]],
    external_ids_compare: set[tuple[ExternalID, str]],
    external_id_type: ExternalID,
) -> bool | None:
    """Compare the external ids of one type and report whether they match.

    :param external_ids_base: ``(type, value)`` pairs of the base item.
    :param external_ids_compare: ``(type, value)`` pairs of the compare item.
    :param external_id_type: The external id type to look at.
    :return: True when both sides share a value of the requested type, False
        when they share none and the type is flagged ``is_unique``, and None
        when no verdict is possible (type absent on either side, or no shared
        value for a non-unique type).
    """
    base_ids = {x[1] for x in external_ids_base if x[0] == external_id_type}
    if not base_ids:
        # the requested external id type is not present in the base set
        return None
    compare_ids = {x[1] for x in external_ids_compare if x[0] == external_id_type}
    if not compare_ids:
        # the requested external id type is not present in the compare set
        return None

    # any shared value is a match
    if not base_ids.isdisjoint(compare_ids):
        return True

    if external_id_type == ExternalID.BARCODE:
        for base_id in base_ids:
            # handle upc stored as EAN-13 barcode (UPC prefixed with a zero)
            if len(base_id) == 12 and f"0{base_id}" in compare_ids:
                return True
            # handle EAN-13 stored as UPC barcode; only an EAN-13 that starts
            # with a zero has a UPC equivalent, so never strip another digit
            if len(base_id) == 13 and base_id.startswith("0") and base_id[1:] in compare_ids:
                return True

    # a mismatch is only conclusive if the identifier is unique (e.g. musicbrainz id)
    if external_id_type.is_unique:
        return False
    return None
509
510
def create_safe_string(input_str: str, lowercase: bool = True, replace_space: bool = False) -> str:
    """Return a cleaned string that is suitable for compare actions.

    A few stylized names are first rewritten to plain letters, then the string
    is (optionally) lowercased, stripped, transliterated with unidecode and
    finally reduced to ASCII letters and digits.

    :param lowercase: Lowercase the string before cleaning it.
    :param replace_space: Also remove spaces (by default spaces are kept).
    """
    # handle some special cases: (exact spellings, text to replace, replacement)
    special_cases = (
        (("P!nk", "p!nk"), "!", "i"),
        (("Whâ", "whâ"), "â", "o"),
        (("KoЯn", "koЯn"), "Я", "r"),
        (("$hort",), "$hort", "short"),
    )
    for spellings, needle, substitute in special_cases:
        if input_str in spellings:
            input_str = input_str.replace(needle, substitute)

    if lowercase:
        input_str = input_str.lower()
    unaccented_string = unidecode.unidecode(input_str.strip())

    # everything that is not a letter or digit (or space, unless requested) goes
    unwanted = r"[^a-zA-Z0-9]" if replace_space else r"[^a-zA-Z0-9 ]"
    return re.sub(unwanted, "", unaccented_string)
526
527
def loose_compare_strings(base: str, alt: str) -> bool:
    """Compare strings and return True even on a partial match.

    This is used to display 'versions' of the same track/album, where we
    account for other spelling or some additional wording in the title.

    Very short strings (3 characters or less on either side) are compared
    strictly, a short single-word ``base`` is compared with the non strict
    compare_strings, and everything else matches when one normalized string
    is contained in the other.
    """
    # very short strings are too ambiguous for partial matching
    if len(base) <= 3 or len(alt) <= 3:
        return compare_strings(base, alt, True)
    word_count = len(base.strip().split(" "))
    if word_count == 1 and len(base) < 10:
        return compare_strings(base, alt, False)
    base_comp = create_safe_string(base)
    alt_comp = create_safe_string(alt)
    if not base_comp or not alt_comp:
        # an empty normalized string is contained in every string,
        # so it must not be used for a containment check
        return compare_strings(base, alt, True)
    # partial match in either direction: the extra wording can be on either side
    return base_comp in alt_comp or alt_comp in base_comp
542
543
def compare_strings(str1: str, str2: str, strict: bool = True) -> bool:
    """Compare strings and return True if we have an (almost) perfect match.

    :param strict: When True only a case-insensitive exact match counts. When
        False, '&' versus 'and' is evened out, the normalized strings are
        compared, and as a last resort a difflib similarity ratio is used.
    :return: False when either string is empty.
    """
    if not (str1 and str2):
        return False
    lowered_1, lowered_2 = str1.lower(), str2.lower()
    if strict:
        return lowered_1 == lowered_2

    # a length difference of more than 4 characters can never be a match
    length_gap = abs(len(str1) - len(str2))
    if length_gap > 4:
        return False

    # handle '&' vs 'And' by rewriting the second string to the first one's form
    if " & " in lowered_1 and " and " in lowered_2:
        str2 = lowered_2.replace(" and ", " & ")
    elif " and " in lowered_1 and " & " in str2:
        str2 = lowered_2.replace(" & ", " and ")

    if create_safe_string(str1) == create_safe_string(str2):
        return True

    # last resort: use difflib, with a higher bar for longer strings
    combined_length = len(str1) + len(str2)
    required_accuracy = 0.9 if combined_length > 18 else 0.8
    similarity = SequenceMatcher(a=lowered_1, b=lowered_2).ratio()
    return similarity > required_accuracy
565
566
def compare_version(base_version: str, compare_version: str) -> bool:
    """Compare two version strings and return True if they are equivalent.

    Two empty versions match. If only one side has a version, it matches only
    when its lowercased value is listed in IGNORE_VERSIONS. Two single-word
    versions are compared with the non strict compare_strings. Otherwise the
    versions are compared word by word, ignoring word order and filler words.
    """
    if not base_version or not compare_version:
        # at most one side carries a version
        lone_version = base_version or compare_version
        if not lone_version:
            return True
        return lone_version.lower() in IGNORE_VERSIONS

    if " " not in base_version and " " not in compare_version:
        return compare_strings(base_version, compare_version, False)

    # words we can ignore (such as 'version')
    # NOTE: the versions are split on single spaces below, so a multi-word
    # entry of IGNORE_VERSIONS can never equal one of the resulting words.
    ignorable = {
        *IGNORE_VERSIONS,
        "version",
        "edition",
        "variant",
        "versie",
        "versione",
    }

    def _meaningful_words(version: str) -> list[str]:
        # sorted, as sometimes the version string is in the wrong order
        return sorted(word for word in version.lower().split(" ") if word not in ignorable)

    return _meaningful_words(base_version) == _meaningful_words(compare_version)
599
600
def compare_explicit(base: MediaItemMetadata, compare: MediaItemMetadata) -> bool | None:
    """Compare the explicit flag of two metadata objects.

    :return: None when either side has no explicit info set, otherwise
        whether both flags are equal.
    """
    # explicitness info is not always present in metadata,
    # so there is only a verdict when both sides have it set
    if base.explicit is None or compare.explicit is None:
        return None
    return base.explicit == compare.explicit
608