当前位置: 首页 > news >正文

.net 获取网站域名做平台交易网站怎么收款

.net 获取网站域名,做平台交易网站怎么收款,three.js 做的网站,卖域名的网站哪些好前置知识#xff1a; Flume学习笔记#xff08;1#xff09;—— Flume入门-CSDN博客 Flume学习笔记#xff08;2#xff09;—— Flume进阶-CSDN博客 Flume 自定义组件 自定义 Interceptor 需求分析#xff1a;使用 Flume 采集服务器本地日志#xff0c;需要按照日志… 前置知识 Flume学习笔记1—— Flume入门-CSDN博客 Flume学习笔记2—— Flume进阶-CSDN博客 Flume 自定义组件 自定义 Interceptor 需求分析使用 Flume 采集服务器本地日志需要按照日志类型的不同将不同种类的日志发往不同的分析系统 需要使用Flume 拓扑结构中的 Multiplexing 结构Multiplexing的原理是根据 event 中 Header 的某个 key 的值将不同的 event 发送到不同的 Channel中所以我们需要自定义一个 Interceptor为不同类型的 event 的 Header 中的 key 赋予不同的值 实现流程 代码 导入依赖 dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/version/dependency /dependencies 自定义拦截器的代码 package com.why.interceptor;import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList; import java.util.List; import java.util.Map;public class TypeInterceptor implements Interceptor {//存放事件的集合private ListEvent addHeaderEvents;Overridepublic void initialize() {//初始化集合addHeaderEvents new ArrayList();}//单个事件拦截Overridepublic Event intercept(Event event) {//获取头信息MapString, String headers event.getHeaders();//获取body信息String body new String(event.getBody());//根据数据中是否包含”why“来分组if(body.contains(why)){headers.put(type,first);}else {headers.put(type,second);}return event;}//批量事件拦截Overridepublic ListEvent intercept(ListEvent events) {//清空集合addHeaderEvents.clear();//遍历eventsfor(Event event : events){//给每一个事件添加头信息addHeaderEvents.add(intercept(event));}return addHeaderEvents;}Overridepublic void close() {}//构建生成器public static class TypeBuilder implements Interceptor.Builder{Overridepublic Interceptor build() {return new TypeInterceptor();}Overridepublic void configure(Context context) {}}} 将代码打包放入flume安装路径下的lib文件夹中 配置文件 在job文件夹下创建group4目录添加配置文件 为 hadoop102 上的 Flume1 配置 1 个 netcat source1 个 sink group2 个 avro sink并配置相应的 ChannelSelector 和 interceptor # Name the components on this agent a1.sources r1 a1.sinks k1 k2 a1.channels c1 c2# Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444 a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.why.interceptor.TypeInterceptor$TypeBuilder a1.sources.r1.selector.type multiplexing a1.sources.r1.selector.header type a1.sources.r1.selector.mapping.first c1 a1.sources.r1.selector.mapping.second c2# Describe the sink a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop103 a1.sinks.k1.port 4141 a1.sinks.k2.typeavro a1.sinks.k2.hostname hadoop104 a1.sinks.k2.port 4242# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Use a channel which buffers events in memory a1.channels.c2.type memory a1.channels.c2.capacity 1000 a1.channels.c2.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 c2 a1.sinks.k1.channel c1 a1.sinks.k2.channel c2 hadoop103配置一个 avro source 和一个 logger sink a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro a1.sources.r1.bind hadoop103 a1.sources.r1.port 4141 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1 hadoop104配置一个 avro source 和一个 logger sink a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro a1.sources.r1.bind hadoop104 a1.sources.r1.port 4242 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1 执行指令 hadoop102bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume-interceptor-flume.conf hadoop103bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1-flume-logger.conf -Dflume.root.loggerINFO,console hadoop104bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2-flume-logger.conf -Dflume.root.loggerINFO,console 然后hadoop102通过nc连接44444端口发送数据 在hadoop103和104上分别接收到 自定义 Source 官方提供的文档Flume 1.11.0 Developer Guide — Apache Flume 给出的示例代码如下 public class MySource extends AbstractSource implements Configurable, PollableSource {private String myProp;Overridepublic void configure(Context context) {String myProp context.getString(myProp, defaultValue);// Process the myProp value (e.g. validation, convert to another type, ...)// Store myProp for later retrieval by process() methodthis.myProp myProp;}Overridepublic void start() {// Initialize the connection to the external client}Overridepublic void stop () {// Disconnect from external client and do any additional cleanup// (e.g. releasing resources or nulling-out field values) ..}Overridepublic Status process() throws EventDeliveryException {Status status null;try {// This try clause includes whatever Channel/Event operations you want to do// Receive new dataEvent e getSomeData();// Store the Event into this Sources associated Channel(s)getChannelProcessor().processEvent(e);status Status.READY;} catch (Throwable t) {// Log exception, handle individual exceptions as neededstatus Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}} finally {txn.close();}return status;} } 需要继承AbstractSource实现Configurable, PollableSource 实战需求分析 使用 flume 接收数据并给每条数据添加前缀输出到控制台。前缀可从 flume 配置文件中配置 代码 package com.why.source;import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource;import java.util.HashMap; import java.util.concurrent.ConcurrentMap;public class MySource extends AbstractSource implements PollableSource, Configurable {//定义配置文件将来要读取的字段private Long delay;private String field;//获取数据封装成 event 并写入 channel这个方法将被循环调用Overridepublic Status process() throws EventDeliveryException {try {//事件头信息HashMapString,String headerMap new HashMap();//创建事件SimpleEvent event new SimpleEvent();//循环封装事件for (int i 0; i 5; i) {//设置头信息event.setHeaders(headerMap);//设置事件内容event.setBody((field i).getBytes());//将事件写入ChannelgetChannelProcessor().processEvent(event);Thread.sleep(delay);}}catch (InterruptedException e) {throw new RuntimeException(e);}return Status.READY;}//backoff 步长Overridepublic long getBackOffSleepIncrement() {return 0;}//backoff 最长时间Overridepublic long getMaxBackOffSleepInterval() {return 0;}//初始化 context读取配置文件内容Overridepublic void configure(Context context) {delay context.getLong(delay);field context.getString(field,Hello);}}打包放到flume安装路径下的lib文件夹中 配置文件 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1# Describe/configure the source a1.sources.r1.type com.why.source.MySource a1.sources.r1.delay 1000 a1.sources.r1.field why# Describe the sink a1.sinks.k1.type logger# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 执行指令 hadoop102上bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group5/mysource.conf -Dflume.root.loggerINFO,console 结果如下 自定义 Sink Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume AgentSink 就利用 Channel 提交事务。事务一旦被提交该 Channel 从自己的内部缓冲区删除事件 官方文档Flume 1.11.0 Developer Guide — Apache Flume 接口实例 public class MySink extends AbstractSink implements Configurable {private String myProp;Overridepublic void configure(Context context) {String myProp context.getString(myProp, defaultValue);// Process the myProp value (e.g. validation)// Store myProp for later retrieval by process() methodthis.myProp myProp;}Overridepublic void start() {// Initialize the connection to the external repository (e.g. HDFS) that// this Sink will forward Events to ..}Overridepublic void stop () {// Disconnect from the external respository and do any// additional cleanup (e.g. releasing resources or nulling-out// field values) ..}Overridepublic Status process() throws EventDeliveryException {Status status null;// Start transactionChannel ch getChannel();Transaction txn ch.getTransaction();txn.begin();try {// This try clause includes whatever Channel operations you want to doEvent event ch.take();// Send the Event to the external repository.// storeSomeData(e);txn.commit();status Status.READY;} catch (Throwable t) {txn.rollback();// Log exception, handle individual exceptions as neededstatus Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}}return status;} } 自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口 实战需求分析 使用 flume 接收数据并在 Sink 端给每条数据添加前缀和后缀输出到控制台。前后缀可在 flume 任务配置文件中配置 代码 package com.why.sink;import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink;import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {//创建 Logger 对象private static final Logger LOG LoggerFactory.getLogger(AbstractSink.class);//前后缀private String prefix;private String suffix;Overridepublic Status process() throws EventDeliveryException {//声明返回值状态信息Status status;//获取当前 Sink 绑定的 ChannelChannel ch getChannel();//获取事务Transaction txn ch.getTransaction();//声明事件Event event;//开启事务txn.begin();//读取 Channel 中的事件直到读取到事件结束循环while (true) {event ch.take();if (event ! null) {break;}}try {//处理事件打印LOG.info(prefix new String(event.getBody()) suffix);//事务提交txn.commit();status Status.READY;} catch (Exception e) {//遇到异常事务回滚txn.rollback();status Status.BACKOFF;} finally {//关闭事务txn.close();}return status;}Overridepublic void configure(Context context) {prefix context.getString(prefix, hello);suffix context.getString(suffix);} }打包放到flume安装路径下的lib文件夹中 配置文件 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1# Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444# Describe the sink a1.sinks.k1.type com.why.sink.MySink a1.sinks.k1.prefix why: a1.sinks.k1.suffix :why# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 执行指令 hadoop102上bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group6/mysink.conf -Dflume.root.loggerINFO,console 结果如下
http://www.yutouwan.com/news/184731/

