Source code for app.staging_methods

import os
from shutil import copy, copytree
import logging
from fastapi.responses import FileResponse
from app.jupyter_helper import get_user_status, start_user_server
import requests

LOGGING_LEVEL = os.getenv("LOGGING_LEVEL", "INFO").upper()
# Set up logging
logging.basicConfig(
    level=getattr(logging, LOGGING_LEVEL, logging.INFO),  # Set logging level to DEBUG for detailed logs
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(), # Log to console
        logging.FileHandler("staging_service.log")
    ]
)
logger = logging.getLogger(__name__)

# Staging method: Copy
# --------------------------- COPY       --------------------------------------------
[docs] def local_copy(local_path, relative_path): """ Copies a file or directory from the local storage to the user area. Parameters ---------- local_path : str Path to the file or directory on the local storage. relative_path : str Path to the file or directory relative to the user area. Raises ------ ValueError If the path provided is invalid. Exception If any other error occurs during the copy operation. """ try: logger.info(f"Starting local_copy from {local_path} to {relative_path}") if os.path.isdir(local_path): # Copy entire directory copytree(local_path, relative_path, dirs_exist_ok=True) logger.info(f"Copied directory from {local_path} to {relative_path}") elif os.path.isfile(local_path): # Copy single file copy(local_path, relative_path) logger.info(f"Copied file from {local_path} to {relative_path}") else: raise ValueError(f"Invalid path: {local_path}") except Exception as e: logger.error(f"Error during local_copy from {local_path} to {relative_path}: {e}") raise
# Staging method: Symlink # --------------------------- SYMLINK COPY -------------------------------------------- # Staging method: Direct Download # --------------------------- DIRECT DOWNLOAD --------------------------------------------
[docs] def direct_download(local_path): """ Serves a file from the local storage using a direct download. Parameters ---------- local_path : str Path to the file on the local storage. Returns ------- FileResponse A FileResponse object containing the file contents. Raises ------ FileNotFoundError If the file does not exist at the provided path. Exception If any other error occurs during the direct download operation. """ try: logger.info(f"Starting direct_download for path {local_path}") if not os.path.exists(local_path): logger.error(f"File not found at path: {local_path}") raise FileNotFoundError(f"File not found at path: {local_path}") # Return a FileResponse for FastAPI to serve the file logger.info(f"Serving file from path: {local_path}") return FileResponse(local_path, media_type='application/octet-stream', filename=os.path.basename(local_path)) except Exception as e: logger.error(f"Error during direct_download: {e}") raise
# Staging method: Jupyter Copy # --------------------------- JUPYTER COPY --------------------------------------------
[docs] def jupyter_copy(local_path, relative_path, username, token): """ Copies a file from the local storage to the user's Jupyter server. Parameters ---------- local_path : str Path to the file on the local storage. relative_path : str Path to the file relative to the user area. username : str Username of the user to copy the file for. token : str Token to use for authentication with the Jupyter server. Raises ------ FileNotFoundError If the file does not exist at the provided path. Exception If any other error occurs during the copy operation. """ try: logger.info(f"Starting jupyter_copy for user {username}") # Ensure the server is running hub_url = os.getenv("JUPYTERHUB_URL", "http://localhost:8080") user_status = get_user_status(username) if "servers" not in user_status or not user_status["servers"]: logger.info(f"Server not running for user {username}. Starting server...") start_user_server(username, hub_url) # Refresh user status user_status = get_user_status(username) server_info = user_status.get("servers", {}).get("", None) if not server_info or not server_info.get("ready", False): raise Exception(f"Failed to start or verify readiness of server for user {username}.") # Check if the file exists on the Jupyter server jupyter_path = relative_path url = f"{hub_url}/hub/api/users/{username}/servers/{jupyter_path}/api/contents/{jupyter_path}" headers = {"Authorization": f"token {token}", "Content-Type": "application/json"} response = requests.get(url, headers=headers) if response.status_code == 200: logger.info(f"File already exists at {jupyter_path} for user {username}") return # File does not exist; proceed with the copy with open(local_path, 'rb') as file: data = { "content": file.read().decode('utf-8'), "format": "text", "type": "file" } response = requests.put(url, headers=headers, json=data) if response.status_code not in [200, 201]: raise Exception(f"Failed to copy file: {response.status_code} - {response.text}") logger.info(f"Successfully copied file to {jupyter_path} for user {username}") except Exception as e: logger.error(f"Error during jupyter_copy for user {username}: {e}") raise
# Add jupyter_copy to AVAILABLE_METHODS AVAILABLE_METHODS: dict[str, callable] = { "local_copy": local_copy, "local_symlink": local_symlink, "jupyter_copy": jupyter_copy, "direct_download": direct_download, }