From a0c97772649c49e0fcee5ddc859e6ba6d511a0b2 Mon Sep 17 00:00:00 2001 From: Leroy Hopson Date: Sat, 2 Mar 2024 13:20:33 +1300 Subject: [PATCH] feat(pty): add multithread support Adds the option `use_threads` to PTY (enabled by default) which improves performance when enabled. For example, running `time cat file.txt` where file is ~4.5MB will take ~0.250s with threads enabled, versus >20s when disabled. --- addons/godot_xterm/native/src/pty.cpp | 132 ++++++++++++++++++-------- addons/godot_xterm/native/src/pty.h | 17 +++- test/test_nix.gd | 11 ++- 3 files changed, 118 insertions(+), 42 deletions(-) diff --git a/addons/godot_xterm/native/src/pty.cpp b/addons/godot_xterm/native/src/pty.cpp index 803f342..5206a25 100644 --- a/addons/godot_xterm/native/src/pty.cpp +++ b/addons/godot_xterm/native/src/pty.cpp @@ -1,6 +1,12 @@ #include "pty.h" -#include +#include +#include +#include +#include +#include +#include +#include #include #if (defined(__linux__) || defined(__APPLE__)) && !defined(_PTY_DISABLED) @@ -8,6 +14,9 @@ #include #endif +// Require buffer to be flushed after reaching this size. +#define BUFFER_LIMIT 1048576 // 1MB + #define UV_ERR_MSG(uv_err) \ String(uv_err_name(uv_err)) + String(": ") + String(uv_strerror(uv_err)) @@ -18,9 +27,8 @@ using namespace godot; void _alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf); -void _read_cb(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf); void _write_cb(uv_write_t *req, int status) { std::free(req); } -void _close_cb(uv_handle_t *handle); +void _close_cb(uv_handle_t *handle) { /* no-op */ }; void PTY::_bind_methods() { BIND_ENUM_CONSTANT(SIGNAL_SIGHUP); @@ -50,6 +58,10 @@ void PTY::_bind_methods() { ClassDB::bind_method(D_METHOD("set_use_os_env", "use_os_env"), &PTY::set_use_os_env); ClassDB::add_property("PTY", PropertyInfo(Variant::BOOL, "use_os_env"), "set_use_os_env", "get_use_os_env"); + ClassDB::bind_method(D_METHOD("set_use_threads", "enabled"), &PTY::set_use_threads); + ClassDB::bind_method(D_METHOD("is_using_threads"), &PTY::is_using_threads); + ClassDB::add_property("PTY", PropertyInfo(Variant::BOOL, "use_threads"), "set_use_threads", "is_using_threads"); + ClassDB::bind_method(D_METHOD("get_pts"), &PTY::get_pts); ClassDB::bind_method(D_METHOD("fork", "file", "args", "cwd", "cols", "rows"), &PTY::fork, DEFVAL(""), DEFVAL(PackedStringArray()), DEFVAL("."), DEFVAL(80), DEFVAL(24)); @@ -63,10 +75,18 @@ void PTY::_bind_methods() { } PTY::PTY() { + use_threads = true; + + set_process_internal(false); + thread.instantiate(); + buffer_write_mutex.instantiate(); + buffer_cleared.instantiate(); + env["TERM"] = "xterm-256color"; env["COLORTERM"] = "truecolor"; #if defined(__linux__) || defined(__APPLE__) + uv_async_init(uv_default_loop(), &async_handle, [](uv_async_t *handle) {}); uv_pipe_init(uv_default_loop(), &pipe, false); pipe.data = this; #endif @@ -96,6 +116,15 @@ void PTY::set_use_os_env(const bool value) { use_os_env = value; } +void PTY::set_use_threads(bool p_use) { + ERR_FAIL_COND(status != STATUS_CLOSED); + use_threads = p_use; +} + +bool PTY::is_using_threads() const { + return use_threads; +} + String PTY::get_pts() const { return pts; } @@ -117,17 +146,19 @@ Error PTY::fork(const String &file, const PackedStringArray &args, const String pid = result["pid"]; pts = result["pty"]; - # if defined(__linux__) || defined(__APPLE__) - err = _pipe_open(fd); - if (err != OK) { - status = STATUS_ERROR; - ERR_FAIL_V_MSG(err, "Failed to open pipe."); - } + status = STATUS_OPEN; + + #if defined(__linux__) || defined(__APPLE__) + _pipe_open(fd); + uv_read_start((uv_stream_t *)&pipe, _alloc_buffer, _read_cb); #endif - status = STATUS_OPEN; - set_process_internal(true); + if (use_threads) { + stop_thread.clear(); + thread->start(callable_mp(this, &PTY::_thread_func)); + } + set_process_internal(true); return OK; } @@ -192,35 +223,61 @@ void PTY::_notification(int p_what) { switch (p_what) { case NOTIFICATION_INTERNAL_PROCESS: - switch (status) - { - case STATUS_OPEN: - _run(UV_RUN_NOWAIT); + { + #if defined(__linux__) || defined(__APPLE__) + if (!use_threads) uv_run(uv_default_loop(), UV_RUN_NOWAIT); + #endif + + buffer_write_mutex->lock(); + if (buffer.size() > 0) { + emit_signal("data_received", buffer); + buffer.clear(); + buffer_cleared->post(); } + buffer_write_mutex->unlock(); + break; + } case NOTIFICATION_EXIT_TREE: _close(); break; } } -void PTY::_run(uv_run_mode mode) { - #if defined(__linux__) || defined(__APPLE__) - if (!uv_is_active((uv_handle_t *)&pipe)) { - uv_read_start((uv_stream_t *)&pipe, _alloc_buffer, _read_cb); +void PTY::_thread_func() { + while (!stop_thread.is_set()) { + if (buffer.size() < BUFFER_LIMIT) { + #if defined(__linux__) || defined(__APPLE__) + uv_run(uv_default_loop(), UV_RUN_ONCE); + #endif + } else { + buffer_cleared->wait(); + } } - - uv_run(uv_default_loop(), mode); - #endif } void PTY::_close() { + if (use_threads) { + if (thread->is_started()) { + stop_thread.set(); + #if defined(__linux__) || defined(__APPLE__) + uv_async_send(&async_handle); + #endif + thread->wait_to_finish(); + } + } + #if defined(__linux__) || defined(__APPLE__) if (!uv_is_closing((uv_handle_t *)&pipe)) { uv_close((uv_handle_t *)&pipe, _close_cb); - uv_run(uv_default_loop(), UV_RUN_ONCE); } + if (!uv_is_closing((uv_handle_t *)&async_handle)) { + uv_close((uv_handle_t *)&async_handle, _close_cb); + } + + uv_run(uv_default_loop(), UV_RUN_NOWAIT); + if (fd > 0) close(fd); if (pid > 0) kill(SIGNAL_SIGHUP); #endif @@ -230,7 +287,7 @@ void PTY::_close() { set_process_internal(false); status = STATUS_CLOSED; -} +} String PTY::_get_fork_file(const String &file) const { if (!file.is_empty()) return file; @@ -306,7 +363,7 @@ PackedStringArray PTY::_parse_env(const Dictionary &env) const { } void PTY::_on_exit(int exit_code, int exit_signal) { - emit_signal(StringName("exited"), exit_code, exit_signal); + call_deferred("emit_signal", "exited", exit_code, exit_signal); } #if defined(__linux__) || defined(__APPLE__) @@ -316,7 +373,7 @@ void _alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { buf->len = suggested_size; } -void _read_cb(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf) { +void PTY::_read_cb(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf) { PTY *pty = static_cast(pipe->data); if (nread < 0) { @@ -335,22 +392,17 @@ void _read_cb(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf) { return; } - PackedByteArray data; - data.resize(nread); - memcpy(data.ptrw(), buf->base, nread); - std::free((char *)buf->base); - pty->call_deferred("emit_signal", "data_received", data); + if (nread > 0) { + MutexLock lock(*pty->buffer_write_mutex.ptr()); - // Stop reading until the next poll, otherwise _read_cb could be called - // repeatedly, blocking Godot, and eventually resulting in a memory pool - // allocation error. This can be triggered with the command `cat /dev/urandom` - // if reading is not stopped. - uv_read_stop(pipe); -} + int old_size = pty->buffer.size(); + int new_size = old_size + nread; -void _close_cb(uv_handle_t *pipe) { - PTY *pty = static_cast(pipe->data); - pty->status = PTY::Status::STATUS_CLOSED; + pty->buffer.resize(new_size); + memcpy(pty->buffer.ptrw() + old_size, buf->base, nread); + + std::free((char *)buf->base); + } } Error PTY::_pipe_open(const int fd) { diff --git a/addons/godot_xterm/native/src/pty.h b/addons/godot_xterm/native/src/pty.h index 6ecef7a..fcd0add 100644 --- a/addons/godot_xterm/native/src/pty.h +++ b/addons/godot_xterm/native/src/pty.h @@ -3,8 +3,11 @@ #pragma once +#include #include #include +#include +#include #include namespace godot @@ -52,6 +55,9 @@ namespace godot bool get_use_os_env() const; void set_use_os_env(const bool value); + void set_use_threads(bool p_use); + bool is_using_threads() const; + String get_pts() const; Error fork(const String &file = "", const PackedStringArray &args = PackedStringArray(), const String &cwd = ".", const int cols = 80, const int rows = 24); @@ -62,7 +68,6 @@ namespace godot void write(const Variant &data) const; void _notification(int p_what); - void _run(uv_run_mode mode); protected: static void _bind_methods(); @@ -85,11 +90,21 @@ namespace godot void _on_exit(int exit_code, int exit_signal); void _close(); + Ref thread; + Ref buffer_write_mutex; + Ref buffer_cleared; + PackedByteArray buffer; + SafeFlag stop_thread; + uv_async_t async_handle; + bool use_threads; + void _thread_func(); + #if defined(__linux__) || defined(__APPLE__) uv_pipe_t pipe; Error _pipe_open(const int fd); #endif + static void _read_cb(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf); static Error uv_err_to_godot_err(const int uv_err); }; } // namespace godot diff --git a/test/test_nix.gd b/test/test_nix.gd index faa79ba..ba2dbc0 100644 --- a/test/test_nix.gd +++ b/test/test_nix.gd @@ -1,4 +1,4 @@ -class_name UnixTest extends GutTest +class_name NixTest extends GutTest var pty: PTY var helper: Helper @@ -112,6 +112,15 @@ func test_emits_exited_with_signal(): assert_signal_emitted_with_parameters(pty, "exited", [0, PTY.SIGNAL_SIGSEGV]) +# Run the same tests, but with use_threads = false. +class TestNoThreads: + extends NixTest + + func before_each(): + super.before_each() + pty.use_threads = false + + class Helper: static func get_pts() -> Array: assert(false) #,"Abstract method")