import sqlite3 from contextlib import contextmanager from pathlib import Path from typing import Iterator from app.config import get_settings SCHEMA = """ CREATE TABLE IF NOT EXISTS leads ( id INTEGER PRIMARY KEY AUTOINCREMENT, nombre TEXT NOT NULL, email TEXT NOT NULL UNIQUE, empresa TEXT NOT NULL, ip TEXT, user_agent TEXT, consent INTEGER NOT NULL DEFAULT 0, times_registered INTEGER NOT NULL DEFAULT 1, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE TABLE IF NOT EXISTS downloads ( id INTEGER PRIMARY KEY AUTOINCREMENT, lead_email TEXT NOT NULL, filename TEXT NOT NULL, ip TEXT, downloaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); CREATE INDEX IF NOT EXISTS idx_downloads_email ON downloads(lead_email); CREATE INDEX IF NOT EXISTS idx_downloads_filename ON downloads(filename); """ @contextmanager def _conn() -> Iterator[sqlite3.Connection]: settings = get_settings() db_path = Path(settings.db_path) db_path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row try: yield conn conn.commit() finally: conn.close() def init_db() -> None: with _conn() as conn: conn.executescript(SCHEMA) def upsert_lead( nombre: str, email: str, empresa: str, ip: str | None, user_agent: str | None, consent: bool, ) -> int: with _conn() as conn: cur = conn.execute( """ INSERT INTO leads (nombre, email, empresa, ip, user_agent, consent) VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(email) DO UPDATE SET nombre = excluded.nombre, empresa = excluded.empresa, ip = excluded.ip, user_agent = excluded.user_agent, consent = excluded.consent, times_registered = times_registered + 1, last_seen = CURRENT_TIMESTAMP RETURNING id """, (nombre, email, empresa, ip, user_agent, 1 if consent else 0), ) return cur.fetchone()["id"] def get_lead_by_email(email: str) -> dict | None: with _conn() as conn: row = conn.execute( "SELECT * FROM leads WHERE email = ?", (email,) ).fetchone() return dict(row) if row else None def log_download(lead_email: str, filename: str, ip: str | None) -> None: with _conn() as conn: conn.execute( "INSERT INTO downloads (lead_email, filename, ip) VALUES (?, ?, ?)", (lead_email, filename, ip), ) def list_leads(limit: int = 100, offset: int = 0) -> list[dict]: with _conn() as conn: rows = conn.execute( "SELECT * FROM leads ORDER BY id ASC LIMIT ? OFFSET ?", (limit, offset), ).fetchall() return [dict(r) for r in rows] def stats() -> dict: with _conn() as conn: total_leads = conn.execute("SELECT COUNT(*) AS c FROM leads").fetchone()["c"] total_downloads = conn.execute("SELECT COUNT(*) AS c FROM downloads").fetchone()["c"] per_file = { r["filename"]: r["c"] for r in conn.execute( "SELECT filename, COUNT(*) AS c FROM downloads GROUP BY filename" ).fetchall() } top_empresas = [ {"empresa": r["empresa"], "count": r["c"]} for r in conn.execute( "SELECT empresa, COUNT(*) AS c FROM leads GROUP BY empresa ORDER BY c DESC LIMIT 5" ).fetchall() ] return { "total_leads": total_leads, "total_downloads": total_downloads, "downloads_por_archivo": per_file, "top_5_empresas": top_empresas, }