summaryrefslogtreecommitdiffstats
path: root/tests
diff options
context:
space:
mode:
authorDonald Sharp <sharpd@cumulusnetworks.com>2017-12-13 13:36:57 +0100
committerGitHub <noreply@github.com>2017-12-13 13:36:57 +0100
commitdb33b83b1d90458503987fd9f5f21c09c045e9a7 (patch)
tree58d2898d93fd09edaf3268e06d7da24e07d9bd02 /tests
parentMerge pull request #1540 from opensourcerouting/isis-spfperf1 (diff)
parentlib: Address ZMQ lib TODOs (diff)
downloadfrr-db33b83b1d90458503987fd9f5f21c09c045e9a7.tar.xz
frr-db33b83b1d90458503987fd9f5f21c09c045e9a7.zip
Merge pull request #1478 from bingen/zeromq4
lib: Address ZMQ lib TODOs
Diffstat (limited to 'tests')
-rw-r--r--tests/lib/test_zmq.c151
-rw-r--r--tests/lib/test_zmq.refout49
2 files changed, 165 insertions, 35 deletions
diff --git a/tests/lib/test_zmq.c b/tests/lib/test_zmq.c
index c270ec3d1..b6624915e 100644
--- a/tests/lib/test_zmq.c
+++ b/tests/lib/test_zmq.c
@@ -23,6 +23,7 @@
#include "frr_zmq.h"
DEFINE_MTYPE_STATIC(LIB, TESTBUF, "zmq test buffer")
+DEFINE_MTYPE_STATIC(LIB, ZMQMSG, "zmq message")
static struct thread_master *master;
@@ -31,6 +32,25 @@ static void msg_buf_free(void *data, void *hint)
XFREE(MTYPE_TESTBUF, data);
}
+static int recv_delim(void *zmqsock)
+{
+ /* receive delim */
+ zmq_msg_t zdelim;
+ int more;
+ zmq_msg_init(&zdelim);
+ zmq_msg_recv(&zdelim, zmqsock, 0);
+ more = zmq_msg_more(&zdelim);
+ zmq_msg_close(&zdelim);
+ return more;
+}
+static void send_delim(void *zmqsock)
+{
+ /* Send delim */
+ zmq_msg_t zdelim;
+ zmq_msg_init(&zdelim);
+ zmq_msg_send(&zdelim, zmqsock, ZMQ_SNDMORE);
+ zmq_msg_close(&zdelim);
+}
static void run_client(int syncfd)
{
int i, j;
@@ -38,13 +58,14 @@ static void run_client(int syncfd)
char dummy;
void *zmqctx = NULL;
void *zmqsock;
+ int more;
read(syncfd, &dummy, 1);
zmqctx = zmq_ctx_new();
zmq_ctx_set(zmqctx, ZMQ_IPV6, 1);
- zmqsock = zmq_socket(zmqctx, ZMQ_REQ);
+ zmqsock = zmq_socket(zmqctx, ZMQ_DEALER);
if (zmq_connect(zmqsock, "tcp://127.0.0.1:17171")) {
perror("zmq_connect");
exit(1);
@@ -52,22 +73,28 @@ static void run_client(int syncfd)
/* single-part */
for (i = 0; i < 8; i++) {
- snprintf(buf, sizeof(buf), "msg #%d %c%c%c",
- i, 'a' + i, 'b' + i, 'c' + i);
+ snprintf(buf, sizeof(buf), "msg #%d %c%c%c", i, 'a' + i,
+ 'b' + i, 'c' + i);
printf("client send: %s\n", buf);
fflush(stdout);
- zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
- zmq_recv(zmqsock, buf, sizeof(buf), 0);
- printf("client recv: %s\n", buf);
+ send_delim(zmqsock);
+ zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+ more = recv_delim(zmqsock);
+ while (more) {
+ zmq_recv(zmqsock, buf, sizeof(buf), 0);
+ printf("client recv: %s\n", buf);
+ size_t len = sizeof(more);
+ if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
+ break;
+ }
}
/* multipart */
for (i = 2; i < 5; i++) {
- int more;
-
printf("---\n");
+ send_delim(zmqsock);
+ zmq_msg_t part;
for (j = 1; j <= i; j++) {
- zmq_msg_t part;
char *dyn = XMALLOC(MTYPE_TESTBUF, 32);
snprintf(dyn, 32, "part %d/%d", j, i);
@@ -79,7 +106,7 @@ static void run_client(int syncfd)
zmq_msg_send(&part, zmqsock, j < i ? ZMQ_SNDMORE : 0);
}
- zmq_msg_t part;
+ recv_delim(zmqsock);
do {
char *data;
@@ -90,26 +117,85 @@ static void run_client(int syncfd)
} while (more);
zmq_msg_close(&part);
}
+
+ /* write callback */
+ printf("---\n");
+ snprintf(buf, 32, "Done receiving");
+ printf("client send: %s\n", buf);
+ fflush(stdout);
+ send_delim(zmqsock);
+ zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+ /* wait for message from server */
+ more = recv_delim(zmqsock);
+ while (more) {
+ zmq_recv(zmqsock, buf, sizeof(buf), 0);
+ printf("client recv: %s\n", buf);
+ size_t len = sizeof(more);
+ if (zmq_getsockopt(zmqsock, ZMQ_RCVMORE, &more, &len))
+ break;
+ }
+
zmq_close(zmqsock);
zmq_ctx_term(zmqctx);
}
static struct frrzmq_cb *cb;
+static void recv_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
+{
+ /* receive id */
+ zmq_msg_init(msg_id);
+ zmq_msg_recv(msg_id, zmqsock, 0);
+ /* receive delim */
+ recv_delim(zmqsock);
+}
+static void send_id_and_delim(void *zmqsock, zmq_msg_t *msg_id)
+{
+ /* Send Id */
+ zmq_msg_send(msg_id, zmqsock, ZMQ_SNDMORE);
+ send_delim(zmqsock);
+}
+static void serverwritefn(void *arg, void *zmqsock)
+{
+ zmq_msg_t *msg_id = (zmq_msg_t *)arg;
+ char buf[32] = "Test write callback";
+ size_t i;
+
+ for (i = 0; i < strlen(buf); i++)
+ buf[i] = toupper(buf[i]);
+ printf("server send: %s\n", buf);
+ fflush(stdout);
+ send_id_and_delim(zmqsock, msg_id);
+ zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
+
+ /* send just once */
+ frrzmq_thread_cancel(&cb, &cb->write);
+
+ zmq_msg_close(msg_id);
+ XFREE(MTYPE_ZMQMSG, msg_id);
+}
static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
- unsigned partnum)
+ unsigned partnum)
{
+ static int num = 0;
int more = zmq_msg_more(msg);
char *in = zmq_msg_data(msg);
size_t i;
zmq_msg_t reply;
char *out;
+ /* Id */
+ if (partnum == 0) {
+ send_id_and_delim(zmqsock, msg);
+ return;
+ }
+ /* Delim */
+ if (partnum == 1)
+ return;
+
+
printf("server recv part %u (more: %d): %s\n", partnum, more, in);
fflush(stdout);
- /* REQ-REP doesn't allow sending a reply here */
- if (more)
- return;
out = XMALLOC(MTYPE_TESTBUF, strlen(in) + 1);
for (i = 0; i < strlen(in); i++)
@@ -118,39 +204,66 @@ static void serverpartfn(void *arg, void *zmqsock, zmq_msg_t *msg,
zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
zmq_msg_send(&reply, zmqsock, ZMQ_SNDMORE);
+ if (more)
+ return;
+
out = XMALLOC(MTYPE_TESTBUF, 32);
snprintf(out, 32, "msg# was %u", partnum);
zmq_msg_init_data(&reply, out, strlen(out) + 1, msg_buf_free, NULL);
zmq_msg_send(&reply, zmqsock, 0);
+
+ zmq_msg_close(&reply);
+
+ if (++num < 7)
+ return;
+
+ /* write callback test */
+ char buf[32];
+ zmq_msg_t *msg_id = XMALLOC(MTYPE_ZMQMSG, sizeof(zmq_msg_t));
+ recv_id_and_delim(zmqsock, msg_id);
+ zmq_recv(zmqsock, buf, sizeof(buf), 0);
+ printf("server recv: %s\n", buf);
+ fflush(stdout);
+
+ frrzmq_thread_add_write_msg(master, serverwritefn, NULL, msg_id,
+ zmqsock, &cb);
}
static void serverfn(void *arg, void *zmqsock)
{
static int num = 0;
+ zmq_msg_t msg_id;
char buf[32];
size_t i;
+
+ recv_id_and_delim(zmqsock, &msg_id);
zmq_recv(zmqsock, buf, sizeof(buf), 0);
printf("server recv: %s\n", buf);
fflush(stdout);
for (i = 0; i < strlen(buf); i++)
buf[i] = toupper(buf[i]);
+ send_id_and_delim(zmqsock, &msg_id);
+ zmq_msg_close(&msg_id);
zmq_send(zmqsock, buf, strlen(buf) + 1, 0);
if (++num < 4)
return;
/* change to multipart callback */
- frrzmq_thread_cancel(cb);
+ frrzmq_thread_cancel(&cb, &cb->read);
+ frrzmq_thread_cancel(&cb, &cb->write);
- cb = frrzmq_thread_add_read_part(master, serverpartfn, NULL, zmqsock);
+ frrzmq_thread_add_read_part(master, serverpartfn, NULL, NULL, zmqsock,
+ &cb);
}
static void sigchld(void)
{
printf("child exited.\n");
- frrzmq_thread_cancel(cb);
+ frrzmq_thread_cancel(&cb, &cb->read);
+ frrzmq_thread_cancel(&cb, &cb->write);
}
static struct quagga_signal_t sigs[] = {
@@ -170,13 +283,13 @@ static void run_server(int syncfd)
signal_init(master, array_size(sigs), sigs);
frrzmq_init();
- zmqsock = zmq_socket(frrzmq_context, ZMQ_REP);
+ zmqsock = zmq_socket(frrzmq_context, ZMQ_ROUTER);
if (zmq_bind(zmqsock, "tcp://*:17171")) {
perror("zmq_bind");
exit(1);
}
- cb = frrzmq_thread_add_read_msg(master, serverfn, NULL, zmqsock);
+ frrzmq_thread_add_read_msg(master, serverfn, NULL, NULL, zmqsock, &cb);
write(syncfd, &dummy, sizeof(dummy));
while (thread_fetch(master, &t))
diff --git a/tests/lib/test_zmq.refout b/tests/lib/test_zmq.refout
index 61f45f02b..acac50553 100644
--- a/tests/lib/test_zmq.refout
+++ b/tests/lib/test_zmq.refout
@@ -11,40 +11,57 @@ client send: msg #3 def
server recv: msg #3 def
client recv: MSG #3 DEF
client send: msg #4 efg
-server recv part 0 (more: 0): msg #4 efg
+server recv part 2 (more: 0): msg #4 efg
client recv: MSG #4 EFG
+client recv: msg# was 2
client send: msg #5 fgh
-client recv: msg# was 0
+server recv part 2 (more: 0): msg #5 fgh
+client recv: MSG #5 FGH
+client recv: msg# was 2
client send: msg #6 ghi
-server recv part 0 (more: 0): msg #6 ghi
+server recv part 2 (more: 0): msg #6 ghi
client recv: MSG #6 GHI
+client recv: msg# was 2
client send: msg #7 hij
-client recv: msg# was 0
+server recv part 2 (more: 0): msg #7 hij
+client recv: MSG #7 HIJ
+client recv: msg# was 2
---
client send: part 1/2
client send: part 2/2
-server recv part 0 (more: 1): part 1/2
-server recv part 1 (more: 0): part 2/2
+server recv part 2 (more: 1): part 1/2
+server recv part 3 (more: 0): part 2/2
+client recv (more: 1): PART 1/2
client recv (more: 1): PART 2/2
-client recv (more: 0): msg# was 1
+client recv (more: 0): msg# was 3
---
client send: part 1/3
client send: part 2/3
client send: part 3/3
-server recv part 0 (more: 1): part 1/3
-server recv part 1 (more: 1): part 2/3
-server recv part 2 (more: 0): part 3/3
+server recv part 2 (more: 1): part 1/3
+server recv part 3 (more: 1): part 2/3
+server recv part 4 (more: 0): part 3/3
+client recv (more: 1): PART 1/3
+client recv (more: 1): PART 2/3
client recv (more: 1): PART 3/3
-client recv (more: 0): msg# was 2
+client recv (more: 0): msg# was 4
---
client send: part 1/4
client send: part 2/4
client send: part 3/4
client send: part 4/4
-server recv part 0 (more: 1): part 1/4
-server recv part 1 (more: 1): part 2/4
-server recv part 2 (more: 1): part 3/4
-server recv part 3 (more: 0): part 4/4
+server recv part 2 (more: 1): part 1/4
+server recv part 3 (more: 1): part 2/4
+server recv part 4 (more: 1): part 3/4
+server recv part 5 (more: 0): part 4/4
+client recv (more: 1): PART 1/4
+client recv (more: 1): PART 2/4
+client recv (more: 1): PART 3/4
client recv (more: 1): PART 4/4
-client recv (more: 0): msg# was 3
+client recv (more: 0): msg# was 5
+---
+client send: Done receiving
+server recv: Done receiving
+server send: TEST WRITE CALLBACK
+client recv: TEST WRITE CALLBACK
child exited.