
Kubernetes PyTorchJob Tuning

ZenithTune provides tuning functionality for PyTorchJob in Kubernetes environments (PyTorchJobTuner). As with other Tuners, you specify an existing job and configure tuning parameters and an objective function; the tuning jobs are then executed on the cluster.

How to Use PyTorchJobTuner

PyTorchJobTuner operates with the following workflow:

You define a job_converter function that transforms the existing job for each trial and a value_extractor function that extracts the objective value from the job logs; the tuner then alternates between local optimization and job execution on the cluster.

Initialization

Create a PyTorchJobTuner instance by specifying the name of the existing job to be tuned and the namespace in which tuning jobs are executed.

from zenith_tune.job_tuning.kubernetes import PyTorchJobTuner

# Initialize the tuner
tuner = PyTorchJobTuner(
    job_name="my-training-job",  # Name of the PyTorchJob to tune
    namespace="default",  # Namespace in which jobs are executed (defaults to the current context namespace if omitted)
)

Defining job_converter

The job_converter function is a user-defined function that transforms existing jobs into tuning jobs. The first argument receives an Optuna Trial, and the second argument receives the existing job specified when creating the PyTorchJobTuner instance.

The transformation is performed on the second argument, a PyTorchJob, typically by (a) adding the options to be tuned to the command string, or (b) setting the environment variables to be tuned in the job definition. PyTorchJob provides convenient member functions such as set_env and set_command, and also exposes the job manifest as a Dict, so an existing job can be transformed into an arbitrary job. Please refer to the API reference for details on PyTorchJob.
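
For changes that go beyond commands and environment variables (for example, resource requests), the manifest Dict can be edited directly. The sketch below is illustrative only: the job.manifest accessor is an assumed name, so check the PyTorchJob API reference for the actual way to obtain the manifest; the dictionary path follows the standard Kubeflow PyTorchJob manifest layout.

from optuna.trial import Trial
from zenith_tune.job_tuning.kubernetes import PyTorchJob

def resource_converter(trial: Trial, job: PyTorchJob) -> PyTorchJob:
    # job.manifest is a hypothetical accessor; see the API reference
    manifest = job.manifest
    # Standard Kubeflow PyTorchJob manifest layout
    worker_spec = manifest["spec"]["pytorchReplicaSpecs"]["Worker"]["template"]["spec"]
    cpus = trial.suggest_categorical("worker_cpus", [4, 8, 16])
    worker_spec["containers"][0]["resources"]["requests"]["cpu"] = str(cpus)
    return job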

ZenithTune also provides the CommandBuilder class for manipulating command strings. CommandBuilder parses command strings and can overwrite or remove existing options. For details, see CommandBuilder.
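
As a minimal standalone sketch of CommandBuilder (using only the update and get_command methods that also appear below; the exact output shown in the comments is an assumption about CommandBuilder's formatting):

builder = CommandBuilder("python train.py --epochs 10 --batch-size 16")
builder.update("--batch-size 64")  # overwrite the existing option
builder.update("--seed 42")        # append a new option
print(builder.get_command())       # assumed output: "python train.py --epochs 10 --batch-size 64 --seed 42"

Putting both together, a complete job_converter might look like this: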

from optuna.trial import Trial
from zenith_tune.command import CommandBuilder
from zenith_tune.job_tuning.kubernetes import PyTorchJob

def job_converter(trial: Trial, job: PyTorchJob) -> PyTorchJob:
    # Define tuning parameters
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
    omp_num_threads = trial.suggest_categorical("omp_num_threads", [1, 2, 4, 8])

    # Set environment variables
    job.set_env("OMP_NUM_THREADS", str(omp_num_threads))

    # Update command
    current_command = job.get_command()
    builder = CommandBuilder(current_command[2])  # Example: ["sh", "-c", "python train.py"]
    builder.update(f"--learning-rate {learning_rate}")
    builder.update(f"--batch-size {batch_size}")

    # Set the updated command
    new_command = current_command.copy()
    new_command[2] = builder.get_command()
    job.set_command(new_command)

    return job

Defining value_extractor

The value_extractor function extracts objective values (e.g., execution time, accuracy, loss) from job execution logs, similar to CommandOutputTuner. The first argument receives the local file path of logs that PyTorchJobTuner automatically downloads after each trial completion.

import re
from typing import Optional

def value_extractor(log_path: str) -> Optional[float]:
    with open(log_path, "r") as f:
        logs = f.read()

    # Example 1: Extract execution time (for minimization)
    match = re.search(r"Elapsed time: ([0-9.]+) seconds", logs)
    if match:
        elapsed_time = float(match.group(1))
        return elapsed_time

    # Example 2: Extract validation accuracy (for maximization)
    # match = re.search(r"Validation accuracy: ([0-9.]+)", logs)
    # if match:
    #     accuracy = float(match.group(1))
    #     return accuracy

    return None

Running the Tuning

Once all preparations are complete, call the optimize method to start tuning.

# Execute tuning
tuner.optimize(
    job_converter=job_converter,  # Job conversion function
    value_extractor=value_extractor,  # Objective value extraction function
    n_trials=20,  # Number of trials
    default_params={  # Default values for the first trial (optional)
        "learning_rate": 0.001,
        "batch_size": 32,
        "omp_num_threads": 2,
    },
)

The detailed behavior during tuning is as follows:

  1. A new PyTorchJob is created for each trial
  2. Parameters are applied through job_converter
  3. The job is submitted to the Kubernetes cluster
  4. The tuner waits for job completion (subject to a timeout)
  5. Job logs are downloaded from Kubernetes to local storage and saved as outputs/<study_name>/trial_<n>.txt
  6. value_extractor is executed on the saved log file to extract the objective value
  7. The job is automatically cleaned up
  8. Optuna suggests parameters for the next trial
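
Because each trial's log is saved locally (step 5), you can sanity-check your value_extractor against an already-downloaded log file before launching further trials. The study name and trial index in the path below are placeholders:

# Placeholder path following the outputs/<study_name>/trial_<n>.txt layout
value = value_extractor("outputs/my-study/trial_0.txt")
print(f"Extracted objective value: {value}")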

Interrupting and Resuming Tuning

PyTorchJobTuner supports interrupting and resuming tuning sessions. This is useful for long-running tuning sessions, or in environments such as AWS where authentication credentials have a limited lifetime.

To persist tuning sessions, specify the db_path parameter.

tuner = PyTorchJobTuner(
    job_name="my-training-job",
    namespace="default",
    db_path="./study.db",  # Database file that persists the study; reuse the same path to resume
)

When resuming, if there are incomplete jobs from the previous session, PyTorchJobTuner automatically detects these jobs and waits for their completion. If no incomplete jobs exist, it submits jobs based on new trials. This mechanism enables safe resumption of job tuning even after unexpected interruptions.
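
As a minimal resume sketch: re-create the tuner with the same db_path and call optimize again. How n_trials interacts with trials already completed in the database is not specified here, so treat the value below as illustrative and consult the API reference.

tuner = PyTorchJobTuner(
    job_name="my-training-job",
    namespace="default",
    db_path="./study.db",  # Same database file as the interrupted session
)
tuner.optimize(
    job_converter=job_converter,
    value_extractor=value_extractor,
    n_trials=20,  # Illustrative; completed trials are loaded from the database
)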