diff options
author | ßingen <bingen@voltanet.io> | 2017-11-09 16:02:18 +0100 |
---|---|---|
committer | ßingen <bingen@voltanet.io> | 2017-11-23 12:50:27 +0100 |
commit | afd0f10d6350b9753ad6d00a760fce150d90bbf1 (patch) | |
tree | 1fdff338f94f546338a04a5dfcc65209e73a8158 /tests | |
parent | Merge pull request #1391 from LabNConsulting/working/master/patch-set/vnc-vrf... (diff) | |
download | frr-afd0f10d6350b9753ad6d00a760fce150d90bbf1.tar.xz frr-afd0f10d6350b9753ad6d00a760fce150d90bbf1.zip |
lib: Address ZMQ lib TODOs
Add write callback.
Add error callback.
Add frrzmq_check_events() function to check for edge triggered things
that may have happened after a zmq_send() call or so.
Update ZMQ tests.
Signed-off-by: ßingen <bingen@voltanet.io>
Diffstat (limited to 'tests')
-rw-r--r-- | tests/lib/test_zmq.c | 151 | ||||
-rw-r--r-- | tests/lib/test_zmq.refout | 49 |
2 files changed, 165 insertions, 35 deletions
diff --git a/tests/lib/test_zmq.c b/tests/lib/test_zmq.c index c270ec3d1..b6624915e 100644 --- a/tests/lib/test_zmq.c +++ b/tests/lib/test_zmq.c @@ -23,6 +23,7 @@ #include "frr_zmq.h" DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer") +DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message") static struct thread_master *master; @@ -31,6 +32,25 @@ static void msg_buf_free(void *data, void *hint) XFREE(MTYPE_TESTBUF, data); } +static int recv_delim(void *zmqsock) +{ + /* receive delim */ + zmq_msg_t zdelim; + int more; + zmq_msg_init(&zdelim); + zmq_msg_recv(&zdelim, zmqsock, 0); + more = zmq_msg_more(&zdelim); + zmq_msg_close(&zdelim); + return more; +} +static void send_delim(void *zmqsock) +{ + /* Send delim */ + zmq_msg_t zdelim; + zmq_msg_init(&zdelim); + zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE); + zmq_msg_close(&zdelim); +} static void run_client(int syncfd) { int i, j; @@ -38,13 +58,14 @@ static void run_client(int syncfd) char dummy; void *zmqctx = NULL; void *zmqsock; + int more; read(syncfd, &dummy, 1); zmqctx = zmq_ctx_new(); zmq_ctx_set(zmqctx, ZMQ_IPV6, 1); - zmqsock = zmq_socket(zmqctx, ZMQ_REQ); + zmqsock = zmq_socket(zmqctx, ZMQ_DEALER); if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) { perror("zmq_connect"); exit(1); @@ -52,22 +73,28 @@ static void run_client(int syncfd) /* single-part */ for (i = 0; i < 8; i++) { - snprintf(buf, sizeof(buf), "msg #%d %c%c%c", - i, 'a' + i, 'b' + i, 'c' + i); + snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i, + 'b' + i, 'c' + i); printf("client send: %s\n", buf); fflush(stdout); - zmq_send(zmqsock, buf, strlen(buf) + 1, 0); - zmq_recv(zmqsock, buf, sizeof(buf), 0); - printf("client recv: %s\n", buf); + send_delim(zmqsock); + zmq_send(zmqsock, buf, strlen(buf) + 1, 0); + more = recv_delim(zmqsock); + while (more) { + zmq_recv(zmqsock, buf, sizeof(buf), 0); + printf("client recv: %s\n", buf); + size_t len = sizeof(more); + if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len)) + break; + } } /* multipart */ for (i = 2; i < 5; i++) { - int more; - printf("---\n"); + send_delim(zmqsock); + zmq_msg_t part; for (j = 1; j <= i; j++) { - zmq_msg_t part; char *dyn = XMALLOC(MTYPE_TESTBUF, 32); snprintf(dyn, 32, "part %d/%d", j, i); @@ -79,7 +106,7 @@ static void run_client(int syncfd) zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0); } - zmq_msg_t part; + recv_delim(zmqsock); do { char *data; @@ -90,26 +117,85 @@ static void run_client(int syncfd) } while (more); zmq_msg_close(&part); } + + /* write callback */ + printf("---\n"); + snprintf(buf, 32, "Done receiving"); + printf("client send: %s\n", buf); + fflush(stdout); + send_delim(zmqsock); + zmq_send(zmqsock, buf, strlen(buf) + 1, 0); + /* wait for message from server */ + more = recv_delim(zmqsock); + while (more) { + zmq_recv(zmqsock, buf, sizeof(buf), 0); + printf("client recv: %s\n", buf); + size_t len = sizeof(more); + if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len)) + break; + } + zmq_close(zmqsock); zmq_ctx_term(zmqctx); } static struct frrzmq_cb *cb; +static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id) +{ + /* receive id */ + zmq_msg_init(msg_id); + zmq_msg_recv(msg_id, zmqsock, 0); + /* receive delim */ + recv_delim(zmqsock); +} +static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id) +{ + /* Send Id */ + zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE); + send_delim(zmqsock); +} +static void serverwritefn(void *arg, void *zmqsock) +{ + zmq_msg_t *msg_id = (zmq_msg_t *)arg; + char buf[32] = "Test write callback"; + size_t i; + + for (i = 0; i < strlen(buf); i++) + buf[i] = toupper(buf[i]); + printf("server send: %s\n", buf); + fflush(stdout); + send_id_and_delim(zmqsock, msg_id); + zmq_send(zmqsock, buf, strlen(buf) + 1, 0); + + /* send just once */ + frrzmq_thread_cancel(&cb, &cb->write); + + zmq_msg_close(msg_id); + XFREE(MTYPE_ZMQMSG, msg_id); +} static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg, - unsigned partnum) + unsigned partnum) { + static int num = 0; int more = zmq_msg_more(msg); char *in = zmq_msg_data(msg); size_t i; zmq_msg_t reply; char *out; + /* Id */ + if (partnum == 0) { + send_id_and_delim(zmqsock, msg); + return; + } + /* Delim */ + if (partnum == 1) + return; + + printf("server recv part %u (more: %d): %s\n", partnum, more, in); fflush(stdout); - /* REQ-REP doesn't allow sending a reply here */ - if (more) - return; out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1); for (i = 0; i < strlen(in); i++) @@ -118,39 +204,66 @@ static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg, zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE); + if (more) + return; + out = XMALLOC(MTYPE_TESTBUF, 32); snprintf(out, 32, "msg# was %u", partnum); zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL); zmq_msg_send(&reply, zmqsock, 0); + + zmq_msg_close(&reply); + + if (++num < 7) + return; + + /* write callback test */ + char buf[32]; + zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t)); + recv_id_and_delim(zmqsock, msg_id); + zmq_recv(zmqsock, buf, sizeof(buf), 0); + printf("server recv: %s\n", buf); + fflush(stdout); + + frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id, + zmqsock, &cb); } static void serverfn(void *arg, void *zmqsock) { static int num = 0; + zmq_msg_t msg_id; char buf[32]; size_t i; + + recv_id_and_delim(zmqsock, &msg_id); zmq_recv(zmqsock, buf, sizeof(buf), 0); printf("server recv: %s\n", buf); fflush(stdout); for (i = 0; i < strlen(buf); i++) buf[i] = toupper(buf[i]); + send_id_and_delim(zmqsock, &msg_id); + zmq_msg_close(&msg_id); zmq_send(zmqsock, buf, strlen(buf) + 1, 0); if (++num < 4) return; /* change to multipart callback */ - frrzmq_thread_cancel(cb); + frrzmq_thread_cancel(&cb, &cb->read); + frrzmq_thread_cancel(&cb, &cb->write); - cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock); + frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock, + &cb); } static void sigchld(void) { printf("child exited.\n"); - frrzmq_thread_cancel(cb); + frrzmq_thread_cancel(&cb, &cb->read); + frrzmq_thread_cancel(&cb, &cb->write); } static struct quagga_signal_t sigs[] = { @@ -170,13 +283,13 @@ static void run_server(int syncfd) signal_init(master, array_size(sigs), sigs); frrzmq_init(); - zmqsock = zmq_socket(frrzmq_context, ZMQ_REP); + zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER); if (zmq_bind(zmqsock, "tcp://*:17171")) { perror("zmq_bind"); exit(1); } - cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock); + frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb); write(syncfd, &dummy, sizeof(dummy)); while (thread_fetch(master, &t)) diff --git a/tests/lib/test_zmq.refout b/tests/lib/test_zmq.refout index 61f45f02b..acac50553 100644 --- a/tests/lib/test_zmq.refout +++ b/tests/lib/test_zmq.refout @@ -11,40 +11,57 @@ client send: msg #3 def server recv: msg #3 def client recv: MSG #3 DEF client send: msg #4 efg -server recv part 0 (more: 0): msg #4 efg +server recv part 2 (more: 0): msg #4 efg client recv: MSG #4 EFG +client recv: msg# was 2 client send: msg #5 fgh -client recv: msg# was 0 +server recv part 2 (more: 0): msg #5 fgh +client recv: MSG #5 FGH +client recv: msg# was 2 client send: msg #6 ghi -server recv part 0 (more: 0): msg #6 ghi +server recv part 2 (more: 0): msg #6 ghi client recv: MSG #6 GHI +client recv: msg# was 2 client send: msg #7 hij -client recv: msg# was 0 +server recv part 2 (more: 0): msg #7 hij +client recv: MSG #7 HIJ +client recv: msg# was 2 --- client send: part 1/2 client send: part 2/2 -server recv part 0 (more: 1): part 1/2 -server recv part 1 (more: 0): part 2/2 +server recv part 2 (more: 1): part 1/2 +server recv part 3 (more: 0): part 2/2 +client recv (more: 1): PART 1/2 client recv (more: 1): PART 2/2 -client recv (more: 0): msg# was 1 +client recv (more: 0): msg# was 3 --- client send: part 1/3 client send: part 2/3 client send: part 3/3 -server recv part 0 (more: 1): part 1/3 -server recv part 1 (more: 1): part 2/3 -server recv part 2 (more: 0): part 3/3 +server recv part 2 (more: 1): part 1/3 +server recv part 3 (more: 1): part 2/3 +server recv part 4 (more: 0): part 3/3 +client recv (more: 1): PART 1/3 +client recv (more: 1): PART 2/3 client recv (more: 1): PART 3/3 -client recv (more: 0): msg# was 2 +client recv (more: 0): msg# was 4 --- client send: part 1/4 client send: part 2/4 client send: part 3/4 client send: part 4/4 -server recv part 0 (more: 1): part 1/4 -server recv part 1 (more: 1): part 2/4 -server recv part 2 (more: 1): part 3/4 -server recv part 3 (more: 0): part 4/4 +server recv part 2 (more: 1): part 1/4 +server recv part 3 (more: 1): part 2/4 +server recv part 4 (more: 1): part 3/4 +server recv part 5 (more: 0): part 4/4 +client recv (more: 1): PART 1/4 +client recv (more: 1): PART 2/4 +client recv (more: 1): PART 3/4 client recv (more: 1): PART 4/4 -client recv (more: 0): msg# was 3 +client recv (more: 0): msg# was 5 +--- +client send: Done receiving +server recv: Done receiving +server send: TEST WRITE CALLBACK +client recv: TEST WRITE CALLBACK child exited. |