v6.0.0
SetEndpointType
can be utilized to configure the KSqlDbProvider
to either use /query
or /query-stream
endpoint.
The deprecation of the /query
endpoint was proposed as part of KLIP-15 in favor of the new HTTP/2-based /query-stream.
However, the deprecation itself has not been scheduled yet.
using ksqlDB.RestApi.Client.KSql.Query.Context.Options;
using ksqlDB.RestApi.Client.KSql.Query.Options;
new KSqlDbContextOptionsBuilder()
.UseKSqlDb(url)
.SetEndpointType(EndpointType.Query)
.Options;
The preceding example is analogous to this one:
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.Query.Options;
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl)
{
EndpointType = EndpointType.Query
};
QuertStream
isn't accessible in the netstandard version because it lacks support for HTTP/2.
SetEndpointType.QueryStream
is set by default.
The function Configure
is an extension method for IServiceCollection
whose purpose is to set up a ksqlDB
context.
The code provided here is for integrating custom or third-party HttpMessageHandlers within your application.
The method ConfigurePrimaryHttpMessageHandler
accepts a lambda expression which is used to configure the primary HttpMessageHandler
.
In the provided code, a new HttpClientHandler
is created, a client certificate is added to it, and then this HttpClientHandler
is returned from the lambda.
using System;
using System.Threading.Tasks;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.RestApi.Http;
using Microsoft.Extensions.DependencyInjection;
private static void Configure(this IServiceCollection serviceCollection, string ksqlDbUrl)
{
serviceCollection.AddDbContext<IKSqlDBContext, KSqlDBContext>(c =>
{
c.UseKSqlDb(ksqlDbUrl);
c.ReplaceHttpClient<IHttpClientFactory, HttpClientFactory>(_ => {})
.ConfigurePrimaryHttpMessageHandler(sp =>
{
X509Certificate2 clientCertificate = CreateClientCertificate();
var httpClientHandler = new HttpClientHandler
{
ClientCertificateOptions = ClientCertificateOption.Manual
};
httpClientHandler.ClientCertificates.Add(clientCertificate);
return httpClientHandler;
})
.AddHttpMessageHandler(_ => new DebugHandler());
});
}
This DebugHandler
class is a custom HttpMessageHandler
that logs the request URI to the debug output before delegating the request processing to the next handler in the pipeline.
internal class DebugHandler : System.Net.Http.DelegatingHandler
{
protected override Task<System.Net.Http.HttpResponseMessage> SendAsync(
System.Net.Http.HttpRequestMessage request, CancellationToken cancellationToken)
{
System.Diagnostics.Debug.WriteLine($"Process request: {request.RequestUri}");
return base.SendAsync(request, cancellationToken);
}
}
During application startup, the services required by the IKSqlDBContext
and IKSqlDbRestApiClient
can be registered for dependency injection. This allows components that need these services to receive them through constructor parameters.
using ksqlDb.RestApi.Client.DependencyInjection;
serviceCollection.ConfigureKSqlDb(ksqlDbUrl);
The ConfigureKSqlDb
extension method, as demonstrated below, takes care of registering ksqldDB-related services in the dependency injection container on your behalf:
using ksqlDB.RestApi.Client.KSql.RestApi;
using Microsoft.Extensions.DependencyInjection;
var ksqlDbUrl = @"http://localhost:8088";
var serviceCollection = new ServiceCollection();
serviceCollection.AddHttpClient<ksqlDB.RestApi.Client.KSql.RestApi.Http.IHttpClientFactory, ksqlDB.RestApi.Client.KSql.RestApi.Http.HttpClientFactory>(httpClient =>
{
httpClient.BaseAddress = new Uri(ksqlDbUrl);
});
serviceCollection.AddScoped<IKSqlDbRestApiClient, KSqlDbRestApiClient>();
var provider = serviceCollection.BuildServiceProvider();
var restApiClient = provider.GetRequiredService<IKSqlDbRestApiClient>();
In your client application, you can include a Bearer token in the request headers when interacting with the ksqlDB
server. This can typically be done by adding an "Authorization" header with the value "Bearer <token>"
.
using System.Net.Http.Headers;
using ksqlDb.RestApi.Client.DependencyInjection;
using ksqlDB.RestApi.Client.KSql.Query.Context;
using ksqlDB.RestApi.Client.KSql.RestApi.Http;
using Microsoft.Extensions.DependencyInjection;
using IHttpClientFactory = ksqlDB.RestApi.Client.KSql.RestApi.Http.IHttpClientFactory;
namespace ksqlDB.Api.Client.Samples;
public static class KSqlDDbServiceCollectionExtensions
{
public static void Configure(this IServiceCollection services, string ksqlDbUrl)
{
services.AddDbContext<IKSqlDBContext, KSqlDBContext>(c =>
{
c.UseKSqlDb(ksqlDbUrl);
c.ReplaceHttpClient<IHttpClientFactory, HttpClientFactory>(_ => { })
.AddHttpMessageHandler(_ => new BearerAuthHandler());
});
}
}
internal class BearerAuthHandler : DelegatingHandler
{
public BearerAuthHandler()
{
InnerHandler = new HttpClientHandler();
}
protected override Task<HttpResponseMessage> SendAsync(
HttpRequestMessage request, System.Threading.CancellationToken cancellationToken)
{
var token = "xoidiag"; //CreateToken();
request.Headers.Authorization = new AuthenticationHeaderValue("bearer", token);
return base.SendAsync(request, cancellationToken);
}
}
In .NET, it's important to properly dispose of HttpClient
instances to release underlying resources and avoid potential issues with resource exhaustion.
KSqlDBContextOptions
and KSqlDbRestApiClient
- DisposeHttpClient
property is by default set to false
. From v2.0.0 the used HttpClients
will not be disposed by default.
It is possible to override the aforementioned behavior in the following ways, although it is not recommended:
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl)
{
DisposeHttpClient = true
};
var httpClient = new HttpClient
{
BaseAddress = new Uri(ksqlDbUrl)
};
var httpClientFactory = new HttpClientFactory(httpClient);
var kSqlDbRestApiClient = new KSqlDbRestApiClient(httpClientFactory)
{
DisposeHttpClient = true
};
The recommended approach is to create a single instance of HttpClient
and reuse it throughout the lifespan of an application.
To obtain an instance of HttpClient
using IHttpClientFactory
from the ServicesCollection
in .NET for IKSqlDbRestApiClient
and IKSqlDBContext
, you can follow the steps in this section.
v1.4.0
- KSqlDbContextOptionsBuilder and KSqlDbContextOptions
SetJsonSerializerOptions
- a way to configure the JsonSerializerOptions for the materialization of the incoming values.
With System.Text.Json
source generators, you can automatically generate serialization and deserialization code for JSON models, eliminating the need for manual code writing and reducing boilerplate code. This feature improves performance and reduces maintenance efforts when working with JSON data.
For better performance you can use the new System.Text.Json
source generator in this way:
var contextOptions = new KSqlDbContextOptionsBuilder()
.UseKSqlDb(ksqlDbUrl)
.SetJsonSerializerOptions(c =>
{
c.Converters.Add(new CustomJsonConverter());
jsonOptions.AddContext<SourceGenerationContext>();
}).Options;
//or
contextOptions = new KSqlDBContextOptions(ksqlDbUrl)
.SetJsonSerializerOptions(serializerOptions =>
{
serializerOptions.Converters.Add(new CustomJsonConverter());
jsonOptions.AddContext<SourceGenerationContext>();
});
using System.Text.Json.Serialization;
using ksqlDB.Api.Client.Samples.Models.Movies;
[JsonSourceGenerationOptions(WriteIndented = true)]
[JsonSerializable(typeof(Movie))]
internal partial class SourceGenerationContext : JsonSerializerContext
{
}
v1.0.0
In ksqlDB
, processing guarantees refer to the level of reliability and consistency provided by the system when processing and handling streaming data.
ExactlyOnce - Records are processed once. To achieve a true exactly-once system, end consumers and producers must also implement exactly-once semantics.
AtLeastOnce - Records are never lost but may be redelivered.
For more info check exactly once semantics
public enum ProcessingGuarantee
{
ExactlyOnce,
ExactlyOnceV2,
AtLeastOnce
}