From 6c2ec9284b9842b2708baadcdcef736981550465 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Tue, 4 Feb 2025 10:11:29 -0500 Subject: [PATCH 1/2] Fixes #1732: implement new Admin Connector type --- include/qpid/dispatch/dispatch.h | 11 + src/adaptors/amqp/amqp_adaptor.c | 34 ++- src/adaptors/amqp/connection_manager.c | 404 +++++++------------------ src/adaptors/amqp/qd_connection.c | 16 +- src/adaptors/amqp/qd_connection.h | 2 +- src/adaptors/amqp/qd_connector.c | 262 ++++++++++++++-- src/adaptors/amqp/qd_connector.h | 88 +++++- src/adaptors/amqp/server_config.c | 18 +- src/adaptors/amqp/server_config.h | 4 +- src/dispatch.c | 10 + 10 files changed, 474 insertions(+), 375 deletions(-) diff --git a/include/qpid/dispatch/dispatch.h b/include/qpid/dispatch/dispatch.h index 93aa7f2cc..876be1abf 100644 --- a/include/qpid/dispatch/dispatch.h +++ b/include/qpid/dispatch/dispatch.h @@ -22,6 +22,7 @@ #include "qpid/dispatch/error.h" #include +#include /**@file * Configure and prepare a dispatch instance. @@ -33,6 +34,7 @@ typedef struct qd_dispatch_t qd_dispatch_t; typedef struct qd_connection_manager_t qd_connection_manager_t; typedef struct qd_policy_t qd_policy_t; +typedef struct qd_server_t qd_server_t; /** * Initialize the Dispatch library and prepare it for operation. @@ -83,6 +85,15 @@ qd_connection_manager_t *qd_dispatch_connection_manager(const qd_dispatch_t *qd) */ qd_policy_t *qd_dispatch_get_policy(const qd_dispatch_t *dispatch); +/** + * Return the configured inter-router data connection count + */ +uint32_t qd_dispatch_get_data_connection_count(const qd_dispatch_t *dispatch); + +/** + * Return the routers server + */ +qd_server_t *qd_dispatch_get_server(const qd_dispatch_t *dispatch); /** * @} diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index f3cb032a6..452310df3 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -1402,8 +1402,10 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool const char *host = 0; char host_local[255]; const qd_server_config_t *config; - if (qd_connection_connector(conn)) { - config = qd_connector_config(qd_connection_connector(conn)); + qd_connector_t *connector = qd_connection_connector(conn); + + if (connector) { + config = qd_connector_get_config(connector); snprintf(host_local, 254, "%s", config->host_port); host = &host_local[0]; } @@ -1414,8 +1416,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool qd_router_connection_get_config(conn, &role, &cost, &name, &conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity); - if (conn->connector && conn->connector->config.has_data_connectors) { - memcpy(conn->group_correlator, conn->connector->group_correlator, QD_DISCRIMINATOR_SIZE); + if (connector && !!connector->admin_conn->data_connection_count) { + memcpy(conn->group_correlator, connector->admin_conn->group_correlator, QD_DISCRIMINATOR_SIZE); + if (connector->is_data_connector) { + // override the configured role to identify this as a data connection + assert(role == QDR_ROLE_INTER_ROUTER); + role = QDR_ROLE_INTER_ROUTER_DATA; + } } // check offered capabilities for streaming link support and connection trunking support @@ -1524,8 +1531,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool } else if ((key.size == strlen(QD_CONNECTION_PROPERTY_ACCESS_ID) && strncmp(key.start, QD_CONNECTION_PROPERTY_ACCESS_ID, key.size) == 0)) { if (!pn_data_next(props)) break; - if (!!conn->connector && !!conn->connector->vflow_record && pn_data_type(props) == PN_STRING) { - vflow_set_ref_from_pn(conn->connector->vflow_record, VFLOW_ATTRIBUTE_PEER, props); + if (!!connector && !!connector->vflow_record && pn_data_type(props) == PN_STRING) { + vflow_set_ref_from_pn(connector->vflow_record, VFLOW_ATTRIBUTE_PEER, props); } } else { @@ -1553,7 +1560,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool authenticated, conn->opened, (char*) mech, - conn->connector ? QD_OUTGOING : QD_INCOMING, + connector ? QD_OUTGOING : QD_INCOMING, host, proto, cipher, @@ -1592,17 +1599,14 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool qd_listener_add_link(conn->listener); } - if (!!conn->connector) { - qd_connector_add_link(conn->connector); - } - - if (conn->connector) { - sys_mutex_lock(&conn->connector->lock); - qd_format_string(conn->connector->conn_msg, QD_CXTR_CONN_MSG_BUF_SIZE, + if (!!connector) { + qd_connector_add_link(connector); + sys_mutex_lock(&connector->lock); + qd_format_string(connector->conn_msg, QD_CXTR_CONN_MSG_BUF_SIZE, "[C%"PRIu64"] Connection Opened: dir=%s host=%s encrypted=%s auth=%s user=%s container_id=%s", connection_id, inbound ? "in" : "out", host, encrypted ? proto : "no", authenticated ? mech : "no", (char*) user, container); - sys_mutex_unlock(&conn->connector->lock); + sys_mutex_unlock(&connector->lock); } free(proto); diff --git a/src/adaptors/amqp/connection_manager.c b/src/adaptors/amqp/connection_manager.c index 7b83209fe..0ccde98a1 100644 --- a/src/adaptors/amqp/connection_manager.c +++ b/src/adaptors/amqp/connection_manager.c @@ -40,13 +40,11 @@ #include #include -#define CHECK() if (qd_error_code()) goto error struct qd_connection_manager_t { qd_server_t *server; qd_listener_list_t listeners; - qd_connector_list_t connectors; - qd_connector_list_t data_connectors; + qd_admin_connector_list_t admin_connectors; }; @@ -63,7 +61,7 @@ QD_EXPORT qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_en { qd_connection_manager_t *cm = qd->connection_manager; qd_listener_t *li = qd_listener(qd->server); - if (!li || qd_server_config_load(qd, &li->config, entity, true, 0) != QD_ERROR_NONE) { + if (!li || qd_server_config_load(&li->config, entity, true) != QD_ERROR_NONE) { qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create listener: %s", qd_error_message()); qd_listener_decref(li); return 0; @@ -168,49 +166,53 @@ static int get_failover_info_length(qd_failover_item_list_t conn_info_list) */ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl) { - qd_connector_t *connector = (qd_connector_t*) impl; + qd_admin_connector_t *admin_conn = (qd_admin_connector_t *) impl; + qd_connector_t *connector = 0; - int i = 1; - int num_items = 0; + qd_error_clear(); - sys_mutex_lock(&connector->lock); + // TODO(kgiusti): inter-router connections may have several qd_connector_ts active due to the router data connection + // count configuration. However we can only report 1 connector via management. It would be more accurate to report + // all connectors associated with this management entity + sys_mutex_lock(&admin_conn->lock); + connector = DEQ_HEAD(admin_conn->connectors); + if (connector) { + // prevent I/O thread from freeing connector while it is being accessed + sys_atomic_inc(&connector->ref_count); + } + sys_mutex_unlock(&admin_conn->lock); - int conn_index = connector->conn_index; - qd_failover_item_list_t conn_info_list = connector->conn_info_list; + if (connector) { + int i = 1; + int num_items = 0; - int conn_info_len = DEQ_SIZE(conn_info_list); + sys_mutex_lock(&connector->lock); - qd_failover_item_t *item = DEQ_HEAD(conn_info_list); + int conn_index = connector->conn_index; + qd_failover_item_list_t conn_info_list = connector->conn_info_list; - int arr_length = get_failover_info_length(conn_info_list); + int conn_info_len = DEQ_SIZE(conn_info_list); - // This is the string that will contain the comma separated failover list - char *failover_info = qd_calloc(arr_length + 1, sizeof(char)); - while(item) { + qd_failover_item_t *item = DEQ_HEAD(conn_info_list); - // Break out of the loop when we have hit all items in the list. - if (num_items >= conn_info_len) - break; + int arr_length = get_failover_info_length(conn_info_list); - if (num_items >= 1) { - strcat(failover_info, ", "); - } + // This is the string that will contain the comma separated failover list + char *failover_info = qd_calloc(arr_length + 1, sizeof(char)); + while (item) { - // We need to go to the elements in the list to get to the - // element that matches the connection index. This is the first - // url that the router will try to connect on failover. - if (conn_index == i) { - num_items += 1; - if (item->scheme) { - strcat(failover_info, item->scheme); - strcat(failover_info, "://"); - } - if (item->host_port) { - strcat(failover_info, item->host_port); + // Break out of the loop when we have hit all items in the list. + if (num_items >= conn_info_len) + break; + + if (num_items >= 1) { + strcat(failover_info, ", "); } - } - else { - if (num_items > 0) { + + // We need to go to the elements in the list to get to the + // element that matches the connection index. This is the first + // url that the router will try to connect on failover. + if (conn_index == i) { num_items += 1; if (item->scheme) { strcat(failover_info, item->scheme); @@ -220,191 +222,78 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl strcat(failover_info, item->host_port); } } - } + else { + if (num_items > 0) { + num_items += 1; + if (item->scheme) { + strcat(failover_info, item->scheme); + strcat(failover_info, "://"); + } + if (item->host_port) { + strcat(failover_info, item->host_port); + } + } + } - i += 1; + i += 1; - item = DEQ_NEXT(item); - if (item == 0) - item = DEQ_HEAD(conn_info_list); - } + item = DEQ_NEXT(item); + if (item == 0) + item = DEQ_HEAD(conn_info_list); + } - const char *state_info = 0; - switch (connector->state) { - case CXTR_STATE_CONNECTING: - state_info = "CONNECTING"; - break; - case CXTR_STATE_OPEN: - state_info = "SUCCESS"; - break; - case CXTR_STATE_FAILED: - state_info = "FAILED"; - break; - case CXTR_STATE_INIT: - state_info = "INITIALIZING"; - break; - case CXTR_STATE_DELETED: - // deleted by management, waiting for connection to close - state_info = "CLOSING"; - break; - default: - state_info = "UNKNOWN"; - break; - } + const char *state_info = 0; + switch (connector->state) { + case CXTR_STATE_CONNECTING: + state_info = "CONNECTING"; + break; + case CXTR_STATE_OPEN: + state_info = "SUCCESS"; + break; + case CXTR_STATE_FAILED: + state_info = "FAILED"; + break; + case CXTR_STATE_INIT: + state_info = "INITIALIZING"; + break; + case CXTR_STATE_DELETED: + // deleted by management, waiting for connection to close + state_info = "CLOSING"; + break; + default: + state_info = "UNKNOWN"; + break; + } - if (qd_entity_set_string(entity, "failoverUrls", failover_info) == 0 - && qd_entity_set_string(entity, "connectionStatus", state_info) == 0 - && qd_entity_set_string(entity, "connectionMsg", connector->conn_msg) == 0) { + // stop updating entity on first failure to capture the error code + if (qd_entity_set_string(entity, "failoverUrls", failover_info) == 0 + && qd_entity_set_string(entity, "connectionStatus", state_info) == 0 + && qd_entity_set_string(entity, "connectionMsg", connector->conn_msg) == 0) { + // error code not set - nothing to do + } sys_mutex_unlock(&connector->lock); + qd_connector_decref(connector); // release local reference free(failover_info); - return QD_ERROR_NONE; + } else { + qd_error(QD_ERROR_NOT_FOUND, "No active connector present"); } - sys_mutex_unlock(&connector->lock); - free(failover_info); return qd_error_code(); } -QD_EXPORT qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity) +QD_EXPORT qd_admin_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity) { - qd_connection_manager_t *cm = qd->connection_manager; - qd_connector_t *ct = qd_server_connector(qd->server); - qd_connector_list_t data_connectors = DEQ_EMPTY; - - qd_error_clear(); - - if (!ct) { - char *name = qd_entity_opt_string(entity, "name", "UNKNOWN"); - qd_error(QD_ERROR_CONFIG, "Failed to create Connector %s: resource allocation failed", name); - free(name); + qd_connection_manager_t *cm = qd->connection_manager; + qd_admin_connector_t *admin_conn = qd_admin_connector_create(qd, entity); + if (!admin_conn) { return 0; } - if (qd_server_config_load(qd, &ct->config, entity, false, 0) == QD_ERROR_NONE) { - ct->policy_vhost = qd_entity_opt_string(entity, "policyVhost", 0); CHECK(); - - // - // If an sslProfile is configured allocate a TLS config for this connector's connections - // - if (ct->config.ssl_profile_name) { - ct->tls_config = qd_tls_config(ct->config.ssl_profile_name, - QD_TLS_TYPE_PROTON_AMQP, - QD_TLS_CONFIG_CLIENT_MODE, - ct->config.verify_host_name, - ct->config.ssl_require_peer_authentication); - if (!ct->tls_config) { - // qd_tls2_config() has set the qd_error_message(), which is logged below - goto error; - } - } - - // - // If this connection has a data-connection-group, set up the group members now - // - if (ct->config.has_data_connectors) { - qd_generate_discriminator(ct->group_correlator); - for (int i = 0; i < qd->data_connection_count; i++) { - qd_connector_t *dc = qd_server_connector(qd->server); - if (!dc) { - qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", ct->config.name); - goto error; - } - - if (qd_server_config_load(qd, &dc->config, entity, false, "inter-router-data") != QD_ERROR_NONE) { - // qd_server_config_load will set qd_error() - qd_connector_decref(dc); - goto error; - } - - if (ct->tls_config) { - dc->tls_config = qd_tls_config(ct->config.ssl_profile_name, - QD_TLS_TYPE_PROTON_AMQP, - QD_TLS_CONFIG_CLIENT_MODE, - ct->config.verify_host_name, - ct->config.ssl_require_peer_authentication); - if (!dc->tls_config) { - // qd_tls2_config() has set the qd_error_message(), which is logged below - qd_connector_decref(dc); - goto error; - } - } - - strncpy(dc->group_correlator, ct->group_correlator, QD_DISCRIMINATOR_SIZE); - dc->is_data_connector = true; - qd_failover_item_t *item = NEW(qd_failover_item_t); - ZERO(item); - if (dc->config.ssl_required) - item->scheme = strdup("amqps"); - else - item->scheme = strdup("amqp"); - item->host = strdup(dc->config.host); - item->port = strdup(dc->config.port); - int hplen = strlen(item->host) + strlen(item->port) + 2; - item->host_port = malloc(hplen); - snprintf(item->host_port, hplen, "%s:%s", item->host , item->port); - DEQ_INSERT_TAIL(dc->conn_info_list, item); - - DEQ_INSERT_TAIL(data_connectors, dc); - } - } - - // - // Add the first item to the ct->conn_info_list - // The initial connection information and any backup connection information is stored in the conn_info_list - // - qd_failover_item_t *item = NEW(qd_failover_item_t); - ZERO(item); - if (ct->config.ssl_required) - item->scheme = strdup("amqps"); - else - item->scheme = strdup("amqp"); - - item->host = strdup(ct->config.host); - item->port = strdup(ct->config.port); - - int hplen = strlen(item->host) + strlen(item->port) + 2; - item->host_port = malloc(hplen); - snprintf(item->host_port, hplen, "%s:%s", item->host , item->port); - DEQ_INSERT_TAIL(ct->conn_info_list, item); - - // - // Set up the vanflow record for this connector (LINK) - // Do this only for router-to-router connectors since the record represents an inter-router link - // - if (strcmp(ct->config.role, "inter-router") == 0 || - strcmp(ct->config.role, "edge") == 0 || - strcmp(ct->config.role, "inter-edge") == 0) { - ct->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_NAME, ct->config.name); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_ROLE, ct->config.role); - vflow_set_uint64(ct->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, ct->config.inter_router_cost); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down"); - vflow_set_uint64(ct->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 0); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, item->scheme); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_HOST, item->host); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_PORT, item->port); - vflow_set_uint64(ct->vflow_record, VFLOW_ATTRIBUTE_OCTETS, 0); - vflow_set_uint64(ct->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, 0); - } - - DEQ_APPEND(cm->data_connectors, data_connectors); - DEQ_INSERT_TAIL(cm->connectors, ct); - log_config(&ct->config, "Connector", true); - return ct; - } - - error: - qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); - for (qd_connector_t *dc = DEQ_HEAD(data_connectors); dc; dc = DEQ_HEAD(data_connectors)) { - DEQ_REMOVE_HEAD(data_connectors); - dc->state = CXTR_STATE_DELETED; - qd_connector_decref(dc); - } - ct->state = CXTR_STATE_DELETED; - qd_connector_decref(ct); - return 0; + DEQ_INSERT_TAIL(cm->admin_connectors, admin_conn); + log_config(&admin_conn->config, "Connector", true); + return admin_conn; } @@ -416,8 +305,7 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd) cm->server = qd->server; DEQ_INIT(cm->listeners); - DEQ_INIT(cm->connectors); - DEQ_INIT(cm->data_connectors); + DEQ_INIT(cm->admin_connectors); return cm; } @@ -445,25 +333,11 @@ void qd_connection_manager_free(qd_connection_manager_t *cm) li = DEQ_HEAD(cm->listeners); } - qd_connector_list_t to_free; - DEQ_MOVE(cm->connectors, to_free); - DEQ_APPEND(to_free, cm->data_connectors); - - qd_connector_t *connector = DEQ_HEAD(to_free); - while (connector) { - DEQ_REMOVE_HEAD(to_free); - sys_mutex_lock(&connector->lock); - // setting DELETED below ensures the timer callback - // will not initiate a re-connect once we drop - // the lock - connector->state = CXTR_STATE_DELETED; - sys_mutex_unlock(&connector->lock); - // cannot cancel timer while holding lock since the - // callback takes the lock - qd_timer_cancel(connector->timer); - qd_connector_decref(connector); - - connector = DEQ_HEAD(to_free); + qd_admin_connector_t *admin_conn = DEQ_HEAD(cm->admin_connectors); + while (admin_conn) { + DEQ_REMOVE_HEAD(cm->admin_connectors); + qd_admin_connector_delete(admin_conn); + admin_conn = DEQ_HEAD(cm->admin_connectors); } free(cm); @@ -477,8 +351,7 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd) { static bool first_start = true; qd_listener_t *li = DEQ_HEAD(qd->connection_manager->listeners); - qd_connector_t *ct = DEQ_HEAD(qd->connection_manager->connectors); - qd_connector_t *dc = DEQ_HEAD(qd->connection_manager->data_connectors); + qd_admin_connector_t *admin_conn = DEQ_HEAD(qd->connection_manager->admin_connectors); while (li) { if (!li->pn_listener) { @@ -493,24 +366,9 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd) li = DEQ_NEXT(li); } - while (ct) { - if (ct->state == CXTR_STATE_OPEN || ct->state == CXTR_STATE_CONNECTING) { - ct = DEQ_NEXT(ct); - continue; - } - - qd_connector_connect(ct); - ct = DEQ_NEXT(ct); - } - - while (dc) { - if (dc->state == CXTR_STATE_OPEN || dc->state == CXTR_STATE_CONNECTING) { - dc = DEQ_NEXT(dc); - continue; - } - - qd_connector_connect(dc); - dc = DEQ_NEXT(dc); + while (admin_conn) { + qd_admin_connector_connect(admin_conn); + admin_conn = DEQ_NEXT(admin_conn); } first_start = false; @@ -536,66 +394,24 @@ QD_EXPORT void qd_connection_manager_delete_listener(qd_dispatch_t *qd, void *im } -static void deferred_close(void *context, bool discard) { - if (!discard) { - pn_connection_close((pn_connection_t*)context); - } -} - - // threading: called by management thread while I/O thread may be // referencing the qd_connector_t via the qd_connection_t // QD_EXPORT void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *impl) { - qd_connector_t *ct = (qd_connector_t*) impl; - if (ct) { - // cannot free the timer while holding ct->lock since the - // timer callback may be running during the call to qd_timer_free - qd_timer_t *timer = 0; - bool has_data_connectors = ct->config.has_data_connectors; - void *dct = qd_connection_new_qd_deferred_call_t(); - sys_mutex_lock(&ct->lock); - timer = ct->timer; - ct->timer = 0; - ct->state = CXTR_STATE_DELETED; - qd_connection_t *conn = ct->qd_conn; - if (conn && conn->pn_conn) { - qd_connection_invoke_deferred_impl(conn, deferred_close, conn->pn_conn, dct); - sys_mutex_unlock(&ct->lock); - } else { - sys_mutex_unlock(&ct->lock); - qd_connection_free_qd_deferred_call_t(dct); - } - qd_timer_free(timer); - if (ct->is_data_connector) { - DEQ_REMOVE(qd->connection_manager->data_connectors, ct); - } else { - log_config(&ct->config, "Connector", false); - DEQ_REMOVE(qd->connection_manager->connectors, ct); - } + qd_admin_connector_t *admin_conn = (qd_admin_connector_t*) impl; + assert(admin_conn); - // - // Remove correlated data connectors - // - if (has_data_connectors) { - qd_connector_t *dc = DEQ_HEAD(qd->connection_manager->data_connectors); - while (!!dc) { - qd_connector_t *next = DEQ_NEXT(dc); - if (strncmp(dc->group_correlator, ct->group_correlator, QD_DISCRIMINATOR_SIZE) == 0) { - qd_connection_manager_delete_connector(qd, (void*) dc); - } - dc = next; - } - } + // take it off the connection manager - qd_connector_decref(ct); - } + log_config(&admin_conn->config, "Connector", false); + DEQ_REMOVE(qd->connection_manager->admin_connectors, admin_conn); + qd_admin_connector_delete(admin_conn); } const char *qd_connector_name(qd_connector_t *ct) { - return ct ? ct->config.name : 0; + return ct ? ct->admin_conn->config.name : 0; } diff --git a/src/adaptors/amqp/qd_connection.c b/src/adaptors/amqp/qd_connection.c index 2e68e0459..e0a0f4ccf 100644 --- a/src/adaptors/amqp/qd_connection.c +++ b/src/adaptors/amqp/qd_connection.c @@ -150,7 +150,7 @@ static void decorate_connection(qd_connection_t *ctx, const qd_server_config_t * pn_data_put_int(pn_connection_properties(conn), QDR_ROLE_INTER_ROUTER_DATA); } - if (ctx->connector && (ctx->connector->is_data_connector || ctx->connector->config.has_data_connectors)) { + if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->admin_conn->data_connection_count)) { pn_data_put_symbol(pn_connection_properties(conn), pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY), QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY)); pn_data_put_string(pn_connection_properties(conn), @@ -242,7 +242,7 @@ static void decorate_connection(qd_connection_t *ctx, const qd_server_config_t * * Does not allocate any managed objects and therefore * does not take ENTITY_CACHE lock. */ -void qd_connection_init(qd_connection_t *ctx, qd_server_t *server, qd_server_config_t *config, qd_connector_t *connector, qd_listener_t *listener) +void qd_connection_init(qd_connection_t *ctx, qd_server_t *server, const qd_server_config_t *config, qd_connector_t *connector, qd_listener_t *listener) { ctx->pn_conn = pn_connection(); assert(ctx->pn_conn); @@ -464,7 +464,7 @@ const qd_server_config_t *qd_connection_config(const qd_connection_t *conn) if (conn->listener) return &conn->listener->config; if (conn->connector) - return &conn->connector->config; + return &conn->connector->admin_conn->config; return NULL; } @@ -533,7 +533,7 @@ void qd_connection_invoke_deferred_calls(qd_connection_t *conn, bool discard) const char* qd_connection_name(const qd_connection_t *c) { if (c->connector) { - return c->connector->config.host_port; + return c->connector->admin_conn->config.host_port; } else { return c->rhost_port; } @@ -595,14 +595,14 @@ static void set_rhost_port(qd_connection_t *ctx) { static bool setup_ssl_sasl_and_open(qd_connection_t *ctx) { qd_connector_t *ct = ctx->connector; - const qd_server_config_t *config = &ct->config; + const qd_server_config_t *config = &ct->admin_conn->config; pn_transport_t *tport = pn_connection_transport(ctx->pn_conn); // // Create an SSL session if required // - if (ct->tls_config) { - ctx->ssl = qd_tls_session_amqp(ct->tls_config, tport, false); + if (ct->admin_conn->tls_config) { + ctx->ssl = qd_tls_session_amqp(ct->admin_conn->tls_config, tport, false); if (!ctx->ssl) { qd_log(LOG_SERVER, QD_LOG_ERROR, "Failed to create TLS session for connection [C%" PRIu64 "] to %s:%s (%s)", @@ -682,7 +682,7 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) { qd_log(LOG_SERVER, QD_LOG_INFO, "[C%" PRIu64 "] Accepted connection to %s from %s", ctx->connection_id, name, ctx->rhost_port); } else if (ctx->connector) { /* Establishing an outgoing connection */ - config = &ctx->connector->config; + config = &ctx->connector->admin_conn->config; if (!setup_ssl_sasl_and_open(ctx)) { qd_log(LOG_SERVER, QD_LOG_ERROR, "[C%" PRIu64 "] Connection aborted due to internal setup error", ctx->connection_id); diff --git a/src/adaptors/amqp/qd_connection.h b/src/adaptors/amqp/qd_connection.h index 658c633e3..c74040799 100644 --- a/src/adaptors/amqp/qd_connection.h +++ b/src/adaptors/amqp/qd_connection.h @@ -118,7 +118,7 @@ struct qd_connection_t { ALLOC_DECLARE_SAFE(qd_connection_t); // initialize a newly allocated qd_connection_t -void qd_connection_init(qd_connection_t *qd_conn, qd_server_t *server, qd_server_config_t *config, +void qd_connection_init(qd_connection_t *qd_conn, qd_server_t *server, const qd_server_config_t *config, qd_connector_t *connector, qd_listener_t *listener); qd_connector_t* qd_connection_connector(const qd_connection_t *c); diff --git a/src/adaptors/amqp/qd_connector.c b/src/adaptors/amqp/qd_connector.c index a033472c6..cbb1e2909 100644 --- a/src/adaptors/amqp/qd_connector.c +++ b/src/adaptors/amqp/qd_connector.c @@ -20,11 +20,13 @@ #include "qd_connector.h" #include "qd_connection.h" #include "private.h" +#include "entity.h" #include "qpid/dispatch/alloc_pool.h" #include "qpid/dispatch/timer.h" #include "qpid/dispatch/vanflow.h" #include "qpid/dispatch/tls_amqp.h" +#include "qpid/dispatch/dispatch.h" #include @@ -32,6 +34,7 @@ ALLOC_DEFINE(qd_connector_t); +ALLOC_DEFINE(qd_admin_connector_t); static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) TA_REQ(ct->lock) @@ -51,7 +54,9 @@ static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) TA_ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_REQ(connector->lock) { assert(connector->state != CXTR_STATE_DELETED); - qd_connection_init(qd_conn, connector->server, &connector->config, connector, 0); + + const qd_admin_connector_t *admin_conn = connector->admin_conn; + qd_connection_init(qd_conn, admin_conn->server, &admin_conn->config, connector, 0); connector->state = CXTR_STATE_OPEN; connector->delay = 5000; @@ -70,7 +75,7 @@ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_ // Set the sasl user name and password on the proton connection object. This has to be // done before pn_proactor_connect which will bind a transport to the connection - const qd_server_config_t *config = &connector->config; + const qd_server_config_t *config = &connector->admin_conn->config; if(config->sasl_username) pn_connection_set_user(qd_conn->pn_conn, config->sasl_username); if (config->sasl_password) @@ -78,7 +83,7 @@ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_ qd_log(LOG_SERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] Connecting to %s", qd_conn->connection_id, host_port); /* Note: the transport is configured in the PN_CONNECTION_BOUND event */ - pn_proactor_connect(qd_server_proactor(connector->server), qd_conn->pn_conn, host_port); + pn_proactor_connect(qd_server_proactor(connector->admin_conn->server), qd_conn->pn_conn, host_port); // at this point the qd_conn may now be scheduled on another thread } @@ -109,42 +114,84 @@ static void try_open_cb(void *context) } -const qd_server_config_t *qd_connector_config(const qd_connector_t *c) +/** Close the proton connection + * + * Scheduled on the target connections thread + */ +static void deferred_close(void *context, bool discard) +{ + if (!discard) { + pn_connection_close((pn_connection_t*)context); + } +} + + +const qd_server_config_t *qd_connector_get_config(const qd_connector_t *c) { - return &c->config; + return &c->admin_conn->config; } -qd_connector_t *qd_server_connector(qd_server_t *server) +qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_connector) { qd_connector_t *connector = new_qd_connector_t(); if (!connector) return 0; + ZERO(connector); - sys_atomic_init(&connector->ref_count, 1); + sys_atomic_init(&connector->ref_count, 1); // for caller DEQ_INIT(connector->conn_info_list); DEQ_ITEM_INIT(connector); sys_mutex_init(&connector->lock); - connector->timer = qd_timer(amqp_adaptor.dispatch, try_open_cb, connector); - if (!connector->timer) - goto error; + connector->timer = qd_timer(amqp_adaptor.dispatch, try_open_cb, connector); + connector->reconnect_enabled = true; + connector->is_data_connector = is_data_connector; + + connector->admin_conn = admin_conn; + sys_atomic_inc(&admin_conn->ref_count); - connector->server = server; connector->conn_index = 1; connector->state = CXTR_STATE_INIT; - return connector; + qd_failover_item_t *item = NEW(qd_failover_item_t); + ZERO(item); + if (admin_conn->config.ssl_required) + item->scheme = strdup("amqps"); + else + item->scheme = strdup("amqp"); + item->host = strdup(admin_conn->config.host); + item->port = strdup(admin_conn->config.port); + int hplen = strlen(item->host) + strlen(item->port) + 2; + item->host_port = malloc(hplen); + snprintf(item->host_port, hplen, "%s:%s", item->host , item->port); + DEQ_INSERT_TAIL(connector->conn_info_list, item); -error: - connector->state = CXTR_STATE_DELETED; - qd_connector_decref(connector); - return 0; + // + // Set up the vanflow record for this connector (LINK) + // Do this only for router-to-router connectors since the record represents an inter-router link + // + if ((strcmp(admin_conn->config.role, "inter-router") == 0 && !is_data_connector) || + strcmp(admin_conn->config.role, "edge") == 0 || + strcmp(admin_conn->config.role, "inter-edge") == 0) { + connector->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_NAME, admin_conn->config.name); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_ROLE, admin_conn->config.role); + vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, admin_conn->config.inter_router_cost); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down"); + vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 0); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, item->scheme); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_HOST, item->host); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_PORT, item->port); + vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS, 0); + vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, 0); + } + return connector; } const char *qd_connector_policy_vhost(const qd_connector_t* ct) { - return ct->policy_vhost; + return ct->admin_conn->policy_vhost; } @@ -166,6 +213,32 @@ bool qd_connector_connect(qd_connector_t *ct) } +// Teardown the connection associated with the connector and +// prepare the connector for deletion +// +void qd_connector_close(qd_connector_t *ct) +{ + // cannot free the timer while holding ct->lock since the + // timer callback may be running during the call to qd_timer_free + qd_timer_t *timer = 0; + void *dct = qd_connection_new_qd_deferred_call_t(); + + sys_mutex_lock(&ct->lock); + timer = ct->timer; + ct->timer = 0; + ct->state = CXTR_STATE_DELETED; + qd_connection_t *conn = ct->qd_conn; + if (conn && conn->pn_conn) { + qd_connection_invoke_deferred_impl(conn, deferred_close, conn->pn_conn, dct); + sys_mutex_unlock(&ct->lock); + } else { + sys_mutex_unlock(&ct->lock); + qd_connection_free_qd_deferred_call_t(dct); + } + qd_timer_free(timer); +} + + void qd_connector_decref(qd_connector_t* connector) { if (!connector) return; @@ -175,11 +248,12 @@ void qd_connector_decref(qd_connector_t* connector) assert(connector->state == CXTR_STATE_DELETED); assert(connector->qd_conn == 0); + qd_admin_connector_decref(connector->admin_conn); vflow_end_record(connector->vflow_record); connector->vflow_record = 0; - qd_server_config_free(&connector->config); qd_timer_free(connector->timer); sys_mutex_free(&connector->lock); + sys_atomic_destroy(&connector->ref_count); qd_failover_item_t *item = DEQ_HEAD(connector->conn_info_list); while (item) { @@ -192,9 +266,6 @@ void qd_connector_decref(qd_connector_t* connector) free(item); item = DEQ_HEAD(connector->conn_info_list); } - if (connector->policy_vhost) free(connector->policy_vhost); - qd_tls_config_decref(connector->tls_config); - free_qd_connector_t(connector); } } @@ -229,7 +300,7 @@ static void increment_conn_index_lh(qd_connector_t *connector) TA_REQ(connector- */ void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t connection_id, pn_condition_t *condition) { - const qd_server_config_t *config = &connector->config; + const qd_server_config_t *config = &connector->admin_conn->config; char conn_msg[QD_CXTR_CONN_MSG_BUF_SIZE]; // avoid holding connector lock when logging char conn_msg_1[QD_CXTR_CONN_MSG_BUF_SIZE]; // this connection message does not contain the connection id @@ -299,7 +370,7 @@ void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx ctx->connector = connector; connector->qd_conn = ctx; - strncpy(ctx->group_correlator, connector->group_correlator, QD_DISCRIMINATOR_SIZE); + strncpy(ctx->group_correlator, connector->admin_conn->group_correlator, QD_DISCRIMINATOR_SIZE); } @@ -352,3 +423,148 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const // Drop reference held by connection. qd_connector_decref(connector); } + + +/** + * Create a new qd_admin_connector_t instance + */ +qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t *entity) +{ + qd_admin_connector_t *admin_conn = new_qd_admin_connector_t(); + if (!admin_conn) { + char *name = qd_entity_opt_string(entity, "name", "UNKNOWN"); + qd_error(QD_ERROR_CONFIG, "Failed to create Connector %s: resource allocation failed", name); + free(name); + return 0; + } + + qd_error_clear(); + + ZERO(admin_conn); + DEQ_ITEM_INIT(admin_conn); + sys_atomic_init(&admin_conn->ref_count, 1); // for caller + sys_mutex_init(&admin_conn->lock); + admin_conn->server = qd_dispatch_get_server(qd); + DEQ_INIT(admin_conn->connectors); + + if (qd_server_config_load(&admin_conn->config, entity, false) != QD_ERROR_NONE) { + qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); + qd_admin_connector_decref(admin_conn); + return 0; + } + + admin_conn->policy_vhost = qd_entity_opt_string(entity, "policyVhost", 0); + if (qd_error_code()) { + qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); + qd_admin_connector_decref(admin_conn); + return 0; + } + + // + // If an sslProfile is configured allocate a TLS config to be used by all child connector's connections + // + if (admin_conn->config.ssl_profile_name) { + admin_conn->tls_config = qd_tls_config(admin_conn->config.ssl_profile_name, + QD_TLS_TYPE_PROTON_AMQP, + QD_TLS_CONFIG_CLIENT_MODE, + admin_conn->config.verify_host_name, + admin_conn->config.ssl_require_peer_authentication); + if (!admin_conn->tls_config) { + // qd_tls2_config() has set the qd_error_message(), which is logged below + goto error; + } + } + + // For inter-router connectors create associated inter-router data connectors if configured + + if (strcmp(admin_conn->config.role, "inter-router") == 0) { + admin_conn->data_connection_count = qd_dispatch_get_data_connection_count(qd); + if (!!admin_conn->data_connection_count) { + qd_generate_discriminator(admin_conn->group_correlator); + + // Add any data connectors to the head of the connectors list in the admin_connector first. This allows the + // router control connector to be located at the head of the list. + + for (int i = 0; i < admin_conn->data_connection_count; i++) { + qd_connector_t *dc = qd_connector(admin_conn, true); + if (!dc) { + qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", admin_conn->config.name); + goto error; + } + DEQ_INSERT_HEAD(admin_conn->connectors, dc); + } + } + } + + // Create the primary connector associated with this configuration. It will be located + // at the head of the connectors list + + qd_connector_t *ct = qd_connector(admin_conn, false); + if (!ct) { + qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", admin_conn->config.name); + goto error; + } + DEQ_INSERT_HEAD(admin_conn->connectors, ct); + + return admin_conn; + + error: + if (qd_error_code()) + qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); + for (qd_connector_t *dc = DEQ_HEAD(admin_conn->connectors); dc; dc = DEQ_HEAD(admin_conn->connectors)) { + DEQ_REMOVE_HEAD(admin_conn->connectors); + dc->state = CXTR_STATE_DELETED; + qd_connector_decref(dc); + } + qd_admin_connector_decref(admin_conn); + return 0; +} + + +void qd_admin_connector_delete(qd_admin_connector_t *admin_conn) +{ + qd_connector_t *ct = DEQ_HEAD(admin_conn->connectors); + while (ct) { + DEQ_REMOVE_HEAD(admin_conn->connectors); + qd_connector_close(ct); + qd_connector_decref(ct); + ct = DEQ_HEAD(admin_conn->connectors); + } + + // drop ref held by the caller + qd_admin_connector_decref(admin_conn); +} + + +void qd_admin_connector_decref(qd_admin_connector_t *admin_conn) +{ + if (!admin_conn) + return; + + uint32_t rc = sys_atomic_dec(&admin_conn->ref_count); + (void) rc; + assert(rc > 0); // else underflow + + if (rc == 1) { + // Expect: all connectors hold the ref_count so this must be empty + assert(DEQ_IS_EMPTY(admin_conn->connectors)); + sys_mutex_free(&admin_conn->lock); + sys_atomic_destroy(&admin_conn->ref_count); + free(admin_conn->policy_vhost); + qd_tls_config_decref(admin_conn->tls_config); + qd_server_config_free(&admin_conn->config); + free_qd_admin_connector_t(admin_conn); + } +} + + +// Initiate connections on all child connectors +void qd_admin_connector_connect(qd_admin_connector_t *admin_conn) +{ + if (!admin_conn->activated) { + admin_conn->activated = true; + for (qd_connector_t *ct = DEQ_HEAD(admin_conn->connectors); !!ct; ct = DEQ_NEXT(ct)) { + qd_connector_connect(ct); + } + } +} diff --git a/src/adaptors/amqp/qd_connector.h b/src/adaptors/amqp/qd_connector.h index 5ebdca9d2..a3971c9d1 100644 --- a/src/adaptors/amqp/qd_connector.h +++ b/src/adaptors/amqp/qd_connector.h @@ -34,6 +34,7 @@ typedef struct qd_server_t qd_server_t; typedef struct qd_connection_t qd_connection_t; typedef struct vflow_record_t vflow_record_t; typedef struct qd_tls_config_t qd_tls_config_t; +typedef struct qd_admin_connector_t qd_admin_connector_t; typedef enum { CXTR_STATE_INIT = 0, @@ -44,15 +45,17 @@ typedef enum { } cxtr_state_t; /** - * Connector objects represent the desire to create and maintain an outgoing transport connection. + * A qd_connector_t manages a single outgoing transport connection. It is responsible for re-establishing the connection + * should it fail. It is the child of a qd_admin_connector_t instance. */ typedef struct qd_connector_t { + + // Sibling connectors belonging to the same parent qd_admin_connector_t DEQ_LINKS(struct qd_connector_t); + qd_admin_connector_t *admin_conn; - /* Referenced by connection_manager and pn_connection_t */ + /* Referenced by parent qd_admin_connector_t and child qd_connection_t */ sys_atomic_t ref_count; - qd_server_t *server; - qd_server_config_t config; qd_timer_t *timer; long delay; @@ -62,16 +65,12 @@ typedef struct qd_connector_t { qd_connection_t *qd_conn; vflow_record_t *vflow_record; bool oper_status_down; // set when oper-status transitions to 'down' to avoid repeated error indications. + bool reconnect_enabled; // False: disable reconnect on connection drop + bool is_data_connector; // inter-router conn for streaming messages /* This conn_list contains all the connection information needed to make a connection. It also includes failover connection information */ qd_failover_item_list_t conn_info_list; int conn_index; // Which connection in the connection list to connect to next. - char *policy_vhost; /* Optional policy vhost name */ - qd_tls_config_t *tls_config; - - /* Connection group state */ - bool is_data_connector; - char group_correlator[QD_DISCRIMINATOR_SIZE]; /* holds proton transport error condition text on connection failure */ #define QD_CXTR_CONN_MSG_BUF_SIZE 300 @@ -80,20 +79,79 @@ typedef struct qd_connector_t { DEQ_DECLARE(qd_connector_t, qd_connector_list_t); -const qd_server_config_t *qd_connector_config(const qd_connector_t *c); + +/** + * An qd_admin_connector_t instance is created for each "connector" configuration object provisioned on the router. A + * connector may have one or more outgoing connections associated with it depending on the connectors role. The purpose + * of the qd_admin_connector_t is to manage a set of outgoing connections associated with the connector + * configuration. An qd_admin_connector_t will instantiate a qd_connector_t for each outgoing connection required by the + * connector configuration. + */ +struct qd_admin_connector_t { + DEQ_LINKS(struct qd_admin_connector_t); // connection_manager list + + /* Referenced by connection_manager and children qd_connector_t */ + sys_atomic_t ref_count; + qd_server_config_t config; + qd_server_t *server; + char *policy_vhost; /* Optional policy vhost name */ + qd_tls_config_t *tls_config; + uint32_t data_connection_count; // # of child inter-router data connections + + // The group correlation id for all child connections + char group_correlator[QD_DISCRIMINATOR_SIZE]; + + bool activated; // T: activated by connection manager + sys_mutex_t lock; // protect connectors list + qd_connector_list_t connectors; +}; + +DEQ_DECLARE(qd_admin_connector_t, qd_admin_connector_list_t); + +/** Management call to create an Admin Connector + */ +qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t *entity); + +/** Management call to delete the Admin Connector + * + * This will close and release all child connector and connections then + * decrement the callers reference count to the admin connector. + */ +void qd_admin_connector_delete(qd_admin_connector_t *admin_conn); + +/** Management call to start all child connector connections + */ +void qd_admin_connector_connect(qd_admin_connector_t *admin_conn); + +/** Drop a reference to the Admin Connector + */ +void qd_admin_connector_decref(qd_admin_connector_t *admin_conn); + +/** + * Connector API + */ + +/** + * Create a new connector. + * Call qd_connector_connect() to initiate the outgoing connection + */ +qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_connector); /** * Initiate an outgoing connection. Returns true if successful. */ bool qd_connector_connect(qd_connector_t *ct); -// KAG: todo: fixme: -qd_connector_t *qd_server_connector(qd_server_t *server); +/** + * Close the associated connection and deactivate the connector + */ +void qd_connector_close(qd_connector_t *ct); -void qd_connector_decref(qd_connector_t* ct); +void qd_connector_decref(qd_connector_t *ct); +const qd_server_config_t *qd_connector_get_config(const qd_connector_t *ct); +const char *qd_connector_get_group_correlator(const qd_connector_t *ct); bool qd_connector_has_failover_info(const qd_connector_t* ct); - const char *qd_connector_policy_vhost(const qd_connector_t* ct); void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t connection_id, pn_condition_t *condition); void qd_connector_remote_opened(qd_connector_t *connector); diff --git a/src/adaptors/amqp/server_config.c b/src/adaptors/amqp/server_config.c index 86e7fba78..7f18c4522 100644 --- a/src/adaptors/amqp/server_config.c +++ b/src/adaptors/amqp/server_config.c @@ -95,7 +95,7 @@ static void set_config_host(qd_server_config_t *config, qd_entity_t* entity) /** * Initialize the server_config from the listener or connector management entity instance */ -qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config, qd_entity_t* entity, bool is_listener, const char *role_override) +qd_error_t qd_server_config_load(qd_server_config_t *config, qd_entity_t *entity, bool is_listener) { qd_error_clear(); @@ -109,14 +109,9 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config, config->message_log_flags = qd_message_repr_flags(config->log_message); config->port = qd_entity_get_string(entity, "port"); CHECK(); config->name = qd_entity_opt_string(entity, "name", 0); CHECK(); + config->role = qd_entity_get_string(entity, "role"); CHECK(); long inter_router_cost = qd_entity_opt_long(entity, "cost", 1); CHECK(); - if (!role_override) { - config->role = qd_entity_get_string(entity, "role"); CHECK(); - } else { - config->role = strdup(role_override); - } - // // The cost field on the listener or the connector should be > 0 and <= INT32_MAX // The router will terminate on invalid cost values. @@ -148,15 +143,6 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config, config->policy_vhost = qd_entity_opt_string(entity, "policyVhost", 0); CHECK(); config->conn_props = qd_entity_opt_map(entity, "openProperties"); CHECK(); - - if (strcmp(config->role, "inter-router") == 0) { - // For inter-router connections only, the dataConnectionCount defaults to "auto", - // which means it will be determined as a function of the number of worker threads. - // If the user has *not* explicitly set the value "0", - // then we will have some data connections. - config->has_data_connectors = true; - } - set_config_host(config, entity); if (config->sasl_password) { diff --git a/src/adaptors/amqp/server_config.h b/src/adaptors/amqp/server_config.h index cb2025af7..04390d150 100644 --- a/src/adaptors/amqp/server_config.h +++ b/src/adaptors/amqp/server_config.h @@ -268,8 +268,6 @@ typedef struct qd_server_config_t { */ pn_data_t *conn_props; - bool has_data_connectors; - /** * @name These fields are not primary configuration, they are computed. * @{ @@ -286,7 +284,7 @@ typedef struct qd_server_config_t { } qd_server_config_t; -qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *cf, qd_entity_t *entity, bool is_listener, const char *role_override); +qd_error_t qd_server_config_load(qd_server_config_t *cf, qd_entity_t *entity, bool is_listener); void qd_server_config_free(qd_server_config_t *cf); void qd_server_config_process_password(char **actual_val, char *pw, bool *is_file, bool allow_literal_prefix); void qd_set_password_from_file(const char *password_file, char **password_field); diff --git a/src/dispatch.c b/src/dispatch.c index 9e29654b3..3fcc89c1a 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -458,3 +458,13 @@ qd_policy_t *qd_dispatch_get_policy(const qd_dispatch_t *dispatch) { return qd->policy; } + +uint32_t qd_dispatch_get_data_connection_count(const qd_dispatch_t *dispatch) +{ + return qd->data_connection_count; +} + +qd_server_t *qd_dispatch_get_server(const qd_dispatch_t *dispatch) +{ + return qd->server; +} From ec68f0e5f97ebc1e525a77cd11d155a28d0bfbc2 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Thu, 6 Feb 2025 15:59:42 -0500 Subject: [PATCH 2/2] fixup: API name changes: o) rename qd_admin_connector_t --> qd_connector_config_t o) change "admin_conn" to "ctor_config" o) Use "ctor" as shorthand for "connector" in API and local vars --- src/adaptors/amqp/amqp_adaptor.c | 6 +- src/adaptors/amqp/connection_manager.c | 64 ++++----- src/adaptors/amqp/qd_connection.c | 14 +- src/adaptors/amqp/qd_connector.c | 186 ++++++++++++------------- src/adaptors/amqp/qd_connector.h | 95 ++++++------- 5 files changed, 183 insertions(+), 182 deletions(-) diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index 452310df3..9cfe1d517 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -1416,8 +1416,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool qd_router_connection_get_config(conn, &role, &cost, &name, &conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity); - if (connector && !!connector->admin_conn->data_connection_count) { - memcpy(conn->group_correlator, connector->admin_conn->group_correlator, QD_DISCRIMINATOR_SIZE); + if (connector && !!connector->ctor_config->data_connection_count) { + memcpy(conn->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE); if (connector->is_data_connector) { // override the configured role to identify this as a data connection assert(role == QDR_ROLE_INTER_ROUTER); @@ -1602,7 +1602,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool if (!!connector) { qd_connector_add_link(connector); sys_mutex_lock(&connector->lock); - qd_format_string(connector->conn_msg, QD_CXTR_CONN_MSG_BUF_SIZE, + qd_format_string(connector->conn_msg, QD_CTOR_CONN_MSG_BUF_SIZE, "[C%"PRIu64"] Connection Opened: dir=%s host=%s encrypted=%s auth=%s user=%s container_id=%s", connection_id, inbound ? "in" : "out", host, encrypted ? proto : "no", authenticated ? mech : "no", (char*) user, container); diff --git a/src/adaptors/amqp/connection_manager.c b/src/adaptors/amqp/connection_manager.c index 0ccde98a1..6f56cfeda 100644 --- a/src/adaptors/amqp/connection_manager.c +++ b/src/adaptors/amqp/connection_manager.c @@ -44,7 +44,7 @@ struct qd_connection_manager_t { qd_server_t *server; qd_listener_list_t listeners; - qd_admin_connector_list_t admin_connectors; + qd_connector_config_list_t connector_configs; }; @@ -166,7 +166,7 @@ static int get_failover_info_length(qd_failover_item_list_t conn_info_list) */ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl) { - qd_admin_connector_t *admin_conn = (qd_admin_connector_t *) impl; + qd_connector_config_t *ctor_config = (qd_connector_config_t *) impl; qd_connector_t *connector = 0; qd_error_clear(); @@ -174,13 +174,13 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl // TODO(kgiusti): inter-router connections may have several qd_connector_ts active due to the router data connection // count configuration. However we can only report 1 connector via management. It would be more accurate to report // all connectors associated with this management entity - sys_mutex_lock(&admin_conn->lock); - connector = DEQ_HEAD(admin_conn->connectors); + sys_mutex_lock(&ctor_config->lock); + connector = DEQ_HEAD(ctor_config->connectors); if (connector) { // prevent I/O thread from freeing connector while it is being accessed sys_atomic_inc(&connector->ref_count); } - sys_mutex_unlock(&admin_conn->lock); + sys_mutex_unlock(&ctor_config->lock); if (connector) { int i = 1; @@ -244,19 +244,19 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl const char *state_info = 0; switch (connector->state) { - case CXTR_STATE_CONNECTING: + case CTOR_STATE_CONNECTING: state_info = "CONNECTING"; break; - case CXTR_STATE_OPEN: + case CTOR_STATE_OPEN: state_info = "SUCCESS"; break; - case CXTR_STATE_FAILED: + case CTOR_STATE_FAILED: state_info = "FAILED"; break; - case CXTR_STATE_INIT: + case CTOR_STATE_INIT: state_info = "INITIALIZING"; break; - case CXTR_STATE_DELETED: + case CTOR_STATE_DELETED: // deleted by management, waiting for connection to close state_info = "CLOSING"; break; @@ -283,17 +283,17 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl } -QD_EXPORT qd_admin_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity) +QD_EXPORT qd_connector_config_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity) { qd_connection_manager_t *cm = qd->connection_manager; - qd_admin_connector_t *admin_conn = qd_admin_connector_create(qd, entity); - if (!admin_conn) { + qd_connector_config_t *ctor_config = qd_connector_config_create(qd, entity); + if (!ctor_config) { return 0; } - DEQ_INSERT_TAIL(cm->admin_connectors, admin_conn); - log_config(&admin_conn->config, "Connector", true); - return admin_conn; + DEQ_INSERT_TAIL(cm->connector_configs, ctor_config); + log_config(&ctor_config->config, "Connector", true); + return ctor_config; } @@ -305,7 +305,7 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd) cm->server = qd->server; DEQ_INIT(cm->listeners); - DEQ_INIT(cm->admin_connectors); + DEQ_INIT(cm->connector_configs); return cm; } @@ -333,11 +333,11 @@ void qd_connection_manager_free(qd_connection_manager_t *cm) li = DEQ_HEAD(cm->listeners); } - qd_admin_connector_t *admin_conn = DEQ_HEAD(cm->admin_connectors); - while (admin_conn) { - DEQ_REMOVE_HEAD(cm->admin_connectors); - qd_admin_connector_delete(admin_conn); - admin_conn = DEQ_HEAD(cm->admin_connectors); + qd_connector_config_t *ctor_config = DEQ_HEAD(cm->connector_configs); + while (ctor_config) { + DEQ_REMOVE_HEAD(cm->connector_configs); + qd_connector_config_delete(ctor_config); + ctor_config = DEQ_HEAD(cm->connector_configs); } free(cm); @@ -351,7 +351,7 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd) { static bool first_start = true; qd_listener_t *li = DEQ_HEAD(qd->connection_manager->listeners); - qd_admin_connector_t *admin_conn = DEQ_HEAD(qd->connection_manager->admin_connectors); + qd_connector_config_t *ctor_config = DEQ_HEAD(qd->connection_manager->connector_configs); while (li) { if (!li->pn_listener) { @@ -366,9 +366,9 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd) li = DEQ_NEXT(li); } - while (admin_conn) { - qd_admin_connector_connect(admin_conn); - admin_conn = DEQ_NEXT(admin_conn); + while (ctor_config) { + qd_connector_config_connect(ctor_config); + ctor_config = DEQ_NEXT(ctor_config); } first_start = false; @@ -399,19 +399,19 @@ QD_EXPORT void qd_connection_manager_delete_listener(qd_dispatch_t *qd, void *im // QD_EXPORT void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *impl) { - qd_admin_connector_t *admin_conn = (qd_admin_connector_t*) impl; - assert(admin_conn); + qd_connector_config_t *ctor_config = (qd_connector_config_t *) impl; + assert(ctor_config); // take it off the connection manager - log_config(&admin_conn->config, "Connector", false); - DEQ_REMOVE(qd->connection_manager->admin_connectors, admin_conn); - qd_admin_connector_delete(admin_conn); + log_config(&ctor_config->config, "Connector", false); + DEQ_REMOVE(qd->connection_manager->connector_configs, ctor_config); + qd_connector_config_delete(ctor_config); } const char *qd_connector_name(qd_connector_t *ct) { - return ct ? ct->admin_conn->config.name : 0; + return ct ? ct->ctor_config->config.name : 0; } diff --git a/src/adaptors/amqp/qd_connection.c b/src/adaptors/amqp/qd_connection.c index e0a0f4ccf..fe56a6aec 100644 --- a/src/adaptors/amqp/qd_connection.c +++ b/src/adaptors/amqp/qd_connection.c @@ -150,7 +150,7 @@ static void decorate_connection(qd_connection_t *ctx, const qd_server_config_t * pn_data_put_int(pn_connection_properties(conn), QDR_ROLE_INTER_ROUTER_DATA); } - if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->admin_conn->data_connection_count)) { + if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->ctor_config->data_connection_count)) { pn_data_put_symbol(pn_connection_properties(conn), pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY), QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY)); pn_data_put_string(pn_connection_properties(conn), @@ -464,7 +464,7 @@ const qd_server_config_t *qd_connection_config(const qd_connection_t *conn) if (conn->listener) return &conn->listener->config; if (conn->connector) - return &conn->connector->admin_conn->config; + return &conn->connector->ctor_config->config; return NULL; } @@ -533,7 +533,7 @@ void qd_connection_invoke_deferred_calls(qd_connection_t *conn, bool discard) const char* qd_connection_name(const qd_connection_t *c) { if (c->connector) { - return c->connector->admin_conn->config.host_port; + return c->connector->ctor_config->config.host_port; } else { return c->rhost_port; } @@ -595,14 +595,14 @@ static void set_rhost_port(qd_connection_t *ctx) { static bool setup_ssl_sasl_and_open(qd_connection_t *ctx) { qd_connector_t *ct = ctx->connector; - const qd_server_config_t *config = &ct->admin_conn->config; + const qd_server_config_t *config = &ct->ctor_config->config; pn_transport_t *tport = pn_connection_transport(ctx->pn_conn); // // Create an SSL session if required // - if (ct->admin_conn->tls_config) { - ctx->ssl = qd_tls_session_amqp(ct->admin_conn->tls_config, tport, false); + if (ct->ctor_config->tls_config) { + ctx->ssl = qd_tls_session_amqp(ct->ctor_config->tls_config, tport, false); if (!ctx->ssl) { qd_log(LOG_SERVER, QD_LOG_ERROR, "Failed to create TLS session for connection [C%" PRIu64 "] to %s:%s (%s)", @@ -682,7 +682,7 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) { qd_log(LOG_SERVER, QD_LOG_INFO, "[C%" PRIu64 "] Accepted connection to %s from %s", ctx->connection_id, name, ctx->rhost_port); } else if (ctx->connector) { /* Establishing an outgoing connection */ - config = &ctx->connector->admin_conn->config; + config = &ctx->connector->ctor_config->config; if (!setup_ssl_sasl_and_open(ctx)) { qd_log(LOG_SERVER, QD_LOG_ERROR, "[C%" PRIu64 "] Connection aborted due to internal setup error", ctx->connection_id); diff --git a/src/adaptors/amqp/qd_connector.c b/src/adaptors/amqp/qd_connector.c index cbb1e2909..be39cdb78 100644 --- a/src/adaptors/amqp/qd_connector.c +++ b/src/adaptors/amqp/qd_connector.c @@ -34,7 +34,7 @@ ALLOC_DEFINE(qd_connector_t); -ALLOC_DEFINE(qd_admin_connector_t); +ALLOC_DEFINE(qd_connector_config_t); static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) TA_REQ(ct->lock) @@ -53,12 +53,12 @@ static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) TA_ /* Timer callback to try/retry connection open, connector->lock held */ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_REQ(connector->lock) { - assert(connector->state != CXTR_STATE_DELETED); + assert(connector->state != CTOR_STATE_DELETED); - const qd_admin_connector_t *admin_conn = connector->admin_conn; - qd_connection_init(qd_conn, admin_conn->server, &admin_conn->config, connector, 0); + const qd_connector_config_t *ctor_config = connector->ctor_config; + qd_connection_init(qd_conn, ctor_config->server, &ctor_config->config, connector, 0); - connector->state = CXTR_STATE_OPEN; + connector->state = CTOR_STATE_OPEN; connector->delay = 5000; // @@ -75,7 +75,7 @@ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_ // Set the sasl user name and password on the proton connection object. This has to be // done before pn_proactor_connect which will bind a transport to the connection - const qd_server_config_t *config = &connector->admin_conn->config; + const qd_server_config_t *config = &connector->ctor_config->config; if(config->sasl_username) pn_connection_set_user(qd_conn->pn_conn, config->sasl_username); if (config->sasl_password) @@ -83,7 +83,7 @@ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_ qd_log(LOG_SERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] Connecting to %s", qd_conn->connection_id, host_port); /* Note: the transport is configured in the PN_CONNECTION_BOUND event */ - pn_proactor_connect(qd_server_proactor(connector->admin_conn->server), qd_conn->pn_conn, host_port); + pn_proactor_connect(qd_server_proactor(connector->ctor_config->server), qd_conn->pn_conn, host_port); // at this point the qd_conn may now be scheduled on another thread } @@ -101,9 +101,9 @@ static void try_open_cb(void *context) sys_mutex_lock(&ct->lock); - if (ct->state == CXTR_STATE_CONNECTING || ct->state == CXTR_STATE_INIT) { + if (ct->state == CTOR_STATE_CONNECTING || ct->state == CTOR_STATE_INIT) { // else deleted or failed - on failed wait until after connection is freed - // and state is set to CXTR_STATE_CONNECTING (timer is rescheduled then) + // and state is set to CTOR_STATE_CONNECTING (timer is rescheduled then) try_open_lh(ct, ctx); ctx = 0; // owned by ct } @@ -128,11 +128,11 @@ static void deferred_close(void *context, bool discard) const qd_server_config_t *qd_connector_get_config(const qd_connector_t *c) { - return &c->admin_conn->config; + return &c->ctor_config->config; } -qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_connector) +qd_connector_t *qd_connector(qd_connector_config_t *ctor_config, bool is_data_connector) { qd_connector_t *connector = new_qd_connector_t(); if (!connector) return 0; @@ -147,20 +147,20 @@ qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_conn connector->reconnect_enabled = true; connector->is_data_connector = is_data_connector; - connector->admin_conn = admin_conn; - sys_atomic_inc(&admin_conn->ref_count); + connector->ctor_config = ctor_config; + sys_atomic_inc(&ctor_config->ref_count); connector->conn_index = 1; - connector->state = CXTR_STATE_INIT; + connector->state = CTOR_STATE_INIT; qd_failover_item_t *item = NEW(qd_failover_item_t); ZERO(item); - if (admin_conn->config.ssl_required) + if (ctor_config->config.ssl_required) item->scheme = strdup("amqps"); else item->scheme = strdup("amqp"); - item->host = strdup(admin_conn->config.host); - item->port = strdup(admin_conn->config.port); + item->host = strdup(ctor_config->config.host); + item->port = strdup(ctor_config->config.port); int hplen = strlen(item->host) + strlen(item->port) + 2; item->host_port = malloc(hplen); snprintf(item->host_port, hplen, "%s:%s", item->host , item->port); @@ -170,13 +170,13 @@ qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_conn // Set up the vanflow record for this connector (LINK) // Do this only for router-to-router connectors since the record represents an inter-router link // - if ((strcmp(admin_conn->config.role, "inter-router") == 0 && !is_data_connector) || - strcmp(admin_conn->config.role, "edge") == 0 || - strcmp(admin_conn->config.role, "inter-edge") == 0) { + if ((strcmp(ctor_config->config.role, "inter-router") == 0 && !is_data_connector) || + strcmp(ctor_config->config.role, "edge") == 0 || + strcmp(ctor_config->config.role, "inter-edge") == 0) { connector->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0); - vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_NAME, admin_conn->config.name); - vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_ROLE, admin_conn->config.role); - vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, admin_conn->config.inter_router_cost); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_NAME, ctor_config->config.name); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_ROLE, ctor_config->config.role); + vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, ctor_config->config.inter_router_cost); vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down"); vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 0); vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, item->scheme); @@ -191,19 +191,19 @@ qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_conn const char *qd_connector_policy_vhost(const qd_connector_t* ct) { - return ct->admin_conn->policy_vhost; + return ct->ctor_config->policy_vhost; } bool qd_connector_connect(qd_connector_t *ct) { sys_mutex_lock(&ct->lock); - if (ct->state != CXTR_STATE_DELETED) { + if (ct->state != CTOR_STATE_DELETED) { // expect: do not attempt to connect an already connected qd_connection assert(ct->qd_conn == 0); ct->qd_conn = 0; ct->delay = 0; - ct->state = CXTR_STATE_CONNECTING; + ct->state = CTOR_STATE_CONNECTING; qd_timer_schedule(ct->timer, ct->delay); sys_mutex_unlock(&ct->lock); return true; @@ -226,7 +226,7 @@ void qd_connector_close(qd_connector_t *ct) sys_mutex_lock(&ct->lock); timer = ct->timer; ct->timer = 0; - ct->state = CXTR_STATE_DELETED; + ct->state = CTOR_STATE_DELETED; qd_connection_t *conn = ct->qd_conn; if (conn && conn->pn_conn) { qd_connection_invoke_deferred_impl(conn, deferred_close, conn->pn_conn, dct); @@ -245,10 +245,10 @@ void qd_connector_decref(qd_connector_t* connector) if (sys_atomic_dec(&connector->ref_count) == 1) { // expect both mgmt and qd_connection no longer reference this - assert(connector->state == CXTR_STATE_DELETED); + assert(connector->state == CTOR_STATE_DELETED); assert(connector->qd_conn == 0); - qd_admin_connector_decref(connector->admin_conn); + qd_connector_config_decref(connector->ctor_config); vflow_end_record(connector->vflow_record); connector->vflow_record = 0; qd_timer_free(connector->timer); @@ -300,16 +300,16 @@ static void increment_conn_index_lh(qd_connector_t *connector) TA_REQ(connector- */ void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t connection_id, pn_condition_t *condition) { - const qd_server_config_t *config = &connector->admin_conn->config; - char conn_msg[QD_CXTR_CONN_MSG_BUF_SIZE]; // avoid holding connector lock when logging - char conn_msg_1[QD_CXTR_CONN_MSG_BUF_SIZE]; // this connection message does not contain the connection id + const qd_server_config_t *config = &connector->ctor_config->config; + char conn_msg[QD_CTOR_CONN_MSG_BUF_SIZE]; // avoid holding connector lock when logging + char conn_msg_1[QD_CTOR_CONN_MSG_BUF_SIZE]; // this connection message does not contain the connection id bool log_error_message = false; sys_mutex_lock(&connector->lock); - if (connector->state != CXTR_STATE_DELETED) { + if (connector->state != CTOR_STATE_DELETED) { increment_conn_index_lh(connector); // note: will transition back to STATE_CONNECTING when associated connection is freed (pn_connection_free) - connector->state = CXTR_STATE_FAILED; + connector->state = CTOR_STATE_FAILED; if (condition && pn_condition_is_set(condition)) { qd_format_string(conn_msg, sizeof(conn_msg), "[C%"PRIu64"] Connection to %s failed: %s %s", connection_id, config->host_port, pn_condition_get_name(condition), @@ -333,7 +333,7 @@ void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t con // if (strcmp(connector->conn_msg, conn_msg_1) != 0) { - strncpy(connector->conn_msg, conn_msg_1, QD_CXTR_CONN_MSG_BUF_SIZE); + strncpy(connector->conn_msg, conn_msg_1, QD_CTOR_CONN_MSG_BUF_SIZE); log_error_message = true; } } @@ -370,7 +370,7 @@ void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx ctx->connector = connector; connector->qd_conn = ctx; - strncpy(ctx->group_correlator, connector->admin_conn->group_correlator, QD_DISCRIMINATOR_SIZE); + strncpy(ctx->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE); } @@ -404,7 +404,7 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const connector->qd_conn = 0; ctx->connector = 0; - if (connector->state != CXTR_STATE_DELETED) { + if (connector->state != CTOR_STATE_DELETED) { // Increment the connection index by so that we can try connecting to the failover url (if any). bool has_failover = qd_connector_has_failover_info(connector); long delay = connector->delay; @@ -415,7 +415,7 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const // We want to quickly keep cycling thru the failover urls every second. delay = 1000; } - connector->state = CXTR_STATE_CONNECTING; + connector->state = CTOR_STATE_CONNECTING; qd_timer_schedule(connector->timer, delay); } sys_mutex_unlock(&connector->lock); @@ -426,12 +426,12 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const /** - * Create a new qd_admin_connector_t instance + * Create a new qd_connector_config_t instance */ -qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t *entity) +qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t *entity) { - qd_admin_connector_t *admin_conn = new_qd_admin_connector_t(); - if (!admin_conn) { + qd_connector_config_t *ctor_config = new_qd_connector_config_t(); + if (!ctor_config) { char *name = qd_entity_opt_string(entity, "name", "UNKNOWN"); qd_error(QD_ERROR_CONFIG, "Failed to create Connector %s: resource allocation failed", name); free(name); @@ -440,36 +440,36 @@ qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t * qd_error_clear(); - ZERO(admin_conn); - DEQ_ITEM_INIT(admin_conn); - sys_atomic_init(&admin_conn->ref_count, 1); // for caller - sys_mutex_init(&admin_conn->lock); - admin_conn->server = qd_dispatch_get_server(qd); - DEQ_INIT(admin_conn->connectors); + ZERO(ctor_config); + DEQ_ITEM_INIT(ctor_config); + sys_atomic_init(&ctor_config->ref_count, 1); // for caller + sys_mutex_init(&ctor_config->lock); + ctor_config->server = qd_dispatch_get_server(qd); + DEQ_INIT(ctor_config->connectors); - if (qd_server_config_load(&admin_conn->config, entity, false) != QD_ERROR_NONE) { + if (qd_server_config_load(&ctor_config->config, entity, false) != QD_ERROR_NONE) { qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); - qd_admin_connector_decref(admin_conn); + qd_connector_config_decref(ctor_config); return 0; } - admin_conn->policy_vhost = qd_entity_opt_string(entity, "policyVhost", 0); + ctor_config->policy_vhost = qd_entity_opt_string(entity, "policyVhost", 0); if (qd_error_code()) { qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); - qd_admin_connector_decref(admin_conn); + qd_connector_config_decref(ctor_config); return 0; } // // If an sslProfile is configured allocate a TLS config to be used by all child connector's connections // - if (admin_conn->config.ssl_profile_name) { - admin_conn->tls_config = qd_tls_config(admin_conn->config.ssl_profile_name, + if (ctor_config->config.ssl_profile_name) { + ctor_config->tls_config = qd_tls_config(ctor_config->config.ssl_profile_name, QD_TLS_TYPE_PROTON_AMQP, QD_TLS_CONFIG_CLIENT_MODE, - admin_conn->config.verify_host_name, - admin_conn->config.ssl_require_peer_authentication); - if (!admin_conn->tls_config) { + ctor_config->config.verify_host_name, + ctor_config->config.ssl_require_peer_authentication); + if (!ctor_config->tls_config) { // qd_tls2_config() has set the qd_error_message(), which is logged below goto error; } @@ -477,21 +477,21 @@ qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t * // For inter-router connectors create associated inter-router data connectors if configured - if (strcmp(admin_conn->config.role, "inter-router") == 0) { - admin_conn->data_connection_count = qd_dispatch_get_data_connection_count(qd); - if (!!admin_conn->data_connection_count) { - qd_generate_discriminator(admin_conn->group_correlator); + if (strcmp(ctor_config->config.role, "inter-router") == 0) { + ctor_config->data_connection_count = qd_dispatch_get_data_connection_count(qd); + if (!!ctor_config->data_connection_count) { + qd_generate_discriminator(ctor_config->group_correlator); - // Add any data connectors to the head of the connectors list in the admin_connector first. This allows the + // Add any data connectors to the head of the connectors list first. This allows the // router control connector to be located at the head of the list. - for (int i = 0; i < admin_conn->data_connection_count; i++) { - qd_connector_t *dc = qd_connector(admin_conn, true); + for (int i = 0; i < ctor_config->data_connection_count; i++) { + qd_connector_t *dc = qd_connector(ctor_config, true); if (!dc) { - qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", admin_conn->config.name); + qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", ctor_config->config.name); goto error; } - DEQ_INSERT_HEAD(admin_conn->connectors, dc); + DEQ_INSERT_HEAD(ctor_config->connectors, dc); } } } @@ -499,71 +499,71 @@ qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t * // Create the primary connector associated with this configuration. It will be located // at the head of the connectors list - qd_connector_t *ct = qd_connector(admin_conn, false); + qd_connector_t *ct = qd_connector(ctor_config, false); if (!ct) { - qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", admin_conn->config.name); + qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", ctor_config->config.name); goto error; } - DEQ_INSERT_HEAD(admin_conn->connectors, ct); + DEQ_INSERT_HEAD(ctor_config->connectors, ct); - return admin_conn; + return ctor_config; error: if (qd_error_code()) qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); - for (qd_connector_t *dc = DEQ_HEAD(admin_conn->connectors); dc; dc = DEQ_HEAD(admin_conn->connectors)) { - DEQ_REMOVE_HEAD(admin_conn->connectors); - dc->state = CXTR_STATE_DELETED; + for (qd_connector_t *dc = DEQ_HEAD(ctor_config->connectors); dc; dc = DEQ_HEAD(ctor_config->connectors)) { + DEQ_REMOVE_HEAD(ctor_config->connectors); + dc->state = CTOR_STATE_DELETED; qd_connector_decref(dc); } - qd_admin_connector_decref(admin_conn); + qd_connector_config_decref(ctor_config); return 0; } -void qd_admin_connector_delete(qd_admin_connector_t *admin_conn) +void qd_connector_config_delete(qd_connector_config_t *ctor_config) { - qd_connector_t *ct = DEQ_HEAD(admin_conn->connectors); + qd_connector_t *ct = DEQ_HEAD(ctor_config->connectors); while (ct) { - DEQ_REMOVE_HEAD(admin_conn->connectors); + DEQ_REMOVE_HEAD(ctor_config->connectors); qd_connector_close(ct); qd_connector_decref(ct); - ct = DEQ_HEAD(admin_conn->connectors); + ct = DEQ_HEAD(ctor_config->connectors); } // drop ref held by the caller - qd_admin_connector_decref(admin_conn); + qd_connector_config_decref(ctor_config); } -void qd_admin_connector_decref(qd_admin_connector_t *admin_conn) +void qd_connector_config_decref(qd_connector_config_t *ctor_config) { - if (!admin_conn) + if (!ctor_config) return; - uint32_t rc = sys_atomic_dec(&admin_conn->ref_count); + uint32_t rc = sys_atomic_dec(&ctor_config->ref_count); (void) rc; assert(rc > 0); // else underflow if (rc == 1) { // Expect: all connectors hold the ref_count so this must be empty - assert(DEQ_IS_EMPTY(admin_conn->connectors)); - sys_mutex_free(&admin_conn->lock); - sys_atomic_destroy(&admin_conn->ref_count); - free(admin_conn->policy_vhost); - qd_tls_config_decref(admin_conn->tls_config); - qd_server_config_free(&admin_conn->config); - free_qd_admin_connector_t(admin_conn); + assert(DEQ_IS_EMPTY(ctor_config->connectors)); + sys_mutex_free(&ctor_config->lock); + sys_atomic_destroy(&ctor_config->ref_count); + free(ctor_config->policy_vhost); + qd_tls_config_decref(ctor_config->tls_config); + qd_server_config_free(&ctor_config->config); + free_qd_connector_config_t(ctor_config); } } // Initiate connections on all child connectors -void qd_admin_connector_connect(qd_admin_connector_t *admin_conn) +void qd_connector_config_connect(qd_connector_config_t *ctor_config) { - if (!admin_conn->activated) { - admin_conn->activated = true; - for (qd_connector_t *ct = DEQ_HEAD(admin_conn->connectors); !!ct; ct = DEQ_NEXT(ct)) { + if (!ctor_config->activated) { + ctor_config->activated = true; + for (qd_connector_t *ct = DEQ_HEAD(ctor_config->connectors); !!ct; ct = DEQ_NEXT(ct)) { qd_connector_connect(ct); } } diff --git a/src/adaptors/amqp/qd_connector.h b/src/adaptors/amqp/qd_connector.h index a3971c9d1..667fc743b 100644 --- a/src/adaptors/amqp/qd_connector.h +++ b/src/adaptors/amqp/qd_connector.h @@ -34,34 +34,34 @@ typedef struct qd_server_t qd_server_t; typedef struct qd_connection_t qd_connection_t; typedef struct vflow_record_t vflow_record_t; typedef struct qd_tls_config_t qd_tls_config_t; -typedef struct qd_admin_connector_t qd_admin_connector_t; +typedef struct qd_connector_config_t qd_connector_config_t; typedef enum { - CXTR_STATE_INIT = 0, - CXTR_STATE_CONNECTING, - CXTR_STATE_OPEN, - CXTR_STATE_FAILED, - CXTR_STATE_DELETED // by management -} cxtr_state_t; + CTOR_STATE_INIT = 0, + CTOR_STATE_CONNECTING, + CTOR_STATE_OPEN, + CTOR_STATE_FAILED, + CTOR_STATE_DELETED // by management +} connector_state_t; /** - * A qd_connector_t manages a single outgoing transport connection. It is responsible for re-establishing the connection - * should it fail. It is the child of a qd_admin_connector_t instance. + * A qd_connector_t manages a single outgoing AMQP network connection connection (represented by a qd_connection_t + * instance). It is responsible for re-establishing the network connection should it fail. */ typedef struct qd_connector_t { - // Sibling connectors belonging to the same parent qd_admin_connector_t + // Sibling connectors sharing the same qd_connector_config_t DEQ_LINKS(struct qd_connector_t); - qd_admin_connector_t *admin_conn; + qd_connector_config_t *ctor_config; - /* Referenced by parent qd_admin_connector_t and child qd_connection_t */ + /* Referenced by parent qd_connector_config_t and child qd_connection_t */ sys_atomic_t ref_count; qd_timer_t *timer; long delay; - /* Connector state and ctx can be modified by I/O or management threads. */ + /* Connector state and qd_conn can be modified by I/O or management threads. */ sys_mutex_t lock; - cxtr_state_t state; + connector_state_t state; qd_connection_t *qd_conn; vflow_record_t *vflow_record; bool oper_status_down; // set when oper-status transitions to 'down' to avoid repeated error indications. @@ -73,22 +73,22 @@ typedef struct qd_connector_t { int conn_index; // Which connection in the connection list to connect to next. /* holds proton transport error condition text on connection failure */ -#define QD_CXTR_CONN_MSG_BUF_SIZE 300 - char conn_msg[QD_CXTR_CONN_MSG_BUF_SIZE]; +#define QD_CTOR_CONN_MSG_BUF_SIZE 300 + char conn_msg[QD_CTOR_CONN_MSG_BUF_SIZE]; } qd_connector_t; DEQ_DECLARE(qd_connector_t, qd_connector_list_t); /** - * An qd_admin_connector_t instance is created for each "connector" configuration object provisioned on the router. A - * connector may have one or more outgoing connections associated with it depending on the connectors role. The purpose - * of the qd_admin_connector_t is to manage a set of outgoing connections associated with the connector - * configuration. An qd_admin_connector_t will instantiate a qd_connector_t for each outgoing connection required by the - * connector configuration. + * An qd_connector_config_t instance is created for each "connector" configuration object provisioned on the router. It + * holds the configuration information that is used for outgoing AMQP connections. The qd_connector_config_t instance + * will be used to construct one or more qd_connection_t instances that share that configuration data. + * + * qd_connector_config_t instances are managed by the Connection Manager. */ -struct qd_admin_connector_t { - DEQ_LINKS(struct qd_admin_connector_t); // connection_manager list +struct qd_connector_config_t { + DEQ_LINKS(struct qd_connector_config_t); // connection_manager list /* Referenced by connection_manager and children qd_connector_t */ sys_atomic_t ref_count; @@ -106,26 +106,27 @@ struct qd_admin_connector_t { qd_connector_list_t connectors; }; -DEQ_DECLARE(qd_admin_connector_t, qd_admin_connector_list_t); +DEQ_DECLARE(qd_connector_config_t, qd_connector_config_list_t); -/** Management call to create an Admin Connector +/** Management call to instantiate a qd_connector_config_t from a configuration entity */ -qd_admin_connector_t *qd_admin_connector_create(qd_dispatch_t *qd, qd_entity_t *entity); +qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t *entity); -/** Management call to delete the Admin Connector +/** Management call to delete a qd_connector_config_t * - * This will close and release all child connector and connections then - * decrement the callers reference count to the admin connector. + * This will close and release all child connector and connections, then decrement the callers reference count to the + * qd_connector_config_t instance. */ -void qd_admin_connector_delete(qd_admin_connector_t *admin_conn); +void qd_connector_config_delete(qd_connector_config_t *ctor_config); -/** Management call to start all child connector connections +/** Management call start all child connections for the given configuration instance */ -void qd_admin_connector_connect(qd_admin_connector_t *admin_conn); +void qd_connector_config_connect(qd_connector_config_t *ctor_config); -/** Drop a reference to the Admin Connector +/** Drop a reference to the configuration instance. + * This may free the given instance. */ -void qd_admin_connector_decref(qd_admin_connector_t *admin_conn); +void qd_connector_config_decref(qd_connector_config_t *ctor_config); /** * Connector API @@ -135,33 +136,33 @@ void qd_admin_connector_decref(qd_admin_connector_t *admin_conn); * Create a new connector. * Call qd_connector_connect() to initiate the outgoing connection */ -qd_connector_t *qd_connector(qd_admin_connector_t *admin_conn, bool is_data_connector); +qd_connector_t *qd_connector(qd_connector_config_t *ctor_config, bool is_data_connector); /** * Initiate an outgoing connection. Returns true if successful. */ -bool qd_connector_connect(qd_connector_t *ct); +bool qd_connector_connect(qd_connector_t *ctor); /** * Close the associated connection and deactivate the connector */ -void qd_connector_close(qd_connector_t *ct); +void qd_connector_close(qd_connector_t *ctor); -void qd_connector_decref(qd_connector_t *ct); +void qd_connector_decref(qd_connector_t *ctor); -const qd_server_config_t *qd_connector_get_config(const qd_connector_t *ct); -const char *qd_connector_get_group_correlator(const qd_connector_t *ct); -bool qd_connector_has_failover_info(const qd_connector_t* ct); -const char *qd_connector_policy_vhost(const qd_connector_t* ct); -void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t connection_id, pn_condition_t *condition); -void qd_connector_remote_opened(qd_connector_t *connector); +const qd_server_config_t *qd_connector_get_config(const qd_connector_t *ctor); +const char *qd_connector_get_group_correlator(const qd_connector_t *ctor); +bool qd_connector_has_failover_info(const qd_connector_t* ctor); +const char *qd_connector_policy_vhost(const qd_connector_t* ctor); +void qd_connector_handle_transport_error(qd_connector_t *ctor, uint64_t connection_id, pn_condition_t *condition); +void qd_connector_remote_opened(qd_connector_t *ctor); // add a new connection to the parent connector -void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx); -void qd_connector_add_link(qd_connector_t *connector); +void qd_connector_add_connection(qd_connector_t *ctor, qd_connection_t *qd_conn); +void qd_connector_add_link(qd_connector_t *ctor); // remove the child connection // NOTE WELL: this may free the connector if the connection is holding the last // reference to it -void qd_connector_remove_connection(qd_connector_t *connector, bool final, const char *condition_name, const char *condition_description); +void qd_connector_remove_connection(qd_connector_t *ctor, bool final, const char *condition_name, const char *condition_description); #endif