Spaces:
Running
Running
| """ | |
| Chargement et indexation des données CEREMA pour le serveur MCP. | |
| Optimisations mémoire : | |
| - DV3F : seules les ~60 colonnes utiles sont chargées (sur 619), avec types optimisés | |
| - Cartofriches : chargement GeoPandas standard (112 Mo en RAM) | |
| Les données sont chargées une seule fois au démarrage et indexées pour des requêtes rapides. | |
| Les friches sont pré-agrégées à chaque maille territoriale (commune, EPCI, département, | |
| région, national) au chargement pour des réponses instantanées. | |
| """ | |
| import os | |
| import pandas as pd | |
| import geopandas as gpd | |
| import numpy as np | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| # --- Répertoire des données --- | |
| DATA_DIR = os.path.join(os.path.dirname(__file__), "data") | |
| # --- Colonnes DV3F à charger --- | |
| # Base | |
| BASE_COLS = ["annee", "echelle", "code", "libelle"] | |
| # Transactions et prix globaux | |
| GLOBAL_COLS = [ | |
| "nbtrans_cod1", # Total mutations | |
| "nbtrans_cod11", # Biens bâtis | |
| "nbtrans_cod111", # Maisons | |
| "nbtrans_cod121", # Appartements | |
| "nbtrans_cod2", # Non bâti (terrains) | |
| "valeurfonc_sum_cod1", # Valeur foncière totale | |
| "valeurfonc_sum_cod111", # Valeur foncière maisons | |
| "valeurfonc_sum_cod121", # Valeur foncière appartements | |
| ] | |
| # Maisons - statistiques détaillées | |
| MAISON_COLS = [ | |
| "valeurfonc_median_cod111", "valeurfonc_q25_cod111", "valeurfonc_q75_cod111", | |
| "pxm2_median_cod111", "pxm2_q25_cod111", "pxm2_q75_cod111", | |
| "sbati_median_cod111", "sbati_sum_cod111", | |
| ] | |
| # Appartements - statistiques détaillées | |
| APPART_COLS = [ | |
| "valeurfonc_median_cod121", "valeurfonc_q25_cod121", "valeurfonc_q75_cod121", | |
| "pxm2_median_cod121", "pxm2_q25_cod121", "pxm2_q75_cod121", | |
| "sbati_median_cod121", "sbati_sum_cod121", | |
| ] | |
| # Maisons par période de construction | |
| MAISON_PERIODE_COLS = [] | |
| for p in ["mp1", "mp2", "mp3", "mp4", "mp5", "mpx"]: | |
| MAISON_PERIODE_COLS.extend([ | |
| f"nbtrans_{p}", f"valeurfonc_median_{p}", f"pxm2_median_{p}", f"sbati_median_{p}" | |
| ]) | |
| # Appartements par période de construction | |
| APPART_PERIODE_COLS = [] | |
| for p in ["ap1", "ap2", "ap3", "ap4", "ap5", "apx"]: | |
| APPART_PERIODE_COLS.extend([ | |
| f"nbtrans_{p}", f"valeurfonc_median_{p}", f"pxm2_median_{p}", f"sbati_median_{p}" | |
| ]) | |
| ALL_DV3F_COLS = BASE_COLS + GLOBAL_COLS + MAISON_COLS + APPART_COLS + MAISON_PERIODE_COLS + APPART_PERIODE_COLS | |
| # --- Nomenclature des périodes de construction --- | |
| PERIODES_CONSTRUCTION = { | |
| "mp1": "avant 1914", "mp2": "1914–1947", "mp3": "1948–1969", | |
| "mp4": "1970–1989", "mp5": "1990 et après", "mpx": "période inconnue", | |
| "ap1": "avant 1914", "ap2": "1914–1947", "ap3": "1948–1969", | |
| "ap4": "1970–1989", "ap5": "1990 et après", "apx": "période inconnue", | |
| } | |
| # --- Noms des régions --- | |
| REG_NAMES = { | |
| "01": "Guadeloupe", "02": "Martinique", "03": "Guyane", | |
| "04": "La Réunion", "06": "Mayotte", | |
| "11": "Île-de-France", "24": "Centre-Val de Loire", | |
| "27": "Bourgogne-Franche-Comté", "28": "Normandie", | |
| "32": "Hauts-de-France", "44": "Grand Est", | |
| "52": "Pays de la Loire", "53": "Bretagne", | |
| "75": "Nouvelle-Aquitaine", "76": "Occitanie", | |
| "84": "Auvergne-Rhône-Alpes", | |
| "93": "Provence-Alpes-Côte d'Azur", "94": "Corse", | |
| } | |
| # --- Échelles territoriales --- | |
| ECHELLES = ["commune", "epci", "departement", "region", "national"] | |
| # --- Variables globales pour les données chargées --- | |
| _dv3f: pd.DataFrame | None = None | |
| _friches: gpd.GeoDataFrame | None = None | |
| _mapping: pd.DataFrame | None = None | |
| _friches_agg: dict[str, pd.DataFrame] = {} # Agrégations pré-calculées par échelle | |
| def load_mapping() -> pd.DataFrame: | |
| """Charge la table de correspondance commune → EPCI → département → région (INSEE).""" | |
| global _mapping | |
| if _mapping is not None: | |
| return _mapping | |
| filepath = os.path.join(DATA_DIR, "mapping_communes.csv") | |
| logger.info(f"Chargement du mapping territorial depuis {filepath}...") | |
| _mapping = pd.read_csv(filepath, dtype=str) | |
| _mapping["reg_nom"] = _mapping["reg_nom"].fillna("") | |
| _mapping["epci_nom"] = _mapping["epci_nom"].fillna("") | |
| logger.info(f"Mapping chargé : {len(_mapping)} communes, " | |
| f"{_mapping['epci'].nunique()} EPCI, " | |
| f"{_mapping['dep'].nunique()} départements, " | |
| f"{_mapping['reg'].nunique()} régions") | |
| return _mapping | |
| def load_dv3f() -> pd.DataFrame: | |
| """Charge les données DV3F avec optimisation mémoire.""" | |
| global _dv3f | |
| if _dv3f is not None: | |
| return _dv3f | |
| filepath = os.path.join(DATA_DIR, "prix_volumes.csv") | |
| logger.info(f"Chargement de DV3F depuis {filepath}...") | |
| _dv3f = pd.read_csv( | |
| filepath, | |
| sep=";", | |
| usecols=ALL_DV3F_COLS, | |
| dtype={ | |
| "annee": "int16", | |
| "echelle": "category", | |
| "code": "str", | |
| "libelle": "str", | |
| }, | |
| low_memory=False, | |
| ) | |
| # Convertir les colonnes numériques en float32 pour économiser la mémoire | |
| numeric_cols = [c for c in _dv3f.columns if c not in BASE_COLS] | |
| for col in numeric_cols: | |
| _dv3f[col] = pd.to_numeric(_dv3f[col], errors="coerce").astype("float32") | |
| # Créer un index pour des lookups rapides | |
| _dv3f.set_index(["echelle", "code", "annee"], inplace=True) | |
| _dv3f.sort_index(inplace=True) | |
| mem_mb = _dv3f.memory_usage(deep=True).sum() / 1e6 | |
| logger.info(f"DV3F chargé : {len(_dv3f)} lignes, {mem_mb:.0f} Mo en mémoire") | |
| return _dv3f | |
| def load_friches() -> gpd.GeoDataFrame: | |
| """Charge les données Cartofriches et enrichit avec le mapping territorial.""" | |
| global _friches | |
| if _friches is not None: | |
| return _friches | |
| filepath = os.path.join(DATA_DIR, "cartofriches.geojson") | |
| logger.info(f"Chargement de Cartofriches depuis {filepath}...") | |
| _friches = gpd.read_file(filepath) | |
| # Nettoyer les colonnes utiles | |
| _friches["site_surface_num"] = pd.to_numeric( | |
| _friches["site_surface"], errors="coerce" | |
| ) | |
| _friches["site_surface_ha"] = _friches["site_surface_num"] / 10000 | |
| # Enrichir avec le mapping territorial (EPCI, région) | |
| mapping = load_mapping() | |
| _friches = _friches.merge( | |
| mapping[["comm_insee", "epci", "epci_nom", "reg", "reg_nom"]], | |
| on="comm_insee", | |
| how="left", | |
| suffixes=("", "_map"), | |
| ) | |
| mem_mb = _friches.memory_usage(deep=True).sum() / 1e6 | |
| logger.info(f"Cartofriches chargé et enrichi : {len(_friches)} friches, {mem_mb:.0f} Mo en mémoire") | |
| return _friches | |
| def _aggregate_friches_for_group(group: pd.DataFrame) -> dict: | |
| """Calcule les statistiques agrégées pour un groupe de friches.""" | |
| surfaces = group["site_surface_num"].dropna() | |
| statuts = group["site_statut"].value_counts() | |
| types = group["site_type"].value_counts() | |
| # Pollution avérée ou supposée | |
| pollution = group["sol_pollution_existe"].fillna("inconnu") | |
| nb_polluees = pollution.str.contains("avérée|supposée", case=False, na=False).sum() | |
| # En zone U | |
| nb_zone_u = (group["urba_zone_type"] == "U").sum() | |
| # Mobilisables (sans projet + potentielles) | |
| mobilisables = group[group["site_statut"].isin(["friche sans projet", "friche potentielle"])] | |
| surface_mobilisable = mobilisables["site_surface_num"].sum() / 10000 | |
| return { | |
| "nb_friches": len(group), | |
| "surface_totale_ha": surfaces.sum() / 10000, | |
| "surface_mediane_ha": surfaces.median() / 10000 if len(surfaces) > 0 else 0, | |
| "surface_moyenne_ha": surfaces.mean() / 10000 if len(surfaces) > 0 else 0, | |
| "surface_min_ha": surfaces.min() / 10000 if len(surfaces) > 0 else 0, | |
| "surface_max_ha": surfaces.max() / 10000 if len(surfaces) > 0 else 0, | |
| "nb_sans_projet": int(statuts.get("friche sans projet", 0)), | |
| "nb_avec_projet": int(statuts.get("friche avec projet", 0)), | |
| "nb_potentielle": int(statuts.get("friche potentielle", 0)), | |
| "nb_reconvertie": int(statuts.get("friche reconvertie", 0)), | |
| "nb_mobilisables": len(mobilisables), | |
| "surface_mobilisable_ha": surface_mobilisable, | |
| "nb_polluees": int(nb_polluees), | |
| "nb_zone_u": int(nb_zone_u), | |
| "pct_zone_u": round(100 * nb_zone_u / len(group), 1) if len(group) > 0 else 0, | |
| "top_types": types.head(5).to_dict(), | |
| "nb_communes": group["comm_insee"].nunique() if "comm_insee" in group.columns else 0, | |
| } | |
| def precompute_friches_aggregations(): | |
| """Pré-calcule les agrégations de friches à chaque maille territoriale.""" | |
| global _friches_agg | |
| gdf = load_friches() | |
| logger.info("Pré-agrégation des friches par commune...") | |
| commune_groups = gdf.groupby("comm_insee") | |
| agg_commune = {} | |
| for code, group in commune_groups: | |
| stats = _aggregate_friches_for_group(group) | |
| stats["libelle"] = group.iloc[0]["comm_nom"] if len(group) > 0 else "" | |
| agg_commune[code] = stats | |
| _friches_agg["commune"] = pd.DataFrame(agg_commune).T | |
| logger.info(f" Communes : {len(agg_commune)} territoires") | |
| logger.info("Pré-agrégation des friches par EPCI...") | |
| epci_groups = gdf[gdf["epci"].notna() & (gdf["epci"] != "ZZZZZZZZZ")].groupby("epci") | |
| agg_epci = {} | |
| for code, group in epci_groups: | |
| stats = _aggregate_friches_for_group(group) | |
| stats["libelle"] = group.iloc[0].get("epci_nom", "") or "" | |
| agg_epci[code] = stats | |
| _friches_agg["epci"] = pd.DataFrame(agg_epci).T | |
| logger.info(f" EPCI : {len(agg_epci)} territoires") | |
| logger.info("Pré-agrégation des friches par département...") | |
| dep_groups = gdf[gdf["dep"].notna()].groupby("dep") | |
| agg_dep = {} | |
| for code, group in dep_groups: | |
| stats = _aggregate_friches_for_group(group) | |
| # Trouver le nom du département dans DV3F | |
| stats["libelle"] = "" | |
| agg_dep[code] = stats | |
| _friches_agg["departement"] = pd.DataFrame(agg_dep).T | |
| logger.info(f" Départements : {len(agg_dep)} territoires") | |
| logger.info("Pré-agrégation des friches par région...") | |
| reg_groups = gdf[gdf["reg"].notna()].groupby("reg") | |
| agg_reg = {} | |
| for code, group in reg_groups: | |
| stats = _aggregate_friches_for_group(group) | |
| stats["libelle"] = REG_NAMES.get(code, "") | |
| agg_reg[code] = stats | |
| _friches_agg["region"] = pd.DataFrame(agg_reg).T | |
| logger.info(f" Régions : {len(agg_reg)} territoires") | |
| logger.info("Pré-agrégation des friches au niveau national...") | |
| national_stats = _aggregate_friches_for_group(gdf) | |
| national_stats["libelle"] = "France entière" | |
| _friches_agg["national"] = pd.DataFrame({"france": national_stats}).T | |
| logger.info(f" National : 1 territoire") | |
| logger.info("Pré-agrégation terminée.") | |
| def get_friches_agg(echelle: str, code: str = "") -> dict | None: | |
| """Récupère les statistiques agrégées de friches pour un territoire donné. | |
| Args: | |
| echelle: "commune", "epci", "departement", "region" ou "national" | |
| code: Code du territoire (INSEE, SIREN EPCI, etc.). Ignoré pour "national". | |
| Returns: | |
| Dict avec les statistiques agrégées, ou None si non trouvé. | |
| """ | |
| if echelle not in _friches_agg: | |
| return None | |
| if echelle == "national": | |
| return _friches_agg["national"].iloc[0].to_dict() | |
| if code in _friches_agg[echelle].index: | |
| return _friches_agg[echelle].loc[code].to_dict() | |
| return None | |
| def get_epci_for_commune(code_commune: str) -> tuple[str, str] | None: | |
| """Retourne (code_epci, nom_epci) pour une commune donnée.""" | |
| mapping = load_mapping() | |
| row = mapping[mapping["comm_insee"] == code_commune] | |
| if len(row) == 0: | |
| return None | |
| epci_code = row.iloc[0]["epci"] | |
| epci_nom = row.iloc[0]["epci_nom"] | |
| if epci_code == "ZZZZZZZZZ" or pd.isna(epci_code): | |
| return None | |
| return (epci_code, epci_nom if pd.notna(epci_nom) else "") | |
| def get_region_for_commune(code_commune: str) -> tuple[str, str] | None: | |
| """Retourne (code_region, nom_region) pour une commune donnée.""" | |
| mapping = load_mapping() | |
| row = mapping[mapping["comm_insee"] == code_commune] | |
| if len(row) == 0: | |
| return None | |
| reg = row.iloc[0]["reg"] | |
| reg_nom = row.iloc[0]["reg_nom"] | |
| return (reg, reg_nom if pd.notna(reg_nom) else REG_NAMES.get(reg, "")) | |
| def get_region_for_departement(code_dep: str) -> tuple[str, str] | None: | |
| """Retourne (code_region, nom_region) pour un département donné.""" | |
| mapping = load_mapping() | |
| dep_rows = mapping[mapping["dep"] == code_dep] | |
| if len(dep_rows) == 0: | |
| return None | |
| reg = dep_rows.iloc[0]["reg"] | |
| reg_nom = dep_rows.iloc[0]["reg_nom"] | |
| return (reg, reg_nom if pd.notna(reg_nom) else REG_NAMES.get(reg, "")) | |
| def get_departement_from_commune(code_commune: str) -> str: | |
| """Extrait le code département à partir d'un code INSEE de commune.""" | |
| if code_commune.startswith("97") or code_commune.startswith("98"): | |
| return code_commune[:3] # DOM-TOM | |
| return code_commune[:2] | |
| def get_all_echelles_for_commune(code_commune: str) -> dict: | |
| """Retourne tous les codes territoriaux pour une commune. | |
| Returns: | |
| Dict avec commune, epci, departement, region et leurs libellés. | |
| """ | |
| result = {"commune": code_commune, "commune_nom": ""} | |
| mapping = load_mapping() | |
| row = mapping[mapping["comm_insee"] == code_commune] | |
| if len(row) > 0: | |
| r = row.iloc[0] | |
| result["commune_nom"] = r.get("comm_nom", "") | |
| result["epci"] = r.get("epci", "") | |
| result["epci_nom"] = r.get("epci_nom", "") | |
| result["departement"] = r.get("dep", "") | |
| result["region"] = r.get("reg", "") | |
| result["region_nom"] = r.get("reg_nom", REG_NAMES.get(r.get("reg", ""), "")) | |
| else: | |
| dep = get_departement_from_commune(code_commune) | |
| result["departement"] = dep | |
| result["epci"] = "" | |
| result["epci_nom"] = "" | |
| reg_info = get_region_for_departement(dep) | |
| if reg_info: | |
| result["region"] = reg_info[0] | |
| result["region_nom"] = reg_info[1] | |
| else: | |
| result["region"] = "" | |
| result["region_nom"] = "" | |
| return result | |
| def init_all(): | |
| """Charge toutes les données au démarrage et pré-calcule les agrégations.""" | |
| logger.info("Initialisation des données...") | |
| load_mapping() | |
| load_dv3f() | |
| load_friches() | |
| precompute_friches_aggregations() | |
| logger.info("Toutes les données sont chargées et pré-agrégées.") | |