parallel function mapping #1493
Conversation
__all__ = ['process_chunks']

def process_chunks(function, array, chunks=None, depth=None, mode=None):
Suggestions for a better name?
Agree that we could improve upon `process`.
This could also probably use a docstring saying that it splits apart an array and executes a normal function in parallel on overlapping chunks of the array.
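A docstring along these lines might work (the wording is only a sketch assembled from this thread's description of the behaviour):

```python
def process_chunks(function, array, chunks=None, depth=None, mode=None):
    """Map a function in parallel across overlapping chunks of an array.

    The array is split into chunks, each chunk is extended by ``depth``
    elements on every side (with boundary handling selected by ``mode``),
    ``function`` is applied to each extended chunk in parallel, and the
    results are trimmed and reassembled into an array of the original
    shape.
    """
```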
This PR came out of a conversation with @stefanv
@jni for purposes of discussion I believe it's possible to specify

I really like what I see here, pending a little API/naming discussion.
We need to know if this is the right API to include in scikit-image. It would be great to get feedback from the scikit-image community about what would be most useful. Would it be possible to have someone play-test this? (Also, to give credit, this is @cowlicks' work, not mine :))
As of 0.4.0
Since toolz is pure-python, I would favour just adding the dependency. I adore toolz. =) I can playtest as soon as it's stable. Thoughts about the API:
Thank you!
I was avoiding pass-through kwargs because I was worried about cluttering the interface. Also some cases like
I suggest
Dask will have to support numpy 1.6. It wraps
Seems doable.
__all__ = ['process_chunks']

def _get_chunks(shape, ncpu):
@jni here is my naive chunk size selection based on `multiprocessing.cpu_count()`.
@cowlicks doesn't this overchunk? (that's a fun word.) e.g. if I have 4 processors, it's going to give me back 16 chunks, right? I think
- the division should be by the ceiling of the ndim-th root of the number of cpus, and
- In cases of non-even division, the initial chunk size should be 1 + the division result, so that you get a decent-sized remainder.
Here's what a doctest should look like imho:
>>> _get_chunks((4, 4), 4)
(2, 2)
>>> _get_chunks((4, 4), 2)
(2, 2)
>>> _get_chunks((5, 5), 2)
(3, 3)
If we want to get more clever, this'd be a fun one:
>>> _get_chunks((2, 4), 2)
(2, 2)
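A quick sketch of the arithmetic behind those two bullets (illustrative only, not the PR's code):

```python
from math import ceil

def splits_per_axis(shape, ncpu):
    # First bullet: divide each axis into ceil(ncpu ** (1/ndim)) pieces,
    # so the total chunk count is about ncpu, not ncpu per axis.
    return ceil(ncpu ** (1.0 / len(shape)))

print(splits_per_axis((4, 4), 4))  # 2 -> 2 x 2 = 4 chunks for 4 CPUs
# Second bullet, for shape (5, 5) and 2 CPUs: 5 does not divide evenly
# into 2 pieces, so the initial chunk size becomes 1 + 5 // 2 = 3,
# giving the (3, 3) in the doctest above.
```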
@jni You are correct, I was thinking about this in one dimension. This currently gives you `ncpu * len(shape)` chunks. I'll fix this.
Also, `_get_chunks` should return a tuple of tuples: one tuple for each dimension, listing the size of each chunk in that dimension. So:
>>> _get_chunks((4, 4), 4)
((2, 2), (2, 2))
>>> _get_chunks((4, 4), 2)
((2, 2), (4,))
>>> _get_chunks((5, 5), 2)
((2, 3), (5,))
>>> _get_chunks((2, 4), 2)
((1, 1), (4,)) # ? or it might be better to have ((2,), (2, 2))
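Here is a sketch that reproduces those doctests (my reading of the proposal, not necessarily the PR's actual code; it assumes each dimension is at least as long as the per-axis split count):

```python
from math import ceil

def _get_chunks(shape, ncpu):
    # Split each dimension into roughly ceil(ncpu ** (1/ndim)) chunks,
    # folding any remainder into the last chunk so it never shrinks.
    # Once enough chunks exist to occupy all CPUs, later dimensions are
    # left whole.
    chunks = []
    nchunks_per_dim = int(ceil(ncpu ** (1.0 / len(shape))))
    used_chunks = 1
    for dim in shape:
        if used_chunks < ncpu:
            regular = dim // nchunks_per_dim
            last = regular + (dim % nchunks_per_dim)  # absorbs the remainder
            chunk_lens = (regular,) * (nchunks_per_dim - 1) + (last,)
        else:
            chunk_lens = (dim,)  # no further splitting needed
        chunks.append(chunk_lens)
        used_chunks *= nchunks_per_dim
    return tuple(chunks)
```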
We could also have the `get_chunks` function in skimage just return a single tuple of block sizes. This is probably more intuitive for users, and dask.array functions will convert to the more explicit tuple-of-tuples form automatically.
@jni About your second point: this would make the last remainder smaller. I want to avoid this because you could accidentally have your boundary depth be larger than the last chunk. If we just extend the last chunk by the remainder, we know the safe upper bound for boundary depth would be min(shape) // ceil(ncpu ** (1./len(shape))).
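Plugging numbers into that bound (just arithmetic, for illustration):

```python
from math import ceil

shape, ncpu = (5, 5), 2
max_depth = min(shape) // ceil(ncpu ** (1.0 / len(shape)))
print(max_depth)  # 2: with chunks ((2, 3), (5,)), a boundary depth above
                  # 2 would exceed the smallest chunk along axis 0
```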
@jni Try this. It could definitely be optimized, but I think that could get complicated quickly.
Thanks for this! I am realising that I didn't understand the chunk specification format... And I still don't! I was indeed thinking about block sizes, as @mrocklin suggests. (h5py, for example, specifies chunking in this way.) Can you explain the format a bit more? Maybe a few examples of how a chunk tuple gets unpacked to array indices? This could go in a Notes section for the `process_chunks` function docstring. I don't imagine I'll be the first to give something completely different to what's expected!
Storage formats like h5py have a bit of an easier time with chunk sizes: they can rely on them being uniform. When you start actually computing on your arrays, your chunk sizes can change, and these changes can be heterogeneous.
Chunks are defined here: http://dask.pydata.org/en/latest/array-design.html. Generally, "chunks" means the sizes of each block along each dimension. These can vary along each axis but must be Cartesian within the array.
@jni the docs @mrocklin linked to explain it better than I can.
Currently `_get_chunks` returns a sequence of chunk sizes (chunk and block are synonymous) along each dimension. This way we can specify what happens at the boundaries to make sure they are not smaller than the ghosting depth.
If `_get_chunks` just returned one chunk's dimensions, so that this chunk was tiled across the array (this is what you are suggesting, I think), the chunks on the boundaries could be truncated so that the ghosting depth might end up larger than a chunk, which causes issues.
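To make the format concrete, here is a small example of how a chunks tuple maps to array blocks (a sketch using dask's public API; the block boundaries follow from the definition linked above):

```python
import numpy as np
import dask.array as da

x = np.arange(25).reshape(5, 5)
# chunks=((2, 3), (5,)): axis 0 is split into blocks of 2 and 3 rows,
# axis 1 is kept as a single block of 5 columns.
d = da.from_array(x, chunks=((2, 3), (5,)))
# Block (0, 0) covers x[0:2, 0:5]; block (1, 0) covers x[2:5, 0:5].
print(d.chunks)      # ((2, 3), (5,))
print(d.numblocks)   # (2, 1)
```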
@cowlicks those docs are indeed great! Could you add a link in this docstring under "References"?
@cowlicks I would add the extra args and kwargs to the end of the argument list, calling them
>>> _get_chunks((5, 5), 2)
((2, 3), (5,))
>>> _get_chunks((2, 4), 2)
((1, 1), (4,))
Ideally we would split the larger dimensions more aggressively than the smaller. I agree with your sentiment though that we should probably wait before optimizing this much further; this is a potential rabbit hole.
In dask.array I'm honestly fine requiring users to think about chunk sizes. Automatic solutions in n-dimensions are likely to fail decently often and users should probably be made somewhat aware of what's going on. So far people also seem willing to think about chunk sizes; it doesn't seem to be that alien to people yet.
For reference, Distarray does something similar here.
Similar application, but it tries a lot harder to make a good guess.
@jni, extra keyword args in addition to the ones already in there? Or should I move these to the end of the signature?
@cowlicks I was thinking,
The last bit is my opinion only and I'd consider it optional, since there are examples of different nomenclature elsewhere. But I still would strongly recommend it, perhaps pending feedback from others in this discussion.
return tuple(chunks)

def process_chunks(function, array, chunks=None, depth=0,
Maybe "apply_chunks" is more intuitive, given that a function is the first parameter?
@stefanv I like it!
OK, I'm changing the module name from `process.py` to `apply.py` too, likewise for the tests.
Renamed them to `extra_arguments` and `extra_keywords`, as suggested by @jni.
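With those names, a call might look like the following (the `apply_chunks` name and the exact signature are assumptions pieced together from this thread, not confirmed API):

```python
import numpy as np
from scipy import ndimage as ndi

def smooth(image, sigma=1):
    return ndi.gaussian_filter(image, sigma)

image = np.random.random((1024, 1024))
# apply_chunks is the function from this PR (per @stefanv's suggestion);
# sigma is forwarded through the renamed extra_keywords argument.
result = apply_chunks(smooth, image, chunks=(256, 256), depth=8,
                      extra_keywords={'sigma': 4})
```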
It looks like @cowlicks has been doing some trial-and-error testing here against the scikit-image build/dependencies system with little luck. This now seems fairly scikit-image focused rather than dask focused; perhaps scikit-image folk could take over from here?
Yes, I pinged @blink1073 above. He knows the most about our current testing infrastructure.
What's this about optional dependencies? I thought we decided to make dask[array] and toolz actual dependencies? At any rate, optional dependencies should fail gracefully:

    import dask.array as da

becomes:

    try:
        import dask.array as da
    except ImportError:
        raise NotImplementedError("apply_parallel requires dask[array]")

or:

    try:
        import dask.array as da
    except ImportError:
        import warnings
        warnings.warn("dask array is not installed; apply_parallel will only use a single process")
        # [code to do this gracefully]

But, as I mentioned, I would actually prefer an actual dependency.
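As a minimal sketch of what the second option's graceful fallback could look like (the `_HAVE_DASK` flag, the `map_overlap` call, and the single-process behaviour are all my assumptions, not code from this PR):

```python
try:
    import dask.array as da
    _HAVE_DASK = True
except ImportError:
    import warnings
    warnings.warn("dask array is not installed; apply_parallel will "
                  "only use a single process")
    _HAVE_DASK = False


def apply_parallel(function, array, chunks, depth=0):
    if not _HAVE_DASK:
        # Degraded mode: apply the function to the whole array in one go.
        return function(array)
    darr = da.from_array(array, chunks=chunks)
    # map_overlap (in recent dask) handles the ghosting bookkeeping:
    # extend each chunk by `depth`, apply, then trim the overlap.
    return darr.map_overlap(function, depth=depth).compute()
```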
I also thought this would be an actual dependency specified in
I think that @cowlicks did too, but when he added it to requirements.txt there was an issue with .travis.yml. Apparently you weren't pulling from the standard PyPI repositories but only accepted wheels from some other location.
@@ -67,6 +67,8 @@ fi

retry pip install -q tifffile

pip install 'dask[array]>=0.5.0'
Try moving this to this line
@@ -52,6 +52,9 @@ functionality is only available with the following installed:

* `imread <http://pythonhosted.org/imread/>`__
  Optional io plugin providing most standard `formats <http://pythonhosted.org//imread/formats.html>`__.

* `dask array<dask.pydata.org/en/latest/>`__
This probably needs an http://
Hrm, the failure on Travis CI doesn't appear related to changes in this pull request.
This is the spiritual successor to PR #723.
It allows you to break an image into chunks and map a function, in parallel, over those chunks. The chunks' boundary conditions and boundary depth can be specified to match what the function needs.
This adds dask as a dependency, and is based on dask master.