-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathror_to_elastic.py
151 lines (118 loc) · 4.45 KB
/
ror_to_elastic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import os
import logging
from datetime import datetime
from elasticsearch import Elasticsearch, NotFoundError
import pandas as pd
# Elasticsearch endpoint is injected via the environment; `es` is the shared
# module-level client used by the __main__ loop below.
ELASTIC_URL = os.getenv("ELASTIC_URL")
es = Elasticsearch([ELASTIC_URL])
# Per-record progress is logged at INFO.
logging.basicConfig(level=logging.INFO)
def get_current_ror_file():
    """Load the current ROR snapshot parquet from S3 into a DataFrame."""
    logging.info("Reading current ROR file")
    snapshot_path = "s3://openalex-ingest/ror/current/ror_snapshot.parquet"
    return pd.read_parquet(snapshot_path)
def format_names(name_records):
    """Split a ROR ``names`` array into searchable names and the display name.

    Each record is a dict with ``value`` and ``types`` keys. Acronym-typed
    records are excluded from the searchable list; the record carrying the
    ``ror_display`` type supplies the primary name.

    Returns a ``(names, primary)`` tuple where ``names`` is a list of
    ``{"name": [value]}`` dicts and ``primary`` is the display name (or
    ``None`` when no record is typed ``ror_display``).
    """
    searchable = []
    display_name = None
    for entry in name_records:
        value = entry['value']
        type_tags = entry['types']
        if 'ror_display' in type_tags:
            display_name = value
        if 'acronym' not in type_tags:
            searchable.append({"name": [value]})
    return searchable, display_name
def extract_relationships(rel_records):
    """Split a ROR ``relationships`` array into parallel id/type lists.

    Parquet cells may hold ``None``, a scalar NaN, an empty sequence, or a
    sequence of ``{'id', 'type'}`` dicts.

    Returns a ``(ids, types)`` tuple of parallel lists (both empty when
    there are no relationships).
    """
    # Guard scalar "missing" values explicitly: the previous
    # `pd.isna(rel_records).any()` raised AttributeError for a plain Python
    # None (pd.isna(None) is a bool, which has no .any()), and len() would
    # raise TypeError on a float NaN. Elementwise pd.isna over a list of
    # dicts is always all-False, so valid lists behave exactly as before.
    if rel_records is None:
        return [], []
    if isinstance(rel_records, float) and pd.isna(rel_records):
        return [], []
    if len(rel_records) == 0:
        return [], []
    ids = []
    types = []
    for record in rel_records:
        ids.append(record['id'])
        types.append(record['type'])
    return ids, types
def get_country(location_str):
    """Return the country code of the first location, or ``None``.

    ``location_str`` is a ROR ``locations`` array whose entries carry a
    ``geonames_details.country_code``; parquet cells may also hold ``None``,
    a scalar NaN, or an empty sequence.
    """
    # Same scalar guard as extract_relationships: the previous
    # `pd.isna(location_str).any()` raised AttributeError for a plain Python
    # None, and len() would raise TypeError on a float NaN.
    if location_str is None:
        return None
    if isinstance(location_str, float) and pd.isna(location_str):
        return None
    if len(location_str) == 0:
        return None
    return location_str[0]['geonames_details']['country_code']
def get_dates(admin_dict):
    """Return ``(created, last_modified)`` date strings from a ROR admin dict.

    Either element is ``None`` when the corresponding section or its
    ``date`` key is absent.
    """
    def _date_of(section):
        # Missing sections fall back to an empty dict so .get stays safe.
        return admin_dict.get(section, {}).get('date')

    return _date_of('created'), _date_of('last_modified')
def transform_record(row):
    """Convert one ROR snapshot row into an Elasticsearch-ready dict.

    The result carries the routing fields (``_id``, ``_index``, ``_score``)
    alongside the document fields consumed by save_to_elasticsearch.
    """
    searchable_names, display_name = format_names(row['names'])
    related_ids, related_types = extract_relationships(row['relationships'])
    created, updated = get_dates(row['admin'])
    doc = {
        "_id": row['id'],
        "_index": "search-ror-institutions-v2",
        "_score": 0,
        "country": get_country(row['locations']),
        "id": row['id'],
        "names": searchable_names,
        "primary": display_name,
        "relationships.id": related_ids,
        "relationships.type": related_types,
        "status": row['status'],
        "created_date": created,
        "updated_date": updated,
    }
    return doc
def get_existing_record(client, record_id, index):
    """Fetch a document's ``_source`` from Elasticsearch, or ``None`` if absent."""
    try:
        hit = client.get(index=index, id=record_id)
    except NotFoundError:
        # Document has never been indexed; caller treats None as "create".
        return None
    return hit['_source']
def should_update_record(existing_record, new_record):
    """Decide whether ``new_record`` should overwrite ``existing_record``.

    Updates when there is no existing record, when either side lacks an
    ``updated_date``, or when the new date is strictly newer. Dates are
    'YYYY-MM-DD' strings.
    """
    if not existing_record:
        return True
    existing_date = existing_record.get('updated_date')
    new_date = new_record.get('updated_date')
    # If either date is missing we cannot compare, so err on the side of
    # updating. (Previously only existing_date was checked, so a missing
    # new_date reached strptime(None, ...) and relied on the caught
    # TypeError — same result, but with a spurious warning log.)
    if not existing_date or not new_date:
        return True
    try:
        # Convert date strings to datetime objects for comparison.
        return (datetime.strptime(new_date, '%Y-%m-%d')
                > datetime.strptime(existing_date, '%Y-%m-%d'))
    except (ValueError, KeyError, TypeError) as e:
        # Malformed date string: default to updating rather than dropping.
        logging.warning(f"Error comparing dates, defaulting to update record: {str(e)}")
        return True
def save_to_elasticsearch(client, record):
    """Index ``record`` unless the stored copy is already up to date.

    Looks up the existing document, defers the freshness decision to
    should_update_record, and writes only when needed.

    Returns the Elasticsearch response when a write happened, else ``None``.
    """
    current = get_existing_record(client, record['_id'], record['_index'])
    if not should_update_record(current, record):
        logging.info(f"Skipping record {record['_id']} - no update needed")
        return None
    logging.info(f"Saving/updating record {record['_id']} to Elasticsearch")
    # Strip the routing fields (_id/_index/_score); only document fields
    # are written to the index body.
    body_fields = (
        "country", "id", "names", "primary",
        "relationships.id", "relationships.type",
        "status", "created_date", "updated_date",
    )
    return client.index(
        index=record['_index'],
        id=record['_id'],
        document={field: record[field] for field in body_fields},
    )
if __name__ == "__main__":
    # Stream every row of the current ROR snapshot into Elasticsearch,
    # counting how many records were actually (re)indexed.
    df = get_current_ror_file()
    updated_count = 0
    total_count = 0
    for _, snapshot_row in df.iterrows():
        total_count += 1
        es_record = transform_record(snapshot_row)
        result = save_to_elasticsearch(es, es_record)
        logging.info(f"Count processed is {total_count}")
        if result:
            updated_count += 1
    logging.info(f"Processed {total_count} records")
    logging.info(f"Updated {updated_count} records in Elasticsearch")