diff options
Diffstat (limited to 'tests/lib/test_zmq.c')
-rw-r--r-- | tests/lib/test_zmq.c | 151 |
1 files changed, 132 insertions, 19 deletions
diff --git a/tests/lib/test_zmq.c b/tests/lib/test_zmq.c index c270ec3d1..b6624915e 100644 --- a/tests/lib/test_zmq.c +++ b/tests/lib/test_zmq.c @@ -23,6 +23,7 @@ #include "frr_zmq.h" DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer") +DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message") static struct thread_master *master; @@ -31,6 +32,25 @@ static void msg_buf_free(void *data, void *hint) XFREE(MTYPE_TESTBUF, data); } +static int recv_delim(void *zmqsock) +{ + /* receive delim */ + zmq_msg_t zdelim; + int more; + zmq_msg_init(&zdelim); + zmq_msg_recv(&zdelim, zmqsock, 0); + more = zmq_msg_more(&zdelim); + zmq_msg_close(&zdelim); + return more; +} +static void send_delim(void *zmqsock) +{ + /* Send delim */ + zmq_msg_t zdelim; + zmq_msg_init(&zdelim); + zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE); + zmq_msg_close(&zdelim); +} static void run_client(int syncfd) { int i, j; @@ -38,13 +58,14 @@ static void run_client(int syncfd) char dummy; void *zmqctx = NULL; void *zmqsock; + int more; read(syncfd, &dummy, 1); zmqctx = zmq_ctx_new(); zmq_ctx_set(zmqctx, ZMQ_IPV6, 1); - zmqsock = zmq_socket(zmqctx, ZMQ_REQ); + zmqsock = zmq_socket(zmqctx, ZMQ_DEALER); if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) { perror("zmq_connect"); exit(1); @@ -52,22 +73,28 @@ static void run_client(int syncfd) /* single-part */ for (i = 0; i < 8; i++) { - snprintf(buf, sizeof(buf), "msg #%d %c%c%c", - i, 'a' + i, 'b' + i, 'c' + i); + snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i, + 'b' + i, 'c' + i); printf("client send: %s\n", buf); fflush(stdout); - zmq_send(zmqsock, buf, strlen(buf) + 1, 0); - zmq_recv(zmqsock, buf, sizeof(buf), 0); - printf("client recv: %s\n", buf); + send_delim(zmqsock); + zmq_send(zmqsock, buf, strlen(buf) + 1, 0); + more = recv_delim(zmqsock); + while (more) { + zmq_recv(zmqsock, buf, sizeof(buf), 0); + printf("client recv: %s\n", buf); + size_t len = sizeof(more); + if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len)) + break; + } } /* multipart */ for (i = 2; i < 5; i++) { - int more; - printf("---\n"); + send_delim(zmqsock); + zmq_msg_t part; for (j = 1; j <= i; j++) { - zmq_msg_t part; char *dyn = XMALLOC(MTYPE_TESTBUF, 32); snprintf(dyn, 32, "part %d/%d", j, i); @@ -79,7 +106,7 @@ static void run_client(int syncfd) zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0); } - zmq_msg_t part; + recv_delim(zmqsock); do { char *data; @@ -90,26 +117,85 @@ static void run_client(int syncfd) } while (more); zmq_msg_close(&part); } + + /* write callback */ + printf("---\n"); + snprintf(buf, 32, "Done receiving"); + printf("client send: %s\n", buf); + fflush(stdout); + send_delim(zmqsock); + zmq_send(zmqsock, buf, strlen(buf) + 1, 0); + /* wait for message from server */ + more = recv_delim(zmqsock); + while (more) { + zmq_recv(zmqsock, buf, sizeof(buf), 0); + printf("client recv: %s\n", buf); + size_t len = sizeof(more); + if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len)) + break; + } + zmq_close(zmqsock); zmq_ctx_term(zmqctx); } static struct frrzmq_cb *cb; +static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id) +{ + /* receive id */ + zmq_msg_init(msg_id); + zmq_msg_recv(msg_id, zmqsock, 0); + /* receive delim */ + recv_delim(zmqsock); +} +static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id) +{ + /* Send Id */ + zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE); + send_delim(zmqsock); +} +static void serverwritefn(void *arg, void *zmqsock) +{ + zmq_msg_t *msg_id = (zmq_msg_t *)arg; + char buf[32] = "Test write callback"; + size_t i; + + for (i = 0; i < strlen(buf); i++) + buf[i] = toupper(buf[i]); + printf("server send: %s\n", buf); + fflush(stdout); + send_id_and_delim(zmqsock, msg_id); + zmq_send(zmqsock, buf, strlen(buf) + 1, 0); + + /* send just once */ + frrzmq_thread_cancel(&cb, &cb->write); + + zmq_msg_close(msg_id); + XFREE(MTYPE_ZMQMSG, msg_id); +} static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg, - unsigned partnum) + unsigned partnum) { + static int num = 0; int more = zmq_msg_more(msg); char *in = zmq_msg_data(msg); size_t i; zmq_msg_t reply; char *out; + /* Id */ + if (partnum == 0) { + send_id_and_delim(zmqsock, msg); + return; + } + /* Delim */ + if (partnum == 1) + return; + + printf("server recv part %u (more: %d): %s\n", partnum, more, in); fflush(stdout); - /* REQ-REP doesn't allow sending a reply here */ - if (more) - return; out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1); for (i = 0; i < strlen(in); i++) @@ -118,39 +204,66 @@ static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg, zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE); + if (more) + return; + out = XMALLOC(MTYPE_TESTBUF, 32); snprintf(out, 32, "msg# was %u", partnum); zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); zmq_msg_send(&reply, zmqsock, 0); + + zmq_msg_close(&reply); + + if (++num < 7) + return; + + /* write callback test */ + char buf[32]; + zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t)); + recv_id_and_delim(zmqsock, msg_id); + zmq_recv(zmqsock, buf, sizeof(buf), 0); + printf("server recv: %s\n", buf); + fflush(stdout); + + frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id, + zmqsock, &cb); } static void serverfn(void *arg, void *zmqsock) { static int num = 0; + zmq_msg_t msg_id; char buf[32]; size_t i; + + recv_id_and_delim(zmqsock, &msg_id); zmq_recv(zmqsock, buf, sizeof(buf), 0); printf("server recv: %s\n", buf); fflush(stdout); for (i = 0; i < strlen(buf); i++) buf[i] = toupper(buf[i]); + send_id_and_delim(zmqsock, &msg_id); + zmq_msg_close(&msg_id); zmq_send(zmqsock, buf, strlen(buf) + 1, 0); if (++num < 4) return; /* change to multipart callback */ - frrzmq_thread_cancel(cb); + frrzmq_thread_cancel(&cb, &cb->read); + frrzmq_thread_cancel(&cb, &cb->write); - cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock); + frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock, + &cb); } static void sigchld(void) { printf("child exited.\n"); - frrzmq_thread_cancel(cb); + frrzmq_thread_cancel(&cb, &cb->read); + frrzmq_thread_cancel(&cb, &cb->write); } static struct quagga_signal_t sigs[] = { @@ -170,13 +283,13 @@ static void run_server(int syncfd) signal_init(master, array_size(sigs), sigs); frrzmq_init(); - zmqsock = zmq_socket(frrzmq_context, ZMQ_REP); + zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER); if (zmq_bind(zmqsock, "tcp://*:17171")) { perror("zmq_bind"); exit(1); } - cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock); + frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb); write(syncfd, &dummy, sizeof(dummy)); while (thread_fetch(master, &t)) |