# [DEF:backend.src.services.git_service:Module]
#
# @SEMANTICS: git, service, gitpython, repository, version_control
# @PURPOSE: Core Git logic using GitPython to manage dashboard repositories.
# @LAYER: Service
# @RELATION: INHERITS_FROM -> None
# @RELATION: USED_BY -> src.api.routes.git
# @RELATION: USED_BY -> src.plugins.git_plugin
#
# @INVARIANT: All Git operations must be performed on a valid local directory.

import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.parse import quote, urlparse

import httpx
from fastapi import HTTPException
from git import Repo

from src.core.logger import logger, belief_scope
from src.models.git import GitProvider


# [DEF:GitService:Class]
# @PURPOSE: Wrapper for GitPython operations with semantic logging and error handling.
class GitService:
    """
    Wrapper for GitPython operations.
    """

    # [DEF:__init__:Function]
    # @PURPOSE: Initializes the GitService with a base path for repositories.
    # @PARAM: base_path (str) - Root directory for all Git clones.
    # @PRE: base_path is a valid string path.
    # @POST: GitService is initialized; base_path directory exists.
    def __init__(self, base_path: str = "git_repos"):
        with belief_scope("GitService.__init__"):
            # Resolve relative to the backend directory
            # Path(__file__) is backend/src/services/git_service.py
            # parents[2] is backend/
            backend_root = Path(__file__).parents[2]
            self.base_path = str((backend_root / base_path).resolve())
            # exist_ok avoids the check-then-create race between two workers.
            os.makedirs(self.base_path, exist_ok=True)
    # [/DEF:__init__:Function]

    # [DEF:_get_repo_path:Function]
    # @PURPOSE: Resolves the local filesystem path for a dashboard's repository.
    # @PARAM: dashboard_id (int)
    # @PRE: dashboard_id is an integer.
    # @POST: Returns the path to the dashboard's repo under base_path.
    # @RETURN: str
    def _get_repo_path(self, dashboard_id: int) -> str:
        with belief_scope("GitService._get_repo_path"):
            if dashboard_id is None:
                raise ValueError("dashboard_id cannot be None")
            return os.path.join(self.base_path, str(dashboard_id))
    # [/DEF:_get_repo_path:Function]

    # [DEF:init_repo:Function]
    # @PURPOSE: Initialize or clone a repository for a dashboard.
    # @PARAM: dashboard_id (int)
    # @PARAM: remote_url (str)
    # @PARAM: pat (str) - Personal Access Token for authentication.
    # @PRE: dashboard_id is int, remote_url is valid Git URL, pat is provided.
    # @POST: Repository is cloned or opened at the local path.
    # @RETURN: Repo - GitPython Repo object.
    def init_repo(self, dashboard_id: int, remote_url: str, pat: str) -> Repo:
        with belief_scope("GitService.init_repo"):
            repo_path = self._get_repo_path(dashboard_id)

            if os.path.exists(repo_path):
                logger.info(f"[init_repo][Action] Opening existing repo at {repo_path}")
                return Repo(repo_path)

            # Inject PAT into remote URL if needed. The token is percent-encoded so
            # that characters such as '@', '/', ':' or '#' cannot corrupt the URL.
            if pat and "://" in remote_url:
                proto, rest = remote_url.split("://", 1)
                encoded_pat = quote(pat, safe="")
                auth_url = f"{proto}://oauth2:{encoded_pat}@{rest}"
            else:
                auth_url = remote_url

            # Only the credential-free URL is logged.
            logger.info(f"[init_repo][Action] Cloning {remote_url} to {repo_path}")
            return Repo.clone_from(auth_url, repo_path)
    # [/DEF:init_repo:Function]

    # [DEF:get_repo:Function]
    # @PURPOSE: Get Repo object for a dashboard.
    # @PRE: Repository must exist on disk for the given dashboard_id.
    # @POST: Returns a GitPython Repo instance for the dashboard.
    # @RETURN: Repo
    def get_repo(self, dashboard_id: int) -> Repo:
        with belief_scope("GitService.get_repo"):
            repo_path = self._get_repo_path(dashboard_id)
            if not os.path.exists(repo_path):
                logger.error(f"[get_repo][Coherence:Failed] Repository for dashboard {dashboard_id} does not exist")
                raise HTTPException(status_code=404, detail=f"Repository for dashboard {dashboard_id} not found")
            try:
                return Repo(repo_path)
            except Exception as e:
                logger.error(f"[get_repo][Coherence:Failed] Failed to open repository at {repo_path}: {e}")
                raise HTTPException(status_code=500, detail="Failed to open local Git repository") from e
    # [/DEF:get_repo:Function]

    # [DEF:list_branches:Function]
    # @PURPOSE: List all branches for a dashboard's repository.
    # @PRE: Repository for dashboard_id exists.
    # @POST: Returns a list of branch metadata dictionaries (never empty).
    # @RETURN: List[dict]
    def list_branches(self, dashboard_id: int) -> List[dict]:
        with belief_scope("GitService.list_branches"):
            repo = self.get_repo(dashboard_id)
            logger.info(f"[list_branches][Action] Listing branches for {dashboard_id}. Refs: {repo.refs}")
            branches = []
            seen_names = set()

            # Add existing refs
            for ref in repo.refs:
                try:
                    # Strip prefixes for UI
                    # NOTE(review): GitPython's Reference.name appears to already be the short
                    # name (e.g. "origin/main"), so this stripping may be a no-op - confirm.
                    name = ref.name.replace('refs/heads/', '').replace('refs/remotes/origin/', '')

                    # Avoid duplicates (e.g. local and remote with same name); first ref wins.
                    if name in seen_names:
                        continue

                    branches.append({
                        "name": name,
                        "commit_hash": ref.commit.hexsha if hasattr(ref, 'commit') else "0000000",
                        "is_remote": ref.is_remote() if hasattr(ref, 'is_remote') else False,
                        "last_updated": datetime.fromtimestamp(ref.commit.committed_date) if hasattr(ref, 'commit') else datetime.utcnow()
                    })
                    seen_names.add(name)
                except Exception as e:
                    logger.warning(f"[list_branches][Action] Skipping ref {ref}: {e}")

            # Ensure the current active branch is in the list even if it has no commits or refs
            try:
                active_name = repo.active_branch.name
                if active_name not in seen_names:
                    branches.append({
                        "name": active_name,
                        "commit_hash": "0000000",
                        "is_remote": False,
                        "last_updated": datetime.utcnow()
                    })
            except Exception as e:
                logger.warning(f"[list_branches][Action] Could not determine active branch: {e}")

            # If everything else failed and list is still empty, add default
            if not branches:
                branches.append({
                    "name": "main",
                    "commit_hash": "0000000",
                    "is_remote": False,
                    "last_updated": datetime.utcnow()
                })

            return branches
    # [/DEF:list_branches:Function]

    # [DEF:create_branch:Function]
    # @PURPOSE: Create a new branch from an existing one.
    # @PARAM: name (str) - New branch name.
    # @PARAM: from_branch (str) - Source branch.
    # @PRE: Repository exists; name is valid; from_branch exists or repo is empty.
    # @POST: A new branch is created in the repository.
    def create_branch(self, dashboard_id: int, name: str, from_branch: str = "main"):
        with belief_scope("GitService.create_branch"):
            repo = self.get_repo(dashboard_id)
            logger.info(f"[create_branch][Action] Creating branch {name} from {from_branch}")

            # Handle empty repository case (no commits)
            if not repo.heads and not repo.remotes:
                logger.warning("[create_branch][Action] Repository is empty. Creating initial commit to enable branching.")
                readme_path = os.path.join(repo.working_dir, "README.md")
                if not os.path.exists(readme_path):
                    with open(readme_path, "w", encoding="utf-8") as f:
                        f.write(f"# Dashboard {dashboard_id}\nGit repository for Superset dashboard integration.")
                repo.index.add(["README.md"])
                repo.index.commit("Initial commit")

            # Verify source branch exists
            try:
                repo.commit(from_branch)
            except Exception:
                logger.warning(f"[create_branch][Action] Source branch {from_branch} not found, using HEAD")
                from_branch = repo.head

            try:
                new_branch = repo.create_head(name, from_branch)
                return new_branch
            except Exception as e:
                logger.error(f"[create_branch][Coherence:Failed] {e}")
                raise
    # [/DEF:create_branch:Function]

    # [DEF:checkout_branch:Function]
    # @PURPOSE: Switch to a specific branch.
    # @PRE: Repository exists and the specified branch name exists.
    # @POST: The repository working directory is updated to the specified branch.
    def checkout_branch(self, dashboard_id: int, name: str):
        with belief_scope("GitService.checkout_branch"):
            repo = self.get_repo(dashboard_id)
            logger.info(f"[checkout_branch][Action] Checking out branch {name}")
            repo.git.checkout(name)
    # [/DEF:checkout_branch:Function]

    # [DEF:commit_changes:Function]
    # @PURPOSE: Stage and commit changes.
    # @PARAM: message (str) - Commit message.
    # @PARAM: files (List[str]) - Optional list of specific files to stage.
    # @PRE: Repository exists and has changes (dirty) or files are specified.
    # @POST: Changes are staged and a new commit is created; no-op when nothing changed.
    def commit_changes(self, dashboard_id: int, message: str, files: Optional[List[str]] = None):
        with belief_scope("GitService.commit_changes"):
            repo = self.get_repo(dashboard_id)

            # Check if there are any changes to commit
            if not repo.is_dirty(untracked_files=True) and not files:
                logger.info(f"[commit_changes][Action] No changes to commit for dashboard {dashboard_id}")
                return

            if files:
                logger.info(f"[commit_changes][Action] Staging files: {files}")
                repo.index.add(files)
            else:
                logger.info("[commit_changes][Action] Staging all changes")
                repo.git.add(A=True)

            repo.index.commit(message)
            logger.info(f"[commit_changes][Coherence:OK] Committed changes with message: {message}")
    # [/DEF:commit_changes:Function]

    # [DEF:push_changes:Function]
    # @PURPOSE: Push local commits to remote.
    # @PRE: Repository exists and has an 'origin' remote.
    # @POST: Local branch commits are pushed to origin.
    def push_changes(self, dashboard_id: int):
        with belief_scope("GitService.push_changes"):
            repo = self.get_repo(dashboard_id)

            # Ensure we have something to push
            if not repo.heads:
                logger.warning(f"[push_changes][Coherence:Failed] No local branches to push for dashboard {dashboard_id}")
                return

            try:
                origin = repo.remote(name='origin')
            except ValueError:
                logger.error(f"[push_changes][Coherence:Failed] Remote 'origin' not found for dashboard {dashboard_id}")
                raise HTTPException(status_code=400, detail="Remote 'origin' not configured")

            try:
                current_branch = repo.active_branch
                logger.info(f"[push_changes][Action] Pushing branch {current_branch.name} to origin")

                # Check if current branch has an upstream
                try:
                    tracking_branch = current_branch.tracking_branch()
                except Exception:
                    tracking_branch = None

                refspec = f"{current_branch.name}:{current_branch.name}"
                # First push for a new branch must set upstream, otherwise future pull fails.
                if tracking_branch is None:
                    repo.git.push("--set-upstream", "origin", refspec)
                else:
                    push_info = origin.push(refspec=refspec)
                    for info in push_info:
                        if info.flags & info.ERROR:
                            logger.error(f"[push_changes][Coherence:Failed] Error pushing ref {info.remote_ref_string}: {info.summary}")
                            raise Exception(f"Git push error for {info.remote_ref_string}: {info.summary}")
            except Exception as e:
                logger.error(f"[push_changes][Coherence:Failed] Failed to push changes: {e}")
                raise HTTPException(status_code=500, detail=f"Git push failed: {str(e)}")
    # [/DEF:push_changes:Function]

    # [DEF:pull_changes:Function]
    # @PURPOSE: Pull changes from remote.
    # @PRE: Repository exists and has an 'origin' remote.
    # @POST: Changes from origin are pulled and merged into the active branch.
    def pull_changes(self, dashboard_id: int):
        with belief_scope("GitService.pull_changes"):
            repo = self.get_repo(dashboard_id)
            try:
                origin = repo.remote(name='origin')
                current_branch = repo.active_branch.name
                remote_ref = f"origin/{current_branch}"
                has_remote_branch = any(ref.name == remote_ref for ref in repo.refs)
                if not has_remote_branch:
                    raise HTTPException(
                        status_code=409,
                        detail=f"Remote branch '{current_branch}' does not exist yet. Push this branch first.",
                    )

                logger.info(f"[pull_changes][Action] Pulling changes from origin/{current_branch}")
                fetch_info = origin.pull(current_branch)
                for info in fetch_info:
                    if info.flags & info.ERROR:
                        logger.error(f"[pull_changes][Coherence:Failed] Error pulling ref {info.ref}: {info.note}")
                        raise Exception(f"Git pull error for {info.ref}: {info.note}")
            except ValueError:
                logger.error(f"[pull_changes][Coherence:Failed] Remote 'origin' not found for dashboard {dashboard_id}")
                raise HTTPException(status_code=400, detail="Remote 'origin' not configured")
            except HTTPException:
                # Preserve the specific status code raised above (e.g. 409).
                raise
            except Exception as e:
                logger.error(f"[pull_changes][Coherence:Failed] Failed to pull changes: {e}")
                raise HTTPException(status_code=500, detail=f"Git pull failed: {str(e)}")
    # [/DEF:pull_changes:Function]

    # [DEF:get_status:Function]
    # @PURPOSE: Get current repository status (dirty files, untracked, etc.)
    # @PRE: Repository for dashboard_id exists.
    # @POST: Returns a dictionary representing the Git status.
    # @RETURN: dict
    def get_status(self, dashboard_id: int) -> dict:
        with belief_scope("GitService.get_status"):
            repo = self.get_repo(dashboard_id)

            # Handle empty repository (no commits)
            try:
                repo.head.commit
                has_commits = True
            except Exception:
                has_commits = False

            current_branch = repo.active_branch.name
            tracking_branch = None
            has_upstream = False
            ahead_count = 0
            behind_count = 0

            try:
                tracking_branch = repo.active_branch.tracking_branch()
                has_upstream = tracking_branch is not None
            except Exception:
                tracking_branch = None
                has_upstream = False

            if has_upstream and tracking_branch is not None:
                try:
                    # Commits present locally but not in upstream.
                    ahead_count = sum(
                        1 for _ in repo.iter_commits(f"{tracking_branch.name}..{current_branch}")
                    )
                    # Commits present in upstream but not local.
                    behind_count = sum(
                        1 for _ in repo.iter_commits(f"{current_branch}..{tracking_branch.name}")
                    )
                except Exception:
                    ahead_count = 0
                    behind_count = 0

            is_dirty = repo.is_dirty(untracked_files=True)
            untracked_files = repo.untracked_files
            modified_files = [item.a_path for item in repo.index.diff(None)]
            # Diffing against HEAD is only possible once at least one commit exists.
            staged_files = [item.a_path for item in repo.index.diff("HEAD")] if has_commits else []

            is_diverged = ahead_count > 0 and behind_count > 0
            if is_diverged:
                sync_state = "DIVERGED"
            elif behind_count > 0:
                sync_state = "BEHIND_REMOTE"
            elif ahead_count > 0:
                sync_state = "AHEAD_REMOTE"
            elif is_dirty or modified_files or staged_files or untracked_files:
                sync_state = "CHANGES"
            else:
                sync_state = "SYNCED"

            return {
                "is_dirty": is_dirty,
                "untracked_files": untracked_files,
                "modified_files": modified_files,
                "staged_files": staged_files,
                "current_branch": current_branch,
                "upstream_branch": tracking_branch.name if tracking_branch is not None else None,
                "has_upstream": has_upstream,
                "ahead_count": ahead_count,
                "behind_count": behind_count,
                "is_diverged": is_diverged,
                "sync_state": sync_state,
            }
    # [/DEF:get_status:Function]

    # [DEF:get_diff:Function]
    # @PURPOSE: Generate diff for a file or the whole repository.
    # @PARAM: file_path (str) - Optional specific file.
    # @PARAM: staged (bool) - Whether to show staged changes.
    # @PRE: Repository for dashboard_id exists.
    # @POST: Returns the diff text as a string.
    # @RETURN: str
    def get_diff(self, dashboard_id: int, file_path: Optional[str] = None, staged: bool = False) -> str:
        with belief_scope("GitService.get_diff"):
            repo = self.get_repo(dashboard_id)
            diff_args = []
            if staged:
                diff_args.append("--staged")

            if file_path:
                # "--" separates options from the path argument.
                return repo.git.diff(*diff_args, "--", file_path)
            return repo.git.diff(*diff_args)
    # [/DEF:get_diff:Function]

    # [DEF:get_commit_history:Function]
    # @PURPOSE: Retrieve commit history for a repository.
    # @PARAM: limit (int) - Max number of commits to return.
    # @PRE: Repository for dashboard_id exists.
    # @POST: Returns a list of dictionaries for each commit in history; empty list on failure.
    # @RETURN: List[dict]
    def get_commit_history(self, dashboard_id: int, limit: int = 50) -> List[dict]:
        with belief_scope("GitService.get_commit_history"):
            repo = self.get_repo(dashboard_id)
            commits = []
            try:
                # Check if there are any commits at all
                if not repo.heads and not repo.remotes:
                    return []

                for commit in repo.iter_commits(max_count=limit):
                    commits.append({
                        "hash": commit.hexsha,
                        "author": commit.author.name,
                        "email": commit.author.email,
                        "timestamp": datetime.fromtimestamp(commit.committed_date),
                        "message": commit.message.strip(),
                        "files_changed": list(commit.stats.files.keys())
                    })
            except Exception as e:
                logger.warning(f"[get_commit_history][Action] Could not retrieve commit history for dashboard {dashboard_id}: {e}")
                return []
            return commits
    # [/DEF:get_commit_history:Function]

    # [DEF:_require_pat:Function]
    # @PURPOSE: Validate and normalize a Personal Access Token for provider API calls.
    # @PARAM: pat (str) - Raw token, may be None or padded with whitespace.
    # @PARAM: provider_label (str) - Provider name used in the error message.
    # @PRE: provider_label is a human-readable provider name.
    # @POST: Returns the stripped token; raises HTTP 400 when it is missing or blank.
    # @RETURN: str
    def _require_pat(self, pat: Optional[str], provider_label: str) -> str:
        token = (pat or "").strip()
        if not token:
            raise HTTPException(status_code=400, detail=f"Git PAT is required for {provider_label} operations")
        return token
    # [/DEF:_require_pat:Function]

    # [DEF:_github_api_base:Function]
    # @PURPOSE: Resolve the REST API root for github.com or a GitHub Enterprise server.
    # @PARAM: server_url (str) - Configured Git server URL.
    # @PRE: server_url is a non-empty string.
    # @POST: Returns the API root without trailing slash.
    # @RETURN: str
    def _github_api_base(self, server_url: str) -> str:
        base_url = server_url.rstrip("/")
        # Compare the parsed hostname rather than using a substring test: a host such as
        # "github.company.com" contains "github.com" but is an Enterprise server, and
        # must not have its token sent to api.github.com.
        candidate = base_url if "://" in base_url else f"//{base_url}"
        try:
            host = (urlparse(candidate).hostname or "").lower()
        except ValueError:
            host = ""
        if host == "github.com" or host.endswith(".github.com"):
            return "https://api.github.com"
        return f"{base_url}/api/v3"
    # [/DEF:_github_api_base:Function]

    # [DEF:test_connection:Function]
    # @PURPOSE: Test connection to Git provider using PAT.
    # @PARAM: provider (GitProvider)
    # @PARAM: url (str)
    # @PARAM: pat (str)
    # @PRE: provider is valid; url is a valid HTTP(S) URL; pat is provided.
    # @POST: Returns True if connection to the provider's API succeeds.
    # @RETURN: bool
    async def test_connection(self, provider: GitProvider, url: str, pat: str) -> bool:
        with belief_scope("GitService.test_connection"):
            # Check for offline mode or local-only URLs
            if ".local" in url or "localhost" in url:
                logger.info("[test_connection][Action] Local/Offline mode detected for URL")
                return True

            if not url.startswith(('http://', 'https://')):
                logger.error(f"[test_connection][Coherence:Failed] Invalid URL protocol: {url}")
                return False

            if not pat or not pat.strip():
                logger.error("[test_connection][Coherence:Failed] Git PAT is missing or empty")
                return False

            pat = pat.strip()

            try:
                if provider == GitProvider.GITHUB:
                    headers = {"Authorization": f"token {pat}"}
                    api_url = f"{self._github_api_base(url)}/user"
                elif provider == GitProvider.GITLAB:
                    headers = {"PRIVATE-TOKEN": pat}
                    api_url = f"{url.rstrip('/')}/api/v4/user"
                elif provider == GitProvider.GITEA:
                    headers = {"Authorization": f"token {pat}"}
                    api_url = f"{url.rstrip('/')}/api/v1/user"
                else:
                    return False

                async with httpx.AsyncClient() as client:
                    resp = await client.get(api_url, headers=headers)

                if resp.status_code != 200:
                    logger.error(f"[test_connection][Coherence:Failed] Git connection test failed for {provider} at {api_url}. Status: {resp.status_code}")
                return resp.status_code == 200
            except Exception as e:
                logger.error(f"[test_connection][Coherence:Failed] Error testing git connection: {e}")
                return False
    # [/DEF:test_connection:Function]

    # [DEF:_normalize_git_server_url:Function]
    # @PURPOSE: Normalize Git server URL for provider API calls.
    # @PRE: raw_url is non-empty.
    # @POST: Returns URL without surrounding whitespace or trailing slash; HTTP 400 when empty.
    # @RETURN: str
    def _normalize_git_server_url(self, raw_url: str) -> str:
        normalized = (raw_url or "").strip()
        if not normalized:
            raise HTTPException(status_code=400, detail="Git server URL is required")
        return normalized.rstrip("/")
    # [/DEF:_normalize_git_server_url:Function]

    # [DEF:_gitea_headers:Function]
    # @PURPOSE: Build Gitea API authorization headers.
    # @PRE: pat is provided.
    # @POST: Returns headers with token auth; HTTP 400 when the PAT is missing.
    # @RETURN: Dict[str, str]
    def _gitea_headers(self, pat: str) -> Dict[str, str]:
        token = self._require_pat(pat, "Gitea")
        return {
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
    # [/DEF:_gitea_headers:Function]

    # [DEF:_gitea_request:Function]
    # @PURPOSE: Execute HTTP request against Gitea API with stable error mapping.
    # @PRE: method and endpoint are valid.
    # @POST: Returns decoded JSON payload, or None for HTTP 204.
    # @RETURN: Any
    async def _gitea_request(
        self,
        method: str,
        server_url: str,
        pat: str,
        endpoint: str,
        payload: Optional[Dict[str, Any]] = None,
    ) -> Any:
        base_url = self._normalize_git_server_url(server_url)
        url = f"{base_url}/api/v1{endpoint}"
        headers = self._gitea_headers(pat)
        try:
            async with httpx.AsyncClient(timeout=20.0) as client:
                response = await client.request(
                    method=method,
                    url=url,
                    headers=headers,
                    json=payload,
                )
        except Exception as e:
            logger.error(f"[gitea_request][Coherence:Failed] Network error: {e}")
            raise HTTPException(status_code=503, detail=f"Gitea API is unavailable: {str(e)}") from e

        if response.status_code >= 400:
            detail = response.text
            try:
                parsed = response.json()
                detail = parsed.get("message") or parsed.get("error") or detail
            except Exception:
                # Body is not a JSON object; fall back to the raw response text.
                pass
            logger.error(
                f"[gitea_request][Coherence:Failed] method={method} endpoint={endpoint} status={response.status_code} detail={detail}"
            )
            raise HTTPException(
                status_code=response.status_code,
                detail=f"Gitea API error: {detail}",
            )

        if response.status_code == 204:
            return None
        return response.json()
    # [/DEF:_gitea_request:Function]

    # [DEF:get_gitea_current_user:Function]
    # @PURPOSE: Resolve current Gitea user for PAT.
    # @PRE: server_url and pat are valid.
    # @POST: Returns current username; HTTP 500 when it cannot be resolved.
    # @RETURN: str
    async def get_gitea_current_user(self, server_url: str, pat: str) -> str:
        payload = await self._gitea_request("GET", server_url, pat, "/user")
        # A None/list payload would otherwise fail with AttributeError on .get().
        username = None
        if isinstance(payload, dict):
            username = payload.get("login") or payload.get("username")
        if not username:
            raise HTTPException(status_code=500, detail="Failed to resolve Gitea username")
        return str(username)
    # [/DEF:get_gitea_current_user:Function]

    # [DEF:list_gitea_repositories:Function]
    # @PURPOSE: List repositories visible to authenticated Gitea user.
    # @PRE: server_url and pat are valid.
    # @POST: Returns repository list from Gitea (first page, up to 100 entries).
    # @RETURN: List[dict]
    async def list_gitea_repositories(self, server_url: str, pat: str) -> List[dict]:
        payload = await self._gitea_request(
            "GET",
            server_url,
            pat,
            "/user/repos?limit=100&page=1",
        )
        if not isinstance(payload, list):
            return []
        return payload
    # [/DEF:list_gitea_repositories:Function]

    # [DEF:create_gitea_repository:Function]
    # @PURPOSE: Create repository in Gitea for authenticated user.
    # @PRE: name is non-empty and PAT has repo creation permission.
    # @POST: Returns created repository payload.
    # @RETURN: dict
    async def create_gitea_repository(
        self,
        server_url: str,
        pat: str,
        name: str,
        private: bool = True,
        description: Optional[str] = None,
        auto_init: bool = True,
        default_branch: Optional[str] = "main",
    ) -> Dict[str, Any]:
        payload = {
            "name": name,
            "private": bool(private),
            "auto_init": bool(auto_init),
        }
        if description:
            payload["description"] = description
        if default_branch:
            payload["default_branch"] = default_branch
        created = await self._gitea_request(
            "POST",
            server_url,
            pat,
            "/user/repos",
            payload=payload,
        )
        if not isinstance(created, dict):
            raise HTTPException(status_code=500, detail="Unexpected Gitea response while creating repository")
        return created
    # [/DEF:create_gitea_repository:Function]

    # [DEF:delete_gitea_repository:Function]
    # @PURPOSE: Delete repository in Gitea.
    # @PRE: owner and repo_name are non-empty.
    # @POST: Repository deleted on Gitea server.
    async def delete_gitea_repository(
        self,
        server_url: str,
        pat: str,
        owner: str,
        repo_name: str,
    ) -> None:
        if not owner or not repo_name:
            raise HTTPException(status_code=400, detail="owner and repo_name are required")
        await self._gitea_request(
            "DELETE",
            server_url,
            pat,
            f"/repos/{owner}/{repo_name}",
        )
    # [/DEF:delete_gitea_repository:Function]

    # [DEF:create_github_repository:Function]
    # @PURPOSE: Create repository in GitHub or GitHub Enterprise.
    # @PRE: PAT has repository create permission.
    # @POST: Returns created repository payload.
    # @RETURN: dict
    async def create_github_repository(
        self,
        server_url: str,
        pat: str,
        name: str,
        private: bool = True,
        description: Optional[str] = None,
        auto_init: bool = True,
        default_branch: Optional[str] = "main",
    ) -> Dict[str, Any]:
        base_url = self._normalize_git_server_url(server_url)
        token = self._require_pat(pat, "GitHub")
        api_url = f"{self._github_api_base(base_url)}/user/repos"
        headers = {
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
            "Accept": "application/vnd.github+json",
        }
        payload: Dict[str, Any] = {
            "name": name,
            "private": bool(private),
            "auto_init": bool(auto_init),
        }
        if description:
            payload["description"] = description
        # GitHub API does not reliably support setting default branch on create without template/import.
        if default_branch:
            payload["default_branch"] = default_branch

        try:
            async with httpx.AsyncClient(timeout=20.0) as client:
                response = await client.post(api_url, headers=headers, json=payload)
        except Exception as e:
            raise HTTPException(status_code=503, detail=f"GitHub API is unavailable: {str(e)}") from e

        if response.status_code >= 400:
            detail = response.text
            try:
                parsed = response.json()
                detail = parsed.get("message") or detail
            except Exception:
                # Body is not a JSON object; fall back to the raw response text.
                pass
            raise HTTPException(status_code=response.status_code, detail=f"GitHub API error: {detail}")
        return response.json()
    # [/DEF:create_github_repository:Function]

    # [DEF:create_gitlab_repository:Function]
    # @PURPOSE: Create repository(project) in GitLab.
    # @PRE: PAT has api scope.
    # @POST: Returns created repository payload with clone_url/html_url/ssh_url/full_name keys filled.
    # @RETURN: dict
    async def create_gitlab_repository(
        self,
        server_url: str,
        pat: str,
        name: str,
        private: bool = True,
        description: Optional[str] = None,
        auto_init: bool = True,
        default_branch: Optional[str] = "main",
    ) -> Dict[str, Any]:
        base_url = self._normalize_git_server_url(server_url)
        token = self._require_pat(pat, "GitLab")
        api_url = f"{base_url}/api/v4/projects"
        headers = {
            "PRIVATE-TOKEN": token,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        payload: Dict[str, Any] = {
            "name": name,
            "visibility": "private" if private else "public",
            "initialize_with_readme": bool(auto_init),
        }
        if description:
            payload["description"] = description
        if default_branch:
            payload["default_branch"] = default_branch

        try:
            async with httpx.AsyncClient(timeout=20.0) as client:
                response = await client.post(api_url, headers=headers, json=payload)
        except Exception as e:
            raise HTTPException(status_code=503, detail=f"GitLab API is unavailable: {str(e)}") from e

        if response.status_code >= 400:
            detail = response.text
            try:
                parsed = response.json()
                if isinstance(parsed, dict):
                    detail = parsed.get("message") or detail
            except Exception:
                # Body is not JSON; fall back to the raw response text.
                pass
            raise HTTPException(status_code=response.status_code, detail=f"GitLab API error: {detail}")

        data = response.json()
        # Normalize clone URL key to keep route response stable.
        if "clone_url" not in data:
            data["clone_url"] = data.get("http_url_to_repo")
        if "html_url" not in data:
            data["html_url"] = data.get("web_url")
        if "ssh_url" not in data:
            data["ssh_url"] = data.get("ssh_url_to_repo")
        if "full_name" not in data:
            data["full_name"] = data.get("path_with_namespace") or data.get("name")
        return data
    # [/DEF:create_gitlab_repository:Function]

    # [DEF:_parse_remote_repo_identity:Function]
    # @PURPOSE: Parse owner/repo from remote URL for Git server API operations.
    # @PRE: remote_url is a valid git URL.
    # @POST: Returns owner/repo/namespace/full_name tokens; HTTP 400 when they cannot be parsed.
    # @RETURN: Dict[str, str]
    def _parse_remote_repo_identity(self, remote_url: str) -> Dict[str, str]:
        normalized = str(remote_url or "").strip()
        if not normalized:
            raise HTTPException(status_code=400, detail="Repository remote_url is empty")

        if normalized.startswith("git@"):
            # git@host:owner/repo.git
            path = normalized.split(":", 1)[1] if ":" in normalized else ""
        else:
            parsed = urlparse(normalized)
            path = parsed.path or ""

        path = path.strip("/")
        if path.endswith(".git"):
            path = path[:-4]
        parts = [segment for segment in path.split("/") if segment]
        if len(parts) < 2:
            raise HTTPException(status_code=400, detail=f"Cannot parse repository owner/name from remote URL: {remote_url}")

        owner = parts[0]
        repo = parts[-1]
        # Everything before the final segment, so nested groups are kept intact.
        namespace = "/".join(parts[:-1])
        return {
            "owner": owner,
            "repo": repo,
            "namespace": namespace,
            "full_name": f"{namespace}/{repo}",
        }
    # [/DEF:_parse_remote_repo_identity:Function]

    # [DEF:promote_direct_merge:Function]
    # @PURPOSE: Perform direct merge between branches in local repo and push target branch.
    # @PRE: Repository exists and both branches are valid.
    # @POST: Target branch contains merged changes from source branch and was pushed to origin.
    # @RETURN: Dict[str, Any]
    def promote_direct_merge(
        self,
        dashboard_id: int,
        from_branch: str,
        to_branch: str,
    ) -> Dict[str, Any]:
        with belief_scope("GitService.promote_direct_merge"):
            if not from_branch or not to_branch:
                raise HTTPException(status_code=400, detail="from_branch and to_branch are required")
            repo = self.get_repo(dashboard_id)
            source = from_branch.strip()
            target = to_branch.strip()
            if source == target:
                raise HTTPException(status_code=400, detail="from_branch and to_branch must be different")

            try:
                origin = repo.remote(name="origin")
            except ValueError:
                raise HTTPException(status_code=400, detail="Remote 'origin' not configured")

            try:
                origin.fetch()
                local_heads = {head.name for head in repo.heads}
                ref_names = {ref.name for ref in repo.refs}

                # Ensure local source branch exists.
                if source not in local_heads:
                    if f"origin/{source}" in ref_names:
                        repo.git.checkout("-b", source, f"origin/{source}")
                    else:
                        raise HTTPException(status_code=404, detail=f"Source branch '{source}' not found")

                # Ensure local target branch exists and is checked out.
                if target in local_heads:
                    repo.git.checkout(target)
                elif f"origin/{target}" in ref_names:
                    repo.git.checkout("-b", target, f"origin/{target}")
                else:
                    raise HTTPException(status_code=404, detail=f"Target branch '{target}' not found")

                # Bring target up to date and merge source into target.
                # The pull stays best-effort, but the failure is logged, not hidden.
                try:
                    origin.pull(target)
                except Exception as pull_error:
                    logger.warning(
                        f"[promote_direct_merge][Action] Could not pull '{target}' before merge: {pull_error}"
                    )

                repo.git.merge(source, "--no-ff", "-m", f"chore(flow): promote {source} -> {target}")

                # A rejected push is reported through PushInfo flags rather than an
                # exception, so it has to be checked before reporting "merged".
                push_results = origin.push(refspec=f"{target}:{target}")
                for info in push_results:
                    if info.flags & info.ERROR:
                        logger.error(
                            f"[promote_direct_merge][Coherence:Failed] Error pushing ref {info.remote_ref_string}: {info.summary}"
                        )
                        raise Exception(f"Git push error for {info.remote_ref_string}: {info.summary}")
            except HTTPException:
                raise
            except Exception as e:
                message = str(e)
                if "CONFLICT" in message.upper():
                    raise HTTPException(status_code=409, detail=f"Merge conflict during direct promote: {message}")
                raise HTTPException(status_code=500, detail=f"Direct promote failed: {message}")

            return {
                "mode": "direct",
                "from_branch": source,
                "to_branch": target,
                "status": "merged",
            }
    # [/DEF:promote_direct_merge:Function]

    # [DEF:create_gitea_pull_request:Function]
    # @PURPOSE: Create pull request in Gitea.
    # @PRE: Config and remote URL are valid.
    # @POST: Returns normalized PR metadata (id, url, status).
    # @RETURN: Dict[str, Any]
    async def create_gitea_pull_request(
        self,
        server_url: str,
        pat: str,
        remote_url: str,
        from_branch: str,
        to_branch: str,
        title: str,
        description: Optional[str] = None,
    ) -> Dict[str, Any]:
        identity = self._parse_remote_repo_identity(remote_url)
        payload = {
            "title": title,
            "head": from_branch,
            "base": to_branch,
            "body": description or "",
        }
        data = await self._gitea_request(
            "POST",
            server_url,
            pat,
            f"/repos/{identity['namespace']}/{identity['repo']}/pulls",
            payload=payload,
        )
        # A None/list payload would otherwise fail with AttributeError on .get().
        if not isinstance(data, dict):
            raise HTTPException(status_code=500, detail="Unexpected Gitea response while creating pull request")
        return {
            "id": data.get("number") or data.get("id"),
            "url": data.get("html_url") or data.get("url"),
            "status": data.get("state") or "open",
        }
    # [/DEF:create_gitea_pull_request:Function]

    # [DEF:create_github_pull_request:Function]
    # @PURPOSE: Create pull request in GitHub or GitHub Enterprise.
    # @PRE: Config and remote URL are valid.
    # @POST: Returns normalized PR metadata (id, url, status).
    # @RETURN: Dict[str, Any]
    async def create_github_pull_request(
        self,
        server_url: str,
        pat: str,
        remote_url: str,
        from_branch: str,
        to_branch: str,
        title: str,
        description: Optional[str] = None,
        draft: bool = False,
    ) -> Dict[str, Any]:
        identity = self._parse_remote_repo_identity(remote_url)
        base_url = self._normalize_git_server_url(server_url)
        token = self._require_pat(pat, "GitHub")
        api_url = f"{self._github_api_base(base_url)}/repos/{identity['namespace']}/{identity['repo']}/pulls"
        headers = {
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
            "Accept": "application/vnd.github+json",
        }
        payload = {
            "title": title,
            "head": from_branch,
            "base": to_branch,
            "body": description or "",
            "draft": bool(draft),
        }
        try:
            async with httpx.AsyncClient(timeout=20.0) as client:
                response = await client.post(api_url, headers=headers, json=payload)
        except Exception as e:
            raise HTTPException(status_code=503, detail=f"GitHub API is unavailable: {str(e)}") from e

        if response.status_code >= 400:
            detail = response.text
            try:
                detail = response.json().get("message") or detail
            except Exception:
                # Body is not a JSON object; fall back to the raw response text.
                pass
            raise HTTPException(status_code=response.status_code, detail=f"GitHub API error: {detail}")

        data = response.json()
        return {
            "id": data.get("number") or data.get("id"),
            "url": data.get("html_url") or data.get("url"),
            "status": data.get("state") or "open",
        }
    # [/DEF:create_github_pull_request:Function]

    # [DEF:create_gitlab_merge_request:Function]
    # @PURPOSE: Create merge request in GitLab.
    # @PRE: Config and remote URL are valid.
    # @POST: Returns normalized MR metadata (id, url, status).
    # @RETURN: Dict[str, Any]
    async def create_gitlab_merge_request(
        self,
        server_url: str,
        pat: str,
        remote_url: str,
        from_branch: str,
        to_branch: str,
        title: str,
        description: Optional[str] = None,
        remove_source_branch: bool = False,
    ) -> Dict[str, Any]:
        identity = self._parse_remote_repo_identity(remote_url)
        base_url = self._normalize_git_server_url(server_url)
        token = self._require_pat(pat, "GitLab")
        # The project path is used as the project id, so "/" must be percent-encoded.
        project_id = quote(identity["full_name"], safe="")
        api_url = f"{base_url}/api/v4/projects/{project_id}/merge_requests"
        headers = {
            "PRIVATE-TOKEN": token,
            "Content-Type": "application/json",
            "Accept": "application/json",
        }
        payload = {
            "source_branch": from_branch,
            "target_branch": to_branch,
            "title": title,
            "description": description or "",
            "remove_source_branch": bool(remove_source_branch),
        }
        try:
            async with httpx.AsyncClient(timeout=20.0) as client:
                response = await client.post(api_url, headers=headers, json=payload)
        except Exception as e:
            raise HTTPException(status_code=503, detail=f"GitLab API is unavailable: {str(e)}") from e

        if response.status_code >= 400:
            detail = response.text
            try:
                parsed = response.json()
                if isinstance(parsed, dict):
                    detail = parsed.get("message") or detail
            except Exception:
                # Body is not JSON; fall back to the raw response text.
                pass
            raise HTTPException(status_code=response.status_code, detail=f"GitLab API error: {detail}")

        data = response.json()
        return {
            "id": data.get("iid") or data.get("id"),
            "url": data.get("web_url") or data.get("url"),
            "status": data.get("state") or "opened",
        }
    # [/DEF:create_gitlab_merge_request:Function]

# [/DEF:GitService:Class]
# [/DEF:backend.src.services.git_service:Module]