493 lines
17 KiB
GDScript
493 lines
17 KiB
GDScript
@tool
class_name McpConnection
extends Node

## WebSocket transport to the Godot AI Python server.
##
## Only handles connect, reconnect, send, and receive; the socket is polled
## from `_process`. Command dispatch is owned by McpDispatcher: inbound
## commands are handed to `dispatcher.enqueue()` and the responses returned
## by `dispatcher.tick()` are sent back over the socket.
|
|
|
|
## Reconnect backoff schedule in seconds, indexed by attempt number. Attempts
## past the end of the table keep using the last entry (see
## `_reconnect_delay_for_attempt`).
const RECONNECT_DELAYS: Array[float] = [1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 60.0]
## The first this-many reconnect attempts are always logged.
const RECONNECT_VERBOSE_ATTEMPTS := 5
## After the verbose window, only every N-th reconnect attempt is logged.
const RECONNECT_LOG_EVERY_N_ATTEMPTS := 10
## Backpressure policy: do not queue responses once the WebSocket's current
## outbound buffer plus the next payload would exceed this cap. Command
## responses get a compact structured error when that can still be sent;
## state events report failure so their callers can retry on a later tick.
const OUTBOUND_BUFFER_LIMIT_BYTES := 4 * 1024 * 1024
## Cap the inbound packet drain per `_process` tick. A flooding peer or a
## fast batch could otherwise saturate `_handle_message` in one frame and
## blow the documented 4ms budget. Packets beyond this cap spill to the
## next frame; the cumulative spill counter is logged so flood patterns
## are observable in `logs_read`. See audit-v2 finding #12 (issue #356).
const PACKET_DRAIN_CAP_PER_TICK := 32
## Script providing the default WebSocket port, the plugin version and the
## server launch mode used by this file.
const ClientConfigurator := preload("res://addons/godot_ai/client_configurator.gd")
## Script providing the error-code constants used in structured error replies.
const ErrorCodes := preload("res://addons/godot_ai/utils/error_codes.gd")
|
|
|
|
## Emitted whenever the underlying WebSocket open/closed state flips.
## Subscribers (e.g. the plugin-side telemetry helper) use this to drain
## events that were enqueued before the socket was ready. Emitted with
## ``true`` on first OPEN per connect, ``false`` on transition to CLOSED
## (including ``disconnect_from_server()``).
##
## `is_open` mirrors the value just written to the internal `_connected` flag.
signal connection_state_changed(is_open: bool)
|
|
|
|
## The live socket. Replaced with a fresh instance on every reconnect attempt
## (see `_attempt_reconnect`).
var _peer := WebSocketPeer.new()
## Set by plugin.gd after resolving the configured WebSocket port once for the
## server spawn. Reconnects reuse this cached value so they keep dialing the
## same port the Python server was asked to bind.
var ws_port := ClientConfigurator.DEFAULT_WS_PORT
## Last URL passed to `connect_to_url`; rebuilt from `ws_port` on each connect.
var _url := ""
## True between the first observed STATE_OPEN and the next close/disconnect.
var _connected := false
## Number of reconnect attempts since the last successful open.
var _reconnect_attempt := 0
## Seconds remaining until the next reconnect attempt; counted down in
## `_process` while the socket is closed.
var _reconnect_timer := 0.0
## "<slug>@<hex>" identifier generated once in `_ready` and sent in the handshake.
var _session_id := ""
## Godot-AI Python package version reported by the server in its `handshake_ack`
## reply. Empty until the ack lands. Older servers (pre-handshake_ack) leave
## this empty forever — callers that gate on it (the dock's mismatch banner)
## must treat empty as "unknown, don't raise a false alarm".
var server_version := ""

