-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsql_convert_csv.py
82 lines (66 loc) · 2.84 KB
/
sql_convert_csv.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import argparse
import sqlite3
import csv
import re
import os
def extract_table_names(sql_script):
# Regular expression to find CREATE TABLE statements
# Try One of these versions
# create_table_pattern = re.compile(r'CREATE TABLE IF NOT EXISTS\s+"?(\w+)"?', re.IGNORECASE)
create_table_pattern = re.compile(r'CREATE TABLE\s+"?(\w+)"?', re.IGNORECASE)
table_names = create_table_pattern.findall(sql_script)
return table_names
def convert_blob_to_csv_string(blob):
# Convert the binary data to a comma-separated string of values between 0 and 255
return ','.join(map(str, blob))
def convert_sql_to_csv(sql_file_path, csv_file_path):
# Establish a connection to an in-memory SQLite database
conn = sqlite3.connect(':memory:')
conn.text_factory = sqlite3.OptimizedUnicode
cursor = conn.cursor()
# Read the SQL file
with open(sql_file_path, 'r') as sql_file:
sql_script = sql_file.read()
# Execute the entire SQL script
cursor.executescript(sql_script)
# Extract table names
table_names = extract_table_names(sql_script)
if not table_names:
print("No tables found in the SQL script.")
return
for table_name in table_names:
# Retrieve the data from the table
cursor.execute(f'SELECT * FROM {table_name}')
rows = cursor.fetchall()
# Get column names
column_names = [description[0] for description in cursor.description]
# Write data to CSV
csv_file_table_path = f"{csv_file_path.rsplit('.', 1)[0]}_{table_name}.csv"
with open(csv_file_table_path, 'w', newline='') as csv_file:
csv_writer = csv.writer(csv_file)
# Write header
csv_writer.writerow(column_names)
# Write data rows
for row in rows:
row = list(row)
# Convert BLOB data in the 'data' column (assuming column name is 'data')
for i, col in enumerate(column_names):
if col == 'data' and isinstance(row[i], bytes):
row[i] = convert_blob_to_csv_string(row[i])
csv_writer.writerow(row)
print(f"Data from table '{table_name}' has been successfully written to {csv_file_table_path}")
# Close the database connection
conn.close()
def main():
parser = argparse.ArgumentParser(description='Convert SQL file to CSV')
parser.add_argument('--input', required=True, help='Path to the input SQL file')
parser.add_argument('--output', required=True, help='Base path for the output CSV file(s)')
args = parser.parse_args()
# Check if the SQL file exists
if not os.path.isfile(args.input):
print(f"Error: SQL file '{args.input}' does not exist.")
return
# Run the conversion
convert_sql_to_csv(args.input, args.output)
if __name__ == '__main__':
main()