import requests from bs4 import BeautifulSoup import time from googlenewsdecoder import gnewsdecoder # from iacorrector import is_economy_related # Importa la función desde iacorrector.py from datetime import datetime import pytz import logging from database import get_db from sqlalchemy.orm import Session from routes import create_news_item, NewsItemCreate from autorsearcher import get_author_from_url # Configuración del logging LOG_FILE = "app.log" logging.basicConfig( filename=LOG_FILE, # Archivo de logs level=logging.INFO, # Nivel de logging (DEBUG, INFO, WARNING, ERROR, CRITICAL) format="%(asctime)s - %(levelname)s - %(message)s", # Formato de los logs ) HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36" } TELEGRAM_BOT_URL = "http://telegrambot-app:8080/telegram/send" def get_author_from_script(url): """ Llama a autorsearcher.py con la URL de la noticia y devuelve el autor encontrado. """ try: author = get_author_from_url(url) return author except Exception as e: logging.info(f"Error al obtener el autor para {url}: {e}") return "Desconocido" def get_url_from_google_news(url): interval_time = 1 try: decoded_url = gnewsdecoder(url, interval=interval_time) if decoded_url.get("status"): return decoded_url["decoded_url"] else: return "N/C" except Exception as e: logging.info(f"Error occurred: {e}") def get_article_content(url): """ Extrae el texto principal del artículo desde la URL final. """ try: response = requests.get(url, headers=HEADERS) if response.status_code != 200: logging.info(f"Error al acceder a {url}: Código {response.status_code}") return "No se pudo obtener el contenido" soup = BeautifulSoup(response.text, "html.parser") # Buscar los elementos más comunes donde se almacena el contenido del artículo possible_containers = [ soup.find("article"), # Etiqueta
(común en blogs y periódicos) soup.find("div", class_="post-content"), # Clases comunes en WordPress soup.find("div", class_="entry-content"), soup.find("div", class_="content"), soup.find("div", id="article-body") ] for container in possible_containers: if container: paragraphs = container.find_all("p") article_text = "\n".join(p.get_text(strip=True) for p in paragraphs) return article_text if article_text else "No se encontró contenido relevante" return "No se encontró contenido relevante" except Exception as e: logging.info(f"Error al extraer contenido de {url}: {e}") return "Error al extraer contenido" def search_news(query): """ Busca noticias relacionadas con una palabra clave en Google News. """ base_url = f"https://news.google.com/rss/search?q={query.replace(' ', '+')}&hl=es&gl=ES&ceid=ES%3Aes" response = requests.get(base_url, headers=HEADERS) if response.status_code != 200: logging.info(f"Error al acceder a la página para la consulta '{query}': {response.status_code}") return [] soup = BeautifulSoup(response.content, 'xml') articles = soup.find_all("item") news_list = [] for article in articles[:12]: # Limitar a los primeros 12 artículos try: title = article.title.get_text(strip=True) content = article.description.get_text(strip=True) if article.description else "Sin descripción" link = article.link.get_text(strip=True) source_info = article.source.get_text(strip=True) if article.source else "Desconocido" date = article.pubDate.get_text(strip=True) if article.pubDate else "Fecha no disponible" date_parsed = datetime.strptime(date, '%a, %d %b %Y %H:%M:%S GMT') date_parsed = date_parsed.replace(tzinfo=pytz.UTC) # Obtener la URL final del artículo final_url = get_url_from_google_news(link) # Obtener el autor usando autorsearcher.py author = get_author_from_script(final_url) content = get_article_content(final_url) news_item = { "titulo": title, "contenido": content, "autor": author, "fuente": source_info, "fecha": date_parsed.isoformat(), "link": final_url, # Guardamos la URL final en lugar de la de Google News, "keyword": query } insertar_datos(news_item) # Verificar si el artículo es válido usando iacorrector # if is_economy_related(content): # Solo si el artículo es válido # news_item = { # "titulo": title, # "contenido": content, # "autor": author, # "fuente": source_info, # "fecha": date_parsed.isoformat(), # "link": final_url, # Guardamos la URL final en lugar de la de Google News, # "critico": critico, # "favorable":favorable, # "keyword": query # } # insertar_datos(news_item) except Exception as e: logging.info(f"Error al procesar un artículo para '{query}': {e}") return news_list def insertar_datos(news_item): # Obtener la sesión de base de datos usando get_db() db: Session = next(get_db()) # Aquí obtenemos la sesión manualmente try: # Convertir diccionario en un objeto Pydantic news_data = NewsItemCreate(**news_item) # Llamar directamente a la función que inserta noticias en la base de datos response = create_news_item(news_data, db) logging.info(f"Noticia '{news_item['titulo']}' creada con éxito. ID: {response['id']}") except Exception as e: logging.error(f"Error al insertar '{news_item['titulo']}': {str(e)}") finally: db.close() # Cerrar la sesión después de su uso def search_from_keywords_file(): """ Lee palabras clave del archivo 'keywords.txt' y realiza búsquedas para cada una. """ all_news = [] # Lista para almacenar todas las noticias recolectadas try: with open("/app/keywords.txt", "r", encoding="utf-8") as file: keywords = file.readlines() # Eliminar posibles saltos de línea y espacios extra keywords = [keyword.strip() for keyword in keywords] for keyword in keywords: logging.info(f"\nBuscando noticias sobre: {keyword}") search_news(keyword) time.sleep(2) # Pausa para evitar bloqueos por demasiadas solicitudes en poco tiempo except FileNotFoundError: logging.info("No se encontró el archivo 'keywords.txt'.") except Exception as e: logging.info(f"Error al leer el archivo 'keywords.txt': {e}") # Ejecutar la búsqueda desde el archivo def search_from_indices_file(): all_indices=[] try: with open("app/indices.txt", "r", encoding="utf-8") as file: indices = file.readlines() indices = [indice.strip() for indice in indices] for indice in indices: logging.info(f"\nAnalizando indice: {indice}") search_indice(indice) time.sleep(2) except FileNotFoundError: logging.info("No se encontró el archivo 'indices.txt'.") except Exception as e: logging.info(f"Error al leer el archivo 'indices.txt': {e}") def search_indice(indice): base_url = f"https://www.investing.com/indices/{indice['url']}" try: response = requests.get(base_url, headers=HEADERS) if response.status_code != 200: logging.info(f"Error al acceder a la página para la consulta '{indice['nombre']}': {response.status_code}") return soup = BeautifulSoup(response.content, 'html.parser') # Buscar los valores dentro del HTML price = soup.find("div", {"data-test": "instrument-price-last"}) price_change = soup.find("span", {"data-test": "instrument-price-change"}) price_change_percent = soup.find("span", {"data-test": "instrument-price-change-percent"}) porcentaje = price_change_percent.text.strip().replace("(", "").replace(")", "").replace("%", "") if price and price_change and price_change_percent: data = { "indice": indice["nombre"], "valorActual": price.text.replace(",", "").strip(), "cambio": price_change.text.replace(",", "").strip(), "porcentaje": porcentaje } logging.info(data) # Enviar los datos al bot de Telegram response_telegram = requests.post(TELEGRAM_BOT_URL, json=data) if response_telegram.status_code == 200: logging.info(f"Mensaje enviado a Telegram correctamente para '{indice['nombre']}'") else: logging.error(f"Error enviando mensaje a Telegram: {response_telegram.status_code} - {response_telegram.text}") else: logging.info(f"No se encontraron datos para el índice '{indice['nombre']}'.") except requests.RequestException as e: logging.error(f"Error en la solicitud: {e}")