Source code for pyicat_plus.client.metadata
import os
from typing import List
from typing import Optional
from .._icat_messaging.serializers.dataset import serialize_dataset_message
from .._icat_messaging.serializers.investigation import serialize_investigation_message
from . import defaults
from .messaging import IcatMessagingClient
[docs]
class IcatMetadataClient:
    """Client for storing dataset metadata in ICAT.

    Messages are built by the ``_icat_messaging`` serializers and handed to an
    ``IcatMessagingClient`` for delivery.
    """

    def __init__(
        self,
        queue_urls: Optional[List[str]] = None,
        queue_name: Optional[str] = None,
        monitor_port: Optional[int] = None,
        timeout: Optional[float] = None,
    ):
        """
        :param queue_urls: Message broker URLs. Defaults to ``defaults.METADATA_BROKERS``.
        :param queue_name: Name of the message queue. Defaults to ``defaults.METADATA_QUEUE``.
        :param monitor_port: Forwarded unchanged to ``IcatMessagingClient``.
        :param timeout: Forwarded unchanged to ``IcatMessagingClient``.
        """
        if queue_urls is None:
            # Fall back to the package default rather than forwarding ``None``.
            queue_urls = defaults.METADATA_BROKERS
        if queue_name is None:
            queue_name = defaults.METADATA_QUEUE
        self._client = IcatMessagingClient(
            queue_urls, queue_name, monitor_port=monitor_port, timeout=timeout
        )

    @staticmethod
    def _with_default_extension(filename: str) -> str:
        """Return ``filename`` with ``.xml`` appended when it has no extension."""
        root, ext = os.path.splitext(filename)
        return root + (ext or ".xml")

    def send_metadata(
        self,
        beamline: str,
        proposal: str,
        dataset: str,
        path: str,
        metadata: dict,
        strict: bool = False,
    ):
        """
        Send dataset metadata to ICAT.

        :param beamline: The beamline name of the proposal.
        :param proposal: The proposal name.
        :param dataset: The dataset name.
        :param path: The path to the dataset on disk.
        :param metadata: A dictionary of metadata to be attached to the dataset.
        :param strict: Strict metadata validation.
        :raises IcatMetadataSerializationError: If metadata serialization fails.
        :raises IcatMetadataValidationError: If ``strict=True`` and the metadata is invalid.
        :emits IcatMetadataValidationWarning: If ``strict=False`` and the metadata is invalid.
        :emits IcatMetadataDeprecatedWarning: If metadata is deprecated.
        """
        data = serialize_dataset_message(
            beamline=beamline,
            proposal=proposal,
            dataset=dataset,
            path=path,
            metadata=metadata,
            strict=strict,
        )
        self._client.send(data)

    def store_metadata(
        self,
        filename: str,
        beamline: str,
        proposal: str,
        dataset: str,
        path: str,
        metadata: dict,
        strict: bool = False,
    ):
        """
        Save dataset metadata in an XML file.

        :param filename: Destination file. ``.xml`` is appended when the name has
            no extension; missing parent directories are created.
        :param beamline: The beamline name of the proposal.
        :param proposal: The proposal name.
        :param dataset: The dataset name.
        :param path: The path to the dataset on disk.
        :param metadata: A dictionary of metadata to be attached to the dataset.
        :param strict: Strict metadata validation.
        :raises IcatMetadataValidationError: If ``strict=True`` and the metadata is invalid.
        :emits IcatMetadataValidationWarning: If ``strict=False`` and the metadata is invalid.
        :emits IcatMetadataDeprecatedWarning: If metadata is deprecated.
        """
        # Serialize first so that nothing is created on disk when serialization fails.
        data = serialize_dataset_message(
            beamline=beamline,
            proposal=proposal,
            dataset=dataset,
            path=path,
            metadata=metadata,
            strict=strict,
        )
        filename = self._with_default_extension(filename)
        dirname = os.path.dirname(filename)
        if dirname:
            # A bare file name has no directory part; ``os.makedirs("")`` would fail.
            os.makedirs(dirname, exist_ok=True)
        with open(filename, "wb") as f:
            f.write(data)

    def send_metadata_from_file(self, filename: str):
        """
        Send the content of a metadata file to ICAT.

        The file content is read in binary mode and sent as-is.

        :param filename: File to read. ``.xml`` is appended when the name has no
            extension, mirroring :meth:`store_metadata`.
        """
        filename = self._with_default_extension(filename)
        with open(filename, "rb") as f:
            data = f.read()
        self._client.send(data)

    def start_investigation(
        self,
        beamline: str,
        proposal: str,
        start_datetime=None,
        end_datetime=None,
        strict: bool = False,
    ):
        """
        Send investigation metadata to ICAT.

        :param beamline: The beamline name of the proposal.
        :param proposal: The proposal name.
        :param start_datetime: Start date of the investigation.
        :param end_datetime: for unit tests (deprecated).
        :param strict: Strict metadata validation.
        :raises IcatMetadataValidationError: If ``strict=True`` and the metadata is invalid.
        :emits IcatMetadataValidationWarning: If ``strict=False`` and the metadata is invalid.
        :emits IcatMetadataDeprecatedWarning: If metadata is deprecated.
        """
        data = serialize_investigation_message(
            beamline=beamline,
            proposal=proposal,
            start_datetime=start_datetime,
            end_datetime=end_datetime,
            strict=strict,
        )
        # NOTE(review): the messaging client is reconnected before each
        # investigation message; the reason is not visible here - confirm.
        self._client.reconnect()
        self._client.send(data)

    def reschedule_investigation(self, investigation_id: str, strict: bool = False):
        """
        Send an investigation message identified by ``investigation_id`` to ICAT.

        The message is serialized with empty beamline and proposal names.

        :param investigation_id: Identifier of the investigation.
        :param strict: Strict metadata validation.
        :raises IcatMetadataValidationError: If ``strict=True`` and the metadata is invalid.
        :emits IcatMetadataValidationWarning: If ``strict=False`` and the metadata is invalid.
        """
        data = serialize_investigation_message(
            investigation_id=investigation_id,
            beamline="",
            proposal="",
            strict=strict,
        )
        # NOTE(review): reconnects before sending, as in start_investigation - confirm why.
        self._client.reconnect()
        self._client.send(data)

    def check_health(self):
        """Raises an exception when not healthy"""
        self._client.check_health()