"""
Main processing orchestrator that ties together all the manufacturing priority logic.

This module provides high-level functions that both the CLI and Gradio interfaces can use.
"""

import os
from datetime import datetime
from typing import Dict, Optional, Tuple

import pandas as pd

from config import WEIGHTS
from output_writer import save_with_instructions
from priority_logic import compute_priority
from sheet_reader import list_sheets, read_sheet


class ManufacturingProcessor:
    """
    Main processor class for manufacturing priority calculations.

    Encapsulates all the logic needed to process Excel files and generate
    priority rankings.
    """

    def __init__(self, weights: Optional[Dict[str, int]] = None):
        """Initialize the processor with scoring weights (defaults to config.WEIGHTS)."""
        # Copy defensively so later mutation of the caller's dict has no effect.
        self.weights = dict(weights) if weights is not None else WEIGHTS.copy()
        self.validate_weights()

    def validate_weights(self) -> None:
        """Ensure the weights sum to exactly 100 (they are percentages)."""
        total = sum(self.weights.values())
        if total != 100:
            raise ValueError(f"Weights must sum to 100, got {total}")
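
    # A minimal sketch of valid and invalid weight dicts. The key names mirror
    # the ones read back in _add_processing_log below; the numbers are made up:
    #
    #   ManufacturingProcessor({"AGE_WEIGHT": 50,
    #                           "COMPONENT_WEIGHT": 30,
    #                           "MANUAL_WEIGHT": 20})  # sums to 100 -> OK
    #   ManufacturingProcessor({"AGE_WEIGHT": 90})     # raises ValueError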

    def get_file_info(self, file_path: str) -> Dict:
        """Return basic information about the Excel file and its sheets."""
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        try:
            sheets = list_sheets(file_path)
            file_size = os.path.getsize(file_path)

            return {
                "file_path": file_path,
                "file_name": os.path.basename(file_path),
                "file_size": file_size,
                "sheets": sheets,
                "sheet_count": len(sheets),
            }
        except Exception as e:
            raise RuntimeError(f"Error reading file info: {e}") from e

    def validate_sheet_data(self, df: pd.DataFrame) -> Dict:
        """Validate that the sheet has the required columns and usable data."""
        from priority_logic import REQUIRED_COLS

        # Normalize column names (strip stray whitespace) before checking.
        df_norm = df.copy()
        df_norm.columns = [str(c).strip() for c in df_norm.columns]

        missing_cols = [col for col in REQUIRED_COLS if col not in df_norm.columns]

        validation_result = {
            "valid": len(missing_cols) == 0,
            "missing_columns": missing_cols,
            "available_columns": list(df_norm.columns),
            "row_count": len(df),
            "empty_rows": int(df.isnull().all(axis=1).sum()),
            "data_issues": [],
        }

        if validation_result["valid"]:
            try:
                # Flag dates that pandas cannot parse.
                date_col = "Oldest Product Required First"
                date_issues = int(pd.to_datetime(df_norm[date_col], errors="coerce").isnull().sum())
                if date_issues > 0:
                    validation_result["data_issues"].append(f"{date_issues} invalid dates in '{date_col}'")

                # Flag quantities that are not numeric.
                qty_col = "Quantity of Each Component"
                qty_numeric = pd.to_numeric(df_norm[qty_col], errors="coerce")
                qty_issues = int(qty_numeric.isnull().sum())
                if qty_issues > 0:
                    validation_result["data_issues"].append(f"{qty_issues} non-numeric values in '{qty_col}'")

                # Flag required columns that contain no data at all.
                for col in REQUIRED_COLS:
                    if col in df_norm.columns:
                        empty_count = df_norm[col].isnull().sum()
                        if empty_count == len(df_norm):
                            validation_result["data_issues"].append(f"Column '{col}' is completely empty")
            except Exception as e:
                validation_result["data_issues"].append(f"Data validation error: {e}")

        return validation_result
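
    # Sketch of how a caller might read the report (keys as defined above):
    #
    #   report = processor.validate_sheet_data(df)
    #   if not report["valid"]:
    #       print("Missing columns:", report["missing_columns"])
    #   for issue in report["data_issues"]:
    #       print("Data issue:", issue)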

    def process_file(self,
                     file_path: str,
                     sheet_name: str,
                     min_qty: int = 50,
                     custom_weights: Optional[Dict[str, int]] = None) -> Tuple[pd.DataFrame, Dict]:
        """
        Process a single sheet from an Excel file and return prioritized results.

        Returns:
            Tuple of (processed_dataframe, processing_info)
        """
        # Resolve which weights to use. Per-call weights are validated here;
        # the instance weights were already validated in __init__.
        if custom_weights is not None:
            temp_weights = custom_weights.copy()
            if sum(temp_weights.values()) != 100:
                raise ValueError("Custom weights must sum to 100")
        else:
            temp_weights = self.weights

        # Load and sanity-check the sheet.
        df = read_sheet(file_path, sheet_name)
        if df is None or df.empty:
            raise ValueError("Sheet is empty or could not be read")

        validation = self.validate_sheet_data(df)
        if not validation["valid"]:
            raise ValueError(f"Data validation failed: Missing columns {validation['missing_columns']}")

        # Score and rank the products.
        try:
            processed_df = compute_priority(df, min_qty=min_qty, weights=temp_weights)
        except Exception as e:
            raise RuntimeError(f"Priority calculation failed: {e}") from e

        # Collect metadata about this run for logging and the output file.
        processing_info = {
            "timestamp": datetime.now().isoformat(),
            "file_name": os.path.basename(file_path),
            "sheet_name": sheet_name,
            "weights_used": temp_weights,
            "min_quantity": min_qty,
            "total_products": len(df),
            "products_above_threshold": int(processed_df["QtyThresholdOK"].sum()),
            "highest_priority_score": processed_df["PriorityScore"].max(),
            "lowest_priority_score": processed_df["PriorityScore"].min(),
            "validation_info": validation,
        }

        return processed_df, processing_info
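
    # Sketch (hypothetical file and sheet names) of calling process_file directly:
    #
    #   processor = ManufacturingProcessor()
    #   df, info = processor.process_file("orders.xlsx", "Backlog", min_qty=25)
    #   print(info["products_above_threshold"], "of", info["total_products"])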

    def save_results(self,
                     processed_df: pd.DataFrame,
                     output_path: str,
                     processing_info: Dict) -> str:
        """Save processed results with full documentation."""
        try:
            save_with_instructions(
                processed_df,
                output_path,
                min_qty=processing_info["min_quantity"],
                weights=processing_info["weights_used"],
            )

            # Append an audit trail describing this run.
            self._add_processing_log(output_path, processing_info)

            return output_path
        except Exception as e:
            raise RuntimeError(f"Failed to save results: {e}") from e

    def _add_processing_log(self, output_path: str, processing_info: Dict):
        """Add a processing log sheet to the output file."""
        try:
            # Append to the workbook just written by save_with_instructions;
            # 'replace' overwrites any stale Processing_Log sheet.
            with pd.ExcelWriter(output_path, mode="a", engine="openpyxl", if_sheet_exists="replace") as writer:
                weights = processing_info["weights_used"]
                log_data = [
                    ["PROCESSING LOG"],
                    [""],
                    ["Processing Timestamp", processing_info["timestamp"]],
                    ["Source File", processing_info["file_name"]],
                    ["Sheet Processed", processing_info["sheet_name"]],
                    [""],
                    ["SETTINGS USED"],
                    ["Age Weight", f"{weights['AGE_WEIGHT']}%"],
                    ["Component Weight", f"{weights['COMPONENT_WEIGHT']}%"],
                    ["Manual Weight", f"{weights['MANUAL_WEIGHT']}%"],
                    ["Minimum Quantity", processing_info["min_quantity"]],
                    [""],
                    ["RESULTS SUMMARY"],
                    ["Total Products", processing_info["total_products"]],
                    ["Above Threshold", processing_info["products_above_threshold"]],
                    ["Highest Priority Score", f"{processing_info['highest_priority_score']:.4f}"],
                    ["Lowest Priority Score", f"{processing_info['lowest_priority_score']:.4f}"],
                ]

                if processing_info["validation_info"]["data_issues"]:
                    log_data.append([""])
                    log_data.append(["DATA ISSUES FOUND"])
                    for issue in processing_info["validation_info"]["data_issues"]:
                        log_data.append(["", issue])

                # Single-element rows are padded with NaN in the Value column.
                log_df = pd.DataFrame(log_data, columns=["Parameter", "Value"])
                log_df.to_excel(writer, sheet_name="Processing_Log", index=False)
        except Exception as e:
            # A missing log sheet should not fail the whole save.
            print(f"Warning: Could not add processing log: {e}")


def quick_process(file_path: str,
                  sheet_name: str,
                  output_path: Optional[str] = None,
                  min_qty: int = 50,
                  weights: Optional[Dict[str, int]] = None) -> str:
    """
    Quick processing function that handles the full workflow.

    Args:
        file_path: Path to the Excel file
        sheet_name: Name of the sheet to process
        output_path: Where to save results (auto-generated next to the input if omitted)
        min_qty: Minimum quantity threshold
        weights: Custom weights dict (optional)

    Returns:
        Path to the generated output file
    """
    processor = ManufacturingProcessor(weights)

    processed_df, processing_info = processor.process_file(
        file_path, sheet_name, min_qty, weights
    )

    # Default output: "<input name>_PRIORITY.xlsx" next to the source file.
    if output_path is None:
        base_name = os.path.splitext(os.path.basename(file_path))[0]
        output_dir = os.path.dirname(file_path)
        output_path = os.path.join(output_dir, f"{base_name}_PRIORITY.xlsx")

    return processor.save_results(processed_df, output_path, processing_info)
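
# Example of the default naming, assuming a hypothetical input of
# /data/orders.xlsx and a sheet named "Backlog":
#
#   quick_process("/data/orders.xlsx", "Backlog")
#   # -> writes /data/orders_PRIORITY.xlsx and returns that path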


def get_file_preview(file_path: str, sheet_name: str, max_rows: int = 5) -> Dict:
    """
    Get a preview of the file data for validation purposes.

    Returns:
        Dict containing preview info and sample data
    """
    processor = ManufacturingProcessor()

    file_info = processor.get_file_info(file_path)

    # Read the sheet and keep only the first few rows for display.
    df = read_sheet(file_path, sheet_name)
    sample_df = df.head(max_rows) if df is not None else pd.DataFrame()

    validation = (processor.validate_sheet_data(df)
                  if df is not None
                  else {"valid": False, "missing_columns": [], "data_issues": ["Sheet could not be read"]})

    return {
        "file_info": file_info,
        "sample_data": sample_df,
        "validation": validation,
        "preview_rows": len(sample_df),
    }
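

if __name__ == "__main__":
    # Minimal command-line smoke test; not part of the public API. Pass a real
    # workbook path and sheet name ("processor.py" as this module's file name
    # is an assumption).
    import sys

    if len(sys.argv) >= 3:
        result = quick_process(sys.argv[1], sys.argv[2])
        print(f"Results written to {result}")
    else:
        print("Usage: python processor.py <workbook.xlsx> <sheet_name>")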