Asos.ServiceBus.MessageSiphon
1.0.20-ci-message-selec0003
Prefix Reserved
See the version list below for details.
dotnet tool install --global Asos.ServiceBus.MessageSiphon --version 1.0.20-ci-message-selec0003
dotnet new tool-manifest # if you are setting up this repo dotnet tool install --local Asos.ServiceBus.MessageSiphon --version 1.0.20-ci-message-selec0003
#tool dotnet:?package=Asos.ServiceBus.MessageSiphon&version=1.0.20-ci-message-selec0003&prerelease
nuke :add-package Asos.ServiceBus.MessageSiphon --version 1.0.20-ci-message-selec0003
azure-servicebus-message-siphon
A dotnet tool for message siphon scenarios when working with Azure Service Bus, allowing you to purge and replay messages, within a single namespace or across namespaces
What's it for?
You may have cases such as
- Messages have dead lettered and you want to replay them back onto the main queue\topic
- You want to clone messages from one entity to another in the same namespace
- You want to clone and remove messages from one namespace to another
- You want to select message by a header value and move them
- You want to purge messages, deleting from a queue, topic or dead-letter sub entity.
- You want to use connection strings, RBAC or both for the source and targets. (i.e. connection string for source, RBAC for target)
How does it work?
A configuration file is used that determines the set of work that will be performed. The configuration file is made up
of service bus connection details, and a list of siphon work that needs to be performed. The siphon work then references
the connections by Name
As the consumer of this package, you can define some standard work
you'd like to perform as a set of configuration
files, then use them to execute the work against one or many service bus namespaces - just provide the
appropriate ConnectionString
or FullyQualifiedNamespace
at runtime
Configuration details
ReplayMessagesJob
- A replay messages just has the following
JobType
- the type of work to perform. Can be one of DeadLettersSameEntity, DeadLettersToDifferentEntity or SourceToTargetDeadLettersSameEntity
- replays messages from the sub-queue back onto the parent entityDeadLettersToDifferentEntity
- replays messages from the entity sub-queue, to a different Target entitySourceToTarget
- siphons messages from any source to any target entityPurgeSource
- purges messages from the source entity, a delete operationPurgeDeadLetters
- as per PurgeSource but for dead letter entities
JobName
- Give the work a useful name, used in log messages etcNumberOfConcurrentProcesses
- When performing a siphon job, copying messages fromsource
totarget
, messages are received in batches as per theSourceBatchReceiveSize
setting. Once messages are received, they need to be sent to the target andNumberOfConcurrentProcesses
controls how many concurrent operations attempt to work through the batch. E.g. if your batch size is100
and you setNumberOfConcurrentProcesses
to10
, then 10 operations will each attempt to process 10 messages in parallel. Experiment with these settings to find the best combination to achieve high throughput.
ServiceBusDetails
- a list of service bus connections. A ServiceBusDetail instance has the following
Name
- give the item a useful name, you'll use this to refer to it from any siphon work you defineConnectionMode
- how to connect to the service bus, can be one ofConnectionString
orRbac
ConnectionString
- ifConnectionMode
isConnectionString
, then define the SAS connection string to useFullyQualifiedNamespace
= ifConnectionMode
isRbac
, then define the fully qualified namespace, e.g. namespace.servicebus.windows.net
SiphonWork - a list of work to perform, Siphon work requires the following
SiphonMode
- the type of operation to perform. Can be Clone or CloneAndDelete.- When
Clone
, the worker will use PeekMessagesAsync so not to increment the delivery count of messages, and the message will not be received or completed. A copy of the message is created - When
CloneAndDelete
, the worker will use ReceiveMessagesAsync, and will attempt to complete the message once the send operation to target has completed OK
- When
SourceConnectionName
- theName
of the ServiceBusDetails to use as the sourceSourceEntity
- the name of the entity, can be either a queue or a topicSourceSubscriptions
- ifSourceEntity
is a topic, then define a list of subscription names for the TopicTargetConnectionName
- theName
of the ServiceBusDetails to use as the targetTargetEntity
- the name of the entity to send to, can be either a queue or a topicSourceBatchReceiveSize
- the size of the batch to recieve messages from the source. Defaults to20
.MessagesOlderThan
- An ISO8601 date span, as per the duration specification. E.g. PT72H (72 hours), P30D (30 days). When performing a purge, only messages that have an older enqueue time than this value are considered, allowing you to perform work such aspurge messages older than 72 hours
HeaderSelector
- A filter to use on the message headers to select only matching messages. This has the following limitations- Only a single header filter is support - You can filter by one field only, e.g. HeaderName = Value
- Only =, > and < operators are supported
- If you use > or < operators, then the field selector must be numeric, e.g. HeaderName > 12345
Example configurations
Replaying service bus messages from dead letters on the subscriptions
In this configuration, we only need to define a single service bus connection, since we're replaying onto the same namespace/entity.
{
"Logging": {
"LogLevel": {
"Default": "Information"
}
},
"ReplayMessagesJob": {
"JobType": "DeadLettersSameEntity",
"JobName": "Replay-Messages-From-DeadLetter-Subqueue",
"NumberOfConcurrentProcesses": 1,
"ServiceBusDetails": [
{
"Name": "Source",
"ConnectionMode": "ConnectionString",
"ConnectionString": "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=your-sas-key"
}
],
"SiphonWork": [
{
"SiphonMode": "CloneAndDelete",
"SourceConnectionName": "Source",
"SourceEntity": "snapshot",
"SourceSubscriptions": [ "test", "test1" ]
}
]
}
}
Cloning messages from a subscription in one namespace using a connection string, to another namespace using RBAC.
In this example, messages will be receieved in batches of 40
, and 5
concurrent tasks will be used to to send them to
the target in parallel (i.e. each task processes 8 messages each)
{
"Logging": {
"LogLevel": {
}
},
"ReplayMessagesJob": {
"JobType": "SourceToTarget",
"JobName": "Local Debug",
"NumberOfConcurrentProcesses": 5,
"ServiceBusDetails": [
{
"Name": "Source",
"ConnectionMode": "ConnectionString",
"ConnectionString": "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=access-key"
},
{
"Name": "Target",
"ConnectionMode": "Rbac",
"FullyQualifiedNamespace": "namespace1.servicebus.windows.net"
}
],
"SiphonWork": [
{
"SiphonMode": "Clone",
"SourceConnectionName": "Source",
"SourceEntity": "topic-entity",
"SourceSubscriptions": [ "test-subscription" ],
"SourceBatchReceiveSize" : 40,
"TargetConnectionName": "Target",
"TargetEntity": "copy-of-topic"
}
]
}
}
Cloning messages from a subscription in one namespace to another topic in the same namespace, using a message selector to filter .
In this example, only messages that has a header
{
"Logging": {
"LogLevel": {
}
},
"ReplayMessagesJob": {
"JobType": "SourceToTargetByMessageSelector",
"JobName": "Local Debug",
"ServiceBusDetails": [
{
"Name": "Source",
"ConnectionMode": "ConnectionString",
"ConnectionString": "Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=access-key"
}
],
"SiphonWork": [
{
"SiphonMode": "Clone",
"SourceConnectionName": "Source",
"SourceEntity": "topic-entity",
"SourceSubscriptions": [ "test-subscription" ],
"HeaderSelector": "[Payload.MessageType] = 'ProductUpdated'",
"SourceBatchReceiveSize" : 40,
"TargetConnectionName": "Source",
"TargetEntity": "copy-of-topic"
}
]
}
}
Using RBAC, receive messages from a dead letter subscription in batches of 100 and purge messages that are older than 1 hour
{
"Logging": {
"LogLevel": {
"Default": "Information"
}
},
"ReplayMessagesJob": {
"JobType": "PurgeDeadLetters",
"JobName": "Purge Debug",
"ServiceBusDetails": [
{
"Name": "Source",
"ConnectionMode": "Rbac",
"FullyQualifiedNamespace": "namespace.servicebus.windows.net"
}
],
"SiphonWork": [
{
"SiphonMode": "Delete",
"SourceConnectionName": "Source",
"SourceEntity": "demo",
"SourceSubscriptions": [ "test" ],
"SourceBatchReceiveSize" : 100,
"MessagesOlderThan" : "PT1H"
}
]
}
}
How to use?
The package is available as a dotnet tool, so you can install from nuget.org by running the following command
dotnet tool update --global Asos.ServiceBus.MessageSiphon ---version x.x.x
Once installed, you have a command available to you siphon-asb-messages
. This requires a single parameter, the path to
the configuration file, e.g.
siphon-asb-messages -n D:\temp\replay-config.json
siphon-asb-messages -n usr/tmp/replay-config.json
Azure Pipelines
Since this is a dotnet tool, you can run it from your Azure Pipeline. This lets you take advantage of Managed Identity from Service Connections, and interact with namespaces that may be IP Restricted by running the work from a build pool that is allow-listed on the namespace.
This gives you a way to allow supporting team members to run jobs, replaying or purging messages, in a secure and repeatable manner & without requring any particular tools or permissions.
For example, another team may have a dependency on a service bus that you own and maintain. The namespace is IP rescricted and access is via RBAC, they need to replay some dead letters on a topic-subscription. You can define an Azure pipeline to do this work and define a particular build pool with a known IP, then grant their team members permissions to run the pipeline. In this way, the consuming team can self-serve without sharing connection strings or having to use Bastion services
To use this, you'd need to define a configuration file in your source repository. Note, we use a variable placeholder
here for $ServiceBusConnectionString
{
"Logging": {
"LogLevel": {
"Default": "Information"
}
},
"ReplayMessagesJob": {
"JobType": "DeadLettersSameEntity",
"JobName": "Dead Letters Replay",
"ServiceBusDetails": [
{
"Name": "Source",
"ConnectionMode": "ConnectionString",
"FullyQualifiedNamespace": "$(ServiceBusConnectionString)"
}
],
"SiphonWork": [
{
"SiphonMode": "CloneAndDelete",
"SourceConnectionName": "Source",
"SourceEntity": "some-topic-orders",
"SourceSubscriptions": [
"the-subscription-name"
]
}
]
}
}
An example pipeline might look like so. In this example, we support either RBAC using a fully qualified namespace, or
connection strings by retrieving a secret from a vault. Replace Tokens
will update the config file with the
appropriate values
parameters:
- name: StageName
displayName: Service Bus Connection String Secret
type: string
default: "ReplayPurgeMessages"
- name: AzureSubscription
displayName: The name of the service connection that's used to connect to Azure
type: string
- name: ConfigPath
displayName: Path to config
type: string
- name: ConfigFileName
displayName: Message replay config
type: string
- name: Environment
displayName: Environment
type: string
- name: ServiceBusFullyQualifiedName
displayName: ServiceBusFullyQualifiedName
type: string
default: ""
- name: KeyVaultSubscriptionName
displayName: Name of the subscription that contains the key vault, if using it to retrieve connection strings
type: string
default: ""
- name: KeyVaultName
displayName: Key Vault Name
type: string
default: ""
- name: SecretName
displayName: Service Bus Connection String Secret
type: string
default: ""
stages:
- stage: ${{parameters.StageName}}
displayName: ${{parameters.StageName}}
pool:
vmImage: ubuntu-latest
jobs:
- job:
steps:
- checkout: self
- task: AzureCLI@2
displayName: Get Service Bus Connection String From Vault
condition: and(succeeded(), ne('${{ parameters.KeyVaultName }}', ''))
inputs:
azureSubscription: ${{parameters.AzureSubscription}}
scriptType: pscore
scriptLocation: inlineScript
inlineScript: |
$subscription = ${{parameters.KeyVaultSubscriptionName}}
$connectionString = az keyvault secret show --name ${{parameters.SecretName}} --subscription $subscription --vault-name ${{parameters.KeyVaultName}} --query value -o tsv
echo "##vso[task.setvariable variable=ServiceBusConnectionString]$connectionString"
- task: AzureCLI@2
displayName: Set variable to fully qualified namespace parameter
condition: and(succeeded(), eq('${{ parameters.KeyVaultName }}', ''))
inputs:
azureSubscription: ${{parameters.AzureSubscription}}
scriptType: pscore
scriptLocation: inlineScript
inlineScript: |
$namespaceName = ${{parameters.ServiceBusFullyQualifiedName}}
echo "Connecting to namespace " + $namespaceName
echo "##vso[task.setvariable variable=ServiceBusFullyQualifiedName]$namespaceName"
- task: replacetokens@4
displayName: Replace Tokens
inputs:
targetFiles: '${{parameters.ConfigPath}}/*'
encoding: 'auto'
tokenPattern: 'azpipelines'
writeBOM: true
actionOnMissing: 'warn'
keepToken: false
actionOnNoFiles: 'continue'
enableTransforms: false
useLegacyPattern: false
enableTelemetry: true
- task: AzureCLI@2
displayName: Perform Message Work
inputs:
azureSubscription: ${{parameters.AzureSubscription}}
scriptType: pscore
scriptLocation: inlineScript
inlineScript: |
dotnet tool update Asos.ServiceBus.MessageSiphon --tool-path . --version 1.0.0
& "./siphon-asb-messages" -n ${{parameters.ConfigPath}}/${{parameters.ConfigFileName}}
Product | Versions Compatible and additional computed target framework versions. |
---|---|
.NET | net6.0 is compatible. net6.0-android was computed. net6.0-ios was computed. net6.0-maccatalyst was computed. net6.0-macos was computed. net6.0-tvos was computed. net6.0-windows was computed. net7.0 was computed. net7.0-android was computed. net7.0-ios was computed. net7.0-maccatalyst was computed. net7.0-macos was computed. net7.0-tvos was computed. net7.0-windows was computed. net8.0 was computed. net8.0-android was computed. net8.0-browser was computed. net8.0-ios was computed. net8.0-maccatalyst was computed. net8.0-macos was computed. net8.0-tvos was computed. net8.0-windows was computed. net9.0 was computed. net9.0-android was computed. net9.0-browser was computed. net9.0-ios was computed. net9.0-maccatalyst was computed. net9.0-macos was computed. net9.0-tvos was computed. net9.0-windows was computed. |
This package has no dependencies.
Version | Downloads | Last updated |
---|---|---|
1.2.32 | 134 | 7/8/2024 |
1.2.31 | 115 | 7/1/2024 |
1.2.30 | 107 | 6/24/2024 |
1.2.29 | 91 | 6/24/2024 |
1.2.28 | 122 | 6/17/2024 |
1.2.27 | 94 | 6/12/2024 |
1.2.26 | 97 | 6/10/2024 |
1.2.25 | 84 | 6/4/2024 |
1.2.24 | 106 | 6/3/2024 |
1.2.23 | 102 | 5/28/2024 |
1.2.22 | 106 | 5/20/2024 |
1.2.21 | 123 | 5/20/2024 |
1.2.20 | 74 | 5/13/2024 |
1.2.19 | 92 | 5/13/2024 |
1.2.18 | 140 | 5/6/2024 |
1.2.17 | 124 | 4/29/2024 |
1.2.16 | 119 | 4/22/2024 |
1.2.15 | 94 | 4/22/2024 |
1.2.14 | 153 | 4/17/2024 |
1.2.12 | 202 | 4/12/2024 |
1.1.35 | 233 | 1/30/2024 |
1.1.34 | 213 | 1/26/2024 |
1.1.33 | 194 | 1/8/2024 |
1.1.26 | 235 | 12/6/2023 |
1.1.24 | 143 | 12/4/2023 |
1.1.23 | 146 | 12/4/2023 |
1.1.22 | 171 | 12/4/2023 |
1.1.21 | 180 | 12/4/2023 |
1.1.20 | 164 | 11/27/2023 |
1.1.19 | 186 | 11/20/2023 |
1.1.18 | 199 | 11/13/2023 |
1.1.17 | 251 | 11/6/2023 |
1.1.16 | 217 | 10/23/2023 |
1.1.15 | 264 | 10/17/2023 |
1.1.14 | 249 | 10/17/2023 |
1.1.13 | 249 | 10/16/2023 |
1.1.12 | 228 | 10/16/2023 |
1.1.11 | 215 | 10/2/2023 |
1.1.10 | 258 | 9/25/2023 |
1.1.9 | 243 | 9/14/2023 |
1.1.8 | 180 | 6/28/2023 |
1.1.7 | 299 | 2/16/2023 |
1.1.6 | 248 | 2/13/2023 |
1.1.5 | 238 | 2/7/2023 |
1.1.4 | 280 | 2/2/2023 |
1.1.2 | 316 | 12/13/2022 |
1.1.1 | 295 | 12/13/2022 |
1.1.0 | 320 | 12/13/2022 |
1.0.24 | 308 | 12/13/2022 |
1.0.23 | 303 | 12/12/2022 |
1.0.22 | 306 | 12/9/2022 |
1.0.21 | 433 | 8/5/2022 |
1.0.21-pr0026-0011 | 276 | 8/4/2022 |
1.0.20 | 437 | 8/3/2022 |
1.0.20-pr0024-0004 | 265 | 8/2/2022 |
1.0.20-ci-message-selec0003 | 304 | 8/2/2022 |
1.0.19 | 504 | 1/24/2022 |