## Command dispatcher (untyped). This file calls `enqueue`, `tick`,
## `has_pending_deferred_response`, `complete_deferred_response` and
## `clear_deferred_responses` on it. Assigned from outside this script
## (presumably by plugin.gd — confirm); set to null by `teardown()`.
var dispatcher
## Log sink (untyped); only `log(String)` is called on it here. Assigned from
## outside this script (presumably by plugin.gd — confirm); set to null by
## `teardown()`.
var log_buffer
## Set by plugin.gd when the HTTP port is occupied by an incompatible or
## unverified server. Keeping the Connection node alive lets handlers and the
## dock share one object, but no WebSocket is opened to the wrong server.
var connect_blocked := false
## Message logged (once) when `connect_blocked` prevents a connection.
var connect_block_reason := ""
## Latch so the blocked notice is logged at most once.
var _blocked_notice_logged := false
## Compatibility property used by existing handlers. Setting true increments
## the pause depth; setting false decrements it. Processing stays paused until
## every nested pause has resumed.
var pause_processing: bool:
	get: return _pause_depth > 0
	set(value):
		if value:
			pause()
		else:
			resume()
## Nesting counter behind `pause_processing`; never drops below zero.
var _pause_depth := 0
## Cumulative count of inbound packets that didn't fit in their tick's drain
## budget and got deferred to a subsequent tick. Reset on disconnect so each
## connection starts with a clean spillover history. Logged whenever new
## spillover occurs so flood patterns surface in `logs_read`.
var _packet_spillover_total := 0
|
|
|
|
|
|
## Node entry point: derive the session id, size the outbound buffer, then
## either open the connection or, when connecting is blocked, log the reason
## once and switch per-frame processing off.
func _ready() -> void:
	var project_dir := ProjectSettings.globalize_path("res://")
	_session_id = _make_session_id(project_dir)
	# Large messages (e.g. base64 screenshots) can run to several MB, far past
	# the 64 KB default outbound buffer, so raise it to the backpressure cap.
	_peer.outbound_buffer_size = OUTBOUND_BUFFER_LIMIT_BYTES
	if not connect_blocked:
		_connect_to_server()
		_hook_editor_signals()
		return
	_log_blocked_notice_once()
	set_process(false)
|
|
|
|
|
|
## Per-frame pump for the socket.
##
## Does nothing while `pause_processing` is true. Otherwise polls the peer and
## acts on its ready state:
## - OPEN: on the first open frame, mark connected, reset the reconnect
##   counter, send the handshake and emit `connection_state_changed(true)`;
##   then drain inbound packets, publish editor state changes and send every
##   response returned by `dispatcher.tick()`.
## - CLOSED: on the first closed frame, clear per-connection state and emit
##   `connection_state_changed(false)`; then count the reconnect timer down by
##   `delta` and attempt a reconnect once it reaches zero.
## - CLOSING / CONNECTING: wait.
##
## Logging is guarded with `if log_buffer:` like the rest of this file, so a
## missing log sink cannot abort a state transition before its signal fires.
func _process(delta: float) -> void:
	if pause_processing:
		return
	_peer.poll()

	match _peer.get_ready_state():
		WebSocketPeer.STATE_OPEN:
			if not _connected:
				_connected = true
				_reconnect_attempt = 0
				if log_buffer:
					log_buffer.log("connected to server")
				_send_handshake()
				connection_state_changed.emit(true)

			_drain_inbound_packets(_peer)

			_check_state_changes()

			if dispatcher:
				for response in dispatcher.tick():
					_send_json(response)

		WebSocketPeer.STATE_CLOSED:
			if _connected:
				_connected = false
				_clear_on_disconnect()
				var code := _peer.get_close_code()
				if log_buffer:
					log_buffer.log("disconnected (code %d)" % code)
				connection_state_changed.emit(false)
			# Runs every closed frame, not only on the transition, so the
			# backoff timer keeps ticking between attempts.
			_reconnect_timer -= delta
			if _reconnect_timer <= 0.0:
				_attempt_reconnect()

		WebSocketPeer.STATE_CLOSING:
			pass
		WebSocketPeer.STATE_CONNECTING:
			pass
