run_pipeline.py
"""
Description:
Script to run pipeline.
Usage:
Batch upload and invoke Lambda (default)
python tools/run_pipeline.py --destination_bucket <DESTINATION_BUCKET> --input_mode batch --batch_datasets_file sample-batch-datasets.json
Single file upload.
python run_pipeline.py --destination_bucket <DESTINATION_BUCKET> --input_mode file --file_or_url <LOCAL_OR_REMOTE_CSV_PATH>
"""
import argparse
import json
import logging
from pathlib import Path

import boto3
import pandas as pd
import yaml

from utils import configure_logging

_ = configure_logging("run-pipeline")

with open("config.yaml") as file:
    config = yaml.load(file, Loader=yaml.SafeLoader)

resources_name_prefix = config["resources_name_prefix"]
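# `resources_name_prefix` (read from config.yaml in the working directory) is
# assumed to be the same prefix used when the stack was deployed; it is used
# below to locate the "<prefix>-invoke-step-functions" Lambda by name.
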
def invoke(batch_prefix: str = "data/csv/input/batch", region: str = "us-east-1"):
    """Trigger the "<resources_name_prefix>-invoke-step-functions" Lambda for a batch prefix."""
    lambda_client = boto3.client("lambda", region_name=region)
    lambda_input = {"batch_prefix": batch_prefix}
    # Locate the deployed Lambda by matching its name against the configured prefix.
    # list_functions() is paginated, so only the first page of functions is scanned here.
    lambda_fn_name = [
        f["FunctionName"]
        for f in lambda_client.list_functions()["Functions"]
        if f"{resources_name_prefix}-invoke-step-functions" in f["FunctionName"]
    ][0]
    try:
        response = lambda_client.invoke(
            FunctionName=lambda_fn_name,
            InvocationType="RequestResponse",
            LogType="None",
            Payload=json.dumps(lambda_input),
        )
        http_status_code = response["ResponseMetadata"]["HTTPStatusCode"]
        if http_status_code == 200:
            logging.info(f"\nSuccess! Triggered Lambda {lambda_fn_name} to process batch files from {batch_prefix}\n")
        else:
            logging.warning(f"\nIssue triggering Lambda function {lambda_fn_name}. Status code {http_status_code}\n")
    except Exception as e:
        logging.error(e)

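# Minimal usage sketch (assuming the stack above is deployed): calling invoke()
# with its defaults triggers the "<prefix>-invoke-step-functions" Lambda against
# the default batch prefix.
#
#     invoke(batch_prefix="data/csv/input/batch", region="us-east-1")
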
def _run_batch_pipeline(batch_datasets_file: str, destination_bucket: str, destination_prefix: str, region: str, max_rows: int):
    """Upload each sample dataset to S3 as CSV, then invoke the pipeline Lambda on the batch prefix."""
    with Path(batch_datasets_file).open("r") as f:
        sample_datasets = json.load(f)
    for name, url in sample_datasets.items():
        # Download up to max_rows rows and write the CSV directly to S3
        # (pandas handles the s3:// URL via s3fs).
        df = pd.read_csv(url, nrows=max_rows)
        fn = f"s3://{destination_bucket}/{destination_prefix}/{name}.csv"
        df.to_csv(fn, index=False)
        logging.info(f"Uploaded {name} to {fn}")
    invoke(destination_prefix, region)

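# Expected layout of the batch datasets file (inferred from the loop above):
# a flat JSON object mapping dataset names to CSV URLs, e.g.
#
#     {
#         "example_dataset": "https://example.com/example.csv"
#     }
#
# (the name and URL here are illustrative only).
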
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Upload CSV file(s) to S3 and invoke pipeline.")
    parser.add_argument("--destination_bucket", type=str, required=True)
    parser.add_argument("--destination_prefix", type=str, default="data/csv/input")
    parser.add_argument("--region", type=str, default="us-east-1", help="AWS service region.")
    parser.add_argument("--input_mode", type=str, choices=["file", "batch"], default="batch")
    parser.add_argument("--batch_datasets_file", "-b", type=str, default="sample-batch-datasets.json", help="JSON file containing URLs of sample datasets.")
    parser.add_argument("--file_or_url", "-f", type=str, default=None, help="Local path or remote URL to CSV file to upload.")
    parser.add_argument("--max_rows", default=2**14, type=int, help="Maximum number of rows to download.")
    args = parser.parse_args()

    # Uploads land under <destination_prefix>/<input_mode>, so batch and
    # single-file inputs are kept in separate prefixes.
    destination_prefix = f"{args.destination_prefix}/{args.input_mode}"
    destination_bucket = args.destination_bucket
    region = args.region
    max_rows = args.max_rows

    if args.input_mode == "batch":
        logging.info("Batch CSV mode.")
        _run_batch_pipeline(args.batch_datasets_file, destination_bucket, destination_prefix, region, max_rows)
if args.input_mode == "file" and args.file_or_url:
logging.info("Single CSV mode.")
df = pd.read_csv(args.file_or_url, nrows=max_rows)
fn = f"s3://{destination_bucket}/{destination_prefix}/{Path(args.file_or_url).stem}"
df.to_csv(fn, index=False)
logging.info(f"Uploaded {args.file_or_url} to {fn}")