Файловое хранилище готово

This commit is contained in:
2026-01-26 11:08:18 +03:00
parent a542e7d2df
commit edf9286071
35 changed files with 377 additions and 497 deletions

View File

@@ -98,14 +98,43 @@ class StoragePlugin(PluginBase):
def get_storage_root(self) -> Path:
with belief_scope("StoragePlugin:get_storage_root"):
config_manager = get_config_manager()
storage_config = config_manager.get_config().settings.storage
root = Path(storage_config.root_path)
global_settings = config_manager.get_config().settings
# Use storage.root_path as the source of truth for storage UI
root = Path(global_settings.storage.root_path)
if not root.is_absolute():
# Resolve relative to the workspace root (ss-tools)
root = (Path(__file__).parents[4] / root).resolve()
# Resolve relative to the backend directory
# Path(__file__) is backend/src/plugins/storage/plugin.py
# parents[3] is the project root (ss-tools)
# We need to ensure it's relative to where backend/ is
project_root = Path(__file__).parents[3]
root = (project_root / root).resolve()
return root
# [/DEF:get_storage_root:Function]
# [DEF:resolve_path:Function]
# @PURPOSE: Resolves a dynamic path pattern using provided variables.
# @PARAM: pattern (str) - The path pattern to resolve.
# @PARAM: variables (Dict[str, str]) - Variables to substitute in the pattern.
# @RETURN: str - The resolved path.
def resolve_path(self, pattern: str, variables: Dict[str, str]) -> str:
with belief_scope("StoragePlugin:resolve_path"):
# Add common variables
vars_with_defaults = {
"timestamp": datetime.now().strftime("%Y%m%dT%H%M%S"),
**variables
}
try:
resolved = pattern.format(**vars_with_defaults)
# Clean up any double slashes or leading/trailing slashes for relative path
return os.path.normpath(resolved).strip("/")
except KeyError as e:
logger.warning(f"[StoragePlugin][Coherence:Failed] Missing variable for path resolution: {e}")
# Fallback to literal pattern if formatting fails partially (or handle as needed)
return pattern.replace("{", "").replace("}", "")
# [/DEF:resolve_path:Function]
# [DEF:ensure_directories:Function]
# @PURPOSE: Creates the storage root and category subdirectories if they don't exist.
# @SIDE_EFFECT: Creates directories on the filesystem.
@@ -113,7 +142,8 @@ class StoragePlugin(PluginBase):
with belief_scope("StoragePlugin:ensure_directories"):
root = self.get_storage_root()
for category in FileCategory:
path = root / f"{category.value}s"
# Use singular name for consistency with BackupPlugin and GitService
path = root / category.value
path.mkdir(parents=True, exist_ok=True)
logger.debug(f"[StoragePlugin][Action] Ensured directory: {path}")
# [/DEF:ensure_directories:Function]
@@ -135,46 +165,68 @@ class StoragePlugin(PluginBase):
# [/DEF:validate_path:Function]
# [DEF:list_files:Function]
# @PURPOSE: Lists all files in a specific category.
# @PURPOSE: Lists all files and directories in a specific category and subpath.
# @PARAM: category (Optional[FileCategory]) - The category to list.
# @RETURN: List[StoredFile] - List of file metadata objects.
def list_files(self, category: Optional[FileCategory] = None) -> List[StoredFile]:
# @PARAM: subpath (Optional[str]) - Nested path within the category.
# @RETURN: List[StoredFile] - List of file and directory metadata objects.
def list_files(self, category: Optional[FileCategory] = None, subpath: Optional[str] = None) -> List[StoredFile]:
with belief_scope("StoragePlugin:list_files"):
root = self.get_storage_root()
logger.info(f"[StoragePlugin][Action] Listing files in root: {root}, category: {category}, subpath: {subpath}")
files = []
categories = [category] if category else list(FileCategory)
for cat in categories:
cat_dir = root / f"{cat.value}s"
if not cat_dir.exists():
# Scan the category subfolder + optional subpath
base_dir = root / cat.value
if subpath:
target_dir = self.validate_path(base_dir / subpath)
else:
target_dir = base_dir
if not target_dir.exists():
continue
for item in cat_dir.iterdir():
if item.is_file():
stat = item.stat()
logger.debug(f"[StoragePlugin][Action] Scanning directory: {target_dir}")
# Use os.scandir for better performance and to distinguish files vs dirs
with os.scandir(target_dir) as it:
for entry in it:
# Skip logs
if "Logs" in entry.path:
continue
stat = entry.stat()
is_dir = entry.is_dir()
files.append(StoredFile(
name=item.name,
path=str(item.relative_to(root)),
size=stat.st_size,
name=entry.name,
path=str(Path(entry.path).relative_to(root)),
size=stat.st_size if not is_dir else 0,
created_at=datetime.fromtimestamp(stat.st_ctime),
category=cat,
mime_type=None # Could use python-magic here if needed
mime_type="directory" if is_dir else None
))
return sorted(files, key=lambda x: x.created_at, reverse=True)
# Sort: directories first, then by name
return sorted(files, key=lambda x: (x.mime_type != "directory", x.name))
# [/DEF:list_files:Function]
# [DEF:save_file:Function]
# @PURPOSE: Saves an uploaded file to the specified category.
# @PURPOSE: Saves an uploaded file to the specified category and optional subpath.
# @PARAM: file (UploadFile) - The uploaded file.
# @PARAM: category (FileCategory) - The target category.
# @PARAM: subpath (Optional[str]) - The target subpath.
# @RETURN: StoredFile - Metadata of the saved file.
# @SIDE_EFFECT: Writes file to disk.
async def save_file(self, file: UploadFile, category: FileCategory) -> StoredFile:
async def save_file(self, file: UploadFile, category: FileCategory, subpath: Optional[str] = None) -> StoredFile:
with belief_scope("StoragePlugin:save_file"):
root = self.get_storage_root()
dest_dir = root / f"{category.value}s"
dest_dir = root / category.value
if subpath:
dest_dir = dest_dir / subpath
dest_dir.mkdir(parents=True, exist_ok=True)
dest_path = self.validate_path(dest_dir / file.filename)
@@ -194,34 +246,44 @@ class StoragePlugin(PluginBase):
# [/DEF:save_file:Function]
# [DEF:delete_file:Function]
# @PURPOSE: Deletes a file from the specified category.
# @PURPOSE: Deletes a file or directory from the specified category and path.
# @PARAM: category (FileCategory) - The category.
# @PARAM: filename (str) - The name of the file.
# @SIDE_EFFECT: Removes file from disk.
def delete_file(self, category: FileCategory, filename: str):
# @PARAM: path (str) - The relative path of the file or directory.
# @SIDE_EFFECT: Removes item from disk.
def delete_file(self, category: FileCategory, path: str):
with belief_scope("StoragePlugin:delete_file"):
root = self.get_storage_root()
file_path = self.validate_path(root / f"{category.value}s" / filename)
# path is relative to root, but we ensure it starts with category
full_path = self.validate_path(root / path)
if file_path.exists():
file_path.unlink()
logger.info(f"[StoragePlugin][Action] Deleted file: {file_path}")
if not str(Path(path)).startswith(category.value):
raise ValueError(f"Path {path} does not belong to category {category}")
if full_path.exists():
if full_path.is_dir():
shutil.rmtree(full_path)
else:
full_path.unlink()
logger.info(f"[StoragePlugin][Action] Deleted: {full_path}")
else:
raise FileNotFoundError(f"File {filename} not found in {category.value}s")
raise FileNotFoundError(f"Item {path} not found")
# [/DEF:delete_file:Function]
# [DEF:get_file_path:Function]
# @PURPOSE: Returns the absolute path of a file for download.
# @PARAM: category (FileCategory) - The category.
# @PARAM: filename (str) - The name of the file.
# @PARAM: path (str) - The relative path of the file.
# @RETURN: Path - Absolute path to the file.
def get_file_path(self, category: FileCategory, filename: str) -> Path:
def get_file_path(self, category: FileCategory, path: str) -> Path:
with belief_scope("StoragePlugin:get_file_path"):
root = self.get_storage_root()
file_path = self.validate_path(root / f"{category.value}s" / filename)
file_path = self.validate_path(root / path)
if not file_path.exists():
raise FileNotFoundError(f"File {filename} not found in {category.value}s")
if not str(Path(path)).startswith(category.value):
raise ValueError(f"Path {path} does not belong to category {category}")
if not file_path.exists() or file_path.is_dir():
raise FileNotFoundError(f"File {path} not found")
return file_path
# [/DEF:get_file_path:Function]