Source code for pyicat_plus.tests.test_icat_sync

import datetime
import logging
import os
import re
import time
from contextlib import contextmanager
from typing import Callable
from typing import Generator
from typing import Optional
from typing import Tuple

import h5py

from ..apps import sync_raw
from ..client.main import IcatClient
from ..utils import path_utils
from ..utils import sync_types


def test_unregistered_datasets(tmpdir, icat_main_client):
    """Datasets move from "unregistered" to "registered" as they get stored.

    The session is synchronized three times: before storing anything, after
    storing the first three datasets and after storing the remaining ones.
    """
    client, messages = icat_main_client
    root_dir = str(tmpdir)
    exp_session_test = _init_unregistered_experiment(
        client, messages, root_dir, "hg333", "id99"
    )

    def synchronize():
        return sync_raw.sync_session(
            client,
            "hg333",
            "id99",
            "20231019",
            root_dir=root_dir,
            raw_data_format="esrfv3",
        )

    def store_all(datasets):
        # One message is consumed for every stored dataset.
        for dset in datasets:
            _store_dataset(client, dset)
            messages.get(timeout=10)

    def assert_counts(session, registered, unregistered):
        assert len(session.datasets["registered"]) == registered
        assert len(session.datasets["unregistered"]) == unregistered

    # Nothing stored yet
    exp_session = synchronize()
    assert_counts(exp_session, 0, 6)
    if exp_session != exp_session_test:
        # Comparing the dictionaries gives a readable diff on failure
        assert exp_session.as_dict() == exp_session_test.as_dict()

    pending = exp_session_test.datasets["unregistered"]
    first_datasets, last_datasets = pending[:3], pending[3:]

    # First half stored
    store_all(first_datasets)
    assert_counts(synchronize(), 3, 3)

    # Everything stored
    store_all(last_datasets)
    assert_counts(synchronize(), 6, 0)
def test_unregistered_datasets_content(tmpdir, icat_main_client):
    """Check the serialized content of every unregistered dataset.

    Paths are made relative to the temporary root directory and dates are
    converted to UTC ISO strings so the comparison does not depend on the
    machine running the test.
    """
    client, messages = icat_main_client
    root_dir = str(tmpdir)
    _init_unregistered_experiment(client, messages, root_dir, "hg333", "id99")

    exp_session = sync_raw.sync_session(
        client,
        "hg333",
        "id99",
        "20231019",
        root_dir=root_dir,
        raw_data_format="esrfv3",
    )

    to_utc_iso = _astimezone_string(datetime.timezone.utc, iso=True)
    unregistered = []
    for dset in exp_session.datasets["unregistered"]:
        content = dset.as_dict()
        for key in ("path", "raw_root_dir"):
            content[key] = os.path.relpath(content[key], root_dir)
        for key in ("startdate", "enddate"):
            content[key] = to_utc_iso(content[key])
        metadata = content["metadata"]
        for key in ("startDate", "endDate"):
            metadata[key] = to_utc_iso(metadata[key])
        unregistered.append(content)

    raw_root_dir = os.path.join("hg333", "id99", "20231019", "RAW_DATA")

    def expected_dataset(collection, name, sample_name, start, end, *reasons):
        """Expected dictionary of one dataset (dates are UTC ISO strings)."""
        return {
            "beamline": "id99",
            "proposal": "hg333",
            "name": name,
            "path": os.path.join(raw_root_dir, collection, f"{collection}_{name}"),
            "raw_root_dir": raw_root_dir,
            "startdate": start,
            "enddate": end,
            "icat_dataset": None,
            "status_reason": list(reasons),
            "metadata": {
                "Sample_name": sample_name,
                "startDate": start,
                "endDate": end,
                "investigationId": 0,
            },
        }

    expected = [
        expected_dataset(
            "too_soon_collection",
            "0001",
            "too_soon_sample",
            "2023-10-19T05:00:00+00:00",
            "2023-10-19T21:00:00+00:00",
            "started 1:00:00 before the start of the session",
        ),
        expected_dataset(
            "first_collection",
            "0001",
            "first_sample",
            "2023-10-20T00:00:00+00:00",
            "2023-10-20T16:00:00+00:00",
        ),
        expected_dataset(
            "first_collection",
            "0002",
            "first_sample",
            "2023-10-20T17:00:00+00:00",
            "2023-10-21T09:00:00+00:00",
        ),
        expected_dataset(
            "second_collection",
            "0001",
            "second_sample",
            "2023-10-22T03:00:00+00:00",
            "2023-10-22T19:00:00+00:00",
        ),
        expected_dataset(
            "second_collection",
            "0002",
            "second_sample",
            "2023-10-22T20:00:00+00:00",
            "2023-10-23T12:00:00+00:00",
        ),
        expected_dataset(
            "too_late_collection",
            "0001",
            "too_late_collection",
            "2023-10-23T13:00:00+00:00",
            "2023-10-24T07:00:00+00:00",
            "ended 1:00:00 after the end of the session",
        ),
    ]

    assert unregistered == expected
