Only write to remote socket when we know they are writable

Buffer data (in an horribly innefficient way for now), and use the
event manager to watch for the socket being ready for a write.

Fixes #945
This commit is contained in:
Maxime Coste 2016-12-01 20:11:09 +00:00
parent 95c1528342
commit 8c862c4eea
4 changed files with 138 additions and 102 deletions

View File

@ -143,6 +143,16 @@ bool fd_readable(int fd)
return select(fd+1, &rfds, nullptr, nullptr, &tv) == 1; return select(fd+1, &rfds, nullptr, nullptr, &tv) == 1;
} }
bool fd_writable(int fd)
{
fd_set rfds;
FD_ZERO(&rfds);
FD_SET(fd, &rfds);
timeval tv{0,0};
return select(fd+1, nullptr, &rfds, nullptr, &tv) == 1;
}
String read_fd(int fd, bool text) String read_fd(int fd, bool text)
{ {
String content; String content;

View File

@ -30,6 +30,7 @@ std::pair<StringView, StringView> split_path(StringView path);
String get_kak_binary_path(); String get_kak_binary_path();
bool fd_readable(int fd); bool fd_readable(int fd);
bool fd_writable(int fd);
String read_fd(int fd, bool text = false); String read_fd(int fd, bool text = false);
String read_file(StringView filename, bool text = false); String read_file(StringView filename, bool text = false);
void write(int fd, StringView data); void write(int fd, StringView data);

View File

@ -42,7 +42,8 @@ enum class MessageType : char
class MsgWriter class MsgWriter
{ {
public: public:
MsgWriter(int sock, MessageType type) : m_socket(sock) MsgWriter(RemoteBuffer& buffer, MessageType type)
: m_buffer{buffer}, m_start{(uint32_t)buffer.size()}
{ {
write(type); write(type);
write((uint32_t)0); // message size, to be patched on write write((uint32_t)0); // message size, to be patched on write
@ -50,23 +51,13 @@ public:
~MsgWriter() noexcept(false) ~MsgWriter() noexcept(false)
{ {
uint32_t count = (uint32_t)m_stream.size(); uint32_t count = (uint32_t)m_buffer.size() - m_start;
char* data = m_stream.data(); *reinterpret_cast<uint32_t*>(m_buffer.data() + m_start + 1) = count;
*reinterpret_cast<uint32_t*>(data+1) = count;
while (count > 0)
{
int res = ::write(m_socket, data, count);
if (res <= 0)
throw remote_error{res ? "peer disconnected"
: format("socket write failed: {}", strerror(errno))};
data += res;
count -= res;
}
} }
void write(const char* val, size_t size) void write(const char* val, size_t size)
{ {
m_stream.insert(m_stream.end(), val, val + size); m_buffer.insert(m_buffer.end(), val, val + size);
} }
template<typename T> template<typename T>
@ -139,8 +130,8 @@ public:
} }
private: private:
Vector<char, MemoryDomain::Remote> m_stream; RemoteBuffer& m_buffer;
int m_socket; uint32_t m_start;
}; };
class MsgReader class MsgReader
@ -344,18 +335,34 @@ private:
MsgReader m_reader; MsgReader m_reader;
DisplayCoord m_dimensions; DisplayCoord m_dimensions;
OnKeyCallback m_on_key; OnKeyCallback m_on_key;
RemoteBuffer m_send_buffer;
SafePtr<Client> m_client; SafePtr<Client> m_client;
}; };
static bool send_data(int fd, RemoteBuffer& buffer)
{
while (buffer.size() > 0 and fd_writable(fd))
{
int res = ::write(fd, buffer.data(), buffer.size());
if (res <= 0)
throw remote_error{res ? "peer disconnected"
: format("socket write failed: {}", strerror(errno))};
buffer.erase(buffer.begin(), buffer.begin() + res);
}
return buffer.empty();
}
RemoteUI::RemoteUI(int socket, DisplayCoord dimensions) RemoteUI::RemoteUI(int socket, DisplayCoord dimensions)
: m_socket_watcher(socket, FdEvents::Read, : m_socket_watcher(socket, FdEvents::Read | FdEvents::Write,
[this](FDWatcher& watcher, FdEvents, EventMode mode) { [this](FDWatcher& watcher, FdEvents events, EventMode mode) {
const int sock = watcher.fd(); const int sock = watcher.fd();
try try
{ {
while (fd_readable(sock)) if (events & FdEvents::Write and send_data(sock, m_send_buffer))
m_socket_watcher.events() &= ~FdEvents::Write;
while (events & FdEvents::Read and fd_readable(sock))
{ {
m_reader.read_available(sock); m_reader.read_available(sock);
@ -377,7 +384,7 @@ RemoteUI::RemoteUI(int socket, DisplayCoord dimensions)
} }
catch (const remote_error& err) catch (const remote_error& err)
{ {
write_to_debug_buffer(format("Error while reading remote message: {}", err.what())); write_to_debug_buffer(format("Error while transfering remote messages: {}", err.what()));
ClientManager::instance().remove_client(*m_client, false); ClientManager::instance().remove_client(*m_client, false);
} }
}), }),
@ -396,72 +403,81 @@ void RemoteUI::menu_show(ConstArrayView<DisplayLine> choices,
DisplayCoord anchor, Face fg, Face bg, DisplayCoord anchor, Face fg, Face bg,
MenuStyle style) MenuStyle style)
{ {
MsgWriter msg{m_socket_watcher.fd(), MessageType::MenuShow}; MsgWriter msg{m_send_buffer, MessageType::MenuShow};
msg.write(choices); msg.write(choices);
msg.write(anchor); msg.write(anchor);
msg.write(fg); msg.write(fg);
msg.write(bg); msg.write(bg);
msg.write(style); msg.write(style);
m_socket_watcher.events() |= FdEvents::Write;
} }
void RemoteUI::menu_select(int selected) void RemoteUI::menu_select(int selected)
{ {
MsgWriter msg{m_socket_watcher.fd(), MessageType::MenuSelect}; MsgWriter msg{m_send_buffer, MessageType::MenuSelect};
msg.write(selected); msg.write(selected);
m_socket_watcher.events() |= FdEvents::Write;
} }
void RemoteUI::menu_hide() void RemoteUI::menu_hide()
{ {
MsgWriter msg{m_socket_watcher.fd(), MessageType::MenuHide}; MsgWriter msg{m_send_buffer, MessageType::MenuHide};
m_socket_watcher.events() |= FdEvents::Write;
} }
void RemoteUI::info_show(StringView title, StringView content, void RemoteUI::info_show(StringView title, StringView content,
DisplayCoord anchor, Face face, DisplayCoord anchor, Face face,
InfoStyle style) InfoStyle style)
{ {
MsgWriter msg{m_socket_watcher.fd(), MessageType::InfoShow}; MsgWriter msg{m_send_buffer, MessageType::InfoShow};
msg.write(title); msg.write(title);
msg.write(content); msg.write(content);
msg.write(anchor); msg.write(anchor);
msg.write(face); msg.write(face);
msg.write(style); msg.write(style);
m_socket_watcher.events() |= FdEvents::Write;
} }
void RemoteUI::info_hide() void RemoteUI::info_hide()
{ {
MsgWriter msg{m_socket_watcher.fd(), MessageType::InfoHide}; MsgWriter msg{m_send_buffer, MessageType::InfoHide};
m_socket_watcher.events() |= FdEvents::Write;
} }
void RemoteUI::draw(const DisplayBuffer& display_buffer, void RemoteUI::draw(const DisplayBuffer& display_buffer,
const Face& default_face, const Face& default_face,
const Face& padding_face) const Face& padding_face)
{ {
MsgWriter msg{m_socket_watcher.fd(), MessageType::Draw}; MsgWriter msg{m_send_buffer, MessageType::Draw};
msg.write(display_buffer); msg.write(display_buffer);
msg.write(default_face); msg.write(default_face);
msg.write(padding_face); msg.write(padding_face);
m_socket_watcher.events() |= FdEvents::Write;
} }
void RemoteUI::draw_status(const DisplayLine& status_line, void RemoteUI::draw_status(const DisplayLine& status_line,
const DisplayLine& mode_line, const DisplayLine& mode_line,
const Face& default_face) const Face& default_face)
{ {
MsgWriter msg{m_socket_watcher.fd(), MessageType::DrawStatus}; MsgWriter msg{m_send_buffer, MessageType::DrawStatus};
msg.write(status_line); msg.write(status_line);
msg.write(mode_line); msg.write(mode_line);
msg.write(default_face); msg.write(default_face);
m_socket_watcher.events() |= FdEvents::Write;
} }
void RemoteUI::refresh(bool force) void RemoteUI::refresh(bool force)
{ {
MsgWriter msg{m_socket_watcher.fd(), MessageType::Refresh}; MsgWriter msg{m_send_buffer, MessageType::Refresh};
msg.write(force); msg.write(force);
m_socket_watcher.events() |= FdEvents::Write;
} }
void RemoteUI::set_ui_options(const Options& options) void RemoteUI::set_ui_options(const Options& options)
{ {
MsgWriter msg{m_socket_watcher.fd(), MessageType::SetOptions}; MsgWriter msg{m_send_buffer, MessageType::SetOptions};
msg.write(options); msg.write(options);
m_socket_watcher.events() |= FdEvents::Write;
} }
static sockaddr_un session_addr(StringView session) static sockaddr_un session_addr(StringView session)
@ -500,26 +516,32 @@ RemoteClient::RemoteClient(StringView session, std::unique_ptr<UserInterface>&&
int sock = connect_to(session); int sock = connect_to(session);
{ {
MsgWriter msg{sock, MessageType::Connect}; MsgWriter msg{m_send_buffer, MessageType::Connect};
msg.write(init_command); msg.write(init_command);
msg.write(m_ui->dimensions()); msg.write(m_ui->dimensions());
msg.write(env_vars); msg.write(env_vars);
} }
m_ui->set_on_key([this](Key key){ m_ui->set_on_key([this](Key key){
MsgWriter msg(m_socket_watcher->fd(), MessageType::Key); MsgWriter msg(m_send_buffer, MessageType::Key);
msg.write(key); msg.write(key);
m_socket_watcher->events() |= FdEvents::Write;
}); });
MsgReader reader; MsgReader reader;
m_socket_watcher.reset(new FDWatcher{sock, FdEvents::Read, m_socket_watcher.reset(new FDWatcher{sock, FdEvents::Read | FdEvents::Write,
[this, reader](FDWatcher& watcher, FdEvents, EventMode) mutable { [this, reader](FDWatcher& watcher, FdEvents events, EventMode) mutable {
const int sock = watcher.fd(); const int sock = watcher.fd();
while (fd_readable(sock) and not reader.ready()) if (events & FdEvents::Write and send_data(sock, m_send_buffer))
m_socket_watcher->events() &= ~FdEvents::Write;
while (events & FdEvents::Read and
not reader.ready() and fd_readable(sock))
{
reader.read_available(sock); reader.read_available(sock);
if (not reader.ready()) if (not reader.ready())
return; continue;
auto clear_reader = on_scope_end([&reader] { reader.reset(); }); auto clear_reader = on_scope_end([&reader] { reader.reset(); });
switch (reader.type()) switch (reader.type())
@ -578,6 +600,7 @@ RemoteClient::RemoteClient(StringView session, std::unique_ptr<UserInterface>&&
default: default:
kak_assert(false); kak_assert(false);
} }
}
}}); }});
} }
@ -585,8 +608,12 @@ void send_command(StringView session, StringView command)
{ {
int sock = connect_to(session); int sock = connect_to(session);
auto close_sock = on_scope_end([sock]{ close(sock); }); auto close_sock = on_scope_end([sock]{ close(sock); });
MsgWriter msg{sock, MessageType::Command}; RemoteBuffer buffer;
{
MsgWriter msg{buffer, MessageType::Command};
msg.write(command); msg.write(command);
}
write(sock, {buffer.data(), buffer.data() + buffer.size()});
} }
@ -613,11 +640,8 @@ private:
const int sock = m_socket_watcher.fd(); const int sock = m_socket_watcher.fd();
try try
{ {
do while (not m_reader.ready() and fd_readable(sock))
{
m_reader.read_available(sock); m_reader.read_available(sock);
}
while (fd_readable(sock) and not m_reader.ready());
if (not m_reader.ready()) if (not m_reader.ready())
return; return;

View File

@ -4,6 +4,7 @@
#include "env_vars.hh" #include "env_vars.hh"
#include "exception.hh" #include "exception.hh"
#include "utils.hh" #include "utils.hh"
#include "vector.hh"
#include <memory> #include <memory>
@ -12,14 +13,13 @@ namespace Kakoune
struct remote_error : runtime_error struct remote_error : runtime_error
{ {
remote_error(String error) using runtime_error::runtime_error;
: runtime_error{std::move(error)}
{}
}; };
class FDWatcher; class FDWatcher;
class UserInterface; class UserInterface;
struct Key;
using RemoteBuffer = Vector<char, MemoryDomain::Remote>;
// A remote client handle communication between a client running on the server // A remote client handle communication between a client running on the server
// and a user interface running on the local process. // and a user interface running on the local process.
@ -32,6 +32,7 @@ public:
private: private:
std::unique_ptr<UserInterface> m_ui; std::unique_ptr<UserInterface> m_ui;
std::unique_ptr<FDWatcher> m_socket_watcher; std::unique_ptr<FDWatcher> m_socket_watcher;
RemoteBuffer m_send_buffer;
}; };
void send_command(StringView session, StringView command); void send_command(StringView session, StringView command);