File size: 5,876 Bytes
4cf88e8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
from typing import Dict
import random
import datetime
import string

from src.domain.doc import Doc
from src.domain.wikidoc import WikiPage
from src.view.log_msg import create_msg_from
import src.tools.semantic_db as semantic_db
from src.tools.wiki import Wiki
from src.tools.llm_tools import get_wikilist, get_public_paragraph, get_private_paragraph
from src.tools.semantic_db import add_texts_to_collection, query_collection


class Controller:

    def __init__(self, config: Dict):
        self.templates_path = config['templates_path']
        self.generated_docs_path = config['generated_docs_path']
        self.styled_docs_path = config['styled_docs_path']
        self.new_docs = []
        self.gen_docs = []

        template_path = config['templates_path'] + '/' + config['templates'][config['default_template_index']]
        self.default_template = Doc(template_path)
        self.template = self.default_template
        self.log = []
        self.differences = []

    def copy_docs(self, temp_docs: []):
        get_name = lambda doc: doc.name.split('/')[-1].split('.')[0]
        doc_names = [get_name(doc) for doc in temp_docs]
        docs = [Doc(path=doc.name) for doc in temp_docs]
        style_paths = [f"{self.generated_docs_path}/{dn}_.docx" for dn in doc_names]
        gen_paths = [f"{self.generated_docs_path}/{dn}_e.docx" for dn in doc_names]
        for doc, style_path, gen_path in zip(docs, style_paths, gen_paths):
            new_doc = doc.copy(style_path)
            self.new_docs.append(new_doc)

    def clear_docs(self):
        for new_doc in self.new_docs:
            new_doc.clear()
        for gen_doc in self.gen_docs:
            gen_doc.clear()
        self.new_docs = []
        self.gen_docs = []
        self.log = []

    def set_template(self, template_name: str = ""):
        if not template_name:
            self.template = self.default_template
        else:
            template_path = f"{self.templates_path}/{template_name}"
            self.template = Doc(template_path)

    def get_difference_with_template(self):
        self.differences = []
        for new_doc in self.new_docs:
            diff_styles = new_doc.get_different_styles_with_template(template=self.template)
            diff_dicts = [{'doc': new_doc, 'style': s} for s in diff_styles]
            self.differences += diff_dicts
        template_styles = [name for name in self.template.styles.names if name.startswith('.')]
        return self.differences, template_styles

    def map_style(self, this_style_index: int, template_style_name: str):
        """
        maps a style from 'this' document into a style from the template
        """
        diff_dict = self.differences[this_style_index]
        doc = diff_dict['doc']
        this_style_name = diff_dict['style']
        log = doc.copy_one_style(this_style_name, template_style_name, self.template)
        self.log.append({doc.name: log})

    def apply_template(self, add_front_pages: bool):
        for new_doc in self.new_docs:
            log = new_doc.apply_template(template=self.template, add_front_pages=add_front_pages)
            if log:
                self.log.append({new_doc.name: log})

    def reset(self):
        for new_doc in self.new_docs:
            new_doc.delete()
        for gen_doc in self.gen_docs:
            gen_doc.delete()
        self.new_docs = []
        self.gen_docs = []


    def get_log(self):
        msg_log = create_msg_from(self.log, self.new_docs)
        return msg_log

    """
    Source Control
    """

    def get_or_create_collection(self, id_: str) -> str:
        """
        generates a new id if needed
        """
        if id_ != '-1':
            return id_
        else:
            now = datetime.datetime.now().strftime("%m%d%H%M")
            letters = string.ascii_lowercase + string.digits
            id_ = now + '-' + ''.join(random.choice(letters) for _ in range(10))
            semantic_db.get_or_create_collection(id_)
        return id_

    def wiki_fetch(self) -> [str]:
        """
        returns the title of the wikipages corresponding to the tasks described in the input text
        """
        all_tasks = []
        for new_doc in self.new_docs:
            all_tasks += new_doc.tasks
        wiki_lists = [get_wikilist(t) for t in all_tasks]
        flatten_wiki_list = list(set().union(*[set(w) for w in wiki_lists]))
        return flatten_wiki_list

    async def wiki_upload_and_store(self, wiki_title: str, collection_name: str):
        """
        uploads one wikipage and stores them into the right collection
        """
        wikipage = Wiki().fetch(wiki_title)
        wiki_title = wiki_title
        if type(wikipage) != str:
            texts = WikiPage(wikipage.page_content).get_paragraphs()
            add_texts_to_collection(coll_name=collection_name, texts=texts, file=wiki_title, source='wiki')
        else:
            print(wikipage)

    """
    Generate Control
    """

    def generate_doc_from_db(self, collection_name: str, from_files: [str]) -> [str]:

        def query_from_task(task):
            return get_public_paragraph(task)

        gen_paths = []

        for new_doc in self.new_docs:
            queries = [query_from_task(t) for t in new_doc.tasks]
            texts_list = [query_collection(coll_name=collection_name, query=q, from_files=from_files) for q in queries]
            task_resolutions = [get_private_paragraph(task=task, texts=texts)
                                for task, texts in zip(new_doc.tasks, texts_list)]

            gen_path = f"{self.generated_docs_path}/{new_doc.name}e.docx"
            gen_doc = new_doc.copy(gen_path)

            gen_doc.replace_tasks(task_resolutions)
            gen_doc.save_as_docx()
            gen_paths.append(gen_doc.path)
            self.gen_docs.append(gen_doc)
        return gen_paths