Module src.runtime.notebook_runtime

Expand source code
import datetime
import json
import os
import uuid
from typing import Union

import nbconvert
import nbformat
import requests
from websocket import create_connection

from runtime.iruntime import IRuntime


class NotebookRuntime(IRuntime):
    """
    Class managing the Jupyter Notebook via REST API (kernel management) and WebSocket (execution).

    Notebook is fully edited locally (with nbformat package) and single cells are executed on the Jupyter Server via WebSockets.
    """

    _ws_username = "username"
    _ws_jupyter_message_version = "5.3"

    def __init__(
        self,
        token: str,
        host: str = "127.0.0.1",
        port: int = 8888,
        use_https: bool = True,
    ):
        if use_https:
            self._base_url = f"https://{host}:{port}/api"
        else:
            self._base_url = f"http://{host}:{port}/api"
        self._session = requests.Session()
        self._session.headers.update(
            {
                "Authorization": f"token {token}",
                "Accept": "application/json",
                "Content-Type": "application/json",
            }
        )

        self._notebook = nbformat.v4.new_notebook()
        self._kernel_id = self._start_kernel()
        self._ws_session = uuid.uuid4().hex

        if use_https:
            ws_url = f"wss://{host}:{port}/api/kernels/{self._kernel_id}/channels"
        else:
            ws_url = f"ws://{host}:{port}/api/kernels/{self._kernel_id}/channels"

        self._ws = create_connection(
            ws_url,
            header={"Authorization": f"token {token}"},
            timeout=10.0,
        )

    def __del__(self):
        self._session.close()
        self._ws.close()

    def set_report_title(self, title: str) -> None:
        """Sets the title of the report."""
        self._notebook.metadata["title"] = title

    def add_description(self, description: str) -> int:
        """Adds a cell with the description of the process."""
        self._notebook.cells.append(nbformat.v4.new_markdown_cell(description))
        return len(self._notebook.cells) - 1

    def add_code(self, code: str) -> int:
        """Adds a cell with the code."""
        self._notebook.cells.append(nbformat.v4.new_code_cell(code))
        return len(self._notebook.cells) - 1

    def remove_cell(self, cell_index: int = -1) -> None:
        """Removes the cell with the given index."""
        del self._notebook.cells[cell_index]

    def execute_cell(self, cell_index: int = -1) -> None:
        """Executes a cell with the given index, if it is a code cell. Otherwise, does nothing."""
        if self._notebook.cells[cell_index].cell_type == "code":
            self._notebook.cells[cell_index] = self._execute_cell(
                self._notebook.cells[cell_index]
            )

    def get_content(self, cell_index: int = -1) -> str:
        """Returns the content of the cell with the given index."""
        return self._notebook.cells[cell_index].source

    def get_cell_output_stream(self, cell_index: int = -1) -> Union[str, None]:
        """Returns the output (only stdout and stderr, no media) of the cell with the given index, if it is a code cell. Otherwise, returns None."""
        cell = self._notebook.cells[cell_index]
        if cell.cell_type != "code":
            return None

        out_stream = ""
        for output in cell.outputs:
            match output.output_type:
                case "stream":
                    out_stream += output.text.replace("\r", "")
                case "execute_result" | "display_data":
                    if output.data["text/plain"] and output.data["text/plain"] != "":
                        out_stream += output.data["text/plain"]
                case "error":
                    out_stream += output.ename + "\n"
                    out_stream += output.evalue + "\n"
                    out_stream += "\n".join(output.traceback)
                case _:
                    pass

        return out_stream

    def check_if_plot_in_output(self, cell_index: int = -1) -> bool:
        """Checks if the cell with the given index contains a plot."""
        cell = self._notebook.cells[cell_index]
        if cell.cell_type != "code":
            return False

        for output in cell.outputs:
            if output.output_type == "display_data" and "image/png" in output.data:
                return True

    def upload_file(self, local_path: str, dest_file_path: str) -> None:
        """Uploads a file to the Jupyter Server."""
        if not os.path.exists(local_path):
            raise FileNotFoundError("File does not exist")

        filename = os.path.basename(dest_file_path)
        url = f"{self._base_url}/contents/{dest_file_path}"

        with open(local_path, "rb") as f:
            body = {
                "name": filename,
                "path": dest_file_path,
                "type": "file",
                "format": "text",
                "content": f.read().decode("utf-8"),
            }
            response = self._session.put(url, json=body)

        response.raise_for_status()

    def generate_report(self, dest_dir: str, filename: str) -> str:
        """
        Generate a PDF report from the notebook and save it to the specified directory.

        Args:
            dest_dir (str): The destination directory where the PDF report will be saved.
            filename (str): The name of the PDF report file.

        Returns:
            str: The path to the generated PDF report file.
        """
        exporter = nbconvert.PDFExporter()
        body, resources = exporter.from_notebook_node(self._notebook)

        output_path = f"{dest_dir}/{filename}.pdf"
        with open(output_path, "wb") as f:
            f.write(body)
        return output_path

    def _execute_cell(
        self, cell: nbformat.notebooknode.NotebookNode
    ) -> nbformat.notebooknode.NotebookNode:
        """Executes the given cell in IPython kernel and save the result to the cell."""

        # TODO: Set timeout

        # Clear previous outputs
        cell.outputs = []

        content = {
            "code": cell.source,
            "silent": False,
            "store_history": True,
            "user_expressions": {},
            "allow_stdin": False,
            "stop_on_error": True,
        }

        msg = self._create_ws_message("execute_request", content)
        self._ws.send(json.dumps(msg))

        idle_signal_received = False
        execute_reply_received = False

        while not idle_signal_received or not execute_reply_received:
            response = json.loads(self._ws.recv())

            if (
                response["parent_header"].get("msg_id") is None
                or response["parent_header"]["msg_id"] != msg["msg_id"]
            ):
                # Message not related to the execution request
                continue

            match response["msg_type"]:
                case "status" if response["content"]["execution_state"] == "idle":
                    idle_signal_received = True
                case "execute_reply":
                    execute_reply_received = True
                    cell.execution_count = response["content"]["execution_count"]
                case "stream" | "display_data" | "execute_result" | "error":
                    cell.outputs.append(nbformat.v4.output_from_msg(response))
                case _:
                    pass

        return cell

    def _create_ws_message(self, msg_type: str, content: dict) -> dict:
        """Creates a message for the Jupyter Server via WebSocket."""
        header = {
            "msg_id": uuid.uuid4().hex,  # Must be unique per message
            "username": self._ws_username,  # Useful in collaborative settings where multiple users may be interacting with the same kernel simultaneously, so that frontends can label the various messages in a meaningful way.
            "session": self._ws_session,  # A client session id, in message headers from a client, should be unique among all clients connected to a kernel. When a client reconnects to a kernel, it should use the same client session id in its message headers. When a client restarts, it should generate a new client session id.
            "data": datetime.datetime.now().isoformat(),
            "msg_type": msg_type,
            "version": self._ws_jupyter_message_version,  # The version of the Jupyter messaging protocol that the message conforms to. This is distinct from the version of the overall Jupyter protocol, which is the version of the overall protocol that the message conforms to. The version of the overall protocol is specified in the outermost header of the message.
        }
        msg = {
            "header": header,
            "msg_id": header["msg_id"],  # Python API extension
            "msg_type": msg_type,  # Python API extension
            "parent_header": {},  # We assume this class is not providing any responses
            "metadata": {},
            "content": content,
            "buffers": [],
            "channel": "shell",  # Value not in Jupyter messaging protocol specification. Added by Jupyter Server (since one endpoint represents multiple channels of interal communication).
        }
        return msg

    def _start_kernel(self) -> str:
        """Starts the kernel on the Jupyter Server."""

        url = f"{self._base_url}/kernels"
        response = self._session.post(url, json={"name": "python3"})
        response.raise_for_status()
        return response.json()["id"]

    def _restart_kernel(self) -> None:
        """Restarts the kernel on the Jupyter Server."""

        url = f"{self._base_url}/kernels/{self._kernel_id}/restart"
        response = self._session.post(url)
        response.raise_for_status()

