File management improvements #24

Draft · wants to merge 20 commits into main
14 changes: 8 additions & 6 deletions src/broker/operandi_broker/__init__.py
@@ -1,11 +1,13 @@
 __all__ = [
-    "cli",
-    "ServiceBroker",
-    "JobStatusWorker",
-    "Worker"
+    "cli",
+    "JobWorkerDownload",
+    "JobWorkerStatus",
+    "JobWorkerSubmit",
+    "ServiceBroker",
 ]
 
 from .cli import cli
 from .broker import ServiceBroker
-from .job_status_worker import JobStatusWorker
-from .worker import Worker
+from .job_worker_download import JobWorkerDownload
+from .job_worker_status import JobWorkerStatus
+from .job_worker_submit import JobWorkerSubmit
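
Editor's note: the package now exports three specialized worker classes (JobWorkerDownload, JobWorkerStatus, JobWorkerSubmit) in place of the old Worker/JobStatusWorker pair. A minimal sketch of an import site after this change, assuming the constructor and run() signatures shown in broker_utils.py below; the URLs and queue name are placeholders, not values from this PR:

# Hypothetical usage of the new public API; db_url, rabbitmq_url and the
# queue name are placeholders, not defined anywhere in this diff.
from operandi_broker import JobWorkerSubmit

worker = JobWorkerSubmit(
    "mongodb://localhost:27017",          # db_url (placeholder)
    "amqp://guest:guest@localhost:5672",  # rabbitmq_url (placeholder)
    "operandi_queue_users",               # queue_name (placeholder)
    False                                 # test_batch flag
)
# The run() flags mirror the submit_worker branch in broker_utils.py.
worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=False)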
88 changes: 13 additions & 75 deletions src/broker/operandi_broker/broker.py
@@ -1,16 +1,14 @@
 from logging import getLogger
-from os import environ, fork
-import psutil
-import signal
+from os import environ
 from time import sleep
 
 from operandi_utils import (
     get_log_file_path_prefix, reconfigure_all_loggers, verify_database_uri, verify_and_parse_mq_uri)
 from operandi_utils.constants import LOG_LEVEL_BROKER
 from operandi_utils.rabbitmq.constants import (
-    RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS, RABBITMQ_QUEUE_JOB_STATUSES)
-from .worker import Worker
-from .job_status_worker import JobStatusWorker
+    RABBITMQ_QUEUE_HPC_DOWNLOADS, RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS, RABBITMQ_QUEUE_JOB_STATUSES)
+
+from .broker_utils import create_child_process, kill_workers
 
 
 class ServiceBroker:
@@ -48,14 +46,15 @@ def run_broker(self):
         # A list of queues for which a worker process should be created
         queues = [RABBITMQ_QUEUE_HARVESTER, RABBITMQ_QUEUE_USERS]
         status_queue = RABBITMQ_QUEUE_JOB_STATUSES
+        hpc_download_queue = RABBITMQ_QUEUE_HPC_DOWNLOADS
         try:
             for queue_name in queues:
                 self.log.info(f"Creating a worker process to consume from queue: {queue_name}")
-                self.create_worker_process(
-                    queue_name=queue_name, status_checker=False, tunnel_port_executor=22, tunnel_port_transfer=22)
+                self.create_worker_process(queue_name, "submit_worker")
             self.log.info(f"Creating a status worker process to consume from queue: {status_queue}")
-            self.create_worker_process(
-                queue_name=status_queue, status_checker=True, tunnel_port_executor=22, tunnel_port_transfer=22)
+            self.create_worker_process(status_queue, "status_worker")
+            self.log.info(f"Creating a download worker process to consume from queue: {hpc_download_queue}")
+            self.create_worker_process(hpc_download_queue, "download_worker")
         except Exception as error:
             self.log.error(f"Error while creating worker processes: {error}")
 
@@ -72,85 +71,24 @@ def run_broker(self):
         except KeyboardInterrupt:
             self.log.info(f"SIGINT signal received. Sending SIGINT to worker processes.")
             # Sends SIGINT to workers
-            self.kill_workers()
+            kill_workers(self.log, self.queues_and_workers)
             self.log.info(f"Closing gracefully in 3 seconds!")
             exit(0)
         except Exception as error:
             # This is for logging any other errors
             self.log.error(f"Unexpected error: {error}")
 
     # Creates a separate worker process and append its pid if successful
-    def create_worker_process(
-        self, queue_name, tunnel_port_executor: int = 22, tunnel_port_transfer: int = 22, status_checker=False
-    ) -> None:
+    def create_worker_process(self, queue_name, worker_type: str) -> None:
         # If the entry for queue_name does not exist, create id
         if queue_name not in self.queues_and_workers:
             self.log.info(f"Initializing workers list for queue: {queue_name}")
             # Initialize the worker pids list for the queue
             self.queues_and_workers[queue_name] = []
-        child_pid = self.__create_child_process(
-            queue_name=queue_name, status_checker=status_checker, tunnel_port_executor=tunnel_port_executor,
-            tunnel_port_transfer=tunnel_port_transfer)
+        child_pid = create_child_process(
+            self.log, self.db_url, self.rabbitmq_url, queue_name, worker_type, self.test_sbatch)
         # If creation of the child process was successful
         if child_pid:
             self.log.info(f"Assigning a new worker process with pid: {child_pid}, to queue: {queue_name}")
             # append the pid to the workers list of the queue_name
             (self.queues_and_workers[queue_name]).append(child_pid)
-
-    # Forks a child process
-    def __create_child_process(
-        self, queue_name, tunnel_port_executor: int = 22, tunnel_port_transfer: int = 22, status_checker=False
-    ) -> int:
-        self.log.info(f"Trying to create a new worker process for queue: {queue_name}")
-        try:
-            # TODO: Try to utilize Popen() instead of fork()
-            created_pid = fork()
-        except Exception as os_error:
-            self.log.error(f"Failed to create a child process, reason: {os_error}")
-            return 0
-        if created_pid != 0:
-            return created_pid
-        try:
-            # Clean unnecessary data
-            # self.queues_and_workers = None
-            if status_checker:
-                child_worker = JobStatusWorker(
-                    db_url=self.db_url, rabbitmq_url=self.rabbitmq_url, queue_name=queue_name,
-                    tunnel_port_executor=tunnel_port_executor, tunnel_port_transfer=tunnel_port_transfer,
-                    test_sbatch=self.test_sbatch)
-            else:
-                child_worker = Worker(
-                    db_url=self.db_url, rabbitmq_url=self.rabbitmq_url, queue_name=queue_name,
-                    tunnel_port_executor=tunnel_port_executor, tunnel_port_transfer=tunnel_port_transfer,
-                    test_sbatch=self.test_sbatch)
-            child_worker.run()
-            exit(0)
-        except Exception as e:
-            self.log.error(f"Worker process failed for queue: {queue_name}, reason: {e}")
-            exit(-1)
-
-    def _send_signal_to_worker(self, worker_pid: int, signal_type: signal):
-        try:
-            process = psutil.Process(pid=worker_pid)
-            process.send_signal(signal_type)
-        except psutil.ZombieProcess as error:
-            self.log.info(f"Worker process has become a zombie: {worker_pid}, {error}")
-        except psutil.NoSuchProcess as error:
-            self.log.error(f"No such worker process with pid: {worker_pid}, {error}")
-        except psutil.AccessDenied as error:
-            self.log.error(f"Access denied to the worker process with pid: {worker_pid}, {error}")
-
-    def kill_workers(self):
-        interrupted_pids = []
-        self.log.info(f"Starting to send SIGINT to all workers")
-        # Send SIGINT to all workers
-        for queue_name in self.queues_and_workers:
-            self.log.info(f"Sending SIGINT to workers of queue: {queue_name}")
-            for worker_pid in self.queues_and_workers[queue_name]:
-                self._send_signal_to_worker(worker_pid=worker_pid, signal_type=signal.SIGINT)
-                interrupted_pids.append(worker_pid)
-        sleep(3)
-        self.log.info(f"Sending SIGKILL (if needed) to previously interrupted workers")
-        # Check whether workers exited properly
-        for pid in interrupted_pids:
-            self._send_signal_to_worker(worker_pid=pid, signal_type=signal.SIGKILL)
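
Editor's note: the boolean status_checker flag and the always-22 tunnel-port defaults are gone; create_worker_process now takes a worker_type string that broker_utils.py dispatches on. If that if/elif chain grows, a table-driven dispatch is one possible alternative. A sketch under the assumption that the constructor and run() signatures match those shown in broker_utils.py below; not part of this PR:

# Sketch only: a table-driven alternative to the if/elif dispatch in
# create_child_process(). Assumes the signatures from broker_utils.py.
from functools import partial

from operandi_broker import JobWorkerDownload, JobWorkerStatus, JobWorkerSubmit


def run_worker(worker_type: str, db_url: str, rabbitmq_url: str, queue_name: str, test_batch: bool) -> None:
    # Map each worker type to (worker factory, publisher flag), mirroring
    # the three branches in create_child_process().
    dispatch = {
        "status_worker": (partial(JobWorkerStatus, db_url, rabbitmq_url, queue_name), True),
        "download_worker": (partial(JobWorkerDownload, db_url, rabbitmq_url, queue_name), False),
        "submit_worker": (partial(JobWorkerSubmit, db_url, rabbitmq_url, queue_name, test_batch), False),
    }
    factory, publisher = dispatch[worker_type]
    factory().run(hpc_executor=True, hpc_io_transfer=True, publisher=publisher)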
67 changes: 67 additions & 0 deletions src/broker/operandi_broker/broker_utils.py
@@ -0,0 +1,67 @@
from logging import Logger
from os import fork
import psutil
import signal
from time import sleep
from typing import Dict

from .job_worker_download import JobWorkerDownload
from .job_worker_status import JobWorkerStatus
from .job_worker_submit import JobWorkerSubmit


# Forks a child process
def create_child_process(
logger: Logger, db_url: str, rabbitmq_url: str, queue_name: str, worker_type: str, test_batch: bool
) -> int:
logger.info(f"Trying to create a new worker process for queue: {queue_name}")
try:
created_pid = fork()
except Exception as os_error:
logger.error(f"Failed to create a child process, reason: {os_error}")
return 0

if created_pid != 0:
return created_pid
try:
if worker_type == "status_worker":
child_worker = JobWorkerStatus(db_url, rabbitmq_url, queue_name)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=True)
elif worker_type == "download_worker":
child_worker = JobWorkerDownload(db_url, rabbitmq_url, queue_name)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=False)
else: # worker_type == "submit_worker"
child_worker = JobWorkerSubmit(db_url, rabbitmq_url, queue_name, test_batch)
child_worker.run(hpc_executor=True, hpc_io_transfer=True, publisher=False)
exit(0)
except Exception as e:
logger.error(f"Worker process failed for queue: {queue_name}, reason: {e}")
exit(-1)


def send_signal_to_worker(logger: Logger, worker_pid: int, signal_type: signal):
try:
process = psutil.Process(pid=worker_pid)
process.send_signal(signal_type)
except psutil.ZombieProcess as error:
logger.info(f"Worker process has become a zombie: {worker_pid}, {error}")
except psutil.NoSuchProcess as error:
logger.error(f"No such worker process with pid: {worker_pid}, {error}")
except psutil.AccessDenied as error:
logger.error(f"Access denied to the worker process with pid: {worker_pid}, {error}")


def kill_workers(logger: Logger, queues_and_workers: Dict):
interrupted_pids = []
logger.info(f"Starting to send SIGINT to all workers")
# Send SIGINT to all workers
for queue_name in queues_and_workers:
logger.info(f"Sending SIGINT to workers of queue: {queue_name}")
for worker_pid in queues_and_workers[queue_name]:
send_signal_to_worker(logger, worker_pid=worker_pid, signal_type=signal.SIGINT)
interrupted_pids.append(worker_pid)
sleep(3)
logger.info(f"Sending SIGKILL (if needed) to previously interrupted workers")
# Check whether workers exited properly
for pid in interrupted_pids:
send_signal_to_worker(logger, worker_pid=pid, signal_type=signal.SIGKILL)
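
Editor's note: kill_workers() implements a two-phase shutdown: SIGINT to every worker pid, a fixed 3-second grace period, then an unconditional SIGKILL sweep; workers that already exited surface through the psutil exception handlers in send_signal_to_worker rather than crashing the broker. A standalone, runnable sketch of the same pattern, independent of this codebase (POSIX only, like the fork() usage above):

# Standalone sketch of the SIGINT-then-SIGKILL shutdown pattern used by
# kill_workers(); illustration only, not part of this PR.
import os
import signal
import time
from typing import List

import psutil


def shutdown(pids: List[int], grace_seconds: float = 3.0) -> None:
    # Phase 1: ask every worker to stop (SIGINT raises KeyboardInterrupt
    # in the worker's main thread).
    for pid in pids:
        try:
            psutil.Process(pid=pid).send_signal(signal.SIGINT)
        except psutil.NoSuchProcess:
            pass  # already gone
    time.sleep(grace_seconds)
    # Phase 2: force-kill anything that ignored the request.
    for pid in pids:
        try:
            psutil.Process(pid=pid).send_signal(signal.SIGKILL)
        except (psutil.NoSuchProcess, psutil.ZombieProcess):
            pass  # exited during the grace period


if __name__ == "__main__":
    child_pid = os.fork()
    if child_pid == 0:
        # Child: simulate a worker that exits cleanly on SIGINT.
        try:
            while True:
                time.sleep(0.1)
        except KeyboardInterrupt:
            os._exit(0)
    shutdown([child_pid], grace_seconds=1.0)
    os.waitpid(child_pid, 0)  # reap the child so it does not linger as a zombie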