| import os | |
| from transformers import TokenClassificationPipeline,DebertaV2TokenizerFast | |
| from transformers.models.bert_japanese.tokenization_bert_japanese import MecabTokenizer | |
| try: | |
| from transformers.utils import cached_file | |
| except: | |
| from transformers.file_utils import cached_path,hf_bucket_url | |
| cached_file=lambda x,y:os.path.join(x,y) if os.path.isdir(x) else cached_path(hf_bucket_url(x,y)) | |
| class UniversalDependenciesPipeline(TokenClassificationPipeline): | |
| def _forward(self,model_inputs): | |
| import torch | |
| v=model_inputs["input_ids"][0].tolist() | |
| with torch.no_grad(): | |
| e=self.model(input_ids=torch.tensor([v[0:i]+[self.tokenizer.mask_token_id]+v[i+1:]+[j] for i,j in enumerate(v[1:-1],1)],device=self.device)) | |
| return {"logits":e.logits[:,1:-2,:],**model_inputs} | |
| def postprocess(self,model_outputs,**kwargs): | |
| import numpy | |
| e=model_outputs["logits"].numpy() | |
| r=[1 if i==0 else -1 if j.endswith("|root") else 0 for i,j in sorted(self.model.config.id2label.items())] | |
| e+=numpy.where(numpy.add.outer(numpy.identity(e.shape[0]),r)==0,0,numpy.nan) | |
| g=self.model.config.label2id["X|_|goeswith"] | |
| r=numpy.tri(e.shape[0]) | |
| for i in range(e.shape[0]): | |
| for j in range(i+2,e.shape[1]): | |
| r[i,j]=r[i,j-1] if numpy.nanargmax(e[i,j-1])==g else 1 | |
| e[:,:,g]+=numpy.where(r==0,0,numpy.nan) | |
| m,p=numpy.nanmax(e,axis=2),numpy.nanargmax(e,axis=2) | |
| h=self.chu_liu_edmonds(m) | |
| z=[i for i,j in enumerate(h) if i==j] | |
| if len(z)>1: | |
| k,h=z[numpy.nanargmax(m[z,z])],numpy.nanmin(m)-numpy.nanmax(m) | |
| m[:,z]+=[[0 if j in z and (i!=j or i==k) else h for i in z] for j in range(m.shape[0])] | |
| h=self.chu_liu_edmonds(m) | |
| v=[(s,e) for s,e in model_outputs["offset_mapping"][0].tolist() if s<e] | |
| q=[self.model.config.id2label[p[j,i]].split("|") for i,j in enumerate(h)] | |
| if "aggregation_strategy" in kwargs and kwargs["aggregation_strategy"]!="none": | |
| for i,j in reversed(list(enumerate(q[1:],1))): | |
| if j[-1]=="goeswith" and set([t[-1] for t in q[h[i]+1:i+1]])=={"goeswith"}: | |
| h=[b if i>b else b-1 for a,b in enumerate(h) if i!=a] | |
| v[i-1]=(v[i-1][0],v.pop(i)[1]) | |
| q.pop(i) | |
| t=model_outputs["sentence"].replace("\n"," ") | |
| u="# text = "+t+"\n" | |
| for i,(s,e) in enumerate(v): | |
| u+="\t".join([str(i+1),t[s:e],"_",q[i][0],"_","|".join(q[i][1:-1]),str(0 if h[i]==i else h[i]+1),q[i][-1],"_","_" if i+1<len(v) and e<v[i+1][0] else "SpaceAfter=No"])+"\n" | |
| return u+"\n" | |
| def chu_liu_edmonds(self,matrix): | |
| import numpy | |
| h=numpy.nanargmax(matrix,axis=0) | |
| x=[-1 if i==j else j for i,j in enumerate(h)] | |
| for b in [lambda x,i,j:-1 if i not in x else x[i],lambda x,i,j:-1 if j<0 else x[j]]: | |
| y=[] | |
| while x!=y: | |
| y=list(x) | |
| for i,j in enumerate(x): | |
| x[i]=b(x,i,j) | |
| if max(x)<0: | |
| return h | |
| y,x=[i for i,j in enumerate(x) if j==max(x)],[i for i,j in enumerate(x) if j<max(x)] | |
| z=matrix-numpy.nanmax(matrix,axis=0) | |
| m=numpy.block([[z[x,:][:,x],numpy.nanmax(z[x,:][:,y],axis=1).reshape(len(x),1)],[numpy.nanmax(z[y,:][:,x],axis=0),numpy.nanmax(z[y,y])]]) | |
| k=[j if i==len(x) else x[j] if j<len(x) else y[numpy.nanargmax(z[y,x[i]])] for i,j in enumerate(self.chu_liu_edmonds(m))] | |
| h=[j if i in y else k[x.index(i)] for i,j in enumerate(h)] | |
| i=y[numpy.nanargmax(z[x[k[-1]],y] if k[-1]<len(x) else z[y,y])] | |
| h[i]=x[k[-1]] if k[-1]<len(x) else i | |
| return h | |
| class MecabPreTokenizer(MecabTokenizer): | |
| def mecab_split(self,i,normalized_string): | |
| t=str(normalized_string) | |
| z=[] | |
| e=0 | |
| for c in self.tokenize(t): | |
| s=t.find(c,e) | |
| if s<0: | |
| z.append((0,0)) | |
| else: | |
| e=s+len(c) | |
| z.append((s,e)) | |
| return [normalized_string[s:e] for s,e in z if e>0] | |
| def pre_tokenize(self,pretok): | |
| pretok.split(self.mecab_split) | |
| class JumanDebertaV2TokenizerFast(DebertaV2TokenizerFast): | |
| def __init__(self,**kwargs): | |
| from tokenizers.pre_tokenizers import PreTokenizer,Metaspace,Sequence | |
| super().__init__(**kwargs) | |
| d,r="/var/lib/mecab/dic/juman-utf8","/etc/mecabrc" | |
| if not (os.path.isdir(d) and os.path.isfile(r)): | |
| import zipfile | |
| import tempfile | |
| self.dicdir=tempfile.TemporaryDirectory() | |
| d=self.dicdir.name | |
| with zipfile.ZipFile(cached_file(self.name_or_path,"mecab-jumandic-utf8.zip")) as z: | |
| z.extractall(d) | |
| r=os.path.join(d,"mecabrc") | |
| with open(r,"w",encoding="utf-8") as w: | |
| print("dicdir =",d,file=w) | |
| self.custom_pre_tokenizer=Sequence([PreTokenizer.custom(MecabPreTokenizer(mecab_dic=None,mecab_option="-d "+d+" -r "+r)),Metaspace()]) | |
| self._tokenizer.pre_tokenizer=self.custom_pre_tokenizer | |
| def save_pretrained(self,save_directory,**kwargs): | |
| import shutil | |
| from tokenizers.pre_tokenizers import Metaspace | |
| self._auto_map={"AutoTokenizer":[None,"ud.JumanDebertaV2TokenizerFast"]} | |
| self._tokenizer.pre_tokenizer=Metaspace() | |
| super().save_pretrained(save_directory,**kwargs) | |
| self._tokenizer.pre_tokenizer=self.custom_pre_tokenizer | |
| shutil.copy(os.path.abspath(__file__),os.path.join(save_directory,"ud.py")) | |
| shutil.copy(cached_file(self.name_or_path,"mecab-jumandic-utf8.zip"),os.path.join(save_directory,"mecab-jumandic-utf8.zip")) | |