chore: commit remaining workspace changes
This commit is contained in:
Submodule backend/git_repos/10 updated: 3c0ade67f9...dec289695f
@@ -206,6 +206,42 @@ def _resolve_dashboard_id_from_ref(
|
||||
raise HTTPException(status_code=404, detail="Dashboard not found")
|
||||
# [/DEF:_resolve_dashboard_id_from_ref:Function]
|
||||
|
||||
|
||||
# [DEF:_normalize_filter_values:Function]
|
||||
# @PURPOSE: Normalize query filter values to lower-cased non-empty tokens.
|
||||
# @PRE: values may be None or list of strings.
|
||||
# @POST: Returns trimmed normalized list preserving input order.
|
||||
def _normalize_filter_values(values: Optional[List[str]]) -> List[str]:
|
||||
if not values:
|
||||
return []
|
||||
normalized: List[str] = []
|
||||
for value in values:
|
||||
token = str(value or "").strip().lower()
|
||||
if token:
|
||||
normalized.append(token)
|
||||
return normalized
|
||||
# [/DEF:_normalize_filter_values:Function]
|
||||
|
||||
|
||||
# [DEF:_dashboard_git_filter_value:Function]
|
||||
# @PURPOSE: Build comparable git status token for dashboards filtering.
|
||||
# @PRE: dashboard payload may contain git_status or None.
|
||||
# @POST: Returns one of ok|diff|no_repo|error|pending.
|
||||
def _dashboard_git_filter_value(dashboard: Dict[str, Any]) -> str:
|
||||
git_status = dashboard.get("git_status") or {}
|
||||
sync_status = str(git_status.get("sync_status") or "").strip().upper()
|
||||
has_repo = git_status.get("has_repo")
|
||||
if has_repo is False or sync_status == "NO_REPO":
|
||||
return "no_repo"
|
||||
if sync_status == "DIFF":
|
||||
return "diff"
|
||||
if sync_status == "OK":
|
||||
return "ok"
|
||||
if sync_status == "ERROR":
|
||||
return "error"
|
||||
return "pending"
|
||||
# [/DEF:_dashboard_git_filter_value:Function]
|
||||
|
||||
# [DEF:get_dashboards:Function]
|
||||
# @PURPOSE: Fetch list of dashboards from a specific environment with Git status and last task status
|
||||
# @PRE: env_id must be a valid environment ID
|
||||
@@ -225,6 +261,11 @@ async def get_dashboards(
|
||||
search: Optional[str] = None,
|
||||
page: int = 1,
|
||||
page_size: int = 10,
|
||||
filter_title: Optional[List[str]] = Query(default=None),
|
||||
filter_git_status: Optional[List[str]] = Query(default=None),
|
||||
filter_llm_status: Optional[List[str]] = Query(default=None),
|
||||
filter_changed_on: Optional[List[str]] = Query(default=None),
|
||||
filter_actor: Optional[List[str]] = Query(default=None),
|
||||
config_manager=Depends(get_config_manager),
|
||||
task_manager=Depends(get_task_manager),
|
||||
resource_service=Depends(get_resource_service),
|
||||
@@ -249,9 +290,23 @@ async def get_dashboards(
|
||||
try:
|
||||
# Get all tasks for status lookup
|
||||
all_tasks = task_manager.get_all_tasks()
|
||||
title_filters = _normalize_filter_values(filter_title)
|
||||
git_filters = _normalize_filter_values(filter_git_status)
|
||||
llm_filters = _normalize_filter_values(filter_llm_status)
|
||||
changed_on_filters = _normalize_filter_values(filter_changed_on)
|
||||
actor_filters = _normalize_filter_values(filter_actor)
|
||||
has_column_filters = any(
|
||||
(
|
||||
title_filters,
|
||||
git_filters,
|
||||
llm_filters,
|
||||
changed_on_filters,
|
||||
actor_filters,
|
||||
)
|
||||
)
|
||||
|
||||
# Fast path: real ResourceService -> one Superset page call per API request.
|
||||
if isinstance(resource_service, ResourceService):
|
||||
if isinstance(resource_service, ResourceService) and not has_column_filters:
|
||||
try:
|
||||
page_payload = await resource_service.get_dashboards_page_with_status(
|
||||
env,
|
||||
@@ -288,6 +343,60 @@ async def get_dashboards(
|
||||
start_idx = (page - 1) * page_size
|
||||
end_idx = start_idx + page_size
|
||||
paginated_dashboards = dashboards[start_idx:end_idx]
|
||||
elif isinstance(resource_service, ResourceService) and has_column_filters:
|
||||
dashboards = await resource_service.get_dashboards_with_status(
|
||||
env,
|
||||
all_tasks,
|
||||
include_git_status=bool(git_filters),
|
||||
)
|
||||
|
||||
if search:
|
||||
search_lower = search.lower()
|
||||
dashboards = [
|
||||
d for d in dashboards
|
||||
if search_lower in d.get("title", "").lower()
|
||||
or search_lower in d.get("slug", "").lower()
|
||||
]
|
||||
|
||||
def _matches_dashboard_filters(dashboard: Dict[str, Any]) -> bool:
    # Each filter group is AND-ed; within a group, membership is OR-ed.
    # Empty groups (no filter selected) are skipped.
    title_token = str(dashboard.get("title") or "").strip().lower()
    if title_filters and title_token not in title_filters:
        return False

    if git_filters and _dashboard_git_filter_value(dashboard) not in git_filters:
        return False

    last_task = dashboard.get("last_task") or {}
    llm_token = str(last_task.get("validation_status") or "UNKNOWN").strip().lower()
    if llm_filters and llm_token not in llm_filters:
        return False

    changed_raw = str(dashboard.get("last_modified") or "").strip().lower()
    # Date-only prefix (YYYY-MM-DD) also matches; slicing is safe on short strings.
    changed_prefix = changed_raw[:10]
    if changed_on_filters:
        if changed_raw not in changed_on_filters and changed_prefix not in changed_on_filters:
            return False

    owners = dashboard.get("owners") or []
    if isinstance(owners, list):
        trimmed = (str(entry).strip() for entry in owners)
        actor_token = ", ".join(part for part in trimmed if part).lower()
    else:
        actor_token = str(owners).strip().lower()
    # Dash placeholder mirrors the UI rendering for "no owner".
    actor_token = actor_token or "-"
    if actor_filters and actor_token not in actor_filters:
        return False

    return True
|
||||
|
||||
dashboards = [d for d in dashboards if _matches_dashboard_filters(d)]
|
||||
total = len(dashboards)
|
||||
total_pages = (total + page_size - 1) // page_size if total > 0 else 1
|
||||
start_idx = (page - 1) * page_size
|
||||
end_idx = start_idx + page_size
|
||||
paginated_dashboards = dashboards[start_idx:end_idx]
|
||||
else:
|
||||
# Compatibility path for mocked services in route tests.
|
||||
dashboards = await resource_service.get_dashboards_with_status(
|
||||
|
||||
@@ -20,6 +20,18 @@ from ...core.logger import belief_scope
|
||||
|
||||
router = APIRouter(prefix="/api/environments", tags=["Environments"])
|
||||
|
||||
|
||||
# [DEF:_normalize_superset_env_url:Function]
|
||||
# @PURPOSE: Canonicalize Superset environment URL to base host/path without trailing /api/v1.
|
||||
# @PRE: raw_url can be empty.
|
||||
# @POST: Returns normalized base URL.
|
||||
def _normalize_superset_env_url(raw_url: str) -> str:
|
||||
normalized = str(raw_url or "").strip().rstrip("/")
|
||||
if normalized.lower().endswith("/api/v1"):
|
||||
normalized = normalized[:-len("/api/v1")]
|
||||
return normalized.rstrip("/")
|
||||
# [/DEF:_normalize_superset_env_url:Function]
|
||||
|
||||
# [DEF:ScheduleSchema:DataClass]
|
||||
class ScheduleSchema(BaseModel):
|
||||
enabled: bool = False
|
||||
@@ -70,7 +82,7 @@ async def get_environments(
|
||||
EnvironmentResponse(
|
||||
id=e.id,
|
||||
name=e.name,
|
||||
url=e.url,
|
||||
url=_normalize_superset_env_url(e.url),
|
||||
stage=resolved_stage,
|
||||
is_production=(resolved_stage == "PROD"),
|
||||
backup_schedule=ScheduleSchema(
|
||||
|
||||
@@ -31,7 +31,38 @@ class LoggingConfigResponse(BaseModel):
|
||||
enable_belief_state: bool
|
||||
# [/DEF:LoggingConfigResponse:Class]
|
||||
|
||||
router = APIRouter()
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
# [DEF:_normalize_superset_env_url:Function]
|
||||
# @PURPOSE: Canonicalize Superset environment URL to base host/path without trailing /api/v1.
|
||||
# @PRE: raw_url can be empty.
|
||||
# @POST: Returns normalized base URL.
|
||||
def _normalize_superset_env_url(raw_url: str) -> str:
|
||||
normalized = str(raw_url or "").strip().rstrip("/")
|
||||
if normalized.lower().endswith("/api/v1"):
|
||||
normalized = normalized[:-len("/api/v1")]
|
||||
return normalized.rstrip("/")
|
||||
# [/DEF:_normalize_superset_env_url:Function]
|
||||
|
||||
|
||||
# [DEF:_validate_superset_connection_fast:Function]
# @PURPOSE: Run lightweight Superset connectivity validation without full pagination scan.
# @PRE: env contains valid URL and credentials.
# @POST: Raises on auth/API failures; returns None on success.
def _validate_superset_connection_fast(env: Environment) -> None:
    client = SupersetClient(env)
    # Step 1: explicit authentication — fails fast on bad credentials.
    client.authenticate()
    # Step 2: minimal read probe (one row, one column) to confirm API access.
    probe_query = {"page": 0, "page_size": 1, "columns": ["id"]}
    client.get_dashboards_page(query=probe_query)
# [/DEF:_validate_superset_connection_fast:Function]
|
||||
|
||||
# [DEF:get_settings:Function]
|
||||
# @PURPOSE: Retrieves all application settings.
|
||||
@@ -112,14 +143,18 @@ async def update_storage_settings(
|
||||
# @PRE: Config manager is available.
|
||||
# @POST: Returns list of environments.
|
||||
# @RETURN: List[Environment] - List of environments.
|
||||
@router.get("/environments", response_model=List[Environment])
|
||||
async def get_environments(
|
||||
@router.get("/environments", response_model=List[Environment])
|
||||
async def get_environments(
|
||||
config_manager: ConfigManager = Depends(get_config_manager),
|
||||
_ = Depends(has_permission("admin:settings", "READ"))
|
||||
):
|
||||
with belief_scope("get_environments"):
|
||||
logger.info("[get_environments][Entry] Fetching environments")
|
||||
return config_manager.get_environments()
|
||||
):
|
||||
with belief_scope("get_environments"):
|
||||
logger.info("[get_environments][Entry] Fetching environments")
|
||||
environments = config_manager.get_environments()
|
||||
return [
|
||||
env.copy(update={"url": _normalize_superset_env_url(env.url)})
|
||||
for env in environments
|
||||
]
|
||||
# [/DEF:get_environments:Function]
|
||||
|
||||
# [DEF:add_environment:Function]
|
||||
@@ -129,21 +164,21 @@ async def get_environments(
|
||||
# @PARAM: env (Environment) - The environment to add.
|
||||
# @RETURN: Environment - The added environment.
|
||||
@router.post("/environments", response_model=Environment)
|
||||
async def add_environment(
|
||||
env: Environment,
|
||||
async def add_environment(
|
||||
env: Environment,
|
||||
config_manager: ConfigManager = Depends(get_config_manager),
|
||||
_ = Depends(has_permission("admin:settings", "WRITE"))
|
||||
):
|
||||
with belief_scope("add_environment"):
|
||||
logger.info(f"[add_environment][Entry] Adding environment {env.id}")
|
||||
):
|
||||
with belief_scope("add_environment"):
|
||||
logger.info(f"[add_environment][Entry] Adding environment {env.id}")
|
||||
env = env.copy(update={"url": _normalize_superset_env_url(env.url)})
|
||||
|
||||
# Validate connection before adding
|
||||
try:
|
||||
client = SupersetClient(env)
|
||||
client.get_dashboards(query={"page_size": 1})
|
||||
except Exception as e:
|
||||
logger.error(f"[add_environment][Coherence:Failed] Connection validation failed: {e}")
|
||||
raise HTTPException(status_code=400, detail=f"Connection validation failed: {e}")
|
||||
# Validate connection before adding (fast path)
|
||||
try:
|
||||
_validate_superset_connection_fast(env)
|
||||
except Exception as e:
|
||||
logger.error(f"[add_environment][Coherence:Failed] Connection validation failed: {e}")
|
||||
raise HTTPException(status_code=400, detail=f"Connection validation failed: {e}")
|
||||
|
||||
config_manager.add_environment(env)
|
||||
return env
|
||||
@@ -157,28 +192,29 @@ async def add_environment(
|
||||
# @PARAM: env (Environment) - The updated environment data.
|
||||
# @RETURN: Environment - The updated environment.
|
||||
@router.put("/environments/{id}", response_model=Environment)
|
||||
async def update_environment(
|
||||
async def update_environment(
|
||||
id: str,
|
||||
env: Environment,
|
||||
config_manager: ConfigManager = Depends(get_config_manager)
|
||||
):
|
||||
):
|
||||
with belief_scope("update_environment"):
|
||||
logger.info(f"[update_environment][Entry] Updating environment {id}")
|
||||
|
||||
# If password is masked, we need the real one for validation
|
||||
env_to_validate = env.copy(deep=True)
|
||||
env = env.copy(update={"url": _normalize_superset_env_url(env.url)})
|
||||
|
||||
# If password is masked, we need the real one for validation
|
||||
env_to_validate = env.copy(deep=True)
|
||||
if env_to_validate.password == "********":
|
||||
old_env = next((e for e in config_manager.get_environments() if e.id == id), None)
|
||||
if old_env:
|
||||
env_to_validate.password = old_env.password
|
||||
|
||||
# Validate connection before updating
|
||||
try:
|
||||
client = SupersetClient(env_to_validate)
|
||||
client.get_dashboards(query={"page_size": 1})
|
||||
except Exception as e:
|
||||
logger.error(f"[update_environment][Coherence:Failed] Connection validation failed: {e}")
|
||||
raise HTTPException(status_code=400, detail=f"Connection validation failed: {e}")
|
||||
# Validate connection before updating (fast path)
|
||||
try:
|
||||
_validate_superset_connection_fast(env_to_validate)
|
||||
except Exception as e:
|
||||
logger.error(f"[update_environment][Coherence:Failed] Connection validation failed: {e}")
|
||||
raise HTTPException(status_code=400, detail=f"Connection validation failed: {e}")
|
||||
|
||||
if config_manager.update_environment(id, env):
|
||||
return env
|
||||
@@ -208,7 +244,7 @@ async def delete_environment(
|
||||
# @PARAM: id (str) - The ID of the environment to test.
|
||||
# @RETURN: dict - Success message or error.
|
||||
@router.post("/environments/{id}/test")
|
||||
async def test_environment_connection(
|
||||
async def test_environment_connection(
|
||||
id: str,
|
||||
config_manager: ConfigManager = Depends(get_config_manager)
|
||||
):
|
||||
@@ -220,15 +256,11 @@ async def test_environment_connection(
|
||||
if not env:
|
||||
raise HTTPException(status_code=404, detail=f"Environment {id} not found")
|
||||
|
||||
try:
|
||||
# Initialize client (this will trigger authentication)
|
||||
client = SupersetClient(env)
|
||||
|
||||
# Try a simple request to verify
|
||||
client.get_dashboards(query={"page_size": 1})
|
||||
|
||||
logger.info(f"[test_environment_connection][Coherence:OK] Connection successful for {id}")
|
||||
return {"status": "success", "message": "Connection successful"}
|
||||
try:
|
||||
_validate_superset_connection_fast(env)
|
||||
|
||||
logger.info(f"[test_environment_connection][Coherence:OK] Connection successful for {id}")
|
||||
return {"status": "success", "message": "Connection successful"}
|
||||
except Exception as e:
|
||||
logger.error(f"[test_environment_connection][Coherence:Failed] Connection failed for {id}: {e}")
|
||||
return {"status": "error", "message": str(e)}
|
||||
|
||||
@@ -101,7 +101,8 @@ class APIClient:
|
||||
def __init__(self, config: Dict[str, Any], verify_ssl: bool = True, timeout: int = DEFAULT_TIMEOUT):
|
||||
with belief_scope("__init__"):
|
||||
app_logger.info("[APIClient.__init__][Entry] Initializing APIClient.")
|
||||
self.base_url: str = config.get("base_url", "")
|
||||
self.base_url: str = self._normalize_base_url(config.get("base_url", ""))
|
||||
self.api_base_url: str = f"{self.base_url}/api/v1"
|
||||
self.auth = config.get("auth")
|
||||
self.request_settings = {"verify_ssl": verify_ssl, "timeout": timeout}
|
||||
self.session = self._init_session()
|
||||
@@ -156,6 +157,34 @@ class APIClient:
|
||||
return session
|
||||
# [/DEF:_init_session:Function]
|
||||
|
||||
# [DEF:_normalize_base_url:Function]
|
||||
# @PURPOSE: Normalize Superset environment URL to base host/path without trailing slash and /api/v1 suffix.
|
||||
# @PRE: raw_url can be empty.
|
||||
# @POST: Returns canonical base URL suitable for building API endpoints.
|
||||
# @RETURN: str
|
||||
def _normalize_base_url(self, raw_url: str) -> str:
|
||||
normalized = str(raw_url or "").strip().rstrip("/")
|
||||
if normalized.lower().endswith("/api/v1"):
|
||||
normalized = normalized[:-len("/api/v1")]
|
||||
return normalized.rstrip("/")
|
||||
# [/DEF:_normalize_base_url:Function]
|
||||
|
||||
# [DEF:_build_api_url:Function]
|
||||
# @PURPOSE: Build absolute Superset API URL for endpoint using canonical /api/v1 base.
|
||||
# @PRE: endpoint is relative path or absolute URL.
|
||||
# @POST: Returns full URL without accidental duplicate slashes.
|
||||
# @RETURN: str
|
||||
def _build_api_url(self, endpoint: str) -> str:
|
||||
normalized_endpoint = str(endpoint or "").strip()
|
||||
if normalized_endpoint.startswith("http://") or normalized_endpoint.startswith("https://"):
|
||||
return normalized_endpoint
|
||||
if not normalized_endpoint.startswith("/"):
|
||||
normalized_endpoint = f"/{normalized_endpoint}"
|
||||
if normalized_endpoint.startswith("/api/v1/") or normalized_endpoint == "/api/v1":
|
||||
return f"{self.base_url}{normalized_endpoint}"
|
||||
return f"{self.api_base_url}{normalized_endpoint}"
|
||||
# [/DEF:_build_api_url:Function]
|
||||
|
||||
# [DEF:authenticate:Function]
|
||||
# @PURPOSE: Выполняет аутентификацию в Superset API и получает access и CSRF токены.
|
||||
# @PRE: self.auth and self.base_url must be valid.
|
||||
@@ -166,7 +195,7 @@ class APIClient:
|
||||
with belief_scope("authenticate"):
|
||||
app_logger.info("[authenticate][Enter] Authenticating to %s", self.base_url)
|
||||
try:
|
||||
login_url = f"{self.base_url}/security/login"
|
||||
login_url = f"{self.api_base_url}/security/login"
|
||||
# Log the payload keys and values (masking password)
|
||||
masked_auth = {k: ("******" if k == "password" else v) for k, v in self.auth.items()}
|
||||
app_logger.info(f"[authenticate][Debug] Login URL: {login_url}")
|
||||
@@ -180,7 +209,7 @@ class APIClient:
|
||||
response.raise_for_status()
|
||||
access_token = response.json()["access_token"]
|
||||
|
||||
csrf_url = f"{self.base_url}/security/csrf_token/"
|
||||
csrf_url = f"{self.api_base_url}/security/csrf_token/"
|
||||
csrf_response = self.session.get(csrf_url, headers={"Authorization": f"Bearer {access_token}"}, timeout=self.request_settings["timeout"])
|
||||
csrf_response.raise_for_status()
|
||||
|
||||
@@ -224,7 +253,7 @@ class APIClient:
|
||||
# @RETURN: `requests.Response` если `raw_response=True`, иначе `dict`.
|
||||
# @THROW: SupersetAPIError, NetworkError и их подклассы.
|
||||
def request(self, method: str, endpoint: str, headers: Optional[Dict] = None, raw_response: bool = False, **kwargs) -> Union[requests.Response, Dict[str, Any]]:
|
||||
full_url = f"{self.base_url}{endpoint}"
|
||||
full_url = self._build_api_url(endpoint)
|
||||
_headers = self.headers.copy()
|
||||
if headers:
|
||||
_headers.update(headers)
|
||||
@@ -288,7 +317,7 @@ class APIClient:
|
||||
# @THROW: SupersetAPIError, NetworkError, TypeError.
|
||||
def upload_file(self, endpoint: str, file_info: Dict[str, Any], extra_data: Optional[Dict] = None, timeout: Optional[int] = None) -> Dict:
|
||||
with belief_scope("upload_file"):
|
||||
full_url = f"{self.base_url}{endpoint}"
|
||||
full_url = self._build_api_url(endpoint)
|
||||
_headers = self.headers.copy()
|
||||
_headers.pop('Content-Type', None)
|
||||
|
||||
|
||||
@@ -791,6 +791,28 @@ class GitService:
|
||||
}
|
||||
# [/DEF:_parse_remote_repo_identity:Function]
|
||||
|
||||
# [DEF:_derive_server_url_from_remote:Function]
|
||||
# @PURPOSE: Build API base URL from remote repository URL without credentials.
|
||||
# @PRE: remote_url may be any git URL.
|
||||
# @POST: Returns normalized http(s) base URL or None when derivation is impossible.
|
||||
# @RETURN: Optional[str]
|
||||
def _derive_server_url_from_remote(self, remote_url: str) -> Optional[str]:
|
||||
normalized = str(remote_url or "").strip()
|
||||
if not normalized or normalized.startswith("git@"):
|
||||
return None
|
||||
|
||||
parsed = urlparse(normalized)
|
||||
if parsed.scheme not in {"http", "https"}:
|
||||
return None
|
||||
if not parsed.hostname:
|
||||
return None
|
||||
|
||||
netloc = parsed.hostname
|
||||
if parsed.port:
|
||||
netloc = f"{netloc}:{parsed.port}"
|
||||
return f"{parsed.scheme}://{netloc}".rstrip("/")
|
||||
# [/DEF:_derive_server_url_from_remote:Function]
|
||||
|
||||
# [DEF:promote_direct_merge:Function]
|
||||
# @PURPOSE: Perform direct merge between branches in local repo and push target branch.
|
||||
# @PRE: Repository exists and both branches are valid.
|
||||
@@ -878,13 +900,32 @@ class GitService:
|
||||
"base": to_branch,
|
||||
"body": description or "",
|
||||
}
|
||||
data = await self._gitea_request(
|
||||
"POST",
|
||||
server_url,
|
||||
pat,
|
||||
f"/repos/{identity['namespace']}/{identity['repo']}/pulls",
|
||||
payload=payload,
|
||||
)
|
||||
endpoint = f"/repos/{identity['namespace']}/{identity['repo']}/pulls"
|
||||
try:
|
||||
data = await self._gitea_request(
|
||||
"POST",
|
||||
server_url,
|
||||
pat,
|
||||
endpoint,
|
||||
payload=payload,
|
||||
)
|
||||
except HTTPException as exc:
|
||||
fallback_url = self._derive_server_url_from_remote(remote_url)
|
||||
normalized_primary = self._normalize_git_server_url(server_url)
|
||||
if exc.status_code != 404 or not fallback_url or fallback_url == normalized_primary:
|
||||
raise
|
||||
|
||||
logger.warning(
|
||||
"[create_gitea_pull_request][Action] Primary Gitea URL not found, retrying with remote host: %s",
|
||||
fallback_url,
|
||||
)
|
||||
data = await self._gitea_request(
|
||||
"POST",
|
||||
fallback_url,
|
||||
pat,
|
||||
endpoint,
|
||||
payload=payload,
|
||||
)
|
||||
return {
|
||||
"id": data.get("number") or data.get("id"),
|
||||
"url": data.get("html_url") or data.get("url"),
|
||||
|
||||
67
backend/tests/core/test_git_service_gitea_pr.py
Normal file
67
backend/tests/core/test_git_service_gitea_pr.py
Normal file
@@ -0,0 +1,67 @@
|
||||
# [DEF:backend.tests.core.test_git_service_gitea_pr:Module]
|
||||
# @TIER: STANDARD
|
||||
# @SEMANTICS: tests, git, gitea, pull_request, fallback
|
||||
# @PURPOSE: Validate Gitea PR creation fallback behavior when configured server URL is stale.
|
||||
# @LAYER: Domain
|
||||
# @RELATION: TESTS -> backend.src.services.git_service.create_gitea_pull_request
|
||||
# @INVARIANT: A 404 from primary Gitea URL retries once against remote-url host when different.
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
from src.services.git_service import GitService
|
||||
|
||||
|
||||
# [DEF:test_derive_server_url_from_remote_strips_credentials:Function]
# @PURPOSE: Ensure helper returns host base URL and removes embedded credentials.
# @PRE: remote_url is an https URL with username/token.
# @POST: Result is scheme+host only.
def test_derive_server_url_from_remote_strips_credentials():
    # GitService only needs a base path here; nothing is written to disk by this call.
    service = GitService(base_path="test_repos")
    derived = service._derive_server_url_from_remote(
        "https://oauth2:token@giteabusya.bebesh.ru/busya/covid-vaccine-dashboard.git"
    )
    # Credentials and the repository path must both be stripped: scheme + host only.
    assert derived == "https://giteabusya.bebesh.ru"
# [/DEF:test_derive_server_url_from_remote_strips_credentials:Function]
|
||||
|
||||
|
||||
# [DEF:test_create_gitea_pull_request_retries_with_remote_host_on_404:Function]
# @PURPOSE: Verify create_gitea_pull_request retries with remote URL host after primary 404.
# @PRE: primary server_url differs from remote_url host.
# @POST: Method returns success payload from fallback request.
def test_create_gitea_pull_request_retries_with_remote_host_on_404(monkeypatch):
    service = GitService(base_path="test_repos")
    calls = []

    # Stub transport: first invocation simulates a stale primary host (404),
    # the second succeeds — exercising exactly one fallback retry.
    async def fake_gitea_request(method, server_url, pat, endpoint, payload=None):
        calls.append((method, server_url, endpoint))
        if len(calls) == 1:
            raise HTTPException(status_code=404, detail="Gitea API error: The target couldn't be found.")
        return {"number": 42, "html_url": "https://giteabusya.bebesh.ru/busya/covid-vaccine-dashboard/pulls/42", "state": "open"}

    monkeypatch.setattr(service, "_gitea_request", fake_gitea_request)

    result = asyncio.run(
        service.create_gitea_pull_request(
            server_url="https://gitea.bebesh.ru",
            pat="secret",
            remote_url="https://oauth2:secret@giteabusya.bebesh.ru/busya/covid-vaccine-dashboard.git",
            from_branch="ss-dev",
            to_branch="main",
            title="Promote ss-dev -> main",
            description="",
        )
    )

    # PR id must come from the fallback response.
    assert result["id"] == 42
    # Exactly two attempts: stale configured host first, remote-derived host second.
    assert len(calls) == 2
    assert calls[0][1] == "https://gitea.bebesh.ru"
    assert calls[1][1] == "https://giteabusya.bebesh.ru"
# [/DEF:test_create_gitea_pull_request_retries_with_remote_host_on_404:Function]
|
||||
|
||||
# [/DEF:backend.tests.core.test_git_service_gitea_pr:Module]
|
||||
Reference in New Issue
Block a user