API Reference
Main Flowcept Object
See also: Flowcept object in provenance capture methods.
- class flowcept.Flowcept(interceptors: List[str] = None, bundle_exec_id: str = None, campaign_id: str = None, workflow_id: str = None, workflow_name: str = None, workflow_subtype: str = None, workflow_args: Dict = None, agent_id: str = None, parent_workflow_id: str = None, start_persistence=True, check_safe_stops=True, save_workflow=True, delete_buffer_file=None, *args, **kwargs)
Main Flowcept controller class.
- __init__(interceptors: List[str] = None, bundle_exec_id: str = None, campaign_id: str = None, workflow_id: str = None, workflow_name: str = None, workflow_subtype: str = None, workflow_args: Dict = None, agent_id: str = None, parent_workflow_id: str = None, start_persistence=True, check_safe_stops=True, save_workflow=True, delete_buffer_file=None, *args, **kwargs)
Initialize the Flowcept controller.
This class manages interceptors and workflow tracking. If used for instrumentation, each workflow should have its own instance of this class.
Parameters
- interceptors: Union[BaseInterceptor, List[BaseInterceptor], str], optional
A list of interceptor kinds (or a single interceptor kind) to apply. Examples: “instrumentation”, “dask”, “mlflow”, … The order of interceptors matters: place the outermost interceptor first.
- bundle_exec_id: str, optional
Identifier for grouping interceptors in a bundle, required to correctly initialize and stop the interceptors. If not provided, a unique ID is assigned.
- campaign_id: str, optional
A unique identifier for the campaign. If not provided, a new one is generated.
- workflow_id: str, optional
A unique identifier for the workflow.
- workflow_name: str, optional
A descriptive name for the workflow.
- agent_id: str, optional
Identifier of the agent responsible for executing this workflow, if any.
- parent_workflow_id: str, optional
Identifier of the parent workflow, if this workflow is a subworkflow.
- workflow_subtype: str, optional
Optional subtype for workflow categorization (e.g., ml_workflow, data_prep_workflow).
- workflow_args: Dict, optional
Additional arguments related to the workflow.
- start_persistence: bool, default=True
If True, enables message persistence in the configured databases.
- save_workflow: bool, default=True
If True, a workflow object message is sent.
- delete_buffer_file: bool or None, optional
If True, deletes any existing dump buffer file on startup. If None, uses project.dump_buffer.delete_previous_file from settings.yaml.
- Additional arguments (*args, **kwargs) are used for specific adapters.
For example, when using the Dask interceptor, the dask_client argument should be provided in kwargs to enable saving the Dask workflow, which is recommended.
- static delete_buffer_file(path: str = None)
Delete the buffer file from disk if it exists.
If no path is provided, the default path from the settings file is used. Logs whether the file was successfully removed or not found.
Parameters
- path: str, optional
Path to the buffer JSONL file. If not provided, defaults to DUMP_BUFFER_PATH as configured in the settings.
Returns
- None
The file is deleted from disk if it exists; no value is returned.
Notes
This operation only affects the file on disk. It does not clear the in-memory buffer.
Logging is performed through the class logger.
Examples
>>> flowcept.delete_buffer_file("buffer.jsonl") # Deletes buffer.jsonl if it exists
>>> flowcept.delete_buffer_file() # Deletes the default buffer file defined in settings
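The behavior described above ("delete if it exists, log whether it was removed or not found, leave memory alone") can be sketched with the standard library. This is a hypothetical illustration, not Flowcept's implementation. The function name `delete_buffer_file_sketch` and the default path `"flowcept_buffer.jsonl"` are assumptions standing in for the real method and the `DUMP_BUFFER_PATH` setting.

```python
import logging
from pathlib import Path
from typing import Optional

logger = logging.getLogger("buffer_sketch")

# Hypothetical stand-in for the DUMP_BUFFER_PATH setting.
DEFAULT_BUFFER_PATH = "flowcept_buffer.jsonl"


def delete_buffer_file_sketch(path: Optional[str] = None) -> None:
    """Remove the buffer file if present and log the outcome; in-memory data is untouched."""
    target = Path(path or DEFAULT_BUFFER_PATH)
    if target.exists():
        target.unlink()
        logger.info("Buffer file %s removed.", target)
    else:
        logger.info("Buffer file %s not found; nothing to delete.", target)
```

Calling the sketch twice is harmless: the second call only logs that the file was not found.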
- dump_buffer(path: str = None)
Dump the current in-memory buffer to a JSON Lines (JSONL) file.
Each element of the buffer (a dictionary) is serialized as a single line of JSON. If no path is provided, the default path from the settings file is used.
Parameters
- path: str, optional
Destination file path for the JSONL output. If not provided, defaults to DUMP_BUFFER_PATH as configured in the settings.
Returns
- None
The buffer is written to disk; no value is returned.
Notes
The buffer is expected to be a list of dictionaries.
Existing files at the specified path will be overwritten.
Logging is performed through the class logger.
Examples
>>> flowcept.dump_buffer("buffer.jsonl") # Writes buffer contents to buffer.jsonl
>>> flowcept.dump_buffer() # Writes buffer contents to the default path defined in settings
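The JSONL layout described above can be sketched with the standard library alone: one serialized dictionary per line, and any existing file overwritten. This is an illustration under assumptions. `dump_buffer_sketch` is a hypothetical name, and `json` stands in for whichever serializer Flowcept uses internally.

```python
import json
from typing import Any, Dict, List


def dump_buffer_sketch(buffer: List[Dict[str, Any]], path: str) -> None:
    """Write each buffered message as one JSON line, overwriting any existing file."""
    with open(path, "w", encoding="utf-8") as f:  # mode "w" truncates an existing file
        for message in buffer:
            f.write(json.dumps(message) + "\n")
```

Because each line is a complete JSON document, the resulting file can be streamed line by line without loading every record at once.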
- static generate_report(report_type: str = 'provenance_card', format: str = 'markdown', print_markdown: bool = False, output_path: str | None = None, input_jsonl_path: str | None = None, records: List[Dict[str, Any]] | None = None, workflow_id: str | None = None, campaign_id: str | None = None) -> Dict[str, Any]
Generate a Flowcept report from JSONL, records, or DB data.
Parameters
- report_type: str, optional
Report identifier. Supported values are "provenance_card" and "provenance_report". Default is "provenance_card".
- format: str, optional
Output format. "provenance_card" supports only "markdown", and "provenance_report" supports only "pdf". Default is "markdown".
- print_markdown: bool, optional
When True and format="markdown", render the generated markdown report to the terminal using Rich (install it with pip install flowcept[extras]).
- output_path: str, optional
Destination path for the generated report file.
- input_jsonl_path: str, optional
Path to a Flowcept JSONL buffer file used as report input.
- records: list of dict, optional
In-memory workflow/task/object records used as report input.
- workflow_id: str, optional
Workflow identifier for DB query mode.
- campaign_id: str, optional
Campaign identifier for DB query mode.
Returns
- dict
Report generation metadata including output path and input mode.
Raises
- ValueError
If the input-mode selection or the report type/format combination is invalid.
- FileNotFoundError
If input_jsonl_path is selected but the file does not exist.
- ModuleNotFoundError
If print_markdown=True but Rich is not installed.
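The type/format pairing listed above, together with the choice of input source, can be expressed as a small validation step. The sketch below is hypothetical. The name `validate_report_request` is invented. So is the rule that exactly one input mode must be chosen (JSONL path, in-memory records, or a DB query by workflow_id/campaign_id); this is an assumption, and the real method may also fall back to a default input.

```python
from typing import Any, Dict, List, Optional

# Supported report_type -> format pairs, as listed in the generate_report parameters.
SUPPORTED_FORMATS = {"provenance_card": "markdown", "provenance_report": "pdf"}


def validate_report_request(
    report_type: str = "provenance_card",
    format: str = "markdown",  # shadows the builtin to mirror the documented parameter name
    input_jsonl_path: Optional[str] = None,
    records: Optional[List[Dict[str, Any]]] = None,
    workflow_id: Optional[str] = None,
    campaign_id: Optional[str] = None,
) -> str:
    """Return the selected input mode, or raise ValueError (hypothetical rules)."""
    if report_type not in SUPPORTED_FORMATS:
        raise ValueError(f"Unsupported report_type: {report_type!r}")
    if format != SUPPORTED_FORMATS[report_type]:
        raise ValueError(
            f"{report_type!r} supports only {SUPPORTED_FORMATS[report_type]!r}, got {format!r}"
        )
    # Assumption: exactly one input source may be selected.
    modes = {
        "jsonl": input_jsonl_path is not None,
        "records": records is not None,
        "db": workflow_id is not None or campaign_id is not None,
    }
    selected = [mode for mode, chosen in modes.items() if chosen]
    if len(selected) != 1:
        raise ValueError(f"Select exactly one input mode, got {selected or 'none'}")
    return selected[0]
```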
- get_buffer(return_df: bool = False)
Retrieve the in-memory message buffer.
Parameters
- return_df: bool, optional
If False (default), return the raw buffer as a list of dictionaries. If True, normalize the buffer into a pandas DataFrame with dotted notation for nested keys. Requires pandas to be installed.
Returns
- list of dict or pandas.DataFrame
If return_df=False: the buffer as a list of dictionaries.
If return_df=True: the buffer as a normalized DataFrame.
Raises
- ModuleNotFoundError
If return_df=True but pandas is not installed.
Examples
>>> buf = flowcept.get_buffer()
>>> isinstance(buf, list)
True
>>> df = flowcept.get_buffer(return_df=True)
>>> "generated.attention" in df.columns
True
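Dotted column names such as generated.attention come from flattening nested dictionaries. Here is a standard-library sketch of that flattening for a single message. The helper `flatten_message` is hypothetical. pandas provides the equivalent operation as pandas.json_normalize(..., sep='.'), and edge cases such as empty nested dicts may be handled differently there.

```python
from typing import Any, Dict


def flatten_message(message: Dict[str, Any], parent_key: str = "", sep: str = ".") -> Dict[str, Any]:
    """Flatten nested dicts into dot-separated keys; other values are kept as-is."""
    flat: Dict[str, Any] = {}
    for key, value in message.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict) and value:
            # Recurse into non-empty nested dicts, e.g. "generated" -> "generated.attention".
            flat.update(flatten_message(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat
```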
- static read_buffer_file(file_path: str | None = None, return_df: bool = False, normalize_df: bool = False, consolidate: bool = False, workflow_id: str | None = None, cleanup_files: bool = True)
Read a JSON Lines (JSONL) file containing captured Flowcept messages.
This function loads a file where each line is a serialized JSON object. It joins the lines into a single JSON array and parses them efficiently with orjson. If return_df is True, it returns a pandas DataFrame created via pandas.json_normalize(..., sep='.') so nested fields become dot-separated columns (for example, generated.attention).
Parameters
- file_path: str, optional
Path to the buffer file. If not provided, defaults to the value of DUMP_BUFFER_PATH from the configuration. If neither is provided, an assertion error is raised.
- return_df: bool, default False
If True, return a normalized pandas DataFrame. If False, return the parsed list of dictionaries.
- normalize_df: bool, default False
If True, normalize the inner dicts (e.g., used, generated, custom_metadata) as individual columns in the returned DataFrame.
- consolidate: bool, default False
If True, merge all matching workflow buffer files into a single JSONL file first.
- workflow_id: str, optional
Workflow ID to use when consolidating buffer files.
- cleanup_files: bool, default True
If True, delete the consolidated input files and keep a single JSONL file named after the base path with the workflow ID appended.
Returns
- list of dict or pandas.DataFrame
A list of message objects when return_df is False; otherwise, a normalized DataFrame with dot-separated columns.
Raises
- AssertionError
If no file_path is provided and DUMP_BUFFER_PATH is not set.
- FileNotFoundError
If the specified file does not exist.
- orjson.JSONDecodeError
If the file contents cannot be parsed as valid JSON.
- ModuleNotFoundError
If return_df is True but pandas is not installed.
Examples
Read messages as a list:
>>> msgs = Flowcept.read_buffer_file("offline_buffer.jsonl")
>>> len(msgs) > 0
True
Read messages as a normalized DataFrame:
>>> df = Flowcept.read_buffer_file("offline_buffer.jsonl", return_df=True)
>>> "generated.attention" in df.columns
True
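The parsing strategy described above joins the lines into one JSON array and parses it in a single call. It can be sketched with the standard library's json module in place of orjson. `read_buffer_file_sketch` is a hypothetical name, and skipping blank lines is an added assumption.

```python
import json
from typing import Any, Dict, List


def read_buffer_file_sketch(file_path: str) -> List[Dict[str, Any]]:
    """Parse a JSONL file by wrapping its lines in a JSON array and decoding once."""
    with open(file_path, "r", encoding="utf-8") as f:  # raises FileNotFoundError if missing
        lines = [line.strip() for line in f if line.strip()]  # skip blank lines
    # "[line1,line2,...]" is a valid JSON array when every line is a JSON object.
    return json.loads("[" + ",".join(lines) + "]")
```

Decoding one array means a single malformed line makes the whole call fail. In this sketch it raises json.JSONDecodeError, the stdlib counterpart of the orjson error listed above.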
- save_workflow(interceptor: str, interceptor_instance: BaseInterceptor)
Save the current workflow and send its metadata using the provided interceptor.
This method assigns a unique workflow ID if one does not already exist, creates a WorkflowObject, and populates it with relevant metadata such as campaign ID, workflow name, and arguments. The interceptor is then used to send the workflow data.
Parameters
- interceptor: str
Interceptor kind.
- interceptor_instance: BaseInterceptor
Interceptor object used to store the workflow info.
Returns
- None
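The steps save_workflow performs can be sketched with plain dictionaries: reuse or assign a workflow ID, build a workflow record with campaign ID, name, and arguments, then hand it to the interceptor. Everything here is hypothetical. `build_workflow_message` and the `send` callable stand in for WorkflowObject and the interceptor. Using uuid4 for the generated ID and storing the arguments under `used` are assumptions.

```python
import uuid
from typing import Any, Callable, Dict, Optional


def build_workflow_message(
    workflow_id: Optional[str],
    campaign_id: Optional[str],
    name: Optional[str],
    args: Optional[Dict[str, Any]],
    send: Callable[[Dict[str, Any]], None],
) -> Dict[str, Any]:
    """Assign a workflow ID if missing, build the workflow record, and send it."""
    if workflow_id is None:
        workflow_id = str(uuid.uuid4())  # assumption: a random UUID as the unique ID
    message = {
        "workflow_id": workflow_id,
        "campaign_id": campaign_id,
        "name": name,
        "used": args,  # assumption: workflow arguments recorded as inputs
    }
    send(message)  # stands in for the interceptor sending the workflow data
    return message
```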
- static services_alive() -> bool
Checks the liveness of the MQ (Message Queue) and, if enabled, the MongoDB service.
Returns¶
- bool
True if all services (MQ and optionally MongoDB) are alive, False otherwise.
Notes
The method tests the liveness of the MQ service using MQDao.
If MONGO_ENABLED is True, it also checks the liveness of the MongoDB service using MongoDBDAO.
Logs errors if any service is not ready, and logs success when all checked services are operational.
Examples
>>> is_alive = Flowcept.services_alive()
>>> if is_alive:
...     print("All services are running.")
... else:
...     print("One or more services are not ready.")
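The liveness rule in the Notes reduces to a short boolean combination. The MQ is always checked, MongoDB only when MONGO_ENABLED is True, and the result is True only if every checked service is alive. The sketch below passes the individual checks in as callables. `services_alive_sketch` and its parameters are hypothetical stand-ins for MQDao, MongoDBDAO, and the setting.

```python
import logging
from typing import Callable

logger = logging.getLogger("liveness_sketch")


def services_alive_sketch(
    mq_alive: Callable[[], bool],
    mongo_alive: Callable[[], bool],
    mongo_enabled: bool,
) -> bool:
    """Return True only if the MQ and, when enabled, MongoDB report alive."""
    if not mq_alive():
        logger.error("MQ service is not ready.")
        return False
    # MongoDB is only probed when it is enabled.
    if mongo_enabled and not mongo_alive():
        logger.error("MongoDB service is not ready.")
        return False
    logger.info("All checked services are alive.")
    return True
```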
- start() -> Flowcept
Start the Flowcept controller.
- static start_consumption_services(bundle_exec_id: str = None, check_safe_stops: bool = False, consumers: List = None)
Starts the document consumption services for processing.
Parameters
- bundle_exec_id: str, optional
The execution ID of the bundle being processed. Defaults to None.
- check_safe_stops: bool, optional
Whether to enable safe stop checks for the service. Defaults to False.
- consumers: List, optional
A list of consumer types to be started. Currently, only one type of consumer is supported. Defaults to None.
Raises
- NotImplementedError
If multiple consumer types are provided in the consumers list.
Notes
The method initializes the DocumentInserter service, which processes documents based on the provided parameters.
The threaded parameter for DocumentInserter.start is set to False.
Examples
>>> Flowcept.start_consumption_services(bundle_exec_id="12345", check_safe_stops=True)
- stop()
Stop the Flowcept controller.
Flowcept.db: Querying the Database
The Flowcept.db property exposes an instance of flowcept.flowcept_api.db_api.DBAPI,
providing high-level methods to query, insert, and update provenance data in the configured database.
Typical usage:
from flowcept import Flowcept
# Query tasks from the current workflow
tasks = Flowcept.db.get_tasks_from_current_workflow()
# Query workflows
workflows = Flowcept.db.workflow_query({"name": "my_workflow"})
# Insert or update a task or workflow (my_task_obj is a TaskObject, my_wf_obj a WorkflowObject)
Flowcept.db.insert_or_update_task(my_task_obj)
Flowcept.db.insert_or_update_workflow(my_wf_obj)
Main Message Objects
- class flowcept.TaskObject
Task object class.
Represents a single provenance task in Flowcept, including inputs, outputs, execution metadata, telemetry, and environment details.
- activity_id: AnyStr = None
Activity name (usually the function name) associated with the task.
- adapter_id: AnyStr = None
Identifier of the adapter that produced this task (if any).
- address: AnyStr = None
Optional network address associated with the task.
- agent_id: str = None
Identifier of the agent that executed (or is going to execute) this task.
- campaign_id: AnyStr = None
Campaign identifier grouping related tasks together.
- custom_metadata: Dict[AnyStr, Any] = None
Custom metadata dictionary provided by the developer/user.
- data: Any = None
Arbitrary raw data payload associated with the task. It is good practice to add custom_metadata associated with data, especially if it contains file contents. In that case, custom_metadata should contain the keys “file_type”, “file_content”, “file_name”, “extension”.
- dependencies: List = None
List of task IDs this task depends on.
- dependents: List = None
List of task IDs that depend on this task.
- ended_at: float = None
Timestamp when the task execution ended.
- enrich(adapter_key=None)
Enrich it.
- static enrich_task_dict(task_dict: dict)
Enrich the task.
- environment_id: AnyStr = None
Identifier of the environment where the task executed.
- static from_dict(task_obj_dict: Dict[AnyStr, Any]) -> TaskObject
Create a TaskObject from a dictionary.
Parameters
- task_obj_dict: Dict[AnyStr, Any]
Dictionary containing task attributes.
Returns
- TaskObject
A TaskObject instance populated with available data.
- generated: Dict[AnyStr, Any] = None
Outputs produced by the task (results, artifacts, files).
- static get_dict_field_names()
Get field names.
- static get_time_field_names()
Get the names of the time fields.
- group_id: AnyStr = None
Grouping identifier, often used to link loop iterations together.
- hostname: AnyStr = None
Hostname of the machine executing the task.
- login_name: AnyStr = None
Login name of the user in the execution environment.
- mq_host: str = None
Message queue host associated with the task.
- node_name: AnyStr = None
Node name in a distributed system or HPC cluster.
- parent_task_id: AnyStr = None
Identifier of the parent task, if this task is nested or dependent.
- private_ip: AnyStr = None
Private IP address of the machine executing the task.
- public_ip: AnyStr = None
Public IP address of the machine executing the task.
- registered_at: float = None
Timestamp when the task was registered by the DocumentInserter.
- serialize()
Serialize it.
- source_agent_id: str = None
Identifier of the agent that sent this task to be executed (if any).
- started_at: float = None
Timestamp when the task execution started.
- status: Status = None
Execution status of the task (e.g., FINISHED, ERROR).
- stderr: AnyStr | Dict = None
Captured standard error from the task, if available.
- stdout: AnyStr | Dict = None
Captured standard output from the task, if available.
- submitted_at: float = None
Timestamp when the task was submitted.
- subtype: AnyStr = None
Optional subtype of the task (e.g., iteration, ML step, custom).
- tags: List = None
User-defined tags attached to the task.
- task_id: AnyStr = None
Unique identifier of the task.
- static task_id_field()
Return the name of the task ID field.
- telemetry_at_end: Telemetry = None
Telemetry snapshot captured at the end of the task.
- telemetry_at_start: Telemetry = None
Telemetry snapshot captured at the start of the task.
- to_dict()
Convert to dictionary.
- type = 'task'
Constant type label for this object (“task”).
- used: Dict[AnyStr, Any] = None
Inputs consumed by the task (parameters, files, resources).
- user: AnyStr = None
User who executed or triggered the task.
- utc_timestamp: float = None
UTC timestamp when the task object was created.
- workflow_id: AnyStr = None
Identifier of the workflow this task belongs to.
- static workflow_id_field()
Return the name of the workflow ID field.
- workflow_name: AnyStr = None
Name of the workflow this task belongs to.
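Here is a cut-down, hypothetical stand-in for TaskObject built with a dataclass, to make the shape of a task message concrete. It keeps only a handful of the fields listed above and uses a plain string for status instead of the Status enum. Its from_dict ignores unknown keys, mirroring "populated with available data". Whether the real to_dict omits None values is not stated here, so dropping them is an assumption of this sketch.

```python
from dataclasses import asdict, dataclass, fields
from typing import Any, Dict, Optional


@dataclass
class MiniTask:
    """Hypothetical subset of TaskObject fields, for illustration only."""

    task_id: Optional[str] = None
    workflow_id: Optional[str] = None
    activity_id: Optional[str] = None
    used: Optional[Dict[str, Any]] = None
    generated: Optional[Dict[str, Any]] = None
    started_at: Optional[float] = None
    ended_at: Optional[float] = None
    status: Optional[str] = None  # simplified: the real field is a Status value
    type: str = "task"  # constant label, as in TaskObject.type

    @staticmethod
    def from_dict(task_obj_dict: Dict[str, Any]) -> "MiniTask":
        # Populate only the fields this class defines; unknown keys are ignored.
        known = {f.name for f in fields(MiniTask)}
        return MiniTask(**{k: v for k, v in task_obj_dict.items() if k in known})

    def to_dict(self) -> Dict[str, Any]:
        # Assumption: unset (None) fields are omitted from the message.
        return {k: v for k, v in asdict(self).items() if v is not None}
```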
- class flowcept.WorkflowObject(workflow_id=None, name=None, used=None, generated=None)
Workflow object class.
Represents metadata and provenance details for a workflow execution.
- adapter_id: AnyStr = None
Identifier of the adapter (e.g., Dask, MLflow) that triggered workflow capture.
- campaign_id: AnyStr = None
Identifier for grouping workflows into a campaign or experiment.
- code_repository: Dict = None
Details of the code repository (URL, commit hash, branch) used to run the workflow.
- conf: Dict = None
Workflow configuration parameters, such as hyperparameters or runtime options.
- custom_metadata: Dict = None
User-defined metadata dictionary with additional annotations.
- static deserialize(serialized_data) -> WorkflowObject
Deserialize it.
- enrich(adapter_key=None)
Enrich it.
- environment_id: str = None
Identifier for the runtime environment (e.g., conda env, container).
- extra_metadata: str = None
Optional free-form metadata for extensions not covered by other fields.
- flowcept_settings: Dict = None
Snapshot of Flowcept’s active settings used for this workflow.
- flowcept_version: AnyStr = None
Version of Flowcept used during execution.
- static from_dict(dict_obj: Dict) -> WorkflowObject
Convert from dictionary.
- generated: Dict = None
Outputs generated by the workflow (artifacts, models, or results).
- interceptor_ids: List = None
List of interceptors applied to this workflow (e.g., instrumentation, telemetry).
- machine_info: Dict = None
System or hardware information where the workflow is executed.
- name: AnyStr = None
Descriptive name for the workflow.
- parent_workflow_id: AnyStr = None
Identifier of the parent workflow, if this workflow is nested or derived.
- serialize()
Serialize it.
- subtype: AnyStr = None
Optional subtype of the workflow (e.g., data_prep_workflow, ml_workflow).
- sys_name: str = None
Logical system or facility name (e.g., HPC system name, cluster identifier).
- to_dict()
Convert to dictionary.
- used: Dict = None
Inputs consumed by the workflow (datasets, arguments, or configuration values).
- user: AnyStr = None
User who launched or owns the workflow run.
- utc_timestamp: float = None
Timestamp (UTC, in seconds) when the workflow object was created.
- workflow_id: AnyStr = None
Unique identifier for the workflow.
- static workflow_id_field()
Return the name of the workflow ID field.
FlowceptTask
- class flowcept.FlowceptTask(task_id: str = None, workflow_id: str = None, campaign_id: str = None, activity_id: str = None, agent_id: str = None, source_agent_id: str = None, parent_task_id: str = None, used: Dict = None, data: Any = None, subtype: str = None, hostname: str = None, tags: List[str] = None, adapter_id: str = None, custom_metadata: Dict = None, generated: Dict = None, started_at: float = None, ended_at: float = None, stdout: str = None, stderr: str = None, status: Status = None)
Bases: object
A context manager for capturing provenance and telemetry data for a task within the Flowcept framework.
This class encapsulates the lifecycle of a task, recording its start and end times, telemetry, and metadata. It integrates with the Flowcept API and Instrumentation Interceptor to log task-specific details.
Methods
- __enter__()
Sets up the task context.
- __exit__(exc_type, exc_val, exc_tb)
Ends the task context, ensuring telemetry and metadata are recorded.
- end(generated=None, ended_at=None, stdout=None, stderr=None, status=Status.FINISHED)
Finalizes the task, capturing telemetry, status, and other details.
Notes
If instrumentation is disabled (INSTRUMENTATION_ENABLED is False), the methods in this class are no-ops, and no data is captured.
- __init__(task_id: str = None, workflow_id: str = None, campaign_id: str = None, activity_id: str = None, agent_id: str = None, source_agent_id: str = None, parent_task_id: str = None, used: Dict = None, data: Any = None, subtype: str = None, hostname: str = None, tags: List[str] = None, adapter_id: str = None, custom_metadata: Dict = None, generated: Dict = None, started_at: float = None, ended_at: float = None, stdout: str = None, stderr: str = None, status: Status = None)
Initializes a FlowceptTask and optionally finalizes it.
If any of the optional arguments generated, ended_at, stdout, stderr, or status is provided, the task is automatically finalized by calling end() during initialization. This is useful when the task’s outcome is already known at the moment of instantiation.
Parameters
- task_id: str, optional
Unique identifier for the task. Defaults to a generated ID.
- workflow_id: str, optional
ID of the workflow to which this task belongs.
- campaign_id: str, optional
ID of the campaign to which this task belongs.
- activity_id: str, optional
Describes the specific activity this task captures.
- used: Dict, optional
Metadata about resources or data used during the task.
- data: Any, optional
Any raw data associated with this task.
- subtype: str, optional
Optional string categorizing the task subtype.
- custom_metadata: Dict, optional
Additional user-defined metadata to associate with the task.
- generated: Dict, optional
Output data generated during the task execution.
- started_at: float, optional
Timestamp indicating when the task started.
- ended_at: float, optional
Timestamp indicating when the task ended.
- stdout: str, optional
Captured standard output from the task.
- stderr: str, optional
Captured standard error from the task.
- status: Status, optional
Task completion status. If the task is finalized during initialization without an explicit status, Status.FINISHED is used.
- end(generated: Dict = None, ended_at: float = None, stdout: str = None, stderr: str = None, data: Any = None, custom_metadata: Dict = None, status: Status | None = None)
Finalizes the task by capturing its end state, telemetry, and status.
This method records the task’s ending telemetry data, status, and any outputs or errors. It also sends the task data to the instrumentation interceptor for logging or further processing.
Parameters
- generated: Dict, optional
Metadata or data generated during the task’s execution. Defaults to None.
- data: Any, optional
Any raw data associated with this task.
- custom_metadata: Dict, optional
Additional user-defined metadata to associate with the task.
- ended_at: float, optional
Timestamp indicating when the task ended. If not provided, defaults to the current time.
- stdout: str, optional
Standard output captured during the task’s execution. Defaults to None.
- stderr: str, optional
Standard error captured during the task’s execution. Defaults to None.
- status: Status, optional
Status of the task at the time of completion. Defaults to Status.FINISHED.
Notes
If instrumentation is disabled (INSTRUMENTATION_ENABLED is False), this method performs no actions.
- get_agent_id()
Return the agent identifier.
Returns
- Any
Identifier of the agent associated with the current task.
- get_campaign_id()
Return the campaign identifier.
Returns
- Any
Identifier of the campaign associated with the current task.
- get_id()
Return the task identifier.
Returns
- Any
Identifier of the current task.
- get_workflow_id()
Return the workflow identifier.
Returns
- Any
Identifier of the workflow associated with the current task.
- send()
Finalizes and sends the task data if not already ended.
This method acts as a simple alias for finalizing the task without requiring additional arguments. It sends the task object to the interceptor for capture and marks it as ended to prevent multiple submissions.
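The lifecycle described for FlowceptTask can be modeled in a few lines:

- the task starts on entry and is finalized on exit unless it has already ended;
- passing an outcome argument finalizes it in the constructor;
- send() guards against double submission.

This is a hypothetical, dependency-free model. `SketchTask`, the `sink` list standing in for the instrumentation interceptor, and the string status values are assumptions. For brevity, only generated and status trigger auto-finalization here, and telemetry capture is omitted.

```python
import time
from typing import Any, Dict, List, Optional


class SketchTask:
    """Hypothetical model of the FlowceptTask lifecycle; no telemetry, no interceptor."""

    def __init__(self, sink: List[Dict[str, Any]], activity_id: Optional[str] = None,
                 used: Optional[Dict[str, Any]] = None,
                 generated: Optional[Dict[str, Any]] = None,
                 status: Optional[str] = None) -> None:
        self._sink = sink  # stands in for the instrumentation interceptor
        self._ended = False
        self.record: Dict[str, Any] = {
            "activity_id": activity_id,
            "used": used,
            "started_at": time.time(),
        }
        # Outcome arguments mean the result is already known: finalize right away.
        if generated is not None or status is not None:
            self.end(generated=generated, status=status)

    def __enter__(self) -> "SketchTask":
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # Finalize on exit unless end() or send() already did.
        if not self._ended:
            self.end()

    def end(self, generated: Optional[Dict[str, Any]] = None,
            status: Optional[str] = None) -> None:
        self.record["generated"] = generated
        self.record["ended_at"] = time.time()
        self.record["status"] = status or "FINISHED"  # documented default status
        self._sink.append(self.record)
        self._ended = True

    def send(self) -> None:
        # Alias for end() that never submits the same task twice.
        if not self._ended:
            self.end()
```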
FlowceptLoop
Can be imported via from flowcept import FlowceptLoop
- class flowcept.instrumentation.flowcept_loop.FlowceptLoop(items: Sized | Iterator | int, loop_name='loop', item_name='item', parent_task_id=None, workflow_id=None, items_length=0, capture_enabled=True)
Bases: object
A utility class to wrap and instrument iterable loops for telemetry and tracking.
The FlowceptLoop class supports iterating over a collection of items or a numeric range while capturing metadata for each iteration and for the loop as a whole. This is particularly useful in scenarios where tracking and instrumentation of loop executions is required.
Notes
This class integrates with the Flowcept system for telemetry and tracking, ensuring detailed monitoring of loops and their iterations. It is designed for cases where capturing granular runtime behavior of loops is critical.
- __init__(items: Sized | Iterator | int, loop_name='loop', item_name='item', parent_task_id=None, workflow_id=None, items_length=0, capture_enabled=True)
Initialize a FlowceptLoop instance for tracking iterations.
This constructor wraps an iterable, numeric range, or explicit iterator into a loop context where each iteration is instrumented with provenance and optional telemetry. If instrumentation is disabled, the loop behaves like a normal Python iterator with minimal overhead.
Parameters
- items: Union[Sized, Iterator, int]
The items to iterate over. Can be:
A sized iterable (e.g., list, range).
An integer (interpreted as range(items)).
An iterator (requires items_length if its length cannot be inferred).
- loop_name: str, optional
A descriptive name for the loop. Used in provenance as the loop’s activity identifier. Default is "loop".
- item_name: str, optional
The key name under which each iteration’s item is recorded in provenance. Default is "item".
- parent_task_id: str, optional
The identifier of a parent task, if this loop is nested within another task. Default is None.
- workflow_id: str, optional
Identifier for the workflow this loop belongs to. If not provided, it is inherited from the current Flowcept context or generated as a UUID.
- items_length: int, optional
Explicit number of items if items is an iterator without a defined length. Default is 0.
- capture_enabled: bool, optional
Whether to enable provenance/telemetry capture. If False, the loop runs without instrumentation. Default is True.
Raises
- Exception
If items is not a supported type (sized iterable, integer, or iterator).
Notes
Each iteration is recorded with used (inputs) and optional generated values, plus telemetry if enabled.
Iteration metadata is finalized at the end of each iteration and sent to the active Flowcept interceptor.
- get_current_iteration_id()
Get current iteration’s task id.
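The accepted item types and the per-iteration records described for FlowceptLoop can be modeled as a function that validates eagerly and returns a generator. The sketch is hypothetical and dependency-free:

- `sketch_loop` is an invented name, and its record layout (task_id, parent_task_id, activity_id, used with an "i" index, and the `"<loop_name>_iteration"` activity name) is an assumption loosely based on the TaskObject fields.
- It appends records to a list instead of sending them to an interceptor.
- It captures no telemetry.

```python
import uuid
from collections.abc import Iterator, Sized
from typing import Any, Dict, List, Union


def sketch_loop(items: Union[Sized, Iterator, int], records: List[Dict[str, Any]],
                loop_name: str = "loop", item_name: str = "item",
                items_length: int = 0) -> Iterator[Any]:
    """Validate items eagerly, then return an iterator that records each iteration."""
    if isinstance(items, int):
        items = range(items)  # an integer n is treated as range(n)
    if isinstance(items, Sized):
        length = len(items)
    elif isinstance(items, Iterator):
        length = items_length  # an iterator cannot report its own length
    else:
        raise Exception(f"Unsupported items type: {type(items).__name__}")

    loop_id = str(uuid.uuid4())
    records.append({"task_id": loop_id, "activity_id": loop_name,
                    "used": {"items_length": length}})

    def _iterate() -> Iterator[Any]:
        for i, item in enumerate(items):
            # One record per iteration, linked to the loop record via parent_task_id.
            records.append({
                "task_id": str(uuid.uuid4()),
                "parent_task_id": loop_id,
                "activity_id": f"{loop_name}_iteration",
                "used": {"i": i, item_name: item},
            })
            yield item

    return _iterate()
```

Validation happens before the generator is created, so an unsupported type fails at the call site, much as the documented constructor raises.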
FlowceptLightweightLoop
Can be imported via from flowcept import FlowceptLightweightLoop
- class flowcept.instrumentation.flowcept_loop.FlowceptLightweightLoop(items: Sized | Iterator, loop_name='loop', item_name='item', parent_task_id=None, workflow_id=None, items_length=0, capture_enabled=True)
Bases: object
A utility class to wrap and instrument iterable loops for telemetry and tracking.
The FlowceptLightweightLoop class supports iterating over a collection of items or a numeric range while capturing metadata for each iteration and for the loop as a whole. This is particularly useful in scenarios where tracking and instrumentation of loop executions is required.
Parameters
- items: Union[Sized, Iterator]
The items to iterate over. Must be either a sized iterable (with a __len__ method) or an iterator whose length is given by items_length.
- loop_name: str, optional
A descriptive name for the loop (default is “loop”).
- item_name: str, optional
The name used for each item in the telemetry (default is “item”).
- parent_task_id: str, optional
The ID of the parent task associated with the loop, if applicable (default is None).
- workflow_id: str, optional
The workflow ID to associate with this loop. If not provided, it will be generated or inferred from the current workflow context.
Raises
- Exception
If items is neither a sized iterable nor an iterator.
Notes
This class integrates with the Flowcept system for telemetry and tracking, ensuring detailed monitoring of loops and their iterations. It is designed for cases where capturing granular runtime behavior of loops is critical.
- __init__(items: Sized | Iterator, loop_name='loop', item_name='item', parent_task_id=None, workflow_id=None, items_length=0, capture_enabled=True)
Initialize a FlowceptLightweightLoop instance for tracking iterations.
This constructor provides a lower-overhead loop wrapper compared to FlowceptLoop. Iterations are pre-registered as task objects, and capture primarily updates the used and generated fields as the loop progresses.
Parameters
- items: Union[Sized, Iterator]
The items to iterate over. Must be either:
A sized iterable (with __len__).
An explicit iterator (length must be given by items_length).
- loop_name: str, optional
A descriptive name for the loop. Used in provenance as the loop’s activity identifier. Default is "loop".
- item_name: str, optional
The key name under which each iteration’s item is recorded in provenance. Default is "item".
- parent_task_id: str, optional
The identifier of a parent task, if this loop is nested within another task. Default is None.
- workflow_id: str, optional
Identifier for the workflow this loop belongs to. If not provided, it is inherited from the current Flowcept context or generated as a UUID.
- items_length: int, optional
Explicit number of items if items is an iterator without a defined length. Default is 0.
- capture_enabled: bool, optional
Whether to enable provenance/telemetry capture. If False, the loop runs without instrumentation. Default is True.
Raises
- Exception
If items is neither a sized iterable nor an iterator.
Notes
This class is designed for high-performance scenarios with many iterations.
Iteration tasks are pre-allocated, and provenance capture is batched via the Flowcept interceptor.
Compared to FlowceptLoop, this class avoids per-iteration telemetry overhead unless explicitly enabled.
- end_iter(generated_value: Dict)
Finalizes the current iteration by associating generated values with the iteration metadata.
This method updates the metadata of the current iteration to include the values generated during the iteration, ensuring they are properly logged and tracked.
Parameters
- generated_value: dict
A dictionary containing the generated values for the current iteration. These values will be stored in the generated field of the iteration’s metadata.
- get_current_iteration_id()
Get current iteration’s task id.
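The lightweight variant's design works in three steps: pre-allocate one record per iteration, attach generated values through end_iter, and hand everything over in one batch when the loop finishes. A sketch follows. `SketchLightweightLoop` and its `flush` callback are hypothetical. The real class sends its records through the Flowcept interceptor, while this model ignores telemetry and only flushes when the loop runs to completion.

```python
from collections.abc import Iterator, Sized
from typing import Any, Callable, Dict, List, Optional, Union


class SketchLightweightLoop:
    """Hypothetical model: pre-allocated iteration records handed off in one batch."""

    def __init__(self, items: Union[Sized, Iterator],
                 flush: Callable[[List[Dict[str, Any]]], None],
                 loop_name: str = "loop", item_name: str = "item",
                 items_length: int = 0) -> None:
        if isinstance(items, Sized):
            n = len(items)
        elif isinstance(items, Iterator):
            n = items_length  # assumes items_length matches the iterator's real length
        else:
            raise Exception("items must be a sized iterable or an iterator")
        self._items = items
        self._flush = flush  # stands in for the Flowcept interceptor
        self._item_name = item_name
        # Allocate one record per expected iteration before the loop starts.
        self._records: List[Dict[str, Any]] = [
            {"activity_id": loop_name, "used": None, "generated": None} for _ in range(n)
        ]
        self._current: Optional[int] = None

    def __iter__(self) -> Iterator[Any]:
        for i, item in enumerate(self._items):
            self._current = i
            self._records[i]["used"] = {self._item_name: item}
            yield item
        # Batch hand-off once the loop is exhausted.
        self._flush(self._records)

    def end_iter(self, generated_value: Dict[str, Any]) -> None:
        """Attach generated values to the current iteration's record."""
        self._records[self._current]["generated"] = generated_value
```

Usage mirrors the documented pattern: iterate, and call end_iter once per iteration with that iteration's outputs.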