Mittels der Streaming-API kann sich auf in Salesforce erstellte Events registriert werden, um diese bei Auslösung zu empfangen. Die Streaming-API bietet die Möglichkeit, mittels Push-Technologie (Pub-Sub-Prinzip) Ereignisse in “Echtzeit” zu empfangen.
Grundlagen
Mittels der Streaming-API können in Salesforce Events konsumiert werden. Hierzu zählen:
- Push Topics (Salesforce Eventbus)
- Push Topics werden dazu verwendet, auf Änderungen von Datensätzen in “Echtzeit” benachrichtigt zu werden.
- Push Topics werden dazu verwendet, auf Änderungen von Datensätzen in “Echtzeit” benachrichtigt zu werden.
- Generic Events
- Werden von der Lightning Platform bereitgestellt und dienen dazu, benutzerdefinierte Ereignisse in Lightning-Komponenten darstellen zu können.
- Werden von der Lightning Platform bereitgestellt und dienen dazu, benutzerdefinierte Ereignisse in Lightning-Komponenten darstellen zu können.
- Platform Events
- Sind durch Salesforce bereitgestellte Events, die verwendet werden können, um benutzerdefinierte Ereignisse in Salesforce erstellen zu können. Platform Events können von anderen Systemen abonniert werden.
- Sind durch Salesforce bereitgestellte Events, die verwendet werden können, um benutzerdefinierte Ereignisse in Salesforce erstellen zu können. Platform Events können von anderen Systemen abonniert werden.
- Change Data Capture (CDC) Events
- CDC-Events sind Teil von Salesforce, mit denen Änderungen an Datensätzen im Salesforce-System verfolgt und an andere Systeme übertragen werden können.
- CDC-Events sind Teil von Salesforce, mit denen Änderungen an Datensätzen im Salesforce-System verfolgt und an andere Systeme übertragen werden können.
Die Streaming-API funktioniert nach dem Publisher-Subscriber-Prinzip. Um Events konsumieren zu können, wird sich auf die jeweiligen Events registriert. Die hierfür von Salesforce verwendete Messaging-Technologie ComtD basiert auf dem Bayeux-Protokoll (Bayeux Protocol, CometD, and Long Polling ).
Die untenstehende Abbildung zeigt den Verbindungsaufbau sowie den Mechanismus zwischen Salesforce und Integrationslösung.
![](https://datapassion.de/wp-content/uploads/2023/04/CometD.png)
Art der Anbindung
In diesem Artikel wird die Anbindung via Streaming API und Push-Topic-Events erläutert. Zur Verwendung von Push-Topics1 wird der interne Salesforce Eventbus verwendet. Weiterführende Informationen über andere Anbindungsmöglichkeiten mittels der Streaming-API kann unter Getting Started with the Salesforce Streaming API in .NET Core nachgelesen werden.
1Push-Topics werden von den Salesforceentwicklern des jeweiligen Kunden erstellt. Ebenfalls sollten die entsprechenden Events durch den Kunden bereitgestellt werden.
Salesforce Eventbus
Der Salesforce Eventbus ist eine Funktion in Salesforce, die die Streaming-API verwendet, um eine zentrale Plattform zum Austausch von Ereignissen zwischen verschiedenen Anwendungen und Systemen bereitzustellen. Mit PushTopic-Regeln und benutzerdefinierten Ereignissen können Anwendungen in Echtzeit Ereignisse empfangen und darauf reagieren. Hierdurch können Geschäftsprozesse automatisiert und die Zusammenarbeit zwischen Third-Party-Applications und Salesforce verbessert werden.
Der Eventbus arbeitet nach dem Pub-Sub-Prinzip:
![](https://datapassion.de/wp-content/uploads/2023/04/Eventbus.png)
Nachrichten werden durch Eventproducer auf den Eventbus gelegt. Consumer können sich auf Events registrierten. Jede Nachricht erhält eine unique ID, die im technischen Feld ReplayID
enthalten ist. Der Wert des Feldes ReplayID
wird von Salesforce intern gesetzt, sobald das Ereignis dem Consumer zugestellt wird. Es bezieht sich auf die Position des Ereignisses im Ereignisstrom. Es ist allerdings nicht garantiert, dass die Ids für aufeinanderfolgende Ereignisse zusammenhängend sind. Beispielsweise kann das Ereignis, das auf das Ereignis mit der ID 999 folgt, eine ID von 1.025 haben. Somit kann ein Sequencing über die ReplayIDs
nicht realisiert werden.
Ein Consumer kann die ReplayID
speichern und im Falle eines Verbindungsabbruchs oder Ähnlichem über die ReplayID
alle verpassten Events wiederholen lassen.
Der Replay-Mechanismus wird durch die Konfiguration/Schreiben der ReplayID
bestimmt:
![](https://datapassion.de/wp-content/uploads/2023/04/ReplayID-1024x581.png)
C#-spezifische Bibliotheken zur Anbindung von Salesforce mittels Streaming API
Die in diesem Artikel verwendeten Bibliotheken zur Anbindung von Salesforce sind:
Bet.BuildingBlocks.SalesforceEventBus 3.0.1 ist eine .NET-Bibliothek, die es ermöglicht, Salesforce-Ereignisse in Echtzeit zu abonnieren und darauf zu reagieren. Die Bibliothek basiert auf der Streaming-API und bietet eine einfache Möglichkeit, sich für Push-Topics zu registrieren. Die Bibliothek ist Open Source.
CometD.NetCore.Salesforce 2.0.5
Ermöglicht die Interaktion mit der Streaming-API in .Net-Umgebungen.
NetCoreForce.Client 3.1.0 ist eine C#-Bibliothek, die es ermöglicht, Daten und Metadaten über die REST-API in .NET Core-Anwendungen (unterstützt .net 5, .net6 und .net7) abzufragen, zu erstellen, zu aktualisieren und zu löschen. Die Bibliothek bietet eine einfache Möglichkeit, auf Salesforce-Daten zuzugreifen und mit ihnen zu interagieren. Diese Bibliothek wird für die OAuth-Authentifizierung der oben genannten Nugets verwendet (die oben genannten Nugets haben als Dependencies auf dieses Nuget). In bereits durchgeführten Kundenprojekten wurde die Bibliothek um den JWT-Bearer-Token-Flow erweitert und als eigenes Projekt mit in die Solution gehangen.
Authentifizierungsmöglichkeiten
Es wird sich via OAuth2.0 (siehe Salesforce Developers) authentifiziert. Dies wird allerdings von den verwendeten Nuget-Paketen übernommen.
Vorbereitende Maßnahmen / Voraussetzungen (optional)
Authentifizierung
Die Authentifizierung erfolgt über OAuth (Connected App).
Limitierungen
Aktuell sind keine Limitierungen für die Streaming-API (v.57) bekannt.
Für Push-Topics gelten jedoch folgende Limitierungen (Siehe PushTopic Streaming Allocations)
![](https://datapassion.de/wp-content/uploads/2023/04/Limitierung-Salesforce-1-1024x490.png)
Messages werden im Salesforce-EventBus je nach Eventtyp nur 24h bzw. 72h gespeichert.
Code-Beispiel
Um Events von Salesforce empfangen zu können, wird im ersten Schritt ein Listener benötigt, der sich später durch Konfiguration auf die entsprechende Push-Topic registriert.
Schritt 1: Implementierung Listener
Hierzu wird eine Klasse erstellt, die das Interface IMessageListener
implementiert (Referenziert über CometD.NetCore.Salesforce 2.0.5 ).
Innerhalb der OnMessage
-Methode wird die Logik implementiert. Hier ist beispielhaft die Deserialisierung der Message in ein Objekt dargestellt.
using CometD.NetCore.Bayeux.Client;
public class MessageListener : IMessageListener
{
public void OnMessage(IClientSessionChannel _, CometD.NetCore.Bayeux.IMessage message)
{
var payload = JsonConvert.DeserializeObject(message.Json)
}
}
Der MessagePayload sieht beispielhaft ( Die Payload wird durch die Definition des Kundenevents festgelegt) wie unten gezeigt aus. Die
Data
-Klasse bildet hierbei den Payload. Sie beinhaltet weitere Unterklassen. Die Klasse Sobject
beinhaltet die Daten, aus dem vom Kunden definierten Event.
using Newtonsoft.Json;
using System;
namespace MyNamespace
{
public class Data
{
[JsonProperty("event")]
public Event Event { get; set; }
[JsonProperty("sobject")]
public Sobject Sobject { get; set; }
}
public class Event
{
[JsonProperty("createdDate")]
public DateTime CreatedDate { get; set; }
[JsonProperty("replayId")]
public int ReplayId { get; set; }
[JsonProperty("type")]
public string Type { get; set; }
}
///
/// This class represents the event sent by Salesforce.
///
public class ProjectMessagePayload
{
[JsonProperty("data")]
public Data Data { get; set; }
[JsonProperty("channel")]
public string Channel { get; set; }
}
public class Sobject
{
[JsonProperty("KA_OwnerName__c")]
public string KAOwnerNameC { get; set; }
[JsonProperty("Id")]
public string Id { get; set; }
[JsonProperty("KS_CRMID__c")]
public string KsCrmIdC { get; set; }
[JsonProperty("KS_SFCRMID__c")]
public string KsSfcmId { get; set; }
[JsonProperty("CurrencyIsoCode")]
public string CurrencyIsoCode { get; set; }
[JsonProperty("KS_Name__c")]
public string KSNameC { get; set; }
}
}
Schritt 2: Registrierung von Eventbus und MessageListener
Der in Schritt 1 erstellte MessageListener
muss nun für die Dependency Injection registriert werden. In der Methode zur Konfiguration der Services werden folgende Aufrufe getätigt:
.ConfigureServices((hostBuilderContext, services) =>
{
[...]
var salesforceConfig =
hostBuilderContext
.Configuration
.GetSection($"{Assembly.GetEntryAssembly().GetName().Name}")
.GetSection(nameof(Settings))
.GetSection(nameof(Settings.ConfigSectionSalesforce)).Value;
_ = services.AddSalesforceEventBus(salesforceConfig);
_ = services.AddHostedService>();
_ = services.AddTransient();
[...]
})
In Zeile 5–10 wird sich aus der Konfiguration der Sectionname der Salesforcekonfiguration geholt. Siehe Schritt 3.
In Zeile 12 wird über die Extension
AddSalesforceEventBus(salesforceConfig)
der darunterliegende Client für die Kommunikation mit Salesforce sowie der Eventbus der ServiceCollection hinzugefügt. Die Extension ist über das oben erwähnte Nuget Bet.BuildingBlocks.SalesforceEventBus 3.0.1 verfügbar.In Zeile 13 wird der
SalesforceEventBusHostedService
als HostedService hinzugefügt. Via des HostedServices erfolgt bei dem Start und Stop das Subscribe bzw. das Unsubscribe auf das jeweils konfigurierte CustomEvent.In Zeile 14 wird der eigentliche Listener der ServiceCollection hinzugefügt.
Schritt 3: Konfiguration des Eventbus
Zur Konfiguration bedarf es codetechnisch nur einer Property.
public class Settings
{
public string ConfigSectionSalesforce { get; set; }
}
Die eigentliche Konfiguration SalesforceConfiguration
muss trotzdem entsprechend hinterlegt werden. Diese befindet sich verschachtelt in der Extension AddSalesforceEventBus(salesforceConfig)
(siehe hierzu Schritt 2 Zeile 12). Innerhalb der Extension wird der Client zur Kommunikation mit Salesforce konfiguriert. Hier wird bei der Konfiguration die SalesforceConfiguration
der ServiceCollection hinzugefügt, sodass diese im späteren Verlauf (beispielsweise bei dem Hinzufügen des SalesforceEventBusHostedService
(wird im Konstruktor benötigt)) verwendet werden kann.
Die Konfiguration beinhaltet folgende Properties:
public sealed class SalesforceConfiguration
{
///
/// Consumer Id of the Salesforce Connected App.
///
public string ClientId { get; set; } = string.Empty;
///
/// Consumer secret of the Salesforce Connected App.
///
public string ClientSecret { get; set; } = string.Empty;
///
/// Url to login to Salesforce
/// https://test.salesforce.com/services/oauth2/authorize
/// or https://login.salesforce.com/services/oauth2/authorize.
///
public string LoginUrl { get; set; } = string.Empty;
///
/// Url of the Salesforce organization.
///
public string OrganizationUrl { get; set; } = string.Empty;
///
/// Path of the endpoint to publish platform events.
///
public string PublishEndpoint { get; set; } = string.Empty;
///
/// OAuth refresh token of the Salesforce Connected App.
///
public string RefreshToken { get; set; } = string.Empty;
///
/// Uri to connect to CometD.
///
public string CometDUri { get; set; } = string.Empty;
///
/// Topic or Event Uri.
///
public string EventOrTopicUri { get; set; } = string.Empty;
///
/// Salesforce uri for OAuth authentication.
///
public string OAuthUri { get; set; } = string.Empty;
///
/// Retry for the connections and authentications.
///
public int Retry { get; set; }
///
/// The number to be used for BackoffPower for policy. The default is 2.
///
public int BackoffPower { get; set; } = 2;
///
/// The expiration for token. The default is 1:00 hour.
///
public string TokenExpiration { get; set; } = "01:00:00";
///
/// The event that gets raised when a request for proposal is approved and the Deal is in working status.
///
public string CustomEvent { get; set; } = string.Empty;
///
/// Salesforce ReplayId for specific message.
///
public int ReplayId { get; set; }
///
/// Long polling duration. Default 120 * 1000.
///
public long? ReadTimeOut { get; set; }
///
/// The type of the Salesforce authentication token. Default "OAuth".
///
public string TokenType { get; set; } = "OAuth";
}
In der Konfiguration wäre die Angabe der Config dann wie folgt:
{
"Projekt:ClientId": "kommt aus der Connected App in Salesforce",
"Projekt:ClientSecret": "kommt aus der Connected App in Salesforce",
"Projekt:CometDUri": "/cometd/55.0",
"Projekt:CustomEvent": "EventName",
"Projekt:EventOrTopicUri": "/topic",
"Projekt:LoginUrl": "
"Projekt:OAuthUri": "/services/oauth2/token",
"Projekt:PublishEndpoint": "/services/data/v55.0/sobjects/",
"Projekt:RedirectUri": "https://login.salesforce.com",
"Projekt:RefreshToken": "muss manuell generiert werden (siehe Besonderheiten)",
"Projekt:ReplayId": "-1",
"Projekt:Retry": "2",
"Projekt:ConfigSectionSalesforce": "siehe Schritt 2 Zeile 12"
}
Besonderheiten (optional)
Zugriffs- und Aktualisierungs-Token von der Salesforce REST API abrufen.
Nach Konfiguration der Connected App (weitere Informationen finden Sie auf unserem Beitrag Salesforce Integration)
Schritt 1:
Aufrufen von (HTTP-GET) via Browser:
https://.salesforce.com/services/oauth2/authorize?response_type=code&client_id=&redirect_uri=https://login.salesforce.com/
Falls ein Popup erscheint, bitte zustimmen.
Es wird redirected zu einer URL, die wie folgt aussieht:
https://login.salesforce.com/?code=aPrxYXyxzkuBzbDGdwv67qekAQredtrsWqty38LsdhfREyTRbvdjvTqdbvxPVC__4Cb9xGKDGErtOw%3D%3D
Code ist URL-Encoded
Schritt 2: Get Access Token & Refresh Token
Aufruf per CURL oder per Postman (HTTP-POST)
https://.salesforce.com/services/oauth2/token?code=&grant_type=authorization_code&client_id=&client_secret=&redirect_uri=https://login.salesforce.com/
In der Response sollten sowohl Access-Token als auch Refresh-Token enthalten sein.
Sie haben Fragen oder möchten mehr über unsere Produkte und Dienstleistungen erfahren?
Kontaktieren Sie uns jetzt über unser Kontaktformular!