import os
import re

import pandas as pd
import torch
import gradio as gr

from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel

# ----- Configuration -----

BASE_MODEL = "meta-llama/Llama-3.2-3B-Instruct"
ADAPTER_REPO = "Turkiii0/UT-AI-model"
EXCEL_FILE = "1000 Q.xlsx"

SIM_THRESHOLD = 0.60       # minimum similarity to answer straight from the Q&A table
MAX_RAG_ANSWER_LEN = 220   # stored answers longer than this fall through to the model

HF_TOKEN = os.getenv("HF_TOKEN")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("🔐 HF Token:", "Found" if HF_TOKEN else "Missing") |
|
|
|
|
|
tokenizer = AutoTokenizer.from_pretrained( |
|
|
BASE_MODEL, |
|
|
use_auth_token=HF_TOKEN, |
|
|
use_fast=True |
|
|
) |
|
|
|
|
|
if tokenizer.pad_token is None: |
|
|
tokenizer.pad_token = tokenizer.eos_token |
|
|
|
|
|
dtype = torch.float16 if torch.cuda.is_available() else torch.float32 |
|
|
|
|
|
print("🧠 Loading base model...") |
|
|
base_model = AutoModelForCausalLM.from_pretrained( |
|
|
BASE_MODEL, |
|
|
use_auth_token=HF_TOKEN, |
|
|
torch_dtype=dtype |
|
|
) |
|
|
|
|
|
print("🧩 Loading LoRA adapter...") |
|
|
model = PeftModel.from_pretrained( |
|
|
base_model, |
|
|
ADAPTER_REPO, |
|
|
use_auth_token=HF_TOKEN |
|
|
) |
|
|
|
|
|
model.to(device) |
|
|
model.eval() |
|
|
print("✅ Model ready on:", device) |

# ----- Load the Q&A table used for retrieval -----

df = pd.read_excel(EXCEL_FILE)
print("🧾 Columns:", list(df.columns))

# Find the question/answer columns by their Arabic or English header names.
q_candidates = [c for c in df.columns
                if "سؤال" in str(c).lower() or "question" in str(c).lower()]
a_candidates = [c for c in df.columns
                if "جواب" in str(c).lower() or "answer" in str(c).lower()]

if q_candidates and a_candidates:
    QCOL = q_candidates[0]
    ACOL = a_candidates[0]
else:
    # No header matched; fall back to the first two columns.
    QCOL = df.columns[0]
    ACOL = df.columns[1]

df = df[[QCOL, ACOL]].copy()
df.columns = ["question", "answer"]

df["question"] = df["question"].astype(str).str.strip()
df["answer"] = df["answer"].astype(str).str.strip()

qa_data = df.to_dict(orient="records")
print("📚 Loaded RAG entries:", len(qa_data))

# ----- Retrieval helpers -----

def extract_code(text: str):
    """Extract a course code such as CIT1302."""
    m = re.search(r"[A-Za-z]{2,4}\s?\d{3,4}", text)
    if m:
        return m.group(0).replace(" ", "").upper()
    return None
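
# e.g. extract_code("متطلبات cit 1302") -> "CIT1302"; text without a code -> None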

def normalize_question(q: str) -> str:
    """Map free-form input onto one of the canonical question templates
    used by the Q&A table, keyed on the course code it mentions."""
    q = q.strip()
    match = re.search(r'\b([A-Za-z]{2,4}\s?\d{3,4})\b', q)
    course = match.group(1).replace(" ", "") if match else None

    if not course:
        return q

    lower_q = q.lower()

    # A bare course code (with or without a space) -> "What is course X?"
    if lower_q.replace(" ", "") == course.lower():
        return f"ماهو مقرر {course}؟"

    # Prerequisites -> "What are the prerequisites of X?"
    if "متطلبات" in lower_q or "متطلب" in lower_q or "prereq" in lower_q:
        return f"ماهي متطلبات {course}؟"

    # Course name -> "What is the name of course X?"
    if "اسم" in lower_q or "name" in lower_q:
        return f"ما اسم مقرر {course}؟"

    # Credit hours -> "How many hours is course X?"
    if "ساع" in lower_q or "hour" in lower_q:
        return f"كم عدد ساعات مقرر {course}؟"

    # Anything else -> "Information about course X"
    return f"معلومات عن مقرر {course}"

# Arabic question words and generic course terms ignored during matching.
AR_STOPWORDS = {
    "ما", "هو", "هي", "هل", "عن", "في", "من", "الى", "إلى",
    "مادة", "مقرر", "المقرر", "المادة", "ماهي", "ماهو",
    "كم", "متطلبات", "متطلب", "متى"
}

def tokenize(text: str):
    """Lowercase, extract Arabic/Latin/digit runs, drop stopwords and 1-char tokens."""
    text = text.lower()
    tokens = re.findall(r"[ء-يA-Za-z0-9]+", text)
    tokens = [t for t in tokens if t not in AR_STOPWORDS and len(t) > 1]
    return set(tokens)
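
# e.g. tokenize("كم عدد ساعات مقرر CIT1302؟") -> {"عدد", "ساعات", "cit1302"}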

def smart_similarity(user_q: str, candidate_q: str, candidate_a: str = "") -> float:
    """Token-overlap similarity between the user question and a stored Q&A pair."""
    u_tokens = tokenize(user_q)
    q_tokens = tokenize(candidate_q)
    a_tokens = tokenize(candidate_a)

    if not u_tokens:
        return 0.0

    inter_q = u_tokens & q_tokens
    inter_a = u_tokens & a_tokens

    # Fraction of the user's tokens that appear in the stored question/answer.
    score_q = len(inter_q) / len(u_tokens)
    score_a = len(inter_a) / len(u_tokens)

    # Overlap with the stored question counts more than overlap with its answer.
    score = score_q * 0.7 + score_a * 0.3

    # Bonus when the exact course code appears anywhere in the record.
    code = extract_code(user_q)
    if code:
        code_nospace = code.replace(" ", "").lower()
        if code_nospace in (candidate_q + " " + candidate_a).replace(" ", "").lower():
            score += 0.3

    return score
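
# Worked example (hypothetical record): a question sharing 2 of its 3 content
# tokens with the stored question, none with the answer, and a matching course
# code scores (2/3) * 0.7 + 0 * 0.3 + 0.3 ≈ 0.77, clearing SIM_THRESHOLD = 0.60.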

def filter_by_code(code: str, records):
    """Keep only records mentioning the same course code, when possible."""
    if not code:
        return list(records)

    code_nospace = code.replace(" ", "").upper()
    out = []
    for rec in records:
        text = (rec["question"] + " " + rec["answer"]).replace(" ", "").upper()
        if code_nospace in text:
            out.append(rec)
    # Fall back to the full table when no record mentions the code.
    return out or list(records)

def best_match(user_q: str, records):
    """Return the highest-scoring record and its similarity score."""
    best = None
    best_sim = 0.0
    for rec in records:
        sim = smart_similarity(user_q, rec["question"], rec["answer"])
        if sim > best_sim:
            best = rec
            best_sim = sim
    return best, best_sim

# ----- Model generation fallback -----

# System prompt (Arabic): "You are an academic assistant specialized in the
# University of Tabuk. Answer only with the requested information (course name,
# prerequisite, credit hours, or a specific academic regulation), with no extra
# explanation and no filler."
SYSTEM_PROMPT = (
    "أنت مساعد أكاديمي متخصص في جامعة تبوك. "
    "أجب فقط بالمعلومة المطلوبة (اسم مقرر، متطلب سابق، عدد ساعات، أو ضابط أكاديمي محدد) "
    "بدون شرح إضافي وبدون كلام زائد."
)

def clean_repetition(text: str) -> str:
    """Best-effort removal of clauses repeated between Arabic commas ("،")."""
    parts = [p.strip() for p in text.split("،") if p.strip()]
    seen = set()
    out = []
    for p in parts:
        if p not in seen:
            out.append(p)
            seen.add(p)
    return "، ".join(out) if out else text

def generate_from_model(q: str) -> str:
    msgs = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": q},
    ]

    # Render the messages with the model's own chat template, then tokenize.
    prompt = tokenizer.apply_chat_template(
        msgs,
        add_generation_prompt=True,
        tokenize=False,
    )

    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        truncation=True,
        padding=True,
        max_length=512,
    ).to(device)

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=80,
            do_sample=True,
            temperature=0.3,
            top_p=0.9,
            repetition_penalty=1.35,
            no_repeat_ngram_size=4,
            eos_token_id=tokenizer.eos_token_id,
            pad_token_id=tokenizer.pad_token_id,
        )

    # Decode only the newly generated tokens, not the echoed prompt.
    prompt_len = inputs["input_ids"].shape[-1]
    out_ids = outputs[0][prompt_len:]
    ans = tokenizer.decode(out_ids, skip_special_tokens=True).strip()
    ans = " ".join(ans.split())
    ans = clean_repetition(ans)
    # Fallback (Arabic): "I could not find a clear answer to this question."
    return ans if ans else "لم أجد إجابة واضحة لهذا السؤال."

# ----- Answer routing: RAG first, model fallback -----

def smart_answer(user_q: str) -> str:
    user_q = user_q.strip()
    if not user_q:
        # (Arabic) "Write your question first."
        return "اكتب سؤالك أولاً."

    normalized = normalize_question(user_q)
    code = extract_code(normalized)

    # Narrow the table to records mentioning the same course code,
    # then pick the closest stored question.
    candidates = filter_by_code(code, qa_data)
    rec, sim = best_match(normalized, candidates)

    # Serve the stored answer only when it is both similar enough and short
    # enough to be a direct factual reply; otherwise fall back to the model.
    if rec and sim >= SIM_THRESHOLD and len(rec["answer"]) <= MAX_RAG_ANSWER_LEN:
        return rec["answer"].strip()

    return generate_from_model(normalized)
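
# A quick smoke test (a sketch; needs "1000 Q.xlsx" and the model weights,
# and the sample questions are hypothetical):
#
#   for q in ["CIT 1302", "متطلبات CIT1302", "كم ساعات CS 322"]:
#       print(q, "->", smart_answer(q))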

# ----- Gradio chat UI -----

def chat_fn(msg, history):
    # ChatInterface supplies the running history; replies here are stateless.
    return smart_answer(msg)

chat_ui = gr.ChatInterface(
    fn=chat_fn,
    # Title (Arabic): "Academic Calendar Guide - University of Tabuk"
    title="مرشد التقويم الأكاديمي - جامعة تبوك",
    # Description (Arabic): "Ask about: prerequisites, course names, hours,
    # and academic information."
    description="اسأل عن: المتطلبات، أسماء المقررات، الساعات، والمعلومات الأكاديمية.",
    examples=[
        "CIT 1302",
        "متطلبات CIT1302",
        "اسم CIT 1401",
        "كم ساعات CS 322",
        "Prerequisite MGT 211",
    ],
)

if __name__ == "__main__":
    chat_ui.launch()