diff --git a/Directory.Packages.props b/Directory.Packages.props
index a226bec96..d37617bbf 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -35,6 +35,7 @@
+
diff --git a/Testcontainers.sln b/Testcontainers.sln
index 9595905ed..e8c10a811 100644
--- a/Testcontainers.sln
+++ b/Testcontainers.sln
@@ -85,6 +85,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.PostgreSql",
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.PubSub", "src\Testcontainers.PubSub\Testcontainers.PubSub.csproj", "{E6642255-667D-476B-B584-089AA5E6C0B1}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.Pulsar", "src\Testcontainers.Pulsar\Testcontainers.Pulsar.csproj", "{27D46863-65B9-4934-B3C8-2383B217A477}"
+EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.RabbitMq", "src\Testcontainers.RabbitMq\Testcontainers.RabbitMq.csproj", "{A6D480BC-FDE8-4B92-A2A6-FF16BEE486AE}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.RavenDb", "src\Testcontainers.RavenDb\Testcontainers.RavenDb.csproj", "{F6394475-D6F1-46E2-81BF-4BA78A40B878}"
@@ -179,6 +181,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.PostgreSql.T
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.PubSub.Tests", "tests\Testcontainers.PubSub.Tests\Testcontainers.PubSub.Tests.csproj", "{0F86BCE8-62E1-4BFC-AA84-63C7514C90AC}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.Pulsar.Tests", "tests\Testcontainers.Pulsar.Tests\Testcontainers.Pulsar.Tests.csproj", "{D05FCB31-793E-43E0-BD6C-077013AE9113}"
+EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.RabbitMq.Tests", "tests\Testcontainers.RabbitMq.Tests\Testcontainers.RabbitMq.Tests.csproj", "{19564567-1736-4626-B406-17E4E02F18B2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Testcontainers.RavenDb.Tests", "tests\Testcontainers.RavenDb.Tests\Testcontainers.RavenDb.Tests.csproj", "{D53726B6-5447-47E6-B881-A44EFF6E5534}"
@@ -348,6 +352,10 @@ Global
{E6642255-667D-476B-B584-089AA5E6C0B1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E6642255-667D-476B-B584-089AA5E6C0B1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E6642255-667D-476B-B584-089AA5E6C0B1}.Release|Any CPU.Build.0 = Release|Any CPU
+ {27D46863-65B9-4934-B3C8-2383B217A477}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {27D46863-65B9-4934-B3C8-2383B217A477}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {27D46863-65B9-4934-B3C8-2383B217A477}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {27D46863-65B9-4934-B3C8-2383B217A477}.Release|Any CPU.Build.0 = Release|Any CPU
{A6D480BC-FDE8-4B92-A2A6-FF16BEE486AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A6D480BC-FDE8-4B92-A2A6-FF16BEE486AE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A6D480BC-FDE8-4B92-A2A6-FF16BEE486AE}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -536,6 +544,10 @@ Global
{0F86BCE8-62E1-4BFC-AA84-63C7514C90AC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0F86BCE8-62E1-4BFC-AA84-63C7514C90AC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0F86BCE8-62E1-4BFC-AA84-63C7514C90AC}.Release|Any CPU.Build.0 = Release|Any CPU
+ {D05FCB31-793E-43E0-BD6C-077013AE9113}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {D05FCB31-793E-43E0-BD6C-077013AE9113}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {D05FCB31-793E-43E0-BD6C-077013AE9113}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {D05FCB31-793E-43E0-BD6C-077013AE9113}.Release|Any CPU.Build.0 = Release|Any CPU
{19564567-1736-4626-B406-17E4E02F18B2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{19564567-1736-4626-B406-17E4E02F18B2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{19564567-1736-4626-B406-17E4E02F18B2}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -606,6 +618,7 @@ Global
{464F1120-A0DA-462B-B9E8-45176D883625} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
{8AB91636-9055-4900-A72A-7CFFACDFDBF0} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
{E6642255-667D-476B-B584-089AA5E6C0B1} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
+ {27D46863-65B9-4934-B3C8-2383B217A477} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
{A6D480BC-FDE8-4B92-A2A6-FF16BEE486AE} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
{F6394475-D6F1-46E2-81BF-4BA78A40B878} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
{BFDA179A-40EB-4CEB-B8E9-0DF32C65E2C5} = {673F23AE-7694-4BB9-ABD4-136D6C13634E}
@@ -653,6 +666,7 @@ Global
{3E55CBE8-AFE8-426D-9470-49D63CD1051C} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
{56D0DCA5-567F-4B3B-8B79-CB108F8EB8A6} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
{0F86BCE8-62E1-4BFC-AA84-63C7514C90AC} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
+ {D05FCB31-793E-43E0-BD6C-077013AE9113} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
{19564567-1736-4626-B406-17E4E02F18B2} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
{D53726B6-5447-47E6-B881-A44EFF6E5534} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
{31EE94A0-E721-4073-B6F1-DD912D004DEF} = {7164F1FB-7F24-444A-ACD2-2C329C2B3CCF}
diff --git a/docs/modules/elasticsearch.md b/docs/modules/elasticsearch.md
index 156eb76d2..3d141524d 100644
--- a/docs/modules/elasticsearch.md
+++ b/docs/modules/elasticsearch.md
@@ -51,5 +51,3 @@ To execute the tests, use the command `dotnet test` from a terminal.
## A Note To Developers
The Testcontainers module creates a container that listens to requests over **HTTPS**. To communicate with the Elasticsearch instance, developers must create a `ElasticsearchClientSettings` instance and set the `ServerCertificateValidationCallback` delegate to `CertificateValidations.AllowAll`. Failing to do so will result in a communication failure as the .NET will reject the certificate coming from the container.
-
-[xunit]: https://xunit.net/
diff --git a/docs/modules/index.md b/docs/modules/index.md
index e533faecf..2c2cf5ea2 100644
--- a/docs/modules/index.md
+++ b/docs/modules/index.md
@@ -56,6 +56,7 @@ await moduleNameContainer.StartAsync();
| Papercut | `jijiechen/papercut:latest` | [NuGet](https://www.nuget.org/packages/Testcontainers.Papercut) | [Source](https://github.com/testcontainers/testcontainers-dotnet/tree/develop/src/Testcontainers.Papercut) |
| PostgreSQL | `postgres:15.1` | [NuGet](https://www.nuget.org/packages/Testcontainers.PostgreSql) | [Source](https://github.com/testcontainers/testcontainers-dotnet/tree/develop/src/Testcontainers.PostgreSql) |
| PubSub | `gcr.io/google.com/cloudsdktool/google-cloud-cli:446.0.1-emulators` | [NuGet](https://www.nuget.org/packages/Testcontainers.PubSub) | [Source](https://github.com/testcontainers/testcontainers-dotnet/tree/develop/src/Testcontainers.PubSub) |
+| Pulsar | `apachepulsar/pulsar:3.2.3` | [NuGet](https://www.nuget.org/packages/Testcontainers.Pulsar) | [Source](https://github.com/testcontainers/testcontainers-dotnet/tree/develop/src/Testcontainers.Pulsar) |
| RabbitMQ | `rabbitmq:3.11` | [NuGet](https://www.nuget.org/packages/Testcontainers.RabbitMq) | [Source](https://github.com/testcontainers/testcontainers-dotnet/tree/develop/src/Testcontainers.RabbitMq) |
| RavenDB | `ravendb/ravendb:5.4-ubuntu-latest` | [NuGet](https://www.nuget.org/packages/Testcontainers.RavenDb) | [Source](https://github.com/testcontainers/testcontainers-dotnet/tree/develop/src/Testcontainers.RavenDb) |
| Redis | `redis:7.0` | [NuGet](https://www.nuget.org/packages/Testcontainers.Redis) | [Source](https://github.com/testcontainers/testcontainers-dotnet/tree/develop/src/Testcontainers.Redis) |
diff --git a/docs/modules/mongodb.md b/docs/modules/mongodb.md
index 43cb06fcf..478d0d63e 100644
--- a/docs/modules/mongodb.md
+++ b/docs/modules/mongodb.md
@@ -45,5 +45,3 @@ public sealed class MongoDbContainerTest : IAsyncLifetime
```
To execute the tests, use the command `dotnet test` from a terminal.
-
-[xunit]: https://xunit.net/
diff --git a/docs/modules/neo4j.md b/docs/modules/neo4j.md
index 991bbc7f0..c93e7189a 100644
--- a/docs/modules/neo4j.md
+++ b/docs/modules/neo4j.md
@@ -47,5 +47,3 @@ public sealed class Neo4jContainerTest : IAsyncLifetime
```
To execute the tests, use the command `dotnet test` from a terminal.
-
-[xunit]: https://xunit.net/
diff --git a/docs/modules/pulsar.md b/docs/modules/pulsar.md
new file mode 100644
index 000000000..3f91b83fa
--- /dev/null
+++ b/docs/modules/pulsar.md
@@ -0,0 +1,108 @@
+# Apache Pulsar
+
+Testcontainers can be used to automatically create [Apache Pulsar](https://pulsar.apache.org) containers without the need for external services. Based on the official Apache Pulsar Docker image, it is recommended to read the official [getting started](https://pulsar.apache.org/docs/next/getting-started-docker/) guide.
+
+The following example uses the following NuGet packages:
+
+```console title="Install the NuGet dependencies"
+dotnet add package Testcontainers.Pulsar
+dotnet add package DotPulsar
+dotnet add package xunit
+```
+
+IDEs and editors may also require the following packages to run tests: `xunit.runner.visualstudio` and `Microsoft.NET.Test.Sdk`.
+
+Copy and paste the following code into a new `.cs` test file within an existing test project.
+
+```csharp
+using System;
+using System.Text;
+using System.Threading.Tasks;
+using DotPulsar;
+using DotPulsar.Extensions;
+using Xunit;
+
+namespace Testcontainers.Pulsar;
+
+public sealed class PulsarContainerTest : IAsyncLifetime
+{
+ private readonly PulsarContainer _pulsarContainer =
+ new PulsarBuilder().Build();
+
+ [Fact]
+ public async Task ConsumerReceivesSendMessage()
+ {
+ const string helloPulsar = "Hello, Pulsar!";
+
+ var topic = $"persistent://public/default/{Guid.NewGuid():D}";
+
+ var name = Guid.NewGuid().ToString("D");
+
+ await using var client = PulsarClient.Builder()
+ .ServiceUrl(new Uri(_pulsarContainer.GetBrokerAddress()))
+ .Build();
+
+ await using var producer = client.NewProducer(Schema.String)
+ .Topic(topic)
+ .Create();
+
+ await using var consumer = client.NewConsumer(Schema.String)
+ .Topic(topic)
+ .SubscriptionName(name)
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .Create();
+
+ _ = await producer.Send(helloPulsar)
+ .ConfigureAwait(true);
+
+ var message = await consumer.Receive()
+ .ConfigureAwait(true);
+
+ Assert.Equal(helloPulsar, Encoding.Default.GetString(message.Data));
+ }
+
+ public Task InitializeAsync()
+ => _pulsarContainer.StartAsync();
+
+ public Task DisposeAsync()
+ => _pulsarContainer.DisposeAsync().AsTask();
+}
+```
+
+To execute the tests, use the command `dotnet test` from a terminal.
+
+## Access Pulsar
+
+To get the Pulsar broker URL use:
+
+```csharp
+string pulsarBrokerUrl = _pulsarContainer.GetPulsarBrokerUrl();
+```
+
+To get the Pulsar service URL use:
+```csharp
+string pulsarServiceUrl = _pulsarContainer.GetHttpServiceUrl();
+```
+
+## Enable token authentication
+
+If you need to use token authentication, use the following builder configuration to enable authentication:
+
+```csharp
+PulsarContainer _pulsarContainer = PulsarBuilder().WithTokenAuthentication().Build();
+```
+
+Start the container and get the token from the running instance by using:
+
+```csharp
+var authToken = await container.CreateAuthenticationTokenAsync(TimeSpan.FromHours(1))
+ .ConfigureAwait(false);
+```
+
+## Enable Pulsar Functions
+
+If you need to use Pulsar Functions, use the following builder configuration to enable it:
+
+```csharp
+PulsarContainer _pulsarContainer = PulsarBuilder().WithFunctions().Build();
+```
diff --git a/mkdocs.yml b/mkdocs.yml
index 97c52d79b..8c43c04de 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -45,4 +45,5 @@ nav:
- modules/mssql.md
- modules/neo4j.md
- modules/postgres.md
+ - modules/pulsar.md
- modules/rabbitmq.md
diff --git a/src/Testcontainers.Pulsar/.editorconfig b/src/Testcontainers.Pulsar/.editorconfig
new file mode 100644
index 000000000..6f066619d
--- /dev/null
+++ b/src/Testcontainers.Pulsar/.editorconfig
@@ -0,0 +1 @@
+root = true
\ No newline at end of file
diff --git a/src/Testcontainers.Pulsar/PulsarBuilder.cs b/src/Testcontainers.Pulsar/PulsarBuilder.cs
new file mode 100644
index 000000000..e223c5c43
--- /dev/null
+++ b/src/Testcontainers.Pulsar/PulsarBuilder.cs
@@ -0,0 +1,207 @@
+namespace Testcontainers.Pulsar;
+
+///
+[PublicAPI]
+public sealed class PulsarBuilder : ContainerBuilder
+{
+ public const string PulsarImage = "apachepulsar/pulsar:3.2.3";
+
+ public const ushort PulsarBrokerDataPort = 6650;
+
+ public const ushort PulsarWebServicePort = 8080;
+
+ public const string StartupScriptFilePath = "/testcontainers.sh";
+
+ public const string SecretKeyFilePath = "/pulsar/secret.key";
+
+ public const string Username = "test-user";
+
+ private static readonly IReadOnlyDictionary AuthenticationEnvVars;
+
+ static PulsarBuilder()
+ {
+ const string authenticationPlugin = "org.apache.pulsar.client.impl.auth.AuthenticationToken";
+ var authenticationEnvVars = new Dictionary();
+ authenticationEnvVars.Add("authenticateOriginalAuthData", "false");
+ authenticationEnvVars.Add("authenticationEnabled", "true");
+ authenticationEnvVars.Add("authorizationEnabled", "true");
+ authenticationEnvVars.Add("authenticationProviders", "org.apache.pulsar.broker.authentication.AuthenticationProviderToken");
+ authenticationEnvVars.Add("brokerClientAuthenticationPlugin", authenticationPlugin);
+ authenticationEnvVars.Add("CLIENT_PREFIX_authPlugin", authenticationPlugin);
+ authenticationEnvVars.Add("PULSAR_PREFIX_authenticationRefreshCheckSeconds", "5");
+ authenticationEnvVars.Add("PULSAR_PREFIX_tokenSecretKey", "file://" + SecretKeyFilePath);
+ authenticationEnvVars.Add("superUserRoles", Username);
+ AuthenticationEnvVars = new ReadOnlyDictionary(authenticationEnvVars);
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ public PulsarBuilder()
+ : this(new PulsarConfiguration())
+ {
+ DockerResourceConfiguration = Init().DockerResourceConfiguration;
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The Docker resource configuration.
+ private PulsarBuilder(PulsarConfiguration resourceConfiguration)
+ : base(resourceConfiguration)
+ {
+ DockerResourceConfiguration = resourceConfiguration;
+ }
+
+ ///
+ protected override PulsarConfiguration DockerResourceConfiguration { get; }
+
+ ///
+ /// Enables authentication.
+ ///
+ ///
+ /// To create an authentication call .
+ ///
+ /// A configured instance of .
+ public PulsarBuilder WithAuthentication()
+ {
+ return Merge(DockerResourceConfiguration, new PulsarConfiguration(authenticationEnabled: true))
+ .WithEnvironment(AuthenticationEnvVars);
+ }
+
+ ///
+ /// Enables function workers.
+ ///
+ /// Determines whether function workers is enabled or not.
+ /// A configured instance of .
+ public PulsarBuilder WithFunctionsWorker(bool functionsWorkerEnabled = true)
+ {
+ return Merge(DockerResourceConfiguration, new PulsarConfiguration(functionsWorkerEnabled: functionsWorkerEnabled));
+ }
+
+ ///
+ public override PulsarContainer Build()
+ {
+ Validate();
+
+ var waitStrategy = Wait.ForUnixContainer().AddCustomWaitStrategy(new WaitUntil(DockerResourceConfiguration.AuthenticationEnabled.GetValueOrDefault()));
+
+ if (DockerResourceConfiguration.FunctionsWorkerEnabled.GetValueOrDefault())
+ {
+ waitStrategy = waitStrategy.UntilMessageIsLogged("Function worker service started");
+ }
+
+ var pulsarBuilder = WithWaitStrategy(waitStrategy);
+ return new PulsarContainer(pulsarBuilder.DockerResourceConfiguration);
+ }
+
+ ///
+ protected override PulsarBuilder Init()
+ {
+ return base.Init()
+ .WithImage(PulsarImage)
+ .WithPortBinding(PulsarBrokerDataPort, true)
+ .WithPortBinding(PulsarWebServicePort, true)
+ .WithFunctionsWorker(false)
+ .WithEntrypoint("/bin/sh", "-c")
+ .WithCommand("while [ ! -f " + StartupScriptFilePath + " ]; do sleep 0.1; done; " + StartupScriptFilePath)
+ .WithStartupCallback((container, ct) => container.CopyStartupScriptAsync(ct));
+ }
+
+ ///
+ protected override PulsarBuilder Clone(IResourceConfiguration resourceConfiguration)
+ {
+ return Merge(DockerResourceConfiguration, new PulsarConfiguration(resourceConfiguration));
+ }
+
+ ///
+ protected override PulsarBuilder Clone(IContainerConfiguration resourceConfiguration)
+ {
+ return Merge(DockerResourceConfiguration, new PulsarConfiguration(resourceConfiguration));
+ }
+
+ ///
+ protected override PulsarBuilder Merge(PulsarConfiguration oldValue, PulsarConfiguration newValue)
+ {
+ return new PulsarBuilder(new PulsarConfiguration(oldValue, newValue));
+ }
+
+ ///
+ private sealed class WaitUntil : IWaitUntil
+ {
+ private readonly HttpWaitStrategy _httpWaitStrategy = new HttpWaitStrategy()
+ .ForPath("/admin/v2/clusters")
+ .ForPort(PulsarWebServicePort)
+ .ForResponseMessageMatching(IsClusterHealthyAsync);
+
+ private readonly bool _authenticationEnabled;
+
+ private string _authToken;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// A value indicating whether authentication is enabled or not.
+ public WaitUntil(bool authenticationEnabled)
+ {
+ _authenticationEnabled = authenticationEnabled;
+ }
+
+ ///
+ public Task UntilAsync(IContainer container)
+ {
+ return UntilAsync(container as PulsarContainer);
+ }
+
+ ///
+ private async Task UntilAsync(PulsarContainer container)
+ {
+ _ = Guard.Argument(container, nameof(container))
+ .NotNull();
+
+ if (_authenticationEnabled && _authToken == null)
+ {
+ try
+ {
+ _authToken = await container.CreateAuthenticationTokenAsync(TimeSpan.FromHours(1))
+ .ConfigureAwait(false);
+
+ _ = _httpWaitStrategy.WithHeader("Authorization", "Bearer " + _authToken.Trim());
+ }
+ catch
+ {
+ return false;
+ }
+ }
+
+ return await _httpWaitStrategy.UntilAsync(container)
+ .ConfigureAwait(false);
+ }
+
+ ///
+ /// Determines whether the cluster is healthy or not.
+ ///
+ /// The HTTP response that contains the cluster information.
+ /// A value indicating whether the cluster is healthy or not.
+ private static async Task IsClusterHealthyAsync(HttpResponseMessage response)
+ {
+ var jsonString = await response.Content.ReadAsStringAsync()
+ .ConfigureAwait(false);
+
+ try
+ {
+ var status = JsonDocument.Parse(jsonString)
+ .RootElement
+ .EnumerateArray()
+ .ElementAt(0)
+ .GetString();
+
+ return "standalone".Equals(status);
+ }
+ catch
+ {
+ return false;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/Testcontainers.Pulsar/PulsarConfiguration.cs b/src/Testcontainers.Pulsar/PulsarConfiguration.cs
new file mode 100644
index 000000000..9a290878b
--- /dev/null
+++ b/src/Testcontainers.Pulsar/PulsarConfiguration.cs
@@ -0,0 +1,71 @@
+namespace Testcontainers.Pulsar;
+
+///
+[PublicAPI]
+public sealed class PulsarConfiguration : ContainerConfiguration
+{
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// A value indicating whether authentication is enabled or not.
+ /// A value indicating whether function workers is enabled or not.
+ public PulsarConfiguration(
+ bool? authenticationEnabled = null,
+ bool? functionsWorkerEnabled = null)
+ {
+ AuthenticationEnabled = authenticationEnabled;
+ FunctionsWorkerEnabled = functionsWorkerEnabled;
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The Docker resource configuration.
+ public PulsarConfiguration(IResourceConfiguration resourceConfiguration)
+ : base(resourceConfiguration)
+ {
+ // Passes the configuration upwards to the base implementations to create an updated immutable copy.
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The Docker resource configuration.
+ public PulsarConfiguration(IContainerConfiguration resourceConfiguration)
+ : base(resourceConfiguration)
+ {
+ // Passes the configuration upwards to the base implementations to create an updated immutable copy.
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The Docker resource configuration.
+ public PulsarConfiguration(PulsarConfiguration resourceConfiguration)
+ : this(new PulsarConfiguration(), resourceConfiguration)
+ {
+ // Passes the configuration upwards to the base implementations to create an updated immutable copy.
+ }
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The old Docker resource configuration.
+ /// The new Docker resource configuration.
+ public PulsarConfiguration(PulsarConfiguration oldValue, PulsarConfiguration newValue)
+ : base(oldValue, newValue)
+ {
+ AuthenticationEnabled = (oldValue.AuthenticationEnabled.HasValue && oldValue.AuthenticationEnabled.Value) || (newValue.AuthenticationEnabled.HasValue && newValue.AuthenticationEnabled.Value);
+ FunctionsWorkerEnabled = (oldValue.FunctionsWorkerEnabled.HasValue && oldValue.FunctionsWorkerEnabled.Value) || (newValue.FunctionsWorkerEnabled.HasValue && newValue.FunctionsWorkerEnabled.Value);
+ }
+
+ ///
+ /// Gets a value indicating whether authentication is enabled or not.
+ ///
+ public bool? AuthenticationEnabled { get; }
+
+ ///
+ /// Gets a value indicating whether function workers is enabled or not.
+ ///
+ public bool? FunctionsWorkerEnabled { get; }
+}
\ No newline at end of file
diff --git a/src/Testcontainers.Pulsar/PulsarContainer.cs b/src/Testcontainers.Pulsar/PulsarContainer.cs
new file mode 100644
index 000000000..7d42e7cac
--- /dev/null
+++ b/src/Testcontainers.Pulsar/PulsarContainer.cs
@@ -0,0 +1,117 @@
+namespace Testcontainers.Pulsar;
+
+///
+[PublicAPI]
+public sealed class PulsarContainer : DockerContainer
+{
+ private readonly PulsarConfiguration _configuration;
+
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The container configuration.
+ public PulsarContainer(PulsarConfiguration configuration)
+ : base(configuration)
+ {
+ _configuration = configuration;
+ }
+
+ ///
+ /// Gets the Pulsar broker address.
+ ///
+ /// The Pulsar broker address.
+ public string GetBrokerAddress()
+ {
+ return new UriBuilder("pulsar", Hostname, GetMappedPublicPort(PulsarBuilder.PulsarBrokerDataPort)).ToString();
+ }
+
+ ///
+ /// Gets the Pulsar web service address.
+ ///
+ /// The Pulsar web service address.
+ public string GetServiceAddress()
+ {
+ return new UriBuilder(Uri.UriSchemeHttp, Hostname, GetMappedPublicPort(PulsarBuilder.PulsarWebServicePort)).ToString();
+ }
+
+ ///
+ /// Creates an authentication token.
+ ///
+ /// The time after the authentication token expires.
+ /// Cancellation token.
+ /// A task that completes when the authentication token has been created.
+ ///
+ public async Task CreateAuthenticationTokenAsync(TimeSpan expire = default, CancellationToken ct = default)
+ {
+ int secondsToMilliseconds;
+
+ if (_configuration.AuthenticationEnabled.HasValue && !_configuration.AuthenticationEnabled.Value)
+ {
+ throw new ArgumentException("Failed to create token. Authentication is not enabled.");
+ }
+
+ if (_configuration.Image.Tag.StartsWith("3.2") || _configuration.Image.Tag.StartsWith("latest"))
+ {
+ Logger.LogWarning("The 'apachepulsar/pulsar:3.2.?' image contains a regression. The expiry time is converted to the wrong unit of time: https://github.com/apache/pulsar/issues/22811.");
+ secondsToMilliseconds = 1000;
+ }
+ else
+ {
+ secondsToMilliseconds = 1;
+ }
+
+ var command = new[]
+ {
+ "bin/pulsar",
+ "tokens",
+ "create",
+ "--secret-key",
+ PulsarBuilder.SecretKeyFilePath,
+ "--subject",
+ PulsarBuilder.Username,
+ "--expiry-time",
+ $"{secondsToMilliseconds * expire.TotalSeconds}s",
+ };
+
+ var tokensResult = await ExecAsync(command, ct)
+ .ConfigureAwait(false);
+
+ if (tokensResult.ExitCode != 0)
+ {
+ throw new ArgumentException($"Failed to create token. Command returned a non-zero exit code: {tokensResult.Stderr}.");
+ }
+
+ return tokensResult.Stdout;
+ }
+
+ ///
+ /// Copies the Pulsar startup script to the container.
+ ///
+ /// Cancellation token.
+ /// A task that completes when the startup script has been copied.
+ internal Task CopyStartupScriptAsync(CancellationToken ct = default)
+ {
+ var startupScript = new StringWriter();
+ startupScript.NewLine = "\n";
+ startupScript.WriteLine("#!/bin/bash");
+
+ if (_configuration.AuthenticationEnabled.HasValue && _configuration.AuthenticationEnabled.Value)
+ {
+ startupScript.WriteLine("bin/pulsar tokens create-secret-key --output " + PulsarBuilder.SecretKeyFilePath);
+ startupScript.WriteLine("export brokerClientAuthenticationParameters=token:$(bin/pulsar tokens create --secret-key $PULSAR_PREFIX_tokenSecretKey --subject $superUserRoles)");
+ startupScript.WriteLine("export CLIENT_PREFIX_authParams=$brokerClientAuthenticationParameters");
+ startupScript.WriteLine("bin/apply-config-from-env.py conf/standalone.conf");
+ startupScript.WriteLine("bin/apply-config-from-env-with-prefix.py CLIENT_PREFIX_ conf/client.conf");
+ }
+
+ startupScript.Write("bin/pulsar standalone");
+
+ if (_configuration.FunctionsWorkerEnabled.HasValue && !_configuration.FunctionsWorkerEnabled.Value)
+ {
+ startupScript.Write(" --no-functions-worker");
+ startupScript.Write(" --no-stream-storage");
+ }
+
+ return CopyAsync(Encoding.Default.GetBytes(startupScript.ToString()), PulsarBuilder.StartupScriptFilePath, Unix.FileMode755, ct);
+ }
+}
\ No newline at end of file
diff --git a/src/Testcontainers.Pulsar/Testcontainers.Pulsar.csproj b/src/Testcontainers.Pulsar/Testcontainers.Pulsar.csproj
new file mode 100644
index 000000000..a108060b3
--- /dev/null
+++ b/src/Testcontainers.Pulsar/Testcontainers.Pulsar.csproj
@@ -0,0 +1,12 @@
+
+
+ netstandard2.0;netstandard2.1
+ latest
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/Testcontainers.Pulsar/Usings.cs b/src/Testcontainers.Pulsar/Usings.cs
new file mode 100644
index 000000000..59c490a37
--- /dev/null
+++ b/src/Testcontainers.Pulsar/Usings.cs
@@ -0,0 +1,17 @@
+global using System;
+global using System.Collections.Generic;
+global using System.Collections.ObjectModel;
+global using System.IO;
+global using System.Linq;
+global using System.Net.Http;
+global using System.Text;
+global using System.Text.Json;
+global using System.Threading;
+global using System.Threading.Tasks;
+global using Docker.DotNet.Models;
+global using DotNet.Testcontainers;
+global using DotNet.Testcontainers.Builders;
+global using DotNet.Testcontainers.Configurations;
+global using DotNet.Testcontainers.Containers;
+global using JetBrains.Annotations;
+global using Microsoft.Extensions.Logging;
\ No newline at end of file
diff --git a/tests/Testcontainers.Databases.Tests/Usings.cs b/tests/Testcontainers.Databases.Tests/Usings.cs
index 2ea483b34..5eaa436ef 100644
--- a/tests/Testcontainers.Databases.Tests/Usings.cs
+++ b/tests/Testcontainers.Databases.Tests/Usings.cs
@@ -1,5 +1,4 @@
global using System;
-global using System.Collections.Generic;
global using System.Collections.Immutable;
global using System.Data.Common;
global using System.IO;
diff --git a/tests/Testcontainers.Pulsar.Tests/.editorconfig b/tests/Testcontainers.Pulsar.Tests/.editorconfig
new file mode 100644
index 000000000..6f066619d
--- /dev/null
+++ b/tests/Testcontainers.Pulsar.Tests/.editorconfig
@@ -0,0 +1 @@
+root = true
\ No newline at end of file
diff --git a/tests/Testcontainers.Pulsar.Tests/PulsarContainerTest.cs b/tests/Testcontainers.Pulsar.Tests/PulsarContainerTest.cs
new file mode 100644
index 000000000..27ae620e4
--- /dev/null
+++ b/tests/Testcontainers.Pulsar.Tests/PulsarContainerTest.cs
@@ -0,0 +1,88 @@
+namespace Testcontainers.Pulsar;
+
+public abstract class PulsarContainerTest : IAsyncLifetime
+{
+ private readonly PulsarContainer _pulsarContainer;
+
+ private PulsarContainerTest(PulsarContainer pulsarContainer)
+ {
+ _pulsarContainer = pulsarContainer;
+ }
+
+ protected abstract Task CreateClientAsync(CancellationToken ct = default);
+
+ public Task InitializeAsync()
+ {
+ return _pulsarContainer.StartAsync();
+ }
+
+ public Task DisposeAsync()
+ {
+ return _pulsarContainer.DisposeAsync().AsTask();
+ }
+
+ [Fact]
+ public async Task ConsumerReceivesSendMessage()
+ {
+ // Given
+ const string helloPulsar = "Hello, Pulsar!";
+
+ var topic = $"persistent://public/default/{Guid.NewGuid():D}";
+
+ var name = Guid.NewGuid().ToString("D");
+
+ await using var client = await CreateClientAsync()
+ .ConfigureAwait(true);
+
+ await using var producer = client.NewProducer(Schema.String)
+ .Topic(topic)
+ .Create();
+
+ await using var consumer = client.NewConsumer(Schema.String)
+ .Topic(topic)
+ .SubscriptionName(name)
+ .InitialPosition(SubscriptionInitialPosition.Earliest)
+ .Create();
+
+ // When
+ _ = await producer.Send(helloPulsar)
+ .ConfigureAwait(true);
+
+ var message = await consumer.Receive()
+ .ConfigureAwait(true);
+
+ // Then
+ Assert.Equal(helloPulsar, Encoding.Default.GetString(message.Data));
+ }
+
+ [UsedImplicitly]
+ public sealed class PulsarDefaultConfiguration : PulsarContainerTest
+ {
+ public PulsarDefaultConfiguration()
+ : base(new PulsarBuilder().Build())
+ {
+ }
+
+ protected override Task CreateClientAsync(CancellationToken ct = default)
+ {
+ return Task.FromResult(PulsarClient.Builder().ServiceUrl(new Uri(_pulsarContainer.GetBrokerAddress())).Build());
+ }
+ }
+
+ [UsedImplicitly]
+ public sealed class PulsarAuthConfiguration : PulsarContainerTest
+ {
+ public PulsarAuthConfiguration()
+ : base(new PulsarBuilder().WithAuthentication().Build())
+ {
+ }
+
+ protected override async Task CreateClientAsync(CancellationToken ct = default)
+ {
+ var authToken = await _pulsarContainer.CreateAuthenticationTokenAsync(TimeSpan.FromHours(1), ct)
+ .ConfigureAwait(false);
+
+ return PulsarClient.Builder().ServiceUrl(new Uri(_pulsarContainer.GetBrokerAddress())).Authentication(new TokenAuthentication(authToken)).Build();
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Testcontainers.Pulsar.Tests/Testcontainers.Pulsar.Tests.csproj b/tests/Testcontainers.Pulsar.Tests/Testcontainers.Pulsar.Tests.csproj
new file mode 100644
index 000000000..7faabcd5d
--- /dev/null
+++ b/tests/Testcontainers.Pulsar.Tests/Testcontainers.Pulsar.Tests.csproj
@@ -0,0 +1,18 @@
+
+
+ net8.0
+ false
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/tests/Testcontainers.Pulsar.Tests/Usings.cs b/tests/Testcontainers.Pulsar.Tests/Usings.cs
new file mode 100644
index 000000000..56f7420c0
--- /dev/null
+++ b/tests/Testcontainers.Pulsar.Tests/Usings.cs
@@ -0,0 +1,10 @@
+global using System;
+global using System.Text;
+global using System.Threading;
+global using System.Threading.Tasks;
+global using DotPulsar;
+global using DotPulsar.Abstractions;
+global using DotPulsar.Extensions;
+global using DotPulsar.Internal;
+global using JetBrains.Annotations;
+global using Xunit;
\ No newline at end of file