Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions connectd/multiplex.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ struct subd {

/* FIXME: reorder! */
static void destroy_connected_subd(struct subd *subd);
static struct io_plan *write_to_peer(struct io_conn *peer_conn,
struct peer *peer);

static struct subd *find_subd(struct peer *peer,
const struct channel_id *channel_id)
Expand Down Expand Up @@ -160,6 +162,21 @@ void disconnect_peer(struct peer *peer)
}
}

static void free_all_subds(struct peer *peer)
{
for (size_t i = 0; i < tal_count(peer->subds); i++) {
/* Once conn exists, subd is a child of the conn. Free conn, free subd. */
if (peer->subds[i]->conn) {
tal_del_destructor(peer->subds[i], destroy_connected_subd);
tal_free(peer->subds[i]->conn);
} else {
/* We told lightningd that peer spoke, but it hasn't returned yet. */
tal_free(peer->subds[i]);
}
}
tal_resize(&peer->subds, 0);
}

/* Send warning, close connection to peer */
static void send_warning(struct peer *peer, const char *fmt, ...)
{
Expand All @@ -177,17 +194,7 @@ static void send_warning(struct peer *peer, const char *fmt, ...)
inject_peer_msg(peer, take(msg));

/* Free all the subds immediately */
for (size_t i = 0; i < tal_count(peer->subds); i++) {
/* Once conn exists, subd is a child of the conn. Free conn, free subd. */
if (peer->subds[i]->conn) {
tal_del_destructor(peer->subds[i], destroy_connected_subd);
tal_free(peer->subds[i]->conn);
} else {
/* We told lightningd that peer spoke, but it hasn't returned yet. */
tal_free(peer->subds[i]);
}
}
tal_resize(&peer->subds, 0);
free_all_subds(peer);
disconnect_peer(peer);
}

Expand Down Expand Up @@ -397,12 +404,6 @@ static bool is_urgent(enum peer_wire type)
return false;
}

/* io_sock_shutdown, but in format suitable for an io_plan callback */
static struct io_plan *io_sock_shutdown_cb(struct io_conn *conn, struct peer *unused)
{
return io_sock_shutdown(conn);
}

/* Process and eat protocol_batch_element messages, encrypt each element message
* and return the encrypted messages as one long byte array. */
static u8 *process_batch_elements(struct peer *peer, const u8 *msg TAKES)
Expand Down Expand Up @@ -466,11 +467,7 @@ static u8 *process_batch_elements(struct peer *peer, const u8 *msg TAKES)
return ret;
}

static struct io_plan *encrypt_and_send(struct peer *peer,
const u8 *msg TAKES,
struct io_plan *(*next)
(struct io_conn *peer_conn,
struct peer *peer))
static struct io_plan *encrypt_and_send(struct peer *peer, const u8 *msg TAKES)
{
int type = fromwire_peektype(msg);

Expand All @@ -482,8 +479,8 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
case DEV_DISCONNECT_OUT_AFTER:
/* Disallow reads from now on */
peer->dev_read_enabled = false;
/* Using io_close here can lose the data we're about to send! */
next = io_sock_shutdown_cb;
free_all_subds(peer);
drain_peer(peer);
break;
case DEV_DISCONNECT_OUT_BLACKHOLE:
/* Disable both reads and writes from now on */
Expand All @@ -499,7 +496,7 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
/* Tell them to read again, */
io_wake(&peer->subds);
return msg_queue_wait(peer->to_peer, peer->peer_outq,
next, peer);
write_to_peer, peer);
case DEV_DISCONNECT_OUT_DISABLE_AFTER:
peer->dev_read_enabled = false;
peer->dev_writes_enabled = tal(peer, u32);
Expand All @@ -523,7 +520,7 @@ static struct io_plan *encrypt_and_send(struct peer *peer,
return io_write(peer->to_peer,
peer->sent_to_peer,
tal_bytelen(peer->sent_to_peer),
next, peer);
write_to_peer, peer);
}

/* Kicks off write_to_peer() to look for more gossip to send from store */
Expand Down Expand Up @@ -1120,7 +1117,7 @@ static struct io_plan *write_to_peer(struct io_conn *peer_conn,
(*peer->dev_writes_enabled)--;
}

return encrypt_and_send(peer, take(msg), write_to_peer);
return encrypt_and_send(peer, take(msg));
}

static struct io_plan *read_from_subd(struct io_conn *subd_conn,
Expand Down
Loading