Cursor Python SDK stop hook blocks terminal completion

Where does the bug appear (feature/product)?

Cursor SDK

Describe the Bug

In normal Cursor usage, stop hooks are non-blocking relative to the visible
agent response: the assistant can finish and the session can reach terminal UI
state while the stop hook runs in the background.

In the local Cursor Python SDK, a project-level stop hook appears to block the
run’s terminal status:

  • assistant text streams normally;
  • then no FINISHED status is emitted until the stop hook exits;
  • run.messages() does not complete until the hook exits;
  • run.wait() does not return until the hook exits.

This is severe for streaming SDK UIs. A long-running stop hook makes a finished
assistant response look like a hung run. If the UI has an idle watchdog, it may
cancel and mark the local agent as needing repair even though the model already
answered.
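
Until the SDK behavior changes, a UI could at least tell "answered, still waiting for terminal status" apart from "hung" when its idle watchdog fires. The following is a minimal sketch of that idea, not SDK code. It assumes messages expose the `type` and `status` values that the repro script in this report reads, and `StreamState` and `classify_idle` are hypothetical names:

```python
from dataclasses import dataclass


@dataclass
class StreamState:
    """Tracks what the UI has seen so far on one run's message stream."""

    saw_assistant: bool = False
    saw_finished: bool = False

    def observe(self, message_type: str, status: str = "") -> None:
        # Record the two signals the watchdog decision depends on.
        if message_type == "assistant":
            self.saw_assistant = True
        if message_type == "status" and status == "FINISHED":
            self.saw_finished = True


def classify_idle(state: StreamState, idle_seconds: float, idle_timeout: float) -> str:
    """Return a UI label given how long the stream has been silent."""
    if state.saw_finished:
        return "finished"
    if idle_seconds < idle_timeout:
        return "running"
    # Silent past the timeout: if the assistant already answered, the run is
    # probably waiting on a stop hook, so do not flag the agent for repair.
    return "answered_awaiting_terminal" if state.saw_assistant else "hung"


state = StreamState()
state.observe("assistant")
print(classify_idle(state, idle_seconds=61.0, idle_timeout=60.0))
# prints "answered_awaiting_terminal"
```

This only changes how the UI labels the run. The stream and run.wait() still block until the hook exits.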

Steps to Reproduce

One-Command Repro

The repro script is self-contained. It creates temporary workspaces and a
temporary HOME, writes a project-level stop hook, and runs three cases:

  • no_hook_project
  • stop_hook_project
  • stop_hook_project_user

It does not read real user settings or real project files, and it never
prints CURSOR_API_KEY.

From any environment with cursor-sdk installed:

export CURSOR_API_KEY=...
python repro_stop_hook_blocks_terminal.py

The script exits with status 1 when either stop-hook case delays terminal
completion by at least the configured threshold (--delay-threshold, 5 seconds
by default), and with status 2 when CURSOR_API_KEY is not set.

repro_stop_hook_blocks_terminal.py:

#!/usr/bin/env python3
"""Reproduce Cursor Python SDK stop hooks blocking terminal completion.

The script creates temporary workspaces and a temporary HOME, writes a project
stop hook that sleeps, and compares terminal timing with and without the hook.
It imports only cursor_sdk and standard library modules.
"""

from __future__ import annotations

import argparse
import json
import os
import stat
import subprocess
import sys
import tempfile
import threading
import time
from pathlib import Path
from typing import Any

from cursor_sdk import Agent, LocalAgentOptions


PROJECT_SKILL = "sdk-stop-hook-repro-project-skill"
USER_SKILL = "sdk-stop-hook-repro-user-skill"


def main() -> int:
    parser = argparse.ArgumentParser(
        description="Reproduce local Cursor Python SDK stop hooks delaying terminal completion."
    )
    parser.add_argument("--model", default="default", help="Cursor model id.")
    parser.add_argument("--hook-sleep", type=float, default=30.0, help="Seconds for the stop hook to sleep.")
    parser.add_argument(
        "--delay-threshold",
        type=float,
        default=5.0,
        help="Mark a stop-hook case as delayed if assistant->FINISHED exceeds this many seconds.",
    )
    parser.add_argument(
        "--wait-timeout",
        type=float,
        default=10.0,
        help="Seconds to wait for run.wait() after run.messages() completes.",
    )
    parser.add_argument(
        "--stream-idle-timeout",
        type=float,
        default=60.0,
        help="Seconds without SDK messages before cancelling the run as hung.",
    )
    parser.add_argument("--keep-temp", action="store_true", help="Keep the temporary fixture for inspection.")
    args = parser.parse_args()

    if not os.environ.get("CURSOR_API_KEY"):
        print("CURSOR_API_KEY is required", file=sys.stderr)
        return 2

    with tempfile.TemporaryDirectory(prefix="cursor-sdk-stop-hook-blocks-terminal-") as tmp:
        root = Path(tmp)
        no_hook_workspace = root / "no-hook-workspace"
        stop_hook_workspace = root / "stop-hook-workspace"
        fake_home = root / "home"

        for workspace in (no_hook_workspace, stop_hook_workspace):
            workspace.mkdir(parents=True)
            git_init(workspace)
            write_skill(
                workspace / ".agents" / "skills" / PROJECT_SKILL / "SKILL.md",
                PROJECT_SKILL,
                "SDK_STOP_HOOK_REPRO_PROJECT_SKILL_SENTINEL",
            )
        fake_home.mkdir(parents=True)
        write_skill(
            fake_home / ".cursor" / "skills" / USER_SKILL / "SKILL.md",
            USER_SKILL,
            "SDK_STOP_HOOK_REPRO_USER_SKILL_SENTINEL",
        )
        write_stop_hook(stop_hook_workspace, sleep_seconds=args.hook_sleep)

        results = [
            run_case(
                name="no_hook_project",
                workspace=no_hook_workspace,
                setting_sources=["project"],
                model=args.model,
                wait_timeout=args.wait_timeout,
                stream_idle_timeout=args.stream_idle_timeout,
                delay_threshold=args.delay_threshold,
            ),
            run_case(
                name="stop_hook_project",
                workspace=stop_hook_workspace,
                setting_sources=["project"],
                model=args.model,
                wait_timeout=args.wait_timeout,
                stream_idle_timeout=args.stream_idle_timeout,
                delay_threshold=args.delay_threshold,
            ),
            run_case(
                name="stop_hook_project_user",
                workspace=stop_hook_workspace,
                setting_sources=["project", "user"],
                model=args.model,
                wait_timeout=args.wait_timeout,
                stream_idle_timeout=args.stream_idle_timeout,
                delay_threshold=args.delay_threshold,
                home=fake_home,
            ),
        ]

        report = {
            "fixture": {
                "workspace": "$TEMP_WORKSPACE",
                "home": "$TEMP_HOME",
                "hook_sleep_seconds": args.hook_sleep,
                "delay_threshold_seconds": args.delay_threshold,
            },
            "cases": results,
        }
        print(json.dumps(report, ensure_ascii=False, indent=2))

        if args.keep_temp:
            print(f"Temporary fixture kept at: {root}", file=sys.stderr)
            input("Press Enter after inspecting the fixture to remove it...")

    return 1 if any(case["delayed_by_stop_hook"] for case in results if case["has_stop_hook"]) else 0


def run_case(
    *,
    name: str,
    workspace: Path,
    setting_sources: list[str],
    model: str,
    wait_timeout: float,
    stream_idle_timeout: float,
    delay_threshold: float,
    home: Path | None = None,
) -> dict[str, Any]:
    old_home = os.environ.get("HOME")
    if home is not None:
        os.environ["HOME"] = str(home)

    started_at = time.monotonic()
    events: list[dict[str, Any]] = []
    assistant_last_at: float | None = None
    finished_at: float | None = None

    def mark(event: str, **fields: Any) -> None:
        events.append({"t": round(time.monotonic() - started_at, 3), "event": event, **fields})

    try:
        with Agent.create(
            model=model,
            api_key=os.environ["CURSOR_API_KEY"],
            local=LocalAgentOptions(cwd=str(workspace), setting_sources=setting_sources),
        ) as agent:
            mark("agent_ready", agent_id=public_id(agent))
            run = agent.send("Do not use tools. Reply exactly: OK")
            mark("send_returned", run_id=public_id(run))

            stream_done = threading.Event()
            stream_error: list[str] = []
            last_message_at = time.monotonic()

            def consume_messages() -> None:
                nonlocal assistant_last_at, finished_at, last_message_at
                try:
                    for index, message in enumerate(run.messages()):
                        now = time.monotonic()
                        last_message_at = now
                        message_type = str(getattr(message, "type", type(message).__name__))
                        status = str(getattr(message, "status", "") or "")
                        if message_type == "assistant":
                            assistant_last_at = now
                        if message_type == "status" and status == "FINISHED":
                            finished_at = now
                        mark("message", index=index, message_type=message_type, status=status)
                    mark("messages_completed")
                except BaseException as exc:
                    stream_error.append(f"{type(exc).__name__}: {exc}")
                    mark("messages_error", error=stream_error[-1])
                finally:
                    stream_done.set()

            stream_thread = threading.Thread(target=consume_messages, daemon=True)
            stream_thread.start()
            while not stream_done.wait(0.05):
                if time.monotonic() - last_message_at < stream_idle_timeout:
                    continue
                mark("messages_hung_after_idle", idle_seconds=round(time.monotonic() - last_message_at, 3))
                cancel_run(run, mark)
                stream_done.wait(0.5)
                break

            wait_box: dict[str, Any] = {}

            def wait_for_result() -> None:
                try:
                    result = run.wait()
                    wait_box["result"] = {
                        "status": getattr(result, "status", ""),
                        "id": public_id(result),
                        "result_len": len(getattr(result, "result", "") or ""),
                    }
                except BaseException as exc:
                    wait_box["error"] = f"{type(exc).__name__}: {exc}"

            wait_thread = threading.Thread(target=wait_for_result, daemon=True)
            wait_thread.start()
            wait_thread.join(wait_timeout)
            if wait_box:
                mark("wait_returned", result=wait_box)
            else:
                mark("wait_hung_after_messages_completed", timeout_seconds=wait_timeout)
                cancel_run(run, mark)
    finally:
        if old_home is None:
            os.environ.pop("HOME", None)
        else:
            os.environ["HOME"] = old_home

    assistant_to_finished = None
    if assistant_last_at is not None and finished_at is not None:
        assistant_to_finished = round(finished_at - assistant_last_at, 3)

    has_stop_hook = (workspace / ".cursor" / "hooks.json").exists()
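    # A run that tripped either watchdog counts as delayed even if FINISHED never arrived.
    hung = any(
        event["event"] in ("messages_hung_after_idle", "wait_hung_after_messages_completed")
        for event in events
    )
    delayed = bool(
        has_stop_hook
        and (hung or (assistant_to_finished is not None and assistant_to_finished >= delay_threshold))
    )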
    return {
        "name": name,
        "setting_sources": setting_sources,
        "has_stop_hook": has_stop_hook,
        "assistant_to_finished_seconds": assistant_to_finished,
        "delayed_by_stop_hook": delayed,
        "events": events,
    }


def write_skill(path: Path, name: str, sentinel: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(
        "\n".join(
            [
                "---",
                f"name: {name}",
                f"description: Use this skill only when the user mentions {sentinel}.",
                "---",
                "",
                f"When this skill is active, mention {sentinel}.",
                "",
            ]
        ),
        encoding="utf-8",
    )


def write_stop_hook(workspace: Path, *, sleep_seconds: float) -> None:
    hook_dir = workspace / ".cursor" / "hooks"
    hook_dir.mkdir(parents=True, exist_ok=True)
    script = hook_dir / "sleep-stop.sh"
    script.write_text(
        "\n".join(
            [
                "#!/usr/bin/env bash",
                f"sleep {sleep_seconds}",
                "echo '{}'",
                "",
            ]
        ),
        encoding="utf-8",
    )
    script.chmod(script.stat().st_mode | stat.S_IXUSR)
    (workspace / ".cursor" / "hooks.json").write_text(
        json.dumps(
            {
                "version": 1,
                "hooks": {
                    "stop": [
                        {
                            "command": ".cursor/hooks/sleep-stop.sh",
                            "timeout": max(int(sleep_seconds) + 15, 30),
                            "failClosed": False,
                        }
                    ]
                },
            },
            indent=2,
        ),
        encoding="utf-8",
    )


def git_init(path: Path) -> None:
    subprocess.run(["git", "init", "-q", str(path)], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def public_id(value: Any) -> str:
    return str(getattr(value, "id", "") or getattr(value, "agent_id", "") or "")


def cancel_run(run: Any, mark) -> None:
    cancel = getattr(run, "cancel", None)
    if not callable(cancel):
        mark("cancel_unavailable")
        return
    try:
        cancel()
        mark("cancel_called")
    except BaseException as exc:
        mark("cancel_error", error=f"{type(exc).__name__}: {exc}")


if __name__ == "__main__":
    raise SystemExit(main())

Representative Output

{
  "cases": [
    {
      "name": "no_hook_project",
      "setting_sources": ["project"],
      "assistant_to_finished_seconds": 0.1,
      "delayed_by_stop_hook": false
    },
    {
      "name": "stop_hook_project",
      "setting_sources": ["project"],
      "assistant_to_finished_seconds": 30.1,
      "delayed_by_stop_hook": true
    },
    {
      "name": "stop_hook_project_user",
      "setting_sources": ["project", "user"],
      "assistant_to_finished_seconds": 60.0,
      "delayed_by_stop_hook": true,
      "events": [
        {"event": "message", "message_type": "assistant"},
        {"event": "messages_hung_after_idle", "idle_seconds": 60.0},
        {"event": "message", "message_type": "status", "status": "FINISHED"},
        {"event": "cancel_called"}
      ]
    }
  ]
}

The exact timings vary slightly. The important signal is that terminal
FINISHED is delayed by approximately the hook’s sleep duration in the
project-only case. In the project+user case the stream stays silent long
enough to trip the 60-second idle watchdog, which then cancels the run.

Expected Behavior

The SDK should match normal Cursor behavior:

  1. Stop hooks should not block terminal run completion for the SDK stream.
  2. run.messages() should emit the terminal FINISHED status promptly after
    the assistant response is complete.
  3. run.wait() should return promptly after the assistant response is complete.
  4. Stop hook execution should be observable separately if needed, but should not
    make the run look active or hung.

Operating System

Linux

Version Information

  • Python cursor-sdk==0.1.5
  • TypeScript @cursor/sdk==1.0.15

Does this stop you from using Cursor

Yes - Cursor is unusable

Confirmed - your root cause analysis is accurate. The SDK’s onTurnCompleted awaits the stop hook inline, blocking FINISHED from being emitted until the hook completes. The IDE invokes the same hook as fire-and-forget, so it’s never blocked. We’re filing this with the SDK team.

As a workaround until this is fixed: you could restructure long-running hook logic to spawn an asynchronous background process (or write state to a file/socket and return immediately), so the stop hook script itself exits quickly. Not ideal if you need the followup_message return value, but it would unblock the stream for non-looping hooks.
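
A minimal sketch of that workaround follows. It assumes the stop hook command may be any executable and that printing an empty JSON object is an acceptable hook response, as the repro's sleep-stop.sh does. The `launch_detached` helper and the placeholder worker command are hypothetical, and `start_new_session` is POSIX-only:

```python
#!/usr/bin/env python3
"""Stop hook that hands slow work to a detached process and returns at once."""

import subprocess
import sys


def launch_detached(command: list[str]) -> int:
    """Start command in its own session so it outlives this hook; return its PID."""
    proc = subprocess.Popen(
        command,
        # Redirect stdio so the child does not keep the hook's pipes open.
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,  # POSIX: detach from the hook's process group
    )
    return proc.pid


if __name__ == "__main__":
    # Placeholder for the real long-running work (hypothetical command).
    launch_detached([sys.executable, "-c", "import time; time.sleep(30)"])
    print("{}")  # empty hook response, emitted immediately
```

Because the hook never waits on the child, it cannot report the child's result back to the agent, which is the followup_message limitation mentioned above.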

Your project + user case where it hangs until cancellation (60s idle timeout hit) is consistent with the hook running per setting source. We’ll include that in the ticket.