Salesforce Integration: Die richtige API für Ihre Anbindung finden – Streaming API

Mittels der Streaming-API kann sich auf in Salesforce erstellte Events registriert werden, um diese bei Auslösung zu empfangen. Die Streaming-API bietet die Möglichkeit, mittels Push-Technologie (Pub-Sub-Prinzip) Ereignisse in “Echtzeit” zu empfangen.

Grundlagen

Mittels der Streaming-API können in Salesforce Events konsumiert werden. Hierzu zählen:

  • Push Topics (Salesforce Eventbus)
    • Push Topics werden dazu verwendet, auf Änderungen von Datensätzen in “Echtzeit” benachrichtigt zu werden.

  • Generic Events
    • Werden von der Lightning Platform bereitgestellt und dienen dazu, benutzerdefinierte Ereignisse in Lightning-Komponenten darstellen zu können.

  • Platform Events
    • Sind durch Salesforce bereitgestellte Events, die verwendet werden können, um benutzerdefinierte Ereignisse in Salesforce erstellen zu können. Platform Events können von anderen Systemen abonniert werden.

  • Change Data Capture (CDC) Events
    • CDC-Events sind Teil von Salesforce, mit denen Änderungen an Datensätzen im Salesforce-System verfolgt und an andere Systeme übertragen werden können.

Die Streaming-API funktioniert nach dem Publisher-Subscriber-Prinzip. Um Events konsumieren zu können, wird sich auf die jeweiligen Events registriert. Die hierfür von Salesforce verwendete Messaging-Technologie ComtD basiert auf dem Bayeux-Protokoll (Bayeux Protocol, CometD, and Long Polling ).

Die untenstehende Abbildung zeigt den Verbindungsaufbau sowie den Mechanismus zwischen Salesforce und Integrationslösung.

Art der Anbindung

In diesem Artikel wird die Anbindung via Streaming API und Push-Topic-Events erläutert. Zur Verwendung von Push-Topics1 wird der interne Salesforce Eventbus verwendet. Weiterführende Informationen über andere Anbindungsmöglichkeiten mittels der Streaming-API kann unter nachgelesen werden.

1Push-Topics werden von den Salesforceentwicklern des jeweiligen Kunden erstellt. Ebenfalls sollten die entsprechenden Events durch den Kunden bereitgestellt werden.

Salesforce Eventbus

Der Salesforce Eventbus ist eine Funktion in Salesforce, die die Streaming-API verwendet, um eine zentrale Plattform zum Austausch von Ereignissen zwischen verschiedenen Anwendungen und Systemen bereitzustellen. Mit PushTopic-Regeln und benutzerdefinierten Ereignissen können Anwendungen in Echtzeit Ereignisse empfangen und darauf reagieren. Hierdurch können Geschäftsprozesse automatisiert und die Zusammenarbeit zwischen Third-Party-Applications und Salesforce verbessert werden.

Der Eventbus arbeitet nach dem Pub-Sub-Prinzip:

Nachrichten werden durch Eventproducer auf den Eventbus gelegt. Consumer können sich auf Events registrierten. Jede Nachricht erhält eine unique ID, die im technischen Feld ReplayID enthalten ist. Der Wert des Feldes ReplayID wird von Salesforce intern gesetzt, sobald das Ereignis dem Consumer zugestellt wird. Es bezieht sich auf die Position des Ereignisses im Ereignisstrom. Es ist allerdings nicht garantiert, dass die Ids für aufeinanderfolgende Ereignisse zusammenhängend sind. Beispielsweise kann das Ereignis, das auf das Ereignis mit der ID 999 folgt, eine ID von 1.025 haben. Somit kann ein Sequencing über die ReplayIDs nicht realisiert werden.

Ein Consumer kann die ReplayID speichern und im Falle eines Verbindungsabbruchs oder Ähnlichem über die ReplayID alle verpassten Events wiederholen lassen.

Der Replay-Mechanismus wird durch die Konfiguration/Schreiben der ReplayID bestimmt:

C#-spezifische Bibliotheken zur Anbindung von Salesforce mittels Streaming API