def test_missing_files_datasets(tmpdir, icat_main_client, icat_add_files_client):
    """A dataset registered without files becomes "registered" once its files are added."""
    client, messages = icat_main_client
    client_add, messages_add = icat_add_files_client
    root_dir = str(tmpdir)
    _init_empty_files_experiment(client, messages, root_dir, "md444", "id99")

    def synchronize():
        return sync_raw.sync_session(
            client,
            "md444",
            "id99",
            "20230201",
            root_dir=root_dir,
            raw_data_format="esrfv3",
        )

    def assert_counts(session, registered, without_files, unregistered):
        assert len(session.datasets["registered"]) == registered
        assert len(session.datasets["registered_without_files"]) == without_files
        assert len(session.datasets["unregistered"]) == unregistered

    exp_session = synchronize()
    assert_counts(exp_session, 1, 1, 0)

    # Add the missing files of every dataset that was registered without them
    for dset in exp_session.datasets["registered_without_files"]:
        client_add.add_files(dset.icat_dataset.icat_dataset_id)
        messages_add.get(timeout=10)

    assert_counts(synchronize(), 2, 0, 0)
def test_session_serialization(tmpdir, icat_main_client):
    """A session survives a round trip through ``as_dict`` and ``from_dict``."""
    client, messages = icat_main_client
    root_dir = str(tmpdir)
    _init_unregistered_experiment(client, messages, root_dir, "hg333", "id99")

    exp_session = sync_raw.sync_session(
        client,
        "hg333",
        "id99",
        "20231019",
        root_dir=root_dir,
        raw_data_format="esrfv3",
    )

    serialized = exp_session.as_dict()
    restored = exp_session.from_dict(serialized)
    assert exp_session == restored
def test_session_pprint(caplog, tmpdir, icat_main_client):
    """Compare the log messages emitted by ``pprint`` with a reference text.

    The session is synchronized after storing the first three unregistered
    datasets, so the report lists registered, invalid and unregistered datasets.
    """
    client, messages = icat_main_client
    root_dir = str(tmpdir)
    _ = _init_unregistered_experiment(client, messages, root_dir, "hg333", "id99")
    args = client, "hg333", "id99", "20231019"
    kwargs = {"root_dir": root_dir, "raw_data_format": "esrfv3"}
    exp_session = sync_raw.sync_session(*args, **kwargs)

    # Store half of the datasets so the report has several sections
    for dataset in exp_session.datasets["unregistered"][:3]:
        _store_dataset(client, dataset)
        messages.get(timeout=10)
    exp_session = sync_raw.sync_session(*args, **kwargs)

    # Dates are printed in the time zone of the investigation start date
    date_to_string = _astimezone_string(
        exp_session.icat_investigation.startdate.tzinfo, iso=False
    )
    startdate = date_to_string(exp_session.icat_investigation.startdate)
    enddate = date_to_string(exp_session.icat_investigation.enddate)

    # NOTE(review): the line breaks and leading spaces of this literal could not
    # be recovered from the rendered page this file was taken from; confirm the
    # exact whitespace against the messages logged by `pprint`.
    expected = f"""
--------------------------------
Registered datasets:
 /hg333/id99/20231019/RAW_DATA/too_soon_collection/too_soon_collection_0001/
 /hg333/id99/20231019/RAW_DATA/first_collection/first_collection_0001/
 /hg333/id99/20231019/RAW_DATA/first_collection/first_collection_0002/
Invalid datasets:
 /hg333/id99/20231019/RAW_DATA/empty_file/empty_file_0001/
  Cannot extract dataset metadata: HDF5 reading error (HDF5 file not created by Bliss)
Unregistered datasets:
 /hg333/id99/20231019/RAW_DATA/second_collection/second_collection_0001/
 /hg333/id99/20231019/RAW_DATA/second_collection/second_collection_0002/
 /hg333/id99/20231019/RAW_DATA/too_late_collection/too_late_collection_0001/
  ended 1:00:00 after the end of the session
Datasets (TODO):
 3 registered
 1 invalid
 0 not uploaded
 3 unregistered
 0 registered without files
Directory: /hg333/id99/20231019/RAW_DATA/
Start time: 2023-10-19
Investigation ID: 0
URL: https://data.esrf.fr/investigation/0/datasets
Search URL: https://data.esrf.fr/beamline/id99?search=hg333
Start time: {startdate} <REPLACED>
End time: {enddate} <REPLACED>
Duration: 5 days, 0:00:00
--------------------------------
"""
    # Use the path separator of the platform in the dataset and directory paths
    expected = re.sub(
        r"(/hg333[^\n]*)",
        lambda match: match.group(0).replace("/", os.path.sep),
        expected,
    )

    caplog.set_level(logging.DEBUG, logger="ICAT SYNC")
    exp_session.pprint()
    captured = "\n".join(caplog.messages)
    # Paths are compared relative to the temporary root directory
    captured = captured.replace(root_dir, "")

    # Remove the sub-strings that depend on the time this test is run
    captured = re.sub(r"\(.*ago\)", "<REPLACED>", captured)

    assert captured == expected
