Commit

Updated Dask Rechunking tutorial
kjdoore committed Jan 3, 2025
1 parent 0fa6be5 commit 3d46d8d
Showing 3 changed files with 71 additions and 106 deletions.
154 changes: 63 additions & 91 deletions 201/RechunkingwithDask.ipynb
@@ -12,7 +12,7 @@
":::{Warning}\n",
"You should only run workflows like this tutorial on a cloud or HPC compute node.\n",
"In application, this will require reading and writing **enormous** amounts of data.\n",
"Using a typical network connection and simple compute environment, you would saturate your bandwidth and max out your processor, thereby taking days to for the rechunking to complete.\n",
"Using a typical network connection and simple compute environment, you would saturate your bandwidth and max out your processor, thereby taking days for the rechunking to complete.\n",
":::"
]
},
@@ -27,9 +27,9 @@
"import fsspec\n",
"from rechunker import rechunk\n",
"import zarr\n",
"import shutil\n",
"import numpy as np\n",
"import dask"
"import dask.distributed\n",
"import dask.diagnostics\n",
"import logging"
]
},
{
@@ -221,68 +221,57 @@
"\n",
"### Set up output location\n",
"\n",
"Unlike with the smaller dataset in our previous rechunking tutorial, we will write this larger dataset to an object store (an S3 'bucket') in a datacenter.\n",
"So, we need to set that up so that `rechunker` will have a suitable place to write data."
"Unlike with the smaller dataset in our previous rechunking tutorial, we will write this larger dataset to an object store (an S3 'bucket') on the USGS OSN.\n",
"So, we need to set that up so that `rechunker` will have a suitable place to write data.\n",
"\n",
"First, we need to set up the AWS profile and S3 endpoit."
]
},
{
"cell_type": "markdown",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"TODO: Update these next three cells to properly use AWS or HPC. May need to add some markdown cells to describe what is being done."
"os.environ['AWS_PROFILE'] = \"osn-hytest-scratch\"\n",
"os.environ['AWS_S3_ENDPOINT'] = \"https://usgs.osn.mghpcc.org\"\n",
"%run ../AWS.ipynb"
]
},
{
"cell_type": "code",
"execution_count": null,
"cell_type": "markdown",
"metadata": {},
"outputs": [],
"source": [
"os.environ['AWS_PROFILE'] = \"osn-renci\"\n",
"os.environ['AWS_S3_ENDPOINT'] = \"https://renc.osn.xsede.org\"\n",
"# %run ../AWS.ipynb"
"Next, we make our S3 `fsspec.filesystem` with the required user info and get the mapper to this file to pass to Rechunker."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"metadata": {},
"outputs": [],
"source": [
"from getpass import getuser\n",
"uname=getuser()\n",
"\n",
"fsw = fsspec.filesystem(\n",
"fs = fsspec.filesystem(\n",
" 's3', \n",
" anon=False, \n",
" default_fill_cache=False, \n",
" skip_instance_cache=True, \n",
" client_kwargs={'endpoint_url': os.environ['AWS_S3_ENDPOINT'], }\n",
")\n",
"\n",
"workspace = 's3://rsignellbucket2/'\n",
"testDir = workspace + \"testing/\"\n",
"myDir = testDir + f'{uname}/'\n",
"fsw.mkdir(testDir)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"temp_store = fsw.get_mapper(myDir + 'tutorial_staging.zarr')\n",
"outfile = fsw.get_mapper(myDir + 'tutorial_rechunked.zarr')\n",
"for fname in [staging, outfile]:\n",
" print(f\"Ensuring {fname.root} is empty...\", end='')\n",
"output_dir = f's3://hytest-scratch/rechunking_tutorial/{uname}/'\n",
"\n",
"temp_store = fs.get_mapper(output_dir + 'temp_store.zarr')\n",
"target_store = fs.get_mapper(output_dir + 'tutorial_rechunked.zarr')\n",
"# Check if the objects exist and remove if they do\n",
"for filename in [temp_store, target_store]:\n",
" try:\n",
" fsw.rm(fname.root, recursive=True)\n",
" fs.rm(filename.root, recursive=True)\n",
" except:\n",
" FileNotFoundError\n",
" print(\" Done.\")"
" FileNotFoundError"
]
},
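A more explicit way to express the intent of the cleanup loop above is to test for existence before removing; the sketch below is meant to be functionally equivalent and uses only standard `fsspec` calls:

```python
# Equivalent cleanup: remove the temporary and target stores only if they already exist.
for store in [temp_store, target_store]:
    if fs.exists(store.root):
        fs.rm(store.root, recursive=True)
```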
{
@@ -292,31 +281,27 @@
"### Spin up Dask Cluster\n",
"\n",
"Our rechunking operation will be able to work in parallel.\n",
"To do that, we will spin up a `dask` cluster on the cloud hardware to schedule the various workers.\n",
"Note that this cluster must be configured with a specific user **profile** with permissions to write to our eventual output location."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"TODO: Ensure this is spinning up the cluster we want"
"To do that, we will spin up a `dask` cluster to schedule the various workers.\n",
"\n",
"```{note}\n",
"This cluster will be configured differently depending on where you compute is performed.\n",
"See the [dask deployment docs](https://docs.dask.org/en/stable/deploying.html) for details.\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"metadata": {},
"outputs": [],
"source": [
"import logging\n",
"\n",
"from dask.distributed import Client\n",
"\n",
"# client = Client(n_workers=8, silence_logs=logging.ERROR)\n",
"# client"
"cluster = dask.distributed.LocalCluster(\n",
" n_workers=8,\n",
" threads_per_worker=1, \n",
" silence_logs=logging.ERROR\n",
")\n",
"client = dask.distributed.Client(cluster)\n",
"client"
]
},
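As the note above says, the cluster setup depends on where the computation runs. On an HPC system with a SLURM scheduler, for example, you might use `dask-jobqueue` instead of a `LocalCluster`. The sketch below is illustrative only; the queue, account, and resource values are placeholders, not settings from this repository:

```python
# Hypothetical HPC alternative to the LocalCluster above, using dask-jobqueue.
# All queue/account/resource values are placeholders -- adjust for your system.
from dask_jobqueue import SLURMCluster
import dask.distributed

cluster = SLURMCluster(
    queue="normal",           # placeholder partition name
    account="my-allocation",  # placeholder SLURM account
    cores=1,
    processes=1,
    memory="4GB",
    walltime="01:00:00",
)
cluster.scale(jobs=8)  # request 8 single-worker jobs
client = dask.distributed.Client(cluster)
```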
{
@@ -331,17 +316,15 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"metadata": {},
"outputs": [],
"source": [
"result = rechunk(\n",
" # Make sure the base chunks are correct\n",
" ds.chunk({'time': 672, 'feature_id': 15000}),\n",
" target_chunks=chunk_plan,\n",
" max_mem=\"16GB\",\n",
" target_store=outfile,\n",
" max_mem=\"2GB\",\n",
" target_store=target_store,\n",
" temp_store=temp_store\n",
")\n",
"result"
@@ -353,32 +336,32 @@
"source": [
"Remember that merely invoking Rechunker does not do any work.\n",
"It just sorts out the rechunking plan and writes metadata.\n",
"We need to call `.execute` on the `results` object to actually run the rechunking."
"We need to call `.execute` on the `result` object to actually run the rechunking."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true,
"tags": []
},
"outputs": [],
"source": [
"from dask.distributed import progress, performance_report\n",
"\n",
"with performance_report(filename=\"dask-report.html\"):\n",
"with dask.diagnostics.ProgressBar():\n",
" r = result.execute(retries=10) \n",
"\n",
"# Also consolidate the metadata for fast reading into xarray\n",
"_ = zarr.consolidate_metadata(outfile)"
"_ = zarr.consolidate_metadata(target_store)"
]
},
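If you prefer to watch the rechunking interactively rather than through the text progress bar, the Dask dashboard can be opened from the client (optional):

```python
# Optional: print the URL of the Dask dashboard to monitor the rechunking tasks.
print(client.dashboard_link)
```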
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Results\n",
"Let's read in the resulting re-chunked dataset to see how it looks:"
"## Confirm the Creation of the Zarr Store by Rechunker\n",
"\n",
"Let's read in the resulting re-chunked dataset to confirm it turned out how we intended."
]
},
{
@@ -387,35 +370,28 @@
"metadata": {},
"outputs": [],
"source": [
"ds_rechunked = xr.open_zarr(outfile)\n",
"ds_rechunked = xr.open_zarr(target_store)\n",
"ds_rechunked"
]
},
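Beyond inspecting the repr, you can confirm the chunk sizes programmatically. A minimal sketch, assuming the dataset contains a variable named `streamflow` (substitute a variable from your own data):

```python
# Minimal check that the on-disk chunking matches the plan.
# "streamflow" is an assumed variable name; use one present in your dataset.
print(ds_rechunked["streamflow"].encoding.get("chunks"))  # chunk shape written to the Zarr store
print(ds_rechunked["streamflow"].chunks)                  # dask chunks after lazy opening
```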
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Comparison\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"## Before:\n",
"ds"
"Nice, looks good!\n",
"You may have noticed that the only difference between the [introductory tutorial on rechunking](../101/Rechunking.ipynb) and this is the inclusion of creating the dask cluster and where we saved the files.\n",
"Picking your compute environment and output location will typically be the only things that vary in other workflows requiring rechunking.\n",
"Therefore, if you understand this rechunking process you should be able to apply it to your own data efficiently."
]
},
{
"cell_type": "code",
"execution_count": null,
"cell_type": "markdown",
"metadata": {},
"outputs": [],
"source": [
"## After:\n",
"ds_rechunked"
"## Clean Up\n",
"\n",
"As we don't want to keep this rechunked `zarr`, let's go ahead and delete it.\n",
"We will also conform with best practices and close our Dask client and cluster."
]
},
{
@@ -424,16 +400,12 @@
"metadata": {},
"outputs": [],
"source": [
"fs.rm(temp_store.root, recursive=True)\n",
"fs.rm(target_store.root, recursive=True)\n",
" \n",
"client.close()\n",
"cluster.close()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
19 changes: 6 additions & 13 deletions AWS.ipynb
@@ -20,8 +20,8 @@
},
{
"cell_type": "code",
"execution_count": null,
"id": "1117a21a-b7f7-4374-a5bd-15bca24c11a1",
"execution_count": 10,
"id": "858d22f3-065c-41d3-8974-06afc7ada04c",
"metadata": {},
"outputs": [],
"source": [
@@ -33,8 +33,9 @@
" os.path.expanduser('~/.aws/credentials') \n",
" # default location... if yours is elsewhere, change this.\n",
")\n",
"_profile_nm = os.environ.get('AWS_PROFILE', 'osn-rsignellbucket2')\n",
"_endpoint = os.environ.get('AWS_S3_ENDPOINT', 'https://renc.osn.xsede.org')\n",
"_profile_nm = os.environ.get('AWS_PROFILE', 'osn-hytest-scratch')\n",
"_endpoint = os.environ.get('AWS_S3_ENDPOINT', 'https://usgs.osn.mghpcc.org/')\n",
"\n",
"# Set environment vars based on parsed awsconfig\n",
"try:\n",
" os.environ['AWS_ACCESS_KEY_ID'] = awsconfig[_profile_nm]['aws_access_key_id']\n",
Expand All @@ -53,14 +54,6 @@
"source": [
"It is extremely important that you **never** set any of the access keys or secrets directly -- we never want to include any of those values as string literals in any code. This code is committed to a public repository, so doing this would essentially publish those secrets. **ALWAYS** parse the config file as demonstrated above in order to obtain the access key and the secret access key. "
]
},
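For readers who don't have the rest of `AWS.ipynb` in view, the pattern sketched by the cells above is roughly the following (a simplified reconstruction, not the notebook's exact code):

```python
# Simplified reconstruction of the AWS.ipynb pattern: parse a named profile from the
# local credentials file and export its keys as environment variables -- never as literals.
import os
import configparser

awsconfig = configparser.ConfigParser()
awsconfig.read(os.path.expanduser("~/.aws/credentials"))

profile = os.environ.get("AWS_PROFILE", "osn-hytest-scratch")
os.environ.setdefault("AWS_S3_ENDPOINT", "https://usgs.osn.mghpcc.org/")

try:
    os.environ["AWS_ACCESS_KEY_ID"] = awsconfig[profile]["aws_access_key_id"]
    os.environ["AWS_SECRET_ACCESS_KEY"] = awsconfig[profile]["aws_secret_access_key"]
except KeyError as e:
    raise RuntimeError(f"Profile '{profile}' not found in ~/.aws/credentials") from e
```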
{
"cell_type": "code",
"execution_count": null,
"id": "f473e813-dfff-4e91-a87c-bf7f74a414e8",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
@@ -79,7 +72,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
"version": "3.12.0"
}
},
"nbformat": 4,
4 changes: 2 additions & 2 deletions _toc.yml
@@ -14,7 +14,6 @@ chapters:
- file: 201/RechunkingwithDask
- file: 201/VirtualZarr
- file: 201/AddingCRStoZarr
# - file: 201/ChunkingAuxilliaryCoords
# - file: 201/OptimalChunkSelection
# - file: 201/IcechunkTutorial
- file: back/index
@@ -24,4 +23,5 @@
# - file: utils
# - file: AWS
# - file: StartNebariCluster
- file: back/Glossary
- file: back/Glossary
# - file: back/AdditionalResources
