Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Command Timeout is not being honored in PostgreSQL and MySQL #490 #491

Merged
merged 2 commits into from
Jul 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion Tortuga.Chain/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ public class SalesFigures
[GroupByColumn]
public string EmployeeName { get; set; }
}

```

To use this feature, you need use either of these patterns:
Expand All @@ -57,6 +56,42 @@ datasource.FromTable<TObject>(filter).ToAggregate().ToCollection().Execute();

In the second version, the table or view name is extracted from the class.


[#92 ToObjectStream](https://github.com/TortugaResearch/Tortuga.Chain/issues/92)

Previously, Chain would fully manage database connections by default. Specifically, it would open and close connections automatically unless a transaction was involved. In that case, the developer only needed to manage the transactional data source itself.

However, there are times when a result set is too large to handle at one time. In this case the developer will want an `IEnumerable` or `IAsyncEnumerable` instead of a collection. To support this, the `ToObjectStream` materializer was created.

When used in place of `ToCollection`, the caller gets a `ObjectStream` object. This object implements `IEnumerable<TObject>`, `IDisposable`, `IAsyncDisposable`, abd `IAsyncEnumerable<TObject>`. (That latter two are only available in .NET 6 or later.)

This object stream may be used directly, as shown below, or attached to an RX Observable or TPL Dataflow just like any other enumerable data structure.

```csharp
//sync pattern

using var objectStream = dataSource.From<Employee>(new { Title = uniqueKey }).ToObjectStream<Employee>().Execute();
foreach (var item in objectStream)
{
Assert.AreEqual(uniqueKey, item.Title);
}

//async pattern

