"""
Pull Crossref works (new or recently updated) and save them to S3.

Run with: heroku local:run python crossref.py new

Download the monthly snapshot to EC2:
    curl -H 'crossref-api-key: mykey' \
         -H 'User-Agent: Downloader/1.1 (mailto:[email protected])' \
         -v -L -o all.json.tar.gz -X GET \
         https://api.crossref.org/snapshots/monthly/latest/all.json.tar.gz
"""
import argparse
import datetime
import json
import os
import time
from urllib.parse import quote

import boto3
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

from common import S3_BUCKET, LOGGER

CROSSREF_API_KEY = os.getenv('CROSSREF_API_KEY')


@retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, min=4, max=10),
       retry=retry_if_exception_type(requests.exceptions.RequestException))
def make_request_with_retry(url, headers):
    response = requests.get(url, headers=headers)
    if response.status_code == 429:
        # Honor Retry-After before raising; the HTTPError raised below is a
        # RequestException, so tenacity retries the call.
        retry_after = int(response.headers.get('Retry-After', 60))
        LOGGER.warning(f"Rate limit exceeded (429). Retrying after {retry_after} seconds.")
        time.sleep(retry_after)
        response.raise_for_status()
    elif response.status_code >= 500:
        LOGGER.error(f"Server error {response.status_code} for URL {url}. Retrying...")
        response.raise_for_status()
    # Raise on any remaining 4xx as well, instead of returning the error
    # response silently for the caller to parse.
    response.raise_for_status()
    return response
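
# Example reuse (sketch): the retry helper works for any Crossref GET endpoint,
# e.g. fetching a single work by DOI; the DOI below is a placeholder.
#   resp = make_request_with_retry('https://api.crossref.org/works/10.1000/xyz123',
#                                  {'User-Agent': 'mailto:[email protected]'})
#   work = resp.json()['message']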


def get_crossref_data(filter_params, s3_bucket, s3_prefix):
    base_url = 'https://api.crossref.org/works'
    headers = {
        "Accept": "application/json",
        "User-Agent": "mailto:[email protected]",
        "crossref-api-key": CROSSREF_API_KEY
    }
    per_page = 500
    cursor = '*'
    page_number = 1
    has_more_pages = True
    url_template = f"{base_url}?filter={{filter}}&rows={{rows}}&cursor={{cursor}}"

    while has_more_pages:
        url = url_template.format(
            filter=filter_params,
            rows=per_page,
            # Deep-paging cursors can contain characters like '+' that must be
            # percent-encoded before being placed in the query string.
            cursor=quote(cursor)
        )
        LOGGER.info(f"Requesting page {page_number} from URL {url}.")
        response = make_request_with_retry(url, headers)
        data = response.json()
        items = data['message']['items']

        if items:
            current_timestamp = datetime.datetime.now().isoformat()
            s3_key = f'{s3_prefix}/works_page_{page_number}_{current_timestamp}.json'
            save_to_s3(items, s3_bucket, s3_key)
        else:
            LOGGER.info(f"No more items to fetch on page {page_number}. Ending pagination.")
            has_more_pages = False

        # Use .get() so a missing 'next-cursor' ends pagination instead of
        # raising KeyError.
        cursor = data['message'].get('next-cursor')
        if cursor is None:
            LOGGER.info("No next cursor found, pagination complete.")
            has_more_pages = False

        page_number += 1
        time.sleep(0.5)  # brief pause between pages to stay polite to the API


def save_to_s3(json_data, s3_bucket, s3_key):
    LOGGER.info(f"Saving crossref works to S3 bucket {s3_bucket} with key {s3_key}.")
    s3 = boto3.client('s3')
    data_to_save = {'items': json_data}  # same format as the Crossref snapshot files
    s3.put_object(
        Bucket=s3_bucket,
        Key=s3_key,
        Body=json.dumps(data_to_save, indent=2),
        ContentType='application/json; charset=utf-8'
    )
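
# Reading a saved page back (sketch): each object has the same {'items': [...]}
# shape as a Crossref snapshot file. The key below is a placeholder.
#   obj = boto3.client('s3').get_object(Bucket=S3_BUCKET, Key='<s3_key>')
#   items = json.loads(obj['Body'].read())['items']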


def main():
    parser = argparse.ArgumentParser(description='Pull Crossref data (new works or updates).')
    # NOTE: 'updates_two_days_ago' is accepted here but has no matching branch
    # below, so that mode is currently a no-op.
    parser.add_argument('mode', choices=['new', 'updates_two_days_ago', 'updates'],
                        help='Specify whether to pull new works or updates.')
    args = parser.parse_args()

    now = datetime.datetime.now(datetime.timezone.utc)
    today_str = now.strftime('%Y-%m-%d')
    yesterday = now - datetime.timedelta(days=1)
    yesterday_str = yesterday.strftime('%Y-%m-%d')
    two_days_ago = now - datetime.timedelta(days=2)
    two_days_ago_str = two_days_ago.strftime('%Y-%m-%d')

    if args.mode == 'new':
        # Works first deposited within the last day, bucketed by hour in S3.
        filter_params = f'from-created-date:{yesterday_str},until-created-date:{today_str}'
        s3_prefix = f'crossref/new-works/{now.strftime("%Y/%m/%d/%H")}'
        get_crossref_data(filter_params, S3_BUCKET, s3_prefix)
    elif args.mode == 'updates':
        # Works re-indexed between two days ago and yesterday.
        filter_params = f'from-index-date:{two_days_ago_str},until-index-date:{yesterday_str}'
        s3_prefix = f'crossref/updates/{yesterday.strftime("%Y/%m/%d")}'
        get_crossref_data(filter_params, S3_BUCKET, s3_prefix)


if __name__ == '__main__':
    main()