Akka.Streams.Kafka
1.5.30
dotnet add package Akka.Streams.Kafka --version 1.5.30
NuGet\Install-Package Akka.Streams.Kafka -Version 1.5.30
<PackageReference Include="Akka.Streams.Kafka" Version="1.5.30" />
paket add Akka.Streams.Kafka --version 1.5.30
#r "nuget: Akka.Streams.Kafka, 1.5.30"
// Install Akka.Streams.Kafka as a Cake Addin
#addin nuget:?package=Akka.Streams.Kafka&version=1.5.30
// Install Akka.Streams.Kafka as a Cake Tool
#tool nuget:?package=Akka.Streams.Kafka&version=1.5.30
Akka Streams Kafka
Akka Streams Kafka is an Akka Streams connector for Apache Kafka. This is a port of the Alpakka Kafka project (https://github.com/akka/alpakka-kafka).
The library is based on the Confluent.Kafka driver and implements Sources, Sinks and Flows to handle Kafka message streams. All stages are built with Akka.Streams advantages in mind:
- There is no constant polling of Kafka topics: messages are consumed on demand, with back-pressure support
- There is no internal buffering: consumed messages are passed downstream in real time, and producer stages publish messages to Kafka as soon as they receive them from upstream
- Each stage can use its own IConsumer or IProducer instance, or can share them (which can be used for optimization)
- All Kafka failures can be handled with the usual stream error handling strategies
Producer
A producer publishes messages to Kafka topics. The message itself contains information about what topic and partition to publish to so you can publish to different topics with the same producer.
Settings
When creating a producer stream you need to pass in ProducerSettings that define things like:
- bootstrap servers of the Kafka cluster
- serializers for the keys and values
- tuning parameters
var producerSettings = ProducerSettings<Null, string>.Create(system, null, null)
.WithBootstrapServers("localhost:9092");
// OR you can use Config instance
var config = system.Settings.Config.GetConfig("akka.kafka.producer");
var producerSettings = ProducerSettings<Null, string>.Create(config, null, null)
.WithBootstrapServers("localhost:9092");
NOTE: Specifying null as a key/value serializer uses the default serializer for the key/value type. Built-in serializers are available in the Confluent.Kafka.Serializers class.
By default, when ProducerSettings is created with the ActorSystem parameter, it uses the config section akka.kafka.producer.
Defining Kafka Properties Directly Inside HOCON
You can embed Kafka properties directly inside the HOCON configuration by declaring them inside the kafka-clients section:
akka.kafka.producer.kafka-clients {
bootstrap.servers = "localhost:9092"
client.id = client-1
enable.idempotence = true
}
Importing Confluent.Kafka.ProducerConfig Directly Into ProducerSettings
Working with ProducerConfig is a lot more convenient than using the ProducerSettings.WithProperty method because you don't have to memorize all of the Kafka property names.
You can import all of the Kafka properties defined on a ProducerConfig directly into Akka.Streams.Kafka.ProducerSettings by using the convenience method ProducerSettings.WithProducerConfig.
var config = new ProducerConfig
{
BootstrapServers = "localhost:9092",
ClientId = "client1",
EnableIdempotence = true
};
var settings = ProducerSettings<string, string>.Create(system, null, null)
.WithProducerConfig(config);
Default Producer HOCON Settings
akka.kafka.producer {
# Tuning parameter of how many sends that can run in parallel.
parallelism = 100
# How long to wait for `Producer.Flush`
flush-timeout = 10s
# Fully qualified config path which holds the dispatcher configuration
# to be used by the producer stages. Some blocking may occur.
# When this value is empty, the dispatcher configured for the stream
# will be used.
use-dispatcher = "akka.kafka.default-dispatcher"
# Properties defined by Confluent.Kafka.ProducerConfig
# can be defined in this configuration section.
kafka-clients {
}
}
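As an illustration of how these defaults might be overridden, the fragment below is a minimal sketch of an application-level HOCON section. The keys are the ones listed above; the values are arbitrary examples chosen for this sketch, not recommendations.

```hocon
akka.kafka.producer {
  # Example value only: allow fewer sends to run in parallel than the default of 100.
  parallelism = 50
  # Example value only: wait longer for the flush than the default of 10s.
  flush-timeout = 30s
  kafka-clients {
    bootstrap.servers = "localhost:9092"
  }
}
```

Any key left out of such a section keeps the default shown above.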
PlainSink
KafkaProducer.PlainSink is the easiest way to publish messages. The sink consumes ProducerRecord elements, each of which contains the name of the topic the record is sent to, an optional partition number, an optional key, and a value.
Source
.From(Enumerable.Range(1, 100))
.Select(c => c.ToString())
.Select(elem => new ProducerRecord<Null, string>(topic, elem))
.RunWith(KafkaProducer.PlainSink(producerSettings), materializer);
The materialized value of the sink is a Task that completes when the stream completes, or fails with an exception if an error occurs.
Producer as a Flow
Sometimes you need to publish messages in the middle of stream processing rather than as the last step. For that you can use KafkaProducer.FlexiFlow.
Source
.Cycle(() => Enumerable.Range(1, 100).GetEnumerator())
.Select(c => c.ToString())
.Select(elem => ProducerMessage.Single(new ProducerRecord<Null, string>("akka100", elem)))
.Via(KafkaProducer.FlexiFlow<Null, string, NotUsed>(producerSettings))
.Select(result =>
{
var response = result as Result<Null, string, NotUsed>;
Console.WriteLine($"Producer: {response.Metadata.Topic}/{response.Metadata.Partition} {response.Metadata.Offset}: {response.Metadata.Value}");
return result;
})
.RunWith(Sink.Ignore<IResults<Null, string, NotUsed>>(), materializer);
This flow accepts implementations of Akka.Streams.Kafka.Messages.IEnvelope and returns Akka.Streams.Kafka.Messages.IResults elements.
IEnvelope elements contain an extra field for passing data through the flow, the so-called passThrough. Its value is passed through the flow and becomes available in the PassThrough of the resulting IResults element.
It can, for example, hold an Akka.Streams.Kafka.Messages.CommittableOffset or Akka.Streams.Kafka.Messages.CommittableOffsetBatch (from a KafkaConsumer.CommittableSource) that can be committed after publishing to Kafka:
DrainingControl<NotUsed> control = KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Topics(topic1))
.Select(message =>
{
return ProducerMessage.Single(
new ProducerRecord<Null, string>(topic1, message.Record.Key, message.Record.Value),
message.CommitableOffset as ICommittable // the passThrough
);
})
.Via(KafkaProducer.FlexiFlow<Null, string, ICommittable>(ProducerSettings))
.Select(m => m.PassThrough)
.ToMaterialized(Committer.Sink(CommitterSettings), Keep.Both)
.MapMaterializedValue(DrainingControl<NotUsed>.Create)
.Run(Materializer);
Produce a single message to Kafka
To produce one message to a Kafka topic, use the Akka.Streams.Kafka.Messages.Message implementation of IEnvelope. It can be created with the ProducerMessage.Single helper:
IEnvelope<TKey, TValue, TPassThrough> single = ProducerMessage.Single(
new ProducerRecord<Null, string>("topic", key, value),
passThrough);
The flow with ProducerMessage.Message will continue as ProducerMessage.Result elements containing:
- the original input Message
- the record metadata, and
- access to PassThrough within the message
Let one stream element produce multiple messages to Kafka
The ProducerMessage.MultiMessage implementation of IEnvelope contains a list of ProducerRecords to produce multiple messages to Kafka topics:
var multiMessage = ProducerMessage.Multi(new[]
{
new ProducerRecord<string, string>(topic2, record.Key, record.Value),
new ProducerRecord<string, string>(topic3, record.Key, record.Value)
}.ToImmutableSet(), passThrough);
The flow with ProducerMessage.MultiMessage will continue as ProducerMessage.MultiResult elements containing:
- a list of MultiResultPart with
  - the original input message
  - the record metadata
- the PassThrough data
Let a stream element pass through, without producing a message to Kafka
The ProducerMessage.PassThroughMessage lets an element pass through a Kafka flow without producing a new message to a Kafka topic.
This is primarily useful with Kafka commit offsets and transactions, so that these can be committed without producing new messages.
var passThroughMessage = ProducerMessage.PassThrough<string, string>(passThrough);
In flows, ProducerMessage.PassThroughMessages continue as ProducerMessage.PassThroughResult elements containing the passThrough data.
Sharing the IProducer instance
Sometimes you may need to use an already existing Confluent.Kafka.IProducer instance (e.g. for integration with existing code).
Each of the KafkaProducer methods has an overload accepting IProducer as a parameter.
Consumer
A consumer subscribes to Kafka topics and passes the messages into an Akka Stream.
Settings
When creating a consumer stream you need to pass in ConsumerSettings
that define things like:
- de-serializers for the keys and values
- bootstrap servers of the Kafka cluster
- group id for the consumer, note that offsets are always committed for a given consumer group
- tuning parameters
var consumerSettings = ConsumerSettings<Null, string>.Create(system, null, Deserializers.Utf8)
.WithBootstrapServers("localhost:9092")
.WithGroupId("group1"); // Specifying GroupId is required before starting stream - otherwise you will get an exception at runtime
// OR you can use Config instance
var config = system.Settings.Config.GetConfig("akka.kafka.consumer");
var consumerSettings = ConsumerSettings<Null, string>.Create(config, null, Deserializers.Utf8)
.WithBootstrapServers("localhost:9092")
.WithGroupId("group1"); // Specifying GroupId is required before starting stream - otherwise you will get an exception at runtime
As with producer settings, these are loaded from the akka.kafka.consumer section of the configuration file (or from a custom Config instance, if provided).
Defining Kafka Properties Directly Inside HOCON
You can embed Kafka properties directly inside the HOCON configuration by declaring them inside the kafka-clients section:
akka.kafka.consumer.kafka-clients {
bootstrap.servers = "localhost:9092"
client.id = client-1
group.id = group-1
}
Importing Confluent.Kafka.ConsumerConfig Directly Into ConsumerSettings
Working with ConsumerConfig is a lot more convenient than using the ConsumerSettings.WithProperty method because you don't have to memorize all of the Kafka property names.
You can import all of the Kafka properties defined on a ConsumerConfig directly into Akka.Streams.Kafka.ConsumerSettings by using the convenience method ConsumerSettings.WithConsumerConfig.
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Latest,
EnableAutoCommit = true,
GroupId = "group1",
ClientId = "client1"
};
var settings = ConsumerSettings<string, string>.Create(actorSystem, null, null)
.WithConsumerConfig(config);
Default Consumer HOCON Settings
# Properties for akka.kafka.ConsumerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.consumer {
# Tuning property of scheduled polls.
# Controls the interval from one scheduled poll to the next.
poll-interval = 50ms
# Tuning property of the `KafkaConsumer.poll` parameter.
# Note that non-zero value means that the thread that
# is executing the stage will be blocked. See also the `wakeup-timeout` setting below.
poll-timeout = 50ms
# The stage will delay stopping the internal actor to allow processing of
# messages already in the stream (required for successful committing).
# This can be set to 0 for streams using `DrainingControl`.
stop-timeout = 30s
# If offset commit requests are not completed within this timeout
# the returned Future is completed with `CommitTimeoutException`.
# The `Transactional.source` waits this amount of time for the producer to mark messages as not
# being in flight anymore as well as waiting for messages to drain, when rebalance is triggered.
commit-timeout = 15s
# If commits take longer than this time a warning is logged
commit-time-warning = 1s
# Not relevant for Kafka after version 2.1.0.
# If set to a finite duration, the consumer will re-send the last committed offsets periodically
# for all assigned partitions. See https://issues.apache.org/jira/browse/KAFKA-4682.
commit-refresh-interval = infinite
buffer-size = 128
# Fully qualified config path which holds the dispatcher configuration
# to be used by the KafkaConsumerActor. Some blocking may occur.
use-dispatcher = "akka.kafka.default-dispatcher"
# Properties defined by Confluent.Kafka.ConsumerConfig
# can be defined in this configuration section.
kafka-clients {
# Disable auto-commit by default
enable.auto.commit = false
}
# Time to wait for pending requests when a partition is closed
wait-close-partition = 500ms
# Limits the query to Kafka for a topic's position
position-timeout = 5s
# When using `AssignmentOffsetsForTimes` subscriptions: timeout for the
# call to Kafka's API
offset-for-times-timeout = 5s
# Timeout for akka.kafka.Metadata requests
# This value is used instead of Kafka's default from `default.api.timeout.ms`
# which is 1 minute.
metadata-request-timeout = 5s
# Interval for checking that transaction was completed before closing the consumer.
# Used in the transactional flow for exactly-once-semantics processing.
eos-draining-check-interval = 30ms
# Issue warnings when a call to a partition assignment handler method takes
# longer than this.
partition-handler-warning = 5s
}
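For streams that shut down through DrainingControl, the comment on stop-timeout above notes that the value can be set to 0. The fragment below is a minimal sketch of such an override, assuming the stream really is stopped via DrainingControl; the group.id value is a placeholder chosen for this sketch.

```hocon
akka.kafka.consumer {
  # Reasonable only when the stream is drained via DrainingControl (see the comment above).
  stop-timeout = 0s
  kafka-clients {
    # Placeholder group id for illustration.
    group.id = "group-1"
    # Same as the default shown above, repeated here to make the intent explicit.
    enable.auto.commit = false
  }
}
```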
PlainSource
To consume messages without committing them you can use the KafkaConsumer.PlainSource method. It emits consumed messages of type ConsumeResult.
Note: When using this source, you need to store consumer offsets externally, because it does not support committing offsets to Kafka.
var subscription = Subscriptions.Assignment(new TopicPartition("akka", 0));
KafkaConsumer.PlainSource(consumerSettings, subscription)
.RunForeach(result =>
{
Console.WriteLine($"Consumer: {result.Topic}/{result.Partition} {result.Offset}: {result.Value}");
}, materializer);
PlainExternalSource
A special source that can use an external KafkaConsumerActor. This is useful when you have a lot of manually assigned topic-partitions and want to keep only one Kafka consumer.
You can create a reusable consumer actor reference like this:
var consumer = Sys.ActorOf(KafkaConsumerActorMetadata.GetProps(consumerSettings));
CommittableSource
The KafkaConsumer.CommittableSource makes it possible to commit offset positions to Kafka. If you need to store offsets in anything other than Kafka, PlainSource should be used instead of this API.
This is useful when “at-least-once delivery” is desired, as each message will likely be delivered one time but in failure cases could be duplicated:
KafkaConsumer.CommittableSource(consumerSettings, Subscriptions.Topics("topic1"))
.SelectAsync(1, async elem =>
{
await elem.CommitableOffset.Commit();
return Done.Instance;
})
.RunWith(Sink.Ignore<Done>(), _materializer);
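The above example commits each offset in a SelectAsync stage with a parallelism of 1. When message processing is added, keep processing and committing in separate SelectAsync stages: this guarantees that, for parallelism higher than 1, offsets are still sent for commit in the correct order.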
Committing the offset for each message as illustrated above is rather slow. It is recommended to batch the commits for better throughput, with the trade-off that more messages may be re-delivered in case of failures.
PlainPartitionedSource
The PlainPartitionedSource is a way to track automatic partition assignment from Kafka.
When a topic-partition is assigned to a consumer, this source will emit tuples with the assigned topic-partition and a corresponding source of ConsumerRecords.
When a topic-partition is revoked, the corresponding source completes.
var control = KafkaConsumer.PlainPartitionedSource(consumerSettings, Subscriptions.Topics(topic))
.GroupBy(3, tuple => tuple.Item1)
.SelectAsync(8, async tuple =>
{
var (topicPartition, source) = tuple;
Log.Info($"Sub-source for {topicPartition}");
var sourceMessages = await source
.Scan(0, (i, message) => i + 1)
.Select(i => LogReceivedMessages(topicPartition, i))
.RunWith(Sink.Last<long>(), Materializer);
Log.Info($"{topicPartition}: Received {sourceMessages} messages in total");
return sourceMessages;
})
.MergeSubstreams()
.As<Source<long, IControl>>()
.Scan(0L, (i, subValue) => i + subValue)
.ToMaterialized(Sink.Last<long>(), Keep.Both)
.MapMaterializedValue(DrainingControl<long>.Create)
.Run(Materializer);
CommitWithMetadataSource
The CommitWithMetadataSource makes it possible to add additional metadata (in the form of a string) when an offset is committed based on the record. This can be useful, for example, to store information about which node made the commit, what time the commit was made, the timestamp of the record, etc.
string MetadataFromMessage<K, V>(ConsumeResult<K, V> message) => message.Offset.ToString();
KafkaConsumer.CommitWithMetadataSource(settings, Subscriptions.Topics("topic"), MetadataFromMessage)
.ToMaterialized(Sink.Ignore<CommittableMessage<Null, string>>(), Keep.Both)
.Run(Materializer);
SourceWithOffsetContext
This source emits ConsumeResult<TKey, TValue> elements together with the offset position as flow context, which makes it possible to commit offset positions to Kafka.
This is useful when "at-least-once delivery" is desired, as each message will likely be delivered one time but in failure cases could be duplicated.
It is intended to be used with KafkaProducer.FlowWithContext and/or Committer.SinkWithOffsetContext:
var control = KafkaConsumer.SourceWithOffsetContext(consumerSettings, Subscriptions.Topics("topic1"))
// Having committable offset as a context now, and passing plain record to the downstream
.Select(record =>
{
IEnvelope<string, string, NotUsed> output = ProducerMessage.Single(new ProducerRecord<string, string>("topic2", record.Key, record.Value));
return output;
})
// Producing message with maintaining the context
.Via(KafkaProducer.FlowWithContext<string, string, ICommittableOffset>(producerSettings))
.AsSource()
// Using Committer.SinkWithOffsetContext to commit messages using offset stored in flow context
.ToMaterialized(Committer.SinkWithOffsetContext<IResults<string, string, ICommittableOffset>>(committerSettings), Keep.Both)
.MapMaterializedValue(tuple => DrainingControl<NotUsed>.Create(tuple.Item1, tuple.Item2.ContinueWith(t => NotUsed.Instance)))
.Run(Materializer);
CommittableExternalSource
Like PlainExternalSource, this source allows you to use an external KafkaConsumerActor (see documentation above).
CommittablePartitionedSource
Same as PlainPartitionedSource, but with committable offset support.
AtMostOnceSource
Convenience for "at-most once delivery" semantics. The offset of each message is committed to Kafka before being emitted downstream.
CommitWithMetadataPartitionedSource
The same as PlainPartitionedSource, but with support for committing offsets with metadata.
PlainPartitionedManualOffsetSource
The PlainPartitionedManualOffsetSource is similar to PlainPartitionedSource but allows the use of an offset store outside of Kafka, while retaining the automatic partition assignment.
When a topic-partition is assigned to a consumer, the getOffsetsOnAssign function is called to retrieve the offset, followed by a seek to the correct spot in the partition.
The onRevoke function gives the consumer a chance to store any uncommitted offsets and do any other cleanup that is required.
var source = KafkaConsumer.PlainPartitionedManualOffsetSource(consumerSettings, Subscriptions.Topics(topic),
assignedPartitions =>
{
// Handle assigned partitions
},
revokedPartitions =>
{
// Handle partitions that are revoked
})
// Pass message values down to the stream
.Select(m => m.Value);
Transactional Producers and Consumers
Transactional producers and consumers are not implemented yet. They are waiting for issue https://github.com/akkadotnet/Akka.Streams.Kafka/issues/85 to be resolved.
Partition Events handling
Sometimes you may need to add custom handling for partition events, such as a partition being assigned to a consumer. To do that, you will need to:
- Write a custom implementation of the IPartitionEventHandler interface:
class CustomEventsHandler : IPartitionEventHandler
{
/// <inheritdoc />
public void OnRevoke(IImmutableSet<TopicPartitionOffset> revokedTopicPartitions, IRestrictedConsumer consumer)
{
// Your code here
}
/// <inheritdoc />
public void OnAssign(IImmutableSet<TopicPartition> assignedTopicPartitions, IRestrictedConsumer consumer)
{
// Your code here
}
/// <inheritdoc />
public void OnStop(IImmutableSet<TopicPartition> topicPartitions, IRestrictedConsumer consumer)
{
// Your code here
}
}
Here IRestrictedConsumer is an object providing access to a limited part of the API of the internal Kafka consumer client.
- Use WithPartitionEventsHandler of Topic/TopicPartition subscriptions, like this:
var customHandler = new CustomEventsHandler();
KafkaConsumer.PlainSource(settings, Subscriptions.Topics(yourTopic).WithPartitionEventsHandler(customHandler));
Note: Your handler callbacks will be invoked on the same thread on which the Kafka consumer handles all events and fetches messages, so be careful about what you do inside them.
Error handling
Akka.Streams.Kafka stages use stream supervision deciders to dictate what happens when a failure occurs or an exception is thrown inside a stream stage. These deciders are delegate functions that return an Akka.Streams.Supervision.Directive enumeration value telling the stage how to behave when a specific exception occurs during the stream lifetime.
You can read more about stream supervision strategies in the Akka documentation.
NOTE: A decider applied to a stream using .WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider)) will be used for the whole stream: an exception thrown in any of the stream stages will use the same decider to determine the fault behavior.
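To make the decider mechanism concrete, here is a minimal sketch that uses only plain Akka.Streams, with no Kafka stages involved. The DeciderSketch class, its Decide policy and the sample numbers are hypothetical and exist only for this illustration; the Kafka-specific deciders described in the following sections plug into a stream in the same way.

```csharp
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Streams.Supervision;

public static class DeciderSketch
{
    // Hypothetical policy for this sketch: skip elements that fail with
    // DivideByZeroException, stop the stream on any other exception.
    public static Directive Decide(Exception cause) =>
        cause is DivideByZeroException ? Directive.Resume : Directive.Stop;

    public static async Task Main()
    {
        var system = ActorSystem.Create("decider-sketch");
        var materializer = system.Materializer();

        // The attribute is attached to the composed source, so the Select
        // stage consults the decider when the division throws.
        var results = await Source.From(new[] { 1, 2, 0, 4 })
            .Select(x => 100 / x)
            .WithAttributes(ActorAttributes.CreateSupervisionStrategy(Decide))
            .RunWith(Sink.Seq<int>(), materializer);

        Console.WriteLine(string.Join(", ", results));
        await system.Terminate();
    }
}
```

With Directive.Resume the failing element is dropped and the stream keeps running; returning Directive.Stop instead would fail the stream with the exception.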
Producer error handling
The Akka.Streams.Kafka producers use a default convenience error handling class called Akka.Streams.Kafka.Supervision.DefaultProducerDecider. This supervision decider uses these strategies by default:
- Any ProduceException with its IsFatal flag set will use a hard-wired Directive.Stop.
- Any ProduceException that is classified as a serialization error will use a Directive.Stop. This behavior can be overridden.
- Any other ProduceException will use a Directive.Stop. This behavior can be overridden.
- Any KafkaRetriableException will use a hard-wired Directive.Resume. This behavior assumes that this exception is a transient exception.
- Any KafkaException will use a Directive.Stop. This behavior can be overridden.
- Any Exception will use a Directive.Stop. This behavior can be overridden.
To create a custom decider, you will need to extend the DefaultProducerDecider:
private class CustomProducerDecider<K, V> : DefaultProducerDecider<K, V>
{
protected override Directive OnSerializationError(ProduceException<K, V> exception)
{
// custom logic can go here
return Directive.Resume;
}
protected override Directive OnProduceException(ProduceException<K, V> exception)
{
// custom logic can go here
return Directive.Resume;
}
protected override Directive OnKafkaException(KafkaException exception)
{
// custom logic can go here
return Directive.Stop;
}
protected override Directive OnException(Exception exception)
{
// custom logic can go here
return Directive.Stop;
}
}
You then register this new decider on to the stream using a stream attribute:
var decider = new CustomProducerDecider<Null, int>();
var topicPartition = new TopicPartition("my-topic", 0);
await Source.From(new []{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
.Select(elem => new ProducerRecord<Null, int>(topicPartition, elem))
.RunWith(
KafkaProducer.PlainSink(ProducerSettings)
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider.Decide)),
System.Materializer());
In this case, the decider is applied only to the PlainSink stage; it is not propagated to the stream that uses it.
Consumer error handling
The Akka.Streams.Kafka consumers use a default convenience error handling class called Akka.Streams.Kafka.Supervision.DefaultConsumerDecider. This supervision decider uses these strategies by default:
- Any ConsumeException with its IsFatal flag set will use a hard-wired Directive.Stop.
  As of the writing of this document, there are no fatal ConsumeExceptions. Fatal exceptions are only thrown by producers that require an idempotence guarantee or transactions.
- Any ConsumeException that returns an error code of ErrorCode.UnknownTopicOrPart inside a Kafka stream with auto.create.topics.enable enabled will use a Directive.Resume.
  This behavior assumes that the consumer client started before the producer was running and the broker did not have the topic or partition created yet.
- Any ConsumeException that is classified as a deserialization error will use a Directive.Stop. This behavior can be overridden.
- Any other ConsumeException will use a Directive.Resume. This behavior can be overridden.
- Any KafkaRetriableException will use a hard-wired Directive.Resume. This behavior assumes that this exception is a transient exception.
- Any KafkaException will use a Directive.Resume. This behavior can be overridden.
- Any Exception will use a Directive.Stop. This behavior can be overridden.
Directive.Resume is chosen as the default because fatal, data-compromising errors very rarely happen during a Consumer.Consume call. The most common exceptions thrown during a consume are Kafka configuration errors.
To create a custom decider, you will need to extend the DefaultConsumerDecider:
private class CustomConsumerDecider : DefaultConsumerDecider
{
protected override Directive OnDeserializationError(ConsumeException exception)
{
// custom logic can go here
return Directive.Resume;
}
protected override Directive OnConsumeException(ConsumeException exception)
{
// custom logic can go here
return Directive.Resume;
}
protected override Directive OnKafkaException(KafkaException exception)
{
// custom logic can go here
return Directive.Resume;
}
protected override Directive OnException(Exception exception)
{
// custom logic can go here
return Directive.Stop;
}
}
You then register this new decider on to the stream using a stream attribute:
var decider = new CustomConsumerDecider();
var topicPartition = new TopicPartition("my-topic", 0);
var publisher = KafkaConsumer
.PlainSource(settings, Subscriptions.Assignment(topicPartition))
.WithAttributes(ActorAttributes.CreateSupervisionStrategy(decider.Decide))
.Select(c => c.Value)
.RunWith(Sink.Publisher<int>(), System.Materializer());
Handling de/serialization exceptions
On the producer side, any serialization error will be routed to the OnSerializationError(ProduceException<K, V> exception) callback function. The original message is embedded in the ProduceException.DeliveryResult.Message property in case analysis is needed to determine the cause of the serialization failure. A key serialization failure will have an error code of ErrorCode.Local_KeySerialization, while a value serialization failure will have an error code of ErrorCode.Local_ValueSerialization.
On the consumer side, any deserialization error will be routed to the OnDeserializationError(ConsumeException exception) callback function. The consumed message is embedded in the ConsumeException.ConsumerRecord property as a ConsumeResult<byte[], byte[]> instance. You can inspect the raw byte arrays to determine the cause of the failure. A key deserialization failure will have an error code of ErrorCode.Local_KeyDeserialization, while a value deserialization failure will have an error code of ErrorCode.Local_ValueDeserialization.
Local development
There are some helpers to simplify local development.
Tests: File logging
Sometimes it is useful to have all logs written to a file in addition to the console.
There is a built-in file logger that is added to the default Akka.NET loggers if you set the AKKA_STREAMS_KAFKA_TEST_FILE_LOGGING environment variable on your local system to any value.
When it is set, all logs are written to the logs subfolder next to your test assembly, one file per test. Here is how the log file name is generated:
public readonly string LogPath = $"logs\\{DateTime.Now:yyyy-MM-dd_HH-mm-ss}_{Guid.NewGuid():N}.txt";
Tests: Kafka container reuse
By default, tests are configured to be CI-friendly: before the tests start, the Kafka Docker images are downloaded (if not already present) and the containers are started, and after all tests finish a full cleanup is performed (except that the downloaded Docker images are not removed).
While this might be useful when running tests locally, there are situations where you would like to save test startup/shutdown time by using a pre-existing container that is used for all test runs and is not stopped and started each time.
To achieve that, set the AKKA_STREAMS_KAFKA_TEST_CONTAINER_REUSE environment variable on your local machine to any value. This forces the tests to use an existing Kafka container listening on port 29092. Run the docker-compose up console command in the root of the project folder to get this container up and running.
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
- .NETStandard 2.0
  - Akka.Streams (>= 1.5.30)
  - Confluent.Kafka (>= 2.4.0)
Version | Downloads | Last updated |
---|---|---|
1.5.30 | 624 | 10/3/2024 |
1.5.29 | 140 | 10/1/2024 |
1.5.15 | 7,536 | 1/11/2024 |
1.5.13.1 | 1,547 | 10/4/2023 |
1.5.13 | 143 | 10/4/2023 |
1.5.8 | 4,708 | 6/16/2023 |
1.5.0 | 108,669 | 3/4/2023 |
1.4.49 | 2,872 | 1/27/2023 |
1.2.2 | 12,731 | 10/21/2022 |
1.2.1 | 73,008 | 1/26/2022 |
1.2.0 | 1,240 | 11/17/2021 |
1.1.4 | 4,257 | 9/8/2021 |
1.1.3 | 2,493 | 5/24/2021 |
1.1.2 | 29,797 | 3/9/2021 |
1.1.1 | 3,335 | 12/16/2020 |
1.1.0 | 4,879 | 11/6/2020 |
1.0.1 | 2,530 | 5/27/2020 |
1.0.0 | 3,407 | 3/11/2020 |
1.0.0-rc2 | 399 | 3/10/2020 |
1.0.0-rc1 | 392 | 3/2/2020 |
1.0.0-beta2 | 622 | 2/18/2020 |
1.0.0-beta1 | 2,124 | 11/28/2019 |
0.5.0-beta | 459 | 8/7/2019 |
[Upgraded to Akka.NET v1.5.30](https://github.com/akkadotnet/akka.net/releases/tag/1.5.30)
[Bump Confluent.Kafka to 2.4.0](https://github.com/akkadotnet/Akka.Streams.Kafka/pull/402)