import base64
import datetime
import logging
import mimetypes
import socket
import warnings
from enum import Enum
from importlib.metadata import version as get_version
from typing import Iterable
from typing import List
from typing import Optional
from typing import Tuple
from urllib.parse import urljoin
import requests
from ..utils.url import normalize_url
from . import defaults
release = get_version("pyicat_plus")
logger = logging.getLogger(__name__)
[docs]
class IcatElogbookClient:
"""Client for the e-logbook part of the ICAT+ REST API.
REST API docs:
https://icatplus.esrf.fr/api-docs/
The ICAT+ server project:
https://gitlab.esrf.fr/icat/icat-plus/-/blob/master/README.md
"""
DEFAULT_SCHEME = "https"
def __init__(
self,
url: str,
api_key: Optional[str] = None,
timeout: Optional[float] = None,
**payload,
):
if api_key is None:
api_key = defaults.ELOGBOOK_TOKEN
url = normalize_url(url, default_scheme=self.DEFAULT_SCHEME)
path = f"dataacquisition/{api_key}/notification"
self._message_url = urljoin(url, path)
path = f"dataacquisition/{api_key}/base64"
self._data_url = urljoin(url, path)
self._init_payload = payload
self._init_payload.setdefault("machine", socket.getfqdn())
self._init_payload.setdefault("software", "pyicat-plus_v" + release)
self.raise_error = True
if timeout is None:
timeout = 0.1
self.timeout = timeout
def _merge_payloads(self, message_payload: dict, call_payload: dict) -> dict:
payloads = self._sorted_payloads(message_payload, call_payload)
result = {k: v for payload in payloads for k, v in payload.items()}
tags = self._merge_payload_tags(*payloads)
if tags:
result.pop("tags", None)
result["tag"] = tags
return result
def _sorted_payloads(self, message_payload: dict, call_payload: dict) -> List[dict]:
"""Sorted by increasing priority"""
return [message_payload, self._init_payload, call_payload]
def _merge_payload_tags(self, *payloads: Iterable[dict]) -> List[dict]:
"""The payload tags can be eithers a list of strings or a list of dictionaries.
The return value are the merged tags as a list of dictionaries.
"""
names = set()
tags = list()
for payload in payloads:
ptags = payload.get("tag", list()) + payload.get("tags", list())
for tag in ptags:
if isinstance(tag, str):
if tag in names:
continue
names.add(tag)
tags.append({"name": tag})
else:
if tag["name"] in names:
continue
names.add(tag["name"])
tags.append(tag)
return tags
def _post_with_payload(
self, url: str, message_payload: dict, call_payload: dict
) -> None:
payload = self._merge_payloads(message_payload, call_payload)
payload.setdefault(
"creationDate", datetime.datetime.now().astimezone().isoformat()
)
try:
response = requests.post(url, json=payload, timeout=self.timeout)
except requests.exceptions.ReadTimeout:
return # we have no confirmation that the call succeeded
except Exception as e:
if self.raise_error:
raise
logger.exception(e)
return
if self.raise_error:
response.raise_for_status()
elif not response.ok:
logger.error("%s: %s", response, response.text)
[docs]
def send_message(
self,
message: str,
message_type: Optional[str] = None,
editable: Optional[bool] = None,
formatted: Optional[bool] = None,
mimetype: Optional[str] = None,
beamline: Optional[str] = None,
investigation_id: Optional[str] = None,
proposal: Optional[str] = None,
dataset: Optional[str] = None,
**call_payload,
):
url = self._compose_url(
url=self._message_url,
beamline=beamline,
proposal=proposal,
investigation_id=investigation_id,
)
message_payload = self._encode_message(
message,
message_type=message_type,
editable=editable,
formatted=formatted,
mimetype=mimetype,
dataset=dataset,
)
self._post_with_payload(url, message_payload, call_payload)
[docs]
def send_binary_data(
self,
data: bytes,
mimetype: str,
beamline: Optional[str] = None,
proposal: Optional[str] = None,
investigation_id: Optional[str] = None,
**call_payload,
):
url = self._compose_url(
url=self._data_url,
beamline=beamline,
proposal=proposal,
investigation_id=investigation_id,
)
message_payload = self._encode_binary_data(data, mimetype=mimetype)
self._post_with_payload(url, message_payload, call_payload)
@staticmethod
def _compose_url(
url: str,
beamline: Optional[str] = None,
proposal: Optional[str] = None,
investigation_id: Optional[str] = None,
):
query = {}
if beamline:
query["instrumentName"] = beamline
if proposal:
query["investigationName"] = proposal
if investigation_id:
query["investigationId"] = investigation_id
query = "&".join([f"{k}={v}" for k, v in query.items()])
return f"{url}?{query}"
[docs]
def send_text_file(
self,
filename: str,
beamline: Optional[str] = None,
proposal: Optional[str] = None,
investigation_id: Optional[str] = None,
dataset: Optional[str] = None,
message_type: Optional[str] = None,
editable: Optional[bool] = None,
formatted: Optional[bool] = None,
mimetype: Optional[str] = None,
**payload,
):
with open(filename, "r") as f:
message = f.read()
self.send_message(
message,
message_type=message_type,
proposal=proposal,
investigation_id=investigation_id,
beamline=beamline,
dataset=dataset,
editable=editable,
formatted=formatted,
mimetype=mimetype,
**payload,
)
[docs]
def send_binary_file(
self,
filename: str,
beamline: Optional[str] = None,
proposal: Optional[str] = None,
**payload,
):
with open(filename, "rb") as f:
data = f.read()
mimetype, _ = mimetypes.guess_type(filename, strict=True)
self.send_binary_data(
data, mimetype=mimetype, beamline=beamline, proposal=proposal, **payload
)
def _encode_message(
self,
message: str,
message_type: Optional[str] = None,
editable: Optional[bool] = None,
formatted: Optional[bool] = None,
mimetype: Optional[str] = None,
dataset: Optional[str] = None,
) -> dict:
message_category, message_type = _message_category_and_type(
message_type=message_type, editable=editable, formatted=formatted
)
if mimetype is None:
mimetype = "text/plain"
try:
format = _MessageFormatMapping[mimetype]
except KeyError:
raise ValueError(
f"mime type '{mimetype}' is not supported ({list(_MessageFormatMapping)})"
) from None
message = {
"type": message_type.name,
"category": message_category.name,
"content": [{"format": format.name, "text": message}],
}
if dataset:
message["datasetName"] = dataset
return message
def _encode_binary_data(
self,
data: bytes,
mimetype: Optional[str] = None,
) -> dict:
if not mimetype:
# arbitrary binary data
mimetype = "application/octet-stream"
data_header = f"data:{mimetype};base64,"
data_blob = base64.b64encode(data).decode("latin-1")
return {"base64": data_header + data_blob}
_MessageCategory = Enum("MessageCategory", "debug info error commandLine comment")
_MessageType = Enum("MessageType", "annotation notification")
_MessageFormat = Enum("MessageType", "plainText html")
_MessageCategoryMapping = {
"debug": _MessageCategory.debug,
"info": _MessageCategory.info,
"warning": _MessageCategory.error,
"warn": _MessageCategory.error,
"error": _MessageCategory.error,
"critical": _MessageCategory.error,
"fatal": _MessageCategory.error,
"command": _MessageCategory.commandLine,
"comment": _MessageCategory.comment,
}
_MessageFormatMapping = {
"text/plain": _MessageFormat.plainText,
"text/html": _MessageFormat.html,
}
def _message_category_and_type(
message_type: Optional[str] = None,
editable: Optional[bool] = None,
formatted: Optional[bool] = None,
) -> Tuple[_MessageCategory, _MessageType]:
"""Derive the ICAT message category from the API message type.
The ICAT message types are:
- "annotation" (default for comments): editable and unformatted message
- "notification" (default for non-comments): formatted and not editable
Only comments can be editable and a message cannot be editable and
formatted at the same time.
"""
if message_type is None:
message_type = "comment"
try:
category = _MessageCategoryMapping[message_type.lower()]
except KeyError:
raise ValueError(
f"'{message_type}' is not a valid e-logbook message type"
) from None
if category != _MessageCategory.comment:
# Non-comments cannot be editable
if editable:
warnings.warn(
f"message type '{message_type}' cannot be editable", UserWarning
)
editable = None
# Non-comments cannot be unformatted
if formatted is not None and not formatted:
warnings.warn(
f"message type '{message_type}' cannot be unformatted", UserWarning
)
formatted = None
# A message cannot be editable and formatted at the same time or uneditable and unformatted
if formatted == editable and formatted is not None:
if formatted:
warnings.warn(
f"message type '{message_type}' cannot be editable and formatted at the same time",
UserWarning,
)
else:
warnings.warn(
f"message type '{message_type}' cannot be uneditable and unformatted at the same time",
UserWarning,
)
formatted = None
editable = None
# Editability is specified
if editable is not None:
if editable:
return category, _MessageType.annotation
return category, _MessageType.notification
# Formatability is specified
if formatted is not None:
if formatted:
return category, _MessageType.notification
return category, _MessageType.annotation
# By default comments are annotations and the rest are notifications
if category == _MessageCategory.comment:
return category, _MessageType.annotation
return category, _MessageType.notification