import gradio as gr import json import pandas as pd import os import zipfile # Asume que estos módulos existen en tu proyecto from error_handler import ErrorHandler from ner_tags import ALL_NER_TAGS from ocr_processor import draw_boxes # --- Configuración de Directorios --- DATASET_BASE_DIR = "dataset" JSON_FILENAME = "anotacion_factura.json" TEMP_ZIP_FILENAME = "dataset.zip" # ---------------------------------------------------------------------- # 💡 FUNCIÓN AUXILIAR: Filtrado de Tags Utilizados # ---------------------------------------------------------------------- def _get_available_tags(tokens_data: list, current_index: int) -> list: """ Determina qué etiquetas 'B-' ya están en uso por otros tokens. Retorna la lista de ALL_NER_TAGS filtrada, excluyendo las opciones 'B-' que ya aparecen en tokens_data para otros índices (principio de no repetición). """ used_b_tags = set() # 1. Identificar qué etiquetas 'B-' ya están en uso en el documento for i, item in enumerate(tokens_data): # Excluimos el token actual para permitir la re-edición del tag existente if i == current_index: continue tag = item['ner_tag'] # Si el tag es un tag 'B-', lo añadimos al conjunto de tags usados if tag.startswith('B-'): used_b_tags.add(tag) # 2. Filtrar ALL_NER_TAGS filtered_tags = [] current_tag = tokens_data[current_index]['ner_tag'] if 0 <= current_index < len(tokens_data) else None for tag in ALL_NER_TAGS: # La etiqueta 'O' y las etiquetas 'I-' siempre están disponibles. if tag == 'O' or tag.startswith('I-'): filtered_tags.append(tag) # Una etiqueta 'B-' solo se añade si NO está en el conjunto de tags ya usados elif tag.startswith('B-') and tag not in used_b_tags: filtered_tags.append(tag) # 3. Asegurar que el tag actual se pueda seleccionar (si ya estaba aplicado y fue filtrado) if current_tag and current_tag.startswith('B-') and current_tag not in filtered_tags: filtered_tags.append(current_tag) filtered_tags.sort() # Opcional: Reordenar si se añade al final return filtered_tags # ---------------------------------------------------------------------- # --- Funciones de Configuración y UI --- # ---------------------------------------------------------------------- def setup_label_components(): """ Configura y retorna los componentes de edición de etiquetas. """ # 1. Dataframe NO INTERACTIVO (Solo para visualización y selección de fila) df_label_input = gr.Dataframe( headers=["token", "ner_tag"], col_count=(2, "fixed"), datatype=["str", "str"], label="Tabla de Tokens y Etiquetas (Haga clic en la FILA para seleccionar y editar abajo)", interactive=False, wrap=True, value=[] ) # 2. Componentes de Edición Externos tb_token_editor = gr.Textbox( label="Token Seleccionado", interactive=True, visible=False ) dd_tag_selector = gr.Dropdown( # Inicialmente usa la lista completa; las opciones se actualizarán dinámicamente choices=ALL_NER_TAGS, label="Etiqueta NER Seleccionada", value="O", interactive=True, visible=False ) # 3. Botones y Salida tb_new_token_text = gr.Textbox( label="Nuevo Texto a Agregar", interactive=True, visible=False, placeholder="Escriba aquí el texto del área seleccionada..." ) btn_add_new_token = gr.Button( "📝 Agregar Nuevo Token y Etiqueta", variant="call-to-action", visible=False ) btn_save_annotation = gr.Button("3. Guardar Anotación Actual (JSON)", variant="primary") btn_export = gr.Button("4. Descargar Dataset Completo (.zip)", variant="secondary") file_output = gr.File(label="Archivo ZIP del Dataset (Imágenes + Anotaciones)") # 5. Nuevo Estado Temporal state_new_bbox = gr.State(value=None) return ( df_label_input, tb_token_editor, dd_tag_selector, btn_save_annotation, btn_export, file_output, tb_new_token_text, btn_add_new_token, state_new_bbox ) # ---------------------------------------------------------------------- # --- FUNCIÓN: Obtener la fila seleccionada y mostrar editores (MODIFICADA) --- # ---------------------------------------------------------------------- def display_selected_row(tokens_data: list, highlight_index: int): """ Muestra el token y la etiqueta de la fila seleccionada en los editores externos. ACTUALIZA las opciones del Dropdown para filtrar las etiquetas 'B-' ya utilizadas. """ if highlight_index >= 0 and highlight_index < len(tokens_data): token = tokens_data[highlight_index]['token'] ner_tag = tokens_data[highlight_index]['ner_tag'] # 💡 Lógica de Filtrado: Obtener solo los tags disponibles available_tags = _get_available_tags(tokens_data, highlight_index) # Muestra los componentes, actualizando las opciones del Dropdown return ( gr.update(value=token, visible=True), # tb_token_editor gr.update(value=ner_tag, choices=available_tags, visible=True), # dd_tag_selector highlight_index ) # Si no hay selección válida, oculta los componentes # Y restablece las opciones del Dropdown a la lista completa return gr.update(value="", visible=False), gr.update(value="O", choices=ALL_NER_TAGS, visible=False), -1 # ---------------------------------------------------------------------- # --- FUNCIÓN: Actualizar el Dataframe y el estado de los tokens (VALIDACIÓN) --- # ---------------------------------------------------------------------- def update_dataframe_and_state(tokens_data: list, df_data_current, new_tag: str, new_token: str, row_index: int, update_type: str): """ Función unificada para actualizar la lista de tokens (estado) y el Dataframe (UI). Incluye validación para forzar la etiqueta a 'O' si no es válida. """ # Asegurar que se trabaja con una lista de listas if isinstance(df_data_current, pd.DataFrame): df_list = df_data_current.values.tolist() else: df_list = df_data_current if row_index < 0 or row_index >= len(df_list): return tokens_data, df_list if update_type == 'tag' and new_tag is not None: # 💡 VALIDACIÓN CRÍTICA: Forzar el tag a 'O' si no está en la lista maestra if new_tag not in ALL_NER_TAGS: print(f"ADVERTENCIA: Tag no válido detectado ('{new_tag}') en índice {row_index}. Forzando a 'O'.") new_tag = 'O' df_list[row_index][1] = new_tag tokens_data[row_index]['ner_tag'] = new_tag elif update_type == 'token' and new_token is not None: df_list[row_index][0] = new_token tokens_data[row_index]['token'] = new_token return tokens_data, df_list # ---------------------------------------------------------------------- # --- Función de Sincronización de UI/Estados (Sin Cambios) --- # ---------------------------------------------------------------------- def update_ui(image_orig, tokens_data: list, df_labels: list, highlight_index: int): """Actualiza la imagen resaltada basándose en el estado interno de los tokens.""" highlighted_image = draw_boxes(image_orig, tokens_data, highlight_index) return tokens_data, highlighted_image # ---------------------------------------------------------------------- # --- FUNCIÓN: Guardar Anotación Actual (JSON) (Mínima Modificación) --- # ---------------------------------------------------------------------- def save_current_annotation_to_json(image_orig, tokens_data: list, image_filename: str): """ Guarda la anotación del documento actual en el archivo JSON. """ if not tokens_data or not image_filename: gr.Warning("Error: No hay tokens o la imagen no fue procesada.") return None, "Guardado fallido: No hay datos de imagen o tokens." # 1. Asegurarse de que la carpeta 'dataset' exista os.makedirs(DATASET_BASE_DIR, exist_ok=True) temp_file = os.path.join(DATASET_BASE_DIR, JSON_FILENAME) # 2. Preparar el nuevo elemento a agregar W, H = image_orig.size new_annotations = [] for item in tokens_data: # Asegurar que el tag que se guarda es válido tag_to_save = item['ner_tag'] if item['ner_tag'] in ALL_NER_TAGS else 'O' new_annotations.append({ 'token': item['token'], 'bbox_normalized': [int(b) for b in item['bbox_norm']], 'ner_tag': tag_to_save }) new_document_entry = { 'image': { 'size': [W, H], 'name': image_filename }, 'annotations': new_annotations } # 3. Leer el archivo existente existing_document_list = [] try: if os.path.exists(temp_file): with open(temp_file, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, list): existing_document_list = data except Exception: pass # 4. Consolidar: Agregar o Sobrescribir el documento actual is_new = True for i, doc in enumerate(existing_document_list): if doc.get('image', {}).get('name') == image_filename: existing_document_list[i] = new_document_entry is_new = False break if is_new: existing_document_list.append(new_document_entry) # 5. Escribir la lista completa try: with open(temp_file, 'w', encoding='utf-8') as f: json.dump(existing_document_list, f, ensure_ascii=False, indent=4) action_message = "actualizados" if not is_new else "agregados" total_docs = len(existing_document_list) msg = f"Anotación '{image_filename}' {action_message} al JSON. Documentos totales: {total_docs}." gr.Info(f"✅ {msg}") return None, msg except Exception as e: error_msg = ErrorHandler.handle_export_error(e) gr.Warning(f"Error al escribir el archivo: {error_msg}") return None, f"Error en guardado: {error_msg}" # ---------------------------------------------------------------------- # --- FUNCIÓN PRINCIPAL DE EXPORTACIÓN: ZIP (Sin Cambios) --- # ---------------------------------------------------------------------- def export_and_zip_dataset(image_orig, tokens_data: list, image_filename: str): """ Asegura que la anotación actual se guarde y luego comprime toda la carpeta 'dataset/' en un archivo ZIP. """ # Paso 1: Asegurar que la anotación actual se guarde save_current_annotation_to_json(image_orig, tokens_data, image_filename) # Paso 2: Obtener el total de documentos para el mensaje json_path = os.path.join(DATASET_BASE_DIR, JSON_FILENAME) total_docs = 0 if os.path.exists(json_path): try: with open(json_path, 'r', encoding='utf-8') as f: data = json.load(f) if isinstance(data, list): total_docs = len(data) except Exception: pass if total_docs == 0: gr.Warning("Error: No hay datos guardados para generar el ZIP.") return None, "Error: No se puede generar el ZIP. El archivo JSON está vacío o no existe." # Paso 3: Crear el archivo ZIP zip_path = "/tmp/dataset.zip" try: if os.path.exists(zip_path): os.remove(zip_path) with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(DATASET_BASE_DIR): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, DATASET_BASE_DIR) zipf.write(file_path, arcname) gr.Info(f"✅ Dataset listo para descargar. Contiene {total_docs} documentos.") return zip_path, f"Dataset exportado con éxito. Descargue el archivo ZIP. (Total docs: {total_docs})" except Exception as e: error_msg = ErrorHandler.handle_export_error(e) gr.Warning(f"Error al comprimir el ZIP: {error_msg}") return None, f"Error al comprimir el dataset: {error_msg}"