Update Structured File Ingestor #180

Merged
merged 22 commits into from
Jul 19, 2017
7764fc0
Add support for iterating over a StructuredFile data endpoint
smgallo Jun 19, 2017
93ee7f9
Improve comments, logging, and phpcs style fixes
smgallo Jun 19, 2017
765f18c
Merge branch 'xdmod7.0' of github.com:ubccr/xdmod into etl/structured…
smgallo Jun 19, 2017
d7e2402
Move handling of destination_record_map config directive from pdoInge…
smgallo Jun 21, 2017
536ee64
Consolidate common functionality. Move pre- and post-execute tasks in…
smgallo Jun 22, 2017
59f5049
Merge branch 'xdmod7.0' of github.com:ubccr/xdmod into etl/structured…
smgallo Jun 28, 2017
c0af908
Add handling of requested and expected record fields to StructuredFil…
smgallo Jun 29, 2017
a967a25
Merge branch 'xdmod7.0' of github.com:ubccr/xdmod into etl/structured…
smgallo Jun 30, 2017
8381b8e
Apply command-line where clause to table row count calculation
smgallo Jul 5, 2017
efe7468
Fix support for passing overrides on the ETL command line
smgallo Jul 6, 2017
6f7a502
Improve debugging output
smgallo Jul 6, 2017
5a5f352
Allow public properties of any object type to be verified
smgallo Jul 6, 2017
79a8f27
Fix bugs in experimental aggregation from update to DbModel
smgallo Jul 7, 2017
7c84f41
Improve debugging
smgallo Jul 7, 2017
9d148d4
PHPCS fixes
smgallo Jul 7, 2017
0cbbe57
PHPCS fixes
smgallo Jul 7, 2017
4c0c698
Add handling of record fields to StructuredFile endpoint
smgallo Jul 7, 2017
635d774
Update ingestors to use updated StructuredFile endpoint
smgallo Jul 7, 2017
d6863bd
Update sample ETL files
smgallo Jul 7, 2017
ab66e93
Move source data out of ETL table definitions into data directory. Up…
smgallo Jul 7, 2017
786fc57
Fix PHPCS style error
smgallo Jul 7, 2017
33d7bce
Remove commented out code
smgallo Jul 17, 2017
2 changes: 1 addition & 1 deletion classes/ETL/Aggregator/JobsAggregator.php
@@ -147,7 +147,7 @@ protected function performPreExecuteTasks()
* ------------------------------------------------------------------------------------------
*/

protected function performPostExecuteTasks($numRecordsProcessed)
protected function performPostExecuteTasks($numRecordsProcessed = null)
{
$sourceSchema = $this->sourceEndpoint->getSchema(true);
$tableName = $this->destinationEndpoint->quoteSystemIdentifier(self::STATUS_TABLE);
27 changes: 1 addition & 26 deletions classes/ETL/Aggregator/aAggregator.php
@@ -174,32 +174,6 @@ public function execute(EtlOverseerOptions $etlOverseerOptions)

} // execute()

/* ------------------------------------------------------------------------------------------
* Perform any pre-execution tasks. For example, disabling table keys on MyISAM tables, or other
* setup tasks.
*
* NOTE: This method must check if we are in DRYRUN mode before executing any tasks.
*
* @return true on success
* ------------------------------------------------------------------------------------------
*/

abstract protected function performPreExecuteTasks();

/* ------------------------------------------------------------------------------------------
* Perform any post-execution tasks. For example, enabling table keys on MyISAM tables, or
* tracking table history.
*
* NOTE: This method must check if we are in DRYRUN mode before executing any tasks.
*
* @param $numRecordsProcessed The number of records processed during this period.
*
* @return true on success
* ------------------------------------------------------------------------------------------
*/

abstract protected function performPostExecuteTasks($numRecordsProcessed);

/* ------------------------------------------------------------------------------------------
* Perform any pre-aggregation unit tasks. These are performed prior to aggregating each
* aggregation unit (e.g., day, month, quarter) and might be verifying that a status table
@@ -240,5 +214,6 @@ abstract protected function performPostAggregationUnitTasks($aggregationUnit, $n
* ------------------------------------------------------------------------------------------
*/

// @codingStandardsIgnoreLine
abstract protected function _execute($aggregationUnit);
} // abstract class Aggregator
35 changes: 18 additions & 17 deletions classes/ETL/Aggregator/pdoAggregator.php
@@ -290,32 +290,34 @@ protected function createDestinationTableObjects()

} // createDestinationTableObjects()

/* ------------------------------------------------------------------------------------------
* By default, there are no pre-execution tasks.
/** -----------------------------------------------------------------------------------------
* Note that we are not calling aRdbmsDestinationAction::performPreExecuteTasks()
* because we cannot properly manage the aggregation tables without knowing the
* aggregation unit or applying variable substitutions. Tables will be managed in
* performPreAggregationUnitTasks() instead.
*
* @see aAggregator::performPreExecuteTasks()
* @see aAction::performPreExecuteTasks()
* ------------------------------------------------------------------------------------------
*/

protected function performPreExecuteTasks()
{
// To support programmatic manipulation of the source Query object, save off the description
// of the first join (from) table
// To support programmatic manipulation of the source Query object, save off the
// description of the first join (from) table
$sourceJoins = $this->etlSourceQuery->joins;
$this->etlSourceQueryOrigFromTable = array_shift($sourceJoins);
$this->etlSourceQueryModified = false;

return true;
} // performPreExecuteTasks()

/* ------------------------------------------------------------------------------------------
* By default, there are no pre-execution tasks.
*
* @see aAggregator::performPostExecuteTasks()
/** -----------------------------------------------------------------------------------------
* @see performPostAggregationUnitTasks()
* @see aAction::performPostExecuteTasks()
* ------------------------------------------------------------------------------------------
*/

protected function performPostExecuteTasks($numRecordsProcessed)
protected function performPostExecuteTasks($numRecordsProcessed = null)
{
return true;
} // performPostExecuteTasks()
@@ -715,6 +717,7 @@ protected function getDirtyAggregationPeriods($aggregationUnit)
* ------------------------------------------------------------------------------------------
*/

// @codingStandardsIgnoreLine
protected function _execute($aggregationUnit)
{
$time_start = microtime(true);
@@ -770,11 +773,10 @@ protected function _execute($aggregationUnit)
$sourceJoins = $this->etlSourceQuery->joins;
$firstJoin = array_shift($sourceJoins);
$newFirstJoin = clone $firstJoin;
$newFirstJoin->setName($tmpTableName);
$newFirstJoin->name = $tmpTableName;
$newFirstJoin->schema = $this->sourceEndpoint->getSchema();

$this->etlSourceQuery->deleteJoins();
$this->etlSourceQuery->addJoin($newFirstJoin);
$this->etlSourceQuery->joins = array($newFirstJoin);
foreach ( $sourceJoins as $join ) {
$this->etlSourceQuery->addJoin($join);
}
@@ -788,8 +790,7 @@ protected function _execute($aggregationUnit)

Review comment (Contributor): Remove commented out code

$sourceJoins = $this->etlSourceQuery->joins;
array_shift($sourceJoins);
$this->etlSourceQuery->deleteJoins();
$this->etlSourceQuery->addJoin($this->etlSourceQueryOrigFromTable);
$this->etlSourceQuery->joins = array($this->etlSourceQueryOrigFromTable);
foreach ( $sourceJoins as $join ) {
$this->etlSourceQuery->addJoin($join);
}
@@ -891,7 +892,7 @@ protected function _execute($aggregationUnit)

$sourceJoins = $this->etlSourceQuery->joins;
$firstJoin = current($sourceJoins);
$tmpTableAlias = $firstJoin->getAlias();
$tmpTableAlias = $firstJoin->alias;

while ( ! $done ) {

@@ -945,7 +946,7 @@ protected function _execute($aggregationUnit)
try {
// Use the where clause from the aggregation query to create the temporary table

$whereClause = implode(" AND ", $this->etlSourceQuery->getWheres());
$whereClause = implode(" AND ", $this->etlSourceQuery->where);

$whereClause = Utilities::substituteVariables(
$whereClause,
2 changes: 1 addition & 1 deletion classes/ETL/Configuration/EtlConfiguration.php
@@ -125,7 +125,7 @@ public function __construct(
if ( array_key_exists('option_overrides', $options) && null !== $options['option_overrides'] ) {
if ( ! is_array($options['option_overrides']) ) {
$this->logAndThrowException("Option overrides must be an array");
} else {
} elseif ( 0 !== count($options['option_overrides']) ) {
$this->optionOverrides = $options['option_overrides'];
}
}
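
For illustration only (not part of this PR): a minimal sketch of the effect of the new `elseif` branch. The function `resolveOptionOverrides()` and its `$current` parameter are hypothetical stand-ins for the constructor logic and for whatever value `$this->optionOverrides` already holds. Under that assumption, an empty overrides array now leaves the existing value untouched instead of replacing it.

```php
<?php
// Hypothetical helper mirroring the changed branch; not part of EtlConfiguration.
function resolveOptionOverrides(array $options, array $current = array())
{
    if ( array_key_exists('option_overrides', $options) && null !== $options['option_overrides'] ) {
        if ( ! is_array($options['option_overrides']) ) {
            throw new InvalidArgumentException("Option overrides must be an array");
        } elseif ( 0 !== count($options['option_overrides']) ) {
            // Only a non-empty array replaces the current overrides.
            return $options['option_overrides'];
        }
    }

    // Missing, null, or empty overrides: keep what we already had.
    return $current;
}

$existing = array('truncate_destination' => true);

// An empty array no longer clobbers the existing overrides.
$kept = resolveOptionOverrides(array('option_overrides' => array()), $existing);

// A non-empty array replaces them.
$replaced = resolveOptionOverrides(array('option_overrides' => array('force' => true)), $existing);
```

The option names `truncate_destination` and `force` are placeholders chosen for the example.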
4 changes: 4 additions & 0 deletions classes/ETL/DataEndpoint/Filter/ExternalProcess.php
@@ -176,6 +176,10 @@ public function onCreate()
$arguments = ( isset($this->params->arguments) ? " " . $this->params->arguments : "" );
$this->command = $this->params->path . $arguments;

if ( isset($this->params->logger) ) {
$this->params->logger->debug(sprintf("Creating filter %s: %s", self::NAME, $this->command));
}

// stream_bucket_new() needs somewhere to store temporary data but the
// documentation doesn't give any details:
// http://php.net/manual/en/function.stream-bucket-new.php
96 changes: 95 additions & 1 deletion classes/ETL/DataEndpoint/JsonFile.php
@@ -73,7 +73,21 @@ protected function decodeRecord($data)
);
}

if ( is_array($decoded) ) {
// If we parsed an empty array or object do not include it as a record.

if (
(is_array($decoded) && 0 == count($decoded)) ||
(is_object($decoded) && 0 == count(get_object_vars($decoded)))
) {
return true;
}

// If we have decoded an array of records (either arrays or objects) then merge
// them onto the record list. Be careful that we have not decoded a single record
// that is an array, as this should simply be appended on to the end of the record
// list.

if ( is_array($decoded) && (is_array(current($decoded)) || is_object(current($decoded))) ) {
$this->recordList = array_merge($this->recordList, $decoded);
} else {
$this->recordList[] = $decoded;
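
For illustration only (not part of this PR): a stand-alone sketch of the merge-versus-append decision made in the hunk above. The function `appendDecoded()` is hypothetical, and `$recordList` stands in for `$this->recordList`. It shows how a decoded list of records is merged, a single array record is appended as one record, and empty values are skipped.

```php
<?php
// Hypothetical stand-alone version of the decision in decodeRecord().
function appendDecoded(array $recordList, $decoded)
{
    // Empty arrays and empty objects are not treated as records.
    if (
        (is_array($decoded) && 0 == count($decoded)) ||
        (is_object($decoded) && 0 == count(get_object_vars($decoded)))
    ) {
        return $recordList;
    }

    // A list whose first element is an array or object is a list of records: merge it.
    if ( is_array($decoded) && (is_array(current($decoded)) || is_object(current($decoded))) ) {
        return array_merge($recordList, $decoded);
    }

    // Anything else is a single record: append it.
    $recordList[] = $decoded;
    return $recordList;
}

$list = array();
$list = appendDecoded($list, json_decode('[[1,2],[3,4]]'));  // list of two array records
$list = appendDecoded($list, json_decode('[5,6]'));          // a single array record
$list = appendDecoded($list, json_decode('{}'));             // empty object, skipped
$list = appendDecoded($list, json_decode('{"a":7}'));        // a single object record
```

Note that the check inspects only the first element, so a list mixing scalars and arrays would be classified by whatever comes first.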
@@ -146,6 +160,86 @@ protected function verifyData()

} // verifyData()

/** -----------------------------------------------------------------------------------------
* @see aStructuredFile::discoverRecordFieldNames()
* ------------------------------------------------------------------------------------------
*/

protected function discoverRecordFieldNames()
{
// If there are no records in the file then we don't need to set the discovered
// field names.

if ( 0 == count($this->recordList) ) {
return;
}

// Determine the record names based on the structure of the JSON that we are
// parsing.

reset($this->recordList);
$record = current($this->recordList);

if ( is_array($record) ) {

if ( $this->hasHeaderRecord ) {

// If we have a header record skip the first record and use its values as
// the field names

$this->discoveredRecordFieldNames = array_shift($this->recordList);

} elseif ( 0 !== count($this->requestedRecordFieldNames) ) {

// If there is no header record and the requested field names have been
// provided, use them as the discovered field names. If a subsequent
// record contains fewer fields return NULL values for those fields, if a
// subsequent record contains more fields ignore them.

$this->discoveredRecordFieldNames = $this->requestedRecordFieldNames;

} else {
$this->logAndThrowException("Record field names must be specified for JSON array records");
}

} elseif ( is_object($record) ) {

// Pull the record field names from the object keys

$this->discoveredRecordFieldNames = array_keys(get_object_vars($record));

} else {
$this->logAndThrowException(
sprintf("Unsupported record type in %s. Got %s, expected array or object", $this->path, gettype($record))
);
}

// If no field names were requested, return all discovered fields

if ( 0 == count($this->requestedRecordFieldNames) ) {
$this->requestedRecordFieldNames = $this->discoveredRecordFieldNames;
}

} // discoverRecordFieldNames()

/** -----------------------------------------------------------------------------------------
* @see aStructuredFile::createReturnRecord()
* ------------------------------------------------------------------------------------------
*/

protected function createReturnRecord($record)
{
$arrayRecord = parent::createReturnRecord($record);

// If the original record is a stdClass object, be sure to maintain its type.

if ( is_object($record) ) {
return (object) $arrayRecord;
} else {
return $arrayRecord;
}
} // createReturnRecord()

/** -----------------------------------------------------------------------------------------
* Implementation of json_last_error_msg() for pre PHP 5.5 systems.
*
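
For illustration only (not part of this PR): a stand-alone sketch of the field-name discovery rules added to `JsonFile.php`. The function `discoverFieldNames()` is hypothetical; the real method stores its results on `$this` and reports errors through `logAndThrowException()`, which is replaced here by a plain `RuntimeException`.

```php
<?php
// Hypothetical stand-alone version of discoverRecordFieldNames().
// $recordList is passed by reference because a header record is removed from it.
function discoverFieldNames(array &$recordList, $hasHeaderRecord, array $requested)
{
    // No records: nothing to discover.
    if ( 0 == count($recordList) ) {
        return array();
    }

    $record = reset($recordList);

    if ( is_array($record) ) {
        if ( $hasHeaderRecord ) {
            // The first record holds the field names; remove it from the data.
            return array_shift($recordList);
        } elseif ( 0 !== count($requested) ) {
            // No header: fall back to the requested field names.
            return $requested;
        }
        throw new RuntimeException("Record field names must be specified for JSON array records");
    } elseif ( is_object($record) ) {
        // Object records carry their own field names as keys.
        return array_keys(get_object_vars($record));
    }

    throw new RuntimeException(sprintf("Unsupported record type %s", gettype($record)));
}

// Array records with a header row.
$rows = json_decode('[["id","name"],[1,"a"]]');
$rowNames = discoverFieldNames($rows, true, array());

// Object records.
$objects = json_decode('[{"id":1,"name":"a"}]');
$objectNames = discoverFieldNames($objects, false, array());
```

In the PR itself, when no field names were requested, the discovered names are also copied into `requestedRecordFieldNames`; that step is left out of the sketch.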