import gradio as gr
from huggingface_hub import HfApi
import os
import re
import json
import torch
import random
from typing import List, Dict, Union, Tuple
from gliner import GLiNER
from datasets import load_dataset

# Available models for annotation
AVAILABLE_MODELS = [
    "BookingCare/gliner-multi-healthcare",
    "knowledgator/gliner-multitask-large-v0.5",
    "knowledgator/gliner-multitask-base-v0.5"
]


# Dataset Viewer Classes and Functions
class DynamicDataset:
    def __init__(
        self,
        data: List[Dict[str, Union[List[Union[int, str]], bool]]]
    ) -> None:
        self.data = data
        self.data_len = len(self.data)
        self.current = -1
        for example in self.data:
            if "validated" not in example:
                example["validated"] = False

    def next_example(self):
        self.current += 1
        if self.current > self.data_len - 1:
            self.current = self.data_len - 1
        elif self.current < 0:
            self.current = 0

    def previous_example(self):
        self.current -= 1
        if self.current > self.data_len - 1:
            self.current = self.data_len - 1
        elif self.current < 0:
            self.current = 0

    def example_by_id(self, id):
        self.current = id
        if self.current > self.data_len - 1:
            self.current = self.data_len - 1
        elif self.current < 0:
            self.current = 0

    def validate(self):
        self.data[self.current]["validated"] = True

    def load_current_example(self):
        return self.data[self.current]


def tokenize_text(text):
    """Tokenize the input text into a list of tokens."""
    return re.findall(r'\w+(?:[-_]\w+)*|\S', text)


def join_tokens(tokens):
    # Join tokens with spaces, but attach punctuation directly to the preceding token
    text = ""
    for token in tokens:
        if token in {",", ".", "!", "?", ":", ";", "..."}:
            text = text.rstrip() + token
        else:
            text += " " + token
    return text.strip()


def prepare_for_highlight(data):
    tokens = data["tokenized_text"]
    ner = data["ner"]

    highlighted_text = []
    current_entity = None
    entity_tokens = []
    normal_tokens = []

    for idx, token in enumerate(tokens):
        # Check if the current token is the start of a new entity
        if current_entity is None or idx > current_entity[1]:
            if entity_tokens:
                highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
                entity_tokens = []
            current_entity = next((entity for entity in ner if entity[0] == idx), None)

        # If the current token is part of an entity
        if current_entity and current_entity[0] <= idx <= current_entity[1]:
            if normal_tokens:
                highlighted_text.append((" ".join(normal_tokens), None))
                normal_tokens = []
            entity_tokens.append(token)
        else:
            if entity_tokens:
                highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
                entity_tokens = []
            normal_tokens.append(token)

    # Append any remaining tokens
    if entity_tokens:
        highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
    if normal_tokens:
        highlighted_text.append((" ".join(normal_tokens), None))

    # Clean up spaces before punctuation
    cleaned_highlighted_text = []
    for text, label in highlighted_text:
        cleaned_text = re.sub(r'\s(?=[,\.!?…:;])', '', text)
        cleaned_highlighted_text.append((cleaned_text, label))

    return cleaned_highlighted_text


def extract_tokens_and_labels(
    data: List[Dict[str, Union[str, None]]]
) -> Tuple[List[str], List[Tuple[int, int, str]]]:
    tokens = []
    ner = []

    token_start_idx = 0

    for entry in data:
        chunk = entry['token']
        label = entry['class_or_confidence']

        # Tokenize the current text chunk
        token_list = tokenize_text(chunk)

        # Append tokens to the main tokens list
        tokens.extend(token_list)

        if label:
            token_end_idx = token_start_idx + len(token_list) - 1
            ner.append((token_start_idx, token_end_idx, label))

        token_start_idx += len(token_list)

    return tokens, ner
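
# For reference, every example handled below uses the same dict layout:
# "tokenized_text" holds the tokens, "ner" holds [start, end, label] spans with
# *inclusive* token indices, and "validated" tracks manual review. The sample
# values here are purely illustrative and do not come from any real dataset:
#
# {
#     "tokenized_text": ["Aspirin", "relieves", "mild", "headache", "."],
#     "ner": [[0, 0, "drug"], [3, 3, "symptom"]],
#     "validated": False
# }
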
# Global variables for dataset viewer
dynamic_dataset = None


def update_example(data):
    global dynamic_dataset
    tokens, ner = extract_tokens_and_labels(data)
    dynamic_dataset.data[dynamic_dataset.current]["tokenized_text"] = tokens
    dynamic_dataset.data[dynamic_dataset.current]["ner"] = ner
    return prepare_for_highlight(dynamic_dataset.load_current_example())


def validate_example():
    global dynamic_dataset
    dynamic_dataset.data[dynamic_dataset.current]["validated"] = True
    return [("The example was validated!", None)]


def next_example():
    global dynamic_dataset
    dynamic_dataset.next_example()
    return prepare_for_highlight(dynamic_dataset.load_current_example()), dynamic_dataset.current


def previous_example():
    global dynamic_dataset
    dynamic_dataset.previous_example()
    return prepare_for_highlight(dynamic_dataset.load_current_example()), dynamic_dataset.current


def save_dataset(inp):
    global dynamic_dataset
    os.makedirs("data", exist_ok=True)
    with open("data/annotated_data.json", "wt") as file:
        json.dump(dynamic_dataset.data, file, ensure_ascii=False)
    return [("The validated dataset was saved as data/annotated_data.json", None)]


def load_annotated_dataset():
    """Load the locally saved annotated dataset into the viewer.

    Named load_annotated_dataset so it does not shadow datasets.load_dataset.
    """
    global dynamic_dataset
    try:
        with open("data/annotated_data.json", 'rt') as dataset:
            ANNOTATED_DATA = json.load(dataset)
        dynamic_dataset = DynamicDataset(ANNOTATED_DATA)
        max_value = len(dynamic_dataset.data) - 1 if dynamic_dataset.data else 0
        return (
            prepare_for_highlight(dynamic_dataset.load_current_example()),
            gr.update(value=0, maximum=max_value),
        )
    except Exception as e:
        return [("Error loading dataset: " + str(e), None)], gr.update(value=0)


# Original annotation functions
def transform_data(data):
    tokens = tokenize_text(data['text'])
    spans = []
    for entity in data['entities']:
        entity_tokens = tokenize_text(entity['word'])
        entity_length = len(entity_tokens)

        # Find the start and end indices of each entity in the tokenized text
        for i in range(len(tokens) - entity_length + 1):
            if tokens[i:i + entity_length] == entity_tokens:
                spans.append([i, i + entity_length - 1, entity['entity']])
                break

    return {"tokenized_text": tokens, "ner": spans, "validated": False}


def merge_entities(entities):
    if not entities:
        return []
    merged = []
    current = entities[0]
    for next_entity in entities[1:]:
        if next_entity['entity'] == current['entity'] and (
            next_entity['start'] == current['end'] + 1 or next_entity['start'] == current['end']
        ):
            current['word'] += ' ' + next_entity['word']
            current['end'] = next_entity['end']
        else:
            merged.append(current)
            current = next_entity
    merged.append(current)
    return merged


def annotate_text(model, text, labels: List[str], threshold: float, nested_ner: bool) -> Dict:
    labels = [label.strip() for label in labels]
    r = {
        "text": text,
        "entities": [
            {
                "entity": entity["label"],
                "word": entity["text"],
                "start": entity["start"],
                "end": entity["end"],
                "score": 0,
            }
            for entity in model.predict_entities(
                text, labels, flat_ner=not nested_ner, threshold=threshold
            )
        ],
    }
    r["entities"] = merge_entities(r["entities"])
    return transform_data(r)


class AutoAnnotator:
    def __init__(
        self,
        model: str = "knowledgator/gliner-multitask-large-v0.5",
        device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
    ) -> None:
        self.model = GLiNER.from_pretrained(model).to(device)
        self.annotated_data = []
        self.stat = {
            "total": None,
            "current": -1
        }

    def auto_annotate(
        self,
        data: List[str],
        labels: List[str],
        prompt: Union[str, List[str]] = None,
        threshold: float = 0.5,
        nested_ner: bool = False
    ) -> List[Dict]:
        self.stat["total"] = len(data)
        self.stat["current"] = -1  # Reset current progress

        for text in data:
            self.stat["current"] += 1

            if isinstance(prompt, list):
                prompt_text = random.choice(prompt)
            else:
                prompt_text = prompt

            text = f"{prompt_text}\n{text}" if prompt_text else text
            annotation = annotate_text(self.model, text, labels, threshold, nested_ner)
            if not annotation["ner"]:  # No entities were identified
                annotation = {"tokenized_text": tokenize_text(text), "ner": [], "validated": False}
            self.annotated_data.append(annotation)

        return self.annotated_data
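
# Minimal usage sketch for AutoAnnotator outside the Gradio app, assuming the
# chosen GLiNER checkpoint can be downloaded; the label names and example text
# are placeholders you would replace with your own:
#
#   annotator = AutoAnnotator("knowledgator/gliner-multitask-large-v0.5")
#   examples = annotator.auto_annotate(
#       data=["Aspirin relieves mild headache."],
#       labels=["drug", "symptom"],
#       prompt=None,
#       threshold=0.5,
#   )
#   # examples[0] -> {"tokenized_text": [...], "ner": [[start, end, label], ...], "validated": False}
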
# Global variables
annotator = None
sentences = []


def process_uploaded_file(file_obj):
    if file_obj is None:
        return "Please upload a file first!"
    try:
        # Read the uploaded file
        with open(file_obj.name, 'r', encoding='utf-8') as f:
            global sentences
            sentences = [line.strip() for line in f if line.strip()]
        return f"Successfully loaded {len(sentences)} sentences from file!"
    except Exception as e:
        return f"Error reading file: {str(e)}"


def annotate(model, labels, threshold, prompt):
    global annotator
    try:
        if not sentences:
            return "Please upload a file with text first!"

        labels = [label.strip() for label in labels.split(",")]
        annotator = AutoAnnotator(model)
        annotated_data = annotator.auto_annotate(sentences, labels, prompt, threshold)

        # Save annotated data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt") as file:
            json.dump(annotated_data, file, ensure_ascii=False)

        # Upload to Hugging Face Hub
        api = HfApi()
        api.upload_file(
            path_or_fileobj="data/annotated_data.json",
            path_in_repo="annotated_data.json",
            repo_id="YOUR_USERNAME/YOUR_REPO_NAME",  # Replace with your repo
            repo_type="dataset"
        )
        return "Successfully annotated and saved to Hugging Face Hub!"
    except Exception as e:
        return f"Error during annotation: {str(e)}"


def convert_hf_dataset_to_ner_format(dataset):
    """Convert a Hugging Face token-classification dataset to the viewer's span format."""
    # If ner_tags are stored as ClassLabel ids (e.g. conll2003), map them back to
    # their string names so the "O" check below works.
    try:
        tag_names = dataset.features["ner_tags"].feature.names
    except (AttributeError, KeyError, TypeError):
        tag_names = None

    converted_data = []
    for item in dataset:
        # Assumes the dataset has 'tokens' and 'ner_tags' fields;
        # adjust the field names to match your dataset structure.
        if 'tokens' in item and 'ner_tags' in item:
            tags = item['ner_tags']
            if tag_names is not None:
                tags = [tag_names[tag] for tag in tags]

            ner_spans = []
            current_span = None

            for i, (token, tag) in enumerate(zip(item['tokens'], tags)):
                if tag != 'O':  # Not "Outside"
                    if current_span is None:
                        current_span = [i, i, tag]
                    elif tag == current_span[2]:
                        current_span[1] = i
                    else:
                        ner_spans.append(current_span)
                        current_span = [i, i, tag]
                elif current_span is not None:
                    ner_spans.append(current_span)
                    current_span = None

            if current_span is not None:
                ner_spans.append(current_span)

            converted_data.append({
                "tokenized_text": item['tokens'],
                "ner": ner_spans,
                "validated": False
            })
    return converted_data


def load_from_huggingface(dataset_name: str, split: str = "train"):
    """Load a dataset from the Hugging Face Hub and convert it to the viewer format."""
    try:
        dataset = load_dataset(dataset_name, split=split)
        converted_data = convert_hf_dataset_to_ner_format(dataset)

        # Save the converted data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt") as file:
            json.dump(converted_data, file, ensure_ascii=False)

        return f"Successfully loaded and converted dataset: {dataset_name}"
    except Exception as e:
        return f"Error loading dataset: {str(e)}"
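
# The loader defined below accepts three formats. The snippets are illustrative
# samples (tokens and label values are made up) that document the expected shape
# of each input:
#
# json  - a list of {"tokenized_text": [...], "ner": [[start, end, label], ...]}
#         examples, or a list of {"tokens": [...], "ner_tags": [...]} examples.
# conll - whitespace-separated columns, first column token, last column tag,
#         blank lines separating sentences, e.g.:
#             Aspirin   B-DRUG
#             relieves  O
#             headache  B-SYMPTOM
#
#             Take      O
#             ibuprofen B-DRUG
# txt   - plain text, one untagged sentence per line.
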
def load_from_local_file(file_path: str, file_format: str = "json"):
    """Load and convert data from a local file in one of the supported formats."""
    try:
        if file_format == "json":
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            if isinstance(data, list):
                # If data is already in the correct format
                if all("tokenized_text" in item and "ner" in item for item in data):
                    return data

                # Convert from other JSON formats
                converted_data = []
                for item in data:
                    if "tokens" in item and "ner_tags" in item:
                        ner_spans = []
                        current_span = None

                        for i, (token, tag) in enumerate(zip(item["tokens"], item["ner_tags"])):
                            if tag != "O":
                                if current_span is None:
                                    current_span = [i, i, tag]
                                elif tag == current_span[2]:
                                    current_span[1] = i
                                else:
                                    ner_spans.append(current_span)
                                    current_span = [i, i, tag]
                            elif current_span is not None:
                                ner_spans.append(current_span)
                                current_span = None

                        if current_span is not None:
                            ner_spans.append(current_span)

                        converted_data.append({
                            "tokenized_text": item["tokens"],
                            "ner": ner_spans,
                            "validated": False
                        })
                return converted_data
            else:
                raise ValueError("JSON file must contain a list of examples")

        elif file_format == "conll":
            converted_data = []
            current_example = {"tokens": [], "ner_tags": []}

            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        if line.startswith("#"):
                            continue
                        parts = line.split()
                        if len(parts) >= 2:
                            token, tag = parts[0], parts[-1]
                            current_example["tokens"].append(token)
                            current_example["ner_tags"].append(tag)
                    elif current_example["tokens"]:
                        # A blank line ends the current sentence: convert it to spans
                        ner_spans = []
                        current_span = None
                        for i, (token, tag) in enumerate(zip(current_example["tokens"], current_example["ner_tags"])):
                            if tag != "O":
                                if current_span is None:
                                    current_span = [i, i, tag]
                                elif tag == current_span[2]:
                                    current_span[1] = i
                                else:
                                    ner_spans.append(current_span)
                                    current_span = [i, i, tag]
                            elif current_span is not None:
                                ner_spans.append(current_span)
                                current_span = None
                        if current_span is not None:
                            ner_spans.append(current_span)

                        converted_data.append({
                            "tokenized_text": current_example["tokens"],
                            "ner": ner_spans,
                            "validated": False
                        })
                        current_example = {"tokens": [], "ner_tags": []}

            # Handle the last example if the file does not end with a blank line
            if current_example["tokens"]:
                ner_spans = []
                current_span = None
                for i, (token, tag) in enumerate(zip(current_example["tokens"], current_example["ner_tags"])):
                    if tag != "O":
                        if current_span is None:
                            current_span = [i, i, tag]
                        elif tag == current_span[2]:
                            current_span[1] = i
                        else:
                            ner_spans.append(current_span)
                            current_span = [i, i, tag]
                    elif current_span is not None:
                        ner_spans.append(current_span)
                        current_span = None
                if current_span is not None:
                    ner_spans.append(current_span)

                converted_data.append({
                    "tokenized_text": current_example["tokens"],
                    "ner": ner_spans,
                    "validated": False
                })

            return converted_data

        elif file_format == "txt":
            # Simple text file with one sentence per line
            converted_data = []
            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        tokens = tokenize_text(line)
                        converted_data.append({
                            "tokenized_text": tokens,
                            "ner": [],
                            "validated": False
                        })
            return converted_data

        else:
            raise ValueError(f"Unsupported file format: {file_format}")

    except Exception as e:
        raise Exception(f"Error loading file: {str(e)}")


def process_local_file(file_obj, file_format):
    """Process an uploaded local file and persist it in the viewer format."""
    if file_obj is None:
        return "Please upload a file first!"
    try:
        # Load and convert the data
        data = load_from_local_file(file_obj.name, file_format)

        # Save the converted data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt") as file:
            json.dump(data, file, ensure_ascii=False)

        return f"Successfully loaded and converted {len(data)} examples from {file_format} file!"
    except Exception as e:
        return f"Error processing file: {str(e)}"
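
# When "Apply changes" is clicked, the code above assumes Gradio passes the edited
# HighlightedText value to update_example as a list of
# {"token": ..., "class_or_confidence": ...} dicts, which extract_tokens_and_labels
# turns back into tokens and spans. An illustrative payload (values are made up):
#
#   [
#       {"token": "Aspirin", "class_or_confidence": "drug"},
#       {"token": " relieves mild ", "class_or_confidence": None},
#       {"token": "headache", "class_or_confidence": "symptom"},
#   ]
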
# Create the main interface with tabs
with gr.Blocks() as demo:
    gr.Markdown("# NER Annotation Tool")

    with gr.Tabs():
        with gr.TabItem("Auto Annotation"):
            with gr.Row():
                with gr.Column():
                    file_uploader = gr.File(label="Upload text file (one sentence per line)")
                    upload_status = gr.Textbox(label="Upload Status")
                    file_uploader.change(fn=process_uploaded_file, inputs=[file_uploader], outputs=[upload_status])

                with gr.Column():
                    model = gr.Dropdown(
                        label="Choose the model for annotation",
                        choices=AVAILABLE_MODELS,
                        value=AVAILABLE_MODELS[0]
                    )
                    labels = gr.Textbox(
                        label="Labels",
                        placeholder="Enter comma-separated labels (e.g., PERSON,ORG,LOC)",
                        scale=2
                    )
                    threshold = gr.Slider(
                        0, 1,
                        value=0.3,
                        step=0.01,
                        label="Threshold",
                        info="Lower values yield more entity predictions"
                    )
                    prompt = gr.Textbox(
                        label="Prompt",
                        placeholder="Enter your annotation prompt (optional)",
                        scale=2
                    )

            annotate_btn = gr.Button("Annotate Data")
            output_info = gr.Textbox(label="Processing Status")

            annotate_btn.click(
                fn=annotate,
                inputs=[model, labels, threshold, prompt],
                outputs=[output_info]
            )

        with gr.TabItem("Dataset Viewer"):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        load_local_btn = gr.Button("Load Local Dataset")
                        load_hf_btn = gr.Button("Load from Hugging Face")

                    local_file = gr.File(label="Upload Local Dataset", visible=False)
                    file_format = gr.Dropdown(
                        choices=["json", "conll", "txt"],
                        value="json",
                        label="File Format",
                        visible=False
                    )
                    local_status = gr.Textbox(label="Local File Status", visible=False)

                    dataset_name = gr.Textbox(
                        label="Hugging Face Dataset Name",
                        placeholder="Enter dataset name (e.g., conll2003)",
                        visible=False
                    )
                    dataset_split = gr.Dropdown(
                        choices=["train", "validation", "test"],
                        value="train",
                        label="Dataset Split",
                        visible=False
                    )

            bar = gr.Slider(minimum=0, maximum=1, step=1, label="Progress", interactive=False)

            with gr.Row():
                previous_btn = gr.Button("Previous example")
                apply_btn = gr.Button("Apply changes")
                next_btn = gr.Button("Next example")
                validate_btn = gr.Button("Validate")
                save_btn = gr.Button("Save validated dataset")

            inp_box = gr.HighlightedText(value=None, interactive=True)

            def toggle_local_inputs():
                return {
                    local_file: gr.update(visible=True),
                    file_format: gr.update(visible=True),
                    local_status: gr.update(visible=True),
                    dataset_name: gr.update(visible=False),
                    dataset_split: gr.update(visible=False)
                }

            def toggle_hf_inputs():
                return {
                    local_file: gr.update(visible=False),
                    file_format: gr.update(visible=False),
                    local_status: gr.update(visible=False),
                    dataset_name: gr.update(visible=True),
                    dataset_split: gr.update(visible=True)
                }

            load_local_btn.click(
                fn=toggle_local_inputs,
                inputs=None,
                outputs=[local_file, file_format, local_status, dataset_name, dataset_split]
            )
            load_hf_btn.click(
                fn=toggle_hf_inputs,
                inputs=None,
                outputs=[local_file, file_format, local_status, dataset_name, dataset_split]
            )

            def process_and_load_local(file_obj, fmt):
                status = process_local_file(file_obj, fmt)
                if "Successfully" in status:
                    return load_annotated_dataset()
                return [(status, None)], gr.update(value=0)

            local_file.change(
                fn=process_and_load_local,
                inputs=[local_file, file_format],
                outputs=[inp_box, bar]
            )

            def load_hf_dataset(name, split):
                if not name:
                    return [("Please enter a dataset name first!", None)], gr.update(value=0)
                status = load_from_huggingface(name, split)
                if "Successfully" in status:
                    return load_annotated_dataset()
                return [(status, None)], gr.update(value=0)

            load_hf_btn.click(
                fn=load_hf_dataset,
                inputs=[dataset_name, dataset_split],
                outputs=[inp_box, bar]
            )

            apply_btn.click(fn=update_example, inputs=inp_box, outputs=inp_box)
            save_btn.click(fn=save_dataset, inputs=inp_box, outputs=inp_box)
            validate_btn.click(fn=validate_example, inputs=None, outputs=inp_box)
            next_btn.click(fn=next_example, inputs=None, outputs=[inp_box, bar])
            previous_btn.click(fn=previous_example, inputs=None, outputs=[inp_box, bar])

demo.launch()