Source code for pyicat_plus.tests.servers.icat_db
import datetime
import json
import logging
import os
from contextlib import contextmanager
from glob import glob
from typing import Dict
from typing import Generator
from typing import List
from typing import Optional
logger = logging.getLogger("STOMP SUBSCRIBER")
[docs]
class IcatDb:
def __init__(self, root_dir: Optional[str] = None):
if root_dir is None:
root_dir = "."
if root_dir:
os.makedirs(root_dir, exist_ok=True)
self._root_dir = root_dir
self._investigations = os.path.join(root_dir, "investigations.json")
[docs]
def start_investigation(self, investigation: dict) -> None:
investigation["instrument"] = {"name": investigation["instrument"]}
investigation["proposal"] = investigation["experiment"]
_add_table(self._investigations, investigation)
[docs]
def store_dataset(self, dataset: dict) -> None:
investigation = _find_data(self._investigations, dataset["startDate"])
if investigation is None:
investigation_id = _get_investigation_id(dataset)
investigation = _find_investigation(self._investigations, investigation_id)
if investigation is None:
logger.error(
"Dataset not stored because no investigation found: %s", dataset
)
return
filename = os.path.join(self._root_dir, f"datasets{investigation['id']}.json")
dataset["id"] = self._get_next_dataset_id()
_add_table(filename, dataset)
[docs]
@contextmanager
def update_dataset(self, dataset_id: int) -> Generator[dict, None, None]:
for filename in glob(os.path.join(self._root_dir, "datasets*.json")):
data = _get_row(filename, dataset_id)
if data is None:
continue
yield data
_edit_table(filename, data)
break
else:
yield
[docs]
def get_all_investigations(self) -> List[Dict]:
return _read_table(self._investigations)
[docs]
def get_investigations(self, instrument: str, experiment: str) -> List[Dict]:
return [
investigation
for investigation in _read_table(self._investigations)
if experiment == investigation["experiment"]
and instrument == investigation["instrument"]["name"]
]
[docs]
def get_datasets(self, investigation_id) -> List[Dict]:
filename = os.path.join(self._root_dir, f"datasets{investigation_id}.json")
return _read_table(filename)
def _get_next_dataset_id(self) -> int:
max_ids = [
_get_max_id(filename)
for filename in glob(os.path.join(self._root_dir, "datasets*.json"))
]
if max_ids:
return max(max_ids) + 1
return 0
def _read_table(filename: str) -> List[dict]:
if not os.path.isfile(filename):
return list()
with open(filename, "r") as f:
return json.load(f)
def _write_table(filename: str, data: dict) -> Dict:
with open(filename, "w") as f:
json.dump(data, f)
logger.info("Metadata in %s updated", filename)
def _add_table(filename: str, data: dict) -> None:
table = _read_table(filename)
if "id" not in data:
if table:
data_id = max(row["id"] for row in table) + 1
else:
data_id = 0
data = dict(data)
data["id"] = data_id
table.append(data)
logger.info("Add metadata to %s: %s", filename, data)
_write_table(filename, table)
def _edit_table(filename: str, data: dict) -> None:
table = _read_table(filename)
for row in table:
if row["id"] == data["id"]:
logger.info("Edit metadata of %s: %s", filename, data)
row.update(data)
_write_table(filename, table)
break
else:
_add_table(filename, data)
def _get_row(filename: str, dataset_id: int) -> Optional[dict]:
table = _read_table(filename)
for row in table:
if row["id"] == dataset_id:
return row
def _get_max_id(filename: str) -> int:
table = _read_table(filename)
if table:
return max(row["id"] for row in table)
return -1
def _find_data(filename: str, date: str) -> Optional[Dict]:
date = datetime.datetime.fromisoformat(date).astimezone()
for data in _read_table(filename):
start_date = datetime.datetime.fromisoformat(data["startDate"]).astimezone()
end_date = data.get("endDate")
if end_date is None:
# infinite timeslot
inside_timeslot = date >= start_date
else:
# finite timeslot
end_date = datetime.datetime.fromisoformat(end_date).astimezone()
inside_timeslot = date >= start_date and date <= end_date
if inside_timeslot:
return data
def _get_investigation_id(dataset: dict) -> Optional[str]:
for param in dataset.get("parameter", []):
if param["name"] == "investigationId":
return param["value"]
return None
def _find_investigation(filename: str, investigation_id: str) -> Optional[Dict]:
for data in _read_table(filename):
if _get_investigation_id(data) == investigation_id:
return data