|
|
|
|
|
|
## Pull at most PACKET_DRAIN_CAP_PER_TICK packets off `peer`, decoding each as
## UTF-8 and passing it to `_handle_message`. Packets left over once the cap
## is hit remain queued on the peer for the next tick; their count is added
## to `_packet_spillover_total` and reported through `log_buffer` (when one is
## set). Ticks that finish under the cap, or exactly empty the queue, log
## nothing.
##
## `peer` is deliberately untyped so tests can pass a stand-in exposing
## `get_available_packet_count()` and `get_packet()`; production passes the
## real `_peer`.
##
## Returns `{"drained": <packets handled>, "spilled": <packets left queued>}`.
func _drain_inbound_packets(peer) -> Dictionary:
	var handled := 0
	while peer.get_available_packet_count() > 0 and handled < PACKET_DRAIN_CAP_PER_TICK:
		_handle_message(peer.get_packet().get_string_from_utf8())
		handled += 1

	# Under the cap, or cap reached with nothing left behind: no spillover.
	if handled < PACKET_DRAIN_CAP_PER_TICK or peer.get_available_packet_count() <= 0:
		return {"drained": handled, "spilled": 0}

	var leftover: int = peer.get_available_packet_count()
	_packet_spillover_total += leftover
	if log_buffer:
		var template := "[backpressure] inbound drain capped at %d/tick; %d packets spilled to next frame (cumulative %d)"
		log_buffer.log(
			template % [PACKET_DRAIN_CAP_PER_TICK, leftover, _packet_spillover_total]
		)
	return {"drained": handled, "spilled": leftover}
|
|
|
|
|
|
## Read-only view of the connection flag: true from the first observed open
## state until the socket closes or `disconnect_from_server()` runs.
var is_connected: bool:
	get:
		return _connected
|
|
|
|
|
|
## Close the socket if it is currently connected, clear the connected flag
## and notify subscribers with `connection_state_changed(false)`. Does nothing
## when no connection is established.
func disconnect_from_server() -> void:
	if not _connected:
		return
	_peer.close(1000, "Plugin unloading")
	_connected = false
	connection_state_changed.emit(false)
|
|
|
|
|
|
## Forget everything learned from the server that just went away so it cannot
## leak into the next connection: the spillover counter, the reported server
## version, and any deferred responses still pending in the dispatcher.
## `force_restart_server` swaps servers without a plugin reload, so without
## this the dock would keep showing the old server's version until the next
## ack. It also runs on ordinary reconnect-loop drops, which is equally fine.
func _clear_on_disconnect() -> void:
	# Fresh spillover baseline for the next connection's `logs_read` output.
	_packet_spillover_total = 0
	server_version = ""
	if not dispatcher:
		return
	dispatcher.clear_deferred_responses()
|
|
|
|
|
|
## Full pre-free cleanup for plugin unload: stop _process, close the
## socket, and drop dispatcher/log_buffer refs so their Callable-held
## RefCounted handlers decref before plugin.gd clears _handlers.
## See issue #46 and plugin.gd::_exit_tree.
func teardown() -> void:
	# Stop per-frame polling first so nothing touches the refs dropped below.
	set_process(false)
	# Closes the socket and emits connection_state_changed(false) if connected.
	disconnect_from_server()
	dispatcher = null
	log_buffer = null
|
|
|
|
|
|
## Start a WebSocket connection to the local server on `ws_port`.
##
## Rebuilds `_url` from the cached port and asks the current peer to connect.
## A failure to even initiate the connection is only logged; the reconnect
## loop in `_process` retries once the peer reports STATE_CLOSED. The log
## call is guarded because `log_buffer` is optional everywhere else in this
## file (including `_log_blocked_notice_once` on the same `_ready` path).
func _connect_to_server() -> void:
	_url = "ws://127.0.0.1:%d" % ws_port
	var err := _peer.connect_to_url(_url)
	if err != OK and log_buffer:
		log_buffer.log("failed to initiate connection (error %d)" % err)
