diff --git a/internetarchive/iarequest.py b/internetarchive/iarequest.py index 878e6dd2..9af194c4 100644 --- a/internetarchive/iarequest.py +++ b/internetarchive/iarequest.py @@ -20,7 +20,7 @@ internetarchive.iarequest ~~~~~~~~~~~~~~~~~~~~~~~~~ -:copyright: (C) 2012-2024 by Internet Archive. +:copyright: (C) 2012-2025 by Internet Archive. :license: AGPL 3, see LICENSE for more details. """ import copy @@ -198,21 +198,30 @@ def prepare(self, method=None, url=None, headers=None, files=None, data=None, headers['X-Accept-Reduced-Priority'] = '1' # Now run full parent preparation - super().prepare(method=method, url=url, headers=headers, files=files, - data=data, params=params, auth=auth, cookies=cookies, - hooks=hooks) + super().prepare( + method=method, + url=url, + headers=headers, + files=files, + data=data, + params=params, + auth=auth, + cookies=cookies, + hooks=hooks, + ) # Now add our custom handling - self.identifier = self.url.split("?")[0].split("/")[-1] - if reduced_priority: - headers = headers.copy() if headers else {} - headers['X-Accept-Reduced-Priority'] = '1' - self.prepare_headers(headers) - self.prepare_cookies(cookies) - - # Custom body preparation - self._prepare_request_body(metadata, source_metadata, target, priority, append, - append_list, insert, expect) + self.identifier = self.url.split('?')[0].split('/')[-1] + self._prepare_request_body( + metadata, + source_metadata, + target, + priority, + append, + append_list, + insert, + expect, + ) self.prepare_auth(auth, url) # Note that prepare_auth must be last to enable authentication schemes # such as OAuth to work on a fully prepared request. @@ -227,21 +236,37 @@ def _prepare_request_body(self, metadata, source_metadata, target, priority, source_metadata = r.json() if self._is_multi_target(metadata): - changes = self._prepare_multi_target_changes(metadata, source_metadata, - target, append, expect, - append_list, insert) + changes = self._prepare_multi_target_changes( + metadata, + source_metadata, + target, + append, + expect, + append_list, + insert, + ) self.data = {'-changes': json.dumps(changes), 'priority': priority or -5} else: - self._prepare_single_target_body(metadata, source_metadata, target, append, - append_list, insert, expect, priority) + self._prepare_single_target_body( + metadata, + source_metadata, + target, + append, + append_list, + insert, + expect, + priority, + ) logger.debug(f'submitting metadata request: {self.data}') super().prepare_body(self.data, None) def _is_multi_target(self, metadata): - return (isinstance(metadata, list) or - any('/' in k for k in metadata) or - all(isinstance(v, dict) for v in metadata.values())) + return ( + isinstance(metadata, list) + or any('/' in k for k in metadata) + or all(isinstance(v, dict) for v in metadata.values()) + ) def _prepare_multi_target_changes(self, metadata, source_metadata, target, append, expect, append_list, insert): @@ -249,8 +274,15 @@ def _prepare_multi_target_changes(self, metadata, source_metadata, target, if target: metadata = {target: metadata} for key in metadata: - patch = self._get_patch_for_target(key, metadata[key], source_metadata, - append, expect, append_list, insert) + patch = self._get_patch_for_target( + key, + metadata[key], + source_metadata, + append, + expect, + append_list, + insert, + ) changes.append({'target': key, 'patch': patch}) return changes @@ -259,17 +291,40 @@ def _prepare_single_target_body(self, metadata, source_metadata, target, append, target = target or 'metadata' if target == 'metadata': try: - patch = prepare_patch(metadata, source_metadata['metadata'], append, - expect, append_list, insert) + patch = prepare_patch( + metadata, + source_metadata['metadata'], + append, + expect, + append_list, + insert, + ) except KeyError: - raise ItemLocateError(f"{self.identifier} cannot be located " - "because it is dark or does not exist.") + raise ItemLocateError( + f'{self.identifier} cannot be located ' + 'because it is dark or does not exist.' + ) elif target.startswith('files/'): - patch = prepare_files_patch(metadata, source_metadata['files'], - target, append, append_list, insert, expect) + patch = prepare_files_patch( + metadata, + source_metadata['files'], + target, + append, + append_list, + insert, + expect, + ) else: - patch = prepare_target_patch({target: metadata}, source_metadata, append, - target, append_list, target, insert, expect) + patch = prepare_target_patch( + {target: metadata}, + source_metadata, + append, + target, + append_list, + target, + insert, + expect, + ) self.data = { '-patch': json.dumps(patch), '-target': target, @@ -285,8 +340,13 @@ def prepare_patch(metadata, source_metadata, append, expect=None, if not destination: destination = [] else: - prepared_metadata = prepare_metadata(metadata, source_metadata, append, - append_list, insert) + prepared_metadata = prepare_metadata( + metadata, + source_metadata, + append, + append_list, + insert, + ) if isinstance(destination, dict): destination.update(prepared_metadata) elif isinstance(metadata, list): @@ -342,7 +402,14 @@ def prepare_files_patch(metadata, files_metadata, target, append, filename = target.split('/')[1] for file_meta in files_metadata: if file_meta.get('name') == filename: - return prepare_patch(metadata, file_meta, append, expect, append_list, insert) + return prepare_patch( + metadata, + file_meta, + append, + expect, + append_list, + insert, + ) return [] @@ -350,49 +417,75 @@ def prepare_metadata(metadata, source_metadata=None, append=False, append_list=False, insert=False): source = copy.deepcopy(source_metadata) if source_metadata else {} prepared = {} - indexed_keys = _process_indexed_keys(metadata, source) - - for key in metadata: - if ( - isinstance(metadata[key], (int, float, complex)) - and not isinstance(metadata[key], bool) - ): - metadata[key] = str(metadata[key]) - current_key = _get_base_key(key) - if _is_indexed_key(key) and not insert: - idx = _get_index(key) - try: - prepared[current_key][idx] = metadata[key] - except IndexError: - prepared[current_key].append(metadata[key]) - elif append_list and source.get(current_key): - _append_to_list(prepared, current_key, source[current_key], metadata[key]) + + indexed_keys = _process_indexed_keys(metadata, source, prepared) + _process_non_indexed_keys(metadata, source, prepared, append, append_list, insert) + _cleanup_indexed_keys(prepared, indexed_keys, metadata) + + return prepared + + +def _process_non_indexed_keys(metadata, source, prepared, append, append_list, insert): + for key, value in metadata.items(): + current_key = key + + if isinstance(value, (int, float, complex)) and not isinstance(value, bool): + value = str(value) + + if append_list and source.get(current_key): + existing = source[current_key] + if not isinstance(existing, list): + existing = [existing] + prepared[current_key] = existing + [value] elif append and source.get(current_key): - prepared[current_key] = f"{source[current_key]} {metadata[key]}" + prepared[current_key] = f'{source[current_key]} {value}' elif insert and source.get(current_key): - _insert_value(prepared, key, source[current_key], metadata[key]) + existing = source[current_key] + if not isinstance(existing, list): + existing = [existing] + existing.insert(0, value) + prepared[current_key] = [v for v in existing if v] else: - prepared[key] = metadata[key] + prepared[current_key] = value - _clean_indexed_keys(prepared, indexed_keys, metadata) - return prepared +def _cleanup_indexed_keys(prepared, indexed_keys, metadata): + for base in indexed_keys: + if base in prepared: + prepared[base] = [v for v in prepared[base] if v is not None] + indexes = [ + i for i, k in enumerate(metadata) + if _get_base_key(k) == base and metadata[k] == 'REMOVE_TAG' + ] + for i in reversed(indexes): + if i < len(prepared[base]): + del prepared[base][i] + + +def _process_indexed_keys(metadata, source, prepared): + indexed_keys = {} + for key in list(metadata.keys()): + if _is_indexed_key(key): + base = _get_base_key(key) + idx = _get_index(key) + + if base not in indexed_keys: + source_list = source.get(base, []) + if not isinstance(source_list, list): + source_list = [source_list] + indexed_keys[base] = len(source_list) + + current_metadata_length = len(metadata) + prepared[base] = source_list + [None] * ( + current_metadata_length - len(source_list) + ) + + while len(prepared[base]) <= idx: + prepared[base].append(None) -def _process_indexed_keys(metadata, source_metadata): - indexed = {} - for key in metadata: - base = _get_base_key(key) - if not _is_indexed_key(key): - continue - count = sum(1 for k in metadata if _get_base_key(k) == base) - indexed[base] = count + len(source_metadata.get(base, [])) - for base, total in indexed.items(): - prepared_list = source_metadata.get(base, []) - if not isinstance(prepared_list, list): - prepared_list = [prepared_list] - prepared_list += [None] * (total - len(prepared_list)) - indexed[base] = prepared_list - return indexed + prepared[base][idx] = metadata[key] + del metadata[key] + return indexed_keys def _get_base_key(key): @@ -406,30 +499,3 @@ def _is_indexed_key(key): def _get_index(key): match = re.search(r'(?<=\[)\d+(?=\])', key) return int(match.group()) if match else None - - -def _append_to_list(prepared, key, existing, new_value): - existing = existing if isinstance(existing, list) else [existing] - new_values = [new_value] if not isinstance(new_value, list) else new_value - for val in new_values: - if val not in existing: - existing.append(val) - prepared[key] = existing - - -def _insert_value(prepared, key, existing, new_value): - base = _get_base_key(key) - idx = _get_index(key) or 0 - existing = existing if isinstance(existing, list) else [existing] - existing.insert(idx, new_value) - prepared[base] = [v for v in existing if v] - - -def _clean_indexed_keys(prepared, indexed_keys, metadata): - for base in indexed_keys: - prepared[base] = [v for v in prepared[base] if v is not None] - indexes = [i for i, k in enumerate(metadata) - if _get_base_key(k) == base and metadata[k] == 'REMOVE_TAG'] - for i in reversed(indexes): - if i < len(prepared[base]): - del prepared[base][i]