I created the following Siddhi application to capture data changes in a PostgreSQL database table.
@App:name('post')
@source(type = 'cdc' ,url = 'jdbc:postgresql://postgres:5432/shipment_db',
username = 'postgresuser', password = 'postgrespw',
table.name = 'public.shipments', operation = 'insert', plugin.name='pgoutput',slot.name='postslot',
@map(type='keyvalue', @attributes(shipment_id = 'shipment_id', order_id = 'order_id',date_created='date_created',status='status')))
define stream inputStream (shipment_id long, order_id long,date_created string, status string);
@sink(type = 'log')
define stream OutputStream (shipment_id long, date_created string);
@info(name = 'query1')
from inputStream
select shipment_id, date_created
insert into OutputStream;

I placed siddhi-io-cdc-2.0.12.jar and siddhi-core-5.1.21.jar in the ./files/bundles directory, and org.wso2.carbon.si.metrics.core-3.0.57.jar and postgresql-42.3.3.jar in the ./files/jars directory. Using the Dockerfile from https://siddhi.io/en/v5.1/docs/config-guide/#adding-to-siddhi-docker-microservice I then built a Docker image named siddhiimgpostgres.
Below is the docker command used to run the Siddhi app.
docker run -it --net postgres-docker_default --rm -p 8006:8006 -v /home/me/siddhi-apps:/apps siddhiimgpostgres:tag1 -Dapps=/apps/post.siddhi

Below are the logs I received.
[2022-08-24 06:35:43,975] INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} - Snapshot step 7 - Snapshotting data
[2022-08-24 06:35:43,976] INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} - Exporting data from table 'public.shipments'
[2022-08-24 06:35:43,976] INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} - For table 'public.shipments' using select statement: 'SELECT * FROM "public"."shipments"'
[2022-08-24 06:35:43,995] INFO {io.debezium.relational.RelationalSnapshotChangeEventSource} - Finished exporting 11 records for table 'public.shipments'; total duration '00:00:00.019'
[2022-08-24 06:35:43,997] INFO {io.debezium.pipeline.source.AbstractSnapshotChangeEventSource} - Snapshot - Final stage
[2022-08-24 06:35:43,998] INFO {io.debezium.pipeline.ChangeEventSourceCoordinator} - Snapshot ended with SnapshotResult [status=COMPLETED, offset=PostgresOffsetContext [sourceInfoSchema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, sourceInfo=source_info[server='postgres_5432'db='shipment_db', lsn=LSN{0/16DCA30}, txId=592, timestamp=2022-08-24T06:35:43.994Z, snapshot=FALSE, schema=public, table=shipments], partition={server=postgres_5432}, lastSnapshotRecord=true, lastCompletelyProcessedLsn=null, lastCommitLsn=null, transactionContext=TransactionContext [currentTransactionId=null, perTableEventCount={}, totalEventCount=0]]]
[2022-08-24 06:35:44,001] INFO {io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics} - Connected metrics set to 'true'
[2022-08-24 06:35:44,001] INFO {io.debezium.pipeline.ChangeEventSourceCoordinator} - Starting streaming
[2022-08-24 06:35:44,001] INFO {io.debezium.connector.postgresql.PostgresStreamingChangeEventSource} - Retrieved latest position from stored offset 'LSN{0/16DCA30}'
[2022-08-24 06:35:44,002] INFO {io.debezium.connector.postgresql.connection.WalPositionLocator} - Looking for WAL restart position for last commit LSN 'null' and last change LSN 'LSN{0/16DCA30}'
[2022-08-24 06:35:44,002] INFO {io.debezium.connector.postgresql.connection.PostgresReplicationConnection} - Initializing PgOutput logical decoder publication
[2022-08-24 06:35:44,017] INFO {io.debezium.connector.postgresql.connection.PostgresConnection} - Obtained valid replication slot ReplicationSlot [active=false, latestFlushedLsn=LSN{0/16DB220}, catalogXmin=585]
[2022-08-24 06:35:44,021] INFO {io.debezium.jdbc.JdbcConnection} - Connection gracefully closed
[2022-08-24 06:35:44,072] INFO {io.debezium.connector.postgresql.PostgresSchema} - REPLICA IDENTITY for 'public.shipments' is 'DEFAULT'; UPDATE and DELETE events will contain previous values only for PK columns
[2022-08-24 06:35:44,073] INFO {io.debezium.connector.postgresql.PostgresStreamingChangeEventSource} - Searching for WAL resume position

The logs only report the number of records in the table. Why do the Siddhi logs not show the actual data from the database table?
Thanks!
Posted on 2022-09-13 06:07:52
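Does it capture changed data after the server has started? If so, the issue is likely the snapshot mode configured when the connection is created. By default it is set to initial, which takes a snapshot when the connection is established and then continues reading changed data from that position.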
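To experiment with the snapshot mode mentioned above, one option is to pass the Debezium property through the CDC source's connector.properties parameter. The following is a minimal sketch, not a confirmed fix: it assumes the siddhi-io-cdc version in use forwards snapshot.mode to the Debezium PostgreSQL connector, and the value never (skip the initial snapshot and stream only new changes) is chosen purely for illustration.

```
-- Sketch only: the source definition from the question with one extra parameter.
-- Assumption: connector.properties passes snapshot.mode through to Debezium.
-- 'never' is an illustrative value; 'initial' is the default described above.
@source(type = 'cdc', url = 'jdbc:postgresql://postgres:5432/shipment_db',
    username = 'postgresuser', password = 'postgrespw',
    table.name = 'public.shipments', operation = 'insert',
    plugin.name = 'pgoutput', slot.name = 'postslot',
    connector.properties = 'snapshot.mode=never',
    @map(type = 'keyvalue', @attributes(shipment_id = 'shipment_id', order_id = 'order_id', date_created = 'date_created', status = 'status')))
define stream inputStream (shipment_id long, order_id long, date_created string, status string);
```

Inserting a new row into public.shipments after the app has started, and comparing the log output under different snapshot.mode values, should show whether the snapshot phase or the streaming phase is the part that fails to emit events.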
https://stackoverflow.com/questions/73468695