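# NER annotation tool: a Gradio app that auto-annotates raw text with GLiNER models
# and lets you review, correct, and validate the resulting dataset before saving it
# (and optionally pushing it to the Hugging Face Hub).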
import gradio as gr
from huggingface_hub import HfApi
import os
import re
import json
import torch
import random
from typing import List, Dict, Union, Tuple
from gliner import GLiNER
# Aliased so it is not shadowed by the local load_dataset() defined further below.
from datasets import load_dataset as hf_load_dataset

# Available models for annotation
AVAILABLE_MODELS = [
    "BookingCare/gliner-multi-healthcare",
    "knowledgator/gliner-multitask-large-v0.5",
    "knowledgator/gliner-multitask-base-v0.5"
]
# Dataset Viewer Classes and Functions
class DynamicDataset:
    def __init__(
        self, data: List[Dict[str, Union[List[Union[int, str]], bool]]]
    ) -> None:
        self.data = data
        self.data_len = len(self.data)
        # Start on the first example so the viewer and the progress slider agree.
        self.current = 0
        for example in self.data:
            if "validated" not in example:
                example["validated"] = False

    def next_example(self):
        self.current += 1
        if self.current > self.data_len - 1:
            self.current = self.data_len - 1
        elif self.current < 0:
            self.current = 0

    def previous_example(self):
        self.current -= 1
        if self.current > self.data_len - 1:
            self.current = self.data_len - 1
        elif self.current < 0:
            self.current = 0

    def example_by_id(self, id):
        self.current = id
        if self.current > self.data_len - 1:
            self.current = self.data_len - 1
        elif self.current < 0:
            self.current = 0

    def validate(self):
        self.data[self.current]["validated"] = True

    def load_current_example(self):
        return self.data[self.current]
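# Minimal usage sketch (hypothetical data, for illustration only):
#   ds = DynamicDataset([{"tokenized_text": ["Anna", "lives", "in", "Oslo"],
#                         "ner": [[0, 0, "PERSON"], [3, 3, "LOC"]]}])
#   ds.load_current_example()   # -> first example, with "validated" added as False
#   ds.next_example(); ds.validate()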
def tokenize_text(text):
    """Tokenize the input text into a list of tokens."""
    return re.findall(r'\w+(?:[-_]\w+)*|\S', text)

def join_tokens(tokens):
    # Joining tokens with space, but handling special characters correctly
    text = ""
    for token in tokens:
        if token in {",", ".", "!", "?", ":", ";", "..."}:
            text = text.rstrip() + token
        else:
            text += " " + token
    return text.strip()
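# For example (illustrative): tokenize_text("Dr. Smith-Jones, hello!") returns
# ['Dr', '.', 'Smith-Jones', ',', 'hello', '!'], and join_tokens() re-attaches the
# punctuation to give 'Dr. Smith-Jones, hello!'.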
def prepare_for_highlight(data):
    tokens = data["tokenized_text"]
    ner = data["ner"]
    highlighted_text = []
    current_entity = None
    entity_tokens = []
    normal_tokens = []
    for idx, token in enumerate(tokens):
        # Check if the current token is the start of a new entity
        if current_entity is None or idx > current_entity[1]:
            if entity_tokens:
                highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
                entity_tokens = []
            current_entity = next((entity for entity in ner if entity[0] == idx), None)
        # If current token is part of an entity
        if current_entity and current_entity[0] <= idx <= current_entity[1]:
            if normal_tokens:
                highlighted_text.append((" ".join(normal_tokens), None))
                normal_tokens = []
            entity_tokens.append(token + " ")
        else:
            if entity_tokens:
                highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
                entity_tokens = []
            normal_tokens.append(token + " ")
    # Append any remaining tokens
    if entity_tokens:
        highlighted_text.append((" ".join(entity_tokens), current_entity[2]))
    if normal_tokens:
        highlighted_text.append((" ".join(normal_tokens), None))
    # Clean up spaces before punctuation
    cleaned_highlighted_text = []
    for text, label in highlighted_text:
        cleaned_text = re.sub(r'\s(?=[,\.!?…:;])', '', text)
        cleaned_highlighted_text.append((cleaned_text, label))
    return cleaned_highlighted_text
def extract_tokens_and_labels(data: List[Dict[str, Union[str, None]]]) -> Tuple[List[str], List[Tuple[int, int, str]]]:
    tokens = []
    ner = []
    token_start_idx = 0
    for entry in data:
        chunk = entry['token']
        label = entry['class_or_confidence']
        # Tokenize the current text chunk
        token_list = tokenize_text(chunk)
        # Append tokens to the main tokens list
        tokens.extend(token_list)
        if label:
            token_end_idx = token_start_idx + len(token_list) - 1
            ner.append((token_start_idx, token_end_idx, label))
        token_start_idx += len(token_list)
    return tokens, ner
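# Illustrative input/output (the dict keys follow Gradio's HighlightedText value format):
#   extract_tokens_and_labels([{"token": "Anna lives in", "class_or_confidence": None},
#                              {"token": "Oslo", "class_or_confidence": "LOC"}])
#   -> (['Anna', 'lives', 'in', 'Oslo'], [(3, 3, 'LOC')])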
# Global variables for dataset viewer
dynamic_dataset = None

def update_example(data):
    global dynamic_dataset
    if dynamic_dataset is None or not data:
        return [("Load a dataset first!", None)]
    tokens, ner = extract_tokens_and_labels(data)
    dynamic_dataset.data[dynamic_dataset.current]["tokenized_text"] = tokens
    dynamic_dataset.data[dynamic_dataset.current]["ner"] = ner
    return prepare_for_highlight(dynamic_dataset.load_current_example())

def validate_example():
    global dynamic_dataset
    if dynamic_dataset is None:
        return [("Load a dataset first!", None)]
    dynamic_dataset.data[dynamic_dataset.current]["validated"] = True
    return [("The example was validated!", None)]

def next_example():
    global dynamic_dataset
    if dynamic_dataset is None:
        return [("Load a dataset first!", None)], 0
    dynamic_dataset.next_example()
    return prepare_for_highlight(dynamic_dataset.load_current_example()), dynamic_dataset.current

def previous_example():
    global dynamic_dataset
    if dynamic_dataset is None:
        return [("Load a dataset first!", None)], 0
    dynamic_dataset.previous_example()
    return prepare_for_highlight(dynamic_dataset.load_current_example()), dynamic_dataset.current

def save_dataset(inp):
    global dynamic_dataset
    if dynamic_dataset is None:
        return [("Load a dataset first!", None)]
    os.makedirs("data", exist_ok=True)
    with open("data/annotated_data.json", "wt") as file:
        json.dump(dynamic_dataset.data, file, ensure_ascii=False)
    return [("The validated dataset was saved as data/annotated_data.json", None)]

def load_dataset():
    global dynamic_dataset
    try:
        with open("data/annotated_data.json", 'rt') as dataset:
            ANNOTATED_DATA = json.load(dataset)
        dynamic_dataset = DynamicDataset(ANNOTATED_DATA)
        max_value = len(dynamic_dataset.data) - 1 if dynamic_dataset.data else 0
        # Two return values to match the (text box, progress slider) outputs wired up below.
        return prepare_for_highlight(dynamic_dataset.load_current_example()), gr.update(value=0, maximum=max_value)
    except Exception as e:
        return [("Error loading dataset: " + str(e), None)], gr.update(value=0)
# Original annotation functions
def transform_data(data):
    tokens = tokenize_text(data['text'])
    spans = []
    for entity in data['entities']:
        entity_tokens = tokenize_text(entity['word'])
        entity_length = len(entity_tokens)
        # Find the start and end indices of each entity in the tokenized text
        # (only the first matching occurrence is used).
        for i in range(len(tokens) - entity_length + 1):
            if tokens[i:i + entity_length] == entity_tokens:
                spans.append([i, i + entity_length - 1, entity['entity']])
                break
    return {"tokenized_text": tokens, "ner": spans, "validated": False}
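# Each annotated example therefore has the shape (illustrative values):
#   {"tokenized_text": ["Anna", "lives", "in", "Oslo"],
#    "ner": [[0, 0, "PERSON"], [3, 3, "LOC"]],
#    "validated": False}
# where each span is [start_token_idx, end_token_idx, label] with inclusive ends.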
def merge_entities(entities):
    if not entities:
        return []
    merged = []
    current = entities[0]
    for next_entity in entities[1:]:
        if next_entity['entity'] == current['entity'] and (next_entity['start'] == current['end'] + 1 or next_entity['start'] == current['end']):
            current['word'] += ' ' + next_entity['word']
            current['end'] = next_entity['end']
        else:
            merged.append(current)
            current = next_entity
    merged.append(current)
    return merged
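# For example (illustrative character offsets): two adjacent "LOC" predictions
#   {"entity": "LOC", "word": "New", "start": 0, "end": 3} and
#   {"entity": "LOC", "word": "York", "start": 4, "end": 8}
# are merged into {"entity": "LOC", "word": "New York", "start": 0, "end": 8}.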
def annotate_text(model, text, labels: List[str], threshold: float, nested_ner: bool) -> Dict:
    labels = [label.strip() for label in labels]
    r = {
        "text": text,
        "entities": [
            {
                "entity": entity["label"],
                "word": entity["text"],
                "start": entity["start"],
                "end": entity["end"],
                "score": 0,
            }
            for entity in model.predict_entities(
                text, labels, flat_ner=not nested_ner, threshold=threshold
            )
        ],
    }
    r["entities"] = merge_entities(r["entities"])
    return transform_data(r)
class AutoAnnotator:
    def __init__(
        self, model: str = "knowledgator/gliner-multitask-large-v0.5",
        device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')
    ) -> None:
        self.model = GLiNER.from_pretrained(model).to(device)
        self.annotated_data = []
        self.stat = {
            "total": None,
            "current": -1
        }

    def auto_annotate(
        self, data: List[str], labels: List[str],
        prompt: Union[str, List[str]] = None, threshold: float = 0.5, nested_ner: bool = False
    ) -> List[Dict]:
        self.stat["total"] = len(data)
        self.stat["current"] = -1  # Reset current progress
        for text in data:
            self.stat["current"] += 1
            if isinstance(prompt, list):
                prompt_text = random.choice(prompt)
            else:
                prompt_text = prompt
            text = f"{prompt_text}\n{text}" if prompt_text else text
            annotation = annotate_text(self.model, text, labels, threshold, nested_ner)
            if not annotation["ner"]:  # If no entities identified
                annotation = {"tokenized_text": tokenize_text(text), "ner": [], "validated": False}
            self.annotated_data.append(annotation)
        return self.annotated_data
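# Minimal usage sketch (run outside the UI; model download and inference may be slow on CPU):
#   annotator = AutoAnnotator("knowledgator/gliner-multitask-base-v0.5")
#   records = annotator.auto_annotate(
#       ["Anna lives in Oslo."], labels=["PERSON", "LOC"], threshold=0.3
#   )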
# Global variables
annotator = None
sentences = []

def process_uploaded_file(file_obj):
    global sentences
    if file_obj is None:
        return "Please upload a file first!"
    try:
        # Read the uploaded file
        with open(file_obj.name, 'r', encoding='utf-8') as f:
            sentences = [line.strip() for line in f if line.strip()]
        return f"Successfully loaded {len(sentences)} sentences from file!"
    except Exception as e:
        return f"Error reading file: {str(e)}"
def annotate(model, labels, threshold, prompt):
    global annotator
    try:
        if not sentences:
            return "Please upload a file with text first!"
        labels = [label.strip() for label in labels.split(",")]
        annotator = AutoAnnotator(model)
        annotated_data = annotator.auto_annotate(sentences, labels, prompt, threshold)
        # Save annotated data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt") as file:
            json.dump(annotated_data, file, ensure_ascii=False)
        # Upload to Hugging Face Hub (requires a dataset repo you can write to and a configured token)
        api = HfApi()
        api.upload_file(
            path_or_fileobj="data/annotated_data.json",
            path_in_repo="annotated_data.json",
            repo_id="YOUR_USERNAME/YOUR_REPO_NAME",  # Replace with your repo
            repo_type="dataset"
        )
        return "Successfully annotated and saved to Hugging Face Hub!"
    except Exception as e:
        return f"Error during annotation: {str(e)}"
def convert_hf_dataset_to_ner_format(dataset):
    """Convert Hugging Face dataset to NER format"""
    converted_data = []
    for item in dataset:
        # Assuming the dataset has 'tokens' and 'ner_tags' fields with string tags;
        # adjust the field names (and map label ids to strings) to match your dataset.
        if 'tokens' in item and 'ner_tags' in item:
            ner_spans = []
            current_span = None
            for i, (token, tag) in enumerate(zip(item['tokens'], item['ner_tags'])):
                if tag != 'O':  # Not Outside
                    if current_span is None:
                        current_span = [i, i, tag]
                    elif tag == current_span[2]:
                        current_span[1] = i
                    else:
                        ner_spans.append(current_span)
                        current_span = [i, i, tag]
                elif current_span is not None:
                    ner_spans.append(current_span)
                    current_span = None
            if current_span is not None:
                ner_spans.append(current_span)
            converted_data.append({
                "tokenized_text": item['tokens'],
                "ner": ner_spans,
                "validated": False
            })
    return converted_data
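# Note: many Hub NER datasets (e.g. conll2003) store ner_tags as class indices rather
# than strings, so the tag != 'O' check above only works after mapping ids to names.
# A possible pre-processing step before calling the converter (assuming a ClassLabel
# feature) would be:
#   names = dataset.features["ner_tags"].feature.names
#   dataset = dataset.map(lambda ex: {"ner_tags": [names[t] for t in ex["ner_tags"]]})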
def load_from_huggingface(dataset_name: str, split: str = "train"):
    """Load dataset from Hugging Face Hub"""
    try:
        dataset = hf_load_dataset(dataset_name, split=split)
        converted_data = convert_hf_dataset_to_ner_format(dataset)
        # Save the converted data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt") as file:
            json.dump(converted_data, file, ensure_ascii=False)
        return f"Successfully loaded and converted dataset: {dataset_name}"
    except Exception as e:
        return f"Error loading dataset: {str(e)}"
def load_from_local_file(file_path: str, file_format: str = "json"):
    """Load and convert data from local file in various formats"""
    try:
        if file_format == "json":
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            if isinstance(data, list):
                # If data is already in the correct format
                if all("tokenized_text" in item and "ner" in item for item in data):
                    return data
                # Convert from other JSON formats
                converted_data = []
                for item in data:
                    if "tokens" in item and "ner_tags" in item:
                        ner_spans = []
                        current_span = None
                        for i, (token, tag) in enumerate(zip(item["tokens"], item["ner_tags"])):
                            if tag != "O":
                                if current_span is None:
                                    current_span = [i, i, tag]
                                elif tag == current_span[2]:
                                    current_span[1] = i
                                else:
                                    ner_spans.append(current_span)
                                    current_span = [i, i, tag]
                            elif current_span is not None:
                                ner_spans.append(current_span)
                                current_span = None
                        if current_span is not None:
                            ner_spans.append(current_span)
                        converted_data.append({
                            "tokenized_text": item["tokens"],
                            "ner": ner_spans,
                            "validated": False
                        })
                return converted_data
            else:
                raise ValueError("JSON file must contain a list of examples")
        elif file_format == "conll":
            converted_data = []
            current_example = {"tokens": [], "ner_tags": []}
            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        if line.startswith("#"):
                            continue
                        parts = line.split()
                        if len(parts) >= 2:
                            token, tag = parts[0], parts[-1]
                            current_example["tokens"].append(token)
                            current_example["ner_tags"].append(tag)
                    elif current_example["tokens"]:
                        # Convert current example
                        ner_spans = []
                        current_span = None
                        for i, (token, tag) in enumerate(zip(current_example["tokens"], current_example["ner_tags"])):
                            if tag != "O":
                                if current_span is None:
                                    current_span = [i, i, tag]
                                elif tag == current_span[2]:
                                    current_span[1] = i
                                else:
                                    ner_spans.append(current_span)
                                    current_span = [i, i, tag]
                            elif current_span is not None:
                                ner_spans.append(current_span)
                                current_span = None
                        if current_span is not None:
                            ner_spans.append(current_span)
                        converted_data.append({
                            "tokenized_text": current_example["tokens"],
                            "ner": ner_spans,
                            "validated": False
                        })
                        current_example = {"tokens": [], "ner_tags": []}
            # Handle last example if exists
            if current_example["tokens"]:
                ner_spans = []
                current_span = None
                for i, (token, tag) in enumerate(zip(current_example["tokens"], current_example["ner_tags"])):
                    if tag != "O":
                        if current_span is None:
                            current_span = [i, i, tag]
                        elif tag == current_span[2]:
                            current_span[1] = i
                        else:
                            ner_spans.append(current_span)
                            current_span = [i, i, tag]
                    elif current_span is not None:
                        ner_spans.append(current_span)
                        current_span = None
                if current_span is not None:
                    ner_spans.append(current_span)
                converted_data.append({
                    "tokenized_text": current_example["tokens"],
                    "ner": ner_spans,
                    "validated": False
                })
            return converted_data
        elif file_format == "txt":
            # Simple text file with one sentence per line
            converted_data = []
            with open(file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        tokens = tokenize_text(line)
                        converted_data.append({
                            "tokenized_text": tokens,
                            "ner": [],
                            "validated": False
                        })
            return converted_data
        else:
            raise ValueError(f"Unsupported file format: {file_format}")
    except Exception as e:
        raise Exception(f"Error loading file: {str(e)}")
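# The expected CoNLL-style input is whitespace-separated columns with the token first
# and the tag last, and a blank line between sentences, e.g. (illustrative):
#   Anna   PERSON
#   lives  O
#   in     O
#   Oslo   LOC
#
#   (next sentence follows after the blank line)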
def process_local_file(file_obj, file_format):
    """Process uploaded local file"""
    if file_obj is None:
        return "Please upload a file first!"
    try:
        # Load and convert the data
        data = load_from_local_file(file_obj.name, file_format)
        # Save the converted data
        os.makedirs("data", exist_ok=True)
        with open("data/annotated_data.json", "wt") as file:
            json.dump(data, file, ensure_ascii=False)
        return f"Successfully loaded and converted {len(data)} examples from {file_format} file!"
    except Exception as e:
        return f"Error processing file: {str(e)}"
# Create the main interface with tabs
with gr.Blocks() as demo:
    gr.Markdown("# NER Annotation Tool")

    with gr.Tabs():
        with gr.TabItem("Auto Annotation"):
            with gr.Row():
                with gr.Column():
                    file_uploader = gr.File(label="Upload text file (one sentence per line)")
                    upload_status = gr.Textbox(label="Upload Status")
                    file_uploader.change(fn=process_uploaded_file, inputs=[file_uploader], outputs=[upload_status])
                with gr.Column():
                    model = gr.Dropdown(
                        label="Choose the model for annotation",
                        choices=AVAILABLE_MODELS,
                        value=AVAILABLE_MODELS[0]
                    )
                    labels = gr.Textbox(
                        label="Labels",
                        placeholder="Enter comma-separated labels (e.g., PERSON,ORG,LOC)",
                        scale=2
                    )
                    threshold = gr.Slider(
                        0, 1,
                        value=0.3,
                        step=0.01,
                        label="Threshold",
                        info="Lower threshold increases entity predictions"
                    )
                    prompt = gr.Textbox(
                        label="Prompt",
                        placeholder="Enter your annotation prompt (optional)",
                        scale=2
                    )
            annotate_btn = gr.Button("Annotate Data")
            output_info = gr.Textbox(label="Processing Status")
            annotate_btn.click(
                fn=annotate,
                inputs=[model, labels, threshold, prompt],
                outputs=[output_info]
            )

        with gr.TabItem("Dataset Viewer"):
            with gr.Row():
                with gr.Column():
                    with gr.Row():
                        load_local_btn = gr.Button("Load Local Dataset")
                        load_hf_btn = gr.Button("Load from Hugging Face")
                    local_file = gr.File(label="Upload Local Dataset", visible=False)
                    file_format = gr.Dropdown(
                        choices=["json", "conll", "txt"],
                        value="json",
                        label="File Format",
                        visible=False
                    )
                    local_status = gr.Textbox(label="Local File Status", visible=False)
                    dataset_name = gr.Textbox(
                        label="Hugging Face Dataset Name",
                        placeholder="Enter dataset name (e.g., conll2003)",
                        visible=False
                    )
                    dataset_split = gr.Dropdown(
                        choices=["train", "validation", "test"],
                        value="train",
                        label="Dataset Split",
                        visible=False
                    )
            bar = gr.Slider(minimum=0, maximum=1, step=1, label="Progress", interactive=False)
            with gr.Row():
                previous_btn = gr.Button("Previous example")
                apply_btn = gr.Button("Apply changes")
                next_btn = gr.Button("Next example")
                validate_btn = gr.Button("Validate")
                save_btn = gr.Button("Save validated dataset")
            inp_box = gr.HighlightedText(value=None, interactive=True)
    def toggle_local_inputs():
        return {
            local_file: gr.update(visible=True),
            file_format: gr.update(visible=True),
            local_status: gr.update(visible=True),
            dataset_name: gr.update(visible=False),
            dataset_split: gr.update(visible=False)
        }

    def toggle_hf_inputs():
        return {
            local_file: gr.update(visible=False),
            file_format: gr.update(visible=False),
            local_status: gr.update(visible=False),
            dataset_name: gr.update(visible=True),
            dataset_split: gr.update(visible=True)
        }

    load_local_btn.click(
        fn=toggle_local_inputs,
        inputs=None,
        outputs=[local_file, file_format, local_status, dataset_name, dataset_split]
    )
    load_hf_btn.click(
        fn=toggle_hf_inputs,
        inputs=None,
        outputs=[local_file, file_format, local_status, dataset_name, dataset_split]
    )

    def process_and_load_local(file_obj, file_format_choice):
        status = process_local_file(file_obj, file_format_choice)
        if "Successfully" in status:
            return load_dataset()
        return [(status, None)], gr.update(value=0)

    local_file.change(
        fn=process_and_load_local,
        inputs=[local_file, file_format],
        outputs=[inp_box, bar]
    )

    def load_hf_dataset(name, split):
        status = load_from_huggingface(name, split)
        if "Successfully" in status:
            return load_dataset()
        return [(status, None)], gr.update(value=0)

    load_hf_btn.click(
        fn=load_hf_dataset,
        inputs=[dataset_name, dataset_split],
        outputs=[inp_box, bar]
    )

    apply_btn.click(fn=update_example, inputs=inp_box, outputs=inp_box)
    save_btn.click(fn=save_dataset, inputs=inp_box, outputs=inp_box)
    validate_btn.click(fn=validate_example, inputs=None, outputs=inp_box)
    next_btn.click(fn=next_example, inputs=None, outputs=[inp_box, bar])
    previous_btn.click(fn=previous_example, inputs=None, outputs=[inp_box, bar])

demo.launch()