Skip to content

Commit fdb3be9

Browse files
committed
Refactoring: new function to test if client has pending output.
1 parent 825f65d commit fdb3be9

File tree

3 files changed

+17
-8
lines changed

3 files changed

+17
-8
lines changed

src/networking.c

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,11 @@ int prepareClientToWrite(client *c) {
163163

164164
if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */
165165

166-
/* Only install the handler if not already installed and, in case of
167-
* slaves, if the client can actually receive writes. */
168-
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
166+
/* Schedule the client to write the output buffers to the socket only
167+
* if not already done (there were no pending writes alreday and the client
168+
* was yet not flagged), and, for slaves, if the slave can actually
169+
* receive writes at this stage. */
170+
if (!clientHasPendingReplies(c) &&
169171
!(c->flags & CLIENT_PENDING_WRITE) &&
170172
(c->replstate == REPL_STATE_NONE ||
171173
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
@@ -591,6 +593,12 @@ void copyClientOutputBuffer(client *dst, client *src) {
591593
dst->reply_bytes = src->reply_bytes;
592594
}
593595

596+
/* Return true if the specified client has pending reply buffers to write to
597+
* the socket. */
598+
int clientHasPendingReplies(client *c) {
599+
return c->bufpos || listLength(c->reply);
600+
}
601+
594602
#define MAX_ACCEPTS_PER_CALL 1000
595603
static void acceptCommonHandler(int fd, int flags) {
596604
client *c;
@@ -824,7 +832,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
824832
size_t objmem;
825833
robj *o;
826834

827-
while(c->bufpos > 0 || listLength(c->reply)) {
835+
while(clientHasPendingReplies(c)) {
828836
if (c->bufpos > 0) {
829837
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
830838
if (nwritten <= 0) break;
@@ -890,7 +898,7 @@ int writeToClient(int fd, client *c, int handler_installed) {
890898
* We just rely on data / pings received for timeout detection. */
891899
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
892900
}
893-
if (c->bufpos == 0 && listLength(c->reply) == 0) {
901+
if (!clientHasPendingReplies(c)) {
894902
c->sentlen = 0;
895903
if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
896904

@@ -929,7 +937,7 @@ void handleClientsWithPendingWrites(void) {
929937

930938
/* If there is nothing left, do nothing. Otherwise install
931939
* the write handler. */
932-
if ((c->bufpos || listLength(c->reply)) &&
940+
if (clientHasPendingReplies(c) &&
933941
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
934942
sendReplyToClient, c) == AE_ERR)
935943
{

src/replication.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ void syncCommand(client *c) {
570570
* the client about already issued commands. We need a fresh reply
571571
* buffer registering the differences between the BGSAVE and the current
572572
* dataset, so that we can copy to other slaves if needed. */
573-
if (listLength(c->reply) != 0 || c->bufpos != 0) {
573+
if (clientHasPendingReplies(c)) {
574574
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
575575
return;
576576
}
@@ -1924,7 +1924,7 @@ void replicationResurrectCachedMaster(int newfd) {
19241924

19251925
/* We may also need to install the write handler as well if there is
19261926
* pending data in the write buffers. */
1927-
if (server.master->bufpos || listLength(server.master->reply)) {
1927+
if (clientHasPendingReplies(server.master)) {
19281928
if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE,
19291929
sendReplyToClient, server.master)) {
19301930
serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));

src/server.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1111,6 +1111,7 @@ void pauseClients(mstime_t duration);
11111111
int clientsArePaused(void);
11121112
int processEventsWhileBlocked(void);
11131113
void handleClientsWithPendingWrites(void);
1114+
int clientHasPendingReplies(client *c);
11141115

11151116
#ifdef __GNUC__
11161117
void addReplyErrorFormat(client *c, const char *fmt, ...)

0 commit comments

Comments
 (0)