Sinks
A sink is a destination for the events that are replicated from the source cluster. Replicator supports the following sinks:
KurrentDB
When replicating events to the latest KurrentDB versions, we recommend using the KurrentDB sink.
You need to specify two configuration options for it:
replicator.sink.protocol - set to grpc
replicator.sink.connectionString - use the target cluster connection string, which you’d use for the regular client.
For example, for a Kurrent Cloud cluster the connection string would look like this:
esdb+discover://<username>:<password>@<cluster_id>.mesdb.eventstore.cloud
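If you generate the Replicator configuration from a script, a small helper can assemble the Kurrent Cloud connection string from the template above. This is a minimal sketch: the function name `cloudConnectionString` is hypothetical, and it assumes that credentials containing reserved URI characters such as `@` or `:` must be percent-encoded, as in any URI.

```javascript
// Hypothetical helper: builds a connection string following the template
// esdb+discover://<username>:<password>@<cluster_id>.mesdb.eventstore.cloud
function cloudConnectionString(username, password, clusterId) {
  if (!username || !password || !clusterId) {
    throw new Error("username, password and clusterId are all required");
  }
  // Assumption: reserved characters in the credentials must be percent-encoded,
  // as in the userinfo part of any URI.
  const user = encodeURIComponent(username);
  const pass = encodeURIComponent(password);
  return `esdb+discover://${user}:${pass}@${clusterId}.mesdb.eventstore.cloud`;
}

// The "@" in the password is encoded as %40.
console.log(cloudConnectionString("admin", "p@ss", "abc123"));
// esdb+discover://admin:p%40ss@abc123.mesdb.eventstore.cloud
```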
Using gRPC gives you more predictable write latency. For example, on a C4-size instance in Google Cloud Platform, one write takes 4-5 ms. Because this number doesn't change much as the database grows, you can use it to calculate the throughput of the replication process.
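As a rough illustration of that calculation, the sketch below turns a per-write latency into writes per second and into an estimated replication time. It assumes each write carries one event and that each writer waits for its previous write to finish before sending the next one. Both are simplifying assumptions, not documented Replicator behavior, and the function names are hypothetical.

```javascript
// Back-of-the-envelope estimate from a measured per-write latency.
// Assumptions: one event per write; each of `writers` writers issues its
// writes sequentially, waiting for the previous one to complete.
function writesPerSecond(writeLatencyMs, writers = 1) {
  return (writers * 1000) / writeLatencyMs;
}

// Estimated hours to replicate `eventCount` events under the same assumptions.
function replicationHours(eventCount, writeLatencyMs, writers = 1) {
  return eventCount / writesPerSecond(writeLatencyMs, writers) / 3600;
}

// With the 4-5 ms latency quoted above and a single writer:
console.log(writesPerSecond(5)); // 200
console.log(writesPerSecond(4)); // 250
// 10 million events at 5 ms per write:
console.log(replicationHours(1e7, 5).toFixed(1)); // 13.9
```

Real throughput also depends on the reader side, the network and any batching, so treat the result as an order-of-magnitude figure.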
EventStoreDB TCP (Legacy)
The TCP sink should only be used when migrating between two clusters that both run older versions. For EventStoreDB v20+ and KurrentDB, use the KurrentDB sink instead.
For the TCP sink, you need to specify two configuration options:
replicator.sink.protocol - set to tcp
replicator.sink.connectionString - use the target cluster connection string, which you’d use for the TCP client.
Check the connection string format and options in the TCP client documentation.
The risk of using the TCP sink is an unstable write speed: it can drop as the database grows, whereas the write speed of the KurrentDB sink remains stable.
Apache Kafka (Experimental)
The Kafka sink allows you to set up continuous replication from EventStoreDB to Apache Kafka. This can be useful, for example, to scale out subscriptions: once events are partitioned in Kafka, a consumer group with concurrent consumers can process individual partitions in parallel, instead of consuming $all, which is effectively a single partition.
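There’s no way to specify a custom partition, so the default Kafka partitioner is used. When a partition key is set, the default partitioner chooses the partition from a hash of the key, so events with the same key always land in the same partition.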
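A key-based partitioner sends every event with the same partition key to the same partition, which is what keeps the events of one stream in order. The sketch below illustrates the mapping key → hash → partition with 32-bit FNV-1a over UTF-16 code units. FNV-1a is a stand-in chosen for readability; it is not the hash that Kafka client partitioners use, so the partition numbers it produces will differ from the real ones.

```javascript
// Illustration only: FNV-1a (32-bit) is a stand-in hash, NOT the hash used by
// Kafka client partitioners.
function fnv1a(str) {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < str.length; i++) {
    hash ^= str.charCodeAt(i);
    // Multiply by the FNV prime modulo 2^32 and keep the result unsigned.
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash >>> 0;
}

// Deterministic: the same key always yields the same partition index.
function partitionFor(partitionKey, partitionCount) {
  return fnv1a(partitionKey) % partitionCount;
}

// Every event from Customer-123 maps to the same partition out of 10.
console.log(partitionFor("Customer-123", 10) === partitionFor("Customer-123", 10)); // true
```

Because the key is the stream name by default, all events of a stream share one partition, while different streams spread across the available partitions.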
The Kafka sink needs to be configured in the sink section of the Replicator configuration.
replicator.sink.protocol - set to kafka
replicator.sink.connectionString - Kafka connection string, which is a comma-separated list of connection options
replicator.sink.partitionCount - the number of Kafka partitions in the target topic
replicator.sink.router - optional JavaScript function to route events to topics and partitions
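To make the "comma-separated list of connection options" concrete, here is a hypothetical parser that turns such a string into key/value pairs. It assumes each option has the form key=value, as in bootstrap.servers=localhost:9092. Treating a segment without "=" as a continuation of the previous value, so that a multi-broker list survives, is a heuristic of this sketch and not documented Replicator parsing.

```javascript
// Hypothetical parser for a comma-separated list of key=value options.
// A segment without "=" is appended to the previous value (our heuristic),
// so "bootstrap.servers=a:9092,b:9092" keeps both brokers.
function parseKafkaConnectionString(connectionString) {
  const options = {};
  let lastKey = null;
  for (const raw of connectionString.split(",")) {
    const segment = raw.trim();
    if (segment === "") continue; // ignore empty segments, e.g. a trailing comma
    const eq = segment.indexOf("=");
    if (eq > 0) {
      lastKey = segment.slice(0, eq).trim();
      options[lastKey] = segment.slice(eq + 1).trim();
    } else if (lastKey !== null) {
      options[lastKey] += "," + segment;
    } else {
      throw new Error(`Unexpected segment without a key: "${segment}"`);
    }
  }
  return options;
}

console.log(parseKafkaConnectionString("bootstrap.servers=localhost:9092"));
```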
Example:
replicator:
  reader:
    protocol: grpc
  sink:
    connectionString: bootstrap.servers=localhost:9092
    protocol: kafka
    partitionCount: 10
    router: ./config/route.js
Routing
Replicator needs to know how to route events to Kafka: which topic to write each event to, and which partition key to use. By default, the topic is the stream “category” (similar to the category projection), which is the part of the stream name before the dash. For example, an event from the Customer-123 stream is routed to the Customer topic. The stream name is used as the partition key, which preserves the order of events within a stream.
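The default routing rule can be sketched as a plain function. Two details are assumptions rather than confirmed Replicator behavior: the stream name is split at the first dash, and a stream name without a dash is used unchanged as the topic. The name `defaultRoute` is hypothetical.

```javascript
// Sketch of the default routing: topic = stream category (text before the
// dash), partition key = full stream name.
// Assumptions: split at the FIRST dash; no dash means the whole name is the topic.
function defaultRoute(stream) {
  const dash = stream.indexOf("-");
  const topic = dash > 0 ? stream.slice(0, dash) : stream;
  return { topic: topic, partitionKey: stream };
}

console.log(defaultRoute("Customer-123")); // { topic: 'Customer', partitionKey: 'Customer-123' }
```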
You can customise both the topic and the partition key with a routing function: supply a JavaScript code file that tells Replicator how to route events to topics and partitions.
The code file must have a function called route, which accepts the following parameters:
stream - original stream name
eventType - original event type
data - event payload (data), only works with JSON
metadata - event metadata, only works with JSON
The function needs to return an object with two fields:
topic - target topic
partitionKey - partition key
For example:
function route(stream, eventType, data, meta) {
  return { topic: "myTopic", partitionKey: stream };
}
This function tells Replicator to produce all events to the myTopic topic, using the stream name as the partition key.
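A router can also use the other parameters. The hypothetical route.js below sends each event to a topic named after its stream category and event type. It uses a `customerId` field from the JSON payload as the partition key when that field is present. The `customerId` field, the topic naming scheme, and the assumption that `data` arrives as a parsed object for JSON events are all illustrative. The sketch also assumes event types contain only characters that are valid in Kafka topic names. It sticks to plain ES5 syntax because the JavaScript features supported by Replicator's script engine aren't stated here.

```javascript
// Hypothetical route.js: topic per category and event type; partition key
// taken from a "customerId" payload field (an assumption about your schema).
function route(stream, eventType, data, meta) {
  var dash = stream.indexOf("-");
  var category = dash > 0 ? stream.substring(0, dash) : stream;

  // data is only meaningful for JSON events; fall back to the stream name
  // when the payload is missing or has no customerId.
  var key = stream;
  if (data && typeof data.customerId === "string" && data.customerId !== "") {
    key = data.customerId;
  }

  return { topic: category + "-" + eventType, partitionKey: key };
}

console.log(route("Order-1", "OrderPlaced", { customerId: "c-42" }, {}));
```

Choosing a key other than the stream name changes the ordering guarantee. Events are then ordered per customer, and events from one stream can spread over several partitions if their customerId values differ.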
Specify the name of the file that contains the route function in the replicator.sink.router setting, as shown in the sample configuration YAML above.
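Before pointing replicator.sink.router at a file, it can help to exercise the route function locally. The Node.js harness below is a hypothetical testing aid, not part of Replicator. It evaluates router source code in an isolated context with Node's built-in `vm` module, then checks that the result has the two fields listed above. It assumes both `topic` and `partitionKey` should be strings. Node's engine is not necessarily the one Replicator embeds, so a router that passes here should still be checked against Replicator itself.

```javascript
// Hypothetical local harness for a router file, using Node's built-in vm module.
const vm = require("vm");

// Evaluates router source code in a fresh context and returns its route function.
function loadRouter(source) {
  const context = vm.createContext({});
  vm.runInContext(source, context); // top-level function declarations become context properties
  if (typeof context.route !== "function") {
    throw new Error("router source must define a function called route");
  }
  return context.route;
}

// Checks the returned object (assumption: both fields should be strings).
function checkRouteResult(result) {
  if (!result || typeof result.topic !== "string" || result.topic === "") {
    throw new Error("route must return a non-empty topic");
  }
  if (typeof result.partitionKey !== "string") {
    throw new Error("route must return a string partitionKey");
  }
  return result;
}

// In practice you would read the file, e.g. fs.readFileSync("./config/route.js", "utf8").
const source =
  'function route(stream, eventType, data, meta) { return { topic: "myTopic", partitionKey: stream }; }';
const route = loadRouter(source);
console.log(checkRouteResult(route("Customer-123", "CustomerCreated", {}, {})));
```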