feat: initial implementation taller-wox.fitlabs.dev

Portal FastAPI + 5 endpoints REST para Bootcamp Agentic AI con
watsonx Orchestrate (FactorIT). Single container, Coolify-ready.

- Landing brandeado FIT con formulario de registro (honeypot anti-bot)
- Tokens itsdangerous para descargas (24h expiry)
- 5 endpoints API: historical/available procedures, member-insights,
  schedule, generate-report (Jinja2 + Plotly)
- SQLite con upsert-on-email para leads + log de descargas
- Admin endpoints (HTTP Basic): leads.json, leads.csv, stats
- 23 tests pytest pasando
- Dockerfile listo para Coolify con volúmenes persistentes
  (/app/leads.db, /app/app/data/reports_output, /app/material)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-13 03:01:44 +00:00
commit a062b45c51
57 changed files with 8035 additions and 0 deletions

128
app/db.py Normal file
View File

@@ -0,0 +1,128 @@
import sqlite3
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator
from app.config import get_settings
# DDL executed once by init_db(); idempotent thanks to IF NOT EXISTS.
# - leads: one row per unique email; upsert_lead() refreshes the row and
#   bumps times_registered when the same email registers again.
# - downloads: append-only log of file downloads, indexed by email and
#   filename so stats() can aggregate cheaply.
SCHEMA = """
CREATE TABLE IF NOT EXISTS leads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
nombre TEXT NOT NULL,
email TEXT NOT NULL UNIQUE,
empresa TEXT NOT NULL,
ip TEXT,
user_agent TEXT,
consent INTEGER NOT NULL DEFAULT 0,
times_registered INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_seen TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS downloads (
id INTEGER PRIMARY KEY AUTOINCREMENT,
lead_email TEXT NOT NULL,
filename TEXT NOT NULL,
ip TEXT,
downloaded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_downloads_email ON downloads(lead_email);
CREATE INDEX IF NOT EXISTS idx_downloads_filename ON downloads(filename);
"""
@contextmanager
def _conn() -> Iterator[sqlite3.Connection]:
    """Yield a SQLite connection to the configured database.

    Creates the DB file's parent directory if missing. On normal exit the
    transaction is committed; if the managed block raises, the transaction
    is explicitly rolled back and the exception re-raised (previously this
    relied on SQLite's implicit rollback when closing an uncommitted
    connection). The connection is always closed.

    Yields:
        sqlite3.Connection with row_factory=sqlite3.Row so columns can be
        accessed by name.
    """
    settings = get_settings()
    db_path = Path(settings.db_path)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path))
    conn.row_factory = sqlite3.Row  # name-based column access for dict() conversion
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()  # make the discard of partial writes explicit
        raise
    finally:
        conn.close()
def init_db() -> None:
    """Create the leads/downloads tables and their indexes if absent."""
    with _conn() as db:
        db.executescript(SCHEMA)
def upsert_lead(
    nombre: str,
    email: str,
    empresa: str,
    ip: str | None,
    user_agent: str | None,
    consent: bool,
) -> int:
    """Insert a lead, or refresh it when the email already exists.

    On conflict the contact fields are overwritten, times_registered is
    incremented, and last_seen is updated.

    Returns:
        The row id of the inserted or updated lead.
    """
    sql = """
            INSERT INTO leads (nombre, email, empresa, ip, user_agent, consent)
            VALUES (?, ?, ?, ?, ?, ?)
            ON CONFLICT(email) DO UPDATE SET
                nombre = excluded.nombre,
                empresa = excluded.empresa,
                ip = excluded.ip,
                user_agent = excluded.user_agent,
                consent = excluded.consent,
                times_registered = times_registered + 1,
                last_seen = CURRENT_TIMESTAMP
            RETURNING id
            """
    params = (nombre, email, empresa, ip, user_agent, int(consent))
    with _conn() as db:
        row = db.execute(sql, params).fetchone()
    return row["id"]
def get_lead_by_email(email: str) -> dict | None:
    """Return the lead row for *email* as a plain dict, or None if absent."""
    with _conn() as db:
        found = db.execute(
            "SELECT * FROM leads WHERE email = ?", (email,)
        ).fetchone()
    if found is None:
        return None
    return dict(found)
def log_download(lead_email: str, filename: str, ip: str | None) -> None:
    """Append one download event to the audit log (downloaded_at defaults to now)."""
    record = (lead_email, filename, ip)
    with _conn() as db:
        db.execute(
            "INSERT INTO downloads (lead_email, filename, ip) VALUES (?, ?, ?)",
            record,
        )
def list_leads(limit: int = 100, offset: int = 0) -> list[dict]:
    """Return up to *limit* leads starting at *offset*, oldest first."""
    query = "SELECT * FROM leads ORDER BY id ASC LIMIT ? OFFSET ?"
    with _conn() as db:
        # Iterate the cursor directly; each sqlite3.Row becomes a plain dict.
        return [dict(record) for record in db.execute(query, (limit, offset))]
def stats() -> dict:
    """Aggregate portal metrics for the admin endpoints.

    Returns:
        dict with keys total_leads, total_downloads, downloads_por_archivo
        (filename -> count) and top_5_empresas (list of {empresa, count},
        most frequent first).
    """
    with _conn() as db:
        leads_total = db.execute("SELECT COUNT(*) AS c FROM leads").fetchone()["c"]
        downloads_total = db.execute(
            "SELECT COUNT(*) AS c FROM downloads"
        ).fetchone()["c"]

        by_file: dict = {}
        for rec in db.execute(
            "SELECT filename, COUNT(*) AS c FROM downloads GROUP BY filename"
        ):
            by_file[rec["filename"]] = rec["c"]

        company_rows = db.execute(
            "SELECT empresa, COUNT(*) AS c FROM leads GROUP BY empresa ORDER BY c DESC LIMIT 5"
        ).fetchall()
        ranking = [{"empresa": row["empresa"], "count": row["c"]} for row in company_rows]

    return {
        "total_leads": leads_total,
        "total_downloads": downloads_total,
        "downloads_por_archivo": by_file,
        "top_5_empresas": ranking,
    }