Classes

class NotebookRuntime (token: str, host: str = '127.0.0.1', port: int = 8888, use_https: bool = True)

Class managing the Jupyter Notebook via REST API (kernel management) and WebSocket (execution).

The notebook is edited entirely locally (with the nbformat package), and individual cells are executed on the Jupyter Server via WebSockets.

Expand source code
class NotebookRuntime(IRuntime):
    """
    Class managing the Jupyter Notebook via REST API (kernel management) and WebSocket (execution).

    Notebook is fully edited locally (with nbformat package) and single cells are executed on the Jupyter Server via WebSockets.
    """

    _ws_username = "username"
    _ws_jupyter_message_version = "5.3"

    def __init__(
        self,
        token: str,
        host: str = "127.0.0.1",
        port: int = 8888,
        use_https: bool = True,
    ):
        if use_https:
            self._base_url = f"https://{host}:{port}/api"
        else:
            self._base_url = f"http://{host}:{port}/api"
        self._session = requests.Session()
        self._session.headers.update(
            {
                "Authorization": f"token {token}",
                "Accept": "application/json",
                "Content-Type": "application/json",
            }
        )

        self._notebook = nbformat.v4.new_notebook()
        self._kernel_id = self._start_kernel()
        self._ws_session = uuid.uuid4().hex

        if use_https:
            ws_url = f"wss://{host}:{port}/api/kernels/{self._kernel_id}/channels"
        else:
            ws_url = f"ws://{host}:{port}/api/kernels/{self._kernel_id}/channels"

        self._ws = create_connection(
            ws_url,
            header={"Authorization": f"token {token}"},
            timeout=10.0,
        )

    def __del__(self):
        self._session.close()
        self._ws.close()

    def set_report_title(self, title: str) -> None:
        """Sets the title of the report."""
        self._notebook.metadata["title"] = title

    def add_description(self, description: str) -> int:
        """Adds a cell with the description of the process."""
        self._notebook.cells.append(nbformat.v4.new_markdown_cell(description))
        return len(self._notebook.cells) - 1

    def add_code(self, code: str) -> int:
        """Adds a cell with the code."""
        self._notebook.cells.append(nbformat.v4.new_code_cell(code))
        return len(self._notebook.cells) - 1

    def remove_cell(self, cell_index: int = -1) -> None:
        """Removes the cell with the given index."""
        del self._notebook.cells[cell_index]

    def execute_cell(self, cell_index: int = -1) -> None:
        """Executes a cell with the given index, if it is a code cell. Otherwise, does nothing."""
        if self._notebook.cells[cell_index].cell_type == "code":
            self._notebook.cells[cell_index] = self._execute_cell(
                self._notebook.cells[cell_index]
            )

    def get_content(self, cell_index: int = -1) -> str:
        """Returns the content of the cell with the given index."""
        return self._notebook.cells[cell_index].source

    def get_cell_output_stream(self, cell_index: int = -1) -> Union[str, None]:
        """Returns the output (only stdout and stderr, no media) of the cell with the given index, if it is a code cell. Otherwise, returns None."""
        cell = self._notebook.cells[cell_index]
        if cell.cell_type != "code":
            return None

        out_stream = ""
        for output in cell.outputs:
            match output.output_type:
                case "stream":
                    out_stream += output.text.replace("\r", "")
                case "execute_result" | "display_data":
                    if output.data["text/plain"] and output.data["text/plain"] != "":
                        out_stream += output.data["text/plain"]
                case "error":
                    out_stream += output.ename + "\n"
                    out_stream += output.evalue + "\n"
                    out_stream += "\n".join(output.traceback)
                case _:
                    pass

        return out_stream

    def check_if_plot_in_output(self, cell_index: int = -1) -> bool:
        """Checks if the cell with the given index contains a plot."""
        cell = self._notebook.cells[cell_index]
        if cell.cell_type != "code":
            return False

        for output in cell.outputs:
            if output.output_type == "display_data" and "image/png" in output.data:
                return True

    def upload_file(self, local_path: str, dest_file_path: str) -> None:
        """Uploads a file to the Jupyter Server."""
        if not os.path.exists(local_path):
            raise FileNotFoundError("File does not exist")

        filename = os.path.basename(dest_file_path)
        url = f"{self._base_url}/contents/{dest_file_path}"

        with open(local_path, "rb") as f:
            body = {
                "name": filename,
                "path": dest_file_path,
                "type": "file",
                "format": "text",
                "content": f.read().decode("utf-8"),
            }
            response = self._session.put(url, json=body)

        response.raise_for_status()

    def generate_report(self, dest_dir: str, filename: str) -> str:
        """
        Generate a PDF report from the notebook and save it to the specified directory.

        Args:
            dest_dir (str): The destination directory where the PDF report will be saved.
            filename (str): The name of the PDF report file.

        Returns:
            str: The path to the generated PDF report file.
        """
        exporter = nbconvert.PDFExporter()
        body, resources = exporter.from_notebook_node(self._notebook)

        output_path = f"{dest_dir}/{filename}.pdf"
        with open(output_path, "wb") as f:
            f.write(body)
        return output_path

    def _execute_cell(
        self, cell: nbformat.notebooknode.NotebookNode
    ) -> nbformat.notebooknode.NotebookNode:
        """Executes the given cell in IPython kernel and save the result to the cell."""

        # TODO: Set timeout

        # Clear previous outputs
        cell.outputs = []

        content = {
            "code": cell.source,
            "silent": False,
            "store_history": True,
            "user_expressions": {},
            "allow_stdin": False,
            "stop_on_error": True,
        }

        msg = self._create_ws_message("execute_request", content)
        self._ws.send(json.dumps(msg))

        idle_signal_received = False
        execute_reply_received = False

        while not idle_signal_received or not execute_reply_received:
            response = json.loads(self._ws.recv())

            if (
                response["parent_header"].get("msg_id") is None
                or response["parent_header"]["msg_id"] != msg["msg_id"]
            ):
                # Message not related to the execution request
                continue

            match response["msg_type"]:
                case "status" if response["content"]["execution_state"] == "idle":
                    idle_signal_received = True
                case "execute_reply":
                    execute_reply_received = True
                    cell.execution_count = response["content"]["execution_count"]
                case "stream" | "display_data" | "execute_result" | "error":
                    cell.outputs.append(nbformat.v4.output_from_msg(response))
                case _:
                    pass

        return cell

    def _create_ws_message(self, msg_type: str, content: dict) -> dict:
        """Creates a message for the Jupyter Server via WebSocket."""
        header = {
            "msg_id": uuid.uuid4().hex,  # Must be unique per message
            "username": self._ws_username,  # Useful in collaborative settings where multiple users may be interacting with the same kernel simultaneously, so that frontends can label the various messages in a meaningful way.
            "session": self._ws_session,  # A client session id, in message headers from a client, should be unique among all clients connected to a kernel. When a client reconnects to a kernel, it should use the same client session id in its message headers. When a client restarts, it should generate a new client session id.
            "data": datetime.datetime.now().isoformat(),
            "msg_type": msg_type,
            "version": self._ws_jupyter_message_version,  # The version of the Jupyter messaging protocol that the message conforms to. This is distinct from the version of the overall Jupyter protocol, which is the version of the overall protocol that the message conforms to. The version of the overall protocol is specified in the outermost header of the message.
        }
        msg = {
            "header": header,
            "msg_id": header["msg_id"],  # Python API extension
            "msg_type": msg_type,  # Python API extension
            "parent_header": {},  # We assume this class is not providing any responses
            "metadata": {},
            "content": content,
            "buffers": [],
            "channel": "shell",  # Value not in Jupyter messaging protocol specification. Added by Jupyter Server (since one endpoint represents multiple channels of interal communication).
        }
        return msg

    def _start_kernel(self) -> str:
        """Starts the kernel on the Jupyter Server."""

        url = f"{self._base_url}/kernels"
        response = self._session.post(url, json={"name": "python3"})
        response.raise_for_status()
        return response.json()["id"]

    def _restart_kernel(self) -> None:
        """Restarts the kernel on the Jupyter Server."""

        url = f"{self._base_url}/kernels/{self._kernel_id}/restart"
        response = self._session.post(url)
        response.raise_for_status()

