-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathparse_xml.py
372 lines (304 loc) · 13.9 KB
/
parse_xml.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urljoin, urlparse
from zipfile import ZipFile
from lxml import etree, html
from pprint import pprint
from typing import BinaryIO, Dict, Optional, Set
from lxml.etree import _Element as Element, tostring
from zavod import Zavod, init_context
from followthemoney.proxy import EntityProxy
from followthemoney.util import join_text
from addressformatting import AddressFormatter
# URL template for a single record; parse_examples() substitutes an INN for %s.
INN_URL = "https://egrul.itsoft.ru/%s.xml"
# Root index page that crawl() and crawl_parallel() hand to crawl_index().
PREFIX = "https://egrul.itsoft.ru/EGRUL_406/01.01.2022_FULL/"
# Shared formatter instance; parse_address() calls its one_line() method.
aformatter = AddressFormatter()
def tag_text(el: Element) -> str:
    """Serialize ``el`` to a text string (used for log and debug output)."""
    serialized = tostring(el, encoding="utf-8")
    return serialized.decode("utf-8")
def dput(data: Dict[str, Optional[str]], name: str, value: Optional[str]):
    """Store ``value`` under ``data[name]`` unless it carries no information.

    A value is skipped when it is ``None`` or when nothing but whitespace
    remains after removing every ``-`` character. The value is stored
    exactly as given (not stripped), and an existing entry for ``name`` is
    overwritten.
    """
    if value is None:
        return
    # Whitespace-only input is also caught here: removing dashes from it
    # still leaves only whitespace.
    meaningful = value.replace("-", "").strip()
    if meaningful:
        data[name] = value
def elattr(el: Optional[Element], attr: str):
    """Return attribute ``attr`` of ``el``, or ``None`` when ``el`` is ``None``."""
    return None if el is None else el.get(attr)
def make_id(
    context: Zavod, entity: EntityProxy, local_id: Optional[str] = None
) -> Optional[str]:
    """Derive an entity ID from the identifiers already set on ``entity``.

    Preference order:

    1. a slug built from the alphabetically first ``innCode`` value,
    2. a slug built from the alphabetically first ``ogrnCode`` value,
    3. if ``local_id`` is given, an ID made from ``local_id`` and the
       alphabetically first ``name`` value.

    Returns ``None`` when none of these is available.
    """
    # FIXME: should we make INN slugs if the nationality is not Russian?
    inn_codes = sorted(entity.get("innCode", quiet=True))
    if inn_codes:
        return context.make_slug("inn", inn_codes[0])

    ogrn_codes = sorted(entity.get("ogrnCode", quiet=True))
    if ogrn_codes:
        return context.make_slug("ogrn", ogrn_codes[0])

    if local_id is None:
        return None

    # If no INN/OGRN is present, make a fake entity ID from the name:
    names = sorted(entity.get("name"))
    if not names:
        return None
    return context.make_id(local_id, names[0])
def make_person(context: Zavod, el: Element, local_id: str) -> Optional[EntityProxy]:
    """Build a ``Person`` entity from the ``СвФЛ`` descendant of ``el``.

    Returns ``None`` when ``el`` has no ``СвФЛ`` descendant. Otherwise the
    name parts and INN are copied from that element, the ID is assigned via
    ``make_id`` (so it may be ``None``), and country values are added from
    the ``СвГраждФЛ`` child when present.
    """
    person_el = el.find(".//СвФЛ")
    person = context.make("Person")
    if person_el is None:
        return None

    given = person_el.get("Имя")
    middle = person_el.get("Отчество")
    family = person_el.get("Фамилия")
    person.add("name", join_text(given, middle, family))
    person.add("firstName", given)
    person.add("fatherName", middle)
    person.add("lastName", family)
    person.add("innCode", person_el.get("ИННФЛ"))
    person.id = make_id(context, person, local_id)

    citizenship_el = el.find("./СвГраждФЛ")
    if citizenship_el is None:
        return person
    # Citizenship code "1" is recorded as "ru"; the country name attribute
    # is added in every case.
    if citizenship_el.get("КодГражд") == "1":
        person.add("country", "ru")
    person.add("country", citizenship_el.get("НаимСтран"))
    return person
