Ensure correct field order in INSERT and implement order_id sequence functionality #201

Merged
merged 25 commits into from
Aug 2, 2017
7764fc0
Add support for iterating over a StructuredFile data endpoint
smgallo Jun 19, 2017
93ee7f9
Improve comments, logging, and phpcs style fixes
smgallo Jun 19, 2017
765f18c
Merge branch 'xdmod7.0' of github.com:ubccr/xdmod into etl/structured…
smgallo Jun 19, 2017
d7e2402
Move handling of destination_record_map config directive from pdoInge…
smgallo Jun 21, 2017
536ee64
Consolidate common functionality. Move pre- and post-execute tasks in…
smgallo Jun 22, 2017
59f5049
Merge branch 'xdmod7.0' of github.com:ubccr/xdmod into etl/structured…
smgallo Jun 28, 2017
c0af908
Add handling of requested and expected record fields to StructuredFil…
smgallo Jun 29, 2017
a967a25
Merge branch 'xdmod7.0' of github.com:ubccr/xdmod into etl/structured…
smgallo Jun 30, 2017
8381b8e
Apply command-line where clause to table row count calculation
smgallo Jul 5, 2017
efe7468
Fix support for passing overrides on the ETL command line
smgallo Jul 6, 2017
6f7a502
Improve debugging output
smgallo Jul 6, 2017
5a5f352
Allow public properties of any object type to be verified
smgallo Jul 6, 2017
79a8f27
Fix bugs in experimental aggregation from update to DbModel
smgallo Jul 7, 2017
7c84f41
Improve debugging
smgallo Jul 7, 2017
9d148d4
PHPCS fixes
smgallo Jul 7, 2017
0cbbe57
PHPCS fixes
smgallo Jul 7, 2017
4c0c698
Add handling of record fields to StructuredFile endpoint
smgallo Jul 7, 2017
635d774
Update ingestors to use updated StructuredFile endpoint
smgallo Jul 7, 2017
d6863bd
Update sample ETL files
smgallo Jul 7, 2017
ab66e93
Move source data out of ETL table definitions into data directory. Up…
smgallo Jul 7, 2017
786fc57
Fix PHPCS style error
smgallo Jul 7, 2017
33d7bce
Remove commented out code
smgallo Jul 17, 2017
33ee1eb
Merge branch 'xdmod7.0' of github.com:ubccr/xdmod into etl/structured…
smgallo Jul 20, 2017
22da205
Merge branch 'xdmod7.0' of github.com:ubccr/xdmod into etl/structured…
smgallo Jul 26, 2017
28dbdcc
Ensure correct field order in INSERT. Implement order_id as sequence.
smgallo Aug 2, 2017
69 changes: 52 additions & 17 deletions classes/ETL/Ingestor/pdoIngestor.php
@@ -264,7 +264,9 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null)
}

// Get the list of available source query fields. If we have described the source query in
// the JSON config, use the record keys otherwise we need to parse the SQL string.
// the JSON config, use the record keys otherwise we need to parse the SQL string. It is
// expected that the source record fields are in the same order as they are in the SQL
// query.

$this->sourceRecordFields =
( null !== $this->etlSourceQuery
@@ -460,10 +462,19 @@ private function singleDatabaseIngest()
reset($this->etlDestinationTableList);
$qualifiedDestTableName = current($this->etlDestinationTableList)->getFullName();

// Keys are table definition columns (destination) and values are query result columns (source). For a
// single database ingest it is assumed that no mapping is taking place (i.e., all source
// columns are mapped to the same destination columns)
$destColumnList = array_keys(current($this->destinationFieldMappings));
// Generate the list of destination fields. Note that the field list for the INSERT must be
// in the same order as the fields returned by the query or we will get field mismatches.
// Use the source record fields (generated from the source query in initialize()) as the
// correct order. Since the destination field map may have been user-specified we cannot
// guarantee the order.

$firstFieldMap = current($this->destinationFieldMappings);
$destColumnList = array();
foreach ( $this->sourceRecordFields as $sourceField ) {
if ( array_key_exists($sourceField, $firstFieldMap) ) {
$destColumnList[] = $sourceField;
}
}

// The default method for ingestion is INSERT INTO ON DUPLICATE KEY UPDATE because tests
// have shown an approx 40% performance improvement when updating existing data over REPLACE
@@ -485,7 +496,7 @@ function ($s) {
$destColumnList
);
$updateColumns = implode(',', $updateColumnList);
$sql = "INSERT INTO $qualifiedDestTableName ($destColumns) " . $this->sourceQueryString
$sql = "INSERT INTO $qualifiedDestTableName\n($destColumns)\n" . $this->sourceQueryString
Review comment (Contributor):
the mix of interpolation and not here is messing with my reading of this...

. "\nON DUPLICATE KEY UPDATE $updateColumns";
}
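
The column-ordering change above can be tried in isolation. The following is a minimal sketch, not the ingestor's actual code: `orderDestinationColumns()` and the field names are hypothetical, and it assumes (as the single-database path does) that source and destination column names are identical. It shows how taking `array_keys()` of a user-specified map can produce a column list that disagrees with the SELECT order, while walking the source field list does not.

```php
<?php
// Hypothetical helper (not part of pdoIngestor): build the INSERT column list
// in the order of the source query fields rather than the order of the map keys.
function orderDestinationColumns(array $sourceRecordFields, array $destinationFieldMap)
{
    $destColumnList = array();
    foreach ($sourceRecordFields as $sourceField) {
        if (array_key_exists($sourceField, $destinationFieldMap)) {
            $destColumnList[] = $sourceField;
        }
    }
    return $destColumnList;
}

// Field order as returned by the source query (made-up field names).
$sourceRecordFields = array('resource_id', 'name', 'order_id');

// A user-specified map whose keys are listed in a different order.
$destinationFieldMap = array(
    'order_id'    => 'order_id',
    'resource_id' => 'resource_id',
    'name'        => 'name',
);

$naive   = array_keys($destinationFieldMap);
$ordered = orderDestinationColumns($sourceRecordFields, $destinationFieldMap);

echo implode(',', $naive), "\n";   // order_id,resource_id,name
echo implode(',', $ordered), "\n"; // resource_id,name,order_id
```

With the naive list, an `INSERT INTO t (order_id, resource_id, name) SELECT resource_id, name, order_id ...` would write values into the wrong columns, because `INSERT ... SELECT` matches columns by position, not by name.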

@@ -525,12 +536,16 @@ private function multiDatabaseIngest()
$loadStatementList = array();
$numDestinationTables = count($this->etlDestinationTableList);

foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) {
// Iterate over the destination field mappings rather than the destination table list because it
// is possible that a table definition is provided but no data is mapped to it.

foreach ( $this->destinationFieldMappings as $etlTableKey => $destFieldToSourceFieldMap ) {
$etlTable = $this->etlDestinationTableList[$etlTableKey];
$qualifiedDestTableName = $etlTable->getFullName();

// If there are no source query columns mapped to this table, skip it.

if ( 0 == count($this->destinationFieldMappings[$etlTableKey]) ) {
if ( 0 == count($destFieldToSourceFieldMap) ) {
continue;
}
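
As a rough illustration of why the loop now iterates over the field mappings, the sketch below uses plain arrays with made-up table keys and names; in the real class the table list holds table objects, so this is a simplification. A table that is defined but has no mapping entry is never visited, and a table with an empty mapping is skipped.

```php
<?php
// Made-up table keys and names; strings stand in for the real table objects.
$etlDestinationTableList = array(
    'jobs'  => 'example_db.jobs',
    'hosts' => 'example_db.hosts',  // defined, but no mapping entry at all
    'users' => 'example_db.users',  // defined, with an empty mapping
);

// Keys are destination columns, values are source query columns.
$destinationFieldMappings = array(
    'jobs'  => array('job_id' => 'id', 'name' => 'job_name'),
    'users' => array(),
);

$tablesToLoad = array();
foreach ($destinationFieldMappings as $etlTableKey => $destFieldToSourceFieldMap) {
    // If there are no source query columns mapped to this table, skip it.
    if (0 == count($destFieldToSourceFieldMap)) {
        continue;
    }
    $tablesToLoad[$etlTableKey] = $etlDestinationTableList[$etlTableKey];
}

echo implode(',', array_keys($tablesToLoad)), "\n"; // jobs
```

Iterating over the table list instead would require a mapping entry to exist for every table key, which is not guaranteed when a definition is provided without any mapped data.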

@@ -542,7 +557,7 @@ private function multiDatabaseIngest()
$this->logger->debug("Using temporary file '$infileName' for destination table key '$etlTableKey'");

// Keys are table columns (destination) and values are query result columns (source)
$destColumnList = array_keys($this->destinationFieldMappings[$etlTableKey]);
$destColumnList = array_keys($destFieldToSourceFieldMap);

// The default method for ingestion is INSERT INTO ON DUPLICATE KEY UPDATE because tests
// have shown an approx 40% performance improvement when updating existing data over
@@ -587,7 +602,7 @@ function ($s) {
$infileList[$etlTableKey] = $infileName;
$loadStatementList[$etlTableKey] = $loadStatement;

} // foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable )
} // foreach ( $this->destinationFieldMappings as $etlTableKey => $destFieldToSourceFieldMap )

if ( $this->getEtlOverseerOptions()->isDryrun() ) {
$this->logger->debug("Source query " . $this->sourceEndpoint . ":\n" . $this->sourceQueryString);
@@ -707,14 +722,29 @@ function ($s) {
// actually *hinders* performance in the tests! This was verified with 4,195,524 and 124,120
// rows.

$orderId = 0;

while ( $srcRecord = $sourceStatement->fetch(PDO::FETCH_ASSOC) ) {

$numSourceRecordsProcessed++;

// Some historical ingestors use an order_id and treat it similar to an auto-increment
// field, setting its value starting at 0 and incrementing for each record. It is not
// clear if this is used anywhere, but the functionality is maintained here for
// compatibility. Note that this does not work when adding fields into a table (e.g.,
// only when the table is truncated) and will not work properly in cases where the order
// is relative to a key. For example, if a key is resource_id and the order should be
// maintained for each unique resource_id we cannot use this method. To not overwrite
// existing data, only set the order_id if the source field exists and is NULL.

if ( array_key_exists('order_id', $srcRecord) && null === $srcRecord['order_id'] ) {
$srcRecord['order_id'] = $orderId++;
}

// Note that an array of transformed records is returned because a single source
// record may be transformed into multiple records.

$transformedRecords = $this->transform($srcRecord);
$transformedRecords = $this->transform($srcRecord, $orderId);

foreach ( $transformedRecords as $record ) {
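
The order_id behavior described in the comment above can be sketched on its own. `assignOrderId()` and the sample rows are hypothetical; only the condition mirrors the diff. The counter is passed by reference so that it advances across records, and a value is assigned only when the `order_id` key exists and is NULL.

```php
<?php
// Hypothetical helper: set order_id from a running counter only when the
// field is present in the record and its value is NULL.
function assignOrderId(array $record, &$orderId)
{
    if (array_key_exists('order_id', $record) && null === $record['order_id']) {
        $record['order_id'] = $orderId++;
    }
    return $record;
}

// Made-up source rows.
$rows = array(
    array('name' => 'a', 'order_id' => null), // gets 0
    array('name' => 'b', 'order_id' => 7),    // existing value is kept
    array('name' => 'c'),                     // no order_id field, untouched
    array('name' => 'd', 'order_id' => null), // gets 1
);

$orderId = 0;
$result = array();
foreach ($rows as $row) {
    $result[] = assignOrderId($row, $orderId);
}

echo $result[0]['order_id'], ' ', $result[1]['order_id'], ' ', $result[3]['order_id'], "\n"; // 0 7 1
```

`array_key_exists()` is used rather than `isset()` because `isset()` returns false for a key whose value is NULL, which is exactly the case that needs to be detected here.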

@@ -781,7 +811,7 @@ function ($s) {
$this->logger->err($msg);
throw $e;
}
} // foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable )
} // foreach ( $loadStatementList as $etlTableKey => $loadStatement )

$this->logger->debug(
sprintf('Loaded %d files in %ds', $numFilesLoaded, microtime(true) - $loadFileStart)
@@ -822,7 +852,7 @@ function ($s) {
fclose($outFdList[$etlTableKey]);
@unlink($infileList[$etlTableKey]);

} // foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable )
} // foreach ( $loadStatementList as $etlTableKey => $loadStatement )

$this->logger->debug(sprintf('Loaded %d files in %ds', $numFilesLoaded, microtime(true) - $loadFileStart));

@@ -860,13 +890,15 @@ function ($s) {
*
* @param array $record An associative array containing the source record where the
* keys are the field names.
* @param int $orderId A reference to the relative ordering value used to set the value of an
* order_id field. @see multiDatabaseIngest()
*
* @return array A 2-dimensional array of potentially transformed records where each
* element is an individual record.
* ------------------------------------------------------------------------------------------
*/

protected function transform(array $srcRecord)
protected function transform(array $srcRecord, &$orderId)
{
foreach ( $srcRecord as $key => &$value ) {
if ( null === $value ) {
@@ -931,18 +963,21 @@ protected function allowSingleDatabaseOptimization()
return false;
}

// Can't optimize more than 1 destination table
// Can't optimize when writing data to more than 1 destination table

if ( count($this->etlDestinationTableList) > 1 ) {
$this->logger->debug("Multiple destination tables being populated");
return false;
}

// Can't optimize if mapping a subset of the query fields
// When creating the INSERT INTO ... SELECT statement in singleDatabaseIngest() we use the
// destination field map keys to generate the destination column list and use
// $this->sourceQueryString as the source query. These need to have the same fields (and be
// in the same order, but we will ensure proper order when generating the field list).

reset($this->destinationFieldMappings);

if ( count($this->sourceRecordFields) != count(current($this->destinationFieldMappings)) ) {
if ( 0 != count(array_diff($this->sourceRecordFields, array_keys(current($this->destinationFieldMappings)))) ) {
$this->logger->debug("Mapping a subset of the source query fields");
return false;
}
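
To see why the count comparison was replaced, here is a small sketch with made-up field names; `countCheck()` and `diffCheck()` are hypothetical stand-ins for the old and new conditions. Two lists can have the same length and still name different fields, which the count comparison cannot detect.

```php
<?php
// Old-style check: only compares the number of fields.
function countCheck(array $sourceRecordFields, array $destinationFieldMap)
{
    return count($sourceRecordFields) == count($destinationFieldMap);
}

// New-style check: every source field must appear as a destination map key.
function diffCheck(array $sourceRecordFields, array $destinationFieldMap)
{
    return 0 == count(array_diff($sourceRecordFields, array_keys($destinationFieldMap)));
}

// Same number of fields, but 'value' is not mapped and 'label' is not a source field.
$sourceRecordFields  = array('id', 'name', 'value');
$destinationFieldMap = array('id' => 'id', 'name' => 'name', 'label' => 'label');

var_dump(countCheck($sourceRecordFields, $destinationFieldMap)); // bool(true)
var_dump(diffCheck($sourceRecordFields, $destinationFieldMap));  // bool(false)
```

Note that `array_diff()` is one-directional: it reports source fields missing from the map, but not map keys that are absent from the source query. Whether that second case can occur in practice depends on validation elsewhere in the ingestor, which this sketch does not model.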