Refactor EventManager

Watching a file descriptor is now done using a FDWatcher object
This commit is contained in:
Maxime Coste 2013-01-10 18:54:40 +01:00
parent bba7152063
commit 79d28e68dc
7 changed files with 101 additions and 85 deletions

View File

@ -51,7 +51,7 @@ void ClientManager::create_client(std::unique_ptr<UserInterface>&& ui,
return; return;
} }
EventManager::instance().watch(event_fd, [input_handler, context, this](int fd) { new FDWatcher(event_fd, [input_handler, context, this](FDWatcher& watcher) {
try try
{ {
input_handler->handle_available_inputs(*context); input_handler->handle_available_inputs(*context);
@ -64,8 +64,8 @@ void ClientManager::create_client(std::unique_ptr<UserInterface>&& ui,
catch (Kakoune::client_removed&) catch (Kakoune::client_removed&)
{ {
ClientManager::instance().remove_client_by_context(*context); ClientManager::instance().remove_client_by_context(*context);
EventManager::instance().unwatch(fd); close(watcher.fd());
close(fd); delete &watcher;
} }
ClientManager::instance().redraw_clients(); ClientManager::instance().redraw_clients();
}); });

View File

@ -57,20 +57,9 @@ Buffer* open_fifo(const String& name , const String& filename, Context& context)
throw runtime_error("unable to open " + filename); throw runtime_error("unable to open " + filename);
Buffer* buffer = new Buffer(name, Buffer::Flags::Fifo | Buffer::Flags::NoUndo); Buffer* buffer = new Buffer(name, Buffer::Flags::Fifo | Buffer::Flags::NoUndo);
buffer->hooks().add_hook("BufClose", auto watcher = new FDWatcher(fd, [buffer](FDWatcher& watcher) {
[fd, buffer](const String&, const Context&) {
// Check if fifo is still alive, else fd may
// refer to another file/socket
if (buffer->flags() & Buffer::Flags::Fifo)
{
EventManager::instance().unwatch(fd);
close(fd);
}
});
EventManager::instance().watch(fd, [buffer](int fd) {
char data[4096]; char data[4096];
ssize_t count = read(fd, data, 4096); ssize_t count = read(watcher.fd(), data, 4096);
buffer->insert(buffer->end()-1, buffer->insert(buffer->end()-1,
count > 0 ? String(data, data+count) count > 0 ? String(data, data+count)
: "*** kak: fifo closed ***\n"); : "*** kak: fifo closed ***\n");
@ -79,11 +68,21 @@ Buffer* open_fifo(const String& name , const String& filename, Context& context)
{ {
assert(buffer->flags() & Buffer::Flags::Fifo); assert(buffer->flags() & Buffer::Flags::Fifo);
buffer->flags() &= ~Buffer::Flags::Fifo; buffer->flags() &= ~Buffer::Flags::Fifo;
EventManager::instance().unwatch(fd); close(watcher.fd());
close(fd); delete &watcher;
} }
}); });
buffer->hooks().add_hook("BufClose",
[buffer, watcher](const String&, const Context&) {
// Check if fifo is still alive, else watcher is already dead
if (buffer->flags() & Buffer::Flags::Fifo)
{
close(watcher->fd());
delete watcher;
}
});
return buffer; return buffer;
} }

View File

@ -1,60 +1,71 @@
#include "event_manager.hh" #include "event_manager.hh"
#include <algorithm> #include <poll.h>
namespace Kakoune namespace Kakoune
{ {
FDWatcher::FDWatcher(int fd, Callback callback)
: m_fd{fd}, m_callback{std::move(callback)}
{
EventManager::instance().register_fd_watcher(this);
}
FDWatcher::~FDWatcher()
{
EventManager::instance().unregister_fd_watcher(this);
}
EventManager::EventManager() EventManager::EventManager()
{ {
m_forced.reserve(4); m_forced_fd.reserve(4);
} }
void EventManager::watch(int fd, EventHandler handler) EventManager::~EventManager()
{ {
auto event = std::find_if(m_events.begin(), m_events.end(), assert(m_fd_watchers.empty());
[&](const pollfd& pfd) { return pfd.fd == fd; });
if (event != m_events.end())
throw runtime_error("fd already watched");
m_events.emplace_back(pollfd{ fd, POLLIN | POLLPRI, 0 });
m_handlers.emplace_back(new EventHandler(std::move(handler)));
}
void EventManager::unwatch(int fd)
{
auto event = std::find_if(m_events.begin(), m_events.end(),
[&](const pollfd& pfd) { return pfd.fd == fd; });
assert(event != m_events.end());
auto handler = m_handlers.begin() + (event - m_events.begin());
// keep handler in m_handlers_trash so that it does not die now,
// but at the end of handle_next_events. We do this as handler might
// be our caller.
m_handlers_trash.emplace_back(std::move(*handler));
m_handlers.erase(handler);
m_events.erase(event);
} }
void EventManager::handle_next_events() void EventManager::handle_next_events()
{ {
const int timeout_ms = 100; const int timeout_ms = 100;
poll(m_events.data(), m_events.size(), timeout_ms); std::vector<pollfd> events;
std::vector<int> forced = m_forced; events.reserve(m_fd_watchers.size());
m_forced.clear(); for (auto& watcher : m_fd_watchers)
for (size_t i = 0; i < m_events.size(); ++i) events.emplace_back(pollfd{ watcher->fd(), POLLIN | POLLPRI, 0 });
std::vector<int> forced = m_forced_fd;
m_forced_fd.clear();
poll(events.data(), events.size(), timeout_ms);
for (size_t i = 0; i < events.size(); ++i)
{ {
auto& event = m_events[i]; auto& event = events[i];
const int fd = event.fd; const int fd = event.fd;
if (event.revents or contains(forced, fd)) if (event.revents or contains(forced, fd))
(*m_handlers[i])(fd); {
auto it = std::find_if(m_fd_watchers.begin(), m_fd_watchers.end(),
[fd](FDWatcher* w) { return w->fd() == fd; });
if (it != m_fd_watchers.end())
(*it)->run();
}
} }
m_handlers_trash.clear();
} }
void EventManager::force_signal(int fd) void EventManager::force_signal(int fd)
{ {
m_forced.push_back(fd); m_forced_fd.push_back(fd);
}
void EventManager::register_fd_watcher(FDWatcher* event)
{
assert(not contains(m_fd_watchers, event));
m_fd_watchers.push_back(event);
}
void EventManager::unregister_fd_watcher(FDWatcher* event)
{
auto it = find(m_fd_watchers, event);
assert(it != m_fd_watchers.end());
m_fd_watchers.erase(it);
} }
} }

View File

@ -1,15 +1,24 @@
#ifndef event_manager_hh_INCLUDED #ifndef event_manager_hh_INCLUDED
#define event_manager_hh_INCLUDED #define event_manager_hh_INCLUDED
#include <poll.h>
#include <unordered_map>
#include "utils.hh" #include "utils.hh"
namespace Kakoune namespace Kakoune
{ {
using EventHandler = std::function<void (int fd)>; class FDWatcher
{
public:
using Callback = std::function<void (FDWatcher& watcher)>;
FDWatcher(int fd, Callback callback);
~FDWatcher();
int fd() const { return m_fd; }
void run() { m_callback(*this); }
private:
int m_fd;
Callback m_callback;
};
// The EventManager provides an interface to file descriptor // The EventManager provides an interface to file descriptor
// based event handling. // based event handling.
@ -20,26 +29,21 @@ class EventManager : public Singleton<EventManager>
{ {
public: public:
EventManager(); EventManager();
// Watch the given file descriptor, when data becomes ~EventManager();
// ready, handler will be called with fd as parameter.
// It is an error to register multiple handlers on the
// same file descriptor.
void watch(int fd, EventHandler handler);
// stop watching fd
void unwatch(int fd);
void handle_next_events(); void handle_next_events();
// force the handler associated with fd to be executed // force the watchers associated with fd to be executed
// on next handle_next_events call. // on next handle_next_events call.
void force_signal(int fd); void force_signal(int fd);
private: private:
std::vector<pollfd> m_events; friend class FDWatcher;
std::vector<std::unique_ptr<EventHandler>> m_handlers; void register_fd_watcher(FDWatcher* watcher);
std::vector<std::unique_ptr<EventHandler>> m_handlers_trash; void unregister_fd_watcher(FDWatcher* watcher);
std::vector<int> m_forced;
std::vector<FDWatcher*> m_fd_watchers;
std::vector<int> m_forced_fd;
}; };
} }

View File

@ -585,42 +585,42 @@ struct Server : public Singleton<Server>
{ {
m_filename = "/tmp/kak-" + int_to_str(getpid()); m_filename = "/tmp/kak-" + int_to_str(getpid());
m_listen_sock = socket(AF_UNIX, SOCK_STREAM, 0); int listen_sock = socket(AF_UNIX, SOCK_STREAM, 0);
fcntl(m_listen_sock, F_SETFD, FD_CLOEXEC); fcntl(listen_sock, F_SETFD, FD_CLOEXEC);
sockaddr_un addr; sockaddr_un addr;
addr.sun_family = AF_UNIX; addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, m_filename.c_str(), sizeof(addr.sun_path) - 1); strncpy(addr.sun_path, m_filename.c_str(), sizeof(addr.sun_path) - 1);
if (bind(m_listen_sock, (sockaddr*) &addr, sizeof(sockaddr_un)) == -1) if (bind(listen_sock, (sockaddr*) &addr, sizeof(sockaddr_un)) == -1)
throw runtime_error("unable to bind listen socket " + m_filename); throw runtime_error("unable to bind listen socket " + m_filename);
if (listen(m_listen_sock, 4) == -1) if (listen(listen_sock, 4) == -1)
throw runtime_error("unable to listen on socket " + m_filename); throw runtime_error("unable to listen on socket " + m_filename);
auto accepter = [=](int socket) { auto accepter = [](FDWatcher& watcher) {
sockaddr_un client_addr; sockaddr_un client_addr;
socklen_t client_addr_len = sizeof(sockaddr_un); socklen_t client_addr_len = sizeof(sockaddr_un);
int sock = accept(socket, (sockaddr*) &client_addr, &client_addr_len); int sock = accept(watcher.fd(), (sockaddr*) &client_addr, &client_addr_len);
if (sock == -1) if (sock == -1)
throw runtime_error("accept failed"); throw runtime_error("accept failed");
fcntl(sock, F_SETFD, FD_CLOEXEC); fcntl(sock, F_SETFD, FD_CLOEXEC);
EventManager::instance().watch(sock, handle_remote); new FDWatcher{sock, handle_remote};
}; };
EventManager::instance().watch(m_listen_sock, accepter); m_listener.reset(new FDWatcher{listen_sock, accepter});
} }
~Server() ~Server()
{ {
unlink(m_filename.c_str()); unlink(m_filename.c_str());
close(m_listen_sock); close(m_listener->fd());
} }
const String& filename() const { return m_filename; } const String& filename() const { return m_filename; }
private: private:
int m_listen_sock;
String m_filename; String m_filename;
std::unique_ptr<FDWatcher> m_listener;
}; };
void register_env_vars() void register_env_vars()
@ -703,7 +703,7 @@ RemoteClient* connect_to(const String& pid, const String& init_command)
NCursesUI* ui = new NCursesUI{}; NCursesUI* ui = new NCursesUI{};
RemoteClient* remote_client = new RemoteClient{sock, ui, init_command}; RemoteClient* remote_client = new RemoteClient{sock, ui, init_command};
EventManager::instance().watch(sock, [=](int) { new FDWatcher{sock, [=](FDWatcher&) {
try try
{ {
remote_client->process_next_message(); remote_client->process_next_message();
@ -712,9 +712,9 @@ RemoteClient* connect_to(const String& pid, const String& init_command)
{ {
ui->print_status(error.description(), -1); ui->print_status(error.description(), -1);
} }
}); }};
EventManager::instance().watch(0, [=](int) { new FDWatcher{0, [=](FDWatcher& ev) {
try try
{ {
remote_client->write_next_key(); remote_client->write_next_key();
@ -723,7 +723,7 @@ RemoteClient* connect_to(const String& pid, const String& init_command)
{ {
ui->print_status(error.description(), -1); ui->print_status(error.description(), -1);
} }
}); }};
return remote_client; return remote_client;
} }

View File

@ -365,12 +365,13 @@ void RemoteClient::write_next_key()
} }
} }
void handle_remote(int socket) void handle_remote(FDWatcher& watcher)
{ {
int socket = watcher.fd();
String init_command = read<String>(socket); String init_command = read<String>(socket);
RemoteUI* ui = new RemoteUI{socket}; RemoteUI* ui = new RemoteUI{socket};
EventManager::instance().unwatch(socket); delete &watcher;
ClientManager::instance().create_client( ClientManager::instance().create_client(
std::unique_ptr<UserInterface>{ui}, socket, init_command); std::unique_ptr<UserInterface>{ui}, socket, init_command);
} }

View File

@ -3,13 +3,14 @@
#include "user_interface.hh" #include "user_interface.hh"
#include "display_buffer.hh" #include "display_buffer.hh"
#include "event_manager.hh"
namespace Kakoune namespace Kakoune
{ {
struct peer_disconnected {}; struct peer_disconnected {};
void handle_remote(int socket); void handle_remote(FDWatcher& event);
class RemoteClient class RemoteClient
{ {