Merge pull request #1124 from jelmer/extend-pack
Add extend_pack, limit SpooledTemporaryFile memory usage
jelmer authored Jan 17, 2023
2 parents acde021 + c75327f commit a0d7981
Showing 6 changed files with 87 additions and 106 deletions.
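
The common thread across these files: buffers that previously held an entire incoming pack in memory (a plain BytesIO, or a SpooledTemporaryFile with no size cap) are replaced by SpooledTemporaryFile instances bounded by the new PACK_SPOOL_FILE_MAX_SIZE constant, so only packs under 16 MiB stay in RAM. A minimal sketch of the pattern, assuming PACK_SPOOL_FILE_MAX_SIZE is imported from dulwich.pack as introduced below:

from tempfile import SpooledTemporaryFile

from dulwich.pack import PACK_SPOOL_FILE_MAX_SIZE  # 16 MiB, added in this commit

# Data below the threshold stays in an in-memory buffer; anything larger is
# rolled over to an anonymous temporary file automatically.
f = SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')
f.write(b'PACK')   # placeholder for pack bytes streamed from the remote
f.seek(0)
# ... hand the file to PackData.from_file() or copy it into the object store ...
f.close()          # frees the buffer, or deletes the temp file if it rolled over
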
5 changes: 4 additions & 1 deletion dulwich/client.py
@@ -122,6 +122,7 @@
write_pack_from_container,
UnpackedObject,
PackChunkGenerator,
PACK_SPOOL_FILE_MAX_SIZE,
)
from dulwich.refs import (
read_info_refs,
@@ -813,7 +814,9 @@ def fetch(
determine_wants = target.object_store.determine_wants_all
if CAPABILITY_THIN_PACK in self._fetch_capabilities:
from tempfile import SpooledTemporaryFile
f: IO[bytes] = SpooledTemporaryFile()
f: IO[bytes] = SpooledTemporaryFile(
max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-',
dir=getattr(target.object_store, 'path', None))

def commit():
if f.tell():
6 changes: 3 additions & 3 deletions dulwich/cloud/gcs.py
@@ -25,7 +25,7 @@
import tempfile

from ..object_store import BucketBasedObjectStore
from ..pack import PackData, Pack, load_pack_index_file
from ..pack import PackData, Pack, load_pack_index_file, PACK_SPOOL_FILE_MAX_SIZE


# TODO(jelmer): For performance, read ranges?
@@ -58,14 +58,14 @@ def _iter_pack_names(self):

def _load_pack_data(self, name):
b = self.bucket.blob(posixpath.join(self.subpath, name + '.pack'))
f = tempfile.SpooledTemporaryFile()
f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
b.download_to_file(f)
f.seek(0)
return PackData(name + '.pack', f)

def _load_pack_index(self, name):
b = self.bucket.blob(posixpath.join(self.subpath, name + '.idx'))
f = tempfile.SpooledTemporaryFile()
f = tempfile.SpooledTemporaryFile(max_size=PACK_SPOOL_FILE_MAX_SIZE)
b.download_to_file(f)
f.seek(0)
return load_pack_index_file(name + '.idx', f)
106 changes: 25 additions & 81 deletions dulwich/object_store.py
@@ -63,19 +63,18 @@
PackInflater,
PackFileDisappeared,
UnpackedObject,
extend_pack,
load_pack_index_file,
iter_sha1,
full_unpacked_object,
generate_unpacked_objects,
pack_objects_to_data,
write_pack_header,
write_pack_index_v2,
write_pack_index,
write_pack_data,
write_pack_object,
compute_file_sha,
PackIndexer,
PackStreamCopier,
PackedObjectContainer,
PACK_SPOOL_FILE_MAX_SIZE,
)
from dulwich.protocol import DEPTH_INFINITE
from dulwich.refs import ANNOTATED_TAG_SUFFIX, Ref
@@ -819,47 +818,14 @@ def _complete_thin_pack(self, f, path, copier, indexer, progress=None):
entries = []
for i, entry in enumerate(indexer):
if progress is not None:
progress(("generating index: %d\r" % i).encode('ascii'))
progress(("generating index: %d/%d\r" % (i, len(copier))).encode('ascii'))
entries.append(entry)

ext_refs = indexer.ext_refs()

if ext_refs:
# Update the header with the new number of objects.
f.seek(0)
write_pack_header(f.write, len(entries) + len(ext_refs))

# Must flush before reading (http://bugs.python.org/issue3207)
f.flush()

# Rescan the rest of the pack, computing the SHA with the new header.
new_sha = compute_file_sha(f, end_ofs=-20)

# Must reposition before writing (http://bugs.python.org/issue3207)
f.seek(0, os.SEEK_CUR)

# Complete the pack.
for i, ext_sha in enumerate(ext_refs):
if progress is not None:
progress(("writing extra base objects: %d/%d\r" % (i, len(ext_refs))).encode("ascii"))
assert len(ext_sha) == 20
type_num, data = self.get_raw(ext_sha)
offset = f.tell()
crc32 = write_pack_object(
f.write,
type_num,
data,
sha=new_sha,
compression_level=self.pack_compression_level,
)
entries.append((ext_sha, offset, crc32))
pack_sha = new_sha.digest()
f.write(pack_sha)
else:
f.seek(-20, os.SEEK_END)
pack_sha = f.read(20)
pack_sha, extra_entries = extend_pack(
f, indexer.ext_refs(), get_raw=self.get_raw, compression_level=self.pack_compression_level,
progress=progress)

f.close()
entries.extend(extra_entries)

# Move the pack in.
entries.sort()
@@ -877,7 +843,7 @@ def _complete_thin_pack(f, path, copier, indexer, progress=None):
# Write the index.
index_file = GitFile(pack_base_name + ".idx", "wb", mask=PACK_MODE)
try:
write_pack_index_v2(index_file, entries, pack_sha)
write_pack_index(index_file, entries, pack_sha)
index_file.close()
finally:
index_file.abort()
@@ -928,7 +894,7 @@ def move_in_pack(self, path):
index_name = basename + ".idx"
if not os.path.exists(index_name):
with GitFile(index_name, "wb", mask=PACK_MODE) as f:
write_pack_index_v2(f, entries, p.get_stored_checksum())
write_pack_index(f, entries, p.get_stored_checksum())
for pack in self.packs:
if pack._basename == basename:
return pack
@@ -1076,46 +1042,23 @@ def add_pack(self):
Returns: Fileobject to write to and a commit function to
call when the pack is finished.
"""
f = BytesIO()
from tempfile import SpooledTemporaryFile
f = SpooledTemporaryFile(
max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')

def commit():
p = PackData.from_file(BytesIO(f.getvalue()), f.tell())
f.close()
size = f.tell()
f.seek(0)
p = PackData.from_file(f, size)
for obj in PackInflater.for_pack_data(p, self.get_raw):
self.add_object(obj)
p.close()

def abort():
pass

return f, commit, abort

def _complete_thin_pack(self, f, indexer, progress=None):
"""Complete a thin pack by adding external references.
Args:
f: Open file object for the pack.
indexer: A PackIndexer for indexing the pack.
"""
entries = list(indexer)

ext_refs = indexer.ext_refs()

if ext_refs:
# Update the header with the new number of objects.
f.seek(0)
write_pack_header(f.write, len(entries) + len(ext_refs))

# Rescan the rest of the pack, computing the SHA with the new header.
new_sha = compute_file_sha(f, end_ofs=-20)

# Complete the pack.
for ext_sha in indexer.ext_refs():
assert len(ext_sha) == 20
type_num, data = self.get_raw(ext_sha)
write_pack_object(f.write, type_num, data, sha=new_sha)
pack_sha = new_sha.digest()
f.write(pack_sha)

def add_thin_pack(self, read_all, read_some, progress=None):
"""Add a new thin pack to this object store.
@@ -1129,12 +1072,11 @@ def add_thin_pack(self, read_all, read_some, progress=None):
read_some: Read function that returns at least one byte, but may
not return the number of bytes requested.
"""

f, commit, abort = self.add_pack()
try:
indexer = PackIndexer(f, resolve_ext_ref=self.get_raw)
copier = PackStreamCopier(read_all, read_some, f, delta_iter=indexer)
copier.verify(progress=progress)
self._complete_thin_pack(f, indexer, progress=progress)
copier = PackStreamCopier(read_all, read_some, f)
copier.verify()
except BaseException:
abort()
raise
@@ -1601,7 +1543,8 @@ def add_pack(self):
"""
import tempfile

pf = tempfile.SpooledTemporaryFile()
pf = tempfile.SpooledTemporaryFile(
max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')

def commit():
if pf.tell() == 0:
@@ -1612,9 +1555,10 @@ def commit():
p = PackData(pf.name, pf)
entries = p.sorted_entries()
basename = iter_sha1(entry[0] for entry in entries).decode('ascii')
idxf = tempfile.SpooledTemporaryFile()
idxf = tempfile.SpooledTemporaryFile(
max_size=PACK_SPOOL_FILE_MAX_SIZE, prefix='incoming-')
checksum = p.get_stored_checksum()
write_pack_index_v2(idxf, entries, checksum)
write_pack_index(idxf, entries, checksum)
idxf.seek(0)
idx = load_pack_index_file(basename + '.idx', idxf)
for pack in self.packs:
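
For reference, the reworked commit() closure in the in-memory object store's add_pack() now parses the spooled file in place instead of copying its contents into a fresh BytesIO. Written out as a standalone helper (store stands in for self; this is a sketch, not code from the diff):

from dulwich.pack import PackData, PackInflater

def make_commit(f, store):
    # Same shape as the new commit() above: rewind the spooled pack file and
    # parse it in place rather than copying its bytes into a new BytesIO.
    def commit():
        size = f.tell()              # total bytes written so far
        f.seek(0)
        pack_data = PackData.from_file(f, size)
        for obj in PackInflater.for_pack_data(pack_data, store.get_raw):
            store.add_object(obj)    # unpack every object into the store
        pack_data.close()
    return commit
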
54 changes: 51 additions & 3 deletions dulwich/pack.py
@@ -50,7 +50,7 @@

import os
import sys
from typing import Optional, Callable, Tuple, List, Deque, Union, Iterable, Iterator, Dict, TypeVar, Generic, Sequence, Set
from typing import Optional, Callable, Tuple, List, Deque, Union, Iterable, Iterator, Dict, TypeVar, Generic, Sequence, Set, BinaryIO

try:
from typing import Protocol
@@ -103,6 +103,9 @@

DEFAULT_PACK_DELTA_WINDOW_SIZE = 10

# Keep pack files under 16Mb in memory, otherwise write them out to disk
PACK_SPOOL_FILE_MAX_SIZE = 16 * 1024 * 1024


OldUnpackedObject = Union[Tuple[Union[bytes, int], List[bytes]], List[bytes]]
ResolveExtRefFn = Callable[[bytes], Tuple[int, OldUnpackedObject]]
@@ -1006,8 +1009,6 @@ def read_objects(self, compute_crc32=False) -> Iterator[UnpackedObject]:
IOError: if an error occurred writing to the output file.
"""
pack_version, self._num_objects = read_pack_header(self.read)
if pack_version is None:
return

for i in range(self._num_objects):
offset = self.offset
@@ -2561,6 +2562,53 @@ def get_unpacked_object(self, sha: bytes, *, include_comp: bool = False, convert
return unpacked


def extend_pack(f: BinaryIO, object_ids: Set[ObjectID], get_raw, *, compression_level=-1, progress=None) -> Tuple[bytes, List]:
"""Extend a pack file with more objects.
The caller should make sure that object_ids does not contain any objects
that are already in the pack.
"""
# Update the header with the new number of objects.
f.seek(0)
_version, num_objects = read_pack_header(f.read)

if object_ids:
f.seek(0)
write_pack_header(f.write, num_objects + len(object_ids))

# Must flush before reading (http://bugs.python.org/issue3207)
f.flush()

# Rescan the rest of the pack, computing the SHA with the new header.
new_sha = compute_file_sha(f, end_ofs=-20)

# Must reposition before writing (http://bugs.python.org/issue3207)
f.seek(0, os.SEEK_CUR)

extra_entries = []

# Complete the pack.
for i, object_id in enumerate(object_ids):
if progress is not None:
progress(("writing extra base objects: %d/%d\r" % (i, len(object_ids))).encode("ascii"))
assert len(object_id) == 20
type_num, data = get_raw(object_id)
offset = f.tell()
crc32 = write_pack_object(
f.write,
type_num,
data,
sha=new_sha,
compression_level=compression_level,
)
extra_entries.append((object_id, offset, crc32))
pack_sha = new_sha.digest()
f.write(pack_sha)
f.close()

return pack_sha, extra_entries


try:
from dulwich._pack import ( # type: ignore # noqa: F811
apply_delta,
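
A rough usage sketch for the new extend_pack() helper; missing_bases (the 20-byte SHAs a thin pack references but does not contain) and store are placeholders, not names from the diff:

from dulwich.pack import extend_pack

f = open('incoming.pack', 'r+b')     # a thin pack missing some delta bases
pack_sha, extra_entries = extend_pack(
    f,
    missing_bases,                   # set of 20-byte binary object IDs to append
    get_raw=store.get_raw,           # resolves each ID to (type_num, data)
    compression_level=-1,            # zlib default, as the object stores pass through
    progress=None,
)
# extend_pack rewrites the object count in the header, appends the bases,
# recomputes the trailing checksum and closes the file; extra_entries holds
# (sha, offset, crc32) tuples to merge into the index entries.
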
4 changes: 2 additions & 2 deletions dulwich/tests/test_web.py
@@ -78,7 +78,7 @@ def read(self, howmuch):
start = self.pos
end = self.pos + howmuch
if start >= len(self.data):
return ""
return b""
self.pos = end
return self.data[start:end]

@@ -538,7 +538,7 @@ def setUp(self):

def _get_zstream(self, text):
zstream = BytesIO()
zfile = gzip.GzipFile(fileobj=zstream, mode="w")
zfile = gzip.GzipFile(fileobj=zstream, mode="wb")
zfile.write(text)
zfile.close()
zlength = zstream.tell()
18 changes: 2 additions & 16 deletions dulwich/web.py
@@ -22,9 +22,6 @@
"""HTTP server for dulwich that implements the git smart HTTP protocol."""

from io import BytesIO
import shutil
import tempfile
import gzip
import os
import re
import sys
@@ -466,21 +463,10 @@ def __init__(self, application):
self.app = application

def __call__(self, environ, start_response):
import gzip
if environ.get("HTTP_CONTENT_ENCODING", "") == "gzip":
try:
environ["wsgi.input"].tell()
wsgi_input = environ["wsgi.input"]
except (AttributeError, OSError, NotImplementedError):
# The gzip implementation in the standard library of Python 2.x
# requires working '.seek()' and '.tell()' methods on the input
# stream. Read the data into a temporary file to work around
# this limitation.
wsgi_input = tempfile.SpooledTemporaryFile(16 * 1024 * 1024)
shutil.copyfileobj(environ["wsgi.input"], wsgi_input)
wsgi_input.seek(0)

environ["wsgi.input"] = gzip.GzipFile(
filename=None, fileobj=wsgi_input, mode="r"
filename=None, fileobj=environ["wsgi.input"], mode="rb"
)
del environ["HTTP_CONTENT_ENCODING"]
if "CONTENT_LENGTH" in environ:
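
The GzipFilter simplification leans on the fact that Python 3's gzip.GzipFile only needs a working read() method on the wrapped object when decompressing, so the old seek()/tell() workaround (and its 16 MiB spool file) can go. A quick sanity sketch, independent of the WSGI plumbing:

import gzip
from io import BytesIO

payload = b'0032want <40-hex-sha>\n'          # placeholder request body
compressed = gzip.compress(payload)

class ReadOnlyStream:
    """Mimics a WSGI input stream: read() only, no seek() or tell()."""

    def __init__(self, data):
        self._buf = BytesIO(data)

    def read(self, size=-1):
        return self._buf.read(size)

wrapped = gzip.GzipFile(filename=None, fileobj=ReadOnlyStream(compressed), mode="rb")
assert wrapped.read() == payload
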
