Skip to content

Commit

Permalink
Updated with new sections
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobceles authored May 16, 2020
1 parent 9e10683 commit 552864a
Showing 1 changed file with 212 additions and 4 deletions.
216 changes: 212 additions & 4 deletions pyspark/Colab and PySpark.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
"provenance": [],
"collapsed_sections": [
"YR1CO3FTqO5h",
"K_q3Yzc9qYc0",
"5gs9JXCWqb9s",
"NhM3wLG2qhlN",
"1NJMWs4NqnlF",
Expand Down Expand Up @@ -46,10 +45,21 @@
"oVwGYAZZiyGV",
"f3crkAQVlxKp",
"vIbXZT29JxmG",
"snACMwZug5Yn",
"dABRu9eokZxw",
"xrgbAiZHnq_U",
"244El832wT8f",
"H5cFCvbczHz_"
"H5cFCvbczHz_",
"rdGARl-D3n-l",
"aaxfGqYZ6Iqz",
"BG09dDdL6Tvt",
"CJlmPbLYKKFA",
"r-R5ijHrKSg0",
"UOvEVcieVn7e",
"2KY1mxZVfNsl",
"qwEBu3T3EbfD",
"-HxUYv77EdSv",
"NZ5xsdWmNOVz"
]
},
"kernelspec": {
Expand Down Expand Up @@ -156,6 +166,8 @@
" <ul>\n",
" <li><a href=\"#emr-sizing\">EMR Sizing</a></li>\n",
" <li><a href=\"#spark-configurations\">Spark Configurations</a></li>\n",
" <li><a href=\"#job-tuning\">Job Tuning</a>\n",
" <li><a href=\"#best-practices\">Best Practices</a>\n",
" </ul>\n",
" </li>\n",
" </ol>\n",
Expand Down Expand Up @@ -326,7 +338,7 @@
"In the words of Google: <br>\n",
"`Colaboratory, or “Colab” for short, is a product from Google Research. Colab allows anybody to write and execute arbitrary python code through the browser, and is especially well suited to machine learning, data analysis and education. More technically, Colab is a hosted Jupyter notebook service that requires no setup to use, while providing free access to computing resources including GPUs.`\n",
"\n",
"The reason why I used colab is because of its shareability and free GPU. Yeah you read that right. A FREE GPU! Additionally, it helps use different Google services conveniently. It saves to Google Drive and all the services are very closely related. I recommend you go through the offical [overview documentation](https://colab.research.google.com/notebooks/basic_features_overview.ipynb) if you want to know more about it.\n",
"The reason why I used colab is because of its shareability and free GPU and TPU. Yeah you read that right, FREE GPU AND TPU! For using TPU, your program needs to be optimized for the same. Additionally, it helps use different Google services conveniently. It saves to Google Drive and all the services are very closely related. I recommend you go through the offical [overview documentation](https://colab.research.google.com/notebooks/basic_features_overview.ipynb) if you want to know more about it.\n",
"If you have more questions about colab, please [refer this link](https://research.google.com/colaboratory/faq.html).\n",
"\n",
">While using a colab notebook, you will need an active internet connection to keep a session alive. If you lose the connection you will have to download the datasets again."
Expand Down Expand Up @@ -3887,9 +3899,10 @@
"\n",
">Spark Performance Tuning refers to the process of adjusting settings to record for memory, cores, and instances used by the system. This process guarantees that the Spark has a flawless performance and also prevents bottlenecking of resources in Spark.\n",
"\n",
"Considering you are using Amazon EMR to execute your spark jobs, there are two aspects you need to take care of:\n",
"Considering you are using Amazon EMR to execute your spark jobs, there are three aspects you need to take care of:\n",
"1. EMR Sizing\n",
"2. Spark Configurations\n",
"3. Job Tuning\n",
"\n"
]
},
Expand Down Expand Up @@ -4153,6 +4166,201 @@
"\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "CJlmPbLYKKFA",
"colab_type": "text"
},
"source": [
"<a id='job-tuning'></a>\n",
"### Job Tuning"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "vccmILvVWewW",
"colab_type": "text"
},
"source": [
"Apart from EMR and Spark tuning, there is another way to approach opttimizations, and that is by tuning your job itself to produce results efficently. I will be going over some such techniques which will help you achieve this. The [Spark Programming Guide](https://spark.apache.org/docs/2.1.1/programming-guide.html) talks more about these concepts in detail. If you guys prefer watching a video over reading, I highly recommend [A Deep Dive into Proper Optimization for Spark Jobs](https://youtu.be/daXEp4HmS-E) by Daniel Tomes from Databricks, which I found really useful and informative!"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "r-R5ijHrKSg0",
"colab_type": "text"
},
"source": [
"#### Broadcast Joins (Broadcast Hash Join)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "dvO0z5EpM5U8",
"colab_type": "text"
},
"source": [
"For some jobs, the efficenecy can be increased by caching them in memory. Broadcast Hash Join(BHJ) is such a technique which will help you optimize join queries when the size of one side of the data is low.\n",
">BroadCast joins are the fastest but the drawaback is that it will consume more memory on both the executor and driver.\n",
"\n",
"This following steps give a sneak peek into how it works, which will help you understand the use cases where it can be used:<br>\n",
"1. Input file(smaller of the two tables) to be broadcasted is read by the executors in parallel into its working memory.\n",
"2. All the data from the executors is collected into driver (Hence, the need for higher memory at driver).\n",
"3. The driver then broadcasts the combined dataset (full copy) into each executor.\n",
"4. The size of the broadcasted dataset could be several (10-20+) times bigger the input in memory due to factors like deserialization.\n",
"5. Executors will end up storing the parts it read first, and also the full copy, thereby leading to a high memory requirement.\n",
"\n",
"Some things to keep in mind about BHJ:\n",
"1. It is advisable to use broadcast joins on small datasets only (dimesnion table, for example).\n",
"2. Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join) support BHJ.\n",
"3. You could notice skews in tasks due to uneven partition sizes; especially during aggregations, joins etc. This can be evened out by introducing Salt value (random value).<br>*Suggested formula for salt value:* random(0 – (shuffle partition count – 1))\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "UOvEVcieVn7e",
"colab_type": "text"
},
"source": [
"#### Spark Partitions"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ocpQiqOtVqPz",
"colab_type": "text"
},
"source": [
"A partition in spark is an atomic chunk of data (logical division of data) stored on a node in the cluster. Partitions are the basic units of parallelism in Spark. Having too large a number of partitions or too few is not an ideal solution. The number of partitions in spark should be decided based on the cluster configuration and requirements of the application. Increasing the number of partitions will make each partition have less data or no data at all. Generally, spark partitioning can be broken down in three ways:\n",
"1. Input\n",
"2. Shuffle\n",
"3. Output\n"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "2KY1mxZVfNsl",
"colab_type": "text"
},
"source": [
"##### Input"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "dKw48h9eEbPa",
"colab_type": "text"
},
"source": [
"Spark usually does a good job of figuring the ideal configuration for this one, except in very particular cases. It is advisable to use the spark default unless:\n",
"1. Increase parallelism\n",
"2. Heavily nested data\n",
"3. Generating data (explode)\n",
"4. Source is not optimal\n",
"5. You are using UDFs\n",
"\n",
"`spark.sql.files.maxpartitionBytes`: This property indicates the maximum number of bytes to pack into a single partition when reading files (Default 128 MB) . Use this to increase the parallelism in reading input data. For example, if you have more cores, then you can increase the number of parallel tasks which will ensure usage of the all the cores of the cluster, and increase the speed of the task."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "qwEBu3T3EbfD",
"colab_type": "text"
},
"source": [
"##### Shuffle"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "Wx0iQUpFEbus",
"colab_type": "text"
},
"source": [
"One of the major reason why most jobs lags in performance is, for the majority of the time, because they get the shuffle partitions count wrong. By default, the value is set to 200. In almost all situations, this is not ideal. If you are dealing with shuffle satge of less than 20 GB, 200 is fine, but otherwise this needs to be changed. For most cases, you can use the following equation to find the right value:\n",
">`Partition Count = Stage Input Data / Target Size` where <br>\n",
"`Largest Shuffle Stage (Target Size) < 200MB/partition` in most cases.<br>\n",
"`spark.sql.shuffle.partitions` property is used to set the ideal partition count value.\n",
"\n",
"If you ever notice that target size at the range of TBs, there is something terribly wrong, and you might want to change it back to 200, or recalculate it. Shuffle partitions can be configured for every action (not transformation) in the spark script.\n",
"\n",
"Let us use an example to explain this scenario: <br>\n",
"Assume shuffle stage input = 210 GB. <br>\n",
"Partition Count = Stage Input Data / Target Size = 210000 MB/200 MB = 1050. <br>\n",
"As you can see, my shuffle partitions should be 1050, not 200.\n",
"\n",
"But, if your cluster has 2000 cores, then set your shuffle partitions to 2000.\n",
">In a large cluster dealing with a large data job, never set your shuffle partitions less than your total core count. \n",
"\n",
"\n",
"\n",
"Shuffle stages almost always precede the write stages and having high shuffle partition count creates small files in the output. To address this, use localCheckPoint just before write & do a coalesce call. This localCheckPoint writes the Shuffle Partition to executor local disk and then coalesces into lower partition count and hence improves the overall performance of both shuffle stage and write stage."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "-HxUYv77EdSv",
"colab_type": "text"
},
"source": [
"##### Output"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "CPA6YRYrEdgG",
"colab_type": "text"
},
"source": [
"There are different methods to write the data. You can control the size, composition, number of files in the output and even the number of records in each file while writing the data. While writing the data, you can increase the parallelism, thereby ensuring you use all the resources that you have. But this approach would lead to a larger number of smaller files. Usually, this isn't a problem, but if you want bigger files, you will have to use one of the compaction techniques, preferably in a cluster with lesser configuration. There are multiple ways to change the composition of the output. Keep these two in mind about composition:\n",
"1. Coalesce: Use this to reduce the number of partitions.\n",
"2. Repartition: Use this very rarely, and never to reduce the number of partitions<br>\n",
" a. Range Paritioner - It partitions the data either based on some sorted order OR set of sorted ranges of keys. <br>\n",
" b. Hash Partioner - It spreads around the data in the partitioning based upon the key value. Hash partitioning can make distributed data skewed."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NZ5xsdWmNOVz",
"colab_type": "text"
},
"source": [
"<a id='best-practices'></a>\n",
"### Best Practices"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NUKLZ8G8NVuR",
"colab_type": "text"
},
"source": [
"Try to incorporate these to your coding habits for better performance:\n",
"1. Do not use NOT IN use NOT EXISTS.\n",
"2. Remove Counts, Distinct Counts (use approxCountDIstinct).\n",
"3. Drop Duplicates early.\n",
"4. Always prefer SQL functions over PandasUDF.\n",
"5. Use Hive partitions effectively.\n",
"6. Leverage Spark UI effectively. Avoid Shuffle Spills.\n",
"7. Drop Duplicates early\n",
"8. Aim for target cluster utilization of atleast 70%\n",
"\n"
]
}
]
}

0 comments on commit 552864a

Please sign in to comment.