Ancestors

  • runtime.iruntime.IRuntime
  • abc.ABC

Methods

def add_code(self, code: str) ‑> int

Adds a cell with the code.

Expand source code
def add_code(self, code: str) -> int:
    """Append a new code cell holding ``code`` and return its index in the notebook."""
    cells = self._notebook.cells
    # The new cell lands at the current end of the list.
    new_index = len(cells)
    cells.append(nbformat.v4.new_code_cell(code))
    return new_index
def add_description(self, description: str) ‑> int

Adds a cell with the description of the process.

Expand source code
def add_description(self, description: str) -> int:
    """Append a new markdown cell holding ``description`` and return its index."""
    cells = self._notebook.cells
    # The new cell lands at the current end of the list.
    new_index = len(cells)
    cells.append(nbformat.v4.new_markdown_cell(description))
    return new_index
def check_if_plot_in_output(self, cell_index: int = -1) ‑> bool

Checks if the cell with the given index contains a plot.

Expand source code
def check_if_plot_in_output(self, cell_index: int = -1) -> bool:
    """
    Tell whether the cell at ``cell_index`` produced a PNG image.

    Returns:
        True if any ``display_data`` output of the cell carries ``image/png`` data;
        False otherwise, including when the cell is not a code cell.
    """
    cell = self._notebook.cells[cell_index]
    if cell.cell_type != "code":
        return False

    # any() yields an explicit False when no output matches, instead of falling
    # off the end of the function and returning None.
    return any(
        output.output_type == "display_data" and "image/png" in output.data
        for output in cell.outputs
    )
