# Provenance: Hugging Face Space file (commit c569318, "Add application file", author danieldux).
"""
Compute hierarchical precision, recall, and F_beta for a single instance.
Returns: Tuple[float, float, float]: (hierarchical precision, hierarchical recall, F_beta score)
"""
# Copyright 2025 Daniel Duckworth
# Licensed under the Apache License, Version 2.0
from typing import Iterable, List, Tuple, Optional, Dict, Any
def _normalize(code: Optional[str]) -> Optional[str]:
"""
Normalize an ISCO-08 code to a digit string of length 1..4.
Returns None if the input is empty/invalid.
Preserves leading zeros if they were present in the original string.
"""
if code is None:
return None
s = str(code).strip()
# If it's purely digits already, keep as-is to preserve leading zeros
if s.isdigit():
if 1 <= len(s) <= 4:
return s
return None
# Otherwise strip non-digits while preserving any leading 0s present
digits = "".join(ch for ch in s if ch.isdigit())
if 1 <= len(digits) <= 4:
return digits
return None
def ancestors(code: Optional[str]) -> List[str]:
    """
    Return the ancestor closure of a code, excluding the artificial root.

    The closure is every non-empty prefix of the normalized code, e.g.
    '2211' -> ['2', '22', '221', '2211']. Invalid codes yield [].
    """
    normalized = _normalize(code)
    if normalized is None:
        return []
    prefixes: List[str] = []
    for end in range(1, len(normalized) + 1):
        prefixes.append(normalized[:end])
    return prefixes
def hp_hr_hfbeta(
    true_code: Optional[str], pred_code: Optional[str], beta: float = 1.0
) -> Tuple[float, float, float]:
    """
    Per-instance hierarchical precision, recall, and F_beta.

    Both codes are expanded to their ancestor closures; precision and recall
    are the overlap size divided by the predicted / true closure sizes.
    Returns (0.0, 0.0, 0.0) when either code is missing or invalid.
    """
    true_set = set(ancestors(true_code))
    pred_set = set(ancestors(pred_code))
    if not (true_set and pred_set):
        return 0.0, 0.0, 0.0
    overlap = len(true_set & pred_set)
    precision = overlap / len(pred_set)
    recall = overlap / len(true_set)
    if precision == 0.0 and recall == 0.0:
        # F_beta is undefined at P = R = 0; report 0.0 by convention.
        return 0.0, 0.0, 0.0
    weight = beta * beta
    fbeta = (1.0 + weight) * precision * recall / (weight * precision + recall)
    return precision, recall, fbeta
def hierarchical_scores(
    y_true: Iterable[Optional[str]],
    y_pred: Iterable[Optional[str]],
    beta: float = 1.0,
    average: str = "both",  # "micro", "macro", or "both"
    return_per_instance: bool = False,
) -> Dict[str, Any]:
    """
    Compute micro/macro aggregated hierarchical P/R/F_beta over paired labels.

    Args:
        y_true: Gold ISCO-08 codes (entries may be None or invalid).
        y_pred: Predicted ISCO-08 codes, aligned with ``y_true``.
        beta: Recall weight in the F_beta score.
        average: "micro", "macro", or "both".
        return_per_instance: If True, include per-instance scores under the
            ``"per_instance"`` key.

    Returns:
        Dict with the requested aggregate keys. Macro averaging reports both
        the mean of per-instance F_beta ("macro_hF_beta_mean") and F_beta
        computed from macro-averaged P/R ("macro_hF_beta_from_macroPR").

    Raises:
        ValueError: If the inputs differ in length, or if ``average`` is not
            one of "micro", "macro", "both".
    """
    # Fail loudly on a typo'd averaging mode instead of silently returning
    # an empty result dict.
    if average not in ("micro", "macro", "both"):
        raise ValueError('average must be one of "micro", "macro", "both"')
    y_true = list(y_true)
    y_pred = list(y_pred)
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")

    b2 = beta * beta

    def _fbeta(hp: float, hr: float) -> float:
        # F_beta from precision/recall; 0.0 by convention when both are zero.
        if hp == 0.0 and hr == 0.0:
            return 0.0
        return (1.0 + b2) * hp * hr / (b2 * hp + hr)

    inst_hp: List[float] = []
    inst_hr: List[float] = []
    inst_hf: List[float] = []
    per_instance: List[Dict[str, float]] = []
    M = 0  # total ancestor-set intersection size
    P = 0  # total predicted path length
    T = 0  # total true path length
    for g, p in zip(y_true, y_pred):
        C = set(ancestors(g))
        Cp = set(ancestors(p))
        if C and Cp:
            m = len(C & Cp)
            hp = m / len(Cp)
            hr = m / len(C)
            hf = _fbeta(hp, hr)
            M += m
            P += len(Cp)
            T += len(C)
        else:
            # A missing/invalid code on either side scores zero for this pair.
            hp = hr = hf = 0.0
        inst_hp.append(hp)
        inst_hr.append(hr)
        inst_hf.append(hf)
        if return_per_instance:
            per_instance.append({"hP": hp, "hR": hr, "hF_beta": hf})

    out: Dict[str, Any] = {}
    if average in ("macro", "both"):
        n = len(inst_hp)
        macro_hp = sum(inst_hp) / n if n else 0.0
        macro_hr = sum(inst_hr) / n if n else 0.0
        out.update(
            {
                "macro_hP": macro_hp,
                "macro_hR": macro_hr,
                # Mean of the per-instance F_beta values.
                "macro_hF_beta_mean": sum(inst_hf) / n if n else 0.0,
                # F_beta recomputed from the macro-averaged P and R.
                "macro_hF_beta_from_macroPR": _fbeta(macro_hp, macro_hr),
            }
        )
    if average in ("micro", "both"):
        micro_hp = (M / P) if P > 0 else 0.0
        micro_hr = (M / T) if T > 0 else 0.0
        out.update(
            {
                "micro_hP": micro_hp,
                "micro_hR": micro_hr,
                "micro_hF_beta": _fbeta(micro_hp, micro_hr),
            }
        )
    if return_per_instance:
        out["per_instance"] = per_instance
    return out