Skip to content

Commit

Permalink
fetch only columns referenced in SELECT (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
kipta1 authored Dec 14, 2024
1 parent a9cc095 commit eae4c95
Showing 1 changed file with 62 additions and 74 deletions.
136 changes: 62 additions & 74 deletions ODBCLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@

using namespace Vertica;

// Column-projection state shared between query construction and row fetching.
//   colInTable - number of columns in the target table; assigned from
//                colInfo.getColumnCount() when the column filter is active, and
//                used as the bound when pre-setting every output column to NULL.
//   vidx       - one entry per column returned by the remote SELECT, holding the
//                index of the corresponding output column (parsed from the
//                comma-separated __query_col_idx__ parameter).
// NOTE(review): these are mutable file-scope globals, so every parser instance
// in the same process would see the same values - confirm this is safe.
// NOTE(review): in the code visible here vidx is only filled when
// __query_col_idx__ is supplied, while the bind/fetch code calls vidx.at(i)
// unconditionally - verify the other query paths also populate it.
int colInTable = 0;
std::vector<int> vidx;
//
// Converts an hour/minute/second triple into a TimeADT by flattening it to a
// total number of seconds and handing that count to getTimeFromUnixTime().
static inline TimeADT getTimeFromHMS(uint32 hour, uint8 min, uint8 sec) {
    const auto totalSeconds = hour*3600 + min*60 + sec;
    return getTimeFromUnixTime(totalSeconds);
}
Expand Down Expand Up @@ -288,7 +292,9 @@ class ODBCLoader : public UDParser {
#if LOADER_DEBUG
srvInterface.log("DEBUG Number of fetched rows/columns = %lu/%d", nfrows, numcols);
#endif
for (uint32 j = 0; j < (uint32)nfrows; j++) { // for each fetched row...
for (uint32 j = 0; j < (uint32)nfrows; j++) { // for each fetched row...
for (SQLUSMALLINT i = 0; i < colInTable; i++)
writer->setNull(i); // set all cols to NULL
for (SQLUSMALLINT i = 0; i < numcols; i++) { // for each column...
#if LOADER_DEBUG
srvInterface.log("DEBUG nfrows=%u j=%u i=%d lenp[%d][%d]=%ld", (uint32)nfrows, j, i, i, j, lenp[i][j]);
Expand All @@ -303,51 +309,51 @@ class ODBCLoader : public UDParser {

std::string rejectReason = "unrecognized syntax from remote database";

// Null's are easy
// except when they're not due to typecast mismatch fun
if ((int)data.len == (int)SQL_NULL_DATA) { writer->setNull(i); }
else switch (vtype[i]) {
if ((int)data.len != (int)SQL_NULL_DATA ) { // (re)write NOT NULL cols
switch (vtype[i]) {

// Simple fixed-length types
// Let C++ figure out how to convert from, ie., SQLBIGINT to vint.
// (Both are native C++ types with appropriate meanings, so hopefully this will DTRT.)
// (In most implementations they are probably the same type so this is a no-op.)
case BoolOID: writer->setBool(i, (*(SQLCHAR*)data.buf == SQL_TRUE ? VTrue : VFalse)); break;
case BoolOID:
writer->setBool(vidx.at(i), (*(SQLCHAR*)data.buf == SQL_TRUE ? VTrue : VFalse));
break;
case Int8OID:
if (quirks != Oracle) {
writer->setInt(i, *(SQLBIGINT*)data.buf);
writer->setInt(vidx.at(i), *(SQLBIGINT*)data.buf);
} else {
// Oracle doesn't support int64 as a type.
// So we get the data as a string and parse it to an int64.
if (data.len == SQL_NTS) { writer->setInt(i, vint_null); }
else { writer->setInt(i, (vint)atoll((char*)data.buf)); }
if (data.len == SQL_NTS) { writer->setInt(vidx.at(i), vint_null); }
else { writer->setInt(vidx.at(i), (vint)atoll((char*)data.buf)); }
}
break;
case Float8OID: writer->setFloat(i, *(SQLDOUBLE*)data.buf); break;

// Strings (all the same representation)
case Float8OID:
writer->setFloat(vidx.at(i), *(SQLDOUBLE*)data.buf);
break;
case CharOID: case BinaryOID:
case VarcharOID: case VarbinaryOID:

#ifndef NO_LONG_OIDS
case LongVarcharOID: case LongVarbinaryOID:
#endif // NO_LONG_OIDS

if (data.len == SQL_NTS) { data.len = strnlen((char*)data.buf, getFieldSizeForCol(i)); }
writer->getStringRef(i).copy((char*)data.buf, data.len);
#endif
if (data.len == SQL_NTS) {
data.len = strnlen((char*)data.buf, getFieldSizeForCol(vidx.at(i)));
}
writer->getStringRef(vidx.at(i)).copy((char*)data.buf, data.len);
break;

// Date/Time functions that work in reasonably direct ways
case DateOID: {
SQL_DATE_STRUCT &s = *(SQL_DATE_STRUCT*)data.buf;
struct tm d = {0,0,0,s.day,s.month-1,s.year-1900,0,0,-1};
time_t unixtime = mktime(&d);
writer->setDate(i, getDateFromUnixTime(unixtime + d.tm_gmtoff));
writer->setDate(vidx.at(i), getDateFromUnixTime(unixtime + d.tm_gmtoff));
break;
}
case TimeOID: {
SQL_TIME_STRUCT &s = *(SQL_TIME_STRUCT*)data.buf;
writer->setTime(i, getTimeFromHMS(s.hour, s.minute, s.second));
writer->setTime(vidx.at(i), getTimeFromHMS(s.hour, s.minute, s.second));
break;
}
case TimestampOID: {
Expand All @@ -356,31 +362,31 @@ class ODBCLoader : public UDParser {
time_t unixtime = mktime(&d);
// s.fraction is in nanoseconds; Vertica only does microsecond resolution
// setTimestamp() wants time since epoch localtime.
writer->setTimestamp(i, getTimestampFromUnixTime(unixtime + d.tm_gmtoff) + s.fraction/1000);
writer->setTimestamp(vidx.at(i), getTimestampFromUnixTime(unixtime + d.tm_gmtoff) + s.fraction/1000);
break;
}

// Date/Time functions that require string-parsing
case TimeTzOID: {
// Hacky workaround: Some databases (ie., us) send the empty string instead of NULL here
if (((char*)data.buf)[0] == '\0') { writer->setNull(i); break; }
if (((char*)data.buf)[0] == '\0') { writer->setNull(vidx.at(i)); break; }
TimeADT t = 0;

if (!parser.parseTimeTz((char*)data.buf, (size_t)data.len, i, t, getVerticaTypeOfCol(i), rejectReason)) {
if (!parser.parseTimeTz((char*)data.buf, (size_t)data.len, i, t, getVerticaTypeOfCol(vidx.at(i)), rejectReason)) {
vt_report_error(0, "Error parsing TimeTz: '%s' (%s)", (char*)data.buf, rejectReason.c_str()); // No rejected-rows for us! Die on failure.
}
writer->setTimeTz(i,t);
writer->setTimeTz(vidx.at(i),t);
break;
}

case TimestampTzOID: {
// Hacky workaround: Some databases (ie., us) send the empty string instead of NULL here
if (((char*)data.buf)[0] == '\0') { writer->setNull(i); break; }
if (((char*)data.buf)[0] == '\0') { writer->setNull(vidx.at(i)); break; }
TimestampTz t = 0;
if (!parser.parseTimestampTz((char*)data.buf, (size_t)data.len, i, t, getVerticaTypeOfCol(i), rejectReason)) {
if (!parser.parseTimestampTz((char*)data.buf, (size_t)data.len, i, t, getVerticaTypeOfCol(vidx.at(i)), rejectReason)) {
vt_report_error(0, "Error parsing TimestampTz: '%s' (%s)", (char*)data.buf, rejectReason.c_str()); // No rejected-rows for us! Die on failure.
}
writer->setTimestampTz(i,t);
writer->setTimestampTz(vidx.at(i),t);
break;
}

Expand All @@ -400,7 +406,7 @@ class ODBCLoader : public UDParser {
+ (intv.intval.day_second.fraction/1000)) // Fractions are in nanoseconds; we do microseconds
* (intv.interval_sign == SQL_TRUE ? -1 : 1); // Apply the sign bit

writer->setInterval(i, ret);
writer->setInterval(vidx.at(i), ret);
break;
}

Expand All @@ -417,7 +423,7 @@ class ODBCLoader : public UDParser {
+ (intv.intval.year_month.month))
* (intv.interval_sign == SQL_TRUE ? -1 : 1); // Apply the sign bit

writer->setInterval(i, ret);
writer->setInterval(vidx.at(i), ret);
break;
}

Expand All @@ -426,25 +432,29 @@ class ODBCLoader : public UDParser {
// make this use the native binary format and cast/convert as needed.
case NumericOID: {
// Hacky workaround: Some databases may send the empty string instead of NULL here
if (((char*)data.buf)[0] == '\0') { writer->setNull(i); break; }
if (!parser.parseNumeric((char*)data.buf, (size_t)data.len, i, writer->getNumericRef(i), getVerticaTypeOfCol(i), rejectReason)) {
if (((char*)data.buf)[0] == '\0') { writer->setNull(vidx.at(i)); break; }
if (!parser.parseNumeric((char*)data.buf, (size_t)data.len, i, writer->getNumericRef(vidx.at(i)), getVerticaTypeOfCol(vidx.at(i)), rejectReason)) {
vt_report_error(0, "Error parsing Numeric: '%s' (%s)", (char*)data.buf, rejectReason.c_str()); // No rejected-rows for us! Die on failure.
}
break;
}

default: vt_report_error(0, "Unrecognized Vertica type %s (OID %llu)", getVerticaTypeOfCol(i).getTypeStr(), getVerticaTypeOfCol(i).getTypeOid());
}
}
default:
vt_report_error(0, "Unrecognized Vertica type %s (OID %llu)",
getVerticaTypeOfCol(vidx.at(i)).getTypeStr(),
getVerticaTypeOfCol(vidx.at(i)).getTypeOid());
} // End SWITCH
} // End IF NOT NULL
} // End FOR EACH COLUMN

writer->next();
writer->next(); // avanzamento alla riga successiva (scrive e avanza il cursor)

if (++iter_counter == ROWS_PER_BREAK) {
// Periodically yield and let upstream do its thing
return KEEP_GOING;
}
}
}
} // End FOR EACH ROW
} // End FETCH LOOP

// If SQLFetch() failed for some reason, report it
// But, SQLFetch() is allowed to return SQL_NO_DATA from time to time.
Expand All @@ -454,7 +464,7 @@ class ODBCLoader : public UDParser {
}

return DONE;
}
} // End PROCESS

void setQuirksMode(ServerInterface &srvInterface, SQLHDBC &dbc) {
// Set the quirks mode based on the DB name
Expand Down Expand Up @@ -556,42 +566,23 @@ class ODBCLoader : public UDParser {
if ( src_cfilter ) {
if (srvInterface.getParamReader().containsParameter("__query_col_name__")) {
if (srvInterface.getParamReader().containsParameter("__query_col_idx__")) {
int ncols = (int)colInfo.getColumnCount() ;
std::stringstream ss_cols(srvInterface.getParamReader().getStringRef("__query_col_name__").str());
colInTable = (int)colInfo.getColumnCount() ;
#if LOADER_DEBUG
srvInterface.log("DEBUG __query_col_name__=<%s>",srvInterface.getParamReader().getStringRef("__query_col_name__").str().c_str());
srvInterface.log("DEBUG __query_col_idx__=<%s>",srvInterface.getParamReader().getStringRef("__query_col_idx__").str().c_str());
srvInterface.log("-----> External Table Columns, colInTable=<%d>", colInTable);
#endif
std::string slist=srvInterface.getParamReader().getStringRef("__query_col_name__").str();
std::stringstream ss_idx(srvInterface.getParamReader().getStringRef("__query_col_idx__").str());
std::string tk_col, tk_idx, slist="" ;
int k = 0;

std::string tk_idx ;
vidx.clear();
while (std::getline(ss_idx, tk_idx, ',')) {
std::getline(ss_cols, tk_col, ',');
for ( ; k < atoi(tk_idx.c_str()) ; k++ )
slist += k ? ", NULL" : "NULL" ;
if ( k )
slist += "," ;
slist += ( tk_col == "override_query" ) ? "'.' AS override_query" : tk_col ;
k++ ;
vidx.push_back(stoi(tk_idx));
}

// MF to remove Vertica casts (::<data_type>)
#if LOADER_DEBUG
srvInterface.log("DEBUG BEFORE GlobalReplace slist=<%s>", slist.c_str());
#endif
pcrecpp::RE(REG_CASTRM).GlobalReplace("", &slist) ;
#if LOADER_DEBUG
srvInterface.log("DEBUG AFTER GlobalReplace slist=<%s> (length=%zu)", slist.c_str(), slist.length());
#endif

// Add NULLs for the remaining columns (if select list is not empty)
if ( slist.length() ) {
for ( ; k < ncols ; k++ ) {
slist += k ? ", NULL" : "NULL" ;
}
} else {
slist = "*" ;
}
query = "SELECT " + slist + " FROM ( " + query + " ) sq" ;
#if LOADER_DEBUG
srvInterface.log("DEBUG FINAL slist=%s", slist.c_str());
#endif
} else {
query = "SELECT " +
srvInterface.getParamReader().getStringRef("__query_col_name__").str() +
Expand All @@ -600,6 +591,7 @@ class ODBCLoader : public UDParser {
" ) sq" ;
}
}

} else {
query = oq_flag ? "SELECT '.' AS override_query, sq.* FROM ( " + query + " ) sq" : "SELECT * FROM ( " + query + " ) sq" ;
}
Expand Down Expand Up @@ -652,10 +644,6 @@ class ODBCLoader : public UDParser {
r = SQLNumResultCols(stmt, &numcols);
handleReturnCode(srvInterface, r, SQL_HANDLE_STMT, stmt, "SQLNumResultCols()");

if ((ssize_t)numcols != (ssize_t)colInfo.getColumnCount()) {
vt_report_error(0, "Expected %d columns; got %d from the remote database", (int)colInfo.getColumnCount(), (int)numcols);
}

// Allocate space for result & length array pointers
resp = (SQLPOINTER *)srvInterface.allocator->alloc(numcols * sizeof(SQLPOINTER)) ;
lenp = (SQLLEN **)srvInterface.allocator->alloc(numcols * sizeof(SQLLEN *)) ;
Expand All @@ -667,15 +655,15 @@ class ODBCLoader : public UDParser {
// Set up column-data buffers
// Bind to the columns in question
for (SQLSMALLINT i = 0; i < numcols; i++) {
vtype[i] = getVerticaTypeOfCol(i).getTypeOid();
stype[i] = getFieldSizeForCol(i) ;
vtype[i] = getVerticaTypeOfCol(vidx.at(i)).getTypeOid();
stype[i] = getFieldSizeForCol(vidx.at(i)) ;
#if LOADER_DEBUG
srvInterface.log("DEBUG i=%d rowset=%zu stype[i]=%d", i, rowset, stype[i]);
#endif
resp[i] = (SQLPOINTER)srvInterface.allocator->alloc(stype[i] * rowset);
lenp[i] = (SQLLEN *)srvInterface.allocator->alloc(sizeof(SQLLEN) * rowset);

r = SQLBindCol(stmt, i+1, getCTypeOfCol(i), resp[i], stype[i], lenp[i]);
r = SQLBindCol(stmt, i+1, getCTypeOfCol(vidx.at(i)), resp[i], stype[i], lenp[i]);
handleReturnCode(srvInterface, r, SQL_HANDLE_STMT, stmt, "SQLBindCol()");
}
}
Expand Down

0 comments on commit eae4c95

Please sign in to comment.