Spaces:
Runtime error
Runtime error
| import copy | |
| import random | |
| from functools import partial | |
| from typing import Callable, Optional, Tuple, List | |
| def initialize_population(services: dict, users: dict, population_size: int) -> list: | |
| """ | |
| Initialize the population of assignment solutions for the genetic algorithm. | |
| Args: | |
| services (dict): A dictionary containing service constraints. | |
| users (dict): A dictionary containing user preferences and constraints. | |
| population_size (int): The number of assignment solutions to generate. | |
| Returns: | |
| list: A list of generated assignment solutions. | |
| """ | |
| population = [] | |
| # Generate population_size number of assignment solutions | |
| for _ in range(population_size): | |
| assignment_solution = {} | |
| for service in services.keys(): | |
| # Randomly assign users to each service, while considering user preferences and constraints | |
| assigned_users = [] | |
| for user, user_info in users.items(): | |
| # Check if user cannot be assigned to this service | |
| if service not in user_info["cannot_assign"]: | |
| # Assign user to service based on their preference | |
| if service in user_info["preferences"]: | |
| assigned_users.append(user) | |
| # Assign user to service with a small probability if not in their preferences | |
| elif random.random() < 0.1: | |
| assigned_users.append(user) | |
| # Shuffle the list of assigned users to create random assignments | |
| random.shuffle(assigned_users) | |
| assignment_solution[service] = assigned_users | |
| # Add the generated assignment solution to the population | |
| population.append(assignment_solution) | |
| return population | |
| def calculate_fitness(population: list, services: dict, users: dict, fitness_fn: Optional[Callable] = None) -> list: | |
| """ | |
| Calculate the fitness of each assignment solution in the population. | |
| Args: | |
| population (list): A list of assignment solutions. | |
| services (dict): A dictionary containing service constraints. | |
| users (dict): A dictionary containing user preferences and constraints. | |
| fitness_fn (Optional[Callable]): An optional custom fitness function. | |
| Returns: | |
| list: A list of fitness scores for each assignment solution in the population. | |
| """ | |
| if not fitness_fn: | |
| fitness_fn = default_fitness_function | |
| fitness_scores = [] | |
| # Calculate the fitness score for each assignment solution in the population | |
| for assignment_solution in population: | |
| fitness_score = fitness_fn(assignment_solution, services, users) | |
| fitness_scores.append(fitness_score) | |
| return fitness_scores | |
| def default_fitness_function(assignment_solution: dict, services: dict, users: dict) -> float: | |
| """ | |
| Calculate the fitness of an assignment solution based on the criteria described in the problem statement, | |
| including user preferences and cannot_assign constraints. | |
| Args: | |
| assignment_solution (dict): An assignment solution to evaluate. | |
| services (dict): A dictionary containing service constraints. | |
| users (dict): A dictionary containing user preferences and constraints. | |
| Returns: | |
| float: The fitness score of the given assignment solution. | |
| """ | |
| fitness = -100 | |
| for service, assigned_users in assignment_solution.items(): | |
| service_info = services[service] | |
| num_assigned_users = len(assigned_users) | |
| # Prefers for solutions that assign users near the recommended value | |
| # (positive or negative value is punishment, score 0 for the best fit) | |
| if service_info["min"] <= num_assigned_users <= service_info["max"]: | |
| fitness += abs(num_assigned_users - service_info["rec"]) * service_info["priority"] | |
| # Punish solutions that assign users below the minimum value | |
| # (positive value is punishment) | |
| elif num_assigned_users < service_info["min"]: | |
| fitness += (service_info["min"] - num_assigned_users) * service_info["priority"] | |
| # Punish solutions that assign users above the maximum value | |
| # (positive value is punishment) | |
| else: # num_assigned_users > service_info["max"]: | |
| fitness += (num_assigned_users - service_info["max"]) * service_info["priority"] | |
| # Punish solutions that assign users to their cannot_assign services | |
| # (positive value is punishment) | |
| for user in assigned_users: | |
| if service in users[user]["cannot_assign"]: | |
| fitness += 100 * service_info["priority"] | |
| # Bonus solutions that assign users to their preferred services | |
| # (negative value is bonus) | |
| for user, user_info in users.items(): | |
| if service in user_info["preferences"] and user in assigned_users: | |
| fitness -= 20 | |
| return fitness | |
| def least_changed_fitness_function(prev_solution: dict, solution: dict, services: dict, users: dict) -> float: | |
| """ | |
| A fitness function that favors solutions with the least changes from a previous solution. | |
| Args: | |
| solution (dict): The assignment solution to evaluate. | |
| services (dict): The input services dictionary. | |
| users (dict): The input users dictionary. | |
| prev_solution (dict): The previous assignment solution to compare against. | |
| Returns: | |
| float: A fitness score for the assignment solution. | |
| """ | |
| # Add the default fitness function score | |
| fitness = default_fitness_function(solution, services, users) | |
| # Bonus for users assigned to the same services as in the previous solution | |
| same_user_assignments = 0 | |
| for service, assigned_users in solution.items(): | |
| if service in prev_solution: | |
| prev_assigned_users = prev_solution[service] | |
| same_users = set(assigned_users).intersection(set(prev_assigned_users)) | |
| same_user_assignments += len(same_users) | |
| user_bonus = same_user_assignments * 100 # / sum(len(user_data["preferences"]) for user_data in users.values()) | |
| # (positive value is punishment) | |
| fitness -= user_bonus | |
| # Bonus for services having the same users assigned as in the previous solution | |
| same_service_assignments = 0 | |
| for service, assigned_users in solution.items(): | |
| if service in prev_solution: | |
| prev_assigned_users = prev_solution[service] | |
| same_users = set(assigned_users).intersection(set(prev_assigned_users)) | |
| same_service_assignments += len(same_users) | |
| service_bonus = same_service_assignments * 100 | |
| # (positive value is punishment) | |
| fitness -= service_bonus | |
| # Malus for user and service changes | |
| user_changes = 0 | |
| service_changes = 0 | |
| for service, service_data in services.items(): | |
| if service in prev_solution: | |
| prev_service_data = prev_solution[service] | |
| if service_data != prev_service_data: | |
| service_changes += 1 | |
| for user, user_data in users.items(): | |
| if user in prev_service_data and user not in service_data: | |
| user_changes += 1 | |
| user_malus = user_changes | |
| service_malus = service_changes | |
| # (positive value is punishment) | |
| fitness += (user_malus + service_malus) | |
| return fitness | |
| def selection(fitness_scores: list) -> Tuple[int, int]: | |
| """ | |
| Select two parent solutions from the population based on their fitness scores. | |
| Args: | |
| fitness_scores (list): A list of fitness scores for each assignment solution in the population. | |
| Returns: | |
| Tuple[int, int]: The indices of the two selected parent solutions in the population. | |
| """ | |
| # Calculate the total fitness of the population | |
| total_fitness = sum(fitness_scores) | |
| # Calculate the relative fitness of each solution | |
| relative_fitness = [f / total_fitness for f in fitness_scores] | |
| # Select the first parent using roulette wheel selection | |
| parent1_index = -1 | |
| r = random.random() | |
| accumulator = 0 | |
| for i, rf in enumerate(relative_fitness): | |
| accumulator += rf | |
| if accumulator >= r: | |
| parent1_index = i | |
| break | |
| # Select the second parent using roulette wheel selection, ensuring it's different from the first parent | |
| parent2_index = -1 | |
| while parent2_index == -1 or parent2_index == parent1_index: | |
| r = random.random() | |
| accumulator = 0 | |
| for i, rf in enumerate(relative_fitness): | |
| accumulator += rf | |
| if accumulator >= r: | |
| parent2_index = i | |
| break | |
| return parent1_index, parent2_index | |
| def crossover(parent1: dict, parent2: dict) -> dict: | |
| """ | |
| Combine two parent assignment solutions to create a child solution. | |
| Args: | |
| parent1 (dict): The first parent assignment solution. | |
| parent2 (dict): The second parent assignment solution. | |
| Returns: | |
| dict: The child assignment solution created by combining the parents. | |
| """ | |
| child_solution = {} | |
| # Iterate over the services in the parents | |
| for service in parent1.keys(): | |
| # Create two sets of users assigned to the current service in parent1 and parent2 | |
| assigned_users_parent1 = set(parent1[service]) | |
| assigned_users_parent2 = set(parent2[service]) | |
| # Perform set union to combine users assigned in both parents | |
| combined_assigned_users = assigned_users_parent1 | assigned_users_parent2 | |
| # Randomly assign each user from the combined set to the child solution | |
| child_assigned_users = [] | |
| for user in combined_assigned_users: | |
| if random.random() < 0.5: | |
| child_assigned_users.append(user) | |
| child_solution[service] = child_assigned_users | |
| return child_solution | |
| def mutation(solution: dict, users: dict, mutation_rate: float = 0.01) -> dict: | |
| """ | |
| Mutate an assignment solution by randomly reassigning users to services. | |
| Args: | |
| solution (dict): The assignment solution to mutate. | |
| users (dict): A dictionary containing user preferences and constraints. | |
| mutation_rate (float): The probability of mutation for each user in the solution (default: 0.01). | |
| Returns: | |
| dict: The mutated assignment solution. | |
| """ | |
| mutated_solution = copy.deepcopy(solution) | |
| # Iterate over the services in the solution | |
| for service, assigned_users in mutated_solution.items(): | |
| for user in assigned_users: | |
| # Check if the user should be mutated based on the mutation rate | |
| if random.random() < mutation_rate: | |
| # Remove the user from the current service | |
| assigned_users.remove(user) | |
| # Find a new service for the user while considering their cannot_assign constraints | |
| new_service = service | |
| while new_service == service or new_service in users[user]["cannot_assign"]: | |
| new_service = random.choice(list(mutated_solution.keys())) | |
| # Assign the user to the new service | |
| mutated_solution[new_service].append(user) | |
| return mutated_solution | |
| def report_generation(generation: int, fitness_scores: list, best_solution: dict, services: dict, users: dict) -> None: | |
| """ | |
| Print a report of the genetic algorithm's progress for the current generation. | |
| Args: | |
| generation (int): The current generation number. | |
| fitness_scores (list): The fitness scores for the current population. | |
| best_solution (dict): The best assignment solution found so far. | |
| services (dict): The input services dictionary. | |
| users (dict): The input users dictionary. | |
| """ | |
| best_fitness = min(fitness_scores) | |
| worst_fitness = max(fitness_scores) | |
| avg_fitness = sum(fitness_scores) / len(fitness_scores) | |
| generation_errors = polish_errors(calculate_errors(best_solution, services, users)) | |
| print(f"Generation {generation}:") | |
| print(f" Best fitness: {best_fitness}") | |
| print(f" Worst fitness: {worst_fitness}") | |
| print(f" Average fitness: {avg_fitness}") | |
| print(f" Best solution so far: {best_solution}") | |
| print(f" Errors so far: {generation_errors}") | |
| def calculate_errors(solution: dict, services: dict, users: dict) -> dict: | |
| """ | |
| Calculate the errors in the assignment solution based on the user and service constraints. | |
| Args: | |
| solution (dict): The assignment solution to analyze. | |
| services (dict): The input services dictionary. | |
| users (dict): The input users dictionary. | |
| Returns: | |
| dict: A dictionary containing the errors for each user and service in the assignment solution. | |
| """ | |
| errors = {"users": {}, "services": {}} | |
| # Analyze user errors | |
| for user, user_data in users.items(): | |
| errors["users"][user] = {"unmet_max_assignments": False, "unmet_preference": [], "unmet_cannot_assign": []} | |
| user_assignments = [service for service, assigned_users in solution.items() if user in assigned_users] | |
| if len(user_assignments) > user_data["max_assignments"]: | |
| errors["users"][user]["unmet_max_assignments"] = True | |
| errors["users"][user]["effective_assignments"] = len(user_assignments) | |
| for preferred_service in user_data["preferences"]: | |
| if preferred_service not in user_assignments: | |
| errors["users"][user]["unmet_preference"].append(preferred_service) | |
| for cannot_assign_service in user_data["cannot_assign"]: | |
| if cannot_assign_service in user_assignments: | |
| errors["users"][user]["unmet_cannot_assign"].append(cannot_assign_service) | |
| # Analyze service errors | |
| for service, service_data in services.items(): | |
| errors["services"][service] = {"unmet_constraint": None, "extra_users": []} | |
| assigned_users = solution[service] | |
| num_assigned_users = len(assigned_users) | |
| if num_assigned_users < service_data["min"]: | |
| errors["services"][service]["unmet_constraint"] = "min" | |
| elif num_assigned_users > service_data["rec"]: | |
| errors["services"][service]["unmet_constraint"] = "rec" | |
| elif num_assigned_users > service_data["max"]: | |
| errors["services"][service]["unmet_constraint"] = "max" | |
| extra_users = assigned_users[service_data["max"]:] | |
| errors["services"][service]["extra_users"] = extra_users | |
| return errors | |
| def polish_errors(errors: dict) -> dict: | |
| """ | |
| Remove users and services without unmet constraints from the errors object. | |
| Args: | |
| errors (dict): The errors object to polish. | |
| Returns: | |
| dict: A polished errors object without users and services with no unmet constraints. | |
| """ | |
| polished_errors = {"users": {}, "services": {}} | |
| for user, user_errors in errors["users"].items(): | |
| polished_user_errors = {} | |
| if user_errors["unmet_max_assignments"]: | |
| polished_user_errors["unmet_max_assignments"] = True | |
| for key, value in user_errors.items(): | |
| if key not in ["unmet_max_assignments"] and value: | |
| polished_user_errors[key] = value | |
| if polished_user_errors: | |
| polished_errors["users"][user] = polished_user_errors | |
| for service, service_errors in errors["services"].items(): | |
| polished_service_errors = {} | |
| for key, value in service_errors.items(): | |
| if value: | |
| polished_service_errors[key] = value | |
| if polished_service_errors: | |
| polished_errors["services"][service] = polished_service_errors | |
| return polished_errors | |
| def calculate_diff(solution1: dict, solution2: dict) -> dict: | |
| """ | |
| Calculate the differences between two solution JSON objects and return the differences categorized into | |
| "added" and "removed" attributes for each service. | |
| Args: | |
| solution1 (dict): The first solution JSON object. | |
| solution2 (dict): The second solution JSON object. | |
| Returns: | |
| dict: A dictionary with the differences between the two solutions, categorized into "added" and | |
| "removed" attributes for each service. | |
| """ | |
| diff = {} | |
| all_services = set(solution1.keys()).union(set(solution2.keys())) | |
| for service in all_services: | |
| service_diff = { | |
| "added": [], | |
| "removed": [] | |
| } | |
| if service not in solution1: | |
| service_diff["added"] = solution2[service] | |
| elif service not in solution2: | |
| service_diff["removed"] = solution1[service] | |
| else: | |
| added_users = set(solution2[service]) - set(solution1[service]) | |
| removed_users = set(solution1[service]) - set(solution2[service]) | |
| if added_users: | |
| service_diff["added"] = list(added_users) | |
| if removed_users: | |
| service_diff["removed"] = list(removed_users) | |
| if service_diff["added"] or service_diff["removed"]: | |
| diff[service] = service_diff | |
| return diff | |
| def genetic_algorithm(services: dict, users: dict, population_size: int = 100, num_generations: int = 100, | |
| mutation_rate: float = 0.01, fitness_fn: Optional[Callable] = None, | |
| initial_population: Optional[List[dict]] = None) -> dict: | |
| """ | |
| Run the genetic algorithm to find an optimal assignment solution based on user preferences and constraints. | |
| Args: | |
| services (dict): The input services dictionary. | |
| users (dict): The input users dictionary. | |
| population_size (int): The size of the population for each generation (default: 100). | |
| num_generations (int): The number of generations for the genetic algorithm to run (default: 100). | |
| mutation_rate (float): The probability of mutation for each individual in the population (default: 0.01). | |
| fitness_fn (Callable, optional): An optional custom fitness function. | |
| initial_population: An optional previous solution to be used as starting point | |
| Returns: | |
| dict: The best assignment solution found by the genetic algorithm. | |
| """ | |
| # Initialize the population | |
| population = initial_population or initialize_population(services, users, population_size) | |
| # If no custom fitness function is provided, use the default fitness function | |
| if fitness_fn is None: | |
| fitness_fn = default_fitness_function | |
| # Calculate the initial fitness scores for the population | |
| fitness_scores = calculate_fitness(population, services, users, fitness_fn) | |
| best_solution = None | |
| best_fitness = float('inf') | |
| # Main loop of the genetic algorithm | |
| for generation in range(num_generations): | |
| # Select two parent solutions based on their fitness scores | |
| parent1_index, parent2_index = selection(fitness_scores) | |
| # Create a child solution by combining the parents using crossover | |
| child_solution = crossover(population[parent1_index], population[parent2_index]) | |
| # Mutate the child solution | |
| mutated_child_solution = mutation(child_solution, users, mutation_rate) | |
| # Calculate the fitness of the child solution | |
| child_fitness = fitness_fn(mutated_child_solution, services, users) | |
| # Replace the least-fit solution in the population with the child solution | |
| worst_fitness_index = fitness_scores.index(max(fitness_scores)) | |
| population[worst_fitness_index] = mutated_child_solution | |
| fitness_scores[worst_fitness_index] = child_fitness | |
| # Update the best solution found so far | |
| if child_fitness < best_fitness: | |
| best_solution = mutated_child_solution | |
| best_fitness = child_fitness | |
| # Print the progress of the algorithm | |
| report_generation(generation, fitness_scores, best_solution, services, users) | |
| return best_solution | |
| def update_genetic_algorithm(prev_solution: dict, updated_services: dict, updated_users: dict, | |
| population_size: int = 100, num_generations: int = 100, mutation_rate: float = 0.01, | |
| fitness_fn: Optional[Callable] = None) -> dict: | |
| """ | |
| Update the previous assignment solution with updated services and users using a genetic algorithm. | |
| Args: | |
| prev_solution (dict): The previous assignment solution. | |
| updated_services (dict): The updated services dictionary. | |
| updated_users (dict): The updated users dictionary. | |
| population_size (int): The size of the population for each generation in the genetic algorithm (default: 100). | |
| num_generations (int): The number of generations for the genetic algorithm to run (default: 100). | |
| mutation_rate (float): The probability of mutation for each individual in the population (default: 0.01). | |
| fitness_fn (Optional[Callable]): An optional fitness function to use in the genetic algorithm. | |
| Returns: | |
| dict: The updated assignment solution. | |
| """ | |
| # Create the initial population with the given previous solution | |
| population = initialize_population(updated_services, updated_users, population_size - 1) | |
| population.append(prev_solution) | |
| if fitness_fn is None: | |
| fitness_fn = partial(least_changed_fitness_function, prev_solution) | |
| # Run the genetic algorithm using the initial population | |
| updated_solution = genetic_algorithm(updated_services, updated_users, population_size, num_generations, | |
| mutation_rate, | |
| fitness_fn, | |
| population) | |
| return updated_solution | |