黄石企业网站建设开发,网站logo教程,南昌网站空间,搜索引擎优化的方法背景
日常测试中我们使用flink的TestHarness只能测试单个算子#xff0c;很多情况下我们需要集成测试来测试真正的问题#xff0c;所以在flink中进行集成测试还是非常有必要的#xff0c;本文就来记录下如何在flink中进行集成测试
flink中进行集成测试
flink中进行集成测…背景
日常测试中我们使用flink的TestHarness只能测试单个算子很多情况下我们需要集成测试来测试真正的问题所以在flink中进行集成测试还是非常有必要的本文就来记录下如何在flink中进行集成测试
flink中进行集成测试
flink中进行集成测试的关键类MiniClusterWithClientResource这是一个启动本地flink集群的关键类先看一下集成测试的关键代码
/*** FLINK集成测试* https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/datastream/testing/**/
public class FlinkIntegrationTest {public static final Configuration config Configuration.fromMap(new HashMapString, String() {{put(heartbeat.timeout, 300000);}});ClassRulepublic static MiniClusterWithClientResource flinkCluster new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberSlotsPerTaskManager(1).setNumberTaskManagers(3).build());Testpublic void testStateFlatMap() throws Exception {StatefulFlatMap statefulFlatMap new StatefulFlatMap();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements(world, hi).keyBy(e - 1).flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList(hello world, hello hi world)));}Testpublic void testStateFlatMap1() throws Exception {StatefulFlatMap statefulFlatMap new StatefulFlatMap();StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// configure your test environmentenv.setParallelism(2);// values are collected in a static variableCollectSink.values.clear();// create a stream of custom elements and apply transformationsenv.fromElements(world, hi, world).keyBy(e - e).flatMap(statefulFlatMap).addSink(new CollectSink());// executeenv.execute();// verify your resultsassertTrue(CollectSink.values.containsAll(Lists.newArrayList(hello world, hello hi, hello world world)));}// create a testing sinkprivate static class CollectSink implements SinkFunctionString {// must be staticpublic static final ListString values Collections.synchronizedList(new ArrayList());Overridepublic void invoke(String value, Context context) throws Exception {values.add(value);}}}public class StatefulFlatMap extends RichFlatMapFunctionString, String {ValueStateString previousInput;Overridepublic void open(Configuration parameters) throws Exception {previousInput getRuntimeContext().getState(new ValueStateDescriptorString(previousInput, Types.STRING));}Overridepublic void flatMap(String in, CollectorString collector) throws Exception {String out hello in;if(previousInput.value() ! null){out out previousInput.value();}previousInput.update(in);collector.collect(out);}由于我们是集成测试我们一般输入source和输出sink是自己构造的比如这里的CollectSink这里就可以正常测试包括状态在内的pineline集成测试了