Skip to content

Commit

Permalink
Fixes cyclic imports, Improved Logging, Minor logging bug in terminate (
Browse files Browse the repository at this point in the history
#588)

* moved constants from create to separate file, fixed minor bugs in logging

* improved security group handling

* fixed tests

* explicified generic import

* switched to generators

* fixed typo, improved logging and style
  • Loading branch information
XaverStiensmeier authored Jan 15, 2025
1 parent 33a0720 commit 44e3dc2
Show file tree
Hide file tree
Showing 15 changed files with 166 additions and 177 deletions.
58 changes: 13 additions & 45 deletions bibigrid/core/actions/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@
The cluster creation (master's creation, key creation, ansible setup and execution, ...) is done here
"""

import os
import shutil
import subprocess
import threading
import traceback
from functools import partial
import os

import paramiko
import sympy
Expand All @@ -18,46 +17,13 @@
from bibigrid.core.utility import id_generation
from bibigrid.core.utility import image_selection
from bibigrid.core.utility.handler import ssh_handler
from bibigrid.core.utility.paths import ansible_resources_path as a_rp
from bibigrid.core.utility.paths import bin_path
from bibigrid.models import exceptions
from bibigrid.models import return_threading
from bibigrid.models.exceptions import ExecutionException, ConfigurationException

PREFIX = "bibigrid"
SEPARATOR = "-"
PREFIX_WITH_SEP = PREFIX + SEPARATOR
FILEPATHS = [(a_rp.PLAYBOOK_PATH, a_rp.PLAYBOOK_PATH_REMOTE), (bin_path.BIN_PATH, bin_path.BIN_PATH_REMOTE)]


def get_identifier(identifier, cluster_id, additional=""):
"""
This method does more advanced string formatting to generate master, vpngtw and worker names
@param identifier: master|vpngtw|worker
@param cluster_id: id of cluster
@param additional: an additional string to be added at the end
@return: the generated string
"""
general = PREFIX_WITH_SEP + identifier + SEPARATOR + cluster_id
if additional or additional == 0:
return general + SEPARATOR + str(additional)
return general


MASTER_IDENTIFIER = partial(get_identifier, identifier="master", additional="")
WORKER_IDENTIFIER = partial(get_identifier, identifier="worker")
VPN_WORKER_IDENTIFIER = partial(get_identifier, identifier="vpngtw")

KEY_PREFIX = "tempKey_bibi"
CONFIG_FOLDER = os.path.expanduser("~/.config/bibigrid/")
KEY_FOLDER = os.path.join(CONFIG_FOLDER, "keys/")
AC_NAME = "ac" + SEPARATOR + "{cluster_id}"
KEY_NAME = KEY_PREFIX + SEPARATOR + "{cluster_id}"
CLUSTER_MEMORY_FOLDER = KEY_FOLDER
CLUSTER_MEMORY_FILE = ".bibigrid.mem"
CLUSTER_MEMORY_PATH = os.path.join(CONFIG_FOLDER, CLUSTER_MEMORY_FILE)
DEFAULT_SECURITY_GROUP_NAME = "default" + SEPARATOR + "{cluster_id}"
WIREGUARD_SECURITY_GROUP_NAME = "wireguard" + SEPARATOR + "{cluster_id}"
from bibigrid.core.utility.paths import ansible_resources_path as a_rp
from bibigrid.core.utility.statics.create_statics import AC_NAME, KEY_NAME, DEFAULT_SECURITY_GROUP_NAME, \
WIREGUARD_SECURITY_GROUP_NAME, KEY_FOLDER, CLUSTER_MEMORY_PATH, MASTER_IDENTIFIER, WORKER_IDENTIFIER, \
VPNGTW_IDENTIFIER, UPLOAD_FILEPATHS


class Create: # pylint: disable=too-many-instance-attributes,too-many-arguments
Expand Down Expand Up @@ -238,7 +204,7 @@ def start_vpn_or_master(self, configuration, provider): # pylint: disable=too-m
raise ConfigurationException(f"MAC address for ip {configuration['private_v4']} not found.")

# pylint: disable=comparison-with-callable
if identifier == VPN_WORKER_IDENTIFIER or (identifier == MASTER_IDENTIFIER and self.use_master_with_public_ip):
if identifier == VPNGTW_IDENTIFIER or (identifier == MASTER_IDENTIFIER and self.use_master_with_public_ip):
configuration["floating_ip"] = \
provider.attach_available_floating_ip(network=external_network, server=server)["floating_ip_address"]
if identifier == MASTER_IDENTIFIER:
Expand Down Expand Up @@ -303,12 +269,13 @@ def create_server_volumes(self, provider, instance, name):
@param name: sever name
@return:
"""
self.log.info("Creating volumes ...")
self.log.info(f"Creating volumes for {name}...")
return_volumes = []

group_instance = {"volumes": []}
instance["group_instances"] = {name: group_instance}

for i, volume in enumerate(instance.get("volumes", [])):
self.log.debug(f"Volume {i}: {volume}")
if not volume.get("exists"):
if volume.get("permanent"):
infix = "perm"
Expand All @@ -332,10 +299,10 @@ def create_server_volumes(self, provider, instance, name):
if not return_volume:
raise ConfigurationException(f"Snapshot {volume['snapshot']} not found!")
else:
self.log.debug("Creating volume...")
return_volume = provider.create_volume(name=volume_name, size=volume.get("size", 50),
volume_type=volume.get("type"),
description=f"Created for {name}")
self.log.info(f"Volumes {i} created for {name}...")
return_volumes.append(return_volume)
return return_volumes

Expand Down Expand Up @@ -382,7 +349,7 @@ def prepare_vpn_or_master_args(self, configuration):
identifier = MASTER_IDENTIFIER
elif configuration.get("vpnInstance"):
instance_type = configuration["vpnInstance"]
identifier = VPN_WORKER_IDENTIFIER
identifier = VPNGTW_IDENTIFIER
else:
self.log.warning(
f"Configuration {configuration['cloud_identifier']} "
Expand Down Expand Up @@ -464,7 +431,8 @@ def upload_data(self, private_key, clean_playbook=False):
ssh_handler.execute_ssh(ssh_data=ssh_data, log=self.log)
self.log.info("Uploading Data")
ssh_data = {"floating_ip": self.master_ip, "private_key": private_key, "username": self.ssh_user,
"commands": commands, "filepaths": FILEPATHS, "gateway": self.configurations[0].get("gateway", {}),
"commands": commands, "filepaths": UPLOAD_FILEPATHS,
"gateway": self.configurations[0].get("gateway", {}),
"timeout": self.ssh_timeout}
ssh_handler.execute_ssh(ssh_data=ssh_data, log=self.log)

Expand Down
4 changes: 2 additions & 2 deletions bibigrid/core/actions/list_clusters.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import pprint
import re

from bibigrid.core.actions import create
from bibigrid.core.utility.statics.create_statics import MASTER_IDENTIFIER

SERVER_REGEX = re.compile(r"^bibigrid-((master)-([a-zA-Z0-9]+)|(worker|vpngtw)-([a-zA-Z0-9]+)-\d+)$")

Expand Down Expand Up @@ -148,7 +148,7 @@ def get_master_access_ip(cluster_id, master_provider, log):
"""
# TODO: maybe move the method from list_clusters as it is now independent of list_clusters
log.info("Finding master ip for cluster %s...", cluster_id)
master = create.MASTER_IDENTIFIER(cluster_id=cluster_id)
master = MASTER_IDENTIFIER(cluster_id=cluster_id)
server = master_provider.get_server(master)
if server:
return server.get("public_v4") or server.get("public_v6") or server.get("private_v4")
Expand Down
27 changes: 14 additions & 13 deletions bibigrid/core/actions/terminate.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
import re
import time

from bibigrid.core.actions import create
from bibigrid.core.utility.statics.create_statics import DEFAULT_SECURITY_GROUP_NAME, WIREGUARD_SECURITY_GROUP_NAME, \
KEY_NAME, KEY_FOLDER, AC_NAME
from bibigrid.models.exceptions import ConflictException


Expand All @@ -26,14 +27,14 @@ def terminate(cluster_id, providers, log, debug=False, assume_yes=False):
if not input(f"DEBUG MODE: Any non-empty input to shutdown cluster {cluster_id}. "
"Empty input to exit with cluster still alive:"):
return 0
security_groups = [create.DEFAULT_SECURITY_GROUP_NAME]
security_groups = [DEFAULT_SECURITY_GROUP_NAME]
if len(providers) > 1:
security_groups.append(create.WIREGUARD_SECURITY_GROUP_NAME)
security_groups.append(WIREGUARD_SECURITY_GROUP_NAME)
cluster_server_state = []
cluster_keypair_state = []
cluster_security_group_state = []
cluster_volume_state = []
tmp_keyname = create.KEY_NAME.format(cluster_id=cluster_id)
tmp_keyname = KEY_NAME.format(cluster_id=cluster_id)
local_keypairs_deleted = delete_local_keypairs(tmp_keyname, log)
if assume_yes or local_keypairs_deleted or input(
f"WARNING: No local temporary keyfiles found for cluster {cluster_id}. "
Expand Down Expand Up @@ -117,7 +118,7 @@ def delete_local_keypairs(tmp_keyname, log):
"""
success = False
log.info("Deleting Keypair locally...")
tmp_keypath = os.path.join(create.KEY_FOLDER, tmp_keyname)
tmp_keypath = os.path.join(KEY_FOLDER, tmp_keyname)
pub_tmp_keypath = tmp_keypath + ".pub"
if os.path.isfile(tmp_keypath):
os.remove(tmp_keypath)
Expand Down Expand Up @@ -148,14 +149,14 @@ def delete_security_groups(provider, cluster_id, security_groups, log, timeout=5
for security_group_format in security_groups:
security_group_name = security_group_format.format(cluster_id=cluster_id)
attempts = 0
tmp_success = False
tmp_success = not provider.get_security_group(security_group_name)
while not tmp_success:
try:
not_found = not provider.get_security_group(security_group_name)
tmp_success = provider.delete_security_group(security_group_name)
except ConflictException:
log.info(f"ConflictException on deletion attempt on {provider.cloud_specification['identifier']}.")
tmp_success = False
if tmp_success or not_found:
if tmp_success:
break
if attempts < timeout:
attempts += 1
Expand All @@ -166,7 +167,7 @@ def delete_security_groups(provider, cluster_id, security_groups, log, timeout=5
log.error(f"Attempt to delete security group {security_group_name} on "
f"{provider.cloud_specification['identifier']} failed.")
break
log.info(f"Delete security_group {security_group_name} -> {tmp_success or not_found} on "
log.info(f"Delete security_group {security_group_name} -> {tmp_success} on "
f"{provider.cloud_specification['identifier']}.")
success = success and tmp_success
return success
Expand All @@ -183,7 +184,7 @@ def delete_application_credentials(master_provider, cluster_id, log):
# implement deletion
auth = master_provider.cloud_specification["auth"]
if not auth.get("application_credential_id") or not auth.get("application_credential_secret"):
return master_provider.delete_application_credential_by_id_or_name(create.AC_NAME.format(cluster_id=cluster_id))
return master_provider.delete_application_credential_by_id_or_name(AC_NAME.format(cluster_id=cluster_id))
log.info("Because you used application credentials to authenticate, "
"no created application credentials need deletion.")
return True
Expand All @@ -197,7 +198,7 @@ def delete_non_permanent_volumes(provider, cluster_id, log):
@param log:
@return: a list of the servers' (that were to be terminated) termination states
"""
log.info("Deleting tmp volumes on provider %s...", provider.cloud_specification['identifier'])
log.info("Deleting non permanent volumes on provider %s...", provider.cloud_specification['identifier'])
volume_list = provider.list_volumes()
cluster_volume_state = []
volume_regex = re.compile(
Expand Down Expand Up @@ -228,7 +229,7 @@ def terminate_output(*, cluster_server_state, cluster_keypair_state, cluster_sec
cluster_server_terminated = all(cluster_server_state)
cluster_keypair_deleted = all(cluster_keypair_state)
cluster_security_group_deleted = all(cluster_security_group_state)
cluster_volume_deleted = all(cluster_volume_state)
cluster_volume_deleted = all(all(instance_volume_states) for instance_volume_states in cluster_volume_state)
if cluster_existed:
if cluster_server_terminated:
log.info("Terminated all servers of cluster %s.", cluster_id)
Expand All @@ -254,7 +255,7 @@ def terminate_output(*, cluster_server_state, cluster_keypair_state, cluster_sec
"\nAll servers terminated: %s"
"\nAll keys deleted: %s"
"\nAll security groups deleted: %s"
"\nAll security groups deleted: %s", cluster_id, cluster_server_terminated,
"\nAll volumes deleted: %s", cluster_id, cluster_server_terminated,
cluster_keypair_deleted, cluster_security_group_deleted, cluster_volume_deleted)
if ac_state:
log.info("Successfully handled application credential of cluster %s.", cluster_id)
Expand Down
3 changes: 1 addition & 2 deletions bibigrid/core/actions/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
Module that contains methods to update the master playbook
"""

from bibigrid.core.actions import create
from bibigrid.core.actions.list_clusters import dict_clusters
from bibigrid.core.utility.handler import cluster_ssh_handler

Expand All @@ -19,7 +18,7 @@ def update(creator, log):
log.warning(f"There are still workers up! {workers}")
return 1
if master_ip and ssh_user and used_private_key:
master = create.MASTER_IDENTIFIER(cluster_id=creator.cluster_id)
master = creator.MASTER_IDENTIFIER(cluster_id=creator.cluster_id)
server = creator.providers[0].get_server(master)
creator.master_ip = master_ip
creator.configurations[0]["private_v4"] = server["private_v4"]
Expand Down
20 changes: 10 additions & 10 deletions bibigrid/core/utility/ansible_configurator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import mergedeep
import yaml

from bibigrid.core.actions import create
from bibigrid.core.utility.statics.create_statics import MASTER_IDENTIFIER, VPNGTW_IDENTIFIER, WORKER_IDENTIFIER
from bibigrid.core.actions import ide
from bibigrid.core.actions.version import __version__
from bibigrid.core.utility import id_generation
Expand Down Expand Up @@ -57,7 +57,7 @@ def generate_site_file_yaml(user_roles):

def write_worker_host_vars(*, cluster_id, worker, worker_count, log):
for worker_number in range(worker.get('count', 1)):
name = create.WORKER_IDENTIFIER(cluster_id=cluster_id, additional=worker_count + worker_number)
name = WORKER_IDENTIFIER(cluster_id=cluster_id, additional=worker_count + worker_number)
write_volumes = []
for i, volume in enumerate(worker.get("volumes", [])):
if not volume.get("exists"):
Expand All @@ -79,10 +79,10 @@ def write_worker_host_vars(*, cluster_id, worker, worker_count, log):

def write_worker_vars(*, provider, configuration, cluster_id, worker, worker_count, log):
flavor_dict = provider.create_flavor_dict(flavor=worker["type"])
name = create.WORKER_IDENTIFIER(cluster_id=cluster_id,
name = WORKER_IDENTIFIER(cluster_id=cluster_id,
additional=f"[{worker_count}-{worker_count + worker.get('count', 1) - 1}]")
group_name = name.replace("[", "").replace("]", "").replace(":", "_").replace("-", "_")
regexp = create.WORKER_IDENTIFIER(cluster_id=cluster_id, additional=r"\d+")
regexp = WORKER_IDENTIFIER(cluster_id=cluster_id, additional=r"\d+")
worker_dict = {"name": name, "regexp": regexp, "image": worker["image"],
"network": configuration["network"], "flavor": flavor_dict,
"gateway_ip": configuration["private_v4"],
Expand Down Expand Up @@ -110,11 +110,11 @@ def write_worker_vars(*, provider, configuration, cluster_id, worker, worker_cou


def write_vpn_var(*, provider, configuration, cluster_id, vpngtw, vpn_count, log):
name = create.VPN_WORKER_IDENTIFIER(cluster_id=cluster_id, additional=f"{vpn_count}")
name = VPNGTW_IDENTIFIER(cluster_id=cluster_id, additional=f"{vpn_count}")
wireguard_ip = f"10.0.0.{vpn_count + 2}" # skipping 0 and 1 (master)
vpn_count += 1
flavor_dict = provider.create_flavor_dict(flavor=vpngtw["type"])
regexp = create.WORKER_IDENTIFIER(cluster_id=cluster_id, additional=r"\d+")
regexp = WORKER_IDENTIFIER(cluster_id=cluster_id, additional=r"\d+")
vpngtw_dict = {"name": name, "regexp": regexp, "image": vpngtw["image"],
"network": configuration["network"], "network_cidrs": configuration["subnet_cidrs"],
"floating_ip": configuration["floating_ip"], "private_v4": configuration["private_v4"],
Expand All @@ -130,7 +130,7 @@ def write_vpn_var(*, provider, configuration, cluster_id, vpngtw, vpn_count, log

def write_master_var(provider, configuration, cluster_id, log):
master = configuration["masterInstance"]
name = create.MASTER_IDENTIFIER(cluster_id=cluster_id)
name = MASTER_IDENTIFIER(cluster_id=cluster_id)
flavor_dict = provider.create_flavor_dict(flavor=master["type"])
master_dict = {"name": name, "image": master["image"], "network": configuration["network"],
"network_cidrs": configuration["subnet_cidrs"], "floating_ip": configuration["floating_ip"],
Expand Down Expand Up @@ -248,7 +248,7 @@ def generate_ansible_hosts_yaml(ssh_user, configurations, cluster_id, log): # p
@return: ansible_hosts yaml (dict)
"""
log.info("Generating ansible hosts file...")
master_name = create.MASTER_IDENTIFIER(cluster_id=cluster_id)
master_name = MASTER_IDENTIFIER(cluster_id=cluster_id)
ansible_hosts_yaml = {"vpn": {"hosts": {},
"children": {"master": {"hosts": {master_name: to_instance_host_dict(ssh_user)}},
"vpngtw": {"hosts": {}}}}, "workers": {"hosts": {}, "children": {}}}
Expand All @@ -259,7 +259,7 @@ def generate_ansible_hosts_yaml(ssh_user, configurations, cluster_id, log): # p
vpngtw_count = 0
for configuration in configurations:
for worker in configuration.get("workerInstances", []):
name = create.WORKER_IDENTIFIER(cluster_id=cluster_id,
name = WORKER_IDENTIFIER(cluster_id=cluster_id,
additional=f"[{worker_count}:{worker_count + worker.get('count', 1) - 1}]")
worker_dict = to_instance_host_dict(ssh_user, ip="")
group_name = name.replace("[", "").replace("]", "").replace(":", "_").replace("-", "_")
Expand All @@ -269,7 +269,7 @@ def generate_ansible_hosts_yaml(ssh_user, configurations, cluster_id, log): # p
worker_count += worker.get('count', 1)

if configuration.get("vpnInstance"):
name = create.VPN_WORKER_IDENTIFIER(cluster_id=cluster_id, additional=vpngtw_count)
name = VPNGTW_IDENTIFIER(cluster_id=cluster_id, additional=vpngtw_count)
vpngtw_dict = to_instance_host_dict(ssh_user, ip="")
vpngtw_dict["ansible_host"] = configuration["floating_ip"]
vpngtws[name] = vpngtw_dict
Expand Down
5 changes: 3 additions & 2 deletions bibigrid/core/utility/handler/cluster_ssh_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

import os

from bibigrid.core.actions import create, list_clusters
from bibigrid.core.actions import list_clusters
from bibigrid.core.utility.statics.create_statics import KEY_FOLDER, KEY_NAME


def get_ssh_connection_info(cluster_id, master_provider, master_configuration, log):
Expand Down Expand Up @@ -34,7 +35,7 @@ def get_ssh_connection_info(cluster_id, master_provider, master_configuration, l
used_private_key = private_key
break
if not used_private_key:
private_key = os.path.join(create.KEY_FOLDER, create.KEY_NAME.format(cluster_id=cluster_id))
private_key = os.path.join(KEY_FOLDER, KEY_NAME.format(cluster_id=cluster_id))
if os.path.isfile(private_key):
used_private_key = private_key
return master_ip, ssh_user, used_private_key
8 changes: 4 additions & 4 deletions bibigrid/core/utility/id_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import shortuuid

from bibigrid.core.actions import create
from bibigrid.core.utility.statics.create_statics import MASTER_IDENTIFIER, VPNGTW_IDENTIFIER, WORKER_IDENTIFIER

MAX_ID_LENGTH = 15
CLUSTER_UUID_ALPHABET = '0123456789abcdefghijkmnopqrstuvwxyz'
Expand Down Expand Up @@ -43,9 +43,9 @@ def is_unique_cluster_id(cluster_id, providers):
"""
for provider in providers:
for server in provider.list_servers():
master = create.MASTER_IDENTIFIER(cluster_id=cluster_id)
vpngtw = create.VPN_WORKER_IDENTIFIER(cluster_id=cluster_id)
worker = create.WORKER_IDENTIFIER(cluster_id=cluster_id)
master = MASTER_IDENTIFIER(cluster_id=cluster_id)
vpngtw = VPNGTW_IDENTIFIER(cluster_id=cluster_id)
worker = WORKER_IDENTIFIER(cluster_id=cluster_id)
if server["name"] in [master, vpngtw, worker]:
return False
return True
Expand Down
Empty file.
Loading

0 comments on commit 44e3dc2

Please sign in to comment.