Distributed training #8
base: main
@@ -0,0 +1,8 @@
head_node_type:
  name: head_node_type
  instance_type: m4.2xlarge
worker_node_types:
- name: gpu_worker
  instance_type: g4dn.xlarge
  min_workers: 1
  max_workers: 3
@@ -0,0 +1,8 @@
head_node_type:
  name: head_node_type
  instance_type: n2-standard-8
worker_node_types:
- name: gpu_worker
  instance_type: n1-standard-4-nvidia-t4-16gb-1
  min_workers: 1
  max_workers: 3
@@ -0,0 +1,82 @@
# Distributed Training With PyTorch on Fashion MNIST

In this tutorial you will train models on the Fashion MNIST dataset using PyTorch.

| Details | Description |
| ---------------------- | ----------- |
| Summary | This tutorial demonstrates how to set up distributed training with PyTorch using the Fashion MNIST dataset and run it on Anyscale. |
| Time to Run | Less than 5 minutes |
| Compute Requirements | We recommend at least 1 GPU node. The default will scale up to 3 worker nodes, each with 1 NVIDIA T4 GPU. |
| Cluster Environment | This template uses a Docker image built on top of the latest Anyscale-provided Ray image using Python 3.10, which comes with PyTorch: [`anyscale/ray-ml:2.5.1-py310-gpu`](https://docs.anyscale.com/reference/base-images/overview). See the appendix below for more details. |

## Running the tutorial
### Background
The tutorial is run from an Anyscale Workspace. Anyscale Workspaces is a fully managed development environment that enables ML practitioners to build distributed Ray applications and advance from research to development to production easily, all within a single environment. The Workspace provides developer-friendly tools like VSCode and Jupyter, backed by a remote, scaling Ray Cluster for development.

Anyscale requires 2 configs to start up a Workspace Cluster:
1. A cluster environment that handles dependencies.
2. A compute configuration that determines how many nodes of each type to bring up. This also configures how many nodes are available for autoscaling.

Both have been set by default in this tutorial but can be edited and updated if needed, as covered in the appendix.

### Run
The tutorial includes a Python script with the code to do distributed training. You can execute this training script directly from the workspace terminal.

To run:
```bash
python pytorch.py
```

Review comment: Suggest having a slightly more descriptive name here, e.g.

You'll see training iterations and metrics as the training executes.

### Monitor
After launching the script, you can look at the Ray dashboard. It can be accessed from the Workspace home page and enables users to track things like CPU/GPU utilization, GPU memory usage, remote task statuses, and more!

![Dash](https://github.com/anyscale/templates/releases/download/media/workspacedash.png)

Review comment: This image is highlighting VSCode 😅

[See here for more extensive documentation on the dashboard.](https://docs.ray.io/en/latest/ray-observability/getting-started.html)

### Model Saving
The model will be saved in the Anyscale Artifact Store, which is automatically set up and configured with your Anyscale deployment.

Review comment: Can we point to Anyscale documentation here? I feel like this is introducing a new concept for something that should be more simple (a cloud storage bucket).

For every Anyscale Cloud, a default object storage bucket is configured during the Cloud deployment. All the Workspaces, Jobs, and Services Clusters within an Anyscale Cloud have permission to read and write to its default bucket.

Use the following environment variables to access the default bucket:

1. ANYSCALE_CLOUD_STORAGE_BUCKET: the name of the bucket.
2. ANYSCALE_CLOUD_STORAGE_BUCKET_REGION: the region of the bucket.
3. ANYSCALE_ARTIFACT_STORAGE: the URI to the pre-generated folder for storing your artifacts while keeping them separate from Anyscale-generated ones.
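
For example, a minimal Python sketch of reading these variables and building a destination for your own artifacts (the `my-experiment` subfolder is just an illustration):

```python
import os

# Default bucket and region configured for this Anyscale Cloud.
bucket = os.environ["ANYSCALE_CLOUD_STORAGE_BUCKET"]
region = os.environ["ANYSCALE_CLOUD_STORAGE_BUCKET_REGION"]

# Build a URI under the pre-generated artifact folder; the subfolder name is arbitrary.
artifact_path = os.environ["ANYSCALE_ARTIFACT_STORAGE"] + "/my-experiment"
print(f"Bucket: {bucket} ({region}); artifacts will be stored under {artifact_path}")
```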

You can view the saved model and artifacts by running:
```bash
aws s3 ls $ANYSCALE_ARTIFACT_STORAGE
```

Or, if you are on GCP:
```bash
gsutil ls $ANYSCALE_ARTIFACT_STORAGE
```
Authentication is automatically handled by default.

### Submit as Anyscale Production Job
From within your Anyscale Workspace, you can run your script as an Anyscale Job. This might be useful if you want to run things in production and have a long-running job. Note that each Anyscale Job will spin up its own cluster (with the same compute config and cluster environment as the Workspace) and run the script. The Anyscale Job will automatically retry in the event of failure and provides monitoring via the Ray Dashboard and Grafana.

To submit as a Production Job you can run:

Review comment: Could we have consistency in the naming? We use "Anyscale Production Job", "Anyscale Job", and "Production Job" here - it may not be obvious to the user that all three of these combinations are meant to be the same thing 😄

```bash
anyscale job submit -- python pytorch.py
```

[You can learn more about Anyscale Jobs here.](https://docs.anyscale.com/productionize/jobs/get-started)

### Next Steps

#### Training on your own data: Modifying the Script
You can easily modify the script in VSCode or Jupyter to use your own data, add data pre-processing logic, or change the model architecture! Read more about loading data with Ray [from your file store or database here](https://docs.ray.io/en/latest/data/loading-data.html).
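
For example, a minimal sketch of loading your own data with Ray Data (the bucket path below is hypothetical; replace it with your own location):

```python
import ray

# Hypothetical path -- point this at your own dataset in S3, GCS, or local storage.
ds = ray.data.read_csv("s3://your-bucket/path/to/your-data.csv")

print(ds.schema())  # inspect the inferred columns
print(ds.take(5))   # peek at a few rows
```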

Once the code is updated, run the same command as before to kick off your training job:
```bash
anyscale job submit -- python pytorch.py
```

@@ -0,0 +1,159 @@
import argparse
from typing import Dict
from ray.air import session
import os

import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

import ray.train as train
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig
from ray.air.config import RunConfig


# Download training data from open datasets.
training_data = datasets.FashionMNIST(
    root="~/data",
    train=True,
    download=True,
    transform=ToTensor(),
)

# Download test data from open datasets.
test_data = datasets.FashionMNIST(
    root="~/data",
    train=False,
    download=True,
    transform=ToTensor(),
)


# Define model
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
            nn.ReLU(),
        )

    def forward(self, x):
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        return logits


def train_epoch(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset) // session.get_world_size()
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(X)
            print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]")


def validate_epoch(dataloader, model, loss_fn):
    size = len(dataloader.dataset) // session.get_world_size()
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(
        f"Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    return test_loss


def train_func(config: Dict):
    batch_size = config["batch_size"]
    lr = config["lr"]
    epochs = config["epochs"]

    worker_batch_size = batch_size // session.get_world_size()

    # Create data loaders.
    train_dataloader = DataLoader(training_data, batch_size=worker_batch_size)
    test_dataloader = DataLoader(test_data, batch_size=worker_batch_size)

    train_dataloader = train.torch.prepare_data_loader(train_dataloader)
    test_dataloader = train.torch.prepare_data_loader(test_dataloader)

    # Create model.
    model = NeuralNetwork()
    model = train.torch.prepare_model(model)

    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for _ in range(epochs):
        train_epoch(train_dataloader, model, loss_fn, optimizer)
        loss = validate_epoch(test_dataloader, model, loss_fn)
        session.report(dict(loss=loss))

Review comment: We should save a checkpoint here 😄
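
A possible sketch of that suggestion (illustrative only, not part of this diff), assuming Ray 2.5's AIR Checkpoint API: add `from ray.air import Checkpoint` to the imports and report a checkpoint from the epoch loop, for example:

    for _ in range(epochs):
        train_epoch(train_dataloader, model, loss_fn, optimizer)
        loss = validate_epoch(test_dataloader, model, loss_fn)
        # If prepare_model wrapped the model in DistributedDataParallel,
        # model.module.state_dict() may be preferable here.
        checkpoint = Checkpoint.from_dict({"model": model.state_dict()})
        session.report(dict(loss=loss), checkpoint=checkpoint)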

artifact_storage_path = os.environ['ANYSCALE_ARTIFACT_STORAGE'] + '/pytorch-tutorial'
print(f"Storing artifacts in {artifact_storage_path}")

def train_fashion_mnist(num_workers=2, use_gpu=True):
    trainer = TorchTrainer(
        train_loop_per_worker=train_func,
        train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": 4},
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        run_config=RunConfig(storage_path=artifact_storage_path),
    )
    result = trainer.fit()
    print(f"Last result: {result.metrics}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "--num-workers",
        "-n",
        type=int,
        default=2,
        help="Sets number of workers for training.",
    )
    parser.add_argument(
        "--use-gpu", action="store_true", default=True, help="Enables GPU training"
    )

Review comment (on lines +144 to +146): I think this makes it so that this value is always true?
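
A possible fix along those lines (illustrative only, not part of this diff), using argparse.BooleanOptionalAction (Python 3.9+) so the flag can also be turned off with --no-use-gpu:

    parser.add_argument(
        "--use-gpu",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Enables GPU training",
    )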

    parser.add_argument(
        "--smoke-test",
        action="store_true",
        default=False,
        help="Finish quickly for testing.",
    )

Review comment (on lines +147 to +152): This isn't used.

    args, _ = parser.parse_known_args()

    import ray

    train_fashion_mnist(num_workers=args.num_workers, use_gpu=args.use_gpu)

Review comment: I don't think these values really make sense with the current training script. If we are showing distributed training with 2 GPUs, I think we should either have min_workers be 2 (to make the script run immediately) or 1 (if we want to show autoscaling).