/
/
/
1# ruff: noqa: D100, D101, D102
2
3from __future__ import annotations
4
5from collections.abc import Callable
6from typing import Any, Protocol
7
8type RetVal = dict[str, Any] | None
9type RetTuple = tuple[RetVal, RetVal]
10
11SnapclientCallback = Callable[["SnapclientProto"], Any]
12SnapgroupCallback = Callable[["SnapgroupProto"], Any]
13SnapstreamCallback = Callable[["SnapstreamProto"], Any]
14
15
class SnapgroupProto(Protocol):
    """Structural (duck-typed) interface for a group object.

    Any object providing the members below satisfies this protocol; no
    inheritance is required.
    """

    # --- read-only state -------------------------------------------------
    @property
    def identifier(self) -> str: ...

    @property
    def name(self) -> str: ...

    # NOTE(review): presumably the identifiers of the member clients - confirm.
    @property
    def clients(self) -> list[str]: ...

    # NOTE(review): presumably the identifier of the assigned stream - confirm.
    @property
    def stream(self) -> str: ...

    # --- callback hook ---------------------------------------------------
    def callback(self) -> None: ...

    # ``None`` is an accepted value for ``func``.
    def set_callback(self, func: SnapgroupCallback | None) -> None: ...

    # --- coroutines ------------------------------------------------------
    async def set_name(self, name: str) -> RetVal: ...

    async def set_stream(self, stream_id: str) -> RetVal: ...

    async def add_client(self, client_identifier: str) -> None: ...
30
31
class SnapclientProto(Protocol):
    """Structural (duck-typed) interface for a client object.

    Any object providing the members below satisfies this protocol; no
    inheritance is required.
    """

    # Private dict attribute that implementers must expose.
    # NOTE(review): presumably the raw client state - confirm its schema.
    _client: dict[str, Any]

    # --- read-only properties used by the consuming code -----------------
    @property
    def identifier(self) -> str: ...

    # May be ``None``, i.e. a client without a group is representable.
    @property
    def group(self) -> SnapgroupProto | None: ...

    @property
    def friendly_name(self) -> str: ...

    @property
    def connected(self) -> bool: ...

    @property
    def name(self) -> str: ...

    @property
    def latency(self) -> int: ...

    @property
    def muted(self) -> bool: ...

    @property
    def volume(self) -> int: ...

    # --- queries ---------------------------------------------------------
    # A plain method, not a property: call it to obtain the list.
    def groups_available(self) -> list[SnapgroupProto]: ...

    # --- coroutines ------------------------------------------------------
    async def set_muted(self, status: bool) -> None: ...

    # ``update_group`` defaults to True.
    async def set_volume(self, percent: int, update_group: bool = True) -> None: ...

    # --- callback hook ---------------------------------------------------
    def callback(self) -> None: ...

    # ``None`` is an accepted value for ``func``.
    def set_callback(self, func: SnapclientCallback | None) -> None: ...
67
68
class SnapstreamProto(Protocol):
    """Structural (duck-typed) interface for a stream object.

    Any object providing the members below satisfies this protocol; no
    inheritance is required.
    """

    # Private dict attribute that implementers must expose.
    # NOTE(review): presumably the raw stream state - confirm its schema.
    _stream: dict[str, Any]

    # --- read-only state -------------------------------------------------
    @property
    def identifier(self) -> str: ...

    @property
    def friendly_name(self) -> str: ...

    @property
    def status(self) -> str: ...

    # May be ``None``.
    @property
    def path(self) -> str | None: ...

    # --- callback hook ---------------------------------------------------
    def callback(self) -> None: ...

    # ``None`` is an accepted value for ``func``.
    def set_callback(self, func: SnapstreamCallback | None) -> None: ...
82
83
class SnapserverProto(Protocol):
    """Structural (duck-typed) interface for the server/controller object.

    Any object providing the members below satisfies this protocol; no
    inheritance is required.
    """

    # --- lifecycle -------------------------------------------------------
    # Note the asymmetry: ``start`` is a coroutine, ``stop`` is synchronous.
    async def start(self) -> Any: ...

    def stop(self) -> None: ...

    # --- collections (as in the external lib: list(...) views) -----------
    @property
    def groups(self) -> list[SnapgroupProto]: ...

    @property
    def clients(self) -> list[SnapclientProto]: ...

    @property
    def streams(self) -> list[SnapstreamProto]: ...

    # --- accessors: one object looked up by its identifier ---------------
    def group(self, group_identifier: str) -> SnapgroupProto: ...

    def client(self, client_identifier: str) -> SnapclientProto: ...

    def stream(self, stream_identifier: str) -> SnapstreamProto: ...

    # --- server-level coroutines -----------------------------------------
    async def delete_client(self, identifier: str) -> RetTuple: ...

    async def status(self) -> tuple[Any, Any]: ...

    # --- group operations ------------------------------------------------
    async def group_clients(self, identifier: str, clients: list[str]) -> RetVal: ...

    async def group_name(self, identifier: str, name: str) -> RetVal: ...

    async def group_stream(self, identifier: str, stream_id: str) -> RetVal: ...

    # --- client operations -----------------------------------------------
    async def client_name(self, identifier: str, name: str) -> RetVal: ...

    async def client_latency(self, identifier: str, latency: int) -> RetVal: ...

    # ``volume`` is a dict rather than a bare number.
    # NOTE(review): its expected keys are not visible here - confirm.
    async def client_volume(
        self, identifier: str, volume: dict[str, Any]
    ) -> RetVal: ...

    # --- stream operations -----------------------------------------------
    async def stream_add_stream(self, stream_uri: str) -> RetVal: ...

    async def stream_remove_stream(self, identifier: str) -> RetVal: ...

    # --- state sync ------------------------------------------------------
    def synchronize(self, status: dict[str, Any]) -> None: ...

    # --- callbacks (optional; every setter also accepts ``None``) --------
    def set_on_update_callback(self, func: Callable[[], Any] | None) -> None: ...

    def set_on_connect_callback(self, func: Callable[[], Any] | None) -> None: ...

    # The disconnect callback takes one ``Exception`` argument.
    def set_on_disconnect_callback(
        self, func: Callable[[Exception], Any] | None
    ) -> None: ...

    # Uses the module's ``SnapclientCallback`` alias, like
    # ``SnapclientProto.set_callback``, instead of repeating the signature.
    def set_new_client_callback(self, func: SnapclientCallback | None) -> None: ...
125