MASES.KNet 2.9.0

dotnet add package MASES.KNet --version 2.9.0                
NuGet\Install-Package MASES.KNet -Version 2.9.0                
This command is intended to be used within the Package Manager Console in Visual Studio, as it uses the NuGet module's version of Install-Package.
<PackageReference Include="MASES.KNet" Version="2.9.0" />                
For projects that support PackageReference, copy this XML node into the project file to reference the package.
paket add MASES.KNet --version 2.9.0                
#r "nuget: MASES.KNet, 2.9.0"                
#r directive can be used in F# Interactive and Polyglot Notebooks. Copy this into the interactive tool or source code of the script to reference the package.
// Install MASES.KNet as a Cake Addin
#addin nuget:?package=MASES.KNet&version=2.9.0

// Install MASES.KNet as a Cake Tool
#tool nuget:?package=MASES.KNet&version=2.9.0                

title: Usage of .NET suite for Apache Kafka™ _description: Describes how to use .NET suite for Apache Kafka™

KNet: library usage

To use KNet classes the developer can write code in .NET using the same classes available in the official Apache Kafka™ package. If classes or methods are not available yet it is possible to use the approach synthetized in What to do if an API was not yet implemented

Environment setup

KNet accepts many command-line switches to customize its behavior. The full list is available at Command line switch page.

JVM identification

One of the most important command-line switch is JVMPath and it is available in JCOBridge switches: it can be used to set-up the location of the JVM library if JCOBridge is not able to identify a suitable JRE/JDK installation. If a developer is using KNet within its own product it is possible to override the JVMPath property with a snippet like the following one:

    class MyKNetCore : KNetCore
    {
        public override string JVMPath
        {
            get
            {
                string pathToJVM = "Set here the path to JVM library or use your own search method";
                return pathToJVM;
            }
        }
    }

[!IMPORTANT] pathToJVM shall be escaped

  1. string pathToJVM = "C:\\Program Files\\Eclipse Adoptium\\jdk-11.0.18.10-hotspot\\bin\\server\\jvm.dll";
  2. string pathToJVM = @"C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\bin\server\jvm.dll";

Special initialization conditions

JCOBridge try to identify a suitable JRE/JDK installation within the system using some standard mechanism of JRE/JDK: JAVA_HOME environment variable or Windows registry if available. However it is possible, on Windows operating systems, that the library raises an InvalidOperationException: Missing Java Key in registry: Couldn't find Java installed on the machine. This means that neither JAVA_HOME nor Windows registry contains information about a default installed JRE/JDK: some vendors may not setup them. If the developer/user encounter this condition can do the following steps:

  1. On a command prompt execute set | findstr JAVA_HOME and verify the result;
  2. If something was reported maybe the JAVA_HOME environment variable is not set at system level, but at a different level like user level which is not visible from the KNet process that raised the exception;
  3. Try to set JAVA_HOME at system level e.g. JAVA_HOME=C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\;
  4. Try to set JCOBRIDGE_JVMPath at system level e.g. JCOBRIDGE_JVMPath=C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\.

[!IMPORTANT]

  • One of JCOBRIDGE_JVMPath or JAVA_HOME environment variables or Windows registry (on Windows OSes) shall be available
  • JCOBRIDGE_JVMPath environment variable takes precedence over JAVA_HOME and Windows registry: you can set JCOBRIDGE_JVMPath to C:\Program Files\Eclipse Adoptium\jdk-11.0.18.10-hotspot\bin\server\jvm.dll and avoid to override JVMPath in your code
  • After first initialization steps, JVMPath takes precedence over JCOBRIDGE_JVMPath/JAVA_HOME environment variables or Windows registry

Intel CET and KNet

KNet uses an embedded JVM through JNet/JCOBridge, however JVM initialization is incompatible with CET because the code used to identify CPU try to modify the return address and this is considered from CET a violation: see this comment.

From .NET 9 preview 6, CET is enabled by default on supported hardware when the final stage produce an executable artifact, i.e. the csproj file contains <OutputType>Exe</OutputType>.

If the application, upon startup, fails with the error 0xc0000409 (subcode 0x30) it was compiled with CET enabled and it fails during JVM initialization.

