diff --git a/include/qpid/dispatch/dispatch.h b/include/qpid/dispatch/dispatch.h index 93aa7f2cc..876be1abf 100644 --- a/include/qpid/dispatch/dispatch.h +++ b/include/qpid/dispatch/dispatch.h @@ -22,6 +22,7 @@ #include "qpid/dispatch/error.h" #include +#include /**@file * Configure and prepare a dispatch instance. @@ -33,6 +34,7 @@ typedef struct qd_dispatch_t qd_dispatch_t; typedef struct qd_connection_manager_t qd_connection_manager_t; typedef struct qd_policy_t qd_policy_t; +typedef struct qd_server_t qd_server_t; /** * Initialize the Dispatch library and prepare it for operation. @@ -83,6 +85,15 @@ qd_connection_manager_t *qd_dispatch_connection_manager(const qd_dispatch_t *qd) */ qd_policy_t *qd_dispatch_get_policy(const qd_dispatch_t *dispatch); +/** + * Return the configured inter-router data connection count + */ +uint32_t qd_dispatch_get_data_connection_count(const qd_dispatch_t *dispatch); + +/** + * Return the routers server + */ +qd_server_t *qd_dispatch_get_server(const qd_dispatch_t *dispatch); /** * @} diff --git a/src/adaptors/amqp/amqp_adaptor.c b/src/adaptors/amqp/amqp_adaptor.c index f3cb032a6..9cfe1d517 100644 --- a/src/adaptors/amqp/amqp_adaptor.c +++ b/src/adaptors/amqp/amqp_adaptor.c @@ -1402,8 +1402,10 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool const char *host = 0; char host_local[255]; const qd_server_config_t *config; - if (qd_connection_connector(conn)) { - config = qd_connector_config(qd_connection_connector(conn)); + qd_connector_t *connector = qd_connection_connector(conn); + + if (connector) { + config = qd_connector_get_config(connector); snprintf(host_local, 254, "%s", config->host_port); host = &host_local[0]; } @@ -1414,8 +1416,13 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool qd_router_connection_get_config(conn, &role, &cost, &name, &conn->strip_annotations_in, &conn->strip_annotations_out, &link_capacity); - if (conn->connector && conn->connector->config.has_data_connectors) { - memcpy(conn->group_correlator, conn->connector->group_correlator, QD_DISCRIMINATOR_SIZE); + if (connector && !!connector->ctor_config->data_connection_count) { + memcpy(conn->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE); + if (connector->is_data_connector) { + // override the configured role to identify this as a data connection + assert(role == QDR_ROLE_INTER_ROUTER); + role = QDR_ROLE_INTER_ROUTER_DATA; + } } // check offered capabilities for streaming link support and connection trunking support @@ -1524,8 +1531,8 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool } else if ((key.size == strlen(QD_CONNECTION_PROPERTY_ACCESS_ID) && strncmp(key.start, QD_CONNECTION_PROPERTY_ACCESS_ID, key.size) == 0)) { if (!pn_data_next(props)) break; - if (!!conn->connector && !!conn->connector->vflow_record && pn_data_type(props) == PN_STRING) { - vflow_set_ref_from_pn(conn->connector->vflow_record, VFLOW_ATTRIBUTE_PEER, props); + if (!!connector && !!connector->vflow_record && pn_data_type(props) == PN_STRING) { + vflow_set_ref_from_pn(connector->vflow_record, VFLOW_ATTRIBUTE_PEER, props); } } else { @@ -1553,7 +1560,7 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool authenticated, conn->opened, (char*) mech, - conn->connector ? QD_OUTGOING : QD_INCOMING, + connector ? QD_OUTGOING : QD_INCOMING, host, proto, cipher, @@ -1592,17 +1599,14 @@ static void AMQP_opened_handler(qd_router_t *router, qd_connection_t *conn, bool qd_listener_add_link(conn->listener); } - if (!!conn->connector) { - qd_connector_add_link(conn->connector); - } - - if (conn->connector) { - sys_mutex_lock(&conn->connector->lock); - qd_format_string(conn->connector->conn_msg, QD_CXTR_CONN_MSG_BUF_SIZE, + if (!!connector) { + qd_connector_add_link(connector); + sys_mutex_lock(&connector->lock); + qd_format_string(connector->conn_msg, QD_CTOR_CONN_MSG_BUF_SIZE, "[C%"PRIu64"] Connection Opened: dir=%s host=%s encrypted=%s auth=%s user=%s container_id=%s", connection_id, inbound ? "in" : "out", host, encrypted ? proto : "no", authenticated ? mech : "no", (char*) user, container); - sys_mutex_unlock(&conn->connector->lock); + sys_mutex_unlock(&connector->lock); } free(proto); diff --git a/src/adaptors/amqp/connection_manager.c b/src/adaptors/amqp/connection_manager.c index 7b83209fe..6f56cfeda 100644 --- a/src/adaptors/amqp/connection_manager.c +++ b/src/adaptors/amqp/connection_manager.c @@ -40,13 +40,11 @@ #include #include -#define CHECK() if (qd_error_code()) goto error struct qd_connection_manager_t { qd_server_t *server; qd_listener_list_t listeners; - qd_connector_list_t connectors; - qd_connector_list_t data_connectors; + qd_connector_config_list_t connector_configs; }; @@ -63,7 +61,7 @@ QD_EXPORT qd_listener_t *qd_dispatch_configure_listener(qd_dispatch_t *qd, qd_en { qd_connection_manager_t *cm = qd->connection_manager; qd_listener_t *li = qd_listener(qd->server); - if (!li || qd_server_config_load(qd, &li->config, entity, true, 0) != QD_ERROR_NONE) { + if (!li || qd_server_config_load(&li->config, entity, true) != QD_ERROR_NONE) { qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create listener: %s", qd_error_message()); qd_listener_decref(li); return 0; @@ -168,49 +166,53 @@ static int get_failover_info_length(qd_failover_item_list_t conn_info_list) */ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl) { - qd_connector_t *connector = (qd_connector_t*) impl; + qd_connector_config_t *ctor_config = (qd_connector_config_t *) impl; + qd_connector_t *connector = 0; - int i = 1; - int num_items = 0; + qd_error_clear(); - sys_mutex_lock(&connector->lock); + // TODO(kgiusti): inter-router connections may have several qd_connector_ts active due to the router data connection + // count configuration. However we can only report 1 connector via management. It would be more accurate to report + // all connectors associated with this management entity + sys_mutex_lock(&ctor_config->lock); + connector = DEQ_HEAD(ctor_config->connectors); + if (connector) { + // prevent I/O thread from freeing connector while it is being accessed + sys_atomic_inc(&connector->ref_count); + } + sys_mutex_unlock(&ctor_config->lock); - int conn_index = connector->conn_index; - qd_failover_item_list_t conn_info_list = connector->conn_info_list; + if (connector) { + int i = 1; + int num_items = 0; - int conn_info_len = DEQ_SIZE(conn_info_list); + sys_mutex_lock(&connector->lock); - qd_failover_item_t *item = DEQ_HEAD(conn_info_list); + int conn_index = connector->conn_index; + qd_failover_item_list_t conn_info_list = connector->conn_info_list; - int arr_length = get_failover_info_length(conn_info_list); + int conn_info_len = DEQ_SIZE(conn_info_list); - // This is the string that will contain the comma separated failover list - char *failover_info = qd_calloc(arr_length + 1, sizeof(char)); - while(item) { + qd_failover_item_t *item = DEQ_HEAD(conn_info_list); - // Break out of the loop when we have hit all items in the list. - if (num_items >= conn_info_len) - break; + int arr_length = get_failover_info_length(conn_info_list); - if (num_items >= 1) { - strcat(failover_info, ", "); - } + // This is the string that will contain the comma separated failover list + char *failover_info = qd_calloc(arr_length + 1, sizeof(char)); + while (item) { - // We need to go to the elements in the list to get to the - // element that matches the connection index. This is the first - // url that the router will try to connect on failover. - if (conn_index == i) { - num_items += 1; - if (item->scheme) { - strcat(failover_info, item->scheme); - strcat(failover_info, "://"); - } - if (item->host_port) { - strcat(failover_info, item->host_port); + // Break out of the loop when we have hit all items in the list. + if (num_items >= conn_info_len) + break; + + if (num_items >= 1) { + strcat(failover_info, ", "); } - } - else { - if (num_items > 0) { + + // We need to go to the elements in the list to get to the + // element that matches the connection index. This is the first + // url that the router will try to connect on failover. + if (conn_index == i) { num_items += 1; if (item->scheme) { strcat(failover_info, item->scheme); @@ -220,191 +222,78 @@ QD_EXPORT qd_error_t qd_entity_refresh_connector(qd_entity_t* entity, void *impl strcat(failover_info, item->host_port); } } - } + else { + if (num_items > 0) { + num_items += 1; + if (item->scheme) { + strcat(failover_info, item->scheme); + strcat(failover_info, "://"); + } + if (item->host_port) { + strcat(failover_info, item->host_port); + } + } + } - i += 1; + i += 1; - item = DEQ_NEXT(item); - if (item == 0) - item = DEQ_HEAD(conn_info_list); - } + item = DEQ_NEXT(item); + if (item == 0) + item = DEQ_HEAD(conn_info_list); + } - const char *state_info = 0; - switch (connector->state) { - case CXTR_STATE_CONNECTING: - state_info = "CONNECTING"; - break; - case CXTR_STATE_OPEN: - state_info = "SUCCESS"; - break; - case CXTR_STATE_FAILED: - state_info = "FAILED"; - break; - case CXTR_STATE_INIT: - state_info = "INITIALIZING"; - break; - case CXTR_STATE_DELETED: - // deleted by management, waiting for connection to close - state_info = "CLOSING"; - break; - default: - state_info = "UNKNOWN"; - break; - } + const char *state_info = 0; + switch (connector->state) { + case CTOR_STATE_CONNECTING: + state_info = "CONNECTING"; + break; + case CTOR_STATE_OPEN: + state_info = "SUCCESS"; + break; + case CTOR_STATE_FAILED: + state_info = "FAILED"; + break; + case CTOR_STATE_INIT: + state_info = "INITIALIZING"; + break; + case CTOR_STATE_DELETED: + // deleted by management, waiting for connection to close + state_info = "CLOSING"; + break; + default: + state_info = "UNKNOWN"; + break; + } - if (qd_entity_set_string(entity, "failoverUrls", failover_info) == 0 - && qd_entity_set_string(entity, "connectionStatus", state_info) == 0 - && qd_entity_set_string(entity, "connectionMsg", connector->conn_msg) == 0) { + // stop updating entity on first failure to capture the error code + if (qd_entity_set_string(entity, "failoverUrls", failover_info) == 0 + && qd_entity_set_string(entity, "connectionStatus", state_info) == 0 + && qd_entity_set_string(entity, "connectionMsg", connector->conn_msg) == 0) { + // error code not set - nothing to do + } sys_mutex_unlock(&connector->lock); + qd_connector_decref(connector); // release local reference free(failover_info); - return QD_ERROR_NONE; + } else { + qd_error(QD_ERROR_NOT_FOUND, "No active connector present"); } - sys_mutex_unlock(&connector->lock); - free(failover_info); return qd_error_code(); } -QD_EXPORT qd_connector_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity) +QD_EXPORT qd_connector_config_t *qd_dispatch_configure_connector(qd_dispatch_t *qd, qd_entity_t *entity) { - qd_connection_manager_t *cm = qd->connection_manager; - qd_connector_t *ct = qd_server_connector(qd->server); - qd_connector_list_t data_connectors = DEQ_EMPTY; - - qd_error_clear(); - - if (!ct) { - char *name = qd_entity_opt_string(entity, "name", "UNKNOWN"); - qd_error(QD_ERROR_CONFIG, "Failed to create Connector %s: resource allocation failed", name); - free(name); + qd_connection_manager_t *cm = qd->connection_manager; + qd_connector_config_t *ctor_config = qd_connector_config_create(qd, entity); + if (!ctor_config) { return 0; } - if (qd_server_config_load(qd, &ct->config, entity, false, 0) == QD_ERROR_NONE) { - ct->policy_vhost = qd_entity_opt_string(entity, "policyVhost", 0); CHECK(); - - // - // If an sslProfile is configured allocate a TLS config for this connector's connections - // - if (ct->config.ssl_profile_name) { - ct->tls_config = qd_tls_config(ct->config.ssl_profile_name, - QD_TLS_TYPE_PROTON_AMQP, - QD_TLS_CONFIG_CLIENT_MODE, - ct->config.verify_host_name, - ct->config.ssl_require_peer_authentication); - if (!ct->tls_config) { - // qd_tls2_config() has set the qd_error_message(), which is logged below - goto error; - } - } - - // - // If this connection has a data-connection-group, set up the group members now - // - if (ct->config.has_data_connectors) { - qd_generate_discriminator(ct->group_correlator); - for (int i = 0; i < qd->data_connection_count; i++) { - qd_connector_t *dc = qd_server_connector(qd->server); - if (!dc) { - qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", ct->config.name); - goto error; - } - - if (qd_server_config_load(qd, &dc->config, entity, false, "inter-router-data") != QD_ERROR_NONE) { - // qd_server_config_load will set qd_error() - qd_connector_decref(dc); - goto error; - } - - if (ct->tls_config) { - dc->tls_config = qd_tls_config(ct->config.ssl_profile_name, - QD_TLS_TYPE_PROTON_AMQP, - QD_TLS_CONFIG_CLIENT_MODE, - ct->config.verify_host_name, - ct->config.ssl_require_peer_authentication); - if (!dc->tls_config) { - // qd_tls2_config() has set the qd_error_message(), which is logged below - qd_connector_decref(dc); - goto error; - } - } - - strncpy(dc->group_correlator, ct->group_correlator, QD_DISCRIMINATOR_SIZE); - dc->is_data_connector = true; - qd_failover_item_t *item = NEW(qd_failover_item_t); - ZERO(item); - if (dc->config.ssl_required) - item->scheme = strdup("amqps"); - else - item->scheme = strdup("amqp"); - item->host = strdup(dc->config.host); - item->port = strdup(dc->config.port); - int hplen = strlen(item->host) + strlen(item->port) + 2; - item->host_port = malloc(hplen); - snprintf(item->host_port, hplen, "%s:%s", item->host , item->port); - DEQ_INSERT_TAIL(dc->conn_info_list, item); - - DEQ_INSERT_TAIL(data_connectors, dc); - } - } - - // - // Add the first item to the ct->conn_info_list - // The initial connection information and any backup connection information is stored in the conn_info_list - // - qd_failover_item_t *item = NEW(qd_failover_item_t); - ZERO(item); - if (ct->config.ssl_required) - item->scheme = strdup("amqps"); - else - item->scheme = strdup("amqp"); - - item->host = strdup(ct->config.host); - item->port = strdup(ct->config.port); - - int hplen = strlen(item->host) + strlen(item->port) + 2; - item->host_port = malloc(hplen); - snprintf(item->host_port, hplen, "%s:%s", item->host , item->port); - DEQ_INSERT_TAIL(ct->conn_info_list, item); - - // - // Set up the vanflow record for this connector (LINK) - // Do this only for router-to-router connectors since the record represents an inter-router link - // - if (strcmp(ct->config.role, "inter-router") == 0 || - strcmp(ct->config.role, "edge") == 0 || - strcmp(ct->config.role, "inter-edge") == 0) { - ct->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_NAME, ct->config.name); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_ROLE, ct->config.role); - vflow_set_uint64(ct->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, ct->config.inter_router_cost); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down"); - vflow_set_uint64(ct->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 0); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, item->scheme); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_HOST, item->host); - vflow_set_string(ct->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_PORT, item->port); - vflow_set_uint64(ct->vflow_record, VFLOW_ATTRIBUTE_OCTETS, 0); - vflow_set_uint64(ct->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, 0); - } - - DEQ_APPEND(cm->data_connectors, data_connectors); - DEQ_INSERT_TAIL(cm->connectors, ct); - log_config(&ct->config, "Connector", true); - return ct; - } - - error: - qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); - for (qd_connector_t *dc = DEQ_HEAD(data_connectors); dc; dc = DEQ_HEAD(data_connectors)) { - DEQ_REMOVE_HEAD(data_connectors); - dc->state = CXTR_STATE_DELETED; - qd_connector_decref(dc); - } - ct->state = CXTR_STATE_DELETED; - qd_connector_decref(ct); - return 0; + DEQ_INSERT_TAIL(cm->connector_configs, ctor_config); + log_config(&ctor_config->config, "Connector", true); + return ctor_config; } @@ -416,8 +305,7 @@ qd_connection_manager_t *qd_connection_manager(qd_dispatch_t *qd) cm->server = qd->server; DEQ_INIT(cm->listeners); - DEQ_INIT(cm->connectors); - DEQ_INIT(cm->data_connectors); + DEQ_INIT(cm->connector_configs); return cm; } @@ -445,25 +333,11 @@ void qd_connection_manager_free(qd_connection_manager_t *cm) li = DEQ_HEAD(cm->listeners); } - qd_connector_list_t to_free; - DEQ_MOVE(cm->connectors, to_free); - DEQ_APPEND(to_free, cm->data_connectors); - - qd_connector_t *connector = DEQ_HEAD(to_free); - while (connector) { - DEQ_REMOVE_HEAD(to_free); - sys_mutex_lock(&connector->lock); - // setting DELETED below ensures the timer callback - // will not initiate a re-connect once we drop - // the lock - connector->state = CXTR_STATE_DELETED; - sys_mutex_unlock(&connector->lock); - // cannot cancel timer while holding lock since the - // callback takes the lock - qd_timer_cancel(connector->timer); - qd_connector_decref(connector); - - connector = DEQ_HEAD(to_free); + qd_connector_config_t *ctor_config = DEQ_HEAD(cm->connector_configs); + while (ctor_config) { + DEQ_REMOVE_HEAD(cm->connector_configs); + qd_connector_config_delete(ctor_config); + ctor_config = DEQ_HEAD(cm->connector_configs); } free(cm); @@ -477,8 +351,7 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd) { static bool first_start = true; qd_listener_t *li = DEQ_HEAD(qd->connection_manager->listeners); - qd_connector_t *ct = DEQ_HEAD(qd->connection_manager->connectors); - qd_connector_t *dc = DEQ_HEAD(qd->connection_manager->data_connectors); + qd_connector_config_t *ctor_config = DEQ_HEAD(qd->connection_manager->connector_configs); while (li) { if (!li->pn_listener) { @@ -493,24 +366,9 @@ QD_EXPORT void qd_connection_manager_start(qd_dispatch_t *qd) li = DEQ_NEXT(li); } - while (ct) { - if (ct->state == CXTR_STATE_OPEN || ct->state == CXTR_STATE_CONNECTING) { - ct = DEQ_NEXT(ct); - continue; - } - - qd_connector_connect(ct); - ct = DEQ_NEXT(ct); - } - - while (dc) { - if (dc->state == CXTR_STATE_OPEN || dc->state == CXTR_STATE_CONNECTING) { - dc = DEQ_NEXT(dc); - continue; - } - - qd_connector_connect(dc); - dc = DEQ_NEXT(dc); + while (ctor_config) { + qd_connector_config_connect(ctor_config); + ctor_config = DEQ_NEXT(ctor_config); } first_start = false; @@ -536,66 +394,24 @@ QD_EXPORT void qd_connection_manager_delete_listener(qd_dispatch_t *qd, void *im } -static void deferred_close(void *context, bool discard) { - if (!discard) { - pn_connection_close((pn_connection_t*)context); - } -} - - // threading: called by management thread while I/O thread may be // referencing the qd_connector_t via the qd_connection_t // QD_EXPORT void qd_connection_manager_delete_connector(qd_dispatch_t *qd, void *impl) { - qd_connector_t *ct = (qd_connector_t*) impl; - if (ct) { - // cannot free the timer while holding ct->lock since the - // timer callback may be running during the call to qd_timer_free - qd_timer_t *timer = 0; - bool has_data_connectors = ct->config.has_data_connectors; - void *dct = qd_connection_new_qd_deferred_call_t(); - sys_mutex_lock(&ct->lock); - timer = ct->timer; - ct->timer = 0; - ct->state = CXTR_STATE_DELETED; - qd_connection_t *conn = ct->qd_conn; - if (conn && conn->pn_conn) { - qd_connection_invoke_deferred_impl(conn, deferred_close, conn->pn_conn, dct); - sys_mutex_unlock(&ct->lock); - } else { - sys_mutex_unlock(&ct->lock); - qd_connection_free_qd_deferred_call_t(dct); - } - qd_timer_free(timer); - if (ct->is_data_connector) { - DEQ_REMOVE(qd->connection_manager->data_connectors, ct); - } else { - log_config(&ct->config, "Connector", false); - DEQ_REMOVE(qd->connection_manager->connectors, ct); - } + qd_connector_config_t *ctor_config = (qd_connector_config_t *) impl; + assert(ctor_config); - // - // Remove correlated data connectors - // - if (has_data_connectors) { - qd_connector_t *dc = DEQ_HEAD(qd->connection_manager->data_connectors); - while (!!dc) { - qd_connector_t *next = DEQ_NEXT(dc); - if (strncmp(dc->group_correlator, ct->group_correlator, QD_DISCRIMINATOR_SIZE) == 0) { - qd_connection_manager_delete_connector(qd, (void*) dc); - } - dc = next; - } - } + // take it off the connection manager - qd_connector_decref(ct); - } + log_config(&ctor_config->config, "Connector", false); + DEQ_REMOVE(qd->connection_manager->connector_configs, ctor_config); + qd_connector_config_delete(ctor_config); } const char *qd_connector_name(qd_connector_t *ct) { - return ct ? ct->config.name : 0; + return ct ? ct->ctor_config->config.name : 0; } diff --git a/src/adaptors/amqp/qd_connection.c b/src/adaptors/amqp/qd_connection.c index 2e68e0459..fe56a6aec 100644 --- a/src/adaptors/amqp/qd_connection.c +++ b/src/adaptors/amqp/qd_connection.c @@ -150,7 +150,7 @@ static void decorate_connection(qd_connection_t *ctx, const qd_server_config_t * pn_data_put_int(pn_connection_properties(conn), QDR_ROLE_INTER_ROUTER_DATA); } - if (ctx->connector && (ctx->connector->is_data_connector || ctx->connector->config.has_data_connectors)) { + if (ctx->connector && (ctx->connector->is_data_connector || !!ctx->connector->ctor_config->data_connection_count)) { pn_data_put_symbol(pn_connection_properties(conn), pn_bytes(strlen(QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY), QD_CONNECTION_PROPERTY_GROUP_CORRELATOR_KEY)); pn_data_put_string(pn_connection_properties(conn), @@ -242,7 +242,7 @@ static void decorate_connection(qd_connection_t *ctx, const qd_server_config_t * * Does not allocate any managed objects and therefore * does not take ENTITY_CACHE lock. */ -void qd_connection_init(qd_connection_t *ctx, qd_server_t *server, qd_server_config_t *config, qd_connector_t *connector, qd_listener_t *listener) +void qd_connection_init(qd_connection_t *ctx, qd_server_t *server, const qd_server_config_t *config, qd_connector_t *connector, qd_listener_t *listener) { ctx->pn_conn = pn_connection(); assert(ctx->pn_conn); @@ -464,7 +464,7 @@ const qd_server_config_t *qd_connection_config(const qd_connection_t *conn) if (conn->listener) return &conn->listener->config; if (conn->connector) - return &conn->connector->config; + return &conn->connector->ctor_config->config; return NULL; } @@ -533,7 +533,7 @@ void qd_connection_invoke_deferred_calls(qd_connection_t *conn, bool discard) const char* qd_connection_name(const qd_connection_t *c) { if (c->connector) { - return c->connector->config.host_port; + return c->connector->ctor_config->config.host_port; } else { return c->rhost_port; } @@ -595,14 +595,14 @@ static void set_rhost_port(qd_connection_t *ctx) { static bool setup_ssl_sasl_and_open(qd_connection_t *ctx) { qd_connector_t *ct = ctx->connector; - const qd_server_config_t *config = &ct->config; + const qd_server_config_t *config = &ct->ctor_config->config; pn_transport_t *tport = pn_connection_transport(ctx->pn_conn); // // Create an SSL session if required // - if (ct->tls_config) { - ctx->ssl = qd_tls_session_amqp(ct->tls_config, tport, false); + if (ct->ctor_config->tls_config) { + ctx->ssl = qd_tls_session_amqp(ct->ctor_config->tls_config, tport, false); if (!ctx->ssl) { qd_log(LOG_SERVER, QD_LOG_ERROR, "Failed to create TLS session for connection [C%" PRIu64 "] to %s:%s (%s)", @@ -682,7 +682,7 @@ static void on_connection_bound(qd_server_t *server, pn_event_t *e) { qd_log(LOG_SERVER, QD_LOG_INFO, "[C%" PRIu64 "] Accepted connection to %s from %s", ctx->connection_id, name, ctx->rhost_port); } else if (ctx->connector) { /* Establishing an outgoing connection */ - config = &ctx->connector->config; + config = &ctx->connector->ctor_config->config; if (!setup_ssl_sasl_and_open(ctx)) { qd_log(LOG_SERVER, QD_LOG_ERROR, "[C%" PRIu64 "] Connection aborted due to internal setup error", ctx->connection_id); diff --git a/src/adaptors/amqp/qd_connection.h b/src/adaptors/amqp/qd_connection.h index 658c633e3..c74040799 100644 --- a/src/adaptors/amqp/qd_connection.h +++ b/src/adaptors/amqp/qd_connection.h @@ -118,7 +118,7 @@ struct qd_connection_t { ALLOC_DECLARE_SAFE(qd_connection_t); // initialize a newly allocated qd_connection_t -void qd_connection_init(qd_connection_t *qd_conn, qd_server_t *server, qd_server_config_t *config, +void qd_connection_init(qd_connection_t *qd_conn, qd_server_t *server, const qd_server_config_t *config, qd_connector_t *connector, qd_listener_t *listener); qd_connector_t* qd_connection_connector(const qd_connection_t *c); diff --git a/src/adaptors/amqp/qd_connector.c b/src/adaptors/amqp/qd_connector.c index a033472c6..be39cdb78 100644 --- a/src/adaptors/amqp/qd_connector.c +++ b/src/adaptors/amqp/qd_connector.c @@ -20,11 +20,13 @@ #include "qd_connector.h" #include "qd_connection.h" #include "private.h" +#include "entity.h" #include "qpid/dispatch/alloc_pool.h" #include "qpid/dispatch/timer.h" #include "qpid/dispatch/vanflow.h" #include "qpid/dispatch/tls_amqp.h" +#include "qpid/dispatch/dispatch.h" #include @@ -32,6 +34,7 @@ ALLOC_DEFINE(qd_connector_t); +ALLOC_DEFINE(qd_connector_config_t); static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) TA_REQ(ct->lock) @@ -50,10 +53,12 @@ static qd_failover_item_t *qd_connector_get_conn_info_lh(qd_connector_t *ct) TA_ /* Timer callback to try/retry connection open, connector->lock held */ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_REQ(connector->lock) { - assert(connector->state != CXTR_STATE_DELETED); - qd_connection_init(qd_conn, connector->server, &connector->config, connector, 0); + assert(connector->state != CTOR_STATE_DELETED); - connector->state = CXTR_STATE_OPEN; + const qd_connector_config_t *ctor_config = connector->ctor_config; + qd_connection_init(qd_conn, ctor_config->server, &ctor_config->config, connector, 0); + + connector->state = CTOR_STATE_OPEN; connector->delay = 5000; // @@ -70,7 +75,7 @@ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_ // Set the sasl user name and password on the proton connection object. This has to be // done before pn_proactor_connect which will bind a transport to the connection - const qd_server_config_t *config = &connector->config; + const qd_server_config_t *config = &connector->ctor_config->config; if(config->sasl_username) pn_connection_set_user(qd_conn->pn_conn, config->sasl_username); if (config->sasl_password) @@ -78,7 +83,7 @@ static void try_open_lh(qd_connector_t *connector, qd_connection_t *qd_conn) TA_ qd_log(LOG_SERVER, QD_LOG_DEBUG, "[C%" PRIu64 "] Connecting to %s", qd_conn->connection_id, host_port); /* Note: the transport is configured in the PN_CONNECTION_BOUND event */ - pn_proactor_connect(qd_server_proactor(connector->server), qd_conn->pn_conn, host_port); + pn_proactor_connect(qd_server_proactor(connector->ctor_config->server), qd_conn->pn_conn, host_port); // at this point the qd_conn may now be scheduled on another thread } @@ -96,9 +101,9 @@ static void try_open_cb(void *context) sys_mutex_lock(&ct->lock); - if (ct->state == CXTR_STATE_CONNECTING || ct->state == CXTR_STATE_INIT) { + if (ct->state == CTOR_STATE_CONNECTING || ct->state == CTOR_STATE_INIT) { // else deleted or failed - on failed wait until after connection is freed - // and state is set to CXTR_STATE_CONNECTING (timer is rescheduled then) + // and state is set to CTOR_STATE_CONNECTING (timer is rescheduled then) try_open_lh(ct, ctx); ctx = 0; // owned by ct } @@ -109,54 +114,96 @@ static void try_open_cb(void *context) } -const qd_server_config_t *qd_connector_config(const qd_connector_t *c) +/** Close the proton connection + * + * Scheduled on the target connections thread + */ +static void deferred_close(void *context, bool discard) +{ + if (!discard) { + pn_connection_close((pn_connection_t*)context); + } +} + + +const qd_server_config_t *qd_connector_get_config(const qd_connector_t *c) { - return &c->config; + return &c->ctor_config->config; } -qd_connector_t *qd_server_connector(qd_server_t *server) +qd_connector_t *qd_connector(qd_connector_config_t *ctor_config, bool is_data_connector) { qd_connector_t *connector = new_qd_connector_t(); if (!connector) return 0; + ZERO(connector); - sys_atomic_init(&connector->ref_count, 1); + sys_atomic_init(&connector->ref_count, 1); // for caller DEQ_INIT(connector->conn_info_list); DEQ_ITEM_INIT(connector); sys_mutex_init(&connector->lock); - connector->timer = qd_timer(amqp_adaptor.dispatch, try_open_cb, connector); - if (!connector->timer) - goto error; + connector->timer = qd_timer(amqp_adaptor.dispatch, try_open_cb, connector); + connector->reconnect_enabled = true; + connector->is_data_connector = is_data_connector; + + connector->ctor_config = ctor_config; + sys_atomic_inc(&ctor_config->ref_count); - connector->server = server; connector->conn_index = 1; - connector->state = CXTR_STATE_INIT; + connector->state = CTOR_STATE_INIT; - return connector; + qd_failover_item_t *item = NEW(qd_failover_item_t); + ZERO(item); + if (ctor_config->config.ssl_required) + item->scheme = strdup("amqps"); + else + item->scheme = strdup("amqp"); + item->host = strdup(ctor_config->config.host); + item->port = strdup(ctor_config->config.port); + int hplen = strlen(item->host) + strlen(item->port) + 2; + item->host_port = malloc(hplen); + snprintf(item->host_port, hplen, "%s:%s", item->host , item->port); + DEQ_INSERT_TAIL(connector->conn_info_list, item); -error: - connector->state = CXTR_STATE_DELETED; - qd_connector_decref(connector); - return 0; + // + // Set up the vanflow record for this connector (LINK) + // Do this only for router-to-router connectors since the record represents an inter-router link + // + if ((strcmp(ctor_config->config.role, "inter-router") == 0 && !is_data_connector) || + strcmp(ctor_config->config.role, "edge") == 0 || + strcmp(ctor_config->config.role, "inter-edge") == 0) { + connector->vflow_record = vflow_start_record(VFLOW_RECORD_LINK, 0); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_NAME, ctor_config->config.name); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_ROLE, ctor_config->config.role); + vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_LINK_COST, ctor_config->config.inter_router_cost); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_OPER_STATUS, "down"); + vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_DOWN_COUNT, 0); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_PROTOCOL, item->scheme); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_HOST, item->host); + vflow_set_string(connector->vflow_record, VFLOW_ATTRIBUTE_DESTINATION_PORT, item->port); + vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS, 0); + vflow_set_uint64(connector->vflow_record, VFLOW_ATTRIBUTE_OCTETS_REVERSE, 0); + } + return connector; } const char *qd_connector_policy_vhost(const qd_connector_t* ct) { - return ct->policy_vhost; + return ct->ctor_config->policy_vhost; } bool qd_connector_connect(qd_connector_t *ct) { sys_mutex_lock(&ct->lock); - if (ct->state != CXTR_STATE_DELETED) { + if (ct->state != CTOR_STATE_DELETED) { // expect: do not attempt to connect an already connected qd_connection assert(ct->qd_conn == 0); ct->qd_conn = 0; ct->delay = 0; - ct->state = CXTR_STATE_CONNECTING; + ct->state = CTOR_STATE_CONNECTING; qd_timer_schedule(ct->timer, ct->delay); sys_mutex_unlock(&ct->lock); return true; @@ -166,20 +213,47 @@ bool qd_connector_connect(qd_connector_t *ct) } +// Teardown the connection associated with the connector and +// prepare the connector for deletion +// +void qd_connector_close(qd_connector_t *ct) +{ + // cannot free the timer while holding ct->lock since the + // timer callback may be running during the call to qd_timer_free + qd_timer_t *timer = 0; + void *dct = qd_connection_new_qd_deferred_call_t(); + + sys_mutex_lock(&ct->lock); + timer = ct->timer; + ct->timer = 0; + ct->state = CTOR_STATE_DELETED; + qd_connection_t *conn = ct->qd_conn; + if (conn && conn->pn_conn) { + qd_connection_invoke_deferred_impl(conn, deferred_close, conn->pn_conn, dct); + sys_mutex_unlock(&ct->lock); + } else { + sys_mutex_unlock(&ct->lock); + qd_connection_free_qd_deferred_call_t(dct); + } + qd_timer_free(timer); +} + + void qd_connector_decref(qd_connector_t* connector) { if (!connector) return; if (sys_atomic_dec(&connector->ref_count) == 1) { // expect both mgmt and qd_connection no longer reference this - assert(connector->state == CXTR_STATE_DELETED); + assert(connector->state == CTOR_STATE_DELETED); assert(connector->qd_conn == 0); + qd_connector_config_decref(connector->ctor_config); vflow_end_record(connector->vflow_record); connector->vflow_record = 0; - qd_server_config_free(&connector->config); qd_timer_free(connector->timer); sys_mutex_free(&connector->lock); + sys_atomic_destroy(&connector->ref_count); qd_failover_item_t *item = DEQ_HEAD(connector->conn_info_list); while (item) { @@ -192,9 +266,6 @@ void qd_connector_decref(qd_connector_t* connector) free(item); item = DEQ_HEAD(connector->conn_info_list); } - if (connector->policy_vhost) free(connector->policy_vhost); - qd_tls_config_decref(connector->tls_config); - free_qd_connector_t(connector); } } @@ -229,16 +300,16 @@ static void increment_conn_index_lh(qd_connector_t *connector) TA_REQ(connector- */ void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t connection_id, pn_condition_t *condition) { - const qd_server_config_t *config = &connector->config; - char conn_msg[QD_CXTR_CONN_MSG_BUF_SIZE]; // avoid holding connector lock when logging - char conn_msg_1[QD_CXTR_CONN_MSG_BUF_SIZE]; // this connection message does not contain the connection id + const qd_server_config_t *config = &connector->ctor_config->config; + char conn_msg[QD_CTOR_CONN_MSG_BUF_SIZE]; // avoid holding connector lock when logging + char conn_msg_1[QD_CTOR_CONN_MSG_BUF_SIZE]; // this connection message does not contain the connection id bool log_error_message = false; sys_mutex_lock(&connector->lock); - if (connector->state != CXTR_STATE_DELETED) { + if (connector->state != CTOR_STATE_DELETED) { increment_conn_index_lh(connector); // note: will transition back to STATE_CONNECTING when associated connection is freed (pn_connection_free) - connector->state = CXTR_STATE_FAILED; + connector->state = CTOR_STATE_FAILED; if (condition && pn_condition_is_set(condition)) { qd_format_string(conn_msg, sizeof(conn_msg), "[C%"PRIu64"] Connection to %s failed: %s %s", connection_id, config->host_port, pn_condition_get_name(condition), @@ -262,7 +333,7 @@ void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t con // if (strcmp(connector->conn_msg, conn_msg_1) != 0) { - strncpy(connector->conn_msg, conn_msg_1, QD_CXTR_CONN_MSG_BUF_SIZE); + strncpy(connector->conn_msg, conn_msg_1, QD_CTOR_CONN_MSG_BUF_SIZE); log_error_message = true; } } @@ -299,7 +370,7 @@ void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx ctx->connector = connector; connector->qd_conn = ctx; - strncpy(ctx->group_correlator, connector->group_correlator, QD_DISCRIMINATOR_SIZE); + strncpy(ctx->group_correlator, connector->ctor_config->group_correlator, QD_DISCRIMINATOR_SIZE); } @@ -333,7 +404,7 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const connector->qd_conn = 0; ctx->connector = 0; - if (connector->state != CXTR_STATE_DELETED) { + if (connector->state != CTOR_STATE_DELETED) { // Increment the connection index by so that we can try connecting to the failover url (if any). bool has_failover = qd_connector_has_failover_info(connector); long delay = connector->delay; @@ -344,7 +415,7 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const // We want to quickly keep cycling thru the failover urls every second. delay = 1000; } - connector->state = CXTR_STATE_CONNECTING; + connector->state = CTOR_STATE_CONNECTING; qd_timer_schedule(connector->timer, delay); } sys_mutex_unlock(&connector->lock); @@ -352,3 +423,148 @@ void qd_connector_remove_connection(qd_connector_t *connector, bool final, const // Drop reference held by connection. qd_connector_decref(connector); } + + +/** + * Create a new qd_connector_config_t instance + */ +qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t *entity) +{ + qd_connector_config_t *ctor_config = new_qd_connector_config_t(); + if (!ctor_config) { + char *name = qd_entity_opt_string(entity, "name", "UNKNOWN"); + qd_error(QD_ERROR_CONFIG, "Failed to create Connector %s: resource allocation failed", name); + free(name); + return 0; + } + + qd_error_clear(); + + ZERO(ctor_config); + DEQ_ITEM_INIT(ctor_config); + sys_atomic_init(&ctor_config->ref_count, 1); // for caller + sys_mutex_init(&ctor_config->lock); + ctor_config->server = qd_dispatch_get_server(qd); + DEQ_INIT(ctor_config->connectors); + + if (qd_server_config_load(&ctor_config->config, entity, false) != QD_ERROR_NONE) { + qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); + qd_connector_config_decref(ctor_config); + return 0; + } + + ctor_config->policy_vhost = qd_entity_opt_string(entity, "policyVhost", 0); + if (qd_error_code()) { + qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); + qd_connector_config_decref(ctor_config); + return 0; + } + + // + // If an sslProfile is configured allocate a TLS config to be used by all child connector's connections + // + if (ctor_config->config.ssl_profile_name) { + ctor_config->tls_config = qd_tls_config(ctor_config->config.ssl_profile_name, + QD_TLS_TYPE_PROTON_AMQP, + QD_TLS_CONFIG_CLIENT_MODE, + ctor_config->config.verify_host_name, + ctor_config->config.ssl_require_peer_authentication); + if (!ctor_config->tls_config) { + // qd_tls2_config() has set the qd_error_message(), which is logged below + goto error; + } + } + + // For inter-router connectors create associated inter-router data connectors if configured + + if (strcmp(ctor_config->config.role, "inter-router") == 0) { + ctor_config->data_connection_count = qd_dispatch_get_data_connection_count(qd); + if (!!ctor_config->data_connection_count) { + qd_generate_discriminator(ctor_config->group_correlator); + + // Add any data connectors to the head of the connectors list first. This allows the + // router control connector to be located at the head of the list. + + for (int i = 0; i < ctor_config->data_connection_count; i++) { + qd_connector_t *dc = qd_connector(ctor_config, true); + if (!dc) { + qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", ctor_config->config.name); + goto error; + } + DEQ_INSERT_HEAD(ctor_config->connectors, dc); + } + } + } + + // Create the primary connector associated with this configuration. It will be located + // at the head of the connectors list + + qd_connector_t *ct = qd_connector(ctor_config, false); + if (!ct) { + qd_error(QD_ERROR_CONFIG, "Failed to create data Connector %s: resource allocation failed", ctor_config->config.name); + goto error; + } + DEQ_INSERT_HEAD(ctor_config->connectors, ct); + + return ctor_config; + + error: + if (qd_error_code()) + qd_log(LOG_CONN_MGR, QD_LOG_ERROR, "Unable to create connector: %s", qd_error_message()); + for (qd_connector_t *dc = DEQ_HEAD(ctor_config->connectors); dc; dc = DEQ_HEAD(ctor_config->connectors)) { + DEQ_REMOVE_HEAD(ctor_config->connectors); + dc->state = CTOR_STATE_DELETED; + qd_connector_decref(dc); + } + qd_connector_config_decref(ctor_config); + return 0; +} + + +void qd_connector_config_delete(qd_connector_config_t *ctor_config) +{ + qd_connector_t *ct = DEQ_HEAD(ctor_config->connectors); + while (ct) { + DEQ_REMOVE_HEAD(ctor_config->connectors); + qd_connector_close(ct); + qd_connector_decref(ct); + ct = DEQ_HEAD(ctor_config->connectors); + } + + // drop ref held by the caller + qd_connector_config_decref(ctor_config); +} + + +void qd_connector_config_decref(qd_connector_config_t *ctor_config) +{ + if (!ctor_config) + return; + + uint32_t rc = sys_atomic_dec(&ctor_config->ref_count); + (void) rc; + assert(rc > 0); // else underflow + + if (rc == 1) { + // Expect: all connectors hold the ref_count so this must be empty + assert(DEQ_IS_EMPTY(ctor_config->connectors)); + sys_mutex_free(&ctor_config->lock); + sys_atomic_destroy(&ctor_config->ref_count); + free(ctor_config->policy_vhost); + qd_tls_config_decref(ctor_config->tls_config); + qd_server_config_free(&ctor_config->config); + free_qd_connector_config_t(ctor_config); + } +} + + +// Initiate connections on all child connectors +void qd_connector_config_connect(qd_connector_config_t *ctor_config) +{ + if (!ctor_config->activated) { + ctor_config->activated = true; + for (qd_connector_t *ct = DEQ_HEAD(ctor_config->connectors); !!ct; ct = DEQ_NEXT(ct)) { + qd_connector_connect(ct); + } + } +} diff --git a/src/adaptors/amqp/qd_connector.h b/src/adaptors/amqp/qd_connector.h index 5ebdca9d2..667fc743b 100644 --- a/src/adaptors/amqp/qd_connector.h +++ b/src/adaptors/amqp/qd_connector.h @@ -34,76 +34,135 @@ typedef struct qd_server_t qd_server_t; typedef struct qd_connection_t qd_connection_t; typedef struct vflow_record_t vflow_record_t; typedef struct qd_tls_config_t qd_tls_config_t; +typedef struct qd_connector_config_t qd_connector_config_t; typedef enum { - CXTR_STATE_INIT = 0, - CXTR_STATE_CONNECTING, - CXTR_STATE_OPEN, - CXTR_STATE_FAILED, - CXTR_STATE_DELETED // by management -} cxtr_state_t; + CTOR_STATE_INIT = 0, + CTOR_STATE_CONNECTING, + CTOR_STATE_OPEN, + CTOR_STATE_FAILED, + CTOR_STATE_DELETED // by management +} connector_state_t; /** - * Connector objects represent the desire to create and maintain an outgoing transport connection. + * A qd_connector_t manages a single outgoing AMQP network connection connection (represented by a qd_connection_t + * instance). It is responsible for re-establishing the network connection should it fail. */ typedef struct qd_connector_t { + + // Sibling connectors sharing the same qd_connector_config_t DEQ_LINKS(struct qd_connector_t); + qd_connector_config_t *ctor_config; - /* Referenced by connection_manager and pn_connection_t */ + /* Referenced by parent qd_connector_config_t and child qd_connection_t */ sys_atomic_t ref_count; - qd_server_t *server; - qd_server_config_t config; qd_timer_t *timer; long delay; - /* Connector state and ctx can be modified by I/O or management threads. */ + /* Connector state and qd_conn can be modified by I/O or management threads. */ sys_mutex_t lock; - cxtr_state_t state; + connector_state_t state; qd_connection_t *qd_conn; vflow_record_t *vflow_record; bool oper_status_down; // set when oper-status transitions to 'down' to avoid repeated error indications. + bool reconnect_enabled; // False: disable reconnect on connection drop + bool is_data_connector; // inter-router conn for streaming messages /* This conn_list contains all the connection information needed to make a connection. It also includes failover connection information */ qd_failover_item_list_t conn_info_list; int conn_index; // Which connection in the connection list to connect to next. - char *policy_vhost; /* Optional policy vhost name */ - qd_tls_config_t *tls_config; - - /* Connection group state */ - bool is_data_connector; - char group_correlator[QD_DISCRIMINATOR_SIZE]; /* holds proton transport error condition text on connection failure */ -#define QD_CXTR_CONN_MSG_BUF_SIZE 300 - char conn_msg[QD_CXTR_CONN_MSG_BUF_SIZE]; +#define QD_CTOR_CONN_MSG_BUF_SIZE 300 + char conn_msg[QD_CTOR_CONN_MSG_BUF_SIZE]; } qd_connector_t; DEQ_DECLARE(qd_connector_t, qd_connector_list_t); -const qd_server_config_t *qd_connector_config(const qd_connector_t *c); /** - * Initiate an outgoing connection. Returns true if successful. + * An qd_connector_config_t instance is created for each "connector" configuration object provisioned on the router. It + * holds the configuration information that is used for outgoing AMQP connections. The qd_connector_config_t instance + * will be used to construct one or more qd_connection_t instances that share that configuration data. + * + * qd_connector_config_t instances are managed by the Connection Manager. */ -bool qd_connector_connect(qd_connector_t *ct); +struct qd_connector_config_t { + DEQ_LINKS(struct qd_connector_config_t); // connection_manager list -// KAG: todo: fixme: -qd_connector_t *qd_server_connector(qd_server_t *server); + /* Referenced by connection_manager and children qd_connector_t */ + sys_atomic_t ref_count; + qd_server_config_t config; + qd_server_t *server; + char *policy_vhost; /* Optional policy vhost name */ + qd_tls_config_t *tls_config; + uint32_t data_connection_count; // # of child inter-router data connections + + // The group correlation id for all child connections + char group_correlator[QD_DISCRIMINATOR_SIZE]; + + bool activated; // T: activated by connection manager + sys_mutex_t lock; // protect connectors list + qd_connector_list_t connectors; +}; -void qd_connector_decref(qd_connector_t* ct); +DEQ_DECLARE(qd_connector_config_t, qd_connector_config_list_t); + +/** Management call to instantiate a qd_connector_config_t from a configuration entity + */ +qd_connector_config_t *qd_connector_config_create(qd_dispatch_t *qd, qd_entity_t *entity); + +/** Management call to delete a qd_connector_config_t + * + * This will close and release all child connector and connections, then decrement the callers reference count to the + * qd_connector_config_t instance. + */ +void qd_connector_config_delete(qd_connector_config_t *ctor_config); + +/** Management call start all child connections for the given configuration instance + */ +void qd_connector_config_connect(qd_connector_config_t *ctor_config); + +/** Drop a reference to the configuration instance. + * This may free the given instance. + */ +void qd_connector_config_decref(qd_connector_config_t *ctor_config); + +/** + * Connector API + */ + +/** + * Create a new connector. + * Call qd_connector_connect() to initiate the outgoing connection + */ +qd_connector_t *qd_connector(qd_connector_config_t *ctor_config, bool is_data_connector); + +/** + * Initiate an outgoing connection. Returns true if successful. + */ +bool qd_connector_connect(qd_connector_t *ctor); + +/** + * Close the associated connection and deactivate the connector + */ +void qd_connector_close(qd_connector_t *ctor); -bool qd_connector_has_failover_info(const qd_connector_t* ct); +void qd_connector_decref(qd_connector_t *ctor); -const char *qd_connector_policy_vhost(const qd_connector_t* ct); -void qd_connector_handle_transport_error(qd_connector_t *connector, uint64_t connection_id, pn_condition_t *condition); -void qd_connector_remote_opened(qd_connector_t *connector); +const qd_server_config_t *qd_connector_get_config(const qd_connector_t *ctor); +const char *qd_connector_get_group_correlator(const qd_connector_t *ctor); +bool qd_connector_has_failover_info(const qd_connector_t* ctor); +const char *qd_connector_policy_vhost(const qd_connector_t* ctor); +void qd_connector_handle_transport_error(qd_connector_t *ctor, uint64_t connection_id, pn_condition_t *condition); +void qd_connector_remote_opened(qd_connector_t *ctor); // add a new connection to the parent connector -void qd_connector_add_connection(qd_connector_t *connector, qd_connection_t *ctx); -void qd_connector_add_link(qd_connector_t *connector); +void qd_connector_add_connection(qd_connector_t *ctor, qd_connection_t *qd_conn); +void qd_connector_add_link(qd_connector_t *ctor); // remove the child connection // NOTE WELL: this may free the connector if the connection is holding the last // reference to it -void qd_connector_remove_connection(qd_connector_t *connector, bool final, const char *condition_name, const char *condition_description); +void qd_connector_remove_connection(qd_connector_t *ctor, bool final, const char *condition_name, const char *condition_description); #endif diff --git a/src/adaptors/amqp/server_config.c b/src/adaptors/amqp/server_config.c index 86e7fba78..7f18c4522 100644 --- a/src/adaptors/amqp/server_config.c +++ b/src/adaptors/amqp/server_config.c @@ -95,7 +95,7 @@ static void set_config_host(qd_server_config_t *config, qd_entity_t* entity) /** * Initialize the server_config from the listener or connector management entity instance */ -qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config, qd_entity_t* entity, bool is_listener, const char *role_override) +qd_error_t qd_server_config_load(qd_server_config_t *config, qd_entity_t *entity, bool is_listener) { qd_error_clear(); @@ -109,14 +109,9 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config, config->message_log_flags = qd_message_repr_flags(config->log_message); config->port = qd_entity_get_string(entity, "port"); CHECK(); config->name = qd_entity_opt_string(entity, "name", 0); CHECK(); + config->role = qd_entity_get_string(entity, "role"); CHECK(); long inter_router_cost = qd_entity_opt_long(entity, "cost", 1); CHECK(); - if (!role_override) { - config->role = qd_entity_get_string(entity, "role"); CHECK(); - } else { - config->role = strdup(role_override); - } - // // The cost field on the listener or the connector should be > 0 and <= INT32_MAX // The router will terminate on invalid cost values. @@ -148,15 +143,6 @@ qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *config, config->policy_vhost = qd_entity_opt_string(entity, "policyVhost", 0); CHECK(); config->conn_props = qd_entity_opt_map(entity, "openProperties"); CHECK(); - - if (strcmp(config->role, "inter-router") == 0) { - // For inter-router connections only, the dataConnectionCount defaults to "auto", - // which means it will be determined as a function of the number of worker threads. - // If the user has *not* explicitly set the value "0", - // then we will have some data connections. - config->has_data_connectors = true; - } - set_config_host(config, entity); if (config->sasl_password) { diff --git a/src/adaptors/amqp/server_config.h b/src/adaptors/amqp/server_config.h index cb2025af7..04390d150 100644 --- a/src/adaptors/amqp/server_config.h +++ b/src/adaptors/amqp/server_config.h @@ -268,8 +268,6 @@ typedef struct qd_server_config_t { */ pn_data_t *conn_props; - bool has_data_connectors; - /** * @name These fields are not primary configuration, they are computed. * @{ @@ -286,7 +284,7 @@ typedef struct qd_server_config_t { } qd_server_config_t; -qd_error_t qd_server_config_load(qd_dispatch_t *qd, qd_server_config_t *cf, qd_entity_t *entity, bool is_listener, const char *role_override); +qd_error_t qd_server_config_load(qd_server_config_t *cf, qd_entity_t *entity, bool is_listener); void qd_server_config_free(qd_server_config_t *cf); void qd_server_config_process_password(char **actual_val, char *pw, bool *is_file, bool allow_literal_prefix); void qd_set_password_from_file(const char *password_file, char **password_field); diff --git a/src/dispatch.c b/src/dispatch.c index 9e29654b3..3fcc89c1a 100644 --- a/src/dispatch.c +++ b/src/dispatch.c @@ -458,3 +458,13 @@ qd_policy_t *qd_dispatch_get_policy(const qd_dispatch_t *dispatch) { return qd->policy; } + +uint32_t qd_dispatch_get_data_connection_count(const qd_dispatch_t *dispatch) +{ + return qd->data_connection_count; +} + +qd_server_t *qd_dispatch_get_server(const qd_dispatch_t *dispatch) +{ + return qd->server; +}