# Source code for plf.lab

"""
Create a new lab project, or load and activate an existing one.
"""

from pathlib import Path
import os
import json
from datetime import datetime
from typing import Optional
import pandas as pd


from .context import set_shared_data, get_caller, register_libs_path, get_shared_data
from .utils import Db

# Names exported by ``from plf.lab import *``.
__all__ = [
    "lab_setup",
    "create_project",
    "get_logs",
    "create_clone",
    "init_clone",
]
 
def export_settigns():
    """
    Write the current shared settings to a JSON file and return its path.

    The file is named ``<project_name>.json`` and is placed in the parent
    directory of the ``data_path`` recorded in the shared settings.
    """
    shared = get_shared_data()

    # The export goes next to the data directory, not inside it.
    export_dir = Path(shared['data_path']).parent
    export_file = os.path.join(export_dir, shared["project_name"] + ".json")

    with open(export_file, "w", encoding="utf-8") as handle:
        json.dump(shared, handle, indent=4)

    return export_file

def create_project(settings: dict) -> str:
    """
    Build a fresh base-lab project on disk from ``settings``.

    ``settings`` must provide ``project_dir``, ``project_name`` and
    ``component_dir``. The dict is updated in place with absolute paths and
    the derived ``data_path`` / ``setting_path`` entries, and is marked as a
    base lab (``lab_role="base"``, ``lab_id=None``).

    Any existing ``logs.db`` / ``ppls.db`` in the data directory is deleted
    before the databases are recreated.

    Returns
    -------
    str
        Absolute path of the settings JSON file that was written.
    """
    root_dir = os.path.abspath(settings["project_dir"])
    name = settings["project_name"]
    components = os.path.abspath(settings["component_dir"])

    # Everything belonging to the project lives under <project_dir>/<name>.
    data_path = os.path.join(root_dir, name)
    setting_path = os.path.join(data_path, f"{name}.json")

    settings.update(
        lab_id=None,
        lab_role="base",
        project_dir=root_dir,
        component_dir=components,
        data_path=data_path,
        setting_path=setting_path,
    )

    os.makedirs(settings["data_path"], exist_ok=True)
    os.makedirs(settings["component_dir"], exist_ok=True)

    # Drop databases left over from a previous project at the same location.
    for stale_name in ("logs.db", "ppls.db"):
        stale_db = os.path.join(data_path, stale_name)
        if os.path.exists(stale_db):
            os.remove(stale_db)

    data_root = Path(data_path)
    if data_root.is_dir():
        (data_root / "Clones").mkdir(exist_ok=True)

    # Databases first, then publish the settings, then persist them.
    setup_databases(settings)
    set_shared_data(settings)

    with open(setting_path, "w", encoding="utf-8") as handle:
        json.dump(settings, handle, indent=4)

    return setting_path
# Schema of the ``ppls`` table; used for both ppls.db and Archived/ppls.db.
_PPLS_TABLE_SQL = """
    CREATE TABLE IF NOT EXISTS ppls (
        pplid TEXT PRIMARY KEY,
        args_hash TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'init'
            CHECK(status IN ('init', 'running', 'frozen', 'cleaned')),
        created_time TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
    );
"""


def create_and_init_db(db_path: str, tables: list, init_statements: Optional[list] = None):
    """
    Open the database at ``db_path`` and run schema and seed statements.

    Parameters
    ----------
    db_path : str
        Path handed to ``Db``.
    tables : list
        SQL statements executed one by one, in order.
    init_statements : list, optional
        ``(statement, params)`` pairs executed after ``tables``.
    """
    db = Db(db_path=db_path)
    try:
        for table_sql in tables:
            db.execute(table_sql)
        for stmt, params in init_statements or ():
            db.execute(stmt, params)
    finally:
        # Release the handle even when a statement fails.
        db.close()


