我做的网站怎么是危险网站,中国市场调查网,许昌旅游网站建设现状,温州做网站优化使用场景#xff1a; 标量函数即 UDF#xff0c;⽤于进⼀条数据出⼀条数据的场景。
开发流程#xff1a;
实现 org.apache.flink.table.functions.ScalarFunction 接⼝实现⼀个或者多个⾃定义的 eval 函数#xff0c;名称必须叫做 eval#xff0c;eval ⽅法签名必须是 p…使用场景 标量函数即 UDF⽤于进⼀条数据出⼀条数据的场景。
开发流程
实现 org.apache.flink.table.functions.ScalarFunction 接⼝实现⼀个或者多个⾃定义的 eval 函数名称必须叫做 evaleval ⽅法签名必须是 public 的eval ⽅法的⼊参、出参都是直接体现在 eval 函数的签名中
开发案例
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;/*** 输入数据 * nc -lk 88888* a,1** 输出结果* res1:3 I[97]* res2:3 I[97]* res3:3 I[97]*/
public class ScalarFunctionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings settings EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv StreamTableEnvironment.create(env, settings);DataStreamSourceString source env.socketTextStream(localhost, 8888);SingleOutputStreamOperatorTuple2String, String tpStream source.map(new MapFunctionString, Tuple2String, String() {Overridepublic Tuple2String, String map(String input) throws Exception {return new Tuple2(input.split(,)[0], input.split(,)[1]);}});Table table tEnv.fromDataStream(tpStream, id,name);tEnv.createTemporaryView(SourceTable,table);// 在 Table API ⾥不经注册直接调⽤函数Table res1 tEnv.from(SourceTable).select(call(HashFunction.class, $(id)));// 注册函数tEnv.createTemporarySystemFunction(HashFunction, HashFunction.class);// 在 Table API ⾥调⽤注册好的函数Table res2 tEnv.from(SourceTable).select(call(HashFunction, $(id)));// 在 SQL ⾥调⽤注册好的函数Table res3 tEnv.sqlQuery(SELECT HashFunction(id) FROM SourceTable);tEnv.toDataStream(res1).print(res1);tEnv.toDataStream(res2).print(res2);tEnv.toDataStream(res3).print(res3);env.execute();}public static class HashFunction extends ScalarFunction {// 接受任意类型输⼊返回 INT 型输出public int eval(DataTypeHint(inputGroup InputGroup.ANY) Object o) {return o.hashCode();}}
}测试结果