Mini-Audit – Top Verdachtsfälle (Tabellen-Rendering via DOM)
Anzahl Buchungen:
Beispieldaten erzeugen + analysieren
Ergebnis-CSV herunterladen
Status: –
Buchungen
–
Regel-auffällig
–
Belegnr-Duplikate
–
AUC
–
Top-Fälle
–
Tabelle:
ID
Belegdatum
Buchungsdatum
Konto
Gegenk.
Kreditor
Betrag
Flag
Score
Grund
from js import document, Blob, URL import numpy as np import warnings from sklearn.exceptions import ConvergenceWarning warnings.filterwarnings("ignore", category=ConvergenceWarning) from sklearn.model_selection import train_test_split from sklearn.preprocessing import OneHotEncoder, StandardScaler from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline from sklearn.linear_model import LogisticRegression from sklearn.metrics import roc_auc_score def el(id_): return document.getElementById(id_) def set_status(msg): el("status").textContent = f"Status: {msg}" def euro(x: float) -> str: s = f"{x:,.2f}" s = s.replace(",", "X").replace(".", ",").replace("X", ".") return s + " €" def fmt_date_2025(doy: int) -> str: # einfache Umrechnung: Monat = (DOY-1)//30 + 1, Tag = Rest month = (int(doy) - 1) // 30 + 1 day = (int(doy) - 1) % 30 + 1 # Begrenzen (Demo, kein echter Kalender) if month > 12: month = 12 day = min(day, 31) return f"{day:02d}.{month:02d}.2025" # ----------- Beispiel-Daten (stabil) ----------- def generate_datev_like(n: int, seed: int = 42): rng = np.random.default_rng(seed) ids = np.arange(1, n + 1) beleg_doy = rng.integers(1, 366, size=n) delta = rng.integers(-10, 15, size=n) buch_doy = np.clip(beleg_doy + delta, 1, 365) expense_accounts = np.array(["6000","6200","6300","6640","6810","6820","6850","6870","6910","8736"]) revenue_accounts = np.array(["8400","8300"]) accounts = np.concatenate([expense_accounts, revenue_accounts]) w = np.array([0.09,0.06,0.06,0.06,0.06,0.06,0.06,0.06,0.06,0.05, 0.26,0.18], dtype=float) w = w / w.sum() konto = rng.choice(accounts, size=n, replace=True, p=w) gegenkonto = rng.choice(np.array(["1200","1210","1000","1600","1590","1400"]), size=n, replace=True) kreditor = rng.choice(np.array(["AMZN","SHELL","DB","HOTEL","CATER","MRO","PAYPAL","UNKNOWN","BAR","APPLE","MSFT"]), size=n, replace=True) amount = rng.lognormal(mean=4.3, sigma=0.7, size=n) sign = np.where(np.isin(konto, revenue_accounts), 1, -1) betrag = amount * sign is_round = (rng.random(n) < 0.20) betrag = np.where(is_round, np.round(betrag/10)*10, betrag) # Duplikat-Belegnummern triggern base = rng.integers(10000, 99999, size=n) prefix = np.where(np.isin(konto, revenue_accounts), "ER", "RE") belegnr = np.array([f"{prefix[i]}-2025-{base[i]}" for i in range(n)], dtype=object) dup_count = max(5, n // 50) dup_idx = rng.choice(np.arange(n), size=dup_count, replace=False) for i in dup_idx: j = rng.integers(0, n) belegnr[j] = belegnr[i] # Ableitungen / Regeln (Demo) u, c = np.unique(belegnr, return_counts=True) cmap = {uu: int(cc) for uu, cc in zip(u, c)} belegnr_count = np.array([cmap.get(b, 1) for b in belegnr], dtype=int) rule_dup = (belegnr_count >= 2) rule_unknown = (kreditor == "UNKNOWN") rule_high = (betrag < -2000) y = (rule_dup | rule_unknown | rule_high).astype(int) reasons = np.empty(n, dtype=object) reasons[:] = "" reasons = np.where(rule_dup, "R3: Belegnr Duplikat", reasons) reasons = np.where(rule_unknown, np.where(reasons=="", "R5: Kreditor UNKNOWN", reasons + " | R5: Kreditor UNKNOWN"), reasons) reasons = np.where(rule_high, np.where(reasons=="", "R1: Hohe Ausgabe > 2.000", reasons + " | R1: Hohe Ausgabe > 2.000"), reasons) return { "id": ids, "belegdatum": np.array([fmt_date_2025(d) for d in beleg_doy], dtype=object), "buchungsdatum": np.array([fmt_date_2025(d) for d in buch_doy], dtype=object), "konto": konto.astype(object), "gegenkonto": gegenkonto.astype(object), "kreditor": kreditor.astype(object), "betrag": betrag.astype(float), "belegnr": belegnr.astype(object), "belegnr_count": belegnr_count.astype(int), "y": y, "reasons": reasons, } # ----------- Modell + Score ----------- def fit_score(data): X_num = data["betrag"].reshape(-1, 1) X_cat = np.column_stack([data["konto"], data["gegenkonto"], data["kreditor"]]) X = np.concatenate([X_num, X_cat], axis=1) num_idx = [0] cat_idx = [1,2,3] pre = ColumnTransformer( transformers=[ ("num", StandardScaler(), num_idx), ("cat", OneHotEncoder(handle_unknown="ignore"), cat_idx), ], remainder="drop" ) pipe = Pipeline(steps=[ ("pre", pre), ("clf", LogisticRegression(max_iter=2000)) ]) y = data["y"] pos = int(np.sum(y==1)) if pos >= 5: X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.25, random_state=42, stratify=y) else: X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.25, random_state=42) pipe.fit(X_tr, y_tr) auc = float("nan") if len(np.unique(y_te)) > 1: auc = roc_auc_score(y_te, pipe.predict_proba(X_te)[:,1]) score = pipe.predict_proba(X)[:,1] return score, auc # ----------- Tabelle via DOM bauen ----------- def clear_tbody(tbody): while tbody.firstChild: tbody.removeChild(tbody.firstChild) def add_td(tr, text, cls=None): td = document.createElement("td") td.textContent = str(text) if cls: td.className = cls tr.appendChild(td) return td def add_td_tag(tr, text, bad=False, cls=None): td = document.createElement("td") if cls: td.className = cls span = document.createElement("span") span.className = "tag bad" if bad else "tag good" span.textContent = str(text) td.appendChild(span) tr.appendChild(td) return td def render_table(data, score, top_n=40): tbody = el("tbl_body") clear_tbody(tbody) idx = np.argsort(-score)[:top_n] for i in idx: tr = document.createElement("tr") add_td(tr, int(data["id"][i]), "col-id") add_td(tr, data["belegdatum"][i], "col-date") add_td(tr, data["buchungsdatum"][i], "col-date") add_td(tr, data["konto"][i], "col-kto") add_td(tr, data["gegenkonto"][i], "col-gk") add_td(tr, data["kreditor"][i], "col-vendor") add_td(tr, euro(float(data["betrag"][i])), "col-amt") flag = int(data["y"][i]) add_td_tag(tr, flag, bad=(flag==1), cls="col-flag") add_td(tr, f"{float(score[i]):.4f}", "col-score") add_td(tr, data["reasons"][i] if data["reasons"][i] else "–", "col-reason") tbody.appendChild(tr) el("kpi_top").textContent = str(top_n) # ----------- CSV Download ----------- def build_result_csv(data, score): header = ["id","belegdatum","buchungsdatum","konto","gegenkonto","kreditor","betrag","belegnr","belegnr_count","rule_flag","score","rule_reason"] lines = [",".join(header)] def esc_csv(s): s = str(s).replace('"', '""') return f'"{s}"' for i in range(len(score)): lines.append(",".join([ str(int(data["id"][i])), str(data["belegdatum"][i]), str(data["buchungsdatum"][i]), str(data["konto"][i]), str(data["gegenkonto"][i]), str(data["kreditor"][i]), f"{float(data['betrag'][i]):.2f}", str(data["belegnr"][i]), str(int(data["belegnr_count"][i])), str(int(data["y"][i])), f"{float(score[i]):.6f}", esc_csv(data["reasons"][i]), ])) return "\n".join(lines) def enable_download(csv_text: str): def download(_evt=None): blob = Blob.new([csv_text], {"type": "text/csv;charset=utf-8"}) url = URL.createObjectURL(blob) a = document.createElement("a") a.href = url a.download = "audit_scoring_result.csv" document.body.appendChild(a) a.click() a.remove() URL.revokeObjectURL(url) btn = el("btn_download") btn.disabled = False btn.onclick = download # ----------- Run ----------- def run(_evt=None): set_status("Erzeuge Daten…") try: n = int(el("n_rows").value) except Exception: n = 600 n = max(100, min(8000, n)) data = generate_datev_like(n=n, seed=42) set_status("Trainiere Modell…") score, auc = fit_score(data) el("kpi_rows").textContent = str(n) el("kpi_flagged").textContent = str(int(np.sum(data["y"]))) # ✅ KPI: Anzahl Zeilen mit Belegnr mehrfach verwendet el("kpi_dup").textContent = str(int(np.sum(data["belegnr_count"] >= 2))) el("kpi_auc").textContent = (f"{auc:.4f}" if auc == auc else "n/a") set_status("Rendere Tabelle…") render_table(data, score, top_n=40) csv_text = build_result_csv(data, score) enable_download(csv_text) set_status("Fertig ✅ (Duplikate-KPI aktiv)") el("btn_run").addEventListener("click", run) run()