utils

This module provides

class Component(loc=None)[source]

Bases: ABC

Base class for all components with dynamic loading capability.

Parameters:

loc (str)

loc

Location identifier for the component.

Type:

str

args

Expected keys for arguments.

Type:

dict

check_args(args)[source]

Check whether provided args contain all required keys.

Parameters:

args (dict)

Return type:

bool

load_component(loc, args=None, setup=True)[source]
Parameters:
  • loc (str)

  • args (Dict[str, Any] | None)

  • setup (bool)

setup(args)[source]

Set up the component with provided arguments.

Parameters:

args (Dict[str, Any]) – Dictionary of arguments to initialize the component.

Returns:

Initialized component or setup result.

Return type:

Optional[Any]
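
As a rough illustration of how check_args() and setup() fit together, here is a minimal sketch. ComponentSketch is a hypothetical stand-in written for this example, not the library's Component, and Scaler with its "factor" key is invented. It assumes that args maps each expected key to a description and that setup() returns the configured instance.

```python
from abc import ABC
from typing import Any, Dict, Optional


class ComponentSketch(ABC):
    """Hypothetical stand-in for the documented base class."""

    def __init__(self, loc: Optional[str] = None) -> None:
        self.loc = loc
        self.args: Dict[str, Any] = {}  # expected argument keys

    def check_args(self, args: Dict[str, Any]) -> bool:
        # True only when every expected key is present in `args`.
        return all(key in args for key in self.args)


class Scaler(ComponentSketch):
    """Hypothetical component that stores a multiplication factor."""

    def __init__(self, loc: Optional[str] = None) -> None:
        super().__init__(loc)
        self.args = {"factor": "multiplier applied to each value"}

    def setup(self, args: Dict[str, Any]) -> Optional[Any]:
        # Validate first, then copy the arguments onto the instance.
        if not self.check_args(args):
            raise ValueError(f"expected keys: {sorted(self.args)}")
        self.factor = args["factor"]
        return self


scaler = Scaler(loc="demo.Scaler")
has_all = scaler.check_args({"factor": 2})
has_none = scaler.check_args({})
ready = scaler.setup({"factor": 3})
```

Keeping the expected keys on the instance lets a caller check a configuration before any setup work runs.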

class Db(db_path)[source]

Bases: object

Lightweight SQLite wrapper with foreign key enforcement.

Parameters:

db_path (str) – Path to the SQLite database file.

Raises:

FileNotFoundError – If the directory for the DB path doesn’t exist.

close()[source]

Closes the database connection.

Return type:

None

execute(query, params=())[source]

Execute a SQL query (INSERT, UPDATE, DELETE).

Parameters:
  • query (str) – SQL query string.

  • params (tuple) – Query parameters.

Returns:

sqlite3.Cursor or None

Return type:

Cursor | None

query(query, params=())[source]

Execute a SELECT query and fetch all results.

Parameters:
  • query (str) – SQL query string.

  • params (tuple) – Query parameters.

Returns:

Query results.

Return type:

list
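
The wrapper described above can be approximated with the standard sqlite3 module. The sketch below is an assumption about how such a class might look, not the library's Db source. MiniDb is a hypothetical name, and returning None from execute() on a failed statement is a guess based on the documented "Cursor | None" return type.

```python
import os
import sqlite3
from typing import List, Optional


class MiniDb:
    """Illustrative stand-in, not the library's Db implementation."""

    def __init__(self, db_path: str) -> None:
        directory = os.path.dirname(os.path.abspath(db_path))
        if db_path != ":memory:" and not os.path.isdir(directory):
            raise FileNotFoundError(f"directory does not exist: {directory}")
        self.conn = sqlite3.connect(db_path)
        # SQLite only enforces foreign keys when enabled per connection.
        self.conn.execute("PRAGMA foreign_keys = ON")

    def execute(self, query: str, params: tuple = ()) -> Optional[sqlite3.Cursor]:
        try:
            cursor = self.conn.execute(query, params)
            self.conn.commit()
            return cursor
        except sqlite3.Error:
            # Assumed behavior: undo the failed statement and signal with None.
            self.conn.rollback()
            return None

    def query(self, query: str, params: tuple = ()) -> List[tuple]:
        return self.conn.execute(query, params).fetchall()

    def close(self) -> None:
        self.conn.close()


db = MiniDb(":memory:")
db.execute("CREATE TABLE parent (id INTEGER PRIMARY KEY)")
db.execute(
    "CREATE TABLE child ("
    "id INTEGER PRIMARY KEY, parent_id INTEGER REFERENCES parent(id))"
)
db.execute("INSERT INTO parent (id) VALUES (?)", (1,))
accepted = db.execute("INSERT INTO child (parent_id) VALUES (?)", (1,))
rejected = db.execute("INSERT INTO child (parent_id) VALUES (?)", (99,))
rows = db.query("SELECT parent_id FROM child")
db.close()
```

The second child insert refers to a parent row that does not exist, so with foreign keys enabled SQLite rejects it and only the first row is stored.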

class WorkFlow(loc=None)[source]

Bases: Component, ABC

Abstract base class for all workflows.

Workflows are intended to be managed by the PipeLine class. When a pipeline is created via PipeLine.new(), it takes a workflow configuration, instantiates the workflow, and passes the workflow-specific arguments to it. Therefore, all required workflow parameters should be validated at this point using the workflow’s template.

GUIDELINES

  1. Initialization and Argument Validation - When PipeLine.new() is called:

    • The pipeline verifies that the workflow exists and is properly defined.

    • All workflow-specific arguments (args) should be checked against the workflow’s template to ensure completeness and correctness.

    • Duplicate configurations (same args) should be detected and prevented.

    • The workflow must implement a new(name: str, **kwargs) method to initialize itself with workflow-specific arguments.

  2. Preparation - The workflow’s prepare() method is called by the pipeline to initialize all necessary components or resources required for execution.

    • Workflow implementations should convert any required objects or configuration entries from the pipeline config (self.P.cnfg) into Python objects here.

    • After prepare() completes, run() should be safe to execute.

  3. Execution - The workflow’s run() method is called by the pipeline when execution starts.

    • run() should implement the main computation or processing according to the workflow’s purpose.

    • Workflows should assume that prepare() has already been called.

  4. Path Management - Workflows must implement get_path(of: str, args: Optional[Dict] = None) -> str.

    • The pipeline only handles the path for the configuration file; all other paths are redirected to the workflow.

    • All output, intermediate, or artifact paths should be tracked in self.paths.

    • Avoid hard-coded paths; always generate paths dynamically so pipelines can move or copy artifacts safely.

  5. Optional Methods - These methods are called by the pipeline when needed.

    • clean(): Delete temporary files, cached outputs, or intermediate artifacts.

    • status() -> str: Return workflow status or progress information.

  6. Best Practices

    • Ensure deterministic behavior: same inputs should produce the same outputs.

    • Handle missing resources or exceptions gracefully with clear error messages.

    • Use consistent naming for workflow IDs, versions, and artifact paths.

    • Load components dynamically via self.load_component.

    • Workflows should be independent of any specific domain or technology.

REQUIRED METHODS

  • new(self, name: str, **kwargs)

  • prepare(self, *args, **kwargs)

  • run(self, *args, **kwargs)

  • get_path(self, of: str, args: Optional[Dict] = None) -> str

OPTIONAL METHODS

  • clean(self, *args, **kwargs)

  • status(self, *args, **kwargs) -> str
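
To make the guidelines concrete, the following sketch walks through the new(), prepare(), run(), and get_path() sequence. Everything in it is hypothetical: WorkFlowSketch stands in for the real base class, EchoFlow and its template of required keys are invented, and a SimpleNamespace plays the role of the pipeline object behind self.P. It follows the new(name, **kwargs) form listed under REQUIRED METHODS.

```python
import os
from abc import ABC, abstractmethod
from types import SimpleNamespace
from typing import Any, Dict, List, Optional


class WorkFlowSketch(ABC):
    """Hypothetical stand-in listing only the required methods."""

    def __init__(self, loc: Optional[str] = None) -> None:
        self.loc = loc
        self.P: Any = None          # pipeline handle, set by the pipeline
        self.paths: List[str] = []  # artifact types this workflow can locate

    @abstractmethod
    def new(self, name: str, **kwargs: Any) -> Dict[str, Any]: ...

    @abstractmethod
    def prepare(self) -> None: ...

    @abstractmethod
    def run(self) -> Any: ...

    @abstractmethod
    def get_path(self, of: str, args: Optional[Dict] = None) -> str: ...


class EchoFlow(WorkFlowSketch):
    template = {"message", "root"}  # assumed set of required argument keys

    def __init__(self, loc: Optional[str] = None) -> None:
        super().__init__(loc)
        self.paths = ["output"]

    def new(self, name: str, **kwargs: Any) -> Dict[str, Any]:
        # Validate the workflow-specific arguments against the template.
        missing = self.template - set(kwargs)
        if missing:
            raise ValueError(f"missing workflow args: {sorted(missing)}")
        return {"name": name, "args": kwargs}

    def prepare(self) -> None:
        # Turn config entries into Python objects before run().
        self.message = str(self.P.cnfg["args"]["message"])

    def run(self) -> str:
        return self.message.upper()

    def get_path(self, of: str, args: Optional[Dict] = None) -> str:
        if of not in self.paths:
            raise ValueError(f"unknown artifact type: {of!r}")
        cnfg = self.P.cnfg
        # Built from the config, so nothing is hard-coded.
        return os.path.join(cnfg["args"]["root"], cnfg["name"], f"{of}.txt")


flow = EchoFlow()
cnfg = flow.new("demo", message="hi", root="runs")
flow.P = SimpleNamespace(cnfg=cnfg)  # stands in for a PipeLine object
flow.prepare()
result = flow.run()
output_path = flow.get_path("output")
```

Because get_path() derives every location from the configuration, copying the pipeline to another root only requires changing one config value.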

clean()[source]

Clean up temporary files, cached outputs, or intermediate artifacts.

abstract get_path(of, args=None)[source]

Return a standardized path for the requested artifact type (of). All workflow-specific path options should be listed in self.paths. This ensures that when a pipeline is transferred, all artifacts are correctly located.

Parameters:
  • of (str)

  • args (Dict | None)

Return type:

str

abstract new(args)[source]

Initialize a new workflow instance with the given name and arguments.

abstract prepare()[source]

Called when PipeLine.prepare() is executed. Convert necessary components from the configuration dictionary into Python objects here so that the workflow is ready for run().

abstract run()[source]

Called when PipeLine.run() is executed. Implement the main computation or processing logic here.

status()[source]

Return the current status or progress of the workflow.

Return type:

str

Parameters:

loc (str)

extract_all_locs(d)[source]

Recursively extract all ‘loc’ values from nested dictionaries or lists. A component is defined as a dict with a ‘loc’ key and optional ‘args’.

Parameters:

d (Dict | List)

Return type:

List[str]
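
A recursive traversal of this kind could look like the sketch below. The function name extract_locs and the sample configuration are hypothetical, and the order of the returned values (depth-first, in insertion order) is an assumption rather than documented behavior.

```python
from typing import Any, Dict, List, Union


def extract_locs(d: Union[Dict[str, Any], List[Any]]) -> List[str]:
    """Collect every string 'loc' value found in nested dicts and lists."""
    locs: List[str] = []
    if isinstance(d, dict):
        if isinstance(d.get("loc"), str):
            locs.append(d["loc"])
        children = list(d.values())
    else:
        children = list(d)
    for child in children:
        # Only containers can hold further components.
        if isinstance(child, (dict, list)):
            locs.extend(extract_locs(child))
    return locs


config = {
    "model": {
        "loc": "pkg.models.Net",
        "args": {"encoder": {"loc": "pkg.layers.Encoder"}},
    },
    "metrics": [{"loc": "pkg.metrics.Accuracy"}, {"name": "plain"}],
}
found = extract_locs(config)
```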

filter_configs(query: str, ids: List[str], loader_func: Callable[[str], Dict[str, Any]], params: Literal[True]) DataFrame[source]
filter_configs(query: str, ids: List[str], loader_func: Callable[[str], Dict[str, Any]], params: Literal[False] = False) List[str]

Filter and extract information from a collection of configurations.

get_invalid_loc_queries(d, parent_key='')[source]

Recursively search a nested dictionary or list for invalid ‘loc’ entries.

A ‘loc’ entry is considered invalid if it is not a string or does not contain a dot (‘.’).

Parameters:
  • d (Union[Dict, List]) – The nested dictionary or list to inspect.

  • parent_key (str, optional) – The concatenated key path used during recursion, by default “”. This helps identify where in the nested structure the invalid ‘loc’ is.

Returns:

A list of key paths (strings) to all invalid ‘loc’ entries found. Each path uses ‘>’ for dict keys and ‘[index]’ for list indices.

Return type:

List[str]
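
The validity rule and path format described above can be sketched as follows. find_invalid_locs is a hypothetical name, and the exact path spelling (no spaces around ‘>’, the index attached directly to the parent key) is an assumption based on the description.

```python
from typing import Any, Dict, List, Union


def find_invalid_locs(
    d: Union[Dict[str, Any], List[Any]], parent_key: str = ""
) -> List[str]:
    """Return key paths of 'loc' entries that are not dotted strings."""
    invalid: List[str] = []
    if isinstance(d, dict):
        for key, value in d.items():
            path = f"{parent_key}>{key}" if parent_key else str(key)
            if key == "loc":
                # Invalid when not a string or when it has no dot.
                if not isinstance(value, str) or "." not in value:
                    invalid.append(path)
            elif isinstance(value, (dict, list)):
                invalid.extend(find_invalid_locs(value, path))
    elif isinstance(d, list):
        for index, item in enumerate(d):
            if isinstance(item, (dict, list)):
                invalid.extend(find_invalid_locs(item, f"{parent_key}[{index}]"))
    return invalid


config = {
    "model": {"loc": "Net"},
    "steps": [{"loc": "pkg.steps.Clean"}, {"loc": 3}],
}
bad_paths = find_invalid_locs(config)
```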

get_matching(base_id, get_ids_fn, loader_fn, query=None, include=False)[source]

Get IDs of configurations that match the same flattened key-value pair(s) as a base config.

Parameters:
  • base_id (str) – ID of the base configuration.

  • get_ids_fn (Callable) – Function to retrieve all configuration IDs.

  • loader_fn (Callable) – Function to load a configuration given its ID.

  • query (str, optional) – Specific query key or ‘key=value’ pair.

Returns:

Mapping of matched query to list of matching IDs.

Return type:

Dict[str, List[str]]
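
The idea of matching on flattened key-value pairs can be illustrated with a small sketch. It is not the library's implementation: flatten and matching_ids are hypothetical helpers, the dot used to join nested keys is an assumption, only the single-key form of query is handled, and the include flag is left out because its meaning is not documented here.

```python
from typing import Any, Callable, Dict, List

_MISSING = object()  # sentinel for keys that a config does not define


def flatten(d: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    """Turn nested dicts into one level with dotted keys (assumed format)."""
    flat: Dict[str, Any] = {}
    for key, value in d.items():
        path = f"{prefix}.{key}" if prefix else str(key)
        if isinstance(value, dict):
            flat.update(flatten(value, path))
        else:
            flat[path] = value
    return flat


def matching_ids(
    base_id: str,
    get_ids_fn: Callable[[], List[str]],
    loader_fn: Callable[[str], Dict[str, Any]],
    query: str,
) -> Dict[str, List[str]]:
    """Find other configs whose value for `query` equals the base config's."""
    base_value = flatten(loader_fn(base_id))[query]
    matches = [
        config_id
        for config_id in get_ids_fn()
        if config_id != base_id
        and flatten(loader_fn(config_id)).get(query, _MISSING) == base_value
    ]
    return {f"{query}={base_value}": matches}


configs = {
    "a": {"model": {"lr": 0.1, "depth": 2}},
    "b": {"model": {"lr": 0.1, "depth": 4}},
    "c": {"model": {"lr": 0.5, "depth": 2}},
}
same_lr = matching_ids("a", lambda: list(configs), configs.__getitem__, "model.lr")
```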

hash_args(args)[source]

Generate a SHA-256 hash from a dictionary of arguments.

This is commonly used to uniquely identify a configuration or set of parameters.

Parameters:

args (dict) – The dictionary of arguments to be hashed. Must be JSON-serializable.

Returns:

A SHA-256 hash string representing the input dictionary.

Return type:

str

Raises:

TypeError – If the dictionary contains non-serializable values.
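
One common way to build such a hash is to serialize the dictionary to canonical JSON and digest the bytes. The sketch below assumes that approach. hash_args_sketch is a hypothetical name, and whether the library sorts keys or uses the same separators is not stated in this reference.

```python
import hashlib
import json
from typing import Any, Dict


def hash_args_sketch(args: Dict[str, Any]) -> str:
    """Return a hex SHA-256 digest of a JSON-serializable dictionary."""
    # sort_keys makes the digest independent of key insertion order;
    # json.dumps raises TypeError for values it cannot serialize.
    payload = json.dumps(args, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


first = hash_args_sketch({"lr": 0.1, "layers": [64, 32]})
second = hash_args_sketch({"layers": [64, 32], "lr": 0.1})
different = hash_args_sketch({"lr": 0.2, "layers": [64, 32]})
```

Sorting the keys matters for duplicate detection: two configurations that differ only in key order should map to the same identifier.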

load_component(loc, args=None, setup=True)[source]

Dynamically load and optionally initialize a component class.

This utility imports a class from a given module path and instantiates it. If the class defines a setup method and setup=True, it calls setup(args) and returns the initialized component. Otherwise, it returns the raw instance.

Parameters:
  • loc (str) – Fully qualified class location in dot notation (e.g., ‘CompBase.models.MyModel’). If no dot is present, it is assumed the class is defined in __main__.

  • args (dict, optional) – Dictionary of arguments to pass to the setup() method, if applicable. Defaults to None, which is treated as an empty dict.

  • setup (bool, optional) – Whether to invoke the component’s setup method after instantiation. Defaults to True.

Returns:

An instance of the loaded class, either raw or configured via setup().

Return type:

Any

Raises:
  • ComponentLoadError – If the specified class is not found in the target module.

  • ImportError – If the module cannot be imported.
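
The loading steps described above can be sketched with importlib. This is an illustration under assumptions, not the library's source. load_component_sketch and the local ComponentLoadError class are stand-ins, the class is assumed to take no constructor arguments, and returning the instance when setup() yields None is a guess.

```python
import importlib
import sys
from typing import Any, Dict, Optional


class ComponentLoadError(Exception):
    """Local stand-in for the exception named in this reference."""


def load_component_sketch(
    loc: str, args: Optional[Dict[str, Any]] = None, setup: bool = True
) -> Any:
    # Split 'pkg.module.ClassName' into module path and class name.
    module_name, _, class_name = loc.rpartition(".")
    if module_name:
        module = importlib.import_module(module_name)  # may raise ImportError
    else:
        module = sys.modules["__main__"]  # no dot: look in __main__
    try:
        cls = getattr(module, class_name)
    except AttributeError as exc:
        raise ComponentLoadError(f"{class_name!r} not found in {module!r}") from exc
    instance = cls()
    if setup and hasattr(instance, "setup"):
        result = instance.setup(args or {})
        return instance if result is None else result
    return instance


# OrderedDict defines no setup(), so the raw instance comes back.
loaded = load_component_sketch("collections.OrderedDict")
```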