To solve the issue there are four possible solutions:

  1. use a .NET version, e.g. 8, that does not enable CET by default
  2. Add the following snippet to disable CET on executable (templates available for KNet are ready made and solve this issue):
	<PropertyGroup Condition="'$(TargetFramework)' == 'net9.0'">
		
		<CETCompat>false</CETCompat>
	</PropertyGroup>
  1. Use the dotnet app host, as reported in https://github.com/masesgroup/JCOBridgePublic/issues/7#issuecomment-2550031946, with a syntax like:
	dotnet MyApplication.dll

instead of the classic:

	MyApplication.exe
  1. If you want to run the classic application execute the following command in an elevated shell:
	reg add "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Image File Execution Options\MyApplication.exe" /v MitigationOptions /t REG_BINARY /d "0000000000000000000000000000002000" /f

then run:

	MyApplication.exe

Use the following to enable again CET:

	reg delete "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\Windows NT\CurrentVersion\Image File Execution Options\MyApplication.exe" /v MitigationOptions /f

Producer example

Below the reader can found two different version of producer examples.

Simple producer

A basic producer can be like the following one:

using MASES.KNet;
using Org.Apache.Kafka™.Clients.Producer;
using Java.Util;
using System;
using System.Threading;

namespace MASES.KNetTemplate.KNetProducer
{
    class Program
    {
        const string theServer = "localhost:9092";
        const string theTopic = "myTopic";

        static string serverToUse = theServer;
        static string topicToUse = theTopic;

        static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);

        static void Main(string[] args)
        {
            KNetCore.CreateGlobalInstance();
            var appArgs = KNetCore.FilteredArgs;

            if (appArgs.Length != 0)
            {
                serverToUse = args[0];
            }

            /**** Direct mode ******
            Properties props = new Properties();
            props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
            props.Put(ProducerConfig.ACKS_CONFIG, "all");
            props.Put(ProducerConfig.RETRIES_CONFIG, 0);
            props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            ******/

            Properties props = ProducerConfigBuilder.Create()
                                                    .WithBootstrapServers(serverToUse)
                                                    .WithAcks(ProducerConfig.Acks.All)
                                                    .WithRetries(0)
                                                    .WithLingerMs(1)
                                                    .WithKeySerializerClass("org.apache.kafka.common.serialization.StringSerializer")
                                                    .WithValueSerializerClass("org.apache.kafka.common.serialization.StringSerializer")
                                                    .ToProperties();

            Console.CancelKeyPress += Console_CancelKeyPress;
            Console.WriteLine("Press Ctrl-C to exit");

			using (KafkaProducer producer = new KafkaProducer(props))
			{
				int i = 0;
				while (!resetEvent.WaitOne(0))
				{
					var record = new ProducerRecord<string, string>(topicToUse, i.ToString(), i.ToString());
					var result = producer.Send(record);
					Console.WriteLine($"Producing: {record} with result: {result.Get()}");
					producer.Flush();
					i++;
				}
			}
        }

        private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
        {
            if (e.Cancel) resetEvent.Set();
        }
    }
}

The example above can be found in the templates package. Its behavior is:

  • during initialization prepares the properties,
  • create a producer using the properties
  • create ProducerRecord and send it
  • print out the produced data and the resulting RecordMetadata

Producer with Callback

A producer with Callback can be like the following one. In this example the reader can highlight a slightly difference from the corresponding Java code. Surf JVM callbacks to go into detail in the callback management from JVM.

using MASES.KNet;
using Org.Apache.Kafka™.Clients.Producer;
using Java.Util;
using System;
using System.Threading;

namespace MASES.KNetTemplate.KNetProducer
{
    class Program
    {
        const string theServer = "localhost:9092";
        const string theTopic = "myTopic";

        static string serverToUse = theServer;
        static string topicToUse = theTopic;

        static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);

