Distributed training #8

Open · wants to merge 6 commits into base: main
8 changes: 8 additions & 0 deletions configs/distributed-training/aws.yaml
@@ -0,0 +1,8 @@
head_node_type:
  name: head_node_type
  instance_type: m4.2xlarge
worker_node_types:
- name: gpu_worker
  instance_type: g4dn.xlarge
  min_workers: 1
  max_workers: 3

Contributor comment on lines +7 to +8: I don't think these values really make sense with the current training script. If we are showing distributed training with 2 GPUs, I think we should either have min_workers be 2 (to make the script run immediately) or 1 (if we want to show autoscaling).

8 changes: 8 additions & 0 deletions configs/distributed-training/gce.yaml
@@ -0,0 +1,8 @@
head_node_type:
  name: head_node_type
  instance_type: n2-standard-8
worker_node_types:
- name: gpu_worker
  instance_type: n1-standard-4-nvidia-t4-16gb-1
  min_workers: 1
  max_workers: 3
82 changes: 82 additions & 0 deletions templates/distributed-training/README.md
@@ -0,0 +1,82 @@
# Distributed Training With PyTorch on Fashion MNIST

In this tutorial you will train models on the Fashion MNIST dataset using PyTorch.


| Details | Description |
| ---------------------- | ----------- |
| Summary | This tutorial demonstrates how to set up distributed training with PyTorch using the Fashion MNIST dataset and run it on Anyscale. |
| Time to Run | Less than 5 minutes |
| Compute Requirements | We recommend at least 1 GPU node. The default configuration scales up to 3 worker nodes, each with 1 NVIDIA T4 GPU. |
| Cluster Environment | This template uses a docker image built on top of the latest Anyscale-provided Ray image using Python 3.10, which comes with PyTorch: [`anyscale/ray-ml:2.5.1-py310-gpu`](https://docs.anyscale.com/reference/base-images/overview). See the appendix below for more details. |

## Running the tutorial
### Background
The tutorial is run from an Anyscale Workspace. Anyscale Workspaces is a fully managed development environment that enables ML practitioners to build distributed Ray applications and advance from research to development to production easily, all within a single environment. The Workspace provides developer-friendly tools like VSCode and Jupyter, backed by a remote, scaling Ray cluster for development.

Anyscale requires 2 configs to start up a Workspace Cluster:
1. A cluster environment that handles dependencies.
2. A compute configuration that determines how many nodes of each type to bring up. This also configures how many nodes are available for autoscaling.

Both configs have been set by default in this tutorial; editing and updating them, if needed, is covered in the appendix.

### Run
The tutorial includes a Python script with the code for distributed training. You can execute this training script directly from the Workspace terminal.

To run:
```bash
python pytorch.py
```

Contributor comment: Suggest having a slightly more descriptive name here, e.g. `train_torch_model.py`.

You'll see training iterations and metrics as the training executes.

### Monitor
After launching the script, you can look at the Ray dashboard. It can be accessed from the Workspace home page and enables users to track things like CPU/GPU utilization, GPU memory usage, remote task statuses, and more!

![Dash](https://github.com/anyscale/templates/releases/download/media/workspacedash.png)

Contributor comment: This image is highlighting VSCode 😅


[See here for more extensive documentation on the dashboard.](https://docs.ray.io/en/latest/ray-observability/getting-started.html)

### Model Saving
The model will be saved in the Anyscale Artifact Store, which is automatically set up and configured with your Anyscale deployment.

Contributor comment: Can we point to Anyscale documentation here? I feel like this is introducing a new concept for something that should be more simple (a cloud storage bucket).


For every Anyscale Cloud, a default object storage bucket is configured during the Cloud deployment. All the Workspaces, Jobs, and Services Clusters within an Anyscale Cloud have permission to read and write to its default bucket.

Use the following environment variables to access the default bucket:

1. `ANYSCALE_CLOUD_STORAGE_BUCKET`: the name of the bucket.
2. `ANYSCALE_CLOUD_STORAGE_BUCKET_REGION`: the region of the bucket.
3. `ANYSCALE_ARTIFACT_STORAGE`: the URI to the pre-generated folder for storing your artifacts while keeping them separate from Anyscale-generated ones.
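
For example, a minimal sketch of reading these variables from Python (the training script below builds its storage path the same way; the `pytorch-tutorial` suffix is just this template's convention):

```python
import os

# Anyscale injects these variables into every Workspace, Job, and Service cluster.
bucket = os.environ["ANYSCALE_CLOUD_STORAGE_BUCKET"]
region = os.environ["ANYSCALE_CLOUD_STORAGE_BUCKET_REGION"]
artifact_root = os.environ["ANYSCALE_ARTIFACT_STORAGE"]

# Keep this tutorial's artifacts under their own prefix.
storage_path = artifact_root + "/pytorch-tutorial"
print(f"Bucket {bucket} ({region}); storing artifacts under {storage_path}")
```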


You can view the saved model and artifacts by running:
```bash
aws s3 ls $ANYSCALE_ARTIFACT_STORAGE
```

Or, if you are on GCP:
```bash
gsutil ls $ANYSCALE_ARTIFACT_STORAGE
```
Authentication is automatcially handled by default.

Contributor suggested change: "Authentication is automatcially handled by default." → "Authentication is automatically handled by default."


### Submit as Anyscale Production Job
From within your Anyscale Workspace, you can run your script as an Anyscale Job. This is useful if you want to run a long-running workload in production. Each Anyscale Job spins up its own cluster (with the same compute config and cluster environment as the Workspace) and runs the script. The Anyscale Job automatically retries in the event of failure and provides monitoring via the Ray Dashboard and Grafana.

To submit as a Production Job you can run:

```bash
anyscale job submit -- python pytorch.py
```

Contributor comment: Could we have consistency in the naming? We use "Anyscale Production Job", "Anyscale Job", and "Production Job" here - it may not be obvious to the user that all three of these combinations are meant to be the same thing 😄

[You can learn more about Anyscale Jobs here.](https://docs.anyscale.com/productionize/jobs/get-started)

### Next Steps

#### Training on your own data: Modifying the Script
You can easily modify the script in VSCode or Jupyter to use your own data, add data pre-processing logic, or change the model architecture! Read more about loading data with Ray [from your file store or database here](https://docs.ray.io/en/latest/data/loading-data.html).
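
As a minimal sketch (the bucket path here is a placeholder, not part of this template), loading your own tabular data with Ray Data could look like:

```python
import ray

# Placeholder path: point this at your own files in S3, GCS, or another file store.
ds = ray.data.read_csv("s3://your-bucket/your-training-data/")

# Inspect the schema and a few rows before wiring the dataset into train_func.
print(ds.schema())
print(ds.take(5))
```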

Once the code is updated, run the same command as before to kick off your training job:
```bash
anyscale job submit -- python pytorch.py
```
159 changes: 159 additions & 0 deletions templates/distributed-training/pytorch.py
@@ -0,0 +1,159 @@
import argparse
from typing import Dict
from ray.air import session
import os

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

import ray.train as train
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
from ray.air.config import RunConfig


# Download training data from open datasets.
training_data = datasets.FashionMNIST(
    root="~/data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Download test data from open datasets.
test_data = datasets.FashionMNIST(
    root="~/data",
    train=False,
    download=True,
    transform=ToTensor(),
)


# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


def train_epoch(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset) // session.get_world_size()
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")


def validate_epoch(dataloader, model, loss_fn):
    size = len(dataloader.dataset) // session.get_world_size()
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    return test_loss


def train_func(config: Dict):
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["epochs"]

    worker_batch_size = batch_size // session.get_world_size()

    # Create data loaders.
    train_dataloader = DataLoader(training_data, batch_size=worker_batch_size)
    test_dataloader = DataLoader(test_data, batch_size=worker_batch_size)

    train_dataloader = train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = train.torch.prepare_data_loader(test_dataloader)

    # Create model.
    model = NeuralNetwork()
    model = train.torch.prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for _ in range(epochs):
        train_epoch(train_dataloader, model, loss_fn, optimizer)
        loss = validate_epoch(test_dataloader, model, loss_fn)
        session.report(dict(loss=loss))

Contributor comment: We should save a checkpoint here 😄
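
A minimal sketch of what that could look like with Ray AIR checkpoints (an assumption, not part of this PR; `model.module` is used because `prepare_model` typically wraps the model in DistributedDataParallel):

```python
from ray.air.checkpoint import Checkpoint

# Inside the epoch loop, attach a checkpoint to the reported metrics.
state_dict = (
    model.module.state_dict() if hasattr(model, "module") else model.state_dict()
)
checkpoint = Checkpoint.from_dict({"model": state_dict})
session.report(dict(loss=loss), checkpoint=checkpoint)
```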


artifact_storage_path = os.environ['ANYSCALE_ARTIFACT_STORAGE'] + '/pytorch-tutorial'
print(f"Storing artifacts in {artifact_storage_path}")

def train_fashion_mnist(num_workers=2, use_gpu=True):
    trainer = TorchTrainer(
        train_loop_per_worker=train_func,
        train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": 4},
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        run_config=RunConfig(storage_path=artifact_storage_path),
    )
    result = trainer.fit()
    print(f"Last result: {result.metrics}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--num-workers",
        "-n",
        type=int,
        default=2,
        help="Sets number of workers for training.",
    )
    parser.add_argument(
        "--use-gpu", action="store_true", default=True, help="Enables GPU training"
    )

Contributor comment on lines +144 to +146: I think this makes it so that this value is always true?
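
One possible fix, sketched here as an assumption rather than part of the PR, is to expose an explicit off switch so the value can actually become false (Python 3.9+):

```python
# Paired --use-gpu / --no-use-gpu flags instead of a store_true that can never
# become False because of default=True.
parser.add_argument(
    "--use-gpu",
    action=argparse.BooleanOptionalAction,
    default=True,
    help="Enables GPU training",
)
```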

    parser.add_argument(
        "--smoke-test",
        action="store_true",
        default=False,
        help="Finish quickly for testing.",
    )

Contributor comment on lines +147 to +152: This isn't used.
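
A hypothetical way to wire the flag up (not part of this PR) is to fall back to a minimal single-worker CPU run so the script finishes quickly for testing, for example:

```python
# Hypothetical use of --smoke-test: run one CPU worker so the script
# completes quickly in CI; otherwise use the parsed CLI settings.
if args.smoke_test:
    train_fashion_mnist(num_workers=1, use_gpu=False)
else:
    train_fashion_mnist(num_workers=args.num_workers, use_gpu=args.use_gpu)
```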


    args, _ = parser.parse_known_args()

    import ray

    train_fashion_mnist(num_workers=args.num_workers, use_gpu=args.use_gpu)