def execute_cell(self, cell_index: int = -1) ‑> None

Executes a cell with the given index, if it is a code cell. Otherwise, does nothing.

Expand source code
def execute_cell(self, cell_index: int = -1) -> None:
    """Run the cell at ``cell_index`` when it is a code cell; leave other cells untouched."""
    cells = self._notebook.cells
    target = cells[cell_index]
    if target.cell_type != "code":
        return
    # Store whatever the executor hands back in the same slot.
    cells[cell_index] = self._execute_cell(target)
def generate_report(self, dest_dir: str, filename: str) ‑> str

Generate a PDF report from the notebook and save it to the specified directory.

Args

dest_dir : str
The destination directory where the PDF report will be saved.
filename : str
The name of the PDF report file.

Returns

str
The path to the generated PDF report file.
Expand source code
def generate_report(self, dest_dir: str, filename: str) -> str:
    """
    Export the notebook to PDF and write it into the given directory.

    Args:
        dest_dir (str): Directory the PDF file is written to.
        filename (str): Name of the PDF file; ``.pdf`` is appended to it.

    Returns:
        str: Path of the PDF file that was written.
    """
    # The exporter returns the PDF bytes together with a resources object,
    # which is not used here.
    pdf_bytes, _ = nbconvert.PDFExporter().from_notebook_node(self._notebook)

    report_path = f"{dest_dir}/{filename}.pdf"
    with open(report_path, "wb") as report_file:
        report_file.write(pdf_bytes)
    return report_path
