Use a select based event handling and fix deadlock

This commit is contained in:
Maxime Coste 2014-12-03 13:56:02 +00:00
parent def4221ac7
commit 0517a19e6d
5 changed files with 47 additions and 33 deletions

View File

@ -95,7 +95,7 @@ Buffer* create_fifo_buffer(String name, int fd, bool scroll)
auto watcher_deleter = [buffer](FDWatcher* watcher) {
kak_assert(buffer->flags() & Buffer::Flags::Fifo);
close(watcher->fd());
watcher->close_fd();
buffer->run_hook_in_own_context("BufCloseFifo", "");
buffer->flags() &= ~Buffer::Flags::Fifo;
watcher->~FDWatcher();

View File

@ -1,6 +1,6 @@
#include "event_manager.hh"
#include <poll.h>
#include <unistd.h>
namespace Kakoune
{
@ -21,6 +21,12 @@ void FDWatcher::run(EventMode mode)
m_callback(*this, mode);
}
void FDWatcher::close_fd()
{
close(m_fd);
m_fd = -1;
}
Timer::Timer(TimePoint date, Callback callback, EventMode mode)
: m_date{date}, m_callback{std::move(callback)}, m_mode(mode)
{
@ -47,7 +53,7 @@ void Timer::run(EventMode mode)
EventManager::EventManager()
{
m_forced_fd.reserve(4);
FD_ZERO(&m_forced_fd);
}
EventManager::~EventManager()
@ -58,10 +64,18 @@ EventManager::~EventManager()
void EventManager::handle_next_events(EventMode mode)
{
std::vector<pollfd> events;
events.reserve(m_fd_watchers.size());
int max_fd = 0;
fd_set rfds;
FD_ZERO(&rfds);
for (auto& watcher : m_fd_watchers)
events.emplace_back(pollfd{ watcher->fd(), POLLIN | POLLPRI, 0 });
{
const int fd = watcher->fd();
if (fd != -1)
{
max_fd = std::max(fd, max_fd);
FD_SET(fd, &rfds);
}
}
TimePoint next_timer = TimePoint::max();
for (auto& timer : m_timers)
@ -70,21 +84,23 @@ void EventManager::handle_next_events(EventMode mode)
next_timer = timer->next_date();
}
using namespace std::chrono;
auto timeout = duration_cast<milliseconds>(next_timer - Clock::now()).count();
poll(events.data(), events.size(), timeout < INT_MAX ? (int)timeout : INT_MAX);
auto timeout = duration_cast<microseconds>(next_timer - Clock::now()).count();
// gather forced fds *after* poll, so that signal handlers can write to
constexpr auto us = 1000000000ll;
timeval tv{ (long)(timeout / us), (long)(timeout % us) };
int res = select(max_fd + 1, &rfds, nullptr, nullptr, &tv);
// copy forced fds *after* poll, so that signal handlers can write to
// m_forced_fd, interupt poll, and directly be serviced.
std::vector<int> forced;
std::swap(forced, m_forced_fd);
fd_set forced = m_forced_fd;
FD_ZERO(&m_forced_fd);
for (auto& event : events)
for (int fd = 0; fd < max_fd + 1; ++fd)
{
const int fd = event.fd;
if (event.revents or contains(forced, fd))
if ((res > 0 and FD_ISSET(fd, &rfds)) or FD_ISSET(fd, &forced))
{
auto it = find_if(m_fd_watchers,
[fd](FDWatcher* w) { return w->fd() == fd; });
[fd](const FDWatcher* w){return w->fd() == fd; });
if (it != m_fd_watchers.end())
(*it)->run(mode);
}
@ -100,7 +116,7 @@ void EventManager::handle_next_events(EventMode mode)
void EventManager::force_signal(int fd)
{
m_forced_fd.push_back(fd);
FD_SET(fd, &m_forced_fd);
}
}

View File

@ -7,6 +7,8 @@
#include <chrono>
#include <unordered_set>
#include <sys/select.h>
namespace Kakoune
{
@ -28,6 +30,9 @@ public:
int fd() const { return m_fd; }
void run(EventMode mode);
void close_fd();
bool closed() const { return m_fd == -1; }
private:
int m_fd;
@ -80,7 +85,7 @@ private:
friend class Timer;
std::unordered_set<FDWatcher*> m_fd_watchers;
std::unordered_set<Timer*> m_timers;
std::vector<int> m_forced_fd;
fd_set m_forced_fd;
TimePoint m_last;
};

View File

@ -295,7 +295,7 @@ RemoteUI::~RemoteUI()
{
write_debug("remote client disconnected: " +
to_string(m_socket_watcher.fd()));
close(m_socket_watcher.fd());
m_socket_watcher.close_fd();
}
void RemoteUI::menu_show(memoryview<String> choices,
@ -646,7 +646,7 @@ Server::Server(String session_name)
void Server::close_session()
{
unlink(("/tmp/kak-" + m_session).c_str());
close(m_listener->fd());
m_listener->close_fd();
m_listener.reset();
}

View File

@ -66,27 +66,20 @@ String ShellManager::pipe(StringView input,
String error;
{
auto pipe_reader = [](String& output, bool& closed) {
return [&output, &closed](FDWatcher& watcher, EventMode) {
if (closed)
return;
const int fd = watcher.fd();
auto pipe_reader = [](String& output) {
return [&output](FDWatcher& watcher, EventMode) {
char buffer[1024];
size_t size = read(fd, buffer, 1024);
size_t size = read(watcher.fd(), buffer, 1024);
if (size <= 0)
{
close(fd);
closed = true;
}
watcher.close_fd();
output += String(buffer, buffer+size);
};
};
bool stdout_closed = false, stderr_closed = false;
FDWatcher stdout_watcher{read_pipe[0], pipe_reader(output, stdout_closed)};
FDWatcher stderr_watcher{error_pipe[0], pipe_reader(error, stderr_closed)};
FDWatcher stdout_watcher{read_pipe[0], pipe_reader(output)};
FDWatcher stderr_watcher{error_pipe[0], pipe_reader(error)};
while (not stdout_closed or not stderr_closed)
while (not stdout_watcher.closed() and not stderr_watcher.closed())
EventManager::instance().handle_next_events(EventMode::Urgent);
}