Provenance Capture Methods¶
Flowcept provides two main methods to capture provenance: data observability adapters and code annotation (instrumentation) via @decorators or via explicit Python objects creation. This page shows practical ways to capture provenance in Flowcept—from zero-config quick runs to decorators, context managers, adapters, and fully custom tasks. Each section includes a minimal code snippet and links to examples.
The Flowcept() Object¶
The Flowcept object is the main runtime controller API. Its .start() method has two main purposes (which can be individually toggled enabled/disabled via constructor arguments):
To establish the communication with the MQ service
To begin the data consumers for database persistence (see also here).
It can be used in two main ways:
As a context manager – the easiest and safest option, which automatically starts and stops provenance capture.
Manually – start/stop explicitly in code. Useful when workflows are scattered across multiple files or distributed processes.
Using the Context Manager¶
from flowcept import Flowcept, flowcept_task
@flowcept_task(output_names="z")
def add_one(x): return x + 1
with Flowcept(workflow_name="my_workflow"):
# all code inside belongs to the workflow
z = add_one(7)
print(z)
When to use: - Flexible block capture - Multi-file codebases - When you don’t want to decorate a single top-level function
Manual Start/Stop¶
You can also manage a Flowcept object manually.
This gives finer control and is handy when your workflow spans multiple files or when you want to start/stop capture dynamically.
from flowcept import Flowcept, flowcept_task
@flowcept_task
def double(x): return 2 * x
flowcept = Flowcept(workflow_name="manual_example")
flowcept.start()
y = double(21)
print("Result:", y)
flowcept.stop()
Advanced Usage: Manual Control over Distributed Processes¶
In complex setups, Flowcept can run in a fully distributed mode, with producers on one environment and a consumer on another. In this case, to ensure database consistency, tasks must be given an explicit workflow_id (and campaign_id in the case of multi-workflows).
A common setup uses one persistence process for the database and multiple producers (task scripts across processes, nodes, or systems). Both producers and the persistence process require Flowcept objects, but with different configurations.
Start the DB persistence only once (e.g., first thing in an HPC job)
Save as persistence_starter.py:
from flowcept import Flowcept
flowcept = Flowcept(workflow_name="distributed_process_example", start_persistence=True, check_safe_stops=False)
# Using check_safe_stops=False allow users to fully control when is best to stop Flowcept.
# Because of that, this process will need to be manually killed later
# (via ctrl+c or leaving the HPC job scheduler kill it automatically when the job finishes)
with open('workflow_id.txt') as f:
f.write(flowcept.current_workflow_id) # This will enable the provenance tasks to be linked to this consumer and workflow.
flowcept.start()
Start the task processes, passing the generated workflow_id
Save the Python script below as distributed_tasks.py and run it with:
python distributed_tasks.py `cat workflow_id.txt`
distributed_tasks.py:
import sys
from flowcept import Flowcept, flowcept_task
workflow_id = sys.argv[1]
@flowcept_task
def double(x):
return 2 * x
# Initialize Flowcept
flowcept = Flowcept(
workflow_id=workflow_id,
start_persistence=False,
check_safe_stops=False,
save_workflows=False,
)
# This will only establish connection to the MQ, not to the DB.
flowcept.start()
# Example task
y = double(21)
print(f"Result: {y}")
flowcept.stop()
Optional Arguments¶
When creating a Flowcept instance (with or without a context manager), you can pass:
interceptors: list of interceptors (e.g.,
"instrumentation","dask","mlflow"). Defaults to["instrumentation"]if enabled. Instrumentation defaults to enabled unless explicitly set tofalsein settings.bundle_exec_id: identifier for grouping interceptors. Defaults to
id(self).campaign_id: unique identifier for the campaign. Defaults to a generated UUID.
workflow_id: unique identifier for the workflow. Defaults to a generated UUID.
workflow_name: descriptive name for the workflow.
workflow_subtype: optional workflow subtype/category (e.g.,
ml_workflow).workflow_args: dictionary of workflow-level arguments, stored in provenance.
start_persistence (bool): default
True. Enables message persistence into DBs.check_safe_stops (bool): default
True. Controls safe shutdown of consumers.save_workflow (bool): default
True. Emits a workflow metadata message on start.*args / **kwargs: adapter-specific parameters (e.g., Dask requires a
dask_clientkwarg).
Example with Custom Args¶
with Flowcept(
workflow_name="training_workflow",
workflow_subtype="ml_workflow",
campaign_id="experiment_42",
start_persistence=True,
interceptors=["instrumentation", "dask"],
workflow_args={"epochs_list": [10, 100], "learning_rates": [0.1, 0.01, 0.001]}
):
train_model()
Notes¶
Without persistence, messages are published only to MQ (not DB).
In offline mode, provenance can be dumped to a JSONL file and loaded with
Flowcept.read_messages_file().Both context manager and manual start/stop provide the same functionality—choose whichever fits your code structure.
Full API reference for the
Flowceptobject.
Data Observability Adapters¶
Flowcept can observe external tools and emit provenance automatically.
Supported adapters:
MLflow — MLflow example
Dask — Dask example
TensorBoard — TensorBoard example
Install the extras you need (see installation), then configure the adapter in your settings file. Adapters capture runs, tasks, metrics, and artifacts and push them through Flowcept’s pipeline (MQ → DB).
See the contributing page for how to add new adapters.
Decorators¶
Use decorators to mark functions as workflows or tasks with almost no code changes. If using the decorators, we expect that instrumentation is enabled in your settings file. If it is not, the provenance capture will be simply ignored and the decorated function will run as if without any Flowcept instrumentation.
@flowcept (wrap a “main” function as a workflow)¶
from flowcept.instrumentation.flowcept_decorator import flowcept
@flowcept
def main():
# Your workflow code here
if __name__ == "__main__":
main()
When to use: a single entrypoint (e.g., main) that represents the whole workflow.
Effect: creates a workflow context and captures enclosed calls (including decorated tasks).
@flowcept_task (mark a function as a task)¶
The @flowcept_task decorator wraps a Python function as a provenance task.
When the function executes, Flowcept captures its inputs (used), outputs (generated), execution metadata, telemetry (if enabled), and publishes them as provenance messages.
Simple Example (works for most)¶
from flowcept import Flowcept, flowcept_task
@flowcept_task(output_names="y") # output_names is optional.
def mult_two(x: int) -> int:
return 2 * x
with Flowcept(workflow_name="demo"):
y = mult_two(21)
# Captured provenance will show {"used": {"x": 21}, "generated": {"y": 42}}
# Without the output_names, the generated dict will show {"arg_0": 42}
Options & Behavior
Inputs (``used``)
- Function arguments are automatically bound to their parameter names using Python’s introspection.
- Example: double(21) → stored as {"x": 21} instead of {"arg_0": 21}.
- If an argparse.Namespace is passed, its attributes are flattened into key-value pairs.
- Internally this is done by the default args handler. You may override it by passing args_handler=... in the decorator.
Outputs (``generated``)
By default, the return value is stored under generic keys.
Using
output_namesimproves semantics:@flowcept_task(output_names="y")maps a scalar result to{"y": result}.If the function returns a tuple/list and
output_nameshas the same length, elements are mapped accordingly.
If the function returns a dict, it is passed through directly as
generated(with minimal normalization).
Optional Metadata
- workflow_id: by default, inherits the current workflow’s ID. Can be overridden if passed as a keyword argument.
- campaign_id: groups tasks under a campaign. Defaults to the current Flowcept campaign.
- tags: free-form labels (list or string) attached to the task, useful for filtering.
- custom_metadata: arbitrary dictionary to attach extra metadata.
Telemetry - If telemetry capture is enabled, system metrics (CPU, GPU, memory, etc.) are recorded at the start and end of the task.
Error Handling
- If the wrapped function raises an exception, provenance is still captured with status=ERROR and the exception message recorded in the stderr field..
Advanced Usage¶
from flowcept import flowcept_task
@flowcept_task(
output_names=["y", "z"], # map tuple outputs
tags=["math", "demo"], # attach tags
custom_metadata={"owner": "devX"} # arbitrary extra info
)
def compute(x):
return x * 2, x * 3
result = compute(5)
# generated = {"y": 10, "z": 15}
# tags = ["math", "demo"]
# custom_metadata = {"owner": "devX"}
—
Custom Arguments Handler and Understanding Arguments Serialization¶
The arguments handler in @flowcept_task defines how function inputs and outputs are turned into provenance-friendly dictionaries.
By default, Flowcept uses default_args_handler to capture arguments, flatten argparse.Namespace inputs, and handle non-serializable objects.
Serialization of Inputs
If a function argument or output is not JSON-serializable, Flowcept will try to convert it automatically (if settings.project.replace_non_json_serializable is enabled in your settings.yaml):
Objects with
to_flowcept_dict()orto_dict()→ converted using those methodsObjects that have __dict__ method and is kept in its internal list of (
__DICT__CLASSES) → converted using__dict__All other objects → replaced by a string
<ClassName>_instance_id_<id>
This prevents crashes while still preserving some information about the object identity.
Providing a Custom Handler¶
Developers can override this behavior with their own args_handler function.
For example, suppose you want to drop the input argument very_big_list and the output super_large_matrix:
ARGS_TO_DROP = ["very_big_list", "super_large_matrix"]
def custom_args_handler(*args, **kwargs):
if len(args):
raise Exception("In this simple example, we are assuming that"
"functions will be called using named args only.")
handled = {}
# Add all args/kwargs normally
for i, arg in enumerate(args):
handled[f"arg_{i}"] = arg
handled.update(kwargs)
# Drop unwanted inputs
for k in ARGS_TO_DROP:
handled.pop(k, None)
return handled
from flowcept import flowcept_task
@flowcept_task(args_handler=custom_args_handler, output_names="result")
def heavy_function(x, very_big_list, super_large_matrix):
# Some expensive computation
return x * 2
# Only "x" and "result" will be recorded in the provenance.
# If using this specific custom_args_handler example, make sure you call the
# function using named arguments so the expected behavior happens:
# result = heavy_function(x=x,
# very_big_list=very_big_list,
# super_large_matrix=super_large_matrix)
Summary¶
used: bound inputs, derived from function args (names are preserved if possible).generated: outputs, improved withoutput_namesor direct dict returns.workflow_id/campaign_id: control task grouping in provenance.tagsandcustom_metadata: user-controlled metadata.args_handler: optional override to customize how inputs/outputs are serialized.By default, Flowcept captures all arguments and sanitizes non-serializable objects.
With a custom args handler, you control exactly what goes into provenance (e.g., drop, rename, or transform arguments).
This is especially useful when handling large inputs (big matrices, tensors) that you don’t want persisted in provenance.
This flexibility allows Flowcept to adapt to lightweight HPC tasks, ML training steps, or fine-grained function-level tracing with minimal code changes.
@telemetry_flowcept_task (task with telemetry)¶
Same usage as @flowcept_task, but optimized to capture telemetry (CPU/GPU/memory) for the task:
from flowcept import telemetry_flowcept_task
@telemetry_flowcept_task
def train_step(batch):
# ... your training logic ...
return 0.123
When to use: you want per-task telemetry without writing custom telemetry plumbing.
@lightweight_flowcept_task (ultra-low-overhead task)¶
Optimized for HPC and tight loops; minimal interception overhead:
from flowcept import lightweight_flowcept_task
@lightweight_flowcept_task
def fast_op(x):
return x + 1
When to use: massive iteration counts, sensitive microbenchmarks, or very low overhead needs.
Loop Instrumentation¶
Instrument iterative loops directly (see loop example). Combine the context manager (below) with per-iteration tasks or custom events.
with Flowcept():
loop = FlowceptLoop(range(5)) # See also: FlowceptLightweightLoop
for item in loop:
loss = random.random()
sleep(0.05)
print(item, loss)
# The following is optional, in case you want to capture values generated inside the loop.
loop.end_iter({"item": item, "loss": loss})
FlowceptLoop vs FlowceptLightweightLoop¶
Both classes instrument iterative code and attach each iteration to provenance. They differ in how they trade detail for speed.
FlowceptLoop: opens and closes a tiny “iteration task” around every next() call. It can attach started_at, status, and (if enabled) per-iteration telemetry at the end of each iteration. Messages are sent one by one. Works with sized iterables, integers, and iterators. If you pass a pure iterator without a known length, it will materialize it into a list unless you provide items_length.
FlowceptLightweightLoop: pre-allocates a task object for every iteration up front, updates used and generated as the loop progresses, and sends everything in a single batch when the loop ends. No per-iteration telemetry capture. Requires a known length. If you pass a pure iterator, you must provide items_length.
When to use which¶
Choose FlowceptLoop if you need: - Per-iteration telemetry, started_at, and fine-grained timing. - Streaming of iteration records to the MQ/DB as the loop runs. - Constant memory usage independent of the number of iterations.
Choose FlowceptLightweightLoop if you need: - The lowest overhead for very large loops. - A single batched publish at the end of the loop. - You can provide, or already have, the exact iteration count.
Behavioral differences at a glance¶
Telemetry: FlowceptLoop records telemetry at the end of each iteration when telemetry is enabled. Lightweight does not.
Publishing: FlowceptLoop calls intercept(…) per iteration. Lightweight calls intercept_many([…]) once after the loop finishes.
Memory: FlowceptLoop keeps only the current iteration in memory. Lightweight pre-allocates a list of task objects of size len(items).
Unknown lengths: FlowceptLoop can materialize an unknown-length iterator into a list if you do not provide items_length (may be expensive). Lightweight requires a known items_length for iterators.
API quick links¶
Examples¶
Per-iteration telemetry and streaming¶
from time import sleep
from flowcept import Flowcept
from flowcept import FlowceptLoop
with Flowcept(workflow_name="telemetry_stream"):
loop = FlowceptLoop(range(5), loop_name="train_loop", item_name="epoch")
for epoch in loop:
loss = 0.1 * (5 - epoch)
sleep(0.02)
# Attach values produced inside this iteration
loop.end_iter({"loss": loss})
# Each iteration is sent with status and, if enabled, telemetry_at_end.
Ultra-low overhead and batched publish¶
from flowcept import Flowcept
from flowcept import FlowceptLightweightLoop
data = [0, 1, 2, 3, 4]
with Flowcept(workflow_name="batched_publish"):
loop = FlowceptLightweightLoop(data, loop_name="eval_loop", item_name="batch")
for batch in loop:
metric = batch * 2
loop.end_iter({"metric": metric})
# All iteration tasks are published together after the loop completes.
Iterating an unknown-length iterator¶
import itertools as it
from flowcept import Flowcept
from flowcept.instrumentation.loop import FlowceptLoop, FlowceptLightweightLoop
stream = it.islice(it.count(), 0, 100) # iterator without __len__
with Flowcept(workflow_name="iterators"):
# Option A: FlowceptLoop can materialize if you do not know the length,
# but this may be expensive for large streams.
loop_a = FlowceptLoop(stream, loop_name="loop_a", item_name="i", items_length=100)
for i in loop_a:
loop_a.end_iter({"v": i*i})
# Option B: Lightweight requires items_length for iterators.
stream2 = it.islice(it.count(), 0, 100)
loop_b = FlowceptLightweightLoop(stream2, loop_name="loop_b", item_name="i", items_length=100)
for i in loop_b:
loop_b.end_iter({"v": i*i})
Tips and caveats¶
Set item_name to control the key stored under used, for example {“epoch”: 3} instead of {“item”: 3}.
Use parent_task_id to nest loop iterations under another task.
For very large loops where you only need used and generated, prefer Lightweight to reduce interceptor calls.
If you use FlowceptLoop with a huge iterator, pass items_length to avoid accidental materialization.
Both classes honor INSTRUMENTATION_ENABLED and capture_enabled. If disabled, they behave like regular iterators and end_iter(…) becomes a no-op.
PyTorch Models¶
Flowcept can capture provenance directly from PyTorch models.
Use the @flowcept_torch decorator to wrap an nn.Module so that each forward call is automatically tracked.
import torch
import torch.nn as nn
import torch.optim as optim
from flowcept import flowcept_torch
# Instrument the model with @flowcept_torch
@flowcept_torch
class MyNet(nn.Module):
def __init__(self):
super().__init__()
self.fc = nn.Linear(10, 1)
def forward(self, x):
return self.fc(x)
# Dummy training data
x = torch.randn(100, 10) # 100 samples, 10 features
y = torch.randn(100, 1) # 100 targets
model = MyNet()
optimizer = optim.SGD(model.parameters(), lr=0.01)
loss_fn = nn.MSELoss()
# Simple training loop
for epoch in range(3):
optimizer.zero_grad()
out = model(x) # provenance captured here
loss = loss_fn(out, y)
loss.backward()
optimizer.step()
print(f"Epoch {epoch} - Loss {loss.item()}")
Explanation:
@flowcept_torch instruments the model’s
forwardmethod.Each call to
model(x)is tracked as a provenance task.If enabled (controlled in the settings.yaml file), metadata such as tensor usage, loss values, telemetry are captured.
Developers can pass extra constructor arguments like
get_profile=Trueorcustom_metadata={...}to record richer details.
This makes it possible to monitor model execution end-to-end with addition of simple @decorators.
MCP Agent Workflows¶
Capture agentic task provenance (prompt, tool call, result, timing). See MCP Agent example.
from flowcept.instrumentation.flowcept_agent_task import agent_flowcept_task
agent_controller = AgentController() # Must be a subclass of flowcept.flowceptor.consumers.agent.base_agent_context_manager.BaseAgentContextManager
mcp = FastMCP("AnC_Agent_mock", require_session=True, lifespan=agent_controller.lifespan)
@mcp.tool()
@agent_flowcept_task # Must be in this order. @mcp.tool then @flowcept_task
def tool_example(x, y, campaign_id=None):
llm = build_llm_model()
ctx = mcp.get_context()
history = ctx.request_context.lifespan_context.history
messages = generate_prompt(x, y)
response = llm.invoke(messages)
result = generate_response(result)
return result
Custom Task Creation (fully customizable)¶
Build tasks programmatically with FlowceptTask—useful for non-decorator flows or custom payloads.
Requires an active workflow (with Flowcept(...) or Flowcept().start()).
from flowcept import Flowcept
from flowcept.instrumentation.task import FlowceptTask
with Flowcept(workflow_name="custom_tasks"):
# Context-managed publish
with FlowceptTask(activity_id="download", used={"url": "https://..."}) as t:
data = b"..." # Some binary data
t.end(data=data, generated={"bytes": len(data)})
# Or publish explicitly
task = FlowceptTask(activity_id="parse", used={"bytes": len(data)})
task.end({"records": 42})
task.send() # publishes to MQ
Custom task usage patterns (explicit end vs send)¶
from time import sleep
from flowcept import Flowcept, FlowceptTask
from uuid import uuid4
flowcept = Flowcept(
start_persistence=False,
save_workflow=True,
workflow_name="MyFirstWorkflow",
)
flowcept.start()
agent1 = uuid4()
FlowceptTask(activity_id="super_func1", used={"x": 1}, agent_id=agent1, tags=["tag1"]).send()
with FlowceptTask(activity_id="super_func2", used={"y": 1}, agent_id=agent1, tags=["tag2"]) as t2:
sleep(0.5)
t2.end(generated={"o": 3})
t3 = FlowceptTask(activity_id="super_func3", used={"z": 1}, agent_id=agent1, tags=["tag3"])
sleep(0.1)
t3.end(generated={"w": 1})
flowcept.stop()
flowcept_messages = Flowcept.read_buffer_file()
assert len(flowcept_messages) == 4
If you need to store something that is not publicly exposed in the API (yet), you can use the private instance of FlowceptTask._task to access the task object fields directly. If that happens, open an issue in the repository and we will try to expose that in the public API.
Notes:
Use context (
with FlowceptTask(...)) or callsend()explicitly.If you pass any end-like fields at construction (
generated,ended_at,stdout,stderr, orstatus), the task auto-callsend(...)during__init__(defaultingstatustoFINISHEDwhen not provided).end()finalizes the task (captures end telemetry/status and any outputs) and publishes;send()only publishes if not ended, without end-telemetry/status/output capture (it setsended_at = started_at).Flows publish to the MQ; persistence/queries require a DB (e.g., MongoDB).
See also: FlowceptTask API reference
See also: Consumer example
References & Examples¶
Examples directory: https://github.com/ORNL/flowcept/tree/main/examples
MLflow adapter: https://github.com/ORNL/flowcept/blob/main/examples/mlflow_example.py
Dask adapter: https://github.com/ORNL/flowcept/blob/main/examples/dask_example.py
TensorBoard adapter: https://github.com/ORNL/flowcept/blob/main/examples/tensorboard_example.py
Loop instrumentation: https://github.com/ORNL/flowcept/blob/main/examples/instrumented_loop_example.py
LLM/PyTorch model: https://github.com/ORNL/flowcept/blob/main/examples/llm_complex/llm_model.py
MCP Agent tasks: https://github.com/ORNL/flowcept/blob/main/examples/agents/aec_agent_mock.py
Settings sample: https://github.com/ORNL/flowcept/blob/main/resources/sample_settings.yaml
Deployment (services): https://github.com/ORNL/flowcept/tree/main/deployment