Source code for pyicat_plus.client.archive

import json
from typing import List
from typing import Optional

from . import defaults
from .deprecation_utils import warn_deprecated_module
from .messaging import IcatMessagingClient
from .types import ArchiveStatusLevel
from .types import ArchiveStatusLevel as StatusLevel  # noqa F401 deprecated
from .types import ArchiveStatusType
from .types import ArchiveStatusType as StatusType  # noqa F401 deprecated

# Executed once, at import time of this module. Presumably emits a
# deprecation warning for importing this module path -- the helper lives in
# ``.deprecation_utils``; confirm its exact behavior there.
warn_deprecated_module()


class IcatArchiveStatusClient:
    """Client for storing archive and restoration status in ICAT.

    Status updates are serialized to JSON and sent through an
    ``IcatMessagingClient`` created in the constructor.
    """

    def __init__(
        self,
        queue_urls: Optional[List[str]] = None,
        queue_name: Optional[str] = None,
        monitor_port: Optional[int] = None,
        timeout: Optional[float] = None,
    ):
        """Create the underlying messaging client.

        :param queue_urls: broker URLs; ``defaults.ARCHIVE_BROKERS`` is used
            when ``None``.
        :param queue_name: queue name; ``defaults.ARCHIVE_QUEUE`` is used
            when ``None``.
        :param monitor_port: forwarded unchanged to ``IcatMessagingClient``.
        :param timeout: forwarded unchanged to ``IcatMessagingClient``.
        """
        if queue_name is None:
            queue_name = defaults.ARCHIVE_QUEUE
        if queue_urls is None:
            queue_urls = defaults.ARCHIVE_BROKERS
        self._client = IcatMessagingClient(
            queue_urls, queue_name, monitor_port=monitor_port, timeout=timeout
        )

    def disconnect(self):
        """Disconnect the underlying messaging client."""
        self._client.disconnect()

    def send_archive_status(
        self,
        dataset_id: int,
        type: ArchiveStatusType,
        level: ArchiveStatusLevel,
        message: str,
    ):
        """Send one archive status message for a dataset.

        The message is a UTF-8 encoded JSON object with the keys
        ``datasetId``, ``type``, ``level`` and ``message``.

        :param dataset_id: dataset identifier; must be truthy.
        :param type: status type; its ``.value`` is sent. Must be truthy.
        :param level: status level; its ``.value`` is sent. Must be truthy.
        :param message: status text, sent as given.
        :raises AssertionError: when ``dataset_id``, ``type`` or ``level`` is
            falsy.
        """
        # Raise explicitly instead of using ``assert`` statements: asserts are
        # removed under ``python -O``, which would let incomplete messages
        # through. AssertionError is kept so existing callers see the same
        # exception type and message as before.
        required = (
            (dataset_id, "datasetId"),
            (type, "type"),
            (level, "level"),
        )
        for value, field_name in required:
            if not value:
                raise AssertionError(f"ICAT requires the {field_name}")

        root = {
            "datasetId": dataset_id,
            "type": type.value,
            "level": level.value,
            "message": message,
        }
        data = json.dumps(root).encode("utf-8")
        self._client.send(data)

    def check_health(self):
        """Raises an exception when not healthy"""
        self._client.check_health()