Source code for pyicat_plus.client.update_metadata

import json
from typing import List
from typing import Optional

from . import defaults
from .messaging import IcatMessagingClient


[docs] class IcatUpdateMetadataClient: """Client for updating the metadata linked to datasets.""" def __init__( self, queue_urls: Optional[List[str]] = None, queue_name: Optional[str] = None, monitor_port: Optional[int] = None, timeout: Optional[float] = None, ): if queue_name is None: queue_name = defaults.UPDATE_METADATA_QUEUE if queue_urls is None: queue_urls = defaults.UPDATE_METADATA_BROKERS self._client = IcatMessagingClient( queue_urls, queue_name, monitor_port=monitor_port, timeout=timeout )
[docs] def disconnect(self): self._client.disconnect()
[docs] def send_update_metadata( self, proposal: str, beamline: str, dataset_paths: List[str], metadata_name: str, metadata_value: str, ): if not proposal: raise AssertionError("ICAT requires the proposal name") if not beamline: raise AssertionError("ICAT requires the beamline name") if not dataset_paths: raise AssertionError("ICAT requires the dataset paths") if not metadata_name: raise AssertionError("ICAT requires the metadata name") if not metadata_value: raise AssertionError("ICAT requires the metadata value") root = { "proposal": proposal, "beamline": beamline, "datasetPaths": dataset_paths, "metadataName": metadata_name, "metadataValue": metadata_value, } data = json.dumps(root).encode("utf-8") self._client.send(data)
[docs] def check_health(self): """Raises an exception when not healthy""" self._client.check_health()