def make_org(context: Zavod, el: Element, local_id: str) -> EntityProxy:
    """Build an ``Organization`` entity from the children of ``el``.

    Reads up to three optional child elements: ``НаимИННЮЛ`` (name, INN,
    OGRN), ``СвНаимЮЛПолнИн`` (an additional name) and ``СвРегИн``
    (jurisdiction, registration number, publisher, address). The ID is
    assigned via ``make_id`` and may therefore be ``None``.
    """
    org = context.make("Organization")

    ident_el = el.find("./НаимИННЮЛ")
    if ident_el is not None:
        ident_fields = (
            ("name", "НаимЮЛПолн"),
            ("innCode", "ИНН"),
            ("ogrnCode", "ОГРН"),
        )
        for prop, attr in ident_fields:
            org.add(prop, ident_el.get(attr))

    latin_el = el.find("./СвНаимЮЛПолнИн")
    if latin_el is not None:
        org.add("name", latin_el.get("НаимПолн"))

    foreign_el = el.find("./СвРегИн")
    if foreign_el is not None:
        foreign_fields = (
            ("jurisdiction", "НаимСтран"),
            ("registrationNumber", "РегНомер"),
            ("publisher", "НаимРегОрг"),
            ("address", "АдрСтр"),
        )
        for prop, attr in foreign_fields:
            org.add(prop, foreign_el.get(attr))

    org.id = make_id(context, org, local_id)
    return org
def _add_manager_identity(
    context: Zavod, owner: EntityProxy, manager_el: Element, local_id: Optional[str]
) -> None:
    """Copy name, INN and OGRN from ``manager_el`` onto ``owner`` and set its ID."""
    owner.add("name", manager_el.get("НаимЮЛПолн"))
    owner.add("innCode", manager_el.get("ИНН"))
    owner.add("ogrnCode", manager_el.get("ОГРН"))
    owner.id = make_id(context, owner, local_id)


def parse_founder(context: Zavod, company: EntityProxy, el: Element):
    """Emit the owner entity and the ``Ownership`` link for one founder element.

    ``el`` is one child of the company's ``СвУчредит`` element; its tag
    decides how the owner is built. Nothing is emitted when the tag is
    unknown or when no ID could be derived for the owner (both cases are
    logged as warnings).

    :param context: crawler context used to create, identify and emit entities.
    :param company: the owned company; used as the ``asset`` of the link.
    :param el: the founder element.
    """
    owner = context.make("LegalEntity")
    ownership = context.make("Ownership")

    # Prefer the record number (ГРН) as the local ID for owners that lack an
    # INN/OGRN; fall back to the company ID.
    meta = el.find("./ГРНДатаПерв")
    local_id = company.id
    if meta is not None:
        ownership.add("startDate", meta.get("ДатаЗаписи"))
        local_id = meta.get("ГРН") or local_id

    ownership.add("role", el.tag)

    if el.tag == "УчрФЛ":  # Individual founder
        owner_proxy = make_person(context, el, local_id)
        if owner_proxy is not None:
            owner = owner_proxy
    elif el.tag in ("УчрЮЛИн", "УчрЮЛРос"):  # Foreign company / Russian legal entity
        owner = make_org(context, el, local_id)
    elif el.tag == "УчрПИФ":  # Mutual investment fund
        # TODO: nested ownership structure, make Security
        # FIXME: Security cannot own.
        fund_name_el = el.find("./СвНаимПИФ")
        if fund_name_el is not None:
            ownership.add("summary", fund_name_el.get("НаимПИФ"))

        # The fund's managing company stands in as the owner.
        manager_el = el.find("./СвУпрКомпПИФ/УпрКомпПиф")
        if manager_el is not None:
            _add_manager_identity(context, owner, manager_el, local_id)
    elif el.tag == "УчрРФСубМО":  # Russian public body
        pb_name_el = el.find("./ВидНаимУчр")
        if pb_name_el is not None:
            # Name of the owning authority
            pb_name = pb_name_el.get("НаимМО")
            if pb_name is not None:
                owner = context.make("Organization")
                owner.add("name", pb_name)
                owner.id = make_id(context, owner, local_id)

        # managing body: when present it replaces the authority as owner.
        pb_el = el.find("./СвОргОсущПр")
        if pb_el is not None:
            owner = make_org(context, pb_el, local_id)
    elif el.tag == "УчрДогИнвТов":  # investment partnership agreement.
        # FIXME: should the partnership be its own entity?
        terms_el = el.find("./ИнПрДогИнвТов")
        if terms_el is not None:
            ownership.add("summary", terms_el.get("НаимДог"))
            ownership.add("recordId", terms_el.get("НомерДог"))
            ownership.add("date", terms_el.get("Дата"))

        # managing vehicle
        manager_el = el.find("./СвУпТовЮЛ")
        if manager_el is not None:
            _add_manager_identity(context, owner, manager_el, local_id)
    else:
        context.log.warning("Unknown owner type", tag=el.tag)
        return

    if owner.id is None:
        context.log.warning(
            "No ID for owner: %s" % company.id,
            el=tag_text(el),
            owner=owner.to_dict(),
        )
        return

    ownership.id = context.make_id(company.id, owner.id)
    ownership.add("owner", owner)
    ownership.add("asset", company)

    share_el = el.find("./ДоляУстКап")
    if share_el is not None:
        ownership.add("sharesCount", share_el.get("НоминСтоим"))
        percent_el = share_el.find("./РазмерДоли/Процент")
        if percent_el is not None:
            ownership.add("percentage", percent_el.text)

    reliable_el = el.find("./СвНедДанУчр")
    if reliable_el is not None:
        ownership.add("summary", reliable_el.get("ТекстНедДанУчр"))

    context.emit(owner)
    context.emit(ownership)