def setup_databases(settings: dict):
    """
    Sets up the required databases for the lab project, including:
    - logs.db (with logs table, seeded with a ``log0`` row)
    - ppls.db (with ppls, edges, runnings tables)
    - Archived/ppls.db (with ppls table)

    Only ``settings["data_path"]`` is read. The ``Archived`` directory
    (and therefore ``data_path`` itself) is created if it is missing.
    """
    data_path = settings["data_path"]

    # Creating Archived first guarantees data_path exists before any
    # database file is opened inside it.
    archived_dir = os.path.join(data_path, "Archived")
    os.makedirs(archived_dir, exist_ok=True)

    # ---- logs.db ----
    logs_table = """
        CREATE TABLE IF NOT EXISTS logs (
            logid TEXT PRIMARY KEY,
            called_at TEXT NOT NULL,
            created_time TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP
        );
    """
    log_init = [
        ("INSERT INTO logs (logid, called_at) VALUES (?, ?)", ('log0', get_caller())),
    ]
    create_and_init_db(os.path.join(data_path, "logs.db"), [logs_table], log_init)

    # ---- ppls.db ----
    edges_table = """
        CREATE TABLE IF NOT EXISTS edges (
            edgid INTEGER PRIMARY KEY AUTOINCREMENT,
            prev TEXT NOT NULL,
            next TEXT NOT NULL,
            desc TEXT,
            directed BOOL DEFAULT TRUE,
            FOREIGN KEY(prev) REFERENCES ppls(pplid),
            FOREIGN KEY(next) REFERENCES ppls(pplid)
        );
    """
    # NOTE(review): `pplid` below has no declared type, unlike ppls.pplid
    # (TEXT). Left untouched to avoid changing the schema -- confirm
    # whether TEXT was intended.
    runnings_table = """
        CREATE TABLE IF NOT EXISTS runnings (
            runid INTEGER PRIMARY KEY AUTOINCREMENT,
            pplid NOT NULL,
            logid TEXT DEFAULT NULL,
            parity TEXT DEFAULT NULL,
            started_time TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY(pplid) REFERENCES ppls(pplid)
        );
    """
    create_and_init_db(
        os.path.join(data_path, "ppls.db"),
        [_PPLS_TABLE_SQL, edges_table, runnings_table],
    )

    # ---- Archived/ppls.db ----
    create_and_init_db(os.path.join(archived_dir, "ppls.db"), [_PPLS_TABLE_SQL])
def lab_setup(settings_path: Optional[str]) -> None:
    """
    Load a lab from its settings file, log this call, and activate the lab.

    Steps:

    1. Read the JSON settings at ``settings_path``.
    2. If the settings have no ``lab_role``, default it to ``"base"`` and
       write the updated settings back to the same file.
    3. Insert a new row into the ``logs`` table of ``logs.db`` with id
       ``log<N>``, where ``N`` is the current number of rows, and the value
       returned by ``get_caller()``.
    4. Pass the settings and the new log id to ``set_shared_data`` and the
       ``component_dir`` to ``register_libs_path``.

    Parameters
    ----------
    settings_path : str or None
        Path to an existing settings JSON file.

    Raises
    ------
    ValueError
        If ``settings_path`` is empty or does not exist on disk.
    """
    if not settings_path or not os.path.exists(settings_path):
        raise ValueError("Provide either settings_path or settings for lab setup")

    with open(settings_path, encoding="utf-8") as sp:
        settings = json.load(sp)

    if "lab_role" not in settings:
        settings["lab_role"] = "base"
        # Persist the default into the file we just loaded. The previous
        # lookup of settings["settings_path"] never matched, because the
        # stored key is "setting_path", so the update was silently skipped.
        with open(settings_path, "w", encoding="utf-8") as f:
            json.dump(settings, f, indent=4)

    caller = get_caller()
    log_path = os.path.join(settings["data_path"], "logs.db")

    db = Db(db_path=log_path)
    try:
        # The next log id is derived from the current number of rows.
        cursor = db.execute("SELECT COUNT(*) FROM logs")
        row_count = cursor.fetchone()[0]
        logid = f"log{row_count}"

        db.execute(
            "INSERT INTO logs (logid, called_at) VALUES (?, ?)",
            (logid, caller),
        )
    finally:
        # Release the handle even if the query or insert fails.
        db.close()

    set_shared_data(settings, logid)
    register_libs_path(settings["component_dir"])
def get_logs():
    """
    Retrieve all log records from the logs database as a DataFrame.

    The location of ``logs.db`` is taken from ``data_path`` in the shared
    settings. Every row of the ``logs`` table is fetched and the column
    names reported by the cursor are used as DataFrame columns.

    Returns
    -------
    pandas.DataFrame
        A DataFrame containing all records from the ``logs`` table.
    """
    settings = get_shared_data()
    log_path = os.path.join(settings["data_path"], "logs.db")

    db = Db(db_path=log_path)
    try:
        cursor = db.execute("SELECT * FROM logs")
        rows = cursor.fetchall()
        col_names = [column[0] for column in cursor.description]
    finally:
        # Release the handle even if the query or fetch fails.
        db.close()

    return pd.DataFrame(rows, columns=col_names)
