160 lines
5.5 KiB
YAML
160 lines
5.5 KiB
YAML
# ConfigMap that carries the orchestrator's settings.yaml and its app.py script.
apiVersion: v1
|
|
kind: ConfigMap
|
|
metadata:
|
|
name: argos-orchestrator-config
|
|
namespace: argos-core
|
|
data:
|
|
settings.yaml: |
|
|
mqtt:
|
|
host: mosquitto.argos-core.svc.cluster.local
|
|
topic: frigate/events # topic app.py subscribes to
|
|
topic_base: argos/alerts # NOTE(review): not read by the app.py in this ConfigMap - confirm who consumes it
|
|
port: 1883
|
|
client_id: argos-core
|
|
minio:
|
|
endpoint: minio.argos-core.svc.cluster.local:9000 # NOTE(review): app.py takes the endpoint from the MINIO_ENDPOINT env var, not from this key - confirm which is authoritative
|
|
secure: false # NOTE(review): app.py reads MINIO_SECURE from the environment instead - confirm
|
|
bucket: argos # bucket app.py uploads to; created at startup if it does not exist
|
|
region: us-east-1
|
|
edges:
|
|
- name: rpi01
|
|
base_url: http://10.20.0.10:5000 # Frigate URL of this edge (over VPN); 5000 is the default port.
|
|
api_token: "" # set this if Frigate auth is enabled; app.py sends it as a Bearer token
|
|
cameras:
|
|
- name: cam1
|
|
rtsp_main: rtsp://10.20.0.10:8554/cam1_main # recorded with ffmpeg when Frigate returns no clip for an event
|
|
path_mtx: cam1 # to open the live view in MediaMTX: /?path=cam1
|
|
app.py: |
|
|
import os, json, time, datetime, sqlite3, subprocess, yaml, requests
|
|
from pathlib import Path
|
|
from urllib.parse import urljoin
|
|
import paho.mqtt.client as mqtt
|
|
from minio import Minio
|
|
|
|
# Fixed locations used by the orchestrator inside the container.
CFG_PATH = "/app/settings.yaml"
DB_PATH = "/data/argos.db"
TMP = "/tmp"

# settings.yaml may contain non-ASCII text, so decode it explicitly as UTF-8
# instead of relying on the container's locale. An empty file parses to None;
# fall back to an empty mapping so later lookups fail with a plain KeyError
# rather than an AttributeError on None.
with open(CFG_PATH, "r", encoding="utf-8") as f:
    CFG = yaml.safe_load(f) or {}
|
|
|
|
# Quick lookup table: camera name -> (edge config, camera config).
# Built once at import time; if two edges declare the same camera name,
# the entry that appears later in settings.yaml wins.
CAM_MAP = {
    camera["name"]: (edge, camera)
    for edge in CFG.get("edges", [])
    for camera in edge.get("cameras", [])
}
|
|
|
|
# MinIO client. Connection parameters come from the environment
# (MINIO_ENDPOINT, MINIO_ACCESS_KEY, MINIO_SECRET_KEY, MINIO_SECURE);
# only the bucket name is taken from settings.yaml.
# NOTE(review): the "endpoint" and "secure" keys under "minio" in
# settings.yaml are not consulted here - confirm that this is intentional.
mc = Minio(
    os.environ.get("MINIO_ENDPOINT", "s3.argos.interna"),
    access_key=os.environ.get("MINIO_ACCESS_KEY"),
    secret_key=os.environ.get("MINIO_SECRET_KEY"),
    secure=(os.environ.get("MINIO_SECURE", "false").lower() == "true"),
)
BUCKET = CFG["minio"]["bucket"]

# Create the bucket on first start so uploads never target a missing bucket.
if not mc.bucket_exists(BUCKET):
    mc.make_bucket(BUCKET)
|
|
|
|
# SQLite index of stored events, keyed by event id.
Path("/data").mkdir(exist_ok=True, parents=True)

# check_same_thread=False allows this connection to be used from a thread
# other than the one that created it.
con = sqlite3.connect(DB_PATH, check_same_thread=False)
cur = con.cursor()
cur.execute(
    """CREATE TABLE IF NOT EXISTS events(
id TEXT PRIMARY KEY, ts INTEGER, edge TEXT, camera TEXT, label TEXT,
s3url TEXT, thumb_s3 TEXT
)"""
)
con.commit()
|
|
|
|
def upload_file(local_path, key, content_type):
    """Upload a local file to the configured MinIO bucket.

    Args:
        local_path: Path of the file on local disk.
        key: Object name to store the file under inside ``BUCKET``.
        content_type: MIME type passed to MinIO for the stored object.

    Returns:
        The object's location as an ``s3://<bucket>/<key>`` string.
    """
    mc.fput_object(
        BUCKET,
        key,
        local_path,
        content_type=content_type,
    )
    location = "s3://{}/{}".format(BUCKET, key)
    return location
