diff --git a/dulwich/client.py b/dulwich/client.py
index 02ca79adb..71de0a973 100644
--- a/dulwich/client.py
+++ b/dulwich/client.py
@@ -122,6 +122,7 @@
     write_pack_from_container,
     UnpackedObject,
     PackChunkGenerator,
+    PACK_SPOOL_FILE_MAX_SIZE,
 )
 from dulwich.refs import (
     read_info_refs,
@@ -813,7 +814,9 @@ def fetch(
             determine_wants = target.object_store.determine_wants_all
         if CAPABILITY_THIN_PACK in self._fetch_capabilities:
             from tempfile import SpooledTemporaryFile
-            f: IO[bytes] = SpooledTemporaryFile()
+            f: IO[bytes] = SpooledTemporaryFile(
+                max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-',
+                dir=getattr(target.object_store, 'path', None))

             def commit():
                 if f.tell():
diff --git a/dulwich/cloud/gcs.py b/dulwich/cloud/gcs.py
index d6d1c562b..ed0a493ed 100644
--- a/dulwich/cloud/gcs.py
+++ b/dulwich/cloud/gcs.py
@@ -25,7 +25,7 @@
 import tempfile

 from ..object_store import BucketBasedObjectStore
-from ..pack import PackData, Pack, load_pack_index_file
+from ..pack import PackData, Pack, load_pack_index_file, PACK_SPOOL_FILE_MAX_SIZE


 # TODO(jelmer): For performance, read ranges?
@@ -58,14 +58,14 @@ def _iter_pack_names(self):

     def _load_pack_data(self, name):
         b = self.bucket.blob(posixpath.join(self.subpath, name + '.pack'))
-        f = tempfile.SpooledTemporaryFile()
+        f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
         b.download_to_file(f)
         f.seek(0)
         return PackData(name + '.pack', f)

     def _load_pack_index(self, name):
         b = self.bucket.blob(posixpath.join(self.subpath, name + '.idx'))
-        f = tempfile.SpooledTemporaryFile()
+        f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
         b.download_to_file(f)
         f.seek(0)
         return load_pack_index_file(name + '.idx', f)
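
SpooledTemporaryFile keeps its contents in an in-memory buffer until the buffer crosses max_size, then transparently rolls over to a real temporary file, so small fetches never touch disk while large packs no longer have to fit in RAM. A minimal sketch of that rollover behaviour (illustrative only, not part of the patch; _rolled is a CPython implementation detail, used here just to observe the switch):

    from tempfile import SpooledTemporaryFile

    PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024  # mirrors the new dulwich.pack constant

    f = SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')
    f.write(b'small pack data')                 # still held in an in-memory buffer
    print(f._rolled)                            # False: below the 16 MB threshold
    f.write(b'\x00' * PACK_SPOOL_FILE_MAX_SIZE)
    print(f._rolled)                            # True: spilled to a real temp file
    f.close()                                   # backing file is deleted automatically

The extra dir=getattr(target.object_store, 'path', None) argument in client.py asks for any spilled file to be created inside the object store's own directory when the store has one, presumably so the data already sits on the destination filesystem.
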
diff --git a/dulwich/object_store.py b/dulwich/object_store.py
index a55f157de..ccef377c3 100644
--- a/dulwich/object_store.py
+++ b/dulwich/object_store.py
@@ -63,19 +63,18 @@
     PackInflater,
     PackFileDisappeared,
     UnpackedObject,
+    extend_pack,
     load_pack_index_file,
     iter_sha1,
     full_unpacked_object,
     generate_unpacked_objects,
     pack_objects_to_data,
-    write_pack_header,
-    write_pack_index_v2,
+    write_pack_index,
     write_pack_data,
-    write_pack_object,
-    compute_file_sha,
     PackIndexer,
     PackStreamCopier,
     PackedObjectContainer,
+    PACK_SPOOL_FILE_MAX_SIZE,
 )
 from dulwich.protocol import DEPTH_INFINITE
 from dulwich.refs import ANNOTATED_TAG_SUFFIX, Ref
@@ -819,47 +818,14 @@ def _complete_thin_pack(self, f, path, copier, indexer, progress=None):
         entries = []
         for i, entry in enumerate(indexer):
             if progress is not None:
-                progress(("generating index: %d\r" % i).encode('ascii'))
+                progress(("generating index: %d/%d\r" % (i, len(copier))).encode('ascii'))
             entries.append(entry)

-        ext_refs = indexer.ext_refs()
-
-        if ext_refs:
-            # Update the header with the new number of objects.
-            f.seek(0)
-            write_pack_header(f.write, len(entries) + len(ext_refs))
-
-            # Must flush before reading (http://bugs.python.org/issue3207)
-            f.flush()
-
-            # Rescan the rest of the pack, computing the SHA with the new header.
-            new_sha = compute_file_sha(f, end_ofs=-20)
-
-            # Must reposition before writing (http://bugs.python.org/issue3207)
-            f.seek(0, os.SEEK_CUR)
-
-            # Complete the pack.
-            for i, ext_sha in enumerate(ext_refs):
-                if progress is not None:
-                    progress(("writing extra base objects: %d/%d\r" % (i, len(ext_refs))).encode("ascii"))
-                assert len(ext_sha) == 20
-                type_num, data = self.get_raw(ext_sha)
-                offset = f.tell()
-                crc32 = write_pack_object(
-                    f.write,
-                    type_num,
-                    data,
-                    sha=new_sha,
-                    compression_level=self.pack_compression_level,
-                )
-                entries.append((ext_sha, offset, crc32))
-            pack_sha = new_sha.digest()
-            f.write(pack_sha)
-        else:
-            f.seek(-20, os.SEEK_END)
-            pack_sha = f.read(20)
+        pack_sha, extra_entries = extend_pack(
+            f, indexer.ext_refs(), get_raw=self.get_raw, compression_level=self.pack_compression_level,
+            progress=progress)

-        f.close()
+        entries.extend(extra_entries)

         # Move the pack in.
         entries.sort()
@@ -877,7 +843,7 @@ def _complete_thin_pack(self, f, path, copier, indexer, progress=None):
         # Write the index.
         index_file = GitFile(pack_base_name + ".idx", "wb", mask=PACK_MODE)
         try:
-            write_pack_index_v2(index_file, entries, pack_sha)
+            write_pack_index(index_file, entries, pack_sha)
             index_file.close()
         finally:
             index_file.abort()
@@ -928,7 +894,7 @@ def move_in_pack(self, path):
         index_name = basename + ".idx"
         if not os.path.exists(index_name):
             with GitFile(index_name, "wb", mask=PACK_MODE) as f:
-                write_pack_index_v2(f, entries, p.get_stored_checksum())
+                write_pack_index(f, entries, p.get_stored_checksum())
         for pack in self.packs:
             if pack._basename == basename:
                 return pack
@@ -1076,46 +1042,23 @@ def add_pack(self):

         Returns: Fileobject to write to and a commit function to call when the
             pack is finished.
         """
-        f = BytesIO()
+        from tempfile import SpooledTemporaryFile
+        f = SpooledTemporaryFile(
+            max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')

         def commit():
-            p = PackData.from_file(BytesIO(f.getvalue()), f.tell())
-            f.close()
+            size = f.tell()
+            f.seek(0)
+            p = PackData.from_file(f, size)
             for obj in PackInflater.for_pack_data(p, self.get_raw):
                 self.add_object(obj)
+            p.close()

         def abort():
             pass

         return f, commit, abort

-    def _complete_thin_pack(self, f, indexer, progress=None):
-        """Complete a thin pack by adding external references.
-
-        Args:
-          f: Open file object for the pack.
-          indexer: A PackIndexer for indexing the pack.
-        """
-        entries = list(indexer)
-
-        ext_refs = indexer.ext_refs()
-
-        if ext_refs:
-            # Update the header with the new number of objects.
-            f.seek(0)
-            write_pack_header(f.write, len(entries) + len(ext_refs))
-
-            # Rescan the rest of the pack, computing the SHA with the new header.
-            new_sha = compute_file_sha(f, end_ofs=-20)
-
-            # Complete the pack.
-            for ext_sha in indexer.ext_refs():
-                assert len(ext_sha) == 20
-                type_num, data = self.get_raw(ext_sha)
-                write_pack_object(f.write, type_num, data, sha=new_sha)
-            pack_sha = new_sha.digest()
-            f.write(pack_sha)
-
     def add_thin_pack(self, read_all, read_some, progress=None):
         """Add a new thin pack to this object store.
@@ -1129,12 +1072,11 @@ def add_thin_pack(self, read_all, read_some, progress=None):
           read_some: Read function that returns at least one byte, but may
             not return the number of bytes requested.
         """
+
         f, commit, abort = self.add_pack()
         try:
-            indexer = PackIndexer(f, resolve_ext_ref=self.get_raw)
-            copier = PackStreamCopier(read_all, read_some, f, delta_iter=indexer)
-            copier.verify(progress=progress)
-            self._complete_thin_pack(f, indexer, progress=progress)
+            copier = PackStreamCopier(read_all, read_some, f)
+            copier.verify()
         except BaseException:
             abort()
             raise
@@ -1601,7 +1543,8 @@ def add_pack(self):
         """
         import tempfile

-        pf = tempfile.SpooledTemporaryFile()
+        pf = tempfile.SpooledTemporaryFile(
+            max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')

         def commit():
             if pf.tell() == 0:
@@ -1612,9 +1555,10 @@ def commit():
             p = PackData(pf.name, pf)
             entries = p.sorted_entries()
             basename = iter_sha1(entry[0] for entry in entries).decode('ascii')
-            idxf = tempfile.SpooledTemporaryFile()
+            idxf = tempfile.SpooledTemporaryFile(
+                max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')
             checksum = p.get_stored_checksum()
-            write_pack_index_v2(idxf, entries, checksum)
+            write_pack_index(idxf, entries, checksum)
             idxf.seek(0)
             idx = load_pack_index_file(basename + '.idx', idxf)
             for pack in self.packs:
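
With the memory store's add_pack() now backed by a spooled file, its commit callback no longer copies the whole pack into a second BytesIO via f.getvalue(); it records the size, rewinds, and hands the same file object straight to PackData.from_file, closing the PackData when done. A minimal sketch of the three-part contract add_pack() returns, under the assumption that an empty pack is acceptable input:

    from dulwich.object_store import MemoryObjectStore
    from dulwich.pack import write_pack_objects

    store = MemoryObjectStore()
    f, commit, abort = store.add_pack()
    try:
        write_pack_objects(f.write, [])  # a real caller streams pack data here
    except BaseException:
        abort()   # discard the partially written pack
        raise
    else:
        commit()  # rewinds the spooled file and indexes its objects
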
""" + f, commit, abort = self.add_pack() try: - indexer = PackIndexer(f, resolve_ext_ref=self.get_raw) - copier = PackStreamCopier(read_all, read_some, f, delta_iter=indexer) - copier.verify(progress=progress) - self._complete_thin_pack(f, indexer, progress=progress) + copier = PackStreamCopier(read_all, read_some, f) + copier.verify() except BaseException: abort() raise @@ -1601,7 +1543,8 @@ def add_pack(self): """ import tempfile - pf = tempfile.SpooledTemporaryFile() + pf = tempfile.SpooledTemporaryFile( + max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-') def commit(): if pf.tell() == 0: @@ -1612,9 +1555,10 @@ def commit(): p = PackData(pf.name, pf) entries = p.sorted_entries() basename = iter_sha1(entry[0] for entry in entries).decode('ascii') - idxf = tempfile.SpooledTemporaryFile() + idxf = tempfile.SpooledTemporaryFile( + max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-') checksum = p.get_stored_checksum() - write_pack_index_v2(idxf, entries, checksum) + write_pack_index(idxf, entries, checksum) idxf.seek(0) idx = load_pack_index_file(basename + '.idx', idxf) for pack in self.packs: diff --git a/dulwich/pack.py b/dulwich/pack.py index cf7cdd6f7..dc4e176fc 100644 --- a/dulwich/pack.py +++ b/dulwich/pack.py @@ -50,7 +50,7 @@ import os import sys -from typing import Optional, Callable, Tuple, List, Deque, Union, Iterable, Iterator, Dict, TypeVar, Generic, Sequence, Set +from typing import Optional, Callable, Tuple, List, Deque, Union, Iterable, Iterator, Dict, TypeVar, Generic, Sequence, Set, BinaryIO try: from typing import Protocol @@ -103,6 +103,9 @@ DEFAULT_PACK_DELTA_WINDOW_SIZE = 10 +# Keep pack files under 16Mb in memory, otherwise write them out to disk +PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024 + OldUnpackedObject = Union[Tuple[Union[bytes, int], List[bytes]], List[bytes]] ResolveExtRefFn = Callable[[bytes], Tuple[int, OldUnpackedObject]] @@ -1006,8 +1009,6 @@ def read_objects(self, compute_crc32=False) -> Iterator[UnpackedObject]: IOError: if an error occurred writing to the output file. """ pack_version, self._num_objects = read_pack_header(self.read) - if pack_version is None: - return for i in range(self._num_objects): offset = self.offset @@ -2561,6 +2562,53 @@ def get_unpacked_object(self, sha: bytes, *, include_comp: bool = False, convert return unpacked +def extend_pack(f: BinaryIO, object_ids: Set[ObjectID], get_raw, *, compression_level=-1, progress=None) -> Tuple[bytes, List]: + """Extend a pack file with more objects. + + The caller should make sure that object_ids does not contain any objects + that are already in the pack + """ + # Update the header with the new number of objects. + f.seek(0) + _version, num_objects = read_pack_header(f.read) + + if object_ids: + f.seek(0) + write_pack_header(f.write, num_objects + len(object_ids)) + + # Must flush before reading (http://bugs.python.org/issue3207) + f.flush() + + # Rescan the rest of the pack, computing the SHA with the new header. + new_sha = compute_file_sha(f, end_ofs=-20) + + # Must reposition before writing (http://bugs.python.org/issue3207) + f.seek(0, os.SEEK_CUR) + + extra_entries = [] + + # Complete the pack. 
diff --git a/dulwich/tests/test_web.py b/dulwich/tests/test_web.py
index 1b02b1aad..cf395d130 100644
--- a/dulwich/tests/test_web.py
+++ b/dulwich/tests/test_web.py
@@ -78,7 +78,7 @@ def read(self, howmuch):
         start = self.pos
         end = self.pos + howmuch
         if start >= len(self.data):
-            return ""
+            return b""
         self.pos = end
         return self.data[start:end]

@@ -538,7 +538,7 @@ def setUp(self):

     def _get_zstream(self, text):
         zstream = BytesIO()
-        zfile = gzip.GzipFile(fileobj=zstream, mode="w")
+        zfile = gzip.GzipFile(fileobj=zstream, mode="wb")
         zfile.write(text)
         zfile.close()
         zlength = zstream.tell()
diff --git a/dulwich/web.py b/dulwich/web.py
index 5582dda5e..e6767dfd8 100644
--- a/dulwich/web.py
+++ b/dulwich/web.py
@@ -22,9 +22,6 @@
 """HTTP server for dulwich that implements the git smart HTTP protocol."""

 from io import BytesIO
-import shutil
-import tempfile
-import gzip
 import os
 import re
 import sys
@@ -466,21 +463,10 @@ def __init__(self, application):
         self.app = application

     def __call__(self, environ, start_response):
+        import gzip
         if environ.get("HTTP_CONTENT_ENCODING", "") == "gzip":
-            try:
-                environ["wsgi.input"].tell()
-                wsgi_input = environ["wsgi.input"]
-            except (AttributeError, OSError, NotImplementedError):
-                # The gzip implementation in the standard library of Python 2.x
-                # requires working '.seek()' and '.tell()' methods on the input
-                # stream. Read the data into a temporary file to work around
-                # this limitation.
-                wsgi_input = tempfile.SpooledTemporaryFile(16 * 1024 * 1024)
-                shutil.copyfileobj(environ["wsgi.input"], wsgi_input)
-                wsgi_input.seek(0)
-
             environ["wsgi.input"] = gzip.GzipFile(
-                filename=None, fileobj=wsgi_input, mode="r"
+                filename=None, fileobj=environ["wsgi.input"], mode="rb"
             )
             del environ["HTTP_CONTENT_ENCODING"]
         if "CONTENT_LENGTH" in environ:
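
The GunzipFilter simplification drops a workaround for Python 2's gzip module, which needed working seek() and tell() on its input; on Python 3, GzipFile only calls read() while decompressing, so the WSGI body can be wrapped directly even when it is not seekable. A quick illustrative check with a deliberately minimal, non-seekable reader (the payload bytes are made up):

    import gzip
    from io import BytesIO

    class ReadOnly:
        """Wraps raw bytes and exposes nothing but read()."""
        def __init__(self, raw):
            self._buf = BytesIO(raw)
        def read(self, n=-1):
            return self._buf.read(n)

    body = gzip.compress(b'0032want 0123456789abcdef')
    stream = gzip.GzipFile(filename=None, fileobj=ReadOnly(body), mode='rb')
    assert stream.read() == b'0032want 0123456789abcdef'
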