# Kubernetes PyTorchJob Tuning

ZenithTune provides tuning functionality for PyTorchJob in Kubernetes environments via `PyTorchJobTuner`. This feature lets you specify an existing job and configure tuning parameters and objective functions just as with the other Tuners, so that the tuning jobs are executed on the cluster.

## How to Use PyTorchJobTuner

PyTorchJobTuner operates with the following workflow: the user defines a `job_converter` function that transforms the existing job for each trial and a `value_extractor` function that extracts the objective value from the job logs, and the tuner then iteratively alternates between local optimization and job execution on the cluster.

### Initialization

Create a `PyTorchJobTuner` instance by specifying the name of the existing job to be tuned and the namespace to use for tuning.

```python
from zenith_tune.job_tuning.kubernetes import PyTorchJobTuner

# Initialize the tuner
tuner = PyTorchJobTuner(
    job_name="my-training-job",  # Name of the PyTorchJob to tune
    namespace="default",  # Namespace to execute jobs (defaults to the current context namespace if omitted)
)
```

### Defining job_converter

The `job_converter` function is a user-defined function that transforms the existing job into a tuning job. The first argument receives an Optuna `Trial`, and the second argument receives the existing job specified when the `PyTorchJobTuner` instance was created. The transformation is performed through the second argument, a `PyTorchJob`, by (a) adding the options being tuned to the command string, or (b) setting the environment variables being tuned in the job definition. `PyTorchJob` provides convenient member functions such as `set_env` and `set_command`, and also exposes the job manifest in `Dict` format, so users can transform an existing job into an arbitrary job. Please refer to the API reference for details on `PyTorchJob`.

ZenithTune also provides the `CommandBuilder` class for manipulating command strings. `CommandBuilder` parses a command string and can overwrite or remove existing options. For details, see CommandBuilder.

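For instance, a minimal sketch of the overwrite flow, using only the `update` and `get_command` calls that also appear in the `job_converter` example below (option removal is covered in the CommandBuilder reference):

```python
from zenith_tune.command import CommandBuilder

# Parse an existing command string
builder = CommandBuilder("python train.py --batch-size 16")

# Overwrite the existing --batch-size option with a new value
builder.update("--batch-size 64")

# Rebuild the command string; the expected result is
# "python train.py --batch-size 64" (illustrative output)
print(builder.get_command())
```
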
```python
from optuna.trial import Trial

from zenith_tune.command import CommandBuilder
from zenith_tune.job_tuning.kubernetes import PyTorchJob


def job_converter(trial: Trial, job: PyTorchJob) -> PyTorchJob:
    # Define tuning parameters
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-1, log=True)
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64, 128])
    omp_num_threads = trial.suggest_categorical("omp_num_threads", [1, 2, 4, 8])

    # Set environment variables
    job.set_env("OMP_NUM_THREADS", str(omp_num_threads))

    # Update the command, e.g. ["sh", "-c", "python train.py"]
    current_command = job.get_command()
    builder = CommandBuilder(current_command[2])
    builder.update(f"--learning-rate {learning_rate}")
    builder.update(f"--batch-size {batch_size}")

    # Set the updated command
    new_command = current_command.copy()
    new_command[2] = builder.get_command()
    job.set_command(new_command)

    return job
```

### Defining value_extractor

The `value_extractor` function extracts the objective value (e.g., execution time, accuracy, loss) from the job execution logs, similar to CommandOutputTuner. The first argument receives the local file path of the logs that PyTorchJobTuner automatically downloads after each trial completes.

```python
import re
from typing import Optional


def value_extractor(log_path: str) -> Optional[float]:
    with open(log_path, "r") as f:
        logs = f.read()

    # Example 1: Extract execution time (for minimization)
    match = re.search(r"Elapsed time: ([0-9.]+) seconds", logs)
    if match:
        elapsed_time = float(match.group(1))
        return elapsed_time

    # Example 2: Extract validation accuracy (for maximization)
    # match = re.search(r"Validation accuracy: ([0-9.]+)", logs)
    # if match:
    #     accuracy = float(match.group(1))
    #     return accuracy

    return None
```

### Running the Tuning

Once all preparations are complete, call the `optimize` method to start tuning.

```python
# Execute tuning
tuner.optimize(
    job_converter=job_converter,  # Job conversion function
    value_extractor=value_extractor,  # Objective value extraction function
    n_trials=20,  # Number of trials
    default_params={  # Default values for the first trial (optional)
        "learning_rate": 0.001,
        "batch_size": 32,
        "omp_num_threads": 2,
    },
)
```

The detailed behavior during tuning is as follows:

- A new PyTorchJob is created for each trial
- Parameters are applied through `job_converter`
- The job is submitted to the Kubernetes cluster
- The tuner waits for job completion (with a timeout setting)
- Job logs are downloaded from Kubernetes to local storage and saved as `outputs/<study_name>/trial_<n>.txt`
- `value_extractor` is executed on the saved log file to extract the objective value
- Jobs are automatically cleaned up
- Optuna suggests parameters for the next trial

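Because each trial's log is persisted locally, a `value_extractor` can be smoke-tested against a previously downloaded log before launching a new session. A minimal sketch; the study name and trial number in the path are hypothetical placeholders:

```python
# Reuse the value_extractor defined above on a saved trial log.
# "my-study" and "trial_0" are illustrative; substitute the actual
# study name and trial number from outputs/<study_name>/trial_<n>.txt.
log_path = "outputs/my-study/trial_0.txt"
value = value_extractor(log_path)
print(f"Extracted objective value: {value}")
```
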
## Interrupting and Resuming Tuning

PyTorchJobTuner supports interruption and resumption of tuning sessions. This feature is useful for long-running tuning sessions or when authentication time is limited in environments like AWS.

To persist tuning sessions, specify the `db_path` parameter.

```python
tuner = PyTorchJobTuner(
    job_name="my-training-job",
    namespace="default",
    db_path="./study.db",  # Specify the same database file
)
```

When resuming, if there are incomplete jobs from the previous session, PyTorchJobTuner automatically detects them and waits for their completion. If no incomplete jobs exist, it submits jobs for new trials. This mechanism enables safe resumption of job tuning even after unexpected interruptions.

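For example, a minimal sketch of a resumed session, reusing the `job_converter` and `value_extractor` defined earlier:

```python
# Re-create the tuner pointing at the same database file
tuner = PyTorchJobTuner(
    job_name="my-training-job",
    namespace="default",
    db_path="./study.db",
)

# Incomplete jobs from the previous session are detected and awaited
# before any new trials are submitted
tuner.optimize(
    job_converter=job_converter,
    value_extractor=value_extractor,
    n_trials=20,
)
```

Since trials are persisted in the database file, the study can also be inspected afterwards with standard Optuna tooling. This assumes `db_path` is a regular Optuna SQLite storage containing a single study, which is not stated explicitly above:

```python
import optuna

# load_study accepts study_name=None when the storage holds exactly one study
study = optuna.load_study(study_name=None, storage="sqlite:///study.db")
print(study.best_params)
```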