Die in diesem Artikel verwendeten Bibliotheken zur Anbindung von Salesforce sind:

  • ist eine .NET-Bibliothek, die es ermöglicht, Salesforce-Ereignisse in Echtzeit zu abonnieren und darauf zu reagieren. Die Bibliothek basiert auf der Streaming-API und bietet eine einfache Möglichkeit, sich für Push-Topics zu registrieren. Die Bibliothek ist Open Source.

  • Ermöglicht die Interaktion mit der Streaming-API in .Net-Umgebungen.

  • ist eine C#-Bibliothek, die es ermöglicht, Daten und Metadaten über die REST-API in .NET Core-Anwendungen (unterstützt .net 5, .net6 und .net7) abzufragen, zu erstellen, zu aktualisieren und zu löschen. Die Bibliothek bietet eine einfache Möglichkeit, auf Salesforce-Daten zuzugreifen und mit ihnen zu interagieren. Diese Bibliothek wird für die OAuth-Authentifizierung der oben genannten Nugets verwendet (die oben genannten Nugets haben als Dependencies auf dieses Nuget). In bereits durchgeführten Kundenprojekten wurde die Bibliothek um den JWT-Bearer-Token-Flow erweitert und als eigenes Projekt mit in die Solution gehangen.

Authentifizierungsmöglichkeiten

Es wird sich via OAuth2.0 (siehe ) authentifiziert. Dies wird allerdings von den verwendeten Nuget-Paketen übernommen.

Vorbereitende Maßnahmen / Voraussetzungen (optional)

 

Authentifizierung

Die Authentifizierung erfolgt über OAuth (Connected App).

 

Limitierungen 

  • Aktuell sind keine Limitierungen für die Streaming-API (v.57) bekannt.

  • Für Push-Topics gelten jedoch folgende Limitierungen (Siehe PushTopic Streaming Allocations)

  • Messages werden im Salesforce-EventBus je nach Eventtyp nur 24h bzw. 72h gespeichert.

Code-Beispiel

Um Events von Salesforce empfangen zu können, wird im ersten Schritt ein Listener benötigt, der sich später durch Konfiguration auf die entsprechende Push-Topic registriert.

Schritt 1: Implementierung Listener

Hierzu wird eine Klasse erstellt, die das Interface IMessageListener implementiert (Referenziert über ).
Innerhalb der OnMessage-Methode wird die Logik implementiert. Hier ist beispielhaft die Deserialisierung der Message in ein Objekt dargestellt.

				
					using CometD.NetCore.Bayeux.Client;

public class MessageListener : IMessageListener
  {
		public void OnMessage(IClientSessionChannel _, CometD.NetCore.Bayeux.IMessage message)
        {
            var payload = JsonConvert.DeserializeObject<MessagePayload>(message.Json)                                                           
         }
  }
				
			

Der MessagePayload sieht beispielhaft (:warning: Die Payload wird durch die Definition des Kundenevents festgelegt) wie unten gezeigt aus. Die Data-Klasse bildet hierbei den Payload. Sie beinhaltet weitere Unterklassen. Die Klasse Sobject beinhaltet die Daten, aus dem vom Kunden definierten Event.

				
					using Newtonsoft.Json;
using System;

namespace MyNamespace
{
    public class Data
    {
        [JsonProperty("event")]
        public Event Event { get; set; }

        [JsonProperty("sobject")]
        public Sobject Sobject { get; set; }
    }

    public class Event
    {
        [JsonProperty("createdDate")]
        public DateTime CreatedDate { get; set; }

        [JsonProperty("replayId")]
        public int ReplayId { get; set; }

        [JsonProperty("type")]
        public string Type { get; set; }
    }

    /// <summary>
    /// This class represents the event sent by Salesforce.
    /// </summary>
    public class ProjectMessagePayload
    {
        [JsonProperty("data")]
        public Data Data { get; set; }

        [JsonProperty("channel")]
        public string Channel { get; set; }
    }

    public class Sobject
    {
        [JsonProperty("KA_OwnerName__c")]
        public string KAOwnerNameC { get; set; }

        [JsonProperty("Id")]
        public string Id { get; set; }

        [JsonProperty("KS_CRMID__c")]
        public string KsCrmIdC { get; set; }

        [JsonProperty("KS_SFCRMID__c")]
        public string KsSfcmId { get; set; }