def parse_directorship(context: Zavod, company: EntityProxy, el: Element):
    """Emit the director and the ``Directorship`` link for one officer element.

    Nothing is emitted when ``el`` contains no person or when no ID could
    be derived for that person. A director without a ``СвДолжн`` role
    element is still emitted, but no ``Directorship`` is created for them.

    :param context: crawler context used to create, identify and emit entities.
    :param company: the organization the director belongs to.
    :param el: the officer element.
    """
    # TODO: can we use the ГРН as a fallback ID?
    director = make_person(context, el, company.id)
    if director is None:
        return
    if director.id is None:
        # Same guard parse_founder applies to owners: a person without an
        # ID cannot be referenced from the Directorship.
        context.log.warning("No ID for director: %s" % company.id, el=tag_text(el))
        return
    context.emit(director)

    role = el.find("./СвДолжн")
    if role is None:
        context.log.warning("Directorship has no role", tag=tag_text(el))
        return

    directorship = context.make("Directorship")
    directorship.id = context.make_id(company.id, director.id, role.get("ВидДолжн"))
    directorship.add("role", role.get("НаимДолжн"))
    directorship.add("summary", role.get("НаимВидДолжн"))
    directorship.add("director", director)
    directorship.add("organization", company)

    date = el.find("./ГРНДатаПерв")
    if date is not None:
        directorship.add("startDate", date.get("ДатаЗаписи"))

    context.emit(directorship)
