import json
import logging
import os
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
import h5py
from esrf_pathlib import ESRFPath
from . import path_utils
# Raw data layouts accepted as ``raw_data_format`` by the functions of this module.
_DATA_FORMATS = ("esrfv1", "esrfv2", "esrfv3", "id16bspec", "mx")
# Name of the JSON file that identifies an "mx" dataset directory and holds its metadata.
MX_METADATA_FILENAME = "metadata.json"
# Module-level logger.
logger = logging.getLogger(__name__)
[docs]
def get_session_dir(
    proposal: str,
    beamline: str,
    session: str,
    root_dir: Optional[str] = None,
    raw_data_format: str = "esrfv3",
) -> str:
    """Build the session directory from the proposal, beamline and session name.

    When one of ``proposal``, ``beamline`` or ``session`` is exactly ``"*"``,
    the three values are simply joined below ``root_dir`` (``/data/visitor``
    when ``root_dir`` is ``None``), which yields a search pattern rather than
    a concrete directory. Otherwise the path is built by
    ``ESRFPath.from_fields``.

    :param proposal: proposal name or ``"*"``.
    :param beamline: beamline name or ``"*"``.
    :param session: session name or ``"*"``.
    :param root_dir: optional root directory of the data.
    :param raw_data_format: one of the supported raw data formats.
    :returns: the session directory (or search pattern) as a string.
    :raises NotImplementedError: when ``raw_data_format`` is not supported.
    """
    if raw_data_format not in _DATA_FORMATS:
        _raise_raw_data_format_error(raw_data_format)

    fields = (proposal, beamline, session)
    if not any(field == "*" for field in fields):
        # Concrete session: let ESRFPath assemble the directory.
        return str(
            ESRFPath.from_fields(
                data_root=root_dir,
                proposal=proposal,
                beamline=beamline,
                session_date=session,
            )
        )

    # Wildcard session: plain join below the (default) root directory.
    if root_dir is None:
        base_dir = os.path.join(os.sep, "data", "visitor")
    else:
        base_dir = root_dir
    return os.path.join(base_dir, *fields)
