import datetime
import logging
import os
import re
import time
from contextlib import contextmanager
from typing import Callable
from typing import Generator
from typing import Optional
from typing import Tuple
import h5py
from ..apps import sync_raw
from ..client.main import IcatClient
from ..utils import path_utils
from ..utils import sync_types
[docs]
def test_unregistered_datasets(tmpdir, icat_main_client):
client, messages = icat_main_client
root_dir = str(tmpdir)
exp_session_test = _init_unregistered_experiment(
client, messages, root_dir, "hg333", "id99"
)
args = client, "hg333", "id99", "20231019"
kwargs = {"root_dir": root_dir, "raw_data_format": "esrfv3"}
exp_session = sync_raw.sync_session(*args, **kwargs)
assert len(exp_session.datasets["registered"]) == 0
assert len(exp_session.datasets["unregistered"]) == 6
if exp_session != exp_session_test:
assert exp_session.as_dict() == exp_session_test.as_dict()
first_datasets = exp_session_test.datasets["unregistered"][:3]
last_datasets = exp_session_test.datasets["unregistered"][3:]
for dataset in first_datasets:
_store_dataset(client, dataset)
messages.get(timeout=10)
exp_session = sync_raw.sync_session(*args, **kwargs)
assert len(exp_session.datasets["registered"]) == 3
assert len(exp_session.datasets["unregistered"]) == 3
for dataset in last_datasets:
_store_dataset(client, dataset)
messages.get(timeout=10)
exp_session = sync_raw.sync_session(*args, **kwargs)
assert len(exp_session.datasets["registered"]) == 6
assert len(exp_session.datasets["unregistered"]) == 0
[docs]
def test_unregistered_datasets_content(tmpdir, icat_main_client):
client, messages = icat_main_client
root_dir = str(tmpdir)
_ = _init_unregistered_experiment(client, messages, root_dir, "hg333", "id99")
exp_session = sync_raw.sync_session(
client, "hg333", "id99", "20231019", root_dir=root_dir, raw_data_format="esrfv3"
)
unregistered = [dset.as_dict() for dset in exp_session.datasets["unregistered"]]
date_to_string = _astimezone_string(datetime.timezone.utc, iso=True)
for dataset in unregistered:
dataset["path"] = os.path.relpath(dataset["path"], root_dir)
dataset["raw_root_dir"] = os.path.relpath(dataset["raw_root_dir"], root_dir)
dataset["startdate"] = date_to_string(dataset["startdate"])
dataset["enddate"] = date_to_string(dataset["enddate"])
metadata = dataset["metadata"]
metadata["startDate"] = date_to_string(metadata["startDate"])
metadata["endDate"] = date_to_string(metadata["endDate"])
expected = [
{
"beamline": "id99",
"enddate": "2023-10-19T21:00:00+00:00",
"icat_dataset": None,
"metadata": {
"Sample_name": "too_soon_sample",
"endDate": "2023-10-19T21:00:00+00:00",
"startDate": "2023-10-19T05:00:00+00:00",
"investigationId": 0,
},
"name": "0001",
"path": "hg333/id99/20231019/RAW_DATA/too_soon_collection/too_soon_collection_0001",
"proposal": "hg333",
"raw_root_dir": "hg333/id99/20231019/RAW_DATA",
"startdate": "2023-10-19T05:00:00+00:00",
"status_reason": [
"started 1:00:00 before the start of the session",
],
},
{
"beamline": "id99",
"enddate": "2023-10-20T16:00:00+00:00",
"icat_dataset": None,
"metadata": {
"Sample_name": "first_sample",
"endDate": "2023-10-20T16:00:00+00:00",
"startDate": "2023-10-20T00:00:00+00:00",
"investigationId": 0,
},
"name": "0001",
"path": "hg333/id99/20231019/RAW_DATA/first_collection/first_collection_0001",
"proposal": "hg333",
"raw_root_dir": "hg333/id99/20231019/RAW_DATA",
"startdate": "2023-10-20T00:00:00+00:00",
"status_reason": [],
},
{
"beamline": "id99",
"enddate": "2023-10-21T09:00:00+00:00",
"icat_dataset": None,
"metadata": {
"Sample_name": "first_sample",
"endDate": "2023-10-21T09:00:00+00:00",
"startDate": "2023-10-20T17:00:00+00:00",
"investigationId": 0,
},
"name": "0002",
"path": "hg333/id99/20231019/RAW_DATA/first_collection/first_collection_0002",
"proposal": "hg333",
"raw_root_dir": "hg333/id99/20231019/RAW_DATA",
"startdate": "2023-10-20T17:00:00+00:00",
"status_reason": [],
},
{
"beamline": "id99",
"enddate": "2023-10-22T19:00:00+00:00",
"icat_dataset": None,
"metadata": {
"Sample_name": "second_sample",
"endDate": "2023-10-22T19:00:00+00:00",
"startDate": "2023-10-22T03:00:00+00:00",
"investigationId": 0,
},
"name": "0001",
"path": "hg333/id99/20231019/RAW_DATA/second_collection/second_collection_0001",
"proposal": "hg333",
"raw_root_dir": "hg333/id99/20231019/RAW_DATA",
"startdate": "2023-10-22T03:00:00+00:00",
"status_reason": [],
},
{
"beamline": "id99",
"enddate": "2023-10-23T12:00:00+00:00",
"icat_dataset": None,
"metadata": {
"Sample_name": "second_sample",
"endDate": "2023-10-23T12:00:00+00:00",
"startDate": "2023-10-22T20:00:00+00:00",
"investigationId": 0,
},
"name": "0002",
"path": "hg333/id99/20231019/RAW_DATA/second_collection/second_collection_0002",
"proposal": "hg333",
"raw_root_dir": "hg333/id99/20231019/RAW_DATA",
"startdate": "2023-10-22T20:00:00+00:00",
"status_reason": [],
},
{
"beamline": "id99",
"enddate": "2023-10-24T07:00:00+00:00",
"icat_dataset": None,
"metadata": {
"Sample_name": "too_late_collection",
"endDate": "2023-10-24T07:00:00+00:00",
"startDate": "2023-10-23T13:00:00+00:00",
"investigationId": 0,
},
"name": "0001",
"path": "hg333/id99/20231019/RAW_DATA/too_late_collection/too_late_collection_0001",
"proposal": "hg333",
"raw_root_dir": "hg333/id99/20231019/RAW_DATA",
"startdate": "2023-10-23T13:00:00+00:00",
"status_reason": [
"ended 1:00:00 after the end of the session",
],
},
]
for dataset in expected:
for k in ("path", "raw_root_dir"):
dataset[k] = dataset[k].replace("/", os.path.sep)
assert unregistered == expected
[docs]
def test_missing_files_datasets(tmpdir, icat_main_client, icat_add_files_client):
client, messages = icat_main_client
client_add, messages_add = icat_add_files_client
root_dir = str(tmpdir)
_ = _init_empty_files_experiment(client, messages, root_dir, "md444", "id99")
args = client, "md444", "id99", "20230201"
kwargs = {"root_dir": root_dir, "raw_data_format": "esrfv3"}
exp_session = sync_raw.sync_session(*args, **kwargs)
assert len(exp_session.datasets["registered"]) == 1
assert len(exp_session.datasets["registered_without_files"]) == 1
assert len(exp_session.datasets["unregistered"]) == 0
for dataset in exp_session.datasets["registered_without_files"]:
client_add.add_files(dataset.icat_dataset.icat_dataset_id)
messages_add.get(timeout=10)
exp_session = sync_raw.sync_session(*args, **kwargs)
assert len(exp_session.datasets["registered"]) == 2
assert len(exp_session.datasets["registered_without_files"]) == 0
assert len(exp_session.datasets["unregistered"]) == 0
[docs]
def test_session_serialization(tmpdir, icat_main_client):
client, messages = icat_main_client
root_dir = str(tmpdir)
_ = _init_unregistered_experiment(client, messages, root_dir, "hg333", "id99")
args = client, "hg333", "id99", "20231019"
kwargs = {"root_dir": root_dir, "raw_data_format": "esrfv3"}
exp_session = sync_raw.sync_session(*args, **kwargs)
exp_session_copy = exp_session.from_dict(exp_session.as_dict())
assert exp_session == exp_session_copy
[docs]
def test_session_pprint(caplog, tmpdir, icat_main_client):
client, messages = icat_main_client
root_dir = str(tmpdir)
_ = _init_unregistered_experiment(client, messages, root_dir, "hg333", "id99")
args = client, "hg333", "id99", "20231019"
kwargs = {"root_dir": root_dir, "raw_data_format": "esrfv3"}
exp_session = sync_raw.sync_session(*args, **kwargs)
for dataset in exp_session.datasets["unregistered"][:3]:
_store_dataset(client, dataset)
messages.get(timeout=10)
exp_session = sync_raw.sync_session(*args, **kwargs)
date_to_string = _astimezone_string(
exp_session.icat_investigation.startdate.tzinfo, iso=False
)
startdate = date_to_string(exp_session.icat_investigation.startdate)
enddate = date_to_string(exp_session.icat_investigation.enddate)
expected = f"""
--------------------------------
Registered datasets:
/hg333/id99/20231019/RAW_DATA/too_soon_collection/too_soon_collection_0001/
/hg333/id99/20231019/RAW_DATA/first_collection/first_collection_0001/
/hg333/id99/20231019/RAW_DATA/first_collection/first_collection_0002/
Invalid datasets:
/hg333/id99/20231019/RAW_DATA/empty_file/empty_file_0001/
Cannot extract dataset metadata: HDF5 reading error (HDF5 file not created by Bliss)
Unregistered datasets:
/hg333/id99/20231019/RAW_DATA/second_collection/second_collection_0001/
/hg333/id99/20231019/RAW_DATA/second_collection/second_collection_0002/
/hg333/id99/20231019/RAW_DATA/too_late_collection/too_late_collection_0001/
ended 1:00:00 after the end of the session
Datasets (TODO):
3 registered
1 invalid
0 not uploaded
3 unregistered
0 registered without files
Directory: /hg333/id99/20231019/RAW_DATA/
Start time: 2023-10-19
Investigation ID: 0
URL: https://data.esrf.fr/investigation/0/datasets
Search URL: https://data.esrf.fr/beamline/id99?search=hg333
Start time: {startdate} <REPLACED>
End time: {enddate} <REPLACED>
Duration: 5 days, 0:00:00
--------------------------------
"""
expected = re.sub(
r"(/hg333[^\n]*)",
lambda match: match.group(0).replace("/", os.path.sep),
expected,
)
caplog.set_level(logging.DEBUG, logger="ICAT SYNC")
exp_session.pprint()
captured = "\n".join(caplog.messages)
captured = captured.replace(root_dir, "")
# Remove the sub-strings that depend on the time this test is run
captured = re.sub(r"\(.*ago\)", "<REPLACED>", captured)
assert captured == expected
class _ExperimentalSession(sync_types.ExperimentalSession):
def create_dataset(
self,
collection: str,
dataset: str,
sample_name: str,
startdate: datetime.datetime,
duration: datetime.timedelta,
status_reason: Tuple[str] = tuple(),
has_content: bool = True,
) -> sync_types.Dataset:
"""Create Bliss dataset directory with HDF5 file. The file contains 3 scans or nothing."""
with _init_dataset(self.raw_root_dir, collection, dataset) as nxroot:
dataset_dir = os.path.dirname(nxroot.filename)
if has_content:
nxroot.attrs["creator"] = "Bliss"
nxroot.attrs["file_time"] = startdate.isoformat()
now, scan_duration, deadtime = _chunk_duration(
startdate, startdate + duration, 3
)
enddate = _save_scan(nxroot, now, scan_duration, sample_name)
now += scan_duration + deadtime
enddate = _save_scan(
nxroot, now, scan_duration, sample_name, failed=True
)
now += scan_duration + deadtime
enddate = _save_scan(nxroot, now, scan_duration, sample_name)
time.sleep(0.050) # sync_raw sorts by creation time
if has_content:
medata_startdate = startdate
medata_enddate = enddate
metadata = {
"Sample_name": sample_name,
"startDate": medata_startdate,
"endDate": medata_enddate,
"investigationId": 0,
}
else:
startdate = None
enddate = None
metadata = None
dataset = sync_types.Dataset(
path=dataset_dir,
proposal=self.proposal,
beamline=self.beamline,
name=dataset,
raw_root_dir=self.raw_root_dir,
status_reason=list(status_reason),
startdate=startdate,
enddate=enddate,
metadata=metadata,
)
if has_content:
assert dataset.get_status() == "unregistered"
else:
assert dataset.get_status() == "invalid"
self.add_dataset(dataset)
return dataset
def _init_unregistered_experiment(
client: IcatClient, messages, root_dir: str, proposal: str, beamline: str
) -> _ExperimentalSession:
"""Create several datasets on disk:
- 1 normal dataset starting after the startdate (uregistered)
- 2 normal datasets (uregistered)
- 1 dataset with an empty file (uregistered)
- 2 normal datasets (uregistered)
- 1 normal dataset ending after the enddate (uregistered)
"""
startdate = _as_datetime("2023-10-19T08:00:00+02:00")
enddate = startdate + datetime.timedelta(days=5)
exp_session = _start_investigation(
client, messages, root_dir, proposal, beamline, startdate, enddate
)
now, dataset_duration, deadtime = _chunk_duration(
startdate, enddate, 7, deadtime=3600
)
# Dataset starts before the startdate
exp_session.create_dataset(
"too_soon_collection",
"0001",
"too_soon_sample",
now - 2 * deadtime,
dataset_duration,
["started 1:00:00 before the start of the session"],
)
now += dataset_duration + deadtime
# Normal
_ = exp_session.create_dataset(
"first_collection", "0001", "first_sample", now, dataset_duration
)
now += dataset_duration + deadtime
# Normal
_ = exp_session.create_dataset(
"first_collection", "0002", "first_sample", now, dataset_duration
)
now += dataset_duration + deadtime
# Empty HDF5 file
_ = exp_session.create_dataset(
"empty_file",
"0001",
"empty_sample",
now,
dataset_duration,
has_content=False,
status_reason=[
"Cannot extract dataset metadata: HDF5 reading error (HDF5 file not created by Bliss)"
],
)
now += dataset_duration + deadtime
# Normal
_ = exp_session.create_dataset(
"second_collection", "0001", "second_sample", now, dataset_duration
)
now += dataset_duration + deadtime
# Normal
_ = exp_session.create_dataset(
"second_collection", "0002", "second_sample", now, dataset_duration
)
now += dataset_duration + deadtime
# Dataset ends after the enddate
_ = exp_session.create_dataset(
"too_late_collection",
"0001",
"too_late_collection",
now,
dataset_duration + 2 * deadtime,
["ended 1:00:00 after the end of the session"],
)
return exp_session
def _init_empty_files_experiment(
client: IcatClient, messages, root_dir: str, proposal: str, beamline: str
):
"""Create several datasets on disk:
- 1 normal dataset (registered)
- 1 normal dataset (registered without files)
"""
startdate = _as_datetime("2023-02-01T08:00:00+01:00")
enddate = startdate + datetime.timedelta(days=5)
exp_session = _start_investigation(
client, messages, root_dir, proposal, beamline, startdate, enddate
)
now, dataset_duration, deadtime = _chunk_duration(
startdate, enddate, 2, deadtime=3600
)
# Normal
dataset = exp_session.create_dataset(
"collection_normal", "0001", "sample_normal", now, dataset_duration
)
_store_dataset(client, dataset)
messages.get(timeout=10)
now += dataset_duration + deadtime
# Directory is empty when the file count is calculated (on the ICAT server side)
dataset = exp_session.create_dataset(
"collection_missing", "0002", "sample_missing", now, dataset_duration
)
with _temporary_empty_directory(dataset.path):
_store_dataset(client, dataset)
messages.get(timeout=10)
return exp_session
@contextmanager
def _temporary_empty_directory(directory_path: str) -> Generator[None, None, None]:
original_name = path_utils.basename(directory_path)
new_name = f"{original_name}_tmp"
renamed_path = os.path.join(path_utils.dirname(directory_path), new_name)
try:
os.rename(directory_path, renamed_path)
os.makedirs(directory_path)
yield
finally:
os.rmdir(directory_path)
os.rename(renamed_path, directory_path)
def _start_investigation(
client: IcatClient,
messages,
root_dir: str,
proposal: str,
beamline: str,
startdate: datetime.datetime,
enddate: datetime.datetime,
) -> _ExperimentalSession:
session = startdate.strftime("%Y%m%d")
session_dir = path_utils.markdir(
os.path.join(root_dir, proposal, beamline, session)
)
raw_root_dir = path_utils.markdir(os.path.join(session_dir, "RAW_DATA"))
os.makedirs(raw_root_dir, exist_ok=True)
exp_session = _ExperimentalSession(
session_dir=session_dir,
raw_root_dir=raw_root_dir,
raw_data_format="esrfv3",
proposal=proposal,
beamline=beamline,
session=session,
startdate=startdate.date(),
search_url=f"https://data.esrf.fr/beamline/{beamline}?search={proposal}",
datasets=dict(),
)
client.start_investigation(
beamline=beamline,
proposal=proposal,
start_datetime=startdate,
end_datetime=enddate,
)
messages.get(timeout=10)
exp_session.icat_investigation = sync_types.IcatInvestigation(
id=0,
url="https://data.esrf.fr/investigation/0/datasets",
search_url=f"https://data.esrf.fr/beamline/{beamline}?search={proposal}",
registered_datasets=list(),
startdate=startdate,
enddate=enddate,
)
return exp_session
def _store_dataset(client: IcatClient, dataset: sync_types.Dataset) -> None:
client.store_dataset(
beamline=dataset.beamline,
proposal=dataset.proposal,
dataset=dataset.name,
path=dataset.path,
metadata=dataset.metadata,
)
def _chunk_duration(
startdate: datetime.datetime,
enddate: datetime.datetime,
nchunks: int,
deadtime: int = 0,
):
deadtime = datetime.timedelta(seconds=deadtime)
total_deadtime = (nchunks + 1) * deadtime
total_time = enddate - startdate
chunk_duration = (total_time - total_deadtime) / nchunks
now = startdate + deadtime
return now, chunk_duration, deadtime
def _save_scan(
nxroot: h5py.File,
startdate: datetime.datetime,
duration: datetime.timedelta,
sample_name: str,
failed: bool = False,
) -> Optional[datetime.datetime]:
scans = list(nxroot)
if scans:
scan = max(map(int, map(float, scans))) + 1
else:
scan = 1
name = f"{scan}.1"
grp = nxroot.create_group(name)
grp["start_time"] = startdate.isoformat()
if failed:
return
enddate = startdate + duration
grp["end_time"] = enddate.isoformat()
grp["sample/name"] = sample_name
return enddate
@contextmanager
def _init_dataset(
raw_root_dir: str,
collection: str,
dataset: str,
) -> Generator[h5py.File, None, None]:
basename = f"{collection}_{dataset}"
dataset_dir = os.path.join(raw_root_dir, collection, basename)
os.makedirs(dataset_dir, exist_ok=True)
filename = os.path.basename(dataset_dir) + ".h5"
with h5py.File(os.path.join(dataset_dir, filename), mode="w") as f:
yield f
def _as_datetime(isoformat: str) -> datetime.datetime:
return datetime.datetime.fromisoformat(isoformat).astimezone()
def _astimezone_string(tzinfo, iso: bool = True) -> Callable[[datetime.datetime], str]:
def astimezone_string(dt: datetime.datetime) -> str:
dt = dt.astimezone(tz=tzinfo)
if iso:
return dt.isoformat()
return str(dt)
return astimezone_string