        static void Main(string[] args)
        {
            KNetCore.CreateGlobalInstance();
            var appArgs = KNetCore.FilteredArgs;

            if (appArgs.Length != 0)
            {
                serverToUse = args[0];
            }

            /**** Direct mode ******
            Properties props = new Properties();
            props.Put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
            props.Put(ProducerConfig.ACKS_CONFIG, "all");
            props.Put(ProducerConfig.RETRIES_CONFIG, 0);
            props.Put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.Put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            props.Put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            ******/

            Properties props = ProducerConfigBuilder.Create()
                                                    .WithBootstrapServers(serverToUse)
                                                    .WithAcks(ProducerConfig.Acks.All)
                                                    .WithRetries(0)
                                                    .WithLingerMs(1)
                                                    .WithKeySerializerClass("org.apache.kafka.common.serialization.StringSerializer")
                                                    .WithValueSerializerClass("org.apache.kafka.common.serialization.StringSerializer")
                                                    .ToProperties();

            Console.CancelKeyPress += Console_CancelKeyPress;
            Console.WriteLine("Press Ctrl-C to exit");

			using (KafkaProducer producer = new KafkaProducer(props))
			{
				int i = 0;
				using (var callback = new Callback((o1, o2) =>
				{
					if (o2 != null) Console.WriteLine(o2.ToString());
					else Console.WriteLine($"Produced on topic {o1.Topic} at offset {o1.Offset}");
				}))
				{
					while (!resetEvent.WaitOne(0))
					{
						var record = new ProducerRecord<string, string>(topicToUse, i.ToString(), i.ToString());
						var result = producer.Send(record, callback);
						Console.WriteLine($"Producing: {record} with result: {result.Get()}");
						producer.Flush();
						i++;
					}
				}
			}
        }

        private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
        {
            if (e.Cancel) resetEvent.Set();
        }
    }
}

The example above can be found in the templates package. Its behavior is:

  • during initialization prepares the properties
  • create a producer using the properties
  • create ProducerRecord and send it using the API Send with the attached Callback
  • when the operation completed the Callback is called:
    • if an Exception was raised it will be printed out
    • otherwise the RecordMetadata is printed out
  • print out the produced data and the resulting RecordMetadata

Consumer example

A basic consumer can be like the following one:

using MASES.KNet;
using Org.Apache.Kafka™.Clients.Consumer;
using Java.Util;
using System;

namespace MASES.KNetTemplate.KNetConsumer
{
    class Program
    {
        const string theServer = "localhost:9092";
        const string theTopic = "myTopic";

        static string serverToUse = theServer;
        static string topicToUse = theTopic;

        static readonly ManualResetEvent resetEvent = new ManualResetEvent(false);

        static void Main(string[] args)
        {
            KNetCore.CreateGlobalInstance();
            var appArgs = KNetCore.FilteredArgs;

            if (appArgs.Length != 0)
            {
                serverToUse = args[0];
            }

            /**** Direct mode ******
            Properties props = new Properties();
            props.Put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverToUse);
            props.Put(ConsumerConfig.GROUP_ID_CONFIG, "test");
            props.Put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.Put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.Put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.Put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            *******/

            Properties props = ConsumerConfigBuilder.Create()
                                                    .WithBootstrapServers(serverToUse)
                                                    .WithGroupId("test")
                                                    .WithEnableAutoCommit(true)
                                                    .WithAutoCommitIntervalMs(1000)
                                                    .WithKeyDeserializerClass("org.apache.kafka.common.serialization.StringDeserializer")
                                                    .WithValueDeserializerClass("org.apache.kafka.common.serialization.StringDeserializer")
                                                    .ToProperties();

            Console.CancelKeyPress += Console_CancelKeyPress;
            Console.WriteLine("Press Ctrl-C to exit");

            using (var consumer = new KafkaConsumer<string, string>(props))
            {
                var topics = Collections.Singleton(topicToUse);
                consumer.Subscribe(topics);
                while (!resetEvent.WaitOne(0))
                {
                    var records = consumer.Poll((long)TimeSpan.FromMilliseconds(200).TotalMilliseconds);
                    foreach (var item in records)
                    {
                        Console.WriteLine($"Offset = {item.Offset}, Key = {item.Key}, Value = {item.Value}");
                    }
                }
                topics?.Dispose(); // needed to avoid Java.Lang.NullPointerException in some conditions where .NET GC retires topics too early
            }
        }

        private static void Console_CancelKeyPress(object sender, ConsoleCancelEventArgs e)
        {
            if (e.Cancel) resetEvent.Set();
        }
    }
}

The example above can be found in the templates package. Its behavior is:

  • during initialization prepares the properties,
  • create a consumer using the properties
  • subscribe and starts consume
  • when data are received it logs to the console the information.
Product Compatible and additional computed target framework versions.
.NET net8.0 is compatible.  net8.0-android was computed.  net8.0-browser was computed.  net8.0-ios was computed.  net8.0-maccatalyst was computed.  net8.0-macos was computed.  net8.0-tvos was computed.  net8.0-windows was computed.  net9.0 is compatible. 
.NET Framework net462 is compatible.  net463 was computed.  net47 was computed.  net471 was computed.  net472 was computed.  net48 was computed.  net481 was computed. 
Compatible target framework(s)
Included target framework(s) (in package)
Learn more about Target Frameworks and .NET Standard.

