Versioned Single-Layer Perceptron Example ========================================= This page provides a full runnable example based on ``tests/instrumentation_tests/ml_tests/single_layer_perceptron_test.py``. It shows: - saving dataset snapshots with ``save_or_update_dataset``, - saving model checkpoints with ``save_or_update_ml_model``, - saving/loading PyTorch checkpoints with ``save_or_update_torch_model`` and ``load_torch_model``, - generating workflow provenance reports (markdown and pdf). .. code-block:: python import pickle import random import torch import torch.nn as nn import torch.optim as optim from flowcept import Flowcept from flowcept.instrumentation.flowcept_task import flowcept_task, get_current_context_task_id def set_reproducibility(seed: int): """Simple deterministic setup. This is optional. Not required by Flowcept.""" random.seed(seed) torch.manual_seed(seed) if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed) if hasattr(torch, "use_deterministic_algorithms"): torch.use_deterministic_algorithms(True, warn_only=True) if hasattr(torch.backends, "cudnn"): torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False return { "seed": seed, "torch_deterministic_algorithms": True, "torch_cudnn_deterministic": bool(getattr(torch.backends.cudnn, "deterministic", False)) if hasattr(torch.backends, "cudnn") else False, "torch_cudnn_benchmark": bool(getattr(torch.backends.cudnn, "benchmark", False)) if hasattr(torch.backends, "cudnn") else False, } class SingleLayerPerceptron(nn.Module): def __init__(self, input_size=2): super().__init__() self.layer = nn.Linear(input_size, 1) def forward(self, x): return torch.sigmoid(self.layer(x)) def shape_args_handler(*args, **kwargs): """Store tensor arguments as shape metadata (for readable/serializable provenance).""" def _shape_key(name): return name if name.endswith("_shape") else f"{name}_shape" handled = {} for i, arg in enumerate(args): key = f"arg_{i}" if isinstance(arg, torch.Tensor): handled[_shape_key(key)] = tuple(arg.shape) else: handled[key] = arg for key, value in kwargs.items(): if isinstance(value, torch.Tensor): handled[_shape_key(key)] = tuple(value.shape) else: handled[key] = value return handled @flowcept_task( args_handler=shape_args_handler, output_names=["x_train_shape", "y_train_shape", "x_val_shape", "y_val_shape"], ) def get_dataset(n_samples, split_ratio, reproducibility): """Generate synthetic binary-classification data and save dataset blob.""" generator = torch.Generator().manual_seed(reproducibility["seed"]) x = torch.cat( [ torch.randn(n_samples // 2, 2, generator=generator) + 2, torch.randn(n_samples // 2, 2, generator=generator) - 2, ] ) y = torch.cat([torch.zeros(n_samples // 2), torch.ones(n_samples // 2)]).unsqueeze(1) n_train = int(n_samples * split_ratio) x_train, x_val = x[:n_train], x[n_train:] y_train, y_val = y[:n_train], y[n_train:] Flowcept.db.save_or_update_dataset( object={"x_train": x_train, "y_train": y_train, "x_val": x_val, "y_val": y_val}, task_id=get_current_context_task_id(), custom_metadata={"n_samples": n_samples, "split_ratio": split_ratio, **reproducibility}, save_data_in_collection=True, pickle=True, control_version=True, ) return x_train, y_train, x_val, y_val def validate(model, criterion, x_val, y_val): model.eval() with torch.no_grad(): outputs = model(x_val) loss = criterion(outputs, y_val) predictions = outputs.round() accuracy = (predictions.eq(y_val).sum().item()) / y_val.size(0) return loss.item(), accuracy @flowcept_task(args_handler=shape_args_handler) def train_and_validate(n_input_neurons, epochs, x_train, y_train, x_val, y_val, checkpoint_check=2): model = SingleLayerPerceptron(input_size=n_input_neurons) criterion = nn.BCELoss() optimizer = optim.SGD(model.parameters(), lr=0.1) best_val_loss = float("inf") ml_model_object_id = None torch_model_object_id = None current_task_id = get_current_context_task_id() for epoch in range(1, epochs + 1): model.train() outputs = model(x_train) loss = criterion(outputs, y_train) optimizer.zero_grad() loss.backward() optimizer.step() current_val_loss, _ = validate(model, criterion, x_val, y_val) if epoch % checkpoint_check == 0 and current_val_loss < best_val_loss: best_val_loss = current_val_loss checkpoint_meta = {"loss": best_val_loss, "checkpoint_epoch": epoch} # Generic path (format-agnostic) ml_model_object_id = Flowcept.db.save_or_update_ml_model( object=model.state_dict(), object_id=ml_model_object_id, task_id=current_task_id, custom_metadata=checkpoint_meta, save_data_in_collection=True, pickle=True, control_version=True, ) # PyTorch helper path torch_model_object_id = Flowcept.db.save_or_update_torch_model( model=model, object_id=torch_model_object_id, task_id=current_task_id, custom_metadata=checkpoint_meta, control_version=True, ) final_val_loss, final_val_accuracy = validate(model, criterion, x_val, y_val) return { "val_loss": final_val_loss, "val_accuracy": final_val_accuracy, "best_val_loss": best_val_loss, "ml_model_object_id": ml_model_object_id, "torch_model_object_id": torch_model_object_id, } def run_training(n_samples=120, split_ratio=0.8, n_input_neurons=2, epochs=4, seed=42): reproducibility = set_reproducibility(seed) x_train, y_train, x_val, y_val = get_dataset(n_samples, split_ratio, reproducibility) return train_and_validate(n_input_neurons, epochs, x_train, y_train, x_val, y_val) if __name__ == "__main__": reproducibility = set_reproducibility(42) with Flowcept(workflow_name="MLP Train", workflow_args=reproducibility) as flowcept: run_result = run_training(seed=reproducibility["seed"]) workflow_id = flowcept.current_workflow_id # Load best generic ml_model checkpoint ml_model_blob = Flowcept.db.get_ml_model(run_result["ml_model_object_id"]) ml_state_dict = pickle.loads(ml_model_blob.data) model_from_ml_model = SingleLayerPerceptron(input_size=2) model_from_ml_model.load_state_dict(ml_state_dict) model_from_ml_model.eval() # Load best torch helper checkpoint model_from_torch_helper = SingleLayerPerceptron(input_size=2) Flowcept.db.load_torch_model(model_from_torch_helper, run_result["torch_model_object_id"]) model_from_torch_helper.eval() # Generate markdown report Flowcept.generate_report( output_path=f"./PROVENANCE_CARD_{workflow_id}.md", workflow_id=workflow_id, ) # Generate pdf report if report_pdf dependencies are installed try: import matplotlib # noqa: F401 import reportlab # noqa: F401 except ModuleNotFoundError: pass else: Flowcept.generate_report( report_type="provenance_report", format="pdf", output_path=f"./PROVENANCE_REPORT_{workflow_id}.pdf", workflow_id=workflow_id, ) Notes ----- - This is a single-layer perceptron example (one ``nn.Linear`` layer). - In production, choose one model-save path (generic or torch-specific). - Version history is append-only in ``object_history`` when ``control_version=True``. - Dataset snapshots are useful for reproducibility and workflow linkage.