Versioned Model Example¶
This example mirrors the single_layer_perceptron_test flow and focuses on versioned
checkpoint storage for the best validation-loss model.
Purpose¶
- Show Git-like version control with `control_version=True` for model checkpoints.
- Show two alternative save paths:
  - the generic `ml_model` path (works for any model payload you serialize),
  - the PyTorch-specific helper path (works only for PyTorch models).
- Show simple inference after loading the best saved model from each path.
Use one save path in real projects. This example uses both only to demonstrate and test both APIs.
Training + Checkpoint Save (Two Paths)¶
import torch
import torch.nn as nn
import torch.optim as optim
from flowcept import Flowcept
from flowcept.instrumentation.flowcept_task import get_current_context_task_id
class SingleLayerPerceptron(nn.Module):
    """One linear layer followed by a sigmoid, producing one output per sample."""

    def __init__(self, input_size):
        """Create the perceptron.

        Args:
            input_size: Number of input features fed to the linear layer.
        """
        super().__init__()
        # The attribute name is part of the checkpoint format: state_dict keys
        # are "layer.weight" and "layer.bias".
        self.layer = nn.Linear(input_size, 1)

    def forward(self, x):
        """Return sigmoid(linear(x)); the last dimension of the result is 1."""
        return torch.sigmoid(self.layer(x))
def validate(model, criterion, x_val, y_val):
    """Compute the validation loss of ``model`` without tracking gradients.

    Args:
        model: Module to evaluate; it is called as ``model(x_val)``.
        criterion: Loss callable invoked as ``criterion(outputs, y_val)``.
        x_val: Validation inputs.
        y_val: Validation targets.

    Returns:
        The loss as a plain Python number (``loss.item()``).

    Note:
        The model is switched to eval mode and is NOT switched back; callers
        that continue training must call ``model.train()`` themselves.
    """
    model.eval()
    with torch.no_grad():
        outputs = model(x_val)
        loss = criterion(outputs, y_val)
    return loss.item()
def train_and_checkpoint(x_train, y_train, x_val, y_val, epochs=4, checkpoint_check=2):
    """Train a single-layer perceptron and checkpoint the best model via two Flowcept APIs.

    The model is trained full-batch with SGD (lr=0.1) and BCELoss. On every
    epoch that is a multiple of ``checkpoint_check``, the validation loss is
    computed; if it is strictly lower than the best seen so far, the model is
    saved through both save paths, reusing the previously returned object ids
    so that successive saves update the same stored objects.

    Args:
        x_train: Training inputs. The model is built with ``input_size=2``.
        y_train: Training targets passed to ``nn.BCELoss``.
        x_val: Validation inputs.
        y_val: Validation targets.
        epochs: Number of training epochs (epochs are numbered from 1).
        checkpoint_check: Checkpoint interval in epochs. Must be non-zero,
            because it is used as a modulo divisor.

    Returns:
        A tuple ``(ml_model_object_id, torch_model_object_id)`` holding the
        values returned by the last call to each Flowcept save API. Both are
        ``None`` if no checkpoint was ever saved (for example when
        ``epochs < checkpoint_check``).
    """
    model = SingleLayerPerceptron(input_size=2)
    criterion = nn.BCELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)

    best_val_loss = float("inf")
    ml_model_object_id = None
    torch_model_object_id = None
    current_task_id = get_current_context_task_id()

    for epoch in range(1, epochs + 1):
        # validate() leaves the model in eval mode, so re-enable training mode
        # at the start of every epoch.
        model.train()
        outputs = model(x_train)
        loss = criterion(outputs, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # The validation loss is only consulted on checkpoint epochs, so skip
        # the extra forward pass on all other epochs.
        if epoch % checkpoint_check != 0:
            continue

        current_val_loss = validate(model, criterion, x_val, y_val)
        # Keep the strict "<" test (rather than negating to ">=") so that a NaN
        # validation loss never triggers a save.
        if current_val_loss < best_val_loss:
            best_val_loss = current_val_loss
            metadata = {"loss": best_val_loss, "checkpoint_epoch": epoch}

            # Path A: generic object API ("ml_model"), version-controlled.
            ml_model_object_id = Flowcept.db.save_or_update_ml_model(
                object=model.state_dict(),
                object_id=ml_model_object_id,
                task_id=current_task_id,
                custom_metadata=metadata,
                save_data_in_collection=True,
                pickle=True,
                control_version=True,
            )

            # Path B: PyTorch helper API, also version-controlled.
            torch_model_object_id = Flowcept.db.save_or_update_torch_model(
                model=model,
                object_id=torch_model_object_id,
                task_id=current_task_id,
                custom_metadata=metadata,
                control_version=True,
            )

    return ml_model_object_id, torch_model_object_id
Load Best Model + Inference¶
import pickle
import torch
from flowcept import Flowcept
# One random sample with 2 features, matching input_size=2 used below.
sample = torch.randn(1, 2)

# `ml_model_object_id` and `torch_model_object_id` are not defined in this
# snippet; presumably they are the ids returned by train_and_checkpoint(...).

# Load best model from Path A (generic ml_model with pickled state_dict payload).
best_ml_model_blob = Flowcept.db.get_ml_model(ml_model_object_id)
# NOTE(review): pickle.loads can execute arbitrary code embedded in its input.
# Only unpickle blobs that your own workflow wrote to a trusted store.
ml_state_dict = pickle.loads(best_ml_model_blob.data)
ml_model = SingleLayerPerceptron(input_size=2)
ml_model.load_state_dict(ml_state_dict)
ml_model.eval()
with torch.no_grad():
    ml_pred = ml_model(sample)

# Load best model from Path B (PyTorch helper).
reloaded_torch_model = SingleLayerPerceptron(input_size=2)
Flowcept.db.load_torch_model(reloaded_torch_model, torch_model_object_id)
reloaded_torch_model.eval()
with torch.no_grad():
    torch_pred = reloaded_torch_model(sample)

# Each model maps the (1, 2) sample to a single sigmoid output.
assert ml_pred.shape == (1, 1)
assert torch_pred.shape == (1, 1)
Why Two Paths?¶
- `save_or_update_ml_model` is format-agnostic and can store serialized payloads from non-PyTorch models too.
- `save_or_update_torch_model` is convenience tooling for PyTorch `state_dict` save/load.

Both support version control when `control_version=True` and `object_id` is reused across updates.