Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP MongoDB guide
  • Loading branch information
bobbyiliev committed Oct 30, 2023
commit 522e80890bdaa7883fcf841d48c48d649377325b
2 changes: 1 addition & 1 deletion integrations/debezium/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ This is a collection of demos that show how to use the [Debezium](https://materi
| [Postgres](postgres) | Connect to a Postgres database and stream changes to Kafka/Redpanda | [Postgres](https://materialize.com/docs/ingest-data/cdc-postgres-kafka-debezium/) |
| [MySQL](mysql) | Connect to a MySQL database and stream changes to Kafka/Redpanda | [MySQL](https://materialize.com/docs/ingest-data/cdc-mysql/) |
| [SQL server](sqlserver) | Connect to a SQL server database and stream changes to Kafka/Redpanda | TODO |
<!-- | TODO: [MongoDB](mongodb) | Connect to a MongoDB database and stream changes to Kafka/Redpanda | TODO | -->
| [MongoDB](mongodb) | Connect to a MongoDB database and stream changes to Kafka/Redpanda | WIP: This demo is not yet complete. |

## Prerequisites

Expand Down
180 changes: 180 additions & 0 deletions integrations/debezium/mongodb/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
# Debezium + MongoDB + Materialize

> {notice}: WIP: This demo is a work in progress. It is not yet ready for use.

- When trying to create a source with envelope debezium, the following error is thrown:

```sql
ERROR: 'before' column must be of type record
```

An example of the records generated:

- Insert:
```sql
-[ RECORD 1 ]-----+------------------------------------------------------------------------------------------------------------------
id | 102
before |
after | {"_id":{"$numberLong":"102"},"description":"12V car battery","name":"car battery","quantity":8,"weight":8.1}
updateDescription |
source | (2.4.0.Final,mongodb,dbserver1,0,true,inventory,,rs0,products,-1,,,)
op | r
ts_ms | 1698676832093
transaction |
```

- Update:
```sql
-[ RECORD 2 ]-----+------------------------------------------------------------------------------------------------------------------
id | 101
before | {"_id":{"$numberLong":"101"},"description":"Small 2-wheel scooter","name":"scooter","quantity":3,"weight":3.14}
after | {"_id":{"$numberLong":"101"},"description":"Updated 2-wheel scooter","name":"scooter","quantity":3,"weight":3.14}
updateDescription | (,"{""description"":""Updated 2-wheel scooter""}",)
source | (2.4.0.Final,mongodb,dbserver1,1698676924000,false,inventory,,rs0,products,1,,,1698676924686)
op | u
ts_ms | 1698676924707
transaction |
```

---

Before trying this out, you will need the following:

- [Materialize account](https://materialize.com/register/).
- A publicly accessible Linux server with [Docker](https://docs.docker.com/get-docker/) and [Docker Compose](https://docs.docker.com/compose/install/) installed.

## Running the demo

If you want to try it right now, follow these steps:

1. Clone the project on your Linux server and run:

```shell session
git clone https://github.com/MaterializeInc/demos.git
cd demos/integrations/debezium/mongodb
```

1. After cloning the project, you will need to set the `EXTERNAL_IP` environment variable to the IP address of your Linux server. For example:

```shell session
export EXTERNAL_IP=$(hostname -I | awk '{print $1}')

# Check the value of EXTERNAL_IP
echo $EXTERNAL_IP
```

1. Bring up only the MongoDB container in the background.

```shell session
docker compose up -d --build mongodb
```

1. Initialize the MongoDB replica set.

```shell session
docker compose exec mongodb bash -c "/usr/local/bin/init-inventory.sh"
```

1. Bring up the rest of the Docker containers in the background.

```shell session
docker compose up -d --build
```

**This may take one or two minutes to complete the first time you run it.** If all goes well, you'll have everything running in their own containers, with Debezium configured to ship changes from Mongo into Redpanda.

1. Confirm that everything is running as expected:

```shell session
docker compose ps
```

1. Exec in to the redpanda container to look around using redpanda's amazing [rpk](https://docs.redpanda.com/docs/reference/rpk/) CLI.

```shell session
docker compose exec redpanda /bin/bash

rpk debug info

rpk topic list
```
1. Connect to Materialize

If you already have `psql` installed on your machine, use the provided connection string to connect:

Example:

```shell session
psql "postgres://user%40domain.com@materialize_host:6875/materialize"
```

Otherwise, you can find the steps to install and use your CLI of choice under [Supported tools](https://materialize.com/docs/integrations/sql-clients/#supported-tools).

1. Now that you're in the Materialize, define the connection to the Redpanda broker and the schema registry:

```sql
-- Create Redpanda connection
CREATE CONNECTION redpanda_connection
TO KAFKA (
BROKER '<your_server_ip:9092>');

-- Create Registry connection
CREATE CONNECTION schema_registry
TO CONFLUENT SCHEMA REGISTRY (
URL 'http://<your_server_ip:8081>');
```

1. Next, define all of the tables in `demo` as sources:

```sql
CREATE SOURCE customers
FROM KAFKA CONNECTION redpanda_connection (TOPIC 'dbserver1.inventory.customers')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');

CREATE SOURCE orders
FROM KAFKA CONNECTION redpanda_connection (TOPIC 'dbserver1.inventory.orders')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');

CREATE SOURCE products
FROM KAFKA CONNECTION redpanda_connection (TOPIC 'dbserver1.inventory.products')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION schema_registry
ENVELOPE DEBEZIUM
WITH (SIZE = '3xsmall');
```

Because the three sources are pulling message schema data from the registry, materialize knows the column types to use for each attribute.


1. Select from one of sources to see the data:

```sql
SELECT * FROM customers LIMIT 5;
```

## Cleanup

To stop the services and remove the containers, run:

```shell session
docker compose down
```

In Materialize, run:

```sql
DROP CONNECTION redpanda_connection CASCADE;
DROP CONNECTION schema_registry CASCADE;
```

## Helpful resources:

* [`CREATE SOURCE`](https://materialize.com/docs/sql/create-source)
* [`CREATE MATERIALIZED VIEW`](https://materialize.com/docs/sql/create-materialized-view)

## Community

If you have any questions or comments, please join the [Materialize Slack Community](https://materialize.com/s/chat)!
49 changes: 49 additions & 0 deletions integrations/debezium/mongodb/connect/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
FROM debezium/connect-base:2.4

#
# Set up the plugins directory ...
#
ENV CONFLUENT_VERSION=7.0.1 \
AVRO_VERSION=1.10.1 \
GUAVA_VERSION=31.0.1-jre

RUN docker-maven-download confluent kafka-connect-avro-converter "$CONFLUENT_VERSION" fd03a1436f29d39e1807e2fb6f8e415a && \
docker-maven-download confluent kafka-connect-avro-data "$CONFLUENT_VERSION" d27f30e9eca4ef1129289c626e9ce1f1 && \
docker-maven-download confluent kafka-avro-serializer "$CONFLUENT_VERSION" c72420603422ef54d61f493ca338187c && \
docker-maven-download confluent kafka-schema-serializer "$CONFLUENT_VERSION" 9c510db58119ef66d692ae172d5b1204 && \
docker-maven-download confluent kafka-schema-registry-client "$CONFLUENT_VERSION" 7449df1f5c9a51c3e82e776eb7814bf1 && \
docker-maven-download confluent common-config "$CONFLUENT_VERSION" aab5670de446af5b6f10710e2eb86894 && \
docker-maven-download confluent common-utils "$CONFLUENT_VERSION" 74bf5cc6de2748148f5770bccd83a37c && \
docker-maven-download central org/apache/avro avro "$AVRO_VERSION" 35469fee6d74ecbadce4773bfe3a204c && \
docker-maven-download central com/google/guava guava "$GUAVA_VERSION" bb811ca86cba6506cca5d415cd5559a7

# https://github.com/debezium/container-images/blob/main/connect/2.4/Dockerfile
LABEL maintainer="Debezium Community"

ENV DEBEZIUM_VERSION="2.4.0.Final" \
MAVEN_REPO_CENTRAL="" \
MAVEN_REPOS_ADDITIONAL="" \
MAVEN_DEP_DESTINATION=$KAFKA_CONNECT_PLUGINS_DIR \
MONGODB_MD5=a22784387e0ec8a6abb1606c2c365cb2 \
MYSQL_MD5=4bff262afc9678f5cbc3be6315b8e71e \
POSTGRES_MD5=b42c9e208410f39ad1ad09778b1e3f03 \
SQLSERVER_MD5=9b8bf3c62a7c22c465a32fa27b3cffb5 \
ORACLE_MD5=21699814400860457dc2334b165882e6 \
DB2_MD5=0727d7f2d1deeacef39e230acac835a8 \
SPANNER_MD5=186b07595e914e9139941889fd675044 \
VITESS_MD5=3b4d24c8c9898df060c408a13fd3429f \
JDBC_MD5=77c5cb9adf932ab17c041544f4ade357 \
KCRESTEXT_MD5=25c0353f5a7304b3c4780a20f0f5d0af \
SCRIPTING_MD5=53a3661e7a9877744f4a30d6483d7957

RUN docker-maven-download debezium mongodb "$DEBEZIUM_VERSION" "$MONGODB_MD5" && \
docker-maven-download debezium mysql "$DEBEZIUM_VERSION" "$MYSQL_MD5" && \
docker-maven-download debezium postgres "$DEBEZIUM_VERSION" "$POSTGRES_MD5" && \
docker-maven-download debezium sqlserver "$DEBEZIUM_VERSION" "$SQLSERVER_MD5" && \
docker-maven-download debezium oracle "$DEBEZIUM_VERSION" "$ORACLE_MD5" && \
docker-maven-download debezium-additional db2 db2 "$DEBEZIUM_VERSION" "$DB2_MD5" && \
docker-maven-download debezium-additional jdbc jdbc "$DEBEZIUM_VERSION" "$JDBC_MD5" && \
docker-maven-download debezium-additional spanner spanner "$DEBEZIUM_VERSION" "$SPANNER_MD5" && \
docker-maven-download debezium-additional vitess vitess "$DEBEZIUM_VERSION" "$VITESS_MD5" && \
docker-maven-download debezium-optional connect-rest-extension "$DEBEZIUM_VERSION" "$KCRESTEXT_MD5" && \
docker-maven-download debezium-optional scripting "$DEBEZIUM_VERSION" "$SCRIPTING_MD5"
10 changes: 10 additions & 0 deletions integrations/debezium/mongodb/deploy/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
FROM ubuntu:latest

RUN apt-get update && apt-get -qy install curl

COPY . /deploy

COPY docker-entrypoint.sh /usr/local/bin
RUN chmod 777 /usr/local/bin/docker-entrypoint.sh

ENTRYPOINT ["docker-entrypoint.sh"]
7 changes: 7 additions & 0 deletions integrations/debezium/mongodb/deploy/docker-entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

set -euo pipefail

cd /deploy

bash mongo_dbz.sh
25 changes: 25 additions & 0 deletions integrations/debezium/mongodb/deploy/mongo_dbz.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/bin/bash

#Initialize Debezium (Kafka Connect Component)

while true; do
echo "Waiting for Debezium to be ready"
sleep 0.1
curl -s -o /dev/null -w "%{http_code}" http://debezium:8083/connectors/ | grep 200
if [ $? -eq 0 ]; then
echo "Debezium is ready"
break
fi
done

# Read the JSON file and register the connector and change the ${EXTERNAL_IP} with the external IP environment variable
sed -i "s/EXTERNAL_IP/${EXTERNAL_IP}/g" /deploy/register-mongodb.json

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://debezium:8083/connectors/ -d @/deploy/register-mongodb.json

if [ $? -eq 0 ]; then
echo "Debezium connector registered"
else
echo "Debezium connector registration failed"
exit 1
fi
18 changes: 18 additions & 0 deletions integrations/debezium/mongodb/deploy/register-mongodb.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"name": "inventory-connector",
"config": {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max" : "1",
"topic.prefix" : "dbserver1",
"mongodb.user" : "debezium",
"mongodb.password" : "dbz",
"mongodb.connection.string": "mongodb://mongodb:27017/?replicaSet=rs0",
"capture.mode": "change_streams_update_full_with_pre_image",
"database.include.list" : "inventory",
"database.history.kafka.bootstrap.servers": "138.68.72.112:9092",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://138.68.72.112:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://138.68.72.112:8081"
}
}
66 changes: 66 additions & 0 deletions integrations/debezium/mongodb/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
version: '3.9'
services:
redpanda:
image: docker.vectorized.io/vectorized/redpanda:v21.10.1
container_name: redpanda
command:
- redpanda start
- --overprovisioned
- --smp 1
- --memory 1G
- --reserve-memory 0M
- --node-id 0
- --check=false
- --kafka-addr 0.0.0.0:9092
- --advertise-kafka-addr ${EXTERNAL_IP:-redpanda}:9092
- --pandaproxy-addr 0.0.0.0:8082
- --advertise-pandaproxy-addr ${EXTERNAL_IP:-redpanda}:9092
- --set redpanda.enable_transactions=true
- --set redpanda.enable_idempotence=true
ports:
- 9092:9092
- 8081:8081
- 8082:8082
healthcheck: {test: curl -f localhost:9644/v1/status/ready, interval: 1s, start_period: 30s}
mongodb:
# image: quay.io/debezium/example-mongodb:2.4
build: ./mongodb
hostname: mongodb
ports:
- 27017:27017
environment:
- MONGODB_USER=debezium
- MONGODB_PASSWORD=dbz
# volumes:
# - ./mongodb/mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js:ro
healthcheck:
test: ["CMD", "mongosh", "--eval", "db.adminCommand('ping')"]
interval: 1s
timeout: 10s
retries: 5
debezium:
#image: debezium/connect:2.4
build: ./connect
container_name: debezium
environment:
BOOTSTRAP_SERVERS: ${EXTERNAL_IP:-redpanda}:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_status
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://${EXTERNAL_IP:-redpanda}:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://${EXTERNAL_IP:-redpanda}:8081
ports:
- 8083:8083
healthcheck: {test: curl -f localhost:8083, interval: 1s, start_period: 120s}
depends_on:
redpanda: {condition: service_healthy}
mongodb: {condition: service_healthy}
debezium_deploy:
build: ./deploy
depends_on:
redpanda: {condition: service_healthy}
mongodb: {condition: service_healthy}
debezium: {condition: service_healthy}
environment:
- EXTERNAL_IP=${EXTERNAL_IP:-redpanda}
14 changes: 14 additions & 0 deletions integrations/debezium/mongodb/mongodb/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM mongo:6.0.11

LABEL maintainer="Debezium Community"

COPY init-inventory.sh /usr/local/bin/
RUN chmod +x /usr/local/bin/init-inventory.sh

# Starting with MongoDB 4.4 the authentication enabled MongoDB requires a key
# for intra-replica set communication
RUN openssl rand -base64 756 > /etc/mongodb.keyfile &&\
chown mongodb /etc/mongodb.keyfile &&\
chmod 400 /etc/mongodb.keyfile

CMD ["mongod", "--replSet", "rs0", "--auth", "--keyFile", "/etc/mongodb.keyfile"]
Loading