summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRenato Westphal <renato@opensourcerouting.org>2020-08-20 01:33:40 +0200
committerRenato Westphal <renato@opensourcerouting.org>2020-08-20 16:53:46 +0200
commit24ed137c2059bf2e1ec16a46f2b9482dd09579a7 (patch)
treeab3b0fb1b5fc99a676b7cec89cfb743ff851f903
parentMerge pull request #6924 from AnuradhaKaruppiah/mem-fixes (diff)
downloadfrr-24ed137c2059bf2e1ec16a46f2b9482dd09579a7.tar.xz
frr-24ed137c2059bf2e1ec16a46f2b9482dd09579a7.zip
lib: adapt plugin to use new Sysrepo version
Sysrepo recently underwent a complete rewrite, where some substantial architectural changes were made (the most important one being the extinction of the sysrepod daemon). While most of the existing API was preserved, quite a few backward-incompatible changes [1] were introduced (mostly simplifications). This commit adapts our sysrepo northbound plugin to those API changes in order for it to be compatible with the latest Sysrepo version. Additional notes: * The old Sysrepo version is EOL and not supported anymore. * The new Sysrepo version requires libyang 1.x. Closes #6936 [1] https://github.com/sysrepo/sysrepo/blob/devel/CHANGES Signed-off-by: Renato Westphal <renato@opensourcerouting.org>
-rw-r--r--lib/northbound_sysrepo.c316
-rw-r--r--lib/yang.h1
2 files changed, 77 insertions, 240 deletions
diff --git a/lib/northbound_sysrepo.c b/lib/northbound_sysrepo.c
index 2209b19c1..3dec68592 100644
--- a/lib/northbound_sysrepo.c
+++ b/lib/northbound_sysrepo.c
@@ -37,13 +37,11 @@ DEFINE_MTYPE_STATIC(LIB, SYSREPO, "Sysrepo module")
static struct debug nb_dbg_client_sysrepo = {0, "Northbound client: Sysrepo"};
static struct thread_master *master;
-static struct list *sysrepo_threads;
static sr_session_ctx_t *session;
static sr_conn_ctx_t *connection;
static struct nb_transaction *transaction;
static int frr_sr_read_cb(struct thread *thread);
-static int frr_sr_write_cb(struct thread *thread);
static int frr_sr_finish(void);
/* Convert FRR YANG data value to sysrepo YANG data value. */
@@ -236,25 +234,23 @@ static int frr_sr_process_change(struct nb_config *candidate,
return NB_OK;
}
-static int frr_sr_config_change_cb_verify(sr_session_ctx_t *session,
- const char *module_name,
- bool startup_config)
+static int frr_sr_config_change_cb_prepare(sr_session_ctx_t *session,
+ const char *module_name,
+ bool startup_config)
{
sr_change_iter_t *it;
int ret;
sr_change_oper_t sr_op;
sr_val_t *sr_old_val, *sr_new_val;
- char xpath[XPATH_MAXLEN];
struct nb_context context = {};
struct nb_config *candidate;
char errmsg[BUFSIZ] = {0};
- snprintf(xpath, sizeof(xpath), "/%s:*", module_name);
- ret = sr_get_changes_iter(session, xpath, &it);
+ ret = sr_get_changes_iter(session, "//*", &it);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO,
- "%s: sr_get_changes_iter() failed for xpath %s",
- __func__, xpath);
+ "%s: sr_get_changes_iter() failed for \"%s\"",
+ __func__, module_name);
return ret;
}
@@ -307,12 +303,14 @@ static int frr_sr_config_change_cb_verify(sr_session_ctx_t *session,
__func__, nb_err_name(ret), errmsg);
}
+ if (!transaction)
+ nb_config_free(candidate);
+
/* Map northbound return code to sysrepo return code. */
switch (ret) {
case NB_OK:
return SR_ERR_OK;
case NB_ERR_NO_CHANGES:
- nb_config_free(candidate);
return SR_ERR_OK;
case NB_ERR_LOCKED:
return SR_ERR_LOCKED;
@@ -356,22 +354,23 @@ static int frr_sr_config_change_cb_abort(sr_session_ctx_t *session,
/* Callback for changes in the running configuration. */
static int frr_sr_config_change_cb(sr_session_ctx_t *session,
- const char *module_name,
- sr_notif_event_t sr_ev, void *private_ctx)
+ const char *module_name, const char *xpath,
+ sr_event_t sr_ev, uint32_t request_id,
+ void *private_data)
{
switch (sr_ev) {
case SR_EV_ENABLED:
- return frr_sr_config_change_cb_verify(session, module_name,
- true);
- case SR_EV_VERIFY:
- return frr_sr_config_change_cb_verify(session, module_name,
- false);
- case SR_EV_APPLY:
+ return frr_sr_config_change_cb_prepare(session, module_name,
+ true);
+ case SR_EV_CHANGE:
+ return frr_sr_config_change_cb_prepare(session, module_name,
+ false);
+ case SR_EV_DONE:
return frr_sr_config_change_cb_apply(session, module_name);
case SR_EV_ABORT:
return frr_sr_config_change_cb_abort(session, module_name);
default:
- flog_err(EC_LIB_LIBSYSREPO, "%s: unknown sysrepo event: %u",
+ flog_err(EC_LIB_LIBSYSREPO, "%s: unexpected sysrepo event: %u",
__func__, sr_ev);
return SR_ERR_INTERNAL;
}
@@ -381,70 +380,49 @@ static int frr_sr_state_data_iter_cb(const struct lys_node *snode,
struct yang_translator *translator,
struct yang_data *data, void *arg)
{
- struct list *elements = arg;
-
- listnode_add(elements, data);
+ struct lyd_node *dnode = arg;
+
+ ly_errno = 0;
+ dnode = lyd_new_path(dnode, ly_native_ctx, data->xpath, data->value, 0,
+ LYD_PATH_OPT_UPDATE);
+ if (!dnode && ly_errno) {
+ flog_warn(EC_LIB_LIBYANG, "%s: lyd_new_path() failed",
+ __func__);
+ yang_data_free(data);
+ return NB_ERR;
+ }
+ yang_data_free(data);
return NB_OK;
}
/* Callback for state retrieval. */
-static int frr_sr_state_cb(const char *xpath, sr_val_t **values,
- size_t *values_cnt, uint64_t request_id,
- const char *original_xpath, void *private_ctx)
+static int frr_sr_state_cb(sr_session_ctx_t *session, const char *module_name,
+ const char *xpath, const char *request_xpath,
+ uint32_t request_id, struct lyd_node **parent,
+ void *private_ctx)
{
- struct list *elements;
- struct yang_data *data;
- struct listnode *node;
- sr_val_t *v;
- int ret, count, i = 0;
+ struct lyd_node *dnode;
- elements = yang_data_list_new();
- if (nb_oper_data_iterate(xpath, NULL, NB_OPER_DATA_ITER_NORECURSE,
- frr_sr_state_data_iter_cb, elements)
+ dnode = *parent;
+ if (nb_oper_data_iterate(request_xpath, NULL, 0,
+ frr_sr_state_data_iter_cb, dnode)
!= NB_OK) {
flog_warn(EC_LIB_NB_OPERATIONAL_DATA,
"%s: failed to obtain operational data [xpath %s]",
__func__, xpath);
- goto exit;
- }
-
- if (list_isempty(elements))
- goto exit;
-
- count = listcount(elements);
- ret = sr_new_values(count, &v);
- if (ret != SR_ERR_OK) {
- flog_err(EC_LIB_LIBSYSREPO, "%s: sr_new_values(): %s", __func__,
- sr_strerror(ret));
- goto exit;
- }
-
- for (ALL_LIST_ELEMENTS_RO(elements, node, data)) {
- if (yang_data_frr2sr(data, &v[i++]) != 0) {
- flog_err(EC_LIB_SYSREPO_DATA_CONVERT,
- "%s: failed to convert data to sysrepo format",
- __func__);
- }
+ return SR_ERR_INTERNAL;
}
- *values = v;
- *values_cnt = count;
-
- list_delete(&elements);
-
- return SR_ERR_OK;
-
-exit:
- list_delete(&elements);
- *values = NULL;
- *values_cnt = 0;
+ *parent = dnode;
return SR_ERR_OK;
}
-static int frr_sr_config_rpc_cb(const char *xpath, const sr_val_t *sr_input,
- const size_t input_cnt, sr_val_t **sr_output,
+static int frr_sr_config_rpc_cb(sr_session_ctx_t *session, const char *xpath,
+ const sr_val_t *sr_input,
+ const size_t input_cnt, sr_event_t sr_ev,
+ uint32_t request_id, sr_val_t **sr_output,
size_t *sr_output_cnt, void *private_ctx)
{
struct nb_node *nb_node;
@@ -551,8 +529,7 @@ static int frr_sr_notification_send(const char *xpath, struct list *arguments)
}
}
- ret = sr_event_notif_send(session, xpath, values, values_cnt,
- SR_EV_NOTIF_DEFAULT);
+ ret = sr_event_notif_send(session, xpath, values, values_cnt);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO,
"%s: sr_event_notif_send() failed for xpath %s",
@@ -563,102 +540,13 @@ static int frr_sr_notification_send(const char *xpath, struct list *arguments)
return NB_OK;
}
-/* Code to integrate the sysrepo client into FRR main event loop. */
-struct sysrepo_thread {
- struct thread *thread;
- sr_fd_event_t event;
- int fd;
-};
-
-static struct sysrepo_thread *frr_sr_fd_lookup(sr_fd_event_t event, int fd)
-{
- struct sysrepo_thread *sr_thread;
- struct listnode *node;
-
- for (ALL_LIST_ELEMENTS_RO(sysrepo_threads, node, sr_thread)) {
- if (sr_thread->event == event && sr_thread->fd == fd)
- return sr_thread;
- }
-
- return NULL;
-}
-
-static void frr_sr_fd_add(int event, int fd)
-{
- struct sysrepo_thread *sr_thread;
-
- if (frr_sr_fd_lookup(event, fd) != NULL)
- return;
-
- sr_thread = XCALLOC(MTYPE_SYSREPO, sizeof(*sr_thread));
- sr_thread->event = event;
- sr_thread->fd = fd;
- listnode_add(sysrepo_threads, sr_thread);
-
- switch (event) {
- case SR_FD_INPUT_READY:
- thread_add_read(master, frr_sr_read_cb, NULL, fd,
- &sr_thread->thread);
- break;
- case SR_FD_OUTPUT_READY:
- thread_add_write(master, frr_sr_write_cb, NULL, fd,
- &sr_thread->thread);
- break;
- default:
- return;
- }
-}
-
-static void frr_sr_fd_free(struct sysrepo_thread *sr_thread)
-{
- THREAD_OFF(sr_thread->thread);
- XFREE(MTYPE_SYSREPO, sr_thread);
-}
-
-static void frr_sr_fd_del(int event, int fd)
-{
- struct sysrepo_thread *sr_thread;
-
- sr_thread = frr_sr_fd_lookup(event, fd);
- if (!sr_thread)
- return;
-
- listnode_delete(sysrepo_threads, sr_thread);
- frr_sr_fd_free(sr_thread);
-}
-
-static void frr_sr_fd_update(sr_fd_change_t *fd_change_set,
- size_t fd_change_set_cnt)
-{
- for (size_t i = 0; i < fd_change_set_cnt; i++) {
- int fd = fd_change_set[i].fd;
- int event = fd_change_set[i].events;
-
- if (event != SR_FD_INPUT_READY && event != SR_FD_OUTPUT_READY)
- continue;
-
- switch (fd_change_set[i].action) {
- case SR_FD_START_WATCHING:
- frr_sr_fd_add(event, fd);
- break;
- case SR_FD_STOP_WATCHING:
- frr_sr_fd_del(event, fd);
- break;
- default:
- break;
- }
- }
-}
-
static int frr_sr_read_cb(struct thread *thread)
{
+ sr_subscription_ctx_t *sr_subscription = THREAD_ARG(thread);
int fd = THREAD_FD(thread);
- sr_fd_change_t *fd_change_set = NULL;
- size_t fd_change_set_cnt = 0;
int ret;
- ret = sr_fd_event_process(fd, SR_FD_INPUT_READY, &fd_change_set,
- &fd_change_set_cnt);
+ ret = sr_process_events(sr_subscription, session, NULL);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_LIBSYSREPO, "%s: sr_fd_event_process(): %s",
__func__, sr_strerror(ret));
@@ -666,31 +554,7 @@ static int frr_sr_read_cb(struct thread *thread)
}
thread = NULL;
- thread_add_read(master, frr_sr_read_cb, NULL, fd, &thread);
-
- frr_sr_fd_update(fd_change_set, fd_change_set_cnt);
- free(fd_change_set);
-
- return 0;
-}
-
-static int frr_sr_write_cb(struct thread *thread)
-{
- int fd = THREAD_FD(thread);
- sr_fd_change_t *fd_change_set = NULL;
- size_t fd_change_set_cnt = 0;
- int ret;
-
- ret = sr_fd_event_process(fd, SR_FD_OUTPUT_READY, &fd_change_set,
- &fd_change_set_cnt);
- if (ret != SR_ERR_OK) {
- flog_err(EC_LIB_LIBSYSREPO, "%s: sr_fd_event_process(): %s",
- __func__, sr_strerror(ret));
- return -1;
- }
-
- frr_sr_fd_update(fd_change_set, fd_change_set_cnt);
- free(fd_change_set);
+ thread_add_read(master, frr_sr_read_cb, sr_subscription, fd, &thread);
return 0;
}
@@ -700,8 +564,8 @@ static void frr_sr_subscribe_config(struct yang_module *module)
int ret;
ret = sr_module_change_subscribe(
- session, module->name, frr_sr_config_change_cb, NULL, 0,
- SR_SUBSCR_DEFAULT | SR_SUBSCR_EV_ENABLED,
+ session, module->name, NULL, frr_sr_config_change_cb, NULL, 0,
+ SR_SUBSCR_DEFAULT | SR_SUBSCR_ENABLED | SR_SUBSCR_NO_THREAD,
&module->sr_subscription);
if (ret != SR_ERR_OK)
flog_err(EC_LIB_LIBSYSREPO, "sr_module_change_subscribe(): %s",
@@ -725,11 +589,11 @@ static int frr_sr_subscribe_state(const struct lys_node *snode, void *arg)
DEBUGD(&nb_dbg_client_sysrepo, "%s: providing data to '%s'", __func__,
nb_node->xpath);
- ret = sr_dp_get_items_subscribe(
- session, nb_node->xpath, frr_sr_state_cb, NULL,
- SR_SUBSCR_CTX_REUSE, &module->sr_subscription);
+ ret = sr_oper_get_items_subscribe(
+ session, snode->module->name, nb_node->xpath, frr_sr_state_cb,
+ NULL, SR_SUBSCR_CTX_REUSE, &module->sr_subscription);
if (ret != SR_ERR_OK)
- flog_err(EC_LIB_LIBSYSREPO, "sr_dp_get_items_subscribe(): %s",
+ flog_err(EC_LIB_LIBSYSREPO, "sr_oper_get_items_subscribe(): %s",
sr_strerror(ret));
return YANG_ITER_CONTINUE;
@@ -750,7 +614,7 @@ static int frr_sr_subscribe_rpc(const struct lys_node *snode, void *arg)
nb_node->xpath);
ret = sr_rpc_subscribe(session, nb_node->xpath, frr_sr_config_rpc_cb,
- NULL, SR_SUBSCR_CTX_REUSE,
+ NULL, 0, SR_SUBSCR_CTX_REUSE,
&module->sr_subscription);
if (ret != SR_ERR_OK)
flog_err(EC_LIB_LIBSYSREPO, "sr_rpc_subscribe(): %s",
@@ -759,30 +623,6 @@ static int frr_sr_subscribe_rpc(const struct lys_node *snode, void *arg)
return YANG_ITER_CONTINUE;
}
-static int frr_sr_subscribe_action(const struct lys_node *snode, void *arg)
-{
- struct yang_module *module = arg;
- struct nb_node *nb_node;
- int ret;
-
- if (snode->nodetype != LYS_ACTION)
- return YANG_ITER_CONTINUE;
-
- nb_node = snode->priv;
-
- DEBUGD(&nb_dbg_client_sysrepo, "%s: providing action to '%s'", __func__,
- nb_node->xpath);
-
- ret = sr_action_subscribe(session, nb_node->xpath, frr_sr_config_rpc_cb,
- NULL, SR_SUBSCR_CTX_REUSE,
- &module->sr_subscription);
- if (ret != SR_ERR_OK)
- flog_err(EC_LIB_LIBSYSREPO, "sr_action_subscribe(): %s",
- sr_strerror(ret));
-
- return YANG_ITER_CONTINUE;
-}
-
/* CLI commands. */
DEFUN (debug_nb_sr,
debug_nb_sr_cmd,
@@ -830,22 +670,13 @@ static void frr_sr_cli_init(void)
}
/* FRR's Sysrepo initialization. */
-static int frr_sr_init(const char *program_name)
+static int frr_sr_init(void)
{
struct yang_module *module;
- int sysrepo_fd, ret;
-
- sysrepo_threads = list_new();
-
- ret = sr_fd_watcher_init(&sysrepo_fd, NULL);
- if (ret != SR_ERR_OK) {
- flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_fd_watcher_init(): %s",
- __func__, sr_strerror(ret));
- goto cleanup;
- }
+ int ret;
/* Connect to Sysrepo. */
- ret = sr_connect(program_name, SR_CONN_DEFAULT, &connection);
+ ret = sr_connect(SR_CONN_DEFAULT, &connection);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_connect(): %s", __func__,
sr_strerror(ret));
@@ -853,8 +684,7 @@ static int frr_sr_init(const char *program_name)
}
/* Start session. */
- ret = sr_session_start(connection, SR_DS_RUNNING, SR_SESS_DEFAULT,
- &session);
+ ret = sr_session_start(connection, SR_DS_RUNNING, &session);
if (ret != SR_ERR_OK) {
flog_err(EC_LIB_SYSREPO_INIT, "%s: sr_session_start(): %s",
__func__, sr_strerror(ret));
@@ -863,19 +693,28 @@ static int frr_sr_init(const char *program_name)
/* Perform subscriptions. */
RB_FOREACH (module, yang_modules, &yang_modules) {
+ int event_pipe;
+
frr_sr_subscribe_config(module);
yang_snodes_iterate_module(module->info, frr_sr_subscribe_state,
0, module);
yang_snodes_iterate_module(module->info, frr_sr_subscribe_rpc,
0, module);
- yang_snodes_iterate_module(module->info,
- frr_sr_subscribe_action, 0, module);
+
+ /* Watch subscriptions. */
+ ret = sr_get_event_pipe(module->sr_subscription, &event_pipe);
+ if (ret != SR_ERR_OK) {
+ flog_err(EC_LIB_SYSREPO_INIT,
+ "%s: sr_get_event_pipe(): %s", __func__,
+ sr_strerror(ret));
+ goto cleanup;
+ }
+ thread_add_read(master, frr_sr_read_cb, module->sr_subscription,
+ event_pipe, &module->sr_thread);
}
hook_register(nb_notification_send, frr_sr_notification_send);
- frr_sr_fd_add(SR_FD_INPUT_READY, sysrepo_fd);
-
return 0;
cleanup:
@@ -891,7 +730,8 @@ static int frr_sr_finish(void)
RB_FOREACH (module, yang_modules, &yang_modules) {
if (!module->sr_subscription)
continue;
- sr_unsubscribe(session, module->sr_subscription);
+ sr_unsubscribe(module->sr_subscription);
+ THREAD_OFF(module->sr_thread);
}
if (session)
@@ -899,10 +739,6 @@ static int frr_sr_finish(void)
if (connection)
sr_disconnect(connection);
- sysrepo_threads->del = (void (*)(void *))frr_sr_fd_free;
- list_delete(&sysrepo_threads);
- sr_fd_watcher_cleanup();
-
return 0;
}
@@ -910,7 +746,7 @@ static int frr_sr_module_late_init(struct thread_master *tm)
{
master = tm;
- if (frr_sr_init(frr_get_progname()) < 0) {
+ if (frr_sr_init() < 0) {
flog_err(EC_LIB_SYSREPO_INIT,
"failed to initialize the Sysrepo module");
return -1;
diff --git a/lib/yang.h b/lib/yang.h
index cc048c44e..94bbed233 100644
--- a/lib/yang.h
+++ b/lib/yang.h
@@ -63,6 +63,7 @@ struct yang_module {
#endif
#ifdef HAVE_SYSREPO
sr_subscription_ctx_t *sr_subscription;
+ struct thread *sr_thread;
#endif
};
RB_HEAD(yang_modules, yang_module);