"""
Hierarchical precision, recall, and F_beta for ISCO-08 style digit codes.

A code is scored through its ancestor closure, i.e. the set of its non-empty
digit prefixes ('2211' -> {'2', '22', '221', '2211'}). For a true code with
closure C and a predicted code with closure C':

    hP = |C & C'| / |C'|
    hR = |C & C'| / |C|
    hF_beta = (1 + beta^2) * hP * hR / (beta^2 * hP + hR)

`hp_hr_hfbeta` scores a single instance; `hierarchical_scores` aggregates a
whole dataset with micro and/or macro averaging.
"""

# Copyright 2025 Daniel Duckworth
# Licensed under the Apache License, Version 2.0

from typing import Iterable, List, Tuple, Optional, Dict, Any

# Accepted values for the `average` argument of `hierarchical_scores`.
_AVERAGE_MODES = ("micro", "macro", "both")


def _normalize(code: Optional[str]) -> Optional[str]:
    """
    Normalize an ISCO-08 code to a digit string of length 1..4.

    Every non-digit character (whitespace, separators, letters) is dropped;
    the remaining digits are kept in order, so leading zeros present in the
    input are preserved.

    Returns:
        The digit string, or None when the input is None or does not leave
        between one and four digits.
    """
    if code is None:
        return None
    # str() lets non-string inputs (e.g. ints) through the same path.
    digits = "".join(ch for ch in str(code) if ch.isdigit())
    return digits if 1 <= len(digits) <= 4 else None


def ancestors(code: Optional[str]) -> List[str]:
    """
    Ancestor closure (excluding the artificial root): all non-empty prefixes.

    For '2211' -> ['2', '22', '221', '2211']. An empty list is returned when
    the code cannot be normalized.
    """
    norm = _normalize(code)
    if norm is None:
        return []
    return [norm[:k] for k in range(1, len(norm) + 1)]


def _path_overlap(
    true_code: Optional[str], pred_code: Optional[str]
) -> Tuple[int, int, int]:
    """Return (|C & C'|, |C'|, |C|) for the true closure C and predicted C'."""
    true_path = set(ancestors(true_code))
    pred_path = set(ancestors(pred_code))
    return len(true_path & pred_path), len(pred_path), len(true_path)


def _f_beta(precision: float, recall: float, beta: float) -> float:
    """Combine precision and recall into F_beta; 0.0 when the denominator vanishes."""
    b2 = beta * beta
    denom = b2 * precision + recall
    if denom <= 0.0:
        return 0.0
    return (1.0 + b2) * precision * recall / denom


def hp_hr_hfbeta(
    true_code: Optional[str], pred_code: Optional[str], beta: float = 1.0
) -> Tuple[float, float, float]:
    """
    Per-instance hierarchical precision, recall, and F_beta.

    Args:
        true_code: Gold code.
        pred_code: Predicted code.
        beta: Weight of recall relative to precision in F_beta.

    Returns:
        Tuple[float, float, float]: (hierarchical precision, hierarchical
        recall, F_beta score). All three are 0.0 when either code is
        missing/invalid or when the two codes share no prefix.
    """
    shared, n_pred, n_true = _path_overlap(true_code, pred_code)
    # No shared prefix also covers the "either closure is empty" case.
    if shared == 0:
        return 0.0, 0.0, 0.0
    hp = shared / n_pred
    hr = shared / n_true
    return hp, hr, _f_beta(hp, hr, beta)


def hierarchical_scores(
    y_true: Iterable[Optional[str]],
    y_pred: Iterable[Optional[str]],
    beta: float = 1.0,
    average: str = "both",  # "micro", "macro", or "both"
    return_per_instance: bool = False,
) -> Dict[str, Any]:
    """
    Compute micro/macro aggregated hierarchical P/R/F_beta.

    Args:
        y_true: Gold codes, one per instance.
        y_pred: Predicted codes, aligned with `y_true`.
        beta: Weight of recall relative to precision in F_beta.
        average: "micro", "macro", or "both"; selects which aggregates are
            included in the result.
        return_per_instance: When True, the result also carries a
            "per_instance" list of {"hP", "hR", "hF_beta"} dicts.

    Returns:
        Dict with the keys of the requested aggregates:
          * macro: "macro_hP", "macro_hR", "macro_hF_beta_mean" (mean of the
            per-instance F_beta) and "macro_hF_beta_from_macroPR" (F_beta of
            the macro precision and recall).
          * micro: "micro_hP", "micro_hR", "micro_hF_beta", computed from
            the path lengths summed over all instances.
          * "per_instance" when `return_per_instance` is True.

    Raises:
        ValueError: If `average` is not a recognized mode, or if `y_true`
            and `y_pred` differ in length.
    """
    if average not in _AVERAGE_MODES:
        raise ValueError(
            f"average must be one of {_AVERAGE_MODES}, got {average!r}"
        )

    y_true = list(y_true)
    y_pred = list(y_pred)
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")

    inst_hp: List[float] = []
    inst_hr: List[float] = []
    inst_hf: List[float] = []
    per_instance: List[Dict[str, float]] = []

    total_shared = 0  # sum of |C & C'|
    total_pred = 0  # sum of |C'|
    total_true = 0  # sum of |C|

    for gold, pred in zip(y_true, y_pred):
        shared, n_pred, n_true = _path_overlap(gold, pred)

        # Path lengths are accumulated for every instance, including those
        # with a missing/invalid code on one side. Otherwise a skipped
        # prediction would not count against micro recall (nor a prediction
        # for an unlabeled item against micro precision), although the same
        # instance scores 0 in the macro averages.
        total_shared += shared
        total_pred += n_pred
        total_true += n_true

        if shared:
            hp = shared / n_pred
            hr = shared / n_true
            hf = _f_beta(hp, hr, beta)
        else:
            hp = hr = hf = 0.0

        inst_hp.append(hp)
        inst_hr.append(hr)
        inst_hf.append(hf)

        if return_per_instance:
            per_instance.append({"hP": hp, "hR": hr, "hF_beta": hf})

    out: Dict[str, Any] = {}

    if average in ("macro", "both"):
        count = len(inst_hp)
        macro_hp = sum(inst_hp) / count if count else 0.0
        macro_hr = sum(inst_hr) / count if count else 0.0
        macro_hf_mean = sum(inst_hf) / count if count else 0.0
        out.update(
            {
                "macro_hP": macro_hp,
                "macro_hR": macro_hr,
                "macro_hF_beta_mean": macro_hf_mean,
                "macro_hF_beta_from_macroPR": _f_beta(macro_hp, macro_hr, beta),
            }
        )

    if average in ("micro", "both"):
        micro_hp = (total_shared / total_pred) if total_pred > 0 else 0.0
        micro_hr = (total_shared / total_true) if total_true > 0 else 0.0
        out.update(
            {
                "micro_hP": micro_hp,
                "micro_hR": micro_hr,
                "micro_hF_beta": _f_beta(micro_hp, micro_hr, beta),
            }
        )

    if return_per_instance:
        out["per_instance"] = per_instance

    return out