GuidesAPI reference
DiscordDashboard
DiscordDashboard

Deploy a PyTorch model

Create a scalable serverless endpoint for running inference on your PyTorch model

PyTorch is the de facto ML framework, and although Pipeline Cloud supports a range of frameworks, in practice most deployed pipelines are built from PyTorch models.

In this guide we'll assemble a basic neural network, deploy it to Pipeline Cloud for inference, and submit some runs (essentially inference requests). We'll also see how to deploy a trained model by using the PipelineFile interface to upload the weights. This tutorial uses Python and our Python library pipeline-ai. Let's go.

πŸ“˜

Install PyTorch by running pip install torch

torch is the package name for PyTorch, and you'll see it imported below.

NOTE: This is a walkthrough, so many of the below code snippets are mere chunks of a larger script. If you're skimming or just want to see code, then skip to the conclusion where you'll find the complete script.

Creating a PyTorch model

First, let's create a simple NN using the example from the PyTorch docs. This simple model first flattens the input tensor, performs some linear operations on it, and is regulated by a ReLU activation function.

import torch
from torch import nn

class NeuralNetwork(nn.Module):
	def __init__(self) -> None:
		super().__init__()
		self.flatten = nn.Flatten()
		self.linear_relu_stack = nn.Sequential(
			nn.Linear(4, 512),
			nn.ReLU(),
			nn.Linear(512, 512),
			nn.ReLU(),
			nn.Linear(512, 10),
		)

	def forward(self, x: torch.Tensor) -> torch.Tensor:
		x = self.flatten(x)
		logits = self.linear_relu_stack(x)
		return logits

The init method describes the model and its layers, and the forward method describes how an input becomes an output by passing through the network.

Running the model on CPU

To 'run' this model locally, we need to instantiate the NN, create an input tensor, pass it to the NN's forward method, and then observe the output.

import torch

model = NeuralNetwork()

input = torch.randn(2, 4)
output = model.forward(input)

print(output.size())

We'll print the shape (.size()) of the output tensor rather than the very long tensor itself. Running this script, we see this in the terminal:

torch.Size([2, 10])

Great! Everything is working as expected. This network is small and speedy, so it can run on CPU. Larger models or larger inputs will require more computing power, and it's common practice to use a GPU for these models since they can handle matrix multiplication more efficiently.

Running the model on GPU

If you have an NVIDIA GPU attached to your computer, you can use this modified version of the above script to send your model and inputs into GPU memory (known as VRAM) so that the computations are performed there. If you don't have a GPU, this script will fail, but it's worth seeing the syntax anyway. Notice how .to() is called on both the model and the input.

import torch

gpu = torch.device("cuda")

model = NeuralNetwork().to(gpu)

input = torch.randn(2, 4).to(gpu)
output = model.forward(input)

print(output.size())
torch.Size([2, 10])

🚧

Set up your GPU first before trying the above script

If you don't have an NVIDIA GPU, skip to the next section. If you do, make sure you've installed the necessary drivers, a CUDA-compiled version of PyTorch, and the CUDA toolkit. Installing these things is easiest with conda in our experience.

Converting the model into a pipeline

Perhaps you're building an app where users need to run this model 'whenever'. One option is to turn your computer into an API server and execute jobs on your own CPU or GPU as they come in.

Or you can outsource this compute demand to a remote cloud provider such as Pipeline Cloud. Pipeline Cloud automatically constructs a serverless API which can accept your inference requests, execute them on powerful servers, and return the result, so that you don't have to worry about managing servers, handling task queues, auto-scaling, caching models intelligently, or any other MLOps hassle like that.

From your app you can simply POST to an private API endpoint for your model to create a 'run'. We will handle execution and everything else, and then once the 'run' is complete, the result will be returned to you. You can find more benefits to our serverless compute offering here.

πŸ“˜

A 'run' is a forward pass

In the above script, we called model.forward(input) to perform inference on the model. A 'run', in Pipeline Cloud language, is the same thing – a single forward pass 'through the model'.

Now let's think about deploying this to Pipeline Cloud to generate an inference endpoint which you can call from your app.