def parse_address(context: Zavod, entity: EntityProxy, el: Element):
    """Format the address described by ``el`` and add it to ``entity``.

    ``СвНедАдресЮЛ`` elements (missing address) are ignored entirely, and
    unknown tags are logged and skipped. For every other tag the address
    parts are collected with ``dput`` (blank and dash-only values are
    dropped; a later non-empty value for the same key overrides an earlier
    one), then rendered to one line and added as ``address``.

    :param context: crawler context, used here for logging only.
    :param entity: the entity receiving the ``address`` value.
    :param el: one address element.
    """
    if el.tag == "СвНедАдресЮЛ":  # missing address
        return None  # ignore this one entirely

    known_tags = (
        "АдресРФ",  # normal address
        "СвМНЮЛ",  # location of legal entity
        "СвАдрЮЛФИАС",  # special structure?
        "СвРешИзмМН",  # address change
    )
    if el.tag not in known_tags:
        context.log.warning("Unknown address type", tag=el.tag)
        return

    data: Dict[str, Optional[str]] = {}
    country = "ru"

    # FIXME: this is a complete mess
    dput(data, "postcode", el.get("Индекс"))
    dput(data, "postcode", el.get("ИдНом"))
    dput(data, "house", el.get("Дом"))
    dput(data, "house_number", el.get("Корпус"))
    dput(data, "neighbourhood", el.get("Кварт"))
    dput(data, "city", el.findtext("./НаимРегион"))
    dput(data, "city", elattr(el.find("./Регион"), "НаимРегион"))
    dput(data, "state", elattr(el.find("./Район"), "НаимРайон"))
    dput(data, "town", elattr(el.find("./НаселПункт"), "НаимНаселПункт"))
    dput(data, "municipality", elattr(el.find("./МуниципРайон"), "Наим"))
    dput(data, "suburb", elattr(el.find("./НаселенПункт"), "Наим"))
    dput(data, "road", elattr(el.find("./ЭлУлДорСети"), "Наим"))
    dput(data, "road", elattr(el.find("./Улица"), "НаимУлица"))
    dput(data, "house", elattr(el.find("./ПомещЗдания"), "Номер"))

    address = aformatter.one_line(data, country=country)
    entity.add("address", address)
def parse_company(context: Zavod, el: Element):
    """Emit a ``Company`` entity, its directors and its founders from ``el``.

    :param context: crawler context used to create, identify and emit entities.
    :param el: a ``СвЮЛ`` company element.
    """
    entity = context.make("Company")
    entity.add("jurisdiction", "ru")
    entity.add("ogrnCode", el.get("ОГРН"))
    entity.add("innCode", el.get("ИНН"))
    entity.add("kppCode", el.get("КПП"))
    entity.add("legalForm", el.get("ПолнНаимОПФ"))
    entity.add("incorporationDate", el.get("ДатаОГРН"))

    email_el = el.find("./СвАдрЭлПочты")
    if email_el is not None:
        entity.add("email", email_el.get("E-mail"))

    citizen_el = el.find("./СвГражд")
    if citizen_el is not None:
        entity.add("country", citizen_el.get("НаимСтран"))

    for addr_el in el.findall("./СвАдресЮЛ/*"):
        parse_address(context, entity, addr_el)

    for name_el in el.findall("./СвНаимЮЛ"):
        entity.add("name", name_el.get("НаимЮЛПолн"))
        entity.add("name", name_el.get("НаимЮЛСокр"))

    # The ID is assigned exactly once, after the identifiers are set and
    # before anything reads it: INN slug first, OGRN slug as fallback.
    entity.id = make_id(context, entity)

    # prokura or directors etc.
    for director in el.findall("./СведДолжнФЛ"):
        parse_directorship(context, entity, director)

    for founder in el.findall("./СвУчредит/*"):
        parse_founder(context, entity, founder)

    context.emit(entity)
def parse_sole_trader(context: Zavod, el: Element):
    """Emit a ``LegalEntity`` built from the attributes of ``el``.

    The entity gets country ``ru`` plus the OGRN, INN and legal form read
    from the element, and an ID derived via ``make_id``.
    """
    trader = context.make("LegalEntity")
    trader.add("country", "ru")
    attribute_map = (
        ("ogrnCode", "ОГРНИП"),
        ("innCode", "ИННФЛ"),
        ("legalForm", "НаимВидИП"),
    )
    for prop, attr in attribute_map:
        trader.add(prop, el.get(attr))
    trader.id = make_id(context, trader)
    context.emit(trader)
def parse_xml(context: Zavod, handle: BinaryIO):
    """Parse one XML document and process every record found in it.

    All ``СвЮЛ`` elements are handled by ``parse_company`` first, then all
    ``СвИП`` elements by ``parse_sole_trader``.
    """
    tree = etree.parse(handle)
    handlers = (
        (".//СвЮЛ", parse_company),
        (".//СвИП", parse_sole_trader),
    )
    for xpath, handler in handlers:
        for node in tree.findall(xpath):
            handler(context, node)
