Where does the bug appear (feature/product)?
Cursor SDK
Describe the Bug
In normal Cursor usage, stop hooks are non-blocking relative to the visible
agent response: the assistant can finish and the session can reach terminal UI
state while the stop hook runs in the background.
In the local Cursor Python SDK, a project-level stop hook appears to block the
run’s terminal status:
- assistant text streams normally;
- then no
FINISHEDstatus is emitted until thestophook exits; run.messages()does not complete until the hook exits;run.wait()does not return until the hook exits.
This is severe for streaming SDK UIs. A long-running stop hook makes a finished
assistant response look like a hung run. If the UI has an idle watchdog, it may
cancel and mark the local agent as needing repair even though the model already
answered.
Steps to Reproduce
One-Command Repro
The repro script is self-contained. It creates temporary workspaces and a
temporary HOME, writes a project-level stop hook, and runs three cases:
no_hook_projectstop_hook_projectstop_hook_project_user
It does not read real user settings, real project files, or print
CURSOR_API_KEY.
from any environment with cursor-sdk installed:
export CURSOR_API_KEY=...
python repro_stop_hook_blocks_terminal.py
The script exits with status 1 when the stop-hook cases delay terminal
completion by more than the configured threshold.
repro_stop_hook_blocks_terminal.py:
#!/usr/bin/env python3
"""Reproduce Cursor Python SDK stop hooks blocking terminal completion.
The script creates temporary workspaces and a temporary HOME, writes a project
stop hook that sleeps, and compares terminal timing with and without the hook.
It imports only cursor_sdk and standard library modules.
"""
from __future__ import annotations
import argparse
import json
import os
import stat
import subprocess
import sys
import tempfile
import threading
import time
from pathlib import Path
from typing import Any
from cursor_sdk import Agent, LocalAgentOptions
PROJECT_SKILL = "sdk-stop-hook-repro-project-skill"
USER_SKILL = "sdk-stop-hook-repro-user-skill"
def main() -> int:
parser = argparse.ArgumentParser(
description="Reproduce local Cursor Python SDK stop hooks delaying terminal completion."
)
parser.add_argument("--model", default="default", help="Cursor model id.")
parser.add_argument("--hook-sleep", type=float, default=30.0, help="Seconds for the stop hook to sleep.")
parser.add_argument(
"--delay-threshold",
type=float,
default=5.0,
help="Mark a stop-hook case as delayed if assistant->FINISHED exceeds this many seconds.",
)
parser.add_argument(
"--wait-timeout",
type=float,
default=10.0,
help="Seconds to wait for run.wait() after run.messages() completes.",
)
parser.add_argument(
"--stream-idle-timeout",
type=float,
default=60.0,
help="Seconds without SDK messages before cancelling the run as hung.",
)
parser.add_argument("--keep-temp", action="store_true", help="Keep the temporary fixture for inspection.")
args = parser.parse_args()
if not os.environ.get("CURSOR_API_KEY"):
print("CURSOR_API_KEY is required", file=sys.stderr)
return 2
with tempfile.TemporaryDirectory(prefix="cursor-sdk-stop-hook-blocks-terminal-") as tmp:
root = Path(tmp)
no_hook_workspace = root / "no-hook-workspace"
stop_hook_workspace = root / "stop-hook-workspace"
fake_home = root / "home"
for workspace in (no_hook_workspace, stop_hook_workspace):
workspace.mkdir(parents=True)
git_init(workspace)
write_skill(
workspace / ".agents" / "skills" / PROJECT_SKILL / "SKILL.md",
PROJECT_SKILL,
"SDK_STOP_HOOK_REPRO_PROJECT_SKILL_SENTINEL",
)
fake_home.mkdir(parents=True)
write_skill(
fake_home / ".cursor" / "skills" / USER_SKILL / "SKILL.md",
USER_SKILL,
"SDK_STOP_HOOK_REPRO_USER_SKILL_SENTINEL",
)
write_stop_hook(stop_hook_workspace, sleep_seconds=args.hook_sleep)
results = [
run_case(
name="no_hook_project",
workspace=no_hook_workspace,
setting_sources=["project"],
model=args.model,
wait_timeout=args.wait_timeout,
stream_idle_timeout=args.stream_idle_timeout,
delay_threshold=args.delay_threshold,
),
run_case(
name="stop_hook_project",
workspace=stop_hook_workspace,
setting_sources=["project"],
model=args.model,
wait_timeout=args.wait_timeout,
stream_idle_timeout=args.stream_idle_timeout,
delay_threshold=args.delay_threshold,
),
run_case(
name="stop_hook_project_user",
workspace=stop_hook_workspace,
setting_sources=["project", "user"],
model=args.model,
wait_timeout=args.wait_timeout,
stream_idle_timeout=args.stream_idle_timeout,
delay_threshold=args.delay_threshold,
home=fake_home,
),
]
report = {
"fixture": {
"workspace": "$TEMP_WORKSPACE",
"home": "$TEMP_HOME",
"hook_sleep_seconds": args.hook_sleep,
"delay_threshold_seconds": args.delay_threshold,
},
"cases": results,
}
print(json.dumps(report, ensure_ascii=False, indent=2))
if args.keep_temp:
print(f"Temporary fixture kept at: {root}", file=sys.stderr)
input("Press Enter after inspecting the fixture to remove it...")
return 1 if any(case["delayed_by_stop_hook"] for case in results if case["has_stop_hook"]) else 0
def run_case(
*,
name: str,
workspace: Path,
setting_sources: list[str],
model: str,
wait_timeout: float,
stream_idle_timeout: float,
delay_threshold: float,
home: Path | None = None,
) -> dict[str, Any]:
old_home = os.environ.get("HOME")
if home is not None:
os.environ["HOME"] = str(home)
started_at = time.monotonic()
events: list[dict[str, Any]] = []
assistant_last_at: float | None = None
finished_at: float | None = None
def mark(event: str, **fields: Any) -> None:
events.append({"t": round(time.monotonic() - started_at, 3), "event": event, **fields})
try:
with Agent.create(
model=model,
api_key=os.environ["CURSOR_API_KEY"],
local=LocalAgentOptions(cwd=str(workspace), setting_sources=setting_sources),
) as agent:
mark("agent_ready", agent_id=public_id(agent))
run = agent.send("Do not use tools. Reply exactly: OK")
mark("send_returned", run_id=public_id(run))
stream_done = threading.Event()
stream_error: list[str] = []
last_message_at = time.monotonic()
def consume_messages() -> None:
nonlocal assistant_last_at, finished_at, last_message_at
try:
for index, message in enumerate(run.messages()):
now = time.monotonic()
last_message_at = now
message_type = str(getattr(message, "type", type(message).__name__))
status = str(getattr(message, "status", "") or "")
if message_type == "assistant":
assistant_last_at = now
if message_type == "status" and status == "FINISHED":
finished_at = now
mark("message", index=index, message_type=message_type, status=status)
mark("messages_completed")
except BaseException as exc:
stream_error.append(f"{type(exc).__name__}: {exc}")
mark("messages_error", error=stream_error[-1])
finally:
stream_done.set()
stream_thread = threading.Thread(target=consume_messages, daemon=True)
stream_thread.start()
while not stream_done.wait(0.05):
if time.monotonic() - last_message_at < stream_idle_timeout:
continue
mark("messages_hung_after_idle", idle_seconds=round(time.monotonic() - last_message_at, 3))
cancel_run(run, mark)
stream_done.wait(0.5)
break
wait_box: dict[str, Any] = {}
def wait_for_result() -> None:
try:
result = run.wait()
wait_box["result"] = {
"status": getattr(result, "status", ""),
"id": public_id(result),
"result_len": len(getattr(result, "result", "") or ""),
}
except BaseException as exc:
wait_box["error"] = f"{type(exc).__name__}: {exc}"
wait_thread = threading.Thread(target=wait_for_result, daemon=True)
wait_thread.start()
wait_thread.join(wait_timeout)
if wait_box:
mark("wait_returned", result=wait_box)
else:
mark("wait_hung_after_messages_completed", timeout_seconds=wait_timeout)
cancel_run(run, mark)
finally:
if old_home is None:
os.environ.pop("HOME", None)
else:
os.environ["HOME"] = old_home
assistant_to_finished = None
if assistant_last_at is not None and finished_at is not None:
assistant_to_finished = round(finished_at - assistant_last_at, 3)
has_stop_hook = (workspace / ".cursor" / "hooks.json").exists()
delayed = bool(
has_stop_hook
and assistant_to_finished is not None
and assistant_to_finished >= delay_threshold
)
return {
"name": name,
"setting_sources": setting_sources,
"has_stop_hook": has_stop_hook,
"assistant_to_finished_seconds": assistant_to_finished,
"delayed_by_stop_hook": delayed,
"events": events,
}
def write_skill(path: Path, name: str, sentinel: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(
"\n".join(
[
"---",
f"name: {name}",
f"description: Use this skill only when the user mentions {sentinel}.",
"---",
"",
f"When this skill is active, mention {sentinel}.",
"",
]
),
encoding="utf-8",
)
def write_stop_hook(workspace: Path, *, sleep_seconds: float) -> None:
hook_dir = workspace / ".cursor" / "hooks"
hook_dir.mkdir(parents=True, exist_ok=True)
script = hook_dir / "sleep-stop.sh"
script.write_text(
"\n".join(
[
"#!/usr/bin/env bash",
f"sleep {sleep_seconds}",
"echo '{}'",
"",
]
),
encoding="utf-8",
)
script.chmod(script.stat().st_mode | stat.S_IXUSR)
(workspace / ".cursor" / "hooks.json").write_text(
json.dumps(
{
"version": 1,
"hooks": {
"stop": [
{
"command": ".cursor/hooks/sleep-stop.sh",
"timeout": max(int(sleep_seconds) + 15, 30),
"failClosed": False,
}
]
},
},
indent=2,
),
encoding="utf-8",
)
def git_init(path: Path) -> None:
subprocess.run(["git", "init", "-q", str(path)], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def public_id(value: Any) -> str:
return str(getattr(value, "id", "") or getattr(value, "agent_id", "") or "")
def cancel_run(run: Any, mark) -> None:
cancel = getattr(run, "cancel", None)
if not callable(cancel):
mark("cancel_unavailable")
return
try:
cancel()
mark("cancel_called")
except BaseException as exc:
mark("cancel_error", error=f"{type(exc).__name__}: {exc}")
if __name__ == "__main__":
raise SystemExit(main())
Representative Output
{
"cases": [
{
"name": "no_hook_project",
"setting_sources": ["project"],
"assistant_to_finished_seconds": 0.1,
"delayed_by_stop_hook": false
},
{
"name": "stop_hook_project",
"setting_sources": ["project"],
"assistant_to_finished_seconds": 30.1,
"delayed_by_stop_hook": true
},
{
"name": "stop_hook_project_user",
"setting_sources": ["project", "user"],
"assistant_to_finished_seconds": 60.0,
"delayed_by_stop_hook": true,
"events": [
{"event": "message", "message_type": "assistant"},
{"event": "messages_hung_after_idle", "idle_seconds": 60.0},
{"event": "message", "message_type": "status", "status": "FINISHED"},
{"event": "cancel_called"}
]
}
]
}
The exact timings vary slightly. The important signal is that terminal
FINISHED is delayed by approximately the hook’s sleep duration for the
project-only case, and can require cancellation in the project+user case.
Expected Behavior
The SDK should match normal Cursor behavior:
stophooks should not block terminal run completion for the SDK stream.run.messages()should emit the terminalFINISHEDstatus promptly after
the assistant response is complete.run.wait()should return promptly after the assistant response is complete.- Stop hook execution should be observable separately if needed, but should not
make the run look active or hung.
Operating System
Linux
Version Information
- Python
cursor-sdk==0.1.5 - TypeScript
@cursor/sdk==1.0.15
Does this stop you from using Cursor
Yes - Cursor is unusable