Building the pipeline

To add the model to Pipeline Cloud's servers, we need to convert it into a 'pipeline'. There is no functional difference between a model and a 'pipeline', it's just a wrapper which defines the API shape and computational graph so that we can parse and smartly serve your model in the best way possible. Don't worry if that sounds mysterious for now.

First, let's install the pipeline-ai library in order to begin assembling the pipeline:

pip install pipeline-ai

Let's recap what we want to achieve: we want to create an endpoint which will take the body of a POST request as an input, pass it through the forward function of our model, and then return the output in the response of the request. Before deploying to Pipeline Cloud, we'll build a local version of the pipeline.

Creating the pipeline

from pipeline import Pipeline, Variable
import torch

with Pipeline("pytorch-demo") as pipeline_blueprint:

	input_tensor = Variable(torch.Tensor, is_input=True)
	pipeline_blueprint.add_variable(input_tensor)
  
	model = NeuralNetwork()

	output_tensor = model.forward(input_tensor)
	pipeline_blueprint.output(output_tensor)

There's a lot of new concepts in the above code snippet, so let's go through them line by line. First we import the Pipeline and Variable interfaces from pipeline (aka pipeline-ai).

Then we instantiate a new Pipeline on line 4, and here it has the name 'PyTorch demo'. You can choose any name you want; it can be helpful to label your pipelines so you can find them more easily on the dashboard. You need to use a context manager as shown for the variables to be added to the pipeline correctly.

This blueprint is basically a diagram for what should happen when a 'run' is executed. Internally a graph is built of the flow from start to end, and we use this graph on our servers to maximise speed and efficiency of your inference. So on line 6 we tell the blueprint to expect an input of type torch.Tensor. Recall how we created random inputs earlier by calling torch.randn(2, 4), which returns a torch.Tensor.

🚧

PyTorch tensors cannot be JSON-serialised

Annoyingly, PyTorch tensors aren't JSON-serialisable, which means we won't be able to include them raw in your input or output when we deploy the API endpoint to Pipeline Cloud. But we'll see a workaround for this later.

On line 7, we add the input Variable into the blueprint. It's important to note that arbitrary 'pre-defined' inputs such as input_boolean = False cannot be added into the blueprint since the blueprint is not able to hold any fixed values. Instead, we must create a Variable with type bool or fix this boolean within the model itself.

On line 9 we instantiate the model, as before. And on line 11 we perform the forward pass on the input_tensor. Finally, on line 12, we tell the blueprint to emit this output_tensor as a result of the pipeline. Super! We've now built the pipeline blueprint, and there's a couple of things left to do to prepare our model for deployment.

Running the pipeline locally

All pipelines that you deploy to Pipeline Cloud can also be run locally (i.e. on your own computer), provided you have the necessary compute requirements.

So, earlier we ran the PyTorch model on CPU. Adding the pipeline blueprint to the bottom of that script, we get this:

import torch
from torch import nn

class NeuralNetwork(nn.Module):
	def __init__(self) -> None:
		super().__init__()
		self.flatten = nn.Flatten()
		self.linear_relu_stack = nn.Sequential(
			nn.Linear(4, 512),
			nn.ReLU(),
			nn.Linear(512, 512),
			nn.ReLU(),
			nn.Linear(512, 10),
		)

	def forward(self, x: torch.Tensor) -> torch.Tensor:
		x = self.flatten(x)
		logits = self.linear_relu_stack(x)
		return logits
  
  
from pipeline import Pipeline, Variable
import torch

with Pipeline("pytorch-demo") as pipeline_blueprint:

	input_tensor = Variable(torch.Tensor, is_input=True)
	pipeline_blueprint.add_variable(input_tensor)
  
	model = NeuralNetwork()

	output_tensor = model.forward(input_tensor)
	pipeline_blueprint.output(output_tensor)

If we run this script, no output is printed to the terminal. That's because the pipeline has only been assembled, but not run. To perform a local run we need to 'get' the pipeline and run it. Add the following lines to the bottom of the script to do exactly that:

pytorch_pipeline = Pipeline.get_pipeline("pytorch-demo")

