# [DEF:backend.src.services.git_service:Module]
#
# @SEMANTICS: git, service, gitpython, repository, version_control
# @PURPOSE: Core Git logic using GitPython to manage dashboard repositories.
# @LAYER: Service
# @RELATION: INHERITS_FROM -> None
# @RELATION: USED_BY -> src.api.routes.git
# @RELATION: USED_BY -> src.plugins.git_plugin
#
# @INVARIANT: All Git operations must be performed on a valid local directory.

import os
from datetime import datetime
from pathlib import Path
from typing import List, Optional
from urllib.parse import quote

import httpx
from fastapi import HTTPException
from git import Repo

from src.core.logger import logger, belief_scope
from src.models.git import GitProvider


# [DEF:GitService:Class]
# @PURPOSE: Wrapper for GitPython operations with semantic logging and error handling.
class GitService:
    """
    Wrapper for GitPython operations.

    Each dashboard gets its own repository directory named after its id
    underneath ``base_path``.
    """

    # [DEF:__init__:Function]
    # @PURPOSE: Initializes the GitService with a base path for repositories.
    # @PARAM: base_path (str) - Root directory for all Git clones.
    # @PRE: base_path is a valid string path.
    # @POST: GitService is initialized; base_path directory exists.
    def __init__(self, base_path: str = "git_repos"):
        with belief_scope("GitService.__init__"):
            # Resolve relative to the backend directory:
            # Path(__file__) is backend/src/services/git_service.py,
            # so parents[2] is backend/
            backend_root = Path(__file__).parents[2]
            self.base_path = str((backend_root / base_path).resolve())
            # exist_ok avoids the race between an existence check and creation.
            os.makedirs(self.base_path, exist_ok=True)
    # [/DEF:__init__:Function]

    # [DEF:_get_repo_path:Function]
    # @PURPOSE: Resolves the local filesystem path for a dashboard's repository.
    # @PARAM: dashboard_id (int)
    # @PRE: dashboard_id is an integer.
    # @POST: Returns the path to the dashboard's repo under base_path.
    # @RETURN: str
    def _get_repo_path(self, dashboard_id: int) -> str:
        with belief_scope("GitService._get_repo_path"):
            return os.path.join(self.base_path, str(dashboard_id))
    # [/DEF:_get_repo_path:Function]

    # [DEF:_has_commits:Function]
    # @PURPOSE: Tell whether HEAD of a repository resolves to a commit.
    # @PARAM: repo (Repo)
    # @POST: Returns False for a repository whose HEAD cannot be resolved (e.g. no commits yet).
    # @RETURN: bool
    @staticmethod
    def _has_commits(repo: Repo) -> bool:
        try:
            _ = repo.head.commit
        except Exception:
            # Resolving HEAD fails when the current branch has no commit yet.
            return False
        return True
    # [/DEF:_has_commits:Function]

    # [DEF:init_repo:Function]
    # @PURPOSE: Initialize or clone a repository for a dashboard.
    # @PARAM: dashboard_id (int)
    # @PARAM: remote_url (str)
    # @PARAM: pat (str) - Personal Access Token for authentication.
    # @PRE: dashboard_id is int, remote_url is valid Git URL, pat is provided.
    # @POST: Repository is cloned or opened at the local path.
    # @RETURN: Repo - GitPython Repo object.
    def init_repo(self, dashboard_id: int, remote_url: str, pat: str) -> Repo:
        with belief_scope("GitService.init_repo"):
            repo_path = self._get_repo_path(dashboard_id)

            if os.path.exists(repo_path):
                logger.info(f"[init_repo][Action] Opening existing repo at {repo_path}")
                return Repo(repo_path)

            # Inject PAT into remote URL if needed. The token is percent-encoded
            # so that characters such as '@', '/' or ':' cannot break the URL.
            if pat and "://" in remote_url:
                proto, rest = remote_url.split("://", 1)
                auth_url = f"{proto}://oauth2:{quote(pat, safe='')}@{rest}"
            else:
                auth_url = remote_url

            # Only the credential-free URL is logged.
            logger.info(f"[init_repo][Action] Cloning {remote_url} to {repo_path}")
            return Repo.clone_from(auth_url, repo_path)
    # [/DEF:init_repo:Function]

    # [DEF:get_repo:Function]
    # @PURPOSE: Get Repo object for a dashboard.
    # @PRE: Repository must exist on disk for the given dashboard_id.
    # @POST: Returns a GitPython Repo instance for the dashboard.
    # @RETURN: Repo
    def get_repo(self, dashboard_id: int) -> Repo:
        """
        Open the local repository of a dashboard.

        Raises HTTPException 404 when the directory does not exist and
        HTTPException 500 when it exists but cannot be opened as a repository.
        """
        with belief_scope("GitService.get_repo"):
            repo_path = self._get_repo_path(dashboard_id)
            if not os.path.exists(repo_path):
                logger.error(f"[get_repo][Coherence:Failed] Repository for dashboard {dashboard_id} does not exist")
                raise HTTPException(status_code=404, detail=f"Repository for dashboard {dashboard_id} not found")
            try:
                return Repo(repo_path)
            except Exception as e:
                logger.error(f"[get_repo][Coherence:Failed] Failed to open repository at {repo_path}: {e}")
                raise HTTPException(status_code=500, detail="Failed to open local Git repository") from e
    # [/DEF:get_repo:Function]

    # [DEF:list_branches:Function]
    # @PURPOSE: List all branches for a dashboard's repository.
    # @PRE: Repository for dashboard_id exists.
    # @POST: Returns a list of branch metadata dictionaries.
    # @RETURN: List[dict]
    def list_branches(self, dashboard_id: int) -> List[dict]:
        """
        Return one dict per distinct ref name with the keys ``name``,
        ``commit_hash``, ``is_remote`` and ``last_updated``.

        The result is never empty: the active branch, or failing that a
        placeholder "main" entry, is appended when no ref could be listed.
        """
        with belief_scope("GitService.list_branches"):
            repo = self.get_repo(dashboard_id)
            logger.info(f"[list_branches][Action] Listing branches for {dashboard_id}. Refs: {repo.refs}")
            branches = []
            seen_names = set()  # names already emitted, for O(1) duplicate checks

            # Add existing refs
            for ref in repo.refs:
                try:
                    # Strip prefixes for UI
                    # NOTE(review): GitPython may already return a shortened
                    # ref.name (e.g. "origin/main"), in which case these
                    # replacements never match - confirm against the UI.
                    name = ref.name.replace('refs/heads/', '').replace('refs/remotes/origin/', '')

                    # Avoid duplicates (e.g. local and remote with same name)
                    if name in seen_names:
                        continue

                    branches.append({
                        "name": name,
                        "commit_hash": ref.commit.hexsha if hasattr(ref, 'commit') else "0000000",
                        "is_remote": ref.is_remote() if hasattr(ref, 'is_remote') else False,
                        "last_updated": datetime.fromtimestamp(ref.commit.committed_date) if hasattr(ref, 'commit') else datetime.utcnow()
                    })
                    seen_names.add(name)
                except Exception as e:
                    # Best effort: one unreadable ref must not hide the others.
                    logger.warning(f"[list_branches][Action] Skipping ref {ref}: {e}")

            # Ensure the current active branch is in the list even if it has no commits or refs
            try:
                active_name = repo.active_branch.name
                if active_name not in seen_names:
                    branches.append({
                        "name": active_name,
                        "commit_hash": "0000000",
                        "is_remote": False,
                        "last_updated": datetime.utcnow()
                    })
            except Exception as e:
                logger.warning(f"[list_branches][Action] Could not determine active branch: {e}")

            # If everything else failed and list is still empty, add default
            if not branches:
                branches.append({
                    "name": "main",
                    "commit_hash": "0000000",
                    "is_remote": False,
                    "last_updated": datetime.utcnow()
                })

            return branches
    # [/DEF:list_branches:Function]

    # [DEF:create_branch:Function]
    # @PURPOSE: Create a new branch from an existing one.
    # @PARAM: name (str) - New branch name.
    # @PARAM: from_branch (str) - Source branch.
    # @PRE: Repository exists; name is valid; from_branch exists or repo is empty.
    # @POST: A new branch is created in the repository.
    def create_branch(self, dashboard_id: int, name: str, from_branch: str = "main"):
        """
        Create branch ``name`` starting at ``from_branch`` and return the new head.

        When ``from_branch`` cannot be resolved the branch starts at HEAD.
        Errors raised while creating the head are logged and re-raised.
        """
        with belief_scope("GitService.create_branch"):
            repo = self.get_repo(dashboard_id)
            logger.info(f"[create_branch][Action] Creating branch {name} from {from_branch}")

            # Handle empty repository case (no commits). Configured remotes are
            # deliberately not consulted: a freshly cloned empty repository
            # still has an 'origin' remote but nothing to branch from.
            if not repo.heads and not self._has_commits(repo):
                logger.warning("[create_branch][Action] Repository is empty. Creating initial commit to enable branching.")
                readme_path = os.path.join(repo.working_dir, "README.md")
                if not os.path.exists(readme_path):
                    with open(readme_path, "w", encoding="utf-8") as f:
                        f.write(f"# Dashboard {dashboard_id}\nGit repository for Superset dashboard integration.")
                repo.index.add(["README.md"])
                repo.index.commit("Initial commit")

            # Verify source branch exists
            try:
                repo.commit(from_branch)
            except Exception:
                logger.warning(f"[create_branch][Action] Source branch {from_branch} not found, using HEAD")
                from_branch = repo.head

            try:
                return repo.create_head(name, from_branch)
            except Exception as e:
                logger.error(f"[create_branch][Coherence:Failed] {e}")
                raise
    # [/DEF:create_branch:Function]

    # [DEF:checkout_branch:Function]
    # @PURPOSE: Switch to a specific branch.
    # @PRE: Repository exists and the specified branch name exists.
    # @POST: The repository working directory is updated to the specified branch.
    def checkout_branch(self, dashboard_id: int, name: str):
        with belief_scope("GitService.checkout_branch"):
            repo = self.get_repo(dashboard_id)
            logger.info(f"[checkout_branch][Action] Checking out branch {name}")
            repo.git.checkout(name)
    # [/DEF:checkout_branch:Function]

    # [DEF:commit_changes:Function]
    # @PURPOSE: Stage and commit changes.
    # @PARAM: message (str) - Commit message.
    # @PARAM: files (List[str]) - Optional list of specific files to stage.
    # @PRE: Repository exists and has changes (dirty) or files are specified.
    # @POST: Changes are staged and a new commit is created.
    def commit_changes(self, dashboard_id: int, message: str, files: Optional[List[str]] = None):
        """
        Stage ``files`` (or everything when omitted) and create a commit.

        Returns without committing when the working tree is clean and no
        files were given.
        """
        with belief_scope("GitService.commit_changes"):
            repo = self.get_repo(dashboard_id)

            # Check if there are any changes to commit
            if not repo.is_dirty(untracked_files=True) and not files:
                logger.info(f"[commit_changes][Action] No changes to commit for dashboard {dashboard_id}")
                return

            if files:
                logger.info(f"[commit_changes][Action] Staging files: {files}")
                repo.index.add(files)
            else:
                logger.info("[commit_changes][Action] Staging all changes")
                repo.git.add(A=True)

            repo.index.commit(message)
            logger.info(f"[commit_changes][Coherence:OK] Committed changes with message: {message}")
    # [/DEF:commit_changes:Function]

    # [DEF:push_changes:Function]
    # @PURPOSE: Push local commits to remote.
    # @PRE: Repository exists and has an 'origin' remote.
    # @POST: Local branch commits are pushed to origin.
    def push_changes(self, dashboard_id: int):
        """
        Push the active branch to the same-named branch on 'origin'.

        Returns without pushing when the repository has no local branches.
        Raises HTTPException 400 when 'origin' is not configured and
        HTTPException 500 when the push fails.
        """
        with belief_scope("GitService.push_changes"):
            repo = self.get_repo(dashboard_id)

            # Ensure we have something to push
            if not repo.heads:
                logger.warning(f"[push_changes][Coherence:Failed] No local branches to push for dashboard {dashboard_id}")
                return

            try:
                origin = repo.remote(name='origin')
            except ValueError as e:
                logger.error(f"[push_changes][Coherence:Failed] Remote 'origin' not found for dashboard {dashboard_id}")
                raise HTTPException(status_code=400, detail="Remote 'origin' not configured") from e

            try:
                current_branch = repo.active_branch
                logger.info(f"[push_changes][Action] Pushing branch {current_branch.name} to origin")
                # Explicit refspec: push the active branch to the same name on origin.
                push_info = origin.push(refspec=f'{current_branch.name}:{current_branch.name}')
                for info in push_info:
                    if info.flags & info.ERROR:
                        logger.error(f"[push_changes][Coherence:Failed] Error pushing ref {info.remote_ref_string}: {info.summary}")
                        raise Exception(f"Git push error for {info.remote_ref_string}: {info.summary}")
            except Exception as e:
                logger.error(f"[push_changes][Coherence:Failed] Failed to push changes: {e}")
                raise HTTPException(status_code=500, detail=f"Git push failed: {str(e)}") from e
    # [/DEF:push_changes:Function]

    # [DEF:pull_changes:Function]
    # @PURPOSE: Pull changes from remote.
    # @PRE: Repository exists and has an 'origin' remote.
    # @POST: Changes from origin are pulled and merged into the active branch.
    def pull_changes(self, dashboard_id: int):
        """
        Pull from 'origin'.

        Raises HTTPException 400 when 'origin' is not configured and
        HTTPException 500 when the pull fails.
        """
        with belief_scope("GitService.pull_changes"):
            repo = self.get_repo(dashboard_id)

            # The remote lookup is kept apart from the pull itself so that a
            # ValueError raised while pulling is not misreported as a missing remote.
            try:
                origin = repo.remote(name='origin')
            except ValueError as e:
                logger.error(f"[pull_changes][Coherence:Failed] Remote 'origin' not found for dashboard {dashboard_id}")
                raise HTTPException(status_code=400, detail="Remote 'origin' not configured") from e

            try:
                logger.info("[pull_changes][Action] Pulling changes from origin")
                fetch_info = origin.pull()
                for info in fetch_info:
                    if info.flags & info.ERROR:
                        logger.error(f"[pull_changes][Coherence:Failed] Error pulling ref {info.ref}: {info.note}")
                        raise Exception(f"Git pull error for {info.ref}: {info.note}")
            except Exception as e:
                logger.error(f"[pull_changes][Coherence:Failed] Failed to pull changes: {e}")
                raise HTTPException(status_code=500, detail=f"Git pull failed: {str(e)}") from e
    # [/DEF:pull_changes:Function]

    # [DEF:get_status:Function]
    # @PURPOSE: Get current repository status (dirty files, untracked, etc.)
    # @PRE: Repository for dashboard_id exists.
    # @POST: Returns a dictionary representing the Git status.
    # @RETURN: dict
    def get_status(self, dashboard_id: int) -> dict:
        """
        Return ``is_dirty``, ``untracked_files``, ``modified_files``,
        ``staged_files`` and ``current_branch`` for the dashboard repository.

        ``staged_files`` is empty while the repository has no commits, and
        ``current_branch`` is "HEAD" when HEAD is detached.
        """
        with belief_scope("GitService.get_status"):
            repo = self.get_repo(dashboard_id)

            # Handle empty repository (no commits): there is no HEAD to diff against.
            has_commits = self._has_commits(repo)

            try:
                current_branch = repo.active_branch.name
            except TypeError:
                # GitPython raises TypeError when HEAD is detached.
                current_branch = "HEAD"

            return {
                "is_dirty": repo.is_dirty(untracked_files=True),
                "untracked_files": repo.untracked_files,
                "modified_files": [item.a_path for item in repo.index.diff(None)],
                "staged_files": [item.a_path for item in repo.index.diff("HEAD")] if has_commits else [],
                "current_branch": current_branch
            }
    # [/DEF:get_status:Function]

    # [DEF:get_diff:Function]
    # @PURPOSE: Generate diff for a file or the whole repository.
    # @PARAM: file_path (str) - Optional specific file.
    # @PARAM: staged (bool) - Whether to show staged changes.
    # @PRE: Repository for dashboard_id exists.
    # @POST: Returns the diff text as a string.
    # @RETURN: str
    def get_diff(self, dashboard_id: int, file_path: Optional[str] = None, staged: bool = False) -> str:
        with belief_scope("GitService.get_diff"):
            repo = self.get_repo(dashboard_id)
            diff_args = ["--staged"] if staged else []

            if file_path:
                # "--" separates options from the path argument.
                return repo.git.diff(*diff_args, "--", file_path)
            return repo.git.diff(*diff_args)
    # [/DEF:get_diff:Function]

    # [DEF:get_commit_history:Function]
    # @PURPOSE: Retrieve commit history for a repository.
    # @PARAM: limit (int) - Max number of commits to return.
    # @PRE: Repository for dashboard_id exists.
    # @POST: Returns a list of dictionaries for each commit in history.
    # @RETURN: List[dict]
    def get_commit_history(self, dashboard_id: int, limit: int = 50) -> List[dict]:
        """
        Return up to ``limit`` commits reachable from HEAD as dicts with the
        keys ``hash``, ``author``, ``email``, ``timestamp``, ``message`` and
        ``files_changed``.

        Returns an empty list when the repository has no commits or the
        history cannot be read.
        """
        with belief_scope("GitService.get_commit_history"):
            repo = self.get_repo(dashboard_id)
            commits = []
            try:
                # Check if there are any commits at all
                if not self._has_commits(repo):
                    return []

                for commit in repo.iter_commits(max_count=limit):
                    commits.append({
                        "hash": commit.hexsha,
                        "author": commit.author.name,
                        "email": commit.author.email,
                        "timestamp": datetime.fromtimestamp(commit.committed_date),
                        "message": commit.message.strip(),
                        "files_changed": list(commit.stats.files.keys())
                    })
            except Exception as e:
                logger.warning(f"[get_commit_history][Action] Could not retrieve commit history for dashboard {dashboard_id}: {e}")
                return []
            return commits
    # [/DEF:get_commit_history:Function]

    # [DEF:test_connection:Function]
    # @PURPOSE: Test connection to Git provider using PAT.
    # @PARAM: provider (GitProvider)
    # @PARAM: url (str)
    # @PARAM: pat (str)
    # @PRE: provider is valid; url is a valid HTTP(S) URL; pat is provided.
    # @POST: Returns True if connection to the provider's API succeeds.
    # @RETURN: bool
    async def test_connection(self, provider: GitProvider, url: str, pat: str) -> bool:
        """
        Call the provider's "current user" API endpoint with the PAT.

        Returns True on HTTP 200 and False on any other status, on an
        unknown provider, on invalid input, or on a request error. URLs
        that look local are accepted without any request being made.
        """
        with belief_scope("GitService.test_connection"):
            # Check for offline mode or local-only URLs
            # NOTE(review): this is a substring match on the whole URL, so any
            # URL containing ".local" or "localhost" skips the check - confirm
            # that this is intended.
            if ".local" in url or "localhost" in url:
                logger.info("[test_connection][Action] Local/Offline mode detected for URL")
                return True

            if not url.startswith(('http://', 'https://')):
                logger.error(f"[test_connection][Coherence:Failed] Invalid URL protocol: {url}")
                return False

            if not pat or not pat.strip():
                logger.error("[test_connection][Coherence:Failed] Git PAT is missing or empty")
                return False

            pat = pat.strip()
            base_url = url.rstrip('/')

            # Pick the auth header and user endpoint for the provider.
            if provider == GitProvider.GITHUB:
                headers = {"Authorization": f"token {pat}"}
                api_url = "https://api.github.com/user" if "github.com" in url else f"{base_url}/api/v3/user"
            elif provider == GitProvider.GITLAB:
                headers = {"PRIVATE-TOKEN": pat}
                api_url = f"{base_url}/api/v4/user"
            elif provider == GitProvider.GITEA:
                headers = {"Authorization": f"token {pat}"}
                api_url = f"{base_url}/api/v1/user"
            else:
                return False

            try:
                async with httpx.AsyncClient() as client:
                    resp = await client.get(api_url, headers=headers)

                if resp.status_code != 200:
                    logger.error(f"[test_connection][Coherence:Failed] Git connection test failed for {provider} at {api_url}. Status: {resp.status_code}")
                return resp.status_code == 200
            except Exception as e:
                logger.error(f"[test_connection][Coherence:Failed] Error testing git connection: {e}")
                return False
    # [/DEF:test_connection:Function]

# [/DEF:GitService:Class]
# [/DEF:backend.src.services.git_service:Module]