Run Logic App workflows as publisher/subscriber with the PubSub
Asynchronous messaging helps with decoupling publishers from consumers, as it avoids the blocking during publishing. Especially in Azure Logic Apps workflows, where we can't assume that the interaction between publisher and consumer is always synchronous.
Invictus provides a solution called the PubSub, that allows Azure Service Bus to act as a message broker and interact in a publish/subscribe-approach via HTTP endpoints; plus having Azure Blob Storage act as a claim-check provider upon publishing messages with too big a size.
🔗 See also the Publisher-Subscriber integration pattern.
Available endpoints
/api/Publish: by sending a HTTP request with a custom content, it places a message on an Azure Service Bus topic./api/Subscribe: by sending a HTTP request with a specified Azure Service Bus topic subscription name, it response available messages./api/Acknowledge: by sending a HTTP request with a specified message sequence number, it settles the Azure Service Bus message.

➡️ Publish single message
The /api/Publish endpoint allows users to send a single message to the configured Azure Service Bus topic (default: pubsubv2router) where subscribers are listening to.
| JSON property | Required (default) | Translates to ServiceBusMessage | Description |
|---|---|---|---|
Content | yes | Body | Raw binary content for the message. If exceeds certain size (default 20 000 bytes), then the component applies the claim-check pattern: The message gets send with an empty body, and the content gets saved in Azure Blob Storage. The Subscribe action automatically loads the content based on specific application properties from either the message itself or from Blob Storage. |
Context | yes | ApplicationProperties | User-provided properties, appended with the HTTP request headers x-ms-client-tracking-id and x-ms-workflow-run-id if present |
MessageId | no (new GUID) | MessageId | Optional message ID for duplicate detection purposes. |
Full request example
// POST /api/Publish
{
"Content": "ew0KICAiQ291bnRyeUNvZGUiOiAiQkUiLA0KICAiTW9uZXkiOiAgeyAiQW1vdW50IjogIDUwLCAiQ3VycmVuY3kiOiAgIkdCUCIgIH0NCn0NCg==",
"MessageId": "b0f11049-7f4d-4bae-90b2-91d93e69367d",
"Context": {
"x-applicationName": "InvoiceApp",
"x-batchId": "975f7ea4-6247-431b-afb6-6d27fb47516f",
"x-conversationId": "29500405-d7cf-4877-a72b-a3288cff9dc0",
"x-correlationId": "fc13d345-ebd7-44f2-89a9-4371258c0a08"
}
}
The endpoint will respond with 202 Accepted, if the message got published successfully.
⬅️ Subscribe for messages
The /api/Subscribe endpoint allows users to periodically ask for any available published messages on the configured Azure Service Bus topic (default: pubsubv2router).
| JSON property | Required (default) | Translates to Service Bus | Description |
|---|---|---|---|
Subscription | yes | SubscriptionName | Name of Azure Service Bus topic subscription, gets created if not exists. (Name is also used as name of the Rule.) |
Filter | no (subscribe on all messages) | SqlExpression | Optional SQL expression that acts as a filter rule for which messages to subscribe on. |
BatchSize | no (10) | BatchSize | Maximum messages to receive during this single call. |
TimeoutMilliseconds | no (1min) | MaxWaitTime | Maximum time to wait for a message before responding with an empty set of messages. |
ShouldDeleteOnReceive | no (false) | ReceiveAndDelete | false (default) means PeekLock, true means receiving messages with ReceiveAndDelete. ⚠️ In some rare cases, the use of ShouldDeleteOnReceive=true could cause the receiver to lose messages. For example when an error occurs on during receiving and one loses the sequence number, or when cancelled/scaled-down happens at the exact moment the client receives the message (and doesn't exist on the topic subscription anymore). |
SkipSubscriptionUpsert | no (false) | create/update subscription and rule | true means there should already be a topic subscription available, false (default) means that a subscription will be created with the provided Filter. |
One can also use the HTTP request query parameters instead of the request body to POST to the /api/Subscribe endpoint: /api/Subscribe?Subscription=orderProcessor.
Full request example
// POST -> /api/Subscribe
{
"Subscription": "orderProcessor",
"Filter": "sys.label = 'OrderCreated'",
"BatchSize": 11,
"TimeoutMilliseconds": 30000,
"ShouldDeleteOnReceive": false,
"SkipSubscriptionUpsert": false
}
Full response example
// 200 OK <- /api/Subscribe
[
{
"subscription": "orderProcessor",
"content": "ew0KICAiQ291bnRyeUNvZGUiOiAiQkUiLA0KICAiTW9uZXkiOiAgeyAiQW1vdW50IjogIDUwLCAiQ3VycmVuY3kiOiAgIkdCUCIgIH0NCn0NCg==",
"context": {
"x-applicationName": "InvoiceApp",
"x-batchId": "975f7ea4-6247-431b-afb6-6d27fb47516f",
"x-conversationId": "29500405-d7cf-4877-a72b-a3288cff9dc0",
"x-correlationId": "fc13d345-ebd7-44f2-89a9-4371258c0a08",
"x-ms-client-tracking-id": "test",
"Diagnostic-Id": "00-0cc7ed09eeaa51b0e835d90890aefb60-b0a02deac9f6fe6d-00"
},
"sequenceNumber": 99
},
...
]
Because messages can be 'acknowledged' separately from the receive location by 'subscription', the message is internally deferred. This is due to restrictions in the Azure SDK which impose that the same receiver must both receive and settle message. Deferring a message doesn't have this limitation.
✔️ Acknowledge message
The /api/Acknowledge endpoint allows users to 'settle' a received message via the /api/Subscribe endpoint. This requires the sequence number of the message.
| JSON Property | Required (default) | Translates to Service Bus | Description |
|---|---|---|---|
Subscription | yes | CreateReceiver | Name of Azure Service Bus topic subscription to receive the deferred message on (See internal workaround on /api/Subscribe) |
SequenceNumber | yes | ReceiveDeferredMessage | Unique number assigned by Service Bus, received by the /api/Subscribe response. |
AcknowledgementType | no (Complete) | Message settlement | Type of acknowledge action to take on the message:
|
IgnoreNotFoundException | no (false) | MessageNotFound | true means that MessageNotFound Service Bus failures during lookup of the message by its SequenceNumber will result in 202 Accepted; false means a 400 BadRequest will be responded. |
Full request example
// POST /api/Acknowledge
{
"Subscription": "subscriptionName",
"AcknowledgementType":"Complete",
"SequenceNumber": 99,
"IgnoreNotFoundException": false,
}
Customization
Available Bicep parameters
| Bicep parameter | Default | Description |
|---|---|---|
pubSubV2TopicName | pubsubv2router | Name of Azure Service Bus topic that acts as message broker for the PubSub V2 component. |
approvedMessageSizeInBytes | 200000 (200 KB) | Threshold to save Azure Service Bus messages' contents Azure Blob Storage, a.k.a. claim-checked. |
blobContainerPrefix | invictus (final container name: {blobContainerPrefix}{pubSubV2TopicName}) | Prefix of the Azure Blob Storage container to store messages with too big a size, a.k.a. claim-checked. |
serviceBusMessageTimeToLiveMinutes | 30 days | Period the published message should be active on the Azure Service Bus topic (translates to TimeToLive). |
pubSubSubscriptionLockTimeoutInMinutes | 1 min | Duration of the peek lock receive, see LockDuration. |
Migrating PubSub v1 to v2
Migrating to v2 includes changes in the authentication, endpoint and removal the metadata links.
👉 The
/api/Subscribeendpoint also needs to use aPOSTinstead of aGETHTTP method.
Publish message example
{
"PublishMessage": {
"type": "Http",
"inputs": {
"authentication": {
- "password": "@parameters('invictusPassword')",
- "type": "Basic",
- "username": "Invictus"
+ "audience": "[parameters('invictus').authentication.audience]",
+ "identity": "[parameters('infra').managedIdentity.id]",
+ "type": "ManagedServiceIdentity"
},
"method": "post",
- "uri": "[parameters('invictus').framework.pubSub.v1.publishUrl]",
+ "uri": "[parameters('invictus').framework.pubSub.v2.publishUrl]",
"body": {
"Content": "@{decodeBase64(body('Extract_Message_Context')['Content'])}",
"Context": "@body('Extract_Message_Context')?['Context']"
},
},
- "metadata": {
- "apiDefinitionUrl": "[parameters('invictus').framework.pubSub.definitionUrl]",
- "swaggerSource": "custom"
- },
"runAfter": {
"Extract_Message_Context": [
"Succeeded"
]
}
}
}
Subscribe message example
{
"SubscribeMessage": {
"type": "Http"
"inputs": {
"authentication": {
- "password": "@parameters('invictusPassword')",
- "type": "Basic",
- "username": "Invictus"
+ "audience": "[parameters('invictus').authentication.audience]",
+ "identity": "[parameters('infra').managedIdentity.id]",
+ "type": "ManagedServiceIdentity"
},
- "method": "get",
+ "method": "post",
"queries": {
"deleteOnReceive": false,
"filter": "Domain = 'B2B-Gateway' AND Action = 'EDI' AND Version = '1.0'",
"subscription": "[concat(substring(variables('logicAppName'), max(createarray(0, sub(length(variables('logicAppName')), 36)))), '-', uniquestring(variables('logicAppName')))]"
},
- "uri": "[parameters('invictus').framework.pubSub.v1.subscribeUrl]"
+ "uri": "[parameters('invictus').framework.pubSub.v2.subscribeUrl]"
},
- "metadata": {
- "apiDefinitionUrl": "[parameters('invictus').framework.pubSub.definitionUrl]",
- "swaggerSource": "custom"
- },
"recurrence": {
"frequency": "Second",
"interval": 1
},
"splitOn": "@triggerBody()",
"splitOnConfiguration": {
"correlation": {
"clientTrackingId": "@triggerBody()['Context']['x-ms-client-tracking-id']"
}
}
}
}
Acknowledge message example
{
"AcknowledgeMessage": {
"type": "Http",
"inputs": {
"authentication": {
- "password": "@parameters('invictusPassword')",
- "type": "Basic",
- "username": "Invictus"
+ "audience": "[parameters('invictus').authentication.audience]",
+ "identity": "[parameters('infra').managedIdentity.id]",
+ "type": "ManagedServiceIdentity"
},
"body": {
"AcknowledgementType": "Complete",
"IgnoreNotFoundException": true,
"Subscription": "@triggerBody()?['subscription']",
+ "SequenceNumber": "@triggerBody()?['sequenceNumber']"
- "LockToken": "@triggerBody()?['LockToken']",
- "MessageReadTime": "@trigger()['startTime']"
},
"method": "post",
- "uri": "[parameters('invictus').framework.pubSub.v1.acknowledgeUrl]"
+ "uri": "[parameters('invictus').framework.pubSub.v2.acknowledgeUrl]"
},
- "metadata": {
- "apiDefinitionUrl": "[parameters('invictus').framework.pubSub.v1.definitionUrl]",
- "swaggerSource": "custom"
- },
"runAfter": {}
}
}