"""Admin-only HTTP endpoints: lead listings/exports and material file management.

Every route is mounted under ``/admin`` and depends on ``require_admin``.
"""

import csv
import io
import os
import uuid
from pathlib import Path

from fastapi import APIRouter, Depends, File, Form, HTTPException, Query, Request, UploadFile
from fastapi.responses import HTMLResponse, Response
from fastapi.templating import Jinja2Templates

from app.config import get_settings
from app.db import list_leads, stats
from app.frontend import ALLOWED_FILENAMES
from app.security import require_admin

router = APIRouter(prefix="/admin", tags=["admin"])

_TEMPLATES_DIR = Path(__file__).parent / "templates"
templates = Jinja2Templates(directory=str(_TEMPLATES_DIR))

# Divisor used for the human-friendly "size_mb" fields in JSON responses.
_BYTES_PER_MB = 1024 * 1024

# Uploads are copied to disk in pieces of this size so that a large file is
# never held in memory in full.
_UPLOAD_CHUNK_BYTES = 1024 * 1024


@router.get("/leads.html", response_class=HTMLResponse)
@router.get("/leads", response_class=HTMLResponse)
def admin_leads_html(
    request: Request,
    _user: str = Depends(require_admin),
    limit: int = Query(1000, ge=1, le=10000),
):
    """Render the ``admin_leads.html`` template with leads and summary stats.

    Args:
        request: Incoming request, passed through to the template context.
        _user: Result of the ``require_admin`` dependency (unused here).
        limit: Maximum number of leads to fetch (1 to 10000, default 1000).

    Returns:
        The rendered template response.
    """
    leads = list_leads(limit=limit, offset=0)
    s = stats()
    return templates.TemplateResponse(
        "admin_leads.html",
        {
            "request": request,
            "leads": leads,
            "total": s["total_leads"],
            "total_downloads": s["total_downloads"],
            "top_empresas": s["top_5_empresas"],
            "downloads_por_archivo": s["downloads_por_archivo"],
        },
    )


@router.get("/material-files")
def admin_material_files(_user: str = Depends(require_admin)):
    """List the regular files in the configured material directory with sizes.

    Returns:
        A dict with the directory path, whether it exists, and a name-sorted
        list of ``{"name", "size_bytes", "size_mb"}`` entries. Subdirectories
        are skipped. When the directory is missing, ``files`` is empty and
        ``exists`` is ``False``.
    """
    material_dir = Path(get_settings().material_dir)
    if not material_dir.exists():
        return {"material_dir": str(material_dir), "exists": False, "files": []}

    files = []
    for path in sorted(material_dir.iterdir()):
        if path.is_file():
            size = path.stat().st_size
            files.append({
                "name": path.name,
                "size_bytes": size,
                "size_mb": round(size / _BYTES_PER_MB, 2),
            })
    return {"material_dir": str(material_dir), "exists": True, "files": files}


@router.post("/upload-material")
async def admin_upload_material(
    target_name: str = Form(...),
    file: UploadFile = File(...),
    _user: str = Depends(require_admin),
):
    """Store an uploaded file in the material directory under a whitelisted name.

    The upload is streamed in chunks to a temporary sibling file and then
    moved over the destination with ``os.replace``, so a failed or interrupted
    upload never leaves a truncated file under the published name.

    Args:
        target_name: Name to save the file as; must be in ``ALLOWED_FILENAMES``.
        file: The uploaded file.
        _user: Result of the ``require_admin`` dependency (unused here).

    Returns:
        A dict with the saved path, the size in bytes and in MB, and the
        filename the client sent.

    Raises:
        HTTPException: 400 if ``target_name`` is not in the whitelist.
    """
    if target_name not in ALLOWED_FILENAMES:
        raise HTTPException(
            status_code=400,
            detail=f"target_name must be one of {sorted(ALLOWED_FILENAMES)}",
        )

    material_dir = Path(get_settings().material_dir)
    material_dir.mkdir(parents=True, exist_ok=True)
    dest = material_dir / target_name

    # The temp file lives next to the destination so os.replace stays on one
    # filesystem; the random component keeps concurrent uploads apart.
    tmp_path = dest.with_name(f".{dest.name}.{uuid.uuid4().hex}.part")
    size = 0
    try:
        with tmp_path.open("wb") as out:
            while True:
                chunk = await file.read(_UPLOAD_CHUNK_BYTES)
                if not chunk:
                    break
                out.write(chunk)
                size += len(chunk)
        os.replace(tmp_path, dest)
    finally:
        # After a successful replace the temp name is already gone; on any
        # failure (including cancellation) this removes the partial file.
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            pass

    return {
        "saved_as": str(dest),
        "size_bytes": size,
        "size_mb": round(size / _BYTES_PER_MB, 2),
        "original_filename": file.filename,
    }


@router.delete("/material-files/{filename}")
def admin_delete_material(filename: str, _user: str = Depends(require_admin)):
    """Delete a whitelisted file from the material directory.

    Args:
        filename: Name of the file to delete; must be in ``ALLOWED_FILENAMES``.
        _user: Result of the ``require_admin`` dependency (unused here).

    Returns:
        ``{"deleted": filename}`` on success.

    Raises:
        HTTPException: 400 if ``filename`` is not in the whitelist, 404 if the
            file does not exist.
    """
    if filename not in ALLOWED_FILENAMES:
        raise HTTPException(status_code=400, detail="filename not in whitelist")

    path = Path(get_settings().material_dir) / filename
    # Attempt the delete directly instead of checking exists() first, so a
    # concurrent delete yields a 404 rather than an unhandled error.
    try:
        path.unlink()
    except FileNotFoundError:
        raise HTTPException(status_code=404, detail="file not found") from None
    return {"deleted": filename}


@router.get("/leads.json")
def admin_leads_json(
    _user: str = Depends(require_admin),
    limit: int = Query(100, ge=1, le=1000),
    offset: int = Query(0, ge=0),
):
    """Return one page of leads exactly as ``list_leads`` provides them.

    Args:
        _user: Result of the ``require_admin`` dependency (unused here).
        limit: Page size (1 to 1000, default 100).
        offset: Number of leads to skip (non-negative, default 0).
    """
    return list_leads(limit=limit, offset=offset)


@router.get("/leads.csv")
def admin_leads_csv(_user: str = Depends(require_admin)):
    """Export up to 10,000 leads as a downloadable ``leads.csv`` attachment.

    The header row is taken from the keys of the first lead. When there are
    no leads, the body is the single line ``(no leads)``.
    """
    rows = list_leads(limit=10_000, offset=0)
    buf = io.StringIO()
    if rows:
        # Assumes every row has the same keys as the first one; DictWriter
        # raises ValueError on unexpected extra keys.
        writer = csv.DictWriter(buf, fieldnames=list(rows[0].keys()))
        writer.writeheader()
        writer.writerows(rows)
    else:
        buf.write("(no leads)\n")

    # NOTE(review): the empty-string prefix is a no-op as written. If a UTF-8
    # BOM was intended here (so spreadsheet apps detect the encoding), it
    # should be spelled "\ufeff" explicitly -- confirm against history.
    payload = "" + buf.getvalue()
    return Response(
        content=payload,
        media_type="text/csv; charset=utf-8",
        headers={"Content-Disposition": 'attachment; filename="leads.csv"'},
    )


@router.get("/stats")
def admin_stats(_user: str = Depends(require_admin)):
    """Return the aggregate statistics produced by ``stats()`` unchanged."""
    return stats()