Files
kubernetes/argos/configmaps/configmap-orchestrator.yaml
2025-08-18 10:31:22 +02:00

157 lines
5.4 KiB
YAML

apiVersion: v1
kind: ConfigMap
metadata:
name: argos-orchestrator-config
namespace: argos-core
data:
settings.yaml: |
mqtt:
host: mqtt.argos.interna
port: 1883
topic: frigate/events
client_id: argos-core
minio:
bucket: argos
region: us-east-1
edges:
- name: rpi01
base_url: http://10.20.0.10:5000 # URL del Frigate del edge (VPN). 5000 por defecto.
api_token: "" # si usas auth, ponlo aquí
cameras:
- name: cam1
rtsp_main: rtsp://10.20.0.10:8554/cam1_main
path_mtx: cam1 # para abrir live en MediaMTX: /?path=cam1
app.py: |
import os, json, time, datetime, sqlite3, subprocess, yaml, requests
from pathlib import Path
from urllib.parse import urljoin
import paho.mqtt.client as mqtt
from minio import Minio
CFG_PATH="/app/settings.yaml"
DB_PATH="/data/argos.db"
TMP="/tmp"
with open(CFG_PATH,"r") as f:
CFG=yaml.safe_load(f)
# Map quick-lookup: camera -> (edge, cfg)
CAM_MAP={}
for e in CFG.get("edges", []):
for c in e.get("cameras", []):
CAM_MAP[c["name"]]=(e, c)
# MinIO
mc=Minio(os.getenv("MINIO_ENDPOINT","s3.argos.interna"),
access_key=os.getenv("MINIO_ACCESS_KEY"),
secret_key=os.getenv("MINIO_SECRET_KEY"),
secure=os.getenv("MINIO_SECURE","false").lower()=="true")
BUCKET=CFG["minio"]["bucket"]
if not mc.bucket_exists(BUCKET): mc.make_bucket(BUCKET)
# DB
Path("/data").mkdir(parents=True, exist_ok=True)
con=sqlite3.connect(DB_PATH, check_same_thread=False)
cur=con.cursor()
cur.execute("""CREATE TABLE IF NOT EXISTS events(
id TEXT PRIMARY KEY, ts INTEGER, edge TEXT, camera TEXT, label TEXT,
s3url TEXT, thumb_s3 TEXT
)""")
con.commit()
def upload_file(local_path, key, content_type):
mc.fput_object(BUCKET, key, local_path, content_type=content_type)
return f"s3://{BUCKET}/{key}"
def fetch_frigate_clip(edge_cfg, ev_id):
""" Descarga el clip nativo de Frigate si existe """
base=edge_cfg["base_url"]
url=urljoin(base, f"/api/events/{ev_id}/clip")
headers={}
if edge_cfg.get("api_token"):
headers["Authorization"]=f"Bearer {edge_cfg['api_token']}"
r=requests.get(url, headers=headers, stream=True, timeout=30)
if r.status_code!=200: return None
tmp=f"{TMP}/{ev_id}.mp4"
with open(tmp,"wb") as f:
for chunk in r.iter_content(1<<20):
if chunk: f.write(chunk)
return tmp
def fetch_frigate_thumb(edge_cfg, ev_id):
base=edge_cfg["base_url"]
url=urljoin(base, f"/api/events/{ev_id}/thumbnail.jpg")
headers={}
if edge_cfg.get("api_token"):
headers["Authorization"]=f"Bearer {edge_cfg['api_token']}"
r=requests.get(url, headers=headers, timeout=10)
if r.status_code!=200: return None
tmp=f"{TMP}/{ev_id}.jpg"
with open(tmp,"wb") as f: f.write(r.content)
return tmp
def record_rtsp(rtsp_url, seconds=30):
tmp=f"{TMP}/rtsp_{int(time.time())}.mp4"
cmd=["ffmpeg","-nostdin","-y","-rtsp_transport","tcp","-i",rtsp_url,"-t",str(seconds),"-c","copy",tmp]
try:
subprocess.run(cmd, check=True)
return tmp
except Exception as e:
print("FFmpeg fallback failed:", e)
return None
def on_message(client, userdata, msg):
try:
payload=json.loads(msg.payload.decode("utf-8"))
except Exception as e:
print("Bad JSON", e); return
ev_type=payload.get("type")
after=payload.get("after") or {}
ev_id=after.get("id") or payload.get("id")
cam=after.get("camera") or payload.get("camera")
label=after.get("label") or payload.get("label","")
if ev_type!="new" or not cam or not ev_id: return
if cam not in CAM_MAP:
print("Unknown camera:", cam); return
edge_cfg, cam_cfg = CAM_MAP[cam]
ts=int(time.time())
print(f"[ARGOS] {ev_id} {cam} → try Frigate clip")
path_local = fetch_frigate_clip(edge_cfg, ev_id)
if not path_local:
print(f"[ARGOS] {ev_id} no native clip, fallback RTSP")
path_local = record_rtsp(cam_cfg["rtsp_main"], seconds=30)
if not path_local:
print(f"[ARGOS] {ev_id} failed recording"); return
# upload clip
date=datetime.datetime.utcfromtimestamp(ts)
key=f"{cam}/{date.year:04d}/{date.month:02d}/{date.day:02d}/{ts}_{ev_id}.mp4"
s3url=upload_file(path_local, key, "video/mp4")
try: os.remove(path_local)
except: pass
# thumbnail
thumb_local = fetch_frigate_thumb(edge_cfg, ev_id)
thumb_s3=None
if thumb_local:
tkey=f"{cam}/thumbs/{ts}_{ev_id}.jpg"
thumb_s3=upload_file(thumb_local, tkey, "image/jpeg")
try: os.remove(thumb_local)
except: pass
cur.execute("INSERT OR REPLACE INTO events(id, ts, edge, camera, label, s3url, thumb_s3) VALUES (?,?,?,?,?,?,?)",
(ev_id, ts, edge_cfg["name"], cam, label, s3url, thumb_s3))
con.commit()
print(f"[ARGOS] stored {s3url}")
def main():
m=mqtt.Client(client_id=CFG["mqtt"]["client_id"], clean_session=True)
m.connect(CFG["mqtt"]["host"], CFG["mqtt"]["port"], keepalive=60)
m.subscribe(CFG["mqtt"]["topic"])
m.on_message=on_message
m.loop_forever()
if __name__=="__main__": main()