diff --git a/configs/distributed-training/aws.yaml b/configs/distributed-training/aws.yaml new file mode 100644 index 000000000..0ccfc874d --- /dev/null +++ b/configs/distributed-training/aws.yaml @@ -0,0 +1,8 @@ +head_node_type: + name: head_node_type + instance_type: m4.2xlarge +worker_node_types: +- name: gpu_worker + instance_type: g4dn.xlarge + min_workers: 1 + max_workers: 3 diff --git a/configs/distributed-training/gce.yaml b/configs/distributed-training/gce.yaml new file mode 100644 index 000000000..7fa075313 --- /dev/null +++ b/configs/distributed-training/gce.yaml @@ -0,0 +1,8 @@ +head_node_type: + name: head_node_type + instance_type: n2-standard-8 +worker_node_types: +- name: gpu_worker + instance_type: n1-standard-4-nvidia-t4-16gb-1 + min_workers: 1 + max_workers: 3 diff --git a/templates/distributed-training/README.md b/templates/distributed-training/README.md new file mode 100644 index 000000000..df012668c --- /dev/null +++ b/templates/distributed-training/README.md @@ -0,0 +1,82 @@ +# Distributed Training With PyTorch on Fashion MNIST + +In this tutorial you will train models on the Fashion MNIST dataset using PyTorch. + + +| Details | Description | +| ---------------------- | ----------- | +| Summary | This tutorial demonstrates how to set up distributed training with PyTorch using the MNIST dataset and run on Anyscale| +| Time to Run | Less than 5 minutes | +| Compute Requirements | We recommend at least 1 GPU node. The default will scale up to 3 worker nodes each with 1 NVIDIA T4 GPU. | +| Cluster Environment | This template uses a docker image built on top of the latest Anyscale-provided Ray image using Python 3.10, which comes with PyTorch: [`anyscale/ray-ml:2.5.1-py310-gpu`](https://docs.anyscale.com/reference/base-images/overview). See the appendix below for more details. | + +## Running the tutorial +### Background +The tutorial is run from an Anyscale Workspace. Anyscale Workspaces is a fully managed development environment that enables ML practitioners to build distributed Ray applications and advance from research to development to production easily, all within a single environment. The Workspace provides developer friendly tools like VSCode and Jupyter backed by a remote scaling Ray Cluster for development. + +Anyscale requires 2 configs to start up a Workspace Cluster: +1. A cluster environment that handles dependencies. +2. A compute configuration that determines how many nodes of each type to bring up. This also configures how many nodes are available for autoscaling. + +Those have been set by default in this tutorial but can be edited and updated if needed and is covered in the appendix. + +### Run +The tutorial includes a pyton script with the code to do distributed training. You can execute this training script directly from the workspace terminal. + +To run: +```bash +python pytorch.py +``` + +You'll see training iterations and metrics as the training executes. + +### Monitor +After launching the script, you can look at the Ray dashboard. It can be accessed from the Workspace home page and enables users to track things like CPU/GPU utilization, GPU memory usage, remote task statuses, and more! + +![Dash](https://github.com/anyscale/templates/releases/download/media/workspacedash.png) + +[See here for more extensive documentation on the dashboard.](https://docs.ray.io/en/latest/ray-observability/getting-started.html) + +### Model Saving +The model will be saved in the Anyscale Artifact Store, which is automatically set up and configured with your Anyscale deployment. + +For every Anyscale Cloud, a default object storage bucket is configured during the Cloud deployment. All the Workspaces, Jobs, and Services Clusters within an Anyscale Cloud have permission to read and write to its default bucket. + +Use the following environment variables to access the default bucket: + +1. ANYSCALE_CLOUD_STORAGE_BUCKET: the name of the bucket. +2. ANYSCALE_CLOUD_STORAGE_BUCKET_REGION: the region of the bucket. +3. ANYSCALE_ARTIFACT_STORAGE: the URI to the pre-generated folder for storing your artifacts while keeping them separate them from Anyscale-generated ones. + + +You can view the saved model and artifacts by running: +```bash +aws s3 ls $ANYSCALE_ARTIFACT_STORAGE +``` + +Or, if you are on GCP: +```bash +gsutil ls $ANYSCALE_ARTIFACT_STORAGE +``` +Authentication is automatcially handled by default. + +### Submit as Anyscale Production Job +From within your Anyscale Workspace, you can run your script as an Anyscale Job. This might be useful if you want to run things in production and have a long running job. You can test that each Anyscale Job will spin up its own cluster (with the same compute config and cluster environment as the Workspace) and run the script. The Anyscale Job will automatically retry in event of failure and provides monitoring via the Ray Dashboard and Grafana. + +To submit as a Production Job you can run: + +```bash +anyscale job submit -- python pytorch.py +``` + +[You can learn more about Anyscale Jobs here.](https://docs.anyscale.com/productionize/jobs/get-started) + +### Next Steps + +#### Training on your own data: Modifying the Script +You can easily modify the script in VSCode or Jupyter to use your own data, add data pre-processing logic, or change the model architecture! Read more about loading data with Ray [from your file store or database here](https://docs.ray.io/en/latest/data/loading-data.html). + +Once the code is updated, run the same command as before to kick off your training job: +```bash +anyscale job submit -- python pytorch.py +``` \ No newline at end of file diff --git a/templates/distributed-training/pytorch.py b/templates/distributed-training/pytorch.py new file mode 100644 index 000000000..d4b3c43a3 --- /dev/null +++ b/templates/distributed-training/pytorch.py @@ -0,0 +1,159 @@ +import argparse +from typing import Dict +from ray.air import session +import os + +import torch +from torch import nn +from torch.utils.data import DataLoader +from torchvision import datasets +from torchvision.transforms import ToTensor + +import ray.train as train +from ray.train.torch import TorchTrainer +from ray.air.config import ScalingConfig +from ray.air.config import RunConfig + + +# Download training data from open datasets. +training_data = datasets.FashionMNIST( + root="~/data", + train=True, + download=True, + transform=ToTensor(), +) + +# Download test data from open datasets. +test_data = datasets.FashionMNIST( + root="~/data", + train=False, + download=True, + transform=ToTensor(), +) + + +# Define model +class NeuralNetwork(nn.Module): + def __init__(self): + super(NeuralNetwork, self).__init__() + self.flatten = nn.Flatten() + self.linear_relu_stack = nn.Sequential( + nn.Linear(28 * 28, 512), + nn.ReLU(), + nn.Linear(512, 512), + nn.ReLU(), + nn.Linear(512, 10), + nn.ReLU(), + ) + + def forward(self, x): + x = self.flatten(x) + logits = self.linear_relu_stack(x) + return logits + + +def train_epoch(dataloader, model, loss_fn, optimizer): + size = len(dataloader.dataset) // session.get_world_size() + model.train() + for batch, (X, y) in enumerate(dataloader): + # Compute prediction error + pred = model(X) + loss = loss_fn(pred, y) + + # Backpropagation + optimizer.zero_grad() + loss.backward() + optimizer.step() + + if batch % 100 == 0: + loss, current = loss.item(), batch * len(X) + print(f"loss: {loss:>7f} [{current:>5d}/{size:>5d}]") + + +def validate_epoch(dataloader, model, loss_fn): + size = len(dataloader.dataset) // session.get_world_size() + num_batches = len(dataloader) + model.eval() + test_loss, correct = 0, 0 + with torch.no_grad(): + for X, y in dataloader: + pred = model(X) + test_loss += loss_fn(pred, y).item() + correct += (pred.argmax(1) == y).type(torch.float).sum().item() + test_loss /= num_batches + correct /= size + print( + f"Test Error: \n " + f"Accuracy: {(100 * correct):>0.1f}%, " + f"Avg loss: {test_loss:>8f} \n" + ) + return test_loss + + +def train_func(config: Dict): + batch_size = config["batch_size"] + lr = config["lr"] + epochs = config["epochs"] + + worker_batch_size = batch_size // session.get_world_size() + + # Create data loaders. + train_dataloader = DataLoader(training_data, batch_size=worker_batch_size) + test_dataloader = DataLoader(test_data, batch_size=worker_batch_size) + + train_dataloader = train.torch.prepare_data_loader(train_dataloader) + test_dataloader = train.torch.prepare_data_loader(test_dataloader) + + # Create model. + model = NeuralNetwork() + model = train.torch.prepare_model(model) + + loss_fn = nn.CrossEntropyLoss() + optimizer = torch.optim.SGD(model.parameters(), lr=lr) + + for _ in range(epochs): + train_epoch(train_dataloader, model, loss_fn, optimizer) + loss = validate_epoch(test_dataloader, model, loss_fn) + session.report(dict(loss=loss)) + +artifact_storage_path = os.environ['ANYSCALE_ARTIFACT_STORAGE'] +'/pytorch-tutorial' +print(f"Storing artifacts in {artifact_storage_path}") + +def train_fashion_mnist(num_workers=2, use_gpu=True): + trainer = TorchTrainer( + train_loop_per_worker=train_func, + train_loop_config={"lr": 1e-3, "batch_size": 64, "epochs": 4}, + scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu), + run_config=RunConfig(storage_path=artifact_storage_path) + + ) + result = trainer.fit() + print(f"Last result: {result.metrics}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + + parser.add_argument( + "--num-workers", + "-n", + type=int, + default=2, + help="Sets number of workers for training.", + ) + parser.add_argument( + "--use-gpu", action="store_true", default=True, help="Enables GPU training" + ) + parser.add_argument( + "--smoke-test", + action="store_true", + default=False, + help="Finish quickly for testing.", + ) + + args, _ = parser.parse_known_args() + + import ray + + + train_fashion_mnist(num_workers=args.num_workers, use_gpu=args.use_gpu) \ No newline at end of file