datacite.py
import argparse
from datetime import datetime, timedelta
import gzip
from io import BytesIO
import json
import logging

import boto3
import requests
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger("datacite_harvester")

BASE_URL = "https://api.datacite.org/dois"
BATCH_SIZE = 1000
S3_BUCKET = "openalex-ingest"

s3_client = boto3.client("s3")


@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type(requests.exceptions.RequestException),
)
def fetch_page(cursor_url):
    """
    Fetch a single page of results using the given cursor URL.
    Retries on network errors.
    """
    LOGGER.info(f"Fetching page: {cursor_url}")
    response = requests.get(cursor_url)
    response.raise_for_status()
    return response.json()
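
# A page from the /dois endpoint is JSON:API-shaped; the sketch below (illustrative values)
# shows only the fields this script actually reads -- the real payload carries many more:
#
#   {
#       "data": [{"id": "10.1234/example", "attributes": {"updated": "2024-11-01T12:00:00Z"}}],
#       "links": {"next": "https://api.datacite.org/dois?page[cursor]=..."}
#   }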


def compress_json(data):
    """
    Compress JSON data using Gzip.
    Returns a BytesIO object containing the compressed data.
    """
    compressed_buffer = BytesIO()
    with gzip.GzipFile(fileobj=compressed_buffer, mode="w") as gz_file:
        gz_file.write(json.dumps(data).encode("utf-8"))
    compressed_buffer.seek(0)
    return compressed_buffer
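
# Minimal round-trip sketch (standard library only) for reading a compressed payload back:
#
#   buf = compress_json([{"id": "10.1234/example"}])
#   records = json.loads(gzip.decompress(buf.getvalue()))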


def upload_to_s3(date_path, batch_number, works):
    """
    Upload a batch of works to S3 in a date-based directory.
    """
    timestamp = int(datetime.now().timestamp())
    object_key = f"datacite/works/{date_path}/batch_{batch_number}_{timestamp}.json.gz"
    compressed_data = compress_json(works)
    s3_client.put_object(
        Bucket=S3_BUCKET,
        Key=object_key,
        Body=compressed_data,
        ContentType="application/gzip",
    )
    LOGGER.info(f"Uploaded batch {batch_number} to S3: {object_key}")


def get_date_path_from_batch(works):
    """
    Get the date-based directory path from the first work in the batch.
    """
    if not works:
        return "unknown_date"
    first_work = works[0]
    updated_str = first_work["attributes"].get("updated")
    if updated_str:
        updated_dt = datetime.fromisoformat(updated_str.replace("Z", "+00:00"))
        return updated_dt.strftime("%Y/%m/%d")  # Format: YYYY/MM/DD
    return "unknown_date"


def harvest_datacite_works(from_date, to_date):
    """
    Harvest works from the DataCite API using cursor pagination and upload to S3
    in date-based directories.
    """
    LOGGER.info(f"Starting harvest from {from_date} to {to_date}")

    # Start with initial cursor
    cursor_url = (
        f"{BASE_URL}?page[cursor]=1&page[size]={BATCH_SIZE}"
        f"&query=updated:[{from_date}T00:00:00Z TO {to_date}T23:59:59Z]"
        f"&sort=updated&affiliation=true&publisher=true"
    )

    total_records = 0
    batch_number = 0
    works_batch = []

    while cursor_url:
        data = fetch_page(cursor_url)
        works = data.get("data", [])
        total_records += len(works)
        works_batch.extend(works)

        if len(works_batch) >= BATCH_SIZE:
            batch_number += 1
            date_path = get_date_path_from_batch(works_batch)  # Use the first record's date
            upload_to_s3(date_path, batch_number, works_batch)
            works_batch = []

        # update cursor URL to the next page
        links = data.get("links", {})
        cursor_url = links.get("next")
        if not cursor_url:
            LOGGER.info("No more pages to fetch.")

    # upload any remaining records
    if works_batch:
        batch_number += 1
        date_path = get_date_path_from_batch(works_batch)  # Use the first record's date
        upload_to_s3(date_path, batch_number, works_batch)

    LOGGER.info(f"Harvest complete. Total records fetched: {total_records}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Harvest DataCite works and upload to S3.")
parser.add_argument("--from-date", help="Start date (YYYY-MM-DD).")
parser.add_argument("--to-date", help="End date (YYYY-MM-DD).")
parser.add_argument("--yesterday", action="store_true", help="Fetch records from the last 24 hours.")
parser.add_argument("--backfill", action="store_true", help="Backfill records.")
args = parser.parse_args()
if args.yesterday:
now = datetime.utcnow()
previous_day = now - timedelta(days=1)
from_date = previous_day.strftime("%Y-%m-%d")
to_date = previous_day.strftime("%Y-%m-%d")
LOGGER.info(f"--today argument provided. Fetching updates for {from_date} (full day).")
elif args.backfill:
# go from 2024-11-01 to 2024-11-18, setting the from and to date to the same date and iterating through the days
from_date = "2024-11-01"
to_date = "2024-11-26"
# for each date, call the harvest_datacite_works function
while from_date <= to_date:
harvest_datacite_works(from_date, from_date)
from_date = (datetime.strptime(from_date, "%Y-%m-%d") + timedelta(days=1)).strftime("%Y-%m-%d")
elif args.from_date and args.to_date:
from_date = args.from_date
to_date = args.to_date
else:
parser.error("You must specify either --yesterday or both --from-date and --to-date.")
harvest_datacite_works(from_date, to_date)
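
# Example invocations (dates are illustrative):
#
#   python datacite.py --yesterday
#   python datacite.py --from-date 2024-11-01 --to-date 2024-11-02
#   python datacite.py --backfill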