[docs]
def parse_session_dir(
    session_dir: str, raw_data_format: str = "esrfv3"
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
    """Extract proposal, beamline and session name from a session directory.

    The session name is the session date formatted as ``YYYYMMDD``.

    :param session_dir: session directory to parse.
    :param raw_data_format: accepted for signature symmetry; not used here.
    :returns: ``(proposal, beamline, session)`` or ``(None, None, None)``
        (after logging a warning) when the directory cannot be parsed.
    """
    parse_errors = (ValueError, TypeError, AttributeError, RuntimeError)
    try:
        esrf_path = ESRFPath(session_dir)
        parsed = (
            esrf_path.proposal,
            esrf_path.beamline_normalized,
            esrf_path.session_date.strftime("%Y%m%d"),
        )
    except parse_errors:
        logger.warning("Not a valid session directory: %r", session_dir)
        parsed = (None, None, None)
    return parsed
[docs]
def get_raw_data_dir(session_dir: str, raw_data_format: str = "esrfv3") -> str:
    """Get the raw data directory that belongs to a session directory.

    This is the directory where Bliss saves the raw data. Its location
    relative to ``session_dir`` depends on the raw data format:

    * ``"esrfv3"``, ``"id16bspec"``, ``"mx"``: ``<session_dir>/RAW_DATA``
    * ``"esrfv2"``: ``<session_dir>/raw``
    * ``"esrfv1"``: ``<session_dir>`` itself

    The result is passed through ``path_utils.markdir``.

    :raises NotImplementedError: when ``raw_data_format`` is not supported.
    """
    if raw_data_format == "esrfv1":
        raw_dir = session_dir
    elif raw_data_format == "esrfv2":
        raw_dir = os.path.join(session_dir, "raw")
    elif raw_data_format in ("esrfv3", "id16bspec", "mx"):
        raw_dir = os.path.join(session_dir, "RAW_DATA")
    else:
        return _raise_raw_data_format_error(raw_data_format)
    return path_utils.markdir(raw_dir)
[docs]
def get_dataset_filters(
    raw_root_dir: str, raw_data_format: str = "esrfv3"
) -> List[str]:
    """Get the dataset directory search filters for a raw data directory.

    For the ``"mx"`` format the tree below ``raw_root_dir`` is walked and
    every directory that contains ``MX_METADATA_FILENAME`` yields one filter.
    For all other supported formats a single two-level wildcard pattern
    (``<raw_root_dir>/*/*``) is returned.

    Each filter is passed through ``path_utils.markdir``.

    :raises NotImplementedError: when ``raw_data_format`` is not supported.
    """
    if raw_data_format == "mx":
        return [
            path_utils.markdir(dirpath)
            for dirpath, _, filenames in os.walk(raw_root_dir)
            if MX_METADATA_FILENAME in filenames
        ]
    if raw_data_format in ("esrfv1", "esrfv2", "esrfv3", "id16bspec"):
        pattern = os.path.join(raw_root_dir, "*", "*")
        return [path_utils.markdir(pattern)]
    _raise_raw_data_format_error(raw_data_format)
[docs]
def get_raw_dataset_name(
    dataset_dir: str, raw_data_format: str = "esrfv3"
) -> Optional[str]:
    """Get the raw dataset name from the dataset directory.

    For ``"id16bspec"`` and ``"mx"`` the name is the last path component.
    For the ``"esrfv*"`` formats the last component must start with the name
    of its parent directory (the collection); the dataset name is what
    follows that prefix plus one separator character.

    :returns: the dataset name, or ``None`` when the directory name does not
        start with the collection name or nothing follows the prefix.
    :raises NotImplementedError: when ``raw_data_format`` is not supported.
    """
    if raw_data_format in ("id16bspec", "mx"):
        return path_utils.split(dataset_dir)[-1]
    if raw_data_format not in ("esrfv1", "esrfv2", "esrfv3"):
        return _raise_raw_data_format_error(raw_data_format)

    parent_name, dir_name = path_utils.split(dataset_dir)[-2:]
    if not dir_name.startswith(parent_name):
        return None
    # Skip the collection prefix and the single character that follows it.
    # NOTE(review): the skipped character is not checked to be a separator,
    # so "<collection>X_<name>" also passes the prefix test - confirm intended.
    suffix = dir_name[len(parent_name) + 1 :]
    return suffix or None
def _raise_raw_data_format_error(raw_data_format: str) -> None:
    """Raise the error for a raw data format that a caller could not handle.

    This function never returns normally.

    :raises NotImplementedError: when the format is not in ``_DATA_FORMATS``.
    :raises RuntimeError: when the format is listed in ``_DATA_FORMATS``,
        which means the calling function lacks a branch for it.
    """
    if raw_data_format not in _DATA_FORMATS:
        raise NotImplementedError(
            f"Raw data format '{raw_data_format}' is not supported"
        )
    raise RuntimeError(f"Implementation error for '{raw_data_format}'")
def _raw_dataset_metadata_esrf(dataset_dir: str) -> Dict[str, str]:
    """Read dataset metadata from the raw HDF5 file of a dataset directory.

    The HDF5 file location is taken from ``ESRFPath(dataset_dir).raw_dataset_file``.
    The returned dictionary may contain:

    * ``"startDate"``: the ``file_time`` attribute of the file, when present.
    * ``"endDate"``: ``end_time`` of the last scan (in numerical order) that
      has one.
    * ``"Sample_name"``: ``sample/name`` of the last scan (in numerical
      order) that has one.

    :param dataset_dir: dataset directory.
    :returns: the metadata found in the file (possibly empty).
    :raises FileNotFoundError: when the directory is not a valid dataset
        directory or the HDF5 file does not exist.
    :raises RuntimeError: on any failure while reading the HDF5 file,
        including a file that was not created by Bliss.
    """
    try:
        path = ESRFPath(dataset_dir)
        dataset_file = str(path.raw_dataset_file)
    except AttributeError as e:
        raise FileNotFoundError(f"Dataset directory is not valid ({e})") from e
    if not os.path.exists(dataset_file):
        raise FileNotFoundError("HDF5 file does not exist")

    dataset_metadata = {}
    enddate = None
    try:
        with h5py.File(dataset_file, "r", locking=False) as f:
            if not _is_bliss_raw_dataset_file(f):
                raise ValueError("HDF5 file not created by Bliss")
            startdate = f.attrs.get("file_time")
            # Order the top-level groups numerically while keeping their real
            # names: converting to float and back to str would turn "1.10"
            # into "1.1" (or "1" into "1.0") and address the wrong group.
            for scan in sorted(f, key=float):
                sample_name = _read_hdf5_dataset(
                    f, f"/{scan}/sample/name", default=None
                )
                if sample_name is not None:
                    dataset_metadata["Sample_name"] = str(sample_name)
                # Keep the previous end time when this scan has none.
                enddate = _read_hdf5_dataset(f, f"/{scan}/end_time", default=enddate)
    except Exception as e:
        raise RuntimeError(f"HDF5 reading error ({e})") from e

    if startdate is not None:
        dataset_metadata["startDate"] = startdate
    if enddate is not None:
        dataset_metadata["endDate"] = enddate
    return dataset_metadata
def _raw_dataset_metadata_id16bspec(dataset_dir: str) -> Dict[str, str]:
    """Read dataset metadata for the ``"id16bspec"`` raw data format.

    The HDF5 file is expected in ``dataset_dir`` under the name
    ``<proposal>-<collection>-<dataset>.h5``. Only the first top-level entry
    of the file is inspected for ``start_time`` and ``end_time``.

    :param dataset_dir: dataset directory.
    :returns: a dictionary with ``"Sample_name"`` (the collection name) and,
        when available, ``"startDate"`` and ``"endDate"``.
    :raises FileNotFoundError: when the HDF5 file does not exist.
    :raises RuntimeError: on any failure while reading the HDF5 file.
    """
    path = ESRFPath(dataset_dir)
    dataset_file = os.path.join(
        dataset_dir, f"{path.proposal}-{path.collection}-{path.dataset}.h5"
    )
    if not os.path.exists(dataset_file):
        raise FileNotFoundError("HDF5 file does not exist")

    first_start = None
    first_end = None
    try:
        with h5py.File(dataset_file, "r", locking=False) as h5file:
            # Only the first entry of the file is used.
            first_name = next(iter(h5file), None)
            if first_name is not None:
                entry = h5file[first_name]
                try:
                    first_start = _read_hdf5_dataset(
                        entry, "start_time", default=None
                    )
                    first_end = _read_hdf5_dataset(entry, "end_time", default=None)
                except KeyError as e:
                    raise ValueError(
                        f"Time could not be read from HDF5 ({e})"
                    ) from e
    except Exception as e:
        raise RuntimeError(f"HDF5 reading error ({e})") from e

    metadata = {
        key: value
        for key, value in (("startDate", first_start), ("endDate", first_end))
        if value is not None
    }
    metadata["Sample_name"] = path.collection
    return metadata
def _raw_dataset_metadata_mx(dataset_dir: str) -> Dict[str, str]:
    """Read metadata from MX_METADATA_FILENAME for 'mx' format.

    :param dataset_dir: dataset directory containing the metadata file.
    :returns: the key/value pairs of the JSON file with every value converted
        to ``str``; entries whose value is ``null`` are dropped.
    :raises FileNotFoundError: when the metadata file does not exist.
    """
    metadata_file = os.path.join(dataset_dir, MX_METADATA_FILENAME)
    if not os.path.isfile(metadata_file):
        raise FileNotFoundError(f"{MX_METADATA_FILENAME} not found in {dataset_dir}")
    # Decode explicitly as UTF-8 so that non-ASCII metadata does not depend
    # on the locale of the process.
    with open(metadata_file, "r", encoding="utf-8") as f:
        metadata = json.load(f)
    return {key: str(value) for key, value in metadata.items() if value is not None}
def _is_bliss_raw_dataset_file(f: h5py.File) -> bool:
    """Tell whether the ``creator`` attribute of an HDF5 file names Bliss.

    The comparison is case-insensitive; a missing attribute counts as
    "not Bliss".
    """
    creator = f.attrs.get("creator", "")
    return creator.lower() in ("bliss", "blissdata", "blisswriter")
def _read_hdf5_dataset(parent: h5py.Group, name: str, default=None) -> Any:
    """Read the value stored under ``name`` below ``parent``.

    :param parent: group (or file) to read from.
    :param name: name or path of the dataset relative to ``parent``.
    :param default: value returned when the lookup raises ``KeyError``.
    :returns: the value, decoded when it offers a ``decode`` method
        (e.g. bytes), otherwise unchanged.
    """
    try:
        raw = parent[name][()]
    except KeyError:
        return default
    try:
        return raw.decode()
    except AttributeError:
        # Not a bytes-like value: hand it back as it was read.
        return raw