def create_clone(name, desc="", clone_type="remote", clone_id=None):
    """
    Create a clone entry in BASE lab.

    A directory ``<data_path>/Clones/<clone_id>`` is created and a
    ``clone.json`` describing the clone is written into it.

    Parameters
    ----------
    name : str
        Name stored in the clone config.
    desc : str, optional
        Description stored in the clone config.
    clone_type : str, optional
        Stored as ``clone_type`` in the clone config.
    clone_id : str, optional
        Identifier and directory name of the clone. If not provided, a
        unique one of the form ``cl_<UTC timestamp>_<6 hex chars>`` is
        generated automatically.

    Returns
    -------
    dict
        The clone config that was written to ``clone.json``.

    Raises
    ------
    FileExistsError
        If the clone directory already exists.
    """
    # Imported here so the function does not depend on the module-level
    # import block, which does not provide these names.
    from datetime import timezone
    from uuid import uuid4

    def _utc_now():
        # Naive UTC timestamp: same value and format as the deprecated
        # datetime.utcnow().
        return datetime.now(timezone.utc).replace(tzinfo=None)

    settings = get_shared_data()
    clones_root = Path(settings["data_path"]) / "Clones"

    # -----------------------------
    # Generate unique clone_id
    # -----------------------------
    if clone_id is None:
        while True:
            clone_id = (
                "cl_"
                + _utc_now().strftime("%Y%m%d_%H%M%S")
                + "_"
                + uuid4().hex[:6]
            )
            if not (clones_root / clone_id).exists():
                break

    clones_dir = clones_root / clone_id
    # exist_ok=False: never silently reuse an existing clone directory.
    clones_dir.mkdir(parents=True, exist_ok=False)

    clone_cfg = {
        "clone_id": clone_id,
        "clone_type": clone_type,
        "name": name,
        "desc": desc,
        "created_at": _utc_now().isoformat(),
        "transfers": [],
    }

    with open(clones_dir / "clone.json", "w", encoding="utf-8") as f:
        json.dump(clone_cfg, f, indent=4)

    return clone_cfg
def init_clone(
    clone_config: dict,
    data_path: str,
    component_dir: str,
):
    """
    Initialise a remote lab for the clone described by ``clone_config``.

    Creates ``data_path`` (with its ``Transfers``, ``TransfersOut``,
    ``RemoteResults`` and ``Archived`` sub-directories) and
    ``component_dir``, sets up the databases, writes the settings JSON into
    ``data_path`` and activates the lab via ``set_shared_data`` and
    ``register_libs_path``.

    Parameters
    ----------
    clone_config : dict
        Clone description; only ``clone_config["clone_id"]`` is read.
    data_path : str
        Directory that will hold the clone's data and databases.
    component_dir : str
        Directory registered through ``register_libs_path``.

    Returns
    -------
    str
        Path of the settings JSON file (``remote_<clone_id>.json``).
    """
    clone_id = clone_config["clone_id"]

    # Absolute paths
    data_path = os.path.abspath(data_path)
    component_dir = os.path.abspath(component_dir)

    # Project naming
    project_name = f"remote_{clone_id}"
    project_dir = os.path.dirname(data_path)

    settings = {
        # Identity
        "lab_id": clone_id,
        "lab_role": "remote",
        # Project
        "project_name": project_name,
        "project_dir": project_dir,
        "component_dir": component_dir,
        # Required by setup_databases() below and by lab_setup() when the
        # persisted settings are loaded again.
        "data_path": data_path,
    }

    # -----------------------------
    # Create directories
    # -----------------------------
    os.makedirs(data_path, exist_ok=True)
    os.makedirs(component_dir, exist_ok=True)

    # Required runtime dirs
    for runtime_dir in ("Transfers", "TransfersOut", "RemoteResults", "Archived"):
        os.makedirs(os.path.join(data_path, runtime_dir), exist_ok=True)

    # -----------------------------
    # Setup databases
    # -----------------------------
    setup_databases(settings)

    # -----------------------------
    # Persist settings
    # -----------------------------
    settings_path = os.path.join(data_path, f"{project_name}.json")
    settings["setting_path"] = settings_path

    with open(settings_path, "w", encoding="utf-8") as f:
        json.dump(settings, f, indent=4)

    # -----------------------------
    # Activate lab
    # -----------------------------
    set_shared_data(settings)
    register_libs_path(component_dir)

    return settings_path