diff --git a/ODBCLoader.cpp b/ODBCLoader.cpp index e6d9aa1..2d441d5 100644 --- a/ODBCLoader.cpp +++ b/ODBCLoader.cpp @@ -42,6 +42,10 @@ using namespace Vertica; +// ii declare global variable colInTable (# columns in source table, vidx (array conataining index of column in SELECT) +int colInTable = 0; +std::vector vidx; +// static inline TimeADT getTimeFromHMS(uint32 hour, uint8 min, uint8 sec) { return getTimeFromUnixTime(sec + min*60 + hour*3600); } @@ -288,7 +292,9 @@ class ODBCLoader : public UDParser { #if LOADER_DEBUG srvInterface.log("DEBUG Number of fetched rows/columns = %lu/%d", nfrows, numcols); #endif - for (uint32 j = 0; j < (uint32)nfrows; j++) { // for each fetched row... + for (uint32 j = 0; j < (uint32)nfrows; j++) { // for each fetched row... + for (SQLUSMALLINT i = 0; i < colInTable; i++) + writer->setNull(i); // set all cols to NULL for (SQLUSMALLINT i = 0; i < numcols; i++) { // for each column... #if LOADER_DEBUG srvInterface.log("DEBUG nfrows=%u j=%u i=%d lenp[%d][%d]=%ld", (uint32)nfrows, j, i, i, j, lenp[i][j]); @@ -303,38 +309,38 @@ class ODBCLoader : public UDParser { std::string rejectReason = "unrecognized syntax from remote database"; - // Null's are easy - // except when they're not due to typecast mismatch fun - if ((int)data.len == (int)SQL_NULL_DATA) { writer->setNull(i); } - else switch (vtype[i]) { + if ((int)data.len != (int)SQL_NULL_DATA ) { // (re)write NOT NULL cols + switch (vtype[i]) { // Simple fixed-length types // Let C++ figure out how to convert from, ie., SQLBIGINT to vint. // (Both are native C++ types with appropriate meanings, so hopefully this will DTRT.) // (In most implementations they are probably the same type so this is a no-op.) - case BoolOID: writer->setBool(i, (*(SQLCHAR*)data.buf == SQL_TRUE ? VTrue : VFalse)); break; + case BoolOID: + writer->setBool(vidx.at(i), (*(SQLCHAR*)data.buf == SQL_TRUE ? VTrue : VFalse)); + break; case Int8OID: if (quirks != Oracle) { - writer->setInt(i, *(SQLBIGINT*)data.buf); + writer->setInt(vidx.at(i), *(SQLBIGINT*)data.buf); } else { // Oracle doesn't support int64 as a type. // So we get the data as a string and parse it to an int64. - if (data.len == SQL_NTS) { writer->setInt(i, vint_null); } - else { writer->setInt(i, (vint)atoll((char*)data.buf)); } + if (data.len == SQL_NTS) { writer->setInt(vidx.at(i), vint_null); } + else { writer->setInt(vidx.at(i), (vint)atoll((char*)data.buf)); } } break; - case Float8OID: writer->setFloat(i, *(SQLDOUBLE*)data.buf); break; - - // Strings (all the same representation) + case Float8OID: + writer->setFloat(vidx.at(i), *(SQLDOUBLE*)data.buf); + break; case CharOID: case BinaryOID: case VarcharOID: case VarbinaryOID: - #ifndef NO_LONG_OIDS case LongVarcharOID: case LongVarbinaryOID: -#endif // NO_LONG_OIDS - - if (data.len == SQL_NTS) { data.len = strnlen((char*)data.buf, getFieldSizeForCol(i)); } - writer->getStringRef(i).copy((char*)data.buf, data.len); +#endif + if (data.len == SQL_NTS) { + data.len = strnlen((char*)data.buf, getFieldSizeForCol(vidx.at(i))); + } + writer->getStringRef(vidx.at(i)).copy((char*)data.buf, data.len); break; // Date/Time functions that work in reasonably direct ways @@ -342,12 +348,12 @@ class ODBCLoader : public UDParser { SQL_DATE_STRUCT &s = *(SQL_DATE_STRUCT*)data.buf; struct tm d = {0,0,0,s.day,s.month-1,s.year-1900,0,0,-1}; time_t unixtime = mktime(&d); - writer->setDate(i, getDateFromUnixTime(unixtime + d.tm_gmtoff)); + writer->setDate(vidx.at(i), getDateFromUnixTime(unixtime + d.tm_gmtoff)); break; } case TimeOID: { SQL_TIME_STRUCT &s = *(SQL_TIME_STRUCT*)data.buf; - writer->setTime(i, getTimeFromHMS(s.hour, s.minute, s.second)); + writer->setTime(vidx.at(i), getTimeFromHMS(s.hour, s.minute, s.second)); break; } case TimestampOID: { @@ -356,31 +362,31 @@ class ODBCLoader : public UDParser { time_t unixtime = mktime(&d); // s.fraction is in nanoseconds; Vertica only does microsecond resolution // setTimestamp() wants time since epoch localtime. - writer->setTimestamp(i, getTimestampFromUnixTime(unixtime + d.tm_gmtoff) + s.fraction/1000); + writer->setTimestamp(vidx.at(i), getTimestampFromUnixTime(unixtime + d.tm_gmtoff) + s.fraction/1000); break; } // Date/Time functions that require string-parsing case TimeTzOID: { // Hacky workaround: Some databases (ie., us) send the empty string instead of NULL here - if (((char*)data.buf)[0] == '\0') { writer->setNull(i); break; } + if (((char*)data.buf)[0] == '\0') { writer->setNull(vidx.at(i)); break; } TimeADT t = 0; - if (!parser.parseTimeTz((char*)data.buf, (size_t)data.len, i, t, getVerticaTypeOfCol(i), rejectReason)) { + if (!parser.parseTimeTz((char*)data.buf, (size_t)data.len, i, t, getVerticaTypeOfCol(vidx.at(i)), rejectReason)) { vt_report_error(0, "Error parsing TimeTz: '%s' (%s)", (char*)data.buf, rejectReason.c_str()); // No rejected-rows for us! Die on failure. } - writer->setTimeTz(i,t); + writer->setTimeTz(vidx.at(i),t); break; } case TimestampTzOID: { // Hacky workaround: Some databases (ie., us) send the empty string instead of NULL here - if (((char*)data.buf)[0] == '\0') { writer->setNull(i); break; } + if (((char*)data.buf)[0] == '\0') { writer->setNull(vidx.at(i)); break; } TimestampTz t = 0; - if (!parser.parseTimestampTz((char*)data.buf, (size_t)data.len, i, t, getVerticaTypeOfCol(i), rejectReason)) { + if (!parser.parseTimestampTz((char*)data.buf, (size_t)data.len, i, t, getVerticaTypeOfCol(vidx.at(i)), rejectReason)) { vt_report_error(0, "Error parsing TimestampTz: '%s' (%s)", (char*)data.buf, rejectReason.c_str()); // No rejected-rows for us! Die on failure. } - writer->setTimestampTz(i,t); + writer->setTimestampTz(vidx.at(i),t); break; } @@ -400,7 +406,7 @@ class ODBCLoader : public UDParser { + (intv.intval.day_second.fraction/1000)) // Fractions are in nanoseconds; we do microseconds * (intv.interval_sign == SQL_TRUE ? -1 : 1); // Apply the sign bit - writer->setInterval(i, ret); + writer->setInterval(vidx.at(i), ret); break; } @@ -417,7 +423,7 @@ class ODBCLoader : public UDParser { + (intv.intval.year_month.month)) * (intv.interval_sign == SQL_TRUE ? -1 : 1); // Apply the sign bit - writer->setInterval(i, ret); + writer->setInterval(vidx.at(i), ret); break; } @@ -426,25 +432,29 @@ class ODBCLoader : public UDParser { // make this use the native binary format and cast/convert as needed. case NumericOID: { // Hacky workaround: Some databases may send the empty string instead of NULL here - if (((char*)data.buf)[0] == '\0') { writer->setNull(i); break; } - if (!parser.parseNumeric((char*)data.buf, (size_t)data.len, i, writer->getNumericRef(i), getVerticaTypeOfCol(i), rejectReason)) { + if (((char*)data.buf)[0] == '\0') { writer->setNull(vidx.at(i)); break; } + if (!parser.parseNumeric((char*)data.buf, (size_t)data.len, i, writer->getNumericRef(vidx.at(i)), getVerticaTypeOfCol(vidx.at(i)), rejectReason)) { vt_report_error(0, "Error parsing Numeric: '%s' (%s)", (char*)data.buf, rejectReason.c_str()); // No rejected-rows for us! Die on failure. } break; } - default: vt_report_error(0, "Unrecognized Vertica type %s (OID %llu)", getVerticaTypeOfCol(i).getTypeStr(), getVerticaTypeOfCol(i).getTypeOid()); - } - } + default: + vt_report_error(0, "Unrecognized Vertica type %s (OID %llu)", + getVerticaTypeOfCol(vidx.at(i)).getTypeStr(), + getVerticaTypeOfCol(vidx.at(i)).getTypeOid()); + } // End SWITCH + } // End IF NOT NULL + } // End FOR EACH COLUMN - writer->next(); + writer->next(); // avanzamento alla riga successiva (scrive e avanza il cursor) if (++iter_counter == ROWS_PER_BREAK) { // Periodically yield and let upstream do its thing return KEEP_GOING; } - } - } + } // End FOR EACH ROW + } // End FETCH LOOP // If SQLFetch() failed for some reason, report it // But, SQLFetch() is allowed to return SQL_NO_DATA from time to time. @@ -454,7 +464,7 @@ class ODBCLoader : public UDParser { } return DONE; - } + } // End PROCESS void setQuirksMode(ServerInterface &srvInterface, SQLHDBC &dbc) { // Set the quirks mode based on the DB name @@ -556,42 +566,23 @@ class ODBCLoader : public UDParser { if ( src_cfilter ) { if (srvInterface.getParamReader().containsParameter("__query_col_name__")) { if (srvInterface.getParamReader().containsParameter("__query_col_idx__")) { - int ncols = (int)colInfo.getColumnCount() ; - std::stringstream ss_cols(srvInterface.getParamReader().getStringRef("__query_col_name__").str()); + colInTable = (int)colInfo.getColumnCount() ; +#if LOADER_DEBUG + srvInterface.log("DEBUG __query_col_name__=<%s>",srvInterface.getParamReader().getStringRef("__query_col_name__").str().c_str()); + srvInterface.log("DEBUG __query_col_idx__=<%s>",srvInterface.getParamReader().getStringRef("__query_col_idx__").str().c_str()); +srvInterface.log("-----> External Table Columns, colInTable=<%d>", colInTable); +#endif + std::string slist=srvInterface.getParamReader().getStringRef("__query_col_name__").str(); std::stringstream ss_idx(srvInterface.getParamReader().getStringRef("__query_col_idx__").str()); - std::string tk_col, tk_idx, slist="" ; - int k = 0; - + std::string tk_idx ; + vidx.clear(); while (std::getline(ss_idx, tk_idx, ',')) { - std::getline(ss_cols, tk_col, ','); - for ( ; k < atoi(tk_idx.c_str()) ; k++ ) - slist += k ? ", NULL" : "NULL" ; - if ( k ) - slist += "," ; - slist += ( tk_col == "override_query" ) ? "'.' AS override_query" : tk_col ; - k++ ; + vidx.push_back(stoi(tk_idx)); } + // MF to remove Vertica casts (::) -#if LOADER_DEBUG - srvInterface.log("DEBUG BEFORE GlobalReplace slist=<%s>", slist.c_str()); -#endif pcrecpp::RE(REG_CASTRM).GlobalReplace("", &slist) ; -#if LOADER_DEBUG - srvInterface.log("DEBUG AFTER GlobalReplace slist=<%s> (length=%zu)", slist.c_str(), slist.length()); -#endif - - // Add NULLs for the remaining columns (if select list is not empty) - if ( slist.length() ) { - for ( ; k < ncols ; k++ ) { - slist += k ? ", NULL" : "NULL" ; - } - } else { - slist = "*" ; - } query = "SELECT " + slist + " FROM ( " + query + " ) sq" ; -#if LOADER_DEBUG - srvInterface.log("DEBUG FINAL slist=%s", slist.c_str()); -#endif } else { query = "SELECT " + srvInterface.getParamReader().getStringRef("__query_col_name__").str() + @@ -600,6 +591,7 @@ class ODBCLoader : public UDParser { " ) sq" ; } } + } else { query = oq_flag ? "SELECT '.' AS override_query, sq.* FROM ( " + query + " ) sq" : "SELECT * FROM ( " + query + " ) sq" ; } @@ -652,10 +644,6 @@ class ODBCLoader : public UDParser { r = SQLNumResultCols(stmt, &numcols); handleReturnCode(srvInterface, r, SQL_HANDLE_STMT, stmt, "SQLNumResultCols()"); - if ((ssize_t)numcols != (ssize_t)colInfo.getColumnCount()) { - vt_report_error(0, "Expected %d columns; got %d from the remote database", (int)colInfo.getColumnCount(), (int)numcols); - } - // Allocate space for result & length array pointers resp = (SQLPOINTER *)srvInterface.allocator->alloc(numcols * sizeof(SQLPOINTER)) ; lenp = (SQLLEN **)srvInterface.allocator->alloc(numcols * sizeof(SQLLEN *)) ; @@ -667,15 +655,15 @@ class ODBCLoader : public UDParser { // Set up column-data buffers // Bind to the columns in question for (SQLSMALLINT i = 0; i < numcols; i++) { - vtype[i] = getVerticaTypeOfCol(i).getTypeOid(); - stype[i] = getFieldSizeForCol(i) ; + vtype[i] = getVerticaTypeOfCol(vidx.at(i)).getTypeOid(); + stype[i] = getFieldSizeForCol(vidx.at(i)) ; #if LOADER_DEBUG srvInterface.log("DEBUG i=%d rowset=%zu stype[i]=%d", i, rowset, stype[i]); #endif resp[i] = (SQLPOINTER)srvInterface.allocator->alloc(stype[i] * rowset); lenp[i] = (SQLLEN *)srvInterface.allocator->alloc(sizeof(SQLLEN) * rowset); - r = SQLBindCol(stmt, i+1, getCTypeOfCol(i), resp[i], stype[i], lenp[i]); + r = SQLBindCol(stmt, i+1, getCTypeOfCol(vidx.at(i)), resp[i], stype[i], lenp[i]); handleReturnCode(srvInterface, r, SQL_HANDLE_STMT, stmt, "SQLBindCol()"); } }