|
|
|
|
def fetch_frigate_clip(edge_cfg, ev_id):
    """Download Frigate's native clip for an event, if one exists.

    Args:
        edge_cfg: Edge entry from settings.yaml. ``base_url`` is required;
            a non-empty ``api_token`` is sent as a Bearer token.
        ev_id: Frigate event id; also used as the temp file's base name.

    Returns:
        Path of the downloaded ``.mp4`` under ``TMP``, or ``None`` when the
        edge cannot be reached, answers with a non-200 status, or the
        download is interrupted.
    """
    # The leading slash makes urljoin replace any path component of base_url.
    url = urljoin(edge_cfg["base_url"], f"/api/events/{ev_id}/clip")
    headers = {}
    if edge_cfg.get("api_token"):
        headers["Authorization"] = f"Bearer {edge_cfg['api_token']}"

    tmp = f"{TMP}/{ev_id}.mp4"
    try:
        # With stream=True the connection stays checked out until the body is
        # consumed or the response is closed; the context manager guarantees
        # the release on the non-200 path and when the stream breaks.
        with requests.get(url, headers=headers, stream=True, timeout=30) as r:
            if r.status_code != 200:
                return None
            with open(tmp, "wb") as f:
                for chunk in r.iter_content(1 << 20):
                    if chunk:
                        f.write(chunk)
    except (requests.RequestException, OSError) as e:
        # A network or disk error must not escape into the MQTT callback;
        # report "no clip" so the caller can fall back to RTSP recording.
        print("Frigate clip download failed:", e)
        try:
            os.remove(tmp)  # drop a partially written file, if any
        except OSError:
            pass
        return None
    return tmp
|
|
|
|
def fetch_frigate_thumb(edge_cfg, ev_id):
    """Download Frigate's thumbnail for an event, if one exists.

    Args:
        edge_cfg: Edge entry from settings.yaml. ``base_url`` is required;
            a non-empty ``api_token`` is sent as a Bearer token.
        ev_id: Frigate event id; also used as the temp file's base name.

    Returns:
        Path of the downloaded ``.jpg`` under ``TMP``, or ``None`` when the
        edge cannot be reached, answers with a non-200 status, or the file
        cannot be written.
    """
    # The leading slash makes urljoin replace any path component of base_url.
    url = urljoin(edge_cfg["base_url"], f"/api/events/{ev_id}/thumbnail.jpg")
    headers = {}
    if edge_cfg.get("api_token"):
        headers["Authorization"] = f"Bearer {edge_cfg['api_token']}"

    tmp = f"{TMP}/{ev_id}.jpg"
    try:
        r = requests.get(url, headers=headers, timeout=10)
        if r.status_code != 200:
            return None
        with open(tmp, "wb") as f:
            f.write(r.content)
    except (requests.RequestException, OSError) as e:
        # The thumbnail is optional: report "none" instead of raising into
        # the caller, which has usually uploaded the clip already.
        print("Frigate thumbnail download failed:", e)
        try:
            os.remove(tmp)  # drop a partially written file, if any
        except OSError:
            pass
        return None
    return tmp
|
|
|
|
def record_rtsp(rtsp_url, seconds=30):
    """Record a fixed-length clip from an RTSP stream with ffmpeg.

    The stream is copied without re-encoding (``-c copy``) over TCP.

    Args:
        rtsp_url: URL of the RTSP stream to record.
        seconds: Length of the recording in seconds.

    Returns:
        Path of the recorded ``.mp4`` under ``TMP``, or ``None`` if ffmpeg
        could not be started, exited with an error, or timed out.
    """
    tmp = f"{TMP}/rtsp_{int(time.time())}.mp4"
    cmd = [
        "ffmpeg", "-nostdin", "-y",
        "-rtsp_transport", "tcp",
        "-i", rtsp_url,
        "-t", str(seconds),
        "-c", "copy",
        tmp,
    ]
    try:
        # Bound the run: a stalled stream would otherwise block the caller
        # (the MQTT callback) indefinitely. The extra 30 s leaves room for
        # connection setup; subprocess.run kills ffmpeg when the timeout hits.
        subprocess.run(cmd, check=True, timeout=seconds + 30)
        return tmp
    except Exception as e:
        print("FFmpeg fallback failed:", e)
        try:
            os.remove(tmp)  # do not leave a truncated recording behind
        except OSError:
            pass
        return None