|
|
|
|
|
|
## Run one step of the reconnect loop.
##
## When connecting is blocked, logs the reason once, disables `_process` and
## stops. Otherwise picks the backoff delay for the current attempt, arms
## `_reconnect_timer` with it, bumps the attempt counter, optionally logs
## (first few attempts, then periodically) and dials again with a brand-new
## peer. The log call is guarded with `if log_buffer` for consistency with
## the other logging sites in this file.
func _attempt_reconnect() -> void:
	if connect_blocked:
		_log_blocked_notice_once()
		set_process(false)
		return
	var delay := _reconnect_delay_for_attempt(_reconnect_attempt)
	_reconnect_attempt += 1
	_reconnect_timer = delay
	if log_buffer and _should_log_reconnect_attempt(_reconnect_attempt):
		log_buffer.log(
			"reconnecting (attempt %d; next retry in %.0fs if needed)"
			% [_reconnect_attempt, delay]
		)
	# Always create a fresh WebSocketPeer before reconnecting. A peer that has
	# reached STATE_CLOSED is terminal; reusing it can leave the editor stuck in
	# a quiet reconnect loop after the Python server restarts.
	_peer = WebSocketPeer.new()
	_peer.outbound_buffer_size = OUTBOUND_BUFFER_LIMIT_BYTES
	_connect_to_server()
|
|
|
|
|
|
## Add one level of pause nesting; `_process` stays idle while any remain.
func pause() -> void:
	_pause_depth = _pause_depth + 1
|
|
|
|
|
|
## Remove one level of pause nesting, bottoming out at zero so an unmatched
## resume cannot push the depth negative.
func resume() -> void:
	_pause_depth = 0 if _pause_depth <= 1 else _pause_depth - 1
|
|
|
|
|
|
## Current pause nesting level (0 means processing is active).
func pause_depth() -> int:
	var depth := _pause_depth
	return depth
|
|
|
|
|
|
## Look up the backoff delay (seconds) for a zero-based attempt index.
## Indices at or beyond the end of RECONNECT_DELAYS all get the final entry.
static func _reconnect_delay_for_attempt(attempt_index: int) -> float:
	var last_idx := RECONNECT_DELAYS.size() - 1
	if attempt_index >= last_idx:
		return RECONNECT_DELAYS[last_idx]
	return RECONNECT_DELAYS[attempt_index]
|
|
|
|
|
|
## Decide whether a reconnect attempt (1-based `attempt_number`) deserves a
## log line. The first RECONNECT_VERBOSE_ATTEMPTS always do, for immediate
## diagnostics; after that only every RECONNECT_LOG_EVERY_N_ATTEMPTS-th one,
## because the reconnect loop runs indefinitely and the log should not.
static func _should_log_reconnect_attempt(attempt_number: int) -> bool:
	if attempt_number <= RECONNECT_VERBOSE_ATTEMPTS:
		return true
	return attempt_number % RECONNECT_LOG_EVERY_N_ATTEMPTS == 0
|
|
|
|
|
|
## Log `connect_block_reason` the first time this is called and never again.
## The latch is set even when there is nothing to log (no log sink, or an
## empty reason).
func _log_blocked_notice_once() -> void:
	if _blocked_notice_logged:
		return
	_blocked_notice_logged = true
	if not log_buffer or connect_block_reason.is_empty():
		return
	log_buffer.log(connect_block_reason)
|
|
|
|
|
|
## Send the opening `handshake` message describing this editor session:
## session id, Godot and plugin versions, project path, protocol version,
## current readiness, editor process id and server launch mode. The readiness
## value sent is also cached in `_last_readiness` so later polling only
## reports changes relative to it.
func _send_handshake() -> void:
	_last_readiness = get_readiness()
	var version_info := Engine.get_version_info()
	var hello := {
		"type": "handshake",
		"session_id": _session_id,
		"godot_version": version_info.get("string", "unknown"),
		"project_path": ProjectSettings.globalize_path("res://"),
		"plugin_version": ClientConfigurator.get_plugin_version(),
		"protocol_version": 1,
		"readiness": _last_readiness,
		"editor_pid": OS.get_process_id(),
		"server_launch_mode": ClientConfigurator.get_server_launch_mode(),
	}
	_send_json(hello)
