Skip to content

Sinks

A sink is a destination for the events that are replicated from the source cluster. Replicator supports the following sinks:

When replicating events to latest KurrentDB versions, we recommend using the KurrentDB sink.

You need to specify two configurations options for it:

  • replicator.sink.protocol - set to grpc
  • replicator.sink.connectionString - use the target cluster connection string, which you’d use for the regular client.

For example, for a Kurrent Cloud cluster the connection string would look like:

esdb+discover://<username>:<password>@<cluster_id>.mesdb.eventstore.cloud.

Using gRPC gives you more predictable write operation time. For example, on a C4-size instance in Google Cloud Platform, one write would take 4-5 ms, and this number allows you to calculate the replication process throughput, as it doesn’t change much when the database size grows.

The TCP sink should only be used when migrating from one older version cluster to another older version cluster. For EventStoreDB v20+ and KurrentDB, use the KurrentDB sink instead.

For the TCP sink, you need to specify two configurations options for it:

  • replicator.sink.protocol - set to tcp
  • replicator.sink.connectionString - use the target cluster connection string, which you’d use for the TCP client.

Check the connection string format and options in the TCP client documentation.

The risk of using the TCP sink is that you might get unstable write speed. The speed might go down when the database size grows, unlike KurrentDB sink write speed, which remains stable.

Apache Kafka Experimental

Section titled “Apache Kafka ”

The Kafka sink allows you to set up continuous replication from EventStoreDB to Apache Kafka. It might be useful, for example, to scale out subscriptions, as you can partition events in Kafka. Then, you can have a consumer group with concurrent consumers, which process individual partitions, instead of having a single partition on $all.

There’s no way to specify a custom partition, so the default (random) Kafka partitioner will be used.

The Kafka sink needs to be configured in the sink section of the Replicator configuration.

  • replicator.sink.protocol - set to kafka
  • replicator.sink.connectionString - Kafka connection string, which is a comma-separated list of connection options
  • replicator.sink.partitionCount - the number of Kafka partitions in the target topic
  • replicator.sink.router - optional JavaScript function to route events to topics and partitions

Example:

replicator:
reader:
connectionString: esdb+discover://admin:[email protected]
protocol: grpc
sink:
connectionString: bootstrap.servers=localhost:9092
protocol: kafka
partitionCount: 10
router: ./config/route.js

Replicator needs to route events to Kafka. In particular, it needs to know the topic, where to write events to, and the partition key. By default, the topic is the stream “category” (similar to the category projection), which is part of the event stream before the dash. For example, an event from Customer-123 stream will be routed to the Customer topic. The stream name is used as the partition key to ensure events order within a stream.

It’s possible to customise both topic and partition key by using a routing function. You can supply a JavaScript code file, which will instruct Replicator about routing events to topics and partitions.

The code file must have a function called route, which accepts the following parameters:

  • stream - original stream name
  • eventType - original event type
  • data - event payload (data), only works with JSON
  • metadata - event metadata, only works with JSON

The function needs to return an object with two fields:

  • topic - target topic
  • partitionKey - partition key

For example:

function route(stream, eventType, data, meta) {
return {
topic: "myTopic",
partitionKey: stream
}
}

The example function will tell Replicator to produce all the events to the myTopic topic, using the stream name as partition key.

You need to specify the name of the while, which contains the route function, in the replicator.sink.router setting. Such a configuration is displayed in the sample configuration YAML snipped above.