/
/
/
1"""Utilities for image manipulation and retrieval."""
2
3from __future__ import annotations
4
5import asyncio
6import hashlib
7import itertools
8import os
9import random
10from base64 import b64decode
11from collections import OrderedDict
12from collections.abc import Iterable
13from io import BytesIO
14from typing import TYPE_CHECKING, cast
15
16import aiofiles
17from aiohttp.client_exceptions import ClientError
18from PIL import Image, UnidentifiedImageError
19
20from music_assistant.helpers.security import is_safe_path
21from music_assistant.helpers.tags import get_embedded_image
22from music_assistant.models.metadata_provider import MetadataProvider
23from music_assistant.models.music_provider import MusicProvider
24from music_assistant.models.plugin import PluginProvider
25
26if TYPE_CHECKING:
27 from music_assistant_models.media_items import MediaItemImage
28 from PIL.Image import Image as ImageClass
29
30 from music_assistant.mass import MusicAssistant
31
32
33# Thumbnail cache: on-disk (persistent) + small in-memory FIFO (hot path)
34_THUMB_CACHE_DIR = "thumbnails"
35_THUMB_MEMORY_CACHE_MAX = 50
36
37_thumb_memory_cache: OrderedDict[str, bytes] = OrderedDict()
38
39
40def _create_thumb_hash(provider: str, path_or_url: str) -> str:
41 """Create a safe filesystem hash from provider and image path."""
42 raw = f"{provider}/{path_or_url}"
43 return hashlib.sha256(raw.encode(), usedforsecurity=False).hexdigest()
44
45
46def _thumb_cache_filename(thumb_hash: str, size: int | None, image_format: str) -> str:
47 """Build the cache filename for a thumbnail."""
48 ext = image_format.lower()
49 if ext == "jpeg":
50 ext = "jpg"
51 return f"{thumb_hash}_{size or 0}.{ext}"
52
53
54def _get_from_memory_cache(key: str) -> bytes | None:
55 """Retrieve thumbnail from in-memory FIFO cache."""
56 if key in _thumb_memory_cache:
57 _thumb_memory_cache.move_to_end(key)
58 return _thumb_memory_cache[key]
59 return None
60
61
62def _put_in_memory_cache(key: str, data: bytes) -> None:
63 """Store thumbnail in in-memory FIFO cache."""
64 _thumb_memory_cache[key] = data
65 _thumb_memory_cache.move_to_end(key)
66 while len(_thumb_memory_cache) > _THUMB_MEMORY_CACHE_MAX:
67 _thumb_memory_cache.popitem(last=False)
68
69
70async def get_image_data(mass: MusicAssistant, path_or_url: str, provider: str) -> bytes:
71 """Create thumbnail from image url."""
72 # TODO: add local cache here !
73 if prov := mass.get_provider(provider):
74 assert isinstance(prov, MusicProvider | MetadataProvider | PluginProvider)
75 if resolved_image := await prov.resolve_image(path_or_url):
76 if isinstance(resolved_image, bytes):
77 return resolved_image
78 if isinstance(resolved_image, str):
79 path_or_url = resolved_image
80 # handle HTTP location
81 if path_or_url.startswith("http"):
82 try:
83 async with mass.http_session_no_ssl.get(path_or_url, raise_for_status=True) as resp:
84 return await resp.read()
85 except ClientError as err:
86 raise FileNotFoundError from err
87 # handle base64 embedded images
88 if path_or_url.startswith("data:image"):
89 return b64decode(path_or_url.split(",")[-1])
90 # handle FILE location (of type image)
91 if path_or_url.endswith(("jpg", "JPG", "png", "PNG", "jpeg")) and is_safe_path(path_or_url):
92 if await asyncio.to_thread(os.path.isfile, path_or_url):
93 async with aiofiles.open(path_or_url, "rb") as _file:
94 return cast("bytes", await _file.read())
95 # use ffmpeg for embedded images
96 if is_safe_path(path_or_url) and (img_data := await get_embedded_image(path_or_url)):
97 return img_data
98 msg = f"Image not found: {path_or_url}"
99 raise FileNotFoundError(msg)
100
101
102async def get_image_thumb(
103 mass: MusicAssistant,
104 path_or_url: str,
105 size: int | None,
106 provider: str,
107 image_format: str = "PNG",
108) -> bytes:
109 """Get (optimized) thumbnail from image url.
110
111 Uses a two-tier cache (in-memory FIFO + on-disk) keyed by a hash of
112 provider + path so that repeated requests never trigger ffmpeg or
113 PIL processing again. Concurrent requests for the same thumbnail
114 are de-duplicated via create_task.
115
116 :param mass: The MusicAssistant instance.
117 :param path_or_url: Path or URL to the source image.
118 :param size: Target thumbnail size (square), or None for original.
119 :param provider: Provider identifier for the image source.
120 :param image_format: Output format (PNG or JPEG/JPG).
121 """
122 image_format = image_format.upper()
123 if image_format == "JPG":
124 image_format = "JPEG"
125
126 thumb_hash = _create_thumb_hash(provider, path_or_url)
127 cache_filename = _thumb_cache_filename(thumb_hash, size, image_format)
128
129 # 1. Check in-memory FIFO cache
130 if cached := _get_from_memory_cache(cache_filename):
131 return cached
132
133 # 2. Check on-disk cache
134 thumb_dir = os.path.join(mass.cache_path, _THUMB_CACHE_DIR)
135 cache_filepath = os.path.join(thumb_dir, cache_filename)
136 if await asyncio.to_thread(os.path.isfile, cache_filepath):
137 async with aiofiles.open(cache_filepath, "rb") as f:
138 thumb_data = cast("bytes", await f.read())
139 _put_in_memory_cache(cache_filename, thumb_data)
140 return thumb_data
141
142 # 3. Generate thumbnail (de-duplicated across concurrent requests)
143 task: asyncio.Task[bytes] = mass.create_task(
144 _generate_and_cache_thumb,
145 mass,
146 path_or_url,
147 size,
148 provider,
149 image_format,
150 cache_filepath,
151 task_id=f"thumb.{cache_filename}",
152 abort_existing=False,
153 )
154 thumb_data = await asyncio.shield(task)
155 _put_in_memory_cache(cache_filename, thumb_data)
156 return thumb_data
157
158
159async def _generate_and_cache_thumb(
160 mass: MusicAssistant,
161 path_or_url: str,
162 size: int | None,
163 provider: str,
164 image_format: str,
165 cache_filepath: str,
166) -> bytes:
167 """Generate a thumbnail, persist it on disk, and return the bytes.
168
169 :param mass: The MusicAssistant instance.
170 :param path_or_url: Path or URL to the source image.
171 :param size: Target thumbnail size (square), or None for original.
172 :param provider: Provider identifier for the image source.
173 :param image_format: Normalized output format (PNG or JPEG).
174 :param cache_filepath: Absolute path where the thumbnail will be stored.
175 """
176 img_data = await get_image_data(mass, path_or_url, provider)
177 if not img_data or not isinstance(img_data, bytes):
178 raise FileNotFoundError(f"Image not found: {path_or_url}")
179
180 if not size and image_format.encode() in img_data:
181 thumb_data = img_data
182 else:
183
184 def _create_image() -> bytes:
185 data = BytesIO()
186 try:
187 img = Image.open(BytesIO(img_data))
188 except UnidentifiedImageError:
189 raise FileNotFoundError(f"Invalid image: {path_or_url}")
190 if size:
191 img.thumbnail((size, size), Image.Resampling.LANCZOS)
192 mode = "RGBA" if image_format == "PNG" else "RGB"
193 if image_format == "JPEG":
194 img.convert(mode).save(data, image_format, quality=95, optimize=False)
195 else:
196 img.convert(mode).save(data, image_format, optimize=False)
197 return data.getvalue()
198
199 thumb_data = await asyncio.to_thread(_create_image)
200
201 # Persist to disk cache (best-effort, don't fail on I/O errors)
202 try:
203 await asyncio.to_thread(os.makedirs, os.path.dirname(cache_filepath), exist_ok=True)
204 async with aiofiles.open(cache_filepath, "wb") as f:
205 await f.write(thumb_data)
206 except OSError:
207 pass
208
209 return thumb_data
210
211
212async def create_collage(
213 mass: MusicAssistant,
214 images: Iterable[MediaItemImage],
215 dimensions: tuple[int, int] = (1500, 1500),
216) -> bytes:
217 """Create a basic collage image from multiple image urls."""
218 image_size = 250
219
220 def _new_collage() -> ImageClass:
221 return Image.new("RGB", (dimensions[0], dimensions[1]), color=(255, 255, 255, 255))
222
223 collage = await asyncio.to_thread(_new_collage)
224
225 def _add_to_collage(img_data: bytes, coord_x: int, coord_y: int) -> None:
226 data = BytesIO(img_data)
227 photo = Image.open(data).convert("RGB")
228 photo = photo.resize((image_size, image_size))
229 collage.paste(photo, (coord_x, coord_y))
230 del data
231
232 # prevent duplicates with a set
233 images = list(set(images))
234 random.shuffle(images)
235 iter_images = itertools.cycle(images)
236
237 for x_co in range(0, dimensions[0], image_size):
238 for y_co in range(0, dimensions[1], image_size):
239 for _ in range(5):
240 img = next(iter_images)
241 img_data = await get_image_data(mass, img.path, img.provider)
242 if img_data:
243 await asyncio.to_thread(_add_to_collage, img_data, x_co, y_co)
244 del img_data
245 break
246
247 def _save_collage() -> bytes:
248 final_data = BytesIO()
249 collage.convert("RGB").save(final_data, "JPEG", optimize=True)
250 return final_data.getvalue()
251
252 return await asyncio.to_thread(_save_collage)
253
254
255async def get_icon_string(icon_path: str) -> str:
256 """Get svg icon as string."""
257 ext = icon_path.rsplit(".")[-1]
258 assert ext == "svg"
259 async with aiofiles.open(icon_path) as _file:
260 xml_data = await _file.read()
261 assert isinstance(xml_data, str) # for type checking
262 return xml_data.replace("\n", "").strip()
263