Spaces:
Runtime error
Runtime error
| """ | |
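| Hierarchical precision, recall, and F_beta for ISCO-08 occupation codes. | |
| hp_hr_hfbeta scores a single instance and returns Tuple[float, float, float]: (hierarchical precision, hierarchical recall, F_beta score); hierarchical_scores aggregates those scores over a dataset (micro and/or macro). | |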
| """ | |
| # Copyright 2025 Daniel Duckworth | |
| # Licensed under the Apache License, Version 2.0 | |
| from typing import Iterable, List, Tuple, Optional, Dict, Any | |
| def _normalize(code: Optional[str]) -> Optional[str]: | |
| """ | |
| Normalize an ISCO-08 code to a digit string of length 1..4. | |
| Returns None if the input is empty/invalid. | |
| Preserves leading zeros if they were present in the original string. | |
| """ | |
| if code is None: | |
| return None | |
| s = str(code).strip() | |
| # If it's purely digits already, keep as-is to preserve leading zeros | |
| if s.isdigit(): | |
| if 1 <= len(s) <= 4: | |
| return s | |
| return None | |
| # Otherwise strip non-digits while preserving any leading 0s present | |
| digits = "".join(ch for ch in s if ch.isdigit()) | |
| if 1 <= len(digits) <= 4: | |
| return digits | |
| return None | |
def ancestors(code: Optional[str]) -> List[str]:
    """
    List a code together with all of its ancestors, shortest prefix first.

    The artificial root (the empty prefix) is not included, so the result is
    every non-empty prefix of the normalized code:
    '2211' -> ['2', '22', '221', '2211'].

    Returns an empty list when the code cannot be normalized.
    """
    normalized = _normalize(code)
    if normalized is None:
        return []
    # One prefix per character: lengths 1, 2, ..., len(normalized).
    return [normalized[:end] for end, _ in enumerate(normalized, start=1)]
def hp_hr_hfbeta(
    true_code: Optional[str], pred_code: Optional[str], beta: float = 1.0
) -> Tuple[float, float, float]:
    """
    Score a single prediction against a single gold code.

    Both codes are expanded into their ancestor sets. Hierarchical precision
    is the share of the predicted set that is also in the gold set, and
    hierarchical recall is the share of the gold set that is also in the
    predicted set. F_beta combines the two, weighted by ``beta``.

    Returns:
        (hierarchical precision, hierarchical recall, F_beta). All three are
        0.0 when either code has no ancestors (missing/invalid code) or when
        the two ancestor sets do not overlap.
    """
    gold_path = set(ancestors(true_code))
    pred_path = set(ancestors(pred_code))
    if not (gold_path and pred_path):
        return 0.0, 0.0, 0.0

    shared = len(gold_path & pred_path)
    if shared == 0:
        # No common ancestor: precision and recall are both zero, and the
        # F_beta formula below would divide by zero.
        return 0.0, 0.0, 0.0

    precision = shared / len(pred_path)
    recall = shared / len(gold_path)
    beta_sq = beta * beta
    f_beta = (1.0 + beta_sq) * precision * recall / (beta_sq * precision + recall)
    return precision, recall, f_beta
def hierarchical_scores(
    y_true: Iterable[Optional[str]],
    y_pred: Iterable[Optional[str]],
    beta: float = 1.0,
    average: str = "both",  # "micro", "macro", or "both"
    return_per_instance: bool = False,
) -> Dict[str, Any]:
    """
    Compute micro/macro aggregated hierarchical P/R/F_beta.

    Each gold and predicted code is expanded into its ancestor set. For one
    instance, hP = |gold & pred| / |pred| and hR = |gold & pred| / |gold|;
    an instance whose gold or predicted set is empty (missing/invalid code)
    scores 0.0 on all three measures.

    Args:
        y_true: Gold codes, one per instance.
        y_pred: Predicted codes, aligned with ``y_true``.
        beta: Weight of recall relative to precision in F_beta.
        average: "macro", "micro", or "both". Any other value yields no
            aggregate keys in the result.
        return_per_instance: If True, add a "per_instance" list holding one
            {"hP", "hR", "hF_beta"} dict per instance, in input order.

    Returns:
        A dict containing, depending on ``average``:
          * "macro_hP", "macro_hR": means of the per-instance values;
            "macro_hF_beta_mean": mean of the per-instance F_beta;
            "macro_hF_beta_from_macroPR": F_beta of macro_hP and macro_hR.
          * "micro_hP", "micro_hR", "micro_hF_beta": computed from the
            intersection and path sizes summed over ALL instances.
        Aggregates are 0.0 for empty input.

    Raises:
        ValueError: If ``y_true`` and ``y_pred`` differ in length.
    """
    y_true = list(y_true)
    y_pred = list(y_pred)
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")

    b2 = beta * beta

    def f_beta(precision: float, recall: float) -> float:
        # Precision and recall share their numerator, so they are zero
        # together; guarding on the sum avoids a zero denominator.
        if precision + recall <= 0:
            return 0.0
        return (1.0 + b2) * precision * recall / (b2 * precision + recall)

    inst_hp: List[float] = []
    inst_hr: List[float] = []
    inst_hf: List[float] = []
    total_overlap = 0  # sum of |gold & pred|
    total_pred = 0  # sum of |pred| (total predicted path length)
    total_true = 0  # sum of |gold| (total true path length)

    for gold, pred in zip(y_true, y_pred):
        gold_path = set(ancestors(gold))
        pred_path = set(ancestors(pred))
        overlap = len(gold_path & pred_path)

        # Accumulate for every instance, including those with an empty gold
        # or predicted path. Skipping them would remove missing predictions
        # from the recall denominator (and invalid gold codes from the
        # precision denominator) and inflate the micro scores relative to
        # the macro scores, which already count these instances as zeros.
        total_overlap += overlap
        total_pred += len(pred_path)
        total_true += len(gold_path)

        hp = overlap / len(pred_path) if pred_path else 0.0
        hr = overlap / len(gold_path) if gold_path else 0.0
        inst_hp.append(hp)
        inst_hr.append(hr)
        inst_hf.append(f_beta(hp, hr))

    out: Dict[str, Any] = {}
    n = len(inst_hp)

    if average in ("macro", "both"):
        macro_hp = sum(inst_hp) / n if n else 0.0
        macro_hr = sum(inst_hr) / n if n else 0.0
        out.update(
            {
                "macro_hP": macro_hp,
                "macro_hR": macro_hr,
                "macro_hF_beta_mean": sum(inst_hf) / n if n else 0.0,
                "macro_hF_beta_from_macroPR": f_beta(macro_hp, macro_hr),
            }
        )

    if average in ("micro", "both"):
        micro_hp = total_overlap / total_pred if total_pred > 0 else 0.0
        micro_hr = total_overlap / total_true if total_true > 0 else 0.0
        out.update(
            {
                "micro_hP": micro_hp,
                "micro_hR": micro_hr,
                "micro_hF_beta": f_beta(micro_hp, micro_hr),
            }
        )

    if return_per_instance:
        out["per_instance"] = [
            {"hP": hp, "hR": hr, "hF_beta": hf}
            for hp, hr, hf in zip(inst_hp, inst_hr, inst_hf)
        ]
    return out