Mikroservis mimarilerde mikroDB kavramı karşımıza çıkar. Her servis kendi içinde bir veri tabanına sahiptir. Bir serviste yapılan bir işlemden anlık olarak birden fazla mikroservisin haberdar olması gerekir. Bu durumda mikroservisler arasında anlık veri akışı nasıl sağlanır?
Bu noktada CDC mikroservis mimaride mikroDB’ler arasında veri alışverişini sağlamak için kullanılan en efektif yöntemlerdendir.
CDC (Change Data Capture) Nedir
Temelde CDC bir tabloda yapılan insert, update, delete işlemlerini yakalayan ve veri üzerindeki bu değişiklikleri istenilen hedefe aktarmamızı sağlayan bir design pattern’dir. Modern veri tabanlarında bu işlem ilgili kaydın zaman, ilk durum ve son durum bilgilerinin log’lanmasıyla sağlanır.
Bu yazımızda Debezium kullanarak CDC pattern’i nasıl kullanabileceğimize bakalım.
Debezium Nedir
Debezium, Kafka kullanarak CDC pattern’i uygulamamızı sağlayan açık kaynak kodlu bir framework’tür. Bilinen birçok veri tabanı (MySQL, MongoDB, PostgreSQL, Oracle, SQL Server ve Cassandra gibi) Debezium ile çalışabilir.
Debezium, Kafka Connect cluster üzerinde çalışır, veri akışı sağlanmak istenen her bir kaynak veri tabanı veya tablo için Kafka Connect üzerinde bir Source Connector olmalıdır. Bu connector sayesinde Debezium kaynak veri tabanındaki işlemleri track ederek Kafka için anlamlı hale getirir ve Kafka’ya bir mesaj olarak iletir. İlgili topic’e gelen mesajlar Sink Connectorvasıtası ile hedef veri tabanına gönderilir.
CDC için Docker üzerinde bir Kafka instance Kafka Connect ve Debezium kurulumuna ihtiyacımız var. Kurulumlar için debezioum.io sitesindeki adımları takip edebilirsiniz:
Tutorial
This tutorial demonstrates how to use Debezium to monitor a MySQL database. As the data in the database changes, you…
debezium.io
Debezium için çalışma ortamımız hazır. Sıra geldi veri tabanı veya tablo bazında Source Connector’ların ve Sink Connector’ların oluşturulmasına:
Debezium’da oluşturduğumuz bir Source Connector Kafka üzerinde bir topic oluşmasını ve verideki değişikliklerin izlenerek bu topic’lere iletilmesini sağlar.
Sink Connector ise Kafka topic üzerine gelen mesajların consume ederek hedef veri tabanına iletir.
Kaynak veri tabanı SQL Server hedef PostgreSQL olarak 2 farklı database teknolojisi ile çalışan mikroservisler arasında Source ve Sink Connector’ları oluşturarak CDC’yi kullanalım;
Connector tanımları Debezium’un restApi’leri kullanılarak yapılır.
Debezium namespace’ine port forward yaparak bağlanalım;
$ CONNECT_POD_NAME=$(kubectl
get pod -n @namespace |
grep -i running | grep
^@debezium-connect-name|
awk ‘{ print $1 }’)
$ kubectl port-forward -n @namespace $CONNECT_POD_NAME 8083:8083
view rawportforward.cmd hosted with ❤ by GitHub
Source Connector konfigurasyonlarını yapalım;
“name”:”inventory-connector-mssql”,
“config”:{
“connector.class”:”io.debezium.connector.sqlserver.SqlServerConnector”,
“database.user”:”sa”,
“database.dbname”:”inventory”,
“database.history.kafka.bootstrap.servers”:”my-cluster-kafka-bootstrap:9092″,
“database.history.kafka.topic”:”schema-changes.inventory”,
“transforms”:”route”,
“database.server.name”:”mssql1″,
“database.port”:”1433″,
“transforms.route.type”:”org.apache.kafka.connect.transforms.RegexRouter”,
“transforms.route.regex”:”([^.]+)\\.([^.]+)\\.([^.]+)”,
“database.hostname”:”mssql”,
“database.password”:”Password!”,
“name”:”inventory-connector-mssql”,
“database.whitelist”:”inventory”,
“table.whitelist”:”dbo.customers”,
“transforms.route.replacement”:”inventory.$3″
view rawsourceconnector.json hosted with ❤ by GitHub
Bu ayarları kısaca özetleyecek olursak; my-cluster-kafka-bootstrap:9092 Kafka üzerinde, inventory veri tabanında bulunan dbo.customer tablosu için inventory. customer isminde bir topic oluştur ve bu tablodaki CRUD işlemleri bu topic’e yaz demek istiyoruz.
Sink Connector konfigürasyonlarını yapalım;
“name”: “jdbc-sink”,
“config”: {
“connector.class”: “io.confluent.connect.jdbc.JdbcSinkConnector”,
“tasks.max”: “1”,
“topics”: “inventory.customers”,
“connection.url”: “jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw”,
“transforms”: “unwrap,route”,
“transforms.unwrap.type”: “io.debezium.transforms.ExtractNewRecordState”,
“transforms.unwrap.drop.tombstones”:”false”,
“transforms.route.type”: “org.apache.kafka.connect.transforms.RegexRouter”,
“transforms.route.regex”: “([^.]+)\\.([^.]+)”,
“transforms.route.replacement”: “$2”,
“auto.create”: “true”,
“insert.mode”: “upsert”,
“delete.enabled”: “true”,
“pk.fields”: “id”,
“table.whitelist”:”customers”,
“pk.mode”: “record_key”
}
}
view rawsinkconnector.json hosted with ❤ by GitHub
Sink Connector ayarlarını kısaca özetleyecek olursak;“ inventory. customers topic’ine gelen mesajları al, connection.url’de belirttiğim veri tabanına git, burada yoksa dbo.customers tablosunu oluştur, varsa bu tablo üzerine ilgili değişiklikleri ilet” demek istiyoruz.
Source ve Sink Connector konfigürasyonlarını tamamladıktan sonra bir rest client yardımıyla veya curl komutuyla post edelim;
: post source connector
curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” http://localhost:8083/connectors/ -d @source.json
: post sink connector
curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” http://localhost:8083/connectors/ -d @sink.json
: list connectors
curl -i -X GET -H “Accept:application/json” http://localhost:8083/connectors
: connector status
curl -i -X GET -H “Accept:application/json” http://localhost:8083/@connectorname/status
view rawpostsource.cmd hosted with ❤ by GitHub
SQL Server ve PostgreSQL kullanan mikroservisler arasında CDC kullanarak veri akışını sağlamış olduk. Mikroservis projelerinizde kullandığınız veri tabanı teknolojisine ve ihtiyaçlara göre Connector ayarlarını özelleştirebilirsiniz.
Kaynaklar:
https://debezium.io/documentation/reference/1.0/tutorial.html
https://strimzi.io/blog/2020/01/27/deploying-debezium-with-kafkaconnector-resource/
https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#