        [JsonProperty("CurrencyIsoCode")]
        public string CurrencyIsoCode { get; set; }

        [JsonProperty("KS_Name__c")]
        public string KSNameC { get; set; }
    }
}

				
			

Schritt 2: Registrierung von Eventbus und MessageListener

Der in Schritt 1 erstellte MessageListener muss nun für die Dependency Injection registriert werden. In der Methode zur Konfiguration der Services werden folgende Aufrufe getätigt:

				
					 .ConfigureServices((hostBuilderContext, services) =>
            {
              [...]
                       
                var salesforceConfig = 
                        hostBuilderContext
                          .Configuration
                            .GetSection($"{Assembly.GetEntryAssembly().GetName().Name}")
                                .GetSection(nameof(Settings))
                                    .GetSection(nameof(Settings.ConfigSectionSalesforce)).Value;
                                    
                _ = services.AddSalesforceEventBus(salesforceConfig);
                _ = services.AddHostedService<SalesforceEventBusHostedService<MessageListener>>();
                _ = services.AddTransient<IMessageListener, MessageListener>();
                
                [...]
            })
				
			
  • In Zeile 5–10 wird sich aus der Konfiguration der Sectionname der Salesforcekonfiguration geholt. Siehe Schritt 3.

  • In Zeile 12 wird über die Extension AddSalesforceEventBus(salesforceConfig)der darunterliegende Client für die Kommunikation mit Salesforce sowie der Eventbus der ServiceCollection hinzugefügt. Die Extension ist über das oben erwähnte Nuget verfügbar.

  • In Zeile 13 wird der SalesforceEventBusHostedService als HostedService hinzugefügt. Via des HostedServices erfolgt bei dem Start und Stop das Subscribe bzw. das Unsubscribe auf das jeweils konfigurierte CustomEvent.

  • In Zeile 14 wird der eigentliche Listener der ServiceCollection hinzugefügt.

Schritt 3: Konfiguration des Eventbus

Zur Konfiguration bedarf es codetechnisch nur einer Property.

				
					 public class Settings
    {
        public string ConfigSectionSalesforce { get; set; }
    }
				
			

Die eigentliche Konfiguration SalesforceConfiguration muss trotzdem entsprechend hinterlegt werden. Diese befindet sich verschachtelt in der Extension AddSalesforceEventBus(salesforceConfig) (siehe hierzu Schritt 2 Zeile 12). Innerhalb der Extension wird der Client zur Kommunikation mit Salesforce konfiguriert. Hier wird bei der Konfiguration die SalesforceConfiguration der ServiceCollection hinzugefügt, sodass diese im späteren Verlauf (beispielsweise bei dem Hinzufügen des SalesforceEventBusHostedService (wird im Konstruktor benötigt)) verwendet werden kann.

Die Konfiguration beinhaltet folgende Properties:

				
					public sealed class SalesforceConfiguration
    {
        /// <summary>
        /// Consumer Id of the Salesforce Connected App.
        /// </summary>
        public string ClientId { get; set; } = string.Empty;

        /// <summary>
        /// Consumer secret of the Salesforce Connected App.
        /// </summary>
        public string ClientSecret { get; set; } = string.Empty;

        /// <summary>
        /// Url to login to Salesforce
        ///  https://test.salesforce.com/services/oauth2/authorize
        ///  or https://login.salesforce.com/services/oauth2/authorize.
        /// </summary>
        public string LoginUrl { get; set; } = string.Empty;

        /// <summary>
        /// Url of the Salesforce organization.
        /// </summary>
        public string OrganizationUrl { get; set; } = string.Empty;

        /// <summary>
        /// Path of the endpoint to publish platform events.
        /// </summary>
        public string PublishEndpoint { get; set; } = string.Empty;

        /// <summary>
        /// OAuth refresh token of the Salesforce Connected App.
        /// </summary>
        public string RefreshToken { get; set; } = string.Empty;

        /// <summary>
        /// Uri to connect to CometD.
        /// </summary>
        public string CometDUri { get; set; } = string.Empty;

        /// <summary>
        /// Topic or Event Uri.
        /// </summary>
        public string EventOrTopicUri { get; set; } = string.Empty;

        /// <summary>
        /// Salesforce uri for OAuth authentication.
        /// </summary>
        public string OAuthUri { get; set; } = string.Empty;

        /// <summary>
        /// Retry for the connections and authentications.
        /// </summary>
        public int Retry { get; set; }

        /// <summary>
        /// The number to be used for BackoffPower for policy. The default is 2.
        /// </summary>
        public int BackoffPower { get; set; } = 2;

        /// <summary>
        /// The expiration for <see cref="ForceClient"/> token. The default is 1:00 hour.
        /// </summary>
        public string TokenExpiration { get; set; } = "01:00:00";

        /// <summary>
        /// The event that gets raised when a request for proposal is approved and the Deal is in working status.
        /// </summary>
        public string CustomEvent { get; set; } = string.Empty;

        /// <summary>
        /// Salesforce ReplayId for specific message.
        /// </summary>
        public int ReplayId { get; set; }

        /// <summary>
        /// Long polling duration. Default  120 * 1000.
        /// </summary>
        public long? ReadTimeOut { get; set; }

        /// <summary>
        /// The type of the Salesforce authentication token. Default "OAuth".
        /// </summary>
        public string TokenType { get; set; } = "OAuth";
    }
				
			

