#!/usr/bin/env python3 """Replace hard-coded values in 'Kunden' sheet with proper formulas: - Neukunden = driver lookup (per year × segment) from Treiber sheet - Churn = ROUND(previous-month Bestandskunden × Churn-Rate, 0) - Bestandskunden = previous + new - churn (cumulative) Default driver values are derived from each file's existing monthly Neukunden data so each scenario keeps its growth profile. Churn rates default to industry-typical B2B SaaS values (Starter 1%, Pro 0.5%, Enterprise 0.3%/Monat). Usage: python3 pitch-deck/scripts/add-kunden-formulas.py --dry-run python3 pitch-deck/scripts/add-kunden-formulas.py python3 pitch-deck/scripts/add-kunden-formulas.py --only Finanzplan-Wandeldarlehen-400k.xlsx """ from __future__ import annotations import argparse import shutil import sys from datetime import datetime from pathlib import Path from openpyxl import load_workbook from openpyxl.utils import get_column_letter EXPORTS = Path(__file__).resolve().parent.parent / "exports" # Default monthly churn rates per segment per year. Reflects business reality: # - Starter: high early churn (testing, startups failing) decreasing as product matures # - Professional: moderate, gradually decreasing # - Enterprise: very low — they integrate the product, don't switch # Annual equivalents: # Starter 64% → 17%, Professional 26% → 9%, Enterprise 6% → 1% CHURN_DEFAULTS: dict[str, dict[int, float]] = { "starter": {2026: 0.08, 2027: 0.05, 2028: 0.03, 2029: 0.02, 2030: 0.015}, "professional": {2026: 0.025, 2027: 0.02, 2028: 0.015, 2029: 0.01, 2030: 0.008}, "enterprise": {2026: 0.005, 2027: 0.003, 2028: 0.002, 2029: 0.001, 2030: 0.001}, } # Segment definitions: (segment_name, neukunden_row, churn_row, bestand_row, suffix) SEGMENTS = [ ("Starter (<10 MA)", 4, 5, 6, "starter"), ("Professional (10-250 MA)", 7, 8, 9, "professional"), ("Enterprise (250+ MA)", 10, 11, 12, "enterprise"), ] def year_columns(ws_kunden) -> dict[int, list[int]]: out: dict[int, list[int]] = {} for c in range(2, ws_kunden.max_column + 1): v = ws_kunden.cell(row=1, column=c).value if v is None: continue try: yr = int(v) except (TypeError, ValueError): continue out.setdefault(yr, []).append(c) return out def monthly_avg_per_year(ws_kunden, row: int, year_cols: dict[int, list[int]]) -> dict[int, int]: """Compute rounded monthly average from existing per-month values. Uses half-up rounding (0.5 → 1) instead of Python's banker's rounding (0.5 → 0) to preserve segments that grew at exactly half-a-customer-per-month. """ res: dict[int, int] = {} for yr, cols in year_cols.items(): total = 0.0 for c in cols: v = ws_kunden.cell(row=row, column=c).value if v in (None, ""): continue try: total += float(v) except (TypeError, ValueError): pass avg = total / len(cols) if cols else 0 res[yr] = max(0, int(avg + 0.5)) return res _SEG_LABEL = {"starter": "Starter", "professional": "Professional", "enterprise": "Enterprise"} def _clear_old_kundenakquise_block(ws_treiber) -> None: """If we ran a previous version, drop any rows after the original 30 to start fresh.""" if ws_treiber.max_row > 30: for r in range(31, ws_treiber.max_row + 1): ws_treiber.cell(row=r, column=1).value = None ws_treiber.cell(row=r, column=2).value = None def write_treiber_drivers(ws_treiber, years: list[int], defaults_by_segment: dict[str, dict[int, int]]) -> dict: """Rewrite the Kundenakquise + Churn driver block. Returns row indices for cross-references. Layout (after original row 30): 31 (blank) 32 Header "Kundenakquise" 33..47 Neukunden/Monat per (segment, year) 48 (blank) 49 Header "Churn (monatliche Rate, pro Jahr)" 50..64 Churn-Rate/Monat per (segment, year) """ _clear_old_kundenakquise_block(ws_treiber) r = 32 ws_treiber.cell(row=r, column=1).value = "Kundenakquise" r += 1 new_customer_refs: dict[str, dict[int, int]] = {} for suffix in ("starter", "professional", "enterprise"): new_customer_refs[suffix] = {} for yr in years: ws_treiber.cell(row=r, column=1).value = f"Neukunden/Monat {_SEG_LABEL[suffix]} {yr}" ws_treiber.cell(row=r, column=2).value = defaults_by_segment[suffix][yr] new_customer_refs[suffix][yr] = r r += 1 r += 1 # blank row 48 ws_treiber.cell(row=r, column=1).value = "Churn (monatliche Rate, pro Jahr)" r += 1 churn_refs: dict[str, dict[int, int]] = {} for suffix in ("starter", "professional", "enterprise"): churn_refs[suffix] = {} for yr in years: ws_treiber.cell(row=r, column=1).value = f"Churn-Rate/Monat {_SEG_LABEL[suffix]} {yr}" ws_treiber.cell(row=r, column=2).value = CHURN_DEFAULTS[suffix][yr] churn_refs[suffix][yr] = r r += 1 return {"new_customer_rows": new_customer_refs, "churn_rows": churn_refs} # Helper rows in Kunden sheet (added BELOW the totals to avoid breaking external refs). # Each helper row holds the year-varying monthly churn rate per column for one segment. HELPER_ROWS = { "starter": 18, "professional": 19, "enterprise": 20, } def write_kunden_formulas(ws_kunden, years: list[int], refs: dict) -> dict: min_year = min(years) new_refs = refs["new_customer_rows"] churn_refs = refs["churn_rows"] stats = {"neu": 0, "churn": 0, "bestand": 0, "helper": 0} # 1. Write per-segment helper rows (18-20) with year-lookup rates per column. ws_kunden.cell(row=17, column=1).value = None # separator for suffix, helper_row in HELPER_ROWS.items(): first = min(churn_refs[suffix].values()) last = max(churn_refs[suffix].values()) ws_kunden.cell(row=helper_row, column=1).value = ( f"Monatl. Churn-Rate {_SEG_LABEL[suffix]} (Helper)" ) for c in range(2, ws_kunden.max_column + 1): col_letter = get_column_letter(c) ws_kunden.cell(row=helper_row, column=c).value = ( f"=INDEX(Treiber!$B${first}:$B${last},{col_letter}$1-{min_year - 1})" ) stats["helper"] += 1 # 2. Write per-segment Neukunden/Churn/Bestandskunden formulas. for seg_label, neu_row, churn_row, bestand_row, suffix in SEGMENTS: ws_kunden.cell(row=neu_row, column=1).value = f"Neukunden {seg_label}" ws_kunden.cell(row=churn_row, column=1).value = f"Churn {seg_label}" ws_kunden.cell(row=bestand_row, column=1).value = f"Bestandskunden {seg_label}" seg_neu_rows = new_refs[suffix] neu_first = min(seg_neu_rows.values()) neu_last = max(seg_neu_rows.values()) helper_row = HELPER_ROWS[suffix] for c in range(2, ws_kunden.max_column + 1): col_letter = get_column_letter(c) prev_col = get_column_letter(c - 1) if c > 2 else None # --- Neukunden: year-lookup driver --- ws_kunden.cell(row=neu_row, column=c).value = ( f"=INDEX(Treiber!$B${neu_first}:$B${neu_last},{col_letter}$1-{min_year - 1})" ) stats["neu"] += 1 # --- Churn: cumulative-rounding to make small-base churn visible --- # Approach: expected_cum_churn(t) = SUMPRODUCT(Bestand[B..t-1], Rate[C..t]) # Each month's churn = ROUND(expected_cum) - already_booked_churn. # This guarantees integer monthly values that aggregate to ROUND(expected_total). if c == 2: # Aug 2026: no prior month, no churn. ws_kunden.cell(row=churn_row, column=c).value = 0 elif c == 3: # Sep 2026: one prior bestand cell (B), no churn-booked yet. ws_kunden.cell(row=churn_row, column=c).value = ( f"=MAX(0,ROUND(SUMPRODUCT($B{bestand_row}:B{bestand_row}," f"$C{helper_row}:C{helper_row}),0))" ) else: ws_kunden.cell(row=churn_row, column=c).value = ( f"=MAX(0,ROUND(SUMPRODUCT($B{bestand_row}:{prev_col}{bestand_row}," f"$C{helper_row}:{col_letter}{helper_row}),0)" f"-SUM($C{churn_row}:{prev_col}{churn_row}))" ) stats["churn"] += 1 # --- Bestandskunden: cumulative balance --- if c == 2: ws_kunden.cell(row=bestand_row, column=c).value = ( f"={col_letter}{neu_row}-{col_letter}{churn_row}" ) else: ws_kunden.cell(row=bestand_row, column=c).value = ( f"={prev_col}{bestand_row}+{col_letter}{neu_row}-{col_letter}{churn_row}" ) stats["bestand"] += 1 return stats def _was_already_processed(ws_treiber) -> bool: """Detect if a previous run already wrote the Kundenakquise driver block.""" return ws_treiber.cell(row=32, column=1).value == "Kundenakquise" def _read_existing_neukunden_drivers(ws_treiber, years: list[int]) -> dict: """Read driver values previously written to Treiber rows 33..47. Re-running on a processed file would otherwise compute defaults from formula cells (which openpyxl returns as strings) and reset everything to 0. """ defaults: dict[str, dict[int, int]] = {} r = 33 for suffix in ("starter", "professional", "enterprise"): defaults[suffix] = {} for yr in years: v = ws_treiber.cell(row=r, column=2).value try: defaults[suffix][yr] = int(round(float(v))) if v is not None else 0 except (TypeError, ValueError): defaults[suffix][yr] = 0 r += 1 return defaults def process_file(path: Path, dry_run: bool) -> dict | None: wb = load_workbook(path) if "Kunden" not in wb.sheetnames or "Treiber" not in wb.sheetnames: return None # caller will report skip ws_k = wb["Kunden"] ws_t = wb["Treiber"] yc = year_columns(ws_k) years = sorted(yc.keys()) if _was_already_processed(ws_t): # Preserve user-edited driver values from a previous run defaults = _read_existing_neukunden_drivers(ws_t, years) source = "treiber (preserved)" else: # First time: compute defaults from existing Kunden values defaults = { "starter": monthly_avg_per_year(ws_k, 4, yc), "professional": monthly_avg_per_year(ws_k, 7, yc), "enterprise": monthly_avg_per_year(ws_k, 10, yc), } source = "kunden data" refs = write_treiber_drivers(ws_t, years, defaults) stats = write_kunden_formulas(ws_k, years, refs) if not dry_run: wb.save(path) return {"years": years, "defaults": defaults, "stats": stats, "refs": refs, "defaults_source": source} def backup(path: Path) -> Path: ts = datetime.now().strftime("%Y%m%d-%H%M%S") bk = path.with_name(f"{path.stem}.BACKUP-pre-kunden-formulas-{ts}{path.suffix}") shutil.copy2(path, bk) return bk def main() -> int: ap = argparse.ArgumentParser(description=__doc__) ap.add_argument("--dry-run", action="store_true") ap.add_argument("--only", help="Process only this filename") ap.add_argument("--no-backup", action="store_true") args = ap.parse_args() files = sorted(EXPORTS.glob("Finanzplan-*.xlsx")) files = [f for f in files if "BACKUP" not in f.name] if args.only: files = [f for f in files if f.name == args.only] for path in files: # Peek first to decide whether to backup wb_peek = load_workbook(path, read_only=True) if "Treiber" not in wb_peek.sheetnames or "Kunden" not in wb_peek.sheetnames: print(f"\n ⨯ skip {path.name}: no Treiber sheet") continue if not args.dry_run and not args.no_backup: bk = backup(path) print(f" ✓ backup: {bk.name}") info = process_file(path, dry_run=args.dry_run) if info is None: print(f" ⨯ skip {path.name}: structural mismatch") continue print(f"\n === {path.name} ===") print(f" Years: {info['years']}") print(f" Defaults source: {info['defaults_source']}") print(" Neukunden/Monat:") for seg in ("starter", "professional", "enterprise"): print(f" {seg:13s}: {info['defaults'][seg]}") print(f" Cells written: {info['stats']}") return 0 if __name__ == "__main__": sys.exit(main())