Apache.Iggy
0.6.0
Newer prerelease versions of this package are available. See the version list below for details.
- .NET CLI: dotnet add package Apache.Iggy --version 0.6.0
- Package Manager: NuGet\Install-Package Apache.Iggy -Version 0.6.0
- PackageReference: <PackageReference Include="Apache.Iggy" Version="0.6.0" />
- Central Package Management: <PackageVersion Include="Apache.Iggy" Version="0.6.0" /> in Directory.Packages.props, plus <PackageReference Include="Apache.Iggy" /> in the project file
- Paket CLI: paket add Apache.Iggy --version 0.6.0
- Script & Interactive: #r "nuget: Apache.Iggy, 0.6.0"
- File-based apps: #:package Apache.Iggy@0.6.0
- Cake addin: #addin nuget:?package=Apache.Iggy&version=0.6.0
- Cake tool: #tool nuget:?package=Apache.Iggy&version=0.6.0
C# SDK for Iggy
Overview
The Apache Iggy C# SDK is a client library for interacting with Iggy message streaming servers. It offers a modern, async-first API over multiple transport protocols and covers stream, topic, message, consumer group, and user management.
Getting Started
Installation
Install the NuGet package:
dotnet add package Apache.Iggy
Supported Protocols
The SDK supports two transport protocols:
- TCP - Binary protocol for optimal performance and lower latency (recommended)
- HTTP - RESTful JSON API for stateless operations
Creating a Client
The SDK is built around the IIggyClient interface. To create a client instance:
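using Microsoft.Extensions.Logging;
// AddConsole() is provided by the Microsoft.Extensions.Logging.Console package.
var loggerFactory = LoggerFactory.Create(builder =>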
{
builder
.AddFilter("Apache.Iggy", LogLevel.Information)
.AddConsole();
});
var client = IggyClientFactory.CreateClient(new IggyClientConfigurator
{
BaseAddress = "127.0.0.1:8090",
Protocol = Protocol.Tcp
}, loggerFactory);
await client.ConnectAsync();
The ILoggerFactory is required and used throughout the SDK for diagnostics and debugging.
Configuration
The IggyClientConfigurator provides comprehensive configuration options:
var client = IggyClientFactory.CreateClient(new IggyClientConfigurator
{
BaseAddress = "127.0.0.1:8090",
Protocol = Protocol.Tcp,
// Buffer sizes (optional, default: 4096)
ReceiveBufferSize = 4096,
SendBufferSize = 4096,
// TLS/SSL configuration
TlsSettings = new TlsConfiguration
{
Enabled = true,
Hostname = "iggy",
CertificatePath = "/path/to/cert"
},
// Automatic reconnection with exponential backoff
ReconnectionSettings = new ReconnectionSettings
{
Enabled = true,
MaxRetries = 3, // 0 = infinite retries
InitialDelay = TimeSpan.FromSeconds(5),
MaxDelay = TimeSpan.FromSeconds(30),
WaitAfterReconnect = TimeSpan.FromSeconds(1),
UseExponentialBackoff = true,
BackoffMultiplier = 2.0
},
// Auto-login after connection
AutoLoginSettings = new AutoLoginSettings
{
Enabled = true,
Username = "your_username",
Password = "your_password"
}
}, loggerFactory);
await client.ConnectAsync();
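To make the reconnection settings concrete, here is a minimal, SDK-independent sketch of a capped exponential backoff schedule. It assumes the conventional formula min(InitialDelay × BackoffMultiplier^attempt, MaxDelay); this README does not state how the SDK computes its delays, and BackoffDelay is a hypothetical helper, not part of Apache.Iggy.

```csharp
using System;

// Values copied from the ReconnectionSettings example above.
// The formula itself is an assumption, not the SDK's documented behavior.
var initialDelay = TimeSpan.FromSeconds(5);
var maxDelay = TimeSpan.FromSeconds(30);
const double backoffMultiplier = 2.0;

for (var attempt = 0; attempt < 5; attempt++)
{
    var delay = BackoffDelay(attempt, initialDelay, maxDelay, backoffMultiplier);
    Console.WriteLine($"Retry {attempt + 1}: wait {delay.TotalSeconds} s");
}

// Hypothetical helper: delay = min(initial * multiplier^attempt, max), with attempt starting at 0.
static TimeSpan BackoffDelay(int attempt, TimeSpan initial, TimeSpan max, double multiplier)
{
    var scaledMs = initial.TotalMilliseconds * Math.Pow(multiplier, attempt);
    return TimeSpan.FromMilliseconds(Math.Min(scaledMs, max.TotalMilliseconds));
}
```

Under these assumptions the waits are 5 s, 10 s, and 20 s, and every later retry is capped at 30 s. With MaxRetries = 3, only the first three would be used.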
Authentication
User Login
Start by logging in with the root account (note: the root account cannot be removed or updated):
var response = await client.LoginUser("iggy", "iggy");
Creating Users
Create new users with customizable permissions:
var permissions = new Permissions
{
Global = new GlobalPermissions
{
ManageServers = true,
ManageUsers = true,
ManageStreams = true,
ManageTopics = true,
PollMessages = true,
ReadServers = true,
ReadStreams = true,
ReadTopics = true,
ReadUsers = true,
SendMessages = true
}
};
await client.CreateUser("test_user", "secure_password", UserStatus.Active, permissions);
// Login with the new user
var loginResponse = await client.LoginUser("test_user", "secure_password");
Personal Access Tokens
Create and use Personal Access Tokens (PAT) for programmatic access:
// Create a PAT
var patResponse = await client.CreatePersonalAccessTokenAsync("api-token", 3600);
// Login with PAT
await client.LoginWithPersonalAccessToken(patResponse.Token);
Streams and Topics
Creating Streams
await client.CreateStreamAsync("my-stream");
You can reference streams by either numeric ID or name:
var streamById = Identifier.Numeric(0);
var streamByName = Identifier.String("my-stream");
Creating Topics
Every stream contains topics for organizing messages:
var streamId = Identifier.String("my-stream");
await client.CreateTopicAsync(
streamId,
name: "my-topic",
partitionsCount: 3,
compressionAlgorithm: CompressionAlgorithm.None,
replicationFactor: 1,
messageExpiry: 0, // 0 = never expire
maxTopicSize: 0 // 0 = unlimited
);
Note: Stream and topic names use hyphens instead of spaces. Iggy automatically replaces spaces with hyphens.
Publishing Messages
Sending Messages
Send messages using the publisher interface:
var streamId = Identifier.String("my-stream");
var topicId = Identifier.String("my-topic");
var messages = new List<Message>
{
new(Guid.NewGuid(), "Hello, Iggy!"u8.ToArray()),
new(1, "Another message"u8.ToArray())
};
await client.SendMessagesAsync(
streamId,
topicId,
Partitioning.None(), // balanced partitioning
messages
);
Partitioning Strategies
Control which partition receives each message:
// Balanced partitioning (default)
Partitioning.None()
// Send to specific partition
Partitioning.PartitionId(1)
// Key-based partitioning (string)
Partitioning.EntityIdString("user-123")
// Key-based partitioning (integer)
Partitioning.EntityIdInt(12345)
// Key-based partitioning (GUID)
Partitioning.EntityIdGuid(Guid.NewGuid())
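The idea behind key-based partitioning is that a stable hash of the key selects the partition, so messages with the same key always land together. The sketch below illustrates that idea only. This README does not say which hash function Iggy uses, so FNV-1a is a stand-in, and PartitionIndexFor is a hypothetical helper that is not part of the SDK.

```csharp
using System;
using System.Text;

const int partitionCount = 3; // matches partitionsCount in the topic example

foreach (var key in new[] { "user-123", "user-456", "user-123" })
{
    Console.WriteLine($"{key} -> partition index {PartitionIndexFor(key, partitionCount)}");
}

// Hypothetical helper: FNV-1a (32-bit) over the UTF-8 key, reduced modulo the partition count.
// Returns a zero-based index; the server's real hash and numbering may differ.
static int PartitionIndexFor(string key, int partitions)
{
    if (partitions <= 0) throw new ArgumentOutOfRangeException(nameof(partitions));
    uint hash = 2166136261;
    foreach (var b in Encoding.UTF8.GetBytes(key))
    {
        unchecked
        {
            hash ^= b;
            hash *= 16777619;
        }
    }
    return (int)(hash % (uint)partitions);
}
```

Both "user-123" lines print the same index, which is the property that preserves per-key ordering within a partition.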
User-Defined Headers
Add custom headers to messages with typed values:
var headers = new Dictionary<HeaderKey, HeaderValue>
{
{ new HeaderKey { Value = "correlation_id" }, HeaderValue.FromString("req-123") },
{ new HeaderKey { Value = "priority" }, HeaderValue.FromInt32(1) },
{ new HeaderKey { Value = "timeout" }, HeaderValue.FromInt64(5000) },
{ new HeaderKey { Value = "confidence" }, HeaderValue.FromFloat(0.95f) },
{ new HeaderKey { Value = "is_urgent" }, HeaderValue.FromBool(true) },
{ new HeaderKey { Value = "request_id" }, HeaderValue.FromGuid(Guid.NewGuid()) }
};
var messages = new List<Message>
{
new(Guid.NewGuid(), "Message with headers"u8.ToArray(), headers)
};
await client.SendMessagesAsync(
streamId,
topicId,
Partitioning.PartitionId(1),
messages
);
Consumer Groups
Creating Consumer Groups
Coordinate message consumption across multiple consumers:
var groupResponse = await client.CreateConsumerGroupAsync(
Identifier.String("my-stream"),
Identifier.String("my-topic"),
"my-consumer-group"
);
Joining and Leaving Groups
Note: Join/Leave operations are supported only over the TCP protocol; over HTTP they throw FeatureUnavailableException.
// Join a consumer group
await client.JoinConsumerGroupAsync(
Identifier.String("my-stream"),
Identifier.String("my-topic"),
Identifier.Numeric(1) // consumer ID
);
// Leave a consumer group
await client.LeaveConsumerGroupAsync(
Identifier.String("my-stream"),
Identifier.String("my-topic"),
Identifier.Numeric(1) // consumer ID
);
Consuming Messages
Fetching Messages
Fetch a batch of messages:
var polledMessages = await client.PollMessagesAsync(new MessageFetchRequest
{
StreamId = streamId,
TopicId = topicId,
Consumer = Consumer.New(1), // or Consumer.Group("my-consumer-group") for consumer group
Count = 10,
PartitionId = 0, // optional, null for consumer group
PollingStrategy = PollingStrategy.Next(),
AutoCommit = true
});
// Encoding lives in System.Text (add "using System.Text;" if it is not already imported)
foreach (var message in polledMessages.Messages)
{
Console.WriteLine($"Message: {Encoding.UTF8.GetString(message.Payload)}");
}
Polling Strategies
Control where message consumption starts:
// Start from a specific offset
PollingStrategy.Offset(1000)
// Start from a specific timestamp (microseconds since epoch)
PollingStrategy.Timestamp(1699564800000000)
// Start from the first message
PollingStrategy.First()
// Start from the last message
PollingStrategy.Last()
// Start from the next unread message
PollingStrategy.Next()
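Because the timestamp strategy expects microseconds since the Unix epoch, a small conversion helper avoids hand-computed literals. This is a minimal sketch: ToUnixMicroseconds is a hypothetical helper name, and the numeric type PollingStrategy.Timestamp expects is an assumption, so cast the result as needed.

```csharp
using System;

var start = new DateTimeOffset(2023, 11, 9, 21, 20, 0, TimeSpan.Zero);
long micros = ToUnixMicroseconds(start);
Console.WriteLine(micros); // prints 1699564800000000

// Hypothetical helper: converts an instant to microseconds since 1970-01-01T00:00:00Z.
static long ToUnixMicroseconds(DateTimeOffset instant)
{
    const long ticksPerMicrosecond = 10; // one .NET tick is 100 ns
    long ticksSinceEpoch = instant.UtcTicks - DateTimeOffset.UnixEpoch.UtcTicks;
    return ticksSinceEpoch / ticksPerMicrosecond;
}
```

The value printed here is the same literal used in the timestamp example above, which corresponds to 2023-11-09 21:20:00 UTC.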
Offset Management
Storing Offsets
Store the current consumer position:
await client.StoreOffsetAsync(
Identifier.String("my-stream"),
Identifier.String("my-topic"),
Identifier.Numeric(1), // consumer ID
0, // partition ID
42 // offset value
);
Retrieving Offsets
Get the current stored offset:
var offsetInfo = await client.GetOffsetAsync(
Identifier.String("my-stream"),
Identifier.String("my-topic"),
Identifier.Numeric(1), // consumer ID
0 // partition ID
);
Console.WriteLine($"Current offset: {offsetInfo.Offset}");
Deleting Offsets
Clear stored offsets:
await client.DeleteOffsetAsync(
Identifier.String("my-stream"),
Identifier.String("my-topic"),
Identifier.Numeric(1), // consumer ID
0 // partition ID
);
System Operations
Cluster Information
Get cluster metadata and node information:
var metadata = await client.GetClusterMetadataAsync();
Server Statistics
Retrieve server performance metrics:
var stats = await client.GetStatsAsync();
Health Checks
Verify server connectivity:
await client.PingAsync();
Client Information
Get information about connected clients:
var clients = await client.GetClientsAsync();
var currentClient = await client.GetMeAsync();
Event Subscription
Subscribe to connection events:
// Subscribe to connection events
client.SubscribeConnectionEvents(async connectionState =>
{
Console.WriteLine($"Current connection state: {connectionState.CurrentState}");
await SaveConnectionStateLog(connectionState.CurrentState);
});
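// Unsubscribe: "handler" stands for the delegate that was passed to SubscribeConnectionEvents.
// Keep a reference to it in a variable rather than subscribing with an inline lambda.
client.UnsubscribeConnectionEvents(handler);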
Advanced: IggyPublisher
High-level publisher with background sending, retries, and encryption:
var publisher = IggyPublisherBuilder.Create(
client,
Identifier.String("my-stream"),
Identifier.String("my-topic")
)
.WithBackgroundSending(enabled: true, batchSize: 100)
.WithRetry(maxAttempts: 3)
.Build();
await publisher.InitAsync();
var messages = new List<Message>
{
new(Guid.NewGuid(), "Message 1"u8.ToArray()),
new(0, "Message 2"u8.ToArray())
};
await publisher.SendMessages(messages);
// Wait for all messages to be sent
await publisher.WaitUntilAllSends();
await publisher.DisposeAsync();
For automatic object serialization, use the typed variant:
class OrderSerializer : ISerializer<Order>
{
public byte[] Serialize(Order data) =>
Encoding.UTF8.GetBytes(JsonSerializer.Serialize(data));
}
var publisher = IggyPublisherBuilder<Order>.Create(
client,
Identifier.String("orders-stream"),
Identifier.String("orders-topic"),
new OrderSerializer()
).Build();
await publisher.InitAsync();
await publisher.SendAsync(new List<Order> { /* ... */ });
Advanced: IggyConsumer
High-level consumer with automatic offset management and consumer groups:
var consumer = IggyConsumerBuilder.Create(
client,
Identifier.String("my-stream"),
Identifier.String("my-topic"),
Consumer.New(1)
)
.WithPollingStrategy(PollingStrategy.Next())
.WithBatchSize(10)
.WithAutoCommitMode(AutoCommitMode.Auto)
.Build();
await consumer.InitAsync();
await foreach (var message in consumer.ReceiveAsync())
{
var payload = Encoding.UTF8.GetString(message.Message.Payload);
Console.WriteLine($"Offset {message.CurrentOffset}: {payload}");
}
For consumer groups (load-balanced across multiple consumers):
var consumer = IggyConsumerBuilder.Create(
client,
Identifier.String("my-stream"),
Identifier.String("my-topic"),
Consumer.Group("my-group")
)
.WithConsumerGroup("my-group", createIfNotExists: true)
.WithPollingStrategy(PollingStrategy.Next())
.WithAutoCommitMode(AutoCommitMode.AfterReceive)
.Build();
await consumer.InitAsync();
await foreach (var message in consumer.ReceiveAsync())
{
// Payload is a byte array, so decode it before printing
Console.WriteLine($"Partition {message.PartitionId}: {Encoding.UTF8.GetString(message.Message.Payload)}");
}
await consumer.DisposeAsync();
For automatic deserialization:
class OrderDeserializer : IDeserializer<OrderEvent>
{
public OrderEvent Deserialize(byte[] data) =>
JsonSerializer.Deserialize<OrderEvent>(Encoding.UTF8.GetString(data))!;
}
var consumer = IggyConsumerBuilder<OrderEvent>.Create(
client,
Identifier.String("orders-stream"),
Identifier.String("orders-topic"),
Consumer.Group("order-processors"),
new OrderDeserializer()
)
.WithAutoCommitMode(AutoCommitMode.Auto)
.Build();
await consumer.InitAsync();
await foreach (var message in consumer.ReceiveDeserializedAsync())
{
if (message.Status == MessageStatus.Success)
{
Console.WriteLine($"Order: {message.Data?.OrderId}");
}
}
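The serializer and deserializer shown in these sections go through an intermediate JSON string. As a hedged alternative, System.Text.Json can write and read UTF-8 bytes directly. The sketch below is SDK-independent: ToPayload and FromPayload are hypothetical helper names, and the dictionary stands in for the Order and OrderEvent types, which this README does not define.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Stand-in for an Order object (hypothetical shape, not from the SDK).
var order = new Dictionary<string, string> { ["OrderId"] = "A-1001", ["Status"] = "created" };

byte[] payload = ToPayload(order);
var roundTripped = FromPayload<Dictionary<string, string>>(payload);
Console.WriteLine(roundTripped["OrderId"]); // prints A-1001

// Serializes straight to UTF-8 bytes, skipping the intermediate string.
static byte[] ToPayload<T>(T value) => JsonSerializer.SerializeToUtf8Bytes(value);

// Deserializes from UTF-8 bytes and fails loudly on a JSON null instead of returning null.
static T FromPayload<T>(byte[] utf8Json) where T : class =>
    JsonSerializer.Deserialize<T>(utf8Json)
    ?? throw new InvalidOperationException("Payload deserialized to null.");
```

The bodies of these helpers could be dropped into Serialize and Deserialize implementations like the ones above. Throwing on a null result is a design choice that replaces the null-forgiving operator with an explicit failure.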
API Reference
The SDK provides the following main interfaces:
- IIggyClient - Main client interface (aggregates all features)
- IIggyPublisher - High-level message publishing interface
- IIggyConsumer - High-level message consumption interface
- IIggyStream - Stream management
- IIggyTopic - Topic management
- IIggyOffset - Offset management
- IIggyConsumerGroup - Consumer group operations
- IIggyPartition - Partition operations
- IIggyUsers - User and authentication management
- IIggySystem - System and cluster operations
- IIggyPersonalAccessToken - Personal access token management
Additionally, builder-based APIs are available:
- IggyPublisherBuilder / IggyPublisherBuilder<T> - Fluent publisher configuration
- IggyConsumerBuilder / IggyConsumerBuilder<T> - Fluent consumer configuration
Running Examples
Examples are located in examples/csharp/ in the root iggy directory.
- Start the Iggy server:
cargo run --bin iggy-server
- Run the producer example:
dotnet run -c Release --project Iggy_SDK.Examples.GettingStarted.Producer
- Run the consumer example:
dotnet run -c Release --project Iggy_SDK.Examples.GettingStarted.Consumer
Integration Tests
Integration tests are located in Iggy_SDK.Tests.Integration/. Tests can run against:
- A dockerized Iggy server with TestContainers
- A local Iggy server (set the IGGY_SERVER_HOST environment variable)
Useful Resources
ROADMAP - TODO
- Error handling with status codes and descriptions
- Add support for ASP.NET Core Dependency Injection
| Product | Compatible and additional computed target framework versions |
|---|---|
| .NET | net8.0 is compatible. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. net10.0 was computed. net10.0-android was computed. net10.0-browser was computed. net10.0-ios was computed. net10.0-maccatalyst was computed. net10.0-macos was computed. net10.0-tvos was computed. net10.0-windows was computed. |
- net8.0
  - Microsoft.Extensions.Logging (>= 8.0.1)
  - System.IO.Hashing (>= 8.0.0)
NuGet packages
This package is not used by any NuGet packages.
GitHub repositories
This package is not used by any popular GitHub repositories.
| Version | Downloads | Last Updated |
|---|---|---|
| 0.6.3-edge.3 | 53 | 2/11/2026 |
| 0.6.3-edge.2 | 81 | 2/7/2026 |
| 0.6.3-edge.1 | 59 | 2/4/2026 |
| 0.6.0 | 520 | 12/9/2025 |
| 0.6.0-edge.5 | 153 | 11/24/2025 |
| 0.6.0-edge.3 | 227 | 11/10/2025 |
| 0.6.0-edge.2 | 144 | 10/23/2025 |
| 0.6.0-edge.1 | 136 | 8/15/2025 |
| 0.5.0 | 247 | 8/10/2025 |
| 0.5.0-edge.1 | 117 | 7/27/2025 |