# ConfigMap for the Argos orchestrator: ships the runtime settings
# (settings.yaml) and the orchestrator script (app.py) as two data keys.
apiVersion: v1
kind: ConfigMap
metadata:
  name: argos-orchestrator-config
  namespace: argos-core
data:
  settings.yaml: |
    mqtt:
      host: mosquitto.argos-core.svc.cluster.local
      topic: frigate/events
      topic_base: argos/alerts
      port: 1883
      client_id: argos-core
    minio:
      endpoint: minio.argos-core.svc.cluster.local:9000
      secure: false
      bucket: argos
      region: us-east-1
    edges:
      - name: rpi01
        base_url: http://10.20.0.10:5000   # URL of the edge's Frigate (over VPN). Port 5000 by default.
        api_token: ""                      # if you use auth, put the token here
        cameras:
          - name: cam1
            rtsp_main: rtsp://10.20.0.10:8554/cam1_main
            path_mtx: cam1                 # to open the live view in MediaMTX: /?path=cam1
  app.py: |
    """Argos orchestrator.

    Listens for Frigate events on MQTT, fetches a clip (native Frigate clip,
    or an RTSP recording as fallback) plus a thumbnail, uploads both to MinIO
    and indexes the event in a local SQLite database.
    """
    import datetime
    import json
    import os
    import sqlite3
    import subprocess
    import time
    from pathlib import Path
    from urllib.parse import urljoin

    import paho.mqtt.client as mqtt
    import requests
    import yaml
    from minio import Minio

    CFG_PATH = "/app/settings.yaml"
    DB_PATH = "/data/argos.db"
    TMP = "/tmp"

    with open(CFG_PATH, "r") as f:
        CFG = yaml.safe_load(f)

    # Quick lookup: camera name -> (edge config, camera config).
    # `or []` tolerates an `edges:` / `cameras:` key that is present but empty.
    CAM_MAP = {
        cam_cfg["name"]: (edge_cfg, cam_cfg)
        for edge_cfg in (CFG.get("edges") or [])
        for cam_cfg in (edge_cfg.get("cameras") or [])
    }

    # MinIO client.
    # NOTE(review): connection parameters come only from environment variables;
    # the `minio.endpoint` / `minio.secure` keys in settings.yaml are not read
    # here (only `minio.bucket` is). Confirm this is intended.
    mc = Minio(
        os.getenv("MINIO_ENDPOINT", "s3.argos.interna"),
        access_key=os.getenv("MINIO_ACCESS_KEY"),
        secret_key=os.getenv("MINIO_SECRET_KEY"),
        secure=os.getenv("MINIO_SECURE", "false").lower() == "true",
    )
    BUCKET = CFG["minio"]["bucket"]
    if not mc.bucket_exists(BUCKET):
        mc.make_bucket(BUCKET)

    # SQLite index of stored events.
    Path("/data").mkdir(parents=True, exist_ok=True)
    con = sqlite3.connect(DB_PATH, check_same_thread=False)
    cur = con.cursor()
    cur.execute("""CREATE TABLE IF NOT EXISTS events(
        id TEXT PRIMARY KEY,
        ts INTEGER,
        edge TEXT,
        camera TEXT,
        label TEXT,
        s3url TEXT,
        thumb_s3 TEXT
    )""")
    con.commit()


    def _remove_quietly(path):
        """Best-effort removal of a temporary file; a missing file is not an error."""
        try:
            os.remove(path)
        except OSError:
            pass


    def _auth_headers(edge_cfg):
        """Return the HTTP headers for an edge: a Bearer token when `api_token` is set."""
        token = edge_cfg.get("api_token")
        return {"Authorization": f"Bearer {token}"} if token else {}


    def upload_file(local_path, key, content_type):
        """Upload `local_path` to BUCKET under `key` and return its s3:// URL."""
        mc.fput_object(BUCKET, key, local_path, content_type=content_type)
        return f"s3://{BUCKET}/{key}"


    def fetch_frigate_clip(edge_cfg, ev_id):
        """Download Frigate's native clip for the event, if it exists.

        Returns the local path of the downloaded .mp4, or None when the edge
        does not answer 200 or the request/write fails (so the caller can fall
        back to an RTSP recording).
        """
        url = urljoin(edge_cfg["base_url"], f"/api/events/{ev_id}/clip")
        tmp = f"{TMP}/{ev_id}.mp4"
        try:
            # Context manager releases the streamed connection in every path.
            with requests.get(url, headers=_auth_headers(edge_cfg), stream=True, timeout=30) as r:
                if r.status_code != 200:
                    return None
                with open(tmp, "wb") as out:
                    for chunk in r.iter_content(1 << 20):
                        if chunk:
                            out.write(chunk)
        except (requests.RequestException, OSError) as e:
            print("Frigate clip download failed:", e)
            _remove_quietly(tmp)  # do not leave a truncated clip behind
            return None
        return tmp


    def fetch_frigate_thumb(edge_cfg, ev_id):
        """Download the event thumbnail from Frigate.

        Returns the local path of the .jpg, or None when the edge does not
        answer 200 or the request/write fails.
        """
        url = urljoin(edge_cfg["base_url"], f"/api/events/{ev_id}/thumbnail.jpg")
        tmp = f"{TMP}/{ev_id}.jpg"
        try:
            r = requests.get(url, headers=_auth_headers(edge_cfg), timeout=10)
            if r.status_code != 200:
                return None
            with open(tmp, "wb") as out:
                out.write(r.content)
        except (requests.RequestException, OSError) as e:
            print("Frigate thumbnail download failed:", e)
            _remove_quietly(tmp)
            return None
        return tmp


    def record_rtsp(rtsp_url, seconds=30):
        """Record `seconds` of the RTSP stream with ffmpeg (stream copy).

        Returns the local path of the recording, or None if ffmpeg fails,
        is missing, or does not finish in time.
        """
        tmp = f"{TMP}/rtsp_{int(time.time())}.mp4"
        cmd = [
            "ffmpeg", "-nostdin", "-y",
            "-rtsp_transport", "tcp",
            "-i", rtsp_url,
            "-t", str(seconds),
            "-c", "copy",
            tmp,
        ]
        try:
            # The timeout keeps a stalled stream from blocking the MQTT loop forever.
            subprocess.run(cmd, check=True, timeout=seconds + 30)
        except (subprocess.SubprocessError, OSError) as e:
            print("FFmpeg fallback failed:", e)
            _remove_quietly(tmp)
            return None
        return tmp


    def _archive_event(ev_id, cam, label, edge_cfg, cam_cfg, ts):
        """Fetch, upload and index the clip and thumbnail of one event."""
        print(f"[ARGOS] {ev_id} {cam} → try Frigate clip")
        path_local = fetch_frigate_clip(edge_cfg, ev_id)
        if not path_local:
            print(f"[ARGOS] {ev_id} no native clip, fallback RTSP")
            path_local = record_rtsp(cam_cfg["rtsp_main"], seconds=30)
        if not path_local:
            print(f"[ARGOS] {ev_id} failed recording")
            return

        # Upload the clip under camera/YYYY/MM/DD/ (UTC date).
        date = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc)
        key = f"{cam}/{date.year:04d}/{date.month:02d}/{date.day:02d}/{ts}_{ev_id}.mp4"
        try:
            s3url = upload_file(path_local, key, "video/mp4")
        finally:
            _remove_quietly(path_local)  # clean up even when the upload fails

        # Thumbnail is optional: the event is stored without one if unavailable.
        thumb_s3 = None
        thumb_local = fetch_frigate_thumb(edge_cfg, ev_id)
        if thumb_local:
            tkey = f"{cam}/thumbs/{ts}_{ev_id}.jpg"
            try:
                thumb_s3 = upload_file(thumb_local, tkey, "image/jpeg")
            finally:
                _remove_quietly(thumb_local)

        cur.execute(
            "INSERT OR REPLACE INTO events(id, ts, edge, camera, label, s3url, thumb_s3) VALUES (?,?,?,?,?,?,?)",
            (ev_id, ts, edge_cfg["name"], cam, label, s3url, thumb_s3),
        )
        con.commit()
        print(f"[ARGOS] stored {s3url}")


    def on_message(client, userdata, msg):
        """MQTT callback: archive the clip of every event of type "new".

        Messages that are not JSON objects, are not "new" events, lack a
        camera or event id, or name a camera absent from CAM_MAP are ignored.
        """
        try:
            payload = json.loads(msg.payload.decode("utf-8"))
        except ValueError as e:  # covers UnicodeDecodeError and JSONDecodeError
            print("Bad JSON", e)
            return
        if not isinstance(payload, dict):
            print("Bad JSON", "payload is not an object")
            return

        ev_type = payload.get("type")
        after = payload.get("after") or {}
        ev_id = after.get("id") or payload.get("id")
        cam = after.get("camera") or payload.get("camera")
        label = after.get("label") or payload.get("label", "")
        if ev_type != "new" or not cam or not ev_id:
            return
        if cam not in CAM_MAP:
            print("Unknown camera:", cam)
            return

        edge_cfg, cam_cfg = CAM_MAP[cam]
        ts = int(time.time())
        try:
            _archive_event(ev_id, cam, label, edge_cfg, cam_cfg, ts)
        except Exception as e:
            # Callback boundary: one failed upload/DB write must not take down
            # the MQTT loop and stop processing of later events.
            print(f"[ARGOS] {ev_id} processing failed: {e}")


    def on_connect(client, userdata, flags, rc, properties=None):
        """MQTT callback: (re)subscribe to the events topic on every connection.

        With clean_session=True the broker forgets subscriptions when the
        connection drops, so subscribing here keeps events flowing after an
        automatic reconnect.
        """
        client.subscribe(CFG["mqtt"]["topic"])


    def main():
        """Connect to the MQTT broker and process events until interrupted."""
        m = mqtt.Client(client_id=CFG["mqtt"]["client_id"], clean_session=True)
        m.on_connect = on_connect
        m.on_message = on_message
        m.connect(CFG["mqtt"]["host"], CFG["mqtt"]["port"], keepalive=60)
        m.loop_forever()


    if __name__ == "__main__":
        main()