-
-
Notifications
You must be signed in to change notification settings - Fork 289
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
# Apache Pulsar Module | ||
|
||
Testcontainers can be used to automatically create [Apache Pulsar](https://pulsar.apache.org) containers without external services. | ||
|
||
It's based on the official Apache Pulsar docker image, it is recommended to read the [official guide](https://pulsar.apache.org/docs/next/getting-started-docker/). | ||
|
||
The following example uses the following NuGet packages: | ||
|
||
```console title="Install the NuGet dependencies" | ||
dotnet add package Testcontainers.Pulsar | ||
dotnet add package DotPulsar | ||
dotnet add package xunit | ||
``` | ||
IDEs and editors may also require the following packages to run tests: `xunit.runner.visualstudio` and `Microsoft.NET.Test.Sdk`. | ||
|
||
Copy and paste the following code into a new `.cs` test file within an existing test project. | ||
|
||
```csharp | ||
using System.Collections.Generic; | ||
using System.Threading; | ||
using DotPulsar; | ||
using DotPulsar.Abstractions; | ||
using DotPulsar.Extensions; | ||
using Xunit.Abstractions; | ||
|
||
namespace Testcontainers.Pulsar.Tests; | ||
|
||
public sealed class PulsarContainerTest : IAsyncLifetime | ||
{ | ||
private readonly CancellationTokenSource _cts; | ||
private readonly PulsarContainer _pulsarContainer; | ||
private readonly ITestOutputHelper _testOutputHelper; | ||
|
||
public PulsarContainerTest(ITestOutputHelper testOutputHelper) | ||
{ | ||
_testOutputHelper = testOutputHelper; | ||
_pulsarContainer = new PulsarBuilder().Build(); | ||
_cts = new CancellationTokenSource(TimeSpan.FromMinutes(1)); | ||
} | ||
|
||
public Task InitializeAsync() | ||
{ | ||
return _pulsarContainer.StartAsync(); | ||
} | ||
|
||
public Task DisposeAsync() | ||
{ | ||
return _pulsarContainer.DisposeAsync().AsTask(); | ||
} | ||
|
||
[Fact] | ||
[Trait(nameof(DockerCli.DockerPlatform), nameof(DockerCli.DockerPlatform.Linux))] | ||
public async Task PulsarContainer_WhenBrokerIsStarted_ShouldConnect() | ||
{ | ||
// Given | ||
await using var client = CreateClient(); | ||
var expected = new List<MessageId> { MessageId.Earliest }; | ||
await using var reader = CreateReader(client, MessageId.Earliest, await CreateTopic(_cts.Token)); | ||
|
||
// When | ||
var actual = await reader.GetLastMessageIds(_cts.Token); | ||
|
||
// Then | ||
Assert.Equal(expected,actual); | ||
} | ||
|
||
private IReader<string> CreateReader(IPulsarClient pulsarClient, MessageId messageId, string topicName) | ||
=> pulsarClient.NewReader(Schema.String) | ||
.StartMessageId(messageId) | ||
.Topic(topicName) | ||
.Create(); | ||
|
||
private static string CreateTopicName() => $"persistent://public/default/{Guid.NewGuid():N}"; | ||
|
||
private async Task CreateTopic(string topic, CancellationToken cancellationToken) | ||
{ | ||
var arguments = $"bin/pulsar-admin topics create {topic}"; | ||
|
||
var result = await _pulsarContainer.ExecAsync(new[] { "/bin/bash", "-c", arguments }, cancellationToken); | ||
|
||
if (result.ExitCode != 0) | ||
throw new Exception($"Could not create the topic: {result.Stderr}"); | ||
} | ||
|
||
private async Task<string> CreateTopic(CancellationToken cancellationToken) | ||
{ | ||
var topic = CreateTopicName(); | ||
await CreateTopic(topic, cancellationToken); | ||
return topic; | ||
} | ||
|
||
private IPulsarClient CreateClient() | ||
=> PulsarClient | ||
.Builder() | ||
.ExceptionHandler(context => _testOutputHelper.WriteLine($"PulsarClient got an exception: {context.Exception}")) | ||
.ServiceUrl(new Uri(_pulsarContainer.GetPulsarBrokerUrl())) | ||
.Build(); | ||
} | ||
``` | ||
|
||
To execute the test, use the command `dotnet test` from a terminal. | ||
|
||
## Builder | ||
|
||
### Token authentication | ||
If you need to use token authentication use the follow with method in the builder | ||
```csharp | ||
PulsarBuilder().WithTokenAuthentication().Build(); | ||
``` | ||
|
||
and get the token by using | ||
```csharp | ||
var token = await _pulsarContainer.CreateToken(Timeout.InfiniteTimeSpan); | ||
``` | ||
|
||
#### Pulsar Functions | ||
If you need to use Pulsar Functions use the follow with method in the builder | ||
```csharp | ||
PulsarBuilder().WithFunctions().Build(); | ||
``` | ||
## Access Pulsar | ||
To get the the Pulsar broker url. | ||
```csharp | ||
string pulsarBrokerUrl = _pulsarContainer.GetPulsarBrokerUrl(); | ||
``` | ||
|
||
To get the the Pulsar service url. | ||
```csharp | ||
string pulsarBrokerUrl = _pulsarContainer.GetHttpServiceUrl(); | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
root = true |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,113 @@ | ||
using System.Text; | ||
using Testcontainers.Pulsar; | ||
|
||
/// <inheritdoc cref="ContainerBuilder{TBuilderEntity, TContainerEntity, TConfigurationEntity}" /> | ||
[PublicAPI] | ||
public sealed class PulsarBuilder : ContainerBuilder<PulsarBuilder, PulsarContainer, PulsarConfiguration> | ||
{ | ||
private const string AuthenticationPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken"; | ||
private const string SecretKeyPath = "/pulsar/secret.key"; | ||
private const string UserName = "test-user"; | ||
private const string PulsarImage = "apachepulsar/pulsar:3.0.2"; | ||
private const string AdminClustersEndpoint = "/admin/v2/clusters"; | ||
internal const string Enabled = "Enabled"; | ||
|
||
private Dictionary<string, string> _environmentVariables = new Dictionary<string, string> | ||
{ | ||
{ "PULSAR_PREFIX_tokenSecretKey", $"file://{SecretKeyPath}" }, | ||
{ "PULSAR_PREFIX_authenticationRefreshCheckSeconds", "5" }, | ||
{ "superUserRoles", UserName }, | ||
{ "authenticationEnabled", "true" }, | ||
{ "authorizationEnabled", "true" }, | ||
{ "authenticationProviders", "org.apache.pulsar.broker.authentication.AuthenticationProviderToken" }, | ||
{ "authenticateOriginalAuthData", "false" }, | ||
{ "brokerClientAuthenticationPlugin", AuthenticationPlugin }, | ||
{ "CLIENT_PREFIX_authPlugin", AuthenticationPlugin } | ||
}; | ||
|
||
public const ushort PulsarBrokerPort = 6650; | ||
public const ushort PulsarBrokerHttpPort = 8080; | ||
|
||
/// <summary> | ||
/// Initializes a new instance of the <see cref="PulsarBuilder" /> class. | ||
/// </summary> | ||
public PulsarBuilder() | ||
: this(new PulsarConfiguration()) | ||
{ | ||
DockerResourceConfiguration = Init().DockerResourceConfiguration; | ||
} | ||
|
||
/// <summary> | ||
/// Initializes a new instance of the <see cref="PulsarBuilder" /> class. | ||
/// </summary> | ||
/// <param name="resourceConfiguration">The Docker resource configuration.</param> | ||
private PulsarBuilder(PulsarConfiguration resourceConfiguration) | ||
: base(resourceConfiguration) | ||
{ | ||
DockerResourceConfiguration = resourceConfiguration; | ||
} | ||
|
||
/// <inheritdoc /> | ||
protected override PulsarConfiguration DockerResourceConfiguration { get; } | ||
|
||
/// <inheritdoc /> | ||
public override PulsarContainer Build() | ||
{ | ||
Validate(); | ||
var pulsarStartupCommands = String.Empty; | ||
if (DockerResourceConfiguration.Authentication == Enabled) | ||
{ | ||
pulsarStartupCommands = $"bin/pulsar tokens create-secret-key --output {SecretKeyPath} && " + | ||
$"export brokerClientAuthenticationParameters=token:$(bin/pulsar tokens create --secret-key {SecretKeyPath} --subject {UserName}) && " + | ||
$"export CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters && bin/apply-config-from-env.py conf/standalone.conf && " + | ||
$"bin/apply-config-from-env-with-prefix.py CLIENT_PREFIX_ conf/client.conf && "; | ||
} | ||
pulsarStartupCommands += "bin/pulsar standalone"; | ||
|
||
if (DockerResourceConfiguration.Functions != Enabled) | ||
pulsarStartupCommands += " --no-functions-worker"; | ||
|
||
var pulsarBuilder = WithCommand("/bin/bash", "-c",pulsarStartupCommands); | ||
return new PulsarContainer(pulsarBuilder.DockerResourceConfiguration, TestcontainersSettings.Logger); | ||
} | ||
|
||
/// <inheritdoc /> | ||
protected override PulsarBuilder Init() | ||
{ | ||
return base.Init() | ||
.WithImage(PulsarImage) | ||
.WithPortBinding(PulsarBrokerPort, true) | ||
.WithPortBinding(PulsarBrokerHttpPort, true) | ||
.WithWaitStrategy(Wait.ForUnixContainer() | ||
.UntilCommandIsCompleted(["/bin/bash", "-c", "bin/pulsar-admin clusters list"])); | ||
} | ||
|
||
public PulsarBuilder WithTokenAuthentication() | ||
{ | ||
return Merge(DockerResourceConfiguration, new PulsarConfiguration(authentication: Enabled)) | ||
.WithEnvironment(_environmentVariables); | ||
} | ||
|
||
public PulsarBuilder WithFunctions() | ||
{ | ||
return Merge(DockerResourceConfiguration, new PulsarConfiguration(functions: Enabled)); | ||
} | ||
|
||
/// <inheritdoc /> | ||
protected override PulsarBuilder Clone(IResourceConfiguration<CreateContainerParameters> resourceConfiguration) | ||
{ | ||
return Merge(DockerResourceConfiguration, new PulsarConfiguration(resourceConfiguration)); | ||
} | ||
|
||
/// <inheritdoc /> | ||
protected override PulsarBuilder Clone(IContainerConfiguration resourceConfiguration) | ||
{ | ||
return Merge(DockerResourceConfiguration, new PulsarConfiguration(resourceConfiguration)); | ||
} | ||
|
||
/// <inheritdoc /> | ||
protected override PulsarBuilder Merge(PulsarConfiguration oldValue, PulsarConfiguration newValue) | ||
{ | ||
return new PulsarBuilder(new PulsarConfiguration(oldValue, newValue)); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
namespace Testcontainers.Pulsar; | ||
|
||
/// <inheritdoc cref="ContainerConfiguration" /> | ||
[PublicAPI] | ||
public sealed class PulsarConfiguration : ContainerConfiguration | ||
{ | ||
/// <summary> | ||
/// Initializes a new instance of the <see cref="PulsarConfiguration" /> class. | ||
/// </summary> | ||
public PulsarConfiguration(string authentication = null, | ||
Check warning on line 10 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (ubuntu-22.04)
Check warning on line 10 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (ubuntu-22.04)
Check warning on line 10 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (windows-2022)
|
||
string functions = null) | ||
Check warning on line 11 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (ubuntu-22.04)
Check warning on line 11 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (ubuntu-22.04)
Check warning on line 11 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (windows-2022)
|
||
{ | ||
Authentication = authentication; | ||
Functions = functions; | ||
} | ||
|
||
/// <summary> | ||
/// Initializes a new instance of the <see cref="PulsarConfiguration" /> class. | ||
/// </summary> | ||
/// <param name="resourceConfiguration">The Docker resource configuration.</param> | ||
public PulsarConfiguration(IResourceConfiguration<CreateContainerParameters> resourceConfiguration) | ||
Check warning on line 21 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (ubuntu-22.04)
Check warning on line 21 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (ubuntu-22.04)
Check warning on line 21 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (ubuntu-22.04)
Check warning on line 21 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (ubuntu-22.04)
Check warning on line 21 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (windows-2022)
Check warning on line 21 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (windows-2022)
Check warning on line 21 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (windows-2022)
|
||
: base(resourceConfiguration) | ||
{ | ||
// Passes the configuration upwards to the base implementations to create an updated immutable copy. | ||
} | ||
|
||
/// <summary> | ||
/// Initializes a new instance of the <see cref="PulsarConfiguration" /> class. | ||
/// </summary> | ||
/// <param name="resourceConfiguration">The Docker resource configuration.</param> | ||
public PulsarConfiguration(IContainerConfiguration resourceConfiguration) | ||
Check warning on line 31 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (ubuntu-22.04)
Check warning on line 31 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (ubuntu-22.04)
Check warning on line 31 in src/Testcontainers.Pulsar/PulsarConfiguration.cs GitHub Actions / build (windows-2022)
|
||
: base(resourceConfiguration) | ||
{ | ||
// Passes the configuration upwards to the base implementations to create an updated immutable copy. | ||
} | ||
|
||
/// <summary> | ||
/// Initializes a new instance of the <see cref="PulsarConfiguration" /> class. | ||
/// </summary> | ||
/// <param name="resourceConfiguration">The Docker resource configuration.</param> | ||
public PulsarConfiguration(PulsarConfiguration resourceConfiguration) | ||
: this(new PulsarConfiguration(), resourceConfiguration) | ||
{ | ||
// Passes the configuration upwards to the base implementations to create an updated immutable copy. | ||
} | ||
|
||
/// <summary> | ||
/// Initializes a new instance of the <see cref="PulsarConfiguration" /> class. | ||
/// </summary> | ||
/// <param name="oldValue">The old Docker resource configuration.</param> | ||
/// <param name="newValue">The new Docker resource configuration.</param> | ||
public PulsarConfiguration(PulsarConfiguration oldValue, PulsarConfiguration newValue) | ||
: base(oldValue, newValue) | ||
{ | ||
Authentication = BuildConfiguration.Combine(oldValue.Authentication, newValue.Authentication); | ||
Functions = BuildConfiguration.Combine(oldValue.Functions, newValue.Functions); | ||
} | ||
|
||
/// <summary> | ||
/// Gets authentication. | ||
/// </summary> | ||
public string Authentication { get; } | ||
|
||
/// <summary> | ||
/// Gets functions. | ||
/// </summary> | ||
public string Functions { get; } | ||
} |