Cursor Python SDK `run.wait()` hangs after partial `run.messages()` consumption

Where does the bug appear (feature/product)?

Cursor SDK

Describe the Bug

For local Python SDK agents, run.wait() can hang if the caller has already
started consuming run.messages() and then stops before the message iterator
reaches its terminal status.

The stable repro is fully synthetic and uses only project-level settings:

  • Agent.create(...) returns normally.
  • agent.send(...) returns a run id normally.
  • run.messages() yields status/thinking/assistant events.
  • The caller stops reading run.messages() after a fixed number of events.
  • run.wait() does not return within the timeout.
  • run.cancel() releases the local run handle.
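The "does not return within the timeout" check in the list above works only because run.wait() is called off the main thread, which lets the caller stop waiting for it. Below is a minimal, SDK-agnostic sketch of that watchdog. It assumes only that the run object has a blocking wait() method, and `wait_with_timeout` is a hypothetical helper, not part of cursor_sdk:

```python
from __future__ import annotations

import threading
from typing import Any


def wait_with_timeout(run: Any, timeout: float) -> tuple[bool, Any]:
    """Call run.wait() on a daemon thread; give up after `timeout` seconds.

    Returns (True, result) if wait() returned, (True, exc) if it raised, and
    (False, None) if it was still blocked when the timeout expired.
    """
    outcome: dict[str, Any] = {}
    done = threading.Event()

    def worker() -> None:
        try:
            outcome["value"] = run.wait()
        except BaseException as exc:  # report errors instead of losing them in the thread
            outcome["value"] = exc
        finally:
            done.set()

    # Daemon thread: a wait() that never returns must not keep the process alive.
    threading.Thread(target=worker, daemon=True).start()
    if done.wait(timeout):
        return True, outcome["value"]
    return False, None
```

When it returns (False, None), the caller can fall back to run.cancel(), which in this repro releases the local run handle.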

This matters for streaming UI integrations, because a UI may consume the live
message stream independently of terminal result handling. The Python SDK docs
say run.wait() returns the terminal RunResult; they do not say that wait()
can hang if an earlier messages() iterator was only partially consumed.
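Until wait() tolerates a half-read iterator, a UI can keep rendering separate from terminal-result handling without ever abandoning run.messages() partway through. In this approach, a background thread drains the iterator into a queue. The UI reads from the queue for as long as it wants, and run.wait() is called only after the drain has finished. The sketch below works under those assumptions. `DecoupledStream` is hypothetical and relies only on the messages() and wait() methods named in this report:

```python
from __future__ import annotations

import queue
import threading
from typing import Any


class DecoupledStream:
    """Drain run.messages() to completion on a background thread.

    The UI reads from `self.queue` at its own pace and may stop reading at any
    time; the SDK iterator is still exhausted, and result() only calls
    run.wait() after that has happened.
    """

    def __init__(self, run: Any) -> None:
        self._run = run
        self.queue: queue.Queue[Any] = queue.Queue()  # unbounded on purpose
        self._drained = threading.Event()
        self._error: BaseException | None = None
        threading.Thread(target=self._drain, daemon=True).start()

    def _drain(self) -> None:
        try:
            for message in self._run.messages():
                self.queue.put(message)
        except BaseException as exc:  # keep the error for result() instead of losing it
            self._error = exc
        finally:
            self._drained.set()

    def result(self, timeout: float | None = None) -> Any:
        # Only call wait() once messages() has reached its end.
        if not self._drained.wait(timeout):
            raise TimeoutError("run.messages() did not reach its end in time")
        if self._error is not None:
            raise self._error
        return self._run.wait()
```

The queue is unbounded, so if the UI stops reading, events keep buffering until the run ends. A bounded queue would avoid that, but it would block the drain thread once the UI stopped reading, which leaves the iterator half-consumed again.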

During app debugging we also saw related terminal-completion symptoms with
setting_sources=["project", "user"]: assistant text streamed and user skills
were visible, but no terminal completion arrived promptly. That case appears
more environment/prompt dependent, so the included one-command repro focuses on
the smaller deterministic messages() / wait() interaction.

Steps to Reproduce

The script below is independent of the application in which the bug was found.
It imports only cursor_sdk and standard-library modules.

It creates:

  • a temporary git workspace;
  • one project skill under workspace/.agents/skills/;
  • a temporary HOME;
  • synthetic user skills under $TEMP_HOME/.cursor/skills/.

It does not read or print real user settings, real project paths, or
CURSOR_API_KEY.
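The script isolates user-level settings by pointing HOME at a temporary directory while the cases run, then restoring it afterwards. That save-and-restore step can be packaged as a small context manager. `temporary_env` is a hypothetical helper, not part of cursor_sdk. Like the script, it assumes the SDK reads user settings from whatever HOME is set when the agent is created:

```python
from __future__ import annotations

import os
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def temporary_env(name: str, value: str) -> Iterator[None]:
    """Set an environment variable for the duration of a with-block.

    Restores the previous value afterwards, or removes the variable again if
    it was unset before, even when the body raises.
    """
    previous = os.environ.get(name)
    os.environ[name] = value
    try:
        yield
    finally:
        if previous is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = previous
```

os.environ is process-wide, so this is only safe while no other thread depends on the variable.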

Run:

cd /path/to/any/python/project/with/cursor-sdk
export CURSOR_API_KEY=...
python repro_cursor_sdk_stream_hang.py

repro_cursor_sdk_stream_hang.py:

#!/usr/bin/env python3
"""Cursor Python SDK repro for run.wait() / terminal-completion hangs.

The script creates a temporary workspace and a temporary HOME, writes one
project skill and --user-skill-count synthetic user skills, then runs:

1. partial_messages_then_wait: read part of run.messages(), then call run.wait()
2. local.setting_sources=["project"]
3. local.setting_sources=["project", "user"]

It intentionally does not import the application under test and does not read
or print any real user settings. The only required external input is
CURSOR_API_KEY.
"""

from __future__ import annotations

import argparse
import json
import os
import subprocess
import sys
import tempfile
import threading
import time
from pathlib import Path
from typing import Any

from cursor_sdk import Agent, LocalAgentOptions


PROJECT_SKILL = "sdk-repro-project-skill"
USER_SKILL = "sdk-repro-user-skill"
PROJECT_SENTINEL = "SDK_REPRO_PROJECT_SKILL_SENTINEL"
USER_SENTINEL = "SDK_REPRO_USER_SKILL_SENTINEL"


def main() -> int:
    parser = argparse.ArgumentParser(
        description="Reproduce local Cursor Python SDK terminal-completion hangs (partial run.messages() consumption and user settings)."
    )
    parser.add_argument("--model", default="default", help="Cursor model id to use.")
    parser.add_argument(
        "--cwd",
        default="",
        help=(
            "Optional existing workspace to use instead of the synthetic workspace. "
            "Use with --use-current-home-redacted when reproducing against a private repo."
        ),
    )
    parser.add_argument(
        "--stream-idle-timeout",
        type=float,
        default=5.0,
        help="Seconds after the last SDK message before declaring run.messages() hung.",
    )
    parser.add_argument(
        "--wait-timeout",
        type=float,
        default=5.0,
        help="Seconds to wait for run.wait() after run.messages() completes.",
    )
    parser.add_argument(
        "--keep-temp",
        action="store_true",
        help="Pause before removing the temporary fixture so it can be inspected, and print its path.",
    )
    parser.add_argument(
        "--user-skill-count",
        type=int,
        default=40,
        help="Number of synthetic user-level skills to create under the temporary HOME.",
    )
    parser.add_argument(
        "--use-current-home-redacted",
        action="store_true",
        help=(
            "Use the caller's current HOME for the project_plus_user case. "
            "Output redacts assistant text and does not print user skill names or paths."
        ),
    )
    parser.add_argument(
        "--prompt",
        default="",
        help="Optional prompt. By default the script asks about injected skill sentinels.",
    )
    parser.add_argument(
        "--partial-message-limit",
        type=int,
        default=20,
        help="For the partial_messages_then_wait case, stop reading messages after this many items.",
    )
    args = parser.parse_args()

    if not os.environ.get("CURSOR_API_KEY"):
        print("CURSOR_API_KEY is required", file=sys.stderr)
        return 2

    with tempfile.TemporaryDirectory(prefix="cursor-sdk-user-settings-hang-") as tmp:
        root = Path(tmp)
        workspace = Path(args.cwd).expanduser().resolve() if args.cwd else root / "workspace"
        fake_home = root / "home"
        workspace.mkdir(exist_ok=True)
        fake_home.mkdir()
        if not args.cwd:
            _git_init(workspace)
            _write_skill(workspace / ".agents" / "skills" / PROJECT_SKILL / "SKILL.md", PROJECT_SKILL, PROJECT_SENTINEL)
        for index in range(args.user_skill_count):
            user_skill_name = f"{USER_SKILL}-{index:03d}"
            _write_skill(fake_home / ".cursor" / "skills" / user_skill_name / "SKILL.md", user_skill_name, USER_SENTINEL)

        original_home = os.environ.get("HOME", "")
        old_home = os.environ.get("HOME")
        os.environ["HOME"] = str(fake_home)
        try:
            cases = [
                run_partial_messages_then_wait_case(
                    workspace=workspace,
                    model=args.model,
                    wait_timeout=args.wait_timeout,
                    message_limit=args.partial_message_limit,
                ),
                run_case(
                    name="project_only",
                    workspace=workspace,
                    setting_sources=["project"],
                    model=args.model,
                    stream_idle_timeout=args.stream_idle_timeout,
                    wait_timeout=args.wait_timeout,
                    redact_assistant=False,
                    prompt=args.prompt,
                )
            ]
            if args.use_current_home_redacted:
                if original_home:
                    os.environ["HOME"] = original_home
                cases.append(
                    run_case(
                        name="project_plus_current_user_redacted",
                        workspace=workspace,
                        setting_sources=["project", "user"],
                        model=args.model,
                        stream_idle_timeout=args.stream_idle_timeout,
                        wait_timeout=args.wait_timeout,
                        redact_assistant=True,
                        prompt=args.prompt,
                    )
                )
                os.environ["HOME"] = str(fake_home)
            else:
                cases.append(
                    run_case(
                        name="project_plus_synthetic_user",
                        workspace=workspace,
                        setting_sources=["project", "user"],
                        model=args.model,
                        stream_idle_timeout=args.stream_idle_timeout,
                        wait_timeout=args.wait_timeout,
                        redact_assistant=False,
                        prompt=args.prompt,
                    )
                )
            report = {
                "fixture": {
                    "workspace": "$EXISTING_CWD" if args.cwd else str(workspace),
                    "uses_existing_cwd": bool(args.cwd),
                    "home": str(fake_home) if args.keep_temp else "$TEMP_HOME",
                    "project_skill": PROJECT_SKILL,
                    "user_skill_prefix": USER_SKILL,
                    "user_skill_count": args.user_skill_count,
                    "used_current_home_redacted": args.use_current_home_redacted,
                },
                "cases": cases,
            }
        finally:
            if old_home is None:
                os.environ.pop("HOME", None)
            else:
                os.environ["HOME"] = old_home

        print(json.dumps(report, ensure_ascii=False, indent=2))
        if args.keep_temp:
            print(f"Temporary fixture kept at: {root}", file=sys.stderr)
            input("Press Enter after inspecting the fixture to remove it...")

    return 1 if any(case["hung"] for case in report["cases"]) else 0


def run_case(
    *,
    name: str,
    workspace: Path,
    setting_sources: list[str],
    model: str,
    stream_idle_timeout: float,
    wait_timeout: float,
    redact_assistant: bool,
    prompt: str,
) -> dict[str, Any]:
    started_at = time.monotonic()
    observations: list[dict[str, Any]] = []
    assistant_text: list[str] = []

    def mark(event: str, **fields: Any) -> None:
        observations.append({"t": round(time.monotonic() - started_at, 3), "event": event, **fields})

    prompt = prompt or (
        "Do not read files. Answer only from injected workspace metadata, "
        "available_skills, and loaded rules. Report the Workspace Path and say "
        f"whether {PROJECT_SENTINEL} and {USER_SENTINEL} are visible. "
        "If user skills are visible, list the first 20 user skill names."
    )

    with Agent.create(
        model=model,
        api_key=os.environ["CURSOR_API_KEY"],
        local=LocalAgentOptions(cwd=str(workspace), setting_sources=setting_sources),
    ) as agent:
        mark("agent_ready", agent_id=_public_id(agent))
        run = agent.send(prompt)
        mark("send_returned", run_id=_public_id(run))

        stream_done = threading.Event()
        stream_error: list[str] = []
        last_stream_at = time.monotonic()

        def consume_messages() -> None:
            nonlocal last_stream_at
            try:
                for index, message in enumerate(run.messages()):
                    last_stream_at = time.monotonic()
                    message_type = getattr(message, "type", type(message).__name__)
                    text = message_text(message)
                    if message_type == "assistant" and text:
                        assistant_text.append(text)
                    if redact_assistant:
                        summary = redacted_summary(message_type, text)
                    elif text:
                        summary = text[:160]
                    else:
                        summary = summarize_message(message)
                    mark("message", index=index, message_type=message_type, summary=summary)
                mark("messages_completed")
            except BaseException as exc:
                stream_error.append(f"{type(exc).__name__}: {exc}")
                mark("messages_error", error=stream_error[-1])
            finally:
                stream_done.set()

        stream_thread = threading.Thread(target=consume_messages, daemon=True)
        stream_thread.start()
        while not stream_done.wait(0.05):
            if time.monotonic() - last_stream_at < stream_idle_timeout:
                continue
            mark("messages_hung_after_idle", idle_seconds=round(time.monotonic() - last_stream_at, 3))
            cancel_run(run, mark)
            stream_done.wait(0.5)
            break

        wait_done = threading.Event()
        wait_result: dict[str, Any] = {}

        def wait_for_result() -> None:
            try:
                result = run.wait()
                wait_result["result"] = {
                    "status": getattr(result, "status", ""),
                    "id": _public_id(result),
                    "result_len": len(getattr(result, "result", "") or ""),
                }
                mark("wait_returned", result=wait_result["result"])
            except BaseException as exc:
                wait_result["error"] = f"{type(exc).__name__}: {exc}"
                mark("wait_error", error=wait_result["error"])
            finally:
                wait_done.set()

        wait_thread = threading.Thread(target=wait_for_result, daemon=True)
        wait_thread.start()
        if not wait_done.wait(wait_timeout):
            mark("wait_hung", timeout_seconds=wait_timeout)
            cancel_run(run, mark)
            wait_done.wait(0.5)

    text = "".join(assistant_text)
    return {
        "name": name,
        "setting_sources": setting_sources,
        "hung": any(o["event"] in {"messages_hung_after_idle", "wait_hung"} for o in observations),
        "assistant_mentions_project_sentinel": PROJECT_SENTINEL in text,
        "assistant_mentions_user_sentinel": USER_SENTINEL in text,
        "assistant_excerpt": "[redacted]" if redact_assistant else text[:1000],
        "observations": observations,
    }


def run_partial_messages_then_wait_case(
    *,
    workspace: Path,
    model: str,
    wait_timeout: float,
    message_limit: int,
) -> dict[str, Any]:
    started_at = time.monotonic()
    observations: list[dict[str, Any]] = []

    def mark(event: str, **fields: Any) -> None:
        observations.append({"t": round(time.monotonic() - started_at, 3), "event": event, **fields})

    prompt = "Do not use tools. Write 300 words about SDK_REPRO_PARTIAL_MESSAGES_WAIT_SENTINEL."
    with Agent.create(
        model=model,
        api_key=os.environ["CURSOR_API_KEY"],
        local=LocalAgentOptions(cwd=str(workspace), setting_sources=["project"]),
    ) as agent:
        mark("agent_ready", agent_id=_public_id(agent))
        run = agent.send(prompt)
        mark("send_returned", run_id=_public_id(run))
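        for index, message in enumerate(run.messages()):
            mark("message", index=index, message_type=getattr(message, "type", type(message).__name__))
            # Stop after exactly `message_limit` items (index is 0-based).
            if index + 1 >= message_limit:
                mark("messages_stopped_early", index=index)
                break
        else:
            # The stream ended before the limit, so this run did not exercise partial consumption.
            mark("messages_completed")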

        wait_done = threading.Event()
        wait_result: dict[str, Any] = {}

        def wait_for_result() -> None:
            try:
                result = run.wait()
                wait_result["result"] = {
                    "status": getattr(result, "status", ""),
                    "id": _public_id(result),
                    "result_len": len(getattr(result, "result", "") or ""),
                }
                mark("wait_returned", result=wait_result["result"])
            except BaseException as exc:
                wait_result["error"] = f"{type(exc).__name__}: {exc}"
                mark("wait_error", error=wait_result["error"])
            finally:
                wait_done.set()

        wait_thread = threading.Thread(target=wait_for_result, daemon=True)
        wait_thread.start()
        if not wait_done.wait(wait_timeout):
            mark("wait_hung", timeout_seconds=wait_timeout)
            cancel_run(run, mark)
            wait_done.wait(0.5)

    return {
        "name": "partial_messages_then_wait",
        "setting_sources": ["project"],
        "hung": any(o["event"] == "wait_hung" for o in observations),
        "observations": observations,
    }


def _write_skill(path: Path, name: str, sentinel: str) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(
        "\n".join(
            [
                "---",
                f"name: {name}",
                f"description: Use this skill only when the user mentions {sentinel}.",
                "---",
                "",
                f"When this skill is active, mention {sentinel}.",
                "",
            ]
        ),
        encoding="utf-8",
    )


def _git_init(path: Path) -> None:
    subprocess.run(["git", "init", "-q", str(path)], check=False, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)


def message_text(message: Any) -> str:
    if getattr(message, "type", "") in {"thinking", "reasoning"}:
        inner = getattr(message, "message", message)
        return str(getattr(inner, "text", "") or getattr(inner, "content", "") or "")
    content = getattr(getattr(message, "message", message), "content", None)
    if content is None:
        return str(getattr(message, "text", "") or "")
    return "".join(str(getattr(block, "text", "") or "") for block in content)


def summarize_message(message: Any) -> str:
    return repr(message)[:200]


def redacted_summary(message_type: str, text: str) -> str:
    if message_type == "assistant":
        return f"[assistant chunk redacted; chars={len(text)}]"
    if message_type in {"thinking", "reasoning"}:
        return f"[{message_type} chunk redacted; chars={len(text)}]"
    return f"[{message_type} message redacted]"


def _public_id(value: Any) -> str:
    return str(getattr(value, "id", "") or getattr(value, "agent_id", "") or "")


def cancel_run(run: Any, mark) -> None:
    cancel = getattr(run, "cancel", None)
    if not callable(cancel):
        mark("cancel_unavailable")
        return
    try:
        cancel()
        mark("cancel_called")
    except BaseException as exc:
        mark("cancel_error", error=f"{type(exc).__name__}: {exc}")


if __name__ == "__main__":
    raise SystemExit(main())

The script runs three cases:

  • partial_messages_then_wait: consumes only part of run.messages(), then
    calls run.wait(). This is the deterministic failing case.
  • project_only: LocalAgentOptions(..., setting_sources=["project"])
  • project_plus_synthetic_user:
    LocalAgentOptions(..., setting_sources=["project", "user"])

It exits with status 1 if any case emits messages_hung_after_idle or
wait_hung.
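The script prints a single JSON report to stdout, so the result can be post-processed instead of read by eye. For example, run `python repro_cursor_sdk_stream_hang.py > report.json` without --keep-temp, because that option's input() prompt also writes to stdout. The sketch below lists each hung case together with the watchdog events that flagged it. `hung_cases` is a hypothetical helper that relies only on the report fields the script writes (`cases`, `name`, `hung`, `observations`, `event`):

```python
from __future__ import annotations

import json
from typing import Any

# The two events the script's watchdogs emit when a case hangs.
HANG_EVENTS = {"messages_hung_after_idle", "wait_hung"}


def hung_cases(report_json: str) -> dict[str, list[str]]:
    """Map each hung case name to the hang events recorded for it."""
    report: dict[str, Any] = json.loads(report_json)
    return {
        case["name"]: [o["event"] for o in case["observations"] if o["event"] in HANG_EVENTS]
        for case in report["cases"]
        if case["hung"]
    }
```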

Expected Behavior

For a local run:

  1. If a caller stops consuming run.messages() before the stream ends,
    run.wait() should drain remaining run events and return the terminal
    RunResult; or
  2. the SDK documentation should state that run.messages() must be fully
    exhausted before run.wait() can be called safely.
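Option 1 can be stated precisely with a toy model. The sketch below is hypothetical and says nothing about how cursor_sdk is implemented. In the model, messages() and wait() share one underlying event iterator. Before returning the terminal event, wait() drains whatever the caller left unread, so abandoning messages() early cannot make it block. `ToyRun`, `Event`, and the terminal `"result"` event type are all illustrative names:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterator


@dataclass
class Event:
    type: str  # e.g. "status", "assistant", or the terminal "result"
    payload: str = ""


class ToyRun:
    """Hypothetical model of the expected contract, not the cursor_sdk code."""

    def __init__(self, events: list[Event]) -> None:
        self._events: Iterator[Event] = iter(events)  # shared by messages() and wait()
        self._terminal: Event | None = None

    def messages(self) -> Iterator[Event]:
        for event in self._events:
            if event.type == "result":
                self._terminal = event
                return
            yield event

    def wait(self) -> Event:
        if self._terminal is None:
            # Drain events the caller never read; this toy simply discards them.
            for event in self._events:
                if event.type == "result":
                    self._terminal = event
                    break
        if self._terminal is None:
            raise RuntimeError("stream ended without a terminal result")
        return self._terminal
```

In this model, wait() discards the unread events. A real SDK could just as well buffer them.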

Operating System

Linux

Version Information

  • Python cursor-sdk==0.1.5
  • TypeScript @cursor/sdk==1.0.15

Does this stop you from using Cursor

Yes - Cursor is unusable

Confirmed. When messages() is partially consumed, wait() tries to drain the remaining events from the same underlying stream before checking for the terminal result, which causes the indefinite hang you observed. The async variant has the same issue; the TypeScript SDK is unaffected.

Workaround: Either fully exhaust run.messages() before calling wait(), or skip messages() and call wait() directly. If you need partial streaming, run.cancel() after stopping messages() will unblock wait().
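Below is a sketch of the partial-streaming workaround. It assumes only the messages(), cancel(), and wait() methods named above, and `stream_prefix_then_wait` is a hypothetical helper. Because the workaround cancels the run, the value wait() returns presumably describes a cancelled run rather than a completed answer. Callers that need the full result should exhaust messages() instead.

```python
from __future__ import annotations

from typing import Any, Callable


def stream_prefix_then_wait(run: Any, limit: int, on_message: Callable[[Any], None]) -> Any:
    """Pass at most `limit` messages to `on_message`, then return run.wait().

    If reading stops before messages() ends, run.cancel() is called first so
    that wait() does not hang on the half-consumed iterator.
    """
    if limit < 1:
        raise ValueError("limit must be at least 1")
    stopped_early = False
    for count, message in enumerate(run.messages(), start=1):
        on_message(message)
        if count >= limit:
            stopped_early = True
            break
    if stopped_early:
        run.cancel()  # unblocks wait(), but also ends the run
    return run.wait()
```

One caveat of this design: if the stream contains exactly `limit` messages before its terminal event, the helper still breaks and cancels. It cannot tell that the iterator was about to end without reading one more item.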

We’re tracking this with our SDK team. Thanks a lot for the neat bug report; it really made this easy for us!