def get_cell_output_stream(self, cell_index: int = -1) ‑> Optional[str]

Returns the output (only stdout and stderr, no media) of the cell with the given index, if it is a code cell. Otherwise, returns None.

Expand source code
def get_cell_output_stream(self, cell_index: int = -1) -> Union[str, None]:
    """Returns the output (only stdout and stderr, no media) of the cell with the given index, if it is a code cell. Otherwise, returns None."""
    cell = self._notebook.cells[cell_index]
    if cell.cell_type != "code":
        return None

    out_stream = ""
    for output in cell.outputs:
        match output.output_type:
            case "stream":
                out_stream += output.text.replace("\r", "")
            case "execute_result" | "display_data":
                if output.data["text/plain"] and output.data["text/plain"] != "":
                    out_stream += output.data["text/plain"]
            case "error":
                out_stream += output.ename + "\n"
                out_stream += output.evalue + "\n"
                out_stream += "\n".join(output.traceback)
            case _:
                pass

    return out_stream
def get_content(self, cell_index: int = -1) ‑> str

Returns the content of the cell with the given index.

Expand source code
def get_content(self, cell_index: int = -1) -> str:
    """Return the source text of the cell at ``cell_index`` (the last cell by default)."""
    selected = self._notebook.cells[cell_index]
    return selected.source
def remove_cell(self, cell_index: int = -1) ‑> None

Removes the cell with the given index.

Expand source code
def remove_cell(self, cell_index: int = -1) -> None:
    """Delete the cell at ``cell_index`` from the notebook (the last cell by default)."""
    cells = self._notebook.cells
    del cells[cell_index]
def set_report_title(self, title: str) ‑> None

Sets the title of the report.

Expand source code
def set_report_title(self, title: str) -> None:
    """Record ``title`` under the ``"title"`` key of the notebook metadata."""
    metadata = self._notebook.metadata
    metadata["title"] = title
def upload_file(self, local_path: str, dest_file_path: str) ‑> None

Uploads a file to the Jupyter Server.

Expand source code
def upload_file(self, local_path: str, dest_file_path: str) -> None:
    """
    Upload a local file to the Jupyter Server through the contents API.

    UTF-8 decodable files are sent with format ``"text"``; any other content is
    sent base64-encoded with format ``"base64"``.

    Args:
        local_path (str): Path of the file on the local machine.
        dest_file_path (str): Destination path used in the contents URL and model.

    Raises:
        FileNotFoundError: If ``local_path`` does not exist.
        requests.HTTPError: If the server answers with an error status.
    """
    import base64  # local import: only needed for the binary fallback below

    if not os.path.exists(local_path):
        raise FileNotFoundError("File does not exist")

    with open(local_path, "rb") as f:
        raw = f.read()

    try:
        content, content_format = raw.decode("utf-8"), "text"
    except UnicodeDecodeError:
        # Not valid UTF-8 text: ship the raw bytes base64-encoded instead.
        content, content_format = base64.b64encode(raw).decode("ascii"), "base64"

    body = {
        "name": os.path.basename(dest_file_path),
        "path": dest_file_path,
        "type": "file",
        "format": content_format,
        "content": content,
    }
    response = self._session.put(
        f"{self._base_url}/contents/{dest_file_path}", json=body
    )
    response.raise_for_status()