# Source code for plf._pipeline

"""
Holder for the PipeLine class.
"""

from typing import TypedDict, Optional, Dict, Union, Any
import json
import os
from pathlib import Path

import traceback

from .utils import (
    load_component,
    hash_args,
    get_invalid_loc_queries,
    Db,
    Component)

from ._transfer_utils import TransferContext
from .context import get_shared_data, set_shared_data

class CompsDict(TypedDict):
    """
    Typed mapping that groups the components of one setup.

    Every value is a ``Component`` (imported from ``.utils``); ``metrics``
    holds several of them keyed by a string. Nothing in the visible part of
    this module reads or builds a ``CompsDict`` -- presumably it is consumed
    by workflow code elsewhere (TODO confirm).
    """

    # One Component per slot.
    model: Component
    loss: Component
    optimizer: Component
    dataset: Component
    # Any number of metric components, keyed by a string.
    metrics: Dict[str, Component]


# _core.py
class PipeLine:
    """
    Handle for a single pipeline: its id, its saved configuration and its
    workflow component.

    The pipeline keeps a registry in ``<data_path>/ppls.db`` (accessed through
    ``Db``): table ``ppls`` maps a ``pplid`` to the hash of its configuration,
    and table ``runnings`` records which ``logid`` is currently running a
    ``pplid``. All real work (paths of artifacts, preparing, running,
    cleaning, status) is delegated to the workflow component named in the
    configuration under the ``'workflow'`` key.

    Attributes
    ----------
    settings : dict
        Shared settings returned by ``get_shared_data()``.
    pplid : str or None
        Id of the loaded/created pipeline, ``None`` while empty.
    cnfg : dict or None
        Loaded configuration (``{"pplid": ..., **args}``), ``None`` while empty.
    workflow : Component or None
        Lazily loaded workflow component.
    """

    def __init__(self, pplid=None):
        """
        Initialize an empty pipeline and optionally load an existing one.

        Parameters
        ----------
        pplid : str, optional
            If given (truthy), ``load(pplid=pplid)`` is called immediately.
        """
        self._paths = ['config']
        self.settings = get_shared_data()
        self.pplid = None
        self.workflow = None
        self.cnfg = None
        self._prepared = False
        self.__db = Db(db_path=f"{self.settings['data_path']}/ppls.db")
        if pplid:
            self.load(pplid=pplid)

    @staticmethod
    def _args_hash(cnfg: Dict) -> str:
        """
        Hash the identity-defining part of a configuration.

        Only the ``'workflow'`` and ``'args'`` entries take part, so the hash
        stored by ``new`` and the one looked up by ``verify`` always agree,
        whatever other keys (e.g. ``'pplid'``) the dict carries.
        """
        return hash_args({'workflow': cnfg['workflow'], 'args': cnfg['args']})

    def _ensure_workflow(self):
        """Load the workflow component from ``self.cnfg`` once and return it."""
        if self.workflow is None:
            self.workflow = self.load_component(**self.cnfg['workflow'])
        return self.workflow

    def _save_config(self) -> None:
        """
        Write ``self.cnfg`` to the pipeline's config JSON file.

        The file is written only if the configuration hash is registered in
        the database under the same id as ``self.cnfg["pplid"]``.

        Raises
        ------
        ValueError
            If the hash of the current configuration is not registered for
            ``self.cnfg["pplid"]`` (i.e. the arguments were changed).
        """
        if self.verify(cnfg=self.cnfg) != self.cnfg["pplid"]:
            raise ValueError(
                f"can not save config for Experiment: {self.cnfg['pplid']}."
                "\n it's args has been changed"
            )
        with open(self.get_path(of="config"), "w", encoding="utf-8") as out_file:
            json.dump(self.cnfg, out_file, indent=4)

    def get_path(
        self, of: str, pplid: Optional[str] = None, args: Optional[Dict] = None
    ) -> str:
        """
        Return the file path of a pipeline artifact, creating its directory.

        Parameters
        ----------
        of : str
            Kind of artifact. ``"config"`` is resolved here to
            ``Configs/<pplid>.json``; every other value is forwarded to
            ``workflow.get_path``.
        pplid : str, optional
            Pipeline id. Defaults to ``self.pplid``.
        args : dict, optional
            Extra arguments forwarded to ``workflow.get_path`` (unused for
            ``"config"``).

        Returns
        -------
        str
            Path under ``settings["data_path"]`` with forward slashes.

        Raises
        ------
        ValueError
            If neither ``pplid`` nor ``self.pplid`` is set.
        """
        pplid = pplid or self.pplid
        if not pplid:
            raise ValueError("Experiment ID (pplid) must be provided.")

        if of == "config":
            relative = Path("Configs") / f"{pplid}.json"
        else:
            relative = self._ensure_workflow().get_path(
                of=of, pplid=pplid, args=args
            )

        path = (Path(self.settings["data_path"]) / relative).as_posix()
        os.makedirs(os.path.dirname(path), exist_ok=True)
        return path

    def load(self, pplid: str, prepare: bool = False):
        """
        Load a registered pipeline configuration from disk.

        The pipeline is reset first, so a failed load leaves it empty.

        Parameters
        ----------
        pplid : str
            Id of the pipeline to load.
        prepare : bool, optional
            If True, ``prepare()`` is called after loading.

        Raises
        ------
        ValueError
            If ``pplid`` is not registered in the database.
        """
        self.reset()
        if not self.verify(pplid=pplid):
            raise ValueError(f"The pplid: {pplid} does not exist")
        cfg_path = self.get_path(of="config", pplid=pplid)
        with open(cfg_path, encoding="utf-8") as f:
            self.cnfg = json.load(f)
        self.pplid = pplid
        if prepare:
            self.prepare()

    def reset(self):
        """
        Return the pipeline to its empty state.

        Re-reads the shared settings and re-opens the database handle.
        """
        self.pplid = None
        self.settings = get_shared_data()
        self.cnfg = None
        self._prepared = False
        self.workflow = None
        self.__db = Db(db_path=f"{self.settings['data_path']}/ppls.db")

    def load_component(
        self, loc: str, args: Optional[Dict[str, Any]] = None, setup: bool = True
    ):
        """
        Load a component and attach this pipeline to it as ``comp.P``.

        Parameters
        ----------
        loc : str
            Component location, passed to ``utils.load_component``.
        args : dict, optional
            Arguments passed to ``utils.load_component``.
        setup : bool, optional
            Passed through to ``utils.load_component``.

        Returns
        -------
        Component
            The loaded component, with ``P`` set to this pipeline.
        """
        # A missing "lab_role" also counts as non-base here.
        # TODO: if "lab_role" is missing from the settings, add the key to the
        # JSON file at self.settings['settings_path'].
        if self.settings.get("lab_role") != "base":
            transfer = TransferContext()
            loc = transfer.map_loc(loc, pplid=self.pplid)
        comp = load_component(loc=loc, args=args, setup=setup)
        comp.P = self
        return comp

    def verify(
        self, *, pplid: Optional[str] = None, cnfg: Optional[Dict] = None
    ) -> Union[str, bool]:
        """
        Look up a pipeline in the database by id or by configuration.

        Both parameters are keyword-only. If ``pplid`` is truthy it takes
        precedence and ``cnfg`` is ignored.

        Parameters
        ----------
        pplid : str, optional
            Pipeline id to look for.
        cnfg : dict, optional
            Configuration with ``'workflow'`` and ``'args'`` keys; the lookup
            is done on the hash of those two entries.

        Returns
        -------
        Union[str, bool]
            The matching ``pplid`` if one is registered, otherwise ``False``.

        Examples
        --------
        >>> pipeline.verify(pplid="exp_001")
        'exp_001'
        >>> pipeline.verify(pplid="nonexistent_exp")
        False
        """
        if pplid:
            result = self.__db.query(
                "SELECT 1 FROM ppls WHERE pplid = ? LIMIT 1", (pplid,)
            )
            if len(result) > 0:
                return pplid
        elif cnfg:
            rows = self.__db.query(
                "SELECT pplid FROM ppls WHERE args_hash =? LIMIT 1",
                (self._args_hash(cnfg),),
            )
            if rows:
                return rows[0][0]
        return False

    def _check_args(self, cnfg):
        """
        Validate a configuration before a new pipeline is created from it.

        Raises
        ------
        ValueError
            If ``get_invalid_loc_queries`` reports invalid component
            locations, or if an identical configuration is already registered.
        """
        invalid = get_invalid_loc_queries(cnfg)
        if invalid:
            raise ValueError(
                "Make sure all components are saved.\nReff: " + ", ".join(invalid)
            )
        existing = self.verify(cnfg=cnfg)
        if existing:
            raise ValueError(f"same configuration is already exists in: {existing}")

    def new(
        self,
        pplid: Optional[str] = None,
        args: Optional[Dict[str, Any]] = None,
        prepare: bool = False,
    ) -> None:
        """
        Create and register a new pipeline.

        Parameters
        ----------
        pplid : str
            Unique pipeline id.
        args : dict
            Configuration; must contain ``'workflow'`` (keyword arguments for
            ``load_component``) and ``'args'`` (passed to ``workflow.new``).
        prepare : bool, optional
            If True, ``prepare()`` is called after creation.

        Raises
        ------
        ValueError
            If ``pplid`` or ``args`` is missing, if ``pplid`` is already
            registered, if component locations are invalid, or if the same
            configuration already exists.

        Notes
        -----
        Steps, in order: validate, load the workflow component and call its
        ``new``, insert ``(pplid, args_hash)`` into ``ppls``, write the config
        file. When ``settings["lab_role"]`` is not ``"base"`` nothing is
        created; a message is printed and the method returns.
        """
        if self.settings.get("lab_role") != "base":
            print("cant use new in remote lab")
            return

        # Fail early: without these the method would crash later, after
        # a row had possibly been inserted into the database.
        if not pplid:
            raise ValueError("Experiment ID (pplid) must be provided.")
        if not args:
            raise ValueError("Configuration args must be provided.")

        if self.verify(pplid=pplid):
            raise ValueError(f"{pplid} is already exists try different id")
        self._check_args(args)

        # Set before loading the workflow: the component gets this pipeline
        # as ``P`` and may read its id/config while being created.
        self.pplid = pplid
        self.cnfg = {"pplid": pplid, **args}

        try:
            self.workflow = self.load_component(**args['workflow'])
            self.workflow.new(args['args'])
        except Exception:
            traceback.print_exc()
            raise

        # Store the same hash that verify()/_save_config() look up; hashing the
        # whole ``args`` dict would diverge as soon as it carries extra keys.
        self.__db.execute(
            "INSERT INTO ppls (pplid, args_hash) VALUES (?, ?)",
            (pplid, self._args_hash(args)),
        )

        self._save_config()

        if prepare:
            self.prepare()

    def prepare(self) -> None:
        """
        Load the workflow component and let it prepare itself.

        When ``settings["lab_role"]`` is not ``"base"`` the configuration is
        first mapped through ``TransferContext.map_cnfg``. ``_prepared`` is
        set to whatever ``workflow.prepare()`` returns.

        Errors are not propagated: the traceback is printed and ``_prepared``
        is set to ``False`` so that ``run()`` refuses to start.
        """
        try:
            if self.settings.get("lab_role") != "base":
                transfer = TransferContext()
                self.cnfg = transfer.map_cnfg(self.cnfg)
            self.workflow = self.load_component(**self.cnfg['workflow'])
            self._prepared = self.workflow.prepare()
        except Exception:
            # A failed re-prepare must not leave a stale ``True`` behind.
            self._prepared = False
            traceback.print_exc()

    def run(self) -> None:
        """
        Run the prepared workflow, registering the run in ``runnings``.

        Nothing is run (a message is printed instead) if the pipeline is not
        prepared or if ``runnings`` already has a row for this ``pplid``.
        Errors raised while running are printed, not propagated. The
        ``runnings`` row is removed afterwards, but only if this call
        inserted it.
        """
        if not self._prepared:
            print(
                "Preparation Error. Execute prepare() or set prepare=True before training."
            )
            return
        rows = self.__db.query(
            "SELECT logid FROM runnings WHERE pplid = ?", (self.pplid,)
        )
        if rows:
            print(f"pplid: {self.pplid} is running in logid: {rows[0][0]}")
            return

        registered = False
        try:
            self.__db.execute(
                "INSERT INTO runnings (pplid, logid) VALUES (?, ?)",
                (self.pplid, self.settings["logid"]),
            )
            registered = True
            self.workflow.run()
        except (RuntimeError, ValueError, KeyError) as e:
            print("Error in training loop:", e)
            traceback.print_exc()
        except BaseException as e:
            print("Unexpected error in training loop:", type(e).__name__, e)
            traceback.print_exc()
        finally:
            # If the INSERT itself failed, any existing row belongs to
            # another runner and must not be deleted.
            if registered:
                self.__db.execute(
                    "DELETE FROM runnings WHERE pplid = ?", (self.pplid,)
                )

    def is_running(self):
        """
        Return the ``logid`` running this pipeline, or ``False`` if none.
        """
        rows = self.__db.query(
            "SELECT logid FROM runnings WHERE pplid = ?", (self.pplid,)
        )
        if rows:
            return rows[0][0]
        return False

    @property
    def should_running(self):
        """
        ``False`` once a stop was requested for this pipeline, else ``True``.

        A stop request is a ``runnings`` row whose ``parity`` is ``'stop'``.
        """
        rows = self.__db.query(
            "SELECT parity FROM runnings WHERE pplid = ?", (self.pplid,)
        )
        return not (rows and rows[0][0] == 'stop')

    def stop_running(self):
        """
        Request a running pipeline to stop.

        Sets ``parity`` to ``'stop'`` on the ``runnings`` row of the active
        ``logid``; prints a message either way.
        """
        logid = self.is_running()
        if logid:
            self.__db.execute(
                "UPDATE runnings SET parity = ? WHERE logid = ?", ('stop', logid)
            )
            print(f"ppid:{self.pplid} will be stopped at logid:{logid} after current iteration")
        else:
            print("it is not running anywhere")

    @property
    def paths(self):
        """
        Artifact kinds of this pipeline: ``'config'`` plus the workflow's own.

        Returns a new list on every access.
        """
        # Copy first: extending ``self._paths`` in place would make the list
        # grow by the workflow's paths on every access.
        artifs = list(self._paths)
        if self.pplid:
            self._ensure_workflow()
        if self.workflow:
            artifs.extend(self.workflow.paths)
        return artifs

    def clean(self):
        """
        Delegate to ``workflow.clean()``.

        Prints ``"Empty Pipeline"`` if nothing is loaded. Errors are printed,
        not propagated.
        """
        if self.cnfg is None:
            print("Empty Pipeline")
            return
        try:
            self._ensure_workflow().clean()
        except Exception:
            traceback.print_exc()

    def status(self):
        """
        Return ``workflow.status()``.

        Prints ``"Empty Pipeline"`` and returns ``None`` if nothing is loaded.
        Errors are printed, not propagated, and ``None`` is returned.
        """
        if self.cnfg is None:
            print("Empty Pipeline")
            return None
        try:
            return self._ensure_workflow().status()
        except Exception:
            traceback.print_exc()
            return None
from copy import deepcopy