summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--zebra/zserv.c73
-rw-r--r--zebra/zserv.h1
2 files changed, 42 insertions, 32 deletions
diff --git a/zebra/zserv.c b/zebra/zserv.c
index 4b12aefd2..20c0bf547 100644
--- a/zebra/zserv.c
+++ b/zebra/zserv.c
@@ -170,6 +170,7 @@ static void zserv_client_close(struct zserv *client)
{
THREAD_OFF(client->t_read);
THREAD_OFF(client->t_write);
+ THREAD_OFF(client->t_flush);
zserv_event(client, ZSERV_HANDLE_CLOSE);
}
@@ -177,7 +178,6 @@ static int zserv_flush_data(struct thread *thread)
{
struct zserv *client = THREAD_ARG(thread);
- client->t_write = NULL;
switch (buffer_flush_available(client->wb, client->sock)) {
case BUFFER_ERROR:
zlog_warn(
@@ -217,6 +217,7 @@ static int zserv_write(struct thread *thread)
uint32_t wcmd;
int writerv = BUFFER_EMPTY;
struct stream_fifo *cache = stream_fifo_new();
+ bool ok = true;
pthread_mutex_lock(&client->obuf_mtx);
{
@@ -226,7 +227,7 @@ static int zserv_write(struct thread *thread)
}
pthread_mutex_unlock(&client->obuf_mtx);
- while (cache->head) {
+ while (stream_fifo_head(cache) && ok) {
msg = stream_fifo_pop(cache);
stream_set_getp(msg, 0);
@@ -234,26 +235,29 @@ static int zserv_write(struct thread *thread)
writerv = buffer_write(client->wb, client->sock,
STREAM_DATA(msg), stream_get_endp(msg));
+ switch (writerv) {
+ case BUFFER_ERROR:
+ zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]",
+ __func__, zebra_route_string(client->proto),
+ client->sock);
+ zlog_warn("%s: closing connection to %s", __func__,
+ zebra_route_string(client->proto));
+ zserv_client_close(client);
+ ok = false;
+ break;
+ /* continue writing */
+ case BUFFER_PENDING:
+ case BUFFER_EMPTY:
+ break;
+ }
+
stream_free(msg);
}
- stream_fifo_free(cache);
-
- switch (writerv) {
- case BUFFER_ERROR:
- zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]",
- __func__, zebra_route_string(client->proto),
- client->sock);
- zlog_warn("%s: closing connection to %s", __func__,
- zebra_route_string(client->proto));
- zserv_client_close(client);
- break;
- case BUFFER_PENDING:
+ if (ok && writerv == BUFFER_PENDING)
zserv_client_event(client, ZSERV_CLIENT_FLUSH_DATA);
- break;
- case BUFFER_EMPTY:
- break;
- }
+
+ stream_fifo_free(cache);
atomic_store_explicit(&client->last_write_cmd, wcmd,
memory_order_relaxed);
@@ -411,14 +415,15 @@ static int zserv_read(struct thread *thread)
stream_fifo_pop(cache));
}
pthread_mutex_unlock(&client->ibuf_mtx);
+
+ /* Schedule job to process those packets */
+ zserv_event(client, ZSERV_PROCESS_MESSAGES);
+
}
if (IS_ZEBRA_DEBUG_PACKET)
zlog_debug("Read %d packets", p2p_orig - p2p);
- /* Schedule job to process those packets */
- zserv_event(client, ZSERV_PROCESS_MESSAGES);
-
/* Reschedule ourselves */
zserv_client_event(client, ZSERV_CLIENT_READ);
@@ -446,7 +451,7 @@ static void zserv_client_event(struct zserv *client,
break;
case ZSERV_CLIENT_FLUSH_DATA:
thread_add_write(client->pthread->master, zserv_flush_data,
- client, client->sock, &client->t_write);
+ client, client->sock, &client->t_flush);
break;
}
}
@@ -464,8 +469,11 @@ static void zserv_client_event(struct zserv *client,
* with the message is executed. This proceeds until there are no more messages,
* an error occurs, or the processing limit is reached.
*
- * This task reschedules itself if it cannot process everything on the input
- * queue in one run.
+ * The client's I/O thread can push at most zebrad.packets_to_process messages
+ * onto the input buffer before notifying us there are packets to read. As long
+ * as we always process zebrad.packets_to_process messages here, then we can
+ * rely on the read thread to handle queuing this task enough times to process
+ * everything on the input queue.
*/
static int zserv_process_messages(struct thread *thread)
{
@@ -477,19 +485,19 @@ static int zserv_process_messages(struct thread *thread)
pthread_mutex_lock(&client->ibuf_mtx);
{
- for (uint32_t i = p2p - 1; i && client->ibuf_fifo->head; --i)
- stream_fifo_push(cache,
- stream_fifo_pop(client->ibuf_fifo));
+ uint32_t i;
+ for (i = 0; i < p2p && stream_fifo_head(client->ibuf_fifo);
+ ++i) {
+ msg = stream_fifo_pop(client->ibuf_fifo);
+ stream_fifo_push(cache, msg);
+ }
- if (client->ibuf_fifo->head)
- zserv_event(client, ZSERV_PROCESS_MESSAGES);
+ msg = NULL;
}
pthread_mutex_unlock(&client->ibuf_mtx);
- while (p2p--) {
+ while (stream_fifo_head(cache)) {
msg = stream_fifo_pop(cache);
- if (!msg)
- break;
zserv_handle_commands(client, msg);
stream_free(msg);
}
@@ -614,6 +622,7 @@ static int zserv_handle_client_close(struct thread *thread)
*/
assert(!client->t_read);
assert(!client->t_write);
+ assert(!client->t_flush);
/* synchronously stop thread */
frr_pthread_stop(client->pthread, NULL);
diff --git a/zebra/zserv.h b/zebra/zserv.h
index a1b55bf8e..fc338d89e 100644
--- a/zebra/zserv.h
+++ b/zebra/zserv.h
@@ -72,6 +72,7 @@ struct zserv {
/* Threads for read/write. */
struct thread *t_read;
struct thread *t_write;
+ struct thread *t_flush;
/* default routing table this client munges */
int rtm_table;