class _ExperimentalSession(sync_types.ExperimentalSession):
    """Experimental session that can create its own datasets on disk."""

    def create_dataset(
        self,
        collection: str,
        dataset: str,
        sample_name: str,
        startdate: datetime.datetime,
        duration: datetime.timedelta,
        status_reason: Tuple[str, ...] = tuple(),
        has_content: bool = True,
    ) -> sync_types.Dataset:
        """Create Bliss dataset directory with HDF5 file. The file contains
        3 scans or nothing.

        :param collection: name of the collection directory
        :param dataset: name of the dataset within the collection
        :param sample_name: sample name saved in every finished scan
        :param startdate: start of the first scan
        :param duration: time span shared by the three scans
        :param status_reason: reasons attached to the expected dataset
        :param has_content: when ``False`` the HDF5 file is left empty
        :returns: the expected dataset, which is also added to this session
        """
        enddate = None
        with _init_dataset(self.raw_root_dir, collection, dataset) as nxroot:
            dataset_dir = os.path.dirname(nxroot.filename)
            if has_content:
                nxroot.attrs["creator"] = "Bliss"
                nxroot.attrs["file_time"] = startdate.isoformat()

                now, scan_duration, deadtime = _chunk_duration(
                    startdate, startdate + duration, 3
                )
                _save_scan(nxroot, now, scan_duration, sample_name)
                now += scan_duration + deadtime
                # The second scan fails: it has no end time
                _save_scan(nxroot, now, scan_duration, sample_name, failed=True)
                now += scan_duration + deadtime
                # The end of the last scan is the end of the dataset
                enddate = _save_scan(nxroot, now, scan_duration, sample_name)

        time.sleep(0.050)  # sync_raw sorts by creation time

        if has_content:
            metadata = {
                "Sample_name": sample_name,
                "startDate": startdate,
                "endDate": enddate,
                "investigationId": 0,
            }
        else:
            # Nothing can be extracted from an empty file
            startdate = None
            enddate = None
            metadata = None

        expected_dataset = sync_types.Dataset(
            path=dataset_dir,
            proposal=self.proposal,
            beamline=self.beamline,
            name=dataset,
            raw_root_dir=self.raw_root_dir,
            status_reason=list(status_reason),
            startdate=startdate,
            enddate=enddate,
            metadata=metadata,
        )
        expected_status = "unregistered" if has_content else "invalid"
        assert expected_dataset.get_status() == expected_status
        self.add_dataset(expected_dataset)
        return expected_dataset