example_input = torch.randn(2, 4)
result = pytorch_pipeline.run(example_input)

print(result)

Notice how we used the pipeline's name (defined earlier) to access the computational graph before calling the run method. We're almost there! However, when you try to run this script, you'll notice an error:

AttributeError: 'Variable' object has no attribute 'flatten'

This is occurring in the forward method of the NeuralNetwork class, and it looks like the input_tensor Variable from the blueprint is being directly passed to the function, rather than its actual runtime value, which we know is a tensor (example_input). So, we need to tell the forward method of the model that it needs to 'fetch' the true runtime value of input_tensor.

We do this by decorating the forward method with the @pipeline_function decorator:

from pipeline import pipeline_function

...

class NeuralNetwork(nn.Module):
  ...
	
  @pipeline_function
	def forward(self, x: torch.Tensor) -> torch.Tensor:
		x = self.flatten(x)
		logits = self.linear_relu_stack(x)
		return logits

Any function which directly receives the value of a blueprint Variable should be wrapped in the @pipeline_function decorator, which instructs the function to access the runtime values of its input blueprint Variables.

Similarly, any object (such as the NeuralNetwork class) which is instantiated directly inside the Pipeline blueprint (see line 30 above, where model = NeuralNetwork() is called from within the context manager) needs to be decorated with @pipeline_model. Accordingly:

from pipeline import pipeline_function, pipeline_model

...

@pipeline_model
class NeuralNetwork(nn.Module):
  ...

The blueprint and decorators can be a bit confusing if you're new to pipeline-ai but don't be scared by them. They exist to help convert blueprint Variables into their runtime values automatically, so that your functions run as expected. The library will usually raise a helpful Exception in the event of a missing decorator, or if you accidentally include a runtime value in the blueprint.

Everything's set up for a local run now, so let's test it:

[tensor([[-0.0138,  0.0716, -0.2074, -0.0073,  0.0806, -0.1348, -0.0979,  0.1820,
         -0.0417,  0.0059],
        [ 0.0215,  0.1512,  0.0052, -0.0503,  0.0423, -0.1144,  0.0655,  0.0270,
         -0.0877, -0.0703]], grad_fn=<AddmmBackward0>)]

Eyyyy, super, it worked! We converted our PyTorch model into a pipeline and ran it locally! Notice how the result is a list – that's because a pipeline outputs a list by default and calling pipeline_blueprint.output() simply appends a new output to this list. This way, you can easily compose multi-input and multi-output pipelines.

Uploading the pipeline to Pipeline Cloud

Now we know that the pipeline works locally, we can upload it to Pipeline Cloud's servers for remote computation. It's very simple to modify our existing code to do this. Make sure to grab an API token from the dashboard here.

...

from pipeline import PipelineCloud

pytorch_pipeline = Pipeline.get_pipeline("pytorch-demo")

api = PipelineCloud(token="PIPELINE_API_TOKEN")
uploaded_pipeline = api.upload_pipeline(pytorch_pipeline)

print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

We authenticate with Pipeline Cloud on line 5, upload the pipeline on line 7, and print out the remote pipeline id on line 8. To refer to uploaded pipelines, we'll need to use this id, so record it somewhere. You can always find it on the dashboard too, if necessary.

There's one more thing we need to do before we can do remote runs on our deployed pipeline. Unfortunately tensors are not JSON-serialisable so we need to modify the pipeline to return the tensor data as a 'nested list'. This can be a bit complicated, so pipeline-ai comes with a helper tensor_to_list to do the hard work. Let's add it to the blueprint.

from pipeline.util.torch_utils import tensor_to_list

...

with Pipeline("pytorch-demo") as pipeline_blueprint:
  
  ...
  
  output_tensor = model.forward(input_tensor)
  output_list = tensor_to_list(output_tensor)
  pipeline_blueprint.output(output_list)

Notice how on line 10 we converted the result of the forward method into a list so that it can be sent over JSON through the API. Now we just need to upload this updated pipeline, and we'll be ready to submit runs! So, trigger upload_pipeline() once again, save the pipeline id, and now let's do a remote run!