|
|
|
|
|
|
## Decode one inbound text frame and route it.
##
## - Unparseable JSON: emit a warning and drop the frame.
## - Valid JSON that is not an object: drop silently.
## - `{"type": "handshake_ack", ...}`: record `server_version` (stringified,
##   empty if absent).
## - Objects carrying both `request_id` and `command`: hand to the dispatcher
##   when one is attached.
## Anything else is ignored.
func _handle_message(raw: String) -> void:
	var msg = JSON.parse_string(raw)
	if msg == null:
		push_warning("MCP: failed to parse message: %s" % raw)
		return
	if not (msg is Dictionary):
		return

	var kind = msg.get("type", "")
	if kind == "handshake_ack":
		server_version = str(msg.get("server_version", ""))
		return

	var is_command: bool = msg.has("request_id") and msg.has("command")
	if is_command and dispatcher:
		dispatcher.enqueue(msg)
|
|
|
|
|
|
## Push an unsolicited state event (as opposed to a command response) to the
## server, wrapped as `{"type": "event", "event": <name>, "data": <data>}`.
## Returns whatever `_send_json` reports: true if the message was handed to
## the socket, false otherwise.
func send_event(event_name: String, data: Dictionary = {}) -> bool:
	var envelope := {"type": "event", "event": event_name, "data": data}
	return _send_json(envelope)
|
|
|
|
|
|
## Deliver the reply for a request whose handler postponed answering (see
## McpDispatcher.DEFERRED_RESPONSE). `payload` is expected to hold a `data` or
## an `error` field shaped like an ordinary handler result.
##
## If a dispatcher is attached and no longer tracks `request_id` as pending,
## the reply is dropped with a log line. Otherwise a copy of `payload` is
## stamped with the request id, a `status` ("ok" when `data` is present, else
## "error") unless one was supplied, and the live `readiness` unless one was
## supplied — mirroring what McpDispatcher::_dispatch does for synchronous
## replies so the server's session cache can self-heal from any response.
## On a successful send the dispatcher is told the deferred response is done.
func send_deferred_response(request_id: String, payload: Dictionary) -> void:
	if dispatcher != null and not dispatcher.has_pending_deferred_response(request_id):
		if log_buffer:
			log_buffer.log("[defer] dropped late response for expired request %s" % request_id)
		return

	var reply := payload.duplicate()
	reply["request_id"] = request_id
	if not reply.has("status"):
		if payload.has("data"):
			reply["status"] = "ok"
		else:
			reply["status"] = "error"
	if not reply.has("readiness"):
		reply["readiness"] = get_readiness()

	var sent := _send_json(reply)
	if sent and dispatcher != null:
		dispatcher.complete_deferred_response(request_id)
|
|
|
|
|
|
## Seed the change-tracking baseline used by `_check_state_changes`. Despite
## the name, no signals are connected here: scene and play state are both
## polled from `_process`, so this only records their starting values.
func _hook_editor_signals() -> void:
	# Touch the editor interface first to make sure it is ready.
	EditorInterface.get_editor_settings()
	var scene_now := _get_current_scene_path()
	_last_scene_path = scene_now
	var playing_now := EditorInterface.is_playing_scene()
	_last_play_state = playing_now
|
|
|
|
|
|
## Scene file path most recently reported to the server ("" when no scene).
var _last_scene_path := ""
## Play state most recently reported to the server.
var _last_play_state := false
## Readiness string most recently reported (handshake or `readiness_changed`).
var _last_readiness := ""
|
|
|
|
|
|
## Derive the editor's readiness label from live Godot state. Checks run in
## priority order and the first match wins:
## "importing" (resource filesystem is scanning) > "playing" (a scene is
## running) > "no_scene" (nothing open for editing) > "ready".
static func get_readiness() -> String:
	var label := "ready"
	if EditorInterface.get_resource_filesystem().is_scanning():
		label = "importing"
	elif EditorInterface.is_playing_scene():
		label = "playing"
	elif EditorInterface.get_edited_scene_root() == null:
		label = "no_scene"
	return label
