Flink
Function
Function
The most basic function interface: it declares no methods at all. Other function interfaces extend it, for example:
- SourceFunction
- SinkFunction
- AggregateFunction
RichFunction
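It contains no concrete processing logic, so it has no input (I), output (O) or Collector parameters. The actual functionality lives in the implementing classes.
Implementing classes: RichMapFunction, RichFlatMapFunction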
+ lifecycle
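open, close: called only once per parallel subtask (a physical division of the work).
State updates, by contrast, apply per logical partition: keyed state is bound to the key.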
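+ RuntimeContext
Provides information about the task, its execution, and its state (for example, the index of the parallel subtask and handles to keyed state).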
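The lifecycle/state split above can be illustrated with a small plain-Java toy model. It is not Flink's API: `ToyRichCounter` and its `runSubtask` driver are hypothetical names. The driver plays the role of one parallel subtask and calls `open()` and `close()` exactly once. The counter is kept per key, which mimics keyed state such as `ValueState`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a rich map function's lifecycle (hypothetical, NOT the Flink API).
class ToyRichCounter {
    int openCalls = 0;   // how many times open() ran in this subtask
    int closeCalls = 0;  // how many times close() ran in this subtask
    // Stand-in for keyed state: one counter per key, like a ValueState<Long> scoped to the current key.
    final Map<String, Long> countPerKey = new HashMap<>();

    void open() { openCalls++; }    // e.g. set up connections or state handles
    void close() { closeCalls++; }  // e.g. release resources

    // map(): update the state of this element's key and emit "key -> count".
    String map(String key) {
        long next = countPerKey.getOrDefault(key, 0L) + 1;
        countPerKey.put(key, next);
        return key + " -> " + next;
    }

    // Drives one parallel subtask: open once, map every element, close once.
    static ToyRichCounter runSubtask(List<String> keys, List<String> out) {
        ToyRichCounter fn = new ToyRichCounter();
        fn.open();
        for (String k : keys) {
            out.add(fn.map(k));
        }
        fn.close();
        return fn;
    }
}
```

Even though the subtask handles several keys, `open()` runs once. The counts, however, are tracked separately for each key.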
KeyedProcessFunction
+ processElement(I value, Context ctx, Collector<O> out)
+ onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
+ Context: timestamp(), timerService(), output(OutputTag, value), getCurrentKey()
ProcessFunction: the same, just without the key information.
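Here is a plain-Java toy model of the timer mechanics. It assumes event-time timers registered 10 s after each element. The `ToyKeyedTimers` class, `DELAY_MS`, and `advanceWatermark` are hypothetical and not Flink's API. The model mirrors two documented behaviors:
- a timer fires once the watermark reaches its timestamp;
- timers are deduplicated per key and timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy model of KeyedProcessFunction timer semantics (hypothetical, NOT the Flink API).
class ToyKeyedTimers {
    static final long DELAY_MS = 10_000L;  // assumed timer delay, purely illustrative

    // timer timestamp -> keys with a timer at that time (a Set, so key+timestamp duplicates collapse)
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
    final List<String> out = new ArrayList<>();

    // Like processElement(): ctx.timerService().registerEventTimeTimer(ts + DELAY_MS)
    void processElement(String key, long timestamp) {
        timers.computeIfAbsent(timestamp + DELAY_MS, t -> new TreeSet<>()).add(key);
    }

    // Like onTimer(timestamp, ctx, out): ctx.getCurrentKey() says whose timer fired.
    private void onTimer(long timestamp, String currentKey) {
        out.add(currentKey + "@" + timestamp);
    }

    // Fire, in timestamp order, every timer whose timestamp <= the new watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, TreeSet<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(due.getKey(), key);
            }
        }
    }
}
```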
ProcessWindowFunction
The focus is still on process(), with its functionality adapted to windows.
+ process( Key, Context, Iterable<IN>, Collector<OUT> )
Only window functions receive their input as an Iterable, i.e. all elements of the window at once.
+ Context: keeps output(), drops the timer service, and adds window information such as window()
+ clear(Context)
ProcessAllWindowFunction: likewise, without the key information.
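The next toy model shows what "the whole window as an Iterable" means. It covers keyed 5-second tumbling event-time windows feeding a process-style function. `ToyTumblingWindow` is hypothetical and not Flink's API. It assumes non-negative timestamps and a zero window offset. It also evaluates every window at the end of the input, rather than when the watermark passes the window end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of keyed tumbling windows + a ProcessWindowFunction (hypothetical, NOT the Flink API).
class ToyTumblingWindow {
    // Window start for a non-negative timestamp with zero offset.
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Like process(key, context, elements, out): the window bounds play the role of context.window().
    static void process(String key, long start, long end, Iterable<Long> elements, List<String> out) {
        int n = 0;
        for (Long ignored : elements) n++;  // all buffered elements of this window, at once
        out.add(key + " [" + start + "," + end + ") count=" + n);
    }

    // Buffer (key, timestamp) events per window and key, then evaluate every window.
    static List<String> run(List<String> keys, List<Long> timestamps, long size) {
        TreeMap<Long, TreeMap<String, List<Long>>> buffers = new TreeMap<>();  // start -> key -> elements
        for (int i = 0; i < keys.size(); i++) {
            long start = windowStart(timestamps.get(i), size);
            buffers.computeIfAbsent(start, s -> new TreeMap<>())
                   .computeIfAbsent(keys.get(i), k -> new ArrayList<>())
                   .add(timestamps.get(i));
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, TreeMap<String, List<Long>>> w : buffers.entrySet()) {
            for (Map.Entry<String, List<Long>> k : w.getValue().entrySet()) {
                process(k.getKey(), w.getKey(), w.getKey() + size, k.getValue(), out);
            }
        }
        return out;
    }
}
```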
Multi-stream transformations
Splitting a stream
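filter(): not recommended, because every branch needs its own filter and each filter processes the whole stream again.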
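ctx.output(): side outputs
Define an OutputTag, call ctx.output(tag, value) in ProcessFunction.processElement() (while out.collect() still feeds the main stream), then retrieve each branch with getSideOutput(tag).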
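This is a toy model of side-output splitting in plain Java. It assumes tags are plain strings; in Flink they are typed `OutputTag` objects. `ToySideOutputs` and the user names are hypothetical. The point is that a single pass over the data routes every element exactly once, unlike a chain of `filter()` calls.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of splitting a stream with side outputs (hypothetical, NOT the Flink API).
class ToySideOutputs {
    final List<String> mainOutput = new ArrayList<>();             // what out.collect(...) would emit
    final Map<String, List<String>> sideOutputs = new HashMap<>(); // tag name -> side-output elements

    // Like ctx.output(tag, value).
    void output(String tag, String value) {
        sideOutputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
    }

    // Like processElement(): each element goes to exactly one place.
    void processElement(String user, String url) {
        if (user.equals("Alice")) {
            output("alice", url);
        } else if (user.equals("Bob")) {
            output("bob", url);
        } else {
            mainOutput.add(user + " " + url);
        }
    }

    // Like getSideOutput(tag): an unused tag yields an empty result.
    List<String> getSideOutput(String tag) {
        return sideOutputs.getOrDefault(tag, List.of());
    }
}
```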
Merging streams
```mermaid
%%{init: {'theme': 'default', 'themeVariables': { 'fontSize': '14px'}}}%%
classDiagram
    appStream0 --|> appStream : union
    appStream1 --|> appStream : union
    appStream --|> stream_5 : connect
    thirdpartyStream --|> stream_5 : connect
    stream_5 --|> stream_6 : map, flatMap, process
    stream_5 --|> stream_7 : keyBy(order id)
    stream_7 --|> stream_8 : map, flatMap, process
    %% class definitions
    class appStream0{
        ("order-1", "app", 1000L)
    }
    class appStream1{
        ("order-2", "app", 2000L)
    }
    class appStream{
        ("order-1", "app", 1000L)
        ("order-2", "app", 2000L)
    }
    class thirdpartyStream{
        ("order-1", "third-party", "success", 3000L)
        ("order-3", "third-party", "success", 4000L)
    }
    class stream_5{
        <<ConnectedStreams>>
    }
    class stream_6{
        <<DataStream>>
    }
    class stream_7{
        <<ConnectedStreams>>
        ("order-1", "app", 1000L)
        ("order-1", "third-party", "success", 3000L)
    }
    class stream_8{
        <<DataStream>>
    }
```
DataStream<T> union(DataStream<T>... streams)
All streams being unioned must have the same data type.
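The same-type requirement is exactly what a single type parameter expresses. In this toy sketch, `ToyUnion` is hypothetical and not Flink's API. The compiler rejects unioning a `List<String>` with a `List<Long>`, just as `union` only accepts streams of the same element type.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of union (hypothetical, NOT the Flink API): one type parameter T for every input.
class ToyUnion {
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> merged = new ArrayList<>(first);
        for (List<T> other : others) {
            merged.addAll(other);  // in Flink, the interleaving of the inputs is not deterministic
        }
        return merged;
    }
}
```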
ConnectedStreams<T, R>: stream1.connect(stream2)
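This call only appears to merge the two streams. It performs no processing by itself and is of no use on its own: the two streams keep their own element types until a Co-function is applied.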
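connectedStreams.keyBy(keySelector1, keySelector2)
- Syntactically optional. Use it when the business logic needs keyed processing, so that records from both streams with the same key reach the same parallel instance.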
=> DataStream
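All of the functions below are CoXxx functions. Each one defines a separate processing method for each of the two streams, and that is how streams with different content are merged into one.
- map(CoMapFunction)
- flatMap(CoFlatMapFunction)
- process(CoProcessFunction)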
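The Co-function idea fits in a few lines of plain Java. This toy uses a hypothetical `CoMap` interface standing in for Flink's `CoMapFunction`. It has two input types, one method per input, and one shared output type, which is what turns a pair of streams back into a single stream.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of connect + CoMapFunction (hypothetical, NOT the Flink API).
class ToyCoMap {
    interface CoMap<IN1, IN2, OUT> {
        OUT map1(IN1 value);  // called for elements of the first stream
        OUT map2(IN2 value);  // called for elements of the second stream
    }

    // Both methods return OUT, so the result is a single list of one type.
    static <IN1, IN2, OUT> List<OUT> connectAndMap(List<IN1> first, List<IN2> second,
                                                   CoMap<IN1, IN2, OUT> fn) {
        List<OUT> out = new ArrayList<>();
        for (IN1 v : first) out.add(fn.map1(v));
        for (IN2 v : second) out.add(fn.map2(v));  // real Flink interleaves the two inputs
        return out;
    }
}
```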
Example: order reconciliation
package chapter08_Streams;
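Below is a toy sketch of one common reconciliation approach, applied to the payment data from the merging diagram. It is not the original post's code. Keyed by order id, each side is parked in state until its counterpart arrives. `ToyOrderCheck` is hypothetical. In a real Flink job the parked entries would typically be reported by timers; `flushUnmatched()` stands in for that.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy model of connect + keyBy(orderId) + a CoProcessFunction-style check (hypothetical, NOT the Flink API).
class ToyOrderCheck {
    private final Map<String, Long> appState = new TreeMap<>();        // orderId -> app event time
    private final Map<String, Long> thirdPartyState = new TreeMap<>(); // orderId -> third-party event time
    final List<String> out = new ArrayList<>();

    // Like processElement1: an event from the app stream.
    void processApp(String orderId, long ts) {
        if (thirdPartyState.remove(orderId) != null) {
            out.add("matched " + orderId);   // counterpart already waiting: match and clear state
        } else {
            appState.put(orderId, ts);       // park until the third-party event arrives
        }
    }

    // Like processElement2: an event from the third-party payment stream.
    void processThirdParty(String orderId, long ts) {
        if (appState.remove(orderId) != null) {
            out.add("matched " + orderId);
        } else {
            thirdPartyState.put(orderId, ts);
        }
    }

    // Stand-in for onTimer(): report whatever is still waiting for its counterpart.
    void flushUnmatched() {
        for (String id : appState.keySet()) out.add("no third-party event for " + id);
        for (String id : thirdPartyState.keySet()) out.add("no app event for " + id);
        appState.clear();
        thirdPartyState.clear();
    }
}
```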
Join
JoinedStreams can only be processed with the apply() method, passing a JoinFunction or FlatJoinFunction.
```mermaid
%%{init: {'theme': 'default', 'themeVariables': { 'fontSize': '14px'}}}%%
classDiagram
    direction LR
    stream1 --|> stream3 : join
    stream2 --|> stream3 : join
    stream3 --|> stream4 : apply( \n JoinFunction, FlatJoinFunction )
    %% class definitions
    class stream1{
        Tuple2.of("a", 1000L)
        Tuple2.of("b", 1000L)
        Tuple2.of("a", 2000L)
        Tuple2.of("b", 2000L)
    }
    class stream2{
        Tuple2.of("a", 3000L)
        Tuple2.of("b", 3000L)
        Tuple2.of("a", 4000L)
        Tuple2.of("b", 4000L)
    }
    class stream3{
        <<JoinedStreams>>
        \.where(x -> x.f0)
        \.equalTo(y -> y.f0)
        \.window(TumblingEventTimeWindows\n .of(Time.seconds(5)))
    }
    class stream4{
        <<DataStream>>
        (a,1000) => (a,3000)
        (a,1000) => (a,4000)
        (a,2000) => (a,3000)
        (a,2000) => (a,4000)
        (b,1000) => (b,3000)
        (b,1000) => (b,4000)
        (b,2000) => (b,3000)
        (b,2000) => (b,4000)
    }
```
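Here is a plain-Java toy model of the window join, fed with the data from the diagram. `ToyWindowJoin` is hypothetical and not Flink's API, and it assumes non-negative timestamps. Within one window, every left element with a given key is paired with every right element that has the same key. That is why 2 keys × 2 × 2 elements yield the 8 pairs shown.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a tumbling-window join (hypothetical, NOT the Flink API):
// a per-window, per-key Cartesian product of the two inputs.
class ToyWindowJoin {
    static final class Event {
        final String key;
        final long ts;
        Event(String key, long ts) { this.key = key; this.ts = ts; }
        @Override public String toString() { return "(" + key + "," + ts + ")"; }
    }

    // Like JoinFunction.join(first, second): called once per matching pair.
    static String join(Event first, Event second) {
        return first + " => " + second;
    }

    static List<String> run(List<Event> left, List<Event> right, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);                         // where(...).equalTo(...)
                boolean sameWindow = l.ts / windowSize == r.ts / windowSize;   // same tumbling window
                if (sameKey && sameWindow) out.add(join(l, r));
            }
        }
        return out;
    }
}
```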
Example: matching orders with page clicks (interval join)
```mermaid
%%{init: {'theme': 'default', 'themeVariables': { 'fontSize': '14px'}}}%%
classDiagram
    direction TB
    orderStream --|> stream3 : keyed
    stream3 --|> stream4 : intervalJoin
    clickStream --|> stream4 : intervalJoin
    stream4 --|> stream5 : process(ProcessJoinFunction)
    %% class definitions
    class orderStream{
        ("Alice", "order-2", 5000L)
        ("Bob", "order-3", 20000L)
        ("Alice", "order-4", 20000L)
    }
    class stream3{
        <<KeyedStream>>
    }
    class clickStream{
        ("Bob", "./cart", 2000L)
        ("Alice", "./prod?id=100", 3000L)
        ("Alice", "./prod?id=200", 3500L)
        ("Bob", "./prod?id=2", 2500L)
        ("Alice", "./prod?id=300", 36000L)
        ("Bob", "./home", 30000L)
        ("Bob", "./prod?id=1", 23000L)
        ("Bob", "./prod?id=3", 33000L)
    }
    class stream4{
        <<IntervalJoined>>
        \.intervalJoin(clickStream.keyBy(data -> data.user))
        \.between(Time.seconds(-5),Time.seconds(10))
    }
    class stream5{
        <<DataStream>>
        (Alice, ./prod?id=100, 00:03_00) => (Alice,order-2,5000)
        (Alice, ./prod?id=200, 00:03_500) => (Alice,order-2,5000)
        (Bob, ./home, 00:30_00)=> (Bob,order-3,20000)
        (Bob, ./prod?id=1, 00:23_00) => (Bob,order-3,20000)
    }
```
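The last toy model reproduces the interval join with the diagram's data. `ToyIntervalJoin` is hypothetical and not Flink's API. A click matches an order when the users are equal and `order.ts - 5 s <= click.ts <= order.ts + 10 s`. Both bounds are inclusive, as they are by default in Flink. That is why Bob's `./home` click at exactly 30 s still joins `order-3`, while `order-4` finds no Alice click in [15 s, 30 s].

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an interval join (hypothetical, NOT the Flink API):
// match when users are equal and order.ts + lower <= click.ts <= order.ts + upper.
class ToyIntervalJoin {
    static final class Order {
        final String user, id;
        final long ts;
        Order(String user, String id, long ts) { this.user = user; this.id = id; this.ts = ts; }
    }

    static final class Click {
        final String user, url;
        final long ts;
        Click(String user, String url, long ts) { this.user = user; this.url = url; this.ts = ts; }
    }

    static List<String> run(List<Order> orders, List<Click> clicks, long lowerMs, long upperMs) {
        List<String> out = new ArrayList<>();
        for (Order o : orders) {
            for (Click c : clicks) {
                boolean sameUser = o.user.equals(c.user);                             // keyBy(user) on both sides
                boolean inInterval = c.ts >= o.ts + lowerMs && c.ts <= o.ts + upperMs; // inclusive bounds
                // Like ProcessJoinFunction.processElement(left, right, ctx, out).
                if (sameUser && inInterval) out.add(c.user + " " + c.url + " => " + o.id);
            }
        }
        return out;
    }
}
```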
Unless otherwise stated, all articles on this blog are licensed under CC BY-NC-SA 4.0. Please credit xiaoling when reposting!