def _init_unregistered_experiment(
    client: IcatClient, messages, root_dir: str, proposal: str, beamline: str
) -> _ExperimentalSession:
    """Create several datasets on disk:

    - 1 normal dataset starting before the startdate (unregistered)
    - 2 normal datasets (unregistered)
    - 1 dataset with an empty file (invalid)
    - 2 normal datasets (unregistered)
    - 1 normal dataset ending after the enddate (unregistered)
    """
    startdate = _as_datetime("2023-10-19T08:00:00+02:00")
    enddate = startdate + datetime.timedelta(days=5)
    exp_session = _start_investigation(
        client, messages, root_dir, proposal, beamline, startdate, enddate
    )

    now, dataset_duration, deadtime = _chunk_duration(
        startdate, enddate, 7, deadtime=3600
    )

    # Dataset starts before the startdate
    exp_session.create_dataset(
        "too_soon_collection",
        "0001",
        "too_soon_sample",
        now - 2 * deadtime,
        dataset_duration,
        ["started 1:00:00 before the start of the session"],
    )
    now += dataset_duration + deadtime

    # Normal
    exp_session.create_dataset(
        "first_collection", "0001", "first_sample", now, dataset_duration
    )
    now += dataset_duration + deadtime

    # Normal
    exp_session.create_dataset(
        "first_collection", "0002", "first_sample", now, dataset_duration
    )
    now += dataset_duration + deadtime

    # Empty HDF5 file
    exp_session.create_dataset(
        "empty_file",
        "0001",
        "empty_sample",
        now,
        dataset_duration,
        has_content=False,
        status_reason=[
            "Cannot extract dataset metadata: HDF5 reading error (HDF5 file not created by Bliss)"
        ],
    )
    now += dataset_duration + deadtime

    # Normal
    exp_session.create_dataset(
        "second_collection", "0001", "second_sample", now, dataset_duration
    )
    now += dataset_duration + deadtime

    # Normal
    exp_session.create_dataset(
        "second_collection", "0002", "second_sample", now, dataset_duration
    )
    now += dataset_duration + deadtime

    # Dataset ends after the enddate
    exp_session.create_dataset(
        "too_late_collection",
        "0001",
        "too_late_collection",
        now,
        dataset_duration + 2 * deadtime,
        ["ended 1:00:00 after the end of the session"],
    )

    return exp_session


def _init_empty_files_experiment(
    client: IcatClient, messages, root_dir: str, proposal: str, beamline: str
) -> _ExperimentalSession:
    """Create several datasets on disk:

    - 1 normal dataset (registered)
    - 1 normal dataset (registered without files)
    """
    startdate = _as_datetime("2023-02-01T08:00:00+01:00")
    enddate = startdate + datetime.timedelta(days=5)
    exp_session = _start_investigation(
        client, messages, root_dir, proposal, beamline, startdate, enddate
    )

    now, dataset_duration, deadtime = _chunk_duration(
        startdate, enddate, 2, deadtime=3600
    )

    # Normal
    dataset = exp_session.create_dataset(
        "collection_normal", "0001", "sample_normal", now, dataset_duration
    )
    _store_dataset(client, dataset)
    messages.get(timeout=10)
    now += dataset_duration + deadtime

    # Directory is empty when the file count is calculated (on the ICAT server side)
    dataset = exp_session.create_dataset(
        "collection_missing", "0002", "sample_missing", now, dataset_duration
    )
    with _temporary_empty_directory(dataset.path):
        _store_dataset(client, dataset)
        messages.get(timeout=10)

    return exp_session


@contextmanager
def _temporary_empty_directory(directory_path: str) -> Generator[None, None, None]:
    """Replace a directory by an empty one for the duration of the context.

    The original directory is moved aside (``<name>_tmp`` next to it) on entry
    and moved back on exit. Every cleanup step only undoes a setup step that
    succeeded, so a failing setup is not masked by a failing cleanup.
    """
    original_name = path_utils.basename(directory_path)
    renamed_path = os.path.join(
        path_utils.dirname(directory_path), f"{original_name}_tmp"
    )

    os.rename(directory_path, renamed_path)
    try:
        os.makedirs(directory_path)
        try:
            yield
        finally:
            os.rmdir(directory_path)
    finally:
        os.rename(renamed_path, directory_path)


