API Reference

Main Flowcept Object

See also: Flowcept object in provenance capture methods.

class flowcept.Flowcept(interceptors: List[str] = None, bundle_exec_id: str = None, campaign_id: str = None, workflow_id: str = None, workflow_name: str = None, workflow_subtype: str = None, workflow_args: Dict = None, agent_id: str = None, parent_workflow_id: str = None, start_persistence=True, check_safe_stops=True, save_workflow=True, delete_buffer_file=None, *args, **kwargs)

Main Flowcept controller class.

__init__(interceptors: List[str] = None, bundle_exec_id: str = None, campaign_id: str = None, workflow_id: str = None, workflow_name: str = None, workflow_subtype: str = None, workflow_args: Dict = None, agent_id: str = None, parent_workflow_id: str = None, start_persistence=True, check_safe_stops=True, save_workflow=True, delete_buffer_file=None, *args, **kwargs)

Initialize the Flowcept controller.

This class manages interceptors and workflow tracking. If used for instrumentation, each workflow should have its own instance of this class.

Parameters

interceptors: Union[BaseInterceptor, List[BaseInterceptor], str], optional

A list of interceptor kinds (or a single interceptor kind) to apply. Examples: “instrumentation”, “dask”, “mlflow”, … The order of interceptors matters: place the outermost interceptor first.

bundle_exec_id: str, optional

Identifier for grouping interceptors in a bundle, essential for the correct initialization and stop of interceptors. If not provided, a unique ID is assigned.

campaign_id: str, optional

A unique identifier for the campaign. If not provided, a new one is generated.

workflow_id: str, optional

A unique identifier for the workflow.

workflow_name: str, optional

A descriptive name for the workflow.

agent_id: str, optional

Use it if there is an agent responsible for executing this workflow.

parent_workflow_id: str, optional

Use it if this is a subworkflow.

workflow_subtype: str, optional

Optional subtype for workflow categorization (e.g., ml_workflow, data_prep_workflow).

workflow_args: Dict, optional

Additional arguments related to the workflow.

start_persistence: bool, default=True

If True, enables message persistence in the configured databases.

save_workflow: bool, default=True

If True, a workflow object message is sent.

delete_buffer_file: bool or None, optional

If True, deletes any existing dump buffer file on startup. If None, uses project.dump_buffer.delete_previous_file from settings.yaml.

Additional arguments (*args, **kwargs) are used for specific adapters.

For example, when using the Dask interceptor, the dask_client argument should be provided in kwargs to enable saving the Dask workflow, which is recommended.

static delete_buffer_file(path: str = None)

Delete the buffer file from disk if it exists.

If no path is provided, the default path from the settings file is used. Logs whether the file was successfully removed or not found.

Parameters

path: str, optional

Path to the buffer JSONL file. If not provided, defaults to DUMP_BUFFER_PATH as configured in the settings.

Returns

None

The file is deleted from disk if it exists; no value is returned.

Notes

  • This operation only affects the file on disk. It does not clear the in-memory buffer.

  • Logging is performed through the class logger.

Examples

>>> flowcept.delete_buffer_file("buffer.jsonl")
# Deletes buffer.jsonl if it exists
>>> flowcept.delete_buffer_file()
# Deletes the default buffer file defined in settings

dump_buffer(path: str = None)

Dump the current in-memory buffer to a JSON Lines (JSONL) file.

Each element of the buffer (a dictionary) is serialized as a single line of JSON. If no path is provided, the default path from the settings file is used.

Parameters

path: str, optional

Destination file path for the JSONL output. If not provided, defaults to DUMP_BUFFER_PATH as configured in the settings.

Returns

None

The buffer is written to disk; no value is returned.

Notes

  • The buffer is expected to be a list of dictionaries.

  • Existing files at the specified path will be overwritten.

  • Logging is performed through the class logger.
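The file layout described above (one JSON document per buffer element, one element per line) can be sketched with the standard library alone. This is only an illustration of the JSONL format, not Flowcept's implementation; `dump_jsonl` and the sample messages are hypothetical.

```python
import json
import os
import tempfile
from typing import Any, Dict, List


def dump_jsonl(buffer: List[Dict[str, Any]], path: str) -> None:
    """Write each buffer element as one JSON document per line."""
    # Mode "w" truncates, so an existing file at `path` is overwritten.
    with open(path, "w", encoding="utf-8") as handle:
        for message in buffer:
            handle.write(json.dumps(message) + "\n")


# Made-up messages, only to show the resulting file layout.
example_buffer = [
    {"task_id": "t1", "used": {"x": 1}},
    {"task_id": "t2", "used": {"x": 2}},
]
example_path = os.path.join(tempfile.mkdtemp(), "buffer.jsonl")
dump_jsonl(example_buffer, example_path)

# Read the file back: one line per buffer element.
with open(example_path, encoding="utf-8") as handle:
    written_lines = handle.read().splitlines()
```

Because every line is a complete JSON document, such a file can be inspected or appended to line by line.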

Examples

>>> flowcept.dump_buffer("buffer.jsonl")
# Writes buffer contents to buffer.jsonl
>>> flowcept.dump_buffer()
# Writes buffer contents to the default path defined in settings

static generate_report(report_type: str = 'provenance_card', format: str = 'markdown', print_markdown: bool = False, output_path: str | None = None, input_jsonl_path: str | None = None, records: List[Dict[str, Any]] | None = None, workflow_id: str | None = None, campaign_id: str | None = None) -> Dict[str, Any]

Generate a Flowcept report from JSONL, records, or DB data.

Parameters

report_type: str, optional

Report identifier. Supported values are "provenance_card" and "provenance_report". Default is "provenance_card".

format: str, optional

Output format. "provenance_card" supports only "markdown", and "provenance_report" supports only "pdf". Default is "markdown".

print_markdown: bool, optional

When True and format="markdown", render the generated markdown report to the terminal using Rich (install it with pip install flowcept[extras]).

output_path: str, optional

Destination path for the generated report file.

input_jsonl_path: str, optional

Path to a Flowcept JSONL buffer file used as report input.

records: list of dict, optional

In-memory workflow/task/object records used as report input.

workflow_id: str, optional

Workflow identifier for DB query mode.

campaign_id: str, optional

Campaign identifier for DB query mode.

Returns

dict

Report generation metadata including output path and input mode.

Raises

ValueError

If input-mode selection or report type/format is invalid.

FileNotFoundError

If input_jsonl_path is selected but the file does not exist.

ModuleNotFoundError

If print_markdown=True without Rich installed.
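A minimal sketch of calling generate_report in JSONL input mode, pairing each report type with the only format this reference lists for it. The helpers `report_kwargs` and `generate_from_jsonl` are hypothetical, and the file paths passed to them are placeholders. Flowcept is imported lazily so the pure helper can be used where Flowcept is not installed.

```python
from typing import Any, Dict

# Report type -> the only output format this reference lists for it.
SUPPORTED_FORMATS = {"provenance_card": "markdown", "provenance_report": "pdf"}


def report_kwargs(report_type: str, input_jsonl_path: str, output_path: str) -> Dict[str, Any]:
    """Build keyword arguments for generate_report (hypothetical helper)."""
    if report_type not in SUPPORTED_FORMATS:
        raise ValueError(f"Unsupported report_type: {report_type!r}")
    return {
        "report_type": report_type,
        "format": SUPPORTED_FORMATS[report_type],
        "input_jsonl_path": input_jsonl_path,
        "output_path": output_path,
    }


def generate_from_jsonl(report_type: str, input_jsonl_path: str, output_path: str) -> Dict[str, Any]:
    """Generate a report from a JSONL buffer file and return its metadata."""
    from flowcept import Flowcept  # lazy import: requires Flowcept at call time

    return Flowcept.generate_report(
        **report_kwargs(report_type, input_jsonl_path, output_path)
    )
```

Validating the type/format pair up front avoids reaching the ValueError documented above for an invalid combination.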

get_buffer(return_df: bool = False)

Retrieve the in-memory message buffer.

Parameters

return_df: bool, optional

If False (default), return the raw buffer as a list of dictionaries. If True, normalize the buffer into a pandas DataFrame with dotted notation for nested keys. Requires pandas to be installed.

Returns

list of dict or pandas.DataFrame
  • If return_df=False: the buffer as a list of dictionaries.

  • If return_df=True: the buffer as a normalized DataFrame.

Raises

ModuleNotFoundError

If return_df=True but pandas is not installed.
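To illustrate what "dotted notation for nested keys" means, here is a standard-library sketch that flattens nested dictionaries into dot-separated column names. It is a rough stand-in for the DataFrame normalization, not the code Flowcept runs; `flatten` and the sample message are hypothetical.

```python
from typing import Any, Dict


def flatten(record: Dict[str, Any], parent: str = "", sep: str = ".") -> Dict[str, Any]:
    """Flatten nested dicts into a single level with dot-separated keys."""
    flat: Dict[str, Any] = {}
    for key, value in record.items():
        column = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            # Recurse into nested dictionaries, extending the column prefix.
            flat.update(flatten(value, column, sep))
        else:
            # Scalars and lists are kept as-is under the dotted name.
            flat[column] = value
    return flat


# Made-up message shaped like a captured task.
message = {"task_id": "t1", "generated": {"attention": 0.9}, "tags": ["a", "b"]}
columns = flatten(message)
```

Here the nested key generated → attention becomes the single column name generated.attention, which is the form used in the doctest below.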

Examples

>>> buf = flowcept.get_buffer()
>>> isinstance(buf, list)
True
>>> df = flowcept.get_buffer(return_df=True)
>>> "generated.attention" in df.columns
True

static read_buffer_file(file_path: str | None = None, return_df: bool = False, normalize_df: bool = False, consolidate: bool = False, workflow_id: str | None = None, cleanup_files: bool = True)

Read a JSON Lines (JSONL) file containing captured Flowcept messages.

This function loads a file where each line is a serialized JSON object. It joins the lines into a single JSON array and parses them efficiently with orjson. If return_df is True, it returns a pandas DataFrame created via pandas.json_normalize(..., sep='.') so nested fields become dot-separated columns (for example, generated.attention).
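The "join the lines into a single JSON array" step can be sketched as follows. This version uses the standard-library json module in place of orjson, so it illustrates the technique rather than reproducing Flowcept's code; `parse_jsonl_text` and the sample text are hypothetical.

```python
import json
from typing import Any, Dict, List


def parse_jsonl_text(text: str) -> List[Dict[str, Any]]:
    """Parse JSONL text with a single parser call."""
    # Drop blank lines, then wrap the remaining documents in one JSON array.
    lines = [line for line in text.splitlines() if line.strip()]
    return json.loads("[" + ",".join(lines) + "]")


# Made-up file contents: one JSON object per line.
sample = '{"task_id": "t1", "generated": {"attention": 0.9}}\n{"task_id": "t2"}\n'
messages = parse_jsonl_text(sample)
```

Parsing one array instead of calling the parser once per line keeps the per-line overhead out of the hot path.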

Parameters

file_path: str, optional

Path to the buffer file. If not provided, defaults to the value of DUMP_BUFFER_PATH from the configuration. If neither is provided, an assertion error is raised.

return_df: bool, default False

If True, return a normalized pandas DataFrame. If False, return the parsed list of dictionaries.

normalize_df: bool, default False

If True, normalize the inner dicts (e.g., used, generated, custom_metadata) as individual columns in the returned DataFrame.

consolidate: bool, default False

If True, merge all matching workflow buffer files into a single JSONL file first.

workflow_id: str, optional

Workflow ID to use when consolidating buffer files.

cleanup_files: bool, default True

If True, delete consolidated input files and keep a single JSONL file with only the workflow ID appended to the base path.

Returns

list of dict or pandas.DataFrame

A list of message objects when return_df is False, otherwise a normalized DataFrame with dot-separated columns.

Raises

AssertionError

If no file_path is provided and DUMP_BUFFER_PATH is not set.

FileNotFoundError

If the specified file does not exist.

orjson.JSONDecodeError

If the file contents cannot be parsed as valid JSON.

ModuleNotFoundError

If return_df is True but pandas is not installed.

Examples

Read messages as a list:

>>> msgs = Flowcept.read_buffer_file("offline_buffer.jsonl")
>>> len(msgs) > 0
True

Read messages as a normalized DataFrame:

>>> df = Flowcept.read_buffer_file("offline_buffer.jsonl", return_df=True)
>>> "generated.attention" in df.columns
True

save_workflow(interceptor: str, interceptor_instance: BaseInterceptor)

Save the current workflow and send its metadata using the provided interceptor.

This method assigns a unique workflow ID if one does not already exist, creates a WorkflowObject, and populates it with relevant metadata such as campaign ID, workflow name, and arguments. The interceptor is then used to send the workflow data.

Parameters

interceptor: str

Interceptor kind.

interceptor_instance: BaseInterceptor

Interceptor object used to store the workflow info.

Returns

None

static services_alive() -> bool

Checks the liveness of the MQ (Message Queue) and, if enabled, the MongoDB service.

Returns

bool

True if all services (MQ and optionally MongoDB) are alive, False otherwise.

Notes

  • The method tests the liveness of the MQ service using MQDao.

  • If MONGO_ENABLED is True, it also checks the liveness of the MongoDB service using MongoDBDAO.

  • Logs errors if any service is not ready, and logs success when both services are operational.
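Since services_alive returns a plain bool, it can be polled before a workflow starts. The sketch below is one possible pattern, not part of Flowcept; `wait_until` and `wait_for_flowcept_services` are hypothetical helpers and the retry counts are arbitrary.

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool], attempts: int = 5, delay_s: float = 1.0) -> bool:
    """Call `check` up to `attempts` times, sleeping between failed tries."""
    for attempt in range(attempts):
        if check():
            return True
        if attempt < attempts - 1:
            time.sleep(delay_s)
    return False


def wait_for_flowcept_services(attempts: int = 5, delay_s: float = 1.0) -> bool:
    """Poll Flowcept.services_alive until it succeeds or attempts run out."""
    from flowcept import Flowcept  # lazy import: requires Flowcept at call time

    return wait_until(Flowcept.services_alive, attempts=attempts, delay_s=delay_s)
```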

Examples

>>> is_alive = Flowcept.services_alive()
>>> if is_alive:
...     print("All services are running.")
... else:
...     print("One or more services are not ready.")

start() -> Flowcept

Start Flowcept Controller.

static start_consumption_services(bundle_exec_id: str = None, check_safe_stops: bool = False, consumers: List = None)

Starts the document consumption services for processing.

Parameters

bundle_exec_id: str, optional

The execution ID of the bundle being processed. Defaults to None.

check_safe_stops: bool, optional

Whether to enable safe stop checks for the service. Defaults to False.

consumers: List, optional

A list of consumer types to be started. Currently, only one type of consumer is supported. Defaults to None.

Raises

NotImplementedError

If multiple consumer types are provided in the consumers list.

Notes

  • The method initializes the DocumentInserter service, which processes documents based on the provided parameters.

  • The threaded parameter for DocumentInserter.start is set to False.

Examples

>>> Flowcept.start_consumption_services(bundle_exec_id="12345", check_safe_stops=True)

stop()

Stop Flowcept controller.
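A minimal sketch of the start/stop lifecycle, relying on start() returning the controller as its signature indicates. The wrapper `run_with_flowcept`, its `work` argument, and the workflow name are hypothetical. With the default start_persistence=True, the configured services presumably need to be reachable.

```python
from typing import Any, Callable


def run_with_flowcept(work: Callable[[], Any]) -> Any:
    """Run `work` between Flowcept.start() and Flowcept.stop()."""
    from flowcept import Flowcept  # lazy import: requires Flowcept at call time

    # One controller instance per workflow, as recommended for instrumentation.
    controller = Flowcept(workflow_name="demo_workflow").start()
    try:
        return work()
    finally:
        # Stop the controller even if `work` raises.
        controller.stop()
```

The try/finally block ensures that stop() runs even when the wrapped code fails.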

Flowcept.db: Querying the Database

The Flowcept.db property exposes an instance of flowcept.flowcept_api.db_api.DBAPI, providing high-level methods to query, insert, and update provenance data in the configured database.

Typical usage:

from flowcept import Flowcept

# Query tasks from the current workflow
tasks = Flowcept.db.get_tasks_from_current_workflow()

# Query workflows
workflows = Flowcept.db.workflow_query({"name": "my_workflow"})

# Insert or update a task/workflow
Flowcept.db.insert_or_update_task(my_task_obj)
Flowcept.db.insert_or_update_workflow(my_wf_obj)

Main Message Objects

class flowcept.TaskObject

Task object class.

Represents a single provenance task in Flowcept, including inputs, outputs, execution metadata, telemetry, and environment details.

activity_id: AnyStr = None

Activity name (usually the function name) associated with the task.

adapter_id: AnyStr = None

Identifier of the adapter that produced this task (if any).

address: AnyStr = None

Optional network address associated with the task.

agent_id: str = None

Identifier of the agent that executed (or is going to execute) this task.

campaign_id: AnyStr = None

Campaign identifier grouping related tasks together.

custom_metadata: Dict[AnyStr, Any] = None

Custom metadata dictionary provided by the developer/user.

data: Any = None

Arbitrary raw data payload associated with the task. It is good practice to add custom_metadata associated with data, especially if it contains file contents. In that case, custom_metadata should contain the keys “file_type”, “file_content”, “file_name”, “extension”.

dependencies: List = None

List of task IDs this task depends on.

dependents: List = None

List of task IDs that depend on this task.

ended_at: float = None

Timestamp when the task execution ended.

enrich(adapter_key=None)

Enrich it.

static enrich_task_dict(task_dict: dict)

Enrich the task.

environment_id: AnyStr = None

Identifier of the environment where the task executed.

static from_dict(task_obj_dict: Dict[AnyStr, Any]) -> TaskObject

Create a TaskObject from a dictionary.

Parameters

task_obj_dict: Dict[AnyStr, Any]

Dictionary containing task attributes.

Returns

TaskObject

A TaskObject instance populated with available data.

generated: Dict[AnyStr, Any] = None

Outputs produced by the task (results, artifacts, files).

static get_dict_field_names()

Get field names.

static get_time_field_names()

Get the time field names.

group_id: AnyStr = None

Grouping identifier, often used to link loop iterations together.

hostname: AnyStr = None

Hostname of the machine executing the task.

login_name: AnyStr = None

Login name of the user in the execution environment.

mq_host: str = None

Message queue host associated with the task.

node_name: AnyStr = None

Node name in a distributed system or HPC cluster.

parent_task_id: AnyStr = None

Identifier of the parent task, if this task is nested or dependent.

private_ip: AnyStr = None

Private IP address of the machine executing the task.

public_ip: AnyStr = None

Public IP address of the machine executing the task.

registered_at: float = None

Timestamp when the task was registered by the DocumentInserter.

serialize()

Serialize it.

source_agent_id: str = None

Identifier of the agent that sent this task to be executed (if any).

started_at: float = None

Timestamp when the task execution started.

status: Status = None

Execution status of the task (e.g., FINISHED, ERROR).

stderr: AnyStr | Dict = None

Captured standard error from the task, if available.

stdout: AnyStr | Dict = None

Captured standard output from the task, if available.

submitted_at: float = None

Timestamp when the task was submitted.

subtype: AnyStr = None

Optional subtype of the task (e.g., iteration, ML step, custom).

tags: List = None

User-defined tags attached to the task.

task_id: AnyStr = None

Unique identifier of the task.

static task_id_field()

Get task id.

telemetry_at_end: Telemetry = None

Telemetry snapshot captured at the end of the task.

telemetry_at_start: Telemetry = None

Telemetry snapshot captured at the start of the task.

to_dict()

Convert to dictionary.

type = 'task'

Constant type label for this object (“task”).

used: Dict[AnyStr, Any] = None

Inputs consumed by the task (parameters, files, resources).

user: AnyStr = None

User who executed or triggered the task.

utc_timestamp: float = None

UTC timestamp when the task object was created.

workflow_id: AnyStr = None

Identifier of the workflow this task belongs to.

static workflow_id_field()

Get workflow id.

workflow_name: AnyStr = None

Name of the workflow this task belongs to.

class flowcept.WorkflowObject(workflow_id=None, name=None, used=None, generated=None)

Workflow object class.

Represents metadata and provenance details for a workflow execution.

adapter_id: AnyStr = None

Identifier of the adapter (e.g., Dask, MLflow) that triggered workflow capture.

campaign_id: AnyStr = None

Identifier for grouping workflows into a campaign or experiment.

code_repository: Dict = None

Details of the code repository (URL, commit hash, branch) used to run the workflow.

conf: Dict = None

Workflow configuration parameters, such as hyperparameters or runtime options.

custom_metadata: Dict = None

User-defined metadata dictionary with additional annotations.

static deserialize(serialized_data) -> WorkflowObject

Deserialize it.

enrich(adapter_key=None)

Enrich it.

environment_id: str = None

Identifier for the runtime environment (e.g., conda env, container).

extra_metadata: str = None

Optional free-form metadata for extensions not covered by other fields.

flowcept_settings: Dict = None

Snapshot of Flowcept’s active settings used for this workflow.

flowcept_version: AnyStr = None

Version of Flowcept used during execution.

static from_dict(dict_obj: Dict) -> WorkflowObject

Convert from dictionary.

generated: Dict = None

Outputs generated by the workflow (artifacts, models, or results).

interceptor_ids: List = None

List of interceptors applied to this workflow (e.g., instrumentation, telemetry).

machine_info: Dict = None

System or hardware information where the workflow is executed.

name: AnyStr = None

Descriptive name for the workflow.

parent_workflow_id: AnyStr = None

Identifier of the parent workflow, if this workflow is nested or derived.

serialize()

Serialize it.

subtype: AnyStr = None

Optional subtype of the workflow (e.g., data_prep_workflow, ml_workflow).

sys_name: str = None

Logical system or facility name (e.g., HPC system name, cluster identifier).

to_dict()

Convert to dictionary.

used: Dict = None

Inputs consumed by the workflow (datasets, arguments, or configuration values).

user: AnyStr = None

User who launched or owns the workflow run.

utc_timestamp: float = None

Timestamp (UTC, in seconds) when the workflow object was created.

workflow_id: AnyStr = None

Unique identifier for the workflow.

static workflow_id_field()

Get workflow id.

FlowceptTask

class flowcept.FlowceptTask(task_id: str = None, workflow_id: str = None, campaign_id: str = None, activity_id: str = None, agent_id: str = None, source_agent_id: str = None, parent_task_id: str = None, used: Dict = None, data: Any = None, subtype: str = None, hostname: str = None, tags: List[str] = None, adapter_id: str = None, custom_metadata: Dict = None, generated: Dict = None, started_at: float = None, ended_at: float = None, stdout: str = None, stderr: str = None, status: Status = None)

Bases: object

A context manager for capturing provenance and task telemetry data within the Flowcept framework.

This class encapsulates the lifecycle of a task, recording its start and end times, telemetry, and metadata. It integrates with the Flowcept API and Instrumentation Interceptor to log task-specific details.

Methods

__enter__()

Sets up the task context.

__exit__(exc_type, exc_val, exc_tb)

Ends the task context, ensuring telemetry and metadata are recorded.

end(generated=None, ended_at=None, stdout=None, stderr=None, status=Status.FINISHED)

Finalizes the task, capturing telemetry, status, and other details.

Notes

If instrumentation is disabled (INSTRUMENTATION_ENABLED is False), the methods in this class are no-ops, and no data is captured.

__init__(task_id: str = None, workflow_id: str = None, campaign_id: str = None, activity_id: str = None, agent_id: str = None, source_agent_id: str = None, parent_task_id: str = None, used: Dict = None, data: Any = None, subtype: str = None, hostname: str = None, tags: List[str] = None, adapter_id: str = None, custom_metadata: Dict = None, generated: Dict = None, started_at: float = None, ended_at: float = None, stdout: str = None, stderr: str = None, status: Status = None)

Initializes a FlowceptTask and optionally finalizes it.

If any of the following optional arguments are provided — generated, ended_at, stdout, stderr, or status — the task will be automatically finalized by calling end() during initialization. This is useful when the task’s outcome is already known at the moment of instantiation.
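The two capture styles described above can be sketched as follows: a one-shot task whose outcome is already known, and a context-managed task finalized with end(). The function names and the "double" activity are hypothetical. The sketch also assumes that `__enter__` returns the task object, which this reference does not state.

```python
from typing import Dict


def record_known_outcome(used: Dict, generated: Dict) -> None:
    """One-shot capture for a task whose outcome is already known."""
    from flowcept import FlowceptTask  # lazy import: requires Flowcept at call time

    # Passing `generated` triggers the automatic end() call during __init__.
    FlowceptTask(activity_id="double", used=used, generated=generated)


def double_with_capture(x: int) -> int:
    """Capture a task around a computation using the context manager."""
    from flowcept import FlowceptTask  # lazy import: requires Flowcept at call time

    # Assumption: the `as` target is the FlowceptTask instance itself.
    with FlowceptTask(activity_id="double", used={"x": x}) as task:
        y = 2 * x
        task.end(generated={"y": y})
    return y
```

The one-shot form suits results computed elsewhere; the context-managed form brackets the computation so start and end times reflect the actual work.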

Parameters

task_id: str, optional

Unique identifier for the task. Defaults to a generated ID.

workflow_id: str, optional

ID of the workflow to which this task belongs.

campaign_id: str, optional

ID of the campaign to which this task belongs.

activity_id: str, optional

Describes the specific activity this task captures.

used: Dict, optional

Metadata about resources or data used during the task.

data: Any, optional

Any raw data associated with this task.

subtype: str, optional

Optional string categorizing the task subtype.

custom_metadata: Dict, optional

Additional user-defined metadata to associate with the task.

generated: Dict, optional

Output data generated during the task execution.

started_at: float, optional

Timestamp indicating when the task started.

ended_at: float, optional

Timestamp indicating when the task ended.

stdout: str, optional

Captured standard output from the task.

stderr: str, optional

Captured standard error from the task.

status: Status, optional

Task completion status. If the task is finalized and no status is given, Status.FINISHED is used.

end(generated: Dict = None, ended_at: float = None, stdout: str = None, stderr: str = None, data: Any = None, custom_metadata: Dict = None, status: Status | None = None)

Finalizes the task by capturing its end state, telemetry, and status.

This method records the task’s ending telemetry data, status, and any outputs or errors. It also sends the task data to the instrumentation interceptor for logging or further processing.

Parameters

generated: Dict, optional

Metadata or data generated during the task’s execution. Defaults to None.

data: Any, optional

Any raw data associated with this task.

custom_metadata: Dict, optional

Additional user-defined metadata to associate with the task.

ended_at: float, optional

Timestamp indicating when the task ended. If not provided, defaults to the current time.

stdout: str, optional

Standard output captured during the task’s execution. Defaults to None.

stderr: str, optional

Standard error captured during the task’s execution. Defaults to None.

status: Status, optional

Status of the task at the time of completion. Defaults to Status.FINISHED.

Notes

If instrumentation is disabled (INSTRUMENTATION_ENABLED is False), this method performs no actions.

get_agent_id()

Return the agent identifier.

Returns

Any

Identifier of the agent associated with the current task.

get_campaign_id()

Return the campaign identifier.

Returns

Any

Identifier of the campaign associated with the current task.

get_id()

Return the task identifier.

Returns

Any

Identifier associated with the current task.

get_workflow_id()

Return the workflow identifier.

Returns

Any

Identifier of the workflow associated with the current task.

send()

Finalizes and sends the task data if not already ended.

This method acts as a simple alias for finalizing the task without requiring additional arguments. It sends the task object to the interceptor for capture and marks it as ended to prevent multiple submissions.

FlowceptLoop

Can be imported via from flowcept import FlowceptLoop

class flowcept.instrumentation.flowcept_loop.FlowceptLoop(items: Sized | Iterator | int, loop_name='loop', item_name='item', parent_task_id=None, workflow_id=None, items_length=0, capture_enabled=True)

Bases: object

A utility class to wrap and instrument iterable loops for telemetry and tracking.

The FlowceptLoop class supports iterating over a collection of items or a numeric range while capturing metadata for each iteration and for the loop as a whole. This is particularly useful in scenarios where tracking and instrumentation of loop executions is required.

Notes

This class integrates with the Flowcept system for telemetry and tracking, ensuring detailed monitoring of loops and their iterations. It is designed for cases where capturing granular runtime behavior of loops is critical.

__init__(items: Sized | Iterator | int, loop_name='loop', item_name='item', parent_task_id=None, workflow_id=None, items_length=0, capture_enabled=True)

Initialize a FlowceptLoop instance for tracking iterations.

This constructor wraps an iterable, numeric range, or explicit iterator into a loop context where each iteration is instrumented with provenance and optional telemetry. If instrumentation is disabled, the loop behaves like a normal Python iterator with minimal overhead.

Parameters

items: Union[Sized, Iterator, int]

The items to iterate over. Can be:

  • A sized iterable (e.g., list, range).

  • An integer (interpreted as range(items)).

  • An iterator (requires items_length if length cannot be inferred).

loop_name: str, optional

A descriptive name for the loop. Used in provenance as the loop’s activity identifier. Default is "loop".

item_name: str, optional

The key name under which each iteration’s item is recorded in provenance. Default is "item".

parent_task_id: str, optional

The identifier of a parent task, if this loop is nested within another task. Default is None.

workflow_id: str, optional

Identifier for the workflow this loop belongs to. If not provided, it is inherited from the current Flowcept context or generated as a UUID.

items_length: int, optional

Explicit number of items if items is an iterator without a defined length. Default is 0.

capture_enabled: bool, optional

Whether to enable provenance/telemetry capture. If False, the loop runs without instrumentation. Default is True.

Raises

Exception

If items is not a supported type (sized iterable, integer, or iterator).

Notes

  • Each iteration is recorded with used (inputs) and optional generated values, plus telemetry if enabled.

  • Iteration metadata is finalized at the end of each iteration and sent to the active Flowcept interceptor.
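A minimal sketch of wrapping a numeric range in FlowceptLoop. It assumes that iterating over the loop object yields the items themselves (here 0, 1, 2, ...), which the parameter description implies but does not state. The function `train`, the loop names, and the toy loss formula are hypothetical.

```python
from typing import List


def train(epochs: int) -> List[float]:
    """Run a toy training loop whose iterations are captured by FlowceptLoop."""
    from flowcept import FlowceptLoop  # lazy import: requires Flowcept at call time

    losses = []
    # An int is interpreted as range(epochs); each epoch is recorded under "epoch".
    for epoch in FlowceptLoop(epochs, loop_name="training_loop", item_name="epoch"):
        losses.append(1.0 / (epoch + 1))  # placeholder for real work
    return losses
```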

get_current_iteration_id()

Get current iteration’s task id.

FlowceptLightweightLoop

Can be imported via from flowcept import FlowceptLightweightLoop

class flowcept.instrumentation.flowcept_loop.FlowceptLightweightLoop(items: Sized | Iterator, loop_name='loop', item_name='item', parent_task_id=None, workflow_id=None, items_length=0, capture_enabled=True)

Bases: object

A utility class to wrap and instrument iterable loops for telemetry and tracking.

The FlowceptLightweightLoop class supports iterating over a collection of items while capturing metadata for each iteration and for the loop as a whole. This is particularly useful in scenarios where tracking and instrumentation of loop executions is required.

Parameters

items: Union[Sized, Iterator]

The items to iterate over. Must be either a sized iterable (with a __len__ method) or an iterator whose length is given by items_length.

loop_name: str, optional

A descriptive name for the loop (default is “loop”).

item_name: str, optional

The name used for each item in the telemetry (default is “item”).

parent_task_id: str, optional

The ID of the parent task associated with the loop, if applicable (default is None).

workflow_id: str, optional

The workflow ID to associate with this loop. If not provided, it will be generated or inferred from the current workflow context.

Raises

Exception

If items is neither a sized iterable nor an iterator.

Notes

This class integrates with the Flowcept system for telemetry and tracking, ensuring detailed monitoring of loops and their iterations. It is designed for cases where capturing granular runtime behavior of loops is critical.

__init__(items: Sized | Iterator, loop_name='loop', item_name='item', parent_task_id=None, workflow_id=None, items_length=0, capture_enabled=True)

Initialize a FlowceptLightweightLoop instance for tracking iterations.

This constructor provides a lower-overhead loop wrapper compared to FlowceptLoop. Iterations are pre-registered as task objects, and capture primarily updates used and generated fields as the loop progresses.

Parameters

items: Union[Sized, Iterator]

The items to iterate over. Must either be:

  • A sized iterable (with __len__).

  • An explicit iterator (length must be given by items_length).

loop_name: str, optional

A descriptive name for the loop. Used in provenance as the loop’s activity identifier. Default is "loop".

item_name: str, optional

The key name under which each iteration’s item is recorded in provenance. Default is "item".

parent_task_id: str, optional

The identifier of a parent task, if this loop is nested within another task. Default is None.

workflow_id: str, optional

Identifier for the workflow this loop belongs to. If not provided, it is inherited from the current Flowcept context or generated as a UUID.

items_length: int, optional

Explicit number of items if items is an iterator without a defined length. Default is 0.

capture_enabled: bool, optional

Whether to enable provenance/telemetry capture. If False, the loop runs without instrumentation. Default is True.

Raises

Exception

If items is neither a sized iterable nor an iterator.

Notes

  • This class is designed for high-performance scenarios with many iterations.

  • Iteration tasks are pre-allocated, and provenance capture is batched via the Flowcept interceptor.

  • Compared to FlowceptLoop, this class avoids per-iteration telemetry overhead unless explicitly enabled.

end_iter(generated_value: Dict)

Finalizes the current iteration by associating generated values with the iteration metadata.

This method updates the metadata of the current iteration to include the values generated during the iteration, ensuring they are properly logged and tracked.

Parameters

generated_value: dict

A dictionary containing the generated values for the current iteration. These values will be stored in the generated field of the iteration’s metadata.
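A minimal sketch of attaching per-iteration outputs with end_iter. It assumes that iterating over the loop object yields the wrapped items, which this reference does not state explicitly. The function `square_all`, the loop names, and the "squared" key are hypothetical.

```python
from typing import List


def square_all(values: List[int]) -> List[int]:
    """Square each value, recording the result as the iteration's output."""
    # Lazy import: requires Flowcept at call time.
    from flowcept import FlowceptLightweightLoop

    loop = FlowceptLightweightLoop(values, loop_name="square_loop", item_name="value")
    results = []
    for value in loop:
        squared = value * value
        # Stored in the `generated` field of the current iteration's metadata.
        loop.end_iter({"squared": squared})
        results.append(squared)
    return results
```

The loop object is kept in a variable so that end_iter can be called on it from inside the loop body.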

get_current_iteration_id()

Get current iteration’s task id.