NuGet packages (5)

Showing the top 5 NuGet packages that depend on MASES.KNet:

Package Downloads
MASES.EntityFrameworkCore.KNet.Serialization

EntityFrameworkCore KNet - Serialization support for EntityFrameworkCore provider for Apache Kafka

MASES.KNet.Serialization.Avro

Avro Serializer/Deserializer of .NET suite for Apache Kafka. KNet is a comprehensive .NET suite for Apache Kafka providing all features: Producer, Consumer, Admin, Streams, Connect, backends (ZooKeeper and Kafka).

MASES.KNet.Serialization.Json

Json Serializer/Deserializer of .NET suite for Apache Kafka. KNet is a comprehensive .NET suite for Apache Kafka providing all features: Producer, Consumer, Admin, Streams, Connect, backends (ZooKeeper and Kafka).

MASES.KNet.Serialization.MessagePack

MessagePack Serializer/Deserializer of .NET suite for Apache Kafka. KNet is a comprehensive .NET suite for Apache Kafka providing all features: Producer, Consumer, Admin, Streams, Connect, backends (ZooKeeper and Kafka).

MASES.KNet.Serialization.Protobuf

Protobuf Serializer/Deserializer of .NET suite for Apache Kafka. KNet is a comprehensive .NET suite for Apache Kafka providing all features: Producer, Consumer, Admin, Streams, Connect, backends (ZooKeeper and Kafka).

GitHub repositories

This package is not used by any popular GitHub repositories.

Version Downloads Last updated
2.9.0 467 12/23/2024
2.8.2 63,130 11/5/2024
2.8.1 143,936 9/20/2024
2.8.0 41,437 8/6/2024
2.7.10 380 11/5/2024
2.7.9 581 9/20/2024
2.7.8 13,908 7/31/2024
2.7.7 5,836 7/30/2024
2.7.6 323 7/29/2024
2.7.5 32,296 7/2/2024
2.7.4 55,468 6/27/2024
2.7.3 28,219 6/24/2024
2.7.2 41,942 5/25/2024
2.7.1 14,000 5/18/2024
2.7.0 904 5/16/2024
2.6.7 375 11/5/2024
2.6.6 452 9/20/2024
2.6.5 538 9/16/2024
2.6.4 456 6/14/2024
2.6.3 399 6/11/2024
2.6.2 654 5/17/2024
2.6.1 20,302 5/3/2024
2.6.0 139,699 3/1/2024
2.5.0 698 2/28/2024
2.4.3 58,343 2/11/2024
2.4.2 47,398 1/27/2024
2.4.1 11,707 1/21/2024
2.4.0 386 1/20/2024
2.3.0 93,021 11/25/2023
2.2.0 48,516 10/19/2023
2.1.3 26,547 10/11/2023
2.1.2 26,740 10/6/2023
2.1.1 1,427 10/5/2023
2.1.0 994 9/27/2023
2.0.2 135,944 8/2/2023
2.0.1 10,082 7/11/2023
2.0.0 37,438 7/8/2023
1.5.5 32,647 7/1/2023
1.5.4 25,601 5/28/2023
1.5.3 46,258 4/16/2023
1.5.2 33,315 4/11/2023
1.5.1 65,175 3/15/2023
1.5.0 55,807 2/9/2023
1.4.8 143,698 11/28/2022
1.4.7 820 11/23/2022
1.4.6 912 11/22/2022
1.4.5 768 11/21/2022
1.4.4 19,831 11/1/2022
1.4.3 19,997 10/21/2022
1.4.2 909 10/17/2022
1.4.1 1,490 10/10/2022
1.4.0 52,741 10/6/2022
1.3.6 36,963 9/19/2022
1.3.5 54,319 9/8/2022
1.3.4 66,345 8/18/2022
1.3.3 960 8/5/2022
1.3.2 963 6/19/2022
1.3.1 35,642 5/23/2022
1.2.4 1,113 5/11/2022
1.2.3 1,008 5/7/2022
1.2.2 998 5/2/2022
1.2.1 6,079 3/28/2022
1.2.0 1,583 3/20/2022