Apache.Iggy 0.6.0

C# SDK for Iggy

Overview

The Apache Iggy C# SDK is a client library for Iggy message streaming servers. It offers an async-first API over two transport protocols (TCP and HTTP) and covers authentication, stream and topic management, publishing, consuming, consumer groups, and offset management.

Getting Started

Installation

Install the NuGet package:

dotnet add package Apache.Iggy

Supported Protocols

The SDK supports two transport protocols:

  • TCP - Binary protocol for optimal performance and lower latency (recommended)
  • HTTP - RESTful JSON API for stateless operations

Creating a Client

The SDK is built around the IIggyClient interface. To create a client instance:

var loggerFactory = LoggerFactory.Create(builder =>
{
    builder
        .AddFilter("Apache.Iggy", LogLevel.Information)
        .AddConsole();
});

var client = IggyClientFactory.CreateClient(new IggyClientConfigurator
{
    BaseAddress = "127.0.0.1:8090",
    Protocol = Protocol.Tcp
}, loggerFactory);

await client.ConnectAsync();

The ILoggerFactory is required and used throughout the SDK for diagnostics and debugging.

Configuration

The IggyClientConfigurator exposes options for the connection, buffer sizes, TLS, automatic reconnection, and auto-login:

var client = IggyClientFactory.CreateClient(new IggyClientConfigurator
{
    BaseAddress = "127.0.0.1:8090",
    Protocol = Protocol.Tcp,

    // Buffer sizes (optional, default: 4096)
    ReceiveBufferSize = 4096,
    SendBufferSize = 4096,

    // TLS/SSL configuration
    TlsSettings = new TlsConfiguration
    {
        Enabled = true,
        Hostname = "iggy",
        CertificatePath = "/path/to/cert"
    },

    // Automatic reconnection with exponential backoff
    ReconnectionSettings = new ReconnectionSettings
    {
        Enabled = true,
        MaxRetries = 3,              // 0 = infinite retries
        InitialDelay = TimeSpan.FromSeconds(5),
        MaxDelay = TimeSpan.FromSeconds(30),
        WaitAfterReconnect = TimeSpan.FromSeconds(1),
        UseExponentialBackoff = true,
        BackoffMultiplier = 2.0
    },

    // Auto-login after connection
    AutoLoginSettings = new AutoLoginSettings
    {
        Enabled = true,
        Username = "your_username",
        Password = "your_password"
    }
}, loggerFactory);

await client.ConnectAsync();
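
To make the reconnection settings above more concrete, the sketch below computes the retry delays they would plausibly produce. The formula (initial delay multiplied by the backoff multiplier on each attempt, capped at the maximum delay) is an assumption about how such settings are usually applied, and BackoffDelay is a hypothetical helper, not part of the SDK.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: delay before retry number `attempt` (0-based).
// Assumed formula: initialDelay * multiplier^attempt, capped at maxDelay.
static TimeSpan BackoffDelay(int attempt, TimeSpan initialDelay, TimeSpan maxDelay, double multiplier)
{
    double seconds = initialDelay.TotalSeconds * Math.Pow(multiplier, attempt);
    return TimeSpan.FromSeconds(Math.Min(seconds, maxDelay.TotalSeconds));
}

// Values taken from the configuration example: 5 s initial, 30 s cap, multiplier 2.0.
var initialDelay = TimeSpan.FromSeconds(5);
var maxDelay = TimeSpan.FromSeconds(30);

var delays = new List<double>();
for (int attempt = 0; attempt < 5; attempt++)
{
    delays.Add(BackoffDelay(attempt, initialDelay, maxDelay, 2.0).TotalSeconds);
}

Console.WriteLine(string.Join(", ", delays)); // 5, 10, 20, 30, 30
```

Under this assumption, MaxRetries = 3 would mean waits of 5, 10, and 20 seconds before the client gives up, so the 30-second cap only matters for higher retry counts.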

Authentication

User Login

Log in with the root account first (note: the root account cannot be removed or updated):

var response = await client.LoginUser("iggy", "iggy");

Creating Users

Create new users with customizable permissions:

var permissions = new Permissions
{
    Global = new GlobalPermissions
    {
        ManageServers = true,
        ManageUsers = true,
        ManageStreams = true,
        ManageTopics = true,
        PollMessages = true,
        ReadServers = true,
        ReadStreams = true,
        ReadTopics = true,
        ReadUsers = true,
        SendMessages = true
    }
};

await client.CreateUser("test_user", "secure_password", UserStatus.Active, permissions);

// Login with the new user
var loginResponse = await client.LoginUser("test_user", "secure_password");

Personal Access Tokens

Create and use Personal Access Tokens (PAT) for programmatic access:

// Create a PAT
var patResponse = await client.CreatePersonalAccessTokenAsync("api-token", 3600);

// Login with PAT
await client.LoginWithPersonalAccessToken(patResponse.Token);

Streams and Topics

Creating Streams

await client.CreateStreamAsync("my-stream");

You can reference streams by either numeric ID or name:

var streamById = Identifier.Numeric(0);
var streamByName = Identifier.String("my-stream");

Creating Topics

Every stream contains topics for organizing messages:

var streamId = Identifier.String("my-stream");

await client.CreateTopicAsync(
    streamId,
    name: "my-topic",
    partitionsCount: 3,
    compressionAlgorithm: CompressionAlgorithm.None,
    replicationFactor: 1,
    messageExpiry: 0,  // 0 = never expire
    maxTopicSize: 0    // 0 = unlimited
);

Note: Stream and topic names use hyphens instead of spaces. Iggy automatically replaces spaces with hyphens.

Publishing Messages

Sending Messages

Send messages using the publisher interface:

var streamId = Identifier.String("my-stream");
var topicId = Identifier.String("my-topic");

var messages = new List<Message>
{
    new(Guid.NewGuid(), "Hello, Iggy!"u8.ToArray()),
    new(1, "Another message"u8.ToArray())
};

await client.SendMessagesAsync(
    streamId,
    topicId,
    Partitioning.None(),  // balanced partitioning
    messages
);

Partitioning Strategies

Control which partition receives each message:

// Balanced partitioning (default)
Partitioning.None()

// Send to specific partition
Partitioning.PartitionId(1)

// Key-based partitioning (string)
Partitioning.EntityIdString("user-123")

// Key-based partitioning (integer)
Partitioning.EntityIdInt(12345)

// Key-based partitioning (GUID)
Partitioning.EntityIdGuid(Guid.NewGuid())
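
The idea behind key-based partitioning is that the same key always lands on the same partition, which preserves ordering per key. The sketch below illustrates that property with an FNV-1a hash and a modulo. The hash function, the PartitionFor helper, and the zero-based index are assumptions chosen for illustration; the hashing Iggy actually applies is not described here.

```csharp
using System;
using System.Text;

// FNV-1a 32-bit hash: deterministic across processes, unlike string.GetHashCode().
static uint Fnv1a(string key)
{
    uint hash = 2166136261;
    foreach (byte b in Encoding.UTF8.GetBytes(key))
    {
        hash ^= b;
        hash *= 16777619; // wraps on overflow in the default unchecked context
    }
    return hash;
}

// Hypothetical mapping from a key to a zero-based partition index.
static int PartitionFor(string key, int partitionsCount) =>
    (int)(Fnv1a(key) % (uint)partitionsCount);

int first = PartitionFor("user-123", 3);
int second = PartitionFor("user-123", 3);

Console.WriteLine(first == second); // True: same key, same partition
```

Balanced partitioning gives up this per-key guarantee in exchange for an even spread, so a key-based strategy is the better fit when all messages for one entity must be read in order.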

User-Defined Headers

Add custom headers to messages with typed values:

var headers = new Dictionary<HeaderKey, HeaderValue>
{
    { new HeaderKey { Value = "correlation_id" }, HeaderValue.FromString("req-123") },
    { new HeaderKey { Value = "priority" }, HeaderValue.FromInt32(1) },
    { new HeaderKey { Value = "timeout" }, HeaderValue.FromInt64(5000) },
    { new HeaderKey { Value = "confidence" }, HeaderValue.FromFloat(0.95f) },
    { new HeaderKey { Value = "is_urgent" }, HeaderValue.FromBool(true) },
    { new HeaderKey { Value = "request_id" }, HeaderValue.FromGuid(Guid.NewGuid()) }
};

var messages = new List<Message>
{
    new(Guid.NewGuid(), "Message with headers"u8.ToArray(), headers)
};

await client.SendMessagesAsync(
    streamId,
    topicId,
    Partitioning.PartitionId(1),
    messages
);

Consumer Groups

Creating Consumer Groups

Coordinate message consumption across multiple consumers:

var groupResponse = await client.CreateConsumerGroupAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    "my-consumer-group"
);
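
A consumer group spreads a topic's partitions across its members so that each partition is read by one member at a time. The sketch below simulates a simple round-robin split of 3 partitions between 2 members. The Assign helper and the round-robin rule are illustrative assumptions; the server decides the real assignment.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical round-robin assignment of partition indexes to group members.
static Dictionary<string, List<int>> Assign(IReadOnlyList<string> members, int partitionsCount)
{
    var result = new Dictionary<string, List<int>>();
    foreach (var member in members)
    {
        result[member] = new List<int>();
    }
    for (int partition = 0; partition < partitionsCount; partition++)
    {
        result[members[partition % members.Count]].Add(partition);
    }
    return result;
}

var members = new[] { "consumer-1", "consumer-2" };
var assignment = Assign(members, 3);

// Iterate the member list (not the dictionary) to keep the output order fixed.
foreach (var member in members)
{
    Console.WriteLine($"{member}: {string.Join(", ", assignment[member])}");
}
// consumer-1: 0, 2
// consumer-2: 1
```

With more members than partitions, some members would receive nothing under this scheme, which is why the partition count sets the upper bound on useful parallelism within a group.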

Joining and Leaving Groups

Note: Join and leave operations are supported only over the TCP protocol; over HTTP they throw FeatureUnavailableException.

// Join a consumer group
await client.JoinConsumerGroupAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Identifier.Numeric(1)  // consumer ID
);

// Leave a consumer group
await client.LeaveConsumerGroupAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Identifier.Numeric(1)  // consumer ID
);

Consuming Messages

Fetching Messages

Fetch a batch of messages:

var polledMessages = await client.PollMessagesAsync(new MessageFetchRequest
{
    StreamId = streamId,
    TopicId = topicId,
    Consumer = Consumer.New(1), // or Consumer.Group("my-consumer-group") for consumer group
    Count = 10,
    PartitionId = 0, // optional, null for consumer group
    PollingStrategy = PollingStrategy.Next(),
    AutoCommit = true
});

foreach (var message in polledMessages.Messages)
{
    Console.WriteLine($"Message: {Encoding.UTF8.GetString(message.Payload)}");
}

Polling Strategies

Control where message consumption starts:

// Start from a specific offset
PollingStrategy.Offset(1000)

// Start from a specific timestamp (microseconds since epoch)
PollingStrategy.Timestamp(1699564800000000)

// Start from the first message
PollingStrategy.First()

// Start from the last message
PollingStrategy.Last()

// Start from the next unread message
PollingStrategy.Next()
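
Because the timestamp strategy expects microseconds since the Unix epoch, it helps to derive the value from a date rather than typing it by hand. The sketch below uses only .NET types; ToUnixMicroseconds is a hypothetical helper, and the numeric type that PollingStrategy.Timestamp accepts is not shown here, so a cast may be needed.

```csharp
using System;

// Convert a UTC instant to microseconds since the Unix epoch.
// One .NET tick is 100 nanoseconds, so 10 ticks make one microsecond.
static long ToUnixMicroseconds(DateTimeOffset instant) =>
    (instant - DateTimeOffset.UnixEpoch).Ticks / 10;

var instant = new DateTimeOffset(2023, 11, 9, 21, 20, 0, TimeSpan.Zero);
long micros = ToUnixMicroseconds(instant);

Console.WriteLine(micros); // 1699564800000000
```

The result matches the literal used in the timestamp example above, which corresponds to 2023-11-09 21:20:00 UTC.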

Offset Management

Storing Offsets

Store the current consumer position:

await client.StoreOffsetAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Identifier.Numeric(1),  // consumer ID
    0,                      // partition ID
    42                      // offset value
);

Retrieving Offsets

Get the current stored offset:

var offsetInfo = await client.GetOffsetAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Identifier.Numeric(1),  // consumer ID
    0                       // partition ID
);

Console.WriteLine($"Current offset: {offsetInfo.Offset}");

Deleting Offsets

Clear stored offsets:

await client.DeleteOffsetAsync(
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Identifier.Numeric(1),  // consumer ID
    0                       // partition ID
);

System Operations

Cluster Information

Get cluster metadata and node information:

var metadata = await client.GetClusterMetadataAsync();

Server Statistics

Retrieve server performance metrics:

var stats = await client.GetStatsAsync();

Health Checks

Verify server connectivity:

await client.PingAsync();

Client Information

Get information about connected clients:

var clients = await client.GetClientsAsync();
var currentClient = await client.GetMeAsync();

Event Subscription

Subscribe to connection events:

// Subscribe to connection events
client.SubscribeConnectionEvents(async connectionState =>
{
    Console.WriteLine($"Current connection state: {connectionState.CurrentState}");

    await SaveConnectionStateLog(connectionState.CurrentState);
});

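// Unsubscribe: pass the same delegate instance that was given to
// SubscribeConnectionEvents (here, a `handler` variable holding it)
client.UnsubscribeConnectionEvents(handler);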
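
Unsubscribing normally requires the exact delegate instance that was subscribed, so an inline lambda cannot be removed later. The sketch below shows this with a plain list standing in for the client's subscriber list. The list and the string-typed state are hypothetical stand-ins, and it assumes the SDK matches handlers by delegate equality, as .NET events do.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for the client's subscriber list (hypothetical, not the SDK's type).
var subscribers = new List<Func<string, Task>>();

// Keep the delegate in a variable so the same instance can be removed later.
Func<string, Task> handler = state =>
{
    Console.WriteLine($"Current connection state: {state}");
    return Task.CompletedTask;
};

subscribers.Add(handler);

// A different lambda is a different delegate instance, so nothing is removed.
bool removedOther = subscribers.Remove(state => Task.CompletedTask);

// The stored reference matches the subscribed instance and is removed.
bool removedOriginal = subscribers.Remove(handler);

Console.WriteLine(removedOther);    // False
Console.WriteLine(removedOriginal); // True
```

Applied to the subscription example, this means assigning the callback to a variable first, then passing that variable to both the subscribe and unsubscribe calls.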

Advanced: IggyPublisher

High-level publisher with background sending, retries, and encryption:

var publisher = IggyPublisherBuilder.Create(
    client,
    Identifier.String("my-stream"),
    Identifier.String("my-topic")
)
.WithBackgroundSending(enabled: true, batchSize: 100)
.WithRetry(maxAttempts: 3)
.Build();

await publisher.InitAsync();

var messages = new List<Message>
{
    new(Guid.NewGuid(), "Message 1"u8.ToArray()),
    new(0, "Message 2"u8.ToArray())
};

await publisher.SendMessages(messages);

// Wait for all messages to be sent
await publisher.WaitUntilAllSends();
await publisher.DisposeAsync();

For automatic object serialization, use the typed variant:

class OrderSerializer : ISerializer<Order>
{
    public byte[] Serialize(Order data) =>
        Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data));
}

var publisher = IggyPublisherBuilder<Order>.Create(
    client,
    Identifier.String("orders-stream"),
    Identifier.String("orders-topic"),
    new OrderSerializer()
).Build();

await publisher.InitAsync();
await publisher.SendAsync(new List<Order> { /* ... */ });

Advanced: IggyConsumer

High-level consumer with automatic offset management and consumer groups:

var consumer = IggyConsumerBuilder.Create(
    client,
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Consumer.New(1)
)
.WithPollingStrategy(PollingStrategy.Next())
.WithBatchSize(10)
.WithAutoCommitMode(AutoCommitMode.Auto)
.Build();

await consumer.InitAsync();

await foreach (var message in consumer.ReceiveAsync())
{
    var payload = Encoding.UTF8.GetString(message.Message.Payload);
    Console.WriteLine($"Offset {message.CurrentOffset}: {payload}");
}

For consumer groups (load-balanced across multiple consumers):

var consumer = IggyConsumerBuilder.Create(
    client,
    Identifier.String("my-stream"),
    Identifier.String("my-topic"),
    Consumer.Group("my-group")
)
.WithConsumerGroup("my-group", createIfNotExists: true)
.WithPollingStrategy(PollingStrategy.Next())
.WithAutoCommitMode(AutoCommitMode.AfterReceive)
.Build();

await consumer.InitAsync();

await foreach (var message in consumer.ReceiveAsync())
{
    Console.WriteLine($"Partition {message.PartitionId}: {Encoding.UTF8.GetString(message.Message.Payload)}");
}

await consumer.DisposeAsync();

For automatic deserialization:

class OrderDeserializer : IDeserializer<OrderEvent>
{
    public OrderEvent Deserialize(byte[] data) =>
        JsonSerializer.Deserialize<OrderEvent>(Encoding.UTF8.GetString(data))!;
}

var consumer = IggyConsumerBuilder<OrderEvent>.Create(
    client,
    Identifier.String("orders-stream"),
    Identifier.String("orders-topic"),
    Consumer.Group("order-processors"),
    new OrderDeserializer()
)
.WithAutoCommitMode(AutoCommitMode.Auto)
.Build();

await consumer.InitAsync();

await foreach (var message in consumer.ReceiveDeserializedAsync())
{
    if (message.Status == MessageStatus.Success)
    {
        Console.WriteLine($"Order: {message.Data?.OrderId}");
    }
}
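
The serializer and deserializer shown in these sections both pass through an intermediate string. As a possible alternative, System.Text.Json can write and read UTF-8 bytes directly. The sketch below is a standalone round trip that uses a dictionary as a stand-in payload; the keys and values are made up for illustration, and a real application would use its own Order or OrderEvent type.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Stand-in payload (hypothetical); a real application would use its own type.
var order = new Dictionary<string, string>
{
    ["OrderId"] = "A-1001",
    ["Status"] = "created"
};

// Serialize straight to UTF-8 bytes, the shape a message payload needs.
byte[] payload = JsonSerializer.SerializeToUtf8Bytes(order);

// Deserialize from the same bytes without building an intermediate string.
var decoded = JsonSerializer.Deserialize<Dictionary<string, string>>(payload)!;

Console.WriteLine(decoded["OrderId"]); // A-1001
```

The same two calls could form the bodies of Serialize and Deserialize in classes like OrderSerializer and OrderDeserializer, provided producer and consumer agree on the JSON shape.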

API Reference

The SDK provides the following main interfaces:

  • IIggyClient - Main client interface (aggregates all features)
  • IIggyPublisher - High-level message publishing interface
  • IIggyConsumer - High-level message consumption interface
  • IIggyStream - Stream management
  • IIggyTopic - Topic management
  • IIggyOffset - Offset management
  • IIggyConsumerGroup - Consumer group operations
  • IIggyPartition - Partition operations
  • IIggyUsers - User and authentication management
  • IIggySystem - System and cluster operations
  • IIggyPersonalAccessToken - Personal access token management

Additionally, builder-based APIs are available:

  • IggyPublisherBuilder / IggyPublisherBuilder<T> - Fluent publisher configuration
  • IggyConsumerBuilder / IggyConsumerBuilder<T> - Fluent consumer configuration

Running Examples

Examples are located in examples/csharp/ in the root iggy directory.

  • Start the Iggy server:
      cargo run --bin iggy-server
  • Run the producer example:
      dotnet run -c Release --project Iggy_SDK.Examples.GettingStarted.Producer
  • Run the consumer example:
      dotnet run -c Release --project Iggy_SDK.Examples.GettingStarted.Consumer

Integration Tests

Integration tests are located in Iggy_SDK.Tests.Integration/. Tests can run against:

  • A dockerized Iggy server with TestContainers
  • A local Iggy server (set IGGY_SERVER_HOST environment variable)
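
The choice between the two modes can be pictured as a simple environment lookup. The sketch below is a guess at the general pattern and not the test suite's actual code: ResolveServerHost is a hypothetical helper, and returning null to signal "start a container" is an assumption.

```csharp
using System;

// Hypothetical helper: return the address from IGGY_SERVER_HOST, or null
// to signal that a containerized server should be started instead.
static string? ResolveServerHost()
{
    var value = Environment.GetEnvironmentVariable("IGGY_SERVER_HOST");
    return string.IsNullOrWhiteSpace(value) ? null : value;
}

var host = ResolveServerHost();

Console.WriteLine(host is null
    ? "IGGY_SERVER_HOST not set: start a containerized server"
    : $"Using local Iggy server at {host}");
```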

ROADMAP - TODO

  • Error handling with status codes and descriptions
  • Add support for ASP.NET Core Dependency Injection

Target frameworks

net8.0 is compatible. The platform-specific net8.0 targets (android, browser, ios, maccatalyst, macos, tvos, windows) and all net9.0 and net10.0 targets, including their platform-specific variants, were computed.

Version        Downloads  Last Updated
0.6.3-edge.3   53         2/11/2026
0.6.3-edge.2   81         2/7/2026
0.6.3-edge.1   59         2/4/2026
0.6.0          520        12/9/2025
0.6.0-edge.5   153        11/24/2025
0.6.0-edge.3   227        11/10/2025
0.6.0-edge.2   144        10/23/2025
0.6.0-edge.1   136        8/15/2025
0.5.0          247        8/10/2025
0.5.0-edge.1   117        7/27/2025