Objective: test one-way data synchronization from Oracle to Kafka using OGG.
Overview: Kafka is deployed as a single node with a single broker; a standalone ZooKeeper is deployed separately; Java 1.8 is required; OGG is needed in two editions, one for Oracle and one for Big Data.
1 Environment
oracle: 11.2.0.4
zookeeper: apache-zookeeper-3.6.2-bin.tar.gz
kafka: kafka_2.13-2.7.0.tgz
ogg for bigdata: OGG_BigData_Linux_x64_12.3.2.1.1.zip
ogg for oracle: fbo_ggs_Linux_x64_shiphome.zip
jdk: jdk-8u181-linux-x64.tar.gz
CentOS: CentOS-6.9-x86_64-bin-DVD1
2 Java Environment Setup
[root@test01 ~]# tar -zxvf jdk-8u181-linux-x64.tar.gz -C /usr/local/
[root@test01 ~]# cd /usr/local/jdk1.8.0_181/bin/
[root@test01 bin]# ./java -version
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
Add the environment variables (append at the end of the file):
vi /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_181
export PATH=$JAVA_HOME/bin:$PATH
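After editing, reload the profile so the variables take effect in the current shell; java should then resolve from PATH (a quick check, assuming a bash login shell):
[root@test01 ~]# source /etc/profile
[root@test01 ~]# java -version
[root@test01 ~]# echo $JAVA_HOME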
3 ZooKeeper Installation
3.1 Basic ZooKeeper Installation
[root@test01 ~] tar -zxvf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local
[root@test01 ~] mv /usr/local/apache-zookeeper-3.6.2-bin /usr/local/apache-zookeeper-3.6.2
3.2 Edit the Parameter File zoo.cfg
[root@test01 ~] cd /usr/local/apache-zookeeper-3.6.2
Go into the ZooKeeper conf directory.
Copy zoo_sample.cfg to zoo.cfg, then modify the dataDir property; keep the other parameters unchanged.
[root@test01 conf]# cp zoo_sample.cfg zoo.cfg
[root@test01 conf]# vi zoo.cfg
# Data directory
dataDir=/usr/local/apache-zookeeper-3.6.2/zkdata
# Client port; the default is 2181
clientPort=2181
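ZooKeeper does not always create the dataDir on its own, so it may be worth creating it before the first start (a minimal step, assuming the path configured above):
[root@test01 conf]# mkdir -p /usr/local/apache-zookeeper-3.6.2/zkdata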
3.3 Environment Variables
[root@test01 ~] vi /etc/profile    # append the following at the end
export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.6.2
export JAVA_HOME=/usr/local/jdk1.8.0_181
export PATH=$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH
3.4 Starting and Stopping ZooKeeper
[root@test01 bin]# cd /usr/local/apache-zookeeper-3.6.2/bin
[root@test01 bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /usr/local/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
Starting zookeeper … STARTED
[root@test01 bin]# ./zkServer.sh stop
3.5 Check the Process with JPS
[root@test01 bin]# jps
1971 QuorumPeerMain
5645 Jps
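QuorumPeerMain is the ZooKeeper server process. As an extra check, zkServer.sh can report the run mode; in this single-node setup it should show Mode: standalone (a quick sanity check):
[root@test01 bin]# ./zkServer.sh status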
4 Kafka Installation
4.1 Kafka Single-Node Deployment
[root@test01 ~]# tar -zxvf kafka_2.13-2.7.0.tgz -C /usr/local/
4.2 Configure Kafka Parameters
Go into Kafka's config directory and edit server.properties, adding the following settings:
[root@test01 ~]# cd /usr/local/kafka_2.13-2.7.0/config
[root@test01 config]# vi server.properties
# Globally unique broker ID; must not be duplicated
broker.id=0
# Listeners
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://192.168.141.107:9092
advertised.listeners=PLAINTEXT://192.168.141.107:9092
# Log directory
log.dirs=/usr/local/kafka_2.13-2.7.0/kafka-logs
# ZooKeeper connection (if ZooKeeper is not on this host, change it to the proper IP or hostname)
#zookeeper.connect=localhost:2181
zookeeper.connect=192.168.141.107:2181
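Kafka normally creates the log.dirs path on first start, but creating it up front avoids permission surprises (a minimal step, assuming the path above):
[root@test01 config]# mkdir -p /usr/local/kafka_2.13-2.7.0/kafka-logs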
4.3 Add Environment Variables
[root@test01 config]# vi /etc/profile
export ZOOKEEPER_HOME=/usr/local/apache-zookeeper-3.6.2
export JAVA_HOME=/usr/local/jdk1.8.0_181
export KAFKA_HOME=/usr/local/kafka_2.13-2.7.0
export LD_LIBRARY_PATH=$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOME/jre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:
export PATH=$KAFKA_HOME/bin:$ZOOKEEPER_HOME/bin:$JAVA_HOME/bin:$PATH
4.4 Starting and Stopping Kafka
Start:
[root@test01 bin]# cd /usr/local/kafka_2.13-2.7.0/bin
[root@test01 bin]# ./kafka-server-start.sh $KAFKA_HOME/config/server.properties &
Stop:
[root@test01 bin]# ./kafka-server-stop.sh
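Starting the server with & keeps it tied to the current terminal; kafka-server-start.sh also accepts a -daemon flag to run it in the background (an alternative, assuming the same paths):
[root@test01 bin]# ./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties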
4.5 Check Processes with JPS
[root@test01 kafka_2.13-2.7.0]# jps
1971 QuorumPeerMain
2702 Jps
2287 Kafka
4.6 Kafka Test
[root@test01 ~]# cd /usr/local/kafka_2.13-2.7.0/bin
Create a topic:
[root@test01 bin]# ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic test.
List all topics:
[root@test01 bin]# ./kafka-topics.sh --list --zookeeper localhost:2181
test
The returned value 'test' indicates the topic was created successfully.
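For a fuller end-to-end smoke test, a console producer and consumer can be run against the same topic in two separate terminals (a sketch using the stock Kafka scripts; lines typed into the producer should appear on the consumer):
[root@test01 bin]# ./kafka-console-producer.sh --bootstrap-server 192.168.141.107:9092 --topic test
[root@test01 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.141.107:9092 --topic test --from-beginning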
5. OGG Source-Side Installation
5.1 Unpack the Installation Package
[oracle@oracle01 ~]$unzip fbo_ggs_Linux_x64_shiphome.zip
5.2 Installation (GUI installation steps omitted)
[oracle@oracle01 ~]$ cd fbo_ggs_Linux_x64_shiphome/Disk1
[oracle@oracle01 Disk1]$ ./runInstaller
..... output omitted .....
After installing through the GUI, the OGG directories have already been created.
Check the installation:
[oracle@oracle01 ogg12.2]$ ldd ggsci
linux-vdso.so.1 => (0x00007ffdb9efa000)
librt.so.1 => /lib64/librt.so.1 (0x0000003df0600000)
libdl.so.2 => /lib64/libdl.so.2 (0x0000003defa00000)
libgglog.so => /u01/ogg12.2/./libgglog.so (0x00007f02e8ccc000)
libggrepo.so => /u01/ogg12.2/./libggrepo.so (0x00007f02e8a5a000)
libdb-6.1.so => /u01/ogg12.2/./libdb-6.1.so (0x00007f02e8675000)
libggperf.so => /u01/ogg12.2/./libggperf.so (0x00007f02e8445000)
libggparam.so => /u01/ogg12.2/./libggparam.so (0x00007f02e733b000)
libicui18n.so.48 => /u01/ogg12.2/./libicui18n.so.48 (0x00007f02e6f4b000)
libicuuc.so.48 => /u01/ogg12.2/./libicuuc.so.48 (0x00007f02e6bca000)
libicudata.so.48 => /u01/ogg12.2/./libicudata.so.48 (0x00007f02e5405000)
libpthread.so.0 => /lib64/libpthread.so.0 (0x0000003df0200000)
libxerces-c.so.28 => /u01/ogg12.2/./libxerces-c.so.28 (0x00007f02e4e3e000)
libantlr3c.so => /u01/ogg12.2/./libantlr3c.so (0x00007f02e4c25000)
libnnz11.so => /u01/app/oracle/product/11.2.0/dbhome_1/lib/libnnz11.so (0x00007f02e4857000)
libclntsh.so.11.1 => /u01/app/oracle/product/11.2.0/dbhome_1/lib/libclntsh.so.11.1 (0x00007f02e1ded000)
libggnnzitp.so => /u01/ogg12.2/./libggnnzitp.so (0x00007f02e1696000)
libm.so.6 => /lib64/libm.so.6 (0x0000003df0a00000)
libc.so.6 => /lib64/libc.so.6 (0x0000003defe00000)
/lib64/ld-linux-x86-64.so.2 (0x0000003def600000)
libstdc++.so.6 => /usr/lib64/libstdc++.so.6 (0x0000003df3200000)
libgcc_s.so.1 => /lib64/libgcc_s.so.1 (0x0000003df2e00000)
libnsl.so.1 => /lib64/libnsl.so.1 (0x0000003df2a00000)
libaio.so.1 => /lib64/libaio.so.1 (0x00007f02e1492000)
Initialize the directories (the GUI installer has already created them):
GGSCI (oracle01) 2> create SUBDIRS
Creating subdirectories under current directory /u01/ogg12.2
Parameter files /u01/ogg12.2/dirprm: already exists
Report files /u01/ogg12.2/dirrpt: already exists
Checkpoint files /u01/ogg12.2/dirchk: already exists
Process status files /u01/ogg12.2/dirpcs: already exists
SQL script files /u01/ogg12.2/dirsql: already exists
Database definitions files /u01/ogg12.2/dirdef: already exists
Extract data files /u01/ogg12.2/dirdat: already exists
Temporary files /u01/ogg12.2/dirtmp: already exists
Credential store files /u01/ogg12.2/dircrd: already exists
Masterkey wallet files /u01/ogg12.2/dirwlt: already exists
Dump files /u01/ogg12.2/dirdmp: already exists
5.3 Check Database Configuration and Adjust Parameters
5.3.1 The source database must be in archivelog mode, with supplemental logging and force_logging enabled
SQL> archive log list;
Database log mode Archive Mode
Automatic archival Enabled
Archive destination USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence 8
Next log sequence to archive 10
Current log sequence 10
SQL> select name,supplemental_log_data_min,force_logging from v$database;
NAME      SUPPLEMENTAL_LOG_DATA_MI FORCE_LOG
--------- ------------------------ ---------
TDB01     YES                      YES
If changes are needed, use the following statements:
alter database archivelog;
Alter database force logging;
alter database add supplemental log data;
ALTER SYSTEM SET ENABLE_GOLDENGATE_REPLICATION = TRUE SCOPE=BOTH;
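Note that alter database archivelog only succeeds while the database is mounted but not open. A typical sequence, shown as a sketch run as SYSDBA, would be:
SQL> shutdown immediate;
SQL> startup mount;
SQL> alter database archivelog;
SQL> alter database open;
SQL> alter database force logging;
SQL> alter database add supplemental log data;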
5.3.2 Create the OGG User and a Dedicated Tablespace
create tablespace ts_ogg datafile '/u01/app/oracle/oradata/tdb01/ts_ogg01.dbf' size 200M AUTOEXTEND on extent management local segment space management auto;
create user ggusr identified by ggusr default tablespace ts_ogg;
For convenience, grant the DBA role directly:
grant resource,connect,dba to ggusr;
5.3.3 Configure the Test User
alter user scott identified by scott account unlock;
grant select_catalog_role to scott;
Create a new table under the scott user to serve as the test table:
create table test_ogg(id int ,name varchar(20),primary key(id));
5.4 Configure the MGR Process
GGSCI (oracle01) 3> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
--AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
Notes:
PORT is the default listening port of the MGR process;
DYNAMICPORTLIST is a list of dynamic ports: if the specified MGR port is unavailable, one is chosen from this list; at most 256 ports can be specified;
AUTORESTART restarts all EXTRACT processes, up to 5 times, waiting 3 minutes between attempts;
PURGEOLDEXTRACTS enables periodic purging of trail files.
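The manager can then be started and checked from ggsci (a quick check; once the parameter file is saved, info mgr should report RUNNING):
start mgr
info mgr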
5.5 Add Supplemental Log Data for the Replicated Table
Log in with the OGG user ggusr:
GGSCI (oracle01) 5> dblogin userid ggusr, password ggusr
Successfully logged into database.
GGSCI (oracle01 as ggusr@tdb01) 6> info trandata scott.test_ogg
Logging of supplemental redo log data is disabled for table SCOTT.TEST_OGG.
GGSCI (oracle01 as ggusr@tdb01) 7> add trandata scott.test_ogg
Logging of supplemental redo data enabled for table SCOTT.TEST_OGG.
TRANDATA for scheduling columns has been added on table 'SCOTT.TEST_OGG'.
TRANDATA for instantiation CSN has been added on table 'SCOTT.TEST_OGG'.
5.6 Configure the EXTRACT Process
GGSCI (oracle01) 18> edit param extkafka
extract extkafka
setenv (ORACLE_SID=tdb01)
setenv (ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1)
Setenv (NLS_LANG="AMERICAN_AMERICA.UTF8")
userid ggusr,password ggusr
exttrail ./dirdat/ka
table scott.test_ogg;
Add the extract process:
add extract extkafka,tranlog,begin now
Bind the trail file:
add exttrail ./dirdat/ka,extract extkafka
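The newly added process and trail can be verified in ggsci (a quick check; the extract shows as STOPPED until it is started in section 7):
info extract extkafka
info exttrail ./dirdat/ka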
5.7 Configure the PUMP Process (another use of the EXTRACT process)
GGSCI (oracle01) 19> edit param pukafka
extract pukafka
setenv (ORACLE_SID=tdb01)
setenv (ORACLE_HOME=/u01/app/oracle/product/11.2.0/dbhome_1)
passthru
dynamicresolution
userid ggusr,password ggusr
rmthost 192.168.141.107,mgrport 7809
rmttrail ./dirdat/pa
table scott.test_ogg;
Notes:
The first line specifies the extract (pump) process name;
PASSTHRU stops OGG from interacting with Oracle; since the pump only forwards data here, this is appropriate;
DYNAMICRESOLUTION enables dynamic name resolution;
USERID and PASSWORD are the credentials OGG uses to connect to the Oracle database (ggusr/ggusr here);
RMTHOST and MGRPORT are the address of the target-side (Kafka) OGG manager and its listening port;
RMTTRAIL is the location and prefix of the trail files on the target side.
Bind the local and remote trail files to the PUMP process:
add extract pukafka,exttrailsource ./dirdat/ka
add rmttrail ./dirdat/pa,extract pukafka
5.8 Configure the Definitions File to Map Tables Between Source and Target
Data transfer between Oracle and MySQL or Hadoop-ecosystem targets (HDFS, Hive, Kafka, etc.) is heterogeneous, so the mapping between source and target tables must be defined.
GGSCI (oracle01) 1> edit param test_ogg
defsfile /u01/ogg12.2/dirdef/scott.test_ogg
userid ggusr,password ggusr
table scott.test_ogg;
GGSCI (oracle01) 3> view param test_ogg
defsfile /u01/ogg12.2/dirdef/scott.test_ogg
userid ggusr,password ggusr
table scott.test_ogg;
Run the following from the OGG home directory (as the oracle user):
./defgen paramfile dirprm/test_ogg.prm
After it runs, the definition file scott.test_ogg is generated under /u01/ogg12.2/dirdef:
[oracle@oracle01 ogg12.2]$ ./defgen paramfile dirprm/test_ogg.prm
***********************************************************************
Oracle GoldenGate Table Definition Generator for Oracle
Version 12.2.0.1.1 OGGCORE_12.2.0.1.0_PLATFORMS_151211.1401
Linux, x64, 64bit (optimized), Oracle 11g on Dec 11 2015 21:37:21
Copyright (C) 1995, 2015, Oracle and/or its affiliates. All rights reserved.
Starting at 2021-03-10 11:02:19
***********************************************************************
Operating System Version:
Linux
Version #1 SMP Tue May 10 17:27:01 UTC 2016, Release 2.6.32-642.el6.x86_64
Node: oracle01
Machine: x86_64
soft limit hard limit
Address Space Size : unlimited unlimited
Heap Size : unlimited unlimited
File Size : unlimited unlimited
CPU Time : unlimited unlimited
Process id: 2602
**********************************************************************
** Running with the following parameters **
***********************************************************************
defsfile /u01/ogg12.2/dirdef/scott.test_ogg
userid ggusr,password ***
table scott.test_ogg;
Retrieving definition for SCOTT.TEST_OGG.
Definitions generated for 1 table in /u01/ogg12.2/dirdef/scott.test_ogg.
Copy the generated file /u01/ogg12.2/dirdef/scott.test_ogg to the dirdef directory under the OGG home on the target side:
scp /u01/ogg12.2/dirdef/scott.test_ogg root@192.168.141.107:/u01/ogg12.3/dirdef
6. OGG Target-Side (Kafka) Installation
6.1 Confirm that the ZooKeeper and Kafka services are running
[root@test01 dirdef]# jps
3760 Jps
1971 QuorumPeerMain
2287 Kafka
6.2 Configure the MGR Process
GGSCI (test01) 3> edit param mgr
PORT 7809
DYNAMICPORTLIST 7810-7909
--AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3    -- not needed for now
PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
6.3 Configure the Checkpoint Table
The checkpoint is an offset record that makes the replication position traceable; declaring the checkpoint table in the global configuration is sufficient.
GGSCI (test01) 14>edit param ./GLOBALS
CHECKPOINTTABLE ggusr.checkpoint
6.4 Configure the REPLICAT Process
GGSCI (test01) 6> EDIT PARAMS rekafka
replicat rekafka
sourcedefs /u01/ogg12.3/dirdef/scott.test_ogg
TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
REPORTCOUNT EVERY 1 MINUTES, RATE
GROUPTRANSOPS 10000
map scott.test_ogg,target scott.test_ogg;
Notes:
REPLICAT rekafka defines the replicat process name;
SOURCEDEFS points to the table definitions file copied over from the source server;
TARGETDB LIBFILE specifies the Kafka adapter library and its configuration file, which is dirprm/kafka.props under the OGG home;
REPORTCOUNT sets how often the replication task writes report statistics;
GROUPTRANSOPS is the number of operations grouped into a single target transaction, which reduces I/O;
MAP defines the mapping between source and target tables.
6.5 The kafka.props Configuration File
Note: remove the inline comments when configuring, otherwise errors will occur.
[root@test01 ~]# cd /u01/ogg12.3/dirprm
Edit kafka.props:
[root@test01 dirprm]# vi kafka.props
gg.handlerlist=kafkahandler //handler type
gg.handler.kafkahandler.type=kafka
gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties //Kafka producer configuration file
gg.handler.kafkahandler.topicMappingTemplate=test_ogg //Kafka topic name; it does not need to be created manually
gg.handler.kafkahandler.format=json //message format; json, xml, etc. are supported
gg.handler.kafkahandler.mode=op //delivery mode in OGG for Big Data: op sends one message per SQL operation, tx one per transaction
##gg.classpath=dirprm/:/u01/ogg12.3/:/u01/ogg12.3/lib/*
The classpath must include the Kafka installation's library files, otherwise errors will keep occurring:
gg.classpath=dirprm/:/u01/ogg12.3/*:/u01/ogg12.3/lib/*:/u01/ogg12.3/ggjava/resources/lib/*:/usr/local/kafka_2.13-2.7.0/libs/*
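To confirm the Kafka client libraries are actually present on that path, a quick check (assuming the Kafka install directory used above):
[root@test01 dirprm]# ls /usr/local/kafka_2.13-2.7.0/libs/kafka-clients-*.jar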
6.6 Edit custom_kafka_producer.properties
Note: remove the inline comments when configuring.
[root@test01 dirprm]# vi custom_kafka_producer.properties
bootstrap.servers=192.168.141.107:9092 //Kafka broker address
acks=1
compression.type=gzip //compression type
reconnect.backoff.ms=1000 //reconnect backoff in ms
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
batch.size=102400
linger.ms=10000
6.7 Add the Trail File to the REPLICAT Process
GGSCI (test01) 14> add replicat rekafka, exttrail ./dirdat/pa, checkpointtable ggusr.checkpoint
7. Testing
7.1 Start the relevant processes and check them
On the source side, start mgr, extkafka, and pukafka:
Start mgr
Start extkafka
Start pukafka
GGSCI (oracle01) 1> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
EXTRACT RUNNING EXTKAFKA 00:00:00 00:00:06
EXTRACT RUNNING PUKAFKA 00:00:00 00:00:00
On the target side, start mgr and rekafka:
Start mgr
Start rekafka
GGSCI (test01) 1> info all
Program Status Group Lag at Chkpt Time Since Chkpt
MANAGER RUNNING
REPLICAT RUNNING REKAFKA 00:00:00 00:00:07
7.2 Insert, Update, and Delete Tests
On the source database, as the scott user, run insert, update, and delete operations against the test_ogg table:
insert into test_ogg values(1,'test');
commit;
update test_ogg set name='zhangsan' where id=1;
commit;
delete from test_ogg where id=1;
Commit;
On the target side, check whether the topic has been created:
[root@test01 bin]# ./kafka-topics.sh --list --zookeeper 192.168.141.107:2181
test_ogg    (the name defined in kafka.props; its appearance means replication is working)
Check with a consumer whether the data is being synchronized:
Start a Kafka console consumer; it stays attached to the terminal as a live process. Then, as operations are performed on the table at the source, they show up on the consumer side in real time.
[root@test01 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.141.107:9092 --topic test_ogg --from-beginning
{"table":"SCOTT.TEST_OGG","op_type":"I","op_ts":"2021-03-10 12:27:13.417910","current_ts":"2021-03-10T12:27:20.018000","pos":"00000000000000001817","after":{"ID":1,"NAME":"test"}}
{"table":"SCOTT.TEST_OGG","op_type":"U","op_ts":"2021-03-10 12:32:36.417849","current_ts":"2021-03-10T12:32:43.324000","pos":"00000000000000001945","before":{"ID":1,"NAME":"test"},"after":{"ID":1,"NAME":"zhangsan"}}
{"table":"SCOTT.TEST_OGG","op_type":"D","op_ts":"2021-03-10 15:19:28.414300","current_ts":"2021-03-10T15:19:35.464000","pos":"00000000000000002098","before":{"ID":1,"NAME":"zhangsan"}}
The data is synchronized to Kafka correctly, in JSON format.
The op_type field indicates the operation type; the keys can be customized or left at the defaults:
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
The before field holds the row data before the operation, and after holds the data after the operation.
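As a simple follow-up, the consumer output can be filtered by operation type, for example to watch only deletes (a one-line sketch, assuming the same broker and topic):
[root@test01 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.141.107:9092 --topic test_ogg --from-beginning | grep '"op_type":"D"'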