await using var objectStream = await dataSource.From<Employee>(new { Title = uniqueKey }).ToObjectStream<Employee>().ExecuteAsync();
await foreach (var item in objectStream)
{
Assert.AreEqual(uniqueKey, item.Title);
}
```
It is vital that the object stream is disposed after use. If that doesn't occur, the database can suffer from thread exhaustion or deadlocks.

### Bugs

[#490 Command Timeout is not being honored in PostgreSQL and MySQL](https://github.com/TortugaResearch/Tortuga.Chain/issues/490)

See the ticket for an explaination for why this was broken.

### Technical Debt

[#488 Add IAsyncDisposable support](https://github.com/TortugaResearch/Tortuga.Chain/issues/488)
Expand Down
81 changes: 81 additions & 0 deletions Tortuga.Chain/Shared/Tests/Materializers/ObjectStreamTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using Tests.Models;

namespace Tests.Materializers;

[TestClass]
public class ToObjectStreamTests : TestBase
{
[DataTestMethod, BasicData(DataSourceGroup.Primary)]
public void ToObjectStream(string dataSourceName, DataSourceType mode)
{
var dataSource = DataSource(dataSourceName, mode);

try
{
var uniqueKey = Guid.NewGuid().ToString();

var empA1 = new Employee() { FirstName = "A", LastName = "1", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empA1).ToObject<Employee>().Execute();

var empA2 = new Employee() { FirstName = "B", LastName = "2", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empA2).ToObject<Employee>().Execute();

var empB1 = new Employee() { FirstName = "B", LastName = "1", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB1).ToObject<Employee>().Execute();

var empB2 = new Employee() { FirstName = "B", LastName = "2", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB2).ToObject<Employee>().Execute();

var empB3 = new Employee() { FirstName = "B", LastName = "3", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB3).ToObject<Employee>().Execute();

using var objectStream = dataSource.From<Employee>(new { Title = uniqueKey }).ToObjectStream<Employee>().Execute();
foreach (var item in objectStream)
{
Assert.AreEqual(uniqueKey, item.Title);
}
}
finally
{
Release(dataSource);
}
}

#if NET6_0_OR_GREATER
[DataTestMethod, BasicData(DataSourceGroup.Primary)]
public async Task ToObjectStreamAsync(string dataSourceName, DataSourceType mode)
{
var dataSource = DataSource(dataSourceName, mode);

try
{
var uniqueKey = Guid.NewGuid().ToString();

var empA1 = new Employee() { FirstName = "A", LastName = "1", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empA1).ToObject<Employee>().Execute();

var empA2 = new Employee() { FirstName = "B", LastName = "2", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empA2).ToObject<Employee>().Execute();

var empB1 = new Employee() { FirstName = "B", LastName = "1", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB1).ToObject<Employee>().Execute();

var empB2 = new Employee() { FirstName = "B", LastName = "2", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB2).ToObject<Employee>().Execute();

var empB3 = new Employee() { FirstName = "B", LastName = "3", Title = uniqueKey };
dataSource.Insert(EmployeeTableName, empB3).ToObject<Employee>().Execute();

await using var objectStream = await dataSource.From<Employee>(new { Title = uniqueKey }).ToObjectStream<Employee>().ExecuteAsync();
await foreach (var item in objectStream)
{
Assert.AreEqual(uniqueKey, item.Title);
}
}
finally
{
Release(dataSource);
}
}
#endif
}
134 changes: 132 additions & 2 deletions Tortuga.Chain/Tortuga.Chain.Access/Access/AccessOpenDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,137 @@ internal AccessOpenDataSource(AccessDataSource dataSource, OleDbConnection conne
m_Transaction = transaction;
}

/// <summary>
/// Executes the stream.
/// </summary>
/// <param name="executionToken">The execution token.</param>
/// <param name="implementation">The implementation.</param>
/// <param name="state">The state.</param>
/// <returns>StreamingCommandCompletionToken.</returns>
public override StreamingCommandCompletionToken ExecuteStream(CommandExecutionToken<OleDbCommand, OleDbParameter> executionToken, StreamingCommandImplementation<OleDbCommand> implementation, object? state)
{
if (executionToken == null)
throw new ArgumentNullException(nameof(executionToken), $"{nameof(executionToken)} is null.");
if (implementation == null)
throw new ArgumentNullException(nameof(implementation), $"{nameof(implementation)} is null.");
var currentToken = executionToken as AccessCommandExecutionToken;
if (currentToken == null)
throw new ArgumentNullException(nameof(executionToken), "only AccessCommandExecutionToken is supported.");

var startTime = DateTimeOffset.Now;
OnExecutionStarted(executionToken, startTime, state);

try
{
OleDbCommand? cmdToReturn = null;

while (currentToken != null)
{
OnExecutionStarted(currentToken, startTime, state);
using (var cmd = new OleDbCommand())
{
cmd.Connection = m_Connection;
if (m_Transaction != null)
cmd.Transaction = m_Transaction;

currentToken.PopulateCommand(cmd, DefaultCommandTimeout);

if (currentToken.ExecutionMode == AccessCommandExecutionMode.Materializer)
{
implementation(cmd);
cmdToReturn = cmd;
}
else if (currentToken.ExecutionMode == AccessCommandExecutionMode.ExecuteScalarAndForward)
{
if (currentToken.ForwardResult == null)
throw new InvalidOperationException("currentToken.ExecutionMode is ExecuteScalarAndForward, but currentToken.ForwardResult is null.");

currentToken.ForwardResult(cmd.ExecuteScalar());
}
else
cmd.ExecuteNonQuery();
}
currentToken = currentToken.NextCommand;
}
return new StreamingCommandCompletionToken(this, executionToken, startTime, state, cmdToReturn, null);
}
catch (Exception ex)
{
OnExecutionError(executionToken, startTime, DateTimeOffset.Now, ex, state);
throw;
}
}

/// <summary>
/// Executes the stream asynchronous.
/// </summary>
/// <param name="executionToken">The execution token.</param>
/// <param name="implementation">The implementation.</param>
/// <param name="cancellationToken">The cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <param name="state">The state.</param>
/// <returns>Task&lt;StreamingCommandCompletionToken&gt;.</returns>
public override async Task<StreamingCommandCompletionToken> ExecuteStreamAsync(CommandExecutionToken<OleDbCommand, OleDbParameter> executionToken, StreamingCommandImplementationAsync<OleDbCommand> implementation, CancellationToken cancellationToken, object? state)
{
if (executionToken == null)
throw new ArgumentNullException(nameof(executionToken), $"{nameof(executionToken)} is null.");
if (implementation == null)
throw new ArgumentNullException(nameof(implementation), $"{nameof(implementation)} is null.");
var currentToken = executionToken as AccessCommandExecutionToken;
if (currentToken == null)
throw new ArgumentNullException(nameof(executionToken), "only AccessCommandExecutionToken is supported.");

var startTime = DateTimeOffset.Now;
OnExecutionStarted(executionToken, startTime, state);

try
{
OleDbCommand? cmdToReturn = null;

while (currentToken != null)
{
OnExecutionStarted(currentToken, startTime, state);
using (var cmd = new OleDbCommand())
{
cmd.Connection = m_Connection;
if (m_Transaction != null)
cmd.Transaction = m_Transaction;
currentToken.PopulateCommand(cmd, DefaultCommandTimeout);

if (currentToken.ExecutionMode == AccessCommandExecutionMode.Materializer)
{
await implementation(cmd).ConfigureAwait(false);
cmdToReturn = cmd;
}
else if (currentToken.ExecutionMode == AccessCommandExecutionMode.ExecuteScalarAndForward)
{
if (currentToken.ForwardResult == null)
throw new InvalidOperationException("currentToken.ExecutionMode is ExecuteScalarAndForward, but currentToken.ForwardResult is null.");

currentToken.ForwardResult(await cmd.ExecuteScalarAsync().ConfigureAwait(false));
}
else
await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);
}
currentToken = currentToken.NextCommand;
}
return new StreamingCommandCompletionToken(this, executionToken, startTime, state, cmdToReturn, null);
}
catch (Exception ex)
{
if (cancellationToken.IsCancellationRequested) //convert AccessException into a OperationCanceledException
{
var ex2 = new OperationCanceledException("Operation was canceled.", ex, cancellationToken);
OnExecutionError(executionToken, startTime, DateTimeOffset.Now, ex2, state);
throw ex2;
}
else
{
OnExecutionError(executionToken, startTime, DateTimeOffset.Now, ex, state);
throw;
}
}
}

/// <summary>
/// Executes the specified operation.
/// </summary>
Expand Down Expand Up @@ -123,7 +254,6 @@ internal AccessOpenDataSource(AccessDataSource dataSource, OleDbConnection conne
/// <param name="cancellationToken">The cancellation token.</param>
/// <param name="state">User supplied state.</param>
/// <returns>Task.</returns>
/// <exception cref="NotImplementedException"></exception>
protected override async Task<int?> ExecuteAsync(CommandExecutionToken<OleDbCommand, OleDbParameter> executionToken, CommandImplementationAsync<OleDbCommand> implementation, CancellationToken cancellationToken, object? state)
{
if (executionToken == null)
Expand Down Expand Up @@ -233,4 +363,4 @@ private partial AccessOpenDataSource OnOverride(IEnumerable<AuditRule>? addition

return this;
}
}
}
Loading