相关文章:

  • 北京建设学院网站天津网站设计网站制作
  • 设置网站的默认页面百度网盘app下载安装官方免费版
  • 图片上传网站源码整站seo排名公司
  • 公司网站建设的系统功能需求长春网站优化seo
  • 建站平台转型怎么做便民信息网站
  • 中小企业网站的建设实践报告内江做网站多少钱
  • 如何跟进psd做网站wordpress的登录页面模板
  • 厦门专业网站免费的网页模板网站
  • 丽水微信网站建设公司营销型单页面网站
  • 织梦网站装修公司源码建一个网站流程
  • 门户网站的建设制作单页网站多少钱
  • php网站进后台从零开始学wordpress
  • 网站进入考核期要多久wordpress密码错误
  • 文化建设 设计公司网站公众号怎么开通商城
  • WordPress瀑布流图片站单位网址怎么编
  • 企业系统建设赣州做网站优化
  • 创建app与网站的区别wordpress linux伪静态
  • 个人域名备案快的网站北京的网站建设
  • asp网站开发的背景与环境北京网页设计公司兴田德润优选
  • 网站界面设计如何实现功能美与形式美的统一教你用wordpress
  • 移动网站和定制网站重庆小程序商城开发
  • 网站优化之站外优化技巧潮流设计网站
  • 哪个网站做简历做钓鱼网站用哪种编程语言
  • 网站的建设与运营专业网络工程建设
  • 空间信息网站淘宝网请人做淘宝客网站
  • 网站建设 步骤汕头拿家做网站
  • 专业北京网站建设公司影楼和工作室的区别
  • 可信网站认证 技术支持单位上海搜索优化推广
  • 开发建设网站多久wordpress修改根目录
  • 租网站服务器一个月多少钱北京市建设厅门户网站