Source code for plf._transfer_utils

import json
from pathlib import Path
from .context import get_shared_data
    
    
    


def _load_transfer_config():
    settings = get_shared_data()

    lab_base = Path(settings["data_path"]).resolve()

    transfers_dir = lab_base / "Transfers"
    transfers_dir.mkdir(exist_ok=True)

    cfg_path = transfers_dir / "transfer_config.json"
    if not cfg_path.exists():
        return {
            "active_transfer_id": None,
            "history": [],
            "ppl_to_transfer": {} #sqlit3
        }
    return json.loads(cfg_path.read_text(encoding="utf-8"))
# ---------------------------


# ---------------------------
# TransferContext
# ---------------------------
[docs] class TransferContext: """Runtime context for remapping paths and components on remote.""" def __init__(self): settings = get_shared_data() transfers_dir = Path(settings["data_path"]).resolve() / "Transfers" self.transfers_dir = transfers_dir self._cfg = _load_transfer_config() def _load_transfer_meta(self, transfer_id: str) -> dict: meta_path = self.transfers_dir / transfer_id / "transfer.json" if not meta_path.exists(): return {} return json.loads(meta_path.read_text(encoding="utf-8"))
[docs] def map_cnfg(self, cnfg): pplid = cnfg['pplid'] def remap(d): if isinstance(d, dict): for k, v in d.items(): if "loc" in k and isinstance(v, str): # Map LOC via transfer context d[k] = self.map_loc(v, pplid=pplid) elif 'src' in k and isinstance(v, str): # Map file paths via transfer context d[k] = self.map_src(v) else: remap(v) elif isinstance(d, list): for item in d: remap(item) remap(cnfg) return cnfg
[docs] def map_src(self, src: str, pplid: str) -> str: src = Path(src).as_posix() transfer_id = self._cfg["ppl_to_transfer"].get(pplid) if not transfer_id: return src meta = self._load_transfer_meta(transfer_id) path_map = meta.get("path_map", {}) for src, dst in path_map.items(): dst = self.transfers_dir / transfer_id /"payload"/ dst if src.startswith(src): return src.replace(src, dst, 1) return src
[docs] def map_loc(self, loc: str, pplid: str) -> str: transfer_id = self._cfg["ppl_to_transfer"].get(pplid) if not transfer_id: return loc meta = self._load_transfer_meta(transfer_id) loc_map = meta.get("loc_map", {}) return loc_map.get(loc, loc)