def _start_investigation(
    client: IcatClient,
    messages,
    root_dir: str,
    proposal: str,
    beamline: str,
    startdate: datetime.datetime,
    enddate: datetime.datetime,
) -> _ExperimentalSession:
    """Create the session directories on disk, start the investigation and
    return the expected session (without datasets)."""
    session = startdate.strftime("%Y%m%d")
    session_dir = path_utils.markdir(
        os.path.join(root_dir, proposal, beamline, session)
    )
    raw_root_dir = path_utils.markdir(os.path.join(session_dir, "RAW_DATA"))
    os.makedirs(raw_root_dir, exist_ok=True)
    search_url = f"https://data.esrf.fr/beamline/{beamline}?search={proposal}"

    exp_session = _ExperimentalSession(
        session_dir=session_dir,
        raw_root_dir=raw_root_dir,
        raw_data_format="esrfv3",
        proposal=proposal,
        beamline=beamline,
        session=session,
        startdate=startdate.date(),
        search_url=search_url,
        datasets={},
    )

    client.start_investigation(
        beamline=beamline,
        proposal=proposal,
        start_datetime=startdate,
        end_datetime=enddate,
    )
    messages.get(timeout=10)

    exp_session.icat_investigation = sync_types.IcatInvestigation(
        id=0,
        url="https://data.esrf.fr/investigation/0/datasets",
        search_url=search_url,
        registered_datasets=[],
        startdate=startdate,
        enddate=enddate,
    )
    return exp_session


def _store_dataset(client: IcatClient, dataset: sync_types.Dataset) -> None:
    """Store a dataset through the client, using the fields of the dataset."""
    client.store_dataset(
        beamline=dataset.beamline,
        proposal=dataset.proposal,
        dataset=dataset.name,
        path=dataset.path,
        metadata=dataset.metadata,
    )


def _chunk_duration(
    startdate: datetime.datetime,
    enddate: datetime.datetime,
    nchunks: int,
    deadtime: int = 0,
) -> Tuple[datetime.datetime, datetime.timedelta, datetime.timedelta]:
    """Split a time span in equal chunks separated by dead time.

    Dead time is also reserved before the first and after the last chunk.

    :param startdate: start of the time span
    :param enddate: end of the time span
    :param nchunks: number of chunks
    :param deadtime: dead time in seconds
    :returns: start of the first chunk, duration of one chunk and the dead time
    """
    deadtime_delta = datetime.timedelta(seconds=deadtime)
    total_deadtime = (nchunks + 1) * deadtime_delta
    chunk_duration = (enddate - startdate - total_deadtime) / nchunks
    return startdate + deadtime_delta, chunk_duration, deadtime_delta


def _save_scan(
    nxroot: h5py.File,
    startdate: datetime.datetime,
    duration: datetime.timedelta,
    sample_name: str,
    failed: bool = False,
) -> Optional[datetime.datetime]:
    """Add the next scan group ``"<number>.1"`` to the file.

    :returns: the end date of the scan or ``None`` for a failed scan, which
              only gets a start time
    """
    # Existing groups are named "<number>.1": continue the numbering
    last_scan = max((int(float(name)) for name in nxroot), default=0)
    grp = nxroot.create_group(f"{last_scan + 1}.1")
    grp["start_time"] = startdate.isoformat()
    if failed:
        return None
    enddate = startdate + duration
    grp["end_time"] = enddate.isoformat()
    grp["sample/name"] = sample_name
    return enddate


@contextmanager
def _init_dataset(
    raw_root_dir: str,
    collection: str,
    dataset: str,
) -> Generator[h5py.File, None, None]:
    """Create the dataset directory and yield its HDF5 file opened for writing.

    The file is ``<raw_root_dir>/<collection>/<collection>_<dataset>/<collection>_<dataset>.h5``.
    """
    basename = f"{collection}_{dataset}"
    dataset_dir = os.path.join(raw_root_dir, collection, basename)
    os.makedirs(dataset_dir, exist_ok=True)
    filename = os.path.basename(dataset_dir) + ".h5"
    with h5py.File(os.path.join(dataset_dir, filename), mode="w") as f:
        yield f


def _as_datetime(isoformat: str) -> datetime.datetime:
    """Parse an ISO 8601 string and convert it to the local time zone."""
    return datetime.datetime.fromisoformat(isoformat).astimezone()


def _astimezone_string(tzinfo, iso: bool = True) -> Callable[[datetime.datetime], str]:
    """Return a function that formats a date in the time zone ``tzinfo``.

    :param tzinfo: time zone the dates are converted to before formatting
    :param iso: use ``isoformat()`` when ``True``, ``str()`` otherwise
    """

    def astimezone_string(dt: datetime.datetime) -> str:
        dt = dt.astimezone(tz=tzinfo)
        if iso:
            return dt.isoformat()
        return str(dt)

    return astimezone_string