example_input = tensor_to_list(torch.randn(2, 4))

run = api.run_pipeline(
  uploaded_pipeline.id,
  [example_input],
)

print(f"Run id: {run.id}")
print(run.result_preview)

And then we see printed out in the terminal:

[[[-0.06113505735993385, -0.04900337755680084, -0.028971631079912186, 0.0019726287573575974, -0.00612134113907814, 0.026191186159849167, -0.10189536213874817, -0.10396647453308105, 0.09754741191864014, -0.16766135394573212], [-0.1953875571489334, 0.010309990495443344, -0.04846293479204178, -0.061653949320316315, 0.10781243443489075, 0.08252111077308655, -0.23458144068717957, -0.14181871712207794, 0.065968818962574, -0.1120561957359314]]]

We just performed inference on our pipeline in the cloud! For smallish results you can simply print the result_preview property of the returned RunGet class. When you're returning a lot of data, you may see that result_preview is None. In this case (as long as the run's state is 'COMPLETE') you can download the result by running this:

result = api.download_result(run.result)
print(result)

πŸ“˜

If you don't see a result

You can print run.error_info to see whether your run failed and if so, what caused it to fail. During debugging this is a helpful feature as the raw traceback from the servers is forwarded so you can read it as if it were a traceback from your own computer.

Look how much ground we've covered: from a basic PyTorch model, we've made a deployed instance of the model which is ready to be added to apps where it will scale automatically, handle multiple parallel requests and spikes with ease, report usage metrics to the dashboard, and cost absolutely nothing while it's not in use. Pipeline Cloud is a truly convenient and modern way of serving ML models in production.

Using GPU on Pipeline Cloud

You might have noticed that we uploaded the model without specifying whether it should run on CPU or GPU. Since our code didn't reference any GPU, nor did we call .to() to transfer the data to any NVIDIA device, by default the pipeline will execute exactly as it had locally – on CPU.

Pipeline Cloud supports arbitrary compute of any nature, and it won't meddle with the code to change the used device. So we can run CPU tasks, or GPU tasks, or a combination of both! Here though, we're running a CPU task when (as with most PyTorch models) really we want to take advantage of the remote GPU. Let's create a new pipeline_model to wrap around the NeuralNetwork class, which will send both the network and the inputs at inference time to the Pipeline Cloud cuda device.

class NeuralNetwork(nn.Module):
  def __init__(self) -> None:
    super().__init__()
    self.flatten = nn.Flatten()
    self.linear_relu_stack = nn.Sequential(
      nn.Linear(4, 512),
      nn.ReLU(),
      nn.Linear(512, 512),
      nn.ReLU(),
      nn.Linear(512, 10),
    )

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    x = self.flatten(x)
    logits = self.linear_relu_stack(x)
    return logits


@pipeline_model
class PyTorchDemoModel:
  model = None
  gpu = torch.device("cuda")

  @pipeline_function
  def load(self) -> bool:
    self.model = NeuralNetwork().to(self.gpu)
    return True

  @pipeline_function
  def infer(self, input: list[list[float]]) -> torch.Tensor:
    input = torch.tensor(input)  # input needs to be converted from list to tensor
    input = input.to(self.gpu)
    return self.model.forward(input)

with Pipeline("pytorch-demo") as pipeline_blueprint:

  input_tensor = Variable(list, is_input=True)
  pipeline_blueprint.add_variable(input_tensor)

  model = PyTorchDemoModel()
  model.load()
  
  output_tensor = model.infer(input_tensor)
  output_list = tensor_to_list(output_tensor)
  pipeline_blueprint.output(output_list)

This looks like a lot, but it's pretty simple to follow! We're taking the first definition of the NeuralNetwork module and building a pipeline_model named 'PyTorchDemoModel' from it. There's a load function and then an infer function which are high-level adapters over the original init and forward methods on the NeuralNetwork module. The load and infer functions handle transfering the model and its inputs to the Pipeline Cloud CUDA device.

