prefetch: use a separate temporary cache for prefetching #730
base: main
Conversation
Codecov Report

Attention: Patch coverage is

@@            Coverage Diff             @@
##             main     #730      +/-   ##
==========================================
+ Coverage   87.33%   87.39%   +0.06%
==========================================
  Files         116      116
  Lines       11147    11217      +70
  Branches     1532     1536       +4
==========================================
+ Hits         9735     9803      +68
  Misses       1032     1032
- Partials      380      382       +2
@@ -179,6 +180,7 @@ def iterate(self, timeout=None) -> Generator[ResultT, None, None]:
        self.shutdown_producer()
        if not async_run.done():
            async_run.cancel()
            wait([async_run])
`.cancel()` does not immediately cancel the underlying asyncio task.
We could add a `.result()` call to wait for the future, but that does not seem to work for a cancelled future returned by `run_coroutine_threadsafe()`. See python/cpython#105836.
So, I have added `wait(...)`, as it seems to wait for the cancelled future to settle and for the underlying asyncio task to finish.
Alternatively, we could add an `asyncio.Event` and wait for it.
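A minimal, self-contained sketch of the pattern discussed above (not the PR's actual code; `long_running` and the timings are invented for illustration): cancelling the future returned by `asyncio.run_coroutine_threadsafe()` does not synchronously stop the underlying task, so `wait([...])` is used to block until the future settles.

```python
import asyncio
import threading
import time
from concurrent.futures import wait

# Run an event loop in a background thread, as a producer would.
loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever, daemon=True)
thread.start()

async def long_running():
    # Stands in for the producer coroutine.
    await asyncio.sleep(60)

fut = asyncio.run_coroutine_threadsafe(long_running(), loop)
time.sleep(0.1)  # let the coroutine start on the loop thread

if not fut.done():
    fut.cancel()
# .result() on the cancelled future is problematic here
# (see python/cpython#105836); wait() blocks until it settles.
wait([fut], timeout=2)

print(fut.cancelled())  # True
loop.call_soon_threadsafe(loop.stop)
```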
src/datachain/lib/file.py
if client.protocol == HfClient.protocol:
    self._set_stream(catalog, self._caching_enabled, download_cb=download_cb)
    return False
`prefetch` is disabled for Hugging Face. See #746.
if os.getenv("DATACHAIN_SHOW_PREFETCH_PROGRESS"):
    download_cb = get_download_callback(
        f"{total_rank}/{total_workers}", position=total_rank
    )
This shows a prefetch download progress bar for each worker, which will be useful for debugging.
We cannot enable it by default, as it would mess up the user's progress bar due to multiprocessing.
pass

class TqdmCombinedDownloadCallback(CombinedDownloadCallback, TqdmCallback):
I have modified the callback to also show file counts during prefetching.
This will not show up on PyTorch, however.
E.g.:
Download: 1.03MB [00:01, 605kB/s, 50 files]
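How a combined callback might track both bytes and a file count can be sketched in plain Python (a hypothetical stand-in, not the actual class: the real callback extends fsspec's `TqdmCallback`, and the method `increment_file_count` and attribute names here are invented for illustration):

```python
class CombinedDownloadCallback:
    """Hypothetical stand-in for a callback tracking bytes and files."""

    def __init__(self):
        self.bytes_done = 0
        self.files_done = 0

    def relative_update(self, inc=1):
        # fsspec-style incremental byte update
        self.bytes_done += inc

    def increment_file_count(self, n=1):
        # invented helper: bump the file counter shown in the postfix
        self.files_done += n

    def postfix(self):
        # rendered after the byte/rate stats, e.g. "..., 50 files"
        return f"{self.files_done} files"

cb = CombinedDownloadCallback()
for size in (512, 512, 1024):
    cb.relative_update(size)
    cb.increment_file_count()
print(cb.bytes_done, cb.postfix())  # 2048 3 files
```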
Unless `cache=True` is set, a separate temporary cache will be used for prefetching. It will get removed after the iteration is closed.
This PR uses a separate temporary cache for prefetching, residing in the `.datachain/tmp/prefetch-<random>` directory, when `prefetch=` is set but `cache` is not. The temporary directory is automatically deleted after prefetching is done. For `cache=True`, the cache is reused and is not deleted.

Please note that auto-cleanup does not work for PyTorch datasets, because there is no way to invoke cleanup from the `Dataset` side. The `DataLoader` may still hold cached data or rows even after the `Dataset` instance has finished iterating. As a result, values associated with a `catalog`/`cache` instance can outlive the `Dataset` instance.

One potential solution is to implement a custom dataloader; another is to provide a user-facing API. In this PR, I have implemented the latter: `PytorchDataset` now includes a `close()` method, which can be used to clean up the temporary prefetch cache. E.g.:
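The cache lifecycle described above can be sketched with the standard library (a hedged illustration, not DataChain's actual implementation; the directory layout follows the description): a throwaway cache directory is created under `.datachain/tmp` with a `prefetch-` prefix and removed once iteration finishes, or when `close()` is invoked.

```python
import shutil
import tempfile
from pathlib import Path

# Create the temporary prefetch cache under .datachain/tmp, mirroring
# the prefetch-<random> naming from the description.
root = Path(".datachain") / "tmp"
root.mkdir(parents=True, exist_ok=True)
cache_dir = Path(tempfile.mkdtemp(prefix="prefetch-", dir=root))
print(cache_dir.exists())  # True while prefetching is in progress

try:
    # ... prefetch files into cache_dir and iterate over rows ...
    pass
finally:
    # Auto-cleanup after iteration, or what a close() call would do.
    shutil.rmtree(cache_dir, ignore_errors=True)

print(cache_dir.exists())  # False after cleanup
```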