344 lines
15 KiB
Python
344 lines
15 KiB
Python
# [DEF:StoragePlugin:Module]
#
# @SEMANTICS: storage, files, filesystem, plugin
# @PURPOSE: Provides core filesystem operations for managing backups and repositories.
# @LAYER: App
# @RELATION: IMPLEMENTS -> PluginBase
# @RELATION: DEPENDS_ON -> backend.src.models.storage
# @RELATION: USES -> TaskContext
#
# @INVARIANT: All file operations must be restricted to the configured storage root.
|
|
|
|
# [SECTION: IMPORTS]
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import Dict, Any, List, Optional
|
|
from fastapi import UploadFile
|
|
|
|
from ...core.plugin_base import PluginBase
|
|
from ...core.logger import belief_scope, logger
|
|
from ...models.storage import StoredFile, FileCategory
|
|
from ...dependencies import get_config_manager
|
|
from ...core.task_manager.context import TaskContext
|
|
# [/SECTION]
|
|
|
|
# [DEF:StoragePlugin:Class]
# @PURPOSE: Implementation of the storage management plugin.
class StoragePlugin(PluginBase):
    """
    Plugin for managing local file storage for backups and repositories.

    Every path that reaches the filesystem is resolved and checked against the
    configured storage root (see ``validate_path``); operations that take a
    category additionally require the resolved path to stay inside that
    category's directory.
    """

    # [DEF:__init__:Function]
    # @PURPOSE: Initializes the StoragePlugin and ensures required directories exist.
    # @PRE: Configuration manager must be accessible.
    # @POST: Storage root and category directories are created on disk.
    def __init__(self):
        with belief_scope("StoragePlugin:init"):
            self.ensure_directories()
    # [/DEF:__init__:Function]

    @property
    # [DEF:id:Function]
    # @PURPOSE: Returns the unique identifier for the storage plugin.
    # @PRE: None.
    # @POST: Returns the plugin ID string.
    # @RETURN: str - "storage-manager"
    def id(self) -> str:
        with belief_scope("StoragePlugin:id"):
            return "storage-manager"
    # [/DEF:id:Function]

    @property
    # [DEF:name:Function]
    # @PURPOSE: Returns the human-readable name of the storage plugin.
    # @PRE: None.
    # @POST: Returns the plugin name string.
    # @RETURN: str - "Storage Manager"
    def name(self) -> str:
        with belief_scope("StoragePlugin:name"):
            return "Storage Manager"
    # [/DEF:name:Function]

    @property
    # [DEF:description:Function]
    # @PURPOSE: Returns a description of the storage plugin.
    # @PRE: None.
    # @POST: Returns the plugin description string.
    # @RETURN: str - Plugin description.
    def description(self) -> str:
        with belief_scope("StoragePlugin:description"):
            return "Manages local file storage for backups and repositories."
    # [/DEF:description:Function]

    @property
    # [DEF:version:Function]
    # @PURPOSE: Returns the version of the storage plugin.
    # @PRE: None.
    # @POST: Returns the version string.
    # @RETURN: str - "1.0.0"
    def version(self) -> str:
        with belief_scope("StoragePlugin:version"):
            return "1.0.0"
    # [/DEF:version:Function]

    @property
    # [DEF:ui_route:Function]
    # @PURPOSE: Returns the frontend route for the storage plugin.
    # @PRE: None.
    # @POST: Returns the route string.
    # @RETURN: str - "/tools/storage"
    def ui_route(self) -> str:
        with belief_scope("StoragePlugin:ui_route"):
            return "/tools/storage"
    # [/DEF:ui_route:Function]

    # [DEF:get_schema:Function]
    # @PURPOSE: Returns the JSON schema for storage plugin parameters.
    # @PRE: None.
    # @POST: Returns a dictionary representing the JSON schema.
    # @RETURN: Dict[str, Any] - JSON schema with a single required "category" property.
    def get_schema(self) -> Dict[str, Any]:
        with belief_scope("StoragePlugin:get_schema"):
            return {
                "type": "object",
                "properties": {
                    "category": {
                        "type": "string",
                        # Allowed values are derived from the FileCategory enum.
                        "enum": [c.value for c in FileCategory],
                        "title": "Category",
                    },
                },
                "required": ["category"],
            }
    # [/DEF:get_schema:Function]

    # [DEF:execute:Function]
    # @PURPOSE: Executes storage-related tasks with TaskContext support.
    # @PARAM: params (Dict[str, Any]) - Storage parameters.
    # @PARAM: context (Optional[TaskContext]) - Task context for logging with source attribution.
    # @PRE: params must match the plugin schema.
    # @POST: The received parameters are logged; no filesystem work is performed here.
    async def execute(self, params: Dict[str, Any], context: Optional[TaskContext] = None):
        with belief_scope("StoragePlugin:execute"):
            # Use the TaskContext logger (tagged with the "storage" source) when a
            # context is supplied, otherwise fall back to the application logger.
            storage_log = context.logger.with_source("storage") if context else logger

            storage_log.info(f"Executing with params: {params}")
    # [/DEF:execute:Function]

    # [DEF:get_storage_root:Function]
    # @PURPOSE: Resolves the absolute path to the storage root.
    # @PRE: Settings must define a storage root path.
    # @POST: Returns a Path object representing the storage root.
    # @RETURN: Path - Absolute storage root. Relative settings are resolved;
    #          absolute settings are returned as configured (not symlink-resolved).
    def get_storage_root(self) -> Path:
        with belief_scope("StoragePlugin:get_storage_root"):
            config_manager = get_config_manager()
            global_settings = config_manager.get_config().settings

            # Use storage.root_path as the source of truth for storage UI
            root = Path(global_settings.storage.root_path)

            if not root.is_absolute():
                # Relative roots are anchored three directory levels above the
                # directory containing this file.
                # NOTE(review): earlier notes place this file at
                # backend/src/plugins/storage/plugin.py; if that is accurate,
                # parents[3] is `backend`, not the repository root - confirm
                # which directory relative roots are meant to be anchored to.
                anchor = Path(__file__).parents[3]
                root = (anchor / root).resolve()
            return root
    # [/DEF:get_storage_root:Function]

    # [DEF:resolve_path:Function]
    # @PURPOSE: Resolves a dynamic path pattern using provided variables.
    # @PARAM: pattern (str) - The path pattern to resolve (str.format syntax).
    # @PARAM: variables (Dict[str, str]) - Variables to substitute in the pattern.
    # @PRE: pattern must be a valid format string.
    # @POST: Returns the resolved path string.
    # @RETURN: str - The resolved path.
    def resolve_path(self, pattern: str, variables: Dict[str, str]) -> str:
        with belief_scope("StoragePlugin:resolve_path"):
            # "timestamp" is provided by default; caller-supplied variables win
            # because they are unpacked last.
            vars_with_defaults = {
                "timestamp": datetime.now().strftime("%Y%m%dT%H%M%S"),
                **variables,
            }
            try:
                resolved = pattern.format(**vars_with_defaults)
            except KeyError as e:
                logger.warning(f"[StoragePlugin][Coherence:Failed] Missing variable for path resolution: {e}")
                # Fallback: return the literal pattern with the braces stripped.
                return pattern.replace("{", "").replace("}", "")
            # Collapse duplicate separators and drop leading/trailing slashes so
            # the result is a relative path. normpath keeps leading ".."
            # components, so the result must still go through validate_path
            # before it touches the filesystem.
            return os.path.normpath(resolved).strip("/")
    # [/DEF:resolve_path:Function]

    # [DEF:ensure_directories:Function]
    # @PURPOSE: Creates the storage root and category subdirectories if they don't exist.
    # @PRE: Storage root must be resolvable.
    # @POST: Directories are created on the filesystem.
    # @SIDE_EFFECT: Creates directories on the filesystem.
    def ensure_directories(self):
        with belief_scope("StoragePlugin:ensure_directories"):
            root = self.get_storage_root()
            for category in FileCategory:
                # The directory name is the enum value itself.
                path = root / category.value
                # parents=True also creates the storage root when it is missing.
                path.mkdir(parents=True, exist_ok=True)
                logger.debug(f"[StoragePlugin][Action] Ensured directory: {path}")
    # [/DEF:ensure_directories:Function]

    # [DEF:validate_path:Function]
    # @PURPOSE: Prevents path traversal attacks by ensuring the path is within the storage root.
    # @PARAM: path (Path) - Candidate path (absolute, or relative to the working directory).
    # @PRE: path must be a Path object.
    # @POST: Returns the resolved absolute path if valid, otherwise raises ValueError.
    # @RAISES: ValueError - The resolved path is not located under the storage root.
    def validate_path(self, path: Path) -> Path:
        with belief_scope("StoragePlugin:validate_path"):
            # Both sides are resolved so ".." components and symlinks are
            # collapsed before the containment test.
            root = self.get_storage_root().resolve()
            resolved = path.resolve()
            try:
                resolved.relative_to(root)
            except ValueError:
                logger.error(f"[StoragePlugin][Coherence:Failed] Path traversal detected: {resolved} is not under {root}")
                raise ValueError("Access denied: Path is outside of storage root.")
            return resolved
    # [/DEF:validate_path:Function]

    # [DEF:_is_within_category:Function]
    # @PURPOSE: Tells whether an already-resolved path lies inside a category directory.
    # @PARAM: resolved (Path) - Absolute, resolved path (e.g. the result of validate_path).
    # @PARAM: category (FileCategory) - Category whose directory must contain the path.
    # @PRE: resolved must already be resolved; it is compared component-wise.
    # @POST: Filesystem is not modified.
    # @RETURN: bool - True when resolved is the category directory or a descendant of it.
    def _is_within_category(self, resolved: Path, category: FileCategory) -> bool:
        with belief_scope("StoragePlugin:_is_within_category"):
            category_root = (self.get_storage_root() / category.value).resolve()
            # Component-wise comparison: a sibling directory whose name merely
            # starts with the category name is not accepted.
            try:
                resolved.relative_to(category_root)
            except ValueError:
                return False
            return True
    # [/DEF:_is_within_category:Function]

    # [DEF:list_files:Function]
    # @PURPOSE: Lists all files and directories in a specific category and subpath.
    # @PARAM: category (Optional[FileCategory]) - The category to list; all categories when omitted.
    # @PARAM: subpath (Optional[str]) - Nested path within the category.
    # @PRE: Storage root must exist.
    # @POST: Returns a list of StoredFile objects.
    # @RETURN: List[StoredFile] - Directories first, then files, each group sorted by name.
    # @RAISES: ValueError - subpath resolves outside the storage root or outside the category directory.
    def list_files(self, category: Optional[FileCategory] = None, subpath: Optional[str] = None) -> List[StoredFile]:
        with belief_scope("StoragePlugin:list_files"):
            # Resolve the root once so the relative_to() calls below compare
            # like with like (validate_path returns resolved paths).
            root = self.get_storage_root().resolve()
            logger.info(f"[StoragePlugin][Action] Listing files in root: {root}, category: {category}, subpath: {subpath}")
            files: List[StoredFile] = []

            categories = [category] if category else list(FileCategory)

            for cat in categories:
                # Scan the category subfolder + optional subpath
                base_dir = root / cat.value
                if subpath:
                    target_dir = self.validate_path(base_dir / subpath)
                    # Staying under the storage root is not enough: the subpath
                    # must not cross into a different category either.
                    if not self._is_within_category(target_dir, cat):
                        logger.error(f"[StoragePlugin][Coherence:Failed] Subpath {subpath} escapes category {cat}")
                        raise ValueError("Access denied: Path is outside of category directory.")
                else:
                    target_dir = base_dir

                # Missing directories and non-directories (a subpath that names
                # a file) have nothing to list.
                if not target_dir.is_dir():
                    continue

                logger.debug(f"[StoragePlugin][Action] Scanning directory: {target_dir}")

                # Use os.scandir for better performance and to distinguish files vs dirs
                with os.scandir(target_dir) as it:
                    for entry in it:
                        relative_path = Path(entry.path).relative_to(root)

                        # Skip logs. Only the part of the path below the storage
                        # root is inspected, so the location of the root itself
                        # cannot hide every entry.
                        if "Logs" in str(relative_path):
                            continue

                        try:
                            stat = entry.stat()
                        except OSError as e:
                            # Dangling symlink, or the entry vanished mid-scan.
                            logger.warning(f"[StoragePlugin][Coherence:Failed] Cannot stat {entry.path}: {e}")
                            continue
                        is_dir = entry.is_dir()

                        files.append(StoredFile(
                            name=entry.name,
                            path=str(relative_path),
                            size=stat.st_size if not is_dir else 0,
                            # st_ctime is the creation time on Windows but the last
                            # metadata change on Unix.
                            created_at=datetime.fromtimestamp(stat.st_ctime),
                            category=cat,
                            mime_type="directory" if is_dir else None
                        ))

            # Sort: directories first, then by name
            return sorted(files, key=lambda x: (x.mime_type != "directory", x.name))
    # [/DEF:list_files:Function]

    # [DEF:save_file:Function]
    # @PURPOSE: Saves an uploaded file to the specified category and optional subpath.
    # @PARAM: file (UploadFile) - The uploaded file.
    # @PARAM: category (FileCategory) - The target category.
    # @PARAM: subpath (Optional[str]) - The target subpath.
    # @PRE: file must be a valid UploadFile with a filename; category must be valid.
    # @POST: File is written to disk and metadata is returned.
    # @RETURN: StoredFile - Metadata of the saved file.
    # @RAISES: ValueError - Missing filename, or the destination resolves outside the
    #          storage root or outside the category directory.
    # @SIDE_EFFECT: Creates the destination directory if needed and writes the file to disk.
    async def save_file(self, file: UploadFile, category: FileCategory, subpath: Optional[str] = None) -> StoredFile:
        with belief_scope("StoragePlugin:save_file"):
            if not file.filename:
                raise ValueError("Uploaded file has no filename.")

            root = self.get_storage_root().resolve()
            dest_dir = root / category.value
            if subpath:
                dest_dir = dest_dir / subpath

            # Validate BEFORE touching the filesystem so a traversal attempt can
            # never create directories outside the storage root or category.
            dest_dir = self.validate_path(dest_dir)
            dest_path = self.validate_path(dest_dir / file.filename)
            if not (self._is_within_category(dest_dir, category)
                    and self._is_within_category(dest_path, category)):
                logger.error(f"[StoragePlugin][Coherence:Failed] Upload target {dest_path} escapes category {category}")
                raise ValueError("Access denied: Path is outside of category directory.")

            dest_dir.mkdir(parents=True, exist_ok=True)

            # NOTE: this copy is synchronous and runs on the calling (event-loop) thread.
            with dest_path.open("wb") as buffer:
                shutil.copyfileobj(file.file, buffer)

            stat = dest_path.stat()
            return StoredFile(
                name=dest_path.name,
                path=str(dest_path.relative_to(root)),
                size=stat.st_size,
                created_at=datetime.fromtimestamp(stat.st_ctime),
                category=category,
                mime_type=file.content_type
            )
    # [/DEF:save_file:Function]

    # [DEF:delete_file:Function]
    # @PURPOSE: Deletes a file or directory from the specified category and path.
    # @PARAM: category (FileCategory) - The category.
    # @PARAM: path (str) - Path of the file or directory, relative to the storage root.
    # @PRE: path must belong to the specified category and exist on disk.
    # @POST: The file or directory is removed from disk.
    # @RAISES: ValueError - path is outside the storage root or outside the category directory.
    # @RAISES: FileNotFoundError - Nothing exists at path.
    # @SIDE_EFFECT: Removes item from disk (directories are removed recursively).
    def delete_file(self, category: FileCategory, path: str):
        with belief_scope("StoragePlugin:delete_file"):
            root = self.get_storage_root()
            # path is relative to the storage root and must land inside the category
            full_path = self.validate_path(root / path)

            # Check the RESOLVED location: a textual prefix test would accept
            # "<category>/../<other>/x" and "<category>foo/x".
            if not self._is_within_category(full_path, category):
                raise ValueError(f"Path {path} does not belong to category {category}")

            if not full_path.exists():
                raise FileNotFoundError(f"Item {path} not found")

            if full_path.is_dir():
                shutil.rmtree(full_path)
            else:
                full_path.unlink()
            logger.info(f"[StoragePlugin][Action] Deleted: {full_path}")
    # [/DEF:delete_file:Function]

    # [DEF:get_file_path:Function]
    # @PURPOSE: Returns the absolute path of a file for download.
    # @PARAM: category (FileCategory) - The category.
    # @PARAM: path (str) - Path of the file, relative to the storage root.
    # @PRE: path must belong to the specified category and be a file.
    # @POST: Returns the absolute Path to the file.
    # @RETURN: Path - Absolute, resolved path to the file.
    # @RAISES: ValueError - path is outside the storage root or outside the category directory.
    # @RAISES: FileNotFoundError - path does not exist or is a directory.
    def get_file_path(self, category: FileCategory, path: str) -> Path:
        with belief_scope("StoragePlugin:get_file_path"):
            root = self.get_storage_root()
            file_path = self.validate_path(root / path)

            # Containment is checked on the resolved path, not on the raw string,
            # so ".." segments cannot reach a different category.
            if not self._is_within_category(file_path, category):
                raise ValueError(f"Path {path} does not belong to category {category}")

            if not file_path.exists() or file_path.is_dir():
                raise FileNotFoundError(f"File {path} not found")

            return file_path
    # [/DEF:get_file_path:Function]

# [/DEF:StoragePlugin:Class]
|
|
# [/DEF:StoragePlugin:Module]