Skip to content

Commit

Permalink
Merge a074915 into 8796c8a
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Aug 16, 2024
2 parents 8796c8a + a074915 commit 428074b
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
40 changes: 40 additions & 0 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2130,6 +2130,46 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
}
}

Y_UNIT_TEST(TestReadEmptyFileWithCsvFormat) {
const TString externalDataSourceName = "/Root/external_data_source";
const TString bucket = "test_bucket1";

CreateBucketWithObject(bucket, "test_object", "");

auto kikimr = NTestUtils::MakeKikimrRunner();

auto tc = kikimr->GetTableClient();
auto session = tc.CreateSession().GetValueSync().GetSession();
const TString query = fmt::format(R"(
CREATE EXTERNAL DATA SOURCE `{external_source}` WITH (
SOURCE_TYPE="ObjectStorage",
LOCATION="{location}",
AUTH_METHOD="NONE"
);)",
"external_source"_a = externalDataSourceName,
"location"_a = GetBucketLocation(bucket)
);
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

const TString sql = fmt::format(R"(
SELECT * FROM `{external_source}`.`/`
WITH (
SCHEMA = (
data String
),
FORMAT = "csv_with_names"
)
)", "external_source"_a=externalDataSourceName);

auto db = kikimr->GetQueryClient();
auto scriptExecutionOperation = db.ExecuteScript(sql).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL_C(readyOp.Metadata().ExecStatus, EExecStatus::Completed, readyOp.Status().GetIssues().ToString());
}
}

} // namespace NKikimr::NKqp
5 changes: 5 additions & 0 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
)
);
if (buffer->eof()) {
LOG_CORO_D("RunClickHouseParserOverHttp - SKIP EMPTY FILE");
return;
}
while (NDB::Block batch = stream->read()) {
Paused = SourceContext->Add(batch.bytes(), SelfActorId);
const bool isCancelled = StopIfConsumedEnough(batch.rows());
Expand Down

0 comments on commit 428074b

Please sign in to comment.