Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced Artemis Table Conversion Script for Readability, Performance, and Scalability #142

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 43 additions & 66 deletions convert_table/src/convert.py
Original file line number Diff line number Diff line change
@@ -1,102 +1,79 @@
"""Converts the different artemis table into a single TSV file"""
import csv

from rich import print
from utils import get_item_info
from utils import get_metatable
from utils import get_root_dir
from utils import load_data
from utils import print_item_to_table
from pathlib import Path
from utils import (
get_item_info,
get_metatable,
get_root_dir,
load_data,
print_item_to_table,
)


def _write_activity(writer, data_frame, activity_idx, table_name, sep):
    """Write one activity of a table: a heading row, then its included items.

    Items are numbered ``<activity>.<sub_section>.<item>``. A sub-section
    heading row is emitted each time a non-empty ``sub_section`` value
    different from the current one is encountered, and the item counter
    restarts at that point.

    Args:
        writer: ``csv.DictWriter`` the rows are written to.
        data_frame: table returned by ``load_data``; the columns
            ``activity_order``, ``include`` and ``item_order`` are read here.
        activity_idx: value of ``activity_order`` selecting the activity.
        table_name: basename of the table, used (upper-cased) in the heading.
        sep: separator forwarded unchanged to ``print_item_to_table``.
    """
    activity_id = f"{activity_idx} - {table_name.upper()}"
    print(f"[bold red]{activity_id}[/bold red]")
    writer.writerow({"item": activity_id})

    activity_items = data_frame[data_frame["activity_order"] == activity_idx]
    included_items = activity_items[activity_items["include"] == 1]

    sub_section = ""
    sub_section_id = 0
    # Start the counter here so that items appearing before any sub-section
    # heading are numbered too (previously this raised UnboundLocalError or
    # silently continued the count of the previous activity).
    item_id = 0

    for item_order in included_items["item_order"].unique():
        this_item = included_items[included_items["item_order"] == item_order]
        item_info = get_item_info(this_item)

        # A new, non-empty sub-section opens a heading and restarts numbering.
        new_sub_section = item_info["sub_section"]
        if new_sub_section and new_sub_section != sub_section:
            sub_section_id += 1
            item_id = 0
            sub_section = new_sub_section
            subsection_id = f"{activity_idx}.{sub_section_id} - {sub_section.upper()}"
            writer.writerow({"item": subsection_id})

        item_id += 1
        item_id_str = f"{activity_idx}.{sub_section_id}.{item_id}"
        # NOTE(review): print_item_to_table presumably returns a dict keyed by
        # the TSV header fields -- confirm against utils.
        item_row = print_item_to_table(item_id_str, this_item, item_info, sep)
        writer.writerow(item_row)


def main():
    """Convert the different Artemis tables into a single TSV file.

    Every table listed in the metatable whose ``schema`` starts with
    ``"artemis-"`` is loaded and its activities are appended, in order, to
    ``<root>/outputs/artemis.tsv`` (the ``outputs`` directory is created if
    needed). The file starts with a one-cell title row followed by the
    column header row.
    """
    sep = "\t"
    pattern = "artemis-"
    header = ["item", "field", "question", "options", "instructions"]

    output_file = Path(get_root_dir(), "outputs", "artemis.tsv")
    output_file.parent.mkdir(parents=True, exist_ok=True)

    # Load and filter the metatable before touching the output file so a
    # failure here does not leave a truncated TSV behind.
    metatable = get_metatable()
    # Anchored prefix test (the pattern is a plain string, not a regex);
    # na=False keeps rows with a missing schema out of the boolean mask.
    is_artemis = metatable["schema"].str.startswith(pattern, na=False)
    tables_to_convert = metatable[is_artemis]
    table_schemas = tables_to_convert["schema"].tolist()
    table_names = tables_to_convert["basename"].tolist()

    # newline="" is required by the csv module; the explicit encoding makes
    # the output independent of the platform default.
    with open(output_file, "w", newline="", encoding="utf-8") as tsv_file:
        writer = csv.DictWriter(tsv_file, fieldnames=header, delimiter=sep)

        # Title row first, then the column names.
        writer.writerow(
            {"item": "ARTEM-IS (Agreed Reporting Template for EEG Methodology - International Standard) template for ERP"}
        )
        writer.writeheader()

        for schema, table_name in zip(table_schemas, table_names):
            data_frame = load_data(schema)

            # Rows without an activity order cannot be matched by equality
            # below, so they are dropped rather than producing empty headings.
            activities = data_frame["activity_order"].dropna().unique()
            for activity_idx in activities:
                _write_activity(writer, data_frame, activity_idx, table_name, sep)


# Run the conversion only when executed as a script, not when imported.
if __name__ == "__main__":
    main()