In der Konfiguration wäre die Angabe der Config dann wie folgt:

				
					{
	"Projekt:ClientId": "kommt aus der Connected App in Salesforce",
	"Projekt:ClientSecret": "kommt aus der Connected App in Salesforce",
	"Projekt:CometDUri": "/cometd/55.0",
	"Projekt:CustomEvent": "EventName",
	"Projekt:EventOrTopicUri": "/topic",
	"Projekt:LoginUrl": "<https://login.salesforce.com",>
	"Projekt:OAuthUri": "/services/oauth2/token",
	"Projekt:PublishEndpoint": "/services/data/v55.0/sobjects/",
	"Projekt:RedirectUri": "https://login.salesforce.com",
	"Projekt:RefreshToken": "muss manuell generiert werden (siehe Besonderheiten)",
	"Projekt:ReplayId": "-1",
	"Projekt:Retry": "2",
	"Projekt:ConfigSectionSalesforce": "siehe Schritt 2 Zeile 12"
}
				
			

Besonderheiten (optional)

Zugriffs- und Aktualisierungs-Token von der Salesforce REST API abrufen.

Nach Konfiguration der Connected App (weitere Informationen finden Sie auf unserem Beitrag Salesforce Integration)


Schritt 1:

Aufrufen von (HTTP-GET) via Browser:

				
					https://<YOUR_INSTANCE>.salesforce.com/services/oauth2/authorize?response_type=code&client_id=<CONSUMER_KEY>&redirect_uri=https://login.salesforce.com/
				
			

Falls ein Popup erscheint, bitte zustimmen.


Es wird redirected zu einer URL, die wie folgt aussieht:

				
					https://login.salesforce.com/?code=aPrxYXyxzkuBzbDGdwv67qekAQredtrsWqty38LsdhfREyTRbvdjvTqdbvxPVC__4Cb9xGKDGErtOw%3D%3D
				
			

:warning: Code ist URL-Encoded

Schritt 2: Get Access Token & Refresh Token

Aufruf per CURL oder per Postman (HTTP-POST)

				
					https://<YOUR_INSTANCE>.salesforce.com/services/oauth2/token?code=<CODE>&grant_type=authorization_code&client_id=<CONSUMER_KEY>&client_secret=<CONSUMER_SECRET>&redirect_uri=https://login.salesforce.com/
				
			

In der Response sollten sowohl Access-Token als auch Refresh-Token enthalten sein.

Sie haben Fragen oder möchten mehr über unsere Produkte und Dienstleistungen erfahren?
Kontaktieren Sie uns jetzt über unser Kontaktformular!

Data Passion Milen & Carsten
Data Passion Milen & Carsten
HABEN WIR
IHR INTERESSE
GEWECKT?
Schreiben Sie uns eine kurze E-Mail oder rufen Sie uns einfach an! Wir kümmern uns um Ihr Anliegen innerhalb der nächsten 24 Stunden.

Tel.: +49 (40) 6963816–0
Tel.: +49 (151) 1176898-0
E-Mail: [email protected]

Kontaktanfrage

Kontaktanfrage