|
|
|
|
def _remove_quietly(path):
    """Delete a temp file; a missing file or a falsy path is not an error."""
    if not path:
        return
    try:
        os.remove(path)
    except OSError:
        pass


def on_message(client, userdata, msg):
    """Handle one Frigate event message: archive its clip and index it.

    Only messages whose ``type`` is ``"new"`` and that carry both an event id
    and a camera known to ``CAM_MAP`` are processed. The clip is taken from
    Frigate when available, otherwise 30 s are recorded from the camera's
    ``rtsp_main`` stream. Clip and (optional) thumbnail are uploaded to MinIO
    and a row is written to the ``events`` table.

    Args:
        client: MQTT client instance (unused).
        userdata: User data set on the client (unused).
        msg: Incoming message; ``msg.payload`` is expected to be UTF-8 JSON.

    Returns:
        None. Failures are printed and never raised, because an exception
        escaping this callback would propagate into the MQTT network loop.
    """
    try:
        payload = json.loads(msg.payload.decode("utf-8"))
    except Exception as e:
        print("Bad JSON", e)
        return
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (list, number, ...) has no fields.
        print("Bad JSON", "payload is not an object")
        return

    ev_type = payload.get("type")
    after = payload.get("after") or {}
    if not isinstance(after, dict):
        after = {}
    ev_id = after.get("id") or payload.get("id")
    cam = after.get("camera") or payload.get("camera")
    label = after.get("label") or payload.get("label", "")

    if ev_type != "new" or not cam or not ev_id:
        return
    if cam not in CAM_MAP:
        print("Unknown camera:", cam)
        return

    edge_cfg, cam_cfg = CAM_MAP[cam]
    ts = int(time.time())
    path_local = None
    thumb_local = None
    try:
        print(f"[ARGOS] {ev_id} {cam} → try Frigate clip")
        path_local = fetch_frigate_clip(edge_cfg, ev_id)
        if not path_local:
            print(f"[ARGOS] {ev_id} no native clip, fallback RTSP")
            path_local = record_rtsp(cam_cfg["rtsp_main"], seconds=30)
        if not path_local:
            print(f"[ARGOS] {ev_id} failed recording")
            return

        # Upload the clip under <camera>/<YYYY>/<MM>/<DD>/ (UTC date).
        date = datetime.datetime.fromtimestamp(ts, datetime.timezone.utc)
        key = f"{cam}/{date.year:04d}/{date.month:02d}/{date.day:02d}/{ts}_{ev_id}.mp4"
        s3url = upload_file(path_local, key, "video/mp4")

        # The thumbnail is optional: a failure here must not discard an
        # event whose clip is already stored.
        thumb_s3 = None
        try:
            thumb_local = fetch_frigate_thumb(edge_cfg, ev_id)
            if thumb_local:
                tkey = f"{cam}/thumbs/{ts}_{ev_id}.jpg"
                thumb_s3 = upload_file(thumb_local, tkey, "image/jpeg")
        except Exception as e:
            print(f"[ARGOS] {ev_id} thumbnail failed: {e}")

        cur.execute("INSERT OR REPLACE INTO events(id, ts, edge, camera, label, s3url, thumb_s3) VALUES (?,?,?,?,?,?,?)",
                    (ev_id, ts, edge_cfg["name"], cam, label, s3url, thumb_s3))
        con.commit()
        print(f"[ARGOS] stored {s3url}")
    except Exception as e:
        # Top-level boundary of the callback: log and keep the service alive.
        print(f"[ARGOS] {ev_id} processing failed: {e}")
    finally:
        # Temp files are removed on every path, including failed uploads.
        _remove_quietly(path_local)
        _remove_quietly(thumb_local)
|
|
|
|
def main():
    """Connect to the MQTT broker from settings.yaml and process events forever.

    Reads ``host``, ``port``, ``client_id`` and ``topic`` from ``CFG["mqtt"]``
    and dispatches every received message to ``on_message``. Does not return
    under normal operation.
    """
    topic = CFG["mqtt"]["topic"]

    def _on_connect(client, userdata, flags, rc, *extra):
        # Subscribe on every (re)connect rather than once after connect():
        # with clean_session=True the broker drops subscriptions when the
        # connection is lost, so a reconnect would otherwise receive nothing.
        # *extra absorbs the additional argument newer callback APIs pass.
        client.subscribe(topic)

    m = mqtt.Client(client_id=CFG["mqtt"]["client_id"], clean_session=True)
    # Install callbacks before connecting so no early packet is missed.
    m.on_connect = _on_connect
    m.on_message = on_message
    m.connect(CFG["mqtt"]["host"], CFG["mqtt"]["port"], keepalive=60)
    m.loop_forever()


if __name__ == "__main__":
    main()
|