Then, in the blueprint, instead of instantiating NeuralNetwork directly, we instantiate PyTorchDemoModel and call its load function in order to instruct Pipeline Cloud's servers to load the model onto GPU, and later run inference on GPU. By the way, all of this code is also executable locally if you have an attached CUDA device.

If we upload and run this model, inference time (which is reported in run.compute_time_ms) might not be very different because the model is so tiny. But for larger models, running on GPU can lead to inference speed-ups of many orders of magnitude, so it's definitely worth learning the knack.

Deploying a trained model

In the examples above we deployed a freshly-initialised edition of the neural network. But most of the time, we want to deploy the neural network with trained weights.

Although in theory it's possible to train a model using Pipeline Cloud, only inference is documented and supported at the moment. So for now let's assume we already have the trained weights in a file called weights.pt, and we want to load them into the network so that when we call the cloud endpoint, we get useful results.

In fact, we only need to change a few lines to add in our trained model. We'll upload it using PipelineFile, an interface from the SDK which not only cleverly chunks the upload but also enables the Pipeline Cloud servers to share the weights between themselves so that remote loading and caching is rapid.

from pipeline import PipelineFile

with Pipeline("pytorch-demo") as pipeline_blueprint:

  input_tensor = Variable(list, is_input=True)
	weights_file = PipelineFile(path="weights.pt")
	pipeline_blueprint.add_variables(input_tensor, weights_file)

  model = PyTorchDemoModel()
  model.load(weights_file)
  
  output_tensor = model.infer(input_tensor)
  output_list = tensor_to_list(output_tensor)
  pipeline_blueprint.output(output_list)

The above blueprint follows the same pattern as before, but with three changes. Firstly, we defined a file (PipelineFile inherits from Variable) at path weights.pt on line 6. Then in the next line, we added both the input_tensor and the weights_file to the blueprint, using the add_variables method. Finally, on line 10, we updated the model's load function to receive this PipelineFile so that it can insert the stored weights into the model.

Now all that's left to do is modify the model's load function so that it correctly loads in the weights:

@pipeline_model
class PyTorchDemoModel:
  ...

  @pipeline_function
  def load(self, weights_file: PipelineFile) -> bool:
    weights = torch.load(weights_file.path, map_location=self.gpu)
    self.model = NeuralNetwork().to(self.gpu)
    self.model.load_state_dict(weights)
    return True

  ...

Now we can upload the pipeline, again using upload_pipeline, and in the terminal we'll see the weights file upload. When we submit a run, we'll now get the output of a trained model instead of the output of an untrained model.

🚧

Don't run your pipeline locally before uploading it

When you run a pipeline locally, its internal state changes according to the inputs at runtime and your current system. Quite often, serialisation issues can occur when an already-run pipeline is uploaded. And besides, you want the deployed version of your pipeline to be in its 'vanilla' un-run form before every request otherwise it may not perform as expected.

Minimising cold starts

There are a couple of neat features inside the pipeline-ai library which you can use when uploading a pipeline to make inference even faster. To figure out how they work, we first need to understand a detail about pipeline blueprints.

What happens when we call run_pipeline()? Well, the entire pipeline blueprint gets executed from top to bottom. This is a 'run': your entire API logic is within the blueprint and every time a request comes in, it will perform the inference steps – the pipeline_functions inside the blueprint.

To be clear, even though PyTorchDemoModel is instantiated inside the pipeline blueprint, it isn't instantiated during every single request, precisely because we wrapped it in pipeline_model which prevents that repeating behaviour. Similarly, the input and output variables aren't redefined and re-added to the pipeline every time as the graph knows not to do that once it has been uploaded.

However, the PyTorchDemoModel method load, as it is a pipeline_function, will be re-run every time a request is made. If the pipeline has already been loaded by a server on Pipeline Cloud, we don't actually want load to re-run; put differently, we only want load to run once, and only when it's cached on the server for the first time. Luckily, there are some flags to set this behaviour:

@pipeline_model
class PyTorchDemoModel:

  ...

  @pipeline_function(run_once=True, on_startup=True)
  def load(self) -> bool:
		...

