Add adaptive example of a Monte Carlo estimate #19

Open · wants to merge 2 commits into base: master
358 changes: 358 additions & 0 deletions elastic_monte_carlo_estimate_of_pi.ipynb
@@ -0,0 +1,358 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Monte-Carlo Estimate of $\\pi$\n",
"\n",
    "We want to estimate the number $\\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods), exploiting the fact that the area of a quarter circle of unit radius is $\\pi/4$ and that, hence, the probability of a randomly chosen point in the unit square lying within the unit circle centered at a corner of the square is $\\pi/4$ as well. So for $N$ randomly chosen pairs $(x, y)$ with $x\\in[0, 1)$ and $y\\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $x^2 + y^2 < 1$ and estimate $\\pi \\approx 4 \\cdot N_{circ} / N$.\n",
"\n",
"[<img src=\"https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif\" \n",
" width=\"50%\" \n",
" align=top\n",
" alt=\"PI monte-carlo estimate\">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)\n",
"\n",
"## Why Adaptive?\n",
"\n",
    "Using [Dask's adaptivity](http://docs.dask.org/en/latest/setup/adaptive.html), we'll show that it is possible to scale the available resources to meet almost any desired wall time, irrespective of the actual workload. This is important because it allows focusing on the situation of the human running the analysis:\n",
"\n",
"> _\"Am I in an explorative and creative phase of my work where it is important that I can see the next plot within seconds?\"_\n",
"\n",
Review comment (Member): Typo in "actual". You should maybe also introduce the Monte Carlo estimate of pi mechanism?

"> _\"Am I running routine analyses that can wait until tomorrow or next week?\"_\n",
"\n",
"## Actual timings\n",
"\n",
    "Aiming for a duration of 20 seconds per calculation, this is what we actually get:\n",
Review comment (Member): Typo in "Aiming". Maybe put the results at the end? That gives the user an idea of what can be achieved, even if they probably won't scale all the way up.

"\n",
"- $\\pi$ from 50.0 GB of random data in 21.82 s with 13 workers\n",
"- $\\pi$ from 100.0 GB of random data in 16.73 s with 25 workers\n",
"- $\\pi$ from 200.0 GB of random data in 15.79 s with 53 workers\n",
"- $\\pi$ from 400.0 GB of random data in 16.94 s with 122 workers\n",
"- $\\pi$ from 800.0 GB of random data in 21.84 s with 241 workers\n",
"- $\\pi$ from 1600.0 GB of random data in 26.89 s with 400 workers\n",
"- $\\pi$ from 3200.0 GB of random data in 45.50 s with 392 workers"
]
},
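The Monte-Carlo mechanism described in the cell above can be sketched in plain NumPy before scaling it out with Dask. This is an illustrative sketch, not part of the notebook; the function name `mc_pi` and the sample size are chosen here only for illustration:

```python
import numpy as np

def mc_pi(n, seed=0):
    """Estimate pi from n random points in the unit square."""
    rng = np.random.default_rng(seed)
    xy = rng.uniform(0, 1, size=(n, 2))
    # The fraction of points inside the quarter circle approximates pi / 4.
    in_circle = (xy ** 2).sum(axis=1) < 1
    return 4 * in_circle.mean()

print(mc_pi(1_000_000))
```

With a million points the estimate is already correct to two or three decimal places; the notebook then scales the same computation to billions of points.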
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Tuning adaptivity\n",
"\n",
    "The following tunes a Dask cluster to use anywhere between 1 and 400 workers and to scale its size so that any computation is finished within 20 seconds. On Pangeo, time scales for starting / stopping workers are of the order of a few seconds, so we set the startup cost to 5 seconds (instead of the default of 1 second) and increase possible scale-down times by setting the relevant check interval to 2 seconds and the number of times a worker needs to be considered expendable before it is actually killed to `10`. We also reduce the default factor that is applied when scaling the cluster up to a more modest `1.2`.\n",
Review comment (Member): "time scales for starting / stopping workers are of the order of a few seconds": not sure this is true for cloud deployments, is it? Also, why only a 1.2 scale factor? I'm not sure this is used in most cases.

"\n",
"To see all available args and defaults for tuning adaptivity, check the [docs of dask.distributed.Adaptive](http://docs.dask.org/en/latest/setup/adaptive.html) or skip to the cell with the [docstring of Adaptive at the end of this notebook](#Docstring-of-Adaptive)."
]
},
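Conceptually, the adaptivity tuned above targets just enough workers to finish the estimated outstanding work within `target_duration`, clamped to the configured bounds. A rough sketch of that heuristic (a simplification for intuition only, not dask's actual implementation; the function and parameter names are made up here):

```python
from math import ceil

def target_n_workers(total_task_seconds, target_duration_s=20,
                     minimum=1, maximum=400):
    # Enough workers to clear the estimated work within the target
    # duration, clamped to the configured minimum / maximum.
    wanted = ceil(total_task_seconds / target_duration_s)
    return max(minimum, min(maximum, wanted))

print(target_n_workers(1000))  # 1000 s of work / 20 s target -> 50 workers
```

Parameters like `wait_count` and `interval` then control how patiently such a target is acted on, which is why the notebook increases them to avoid killing workers too eagerly.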
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from dask_kubernetes import KubeCluster\n",
"cluster = KubeCluster(n_workers=1)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"cluster.adapt(minimum=1, maximum=400,\n",
" target_duration=\"20s\",\n",
" interval=\"2s\",\n",
" wait_count=10,\n",
" startup_cost=\"5s\",\n",
" scale_factor=1.2);"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Client</h3>\n",
"<ul>\n",
" <li><b>Scheduler: </b>tcp://10.48.74.164:40711\n",
" <li><b>Dashboard: </b><a href='/user/willirath-pange-ample-notebooks-yxzvs48p/proxy/8787/status' target='_blank'>/user/willirath-pange-ample-notebooks-yxzvs48p/proxy/8787/status</a>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3>Cluster</h3>\n",
"<ul>\n",
" <li><b>Workers: </b>0</li>\n",
" <li><b>Cores: </b>0</li>\n",
" <li><b>Memory: </b>0 B</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: scheduler='tcp://10.48.74.164:40711' processes=0 cores=0>"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from dask.distributed import Client\n",
"c = Client(cluster)\n",
"c"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
    "(Check the dashboard to see the cluster scale up and down!)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## The actual calculations\n",
"\n",
"We loop over volumes of 50 GB, 100 GB, 200 GB, ..., 3200 GB of double-precision random numbers and estimate $\\pi$ as described above."
Review comment (Member): Maybe run it first with a small size, just so the user can validate and try the method. And then loop, but maybe not up to 3200 GB?

]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import dask.array as da\n",
"import numpy as np\n",
"from time import time\n",
"\n",
"def calc_pi_mc(size):\n",
Review comment (Member): It would be good to first describe the pi estimation step by step; that would be very interesting. Then, at the end, put it all inside a function to perform the scalability analysis.

    "    # Each (x, y) pair is two float64 values (8 bytes each), so\n",
    "    # `size` bytes hold size / 8 / 2 pairs; chunks hold 500 MB each.\n",
    "    xy = da.random.uniform(0, 1,\n",
    "                           size=(int(size / 8 / 2), 2),\n",
    "                           chunks=(int(500e6 / 8), 2))\n",
    "    \n",
    "    # The fraction of points with x^2 + y^2 < 1 approximates pi / 4.\n",
    "    in_circle = ((xy ** 2).sum(axis=-1) < 1)\n",
    "    pi = 4 * in_circle.mean()\n",
"\n",
" start = time()\n",
" pi = pi.compute()\n",
" end = time()\n",
" \n",
" num_workers = len(cluster.scheduler.workers)\n",
" \n",
" print(\"Size of data:\", xy.nbytes / 1e9, \"GB\")\n",
" print(\"Monte-Carlo pi:\", pi)\n",
" print(\"Numpys pi:\", np.pi)\n",
" print(\"Delta:\", abs(pi - np.pi))\n",
" print(\"Duration: {:.2f} seconds with {} workers\".format(\n",
" end - start, num_workers))\n",
" print()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Size of data: 50.0 GB\n",
"Monte-Carlo pi: 3.1415611776\n",
"Numpys pi: 3.141592653589793\n",
"Delta: 3.147598979325039e-05\n",
"Duration: 21.82 seconds with 13 workers\n",
"\n",
"Size of data: 100.0 GB\n",
"Monte-Carlo pi: 3.14157815872\n",
"Numpys pi: 3.141592653589793\n",
"Delta: 1.4494869793324483e-05\n",
"Duration: 16.73 seconds with 25 workers\n",
"\n",
"Size of data: 200.0 GB\n",
"Monte-Carlo pi: 3.14158670528\n",
"Numpys pi: 3.141592653589793\n",
"Delta: 5.94830979316896e-06\n",
"Duration: 15.79 seconds with 53 workers\n",
"\n",
"Size of data: 400.0 GB\n",
"Monte-Carlo pi: 3.14159057072\n",
"Numpys pi: 3.141592653589793\n",
"Delta: 2.0828697930852513e-06\n",
"Duration: 16.94 seconds with 122 workers\n",
"\n",
"Size of data: 800.0 GB\n",
"Monte-Carlo pi: 3.14159497536\n",
"Numpys pi: 3.141592653589793\n",
"Delta: 2.321770206759055e-06\n",
"Duration: 21.84 seconds with 241 workers\n",
"\n",
"Size of data: 1600.0 GB\n",
"Monte-Carlo pi: 3.14159482016\n",
"Numpys pi: 3.141592653589793\n",
"Delta: 2.1665702067963366e-06\n",
"Duration: 26.89 seconds with 400 workers\n",
"\n",
"Size of data: 3200.0 GB\n",
"Monte-Carlo pi: 3.14159423066\n",
"Numpys pi: 3.141592653589793\n",
"Delta: 1.577070206870701e-06\n",
"Duration: 45.50 seconds with 392 workers\n",
"\n"
]
}
],
"source": [
"from time import sleep\n",
"\n",
"for size in [1e9 * n for n in [50, 100, 200, 400,\n",
" 800, 1600, 3200]]:\n",
" \n",
" calc_pi_mc(size)\n",
" sleep(30) # allow for some scale-down time"
]
},
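The deltas in the output above shrink roughly like $1/\sqrt{N}$, as expected: each point is a Bernoulli trial with success probability $p = \pi/4$, so the standard error of the estimate $4 \cdot N_{circ} / N$ is $4\sqrt{p(1-p)/N}$. A back-of-envelope check against the 50 GB run (a sketch added here, not part of the notebook):

```python
import math

p = math.pi / 4            # probability a point falls in the quarter circle
n_points = 50e9 / (8 * 2)  # 50 GB of float64 (x, y) pairs
std_err = 4 * math.sqrt(p * (1 - p) / n_points)
print(std_err)             # ~2.9e-5, same order as the observed delta ~3.1e-5
```

Quadrupling the data volume should therefore halve the typical error, which matches the trend in the recorded deltas.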
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Docstring of Adaptive"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from dask.distributed import Adaptive"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"\u001b[0;31mInit signature:\u001b[0m \u001b[0mAdaptive\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mscheduler\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mcluster\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0minterval\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m'1s'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mstartup_cost\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m'1s'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mscale_factor\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m2\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mminimum\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m0\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mmaximum\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;32mNone\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mwait_count\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m3\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mtarget_duration\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m'5s'\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mworker_key\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m<\u001b[0m\u001b[0mfunction\u001b[0m \u001b[0mAdaptive\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m<\u001b[0m\u001b[0;32mlambda\u001b[0m\u001b[0;34m>\u001b[0m \u001b[0mat\u001b[0m \u001b[0;36m0x7fcbc0584e18\u001b[0m\u001b[0;34m>\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkwargs\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
"\u001b[0;31mDocstring:\u001b[0m \n",
"Adaptively allocate workers based on scheduler load. A superclass.\n",
"\n",
"Contains logic to dynamically resize a Dask cluster based on current use.\n",
"This class needs to be paired with a system that can create and destroy\n",
"Dask workers using a cluster resource manager. Typically it is built into\n",
"already existing solutions, rather than used directly by users.\n",
"It is most commonly used from the ``.adapt(...)`` method of various Dask\n",
"cluster classes.\n",
"\n",
"Parameters\n",
"----------\n",
"scheduler: distributed.Scheduler\n",
"cluster: object\n",
" Must have scale_up and scale_down methods/coroutines\n",
"startup_cost : timedelta or str, default \"1s\"\n",
    "    Estimate of how costly it is to start an additional worker.\n",
    "    Affects how quickly the cluster adapts to high task-per-worker loads.\n",
"interval : timedelta or str, default \"1000 ms\"\n",
" Milliseconds between checks\n",
"wait_count: int, default 3\n",
" Number of consecutive times that a worker should be suggested for\n",
" removal before we remove it.\n",
"scale_factor : int, default 2\n",
" Factor to scale by when it's determined additional workers are needed\n",
"target_duration: timedelta or str, default \"5s\"\n",
" Amount of time we want a computation to take.\n",
" This affects how aggressively we scale up.\n",
"worker_key: Callable[WorkerState]\n",
" Function to group workers together when scaling down\n",
" See Scheduler.workers_to_close for more information\n",
"minimum: int\n",
" Minimum number of workers to keep around\n",
"maximum: int\n",
" Maximum number of workers to keep around\n",
"**kwargs:\n",
" Extra parameters to pass to Scheduler.workers_to_close\n",
"\n",
"Examples\n",
"--------\n",
"\n",
"This is commonly used from existing Dask classes, like KubeCluster\n",
"\n",
">>> from dask_kubernetes import KubeCluster\n",
">>> cluster = KubeCluster()\n",
">>> cluster.adapt(minimum=10, maximum=100)\n",
"\n",
"Alternatively you can use it from your own Cluster class by subclassing\n",
"from Dask's Cluster superclass\n",
"\n",
">>> from distributed.deploy import Cluster\n",
">>> class MyCluster(Cluster):\n",
"... def scale_up(self, n):\n",
"... \"\"\" Bring worker count up to n \"\"\"\n",
"... def scale_down(self, workers):\n",
"... \"\"\" Remove worker addresses from cluster \"\"\"\n",
"\n",
">>> cluster = MyCluster()\n",
">>> cluster.adapt(minimum=10, maximum=100)\n",
"\n",
"Notes\n",
"-----\n",
"Subclasses can override :meth:`Adaptive.should_scale_up` and\n",
":meth:`Adaptive.workers_to_close` to control when the cluster should be\n",
"resized. The default implementation checks if there are too many tasks\n",
"per worker or too little memory available (see :meth:`Adaptive.needs_cpu`\n",
"and :meth:`Adaptive.needs_memory`).\n",
"\n",
":meth:`Adaptive.get_scale_up_kwargs` method controls the arguments passed to\n",
"the cluster's ``scale_up`` method.\n",
"\u001b[0;31mFile:\u001b[0m /srv/conda/lib/python3.6/site-packages/distributed/deploy/adaptive.py\n",
"\u001b[0;31mType:\u001b[0m type\n"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"Adaptive?"
]
}
],
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.6"
}
},
"nbformat": 4,
"nbformat_minor": 2
}