| from typing import Dict | |
| import random | |
| import datetime | |
| import string | |
| from src.domain.doc import Doc | |
| from src.domain.wikidoc import WikiPage | |
| from src.view.log_msg import create_msg_from | |
| import src.tools.semantic_db as semantic_db | |
| from src.tools.wiki import Wiki | |
| from src.tools.llm_tools import get_wikilist, get_public_paragraph, get_private_paragraph | |
| from src.tools.semantic_db import add_texts_to_collection, query_collection | |
class Controller:
    """Coordinates the document workflow: copying uploaded docs, applying a
    style template, fetching wiki source material into a semantic collection,
    and generating documents from that collection."""

    def __init__(self, config: Dict):
        """
        :param config: expects keys 'templates_path', 'generated_docs_path',
            'styled_docs_path', 'templates' (list of template file names) and
            'default_template_index' (index into 'templates').
        """
        self.templates_path = config['templates_path']
        self.generated_docs_path = config['generated_docs_path']
        self.styled_docs_path = config['styled_docs_path']
        self.new_docs = []  # styled working copies of the uploaded docs
        self.gen_docs = []  # docs generated from the semantic DB
        template_path = config['templates_path'] + '/' + config['templates'][config['default_template_index']]
        self.default_template = Doc(template_path)
        self.template = self.default_template
        self.log = []          # list of {doc_name: log} entries
        self.differences = []  # list of {'doc': Doc, 'style': str} dicts

    def copy_docs(self, temp_docs: list):
        """Copies each uploaded temp file into a styled working copy under
        the generated-docs directory and registers it in self.new_docs.

        :param temp_docs: uploaded file objects exposing a ``.name`` path.
        """
        for temp_doc in temp_docs:
            # basename without directory or extension
            doc_name = temp_doc.name.split('/')[-1].split('.')[0]
            doc = Doc(path=temp_doc.name)
            style_path = f"{self.generated_docs_path}/{doc_name}_.docx"
            self.new_docs.append(doc.copy(style_path))

    def clear_docs(self):
        """Clears the content of every tracked doc and resets all state
        (doc lists and log) to empty."""
        for doc in self.new_docs + self.gen_docs:
            doc.clear()
        self.new_docs = []
        self.gen_docs = []
        self.log = []

    def set_template(self, template_name: str = ""):
        """Selects the active template; falls back to the default template
        when template_name is empty."""
        if not template_name:
            self.template = self.default_template
        else:
            template_path = f"{self.templates_path}/{template_name}"
            self.template = Doc(template_path)

    def get_difference_with_template(self):
        """Computes, for every new doc, the styles that differ from the
        active template.

        :return: (differences, template_styles) where differences is a list of
            {'doc': Doc, 'style': str} and template_styles lists the
            template's style names starting with '.'.
        """
        self.differences = []
        for new_doc in self.new_docs:
            diff_styles = new_doc.get_different_styles_with_template(template=self.template)
            self.differences += [{'doc': new_doc, 'style': style} for style in diff_styles]
        template_styles = [name for name in self.template.styles.names if name.startswith('.')]
        return self.differences, template_styles

    def map_style(self, this_style_index: int, template_style_name: str):
        """
        Maps a style from 'this' document into a style from the template.

        :param this_style_index: index into self.differences (as returned by
            get_difference_with_template).
        :param template_style_name: name of the template style to copy in.
        """
        diff_dict = self.differences[this_style_index]
        doc = diff_dict['doc']
        this_style_name = diff_dict['style']
        log = doc.copy_one_style(this_style_name, template_style_name, self.template)
        self.log.append({doc.name: log})

    def apply_template(self, add_front_pages: bool):
        """Applies the active template to every new doc, optionally adding
        the template's front pages; collects any per-doc log entries."""
        for new_doc in self.new_docs:
            log = new_doc.apply_template(template=self.template, add_front_pages=add_front_pages)
            if log:
                self.log.append({new_doc.name: log})

    def reset(self):
        """Deletes every tracked doc from disk and empties the doc lists."""
        for doc in self.new_docs + self.gen_docs:
            doc.delete()
        self.new_docs = []
        self.gen_docs = []

    def get_log(self):
        """Renders the accumulated log entries into a user-facing message."""
        return create_msg_from(self.log, self.new_docs)

    # --- Source Control ---

    def get_or_create_collection(self, id_: str) -> str:
        """
        Returns id_ unchanged, or — when id_ is the sentinel '-1' — generates
        a fresh timestamped id and creates the backing collection for it.
        """
        if id_ != '-1':
            return id_
        now = datetime.datetime.now().strftime("%m%d%H%M")
        alphabet = string.ascii_lowercase + string.digits
        # NOTE(review): `random` gives uniqueness, not unguessability; switch
        # to `secrets.choice` if these ids must be security-sensitive.
        id_ = now + '-' + ''.join(random.choice(alphabet) for _ in range(10))
        semantic_db.get_or_create_collection(id_)
        return id_

    def wiki_fetch(self) -> list:
        """
        Returns the titles of the wikipages corresponding to the tasks
        described in the new docs (deduplicated across tasks).
        """
        all_tasks = [task for new_doc in self.new_docs for task in new_doc.tasks]
        wiki_lists = [get_wikilist(task) for task in all_tasks]
        # union of all lists, deduplicated
        return list(set().union(*[set(wiki_list) for wiki_list in wiki_lists]))

    async def wiki_upload_and_store(self, wiki_title: str, collection_name: str):
        """
        Uploads one wikipage and stores its paragraphs into the given
        collection. A str result from Wiki().fetch signals an error message,
        which is printed instead of stored.
        """
        wikipage = Wiki().fetch(wiki_title)
        if isinstance(wikipage, str):
            # fetch returned an error message rather than a page object
            print(wikipage)
            return
        texts = WikiPage(wikipage.page_content).get_paragraphs()
        add_texts_to_collection(coll_name=collection_name, texts=texts, file=wiki_title, source='wiki')

    # --- Generate Control ---

    def generate_doc_from_db(self, collection_name: str, from_files: list) -> list:
        """
        For each new doc, resolves its tasks against the semantic collection
        and saves a generated .docx copy.

        :param collection_name: collection to query.
        :param from_files: restrict the query to these source files.
        :return: paths of the generated docs.
        """
        gen_paths = []
        for new_doc in self.new_docs:
            # one public-paragraph query per task, then fetch matching texts
            queries = [get_public_paragraph(task) for task in new_doc.tasks]
            texts_list = [query_collection(coll_name=collection_name, query=q, from_files=from_files)
                          for q in queries]
            task_resolutions = [get_private_paragraph(task=task, texts=texts)
                                for task, texts in zip(new_doc.tasks, texts_list)]
            gen_path = f"{self.generated_docs_path}/{new_doc.name}e.docx"
            gen_doc = new_doc.copy(gen_path)
            gen_doc.replace_tasks(task_resolutions)
            gen_doc.save_as_docx()
            gen_paths.append(gen_doc.path)
            self.gen_docs.append(gen_doc)
        return gen_paths