Skip to content

Commit

Permalink
Rewrite of StructuredFile data endpoint to support Euca ingest (#145)
Browse files Browse the repository at this point in the history
* Update commit reference for xdmod-test-artifacts
* Add functions for verifying types of stdClass properties
* Rename specs_dir -> schema_dir
* Rewrite StructuredFile data endpoint to allow record separators and data filtering by external processes
* Add tests for File and StructuredFile data endpoints
* When verifying tables select a non-nullable column for comparing LEFT OUTER JOIN results to reduce chance of false positives
* Return distinct negative values from compare() to facilitate debugging
* Change StructuredFile filter definition to an array. Update tests accordingly.
  • Loading branch information
smgallo authored May 25, 2017
1 parent 3ab5059 commit 66ba63a
Show file tree
Hide file tree
Showing 19 changed files with 1,703 additions and 247 deletions.
121 changes: 73 additions & 48 deletions classes/ETL/DataEndpoint/File.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace ETL\DataEndpoint;

use ETL\DataEndpoint\DataEndpointOptions;
use \Log;
use Log;

class File extends aDataEndpoint implements iDataEndpoint
{
Expand All @@ -16,10 +16,7 @@ class File extends aDataEndpoint implements iDataEndpoint
protected $path = null;

// File mode. See http://php.net/manual/en/function.fopen.php
protected $mode = null;

// The default directory for files that do not specify a full path
protected $data_dir = null;
protected $mode = 'r';

/* ------------------------------------------------------------------------------------------
* @see iDataEndpoint::__construct()
Expand All @@ -33,83 +30,99 @@ public function __construct(DataEndpointOptions $options, Log $logger = null)
$requiredKeys = array("path");
$this->verifyRequiredConfigKeys($requiredKeys, $options);

// Default to read-only mode
$this->mode = ( null === $options->mode || "" == $options->mode ? "r" : $options->mode );
$messages = array();
$propertyTypes = array(
'path' => 'string',
'mode' => 'string'
);

if ( ! \xd_utilities\verify_object_property_types($options, $propertyTypes, $messages, true) ) {
$this->logAndThrowException("Error verifying options: " . implode(", ", $messages));
}

if ( isset($options->mode) ) {
$this->mode = $options->mode;
}

$this->path = $options->path;

$this->key = md5(implode($this->keySeparator, array($this->type, $this->path, $this->mode)));

$this->path = $options->applyBasePath("paths->data_dir", $options->path);
if ( isset($options->paths->data_dir) ) {
$this->path = \xd_utilities\qualify_path($options->path, $options->paths->data_dir);
}

} // __construct()

/* ------------------------------------------------------------------------------------------
* Set the value of the path for this endpoint.
*
* @param $path The path to the file for this endpoint
*
* @return This object to support method chaining
* @see iFile::getPath()
* ------------------------------------------------------------------------------------------
*/

public function setPath($path)
public function getPath()
{
if ( ! is_string($path) ) {
$msg = "Path must be a string";
$this->logAndThrowException($msg);
}

return $this;

} // setPath()
return $this->path;
} // getPath()

/* ------------------------------------------------------------------------------------------
* @return The path to this file
* @see iFile::getMode()
* ------------------------------------------------------------------------------------------
*/

public function getPath()
public function getMode()
{
return $this->path;
} // getPath()


return $this->mode;
} // getMode()

/* ------------------------------------------------------------------------------------------
* @see aDataEndpoint::connect()
* @see iDataEndpoint::connect()
* ------------------------------------------------------------------------------------------
*/

public function connect()
{
// The first time a connection is made the endpoint handle should be set.

$this->handle = @fopen($this->path, $this->mode);
if ( false === $this->handle ) {
$error = error_get_last();
$msg = "Error opening file '{$this->path}': " . $error['message'];
$this->logAndThrowException($msg);
if ( null === $this->handle ) {

$this->handle = @fopen($this->path, $this->mode);

if ( false === $this->handle ) {
$this->handle = null;
$error = error_get_last();
$this->logAndThrowException(
sprintf("Error opening file '%s': %s", $this->path, $error['message'])
);
}

}

return $this->handle;

} // connect()

/* ------------------------------------------------------------------------------------------
* @see aDataEndpoint::disconnect()
* @see iDataEndpoint::disconnect()
* ------------------------------------------------------------------------------------------
*/

public function disconnect()
{
if ( null === $this->handle ) {
return true;
}
if ( null !== $this->handle ) {

if ( false === @fclose($this->handle) ) {
$error = error_get_last();
$msg = "Error closing file '{$this->path}': " . $error['message'];
$this->logAndThrowException($msg);
}
if ( false === @fclose($this->handle) ) {
$error = error_get_last();
$msg = "Error closing file '{$this->path}': " . $error['message'];
$this->logAndThrowException(
sprintf("Error closing file '%s': %s", $this->path, $error['message'])
);
}

$this->handle = null;
$this->handle = null;

}

return true;

Expand All @@ -126,25 +139,37 @@ public function verify($dryrun = false, $leaveConnected = false)
$writeModes = array("r+", "w", "w+", "a", "a+", "x", "x+", "c", "c+");

if ( ! is_string($this->path) ) {
$msg = "Path '" . $this->path . "' is not a string";
$this->logAndThrowException($msg);
$this->logAndThrowException("Path '" . $this->path . "' is not a string");
}

if ( ! in_array($this->mode, array_merge($readModes, $writeModes)) ) {
$msg = "Unsupported mode '{$this->mode}'";
$this->logAndThrowException($msg);
$this->logAndThrowException("Unsupported mode '" . $this->mode . "'");
}

if ( ! is_file($this->path) ) {
$this->logAndThrowException("Path '" . $this->path . "' is not a file");
}

if ( in_array($this->mode, $readModes) && ! is_readable($this->path) ) {
$msg = "File '{$this->path}' is not readable";
$this->logAndThrowException($msg);
$this->logAndThrowException("Path '" . $this->path . "' is not readable");
}

$fh = $this->connect();

if ( ! $leaveConnected ) {
$this->disconnect();
}

return true;
} // verify()

/* ------------------------------------------------------------------------------------------
* @see iDataEndpoint::__toString()
* ------------------------------------------------------------------------------------------
*/

public function __toString()
{
return sprintf('%s (class=%s, path=%s)', $this->name, get_class($this), $this->path);
} // __toString()
} // class File
Loading

0 comments on commit 66ba63a

Please sign in to comment.