diff options
-rw-r--r-- | zebra/zserv.c | 73 | ||||
-rw-r--r-- | zebra/zserv.h | 1 |
2 files changed, 42 insertions, 32 deletions
diff --git a/zebra/zserv.c b/zebra/zserv.c index 4b12aefd2..20c0bf547 100644 --- a/zebra/zserv.c +++ b/zebra/zserv.c @@ -170,6 +170,7 @@ static void zserv_client_close(struct zserv *client) { THREAD_OFF(client->t_read); THREAD_OFF(client->t_write); + THREAD_OFF(client->t_flush); zserv_event(client, ZSERV_HANDLE_CLOSE); } @@ -177,7 +178,6 @@ static int zserv_flush_data(struct thread *thread) { struct zserv *client = THREAD_ARG(thread); - client->t_write = NULL; switch (buffer_flush_available(client->wb, client->sock)) { case BUFFER_ERROR: zlog_warn( @@ -217,6 +217,7 @@ static int zserv_write(struct thread *thread) uint32_t wcmd; int writerv = BUFFER_EMPTY; struct stream_fifo *cache = stream_fifo_new(); + bool ok = true; pthread_mutex_lock(&client->obuf_mtx); { @@ -226,7 +227,7 @@ static int zserv_write(struct thread *thread) } pthread_mutex_unlock(&client->obuf_mtx); - while (cache->head) { + while (stream_fifo_head(cache) && ok) { msg = stream_fifo_pop(cache); stream_set_getp(msg, 0); @@ -234,26 +235,29 @@ static int zserv_write(struct thread *thread) writerv = buffer_write(client->wb, client->sock, STREAM_DATA(msg), stream_get_endp(msg)); + switch (writerv) { + case BUFFER_ERROR: + zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]", + __func__, zebra_route_string(client->proto), + client->sock); + zlog_warn("%s: closing connection to %s", __func__, + zebra_route_string(client->proto)); + zserv_client_close(client); + ok = false; + break; + /* continue writing */ + case BUFFER_PENDING: + case BUFFER_EMPTY: + break; + } + stream_free(msg); } - stream_fifo_free(cache); - - switch (writerv) { - case BUFFER_ERROR: - zlog_warn("%s: buffer_write failed to ZAPI client %s [fd = %d]", - __func__, zebra_route_string(client->proto), - client->sock); - zlog_warn("%s: closing connection to %s", __func__, - zebra_route_string(client->proto)); - zserv_client_close(client); - break; - case BUFFER_PENDING: + if (ok && writerv == BUFFER_PENDING) zserv_client_event(client, ZSERV_CLIENT_FLUSH_DATA); - break; - case BUFFER_EMPTY: - break; - } + + stream_fifo_free(cache); atomic_store_explicit(&client->last_write_cmd, wcmd, memory_order_relaxed); @@ -411,14 +415,15 @@ static int zserv_read(struct thread *thread) stream_fifo_pop(cache)); } pthread_mutex_unlock(&client->ibuf_mtx); + + /* Schedule job to process those packets */ + zserv_event(client, ZSERV_PROCESS_MESSAGES); + } if (IS_ZEBRA_DEBUG_PACKET) zlog_debug("Read %d packets", p2p_orig - p2p); - /* Schedule job to process those packets */ - zserv_event(client, ZSERV_PROCESS_MESSAGES); - /* Reschedule ourselves */ zserv_client_event(client, ZSERV_CLIENT_READ); @@ -446,7 +451,7 @@ static void zserv_client_event(struct zserv *client, break; case ZSERV_CLIENT_FLUSH_DATA: thread_add_write(client->pthread->master, zserv_flush_data, - client, client->sock, &client->t_write); + client, client->sock, &client->t_flush); break; } } @@ -464,8 +469,11 @@ static void zserv_client_event(struct zserv *client, * with the message is executed. This proceeds until there are no more messages, * an error occurs, or the processing limit is reached. * - * This task reschedules itself if it cannot process everything on the input - * queue in one run. + * The client's I/O thread can push at most zebrad.packets_to_process messages + * onto the input buffer before notifying us there are packets to read. As long + * as we always process zebrad.packets_to_process messages here, then we can + * rely on the read thread to handle queuing this task enough times to process + * everything on the input queue. */ static int zserv_process_messages(struct thread *thread) { @@ -477,19 +485,19 @@ static int zserv_process_messages(struct thread *thread) pthread_mutex_lock(&client->ibuf_mtx); { - for (uint32_t i = p2p - 1; i && client->ibuf_fifo->head; --i) - stream_fifo_push(cache, - stream_fifo_pop(client->ibuf_fifo)); + uint32_t i; + for (i = 0; i < p2p && stream_fifo_head(client->ibuf_fifo); + ++i) { + msg = stream_fifo_pop(client->ibuf_fifo); + stream_fifo_push(cache, msg); + } - if (client->ibuf_fifo->head) - zserv_event(client, ZSERV_PROCESS_MESSAGES); + msg = NULL; } pthread_mutex_unlock(&client->ibuf_mtx); - while (p2p--) { + while (stream_fifo_head(cache)) { msg = stream_fifo_pop(cache); - if (!msg) - break; zserv_handle_commands(client, msg); stream_free(msg); } @@ -614,6 +622,7 @@ static int zserv_handle_client_close(struct thread *thread) */ assert(!client->t_read); assert(!client->t_write); + assert(!client->t_flush); /* synchronously stop thread */ frr_pthread_stop(client->pthread, NULL); diff --git a/zebra/zserv.h b/zebra/zserv.h index a1b55bf8e..fc338d89e 100644 --- a/zebra/zserv.h +++ b/zebra/zserv.h @@ -72,6 +72,7 @@ struct zserv { /* Threads for read/write. */ struct thread *t_read; struct thread *t_write; + struct thread *t_flush; /* default routing table this client munges */ int rtm_table; |