This commit is contained in:
Your Name
2025-02-09 21:56:13 +01:00
parent 94da85702f
commit b4b2d899aa
16 changed files with 243 additions and 99 deletions

98
app/autorsearcher.py Normal file
View File

@ -0,0 +1,98 @@
import json
import requests
from bs4 import BeautifulSoup
import logging
# Configuración del logging
LOG_FILE = "app.log"
logging.basicConfig(
filename=LOG_FILE, # Archivo de logs
level=logging.INFO, # Nivel de logging (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s", # Formato de los logs
)
def download_html_as_human(url):
"""
Descarga el HTML de una página web simulando un navegador real y usando cookies de sesión.
"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36"
}
session = requests.Session()
response = session.get(url, headers=headers)
if response.status_code == 200:
return response.text
else:
return None
def extract_author_from_json(json_data):
"""
Extrae el autor del JSON-LD, incluso si está en una lista.
"""
if isinstance(json_data, list):
for item in json_data:
author = extract_author_from_json(item)
if author:
return author
elif isinstance(json_data, dict):
if 'author' in json_data:
author_data = json_data['author']
if isinstance(author_data, list):
for author in author_data:
if isinstance(author, dict) and 'name' in author:
return author['name']
elif isinstance(author_data, dict) and 'name' in author_data:
return author_data['name']
return None
def get_author_from_json_ld(soup):
"""
Extrae el autor de los metadatos JSON-LD, considerando estructuras con listas y objetos.
"""
scripts = soup.find_all('script', type='application/ld+json')
for script in scripts:
try:
json_data = json.loads(script.string)
author = extract_author_from_json(json_data)
if author:
return author
except json.JSONDecodeError:
continue
return None
def get_author_from_meta(soup):
"""
Extrae el autor de la etiqueta <meta> con el atributo property="nrbi:authors".
"""
meta_author = soup.find('meta', property='nrbi:authors')
if meta_author and 'content' in meta_author.attrs:
return meta_author['content']
return None
def get_author_from_url(url):
"""
Busca el autor en los metadatos JSON-LD y en la etiqueta <meta> de una URL.
"""
html_content = download_html_as_human(url)
if not html_content:
logging.info("error, no se pudo descargar la pagina")
return "No se pudo descargar la página."
soup = BeautifulSoup(html_content, 'html.parser')
author = get_author_from_json_ld(soup)
if author:
logging.info(author)
return author
author = get_author_from_meta(soup)
if author:
logging.info(author)
return author
logging.info("No encontrado autor")
return "Desconocido"

85
app/iacorrector.py Normal file
View File

@ -0,0 +1,85 @@
import requests
import json
import os
import logging
# Configuración del logging
LOG_FILE = "app.log"
logging.basicConfig(
filename=LOG_FILE, # Archivo de logs
level=logging.INFO, # Nivel de logging (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s", # Formato de los logs
)
# Obtener variables de entorno
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://host.docker.internal:11434/api/generate")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3")
def is_security_related(prompt):
data = {
"model": OLLAMA_MODEL,
"prompt": f"Does the following topic relate to national defense, armed forces, police, espionage, or intelligence? Answer only with 'true' or 'false'. Topic: {prompt}",
}
try:
response = requests.post(OLLAMA_URL, json=data)
response.raise_for_status() # Lanza una excepción si la solicitud falla
for line in response.text.strip().split("\n"):
json_data = json.loads(line)
if "response" in json_data and json_data["response"].strip():
result = json_data["response"].strip().lower() == "true"
return result
except requests.RequestException as e:
logging.error(f"Request error: {e}")
except json.JSONDecodeError as e:
logging.error(f"JSON Decode Error: {e}")
return False
def is_critico(prompt):
data = {
"model": OLLAMA_MODEL,
"prompt": f"Does the following text criticizes the armed forces, security forces as Guardia Civil or Police, intelligence agencies such as CNI? Answer only with 'true' or 'false'. Topic: {prompt}",
}
try:
response = requests.post(OLLAMA_URL, json=data)
response.raise_for_status()
for line in response.text.strip().split("\n"):
json_data = json.loads(line)
if "response" in json_data and json_data["response"].strip():
result = json_data["response"].strip().lower() == "true"
return result
except requests.RequestException as e:
logging.error(f"Request error: {e}")
except json.JSONDecodeError as e:
logging.error(f"JSON Decode Error: {e}")
return False
def is_favorable(prompt):
data = {
"model": OLLAMA_MODEL,
"prompt": f"Does the following text favor the armed forces, security forces as Guardia Civil or Police, intelligence agencies such as CNI? Answer only with 'true' or 'false'. Topic: {prompt}",
}
try:
response = requests.post(OLLAMA_URL, json=data)
response.raise_for_status()
for line in response.text.strip().split("\n"):
json_data = json.loads(line)
if "response" in json_data and json_data["response"].strip():
result = json_data["response"].strip().lower() == "true"
return result
except requests.RequestException as e:
logging.error(f"Request error: {e}")
except json.JSONDecodeError as e:
logging.error(f"JSON Decode Error: {e}")
return False

7
app/keywords.txt Normal file
View File

@ -0,0 +1,7 @@
Defensa
Fuerzas Armadas
CNI
Guardia Civil
Inteligencia
Policia
Ejercito

View File

@ -1,12 +1,24 @@
from fastapi import FastAPI
from .database import Base, engine
from .routes import router
from apscheduler.schedulers.background import BackgroundScheduler
from .webscrapper import ejecutar_scrapper
# Crear las tablas en MySQL si no existen
Base.metadata.create_all(bind=engine)
# Inicializar FastAPI
app = FastAPI()
scheduler = BackgroundScheduler()
# Incluir rutas
app.include_router(router)
@app.on_event("startup")
def startup_event():
scheduler.add_job(ejecutar_scrapper, "interval", hours=24)
scheduler.start()
@app.on_event("shutdown")
def shutdown_event():
scheduler.shutdown()

View File

@ -11,3 +11,5 @@ python-dotenv
mysql-connector-python
pymysql
cryptography
lxml
apscheduler

View File

@ -1,5 +1,6 @@
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from .database import get_db
from .models import NewsItem
from pydantic import BaseModel
@ -54,3 +55,196 @@ def create_news_item(item: NewsItemCreate, db: Session = Depends(get_db)):
return {"message": "Noticia creada con éxito", "id": new_item.id, "titulo": new_item.titulo}
@router.get("/news/count/by-source/date-range")
def count_news_by_source_in_range(
fecha_inicio: datetime, fecha_fin: datetime, db: Session = Depends(get_db)
):
results = (
db.query(NewsItem.fuente, func.count(NewsItem.id))
.filter(NewsItem.fecha >= fecha_inicio, NewsItem.fecha <= fecha_fin)
.group_by(NewsItem.fuente)
.all()
)
return {"count_by_source_in_range": results}
@router.get("/news/count/by-author/date-range")
def count_news_by_author_in_range(
fecha_inicio: datetime, fecha_fin: datetime, db: Session = Depends(get_db)
):
results = (
db.query(NewsItem.autor, func.count(NewsItem.id))
.filter(NewsItem.fecha >= fecha_inicio, NewsItem.fecha <= fecha_fin)
.group_by(NewsItem.autor)
.all()
)
return {"count_by_author_in_range": results}
@router.get("/news/count/favorable/by-author/date-range")
def count_favorable_news_by_author_in_range(
fecha_inicio: datetime, fecha_fin: datetime, db: Session = Depends(get_db)
):
results = (
db.query(NewsItem.autor, func.count(NewsItem.id))
.filter(
NewsItem.favorable == True,
NewsItem.fecha >= fecha_inicio,
NewsItem.fecha <= fecha_fin,
)
.group_by(NewsItem.autor)
.all()
)
return {"favorable_count_by_author_in_range": results}
@router.get("/news/count/unfavorable/by-author/date-range")
def count_unfavorable_news_by_author_in_range(
fecha_inicio: datetime, fecha_fin: datetime, db: Session = Depends(get_db)
):
results = (
db.query(NewsItem.autor, func.count(NewsItem.id))
.filter(
NewsItem.critico == True,
NewsItem.fecha >= fecha_inicio,
NewsItem.fecha <= fecha_fin,
)
.group_by(NewsItem.autor)
.all()
)
return {"unfavorable_count_by_author_in_range": results}
@router.get("/news/count/favorable/by-source/date-range")
def count_favorable_news_by_source_in_range(
fecha_inicio: datetime, fecha_fin: datetime, db: Session = Depends(get_db)
):
results = (
db.query(NewsItem.fuente, func.count(NewsItem.id))
.filter(
NewsItem.favorable == True,
NewsItem.fecha >= fecha_inicio,
NewsItem.fecha <= fecha_fin,
)
.group_by(NewsItem.fuente)
.all()
)
return {"favorable_count_by_source_in_range": results}
@router.get("/news/count/unfavorable/by-source/date-range")
def count_unfavorable_news_by_source_in_range(
fecha_inicio: datetime, fecha_fin: datetime, db: Session = Depends(get_db)
):
results = (
db.query(NewsItem.fuente, func.count(NewsItem.id))
.filter(
NewsItem.critico == True,
NewsItem.fecha >= fecha_inicio,
NewsItem.fecha <= fecha_fin,
)
.group_by(NewsItem.fuente)
.all()
)
return {"unfavorable_count_by_source_in_range": results}
@router.get("/news/neutral/date-range")
def get_neutral_news_in_range(
fecha_inicio: datetime, fecha_fin: datetime, db: Session = Depends(get_db)
):
results = (
db.query(NewsItem)
.filter(
NewsItem.favorable == False,
NewsItem.critico == False,
NewsItem.fecha >= fecha_inicio,
NewsItem.fecha <= fecha_fin,
)
.all()
)
return results
@router.get("/news/mixed/date-range")
def get_mixed_news_in_range(
fecha_inicio: datetime, fecha_fin: datetime, db: Session = Depends(get_db)
):
results = (
db.query(NewsItem)
.filter(
NewsItem.favorable == True,
NewsItem.critico == True,
NewsItem.fecha >= fecha_inicio,
NewsItem.fecha <= fecha_fin,
)
.all()
)
return results
@router.get("/news/count/by-source")
def count_news_by_source(db: Session = Depends(get_db)):
results = db.query(NewsItem.fuente, func.count(NewsItem.id)).group_by(NewsItem.fuente).all()
return {"count_by_source": results}
@router.get("/news/count/by-author")
def count_news_by_author(db: Session = Depends(get_db)):
results = db.query(NewsItem.autor, func.count(NewsItem.id)).group_by(NewsItem.autor).all()
return {"count_by_author": results}
@router.get("/news/count/favorable/by-author")
def count_favorable_news_by_author(db: Session = Depends(get_db)):
results = (
db.query(NewsItem.autor, func.count(NewsItem.id))
.filter(NewsItem.favorable == True)
.group_by(NewsItem.autor)
.all()
)
return {"favorable_count_by_author": results}
@router.get("/news/count/unfavorable/by-author")
def count_unfavorable_news_by_author(db: Session = Depends(get_db)):
results = (
db.query(NewsItem.autor, func.count(NewsItem.id))
.filter(NewsItem.critico == True)
.group_by(NewsItem.autor)
.all()
)
return {"unfavorable_count_by_author": results}
@router.get("/news/count/favorable/by-source")
def count_favorable_news_by_source(db: Session = Depends(get_db)):
results = (
db.query(NewsItem.fuente, func.count(NewsItem.id))
.filter(NewsItem.favorable == True)
.group_by(NewsItem.fuente)
.all()
)
return {"favorable_count_by_source": results}
@router.get("/news/count/unfavorable/by-source")
def count_unfavorable_news_by_source(db: Session = Depends(get_db)):
results = (
db.query(NewsItem.fuente, func.count(NewsItem.id))
.filter(NewsItem.critico == True)
.group_by(NewsItem.fuente)
.all()
)
return {"unfavorable_count_by_source": results}
@router.get("/news/neutral")
def get_neutral_news(db: Session = Depends(get_db)):
results = (
db.query(NewsItem)
.filter(NewsItem.favorable == False, NewsItem.critico == False)
.all()
)
return results
@router.get("/news/mixed")
def get_mixed_news(db: Session = Depends(get_db)):
results = (
db.query(NewsItem)
.filter(NewsItem.favorable == True, NewsItem.critico == True)
.all()
)
return results

181
app/webscrapper.py Normal file
View File

@ -0,0 +1,181 @@
import requests
from bs4 import BeautifulSoup
import time
from googlenewsdecoder import gnewsdecoder
from .iacorrector import is_security_related, is_critico, is_favorable # Importa la función desde iacorrector.py
from datetime import datetime
import pytz
import logging
from .database import get_db
from sqlalchemy.orm import Session
from .routes import create_news_item, NewsItemCreate
from .autorsearcher import get_author_from_url
# Configuración del logging
LOG_FILE = "app.log"
logging.basicConfig(
filename=LOG_FILE, # Archivo de logs
level=logging.INFO, # Nivel de logging (DEBUG, INFO, WARNING, ERROR, CRITICAL)
format="%(asctime)s - %(levelname)s - %(message)s", # Formato de los logs
)
HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
def get_author_from_script(url):
"""
Llama a autorsearcher.py con la URL de la noticia y devuelve el autor encontrado.
"""
try:
author = get_author_from_url(url)
return author
except Exception as e:
logging.info(f"Error al obtener el autor para {url}: {e}")
return "Desconocido"
def get_url_from_google_news(url):
interval_time = 1
try:
decoded_url = gnewsdecoder(url, interval=interval_time)
if decoded_url.get("status"):
return decoded_url["decoded_url"]
else:
return "N/C"
except Exception as e:
logging.info(f"Error occurred: {e}")
def get_article_content(url):
"""
Extrae el texto principal del artículo desde la URL final.
"""
try:
response = requests.get(url, headers=HEADERS)
if response.status_code != 200:
logging.info(f"Error al acceder a {url}: Código {response.status_code}")
return "No se pudo obtener el contenido"
soup = BeautifulSoup(response.text, "html.parser")
# Buscar los elementos más comunes donde se almacena el contenido del artículo
possible_containers = [
soup.find("article"), # Etiqueta <article> (común en blogs y periódicos)
soup.find("div", class_="post-content"), # Clases comunes en WordPress
soup.find("div", class_="entry-content"),
soup.find("div", class_="content"),
soup.find("div", id="article-body")
]
for container in possible_containers:
if container:
paragraphs = container.find_all("p")
article_text = "\n".join(p.get_text(strip=True) for p in paragraphs)
return article_text if article_text else "No se encontró contenido relevante"
return "No se encontró contenido relevante"
except Exception as e:
logging.info(f"Error al extraer contenido de {url}: {e}")
return "Error al extraer contenido"
def search_news(query):
"""
Busca noticias relacionadas con una palabra clave en Google News.
"""
base_url = f"https://news.google.com/rss/search?q={query.replace(' ', '+')}&hl=es&gl=ES&ceid=ES%3Aes"
response = requests.get(base_url, headers=HEADERS)
if response.status_code != 200:
logging.info(f"Error al acceder a la página para la consulta '{query}': {response.status_code}")
return []
soup = BeautifulSoup(response.content, 'xml')
articles = soup.find_all("item")
news_list = []
for article in articles[:10]: # Limitar a los primeros 30 artículos
try:
title = article.title.get_text(strip=True)
content = article.description.get_text(strip=True) if article.description else "Sin descripción"
link = article.link.get_text(strip=True)
source_info = article.source.get_text(strip=True) if article.source else "Desconocido"
date = article.pubDate.get_text(strip=True) if article.pubDate else "Fecha no disponible"
date_parsed = datetime.strptime(date, '%a, %d %b %Y %H:%M:%S GMT')
date_parsed = date_parsed.replace(tzinfo=pytz.UTC)
critico = False
favorable = False
# Obtener la URL final del artículo
final_url = get_url_from_google_news(link)
# Obtener el autor usando autorsearcher.py
author = get_author_from_script(final_url)
content = get_article_content(final_url)
critico = is_critico(content)
favorable = is_favorable(content)
# Verificar si el artículo es válido usando iacorrector
if is_security_related(content): # Solo si el artículo es válido
news_item = {
"titulo": title,
"contenido": content,
"autor": author,
"fuente": source_info,
"fecha": date_parsed.isoformat(),
"link": final_url, # Guardamos la URL final en lugar de la de Google News,
"critico": critico,
"favorable":favorable,
"keyword": query
}
insertar_datos(news_item)
except Exception as e:
logging.info(f"Error al procesar un artículo para '{query}': {e}")
return news_list
def insertar_datos(news_item):
# Obtener la sesión de base de datos usando get_db()
db: Session = next(get_db()) # Aquí obtenemos la sesión manualmente
try:
# Convertir diccionario en un objeto Pydantic
news_data = NewsItemCreate(**news_item)
# Llamar directamente a la función que inserta noticias en la base de datos
response = create_news_item(news_data, db)
logging.info(f"Noticia '{news_item['titulo']}' creada con éxito. ID: {response['id']}")
except Exception as e:
logging.error(f"Error al insertar '{news_item['titulo']}': {str(e)}")
finally:
db.close() # Cerrar la sesión después de su uso
def search_from_keywords_file():
"""
Lee palabras clave del archivo 'keywords.txt' y realiza búsquedas para cada una.
"""
all_news = [] # Lista para almacenar todas las noticias recolectadas
try:
with open("/app/keywords.txt", "r", encoding="utf-8") as file:
keywords = file.readlines()
# Eliminar posibles saltos de línea y espacios extra
keywords = [keyword.strip() for keyword in keywords]
for keyword in keywords:
logging.info(f"\nBuscando noticias sobre: {keyword}")
search_news(keyword)
time.sleep(2) # Pausa para evitar bloqueos por demasiadas solicitudes en poco tiempo
except FileNotFoundError:
logging.info("No se encontró el archivo 'keywords.txt'.")
except Exception as e:
logging.info(f"Error al leer el archivo 'keywords.txt': {e}")
# Ejecutar la búsqueda desde el archivo
search_from_keywords_file()