Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix: when kill a connection, it have a change make the killed thread …
…wait forever

the killer thread:
   shutdown the killed thread vio, this action will close the connection fd and remove the fd from the red-black tree of the epoll_wait fd
the killed thread:
   first run the start_io function, this function will bind the connection fd to the epoll_wait fd.
   second the connection wait from the epoll_wait

when worker thread process a connection, first bind the connection fd to the epoll_wait fd, then killer thread close the connection fd, which
will make the killed connection have no change to run any more.
  • Loading branch information
pkullj committed Sep 2, 2025
commit 27df4343a6cf440f49f1935923bffb03d11b117d
4 changes: 4 additions & 0 deletions include/violite.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ struct Vio {
std::atomic_flag kevent_wakeup_flag = ATOMIC_FLAG_INIT;
#endif

#ifdef HAVE_POOL_OF_THREADS
std::atomic_flag epoll_shutdown_flag = ATOMIC_FLAG_INIT;
#endif

#ifdef HAVE_SETNS
/**
Socket network namespace.
Expand Down
12 changes: 12 additions & 0 deletions sql/threadpool_unix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,8 @@ static connection_t *listener(thread_group_t *thread_group) {
*/
for (int i = (listener_picks_event) ? 1 : 0; i < cnt; i++) {
connection_t *c = (connection_t *)native_event_get_userdata(&ev[i]);
Vio *vio = c->thd->get_protocol_classic()->get_vio();
vio->epoll_shutdown_flag.clear();
if (connection_is_high_prio(*c)) {
c->tickets--;
thread_group->high_prio_queue.push_back(c);
Expand All @@ -724,6 +726,8 @@ static connection_t *listener(thread_group_t *thread_group) {
if (listener_picks_event) {
/* Handle the first event. */
retval = (connection_t *)native_event_get_userdata(&ev[0]);
Vio *vio = retval->thd->get_protocol_classic()->get_vio();
vio->epoll_shutdown_flag.clear();
mysql_mutex_unlock(&thread_group->mutex);
break;
}
Expand Down Expand Up @@ -1403,6 +1407,14 @@ static int start_io(connection_t *connection) {
Bind to poll descriptor if not yet done.
*/
Vio *vio = connection->thd->get_protocol_classic()->get_vio();
if (vio && vio->epoll_shutdown_flag.test_and_set()) {
/*
connection in the shutdown progress,
return -1 make sure abort this connection
*/
vio->epoll_shutdown_flag.clear();
return -1;
}
int fd = mysql_socket_getfd(vio->mysql_socket);
if (!connection->bound_to_poll_descriptor) {
connection->bound_to_poll_descriptor = true;
Expand Down
16 changes: 16 additions & 0 deletions vio/viosocket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,15 @@ static void vio_wait_until_woken(Vio *vio) {
}
#endif

#ifdef HAVE_POOL_OF_THREADS
static void vio_wait_until_epoll_woken(Vio *vio) {
while (vio->epoll_shutdown_flag.test_and_set()) {
// make connection wake up from epoll_wait in the worker thread.
vio_cancel(vio, SHUT_RD);
}
}
#endif

int vio_shutdown(Vio *vio, int how) {
DBUG_TRACE;

Expand All @@ -553,6 +562,13 @@ int vio_shutdown(Vio *vio, int how) {
vio_wait_until_woken(vio);
#endif

#ifdef HAVE_POOL_OF_THREADS
if (vio->thread_id.value() != 0 &&
vio->epoll_shutdown_flag.test_and_set()) {
vio_wait_until_epoll_woken(vio);
}
#endif

if (mysql_socket_close(vio->mysql_socket)) r = -1;
#ifdef HAVE_KQUEUE
if (vio->kq_fd == -1 || close(vio->kq_fd)) r = -1;
Expand Down