Module aioimmich.api
aioimmich api.
Classes
class CacheEntry (etag: str | None, result: dict | list | None)
-
Expand source code
@dataclass class CacheEntry: """Representation of response cache entry.""" etag: str | None result: dict | list | None
Representation of response cache entry.
Instance variables
var etag : str | None
var result : dict | list | None
class ImmichApi (aiohttp_session: ClientSession,
api_key: str,
device_id: str,
host: str,
port: int = 2283,
use_ssl: bool = True)-
Expand source code
class ImmichApi:
    """immich api.

    Small asynchronous HTTP client around an existing aiohttp session. It
    builds the base URL of the server, authenticates every request with the
    ``x-api-key`` header and keeps an in-memory ETag cache of JSON responses.
    """

    def __init__(
        self,
        aiohttp_session: ClientSession,
        api_key: str,
        device_id: str,
        host: str,
        port: int = 2283,
        use_ssl: bool = True,
    ) -> None:
        """Immich api init.

        Args:
            aiohttp_session: Session used to perform all requests.
            api_key: Value sent in the ``x-api-key`` header.
            device_id: Stored on the instance as ``device_id``; not used by
                this class itself.
            host: Host name or address of the server.
            port: TCP port of the server.
            use_ssl: Build an ``https`` base URL when true, ``http`` otherwise.
        """
        self.session: ClientSession = aiohttp_session
        self.api_key = api_key
        self.base_url = f"{'https' if use_ssl else 'http'}://{host}:{port}/api"
        # Response cache, keyed by "<method>_<end_point>_<params>".
        self.cache: dict[str, CacheEntry] = {}
        self.device_id = device_id

    async def async_do_request(
        self,
        end_point: str,
        params: dict | None = None,
        data: dict | None = None,
        raw_data: dict | None = None,
        method: str = "GET",
        application: str = "json",
        raw_response_content: bool = False,
    ) -> list | dict | bytes | StreamReader | None:
        """Perform the request and handle errors.

        Args:
            end_point: Path appended to the base URL.
            params: Forwarded to the session as ``params``; also part of the
                cache key.
            data: Forwarded to the session as ``json``.
            raw_data: Forwarded to the session as ``data``.
            method: HTTP method.
            application: Sub type used for the ``Accept`` header. Only
                ``"json"`` responses are decoded and cached.
            raw_response_content: Return the response content stream of a
                successful response instead of reading it.

        Returns:
            The decoded JSON of a successful response, the cached result when
            the server answers with status 304, the content stream when
            ``raw_response_content`` is set, or the raw bytes for non-JSON
            responses. ``None`` is possible when ``raise_for_status()`` does
            not raise for an otherwise unhandled status.

        Raises:
            ImmichError: The server answered with status 400.
            ImmichUnauthorizedError: The server answered with status 401.
            ImmichForbiddenError: The server answered with status 403.
            ImmichNotFoundError: The server answered with status 404.

        Errors matching ``CONNECT_ERRORS`` are logged and re-raised; other
        statuses are delegated to ``raise_for_status()``.
        """
        headers = {"Accept": f"application/{application}", "x-api-key": self.api_key}
        url = f"{self.base_url}/{end_point}"

        # NOTE(review): the cache key ignores ``data``/``raw_data``; requests
        # differing only in their body share one cache entry.
        cache_key = f"{method}_{end_point}_{params}"
        cache_entry = self.cache.setdefault(cache_key, CacheEntry(None, None))
        if (etag := cache_entry.etag) is not None:
            # Conditional request: lets the server answer with 304.
            headers["If-None-Match"] = etag

        LOGGER.debug(
            "REQUEST url: %s params: %s data: %s headers: %s",
            url,
            params,
            data,
            # Never write the real API key to the log.
            {**headers, "x-api-key": "**********"},
        )

        try:
            resp = await self.session.request(
                method, url, params=params, json=data, data=raw_data, headers=headers
            )
            LOGGER.debug("RESPONSE headers: %s", dict(resp.headers))

            if resp.status == 304:
                # 304 = not modified
                LOGGER.debug("RESPONSE from cache")
                return cache_entry.result

            if 200 <= resp.status < 300:
                if raw_response_content:
                    LOGGER.debug("RESPONSE as stream")
                    return resp.content

                if application == "json":
                    result = await resp.json()
                    # Refresh ETag and payload together on every successful
                    # response. Writing them only once left a stale ETag in
                    # place after the first change, so later requests could
                    # never be answered with 304 again.
                    cache_entry.etag = resp.headers.get("Etag")
                    cache_entry.result = result
                    LOGGER.debug("RESPONSE: %s", result)
                    return result

                LOGGER.debug("RESPONSE as bytes")
                return await resp.read()

            err_result = await resp.json()
            LOGGER.debug("RESPONSE %s", err_result)

            error_types = {
                400: ImmichError,
                401: ImmichUnauthorizedError,
                403: ImmichForbiddenError,
                404: ImmichNotFoundError,
            }
            if (error_type := error_types.get(resp.status)) is not None:
                raise error_type(err_result)
            return resp.raise_for_status()

        except CONNECT_ERRORS as err:
            LOGGER.debug("connection error", exc_info=True)
            # Log the error itself; ``err.__class__.__cause__`` is only the
            # attribute descriptor of the exception class, not a message.
            LOGGER.error(
                "Error while getting data: %s: %s",
                err.__class__.__name__,
                err,
            )
            raise
