myst_nb.core.execute.inline 源代码

"""Execute a notebook inline."""
from __future__ import annotations

import asyncio
from datetime import datetime
import re
import shutil
from tempfile import mkdtemp
import time
import traceback

from nbclient.client import (
    CellControlSignal,
    CellExecutionError,
    CellTimeoutError,
    DeadKernelError,
    NotebookClient,
    ensure_async,
    run_sync,
)
import nbformat
from nbformat import NotebookNode

from myst_nb.ext.glue import extract_glue_data_cell

from .base import EvalNameError, ExecutionError, NotebookClientBase


[文档] class NotebookClientInline(NotebookClientBase): """A notebook client that executes the notebook inline, i.e. during the render. This allows for the client to be called in-between code cell executions, in order to extract variable state. """ def start_client(self): self._tmp_path = None if self.nb_config.execution_in_temp: self._tmp_path = mkdtemp() resources = {"metadata": {"path": self._tmp_path}} else: if self.path is None: raise ValueError( "Input source must exist as file, if execution_in_temp=False" ) resources = {"metadata": {"path": str(self.path.parent)}} self.logger.info("Starting inline execution client") self._time_start = time.perf_counter() self._client = ModifiedNotebookClient( self.notebook, record_timing=False, resources=resources, allow_errors=self.nb_config.execution_allow_errors, timeout=self.nb_config.execution_timeout, ) self._client.reset_execution_trackers() if self._client.km is None: self._client.km = self._client.create_kernel_manager() if not self._client.km.has_kernel: self._client.start_new_kernel() self._client.start_new_kernel_client() # retrieve the the language_info from the kernel assert self._client.kc is not None msg_id = self._client.kc.kernel_info() info_msg = self._client.wait_for_reply(msg_id) if info_msg is not None and "language_info" in info_msg["content"]: self.notebook.metadata["language_info"] = info_msg["content"][ "language_info" ] else: self.logger.warning("Failed to retrieve language info from kernel") self._last_cell_executed: int = -1 self._cell_error: None | Exception = None self._exc_string: None | str = None def finalise_client(self): try: self._client.set_widgets_metadata() except Exception as exc: self.logger.warning(f"Failed to set widgets metadata: {exc}") def close_client(self, exc_type, exc_val, exc_tb): self.logger.info("Stopping inline execution client") if self._client.owns_km: self._client._cleanup_kernel() del self._client _exec_time = time.perf_counter() - self._time_start self.exec_metadata = { "mtime": datetime.now().timestamp(), "runtime": _exec_time, "method": self.nb_config.execution_mode, "succeeded": False if self._cell_error else True, "error": f"{self._cell_error.__class__.__name__}" if self._cell_error else None, "traceback": self._exc_string, } if not self._cell_error: self.logger.info(f"Executed notebook in {_exec_time:.2f} seconds") else: msg = f"Executing notebook failed: {self._cell_error.__class__.__name__}" if self.nb_config.execution_show_tb: msg += f"\n{self._exc_string}" self.logger.warning(msg, subtype="exec") if self._tmp_path: shutil.rmtree(self._tmp_path, ignore_errors=True) def code_cell_outputs( self, cell_index: int ) -> tuple[int | None, list[NotebookNode]]: cells = self.notebook.get("cells", []) # ensure all cells up to and including the requested cell have been executed while (not self._cell_error) and cell_index > self._last_cell_executed: self._last_cell_executed += 1 try: next_cell = cells[self._last_cell_executed] except IndexError: break try: self._client.execute_cell( next_cell, self._last_cell_executed, execution_count=self._client.code_cells_executed + 1, ) except (CellExecutionError, CellTimeoutError) as err: if self.nb_config.execution_raise_on_error: raise ExecutionError(str(self.path)) from err self._cell_error = err self._exc_string = "".join(traceback.format_exc()) for key, cell_data in extract_glue_data_cell(next_cell): if key in self._glue_data: self.logger.warning( f"glue key {key!r} duplicate", subtype="glue", line=self.cell_line(self._last_cell_executed), ) self._glue_data[key] = cell_data cell = cells[cell_index] return cell.get("execution_count", None), cell.get("outputs", []) def eval_variable(self, name: str) -> list[NotebookNode]: if not re.match(self.nb_config.eval_name_regex, name): raise EvalNameError(name) return self._client.eval_expression(name)
class ModifiedNotebookClient(NotebookClient): async def async_eval_expression(self, name: str) -> list[NotebookNode]: """Evaluate an expression in the kernel. This is a modified version of `async_execute_cell`, which executed a single cell, with `name` as the source and returns the result. """ assert self.kc is not None self.log.debug(f"Evaluating expression: {name}") parent_msg_id = await ensure_async( self.kc.execute( str(name), store_history=False, stop_on_error=False, ) ) cell = nbformat.v4.new_code_cell(source=str(name)) exec_timeout = self._get_timeout(cell) cell_index = -1 self.clear_before_next_output = False task_poll_kernel_alive = asyncio.ensure_future(self._async_poll_kernel_alive()) task_poll_output_msg = asyncio.ensure_future( self._async_poll_output_msg(parent_msg_id, cell, cell_index) ) self.task_poll_for_reply = asyncio.ensure_future( self._async_poll_for_reply( parent_msg_id, cell, exec_timeout, task_poll_output_msg, task_poll_kernel_alive, ) ) try: await self.task_poll_for_reply except asyncio.CancelledError: # can only be cancelled by task_poll_kernel_alive when the kernel is dead task_poll_output_msg.cancel() raise DeadKernelError("Kernel died") except Exception as e: # Best effort to cancel request if it hasn't been resolved try: # Check if the task_poll_output is doing the raising for us if not isinstance(e, CellControlSignal): task_poll_output_msg.cancel() finally: raise return cell.outputs eval_expression = run_sync(async_eval_expression)