diff --git a/monitoring_service.py b/monitoring_service.py new file mode 100644 index 0000000..5a0a511 --- /dev/null +++ b/monitoring_service.py @@ -0,0 +1,70 @@ +# + +# +import logging +import asyncio +from datetime import datetime +from src.orchestrator import AppOrchestrator +from src.core.settings import settings +from src.core.logging_config import setup_logging +from src.analyzer import DataAnalyzer +from src.utils.telegram_sender import send_telegram_notification +# + +# +# description: "Главная точка входа для сервиса мониторинга. Запускается по расписанию (cron)." +# postconditions: +# - "Парсер запускается, данные сохраняются в БД." +# - "Если обнаружены изменения, отправляется отчет в Telegram." +# +# +async def main(): + """ + Основная асинхронная функция, которая запускает парсер, + анализирует данные и отправляет уведомление. + """ + run_id = datetime.now().strftime("%Y%m%d-%H%M%S") + setup_logging(run_id=run_id) + logger = logging.getLogger(__name__) + + logger.info(f"🚀 Запуск сервиса мониторинга. Run ID: {run_id}") + + try: + # 1. Запуск парсера + logger.info("Начало этапа парсинга...") + orchestrator = AppOrchestrator(settings=settings) + orchestrator.run() + logger.info("Этап парсинга завершен.") + + # 2. Анализ данных + logger.info("Начало этапа анализа данных...") + analyzer = DataAnalyzer() + report_message = analyzer.analyze() + logger.info("Этап анализа данных завершен.") + + # 3. Отправка отчета + if report_message: + logger.info("Обнаружены изменения, отправка отчета в Telegram...") + await send_telegram_notification(report_message) + else: + logger.info("Изменений не найдено, отправка отчета не требуется.") + + except Exception as e: + logger.critical(f"💥 Критическая ошибка в сервисе мониторинга: {e}", exc_info=True) + # Попытка отправить уведомление об ошибке + try: + error_message = f"❗️ Критическая ошибка в сервисе мониторинга\n\n
{e}
" + await send_telegram_notification(error_message) + except Exception as tg_e: + logger.error(f"Не удалось даже отправить уведомление об ошибке в Telegram: {tg_e}") + + logger.info("✅ Сервис мониторинга завершил работу.") + +# +# description: "Стандартный блок для запуска main() при выполнении скрипта." +# +if __name__ == "__main__": + # Используем asyncio.run() для запуска асинхронной функции main + asyncio.run(main()) + +# diff --git a/requirements.txt b/requirements.txt index f466b36..576a275 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,4 +17,12 @@ python-dotenv>=1.0.0 # ANCHOR: RabbitMQ_Dependencies # Семантика: Зависимости для работы с очередью сообщений RabbitMQ -pika>=1.3.0 \ No newline at end of file +pika>=1.3.0 + +# ANCHOR: Database_Dependencies +# Семантика: Зависимости для работы с базой данных +SQLAlchemy>=2.0.0 + +# ANCHOR: Telegram_Dependencies +# Семантика: Зависимости для отправки уведомлений в Telegram +python-telegram-bot>=21.0.0 \ No newline at end of file diff --git a/src/analyzer.py b/src/analyzer.py new file mode 100644 index 0000000..947c335 --- /dev/null +++ b/src/analyzer.py @@ -0,0 +1,116 @@ +# + +# +import logging +from typing import List, Dict, Tuple +from sqlalchemy.orm import sessionmaker +from src.core.models import ProductVariant, ParsingRun +from sqlalchemy import create_engine +from src.core.settings import settings +# + +# +# description: "Анализирует данные двух последних запусков парсера и выявляет изменения." +# +class DataAnalyzer: + # + def __init__(self): + self.engine = create_engine(settings.db_path.absolute().as_uri()) + self.Session = sessionmaker(bind=self.engine) + self.logger = logging.getLogger(__name__) + # + + # + def _get_last_two_runs(self, session) -> Tuple[str, str]: + """Возвращает ID двух последних успешных запусков.""" + runs = session.query(ParsingRun.run_id).order_by(ParsingRun.start_time.desc()).limit(2).all() + if len(runs) < 2: + self.logger.warning("Найдено менее двух запусков. Сравнение невозможно.") + return None, None + return runs[0][0], runs[1][0] # (current_run_id, previous_run_id) + # + + # + def _get_data_for_run(self, session, run_id: str) -> Dict[str, ProductVariant]: + """Возвращает данные о товарах для указанного запуска в виде словаря {url: ProductVariant}.""" + variants = session.query(ProductVariant).filter(ProductVariant.run_id == run_id).all() + return {v.url: v for v in variants} + # + + # + def analyze(self) -> str: + """ + Сравнивает два последних запуска и генерирует HTML-отчет об изменениях. + """ + # + with self.Session() as session: + current_run_id, prev_run_id = self._get_last_two_runs(session) + if not current_run_id or not prev_run_id: + return "" + + self.logger.info(f"Сравнение запуска {current_run_id} с предыдущим з��пуском {prev_run_id}") + + current_data = self._get_data_for_run(session, current_run_id) + prev_data = self._get_data_for_run(session, prev_run_id) + + price_changes = [] + availability_changes = [] + new_items = [] + removed_items = [] + + # Поиск изменений и новых товаров + for url, current_item in current_data.items(): + prev_item = prev_data.get(url) + if prev_item: + # Изменение цены + if current_item.price != prev_item.price: + price_changes.append((current_item, prev_item.price)) + # Изменение статуса наличия + if current_item.is_available != prev_item.is_available: + availability_changes.append(current_item) + else: + new_items.append(current_item) + + # Поиск удаленных товаров + for url, prev_item in prev_data.items(): + if url not in current_data: + removed_items.append(prev_item) + + return self._format_report(price_changes, availability_changes, new_items, removed_items) + # + # + + # + def _format_report(self, price_changes, availability_changes, new_items, removed_items) -> str: + """Форматирует найденные изменения в красивый HTML-отчет.""" + if not any([price_changes, availability_changes, new_items, removed_items]): + self.logger.info("Изменений не найдено.") + return "" + + report_parts = ["📈 Отчет об изменениях на сайте\n"] + + if price_changes: + report_parts.append("\n💰 Изменились цены:") + for item, old_price in price_changes: + report_parts.append(f" • {item.name} ({item.volume}): {old_price} ₽ → {item.price} ₽") + + if availability_changes: + report_parts.append("\n📦 Изменилось наличие:") + for item in availability_changes: + status = "✅ В наличии" if item.is_available else "❌ Нет в наличии" + report_parts.append(f" • {item.name} ({item.volume}): {status}") + + if new_items: + report_parts.append("\n✨ Новые товары:") + for item in new_items: + report_parts.append(f" • {item.name} ({item.volume}) - {item.price} ₽") + + if removed_items: + report_parts.append("\n🗑️ Удаленные товары:") + for item in removed_items: + report_parts.append(f" • {item.name} ({item.volume})") + + return "\n".join(report_parts) + # +# + /> diff --git a/src/core/settings.py b/src/core/settings.py index 5161791..d5cc207 100644 --- a/src/core/settings.py +++ b/src/core/settings.py @@ -77,14 +77,19 @@ class Settings(BaseModel): num_parser_threads: int = Field(default=int(os.getenv('PARSER_THREADS', 5)), description="Количество потоков для парсинга") # + # + telegram_bot_token: str = Field(default=os.getenv('TELEGRAM_BOT_TOKEN', ''), description="Токен для Telegram бота") + telegram_chat_id: str = Field(default=os.getenv('TELEGRAM_CHAT_ID', ''), description="ID чата для отправки уведомлений") + # + # selectors: ScraperSelectors = ScraperSelectors( CATALOG_PRODUCT_LINK='.product-card h4 a.product-link', - VARIANT_LIST_ITEM='.product-version-select li', + VARIANT_LIST_ITEM='.product-version-select li, .product-variants-list .variant-item', PRODUCT_PAGE_NAME='h1.product-h1', - ACTIVE_VOLUME='.product-version-select li.active', - PRICE_BLOCK='.product-sale-box .price span, .price-value, .product-price, .price', - PRODUCT_UNAVAILABLE='.product-unavailable, .out-of-stock-message, .unavailable-message, .stock-status.out-of-stock, li.not-available, div.disabled', + ACTIVE_VOLUME='.product-version-select li.active, .variant-item.active', + PRICE_BLOCK='.price-value, .product-price .price, .product-sale-box .price span', + PRODUCT_UNAVAILABLE='.product-unavailable, .out-of-stock, .unavailable, .stock.status-0', ) # diff --git a/src/utils/telegram_sender.py b/src/utils/telegram_sender.py new file mode 100644 index 0000000..ff1df4e --- /dev/null +++ b/src/utils/telegram_sender.py @@ -0,0 +1,72 @@ +# + +# +import logging +import asyncio +from telegram import Bot +from telegram.error import TelegramError +from src.core.settings import settings +# + +# +# description: "Отправляет сообщения в Telegram, используя токен и ID чата из настроек." +# invariant: "Токен и ID чата должны быть заданы в конфигурации." +# +class TelegramSender: + # + def __init__(self): + if not settings.telegram_bot_token or not settings.telegram_chat_id: + raise ValueError("TELEGRAM_BOT_TOKEN и TELEGRAM_CHAT_ID должны быть установлены в .env") + self.bot = Bot(token=settings.telegram_bot_token) + self.chat_id = settings.telegram_chat_id + self.logger = logging.getLogger(__name__) + # + + # + # description: "Асинхронно отправляет текстовое сообщение �� Telegram." + # preconditions: + # - "Сообщение (message) должно быть непустой строкой." + # postconditions: + # - "Сообщение отправлено в чат." + # exceptions: + # - "TelegramError: при ошибках API Telegram." + # - "Exception: при других ошибках." + # + # + async def send_message(self, message: str): + """Асинхронно отправляет сообщение в Telegram.""" + # + try: + self.logger.info(f"Отправка сообщения в Telegram (чат ID: {self.chat_id})...") + await self.bot.send_message( + chat_id=self.chat_id, + text=message, + parse_mode='HTML' + ) + self.logger.info("Сообщение в Telegram успешно отправлено.") + except TelegramError as e: + self.logger.error(f"Ошибка отправки сообщения в Telegram: {e}") + raise + except Exception as e: + self.logger.error(f"Непредвиденная ошибка при отправке сообщения в Telegram: {e}") + raise + # + # + +# +# description: "Удобная обертка для инициализации и отправки сообщения." +# +async def send_telegram_notification(message: str): + """ + Инициализирует TelegramSender и отправляет сообщение. + Является удобной оберткой для вызова из других частей приложения. + """ + try: + sender = TelegramSender() + await sender.send_message(message) + except ValueError as e: + logging.error(f"Ошибка инициализации TelegramSender: {e}") + except Exception as e: + logging.error(f"Не удалось отправить уведомление в Telegram: {e}") + +#