网站建设是基于,Wordpress虚拟域名,黑龙江省建筑信息平台,免费设计素材摘要#xff1a;随着数据规模的不断扩大#xff0c;传统的RDBMS难以满足OLAP的需求#xff0c;本文将介绍如何将Oracle的数据实时同步到阿里云的大数据处理平台当中#xff0c;并利用大数据工具对数据进行分析。一、背景介绍随着数据规模的不断扩大#xff0c;传统的RDBMS…摘要随着数据规模的不断扩大传统的RDBMS难以满足OLAP的需求本文将介绍如何将Oracle的数据实时同步到阿里云的大数据处理平台当中并利用大数据工具对数据进行分析。
一、背景介绍随着数据规模的不断扩大传统的RDBMS难以满足OLAP的需求本文将介绍如何将Oracle的数据实时同步到阿里云的大数据处理平台当中并利用大数据工具对数据进行分析。OGGOracle GoldenGate是一个基于日志的结构化数据备份工具一般用于Oracle数据库之间的主从备份以及Oracle数据库到其他数据库DB2, MySQL等的同步。下面是Oracle官方提供的一个OGG的整体架构图从图中可以看出OGG的部署分为源端和目标端两部分组成主要有ManagerExtractPumpCollectorReplicat这么一些组件。Manager在源端和目标端都会有且只有一个Manager进程存在负责管理其他进程的启停和监控等Extract负责从源端数据库表或者事务日志中捕获数据有初始加载和增量同步两种模式可以配置初始加载模式是直接将源表数据同步到目标端而增量同步就是分析源端数据库的日志将变动的记录传到目标端本文介绍的是增量同步的模式PumpExtract从源端抽取的数据会先写到本地磁盘的Trail文件Pump进程会负责将Trail文件的数据投递到目标端Collector目标端负责接收来自源端的数据生成Trail文件Replicat负责读取目标端的Trail文件转化为相应的DDL和DML语句作用到目标数据库实现数据同步。本文介绍的Oracle数据同步是通过OGG的Datahub插件实现的该Datahub插件在架构图中处于Replicat的位置会分析Trail文件将数据的变化记录写入Datahub中可以使用流计算对datahub中的数据进行实时分析也可以将数据归档到MaxCompute中进行离线处理。二、安装步骤0. 环境要求源端已安装好Oracle源端已安装好OGG建议版本Oracle GoldenGate V12.1.2.1目标端已安装好OGG Adapters建议版本Oracle GoldenGate Application Adapters 12.1.2.1java 7下面将介绍Oracle/OGG相关安装和配置过程Oracle的安装将不做介绍另外需要注意的是Oracle/OGG相关参数配置以熟悉Oracle/OGG的运维人员配置为准本示例只是提供一个可运行的样本Oracle所使用的版本为ORA11g1. 源端OGG安装下载OGG安装包解压后有如下目录drwxr-xr-x install
drwxrwxr-x response
-rwxr-xr-x runInstaller
drwxr-xr-x stage目前oracle一般采取response安装的方式在response/oggcore.rsp中配置安装依赖具体如下oracle.install.responseFileVersion/oracle/install/rspfmt_ogginstall_response_schema_v12_1_2
# 需要目前与oracle版本对应
INSTALL_OPTIONORA11g
# goldegate主目录
SOFTWARE_LOCATION/home/oracle/u01/ggate
# 初始不启动manager
START_MANAGERfalse
# manger端口
MANAGER_PORT7839
# 对应oracle的主目录
DATABASE_LOCATION/home/oracle/u01/app/oracle/product/11.2.0/dbhome_1
# 暂可不配置
INVENTORY_LOCATION
# 分组目前暂时将oracle和ogg用同一个账号ogg_test实际可以给ogg单独账号
UNIX_GROUP_NAMEoinstall执行命令runInstaller -silent -responseFile {YOUR_OGG_INSTALL_FILE_PATH}/response/oggcore.rsp本示例中安装后OGG的目录在/home/oracle/u01/ggate安装日志在/home/oracle/u01/ggate/cfgtoollogs/oui目录下当silentInstall{时间}.log文件里出现如下提示表明安装成功The installation of Oracle GoldenGate Core was successful.执行/home/oracle/u01/ggate/ggsci命令并在提示符下键入命令CREATE SUBDIRS从而生成ogg需要的各种目录dir打头的那些。至此源端OGG安装完成。2. 源端Oracle配置以dba分身进入sqlplussqlplus / as sysdba# 创建独立的表空间
create tablespace ATMV datafile /home/oracle/u01/app/oracle/oradata/uprr/ATMV.dbf size 100m autoextend on next 50m maxsize unlimited;# 创建ogg_test用户密码也为ogg_test
create user ogg_test identified by ogg_test default tablespace ATMV;# 给ogg_test赋予充分的权限
grant connect,resource,dba to ogg_test;# 检查附加日志情况
Select SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_PK, SUPPLEMENTAL_LOG_DATA_UI, SUPPLEMENTAL_LOG_DATA_FK, SUPPLEMENTAL_LOG_DATA_ALL from v$database;# 增加数据库附加日志及回退
alter database add supplemental log data;
alter database add supplemental log data (primary key, unique,foreign key) columns;
# rollback
alter database drop supplemental log data (primary key, unique,foreign key) columns;
alter database drop supplemental log data;# 全字段模式注意在该模式下的delete操作也只有主键值
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
# 开启数据库强制日志模式
alter database force logging;
# 执行marker_setup.sql 脚本
marker_setup.sql
# 执行ddl_setup.sql
ddl_setup.sql
# 执行role_setup.sql
role_setup.sql
# 给ogg用户赋权
grant GGS_GGSUSER_ROLE to ogg_test;
# 执行ddl_enable.sql开启DDL trigger
ddl_enable.sql
# 执行优化脚本
ddl_pin ogg_test
# 安装sequence support
sequence.sql
#
alter table sys.seq$ add supplemental log data (primary key) columns;3. OGG源端mgr配置以下是通过ggsci对ogg进行配置配置mgredit params mgrPORT 7839
DYNAMICPORTLIST 7840-7849
USERID ogg_test, PASSWORD ogg_test
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7启动mgr运行日志在ggate/dirrpt中start mgr查看mgr状态info mgr查看mgr配置view params mgr4. OGG源端extract配置以下是通过ggsci对ogg进行配置配置extract名字可任取extract是组名edit params extractEXTRACT extract
SETENV (NLS_LANGAMERICAN_AMERICA.AL32UTF8)
DBOPTIONS ALLOWUNUSEDCOLUMN
USERID ogg_test, PASSWORD ogg_test
REPORTCOUNT EVERY 1 MINUTES, RATE
NUMFILES 5000
DISCARDFILE ./dirrpt/ext_test.dsc, APPEND, MEGABYTES 100
DISCARDROLLOVER AT 2:00
WARNLONGTRANS 2h, CHECKINTERVAL 3m
EXTTRAIL ./dirdat/st, MEGABYTES 200
DYNAMICRESOLUTION
TRANLOGOPTIONS CONVERTUCS2CLOBS
TRANLOGOPTIONS RAWDEVICEOFFSET 0
DDL
INCLUDE MAPPED OBJTYPE table
INCLUDE MAPPED OBJTYPE index
INCLUDE MAPPED OBJTYPE SEQUENCE
EXCLUDE OPTYPE COMMENT
DDLOPTIONS NOCROSSRENAME REPORT
TABLE OGG_TEST.*;
SEQUENCE OGG_TEST.*;GETUPDATEBEFORES增加extract进程ext后的名字要跟上面extract对应本例中extract是组名add ext extract,tranlog, begin now删除某废弃进程DP_TESTdelete ext DP_TEST添加抽取进程每个队列文件大小为200madd exttrail ./dirdat/st,ext extract, megabytes 200启动抽取进程运行日志在ggate/dirrpt中start extract extract至此extract配置完成数据库的一条变更可以在ggate/dirdat目录下的文件中看到5. 生成def文件源端ggsci起来后执行如下命令生成defgen文件,并且拷贝到目标端dirdef下edit params defgenDEFSFILE ./dirdef/ogg_test.def
USERID ogg_test, PASSWORD ogg_test
table OGG_TEST.*;在shell中执行如下命令生成ogg_test.def./defgen paramfile ./dirprm/defgen.prm6. 目标端OGG安装和配置解压adapter包将源端中dirdef/ogg_test.def文件拷贝到adapter的dirdef下执行ggsci起来后执行如下命令创建必须目录create subdirs编辑mgr配置edit params mgrPORT 7839
DYNAMICPORTLIST 7840-7849
PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 7
LAGREPORTHOURS 1
LAGINFOMINUTES 30
LAGCRITICALMINUTES 45
PURGEDDLHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7
PURGEMARKERHISTORY MINKEEPDAYS 3, MAXKEEPDAYS 7启动mgrstart mgr7. 源端ogg pump配置启动ggsci后执行如下操作编辑pump配置edit params pumpEXTRACT pump
RMTHOST xx.xx.xx.xx, MGRPORT 7839, COMPRESS
PASSTHRU
NUMFILES 5000
RMTTRAIL ./dirdat/st
DYNAMICRESOLUTION
TABLE OGG_TEST.*;
SEQUENCE OGG_TEST.*;添加投递进程从某一个队列开始投add ext pump,exttrailsource ./dirdat/st备注投递进程每个队文件大小为200madd rmttrail ./dirdat/st,ext pump,megabytes 200启动pumpstart pump启动后结合上面adapter的配置可以在目标端的dirdat目录下看到过来的trailfile8. Datahub插件安装和配置依赖环境jdk1.7。配置好JAVA_HOME, LD_LIBRARY_PATH可以将环境变量配置到~/.bash_profile中例如export JAVA_HOME/xxx/xxx/jrexx
export LD_LIBRARY_PATH${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/server修改环境变量后请重启adapter的mgr进程下载datahub-ogg-plugin.tar.gz并解压:修改conf路径下的javaue.properties文件将{YOUR_HOME}替换为解压后的路径gg.handlerlistggdatahubgg.handler.ggdatahub.typecom.aliyun.odps.ogg.handler.datahub.DatahubHandler
gg.handler.ggdatahub.configureFileName{YOUR_HOME}/datahub-ogg-plugin/conf/configure.xmlgoldengate.userexit.nochkptfalse
goldengate.userexit.timestamputcgg.classpath{YOUR_HOME}/datahub-ogg-plugin/lib/*
gg.log.leveldebugjvm.bootoptions-Xmx512m -Dlog4j.configurationfile:{YOUR_HOME}/datahub-ogg-plugin/conf/log4j.properties -Djava.class.pathggjava/ggjava.jar修改conf路径下的log4j.properties文件将{YOUR_HOME}替换为解压后的路径修改conf路径下的configure.xml文件修改方式见文件中的注释?xml version1.0 encodingUTF-8?
configuedefaultOracleConfigure!-- oracle sid, 必选--sid100/sid!-- oracle schema, 可以被mapping中的oracleSchema覆盖, 两者必须有一个非空--schemaogg_test/schema/defaultOracleConfiguredefalutDatahubConfigure!-- datahub endpoint, 必填--endPointYOUR_DATAHUB_ENDPOINT/endPoint!-- datahub project, 可以被mapping中的datahubProject, 两者必须有一个非空--projectYOUR_DATAHUB_PROJECT/project!-- datahub accessId, 可以被mapping中的datahubAccessId覆盖, 两者必须有一个非空--accessIdYOUR_DATAHUB_ACCESS_ID/accessId!-- datahub accessKey, 可以被mapping中的datahubAccessKey覆盖, 两者必须有一个非空--accessKeyYOUR_DATAHUB_ACCESS_KEY/accessKey!-- 数据变更类型同步到datahub对应的字段可以被columnMapping中的ctypeColumn覆盖 --ctypeColumnoptype/ctypeColumn!-- 数据变更时间同步到datahub对应的字段可以被columnMapping中的ctimeColumn覆盖 --ctimeColumnreadtime/ctimeColumn!-- 数据变更序号同步到datahub对应的字段, 按数据变更先后递增, 不保证连续, 可以被columnMapping中的cidColumn覆盖 --cidColumnrecord_id/cidColumn
!-- 额外增加的常量列每条record该列值为指定值格式为c1xxx,c2xxx可以被columnMapping中的constColumnMap覆盖--constColumnMap/constColumnMap/defalutDatahubConfigure!-- 默认最严格不落文件 直接退出 无限重试--!-- 运行每批上次的最多纪录数, 可选, 默认1000--batchSize1000/batchSize!-- 默认时间字段转换格式, 可选, 默认yyyy-MM-dd HH:mm:ss--defaultDateFormatyyyy-MM-dd HH:mm:ss/defaultDateFormat!-- 脏数据是否继续, 可选, 默认false--dirtyDataContinuetrue/dirtyDataContinue!-- 脏数据文件, 可选, 默认datahub_ogg_plugin.dirty--dirtyDataFiledatahub_ogg_plugin.dirty/dirtyDataFile!-- 脏数据文件最大size, 单位M, 可选, 默认500--dirtyDataFileMaxSize200/dirtyDataFileMaxSize!-- 重试次数, -1:无限重试 0:不重试 n:重试次数, 可选, 默认-1--retryTimes0/retryTimes!-- 重试间隔, 单位毫秒, 可选, 默认3000--retryInterval4000/retryInterval!-- 点位文件, 可选, 默认datahub_ogg_plugin.chk--checkPointFileNamedatahub_ogg_plugin.chk/checkPointFileNamemappingsmapping!-- oracle schema, 见上描述--oracleSchema/oracleSchema!-- oracle table, 必选--oracleTablet_person/oracleTable!-- datahub project, 见上描述--datahubProject/datahubProject!-- datahub AccessId, 见上描述--datahubAccessId/datahubAccessId!-- datahub AccessKey, 见上描述--datahubAccessKey/datahubAccessKey!-- datahub topic, 必选--datahubTopict_person/datahubTopicctypeColumn/ctypeColumnctimeColumn/ctimeColumncidColumn/cidColumnconstColumnMap/constColumnMapcolumnMapping!--src:oracle字段名称, 必须;dest:datahub field, 必须;destOld:变更前数据落到datahub的field, 可选;isShardColumn: 是否作为shard的hashkey, 可选, 默认为false, 可以被shardId覆盖isDateFormat: timestamp字段是否采用DateFormat格式转换, 默认true. 如果是false, 源端数据必须是longdateFormat: timestamp字段的转换格式, 不填就用默认值--column srcid destid isShardColumntrue isDateFormatfalse dateFormatyyyy-MM-dd HH:mm:ss/column srcname destname isShardColumntrue/column srcage destage/column srcaddress destaddress/column srccomments destcomments/column srcsex destsex/column srctemp desttemp destOldtemp1//columnMapping!--指定shard id, 优先生效, 可选--shardId1/shardId/mapping/mappings
/configue在ggsci下启动datahub writeredit params dhwriterextract dhwriter
getEnv (JAVA_HOME)
getEnv (LD_LIBRARY_PATH)
getEnv (PATH)
CUSEREXIT ./libggjava_ue.so CUSEREXIT PASSTHRU INCLUDEUPDATEBEFORES, PARAMS {YOUR_HOME}/datahub-ogg-plugin/conf/javaue.properties
sourcedefs ./dirdef/ogg_test.def
table OGG_TEST.*;添加dhwriteradd extract dhwriter, exttrailsource ./dirdat/st启动dhwriterstart dhwriter三、使用场景这里会用一个简单的示例来说明数据的使用方法例如我们在Oracle数据库有一张商品订单表ordersoid int, pid int, num int该表有三列分别为订单ID, 商品ID和商品数量。将这个表通过OGG Datahub进行增量数据同步之前我们需要先将源表已有的数据通过DataX同步到MaxCompute中。增量同步的关键步骤如下1在Datahub上创建相应的TopicTopic的schema为(string record_id, string optype, string readtime, bigint oid_before, bigint oid_after, bigint pid_before, bigint pid_after, bigint num_before, bigint num_after)2OGG Datahub的插件按照上述的安装流程部署配置其中列的Mapping配置如下 ctypeColumnoptype/ctypeColumnctimeColumnreadtime/ctimeColumncolumnMappingcolumn srcoid destoid_after destOldoid_before isShardColumntrue/column srcpid destpid_after destOldpid_before/column srcnum destnum_after destOldnum_before//columnMapping其中optype和readtime字段是记录数据库的数据变更类型和时间optype有I, D, U三种取值分别对应为“增”“删”“改”三种数据变更操作。3OGG Datahub插件部署好成功运行后插件会源源不断的将源表的数据变更记录输送至datahub中例如我们在源订单表中新增一条记录121datahub里收到的记录如下--------------------------------------------------------------------------------------------------------
| record_id | optype | readtime | oid_before | oid_after | pid_before | pid_after | num_before | num_after |
-------------------------------------------------------------------------------------------------------
| 14810373343020000 | I | 2016-12-06 15:15:28.000141 | NULL | 1 | NULL | 2 | NULL | 1 | 修改这条数据比如把num改为20datahub则会收到的一条变更数据记录如下-------------------------------------------------------------------------------------------------------
| record_id | optype | readtime | oid_before | oid_after | pid_before | pid_after | num_before | num_after |
--------------------------------------------------------------------------------------------------------
| 14810373343080000 | U | 2016-12-06 15:15:58.000253 | 1 | 1 | 2 | 2 | 1 | 20 |实时计算在前一天的离线计算的基础数据上我们可以写一个StreamCompute流计算的分析程序很容易地对数据进行实时汇总例如实时统计当前总的订单数每种商品的销售量等。处理思路就是对于每一条到来的变更数据可以拿到变化的数值实时更新统计变量即可。离线处理为了便于后续的离线分析我们也可以将Datahub里的数据归档到MaxCompute中在MaxCompute中创建相应Schema的表create table orders_log(record_id string, optype string, readtime string, oid_before bigint, oid_after bigint, pid_before bigint, pid_after bigint, num_before bigint, num_after bigint);在Datahub上创建MaxCompute的数据归档上述流入Datahub里的数据将自动同步到MaxCompute当中。建议将同步到MaxCompute中的数据按照时间段进行划分比如每一天的增量数据都对应一个独立分区。这样当天的数据同步完成后我们可以处理对应的分区拿到当天所有的数据变更而与和前一天的全量数据进行合并后即可得到当天的全量数据。为了简单起见先不考虑分区表的情况以2016-12-06这天的增量数据为例假设前一天的全量数据在表orders_base里面datahub同步过来的增量数据在orders_log表中将orders_base与orders_log做合并操作可以得到2016-12-06这天的最终全量数据写入表orders_result中。这个过程可以在MaxCompute上用如下这样一条SQL完成。INSERT OVERWRITE TABLE orders_result
SELECT t.oid, t.pid, t.num
FROM
(SELECT oid, pid, num, 0 x_record_id, 1 AS x_optypeFROMorders_base UNION ALLSELECT decode(optype,D,oid_before,oid_after) AS oid, decode(optype,D, pid_before,pid_after) AS pid, num_after AS num, record_id x_record_id, decode(optype, D, 0, 1) AS x_optypeFROMorders_log) t
JOIN(SELECToid, pid, max(record_id) x_max_modifiedFROM(SELECToid, pid, 0 record_idFROMorders_base UNION ALL SELECTdecode(optype,D,oid_before,oid_after) AS oid, decode(optype,D, pid_before,pid_after) AS pid, record_idFROMorders_log ) gGROUP BY oid , pid) s
ON
t.oid s.oid AND t.pid s.pid AND t.x_record_id s.x_max_modified AND t.x_optype 0;四、常见问题Q目标端报错 OGG-06551 Oracle GoldenGate Collector: Could not translate host name localhost into an Internet address.A目标端机器hostname在/etc/hosts里面重新设置localhost对应的ipQ找不到jvm相关的so包A将jvm的so路径添加到LD_LIBRARY_PATH后重启mgr例如export LD_LIBRARY_PATH${LD_LIBRARY_PATH}:$JAVA_HOME/lib/amd64:$JAVA_HOME/lib/amd64/serverQ有了DDL语句比如增加一列源端ogg没有问题但是adapter端的ffwriter和jmswriter进程退出且报错 2015-06-11 14:01:10 ERROR OGG-01161 Bad column index (5) specified for table OGG_TEST.T_PERSON, max columns 5.A由于表结构改变需要重做def文件将重做的def文件放入dirdef后重启即可作者冶善原文链接本文为云栖社区原创内容未经允许不得转载。