def parse_examples(context: Zavod):
    """Fetch and parse the XML records of a few hard-coded sample INNs."""
    sample_inns = ("7709383684", "7704667322", "9710075695")
    for inn in sample_inns:
        local_path = context.fetch_resource("%s.xml" % inn, INN_URL % inn)
        with open(local_path, "rb") as fh:
            parse_xml(context, fh)
def crawl_index(
    context: Zavod, url: str, seen: Optional[Set[str]] = None
) -> Set[str]:
    """Recursively collect the URLs of all ``.zip`` files linked below ``url``.

    Only links whose resolved URL starts with ``url`` are followed, so the
    walk never leaves the subtree of the page being processed.

    :param context: crawler context; ``context.http`` fetches the pages.
    :param url: index page to scan.
    :param seen: pages already visited during this walk. Callers normally
        omit it; it is threaded through the recursion so that a page is
        never fetched twice.
    :return: the set of archive URLs found on this page and below it.
    """
    if seen is None:
        seen = set()
    seen.add(url)

    archives: Set[str] = set()
    res = context.http.get(url)
    doc = html.fromstring(res.text)
    for a in doc.findall(".//a"):
        href = a.get("href")
        if not href:
            # urljoin() returns the base URL for a missing or empty href,
            # which would otherwise make this page recurse into itself.
            continue
        # Drop the fragment so "#anchor" links resolve to the page itself.
        link_url = urljoin(url, href).split("#", 1)[0]
        if not link_url.startswith(url):
            continue
        if link_url.endswith(".zip"):
            archives.add(link_url)
            continue
        if link_url in seen:
            # Self-links and link cycles must not trigger another fetch.
            continue
        archives.update(crawl_index(context, link_url, seen))
    return archives
def crawl_archive(context: Zavod, url: str):
    """Fetch the zip archive at ``url`` and parse every ``.xml`` member in it.

    The resource name passed to ``context.fetch_resource`` is the URL path
    without its leading slashes. Member names are matched
    case-insensitively against the ``.xml`` suffix.
    """
    resource_name = urlparse(url).path.lstrip("/")
    local_path = context.fetch_resource(resource_name, url)
    context.log.info("Parsing: %s" % resource_name)
    with ZipFile(local_path, "r") as archive:
        xml_members = [
            member
            for member in archive.namelist()
            if member.lower().endswith(".xml")
        ]
        for member in xml_members:
            with archive.open(member, "r") as fh:
                parse_xml(context, fh)
def crawl(context: Zavod):
    """Process every archive found below ``PREFIX``, one at a time, in sorted URL order."""
    # TODO: thread pool execution
    archive_urls = sorted(crawl_index(context, PREFIX))
    for archive_url in archive_urls:
        crawl_archive(context, archive_url)
def crawl_parallel(context: Zavod):
    """Process every archive found below ``PREFIX`` on a thread pool.

    One ``crawl_archive`` task is submitted per archive URL. A failing task
    does not stop the others; its exception is logged as a warning once all
    tasks have finished, instead of being dropped silently with the
    discarded future.
    """
    with ThreadPoolExecutor() as executor:
        futures = {
            archive_url: executor.submit(crawl_archive, context, archive_url)
            for archive_url in crawl_index(context, PREFIX)
        }

    # Leaving the ``with`` block waits for all submitted tasks, so every
    # future is done here and ``exception()`` returns without blocking.
    for archive_url, future in futures.items():
        error = future.exception()
        if error is not None:
            context.log.warning(
                "Archive failed: %s" % archive_url, error=repr(error)
            )
if __name__ == "__main__":
    # Script entry point: run the thread-pool crawl inside the context
    # returned by init_context(). The commented-out calls below are the
    # sequential crawl and the small sample run.
    with init_context("ru_egrul", "ru") as context:
        crawl_parallel(context)
        # crawl(context)
        # parse_examples(context)