feat(pty): add multithread support

Adds the option `use_threads` to PTY (enabled by default) which improves
performance when enabled. For example, running `time cat file.txt` where
file is ~4.5MB will take ~0.250s with threads enabled, versus >20s when
disabled.
This commit is contained in:
Leroy Hopson 2024-03-02 13:20:33 +13:00
parent 46f3aa12bf
commit a0c9777264
No known key found for this signature in database
GPG key ID: D2747312A6DB51AA
3 changed files with 118 additions and 42 deletions

View file

@ -1,6 +1,12 @@
#include "pty.h"
#include<godot_cpp/classes/project_settings.hpp>
#include <godot_cpp/classes/mutex.hpp>
#include <godot_cpp/classes/node.hpp>
#include <godot_cpp/classes/os.hpp>
#include <godot_cpp/classes/project_settings.hpp>
#include <godot_cpp/classes/semaphore.hpp>
#include <godot_cpp/classes/thread.hpp>
#include <godot_cpp/core/mutex_lock.hpp>
#include <uv.h>
#if (defined(__linux__) || defined(__APPLE__)) && !defined(_PTY_DISABLED)
@ -8,6 +14,9 @@
#include <unistd.h>
#endif
// Require buffer to be flushed after reaching this size.
#define BUFFER_LIMIT 1048576 // 1MB
#define UV_ERR_MSG(uv_err) \
String(uv_err_name(uv_err)) + String(": ") + String(uv_strerror(uv_err))
@ -18,9 +27,8 @@
using namespace godot;
void _alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf);
void _read_cb(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf);
void _write_cb(uv_write_t *req, int status) { std::free(req); }
void _close_cb(uv_handle_t *handle);
void _close_cb(uv_handle_t *handle) { /* no-op */ };
void PTY::_bind_methods() {
BIND_ENUM_CONSTANT(SIGNAL_SIGHUP);
@ -50,6 +58,10 @@ void PTY::_bind_methods() {
ClassDB::bind_method(D_METHOD("set_use_os_env", "use_os_env"), &PTY::set_use_os_env);
ClassDB::add_property("PTY", PropertyInfo(Variant::BOOL, "use_os_env"), "set_use_os_env", "get_use_os_env");
ClassDB::bind_method(D_METHOD("set_use_threads", "enabled"), &PTY::set_use_threads);
ClassDB::bind_method(D_METHOD("is_using_threads"), &PTY::is_using_threads);
ClassDB::add_property("PTY", PropertyInfo(Variant::BOOL, "use_threads"), "set_use_threads", "is_using_threads");
ClassDB::bind_method(D_METHOD("get_pts"), &PTY::get_pts);
ClassDB::bind_method(D_METHOD("fork", "file", "args", "cwd", "cols", "rows"), &PTY::fork, DEFVAL(""), DEFVAL(PackedStringArray()), DEFVAL("."), DEFVAL(80), DEFVAL(24));
@ -63,10 +75,18 @@ void PTY::_bind_methods() {
}
PTY::PTY() {
use_threads = true;
set_process_internal(false);
thread.instantiate();
buffer_write_mutex.instantiate();
buffer_cleared.instantiate();
env["TERM"] = "xterm-256color";
env["COLORTERM"] = "truecolor";
#if defined(__linux__) || defined(__APPLE__)
uv_async_init(uv_default_loop(), &async_handle, [](uv_async_t *handle) {});
uv_pipe_init(uv_default_loop(), &pipe, false);
pipe.data = this;
#endif
@ -96,6 +116,15 @@ void PTY::set_use_os_env(const bool value) {
use_os_env = value;
}
void PTY::set_use_threads(bool p_use) {
ERR_FAIL_COND(status != STATUS_CLOSED);
use_threads = p_use;
}
bool PTY::is_using_threads() const {
return use_threads;
}
String PTY::get_pts() const {
return pts;
}
@ -117,17 +146,19 @@ Error PTY::fork(const String &file, const PackedStringArray &args, const String
pid = result["pid"];
pts = result["pty"];
# if defined(__linux__) || defined(__APPLE__)
err = _pipe_open(fd);
if (err != OK) {
status = STATUS_ERROR;
ERR_FAIL_V_MSG(err, "Failed to open pipe.");
}
status = STATUS_OPEN;
#if defined(__linux__) || defined(__APPLE__)
_pipe_open(fd);
uv_read_start((uv_stream_t *)&pipe, _alloc_buffer, _read_cb);
#endif
status = STATUS_OPEN;
set_process_internal(true);
if (use_threads) {
stop_thread.clear();
thread->start(callable_mp(this, &PTY::_thread_func));
}
set_process_internal(true);
return OK;
}
@ -192,35 +223,61 @@ void PTY::_notification(int p_what) {
switch (p_what)
{
case NOTIFICATION_INTERNAL_PROCESS:
switch (status)
{
case STATUS_OPEN:
_run(UV_RUN_NOWAIT);
#if defined(__linux__) || defined(__APPLE__)
if (!use_threads) uv_run(uv_default_loop(), UV_RUN_NOWAIT);
#endif
buffer_write_mutex->lock();
if (buffer.size() > 0) {
emit_signal("data_received", buffer);
buffer.clear();
buffer_cleared->post();
}
buffer_write_mutex->unlock();
break;
}
case NOTIFICATION_EXIT_TREE:
_close();
break;
}
}
void PTY::_run(uv_run_mode mode) {
void PTY::_thread_func() {
while (!stop_thread.is_set()) {
if (buffer.size() < BUFFER_LIMIT) {
#if defined(__linux__) || defined(__APPLE__)
if (!uv_is_active((uv_handle_t *)&pipe)) {
uv_read_start((uv_stream_t *)&pipe, _alloc_buffer, _read_cb);
}
uv_run(uv_default_loop(), mode);
uv_run(uv_default_loop(), UV_RUN_ONCE);
#endif
} else {
buffer_cleared->wait();
}
}
}
void PTY::_close() {
if (use_threads) {
if (thread->is_started()) {
stop_thread.set();
#if defined(__linux__) || defined(__APPLE__)
uv_async_send(&async_handle);
#endif
thread->wait_to_finish();
}
}
#if defined(__linux__) || defined(__APPLE__)
if (!uv_is_closing((uv_handle_t *)&pipe)) {
uv_close((uv_handle_t *)&pipe, _close_cb);
uv_run(uv_default_loop(), UV_RUN_ONCE);
}
if (!uv_is_closing((uv_handle_t *)&async_handle)) {
uv_close((uv_handle_t *)&async_handle, _close_cb);
}
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
if (fd > 0) close(fd);
if (pid > 0) kill(SIGNAL_SIGHUP);
#endif
@ -306,7 +363,7 @@ PackedStringArray PTY::_parse_env(const Dictionary &env) const {
}
void PTY::_on_exit(int exit_code, int exit_signal) {
emit_signal(StringName("exited"), exit_code, exit_signal);
call_deferred("emit_signal", "exited", exit_code, exit_signal);
}
#if defined(__linux__) || defined(__APPLE__)
@ -316,7 +373,7 @@ void _alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
buf->len = suggested_size;
}
void _read_cb(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf) {
void PTY::_read_cb(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf) {
PTY *pty = static_cast<PTY *>(pipe->data);
if (nread < 0) {
@ -335,22 +392,17 @@ void _read_cb(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf) {
return;
}
PackedByteArray data;
data.resize(nread);
memcpy(data.ptrw(), buf->base, nread);
if (nread > 0) {
MutexLock lock(*pty->buffer_write_mutex.ptr());
int old_size = pty->buffer.size();
int new_size = old_size + nread;
pty->buffer.resize(new_size);
memcpy(pty->buffer.ptrw() + old_size, buf->base, nread);
std::free((char *)buf->base);
pty->call_deferred("emit_signal", "data_received", data);
// Stop reading until the next poll, otherwise _read_cb could be called
// repeatedly, blocking Godot, and eventually resulting in a memory pool
// allocation error. This can be triggered with the command `cat /dev/urandom`
// if reading is not stopped.
uv_read_stop(pipe);
}
void _close_cb(uv_handle_t *pipe) {
PTY *pty = static_cast<PTY *>(pipe->data);
pty->status = PTY::Status::STATUS_CLOSED;
}
}
Error PTY::_pipe_open(const int fd) {

View file

@ -3,8 +3,11 @@
#pragma once
#include <godot_cpp/classes/mutex.hpp>
#include <godot_cpp/classes/node.hpp>
#include <godot_cpp/classes/os.hpp>
#include <godot_cpp/classes/semaphore.hpp>
#include <godot_cpp/classes/thread.hpp>
#include <uv.h>
namespace godot
@ -52,6 +55,9 @@ namespace godot
bool get_use_os_env() const;
void set_use_os_env(const bool value);
void set_use_threads(bool p_use);
bool is_using_threads() const;
String get_pts() const;
Error fork(const String &file = "", const PackedStringArray &args = PackedStringArray(), const String &cwd = ".", const int cols = 80, const int rows = 24);
@ -62,7 +68,6 @@ namespace godot
void write(const Variant &data) const;
void _notification(int p_what);
void _run(uv_run_mode mode);
protected:
static void _bind_methods();
@ -85,11 +90,21 @@ namespace godot
void _on_exit(int exit_code, int exit_signal);
void _close();
Ref<Thread> thread;
Ref<Mutex> buffer_write_mutex;
Ref<Semaphore> buffer_cleared;
PackedByteArray buffer;
SafeFlag stop_thread;
uv_async_t async_handle;
bool use_threads;
void _thread_func();
#if defined(__linux__) || defined(__APPLE__)
uv_pipe_t pipe;
Error _pipe_open(const int fd);
#endif
static void _read_cb(uv_stream_t *pipe, ssize_t nread, const uv_buf_t *buf);
static Error uv_err_to_godot_err(const int uv_err);
};
} // namespace godot

View file

@ -1,4 +1,4 @@
class_name UnixTest extends GutTest
class_name NixTest extends GutTest
var pty: PTY
var helper: Helper
@ -112,6 +112,15 @@ func test_emits_exited_with_signal():
assert_signal_emitted_with_parameters(pty, "exited", [0, PTY.SIGNAL_SIGSEGV])
# Run the same tests, but with use_threads = false.
class TestNoThreads:
extends NixTest
func before_each():
super.before_each()
pty.use_threads = false
class Helper:
static func get_pts() -> Array:
assert(false) #,"Abstract method")