|
|
|
|
|
|
## Lightweight per-frame poll that reports editor state transitions to the
## server: edited scene path, play state, and readiness, in that order. Each
## cached "last" value is only advanced once its event was actually sent, so
## an event that could not be sent is retried on the next tick.
func _check_state_changes() -> void:
	var current_scene := _get_current_scene_path()
	if current_scene != _last_scene_path and send_event("scene_changed", {"current_scene": current_scene}):
		_last_scene_path = current_scene
		if log_buffer:
			log_buffer.log("[event] scene_changed -> %s" % current_scene)

	var is_playing := EditorInterface.is_playing_scene()
	if is_playing != _last_play_state:
		var label := "stopped"
		if is_playing:
			label = "playing"
		if send_event("play_state_changed", {"play_state": label}):
			_last_play_state = is_playing
			if log_buffer:
				log_buffer.log("[event] play_state_changed -> %s" % label)

	var current_readiness := get_readiness()
	if current_readiness != _last_readiness and send_event("readiness_changed", {"readiness": current_readiness}):
		_last_readiness = current_readiness
		if log_buffer:
			log_buffer.log("[event] readiness -> %s" % current_readiness)
|
|
|
|
|
|
## File path of the scene currently open for editing, or "" when the editor
## has no edited scene root.
func _get_current_scene_path() -> String:
	var root := EditorInterface.get_edited_scene_root()
	if not root:
		return ""
	return root.scene_file_path
|
|
|
|
|
|
## Serialize `data` as JSON and send it as one text frame.
##
## Returns false without sending when not connected. If the frame would push
## the outbound buffer past the backpressure cap, the decision is delegated
## to `_handle_outbound_backpressure` and its result is returned. Otherwise
## returns true when the socket accepted the frame, or false (after logging
## the error) when `send_text` failed.
func _send_json(data: Dictionary) -> bool:
	if not _connected:
		return false

	var encoded := JSON.stringify(data)
	# Sizes are compared in UTF-8 bytes, not characters.
	var payload_size := encoded.to_utf8_buffer().size()
	var queued_size := _peer.get_current_outbound_buffered_amount()
	if _would_exceed_outbound_backpressure(queued_size, payload_size):
		return _handle_outbound_backpressure(data, queued_size, payload_size)

	var status := _peer.send_text(encoded)
	if status == OK:
		return true
	if log_buffer:
		log_buffer.log("[send] websocket send_text failed: %s" % error_string(status))
	return false
|
|
|
|
|
|
## True when queueing `message_bytes` on top of the `buffered_bytes` already
## waiting would go over OUTBOUND_BUFFER_LIMIT_BYTES. Landing exactly on the
## limit is still allowed.
static func _would_exceed_outbound_backpressure(buffered_bytes: int, message_bytes: int) -> bool:
	var projected := buffered_bytes + message_bytes
	return projected > OUTBOUND_BUFFER_LIMIT_BYTES
|
|
|
|
|
|
## Decide what to do with a message that would overflow the outbound buffer.
##
## `data` is the message that was refused, `buffered_bytes` the amount already
## queued on the socket and `message_bytes` the encoded size of `data`.
##
## - Messages without a `request_id` (events, handshake) are dropped with a
##   log line; returns false so the caller can retry on a later tick.
## - Command responses are replaced by a compact structured error from
##   `_make_backpressure_error`. If even that does not fit, or sending it
##   fails, the response is dropped with a log line and false is returned.
##   Returns true when the replacement error was handed to the socket.
##
## The multi-line log templates are parenthesized before `%` is applied:
## `%` binds tighter than `+`, so without the parentheses only the second
## literal would be formatted and the argument list would not match it.
func _handle_outbound_backpressure(
	data: Dictionary,
	buffered_bytes: int,
	message_bytes: int,
) -> bool:
	var request_id: String = data.get("request_id", "")
	if request_id.is_empty():
		if log_buffer:
			log_buffer.log(
				(
					"[send] requestless payload blocked by websocket backpressure "
					+ "(buffered=%d, message=%d, limit=%d)"
				)
				% [buffered_bytes, message_bytes, OUTBOUND_BUFFER_LIMIT_BYTES]
			)
		return false

	var err_response := _make_backpressure_error(request_id, buffered_bytes, message_bytes)
	var err_text := JSON.stringify(err_response)
	var err_bytes := err_text.to_utf8_buffer().size()
	if _would_exceed_outbound_backpressure(buffered_bytes, err_bytes):
		if log_buffer:
			log_buffer.log(
				(
					"[send] dropped response for request %s due to websocket backpressure "
					+ "(buffered=%d, message=%d, limit=%d)"
				)
				% [request_id, buffered_bytes, message_bytes, OUTBOUND_BUFFER_LIMIT_BYTES]
			)
		return false

	var send_err := _peer.send_text(err_text)
	if send_err != OK:
		if log_buffer:
			log_buffer.log("[send] websocket backpressure error send failed: %s" % error_string(send_err))
		return false
	if log_buffer:
		log_buffer.log(
			"[send] %s -> error: outbound websocket backpressure"
			% data.get("command", "response")
		)
	return true
