Source code for app.jupyter_helper

import requests
import time
import json
import logging
import os
from fastapi import HTTPException

JUPYTERHUB_URL = "http://localhost:8080/hub/api"
API_TOKEN = "8179213ad63c46d5b9e4abff384b3e1f"
HEADERS = {"Authorization": f"token {API_TOKEN}"}
LOGGING_LEVEL = os.getenv("LOGGING_LEVEL", "INFO").upper()
# Set up logging
logging.basicConfig(
    level=getattr(logging, LOGGING_LEVEL, logging.INFO),  # Set logging level to DEBUG for detailed logs
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(), # Log to console
        logging.FileHandler("staging_service.log")
    ]
)
logger = logging.getLogger(__name__)

[docs] def event_stream(session, url): """ Generator yielding events from a JSON event stream. Args: session: Session object for making requests. url (str): URL of the event stream. Yields: dict: Event data. """ r = session.get(url, stream=True) r.raise_for_status() for line in r.iter_lines(): line = line.decode('utf8', 'replace') if line.startswith('data:'): yield json.loads(line.split(':', 1)[1])
[docs] def get_user_status(user_name): """ Get detailed user status. Args: user_name (str): Name of the user. Returns: dict: User status data. Raises: HTTPException: If the request to fetch user status fails. """ try: url = f"{JUPYTERHUB_URL}/hub/api/users/{user_name}" response = requests.get(url, headers=HEADERS) response.raise_for_status() return response.json() except requests.exceptions.RequestException as e: logger.exception(f"Failed to fetch user status: {e}") raise HTTPException(status_code=500, detail=f"Failed to fetch user status: {e}")
[docs] def start_user_server(user_name, server_name=""): """ Start a JupyterHub server and wait for it to be ready. Args: user_name (str): Name of the user. server_name (str, optional): Name of the server. Defaults to "". Returns: dict: Server data. Raises: HTTPException: If the request to start the server fails. """ user_url = f"{JUPYTERHUB_URL}/users/{user_name}" session = requests.Session() # Check user status user_model = get_user_status(user_name) if server_name not in user_model.get('servers', {}): # Server not active, request launch response = session.post(f"{user_url}/servers/{server_name}", headers=HEADERS) response.raise_for_status() if response.status_code == 202: # Wait for the server to be ready progress_url = f"{user_model['servers'][server_name]['progress_url']}" for event in event_stream(session, f"{JUPYTERHUB_URL}{progress_url}"): print(f"Progress {event['progress']}%: {event['message']}") if event.get("ready"): return event["url"] # Server is already running or just launched server = user_model["servers"][server_name] return server["url"]
[docs] def stop_user_server(user_name, server_name=""): """ Stop a server and wait for it to complete. Args: user_name (str): Name of the user. server_name (str, optional): Name of the server. Defaults to "". Returns: dict: Server data. Raises: HTTPException: If the request to stop the server fails. """ user_url = f"{JUPYTERHUB_URL}/users/{user_name}" server_url = f"{user_url}/servers/{server_name}" session = requests.Session() response = session.delete(server_url, headers=HEADERS) if response.status_code == 404: print(f"Server {user_name}/{server_name} already stopped.") return response.raise_for_status() # Wait for the server to be stopped while True: user_model = get_user_status(user_name) if server_name not in user_model.get("servers", {}): print(f"Server {user_name}/{server_name} stopped.") break time.sleep(1)