# app.py # CodexFlow / GVBDMS v3 • January 11, 2026 # Persistent provenance ledger + first gentle IRE influence (coherence check + Ω smoothing) import gradio as gr import requests import time import json import hashlib import sqlite3 import numpy as np from typing import Dict, Any, List, Optional, Tuple from collections import deque # ────────────────────────────────────────────────────────────── # CONFIGURATION # ────────────────────────────────────────────────────────────── DB_PATH = "codexflow_gvbdms_v3.db" WORLD_BANK_BASE = "https://api.worldbank.org/v2" DEFAULT_YEAR = "2023" INDICATORS = { "GDP": "NY.GDP.MKTP.CD", "INFLATION": "FP.CPI.TOTL.ZG", "POPULATION": "SP.POP.TOTL", } # Very simple toy intent anchor for first coherence check INTENT_ANCHOR = {"stability": 0.92, "transparency": 0.88} COHERENCE_THRESHOLD = 0.65 # records below this are refused OMEGA_MEMORY = deque(maxlen=8) # very light causal smoothing buffer # ────────────────────────────────────────────────────────────── # DATABASE LAYER # ────────────────────────────────────────────────────────────── def init_db(): with sqlite3.connect(DB_PATH) as con: cur = con.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS records ( id INTEGER PRIMARY KEY AUTOINCREMENT, ts REAL NOT NULL, hash TEXT UNIQUE NOT NULL, prev_hash TEXT NOT NULL, schema TEXT NOT NULL, country TEXT NOT NULL, source TEXT NOT NULL, reliability REAL NOT NULL, latency_s REAL NOT NULL, payload_json TEXT NOT NULL, metadata_json TEXT NOT NULL, coherence_score REAL, bytes INTEGER NOT NULL, entropy_proxy REAL NOT NULL ) """) cur.execute("CREATE INDEX IF NOT EXISTS idx_schema_country ON records(schema, country)") cur.execute("CREATE INDEX IF NOT EXISTS idx_ts ON records(ts)") print("Database initialized.") init_db() def get_tip_hash() -> str: with sqlite3.connect(DB_PATH) as con: cur = con.cursor() cur.execute("SELECT hash FROM records ORDER BY id DESC LIMIT 1") row = cur.fetchone() return row[0] if row else "GENESIS" def insert_record(rec: Dict) -> bool: try: with sqlite3.connect(DB_PATH) as con: cur = con.cursor() cur.execute(""" INSERT INTO records ( ts, hash, prev_hash, schema, country, source, reliability, latency_s, payload_json, metadata_json, coherence_score, bytes, entropy_proxy ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( rec["ts"], rec["hash"], rec["prev_hash"], rec["schema"], rec["country"], rec["source"], rec["reliability"], rec["latency_s"], rec["payload_json"], rec["metadata_json"], rec.get("coherence_score"), rec["bytes"], rec["entropy_proxy"] )) return True except sqlite3.IntegrityError: return False # duplicate hash # ────────────────────────────────────────────────────────────── # UTILITIES # ────────────────────────────────────────────────────────────── def canonical_bytes(obj: Any) -> bytes: return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode('utf-8') def compute_bit_stats(payload: Dict) -> Tuple[int, float]: b = canonical_bytes(payload) n = len(b) return n, round(len(set(b)) / max(n, 1), 6) def hash_chain(payload: Dict, prev: str) -> str: return hashlib.sha256(canonical_bytes({"payload": payload, "prev": prev})).hexdigest() def toy_coherence_score(values: Dict[str, float]) -> float: """Extremely simple first coherence proxy against intent anchor""" scores = [] for k, target in INTENT_ANCHOR.items(): v = values.get(k) if v is not None: diff = abs(v - target) / max(abs(target), 0.01) scores.append(max(0.0, 1.0 - min(1.0, diff))) return round(np.mean(scores) if scores else 0.5, 4) def omega_smooth(key: str, value: float) -> float: if not OMEGA_MEMORY: OMEGA_MEMORY.append({key: value}) return value prev = OMEGA_MEMORY[-1].get(key, value) smoothed = 0.25 * value + 0.75 * prev OMEGA_MEMORY.append({key: smoothed}) return round(smoothed, 6) # ────────────────────────────────────────────────────────────── # DATA INGEST & SIGNAL GENERATION # ────────────────────────────────────────────────────────────── def fetch_macro(country: str, year: str) -> Dict: result = {"type": "macro", "country": country, "year": year} t0 = time.time() for name, code in INDICATORS.items(): try: r = requests.get( f"{WORLD_BANK_BASE}/country/{country}/indicator/{code}?format=json&date={year}&per_page=1", timeout=7 ).json() result[name.lower()] = r[1][0]["value"] if len(r) > 1 and r[1] else None except: result[name.lower()] = None latency = time.time() - t0 return result, latency def generate_signals(commodity: str, anchor: float, macro: Dict, lag_days: int) -> List[Tuple[Dict, str]]: gdp_scale = macro.get("gdp", 1e14) / 1e14 if macro.get("gdp") else 1.0 supply = anchor * gdp_scale price = omega_smooth("price_index", round((supply / 11.0) * gdp_scale, 6)) econ = { "type": "commodity", "commodity": commodity, "supply": round(supply, 4), "demand": round(supply * 0.95, 4), "price_index": price, "flow": round(supply * 0.95 * price, 4) } friction = abs(econ["supply"] - econ["demand"]) / max(econ["supply"], 1e-9) logi = {"type": "logistics", "friction": round(friction, 6)} ener = {"type": "energy", "cost_index": round(price * 0.42, 4), "dependency": "high" if commodity.lower() in ["oil","gas"] else "moderate"} sent = {"type": "sentiment", "confidence": omega_smooth("confidence", np.random.uniform(0.62, 0.91))} feat = { "type": "features", "lag_days": lag_days, "projected_price": round(price * (1 + (1 - sent["confidence"]) * 0.07), 6), "volatility": round(0.012 * lag_days, 6) } return [(econ, "commodity.v1"), (logi, "logistics.v1"), (ener, "energy.v1"), (sent, "sentiment.v1"), (feat, "features.v1")] # ────────────────────────────────────────────────────────────── # CORE TICK FUNCTION (with coherence refusal) # ────────────────────────────────────────────────────────────── def run_tick(commodity: str, anchor: float, country: str, lag_days: int, use_live: bool, year: str): macro, latency = fetch_macro(country, year) if use_live else ( {"type": "macro", "country": country, "year": year, "gdp": None, "inflation": None, "population": None}, 0.0 ) macro_coh = toy_coherence_score({ "stability": 1.0 - abs(macro.get("inflation", 0) or 0) / 10, }) if macro_coh < COHERENCE_THRESHOLD: return {"status": "refused", "reason": f"Macro coherence too low: {macro_coh:.3f}", "tip": get_tip_hash()}, None macro_rec = { "ts": time.time(), "hash": hash_chain(macro, get_tip_hash()), "prev_hash": get_tip_hash(), "schema": "macro.v1", "country": country, "source": "world_bank" if use_live else "synthetic", "reliability": 0.88 if use_live else 0.65, "latency_s": round(latency, 4), "payload_json": json.dumps(macro, sort_keys=True), "metadata_json": json.dumps({"note": "anchor ingest"}, sort_keys=True), "coherence_score": macro_coh, "bytes": len(canonical_bytes(macro)), "entropy_proxy": compute_bit_stats(macro)[1] } if not insert_record(macro_rec): return {"status": "error", "reason": "duplicate hash"}, None signals = generate_signals(commodity, anchor, macro, lag_days) added = 0 for payload, schema in signals: coh = toy_coherence_score({"stability": 1.0 - payload.get("friction", 0)}) if coh < COHERENCE_THRESHOLD: continue # refuse low-coherence derived signal rec = { "ts": time.time(), "hash": hash_chain(payload, get_tip_hash()), "prev_hash": get_tip_hash(), "schema": schema, "country": country, "source": "derived", "reliability": 0.92, "latency_s": 0.0, "payload_json": json.dumps(payload, sort_keys=True), "metadata_json": json.dumps({"linked_macro": macro_rec["hash"]}, sort_keys=True), "coherence_score": coh, "bytes": len(canonical_bytes(payload)), "entropy_proxy": compute_bit_stats(payload)[1] } if insert_record(rec): added += 1 tip = get_tip_hash() return { "status": "ok", "added": added, "tip_hash": tip, "macro_coherence": macro_coh }, tip # ────────────────────────────────────────────────────────────── # QUERY & CHAT # ────────────────────────────────────────────────────────────── def query_records(limit: int = 50, schema: str = "ANY", country: str = "ANY") -> List[Dict]: limit = max(1, min(int(limit), 300)) with sqlite3.connect(DB_PATH) as con: cur = con.cursor() where = [] params = [] if schema != "ANY": where.append("schema = ?") params.append(schema) if country != "ANY": where.append("country = ?") params.append(country) sql = "SELECT ts, hash, prev_hash, schema, country, coherence_score FROM records" if where: sql += " WHERE " + " AND ".join(where) sql += " ORDER BY id DESC LIMIT ?" params.append(limit) cur.execute(sql, params) rows = cur.fetchall() return [{"ts": r[0], "hash": r[1], "prev": r[2], "schema": r[3], "country": r[4], "coherence": r[5]} for r in rows] def jarvis(message: str, history): m = message.lower().strip() if "latest" in m or "tip" in m: recs = query_records(1) return json.dumps(recs[0] if recs else {"status": "empty"}, indent=2) if "coherence" in m: recs = query_records(20) coh_values = [r["coherence"] for r in recs if r["coherence"] is not None] return f"Recent coherence (last {len(coh_values)}): mean = {np.mean(coh_values):.3f}" if coh_values else "No coherence data yet" return "Commands: latest, tip, coherence" # ────────────────────────────────────────────────────────────── # GRADIO INTERFACE # ────────────────────────────────────────────────────────────── with gr.Blocks(title="CodexFlow v3 • IRE Influence") as app: gr.Markdown("# CodexFlow v3 • Provenance Ledger + First IRE Touch") gr.Markdown("SQLite • Hash chain • Bit stats • Simple coherence refusal & smoothing") with gr.Row(): with gr.Column(scale=2): comm = gr.Dropdown(["Gold","Oil","Gas","Wheat","Copper"], "Gold", label="Commodity") anch = gr.Number(950, label="Anchor") cntry = gr.Textbox("WLD", label="Country") lag_d = gr.Slider(1, 365, 7, label="Lag days") yr = gr.Textbox(DEFAULT_YEAR, label="Year") live = gr.Checkbox(True, label="Live World Bank") btn = gr.Button("Run Tick", variant="primary") res = gr.JSON(label="Result") tip = gr.Textbox(label="Tip Hash", interactive=False) btn.click(run_tick, [comm, anch, cntry, lag_d, live, yr], [res, tip]) gr.Markdown("### Query") lim = gr.Slider(5, 200, 30, label="Limit") sch = gr.Dropdown(["ANY", "macro.v1", "commodity.v1", "features.v1"], "ANY", label="Schema") qry_btn = gr.Button("Query") out = gr.JSON(label="Records") qry_btn.click(query_records, [lim, sch, cntry], out) with gr.Column(scale=1): gr.Markdown("### Jarvis X") gr.ChatInterface(jarvis, chatbot=gr.Chatbot(height=400)) gr.Markdown("**v3** • First coherence check & Ω smoothing • Still toy-level IRE influence") if __name__ == "__main__": app.launch()