Source code for pyicat_plus.client.metadata
import os
from typing import List
from typing import Optional
from xml.etree import ElementTree
from . import defaults
from .messaging import IcatMessagingClient
from .xmlns import dataset_as_xml
from .xmlns import investigation_as_xml
[docs]
class IcatMetadataClient:
"""Client for storing dataset metadata in ICAT."""
def __init__(
self,
queue_urls: Optional[List[str]] = None,
queue_name: Optional[str] = None,
monitor_port: Optional[int] = None,
timeout: Optional[float] = None,
):
if queue_urls is None:
defaults.METADATA_BROKERS
if queue_name is None:
queue_name = defaults.METADATA_QUEUE
self._client = IcatMessagingClient(
queue_urls, queue_name, monitor_port=monitor_port, timeout=timeout
)
[docs]
def send_metadata(
self,
beamline: str,
proposal: str,
dataset: str,
path: str,
metadata: dict,
):
"""
Send dataset metadata to ICAT.
:param beamline: The beamline name of the proposal.
:param proposal: The proposal name.
:param dataset: The dataset name.
:param path: The path to the dataset on disk.
:param metadata: A dictionary of metadata to be attached to the dataset.
"""
root = dataset_as_xml(
beamline=beamline,
proposal=proposal,
dataset=dataset,
path=path,
metadata=metadata,
)
self._client.send(ElementTree.tostring(root))
[docs]
def store_metadata(
self,
filename: str,
beamline: str,
proposal: str,
dataset: str,
path: str,
metadata: dict,
):
root = dataset_as_xml(
beamline=beamline,
proposal=proposal,
dataset=dataset,
path=path,
metadata=metadata,
)
filename, ext = os.path.splitext(filename)
if not ext:
ext = ".xml"
filename += ext
dirname = os.path.dirname(filename)
if dirname:
os.makedirs(dirname, exist_ok=True)
with open(filename, "wb") as f:
f.write(ElementTree.tostring(root))
[docs]
def send_metadata_from_file(self, filename: str):
filename, ext = os.path.splitext(filename)
if not ext:
ext = ".xml"
filename += ext
with open(filename, "rb") as f:
payload = f.read()
self._client.send(payload)
[docs]
def start_investigation(
self, beamline: str, proposal: str, start_datetime=None, end_datetime=None
):
root = investigation_as_xml(
beamline=beamline,
proposal=proposal,
start_datetime=start_datetime,
end_datetime=end_datetime,
)
self._client.reconnect()
self._client.send(ElementTree.tostring(root))
[docs]
def reschedule_investigation(self, investigation_id: str):
root = investigation_as_xml(
investigation_id=investigation_id,
beamline="",
proposal="",
)
self._client.reconnect()
self._client.send(ElementTree.tostring(root))
[docs]
def check_health(self):
"""Raises an exception when not healthy"""
self._client.check_health()