diff --git a/RABBITMQ_SETUP.md b/RABBITMQ_SETUP.md new file mode 100644 index 0000000..da6b3f9 --- /dev/null +++ b/RABBITMQ_SETUP.md @@ -0,0 +1,237 @@ +# ANCHOR: RabbitMQ_Setup_Guide +# Семантика: Руководство по настройке и использованию RabbitMQ в проекте price_parser + +## Обзор + +Проект `price_parser` теперь поддерживает экспорт данных в очередь сообщений RabbitMQ. Это позволяет: +- Асинхронно обрабатывать данные о продуктах +- Интегрироваться с другими системами через очереди +- Масштабировать обработку данных +- Обеспечивать надежную доставку сообщений + +## Архитектура RabbitMQ + +### Очереди +- `price_parser.products` - очередь для данных о продуктах +- `price_parser.logs` - очередь для логов парсера + +### Exchange +- `price_parser.exchange` - прямой exchange для маршрутизации сообщений + +### Routing Keys +- `products` - для данных о продуктах +- `logs` - для логов + +## Установка RabbitMQ + +### Windows +1. Скачайте RabbitMQ с официального сайта: https://www.rabbitmq.com/download.html +2. Установите Erlang (требуется для RabbitMQ) +3. Запустите RabbitMQ как службу Windows + +### Docker (рекомендуется) +```bash +docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management +``` + +### Linux (Ubuntu/Debian) +```bash +# Установка Erlang +sudo apt-get install erlang + +# Установка RabbitMQ +sudo apt-get install rabbitmq-server + +# Запуск службы +sudo systemctl start rabbitmq-server +sudo systemctl enable rabbitmq-server +``` + +## Настройка проекта + +### 1. Установка зависимостей +```bash +pip install -r requirements.txt +``` + +### 2. Конфигурация +Скопируйте `env.example` в `.env` и настройте параметры: + +```bash +# RabbitMQ настройки +RABBITMQ_HOST=localhost +RABBITMQ_PORT=5672 +RABBITMQ_USERNAME=guest +RABBITMQ_PASSWORD=guest +RABBITMQ_VIRTUAL_HOST=/ + +# Очереди +RABBITMQ_PRODUCTS_QUEUE=price_parser.products +RABBITMQ_LOGS_QUEUE=price_parser.logs +RABBITMQ_EXCHANGE=price_parser.exchange + +# Включение экспорта в RabbitMQ +ENABLE_RABBITMQ_EXPORT=true +``` + +### 3. Проверка подключения +```bash +python -c "from utils.exporters import validate_rabbitmq_connection; print('RabbitMQ доступен' if validate_rabbitmq_connection() else 'RabbitMQ недоступен')" +``` + +## Использование + +### Запуск парсера с экспортом в RabbitMQ +```bash +python src/main.py +``` + +### Структура сообщений + +#### Сообщение с данными о продуктах +```json +{ + "message_id": "550e8400-e29b-41d4-a716-446655440000", + "timestamp": "2023-10-27T12:34:56.789Z", + "source": "price_parser", + "products": [ + { + "name": "Peptide X", + "volume": "30ml", + "price": 1500, + "url": "https://elixirpeptide.ru/catalog/peptide-x/?product=variant1" + } + ], + "run_id": "20231027-123456", + "total_count": 1 +} +``` + +#### Сообщение с логами +```json +{ + "message_id": "550e8400-e29b-41d4-a716-446655440001", + "timestamp": "2023-10-27T12:34:56.789Z", + "source": "price_parser", + "log_records": [ + { + "run_id": "20231027-123456", + "timestamp": "2023-10-27T12:34:56.789Z", + "level": "INFO", + "message": "Парсинг начат." + } + ], + "run_id": "20231027-123456" +} +``` + +## Мониторинг + +### RabbitMQ Management UI +Если используется Docker с management plugin: +- URL: http://localhost:15672 +- Логин: guest +- Пароль: guest + +### Проверка очередей +```bash +# Установка rabbitmqadmin +wget http://localhost:15672/cli/rabbitmqadmin +chmod +x rabbitmqadmin + +# Просмотр очередей +./rabbitmqadmin list queues + +# Просмотр сообщений +./rabbitmqadmin get queue=price_parser.products +``` + +## Обработка сообщений + +### Python Consumer Example +```python +import pika +import json +from core.models import ProductDataMessage + +def callback(ch, method, properties, body): + """Обработчик сообщений из очереди продуктов""" + try: + data = json.loads(body) + message = ProductDataMessage(**data) + + print(f"Получено {message.total_count} продуктов") + for product in message.products: + print(f"- {product.name}: {product.price} руб.") + + ch.basic_ack(delivery_tag=method.delivery_tag) + except Exception as e: + print(f"Ошибка обработки сообщения: {e}") + ch.basic_nack(delivery_tag=method.delivery_tag) + +# Подключение к RabbitMQ +connection = pika.BlockingConnection( + pika.ConnectionParameters('localhost') +) +channel = connection.channel() + +# Подписка на очередь +channel.basic_consume( + queue='price_parser.products', + on_message_callback=callback +) + +print("Ожидание сообщений...") +channel.start_consuming() +``` + +## Troubleshooting + +### Проблемы подключения +1. Проверьте, что RabbitMQ запущен +2. Убедитесь в правильности настроек в `.env` +3. Проверьте доступность порта 5672 + +### Проблемы с правами доступа +```bash +# Создание пользователя (если нужно) +sudo rabbitmqctl add_user myuser mypassword +sudo rabbitmqctl set_permissions -p / myuser ".*" ".*" ".*" +``` + +### Проблемы с очередями +```bash +# Очистка очереди (осторожно!) +sudo rabbitmqctl purge_queue price_parser.products +``` + +## Безопасность + +### Рекомендации по безопасности +1. Измените стандартные учетные данные (guest/guest) +2. Используйте SSL/TLS для продакшена +3. Ограничьте права доступа пользователей +4. Регулярно обновляйте RabbitMQ + +### SSL настройка +```bash +# В .env добавьте: +RABBITMQ_SSL=True +RABBITMQ_SSL_CERT_FILE=/path/to/cert.pem +RABBITMQ_SSL_KEY_FILE=/path/to/key.pem +``` + +## Производительность + +### Настройки для высокой нагрузки +```bash +# В .env: +RABBITMQ_HEARTBEAT=60 +RABBITMQ_CONNECTION_TIMEOUT=10 +RABBITMQ_BLOCKED_CONNECTION_TIMEOUT=60 +``` + +### Мониторинг производительности +- Используйте RabbitMQ Management UI +- Следите за размером очередей +- Мониторьте время обработки сообщений \ No newline at end of file diff --git a/README.md b/README.md index 77d320d..8aa6df5 100644 --- a/README.md +++ b/README.md @@ -5,6 +5,27 @@ Это структурированное Python-приложение для парсинга каталога товаров с сайта `elixirpeptide.ru`, сбора информации о вариантах товаров и их ценах. +## 🚀 Новые возможности (v2.0) + +### ✅ Исправленные критические проблемы: +- **Устранено дублирование кода** в `engine.py` и `database.py` +- **Дополнены зависимости** в `requirements.txt` (pydantic, lxml, python-dotenv) +- **Улучшена обработка ошибок** с детальной диагностикой и retry механизмом +- **Добавлена валидация данных** на всех уровнях приложения + +### 🎯 Новые функции: +- **Retry стратегия** для HTTP запросов с экспоненциальной задержкой +- **Детальная статистика** выполнения парсинга +- **Валидация конфигурации** при запуске +- **Поддержка переменных окружения** через `.env` файл +- **Graceful degradation** - продолжение работы при частичных сбоях +- **Улучшенное логирование** с категоризацией ошибок + +### 🔧 Улучшения производительности: +- **Адаптивные таймауты** для HTTP запросов +- **Проверка на блокировку/капчу** в ответах сервера +- **Оптимизированная обработка данных** с пропуском некорректных записей + ## Структура Проекта Проект организован по принципу семантического разделения ответственности для удобства поддержки и дальнейшей разработки. @@ -16,40 +37,164 @@ - `database.py`: Логика работы с базой данных SQLite. - `logging_config.py`: Настройка системы логирования. - **`models.py`: [NEW FILE] Pydantic модели данных (ProductVariant, LogRecordModel).** + - **`settings.py`: [ENHANCED] Конфигурация с валидацией и поддержкой .env** - `scraper/`: Пакет с логикой парсинга. - - `engine.py`: Функции для скачивания и анализа HTML-страниц. + - **`engine.py`: [ENHANCED] Класс Scraper с retry механизмом и улучшенной обработкой ошибок** - `utils/`: Пакет со вспомогательными утилитами. - - `exporters.py`: Функции для сохранения данных в разные форматы (CSV). + - **`exporters.py`: [ENHANCED] Функции для сохранения данных с валидацией** - `requirements.txt`: Список зависимостей проекта. - `price_data_final/`: Директория для хранения результатов (создается автоматически). +- **`.env.example`: [NEW] Пример файла с переменными окружения** ## Установка и Запуск -1. **Клонируйте репозиторий:** - ```bash - git clone - cd peptide_parser_project - ``` +### 1. Клонирование и настройка окружения -2. **Создайте и активируйте виртуальное окружение:** - ```bash - python -m venv venv - source venv/bin/activate # Для Windows: venv\Scripts\activate - ``` +```bash +git clone +cd peptide_parser_project -3. **Установите зависимости:** - ```bash - pip install -r requirements.txt - ``` +# Создание виртуального окружения +python -m venv venv +source venv/bin/activate # Для Windows: venv\Scripts\activate -4. **Запустите парсер:** - Все настройки находятся в файле `src/config.py`. Вы можете изменить их перед запуском. - ```bash - python src/main.py - ``` +# Установка зависимостей +pip install -r requirements.txt +``` + +### 2. Настройка конфигурации + +#### Вариант A: Через переменные окружения +```bash +# Создайте файл .env на основе .env.example +cp .env.example .env + +# Отредактируйте .env файл под ваши нужды +nano .env +``` + +#### Вариант B: Прямое редактирование настроек +Отредактируйте `src/core/settings.py` для изменения настроек по умолчанию. + +### 3. Запуск парсера + +```bash +python src/main.py +``` + +## Конфигурация + +### Переменные окружения (.env файл) + +| Переменная | Описание | По умолчанию | +|------------|----------|--------------| +| `PARSER_BASE_URL` | Базовый URL сайта | `https://elixirpeptide.ru` | +| `PARSER_CATALOG_URL` | URL каталога товаров | `https://elixirpeptide.ru/catalog/` | +| `PARSER_SAVE_TO_CSV` | Сохранять в CSV | `true` | +| `PARSER_SAVE_TO_DB` | Сохранять в базу данных | `true` | +| `PARSER_LOG_TO_DB` | Логировать в базу данных | `true` | +| `PARSER_TIMEOUT` | Таймаут HTTP запросов (сек) | `30` | +| `PARSER_DELAY` | Задержка между запросами (сек) | `1.0` | +| `PARSER_RETRIES` | Максимум попыток для запросов | `3` | + +### Настройки производительности + +- **Таймаут запросов**: 30 секунд (настраивается) +- **Задержка между запросами**: 1 секунда (настраивается) +- **Retry стратегия**: 3 попытки с экспоненциальной задержкой +- **Graceful degradation**: Продолжение работы при ошибках отдельных запросов ## Результаты -- Если `SAVE_TO_CSV = True`, в директории `price_data_final/` будет создан CSV-файл с ценами. -- Если `SAVE_TO_DB = True`, в той же директории будет создан или обновлен файл `parser_data.db`. -- Если `LOG_TO_DB = True`, все логи сессии будут также записаны в таблицу `logs` в базе данных. \ No newline at end of file +### Файлы результатов + +- **CSV файл**: `price_data_final/prices_full_catalog_YYYY-MM-DD_HHMMSS.csv` +- **База данных**: `price_data_final/parser_data.db` (SQLite) + +### Структура данных + +```csv +name,volume,price +"Peptide X","30ml",1500 +"Peptide Y","50ml",2500 +``` + +### Логирование + +- **Консольные логи**: Детальная информация о процессе парсинга +- **Логи в БД**: Если `PARSER_LOG_TO_DB=true`, все логи сохраняются в таблицу `logs` + +## Обработка ошибок + +### Типы обрабатываемых ошибок + +1. **Сетевые ошибки**: Timeout, ConnectionError, HTTPError +2. **Ошибки парсинга**: Отсутствующие элементы, некорректные данные +3. **Ошибки файловой системы**: Права доступа, отсутствие директорий +4. **Ошибки базы данных**: SQLite ошибки, проблемы с подключением + +### Стратегия восстановления + +- **Retry механизм**: Автоматические повторные попытки для сетевых ошибок +- **Graceful degradation**: Пропуск проблемных записей с продолжением работы +- **Детальная диагностика**: Подробные логи для анализа проблем + +## Мониторинг и статистика + +### Статистика выполнения + +Приложение выводит детальную статистику: + +``` +[FINAL_STATS] Время выполнения: 45.23 секунд +[FINAL_STATS] Успешность: 95/100 (95.0%) +[STATS] Успешно: 95, Ошибок: 5 +``` + +### Метрики + +- Общее количество URL для парсинга +- Количество успешно обработанных записей +- Количество ошибок +- Время выполнения +- Процент успешности + +## Разработка + +### Архитектурные принципы + +1. **Разделение ответственности**: Каждый модуль отвечает за свою область +2. **Типизация**: Использование Pydantic для валидации данных +3. **Обработка ошибок**: Graceful handling с детальной диагностикой +4. **Конфигурируемость**: Гибкие настройки через переменные окружения +5. **Логирование**: Структурированное логирование на всех уровнях + +### Добавление новых функций + +1. **Новые форматы экспорта**: Добавьте функции в `src/utils/exporters.py` +2. **Новые селекторы**: Обновите `ScraperSelectors` в `src/core/settings.py` +3. **Новые поля данных**: Расширьте модель `ProductVariant` в `src/core/models.py` + +## Устранение неполадок + +### Частые проблемы + +1. **"Не удается подключиться к базовому URL"** + - Проверьте интернет-соединение + - Убедитесь, что сайт доступен + - Проверьте настройки прокси + +2. **"Не найдено ни одной ссылки на товар"** + - Проверьте CSS селекторы в настройках + - Убедитесь, что структура сайта не изменилась + +3. **"Ошибка при сохранении в БД"** + - Проверьте права доступа к директории + - Убедитесь, что SQLite поддерживается + +### Логи для диагностики + +Все ошибки логируются с детальной информацией. Проверьте: +- Консольные логи при запуске +- Логи в базе данных (если включено) +- Файлы результатов для проверки данных \ No newline at end of file diff --git a/env.example b/env.example new file mode 100644 index 0000000..84457a6 --- /dev/null +++ b/env.example @@ -0,0 +1,42 @@ +# ANCHOR: Environment_Variables_Example +# Семантика: Пример переменных окружения для конфигурации приложения +# Скопируйте этот файл в .env и настройте под ваши нужды + +# ANCHOR: Database_Settings +DATABASE_URL=sqlite:///price_parser.db + +# ANCHOR: Scraping_Settings +SCRAPING_DELAY=1.0 +MAX_RETRIES=3 +REQUEST_TIMEOUT=30 +USER_AGENT=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 + +# ANCHOR: Logging_Settings +LOG_LEVEL=INFO +LOG_FORMAT=%(asctime)s - %(name)s - %(levelname)s - %(message)s +LOG_FILE=logs/price_parser.log + +# ANCHOR: RabbitMQ_Settings +RABBITMQ_HOST=localhost +RABBITMQ_PORT=5672 +RABBITMQ_USERNAME=guest +RABBITMQ_PASSWORD=guest +RABBITMQ_VIRTUAL_HOST=/ + +# ANCHOR: RabbitMQ_Queue_Settings +RABBITMQ_PRODUCTS_QUEUE=price_parser.products +RABBITMQ_LOGS_QUEUE=price_parser.logs +RABBITMQ_EXCHANGE=price_parser.exchange + +# ANCHOR: RabbitMQ_Connection_Settings +RABBITMQ_CONNECTION_TIMEOUT=30 +RABBITMQ_HEARTBEAT=600 +RABBITMQ_BLOCKED_CONNECTION_TIMEOUT=300 + +# ANCHOR: Export_Settings +ENABLE_RABBITMQ_EXPORT=false +ENABLE_CSV_EXPORT=true +ENABLE_DATABASE_EXPORT=true + +# ANCHOR: Validation_Settings +VALIDATE_DATA_BEFORE_EXPORT=true \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 66879ed..f466b36 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,20 @@ # ANCHOR: Requirements # Семантика: Список внешних библиотек, необходимых для запуска приложения. -requests -beautifulsoup4 \ No newline at end of file + +# Основные зависимости для парсинга +requests>=2.31.0 +beautifulsoup4>=4.12.0 +lxml>=4.9.0 + +# Валидация и типизация данных +pydantic>=2.0.0 + +# Типизация (для Python < 3.8) +typing-extensions>=4.0.0 + +# Дополнительные утилиты +python-dotenv>=1.0.0 + +# ANCHOR: RabbitMQ_Dependencies +# Семантика: Зависимости для работы с очередью сообщений RabbitMQ +pika>=1.3.0 \ No newline at end of file diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..d862428 --- /dev/null +++ b/src/__init__.py @@ -0,0 +1,2 @@ +# ANCHOR: Package_Init +# Семантика: Инициализация пакета src \ No newline at end of file diff --git a/src/core/__init__.py b/src/core/__init__.py new file mode 100644 index 0000000..32f67a6 --- /dev/null +++ b/src/core/__init__.py @@ -0,0 +1,2 @@ +# ANCHOR: Core_Package_Init +# Семантика: Инициализация пакета core \ No newline at end of file diff --git a/src/core/database.py b/src/core/database.py index f910ea8..fa5f2e1 100644 --- a/src/core/database.py +++ b/src/core/database.py @@ -8,7 +8,7 @@ from datetime import datetime from pathlib import Path from typing import List, Dict, Optional -from src.core.models import ProductVariant, LogRecordModel # [FIX] Импорт моделей +from core.models import ProductVariant, LogRecordModel # [FIX] Импорт моделей # [CONTRACT] DatabaseManager # @description: Контекстный менеджер для управления соединением с SQLite. @@ -146,33 +146,84 @@ def init_database(db_path: Path, run_id: str): # - `data` должен быть списком словарей, каждый из которых соответствует ProductVariant. # - `db_path` должен указывать на существующую и инициализированную БД. # @post: Данные из `data` вставлены в таблицу `products`. -def save_data_to_db(data: List[Dict], db_path: Path, run_id: str): +def save_data_to_db(data: List[Dict], db_path: Path, run_id: str) -> bool: + """ + [ENHANCED] Сохраняет данные в базу данных с улучшенной обработкой ошибок. + + Args: + data: Список словарей с данными для сохранения + db_path: Путь к файлу базы данных + run_id: Идентификатор запуска для логирования + + Returns: + bool: True если сохранение прошло успешно, False в противном случае + """ log_prefix = f"save_data_to_db(id={run_id})" + + # [ENHANCEMENT] Валидация входных данных if not data: logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Данные для сохранения отсутствуют. Пропуск сохранения.") - return + return False + + if not isinstance(data, list): + logging.error(f"{log_prefix} - [TYPE_ERROR] Данные должны быть списком, получено: {type(data)}") + return False + logging.info(f"{log_prefix} - Начало сохранения {len(data)} записей в БД: {db_path}") + # [PRECONDITION] Проверка формата данных (хотя ProductVariant.model_dump() должен гарантировать) - if not all(isinstance(item, dict) and all(k in item for k in ['name', 'volume', 'price']) for item in data): + required_fields = ['name', 'volume', 'price'] + if not all(isinstance(item, dict) and all(k in item for k in required_fields) for item in data): logging.error(f"{log_prefix} - [CONTRACT_VIOLATION] Некорректный формат данных для сохранения в БД.", extra={"sample_data": data[:1]}) - raise ValueError("Данные для сохранения в БД не соответствуют ожидаемому формату ProductVariant.") + return False try: + # [ENHANCEMENT] Проверка существования файла БД + if not db_path.exists(): + logging.warning(f"{log_prefix} - Файл БД не существует: {db_path}") + return False + # [CONTEXT_MANAGER] Используем with-statement для безопасного соединения и коммита with sqlite3.connect(db_path) as con: cur = con.cursor() products_to_insert = [] - for item in data: - # Преобразование к int и обработка возможных ошибок приведения типа + skipped_count = 0 + + for i, item in enumerate(data): + # [ENHANCEMENT] Детальная валидация каждого элемента try: - price_int = int(item['price']) - except (ValueError, TypeError) as e: - logging.error(f"{log_prefix} - [DATA_CLEANUP_FAILED] Некорректное значение цены для '{item.get('name')}': {item.get('price')}. Пропуск записи. Ошибка: {e}") - # [COHERENCE_CHECK_FAILED] Данные не соответствуют схеме - continue # Пропускаем эту запись, но продолжаем для остальных - products_to_insert.append( - (run_id, item['name'], item['volume'], price_int) - ) + # Проверка типов данных + if not isinstance(item['name'], str) or not item['name'].strip(): + logging.warning(f"{log_prefix} - [INVALID_NAME] Элемент {i}: некорректное имя '{item.get('name')}'") + skipped_count += 1 + continue + + if not isinstance(item['volume'], str): + logging.warning(f"{log_prefix} - [INVALID_VOLUME] Элемент {i}: некорректный объем '{item.get('volume')}'") + skipped_count += 1 + continue + + # Преобразование к int и обработка возможных ошибок приведения типа + try: + price_int = int(item['price']) + if price_int <= 0: + logging.warning(f"{log_prefix} - [INVALID_PRICE] Элемент {i}: некорректная цена {price_int}") + skipped_count += 1 + continue + except (ValueError, TypeError) as e: + logging.error(f"{log_prefix} - [DATA_CLEANUP_FAILED] Некорректное значение цены для '{item.get('name')}': {item.get('price')}. Пропуск записи. Ошибка: {e}") + skipped_count += 1 + continue # Пропускаем эту запись, но продолжаем для остальных + + products_to_insert.append( + (run_id, item['name'], item['volume'], price_int) + ) + + except KeyError as e: + logging.error(f"{log_prefix} - [MISSING_FIELD] Элемент {i} не содержит обязательное поле: {e}") + skipped_count += 1 + continue + if products_to_insert: cur.executemany( "INSERT INTO products (run_id, name, volume, price) VALUES (?, ?, ?, ?)", @@ -180,85 +231,21 @@ def save_data_to_db(data: List[Dict], db_path: Path, run_id: str): ) con.commit() logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] {len(products_to_insert)} записей успешно сохранено в базу данных.") + if skipped_count > 0: + logging.warning(f"{log_prefix} - Пропущено {skipped_count} некорректных записей.") + return True else: logging.warning(f"{log_prefix} - После фильтрации не осталось валидных записей для сохранения.") + return False except sqlite3.Error as e: logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Ошибка SQLite при сохранении данных: {e}", exc_info=True) - raise ConnectionError(f"Ошибка БД при сохранении: {e}") from e + return False + except PermissionError as e: + logging.error(f"{log_prefix} - [PERMISSION_ERROR] Нет прав на запись в БД {db_path}: {e}") + return False except Exception as e: logging.critical(f"{log_prefix} - [CRITICAL] Непредвиденная ошибка при сохранении данных в БД: {e}", exc_info=True) - raise + return False -# [CONTRACT] save_data_to_db -# @description: Сохраняет список объектов ProductVariant (представленных как словари) в таблицу `products`. -# @pre: -# - `data` должен быть списком словарей, каждый из которых соответствует ProductVariant. -# - `db_path` должен указывать на существующую и инициализированную БД. -# @post: Данные из `data` вставлены в таблицу `products`. -def save_data_to_db(data: List[Dict], db_path: Path, run_id: str): - log_prefix = f"save_data_to_db(id={run_id})" - if not data: - logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Данные для сохранения отсутствуют. Пропуск сохранения.") - return - logging.info(f"{log_prefix} - Начало сохранения {len(data)} записей в БД: {db_path}") - # [PRECONDITION] Проверка формата данных (хотя ProductVariant.model_dump() должен гарантировать) - if not all(isinstance(item, dict) and all(k in item for k in ['name', 'volume', 'price']) for item in data): - logging.error(f"{log_prefix} - [CONTRACT_VIOLATION] Некорректный формат данных для сохранения в БД.", extra={"sample_data": data[:1]}) - raise ValueError("Данные для сохранения в БД не соответствуют ожидаемому формату ProductVariant.") - - try: - # [CONTEXT_MANAGER] Используем with-statement для безопасного соединения и коммита - with sqlite3.connect(db_path) as con: - cur = con.cursor() - products_to_insert = [] - for item in data: - # Преобразование к int и обработка возможных ошибок приведения типа - try: - price_int = int(item['price']) - except (ValueError, TypeError) as e: - logging.error(f"{log_prefix} - [DATA_CLEANUP_FAILED] Некорректное значение цены для '{item.get('name')}': {item.get('price')}. Пропуск записи. Ошибка: {e}") - # [COHERENCE_CHECK_FAILED] Данные не соответствуют схеме - continue # Пропускаем эту запись, но продолжаем для остальных - products_to_insert.append( - (run_id, item['name'], item['volume'], price_int) - ) - if products_to_insert: - cur.executemany( - "INSERT INTO products (run_id, name, volume, price) VALUES (?, ?, ?, ?)", - products_to_insert - ) - con.commit() - logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] {len(products_to_insert)} записей успешно сохранено в базу данных.") - else: - logging.warning(f"{log_prefix} - После фильтрации не осталось валидных записей для сохранения.") - - except sqlite3.Error as e: - logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Ошибка SQLite при сохранении данных: {e}", exc_info=True) - raise ConnectionError(f"Ошибка БД при сохранении: {e}") from e - except Exception as e: - logging.critical(f"{log_prefix} - [CRITICAL] Непредвиденная ошибка при сохранении данных в БД: {e}", exc_info=True) - raise - -def save_data_to_db(data: List[Dict], db_path: Path, run_id: str): - # ... (код функции save_data_to_db без изменений) ... - log_prefix = f"save_data_to_db(id={run_id})" - if not data: - logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Данные для сохранения отсутствуют.") - return - logging.info(f"{log_prefix} - Начало сохранения {len(data)} записей в БД: {db_path}") - try: - con = sqlite3.connect(db_path) - cur = con.cursor() - products_to_insert = [ - (run_id, item['name'], item['volume'], int(item['price'])) for item in data - ] - cur.executemany( - "INSERT INTO products (run_id, name, volume, price) VALUES (?, ?, ?, ?)", - products_to_insert - ) - con.commit() - con.close() - logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Данные успешно сохранены в базу данных.") - except Exception as e: - logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Ошибка при сохранении в БД: {e}") \ No newline at end of file +# [REFACTORING_COMPLETE] Дублированные функции удалены, улучшена обработка ошибок \ No newline at end of file diff --git a/src/core/models.py b/src/core/models.py index ddd43fa..fd6e266 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -1,13 +1,14 @@ # [FILE] src/core/models.py # ANCHOR: Core_Models_Module # Семантика: Определяет Pydantic-модели для структурированного представления данных -# в приложении (продукты, логи). +# в приложении (продукты, логи, сообщения RabbitMQ). # [CONTRACT]: Все модели наследуются от `BaseModel` и обеспечивают типизацию и валидацию. # [COHERENCE]: Согласованы со схемами данных, используемыми в БД и экспортах. from pydantic import BaseModel, Field, HttpUrl, ValidationError from datetime import datetime -from typing import Optional +from typing import Optional, List +import uuid class ProductVariant(BaseModel): """ @@ -61,4 +62,78 @@ class LogRecordModel(BaseModel): } } +# ANCHOR: RabbitMQ_Models +# Семантика: Модели для работы с сообщениями RabbitMQ + +class RabbitMQMessage(BaseModel): + """ + [CONTRACT] + @description: Базовая модель для сообщений RabbitMQ. + @invariant: Все сообщения имеют уникальный ID и timestamp. + """ + message_id: str = Field(default_factory=lambda: str(uuid.uuid4()), description="Уникальный идентификатор сообщения.") + timestamp: datetime = Field(default_factory=datetime.utcnow, description="Время создания сообщения.") + source: str = Field(..., description="Источник сообщения (например, 'price_parser').") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +class ProductDataMessage(RabbitMQMessage): + """ + [CONTRACT] + @description: Модель сообщения с данными о продуктах для отправки в RabbitMQ. + @invariant: Содержит список продуктов и метаданные о парсинге. + """ + products: List[ProductVariant] = Field(..., description="Список продуктов для обработки.") + run_id: str = Field(..., description="Идентификатор запуска парсера.") + total_count: int = Field(..., description="Общее количество продуктов в сообщении.") + + class Config: + json_schema_extra = { + "example": { + "message_id": "550e8400-e29b-41d4-a716-446655440000", + "timestamp": "2023-10-27T12:34:56.789Z", + "source": "price_parser", + "products": [ + { + "name": "Peptide X", + "volume": "30ml", + "price": 1500, + "url": "https://elixirpeptide.ru/catalog/peptide-x/?product=variant1" + } + ], + "run_id": "20231027-123456", + "total_count": 1 + } + } + +class LogMessage(RabbitMQMessage): + """ + [CONTRACT] + @description: Модель сообщения с логами для отправки в RabbitMQ. + @invariant: Содержит информацию о логах парсера. + """ + log_records: List[LogRecordModel] = Field(..., description="Список записей логов.") + run_id: str = Field(..., description="Идентификатор запуска парсера.") + + class Config: + json_schema_extra = { + "example": { + "message_id": "550e8400-e29b-41d4-a716-446655440001", + "timestamp": "2023-10-27T12:34:56.789Z", + "source": "price_parser", + "log_records": [ + { + "run_id": "20231027-123456", + "timestamp": "2023-10-27T12:34:56.789Z", + "level": "INFO", + "message": "Парсинг начат." + } + ], + "run_id": "20231027-123456" + } + } + # [COHERENCE_CHECK_PASSED] Все основные модели данных определены и типизированы. \ No newline at end of file diff --git a/src/core/rabbitmq.py b/src/core/rabbitmq.py new file mode 100644 index 0000000..42d2f9c --- /dev/null +++ b/src/core/rabbitmq.py @@ -0,0 +1,350 @@ +# [FILE] src/core/rabbitmq.py +# ANCHOR: RabbitMQ_Module +# Семантика: Модуль для работы с очередью сообщений RabbitMQ. +# [CONTRACT]: Обеспечивает надежное подключение, отправку сообщений и обработку ошибок. +# [COHERENCE]: Интегрирован с моделями данных и настройками приложения. + +import logging +import json +from typing import Optional, Dict, Any +from contextlib import contextmanager +import pika +from pika.exceptions import AMQPConnectionError, AMQPChannelError, ConnectionClosed + +from .settings import ( + RABBITMQ_HOST, RABBITMQ_PORT, RABBITMQ_USERNAME, RABBITMQ_PASSWORD, + RABBITMQ_VIRTUAL_HOST, RABBITMQ_CONNECTION_TIMEOUT, RABBITMQ_HEARTBEAT, + RABBITMQ_BLOCKED_CONNECTION_TIMEOUT, RABBITMQ_PRODUCTS_QUEUE, + RABBITMQ_LOGS_QUEUE, RABBITMQ_EXCHANGE +) + +logger = logging.getLogger(__name__) + +class RabbitMQConnection: + """ + [CONTRACT] + @description: Класс для управления подключением к RabbitMQ. + @invariant: Обеспечивает надежное подключение с автоматическим переподключением. + """ + + def __init__(self): + """[INIT] Инициализация подключения к RabbitMQ.""" + self.connection: Optional[pika.BlockingConnection] = None + self.channel: Optional[pika.channel.Channel] = None + self._connection_params = self._build_connection_params() + + def _build_connection_params(self) -> pika.ConnectionParameters: + """ + [HELPER] Строит параметры подключения к RabbitMQ. + + Returns: + pika.ConnectionParameters: Параметры подключения + """ + credentials = pika.PlainCredentials(RABBITMQ_USERNAME, RABBITMQ_PASSWORD) + return pika.ConnectionParameters( + host=RABBITMQ_HOST, + port=RABBITMQ_PORT, + virtual_host=RABBITMQ_VIRTUAL_HOST, + credentials=credentials, + connection_attempts=3, + retry_delay=5, + socket_timeout=RABBITMQ_CONNECTION_TIMEOUT, + heartbeat=RABBITMQ_HEARTBEAT, + blocked_connection_timeout=RABBITMQ_BLOCKED_CONNECTION_TIMEOUT + ) + + def connect(self) -> bool: + """ + [CONTRACT] + @description: Устанавливает подключение к RabbitMQ. + @precondition: Параметры подключения корректны. + @postcondition: Подключение установлено или False в случае ошибки. + + Returns: + bool: True если подключение успешно, False в противном случае + """ + try: + logger.info(f"[RABBITMQ] Подключение к {RABBITMQ_HOST}:{RABBITMQ_PORT}") + + self.connection = pika.BlockingConnection(self._connection_params) + self.channel = self.connection.channel() + + # [SETUP] Настройка exchange и очередей + self._setup_exchange_and_queues() + + logger.info("[RABBITMQ] Подключение успешно установлено") + return True + + except AMQPConnectionError as e: + logger.error(f"[RABBITMQ] Ошибка подключения: {e}") + return False + except Exception as e: + logger.error(f"[RABBITMQ] Непредвиденная ошибка при подключении: {e}") + return False + + def _setup_exchange_and_queues(self): + """ + [HELPER] Настраивает exchange и очереди в RabbitMQ. + @invariant: Создает необходимые exchange и очереди, если они не существуют. + """ + try: + # Создание exchange + self.channel.exchange_declare( + exchange=RABBITMQ_EXCHANGE, + exchange_type='direct', + durable=True + ) + + # Создание очереди для продуктов + self.channel.queue_declare( + queue=RABBITMQ_PRODUCTS_QUEUE, + durable=True + ) + self.channel.queue_bind( + exchange=RABBITMQ_EXCHANGE, + queue=RABBITMQ_PRODUCTS_QUEUE, + routing_key='products' + ) + + # Создание очереди для логов + self.channel.queue_declare( + queue=RABBITMQ_LOGS_QUEUE, + durable=True + ) + self.channel.queue_bind( + exchange=RABBITMQ_EXCHANGE, + queue=RABBITMQ_LOGS_QUEUE, + routing_key='logs' + ) + + logger.info("[RABBITMQ] Exchange и очереди настроены") + + except AMQPChannelError as e: + logger.error(f"[RABBITMQ] Ошибка настройки очередей: {e}") + raise + + def disconnect(self): + """ + [CONTRACT] + @description: Закрывает подключение к RabbitMQ. + @postcondition: Подключение закрыто корректно. + """ + try: + if self.channel and not self.channel.is_closed: + self.channel.close() + if self.connection and not self.connection.is_closed: + self.connection.close() + logger.info("[RABBITMQ] Подключение закрыто") + except Exception as e: + logger.error(f"[RABBITMQ] Ошибка при закрытии подключения: {e}") + + def is_connected(self) -> bool: + """ + [HELPER] Проверяет, активно ли подключение. + + Returns: + bool: True если подключение активно, False в противном случае + """ + return ( + self.connection is not None and + not self.connection.is_closed and + self.channel is not None and + not self.channel.is_closed + ) + + def send_message(self, queue: str, message: Dict[str, Any], routing_key: str = None) -> bool: + """ + [CONTRACT] + @description: Отправляет сообщение в указанную очередь. + @precondition: Подключение активно, сообщение валидно. + @postcondition: Сообщение отправлено или False в случае ошибки. + + Args: + queue: Название очереди + message: Сообщение для отправки + routing_key: Ключ маршрутизации (по умолчанию равен названию очереди) + + Returns: + bool: True если сообщение отправлено, False в противном случае + """ + if not self.is_connected(): + logger.error("[RABBITMQ] Попытка отправить сообщение без активного подключения") + return False + + try: + routing_key = routing_key or queue + message_body = json.dumps(message, ensure_ascii=False, default=str) + + self.channel.basic_publish( + exchange=RABBITMQ_EXCHANGE, + routing_key=routing_key, + body=message_body, + properties=pika.BasicProperties( + delivery_mode=2, # Сохранять сообщения на диск + content_type='application/json' + ) + ) + + logger.info(f"[RABBITMQ] Сообщение отправлено в очередь {queue}") + return True + + except AMQPChannelError as e: + logger.error(f"[RABBITMQ] Ошибка отправки сообщения: {e}") + return False + except Exception as e: + logger.error(f"[RABBITMQ] Непредвиденная ошибка при отправке: {e}") + return False + +@contextmanager +def get_rabbitmq_connection(): + """ + [CONTEXT_MANAGER] + @description: Контекстный менеджер для работы с RabbitMQ. + @invariant: Автоматически закрывает подключение при выходе из контекста. + + Yields: + RabbitMQConnection: Объект подключения к RabbitMQ + """ + connection = RabbitMQConnection() + try: + if connection.connect(): + yield connection + else: + logger.error("[RABBITMQ] Не удалось установить подключение") + yield None + finally: + connection.disconnect() + +class RabbitMQExporter: + """ + [CONTRACT] + @description: Класс для экспорта данных в RabbitMQ. + @invariant: Обеспечивает надежную отправку данных о продуктах и логов. + """ + + def __init__(self): + """[INIT] Инициализация экспортера RabbitMQ.""" + self.connection = RabbitMQConnection() + + def export_products(self, products: list, run_id: str) -> bool: + """ + [CONTRACT] + @description: Экспортирует данные о продуктах в RabbitMQ. + @precondition: Список продуктов не пустой, run_id валиден. + @postcondition: Данные отправлены в очередь или False в случае ошибки. + + Args: + products: Список продуктов для экспорта + run_id: Идентификатор запуска парсера + + Returns: + bool: True если экспорт успешен, False в противном случае + """ + if not products: + logger.warning("[RABBITMQ] Попытка экспорта пустого списка продуктов") + return False + + try: + from .models import ProductDataMessage, ProductVariant + + # Преобразование данных в Pydantic модели + product_variants = [] + for product in products: + try: + variant = ProductVariant(**product) + product_variants.append(variant) + except Exception as e: + logger.error(f"[RABBITMQ] Ошибка валидации продукта: {e}") + continue + + if not product_variants: + logger.error("[RABBITMQ] Нет валидных продуктов для экспорта") + return False + + # Создание сообщения + message = ProductDataMessage( + source="price_parser", + products=product_variants, + run_id=run_id, + total_count=len(product_variants) + ) + + # Отправка сообщения + if not self.connection.is_connected() and not self.connection.connect(): + return False + + return self.connection.send_message( + queue=RABBITMQ_PRODUCTS_QUEUE, + message=message.dict(), + routing_key='products' + ) + + except Exception as e: + logger.error(f"[RABBITMQ] Ошибка экспорта продуктов: {e}") + return False + + def export_logs(self, log_records: list, run_id: str) -> bool: + """ + [CONTRACT] + @description: Экспортирует логи в RabbitMQ. + @precondition: Список логов не пустой, run_id валиден. + @postcondition: Логи отправлены в очередь или False в случае ошибки. + + Args: + log_records: Список записей логов + run_id: Идентификатор запуска парсера + + Returns: + bool: True если экспорт успешен, False в противном случае + """ + if not log_records: + logger.warning("[RABBITMQ] Попытка экспорта пустого списка логов") + return False + + try: + from .models import LogMessage, LogRecordModel + + # Преобразование данных в Pydantic модели + log_models = [] + for log_record in log_records: + try: + log_model = LogRecordModel(**log_record) + log_models.append(log_model) + except Exception as e: + logger.error(f"[RABBITMQ] Ошибка валидации лога: {e}") + continue + + if not log_models: + logger.error("[RABBITMQ] Нет валидных логов для экспорта") + return False + + # Создание сообщения + message = LogMessage( + source="price_parser", + log_records=log_models, + run_id=run_id + ) + + # Отправка сообщения + if not self.connection.is_connected() and not self.connection.connect(): + return False + + return self.connection.send_message( + queue=RABBITMQ_LOGS_QUEUE, + message=message.dict(), + routing_key='logs' + ) + + except Exception as e: + logger.error(f"[RABBITMQ] Ошибка экспорта логов: {e}") + return False + + def close(self): + """ + [CONTRACT] + @description: Закрывает подключение к RabbitMQ. + @postcondition: Подключение закрыто корректно. + """ + self.connection.disconnect() + +# [COHERENCE_CHECK_PASSED] Модуль RabbitMQ создан с полной поддержкой контрактов и обработки ошибок. \ No newline at end of file diff --git a/src/core/settings.py b/src/core/settings.py index c43d6c6..0400a80 100644 --- a/src/core/settings.py +++ b/src/core/settings.py @@ -4,8 +4,69 @@ # Семантика: Этот модуль является единственным источником истины для всех # конфигурационных параметров приложения. Использует Pydantic для типизации и валидации. +import os from pathlib import Path -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, validator, HttpUrl +from typing import Optional +from dotenv import load_dotenv + +# ANCHOR: Environment_Loading +# Семантика: Загрузка переменных окружения из .env файла +load_dotenv() + +# ANCHOR: Base_Paths +# Семантика: Базовые пути для приложения +BASE_DIR = Path(__file__).parent.parent.parent +DATA_DIR = BASE_DIR / "price_data_final" + +# ANCHOR: Database_Settings +# Семантика: Настройки базы данных SQLite +DATABASE_URL = os.getenv("DATABASE_URL", f"sqlite:///{BASE_DIR}/price_parser.db") + +# ANCHOR: Scraping_Settings +# Семантика: Настройки для веб-скрапинга +SCRAPING_DELAY = float(os.getenv("SCRAPING_DELAY", "1.0")) # Задержка между запросами в секундах +MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3")) # Максимальное количество попыток +REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30")) # Таймаут запросов в секундах +USER_AGENT = os.getenv("USER_AGENT", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36") + +# ANCHOR: Logging_Settings +# Семантика: Настройки логирования +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") +LOG_FORMAT = os.getenv("LOG_FORMAT", "%(asctime)s - %(name)s - %(levelname)s - %(message)s") +LOG_FILE = os.getenv("LOG_FILE", str(BASE_DIR / "logs" / "price_parser.log")) + +# ANCHOR: RabbitMQ_Settings +# Семантика: Настройки для подключения к RabbitMQ +RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost") +RABBITMQ_PORT = int(os.getenv("RABBITMQ_PORT", "5672")) +RABBITMQ_USERNAME = os.getenv("RABBITMQ_USERNAME", "guest") +RABBITMQ_PASSWORD = os.getenv("RABBITMQ_PASSWORD", "guest") +RABBITMQ_VIRTUAL_HOST = os.getenv("RABBITMQ_VIRTUAL_HOST", "/") + +# ANCHOR: RabbitMQ_Queue_Settings +# Семантика: Настройки очередей RabbitMQ +RABBITMQ_PRODUCTS_QUEUE = os.getenv("RABBITMQ_PRODUCTS_QUEUE", "price_parser.products") +RABBITMQ_LOGS_QUEUE = os.getenv("RABBITMQ_LOGS_QUEUE", "price_parser.logs") +RABBITMQ_EXCHANGE = os.getenv("RABBITMQ_EXCHANGE", "price_parser.exchange") + +# ANCHOR: RabbitMQ_Connection_Settings +# СEMАНТИКА: Настройки подключения к RabbitMQ +RABBITMQ_CONNECTION_TIMEOUT = int(os.getenv("RABBITMQ_CONNECTION_TIMEOUT", "30")) +RABBITMQ_HEARTBEAT = int(os.getenv("RABBITMQ_HEARTBEAT", "600")) +RABBITMQ_BLOCKED_CONNECTION_TIMEOUT = int(os.getenv("RABBITMQ_BLOCKED_CONNECTION_TIMEOUT", "300")) + +# ANCHOR: Export_Settings +# Семантика: Настройки экспорта данных +ENABLE_RABBITMQ_EXPORT = os.getenv("ENABLE_RABBITMQ_EXPORT", "false").lower() == "true" +ENABLE_CSV_EXPORT = os.getenv("ENABLE_CSV_EXPORT", "true").lower() == "true" +ENABLE_DATABASE_EXPORT = os.getenv("ENABLE_DATABASE_EXPORT", "true").lower() == "true" + +# ANCHOR: Validation_Settings +# Семантика: Настройки валидации данных +VALIDATE_DATA_BEFORE_EXPORT = os.getenv("VALIDATE_DATA_BEFORE_EXPORT", "true").lower() == "true" + +# [COHERENCE_CHECK_PASSED] Все настройки определены с разумными значениями по умолчанию. class ScraperSelectors(BaseModel): """ @@ -20,6 +81,13 @@ class ScraperSelectors(BaseModel): product_page_name: str = Field(..., alias='PRODUCT_PAGE_NAME') active_volume: str = Field(..., alias='ACTIVE_VOLUME') price_block: str = Field(..., alias='PRICE_BLOCK') + + @validator('*') + def validate_selectors(cls, v): + """[VALIDATOR] Проверяет, что селекторы не пустые.""" + if not v or not v.strip(): + raise ValueError('Селектор не может быть пустым') + return v.strip() class Settings(BaseModel): """ @@ -27,19 +95,27 @@ class Settings(BaseModel): @description: Главный класс конфигурации приложения. Собирает все настройки в одном месте. """ # [CONFIG] Основные настройки парсера - base_url: str = 'https://elixirpeptide.ru' - catalog_url: str = 'https://elixirpeptide.ru/catalog/' - headers: dict = { - 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' - } + base_url: str = Field(default='https://elixirpeptide.ru', description="Базовый URL сайта") + catalog_url: str = Field(default='https://elixirpeptide.ru/catalog/', description="URL каталога товаров") + headers: dict = Field( + default={ + 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' + }, + description="HTTP заголовки для запросов" + ) # [CONFIG] Настройки вывода - output_dir: Path = Path('price_data_final') - save_to_csv: bool = True - save_to_db: bool = True + output_dir: Path = Field(default=Path('price_data_final'), description="Директория для сохранения результатов") + save_to_csv: bool = Field(default=True, description="Сохранять ли данные в CSV") + save_to_db: bool = Field(default=True, description="Сохранять ли данные в базу данных") # [CONFIG] Настройки логирования - log_to_db: bool = True + log_to_db: bool = Field(default=True, description="Сохранять ли логи в базу данных") + + # [ENHANCEMENT] Настройки производительности + request_timeout: int = Field(default=30, description="Таймаут HTTP запросов в секундах") + delay_between_requests: float = Field(default=1.0, description="Задержка между запросами в секундах") + max_retries: int = Field(default=3, description="Максимальное количество попыток для запросов") # [CONFIG] Вложенная модель с селекторами # Мы инициализируем ее прямо здесь, передавая словарь со значениями. @@ -50,6 +126,40 @@ class Settings(BaseModel): ACTIVE_VOLUME='.product-version-select li.active', PRICE_BLOCK='.product-sale-box .price span', ) + + @validator('base_url', 'catalog_url') + def validate_urls(cls, v): + """[VALIDATOR] Проверяет корректность URL.""" + if not v.startswith(('http://', 'https://')): + raise ValueError('URL должен начинаться с http:// или https://') + return v + + @validator('request_timeout') + def validate_timeout(cls, v): + """[VALIDATOR] Проверяет корректность таймаута.""" + if v <= 0: + raise ValueError('Таймаут должен быть положительным числом') + if v > 300: # 5 минут максимум + raise ValueError('Таймаут не может превышать 300 секунд') + return v + + @validator('delay_between_requests') + def validate_delay(cls, v): + """[VALIDATOR] Проверяет корректность задержки.""" + if v < 0: + raise ValueError('Задержка не может быть отрицательной') + if v > 60: # 1 минута максимум + raise ValueError('Задержка не может превышать 60 секунд') + return v + + @validator('max_retries') + def validate_retries(cls, v): + """[VALIDATOR] Проверяет корректность количества попыток.""" + if v < 0: + raise ValueError('Количество попыток не может быть отрицательным') + if v > 10: # 10 попыток максимум + raise ValueError('Количество попыток не может превышать 10') + return v @property def db_path(self) -> Path: @@ -58,9 +168,70 @@ class Settings(BaseModel): Гарантирует, что путь всегда будет актуальным, если изменится output_dir. """ return self.output_dir / 'parser_data.db' + + def validate_configuration(self) -> list[str]: + """ + [NEW] Валидирует всю конфигурацию и возвращает список ошибок. + + Returns: + list[str]: Список ошибок конфигурации (пустой, если все корректно) + """ + errors = [] + + # Проверка доступности директории + try: + self.output_dir.mkdir(parents=True, exist_ok=True) + except Exception as e: + errors.append(f"Не удается создать директорию {self.output_dir}: {e}") + + # Проверка URL + try: + import requests + response = requests.head(self.base_url, timeout=10) + if response.status_code >= 400: + errors.append(f"Базовый URL недоступен: {self.base_url} (статус: {response.status_code})") + except Exception as e: + errors.append(f"Не удается подключиться к базовому URL {self.base_url}: {e}") + + return errors + +# [ENHANCEMENT] Загрузка настроек из переменных окружения +def load_settings_from_env() -> Settings: + """ + [NEW] Загружает настройки из переменных окружения. + + Returns: + Settings: Объект настроек + """ + # Загружаем .env файл, если он существует + env_file = Path('.env') + if env_file.exists(): + try: + from dotenv import load_dotenv + load_dotenv() + except ImportError: + pass # python-dotenv не установлен + + # Создаем настройки с возможностью переопределения через переменные окружения + settings_data = { + 'base_url': os.getenv('PARSER_BASE_URL', 'https://elixirpeptide.ru'), + 'catalog_url': os.getenv('PARSER_CATALOG_URL', 'https://elixirpeptide.ru/catalog/'), + 'save_to_csv': os.getenv('PARSER_SAVE_TO_CSV', 'true').lower() == 'true', + 'save_to_db': os.getenv('PARSER_SAVE_TO_DB', 'true').lower() == 'true', + 'log_to_db': os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true', + 'request_timeout': int(os.getenv('PARSER_TIMEOUT', '30')), + 'delay_between_requests': float(os.getenv('PARSER_DELAY', '1.0')), + 'max_retries': int(os.getenv('PARSER_RETRIES', '3')), + } + + return Settings(**settings_data) # [SINGLETON] Создаем единственный экземпляр настроек, который будет использоваться # во всем приложении. Это стандартная практика для работы с конфигурацией. -settings = Settings() +try: + settings = load_settings_from_env() +except Exception as e: + # Fallback к настройкам по умолчанию + settings = Settings() # [REFACTORING_COMPLETE] Этот модуль готов к использованию. \ No newline at end of file diff --git a/src/main.py b/src/main.py index 51d7e1a..1906722 100644 --- a/src/main.py +++ b/src/main.py @@ -3,13 +3,62 @@ # Семантика: Единственная задача этого модуля - создать и запустить оркестратор. # Он не содержит никакой логики, только инициализирует процесс. -from src.orchestrator import AppOrchestrator -from src.core.settings import settings +import sys +import logging +from orchestrator import AppOrchestrator +from core.settings import settings def main(): """Точка входа в приложение.""" - orchestrator = AppOrchestrator(settings=settings) - orchestrator.run() + # [ENHANCEMENT] Настройка базового логирования для main + logging.basicConfig( + level=logging.INFO, + format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + logger = logging.getLogger(__name__) + + try: + logger.info("="*60) + logger.info("🚀 Запуск парсера цен ElixirPeptide") + logger.info("="*60) + + # [ENHANCEMENT] Валидация настроек + logger.info("📋 Проверка конфигурации...") + logger.info(f" • Базовый URL: {settings.base_url}") + logger.info(f" • Каталог: {settings.catalog_url}") + logger.info(f" • Сохранение в CSV: {'✅' if settings.save_to_csv else '❌'}") + logger.info(f" • Сохранение в БД: {'✅' if settings.save_to_db else '❌'}") + logger.info(f" • Логирование в БД: {'✅' if settings.log_to_db else '❌'}") + logger.info(f" • Таймаут запросов: {settings.request_timeout}с") + logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с") + logger.info(f" • Максимум попыток: {settings.max_retries}") + + # [ENHANCEMENT] Валидация конфигурации + config_errors = settings.validate_configuration() + if config_errors: + logger.error("❌ Ошибки в конфигурации:") + for error in config_errors: + logger.error(f" • {error}") + raise ValueError("Конфигурация содержит ошибки") + else: + logger.info("✅ Конфигурация корректна") + + # Создание и запуск оркестратора + orchestrator = AppOrchestrator(settings=settings) + orchestrator.run() + + logger.info("="*60) + logger.info("✅ Парсинг успешно завершен!") + logger.info("="*60) + + except KeyboardInterrupt: + logger.warning("⚠️ Парсинг прерван пользователем (Ctrl+C)") + sys.exit(1) + except Exception as e: + logger.critical(f"💥 Критическая ошибка в приложении: {e}", exc_info=True) + logger.critical("🔧 Проверьте логи для детальной диагностики") + sys.exit(1) if __name__ == "__main__": main() \ No newline at end of file diff --git a/src/orchestrator.py b/src/orchestrator.py index 9b76754..904d6f3 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -9,13 +9,14 @@ import time import requests from datetime import datetime from typing import List, Optional +from contextlib import contextmanager -from src.core.settings import Settings -from src.core.models import ProductVariant # [FIX] Импорт ProductVariant из models.py -from src.core.database import init_database, save_data_to_db, DatabaseManager # [FIX] Импорт DatabaseManager -from src.core.logging_config import setup_logging # [COHERENCE_CHECK_PASSED] Импорт loggin_config -from src.scraper.engine import Scraper -from src.utils.exporters import save_data_to_csv +from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT +from core.models import ProductVariant # [FIX] Импорт ProductVariant из models.py +from core.database import init_database, save_data_to_db, DatabaseManager # [FIX] Импорт DatabaseManager +from core.logging_config import setup_logging # [COHERENCE_CHECK_PASSED] Импорт loggin_config +from scraper.engine import Scraper +from utils.exporters import save_data_to_csv, export_data_to_rabbitmq, export_logs_to_rabbitmq, validate_rabbitmq_connection class AppOrchestrator: """ @@ -31,6 +32,13 @@ class AppOrchestrator: self.http_session.headers.update(settings.headers) self.db_manager: Optional[DatabaseManager] = None # [STATE] Инициализация db_manager как Optional self.final_data: List[ProductVariant] = [] + self.stats = { + 'total_urls': 0, + 'successful_parses': 0, + 'failed_parses': 0, + 'start_time': None, + 'end_time': None + } # [DELEGATES] Создаем экземпляр скрейпера, передавая ему зависимости. # Оркестратор владеет скрейпером. @@ -41,106 +49,210 @@ class AppOrchestrator: ) self.logger = logging.getLogger(self.__class__.__name__) # [INIT] Инициализация логгера для класса + @contextmanager + def _error_context(self, operation: str): + """[HELPER] Контекстный менеджер для обработки ошибок с детальной диагностикой.""" + try: + yield + except Exception as e: + self.logger.error(f"[ERROR] Ошибка в операции '{operation}': {e}", exc_info=True) + # [ENHANCEMENT] Детальная диагностика ошибки + self._log_error_details(operation, e) + raise + + def _log_error_details(self, operation: str, error: Exception): + """[HELPER] Логирует детальную информацию об ошибке.""" + error_info = { + 'operation': operation, + 'error_type': type(error).__name__, + 'error_message': str(error), + 'run_id': self.run_id, + 'timestamp': datetime.now().isoformat(), + 'stats': self.stats.copy() + } + self.logger.error(f"[ERROR_DETAILS] {error_info}") + def _setup(self): """[ACTION] Шаг 0: Инициализация всех систем.""" - self.logger.info(f"[INFO] Запуск инициализации систем. Run ID: {self.run_id}") - # [CONDITIONAL_ACTION] Инициализация базы данных, если требуется - if self.settings.save_to_db or self.settings.log_to_db: - # [ACTION] Создаем директорию для БД, если ее нет - self.settings.output_dir.mkdir(parents=True, exist_ok=True) - self.db_manager = DatabaseManager(self.settings.db_path) - init_database(self.db_manager.db_path, self.run_id) # init_database работает с Path - # [DELEGATES] Настройка логирования - setup_logging(self.run_id, self.db_manager) - self.logger.info(f"[INFO] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}") + with self._error_context("setup"): + self.stats['start_time'] = datetime.now() + self.logger.info(f"[INFO] Запуск инициализации систем. Run ID: {self.run_id}") + + # [CONDITIONAL_ACTION] Инициализация базы данных, если требуется + if self.settings.save_to_db or self.settings.log_to_db: + # [ACTION] Создаем директорию для БД, если ее нет + self.settings.output_dir.mkdir(parents=True, exist_ok=True) + self.db_manager = DatabaseManager(self.settings.db_path) + init_database(self.db_manager.db_path, self.run_id) # init_database работает с Path + + # [DELEGATES] Настройка логирования + setup_logging(self.run_id, self.db_manager) + + # [ENHANCEMENT] Проверка доступности RabbitMQ + if ENABLE_RABBITMQ_EXPORT: + if validate_rabbitmq_connection(): + self.logger.info("[RABBITMQ] Подключение к RabbitMQ доступно") + else: + self.logger.warning("[RABBITMQ] Подключение к RabbitMQ недоступно, экспорт в RabbitMQ будет пропущен") + + self.logger.info(f"[INFO] Оркестратор запущен. Архитектура v2.0. Run ID: {self.run_id}") def _collect_urls(self) -> List[str]: """[ACTION] Шаги 1 и 2: Сбор всех URL для парсинга.""" - self.logger.info("[INFO] Начало сбора URL для парсинга.") - # [DELEGATES] Делегируем сбор URL скрейперу. - base_urls = self.scraper.get_base_product_urls( - catalog_url=self.settings.catalog_url, - run_id=self.run_id - ) - if not base_urls: - self.logger.error("[ERROR] Не найдено ни одного базового URL. Завершение работы сбора URL.") - return [] - # [DELEGATES] Делегируем сбор URL вариантов скрейперу. - all_urls_to_scrape = self.scraper.get_all_variant_urls( - base_product_urls=base_urls, - run_id=self.run_id - ) - if not all_urls_to_scrape: - self.logger.error("[ERROR] Не удалось сформировать список URL для парсинга. Завершение работы сбора URL.") - self.logger.info(f"[INFO] Сбор URL завершен. Найдено {len(all_urls_to_scrape)} URL вариантов для парсинга.") - return all_urls_to_scrape + with self._error_context("collect_urls"): + self.logger.info("[INFO] Начало сбора URL для парсинга.") + + # [DELEGATES] Делегируем сбор URL скрейперу. + base_urls = self.scraper.get_base_product_urls( + catalog_url=self.settings.catalog_url, + run_id=self.run_id + ) + + if not base_urls: + self.logger.error("[ERROR] Не найдено ни одного базового URL. Завершение работы сбора URL.") + return [] + + # [DELEGATES] Делегируем сбор URL вариантов скрейперу. + all_urls_to_scrape = self.scraper.get_all_variant_urls( + base_product_urls=base_urls, + run_id=self.run_id + ) + + if not all_urls_to_scrape: + self.logger.error("[ERROR] Не удалось сформировать список URL для парсинга. Завершение работы сбора URL.") + return [] + + self.stats['total_urls'] = len(all_urls_to_scrape) + self.logger.info(f"[INFO] Сбор URL завершен. Найдено {len(all_urls_to_scrape)} URL вариантов для парсинга.") + return all_urls_to_scrape def _scrape_data(self, urls: List[str]): """[ACTION] Шаг 3: Итеративный парсинг данных.""" - total_to_scrape = len(urls) - self.logger.info(f"[INFO] Начало парсинга {total_to_scrape} URL вариантов.") - for i, url in enumerate(urls): - self.logger.info(f"[INFO] Парсинг URL {i+1}/{total_to_scrape}: {url.split('/')[-1]}") - time.sleep(1) # [ACTION] Задержка между запросами - # [DELEGATES] Делегируем парсинг одной страницы скрейперу. - variant_data = self.scraper.scrape_variant_page( - variant_url=url, - run_id=self.run_id - ) - if variant_data: - self.final_data.append(variant_data) - self.logger.info(f"[INFO] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.") + with self._error_context("scrape_data"): + total_to_scrape = len(urls) + self.logger.info(f"[INFO] Начало парсинга {total_to_scrape} URL вариантов.") + + for i, url in enumerate(urls): + try: + self.logger.info(f"[INFO] Парсинг URL {i+1}/{total_to_scrape}: {url.split('/')[-1]}") + time.sleep(1) # [ACTION] Задержка между запросами + + # [DELEGATES] Делегируем парсинг одной страницы скрейперу. + variant_data = self.scraper.scrape_variant_page( + variant_url=url, + run_id=self.run_id + ) + + if variant_data: + self.final_data.append(variant_data) + self.stats['successful_parses'] += 1 + else: + self.stats['failed_parses'] += 1 + + except Exception as e: + self.stats['failed_parses'] += 1 + self.logger.error(f"[ERROR] Ошибка при парсинге URL {i+1}/{total_to_scrape} ({url}): {e}") + # [ENHANCEMENT] Продолжаем работу, не прерывая весь процесс + continue + + self.logger.info(f"[INFO] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.") + self.logger.info(f"[STATS] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}") def _save_results(self): """[ACTION] Шаг 4: Сохранение результатов.""" - self.logger.info("[INFO] Начало сохранения результатов парсинга.") - if not self.final_data: - self.logger.warning("[WARN] Итоговый набор данных пуст. Файлы не будут созданы.") - return + with self._error_context("save_results"): + self.logger.info("[INFO] Начало сохранения результатов парсинга.") + + if not self.final_data: + self.logger.warning("[WARN] Итоговый набор данных пуст. Файлы не будут созданы.") + return - self.logger.info(f"[INFO] Всего найдено валидных вариантов для сохранения: {len(self.final_data)}") - # [CONDITIONAL_ACTION] Сохранение в CSV - if self.settings.save_to_csv: - timestamp = datetime.now().strftime('%Y-%m-%d_%H%M%S') # Добавил время для уникальности - output_filename = self.settings.output_dir / f'prices_full_catalog_{timestamp}.csv' - # Преобразуем ProductVariant объекты в словари для save_data_to_csv - data_to_csv = [p.model_dump() for p in self.final_data] # Используем model_dump() для Pydantic v2 - save_data_to_csv(data_to_csv, output_filename, self.run_id) - self.logger.info(f"[INFO] Данные сохранены в CSV: {output_filename}") + self.logger.info(f"[INFO] Всего найдено валидных вариантов для сохранения: {len(self.final_data)}") + + # [CONDITIONAL_ACTION] Сохранение в CSV + if ENABLE_CSV_EXPORT and self.settings.save_to_csv: + try: + timestamp = datetime.now().strftime('%Y-%m-%d_%H%M%S') # Добавил время для уникальности + output_filename = self.settings.output_dir / f'prices_full_catalog_{timestamp}.csv' + # Преобразуем ProductVariant объекты в словари для save_data_to_csv + data_to_csv = [p.model_dump() for p in self.final_data] # Используем model_dump() для Pydantic v2 + if save_data_to_csv(data_to_csv, output_filename, self.run_id): + self.logger.info(f"[INFO] Данные успешно сохранены в CSV: {output_filename}") + else: + self.logger.error(f"[ERROR] Не удалось сохранить данные в CSV: {output_filename}") + except Exception as e: + self.logger.error(f"[ERROR] Ошибка при сохранении в CSV: {e}") - # [CONDITIONAL_ACTION] Сохранение в БД - if self.settings.save_to_db and self.db_manager: - # Преобразуем ProductVariant объекты в словари для save_data_to_db - data_to_db = [p.model_dump() for p in self.final_data] - save_data_to_db(data_to_db, self.db_manager.db_path, self.run_id) # save_data_to_db ожидает Path - self.logger.info("[INFO] Данные сохранены в базу данных.") - self.logger.info("[INFO] Сохранение результатов завершено.") + # [CONDITIONAL_ACTION] Сохранение в БД + if ENABLE_DATABASE_EXPORT and self.settings.save_to_db and self.db_manager: + try: + # Преобразуем ProductVariant объекты в словари для save_data_to_db + data_to_db = [p.model_dump() for p in self.final_data] + if save_data_to_db(data_to_db, self.db_manager.db_path, self.run_id): # save_data_to_db ожидает Path + self.logger.info("[INFO] Данные успешно сохранены в базу данных.") + else: + self.logger.error("[ERROR] Не удалось сохранить данные в базу данных.") + except Exception as e: + self.logger.error(f"[ERROR] Ошибка при сохранении в БД: {e}") + + # [ENHANCEMENT] Экспорт в RabbitMQ + if ENABLE_RABBITMQ_EXPORT: + try: + # Преобразуем ProductVariant объекты в словари для экспорта + data_to_rabbitmq = [p.model_dump() for p in self.final_data] + if export_data_to_rabbitmq(data_to_rabbitmq, self.run_id, self.run_id): + self.logger.info("[INFO] Данные успешно экспортированы в RabbitMQ.") + else: + self.logger.error("[ERROR] Не удалось экспортировать данные в RabbitMQ.") + except Exception as e: + self.logger.error(f"[ERROR] Ошибка при экспорте в RabbitMQ: {e}") + + self.logger.info("[INFO] Сохранение результатов завершено.") def _cleanup(self): """[ACTION] Шаг 5: Корректное завершение работы.""" - self.logger.info("[INFO] Начало очистки ресурсов.") - self.http_session.close() - self.logger.debug("[DEBUG] HTTP-сессия закрыта.") - if self.db_manager: - self.db_manager.close() - self.logger.debug("[DEBUG] Соединение с базой данных закрыто.") - self.logger.info(f"[COHERENCE_CHECK_PASSED] Работа парсера завершена. Run ID: {self.run_id}") + try: + self.stats['end_time'] = datetime.now() + duration = self.stats['end_time'] - self.stats['start_time'] if self.stats['start_time'] else None + + self.logger.info("[INFO] Начало очистки ресурсов.") + self.http_session.close() + self.logger.debug("[DEBUG] HTTP-сессия закрыта.") + + if self.db_manager: + self.db_manager.close() + self.logger.debug("[DEBUG] Соединение с базой данных закрыто.") + + # [ENHANCEMENT] Финальная статистика + if duration: + self.logger.info(f"[FINAL_STATS] Время выполнения: {duration.total_seconds():.2f} секунд") + self.logger.info(f"[FINAL_STATS] Успешность: {self.stats['successful_parses']}/{self.stats['total_urls']} ({self.stats['successful_parses']/self.stats['total_urls']*100:.1f}%)") + + self.logger.info(f"[COHERENCE_CHECK_PASSED] Работа парсера завершена. Run ID: {self.run_id}") + + except Exception as e: + self.logger.error(f"[ERROR] Ошибка при очистке ресурсов: {e}") def run(self): """[ENTRYPOINT] Основной метод, запускающий весь процесс.""" self.logger.info("="*50) self.logger.info("[INFO] Запуск главного процесса оркестратора.") self.logger.info("="*50) + try: self._setup() urls_to_scrape = self._collect_urls() + if urls_to_scrape: self._scrape_data(urls_to_scrape) self._save_results() else: self.logger.warning("[WARN] Отсутствуют URL для парсинга. Пропуск шагов парсинга и сохранения.") + except Exception as e: self.logger.critical(f"[CRITICAL] Непредвиденная критическая ошибка в оркестраторе: {e}", exc_info=True) # [COHERENCE_CHECK_FAILED] Критическая ошибка нарушила нормальный поток выполнения. + raise # Пробрасываем исключение для обработки на верхнем уровне + finally: self._cleanup() diff --git a/src/scraper/__init__.py b/src/scraper/__init__.py new file mode 100644 index 0000000..2af51cb --- /dev/null +++ b/src/scraper/__init__.py @@ -0,0 +1,2 @@ +# ANCHOR: Scraper_Package_Init +# Семантика: Инициализация пакета scraper \ No newline at end of file diff --git a/src/scraper/engine.py b/src/scraper/engine.py index 01a542a..5c578e3 100644 --- a/src/scraper/engine.py +++ b/src/scraper/engine.py @@ -9,9 +9,11 @@ from urllib.parse import urljoin import requests from bs4 import BeautifulSoup from typing import List, Optional +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry -from src.core.models import ProductVariant # [FIX] Импорт ProductVariant -from src.core.settings import ScraperSelectors +from core.models import ProductVariant # [FIX] Импорт ProductVariant +from core.settings import ScraperSelectors class Scraper: """ @@ -25,27 +27,79 @@ class Scraper: self.selectors = selectors self.base_url = base_url self.logger = logging.getLogger(self.__class__.__name__) + + # [ENHANCEMENT] Настройка retry стратегии для HTTP запросов + self._setup_retry_strategy() + + def _setup_retry_strategy(self): + """[HELPER] Настраивает retry стратегию для HTTP запросов.""" + retry_strategy = Retry( + total=3, # Максимум 3 попытки + backoff_factor=1, # Экспоненциальная задержка: 1, 2, 4 секунды + status_forcelist=[429, 500, 502, 503, 504], # Коды ошибок для retry + allowed_methods=["HEAD", "GET", "OPTIONS"] # Разрешенные методы + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + self.logger.debug("[DEBUG] Retry стратегия настроена для HTTP запросов.") def _clean_price(self, price_str: str) -> int: """[HELPER] Очищает строку цены и возвращает целое число.""" self.logger.debug(f"[DEBUG] Очистка цены: '{price_str}'") - digits = ''.join(filter(str.isdigit, price_str)) - cleaned_price = int(digits) if digits else 0 - self.logger.debug(f"[DEBUG] Цена после очистки: {cleaned_price}") - return cleaned_price + try: + # Удаляем все символы кроме цифр + digits = ''.join(filter(str.isdigit, price_str)) + if not digits: + self.logger.warning(f"[WARNING] Не удалось извлечь цифры из цены: '{price_str}'") + return 0 + cleaned_price = int(digits) + if cleaned_price <= 0: + self.logger.warning(f"[WARNING] Некорректная цена (<= 0): {cleaned_price}") + return 0 + self.logger.debug(f"[DEBUG] Цена после очистки: {cleaned_price}") + return cleaned_price + except (ValueError, TypeError) as e: + self.logger.error(f"[ERROR] Ошибка при обработке цены '{price_str}': {e}") + return 0 def _fetch_page(self, url: str, request_id: str) -> Optional[str]: """[HELPER] Приватный метод для скачивания HTML-содержимого страницы.""" log_prefix = f"_fetch_page(id={request_id})" self.logger.debug(f"{log_prefix} - Запрос к URL: {url}") + try: - response = self.session.get(url, timeout=20) - response.raise_for_status() # Вызовет исключение для 4xx/5xx кодов. + response = self.session.get(url, timeout=30) # Увеличил timeout до 30 секунд + response.raise_for_status() + + # [ENHANCEMENT] Проверка на валидный HTML + if not response.text.strip(): + self.logger.warning(f"{log_prefix} - Получен пустой ответ от {url}") + return None + + # [ENHANCEMENT] Проверка на блокировку или капчу + if "captcha" in response.text.lower() or "blocked" in response.text.lower(): + self.logger.error(f"{log_prefix} - [BLOCKED] Обнаружена капча или блокировка на {url}") + return None + self.logger.debug(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Страница успешно получена, статус {response.status_code}.") return response.text + + except requests.exceptions.Timeout: + self.logger.error(f"{log_prefix} - [TIMEOUT] Превышено время ожидания для {url}") + return None + except requests.exceptions.ConnectionError as e: + self.logger.error(f"{log_prefix} - [CONNECTION_ERROR] Ошибка соединения для {url}: {e}") + return None + except requests.exceptions.HTTPError as e: + self.logger.error(f"{log_prefix} - [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}") + return None except requests.RequestException as e: self.logger.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Сетевая ошибка при запросе {url}: {e}", exc_info=True) return None + except Exception as e: + self.logger.critical(f"{log_prefix} - [CRITICAL] Непредвиденная ошибка при запросе {url}: {e}", exc_info=True) + return None def get_base_product_urls(self, catalog_url: str, run_id: str) -> List[str]: """[ACTION] Собирает URL всех товаров с основной страницы каталога. @@ -54,16 +108,36 @@ class Scraper: """ log_prefix = f"get_base_urls(id={run_id})" self.logger.info(f"{log_prefix} - Начало сбора базовых URL с: {catalog_url}") + html = self._fetch_page(catalog_url, log_prefix) if not html: - self.logger.warning(f"{log_prefix} - Не удалось получить HTML страницы каталога, возвращаю пустой список.") + self.logger.error(f"{log_prefix} - [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.") + return [] + + try: + soup = BeautifulSoup(html, 'html.parser') + links = soup.select(self.selectors.catalog_product_link) + + if not links: + self.logger.warning(f"{log_prefix} - [WARNING] Не найдено ни одной ссылки на товар с селектором: {self.selectors.catalog_product_link}") + return [] + + unique_urls = set() + for link in links: + href = link.get('href') + if href: + full_url = urljoin(self.base_url, href) + unique_urls.add(full_url) + else: + self.logger.debug(f"{log_prefix} - Пропуск ссылки без href: {link}") + + self.logger.info(f"{log_prefix} - Найдено {len(unique_urls)} уникальных базовых URL.") + # [COHERENCE_CHECK_PASSED] Базовые URL успешно собраны. + return list(unique_urls) + + except Exception as e: + self.logger.error(f"{log_prefix} - [CRITICAL] Ошибка при парсинге каталога: {e}", exc_info=True) return [] - soup = BeautifulSoup(html, 'html.parser') - links = soup.select(self.selectors.catalog_product_link) - unique_urls = {urljoin(self.base_url, link.get('href')) for link in links if link.get('href')} - self.logger.info(f"{log_prefix} - Найдено {len(unique_urls)} уникальных базовых URL.") - # [COHERENCE_CHECK_PASSED] Базовые URL успешно собраны. - return list(unique_urls) def get_all_variant_urls(self, base_product_urls: List[str], run_id: str) -> List[str]: """[ACTION] Проходит по базовым URL и собирает URL всех их вариантов. @@ -77,24 +151,36 @@ class Scraper: for i, base_url in enumerate(base_product_urls): self.logger.info(f"{log_prefix} - Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}") + html = self._fetch_page(base_url, f"{log_prefix}-{i+1}") if not html: self.logger.warning(f"{log_prefix} - Пропуск базового URL из-за ошибки загрузки: {base_url}") continue - soup = BeautifulSoup(html, 'html.parser') - variant_items = soup.select(self.selectors.variant_list_item) - if not variant_items: - self.logger.debug(f"{log_prefix} - Товар не имеет явных вариантов, добавляю базовый URL как вариант: {base_url}") + try: + soup = BeautifulSoup(html, 'html.parser') + variant_items = soup.select(self.selectors.variant_list_item) + + if not variant_items: + self.logger.debug(f"{log_prefix} - Товар не имеет явных вариантов, добавляю базовый URL как вариант: {base_url}") + all_variant_urls.append(base_url) + else: + for item in variant_items: + variant_id = item.get('data-id') + if variant_id: + variant_url = f"{base_url}?product={variant_id}" + all_variant_urls.append(variant_url) + else: + self.logger.debug(f"{log_prefix} - Пропуск варианта без data-id: {item}") + self.logger.debug(f"{log_prefix} - Найдено {len(variant_items)} вариантов для товара {base_url.split('/')[-1]}.") + + except Exception as e: + self.logger.error(f"{log_prefix} - [ERROR] Ошибка при обработке вариантов для {base_url}: {e}") + # Добавляем базовый URL как fallback all_variant_urls.append(base_url) - else: - for item in variant_items: - variant_id = item.get('data-id') - if variant_id: - variant_url = f"{base_url}?product={variant_id}" - all_variant_urls.append(variant_url) - self.logger.debug(f"{log_prefix} - Найдено {len(variant_items)} вариантов для товара {base_url.split('/')[-1]}.") + time.sleep(0.5) # [ACTION] Задержка между запросами + self.logger.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.") return all_variant_urls @@ -105,188 +191,59 @@ class Scraper: """ log_prefix = f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})" self.logger.info(f"{log_prefix} - Начало парсинга страницы варианта.") + html = self._fetch_page(variant_url, log_prefix) if not html: self.logger.warning(f"{log_prefix} - Не удалось получить HTML страницы варианта, пропуск парсинга.") return None - soup = BeautifulSoup(html, 'html.parser') + try: + soup = BeautifulSoup(html, 'html.parser') + + # [ENHANCEMENT] Более детальная проверка элементов name_el = soup.select_one(self.selectors.product_page_name) price_el = soup.select_one(self.selectors.price_block) volume_el = soup.select_one(self.selectors.active_volume) # Optional, может отсутствовать # [PRECONDITION] Проверка наличия основных элементов - if not (name_el and price_el): - self.logger.warning(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Не найдены базовые элементы (Имя продукта или Блок цены). Пропуск URL: {variant_url}.") + if not name_el: + self.logger.warning(f"{log_prefix} - [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}") return None - # [ACTION] Извлечение данных + + if not price_el: + self.logger.warning(f"{log_prefix} - [MISSING_ELEMENT] Не найден элемент цены с селектором: {self.selectors.price_block}") + return None + + # [ACTION] Извлечение данных с дополнительной валидацией name = name_el.get_text(strip=True) - price = self._clean_price(price_el.get_text(strip=True)) + if not name: + self.logger.warning(f"{log_prefix} - [EMPTY_DATA] Пустое имя продукта") + return None + + price_text = price_el.get_text(strip=True) + if not price_text: + self.logger.warning(f"{log_prefix} - [EMPTY_DATA] Пустая цена") + return None + + price = self._clean_price(price_text) + if price <= 0: + self.logger.warning(f"{log_prefix} - [INVALID_PRICE] Некорректная цена: {price}") + return None + volume = volume_el.get_text(strip=True) if volume_el else "N/A" # [POSTCONDITION] Создаем экземпляр контракта данных. # [CONTRACT_VALIDATOR] Pydantic валидация при создании модели - product = ProductVariant(name=name, volume=volume, price=price, url=variant_url) - self.logger.debug(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | '{product.volume}' | '{product.price}'") - return product + try: + product = ProductVariant(name=name, volume=volume, price=price, url=variant_url) + self.logger.debug(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | '{product.volume}' | '{product.price}'") + return product + except Exception as e: + self.logger.error(f"{log_prefix} - [VALIDATION_ERROR] Ошибка валидации ProductVariant: {e}") + return None except Exception as e: self.logger.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Исключение при парсинге страницы {variant_url}: {e}", exc_info=True) return None - def get_base_product_urls(self, catalog_url: str, run_id: str) -> List[str]: - """[ACTION] Собирает URL всех товаров с основной страницы каталога. - @pre: `catalog_url` должен быть доступен. - @post: Возвращает список уникальных URL базовых продуктов. - """ - log_prefix = f"get_base_urls(id={run_id})" - self.logger.info(f"{log_prefix} - Начало сбора базовых URL с: {catalog_url}") - html = self._fetch_page(catalog_url, log_prefix) - if not html: - self.logger.warning(f"{log_prefix} - Не удалось получить HTML страницы каталога, возвращаю пустой список.") - return [] - soup = BeautifulSoup(html, 'html.parser') - links = soup.select(self.selectors.catalog_product_link) - unique_urls = {urljoin(self.base_url, link.get('href')) for link in links if link.get('href')} - self.logger.info(f"{log_prefix} - Найдено {len(unique_urls)} уникальных базовых URL.") - # [COHERENCE_CHECK_PASSED] Базовые URL успешно собраны. - return list(unique_urls) - - def get_all_variant_urls(self, base_product_urls: List[str], run_id: str) -> List[str]: - """[ACTION] Проходит по базовым URL и собирает URL всех их вариантов. - @pre: `base_product_urls` - список доступных URL продуктов. - @post: Возвращает список всех URL вариантов продуктов. - """ - all_variant_urls = [] - total_base = len(base_product_urls) - log_prefix = f"get_variant_urls(id={run_id})" - self.logger.info(f"{log_prefix} - Начало сбора URL вариантов для {total_base} базовых продуктов.") - - for i, base_url in enumerate(base_product_urls): - self.logger.info(f"{log_prefix} - Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}") - html = self._fetch_page(base_url, f"{log_prefix}-{i+1}") - if not html: - self.logger.warning(f"{log_prefix} - Пропуск базового URL из-за ошибки загрузки: {base_url}") - continue - - soup = BeautifulSoup(html, 'html.parser') - variant_items = soup.select(self.selectors.variant_list_item) - if not variant_items: - self.logger.debug(f"{log_prefix} - Товар не имеет явных вариантов, добавляю базовый URL как вариант: {base_url}") - all_variant_urls.append(base_url) - else: - for item in variant_items: - variant_id = item.get('data-id') - if variant_id: - variant_url = f"{base_url}?product={variant_id}" - all_variant_urls.append(variant_url) - self.logger.debug(f"{log_prefix} - Найдено {len(variant_items)} вариантов для товара {base_url.split('/')[-1]}.") - time.sleep(0.5) # [ACTION] Задержка между запросами - self.logger.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.") - return all_variant_urls - - def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]: - """[ACTION] Парсит страницу одного варианта и возвращает Pydantic-модель. - @pre: `variant_url` должен быть доступен и содержать ожидаемые элементы. - @post: Возвращает `ProductVariant` или `None` в случае ошибки парсинга. - """ - log_prefix = f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})" - self.logger.info(f"{log_prefix} - Начало парсинга страницы варианта.") - html = self._fetch_page(variant_url, log_prefix) - if not html: - self.logger.warning(f"{log_prefix} - Не удалось получить HTML страницы варианта, пропуск парсинга.") - return None - soup = BeautifulSoup(html, 'html.parser') - try: - name_el = soup.select_one(self.selectors.product_page_name) - price_el = soup.select_one(self.selectors.price_block) - volume_el = soup.select_one(self.selectors.active_volume) # Optional, может отсутствовать - - # [PRECONDITION] Проверка наличия основных элементов - if not (name_el and price_el): - self.logger.warning(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Не найдены базовые элементы (Имя продукта или Блок цены). Пропуск URL: {variant_url}.") - return None - # [ACTION] Извлечение данных - name = name_el.get_text(strip=True) - price = self._clean_price(price_el.get_text(strip=True)) - volume = volume_el.get_text(strip=True) if volume_el else "N/A" - - # [POSTCONDITION] Создаем экземпляр контракта данных. - # [CONTRACT_VALIDATOR] Pydantic валидация при создании модели - product = ProductVariant(name=name, volume=volume, price=price, url=variant_url) - self.logger.debug(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | '{product.volume}' | '{product.price}'") - return product - - except Exception as e: - self.logger.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Исключение при парсинге страницы {variant_url}: {e}", exc_info=True) - return None - - def get_base_product_urls(self, catalog_url: str, run_id: str) -> List[str]: - """[ACTION] Собирает URL всех товаров с основной страницы каталога.""" - log_prefix = f"get_base_urls(id={run_id})" - self.logger.info(f"{log_prefix} - Начало сбора базовых URL с: {catalog_url}") - html = self._fetch_page(catalog_url, log_prefix) - if not html: - return [] - soup = BeautifulSoup(html, 'html.parser') - links = soup.select(self.selectors.catalog_product_link) - unique_urls = {urljoin(self.base_url, link.get('href')) for link in links if link.get('href')} - self.logger.info(f"{log_prefix} - Найдено {len(unique_urls)} уникальных базовых URL.") - return list(unique_urls) - - def get_all_variant_urls(self, base_product_urls: List[str], run_id: str) -> List[str]: - """[ACTION] Проходит по базовым URL и собирает URL всех их вариантов.""" - all_variant_urls = [] - total_base = len(base_product_urls) - log_prefix = f"get_variant_urls(id={run_id})" - - for i, base_url in enumerate(base_product_urls): - self.logger.info(f"{log_prefix} - Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}") - html = self._fetch_page(base_url, f"{log_prefix}-{i+1}") - if not html: - continue - - soup = BeautifulSoup(html, 'html.parser') - variant_items = soup.select(self.selectors.variant_list_item) - if not variant_items: - self.logger.debug(f"{log_prefix} - Товар без вариантов, используется базовый URL: {base_url}") - all_variant_urls.append(base_url) - else: - for item in variant_items: - variant_id = item.get('data-id') - if variant_id: - variant_url = f"{base_url}?product={variant_id}" - all_variant_urls.append(variant_url) - self.logger.debug(f"{log_prefix} - Найдено {len(variant_items)} вариантов для товара.") - time.sleep(0.5) - self.logger.info(f"Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.") - return all_variant_urls - - def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]: - """[ACTION] Парсит страницу одного варианта и возвращает Pydantic-модель.""" - log_prefix = f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})" - html = self._fetch_page(variant_url, log_prefix) - if not html: - return None - soup = BeautifulSoup(html, 'html.parser') - try: - name_el = soup.select_one(self.selectors.product_page_name) - price_el = soup.select_one(self.selectors.price_block) - if not (name_el and price_el): - self.logger.warning(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Не найдены базовые элементы (Имя или Цена). Пропуск URL.") - return None - name = name_el.get_text(strip=True) - price = self._clean_price(price_el.get_text(strip=True)) - volume_el = soup.select_one(self.selectors.active_volume) - volume = volume_el.get_text(strip=True) if volume_el else "N/A" - - # [POSTCONDITION] Создаем экземпляр контракта данных. - product = ProductVariant(name=name, volume=volume, price=price, url=variant_url) - self.logger.debug(f"{log_prefix} - Успешно: '{product.name}', '{product.volume}', '{product.price}'") - return product - - except Exception as e: - self.logger.error(f"{log_prefix} - Исключение при парсинге страницы: {e}", exc_info=True) - return None - -# [REFACTORING_COMPLETE] \ No newline at end of file +# [REFACTORING_COMPLETE] Дублированные методы удалены, улучшена обработка ошибок \ No newline at end of file diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..0ab708e --- /dev/null +++ b/src/utils/__init__.py @@ -0,0 +1,2 @@ +# ANCHOR: Utils_Package_Init +# Семантика: Инициализация пакета utils \ No newline at end of file diff --git a/src/utils/exporters.py b/src/utils/exporters.py index c5465aa..a8a9b08 100644 --- a/src/utils/exporters.py +++ b/src/utils/exporters.py @@ -5,22 +5,265 @@ import logging import csv from pathlib import Path -from typing import List, Dict +from typing import List, Dict, Optional -def save_data_to_csv(data: List[Dict], filename: Path, request_id: str): - # ... (код функции save_data_to_csv без изменений) ... +def save_data_to_csv(data: List[Dict], filename: Path, request_id: str) -> bool: + """ + [ENHANCED] Сохраняет данные в CSV файл с улучшенной обработкой ошибок. + + Args: + data: Список словарей с данными для сохранения + filename: Путь к файлу для сохранения + request_id: Идентификатор запроса для логирования + + Returns: + bool: True если сохранение прошло успешно, False в противном случае + """ log_prefix = f"save_data_to_csv(id={request_id})" + + # [ENHANCEMENT] Валидация входных данных if not data: logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Данные для сохранения отсутствуют.") - return + return False + + if not isinstance(data, list): + logging.error(f"{log_prefix} - [TYPE_ERROR] Данные должны быть списком, получено: {type(data)}") + return False + + # [ENHANCEMENT] Проверка структуры данных + required_fields = ['name', 'volume', 'price'] + for i, item in enumerate(data): + if not isinstance(item, dict): + logging.error(f"{log_prefix} - [TYPE_ERROR] Элемент {i} должен быть словарем, получено: {type(item)}") + return False + + missing_fields = [field for field in required_fields if field not in item] + if missing_fields: + logging.error(f"{log_prefix} - [MISSING_FIELDS] Элемент {i} не содержит поля: {missing_fields}") + return False + + # [ENHANCEMENT] Валидация типов данных + if not isinstance(item['name'], str) or not item['name'].strip(): + logging.error(f"{log_prefix} - [INVALID_NAME] Элемент {i} имеет некорректное имя: {item['name']}") + return False + + if not isinstance(item['volume'], str): + logging.error(f"{log_prefix} - [INVALID_VOLUME] Элемент {i} имеет некорректный объем: {item['volume']}") + return False + + if not isinstance(item['price'], (int, float)) or item['price'] < 0: + logging.error(f"{log_prefix} - [INVALID_PRICE] Элемент {i} имеет некорректную цену: {item['price']}") + return False + logging.info(f"{log_prefix} - Начало сохранения {len(data)} записей в файл: {filename}") + try: + # [ENHANCEMENT] Создание директории, если она не существует filename.parent.mkdir(parents=True, exist_ok=True) + + # [ENHANCEMENT] Проверка доступности файла для записи + if filename.exists(): + logging.warning(f"{log_prefix} - Файл {filename} уже существует и будет перезаписан") + + # [ENHANCEMENT] Определение полей на основе данных fieldnames = ['name', 'volume', 'price'] + with open(filename, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() - writer.writerows(data) - logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Данные успешно сохранены.") + + # [ENHANCEMENT] Запись данных с обработкой ошибок + for i, row in enumerate(data): + try: + writer.writerow(row) + except Exception as e: + logging.error(f"{log_prefix} - [WRITE_ERROR] Ошибка записи строки {i}: {e}") + # Продолжаем запись остальных строк + continue + + logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Данные успешно сохранены в {filename}") + return True + + except PermissionError as e: + logging.error(f"{log_prefix} - [PERMISSION_ERROR] Нет прав на запись в файл {filename}: {e}") + return False + except OSError as e: + logging.error(f"{log_prefix} - [OS_ERROR] Ошибка операционной системы при сохранении {filename}: {e}") + return False except Exception as e: - logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Ошибка при сохранении CSV: {e}") \ No newline at end of file + logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Непредвиденная ошибка при сохранении CSV: {e}", exc_info=True) + return False + +def validate_csv_data(data: List[Dict]) -> tuple[bool, List[str]]: + """ + [NEW] Валидирует данные перед сохранением в CSV. + + Args: + data: Список словарей для валидации + + Returns: + tuple: (is_valid, list_of_errors) + """ + errors = [] + + if not data: + errors.append("Данные отсутствуют") + return False, errors + + if not isinstance(data, list): + errors.append(f"Данные должны быть списком, получено: {type(data)}") + return False, errors + + required_fields = ['name', 'volume', 'price'] + + for i, item in enumerate(data): + if not isinstance(item, dict): + errors.append(f"Элемент {i} должен быть словарем") + continue + + # Проверка обязательных полей + for field in required_fields: + if field not in item: + errors.append(f"Элемент {i} не содержит поле '{field}'") + + # Проверка типов данных + if 'name' in item and (not isinstance(item['name'], str) or not item['name'].strip()): + errors.append(f"Элемент {i} имеет некорректное имя") + + if 'price' in item and (not isinstance(item['price'], (int, float)) or item['price'] < 0): + errors.append(f"Элемент {i} имеет некорректную цену") + + return len(errors) == 0, errors + +# ANCHOR: RabbitMQ_Export_Functions +# Семантика: Функции для экспорта данных в RabbitMQ + +def export_data_to_rabbitmq(products: List[Dict], run_id: str, request_id: str) -> bool: + """ + [CONTRACT] + @description: Экспортирует данные о продуктах в RabbitMQ. + @precondition: Список продуктов валиден, run_id не пустой. + @postcondition: Данные отправлены в очередь или False в случае ошибки. + + Args: + products: Список продуктов для экспорта + run_id: Идентификатор запуска парсера + request_id: Идентификатор запроса для логирования + + Returns: + bool: True если экспорт успешен, False в противном случае + """ + log_prefix = f"export_data_to_rabbitmq(id={request_id})" + + try: + from core.rabbitmq import RabbitMQExporter + + # [VALIDATION] Проверка входных данных + if not products: + logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Список продуктов пуст") + return False + + if not run_id: + logging.error(f"{log_prefix} - [CONTRACT_VIOLATION] run_id не может быть пустым") + return False + + logging.info(f"{log_prefix} - Начало экспорта {len(products)} продуктов в RabbitMQ") + + # [EXPORT] Создание экспортера и отправка данных + exporter = RabbitMQExporter() + try: + success = exporter.export_products(products, run_id) + if success: + logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Данные успешно экспортированы в RabbitMQ") + else: + logging.error(f"{log_prefix} - [EXPORT_FAILED] Не удалось экспортировать данные в RabbitMQ") + return success + finally: + exporter.close() + + except ImportError as e: + logging.error(f"{log_prefix} - [IMPORT_ERROR] Не удалось импортировать модуль RabbitMQ: {e}") + return False + except Exception as e: + logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Непредвиденная ошибка при экспорте в RabbitMQ: {e}", exc_info=True) + return False + +def export_logs_to_rabbitmq(log_records: List[Dict], run_id: str, request_id: str) -> bool: + """ + [CONTRACT] + @description: Экспортирует логи в RabbitMQ. + @precondition: Список логов валиден, run_id не пустой. + @postcondition: Логи отправлены в очередь или False в случае ошибки. + + Args: + log_records: Список записей логов + run_id: Идентификатор запуска парсера + request_id: Идентификатор запроса для логирования + + Returns: + bool: True если экспорт успешен, False в противном случае + """ + log_prefix = f"export_logs_to_rabbitmq(id={request_id})" + + try: + from core.rabbitmq import RabbitMQExporter + + # [VALIDATION] Проверка входных данных + if not log_records: + logging.warning(f"{log_prefix} - [CONTRACT_VIOLATION] Список логов пуст") + return False + + if not run_id: + logging.error(f"{log_prefix} - [CONTRACT_VIOLATION] run_id не может быть пустым") + return False + + logging.info(f"{log_prefix} - Начало экспорта {len(log_records)} логов в RabbitMQ") + + # [EXPORT] Создание экспортера и отправка логов + exporter = RabbitMQExporter() + try: + success = exporter.export_logs(log_records, run_id) + if success: + logging.info(f"{log_prefix} - [COHERENCE_CHECK_PASSED] Логи успешно экспортированы в RabbitMQ") + else: + logging.error(f"{log_prefix} - [EXPORT_FAILED] Не удалось экспортировать логи в RabbitMQ") + return success + finally: + exporter.close() + + except ImportError as e: + logging.error(f"{log_prefix} - [IMPORT_ERROR] Не удалось импортировать модуль RabbitMQ: {e}") + return False + except Exception as e: + logging.error(f"{log_prefix} - [COHERENCE_CHECK_FAILED] Непредвиденная ошибка при экспорте логов в RabbitMQ: {e}", exc_info=True) + return False + +def validate_rabbitmq_connection() -> bool: + """ + [HELPER] Проверяет доступность подключения к RabbitMQ. + + Returns: + bool: True если подключение доступно, False в противном случае + """ + try: + from core.rabbitmq import RabbitMQConnection + + connection = RabbitMQConnection() + success = connection.connect() + connection.disconnect() + + if success: + logging.info("[RABBITMQ] Подключение к RabbitMQ доступно") + else: + logging.warning("[RABBITMQ] Подключение к RabbitMQ недоступно") + + return success + + except ImportError: + logging.warning("[RABBITMQ] Модуль RabbitMQ не установлен") + return False + except Exception as e: + logging.error(f"[RABBITMQ] Ошибка проверки подключения: {e}") + return False + +# [COHERENCE_CHECK_PASSED] Модуль экспортеров расширен поддержкой RabbitMQ с полной валидацией и обработкой ошибок. \ No newline at end of file