From 0259289ee9a9507875b42d15a5538bc279141375 Mon Sep 17 00:00:00 2001 From: busya Date: Sat, 19 Jul 2025 01:04:04 +0300 Subject: [PATCH] + --- .gitignore | 5 +++- src/core/models.py | 20 +++++++++++-- src/core/settings.py | 6 ++-- src/main.py | 1 + src/orchestrator.py | 67 ++++++++++++++++++++++++++----------------- src/scraper/engine.py | 65 +++++++++++++++++++++++++---------------- 6 files changed, 107 insertions(+), 57 deletions(-) diff --git a/.gitignore b/.gitignore index 8029918..3ea26d2 100644 --- a/.gitignore +++ b/.gitignore @@ -19,4 +19,7 @@ price_data_final/ .vscode/ #backups -*.bak \ No newline at end of file +*.bak + +# Logs +/logs/ \ No newline at end of file diff --git a/src/core/models.py b/src/core/models.py index d05fca6..4d5c4a9 100644 --- a/src/core/models.py +++ b/src/core/models.py @@ -6,10 +6,14 @@ # # +import logging from pydantic import BaseModel, Field, HttpUrl +from pydantic.functional_validators import model_validator from datetime import datetime -from typing import List +from typing import List, Optional import uuid + +logger = logging.getLogger(__name__) # # @@ -19,8 +23,18 @@ import uuid class ProductVariant(BaseModel): # name: str = Field(..., description="Название продукта.") - volume: str = Field(..., description="Объем или вариант продукта (например, '50мл', '10 капсул').") - price: int = Field(..., gt=0, description="Цена продукта в числовом формате, должна быть положительной.") + volume: Optional[str] = Field(None, description="Объем или вариант продукта (например, '50мл', '10 капсул'). Может быть пустым, если не применимо.") + price: int = Field(..., description="Цена продукта в числовом формате. Должна быть положительной, если товар в наличии, иначе может быть 0.") + is_in_stock: bool = Field(..., description="Наличие товара.") + + @model_validator(mode='after') + def validate_price_based_on_stock(self) -> 'ProductVariant': + if not self.is_in_stock and self.price != 0: + logger.warning(f"[CONTRACT_VIOLATION] Product '{self.name}' (URL: {self.url}) is out of stock but has a non-zero price ({self.price}). Setting price to 0.") + self.price = 0 + elif self.is_in_stock and self.price <= 0: + raise ValueError("Price must be greater than 0 for in-stock products.") + return self url: HttpUrl = Field(..., description="Полный URL страницы варианта продукта.") is_in_stock: bool = Field(..., description="Наличие товара.") # diff --git a/src/core/settings.py b/src/core/settings.py index 6e7903d..5161791 100644 --- a/src/core/settings.py +++ b/src/core/settings.py @@ -67,12 +67,14 @@ class Settings(BaseModel): # log_to_db: bool = Field(default=os.getenv('PARSER_LOG_TO_DB', 'true').lower() == 'true') + log_dir: Path = Field(default=BASE_DIR / "logs", description="Директория для сохранения логов") # # request_timeout: int = Field(default=int(os.getenv('PARSER_TIMEOUT', 30))) delay_between_requests: float = Field(default=float(os.getenv('PARSER_DELAY', 1.0))) max_retries: int = Field(default=int(os.getenv('PARSER_RETRIES', 3))) + num_parser_threads: int = Field(default=int(os.getenv('PARSER_THREADS', 5)), description="Количество потоков для парсинга") # # @@ -81,8 +83,8 @@ class Settings(BaseModel): VARIANT_LIST_ITEM='.product-version-select li', PRODUCT_PAGE_NAME='h1.product-h1', ACTIVE_VOLUME='.product-version-select li.active', - PRICE_BLOCK='.product-sale-box .price span', - PRODUCT_UNAVAILABLE='.product-unavailable', + PRICE_BLOCK='.product-sale-box .price span, .price-value, .product-price, .price', + PRODUCT_UNAVAILABLE='.product-unavailable, .out-of-stock-message, .unavailable-message, .stock-status.out-of-stock, li.not-available, div.disabled', ) # diff --git a/src/main.py b/src/main.py index b5ada08..57b029f 100644 --- a/src/main.py +++ b/src/main.py @@ -50,6 +50,7 @@ def main(): logger.info(f" • Таймаут запросов: {settings.request_timeout}с") logger.info(f" • Задержка между запросами: {settings.delay_between_requests}с") logger.info(f" • Максимум попыток: {settings.max_retries}") + logger.info(f" • Количество потоков: {settings.num_parser_threads}") # config_errors = settings.validate_configuration() diff --git a/src/orchestrator.py b/src/orchestrator.py index 8f81dcb..eca6089 100644 --- a/src/orchestrator.py +++ b/src/orchestrator.py @@ -12,6 +12,7 @@ import requests from datetime import datetime from typing import List, Optional from contextlib import contextmanager +from concurrent.futures import ThreadPoolExecutor, as_completed from core.settings import Settings, ENABLE_RABBITMQ_EXPORT, ENABLE_CSV_EXPORT, ENABLE_DATABASE_EXPORT from core.models import ProductVariant @@ -48,7 +49,6 @@ class AppOrchestrator: # # Оркестратор владеет скрейпером и управляет его жизненным циклом. self.scraper = Scraper( - session=self.http_session, selectors=self.settings.selectors, base_url=self.settings.base_url ) @@ -155,41 +155,56 @@ class AppOrchestrator: # # + # + # description: "Вспомогательный метод для парсинга одного URL в отдельном потоке." + # postconditions: "Возвращает ProductVariant или None в случае ошибки." + # + # + def _scrape_single_url(self, url: str) -> Optional[ProductVariant]: + try: + self.logger.info(f"[ACTION:_scrape_single_url] Парсинг URL: {url.split('/')[-1]}") + variant_data = self.scraper.scrape_variant_page( + variant_url=url, + run_id=self.run_id + ) + if variant_data: + self.logger.debug(f"[ACTION:_scrape_single_url] Успешно спарсен URL: {url.split('/')[-1]}") + return variant_data + else: + self.logger.warning(f"[ACTION:_scrape_single_url] Данные не получены для URL: {url.split('/')[-1]}") + return None + except Exception as e: + self.logger.error(f"[ACTION:_scrape_single_url] Ошибка при парсинге URL ({url}): {e}") + return None + # + # - # description: "Шаг 3: Итеративный парсинг данных по списку URL." + # description: "Шаг 3: Параллельный парсинг данных по списку URL с использованием ThreadPoolExecutor." # # def _scrape_data(self, urls: List[str]): - """Итеративный парсинг данных.""" + """Параллельный парсинг данных.""" with self._error_context("scrape_data"): # total_to_scrape = len(urls) - self.logger.info(f"[ACTION:_scrape_data] Начало парсинга {total_to_scrape} URL вариантов.") + self.logger.info(f"[ACTION:_scrape_data] Начало параллельного парсинга {total_to_scrape} URL вариантов с {self.settings.num_parser_threads} потоками.") - for i, url in enumerate(urls): - # - try: - self.logger.info(f"[ACTION:_scrape_data] Парсинг URL {i+1}/{total_to_scrape}: {url.split('/')[-1]}") - time.sleep(1) - - # - variant_data = self.scraper.scrape_variant_page( - variant_url=url, - run_id=self.run_id - ) - - if variant_data: - self.final_data.append(variant_data) - self.stats['successful_parses'] += 1 - else: + with ThreadPoolExecutor(max_workers=self.settings.num_parser_threads) as executor: + future_to_url = {executor.submit(self._scrape_single_url, url): url for url in urls} + + for i, future in enumerate(as_completed(future_to_url)): + url = future_to_url[future] + try: + variant_data = future.result() + if variant_data: + self.final_data.append(variant_data) + self.stats['successful_parses'] += 1 + else: + self.stats['failed_parses'] += 1 + except Exception as e: self.stats['failed_parses'] += 1 + self.logger.error(f"[ACTION:_scrape_data] Необработанная ошибка в потоке для URL ({url}): {e}") - except Exception as e: - self.stats['failed_parses'] += 1 - self.logger.error(f"[ACTION:_scrape_data] Ошибка при парсинге URL {i+1}/{total_to_scrape} ({url}): {e}") - continue - # - self.logger.info(f"[ACTION:_scrape_data] Парсинг данных завершен. Всего собрано {len(self.final_data)} валидных вариантов.") self.logger.info(f"[STATS][ACTION:_scrape_data] Успешно: {self.stats['successful_parses']}, Ошибок: {self.stats['failed_parses']}") # diff --git a/src/scraper/engine.py b/src/scraper/engine.py index a3b2aa2..5c1fa08 100644 --- a/src/scraper/engine.py +++ b/src/scraper/engine.py @@ -26,25 +26,20 @@ from core.settings import ScraperSelectors # class Scraper: # - def __init__(self, session: requests.Session, selectors: ScraperSelectors, base_url: str): + def __init__(self, selectors: ScraperSelectors, base_url: str): """Инициализирует скрейпер с зависимостями: сессия, селекторы и базовый URL.""" # - self.session = session self.selectors = selectors self.base_url = base_url self.logger = logging.getLogger(self.__class__.__name__) # - - # - self._setup_retry_strategy() - # # # # description: "Настраивает retry-логику для HTTP-адаптера сессии." # # - def _setup_retry_strategy(self): + def _setup_retry_strategy(self, session: requests.Session): """Настраивает retry стратегию для HTTP запросов.""" # retry_strategy = Retry( @@ -54,12 +49,27 @@ class Scraper: allowed_methods=["HEAD", "GET", "OPTIONS"] ) adapter = HTTPAdapter(max_retries=retry_strategy) - self.session.mount("http://", adapter) - self.session.mount("https://", adapter) + session.mount("http://", adapter) + session.mount("https://", adapter) self.logger.debug("[HELPER:_setup_retry_strategy] Retry стратегия настроена для HTTP запросов.") # # + # + # description: "Пытается найти элемент, используя список CSS-селекторов." + # postconditions: "Возвращает найденный элемент BeautifulSoup или None." + # + # + def _find_element_with_multiple_selectors(self, soup: BeautifulSoup, selectors: List[str], log_prefix: str, element_name: str): + for selector in selectors: + element = soup.select_one(selector) + if element: + self.logger.debug(f"{log_prefix} [FOUND_ELEMENT] Элемент '{element_name}' найден с селектором: '{selector}'") + return element + self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Элемент '{element_name}' не найден ни с одним из селекторов: {selectors}") + return None + # + # # description: "Очищает строковое представление цены, оставляя только цифры." # postconditions: "Возвращает целое число или 0 в случае ошибки." @@ -93,7 +103,7 @@ class Scraper: # postconditions: "Возвращает текстовое содержимое страницы или None в случае любой ошибки." # # - def _fetch_page(self, url: str, request_id: str) -> Optional[str]: + def _fetch_page(self, session: requests.Session, url: str, request_id: str) -> Optional[str]: """Приватный метод для скачивания HTML-содержимого страницы.""" log_prefix = f"[HELPER:_fetch_page(id={request_id})]" self.logger.debug(f"{log_prefix} Запрос к URL: {url}") @@ -101,7 +111,7 @@ class Scraper: # try: # - response = self.session.get(url, timeout=30) + response = session.get(url, timeout=30) response.raise_for_status() if not response.text.strip(): @@ -119,7 +129,7 @@ class Scraper: self.logger.error(f"{log_prefix} [TIMEOUT] Превышено время ожидания для {url}") return None except requests.exceptions.ConnectionError as e: - self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения для {url}: {e}") + self.logger.error(f"{log_prefix} [CONNECTION_ERROR] Ошибка соединения дл�� {url}: {e}") return None except requests.exceptions.HTTPError as e: self.logger.error(f"{log_prefix} [HTTP_ERROR] HTTP ошибка для {url}: {e.response.status_code}") @@ -144,7 +154,9 @@ class Scraper: log_prefix = f"[ACTION:get_base_product_urls(id={run_id})]" self.logger.info(f"{log_prefix} Начало сбора базовых URL с: {catalog_url}") - html = self._fetch_page(catalog_url, f"get_base_urls(id={run_id})") + with requests.Session() as session: + self._setup_retry_strategy(session) + html = self._fetch_page(session, catalog_url, f"get_base_urls(id={run_id})") if not html: self.logger.error(f"{log_prefix} [CRITICAL] Не удалось получить HTML страницы каталога, возвращаю пустой список.") return [] @@ -196,7 +208,9 @@ class Scraper: for i, base_url in enumerate(base_product_urls): self.logger.info(f"{log_prefix} Обработка базового URL {i+1}/{total_base}: {base_url.split('/')[-1]}") - html = self._fetch_page(base_url, f"get_variant_urls(id={run_id})-{i+1}") + with requests.Session() as session: + self._setup_retry_strategy(session) + html = self._fetch_page(session, base_url, f"get_variant_urls(id={run_id})-{i+1}") if not html: self.logger.warning(f"{log_prefix} Пропуск базового URL из-за ошибки загрузки: {base_url}") continue @@ -224,9 +238,7 @@ class Scraper: all_variant_urls.append(base_url) # - time.sleep(0.5) - - self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.") + self.logger.info(f"{log_prefix} [COHERENCE_CHECK_PASSED] Обнаружено всего {len(all_variant_urls)} URL вариантов для парсинга.") return all_variant_urls # # @@ -240,9 +252,11 @@ class Scraper: def scrape_variant_page(self, variant_url: str, run_id: str) -> Optional[ProductVariant]: """Парсит страницу одного варианта и возвращает Pydantic-модель.""" log_prefix = f"[ACTION:scrape_variant_page(id={run_id}, url={variant_url.split('/')[-1]})]" - self.logger.info(f"{log_prefix} Начало парсинга страниц�� варианта.") + self.logger.info(f"{log_prefix} Начало парсинга страницы варианта.") - html = self._fetch_page(variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})") + with requests.Session() as session: + self._setup_retry_strategy(session) + html = self._fetch_page(session, variant_url, f"scrape_variant(id={run_id}, url={variant_url.split('/')[-1]})") if not html: self.logger.warning(f"{log_prefix} Не удалось получить HTML страницы варианта, пропуск парсинга.") return None @@ -251,10 +265,10 @@ class Scraper: try: soup = BeautifulSoup(html, 'html.parser') - name_el = soup.select_one(self.selectors.product_page_name) - price_el = soup.select_one(self.selectors.price_block) - volume_el = soup.select_one(self.selectors.active_volume) - unavailable_el = soup.select_one(self.selectors.product_unavailable) + name_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_page_name, 'h1.product-title', 'h2.product-name'], log_prefix, 'имени продукта') + price_el = self._find_element_with_multiple_selectors(soup, [self.selectors.price_block, 'span.price', 'div.price'], log_prefix, 'цены') + volume_el = self._find_element_with_multiple_selectors(soup, [self.selectors.active_volume, '.product-options .active'], log_prefix, 'объема') + unavailable_el = self._find_element_with_multiple_selectors(soup, [self.selectors.product_unavailable, 'div.out-of-stock', 'span.unavailable'], log_prefix, 'отсутствия в наличии') # # Товар должен иметь имя. Цена или статус "нет в наличии" должны присутствовать. @@ -262,7 +276,7 @@ class Scraper: self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден элемент имени продукта с селектором: {self.selectors.product_page_name}") return None if not price_el and not unavailable_el: - self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличии.") + self.logger.warning(f"{log_prefix} [MISSING_ELEMENT] Не найден ни элемент цены, ни элемент отсутствия в наличи��.") return None # @@ -293,6 +307,7 @@ class Scraper: # # try: + self.logger.debug(f"{log_prefix} [DEBUG_SCRAPE_DATA] Name: '{name}', Price: {price}, URL: '{variant_url}', IsInStock: {is_in_stock}") product = ProductVariant(name=name, volume=volume, price=price, url=variant_url, is_in_stock=is_in_stock) self.logger.debug(f"{log_prefix} [COHERENCE_CHECK_PASSED] Успешно распарсен вариант: '{product.name}' | InStock: {product.is_in_stock} | Price: '{product.price}'") return product @@ -311,4 +326,4 @@ class Scraper: # # # -# +# \ No newline at end of file