无锡企业网站制作公司有哪些,appstore下载免费软件,惠州seo推广优化,金融行业网站建设公司Flink 有三种部署模式#xff0c;分别是 Local、Standalone Cluster 和 Yarn Cluster。 1.1. Local模式 对于 Local 模式来说#xff0c;JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload。如果要验证一个简单的应用#xff0c;Local 模式是最方便的。实际应用中大…Flink 有三种部署模式分别是 Local、Standalone Cluster 和 Yarn Cluster。 1.1. Local模式 对于 Local 模式来说JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload。如果要验证一个简单的应用Local 模式是最方便的。实际应用中大多使用 Standalone 或者 Yarn Cluster而local模式只是将安装包解压启动./bin/start-local.sh即可在这里不在演示。 1.2. Standalone 模式 1.2.1. 下载 安装包下载地址http://flink.apache.org/downloads.html 快速入门教程地址 https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html 1.2.2. 上传安装包到linux系统 使用rz命令 1.2.3. 解压 tar –zxvf flink-1.3.2-bin-hadoop26-scala_2.10.tgz 1.2.4. 重命名 mv flink-1.3.2 flink 1.2.5. 修改环境变量 切换到root用户配置 export FLINK_HOME/home/hadoop/flink export PATH$PATH:$FLINK_HOME/bin 配置结束后切换会普通用户 source /etc/profile 1.2.6. 修改配置文件 修改flink/conf/masters master1:8081 修改flink/conf/slaves master1ha master2 master2ha 修改flink/conf/flink-conf.yaml taskmanager.numberOfTaskSlots: 2 jobmanager.rpc.address: master1 1.2.7. 启动flink /home/Hadoop/flink/bin/start-cluster.sh 1.2.8. Flink 的 Rest API Flink 和其他大多开源的框架一样提供了很多有用的 Rest API。不过 Flink 的 RestAPI目前还不是很强大只能支持一些 Monitor 的功能。Flink Dashboard 本身也是通过其 Rest 来查询各项的结果数据。在 Flink RestAPI 基础上可以比较容易的将 Flink 的 Monitor 功能和其他第三方工具相集成这也是其设计的初衷。 在 Flink 的进程中是由 JobManager 来提供 Rest API 的服务。因此在调用 Rest 之前要确定 JobManager 是否处于正常的状态。正常情况下在发送一个 Rest 请求给 JobManager 之后Client 就会收到一个 JSON 格式的返回结果。由于目前 Rest 提供的功能还不多需要增强这块功能的读者可以在子项目 flink-runtime-web 中找到对应的代码。其中最关键一个类 WebRuntimeMonitor就是用来对所有的 Rest 请求做分流的如果需要添加一个新类型的请求就需要在这里增加对应的处理代码。下面我例举几个常用 Rest API。 1.查询 Flink 集群的基本信息: /overview。示例命令行格式以及返回结果如下 $ curl http://localhost:8081/overview {taskmanagers:1,slots-total:16, slots-available:16,jobs-running:0,jobs-finished:0,jobs-cancelled:0,jobs-failed:0} 2.查询当前 Flink 集群中的 Job 信息/jobs。示例命令行格式以及返回结果如下 $ curl http://localhost:8081/jobs {jobs-running:[],jobs-finished: [f91d4dd4fdf99313d849c9c4d29f8977],jobs-cancelled:[],jobs-failed:[]} 3.查询一个指定的 Job 信息: /jobs/jobid。这个查询的结果会返回特别多的详细的内容这是我在浏览器中进行的测试如下图 想要了解更多 Rest 请求内容的读者可以去 Apache Flink 的页面中查找。 1.2.9. 运行测试任务 ./bin/flink run -m master1:8082 ./examples/batch/WordCount.jar --input hdfs://master1:9000/words.txt --output hdfs://master1:9000/clinkout 1.3. Flink 的 HA 首先我们需要知道 Flink 有两种部署的模式分别是 Standalone 以及 Yarn Cluster 模式。对于 Standalone 来说Flink 必须依赖于 Zookeeper 来实现 JobManager 的 HAZookeeper 已经成为了大部分开源框架 HA 必不可少的模块。在 Zookeeper 的帮助下一个 Standalone 的 Flink 集群会同时有多个活着的 JobManager其中只有一个处于工作状态其他处于 Standby 状态。当工作中的 JobManager 失去连接后如宕机或 CrashZookeeper 会从 Standby 中选举新的 JobManager 来接管 Flink 集群。 对于 Yarn Cluaster 模式来说Flink 就要依靠 Yarn 本身来对 JobManager 做 HA 了。其实这里完全是 Yarn 的机制。对于 Yarn Cluster 模式来说JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的 JobManager其实应该称之为 Flink Application Master。也就说它的故障恢复就完全依靠着 Yarn 中的 ResourceManager和 MapReduce 的 AppMaster 一样。由于完全依赖了 Yarn因此不同版本的 Yarn 可能会有细微的差异。这里不再做深究。 1.3.1. 修改配置文件 修改flink-conf.yaml state.backend: filesystem state.backend.fs.checkpointdir: hdfs://master1:9000/flink-checkpoints high-availability: zookeeper high-availability.storageDir: hdfs://master1:9000/flink/ha/ high-availability.zookeeper.quorum: master1ha:2181,master2:2181,master2ha:2181 high-availability.zookeeper.client.acl: open 修改conf server.1master1ha:2888:3888 server.2master2:2888:3888 server.3master2ha:2888:3888 修改masters master1:8082 master1ha:8082 修改slaves master1ha master2 master2ha 1.3.2. 启动 /home/Hadoop/flink/bin/start-cluster.sh 1.4. Yarn Cluster 模式 1.4.1. 引入 在一个企业中为了最大化的利用集群资源一般都会在一个集群中同时运行多种类型的 Workload。因此 Flink 也支持在 Yarn 上面运行。首先让我们通过下图了解下 Yarn 和 Flink 的关系。 在图中可以看出Flink 与 Yarn 的关系与 MapReduce 和 Yarn 的关系是一样的。Flink 通过 Yarn 的接口实现了自己的 App Master。当在 Yarn 中部署了 FlinkYarn 就会用自己的 Container 来启动 Flink 的 JobManager也就是 App Master和 TaskManager。 1.4.2. 修改环境变量 export HADOOP_CONF_DIR /home/hadoop/hadoop/etc/hadoop 1.4.3. 部署启动 yarn-session.sh -d -s 2 -tm 800 -n 2 上面的命令的意思是同时向Yarn申请3个container其中 2 个 Container 启动 TaskManager-n 2每个 TaskManager 拥有两个 Task Slot-s 2并且向每个 TaskManager 的 Container 申请 800M 的内存以及一个ApplicationMasterJob Manager。 Flink部署到Yarn Cluster后会显示Job Manager的连接细节信息。 Flink on Yarn会覆盖下面几个参数如果不希望改变配置文件中的参数可以动态的通过-D选项指定如 -Dfs.overwrite-filestrue -Dtaskmanager.network.numberOfBuffers16368 jobmanager.rpc.address因为JobManager会经常分配到不同的机器上 taskmanager.tmp.dirs使用Yarn提供的tmp目录 parallelism.default如果有指定slot个数的情况下 yarn-session.sh会挂起进程所以可以通过在终端使用CTRLC或输入stop停止yarn-session。 如果不希望Flink Yarn client长期运行Flink提供了一种detached YARN session启动时候加上参数-d或—detached 在上面的命令成功后我们就可以在 Yarn Application 页面看到 Flink 的纪录。如下图。 如果在虚拟机中测试可能会遇到错误。这里需要注意内存的大小Flink 向 Yarn 会申请多个 Container但是 Yarn 的配置可能限制了 Container 所能申请的内存大小甚至 Yarn 本身所管理的内存就很小。这样很可能无法正常启动 TaskManager尤其当指定多个 TaskManager 的时候。因此在启动 Flink 之后需要去 Flink 的页面中检查下 Flink 的状态。这里可以从 RM 的页面中直接跳转点击 Tracking UI。这时候 Flink 的页面如图 yarn-session.sh启动命令参数如下 Usage: Required -n,--container arg Number of YARN container to allocate (Number of Task Managers) Optional -D arg Dynamic properties -d,--detached Start detached -jm,--jobManagerMemory arg Memory for JobManager Container [in MB] -nm,--name Set a custom name for the application on YARN -q,--query Display available YARN resources (memory, cores) -qu,--queue arg Specify YARN queue. -s,--slots arg Number of slots per TaskManager -st,--streaming Start Flink in streaming mode -tm,--taskManagerMemory arg Memory per TaskManager Container [in MB] 1.4.4. 提交任务 之后我们可以通过这种方式提交我们的任务 ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar 以上命令在参数前加上y前缀-yn表示TaskManager个数。 在这个模式下同样可以使用-m yarn-cluster提交一个运行后即焚的detached yarn-yd作业到yarn cluster。 1.4.5. 停止yarn cluster yarn application -kill application_1507603745315_0001 转载于:https://www.cnblogs.com/advise09/p/10194917.html