Upgrade 1.5.2 and improve debugging #186

Merged · 11 commits · Oct 23, 2020
2 changes: 2 additions & 0 deletions .gitignore
@@ -284,3 +284,5 @@ local.appsettings.tests.json
# Mac
.DS_Store

local.settings.json.org
samples/dotnet/KafkaFunctionSample/Properties/
5 changes: 5 additions & 0 deletions README.md
@@ -319,6 +319,11 @@ The settings exposed here are targeted to more advanced users that want to custo
|MaxPartitionFetchBytes|max.partition.fetch.bytes|Trigger
|FetchMaxBytes|fetch.max.bytes|Trigger
|AutoCommitIntervalMs|auto.commit.interval.ms|Trigger
|LibkafkaDebug|debug|Both
|MetadataMaxAgeMs|metadata.max.age.ms|Both
|SocketKeepaliveEnable|socket.keepalive.enable|Both

**NOTE:** `MetadataMaxAgeMs` defaults to `180000` and `SocketKeepaliveEnable` defaults to `true`; for every other setting, the default is the same as librdkafka's [Configuration properties](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md). For the reasoning behind these defaults, refer to this [issue](https://github.com/Azure/azure-functions-kafka-extension/issues/187).
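These settings are configured in `host.json` alongside the other Kafka extension options. A minimal sketch, assuming the new options bind under `extensions.kafka` with the same camelCase naming as the existing `maxBatchSize` setting (the debug contexts shown are illustrative):

```json
{
  "version": "2.0",
  "extensions": {
    "kafka": {
      "maxBatchSize": 100,
      "libkafkaDebug": "broker,topic,msg",
      "metadataMaxAgeMs": 180000,
      "socketKeepaliveEnable": true
    }
  }
}
```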

If you are missing a configuration setting, please create an issue and describe why you need it.

6 changes: 3 additions & 3 deletions samples/dotnet/ConsoleConsumer/ConsoleConsumer.csproj
@@ -5,9 +5,9 @@
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.4.3" />
<PackageReference Include="Confluent.Kafka" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.5.2" />
</ItemGroup>
<ItemGroup>
<None Update="cacert.pem">
6 changes: 3 additions & 3 deletions samples/dotnet/ConsoleProducer/ConsoleProducer.csproj
@@ -6,9 +6,9 @@
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.4.3" />
<PackageReference Include="Confluent.Kafka" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.5.2" />
</ItemGroup>
<ItemGroup>
<None Update="cacert.pem">
4 changes: 2 additions & 2 deletions samples/dotnet/KafkaFunctionSample/KafkaFunctionSample.csproj
@@ -5,9 +5,9 @@
<AzureFunctionsVersion>v3</AzureFunctionsVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.4.3" />
<PackageReference Include="Confluent.Kafka" Version="1.5.2" />
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="3.0.7" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.5.2" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
99 changes: 68 additions & 31 deletions samples/dotnet/KafkaFunctionSample/SimpleKafkaTriggers.cs
@@ -7,50 +7,87 @@
using Microsoft.AspNetCore.Http;
using System;
using System.IO;
using System.Text;

namespace KafkaFunctionSample
{
public class SimpleKafkaTriggers
{
[FunctionName(nameof(SampleConsumer))]
public void SampleConsumer(
[KafkaTrigger(
[FunctionName(nameof(ConsoleConsumer))]
public void ConsoleConsumer(
[KafkaTrigger(
"LocalBroker",
"%EHTOPIC%",
"stringTopicTenPartitions",
ConsumerGroup = "$Default",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string> kafkaEvent,
ILogger logger)
AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string>[] kafkaEvents,
ILogger logger)
{
logger.LogInformation(kafkaEvent.Value.ToString());
foreach(var kafkaEvent in kafkaEvents)
logger.LogInformation(kafkaEvent.Value.ToString());
}

[FunctionName(nameof(SampleProducer))]
public IActionResult SampleProducer(
[FunctionName(nameof(ConsoleProducer))]
public static IActionResult ConsoleProducer(
[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
[Kafka(
"LocalBroker",
"%EHTOPIC%",
Username = "$ConnectionString",
Password = "%EventHubConnectionString%",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain)] out KafkaEventData<string>[] kafkaEventData,
ILogger logger)
[Kafka("LocalBroker", "stringTopicTenPartitions")] out string kafkaEventData,
ILogger log)
{
var data = new StreamReader(req.Body).ReadToEnd();
kafkaEventData = new[] {
new KafkaEventData<string>()
{
Value = data + ":1:" + DateTime.UtcNow.Ticks,
},
new KafkaEventData<string>()
{
Value = data + ":2:" + DateTime.UtcNow.Ticks,
},
};
try
{
var data = new StreamReader(req.Body).ReadToEnd();
kafkaEventData = data + ":1:" + DateTime.UtcNow.Ticks;
}
catch (Exception ex)
{
throw new Exception("Are you sure the topic 'stringTopicTenPartitions' exists? To create it using the Confluent Docker quickstart, run this command: 'docker-compose exec broker kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 10 --topic stringTopicTenPartitions'", ex);
}

return new OkResult();
}

// EventHubs Configuration sample
//
//[FunctionName(nameof(SampleConsumer))]
//public void SampleConsumer(
//[KafkaTrigger(
// "LocalBroker",
// "%EHTOPIC%",
// ConsumerGroup = "$Default",
// Username = "$ConnectionString",
// Password = "%EventHubConnectionString%",
// Protocol = BrokerProtocol.SaslSsl,
// AuthenticationMode = BrokerAuthenticationMode.Plain)] KafkaEventData<string> kafkaEvent,
//ILogger logger)
//{
// logger.LogInformation(kafkaEvent.Value.ToString());
//}

// EventHubs Configuration sample
//
//[FunctionName(nameof(SampleProducer))]
//public IActionResult SampleProducer(
//[HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req,
//[Kafka(
// "LocalBroker",
// "%EHTOPIC%",
// Username = "$ConnectionString",
// Password = "%EventHubConnectionString%",
// Protocol = BrokerProtocol.SaslSsl,
// AuthenticationMode = BrokerAuthenticationMode.Plain)] out KafkaEventData<string>[] kafkaEventData,
//ILogger logger)
//{
// var data = new StreamReader(req.Body).ReadToEnd();
// kafkaEventData = new[] {
// new KafkaEventData<string>()
// {
// Value = data + ":1:" + DateTime.UtcNow.Ticks,
// },
// new KafkaEventData<string>()
// {
// Value = data + ":2:" + DateTime.UtcNow.Ticks,
// },
// };
// return new OkResult();
//}
}
}
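For running this sample locally, the broker list `LocalBroker` and the Event Hubs placeholders used in the commented-out functions (`%EHTOPIC%`, `%EventHubConnectionString%`) resolve from application settings. A hypothetical `local.settings.json` sketch; the setting names are taken from the bindings above and all values are placeholders:

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "LocalBroker": "localhost:9092",
    "EHTOPIC": "my-eventhub-name",
    "EventHubConnectionString": "<Event Hubs connection string>"
  }
}
```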
15 changes: 12 additions & 3 deletions samples/dotnet/KafkaFunctionSample/host.json
@@ -1,8 +1,17 @@
{
"version": "2.0",
"logging": {
"logLevel": {
"default": "Information",
"Host.Triggers.Kafka": "Information",
"Kafka": "Information",
"Host.Results": "Error",
"Function": "Trace",
"Function.FunctionA": "Warning",
"Host.Aggregator": "Trace"
}
},
"extensions": {
"kafka": {
"maxBatchSize": 100
}

}
}
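The `Kafka` and `Host.Triggers.Kafka` categories above gate how much of the extension's output, including librdkafka messages forwarded through the new log handlers, reaches the function logs. A sketch of a more verbose configuration for troubleshooting, assuming the same category names as this sample (the levels are illustrative):

```json
{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "Host.Triggers.Kafka": "Debug",
      "Kafka": "Debug"
    }
  }
}
```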
6 changes: 3 additions & 3 deletions samples/dotnet/SampleHost/SampleHost.csproj
@@ -8,9 +8,9 @@

<ItemGroup>
<PackageReference Include="Microsoft.Azure.WebJobs.Logging" Version="3.0.5" />
<PackageReference Include="Confluent.Kafka" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.4.3" />
<PackageReference Include="Confluent.Kafka" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.5.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="3.0.4" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.2.0" />
</ItemGroup>
@@ -136,6 +136,28 @@ public int MaxBatchSize
/// <value>The auto commit interval ms.</value>
public int AutoCommitIntervalMs { get; set; } = 200;

/// <summary>
/// Gets or sets the debug option for the librdkafka library.
/// Default = null (not set; librdkafka's own default "" disables debug output).
/// A comma-separated list of debug contexts to enable: all,generic,broker,topic,metadata,producer,queue,msg,protocol,cgrp,security,fetch
/// Librdkafka: debug
/// </summary>
public string LibkafkaDebug { get; set; } = null;

/// <summary>
/// Metadata cache max age.
/// https://github.com/Azure/azure-functions-kafka-extension/issues/187
/// default: 180000
/// </summary>
public int? MetadataMaxAgeMs { get; set; } = 180000;

/// <summary>
/// Enable TCP keep-alives (SO_KEEPALIVE) on broker sockets.
/// https://github.com/Azure/azure-functions-kafka-extension/issues/187
/// default: true
/// </summary>
public bool? SocketKeepaliveEnable { get; set; } = true;

int subscriberIntervalInSeconds = 1;
/// <summary>
/// Defines the minimum frequency in which messages will be executed by function. Only if the message volume is less than <see cref="MaxBatchSize"/> / <see cref="SubscriberIntervalInSeconds"/>
@@ -104,6 +104,11 @@ private IConsumer<TKey, TValue> CreateConsumer()
builder.SetValueDeserializer(ValueDeserializer);
}

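// Forward librdkafka log messages (including any contexts enabled via LibkafkaDebug) to the functions host logger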
builder.SetLogHandler((_, m) =>
{
logger.Log((LogLevel)m.LevelAs(LogLevelType.MicrosoftExtensionsLogging), $"Libkafka: {m?.Message}");
});

return builder.Build();
}

@@ -155,6 +160,9 @@ private ConsumerConfig GetConsumerConfiguration()
// Interval in which commits stored in memory will be saved
AutoCommitIntervalMs = this.options.AutoCommitIntervalMs,

// Librdkafka debug options
Debug = this.options.LibkafkaDebug,

// start from earliest if no checkpoint has been committed
AutoOffsetReset = AutoOffsetReset.Earliest,

@@ -178,6 +186,8 @@ private ConsumerConfig GetConsumerConfiguration()
QueuedMaxMessagesKbytes = this.options.QueuedMaxMessagesKbytes,
MaxPartitionFetchBytes = this.options.MaxPartitionFetchBytes,
FetchMaxBytes = this.options.FetchMaxBytes,
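// Metadata refresh and TCP keepalive defaults are chosen per https://github.com/Azure/azure-functions-kafka-extension/issues/187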
MetadataMaxAgeMs = this.options.MetadataMaxAgeMs,
SocketKeepaliveEnable = this.options.SocketKeepaliveEnable
};

if (string.IsNullOrEmpty(this.listenerConfiguration.EventHubConnectionString))
@@ -33,10 +33,10 @@
<WarningsAsErrors />
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.4.3" />
<PackageReference Include="Confluent.Kafka" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.5.2" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.14" />
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
<PrivateAssets>all</PrivateAssets>
@@ -74,6 +74,12 @@ private string CreateKeyForConfig(ProducerConfig producerConfig)
private IProducer<byte[], byte[]> CreateBaseProducer(ProducerConfig producerConfig)
{
var builder = new ProducerBuilder<byte[], byte[]>(producerConfig);
ILogger logger = this.loggerProvider.CreateLogger("Kafka");
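// Route librdkafka log messages from the producer through the "Kafka" logger category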
builder.SetLogHandler((_, m) =>
{
logger.Log((LogLevel)m.LevelAs(LogLevelType.MicrosoftExtensionsLogging), $"Libkafka: {m?.Message}");
});

return builder.Build();
}

@@ -107,7 +113,7 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
{
resolvedSslKeyLocation = entity.Attribute.SslKeyLocation;
}

var kafkaOptions = this.config.Get<KafkaOptions>();
var conf = new ProducerConfig()
{
BootstrapServers = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.BrokerList),
Expand All @@ -121,7 +127,10 @@ public ProducerConfig GetProducerConfig(KafkaProducerEntity entity)
SslKeyLocation = resolvedSslKeyLocation,
SslKeyPassword = entity.Attribute.SslKeyPassword,
SslCertificateLocation = resolvedSslCertificationLocation,
SslCaLocation = resolvedSslCaLocation
SslCaLocation = resolvedSslCaLocation,
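// Apply the host-level Kafka options (librdkafka debug contexts, metadata age, socket keepalive) to producers as well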
Debug = kafkaOptions?.LibkafkaDebug,
MetadataMaxAgeMs = kafkaOptions?.MetadataMaxAgeMs,
SocketKeepaliveEnable = kafkaOptions?.SocketKeepaliveEnable
};

if (entity.Attribute.AuthenticationMode != BrokerAuthenticationMode.NotSet)
@@ -22,6 +22,7 @@ public class MultipleItemFunctionExecutor<TKey, TValue> : FunctionExecutorBase<T
public MultipleItemFunctionExecutor(ITriggeredFunctionExecutor executor, IConsumer<TKey, TValue> consumer, int channelCapacity, int channelFullRetryIntervalInMs, ICommitStrategy<TKey, TValue> commitStrategy, ILogger logger)
: base(executor, consumer, channelCapacity, channelFullRetryIntervalInMs, commitStrategy, logger)
{
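// Log which function executor was loaded (multiple-item batch mode) to aid debugging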
logger.LogInformation($"FunctionExecutor Loaded: {nameof(MultipleItemFunctionExecutor<TKey, TValue>)}");
}

protected override async Task ReaderAsync(ChannelReader<IKafkaEventData[]> reader, CancellationToken cancellationToken, ILogger logger)
@@ -22,6 +22,7 @@ public class SingleItemFunctionExecutor<TKey, TValue> : FunctionExecutorBase<TKe
public SingleItemFunctionExecutor(ITriggeredFunctionExecutor executor, IConsumer<TKey, TValue> consumer, int channelCapacity, int channelFullRetryIntervalInMs, ICommitStrategy<TKey, TValue> commitStrategy, ILogger logger)
: base(executor, consumer, channelCapacity, channelFullRetryIntervalInMs, commitStrategy, logger)
{
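// Log which function executor was loaded (single-item dispatch mode) to aid debugging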
logger.LogInformation($"FunctionExecutor Loaded: {nameof(SingleItemFunctionExecutor<TKey, TValue>)}");
}

protected override async Task ReaderAsync(ChannelReader<IKafkaEventData[]> reader, CancellationToken cancellationToken, ILogger logger)
@@ -27,11 +27,11 @@
<PackageReference Include="xunit" Version="2.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Logging" Version="3.0.5" />
<PackageReference Include="Confluent.Kafka" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.4.3" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.4.3" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="3.0.4" />
<PackageReference Include="Confluent.Kafka" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.5.2" />
<PackageReference Include="Confluent.SchemaRegistry.Serdes.Protobuf" Version="1.5.2" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="1.5.0" />
</ItemGroup>

<ItemGroup>
@@ -342,7 +342,7 @@ public async Task When_Options_Are_Set_Should_Be_Set_In_Consumer_Config()

await target.StartAsync(default);

Assert.Equal(10, target.ConsumerConfig.Count());
Assert.Equal(12, target.ConsumerConfig.Count());
Assert.Equal("testBroker", target.ConsumerConfig.BootstrapServers);
Assert.Equal("group1", target.ConsumerConfig.GroupId);
Assert.Equal("password1", target.ConsumerConfig.SslKeyPassword);
Assert.Equal(kafkaOptions.AutoCommitIntervalMs, target.ConsumerConfig.AutoCommitIntervalMs);
Assert.Equal(true, target.ConsumerConfig.EnableAutoCommit);
Assert.Equal(false, target.ConsumerConfig.EnableAutoOffsetStore);
Assert.Equal(180000, target.ConsumerConfig.MetadataMaxAgeMs);
Assert.Equal(true, target.ConsumerConfig.SocketKeepaliveEnable);
Assert.Equal(AutoOffsetReset.Earliest, target.ConsumerConfig.AutoOffsetReset);

await target.StopAsync(default);
@@ -390,7 +392,7 @@ public async Task When_Options_With_Ssal_Are_Set_Should_Be_Set_In_Consumer_Confi

await target.StartAsync(default);

Assert.Equal(10, target.ConsumerConfig.Count());
Assert.Equal(12, target.ConsumerConfig.Count());
Assert.Equal("testBroker", target.ConsumerConfig.BootstrapServers);
Assert.Equal("group1", target.ConsumerConfig.GroupId);
Assert.Equal(kafkaOptions.AutoCommitIntervalMs, target.ConsumerConfig.AutoCommitIntervalMs);