/
/
/
"""Several helpers/utilities to compare media items."""
2
3from __future__ import annotations
4
5import re
6from difflib import SequenceMatcher
7
8import unidecode
9from music_assistant_models.enums import ExternalID, MediaType
10from music_assistant_models.media_items import (
11 Album,
12 Artist,
13 Audiobook,
14 ItemMapping,
15 MediaItem,
16 MediaItemMetadata,
17 MediaItemType,
18 Playlist,
19 Podcast,
20 Radio,
21 Track,
22)
23
# Version strings that are not treated as a distinguishing version.
# compare_version() accepts an item whose (lowercased) version equals one of
# these as a match for an item without any version, and also adds these
# entries to its list of words to ignore in multi-word version strings.
IGNORE_VERSIONS = (
    "explicit",  # explicit is matched separately
    "music from and inspired by the motion picture",
    "original soundtrack",
    "hi-res",  # quality is handled separately
)
30
31
32def compare_media_item(
33 base_item: MediaItemType | ItemMapping,
34 compare_item: MediaItemType | ItemMapping,
35 strict: bool = True,
36) -> bool | None:
37 """Compare two media items and return True if they match."""
38 if base_item.media_type == MediaType.ARTIST and compare_item.media_type == MediaType.ARTIST:
39 assert isinstance(base_item, Artist | ItemMapping) # for type checking
40 assert isinstance(compare_item, Artist | ItemMapping) # for type checking
41 return compare_artist(base_item, compare_item, strict)
42 if base_item.media_type == MediaType.ALBUM and compare_item.media_type == MediaType.ALBUM:
43 assert isinstance(base_item, Album | ItemMapping) # for type checking
44 assert isinstance(compare_item, Album | ItemMapping) # for type checking
45 return compare_album(base_item, compare_item, strict)
46 if base_item.media_type == MediaType.TRACK and compare_item.media_type == MediaType.TRACK:
47 assert isinstance(base_item, Track) # for type checking
48 assert isinstance(compare_item, Track) # for type checking
49 return compare_track(base_item, compare_item, strict)
50 if base_item.media_type == MediaType.PLAYLIST and compare_item.media_type == MediaType.PLAYLIST:
51 assert isinstance(base_item, Playlist | ItemMapping) # for type checking
52 assert isinstance(compare_item, Playlist | ItemMapping) # for type checking
53 return compare_playlist(base_item, compare_item, strict)
54 if base_item.media_type == MediaType.RADIO and compare_item.media_type == MediaType.RADIO:
55 assert isinstance(base_item, Radio | ItemMapping) # for type checking
56 assert isinstance(compare_item, Radio | ItemMapping) # for type checking
57 return compare_radio(base_item, compare_item, strict)
58 if (
59 base_item.media_type == MediaType.AUDIOBOOK
60 and compare_item.media_type == MediaType.AUDIOBOOK
61 ):
62 assert isinstance(base_item, Audiobook | ItemMapping) # for type checking
63 assert isinstance(compare_item, Audiobook | ItemMapping) # for type checking
64 return compare_audiobook(base_item, compare_item, strict)
65 if base_item.media_type == MediaType.PODCAST and compare_item.media_type == MediaType.PODCAST:
66 assert isinstance(base_item, Podcast | ItemMapping) # for type checking
67 assert isinstance(compare_item, Podcast | ItemMapping) # for type checking
68 return compare_podcast(base_item, compare_item, strict)
69 assert isinstance(base_item, ItemMapping) # for type checking
70 assert isinstance(compare_item, ItemMapping) # for type checking
71 return compare_item_mapping(base_item, compare_item, strict)
72
73
def compare_artist(
    base_item: Artist | ItemMapping,
    compare_item: Artist | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """
    Compare two artist items and return True if they match.

    Checks are tried in order: item ids, then the Discogs, MusicBrainz artist
    and TADB external ids (the first one that yields a verdict decides), and
    finally the artist names.

    :param base_item: The reference artist.
    :param compare_item: The artist compared against the reference.
    :param strict: Forwarded to the name comparison.
    """
    if compare_item_ids(base_item, compare_item):
        # identical item id: no further checks needed
        return True
    artist_id_types = (ExternalID.DISCOGS, ExternalID.MB_ARTIST, ExternalID.TADB)
    for id_type in artist_id_types:
        verdict = compare_external_ids(base_item.external_ids, compare_item.external_ids, id_type)
        if verdict is None:
            # this id type gave no answer, try the next one
            continue
        return verdict
    # no id was conclusive: fall back to the name
    return compare_strings(base_item.name, compare_item.name, strict=strict)
92
93
def compare_album(
    base_item: Album | ItemMapping,
    compare_item: Album | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """
    Compare two album items and return True if they match.

    Item ids and external ids are checked first and decide on their own when
    conclusive. Otherwise version and name must match; for two full Album
    objects the year, explicitness and album artists are compared as well.

    :param base_item: The reference album.
    :param compare_item: The album compared against the reference.
    :param strict: When False, an ItemMapping on either side is accepted after
        the version/name check, and any single matching artist is sufficient.
    """
    if compare_item_ids(base_item, compare_item):
        return True

    album_id_types = (
        ExternalID.DISCOGS,
        ExternalID.MB_ALBUM,
        ExternalID.TADB,
        ExternalID.ASIN,
        ExternalID.BARCODE,
    )
    for id_type in album_id_types:
        verdict = compare_external_ids(base_item.external_ids, compare_item.external_ids, id_type)
        if verdict is not None:
            # first conclusive external id decides (match or mismatch)
            return verdict

    # version first, then the (always exact) name
    if not compare_version(base_item.version, compare_item.version):
        return False
    if not compare_strings(base_item.name, compare_item.name, strict=True):
        return False

    involves_mapping = isinstance(base_item, ItemMapping) or isinstance(compare_item, ItemMapping)
    if involves_mapping and not strict:
        return True
    # from here on both items must be real album objects
    assert isinstance(base_item, Album)
    assert isinstance(compare_item, Album)

    # a year only disqualifies when both sides have one and they differ
    if base_item.year and compare_item.year and base_item.year != compare_item.year:
        return False
    # explicitness only disqualifies on a definite mismatch (None = unknown)
    if compare_explicit(base_item.metadata, compare_item.metadata) is False:
        return False
    # finally the album artist(s)
    return compare_artists(base_item.artists, compare_item.artists, not strict)
137
138
def compare_track(
    base_item: Track,
    compare_item: Track,
    strict: bool = True,
    track_albums: list[Album] | None = None,
) -> bool:
    """
    Compare two track items and return True if they match.

    The checks run in a fixed order and the first conclusive one decides:
    item ids, primary external ids, secondary external ids (confirmed by
    duration), then name, artists, version and explicitness, followed by a
    series of album/duration based fallbacks.

    Side effect: when a track has no explicit flag of its own and its album is
    a full Album object, the album's explicit flag is copied onto the track's
    metadata (on the passed-in objects).

    :param base_item: The reference track.
    :param compare_item: The track compared against the reference.
    :param strict: When True, version and explicitness must match and an album
        based match is required; when False, a close duration is accepted as
        last resort.
    :param track_albums: Optional additional albums to try against the
        album of compare_item.
    """
    # return early on exact item_id match
    if compare_item_ids(base_item, compare_item):
        return True
    # return early on (un)matched primary/unique external id
    for ext_id in (
        ExternalID.MB_RECORDING,
        ExternalID.MB_TRACK,
        ExternalID.ACOUSTID,
    ):
        external_id_match = compare_external_ids(
            base_item.external_ids, compare_item.external_ids, ext_id
        )
        # None means "no verdict for this id type"; True/False is final
        if external_id_match is not None:
            return external_id_match
    # check secondary external id matches
    for ext_id in (
        ExternalID.DISCOGS,
        ExternalID.TADB,
        ExternalID.ISRC,
        ExternalID.ASIN,
    ):
        external_id_match = compare_external_ids(
            base_item.external_ids, compare_item.external_ids, ext_id
        )
        if external_id_match is True:
            # we got a 'soft-match' on a secondary external id (like ISRC)
            # but we do a double check on duration
            # NOTE(review): duration is presumably in seconds - confirm
            if abs(base_item.duration - compare_item.duration) <= 8:
                return True

    # compare name (always an exact, case-insensitive comparison)
    if not compare_strings(base_item.name, compare_item.name, strict=True):
        return False
    # track artist(s) must match (any single artist suffices in non-strict mode)
    if not compare_artists(base_item.artists, compare_item.artists, any_match=not strict):
        return False
    # track version must match (only enforced in strict mode)
    if strict and not compare_version(base_item.version, compare_item.version):
        return False
    # check if both tracks are (not) explicit
    # a missing explicit flag is first filled in from the album (mutates the item)
    if base_item.metadata.explicit is None and isinstance(base_item.album, Album):
        base_item.metadata.explicit = base_item.album.metadata.explicit
    if compare_item.metadata.explicit is None and isinstance(compare_item.album, Album):
        compare_item.metadata.explicit = compare_item.album.metadata.explicit
    if strict and compare_explicit(base_item.metadata, compare_item.metadata) is False:
        return False

    # exact albumtrack match = 100% match
    # (albums are compared non-strict; disc/track numbers must be set and equal)
    if (
        base_item.album
        and compare_item.album
        and compare_album(base_item.album, compare_item.album, False)
        and base_item.disc_number
        and compare_item.disc_number
        and base_item.track_number
        and compare_item.track_number
        and base_item.disc_number == compare_item.disc_number
        and base_item.track_number == compare_item.track_number
    ):
        return True

    # fallback: exact album match and (near-exact) track duration match
    # only used when at least one side has track number 0
    if (
        base_item.album is not None
        and compare_item.album is not None
        and (base_item.track_number == 0 or compare_item.track_number == 0)
        and compare_album(base_item.album, compare_item.album, False)
        and abs(base_item.duration - compare_item.duration) <= 3
    ):
        return True

    # fallback: additional compare albums provided for base track
    if (
        compare_item.album is not None
        and track_albums
        and abs(base_item.duration - compare_item.duration) <= 3
    ):
        for track_album in track_albums:
            if compare_album(track_album, compare_item.album, False):
                return True

    # fallback edge case: albumless track with same duration
    if (
        base_item.album is None
        and compare_item.album is None
        and base_item.disc_number == 0
        and compare_item.disc_number == 0
        and base_item.track_number == 0
        and compare_item.track_number == 0
        and base_item.duration == compare_item.duration
    ):
        return True

    if strict:
        # in strict mode, we require an exact album match so return False here
        return False

    # Accept last resort (in non strict mode): (near) exact duration,
    # otherwise fail all other cases.
    # Note that at this stage, all other info already matches,
    # such as title, artist etc.
    return abs(base_item.duration - compare_item.duration) <= 2
248
249
def compare_playlist(
    base_item: Playlist | ItemMapping,
    compare_item: Playlist | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """
    Compare two Playlist items and return True if they match.

    The names must match, the owners must match when both items are full
    Playlist objects, and finally the item ids must match.

    :param base_item: The reference playlist.
    :param compare_item: The playlist compared against the reference.
    :param strict: Forwarded to the name comparison.
    """
    names_match = compare_strings(base_item.name, compare_item.name, strict=strict)
    if not names_match:
        return False
    # the owner is only available (and only checked) on real Playlist objects
    both_playlists = isinstance(base_item, Playlist) and isinstance(compare_item, Playlist)
    if both_playlists and not compare_strings(base_item.owner, compare_item.owner):
        return False
    # a playlist is always unique, so the item id(s) have the final say
    return compare_item_ids(base_item, compare_item)
265
266
def compare_radio(
    base_item: Radio | ItemMapping,
    compare_item: Radio | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """
    Compare two Radio items and return True if they match.

    A shared item id is an immediate match; otherwise the versions must match
    and the names are compared.

    :param base_item: The reference radio item.
    :param compare_item: The radio item compared against the reference.
    :param strict: Forwarded to the name comparison.
    """
    if compare_item_ids(base_item, compare_item):
        return True
    versions_match = compare_version(base_item.version, compare_item.version)
    if versions_match:
        # the name decides once the versions agree
        return compare_strings(base_item.name, compare_item.name, strict=strict)
    return False
281
282
def compare_audiobook(
    base_item: Audiobook | ItemMapping,
    compare_item: Audiobook | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """
    Compare two Audiobook items and return True if they match.

    Item ids and the ASIN/barcode external ids are checked first and decide on
    their own when conclusive. Otherwise version and name must match; for two
    full Audiobook objects the publisher, narrators and authors are compared.

    :param base_item: The reference audiobook.
    :param compare_item: The audiobook compared against the reference.
    :param strict: When False, an ItemMapping on either side is accepted after
        the version/name check.
    """
    # return early on exact item_id match
    if compare_item_ids(base_item, compare_item):
        return True

    # return early on (un)matched external id
    for ext_id in (
        ExternalID.ASIN,
        ExternalID.BARCODE,
    ):
        external_id_match = compare_external_ids(
            base_item.external_ids, compare_item.external_ids, ext_id
        )
        if external_id_match is not None:
            return external_id_match

    # compare version
    if not compare_version(base_item.version, compare_item.version):
        return False
    # compare name
    if not compare_strings(base_item.name, compare_item.name, strict=True):
        return False
    if not strict and (isinstance(base_item, ItemMapping) or isinstance(compare_item, ItemMapping)):
        return True
    # for strict matching we REQUIRE both items to be a real Audiobook object
    assert isinstance(base_item, Audiobook)
    assert isinstance(compare_item, Audiobook)
    # compare publisher (only when both sides have one)
    if (
        base_item.publisher
        and compare_item.publisher
        and not compare_strings(base_item.publisher, compare_item.publisher, strict=True)
    ):
        return False
    # compare narrator(s) - different narrators indicate different recordings
    # and must not be merged (only checked when both sides list narrators)
    if base_item.narrators and compare_item.narrators:
        base_narrators = {create_safe_string(n) for n in base_item.narrators}
        compare_narrators = {create_safe_string(n) for n in compare_item.narrators}
        if base_narrators.isdisjoint(compare_narrators):
            return False
    # compare author(s): at least one (normalised) author must be shared.
    # The compare side is normalised once instead of once per base author.
    compare_authors = {create_safe_string(x) for x in compare_item.authors}
    return any(create_safe_string(author) in compare_authors for author in base_item.authors)
334
335
def compare_podcast(
    base_item: Podcast | ItemMapping,
    compare_item: Podcast | ItemMapping,
    strict: bool = True,
) -> bool | None:
    """
    Compare two Podcast items and return True if they match.

    Item ids and the ASIN/barcode external ids are checked first and decide on
    their own when conclusive. Otherwise version and name must match; for two
    full Podcast objects the publishers are compared when both are set.

    :param base_item: The reference podcast.
    :param compare_item: The podcast compared against the reference.
    :param strict: When False, an ItemMapping on either side is accepted after
        the version/name check.
    """
    if compare_item_ids(base_item, compare_item):
        return True

    for id_type in (ExternalID.ASIN, ExternalID.BARCODE):
        verdict = compare_external_ids(base_item.external_ids, compare_item.external_ids, id_type)
        if verdict is not None:
            # first conclusive external id decides (match or mismatch)
            return verdict

    if not compare_version(base_item.version, compare_item.version):
        return False
    if not compare_strings(base_item.name, compare_item.name, strict=True):
        return False

    involves_mapping = isinstance(base_item, ItemMapping) or isinstance(compare_item, ItemMapping)
    if involves_mapping and not strict:
        return True
    # from here on both items must be real Podcast objects
    assert isinstance(base_item, Podcast)
    assert isinstance(compare_item, Podcast)

    # the publisher can only disqualify when both sides have one
    if base_item.publisher and compare_item.publisher:
        return compare_strings(base_item.publisher, compare_item.publisher, strict=True)
    return True
374
375
def compare_item_mapping(
    base_item: ItemMapping,
    compare_item: ItemMapping,
    strict: bool = True,
) -> bool | None:
    """
    Compare two ItemMapping items and return True if they match.

    Item ids are checked first, then every ExternalID type (an ItemMapping is
    a minimized object for all media item types), then version and name.

    :param base_item: The reference mapping.
    :param compare_item: The mapping compared against the reference.
    :param strict: Forwarded to the name comparison.
    """
    if compare_item_ids(base_item, compare_item):
        return True
    # lazily evaluate the external ids and stop at the first conclusive verdict
    verdicts = (
        compare_external_ids(base_item.external_ids, compare_item.external_ids, id_type)
        for id_type in ExternalID
    )
    decisive = next((verdict for verdict in verdicts if verdict is not None), None)
    if decisive is not None:
        return decisive
    if not compare_version(base_item.version, compare_item.version):
        return False
    # no id was conclusive and the versions agree: the name decides
    return compare_strings(base_item.name, compare_item.name, strict=strict)
398
399
def compare_artists(
    base_items: list[Artist | ItemMapping],
    compare_items: list[Artist | ItemMapping],
    any_match: bool = True,
) -> bool:
    """
    Compare two lists of artists and return True if they match.

    :param base_items: The reference artists.
    :param compare_items: The artists compared against the reference.
    :param any_match: When True, a single matching pair is sufficient. When
        False, both lists must have the same length and every base artist
        must have a match in compare_items.
    :return: False when either list is empty; True when the first artists of
        both lists match (regardless of any_match); otherwise as described
        for any_match.
    """
    if not base_items or not compare_items:
        return False
    # match if first artist matches in both lists
    if compare_artist(base_items[0], compare_items[0]):
        return True
    if any_match:
        return any(
            compare_artist(base_item, compare_item)
            for base_item in base_items
            for compare_item in compare_items
        )
    if len(base_items) != len(compare_items):
        return False
    # Each base artist is counted at most once. Counting matching *pairs*
    # would let duplicate entries on the compare side stand in for a base
    # artist that has no match at all (e.g. [A, B] vs [B, B]).
    return all(
        any(compare_artist(base_item, compare_item) for compare_item in compare_items)
        for base_item in base_items
    )
420
421
def compare_albums(
    base_items: list[Album | ItemMapping],
    compare_items: list[Album | ItemMapping],
    any_match: bool = True,
) -> bool:
    """
    Compare two lists of albums and return True if a match was found.

    :param base_items: The reference albums.
    :param compare_items: The albums compared against the reference.
    :param any_match: When True, a single matching pair is sufficient. When
        False, every base album must have a match in compare_items.
    :return: False when either list is empty, otherwise as described for
        any_match.
    """
    # Without items there is nothing that could match. Previously an empty
    # base list compared equal to the (zero) match counter and returned True.
    if not base_items or not compare_items:
        return False
    if any_match:
        return any(
            compare_album(base_item, compare_item)
            for base_item in base_items
            for compare_item in compare_items
        )
    # count each base album at most once instead of counting matching pairs
    return all(
        any(compare_album(base_item, compare_item) for compare_item in compare_items)
        for base_item in base_items
    )
436
437
def compare_item_ids(
    base_item: MediaItem | ItemMapping, compare_item: MediaItem | ItemMapping
) -> bool:
    """
    Compare item_id(s) of two media items.

    Returns True when both items share provider and item id, when one item's
    provider/item id appears in the other's provider mappings, or when both
    have a provider mapping with the same domain and item id (mappings flagged
    unique must additionally share the provider instance).

    :param base_item: The reference item.
    :param compare_item: The item compared against the reference.
    :return: False when either item lacks a provider or item id, or when none
        of the checks above succeeds.
    """
    has_providers = base_item.provider and compare_item.provider
    has_item_ids = base_item.item_id and compare_item.item_id
    if not has_providers or not has_item_ids:
        return False
    if (base_item.provider, base_item.item_id) == (compare_item.provider, compare_item.item_id):
        return True

    # ItemMappings carry no provider mappings, hence the getattr with default
    left_mappings = getattr(base_item, "provider_mappings", None)
    right_mappings = getattr(compare_item, "provider_mappings", None)

    if left_mappings is not None:
        assert isinstance(base_item, MediaItem)  # for type checking
        # is the compare item one of the base item's provider mappings?
        if any(
            mapping.provider_instance == compare_item.provider
            and mapping.item_id == compare_item.item_id
            for mapping in left_mappings
        ):
            return True

    if right_mappings is not None:
        assert isinstance(compare_item, MediaItem)  # for type checking
        # is the base item one of the compare item's provider mappings?
        if any(
            mapping.provider_instance == base_item.provider
            and mapping.item_id == base_item.item_id
            for mapping in right_mappings
        ):
            return True

    if left_mappings is None or right_mappings is None:
        return False
    assert isinstance(base_item, MediaItem)  # for type checking
    assert isinstance(compare_item, MediaItem)  # for type checking
    # cross-compare the mappings: same domain and item id, and the same
    # instance too as soon as one of the two mappings is flagged unique
    return any(
        left.provider_domain == right.provider_domain
        and not (
            (left.is_unique or right.is_unique)
            and left.provider_instance != right.provider_instance
        )
        and left.item_id == right.item_id
        for left in left_mappings
        for right in right_mappings
    )
484
485
def compare_external_ids(
    external_ids_base: set[tuple[ExternalID, str]],
    external_ids_compare: set[tuple[ExternalID, str]],
    external_id_type: ExternalID,
) -> bool | None:
    """
    Compare the external ids of one type between two id sets.

    :param external_ids_base: (type, value) pairs of the reference item.
    :param external_ids_compare: (type, value) pairs of the compared item.
    :param external_id_type: The external id type to look at.
    :return: True when a value of the requested type is shared (for barcodes
        also when a 12-digit code equals the other side's 13-digit code minus
        its leading zero); False when both sides have ids of this type, none
        match and the type is flagged unique; None when no verdict is
        possible.
    """
    base_ids = {x[1] for x in external_ids_base if x[0] == external_id_type}
    if not base_ids:
        # the requested external id type is not present in the base set
        return None
    compare_ids = {x[1] for x in external_ids_compare if x[0] == external_id_type}
    if not compare_ids:
        # the requested external id type is not present in the compare set
        return None
    # any literally shared id is a match
    if not base_ids.isdisjoint(compare_ids):
        return True
    if external_id_type == ExternalID.BARCODE:
        for base_id in base_ids:
            # handle upc stored as EAN-13 barcode (UPC prefixed with a zero)
            if len(base_id) == 12 and f"0{base_id}" in compare_ids:
                return True
            # handle EAN-13 stored as UPC barcode; only an EAN-13 that starts
            # with "0" has a UPC equivalent, so never drop another digit
            if len(base_id) == 13 and base_id.startswith("0") and base_id[1:] in compare_ids:
                return True
    # return false if the identifier is unique (e.g. musicbrainz id)
    if external_id_type.is_unique:
        return False
    return None
515
516
def create_safe_string(input_str: str, lowercase: bool = True, replace_space: bool = False) -> str:
    """
    Return a cleaned-up string for compare actions.

    A few stylised names are rewritten first, then the string is optionally
    lowercased, stripped, transliterated with unidecode and reduced to the
    characters a-z, A-Z and 0-9 (plus spaces unless replace_space is set).

    :param input_str: The string to normalise.
    :param lowercase: Lowercase the string before cleaning.
    :param replace_space: Also remove spaces from the result.
    """
    # (exact inputs, text to replace, replacement) for stylised names
    # NOTE(review): the "â" in the second entry looks like a mis-encoded
    # character - confirm the intended glyph against the upstream file.
    special_cases = (
        (("P!nk", "p!nk"), "!", "i"),
        (("Whâ", "whâ"), "â", "o"),
        (("KoЯn", "koЯn"), "Я", "r"),
        (("$hort",), "$hort", "short"),
    )
    for exact_inputs, needle, replacement in special_cases:
        if input_str in exact_inputs:
            input_str = input_str.replace(needle, replacement)
    if lowercase:
        input_str = input_str.lower()
    unaccented_string = unidecode.unidecode(input_str.strip())
    if replace_space:
        disallowed = r"[^a-zA-Z0-9]"
    else:
        disallowed = r"[^a-zA-Z0-9 ]"
    return re.sub(disallowed, "", unaccented_string)
532
533
def loose_compare_strings(base: str, alt: str) -> bool:
    """
    Compare strings and return True even on partial match.

    This is used to display 'versions' of the same track/album, where we
    account for other spelling or some additional wording in the title.

    :param base: The reference string.
    :param alt: The string compared against the reference.
    :return: For very short inputs the result of an exact comparison, for a
        short single-word base the result of a non-strict comparison,
        otherwise True when one normalised string contains the other.
    """
    # very short strings are too ambiguous for a partial match
    if len(base) <= 3 or len(alt) <= 3:
        return compare_strings(base, alt, True)
    word_count = len(base.strip().split(" "))
    if word_count == 1 and len(base) < 10:
        return compare_strings(base, alt, False)
    base_comp = create_safe_string(base)
    alt_comp = create_safe_string(alt)
    if not base_comp or not alt_comp:
        # an empty normalised string is "contained" in every string, which
        # would match anything; fall back to the regular non-strict compare
        return compare_strings(base, alt, False)
    # Accept containment in either direction. The second check used to repeat
    # the first one, so extra wording on the base side was never accepted.
    return base_comp in alt_comp or alt_comp in base_comp
548
549
def compare_strings(str1: str, str2: str, strict: bool = True) -> bool:
    """
    Compare strings and return True if we have an (almost) perfect match.

    :param str1: First string.
    :param str2: Second string.
    :param strict: When True, only a case-insensitive exact match counts.
        When False, '&'/'and' spelling, normalised forms and a difflib
        similarity ratio are taken into account as well.
    :return: False when either string is empty, otherwise the match result.
    """
    if not (str1 and str2):
        return False
    lowered_1, lowered_2 = str1.lower(), str2.lower()
    if strict:
        return lowered_1 == lowered_2
    # strings that differ too much in length can never be a close match
    if abs(len(str1) - len(str2)) > 4:
        return False
    # align '&' vs 'and' by rewriting the second string to the first one's style
    if " & " in lowered_1 and " and " in lowered_2:
        str2 = lowered_2.replace(" and ", " & ")
    elif " and " in lowered_1 and " & " in str2:
        str2 = lowered_2.replace(" & ", " and ")
    if create_safe_string(str1) == create_safe_string(str2):
        return True
    # last resort: similarity ratio, with a higher bar for longer strings
    if (len(str1) + len(str2)) > 18:
        required_accuracy = 0.9
    else:
        required_accuracy = 0.8
    similarity = SequenceMatcher(a=lowered_1, b=lowered_2).ratio()
    return similarity > required_accuracy
571
572
def compare_version(base_version: str, compare_version: str) -> bool:
    """
    Compare two version strings.

    Note that the second parameter shares its name with this function.

    :param base_version: Version of the reference item (may be empty).
    :param compare_version: Version of the compared item (may be empty).
    :return: True when both are empty, when only one is set and it is listed
        in IGNORE_VERSIONS, when two single-word versions match non-strictly,
        or when the remaining words of both versions are equal regardless of
        their order.
    """
    if not base_version and not compare_version:
        return True
    # only one side has a version: acceptable only if that version is ignorable
    if not base_version:
        return compare_version.lower() in IGNORE_VERSIONS
    if not compare_version:
        return base_version.lower() in IGNORE_VERSIONS

    if " " not in base_version and " " not in compare_version:
        return compare_strings(base_version, compare_version, False)

    # do this the hard way as sometimes the version string is in the wrong order
    # words we can ignore (such as 'version')
    ignore_words = [
        *IGNORE_VERSIONS,
        "version",
        "edition",
        "variant",
        "versie",
        "versione",
    ]
    base_words = sorted(
        word for word in base_version.lower().split(" ") if word not in ignore_words
    )
    compare_words = sorted(
        word for word in compare_version.lower().split(" ") if word not in ignore_words
    )
    return base_words == compare_words
605
606
def compare_explicit(base: MediaItemMetadata, compare: MediaItemMetadata) -> bool | None:
    """
    Compare the explicit flag of two metadata objects.

    :param base: Metadata of the reference item.
    :param compare: Metadata of the compared item.
    :return: None when either side has no explicit info, otherwise whether
        both flags are equal.
    """
    # explicitness info is not always present in metadata,
    # so a verdict is only possible when both sides have it set
    if base.explicit is None or compare.explicit is None:
        return None
    return base.explicit == compare.explicit
614