Memphis.Client
0.7.7
A newer prerelease version of this package is available. See the version list below for details.
dotnet add package Memphis.Client --version 0.7.7
NuGet\Install-Package Memphis.Client -Version 0.7.7
<PackageReference Include="Memphis.Client" Version="0.7.7" />
paket add Memphis.Client --version 0.7.7
#r "nuget: Memphis.Client, 0.7.7"
// Install Memphis.Client as a Cake Addin
#addin nuget:?package=Memphis.Client&version=0.7.7
// Install Memphis.Client as a Cake Tool
#tool nuget:?package=Memphis.Client&version=0.7.7
Memphis is a simple, robust, and durable cloud-native message broker wrapped with an entire ecosystem that enables cost-effective, fast, and reliable development of modern queue-based use cases. Memphis enables the building of modern queue-based applications that require large volumes of streamed and enriched data, modern protocols, zero ops, rapid development, extreme cost reduction, and a significantly lower amount of dev time for data-oriented developers and data engineers.
Installation
dotnet add package Memphis.Client -v ${MEMPHIS_CLIENT_VERSION}
Update
Update-Package Memphis.Client
Importing
using Memphis.Client;
Connecting to Memphis
First, create a ClientOptions instance (or use the default one), then connect to Memphis by calling MemphisClientFactory.CreateClient(ClientOptions opts).
try
{
    var options = MemphisClientFactory.GetDefaultOptions();
    options.Host = "<broker-address>";
    options.Username = "<application-type-username>";
    options.ConnectionToken = "<broker-token>"; // you will get it on broker creation
    var memphisClient = await MemphisClientFactory.CreateClient(options);
    ...
}
catch (Exception ex)
{
    Console.Error.WriteLine("Exception: " + ex.Message);
    Console.Error.WriteLine(ex);
}
We can also connect using a password:
try
{
    var options = MemphisClientFactory.GetDefaultOptions();
    options.Host = "<broker-address>";
    options.Username = "<application-type-username>";
    options.Password = "<password>"; // you will get it on client type user creation
    var memphisClient = await MemphisClientFactory.CreateClient(options);
    ...
}
catch (Exception ex)
{
    Console.Error.WriteLine("Exception: " + ex.Message);
    Console.Error.WriteLine(ex);
}
Once the client is created, all the functionality offered by Memphis is available.
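The connection snippets above use hard-coded placeholders for the host, username, and password. One way to keep real credentials out of source code is to read them from environment variables. The following is a minimal sketch, not part of Memphis.Client: the BrokerSettings class and the variable names MEMPHIS_HOST, MEMPHIS_USERNAME, and MEMPHIS_PASSWORD are assumptions made up for this example.

```csharp
using System;

// Hypothetical helper (not part of Memphis.Client): reads broker settings
// from environment variables so credentials stay out of source code.
public sealed class BrokerSettings
{
    public string Host { get; }
    public string Username { get; }
    public string Password { get; }

    public BrokerSettings(string host, string username, string password)
    {
        Host = host;
        Username = username;
        Password = password;
    }

    // The environment variable names are assumptions chosen for this example.
    public static BrokerSettings FromEnvironment()
    {
        return new BrokerSettings(
            Require("MEMPHIS_HOST"),
            Require("MEMPHIS_USERNAME"),
            Require("MEMPHIS_PASSWORD"));
    }

    // Fails fast with a clear message when a variable is missing or blank.
    private static string Require(string name)
    {
        var value = Environment.GetEnvironmentVariable(name);
        if (string.IsNullOrWhiteSpace(value))
        {
            throw new InvalidOperationException(
                "Missing required environment variable: " + name);
        }
        return value;
    }
}
```

The resulting values could then be assigned to options.Host, options.Username, and options.Password before calling MemphisClientFactory.CreateClient(options).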
Disconnecting from Memphis
To disconnect from Memphis, call Dispose() on the MemphisClient.
await memphisClient.Dispose();
Creating a Station
try
{
    // First: creating Memphis client
    var options = MemphisClientFactory.GetDefaultOptions();
    options.Host = "<memphis-host>";
    options.Username = "<username>";
    options.Password = "<password>";
    var client = await MemphisClientFactory.CreateClient(options);
    // Second: creating Memphis station
    var station = await client.CreateStation(
        stationOptions: new StationOptions()
        {
            Name = "<station-name>",
            RetentionType = RetentionTypes.MAX_MESSAGE_AGE_SECONDS,
            RetentionValue = 3600,
            StorageType = StorageTypes.DISK,
            Replicas = 1,
            IdempotencyWindowMs = 0,
            SendPoisonMessageToDls = true,
            SendSchemaFailedMessageToDls = true,
        });
}
catch (Exception ex)
{
    Console.Error.WriteLine("Exception: " + ex.Message);
    Console.Error.WriteLine(ex);
}
Retention types
Memphis currently supports the following types of retention:
RetentionTypes.MAX_MESSAGE_AGE_SECONDS
The above means that every message persists for the value set in the retention value field (in seconds).
RetentionTypes.MESSAGES
The above means that after the maximum number of saved messages (set in retention value) has been reached, the oldest messages will be deleted.
RetentionTypes.BYTES
The above means that after the maximum number of saved bytes (set in retention value) has been reached, the oldest messages will be deleted.
Retention Values
The retention values are directly related to the retention types mentioned above; the meaning of the value depends on the retention type chosen.
All retention values are of type int, but they represent different things: RetentionTypes.MAX_MESSAGE_AGE_SECONDS is expressed in seconds, RetentionTypes.MESSAGES as a number of messages, and RetentionTypes.BYTES as a number of bytes.
After these limits are reached, the oldest messages will be deleted.
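Because the same int field carries seconds, message counts, or bytes depending on the retention type, it can help to compute the value from a more readable unit. The sketch below is an illustration only: RetentionValues is a hypothetical helper, not part of Memphis.Client. It also makes one consequence of the int type visible: a byte limit cannot exceed int.MaxValue (2,147,483,647 bytes, just under 2 GiB).

```csharp
using System;

// Hypothetical helper (not part of Memphis.Client) for computing the int
// passed as RetentionValue from more readable units.
public static class RetentionValues
{
    // Whole seconds for RetentionTypes.MAX_MESSAGE_AGE_SECONDS.
    // Fractional seconds are truncated; values beyond the int range throw
    // OverflowException.
    public static int FromAge(TimeSpan age) => checked((int)age.TotalSeconds);

    // Bytes for RetentionTypes.BYTES. Throws OverflowException when the
    // result does not fit in an int (2048 MiB and above).
    public static int FromMebibytes(int mebibytes) => checked(mebibytes * 1024 * 1024);
}
```

For instance, RetentionValues.FromAge(TimeSpan.FromHours(1)) yields 3600, the value used in the station example above.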
Storage Types
Memphis currently supports the following types of message storage:
StorageTypes.DISK
The above means that messages persist on disk.
StorageTypes.MEMORY
The above means that messages are kept in main memory.
Destroying a Station
Destroying a station will remove all its resources (including producers and consumers).
await station.DestroyAsync();
Attaching a Schema to an Existing Station
await client.AttachSchema(stationName: "<station-name>", schemaName: "<schema-name>");
Detaching a Schema from Station
await client.DetachSchema(stationName: station.Name);
Produce and Consume messages
The most common client operations are produce, to send messages, and consume, to receive messages.
Messages are published to a station and consumed from it by creating a consumer. Consumers are pull-based and consume all the messages in a station unless you are using a consumer group, in which case messages are spread across all members of the group.
Memphis messages are payload agnostic. Payloads are byte[].
To stop receiving messages, call consumer.Dispose(). Destroy will terminate regardless of whether there are messages in flight for the client.
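Since payloads are plain byte[], the application decides how to encode its data. As one possible approach, the sketch below serializes an object to UTF-8 JSON bytes and back with System.Text.Json. The OrderCreated type and the PayloadCodec helper are hypothetical names invented for this example; nothing here is part of Memphis.Client.

```csharp
using System.Text.Json;

// Hypothetical message type used only for illustration.
public sealed class OrderCreated
{
    public string OrderId { get; set; } = "";
    public decimal Total { get; set; }
}

// Hypothetical helper: converts objects to and from byte[] payloads.
public static class PayloadCodec
{
    // Serializes the value to UTF-8 encoded JSON bytes.
    public static byte[] Encode<T>(T value)
    {
        return JsonSerializer.SerializeToUtf8Bytes(value);
    }

    // Parses UTF-8 JSON bytes back into T; rejects a JSON "null" payload.
    public static T Decode<T>(byte[] payload)
    {
        var value = JsonSerializer.Deserialize<T>(payload);
        if (value == null)
        {
            throw new JsonException("Payload decoded to null.");
        }
        return value;
    }
}
```

The bytes returned by PayloadCodec.Encode could be passed as the message argument when producing, and PayloadCodec.Decode<OrderCreated>(msg.GetData()) could be used on the consuming side.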
Creating a Producer
try
{
    // First: creating Memphis client
    var options = MemphisClientFactory.GetDefaultOptions();
    options.Host = "<memphis-host>";
    options.Username = "<username>";
    options.Password = "<password>";
    var client = await MemphisClientFactory.CreateClient(options);
    // Second: creating the Memphis producer
    var producer = await client.CreateProducer(new MemphisProducerOptions
    {
        StationName = "<memphis-station-name>",
        ProducerName = "<memphis-producer-name>",
        GenerateUniqueSuffix = true
    });
}
catch (Exception ex)
{
    Console.Error.WriteLine("Exception: " + ex.Message);
    Console.Error.WriteLine(ex);
}
Producing a message
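// Requires: using System.Collections.Specialized; (NameValueCollection)
// and using System.Text; (Encoding).
// `text` is the string payload to send.
var commonHeaders = new NameValueCollection();
commonHeaders.Add("key-1", "value-1");
await producer.ProduceAsync(
    message: Encoding.UTF8.GetBytes(text),
    headers: commonHeaders
);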
Message ID
Stations are idempotent by default for 2 minutes (this window can be configured). Idempotence is achieved by attaching a message ID to each produced message.
await producer.ProduceAsync(
    message: Encoding.UTF8.GetBytes(text),
    headers: commonHeaders,
    messageId: "id" // defaults to null
);
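Idempotence only helps if a retried send reuses the same message ID. One way to get a stable ID, shown here as a sketch rather than a recommendation from the library, is to derive it from the payload with a SHA-256 hash. MessageIds is a hypothetical helper and not part of Memphis.Client.

```csharp
using System;
using System.Security.Cryptography;

// Hypothetical helper (not part of Memphis.Client): derives a deterministic
// message ID from the payload so that retries of the same message share an ID.
public static class MessageIds
{
    // Returns the SHA-256 digest of the payload as 64 lowercase hex characters.
    public static string FromPayload(byte[] payload)
    {
        using (var sha256 = SHA256.Create())
        {
            byte[] hash = sha256.ComputeHash(payload);
            return BitConverter.ToString(hash).Replace("-", "").ToLowerInvariant();
        }
    }
}
```

The result could be passed as messageId: MessageIds.FromPayload(bytes). Note the trade-off: two intentionally separate messages with identical payloads would get the same ID and could be treated as duplicates within the idempotency window, so a content hash fits only when identical payloads really mean the same message.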
Destroying a Producer
await producer.DestroyAsync();
Creating a Consumer
try
{
    // First: creating Memphis client
    var options = MemphisClientFactory.GetDefaultOptions();
    options.Host = "<memphis-host>";
    options.Username = "<username>";
    options.Password = "<password>";
    var client = await MemphisClientFactory.CreateClient(options);
    // Second: creating Memphis consumer
    var consumer = await client.CreateConsumer(new ConsumerOptions
    {
        StationName = "<station-name>",
        ConsumerName = "<consumer-name>",
        ConsumerGroup = "<consumer-group-name>",
    });
}
catch (Exception ex)
{
    Console.Error.WriteLine("Exception: " + ex.Message);
    Console.Error.WriteLine(ex);
}
Creating a message handler for consuming messages
To configure a message handler, subscribe to the MessageReceived event:
consumer.MessageReceived += (sender, args) =>
{
    if (args.Exception != null)
    {
        Console.Error.WriteLine(args.Exception);
        return;
    }
    foreach (var msg in args.MessageList)
    {
        // print message itself
        Console.WriteLine("Received data: " + Encoding.UTF8.GetString(msg.GetData()));
        // print message headers
        foreach (var headerKey in msg.GetHeaders().Keys)
        {
            Console.WriteLine(
                $"Header Key: {headerKey}, value: {msg.GetHeaders()[headerKey.ToString()]}");
        }
        Console.WriteLine("---------");
        msg.Ack();
    }
    Console.WriteLine("destroyed");
};
Consuming a message
The consumer will try to fetch messages every PullIntervalMs (set when the consumer was created) and call the defined message handler.
await consumer.ConsumeAsync();
Fetch a single batch of messages
client.FetchMessages(new FetchMessageOptions
{
    StationName = "<station-name>",
    ConsumerName = "<consumer-name>",
    ConsumerGroup = "<group-name>", // defaults to the consumer name.
    BatchSize = 10, // defaults to 10
    BatchMaxTimeToWaitMs = 1000, // defaults to 1000
    MaxAckTimeMs = 30000, // defaults to 30000
    MaxMsgDeliveries = 2, // defaults to 2
    GenerateUniqueSuffix = false, // defaults to false
    StartConsumeFromSequence = 1, // start consuming from a specific sequence. defaults to 1
    LastMessages = -1 // consume the last N messages, defaults to -1 (all messages in the station)
});
Fetch a single batch of messages after creating a consumer
Setting prefetch = true prefetches the next batch of messages and keeps it in memory for a future Fetch() request.
Note: Use a higher MaxAckTime, as the messages will sit in a local cache for some time before processing.
var messages = consumer.Fetch(
    batchSize: 10,
    prefetch: true
);
Acknowledging a Message
Acknowledging a message tells the Memphis server not to re-send the same message to the same consumer or consumer group.
msg.Ack();
Delay the message after a given duration
Delaying a message tells the Memphis server to re-send the same message to the same consumer group after the given delay.
The message will be redelivered only if Consumer.MaxMsgDeliveries has not been reached yet.
msg.Delay(<delayMilliSeconds>);
Get headers
Get headers per message
msg.GetHeaders()
Destroying a Consumer
await consumer.DestroyAsync();
Check if broker is connected
memphisClient.IsConnected();
Product | Versions (compatible and additional computed target framework versions) |
---|---|
.NET | net5.0 was computed. net5.0-windows was computed. net6.0 was computed. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. |
.NET Core | netcoreapp2.0 was computed. netcoreapp2.1 was computed. netcoreapp2.2 was computed. netcoreapp3.0 was computed. netcoreapp3.1 was computed. |
.NET Standard | netstandard2.0 is compatible. netstandard2.1 was computed. |
.NET Framework | net461 was computed. net462 was computed. net463 was computed. net47 was computed. net471 was computed. net472 was computed. net48 was computed. net481 was computed. |
MonoAndroid | monoandroid was computed. |
MonoMac | monomac was computed. |
MonoTouch | monotouch was computed. |
Tizen | tizen40 was computed. tizen60 was computed. |
Xamarin.iOS | xamarinios was computed. |
Xamarin.Mac | xamarinmac was computed. |
Xamarin.TVOS | xamarintvos was computed. |
Xamarin.WatchOS | xamarinwatchos was computed. |
.NETStandard 2.0
- AvroConvert (>= 3.3.6)
- GraphQL (>= 7.2.1)
- GraphQL.SystemTextJson (>= 7.2.1)
- murmurhash (>= 1.0.3)
- NATS.Client (>= 1.1.0)
- NJsonSchema (>= 10.8.0)
- ProtoBufEval (>= 0.1.5)
NuGet packages (2)
Showing the top 2 NuGet packages that depend on Memphis.Client:
Package | Downloads |
---|---|
Fr4ctal.Core
Fractal Core is a versatile .NET library designed to simplify and optimize the integration and management of a variety of essential services in modern systems. With robust support for messaging via RabbitMQ, it enables effective asynchronous communication, essential for high-performance applications. The package also offers advanced database integration features, supporting both MySQL and MongoDB, giving developers the flexibility to work efficiently with both SQL and NoSQL data. In addition, Redis integration provides a powerful caching solution, crucial for improving application responsiveness and reducing database load. Fractal Core also eases interaction with AWS services, especially S3, ensuring secure, durable, and scalable cloud storage. This library is the ideal choice for developers looking for efficiency, reduced code complexity, and a simplified way to handle multiple backend technologies. It suits both individual developers and teams, speeding up development and letting them focus on business logic while Fractal Core takes care of the complex integrations. |
|
PaulPhillips.Framework.Feature
Microservice ecosystem that has built-in capability for Idempotency, Event Management, Logging, Error Handling, Security and Request Handling. |
GitHub repositories
This package is not used by any popular GitHub repositories.
Version | Downloads | Last updated |
---|---|---|
0.8.1-beta.1 | 71 | 2/12/2024 |
0.8.0-beta.1 | 72 | 2/7/2024 |
0.7.9-beta.1 | 69 | 2/7/2024 |
0.7.8-beta.1 | 51 | 2/7/2024 |
0.7.7 | 877 | 2/12/2024 |
0.7.7-beta.1 | 61 | 2/5/2024 |
0.7.6 | 117 | 2/7/2024 |
0.7.6-beta.1 | 61 | 2/5/2024 |
0.7.5 | 105 | 2/7/2024 |
0.7.5-beta.1 | 70 | 1/23/2024 |
0.7.4 | 104 | 2/5/2024 |
0.7.4-beta.1 | 76 | 1/11/2024 |
0.7.3 | 136 | 1/23/2024 |
0.7.3-beta.1 | 62 | 1/8/2024 |
0.7.2 | 154 | 1/11/2024 |
0.7.2-beta.1 | 76 | 1/3/2024 |
0.7.1 | 199 | 1/8/2024 |
0.7.1-beta.1 | 77 | 1/2/2024 |
0.7.0 | 164 | 1/3/2024 |
0.7.0-beta.1 | 103 | 12/31/2023 |
0.6.2 | 1,546 | 12/13/2023 |
0.6.2-beta.1 | 92 | 12/11/2023 |
0.6.1 | 194 | 12/4/2023 |
0.6.1-beta.1 | 71 | 12/4/2023 |
0.6.0 | 318 | 11/26/2023 |
0.5.1-beta.1 | 115 | 11/22/2023 |
0.5.0 | 160 | 11/6/2023 |
0.5.0-beta.1 | 91 | 11/6/2023 |
0.3.8-beta.1 | 86 | 9/12/2023 |
0.3.7-beta.1 | 95 | 9/5/2023 |
0.3.6-beta.1 | 76 | 9/5/2023 |
0.3.5-beta.1 | 75 | 9/3/2023 |
0.3.4-beta.1 | 75 | 8/15/2023 |
0.3.3 | 617 | 9/12/2023 |
0.3.3-beta.1 | 97 | 8/9/2023 |
0.3.2 | 157 | 9/5/2023 |
0.3.2-beta.1 | 78 | 8/9/2023 |
0.3.1 | 580 | 8/15/2023 |
0.3.1-beta.1 | 87 | 8/9/2023 |
0.3.0 | 224 | 8/9/2023 |
0.3.0-beta.1 | 86 | 8/9/2023 |
0.2.3 | 392 | 7/18/2023 |
0.2.3-beta.1 | 87 | 7/18/2023 |
0.2.2 | 256 | 7/18/2023 |
0.2.2-beta.1 | 82 | 7/17/2023 |
0.2.1 | 206 | 7/10/2023 |
0.2.1-beta.1 | 82 | 7/9/2023 |
0.2.0 | 184 | 7/2/2023 |
0.2.0-beta.1 | 77 | 6/27/2023 |
0.1.1 | 168 | 5/31/2023 |
0.1.1-beta.2 | 72 | 5/31/2023 |
0.1.1-beta.1 | 71 | 5/30/2023 |
0.1.0 | 570 | 5/11/2023 |
0.1.0-beta.2 | 84 | 5/10/2023 |
0.1.0-beta.1 | 68 | 5/10/2023 |