import json
import logging
import os
import socket
import threading
from enum import Enum
from json.decoder import JSONDecodeError
from typing import Optional
from xml.etree.ElementTree import ParseError
import stomp
from ..._icat_messaging.deserializers.xml import deserialize_message
from ...utils.log_utils import basic_config
from .activemq_rest_server import ICAT_QUEUES
from .icat_db import IcatDb
logger = logging.getLogger("STOMP SUBSCRIBER")
MessageType = Enum("MessageType", "investigation dataset archiving addfiles unknown")
[docs]
class MyListener(stomp.ConnectionListener):
def __init__(self, conn, icat_data_dir: Optional[str] = None):
self.conn = conn
self.s_out = None
self.icatdb = IcatDb(icat_data_dir)
super().__init__()
[docs]
def redirect_messages(self, port):
if self.s_out is not None:
self.s_out.close()
self.s_out = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s_out.connect(("localhost", port))
logger.info(f"Redirect received messages to port {port}")
[docs]
def on_message(self, frame):
message = frame.body
logger.info("received message:\n %s", message)
try:
message_type, data = deserialize_message(message.encode("utf-8"))
if message_type == "investigation":
message_type = MessageType.investigation
elif message_type == "dataset":
message_type = MessageType.dataset
else:
message_type = None
logger.info("Parsed XML message (%s):\n %s", message_type, data)
except ParseError:
try:
data = json.loads(message)
keys = set(data)
if keys == {"datasetId", "type", "level", "message"}:
message_type = MessageType.archiving
elif keys == {"datasetId"}:
message_type = MessageType.addfiles
else:
message_type = MessageType.unknown
logger.info("Parsed JSON message (%s):\n %s", message_type, data)
except JSONDecodeError:
data = None
message_type = MessageType.unknown
logger.info("Message parsing failed (%s):\n %s", message_type, data)
# Only access specific destinations
header = frame.headers
icat_queues = ["/queue/" + q for q in ICAT_QUEUES]
if header.get("destination") not in icat_queues:
return
# Only accept valid proposals
if message_type in (message_type.investigation, message_type.dataset):
if message_type is message_type.investigation:
proposal = data["experiment"]
else:
proposal = data["investigation"]
# Convert to backend format
file_count = 0
if os.path.exists(data["location"]):
file_count = len(os.listdir(data["location"]))
data["parameter"].append({"name": "__fileCount", "value": file_count})
data["parameters"] = data.pop("parameter")
if proposal and "666" in proposal:
logger.info(
"Do not register %s for invalid proposal '%s'",
message_type,
proposal,
)
return
# Store data
if message_type in (message_type.investigation, message_type.dataset):
if message_type is message_type.investigation:
self.icatdb.start_investigation(data)
else:
self.icatdb.store_dataset(data)
if message_type == message_type.addfiles:
dataset_id = data["datasetId"]
with self.icatdb.update_dataset(dataset_id) as data:
if data is None:
logger.error("datasetId %s does not exist", dataset_id)
else:
file_count = 0
dirname = data["location"]
if os.path.exists(dirname):
file_count = len(os.listdir(dirname))
for parameter in data["parameters"]:
if parameter["name"] == "__fileCount":
logger.info(
"Update file count for %s to %d", dirname, file_count
)
parameter["value"] = file_count
break
else:
logger.info("Add file count for %s to %d", dirname, file_count)
data["parameters"].append(
{"name": "__fileCount", "value": file_count}
)
# Notify that data is valid
if self.s_out is not None:
self.s_out.sendall(frame.body.encode() + b"\n")
[docs]
def main(
host=None, port=60001, queue=None, port_out=0, icat_data_dir: Optional[str] = None
):
if not host:
host = "localhost"
if not queue:
queue = "/queue/icatIngest"
conn = stomp.Connection([(host, port)])
# Listener will run in a different thread
listener = MyListener(conn, icat_data_dir)
conn.set_listener("", listener)
conn.connect("guest", "guest", wait=True)
conn.subscribe(destination=queue, id=1, ack="auto")
logger.info(f"subscribed to {queue} on STOMP {host}:{port}")
if port_out:
listener.redirect_messages(port_out)
listener.s_out.sendall(b"LISTENING\n")
logger.info("CTRL-C to stop")
try:
threading.Event().wait()
finally:
logger.info("Exit.")
if __name__ == "__main__":
import argparse
basic_config(
logger=logger,
level=logging.DEBUG,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
parser = argparse.ArgumentParser(
description="STOMP client which subscribes to a STOMP queue and redirect its output to a socket"
)
parser.add_argument(
"--host", default="localhost", type=str, help="STOMP server host"
)
parser.add_argument("--port", default=60001, type=int, help="STOMP server port")
parser.add_argument(
"--queue", default="/queue/icatIngest", type=str, help="STOMP queue"
)
parser.add_argument("--port_out", default=0, type=int, help="output socket")
parser.add_argument(
"--icat_data_dir", default=None, type=str, help="Dataset directory"
)
args = parser.parse_args()
main(
host=args.host,
port=args.port,
port_out=args.port_out,
queue=args.queue,
icat_data_dir=args.icat_data_dir,
)