|
|
|
|
|
|
## Build the compact error response sent in place of a reply that was too
## large for the outbound buffer. The result carries the original
## `request_id`, `status: "error"`, an empty `data`, the live readiness, and
## an `error` object with code, human-readable advice and the byte counts
## involved.
static func _make_backpressure_error(
	request_id: String,
	buffered_bytes: int,
	message_bytes: int,
) -> Dictionary:
	var sizes := {
		"buffered_bytes": buffered_bytes,
		"message_bytes": message_bytes,
		"limit_bytes": OUTBOUND_BUFFER_LIMIT_BYTES,
	}
	var advice := "Outbound WebSocket buffer is full; dropped response before queueing more data. Retry with a smaller payload (for screenshots, lower max_resolution or set include_image=false)."
	var error_body := {
		"code": ErrorCodes.INTERNAL_ERROR,
		"message": advice,
		"data": sizes,
	}
	return {
		"request_id": request_id,
		"status": "error",
		"data": {},
		# Readiness is stamped here as well: the server self-heals its session
		# cache from every response shape, and the next real reply may already
		# be queued behind this one.
		"readiness": get_readiness(),
		"error": error_body,
	}
|
|
|
|
|
|
## Produce a readable session id of the form "<slug>@<4hex>".
## The slug comes from the last component of `project_path` (trailing slashes
## and backslashes ignored) so agents can tell which editor they are talking
## to; the random hex suffix tells apart two editors on the same project.
## Falls back to "project" when the path yields no usable name.
static func _make_session_id(project_path: String) -> String:
	var dir_name := project_path.rstrip("/\\").get_file()
	if dir_name.is_empty():
		dir_name = "project"
	var slug := _slugify(dir_name)
	if slug.is_empty():
		slug = "project"
	return "%s@%s" % [slug, _rand_hex(4)]
|
|
|
|
|
|
## Lower-case `s` and reduce it to ASCII letters and digits joined by single
## dashes: every run of other characters between two kept characters becomes
## one "-", and leading/trailing runs are dropped. Returns "" when nothing
## survives.
static func _slugify(s: String) -> String:
	var slug := ""
	# A separator is only written once the next kept character shows up, so
	# no leading or trailing dash is ever produced.
	var dash_pending := false
	for ch in s.to_lower():
		var is_letter: bool = ch >= "a" and ch <= "z"
		var is_digit: bool = ch >= "0" and ch <= "9"
		if not (is_letter or is_digit):
			dash_pending = slug != ""
			continue
		if dash_pending:
			slug += "-"
			dash_pending = false
		slug += ch
	return slug
|
|
|
|
|
|
## Return `n` random lower-case hexadecimal characters. Enough random bytes
## are drawn to cover `n` digits (two per byte, rounded up) and the encoded
## string is cut to exactly `n` characters.
static func _rand_hex(n: int) -> String:
	var byte_total := ceili(n / 2.0)
	var raw := PackedByteArray()
	for _i in byte_total:
		raw.append(randi() & 0xFF)
	return raw.hex_encode().left(n)
|