MongoDB Kafka Connector


Overview

In today's world, companies are constantly generating large volumes of data. However, storing and processing this data can be a challenge. To address this challenge, companies often use multiple databases and tools to manage their data. Two of the most popular technologies used for data management are Apache Kafka and MongoDB.

The MongoDB Kafka Connector is a Kafka Connect-based tool that enables users to integrate MongoDB with Kafka. In this article, we will discuss the connector and why it is important.

What is Kafka?

Apache Kafka is an open-source distributed streaming platform used for building real-time data pipelines and streaming applications. It is designed to handle large volumes of data and can scale horizontally to handle more data as needed.

Kafka is often used as a messaging system or as a distributed log. It provides a publish-subscribe model that enables messages to be sent and received by multiple applications.

Why Kafka and MongoDB?

MongoDB is a popular NoSQL database that is used for managing unstructured data. It is known for its ability to scale horizontally and handle large volumes of data. Kafka and MongoDB are often used together to handle real-time data streams. Kafka provides a reliable and scalable messaging system, while MongoDB provides a flexible and scalable database for managing unstructured data.

Data in Kafka is organised into topics, which are essentially channels or feeds to which messages are published. Each topic is made up of one or more partitions: ordered, immutable sequences of messages. Each message in a partition is assigned an offset, which indicates its position within the partition. Kafka does not guarantee message ordering across partitions, but it does guarantee that ordering is retained within each partition.

Kafka Connect can be used to link databases like MongoDB to Kafka. With MongoDB, Kafka Connect can stream data updates from MongoDB to Kafka topics: the MongoDB Kafka Connector extracts data changes from MongoDB's oplog (operation log) and publishes them as messages to Kafka topics. This enables real-time processing of data changes and allows other systems to subscribe to specific topics and consume the data.

By integrating Kafka and MongoDB, users can stream data between the two technologies in real-time, enabling them to process data more quickly and efficiently. This can be particularly useful for applications that require real-time data processing, such as online gaming, financial trading, and social media.

Highlighted Resources

Users of the MongoDB Kafka Connector can benefit from a variety of resources, including documentation, tutorials, and examples.

The documentation covers the installation, configuration, and use of the MongoDB Kafka Connector in detail. It also includes best practices and troubleshooting advice.

Example code that shows how to utilise the connector is available in the MongoDB Kafka Connector GitHub repository. You can use this code as a springboard for creating your own applications.

Updates on new features and releases are provided on the MongoDB Kafka Connector blog, along with tips and recommended practices for working with the connector.

Sink Connector

The MongoDB Kafka Sink Connector is used to write data from Kafka to MongoDB. It enables users to stream data from Kafka to MongoDB in real time.

The Sink Connector is easy to configure and can be used to write data to a MongoDB collection.

A real-life example where a Sink Connector is helpful is when you want to integrate a data pipeline with a data store like a database or a data warehouse.

Think of a shopping site that processes orders using Kafka. Kafka producers receive the orders and write them to Kafka topics. The e-commerce platform also has a MongoDB database where customer, order, and product information is kept. A Sink Connector can be used to write the orders to the MongoDB database so that it always stays up to date with the most recent orders.

Here is an example of how to configure the MongoDB Kafka Sink Connector to write data from Kafka to MongoDB.
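
The following is a minimal sketch in Kafka Connect properties form; the connector name and connection URI are illustrative, and a real deployment may need additional settings such as post processors for field mapping:

    # Sink connector: Kafka topic -> MongoDB collection (illustrative values)
    name=mongodb-orders-sink
    connector.class=com.mongodb.kafka.connect.MongoSinkConnector
    tasks.max=1
    # Kafka topic to read from
    topics=orders-topic
    # MongoDB deployment, database, and collection to write to
    connection.uri=mongodb://mongodb-host:27017
    database=sampledb
    collection=orders
    # Deserialise Kafka record keys and values as JSON
    key.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false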

This example sets up the MongoDB Kafka Sink Connector to write data from the "orders-topic" Kafka topic to the "orders" collection in the "sampledb" MongoDB database. The converter settings (and any optional post processors) determine how fields from the Kafka messages are mapped to fields in the MongoDB documents.

Source Connector

The MongoDB Kafka Source Connector is used to read data from MongoDB and write it to Kafka. It enables users to stream data from MongoDB to Kafka in real time.

The Source Connector is easy to configure and can be used to read data from a MongoDB collection.

Think of an e-commerce platform, for instance, that uses MongoDB to store consumer information. The customer information in MongoDB is updated whenever a new consumer signs up on the e-commerce site. Other systems on the e-commerce platform also require real-time access to customer data. A Source Connector can be used to read data from MongoDB and write it to Kafka so that it is accessible to other systems.

Here's an example of how to configure the MongoDB Kafka Source Connector to read data from MongoDB and write it to Kafka.
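
Again, this is a minimal sketch in Kafka Connect properties form, with an illustrative connector name and connection URI:

    # Source connector: MongoDB collection -> Kafka topic (illustrative values)
    name=mongodb-customers-source
    connector.class=com.mongodb.kafka.connect.MongoSourceConnector
    tasks.max=1
    # MongoDB deployment, database, and collection to watch for changes
    connection.uri=mongodb://mongodb-host:27017
    database=sampledb
    collection=customers
    # Published topic names take the form <prefix>.<database>.<collection>,
    # e.g. mongo.sampledb.customers
    topic.prefix=mongo
    # Publish only the changed document instead of the full change event
    publish.full.document.only=true
    output.format.key=json
    output.format.value=json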

This example sets up the MongoDB Kafka Source Connector to read data from the "customers" collection in the "sampledb" database in MongoDB and publish it to a Kafka topic named with the "mongo" prefix (for example, mongo.sampledb.customers). The output format settings determine how MongoDB documents are converted into Kafka messages. Once the connector is running, it will continually read data from MongoDB and write it to Kafka so that other systems can consume and analyse it in real time.

Security, Authentication, and Monitoring

The MongoDB Kafka Connector supports several security features, including SSL/TLS encryption and AWS-based authentication (MONGODB-AWS).

To secure your connection with SSL/TLS, you must perform the following tasks (a configuration sketch follows the list):

  • Create the certificates.
  • Store the certificates on the worker host machine.
  • Supply the certificates' credentials to the connector.
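
As a rough sketch (the hostname is illustrative), TLS can be required in the connection URI; the trust store and key store holding your certificates are typically made available to the Kafka Connect worker JVM through the standard javax.net.ssl system properties:

    # Require TLS/SSL for the MongoDB connection (hostname is illustrative)
    connection.uri=mongodb://mongodb-host:27017/?tls=true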

The MONGODB-AWS authentication mechanism uses your Amazon Web Services Identity and Access Management (AWS IAM) credentials to authenticate your user.

The following example demonstrates how to specify your MONGODB-AWS authentication credentials in the connection.uri connector property.
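
The URI takes the following general form, with placeholders shown in angle brackets:

    # MONGODB-AWS authentication specified in the connection.uri property
    connection.uri=mongodb://<AWS access key id>:<AWS secret access key>@<hostname>:<port>/?authSource=<authentication database>&authMechanism=MONGODB-AWS&authMechanismProperties=AWS_SESSION_TOKEN:<AWS session token>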

The placeholders stand for the following:

  • AWS access key id: Value of AWS_ACCESS_KEY_ID.
  • AWS secret access key: Value of AWS_SECRET_ACCESS_KEY.
  • hostname: Network address of MongoDB server.
  • port: Port number of MongoDB server.
  • authentication database: The MongoDB database that stores your user's authentication information. If the authSource option and this placeholder are omitted, the driver defaults to the admin database.
  • AWS session token (optional): Value of AWS_SESSION_TOKEN. If an AWS session token is not required, omit the authMechanismProperties option and this placeholder.

In addition to security, the Connector also provides monitoring capabilities. Users can monitor the Connector's performance and track metrics such as throughput, latency, and error rates.

Migration from Community Connector

The MongoDB Kafka Connector is the successor to the community connector. Users who are currently using the community connector can migrate to the official connector by making the changes described below.

Values containing the package at.grahsl.kafka.connect.mongodb should be replaced with those containing the package com.mongodb.kafka.connect.

Replace the value of the connector.class setting with the MongoDB Kafka sink connector class, com.mongodb.kafka.connect.MongoSinkConnector.

The names of your MongoDB Kafka Connector properties should no longer begin with the mongodb prefix. For instance, change mongodb.connection.uri to connection.uri.
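
As an illustrative before-and-after sketch of these two changes (the connection URI is hypothetical):

    # Before: community connector settings
    connector.class=at.grahsl.kafka.connect.mongodb.MongoDbSinkConnector
    mongodb.connection.uri=mongodb://mongodb-host:27017/sampledb

    # After: MongoDB Kafka Connector settings
    connector.class=com.mongodb.kafka.connect.MongoSinkConnector
    connection.uri=mongodb://mongodb-host:27017/sampledb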

If the document.id.strategies setting is present, remove it. Move any custom strategies referenced by the value of this property to the document.id.strategy setting. The custom class updates described below explain what modifications you need to make to your custom classes.

Replace the mongodb.collection prefixes in property names that provide per-topic or per-collection overrides with their equivalent keys in the sink connector's topic-specific configuration properties.

Imports, including at.grahsl.kafka.connect.mongodb, should be changed to com.mongodb.kafka.connect.

Change any references to MongoDbSinkConnector to those for MongoSinkConnector.

Update any custom sink connector strategy classes to implement the com.mongodb.kafka.connect.sink.processor.id.strategy.IdStrategy interface.

The references to the MongoDbSinkConnectorConfig class should be updated. The logic from that class is divided into the following classes in the MongoDB Kafka connector:

  • MongoSinkConfig
  • MongoSinkTopicConfig

If your connector setup includes custom classes that are a type of post processor, update any methods in those classes that override methods of the MongoDB Kafka connector's PostProcessor class so that their method signatures match.

Compatibility and Troubleshooting

The MongoDB Kafka Sink Connector and the MongoDB Kafka Source Connector are compatible with MongoDB version 3.6 and higher.

If you are running MongoDB v3.6, the connector can only listen for changes on collections. Use MongoDB v4.0 or later if the connector needs to listen for changes on an entire database or deployment.

Kafka Connect v2.1.0 or later is needed to use the MongoDB Kafka Connector, and the connector has been tested with Apache Kafka versions 2.3 and up. Platforms that are compatible with Apache Kafka and Kafka Connect include:

  • Confluent Platform v5.3+
  • Microsoft Azure Event Hubs
  • Red Hat AMQ Streams

The connector works directly with Kafka Connect on these platforms.

While running the MongoDB Kafka Connector, you may encounter the Invalid Resume Token error.

You can tell whether you have encountered this issue by examining the connector's stack trace.

When the ID of its resume token does not match any entry in your MongoDB deployment's oplog, your source connector does not know where to start processing your MongoDB change stream.

You can fix an invalid resume token by utilising one of the following methods:

Temporarily Tolerate Errors

You can configure your source connector to tolerate errors while you produce a change stream event that updates the connector's resume token. This is the easiest recovery method, but there is a chance that your connector will temporarily overlook errors unrelated to the invalid resume token. If you are not comfortable tolerating errors in your deployment even for a short period, you can erase the stored offsets instead.

To set up your source connector to tolerate errors temporarily:

Tolerate all errors by setting the errors.tolerance option to all:
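
    # Temporarily tolerate all errors while the resume token is refreshed
    errors.tolerance=all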

The connector's resume token is updated when you insert, update, or delete a document in the collection that your source connector watches.

After producing the change stream event, set the errors.tolerance option so that errors are no longer tolerated:
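
    # Stop tolerating errors once the resume token has been updated
    errors.tolerance=none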

Reset Stored Offsets

To get your connector to start processing your change stream again, you can reset the offset data in your Kafka Connect instance that contains your resume token.

You can clear your offset data by setting the offset.partition.name configuration property's value to a partition name that does not already exist in your Kafka deployment. Your offset.partition.name property can be set as follows:
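
    # Illustrative value: any partition name not already used in your Kafka
    # deployment causes the connector to start with fresh offset data
    offset.partition.name=new-offset-partition-name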

FAQs

Q1. Can the MongoDB Kafka Connector be used to connect to multiple MongoDB clusters?

Ans. Yes. You can run multiple instances of the MongoDB Kafka Connector, each connecting to a different MongoDB cluster, at the same time. This can be useful in scenarios where data is spread across different MongoDB clusters and users want to consolidate it into a single Kafka topic.

Q2. Can the MongoDB Kafka Connector be used in a cloud environment?

Ans. Yes, the MongoDB Kafka Connector can be used in a cloud environment, such as Amazon Web Services or Microsoft Azure. The Connector is designed to be scalable and can handle large volumes of data, making it well-suited for cloud deployments. Additionally, the Connector supports SSL/TLS encryption, which can help to ensure the security of data being transferred over public networks.

Conclusion

  • Apache Kafka is an open-source distributed streaming platform used for building real-time data pipelines and streaming applications.
  • The MongoDB Kafka Connector is an important tool for integrating Kafka and MongoDB. It enables users to manage and process real-time data streams with ease.
  • The MongoDB Kafka Sink Connector is used to write data from Kafka to MongoDB.
  • The MongoDB Kafka Source Connector is used to read data from MongoDB and write it to Kafka.
  • The MongoDB Kafka Connector provides security features, authentication mechanisms, and monitoring capabilities.