Source code for app.utility

import os
import pwd
import grp
from fastapi import HTTPException
import subprocess
import logging

LOGGING_LEVEL = os.getenv("LOGGING_LEVEL", "INFO").upper()
# Set up logging
logging.basicConfig(
    level=getattr(logging, LOGGING_LEVEL, logging.INFO),  # Set logging level to DEBUG for detailed logs
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(), # Log to console
        logging.FileHandler("staging_service.log")
    ]
)
logger = logging.getLogger(__name__)




[docs] def set_read_only(path, username): """ Set the given path (file or directory) to have read-only permissions (0o400 for files, 0o500 for directories) and change the ownership to the given username. Args: path (str): The path to the file or directory to set read-only. username (str): The username to change the ownership to. Raises: HTTPException: If the username does not exist. """ try: # Get UID and GID for the user user_info = pwd.getpwnam(username) uid = user_info.pw_uid gid = user_info.pw_gid if os.path.isfile(path): os.chmod(path, 0o400) # Read-only for owner os.chown(path, uid, gid) # Change ownership to the user elif os.path.isdir(path): for root, dirs, files in os.walk(path): for file in files: file_path = os.path.join(root, file) os.chmod(file_path, 0o400) # Read-only for owner os.chown(file_path, uid, gid) # Change ownership to the user for directory in dirs: dir_path = os.path.join(root, directory) os.chmod(dir_path, 0o500) # Read-only and traversable for owner os.chown(dir_path, uid, gid) # Change ownership to the user os.chmod(path, 0o500) # Read-only and traversable for the root directory os.chown(path, uid, gid) # Change ownership to the root directory except KeyError: raise HTTPException(status_code=400, detail=f"User {username} does not exist.")
[docs] def ensure_user_exists(username: str): """ Ensure that a user with the specified username exists on the system. Args: username (str): The username to check and potentially create. Raises: HTTPException: If there is an error in checking or creating the user. """ try: # Check if the user exists result = subprocess.run( ["id", username], stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) if result.returncode != 0: logger.info(f"User {username} does not exist. Creating user.") # Create the user subprocess.run( ["useradd", "-m", username], check=True ) logger.info(f"User {username} created successfully.") else: logger.debug(f"User {username} already exists.") except Exception as e: logger.exception(f"Failed to ensure user exists: {str(e)}") raise HTTPException( status_code=500, detail=f"Failed to ensure user exists: {str(e)}" )