immich api.
Immich api init.
Methods
async def async_do_request(self,
end_point: str,
params: dict | None = None,
data: dict | None = None,
raw_data: dict | None = None,
method: str = 'GET',
application: str = 'json',
raw_response_content: bool = False) ‑> list | dict | bytes | aiohttp.streams.StreamReader | None-
Expand source code
async def async_do_request(
    self,
    end_point: str,
    params: dict | None = None,
    data: dict | None = None,
    raw_data: dict | None = None,
    method: str = "GET",
    application: str = "json",
    raw_response_content: bool = False,
) -> list | dict | bytes | StreamReader | None:
    """Perform the request and handle errors.

    Args:
        end_point: Path appended to the base URL.
        params: Forwarded to the session as ``params``; also part of the
            cache key.
        data: Forwarded to the session as ``json``.
        raw_data: Forwarded to the session as ``data``.
        method: HTTP method.
        application: Sub type used for the ``Accept`` header. Only
            ``"json"`` responses are decoded and cached.
        raw_response_content: Return the response content stream of a
            successful response instead of reading it.

    Returns:
        The decoded JSON of a successful response, the cached result when
        the server answers with status 304, the content stream when
        ``raw_response_content`` is set, or the raw bytes for non-JSON
        responses. ``None`` is possible when ``raise_for_status()`` does
        not raise for an otherwise unhandled status.

    Raises:
        ImmichError: The server answered with status 400.
        ImmichUnauthorizedError: The server answered with status 401.
        ImmichForbiddenError: The server answered with status 403.
        ImmichNotFoundError: The server answered with status 404.

    Errors matching ``CONNECT_ERRORS`` are logged and re-raised; other
    statuses are delegated to ``raise_for_status()``.
    """
    headers = {"Accept": f"application/{application}", "x-api-key": self.api_key}
    url = f"{self.base_url}/{end_point}"

    # NOTE(review): the cache key ignores ``data``/``raw_data``; requests
    # differing only in their body share one cache entry.
    cache_key = f"{method}_{end_point}_{params}"
    cache_entry = self.cache.setdefault(cache_key, CacheEntry(None, None))
    if (etag := cache_entry.etag) is not None:
        # Conditional request: lets the server answer with 304.
        headers["If-None-Match"] = etag

    LOGGER.debug(
        "REQUEST url: %s params: %s data: %s headers: %s",
        url,
        params,
        data,
        # Never write the real API key to the log.
        {**headers, "x-api-key": "**********"},
    )

    try:
        resp = await self.session.request(
            method, url, params=params, json=data, data=raw_data, headers=headers
        )
        LOGGER.debug("RESPONSE headers: %s", dict(resp.headers))

        if resp.status == 304:
            # 304 = not modified
            LOGGER.debug("RESPONSE from cache")
            return cache_entry.result

        if 200 <= resp.status < 300:
            if raw_response_content:
                LOGGER.debug("RESPONSE as stream")
                return resp.content

            if application == "json":
                result = await resp.json()
                # Refresh ETag and payload together on every successful
                # response. Writing them only once left a stale ETag in
                # place after the first change, so later requests could
                # never be answered with 304 again.
                cache_entry.etag = resp.headers.get("Etag")
                cache_entry.result = result
                LOGGER.debug("RESPONSE: %s", result)
                return result

            LOGGER.debug("RESPONSE as bytes")
            return await resp.read()

        err_result = await resp.json()
        LOGGER.debug("RESPONSE %s", err_result)

        error_types = {
            400: ImmichError,
            401: ImmichUnauthorizedError,
            403: ImmichForbiddenError,
            404: ImmichNotFoundError,
        }
        if (error_type := error_types.get(resp.status)) is not None:
            raise error_type(err_result)
        return resp.raise_for_status()

    except CONNECT_ERRORS as err:
        LOGGER.debug("connection error", exc_info=True)
        # Log the error itself; ``err.__class__.__cause__`` is only the
        # attribute descriptor of the exception class, not a message.
        LOGGER.error(
            "Error while getting data: %s: %s",
            err.__class__.__name__,
            err,
        )
        raise
Perform the request and handle errors.
class ImmichSubApi (api: ImmichApi)
-
Expand source code
class ImmichSubApi:
    """immich sub api.

    Holder for a shared ``ImmichApi`` instance; it only stores the client
    on ``self.api``.
    """

    def __init__(self, api: ImmichApi) -> None:
        """Immich sub api init.

        Args:
            api: Client instance kept on ``self.api``.
        """
        self.api = api
immich sub api.
Immich sub api init.
Subclasses