Check out line 6. By setting these two parameters to True we establish that the load function only needs to run once during caching, and not again. This way, when new runs are submitted, they will skip the load as long as it has already been executed on the server.

Here's another trick to help Pipeline Cloud load the pipeline onto the right type of GPU. If we know how much GPU VRAM the pipeline consumes then we can specify the minimum VRAM requirement when we instantiate the pipeline.

with Pipeline("pytorch-demo", min_gpu_vram_mb=205) as pipeline_blueprint:
  ...

Using the min_gpu_vram_mb argument, we're saying explicitly that this pipeline requires a GPU with at least 205 mb of available VRAM. Pipeline Cloud servers can automatically infer this if necessary, but specifying it like this helps make pipeline inference even more efficient, and ultimately that means better performance for your endpoint.

🚧

Record VRAM at inference time, not load time

If you're using the min_gpu_vram_mb argument, you'll need to identify how much VRAM your pipeline needs. Typically this can be as easy as watching nvidia_smi. But make sure to record the highest figure (which will likely be the VRAM usage during inference time) rather than just how much memory the model takes when loaded.

Conclusion

We've seen how to convert a PyTorch neural network into an inference-ready cloud endpoint by using pipeline-ai, and how to deploy the model to the serverless GPUs at Pipeline Cloud. We looked at PipelineFile and how to upload the weights of a trained model. We also discussed some easy performance pitfalls, and some techniques to help optimise the performance of a deployed pipeline.

πŸ“˜

It's easier with pipeline-ai

Everything we did above can actually be done using our REST endpoints, documented here. Internally, the pipeline-ai SDK is simply formatting the data and sending it to the same public endpoints that you can use directly. But it's far easier to build and upload pipelines using the SDK. However, for runs, most of our users submit requests directly to the REST endpoint from their environment of choice.

Complete script

import torch
from torch import nn
from pipeline import (
  Pipeline,
  Variable,
  pipeline_function,
  pipeline_model,
  PipelineCloud,
  PipelineFile,
)
from pipeline.util.torch_utils import tensor_to_list


class NeuralNetwork(nn.Module):
  def __init__(self) -> None:
    super().__init__()
    self.flatten = nn.Flatten()
    self.linear_relu_stack = nn.Sequential(
      nn.Linear(4, 512),
      nn.ReLU(),
      nn.Linear(512, 512),
      nn.ReLU(),
      nn.Linear(512, 10),
    )
    
  def forward(self, x: torch.Tensor) -> torch.Tensor:
    x = self.flatten(x)
    logits = self.linear_relu_stack(x)
    return logits

    
@pipeline_model
class PyTorchDemoModel:
  model = None
  gpu = torch.device("cuda")

  @pipeline_function
  def load(self, weights_file: PipelineFile) -> bool:
    weights = torch.load(weights_file.path, map_location=self.gpu)
    self.model = NeuralNetwork().to(self.gpu)
    self.model.load_state_dict(weights)
    return True

  @pipeline_function
  def infer(self, input: list[list[float]]) -> torch.Tensor:
    input = torch.tensor(input)  # input needs to be converted from list to tensor
    input = input.to(self.gpu)
    return self.model.forward(input)


with Pipeline("pytorch-demo") as pipeline_blueprint:
  input_tensor = Variable(list, is_input=True)
  weights_file = PipelineFile(path="weights.pt")
  pipeline_blueprint.add_variables(input_tensor, weights_file)

  model = PyTorchDemoModel()
  model.load(weights_file)

  output_tensor = model.infer(input_tensor)
  output_list = tensor_to_list(output_tensor)
  pipeline_blueprint.output(output_list)


pytorch_pipeline = Pipeline.get_pipeline("pytorch-demo")

api = PipelineCloud(token="PIPELINE_API_TOKEN")
uploaded_pipeline = api.upload_pipeline(pytorch_pipeline)

print(f"Uploaded pipeline id: {uploaded_pipeline.id}")

example_input = tensor_to_list(torch.randn(2, 4))
run = api.run_pipeline(
  uploaded_pipeline.id,
  [example_input],
)

print(f"Run id: {run.id}")
print(run.result_preview)