我想做一个网站,太原网页设计最新招聘信息,谁做广东11彩票网站,网络应用服务管理GitLab的用户创建和推送
在root用户-密码界面重新设置密码添加Leader用户和自己使用的用户使用root用户创建相应的群组使用Leader用户创建对应的项目设置分支配置为“初始推送后完全保护”设置.gitignore文件#xff0c;项目配置文件等其他非通用代码无需提交安装gitlab proj…GitLab的用户创建和推送
在root用户-密码界面重新设置密码添加Leader用户和自己使用的用户使用root用户创建相应的群组使用Leader用户创建对应的项目设置分支配置为“初始推送后完全保护”设置.gitignore文件项目配置文件等其他非通用代码无需提交安装gitlab project 2020插件点击share project on gitlab 即可将项目上传到gitlab中
Flink集群的搭建
只需要运行Yarn模式配置Hadoop的环境变量 将Flink1.17解压安装到对应为止即可
Hbase的配置
依赖zookeeper和hadoop这两个框架检查Hadoop是否退出安全模式如果丢失文件先退出安全模式hdfs dfsadmin -safemode leave解压Hbase2.4.11的安装包添加Hbase的环境变量 修改配置文件 hbase-env.xml export hbase_manages_zkfalse 不使用自带的zookeeper hbase-site.xml hbase.cluster.distributed true 使用集群模式hbase.zookeeper.quorum hadoop102… zookeeper连接地址hbase.rootdir hdfs://hadoop102:8020, hbase在hdfs的存放根路径hbase.wal.provider filesystem 预写日志 regionservers: 添加hbase小弟的主机名称
Redise的配置
进入redise目录执行make指令进行编译make instanll安装将myredis.conf文件复制到~/目录下将bind 127.0.0.1 注释掉并且关闭保护模式设置daemon 后台启动模式为yesredis-server ./my_redis.conf后台启动
实时数仓ODS层
保证数据模拟器产生的数据是有序的 设置mock.if-realtime1重复执行数据模拟器产生数据时会从当前时间继续产生数据。Kafka数据有序Flink并发度和Kafka的分区数一致 设置三个kafka节点的分区个数都为4num.partitions4Flink的并发度4 历史维度数据 使用maxwell的bootstrap功能初始化维度信息json格式写入到kafka编写mysql_to_kafka_init.sh脚本maxwell需要检查是否连接mysql的binlog成功查看日志如果出错需要在mysql的maxwell库中删除所有表即可
实时数仓dim层
dim层的设计依据是维度建模理论并且遵循三范式使用雪花模型dim层的数据存储在Hbase中开发时需要切换到dev开发分支为Flink的开发创建一个基类名为BaseApp 抽象方法handle(): 每个主程序的业务逻辑具体方法start()里面实现Flink代码的通用逻辑 不同分组的数据只能消费一次如果数据需要给多个程序使用就需要分为不同的group
Flink-cdc获取维度信息
数据清洗动态拆分维度表功能 方式1直接将维度表做成List String (维度表名称)保存 如果将代码写死后续想要修改需要重新编译修改 方式2将维度表名称设计为单独的一个配置文件而不是在代码里面写死后续想要修改直接改配置文件重启任务即可生效方式3热修改hotfix, 热加载配置文件不需要重启热加载文件一般是以时间周期作为加载逻辑。时间长时会出现时效性问题时间短的话过于耗费资源。方式4zookeeper的watch的监控能够存储基础的表名但是不适合存储完整的表格信息除了要判断哪些是维度表还需要记录哪些数据需要写出到Hbase。方式5cdc变更数据抓取类似与maxwell。 注意运行下面的代码需要再虚拟机的/etc/my.cnf文件中开启对应数据库的binlog日志。注意对照库名是否填写正确。
public class Test02 {public static void main(String[] args) {//创建env//1.创建运行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(4);System.setProperty(HADOOP_USER_NAME, atguigu);//设置检查点和状态后端// 1.4 状态后端及检查点相关配置// 1.4.1 设置状态后端env.setStateBackend(new HashMapStateBackend()); 1.4.2 开启 checkpoint//env.enableCheckpointing(5000); 1.4.3 设置 checkpoint 模式: 精准一次//env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); 1.4.4 checkpoint 存储//env.getCheckpointConfig().setCheckpointStorage(hdfs://hadoop102:8020/gmall2023/stream/ test01); 1.4.5 checkpoint 并发数//env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); 1.4.6 checkpoint 之间的最小间隔//env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000); 1.4.7 checkpoint 的超时时间//env.getCheckpointConfig().setCheckpointTimeout(10000); 1.4.8 job 取消时 checkpoint 保留策略//env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);//读取数据//mysql sourceMySqlSourceString mySqlSource MySqlSource.Stringbuilder().hostname(Constant.MYSQL_HOST).port(Constant.MYSQL_PORT).username(Constant.MYSQL_USER_NAME).password(Constant.MYSQL_PASSWORD).databaseList(gmall2023_config).tableList(gmall2023_config.table_process_dim).deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).build();DataStreamSourceString ds env.fromSource(mySqlSource,WatermarkStrategy.noWatermarks(),kafkasource).setParallelism(1);ds.print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}