245 lines
9.4 KiB
Python
245 lines
9.4 KiB
Python
import requests
|
|
from bs4 import BeautifulSoup
|
|
import time
|
|
from googlenewsdecoder import gnewsdecoder
|
|
# from iacorrector import is_economy_related # Importa la función desde iacorrector.py
|
|
from datetime import datetime
|
|
import pytz
|
|
import logging
|
|
from database import get_db
|
|
from sqlalchemy.orm import Session
|
|
from routes import create_news_item, NewsItemCreate
|
|
from autorsearcher import get_author_from_url
|
|
|
|
# Configuración del logging
|
|
LOG_FILE = "app.log"
|
|
logging.basicConfig(
|
|
filename=LOG_FILE, # Archivo de logs
|
|
level=logging.INFO, # Nivel de logging (DEBUG, INFO, WARNING, ERROR, CRITICAL)
|
|
format="%(asctime)s - %(levelname)s - %(message)s", # Formato de los logs
|
|
)
|
|
|
|
HEADERS = {
|
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
|
|
}
|
|
|
|
TELEGRAM_BOT_URL = "http://telegrambot-app:8080/telegram/send"
|
|
|
|
def get_author_from_script(url):
|
|
"""
|
|
Llama a autorsearcher.py con la URL de la noticia y devuelve el autor encontrado.
|
|
"""
|
|
try:
|
|
author = get_author_from_url(url)
|
|
return author
|
|
except Exception as e:
|
|
logging.info(f"Error al obtener el autor para {url}: {e}")
|
|
return "Desconocido"
|
|
|
|
def get_url_from_google_news(url):
|
|
interval_time = 1
|
|
try:
|
|
decoded_url = gnewsdecoder(url, interval=interval_time)
|
|
|
|
if decoded_url.get("status"):
|
|
return decoded_url["decoded_url"]
|
|
else:
|
|
return "N/C"
|
|
except Exception as e:
|
|
logging.info(f"Error occurred: {e}")
|
|
|
|
def get_article_content(url):
|
|
"""
|
|
Extrae el texto principal del artículo desde la URL final.
|
|
"""
|
|
try:
|
|
response = requests.get(url, headers=HEADERS)
|
|
if response.status_code != 200:
|
|
logging.info(f"Error al acceder a {url}: Código {response.status_code}")
|
|
return "No se pudo obtener el contenido"
|
|
|
|
soup = BeautifulSoup(response.text, "html.parser")
|
|
|
|
# Buscar los elementos más comunes donde se almacena el contenido del artículo
|
|
possible_containers = [
|
|
soup.find("article"), # Etiqueta <article> (común en blogs y periódicos)
|
|
soup.find("div", class_="post-content"), # Clases comunes en WordPress
|
|
soup.find("div", class_="entry-content"),
|
|
soup.find("div", class_="content"),
|
|
soup.find("div", id="article-body")
|
|
]
|
|
|
|
for container in possible_containers:
|
|
if container:
|
|
paragraphs = container.find_all("p")
|
|
article_text = "\n".join(p.get_text(strip=True) for p in paragraphs)
|
|
return article_text if article_text else "No se encontró contenido relevante"
|
|
|
|
return "No se encontró contenido relevante"
|
|
|
|
except Exception as e:
|
|
logging.info(f"Error al extraer contenido de {url}: {e}")
|
|
return "Error al extraer contenido"
|
|
|
|
def search_news(query):
|
|
"""
|
|
Busca noticias relacionadas con una palabra clave en Google News.
|
|
"""
|
|
base_url = f"https://news.google.com/rss/search?q={query.replace(' ', '+')}&hl=es&gl=ES&ceid=ES%3Aes"
|
|
response = requests.get(base_url, headers=HEADERS)
|
|
|
|
if response.status_code != 200:
|
|
logging.info(f"Error al acceder a la página para la consulta '{query}': {response.status_code}")
|
|
return []
|
|
|
|
soup = BeautifulSoup(response.content, 'xml')
|
|
articles = soup.find_all("item")
|
|
news_list = []
|
|
|
|
for article in articles[:12]: # Limitar a los primeros 12 artículos
|
|
try:
|
|
title = article.title.get_text(strip=True)
|
|
content = article.description.get_text(strip=True) if article.description else "Sin descripción"
|
|
link = article.link.get_text(strip=True)
|
|
source_info = article.source.get_text(strip=True) if article.source else "Desconocido"
|
|
date = article.pubDate.get_text(strip=True) if article.pubDate else "Fecha no disponible"
|
|
date_parsed = datetime.strptime(date, '%a, %d %b %Y %H:%M:%S GMT')
|
|
date_parsed = date_parsed.replace(tzinfo=pytz.UTC)
|
|
|
|
# Obtener la URL final del artículo
|
|
final_url = get_url_from_google_news(link)
|
|
|
|
# Obtener el autor usando autorsearcher.py
|
|
author = get_author_from_script(final_url)
|
|
content = get_article_content(final_url)
|
|
|
|
news_item = {
|
|
"titulo": title,
|
|
"contenido": content,
|
|
"autor": author,
|
|
"fuente": source_info,
|
|
"fecha": date_parsed.isoformat(),
|
|
"link": final_url, # Guardamos la URL final en lugar de la de Google News,
|
|
"keyword": query
|
|
}
|
|
insertar_datos(news_item)
|
|
|
|
# Verificar si el artículo es válido usando iacorrector
|
|
# if is_economy_related(content): # Solo si el artículo es válido
|
|
# news_item = {
|
|
# "titulo": title,
|
|
# "contenido": content,
|
|
# "autor": author,
|
|
# "fuente": source_info,
|
|
# "fecha": date_parsed.isoformat(),
|
|
# "link": final_url, # Guardamos la URL final en lugar de la de Google News,
|
|
# "critico": critico,
|
|
# "favorable":favorable,
|
|
# "keyword": query
|
|
# }
|
|
# insertar_datos(news_item)
|
|
|
|
except Exception as e:
|
|
logging.info(f"Error al procesar un artículo para '{query}': {e}")
|
|
|
|
return news_list
|
|
|
|
def insertar_datos(news_item):
|
|
# Obtener la sesión de base de datos usando get_db()
|
|
db: Session = next(get_db()) # Aquí obtenemos la sesión manualmente
|
|
try:
|
|
# Convertir diccionario en un objeto Pydantic
|
|
news_data = NewsItemCreate(**news_item)
|
|
|
|
# Llamar directamente a la función que inserta noticias en la base de datos
|
|
response = create_news_item(news_data, db)
|
|
|
|
logging.info(f"Noticia '{news_item['titulo']}' creada con éxito. ID: {response['id']}")
|
|
|
|
except Exception as e:
|
|
logging.error(f"Error al insertar '{news_item['titulo']}': {str(e)}")
|
|
|
|
finally:
|
|
db.close() # Cerrar la sesión después de su uso
|
|
|
|
def search_from_keywords_file():
|
|
"""
|
|
Lee palabras clave del archivo 'keywords.txt' y realiza búsquedas para cada una.
|
|
"""
|
|
all_news = [] # Lista para almacenar todas las noticias recolectadas
|
|
|
|
try:
|
|
with open("/app/keywords.txt", "r", encoding="utf-8") as file:
|
|
keywords = file.readlines()
|
|
|
|
# Eliminar posibles saltos de línea y espacios extra
|
|
keywords = [keyword.strip() for keyword in keywords]
|
|
|
|
for keyword in keywords:
|
|
logging.info(f"\nBuscando noticias sobre: {keyword}")
|
|
search_news(keyword)
|
|
time.sleep(2) # Pausa para evitar bloqueos por demasiadas solicitudes en poco tiempo
|
|
|
|
except FileNotFoundError:
|
|
logging.info("No se encontró el archivo 'keywords.txt'.")
|
|
except Exception as e:
|
|
logging.info(f"Error al leer el archivo 'keywords.txt': {e}")
|
|
|
|
# Ejecutar la búsqueda desde el archivo
|
|
|
|
def search_from_indices_file():
|
|
all_indices=[]
|
|
|
|
try:
|
|
with open("app/indices.txt", "r", encoding="utf-8") as file:
|
|
indices = file.readlines()
|
|
indices = [indice.strip() for indice in indices]
|
|
|
|
for indice in indices:
|
|
logging.info(f"\nAnalizando indice: {indice}")
|
|
search_indice(indice)
|
|
time.sleep(2)
|
|
except FileNotFoundError:
|
|
logging.info("No se encontró el archivo 'indices.txt'.")
|
|
except Exception as e:
|
|
logging.info(f"Error al leer el archivo 'indices.txt': {e}")
|
|
|
|
def search_indice(indice):
|
|
base_url = f"https://www.investing.com/indices/{indice}"
|
|
|
|
try:
|
|
response = requests.get(base_url, headers=HEADERS)
|
|
|
|
if response.status_code != 200:
|
|
logging.info(f"Error al acceder a la página para la consulta '{indice}': {response.status_code}")
|
|
return
|
|
|
|
soup = BeautifulSoup(response.content, 'html.parser')
|
|
|
|
# Buscar los valores dentro del HTML
|
|
price = soup.find("div", {"data-test": "instrument-price-last"})
|
|
price_change = soup.find("span", {"data-test": "instrument-price-change"})
|
|
price_change_percent = soup.find("span", {"data-test": "instrument-price-change-percent"})
|
|
|
|
|
|
if price and price_change and price_change_percent:
|
|
data = {
|
|
"indice": indice,
|
|
"valorActual": price.text.replace(",", "").strip(), # Convertir a número
|
|
"cambio": price_change.text.replace(",", "").strip(), # Convertir a número
|
|
"porcentaje": price_change_percent.text.strip()
|
|
}
|
|
logging.info(data)
|
|
# Enviar los datos al bot de Telegram
|
|
response_telegram = requests.post(TELEGRAM_BOT_URL, json=data)
|
|
|
|
if response_telegram.status_code == 200:
|
|
logging.info(f"Mensaje enviado a Telegram correctamente para '{indice}'")
|
|
else:
|
|
logging.error(f"Error enviando mensaje a Telegram: {response_telegram.status_code} - {response_telegram.text}")
|
|
else:
|
|
logging.info(f"No se encontraron datos para el índice '{indice}'.")
|
|
|
|
except requests.RequestException as e:
|
|
logging.error(f"Error en la solicitud: {e}")
|
|
|