Flink
Function
Function
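The most basic function interface: a marker interface that declares no methods at all. Interfaces that extend it include: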
- SourceFunction
- SinkFunction
- AggregateFunction
RichFunction
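It has no processing logic of its own, so it declares no input/output types and no Collector; the actual processing is implemented in the concrete subclasses. What it adds is lifecycle methods and access to the runtime context.
Implementing classes include RichMapFunction and RichFlatMapFunction.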
+ lifecycle
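open, close: called only once per parallel subtask (the physical unit of execution), not once per element or per key.
State, in contrast, is updated per logical partition: keyed state is bound to the key, so one subtask keeps separate state for every key it processes.
+ RuntimeContext
Exposes information about the task and its execution, and provides access to state.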
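To make "once per subtask" versus "per key" concrete, here is a minimal plain-Java simulation. It does not use Flink: `SubtaskSim` and `LifecycleDemo` are hypothetical stand-ins, where each simulated parallel subtask calls `open()` once, processes records for many keys, and calls `close()` once, while a per-key map plays the role of keyed state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation, NOT the Flink API: one SubtaskSim plays the role of
// one parallel instance of a RichMapFunction.
class SubtaskSim {
    int openCalls = 0;
    int closeCalls = 0;
    // Stand-in for keyed ValueState<Integer>: one counter per key.
    final Map<String, Integer> countPerKey = new HashMap<>();

    void open() { openCalls++; }   // once per subtask
    void close() { closeCalls++; } // once per subtask

    // Called once per element; which "state" is touched depends on the element's key.
    String map(String key) {
        int c = countPerKey.merge(key, 1, Integer::sum);
        return key + "#" + c;
    }
}

class LifecycleDemo {
    static List<SubtaskSim> run(int parallelism, List<String> keys) {
        List<SubtaskSim> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new SubtaskSim());
        }
        subtasks.forEach(SubtaskSim::open);
        for (String key : keys) {
            // Route each key to a fixed subtask, roughly like hash partitioning after keyBy.
            int idx = Math.floorMod(key.hashCode(), parallelism);
            subtasks.get(idx).map(key);
        }
        subtasks.forEach(SubtaskSim::close);
        return subtasks;
    }
}
```

Running it with parallelism 2 shows `open()`/`close()` counted exactly once per simulated subtask, while the counter for a repeated key keeps growing inside whichever subtask owns that key.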
KeyedProcessFunction
+ processElement(I value, Context ctx, Collector<O> out)
+ onTimer(long timestamp, OnTimerContext ctx, Collector<O> out)
+ Context: timestamp(), timerService(), output(OutputTag, value), getCurrentKey()
ProcessFunction: the same, just without the key information.
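The interplay of processElement and onTimer can be sketched in plain Java. This is a simulation, not Flink: `TimerSim`, its methods, and the rule "register an event-time timer 10 s after each element" are all hypothetical. It mirrors two real behaviors: an event-time timer fires once the watermark reaches its timestamp, and timers are deduplicated per key and timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Plain-Java simulation of the KeyedProcessFunction timer flow, NOT the Flink API.
// Hypothetical rule: each element registers an event-time timer at ts + 10_000.
class TimerSim {
    // A registered timer: fire time plus the key it belongs to.
    static final class Timer implements Comparable<Timer> {
        final long time;
        final String key;
        Timer(long time, String key) { this.time = time; this.key = key; }
        @Override public int compareTo(Timer o) {
            int c = Long.compare(time, o.time);
            return c != 0 ? c : key.compareTo(o.key);
        }
    }

    private final TreeSet<Timer> timers = new TreeSet<>(); // sorted and deduplicated
    final List<String> output = new ArrayList<>();

    // Mirrors processElement(value, ctx, out) calling
    // ctx.timerService().registerEventTimeTimer(...).
    void processElement(String key, long timestamp) {
        output.add("element " + key + "@" + timestamp);
        timers.add(new Timer(timestamp + 10_000L, key));
    }

    // Mirrors onTimer(timestamp, ctx, out); the timer's key plays ctx.getCurrentKey().
    void onTimer(long timestamp, String currentKey) {
        output.add("timer " + currentKey + "@" + timestamp);
    }

    // Advancing the watermark fires every timer whose time is <= the watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first().time <= watermark) {
            Timer t = timers.pollFirst();
            onTimer(t.time, t.key);
        }
    }
}
```

Registering the same key and timestamp twice yields a single timer, and a timer at 12000 stays pending while the watermark is still at 11000.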
ProcessWindowFunction
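Still a process-style function, but adapted for windows.
+ process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out)
Only window functions receive an Iterable of inputs: all elements of the window are buffered and handed over at once when the window fires.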
+ Context
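Keeps output(), drops the timer service, and adds window information via window().
+ clear(Context context): called when the window expires, to clean up per-window state.
ProcessAllWindowFunction: likewise, just without the key information.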
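The "whole window as one Iterable" behavior can be illustrated with a plain-Java simulation. It assumes keyed tumbling event-time windows of 5 s with no offset; `WindowSim` and its methods are hypothetical, and unlike Flink it fires every window only after all input is read instead of when the watermark passes the window end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of a keyed tumbling window feeding a ProcessWindowFunction-style
// callback. NOT the Flink API; the window size and names are assumptions.
class WindowSim {
    static final long SIZE = 5_000L;

    // Mirrors process(key, context, elements, out): the whole window arrives as one Iterable.
    static void process(String key, long start, long end, Iterable<Long> elements, List<String> out) {
        int count = 0;
        for (Long ignored : elements) count++;
        out.add(key + " [" + start + "," + end + ") count=" + count);
    }

    static List<String> run(List<String> keys, List<Long> timestamps) {
        // key -> window start -> buffered timestamps (the window keeps every element)
        Map<String, Map<Long, List<Long>>> buffers = new TreeMap<>();
        for (int i = 0; i < keys.size(); i++) {
            long ts = timestamps.get(i);
            long start = ts - Math.floorMod(ts, SIZE); // tumbling window start
            buffers.computeIfAbsent(keys.get(i), k -> new TreeMap<>())
                   .computeIfAbsent(start, s -> new ArrayList<>())
                   .add(ts);
        }
        List<String> out = new ArrayList<>();
        // Simplification: fire all windows at the end of input.
        buffers.forEach((key, windows) ->
            windows.forEach((start, elems) -> process(key, start, start + SIZE, elems, out)));
        return out;
    }
}
```

Because the function sees the full buffered contents, it can compute anything over the window (counts, sorting, top-N), at the cost of keeping every element in state until the window fires.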
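Multi-stream transformations
Splitting a stream
- filter(): not recommended, since every branch needs its own filter() and each one processes the entire stream again.
- Side outputs: define an OutputTag; inside ProcessFunction.processElement(), send an element to the side stream with ctx.output(outputTag, value) (elements for the main stream still go to out.collect()), then retrieve the side stream with getSideOutput(outputTag).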
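The advantage over several filter() calls is that one pass routes every element. Below is a plain-Java simulation of that flow, not Flink code: `SideOutputSim`, `Tag`, and the tag ids are hypothetical. In real Flink a tag is usually created as an anonymous subclass, `new OutputTag<String>("alice"){}`, so its generic type information is preserved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of side outputs, NOT the Flink API. Tag names are hypothetical.
class SideOutputSim {
    // Stand-in for OutputTag<T>: identified by its id.
    static final class Tag<T> {
        final String id;
        Tag(String id) { this.id = id; }
    }

    static final Tag<String> ALICE = new Tag<>("alice");
    static final Tag<String> BOB = new Tag<>("bob");

    final List<String> mainOutput = new ArrayList<>(); // what out.collect() receives
    private final Map<String, List<Object>> sideOutputs = new HashMap<>();

    // Mirrors ctx.output(tag, value).
    <T> void output(Tag<T> tag, T value) {
        sideOutputs.computeIfAbsent(tag.id, k -> new ArrayList<>()).add(value);
    }

    // Mirrors getSideOutput(tag) on the operator's result.
    @SuppressWarnings("unchecked")
    <T> List<T> getSideOutput(Tag<T> tag) {
        return (List<T>) (List<?>) sideOutputs.getOrDefault(tag.id, new ArrayList<>());
    }

    // Mirrors processElement(value, ctx, out): a single pass routes every element.
    void processElement(String user) {
        if (user.equals("Alice")) {
            output(ALICE, user);
        } else if (user.equals("Bob")) {
            output(BOB, user);
        } else {
            mainOutput.add(user);
        }
    }
}
```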
Merging streams
```mermaid
%%{init: {'theme': 'default', 'themeVariables': { 'fontSize': '14px'}}}%%
classDiagram
appStream0 --|> appStream : union
appStream1 --|> appStream : union
appStream --|> stream_5 : connect
thirdpartyStream --|> stream_5 : connect
stream_5 --|> stream_6 : map, flatMap, process
stream_5 --|> stream_7 : keyBy:order_n
stream_7 --|> stream_8 : map, flatMap, process
%% class definitions
class appStream0{
("order-1", "app", 1000L)
}
class appStream1{
("order-2", "app", 2000L)
}
class appStream{
("order-1", "app", 1000L)
("order-2", "app", 2000L)
}
class thirdpartyStream{
("order-1", "third-party", "success", 3000L)
("order-3", "third-party", "success", 4000L)
}
class stream_5{
<<ConnectedStreams>>
}
class stream_6{
<<DataStream>>
}
class stream_7{
<<ConnectedStreams>>
("order-1", "app", 1000L)
("order-1", "third-party", "success", 3000L)
}
class stream_8{
<<DataStream>>
}
```
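DataStream<T> union(DataStream<T>... streams)
All streams being unioned must have the same data type; union() can merge more than two streams in one call.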
ConnectedStreams<T, R>: stream1.connect(stream2)
This call only appears to merge the two streams: nothing is processed yet, and each stream keeps its own type. On its own it achieves nothing until a Co-function is applied.
connectedStreams.keyBy(keySelector1, keySelector2)
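- Syntactically this step is optional; apply keyBy only when the use case needs it, for example so that records with the same key from both streams reach the same parallel instance and can share keyed state.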
=> DataStream
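Note that the functions below are all CoXxxFunctions: each defines a separate method for each input stream, which is how two streams with different contents are merged into one output type.
- map(CoMapFunction)
- flatMap(CoFlatMapFunction)
- process(CoProcessFunction)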
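The "one method per input" idea can be sketched in plain Java. Flink's CoMapFunction really does declare `map1`/`map2` (CoFlatMapFunction has `flatMap1`/`flatMap2`, CoProcessFunction has `processElement1`/`processElement2`), but the `ConnectSim` class and its `CoMap` interface below are hypothetical stand-ins, not the Flink types. A `union` stand-in is included for contrast: it only compiles when all inputs share one element type.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of the CoMapFunction idea, NOT the Flink API:
// two inputs with different types, one method per input, one common output type.
class ConnectSim {
    interface CoMap<IN1, IN2, OUT> {
        OUT map1(IN1 value); // called for elements of the first stream
        OUT map2(IN2 value); // called for elements of the second stream
    }

    // Stand-in for stream1.connect(stream2).map(coMap). Real streams interleave
    // elements in arrival order; this sketch simply processes input 1 first.
    static <IN1, IN2, OUT> List<OUT> connectAndMap(List<IN1> s1, List<IN2> s2, CoMap<IN1, IN2, OUT> f) {
        List<OUT> out = new ArrayList<>();
        for (IN1 v : s1) out.add(f.map1(v));
        for (IN2 v : s2) out.add(f.map2(v));
        return out;
    }

    // Stand-in for union(): every input must have the same element type T.
    @SafeVarargs
    static <T> List<T> union(List<T>... streams) {
        List<T> out = new ArrayList<>();
        for (List<T> s : streams) out.addAll(s);
        return out;
    }

    static List<String> demo() {
        List<Integer> numbers = List.of(1, 2);
        List<Long> timestamps = List.of(1000L);
        return connectAndMap(numbers, timestamps, new CoMap<Integer, Long, String>() {
            public String map1(Integer v) { return "Integer: " + v; }
            public String map2(Long v) { return "Long: " + v; }
        });
    }
}
```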
Example: order reconciliation
package chapter08_Streams;
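As a hedged sketch (not the post's own code), order reconciliation is commonly built as connect, then keyBy on the order ID, then a CoProcessFunction that keeps one state per side and registers an event-time timer; whatever is still waiting when the timer fires is reported as unmatched. The plain-Java simulation below uses the data from the merge diagram. The class `ReconcileSim`, its messages, and the 5-second wait are assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of connect + keyBy(orderId) + CoProcessFunction for
// order reconciliation. NOT the Flink API; the 5-second wait is an assumption.
class ReconcileSim {
    static final long WAIT_MS = 5_000L;

    // Stand-ins for two keyed ValueStates, keyed by orderId.
    private final Map<String, Long> appState = new HashMap<>();
    private final Map<String, String> thirdPartyState = new HashMap<>();
    // Event-time timers: fire time -> orderIds (stand-in for the timer service).
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();
    final List<String> out = new ArrayList<>();

    // Mirrors processElement1: an app-side event (orderId, ts).
    void processApp(String orderId, long ts) {
        if (thirdPartyState.remove(orderId) != null) {
            out.add(orderId + " matched");
        } else {
            appState.put(orderId, ts);
            timers.computeIfAbsent(ts + WAIT_MS, t -> new ArrayList<>()).add(orderId);
        }
    }

    // Mirrors processElement2: a third-party event (orderId, status, ts).
    void processThirdParty(String orderId, String status, long ts) {
        if (appState.remove(orderId) != null) {
            out.add(orderId + " matched");
        } else {
            thirdPartyState.put(orderId, status);
            timers.computeIfAbsent(ts + WAIT_MS, t -> new ArrayList<>()).add(orderId);
        }
    }

    // Mirrors onTimer: whatever is still waiting when its timer fires is unmatched.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (String orderId : timers.pollFirstEntry().getValue()) {
                if (appState.remove(orderId) != null) out.add(orderId + " failed: third-party event missing");
                if (thirdPartyState.remove(orderId) != null) out.add(orderId + " failed: app event missing");
            }
        }
    }

    static List<String> demo() {
        ReconcileSim sim = new ReconcileSim();
        sim.processApp("order-1", 1000L);
        sim.processApp("order-2", 2000L);
        sim.processThirdParty("order-1", "success", 3000L);
        sim.processThirdParty("order-3", "success", 4000L);
        sim.advanceWatermark(Long.MAX_VALUE); // end of a bounded input flushes all timers
        return sim.out;
    }
}
```

With this data order-1 matches immediately, order-2 never receives its third-party event, and order-3 never receives its app event.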
Joins
A window join (JoinedStreams) can only be processed with apply(), which takes a JoinFunction or a FlatJoinFunction.
```mermaid
%%{init: {'theme': 'default', 'themeVariables': { 'fontSize': '14px'}}}%%
classDiagram
direction LR
stream1 --|> stream3 : join
stream2 --|> stream3 : join
stream3 --|> stream4 : apply( \n JoinFunction, FlatJoinFunction )
%% class definitions
class stream1{
Tuple2.of("a", 1000L)
Tuple2.of("b", 1000L)
Tuple2.of("a", 2000L)
Tuple2.of("b", 2000L)
}
class stream2{
Tuple2.of("a", 3000L)
Tuple2.of("b", 3000L)
Tuple2.of("a", 4000L)
Tuple2.of("b", 4000L)
}
class stream3{
<<JoinedStreams>>
\.where(x -> x.f0)
\.equalTo(y -> y.f0)
\.window(TumblingEventTimeWindows\n .of(Time.seconds(5)))
}
class stream4{
<<DataStream>>
(a,1000) => (a,3000)
(a,1000) => (a,4000)
(a,2000) => (a,3000)
(a,2000) => (a,4000)
(b,1000) => (b,3000)
(b,1000) => (b,4000)
(b,2000) => (b,3000)
(b,2000) => (b,4000)
}
```
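The eight results in the window join diagram follow from one rule: two elements pair up only if they share the key and fall into the same window, and the JoinFunction is called once per such pair (a per-window cross product). A plain-Java simulation of that rule, not Flink code, with the hypothetical class `WindowJoinSim`:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of a tumbling event-time window join, NOT the Flink API.
// Elements pair up only if they have the same key AND fall into the same window.
class WindowJoinSim {
    static final class Event {
        final String key;
        final long ts;
        Event(String key, long ts) { this.key = key; this.ts = ts; }
        @Override public String toString() { return "(" + key + "," + ts + ")"; }
    }

    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Mirrors join(...).where(...).equalTo(...).window(...).apply(JoinFunction):
    // the "JoinFunction" is invoked once per matching pair.
    static List<String> join(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                if (l.key.equals(r.key) && windowStart(l.ts, size) == windowStart(r.ts, size)) {
                    out.add(l + " => " + r);
                }
            }
        }
        return out;
    }

    static List<String> demo() {
        List<Event> s1 = List.of(new Event("a", 1000L), new Event("b", 1000L),
                                 new Event("a", 2000L), new Event("b", 2000L));
        List<Event> s2 = List.of(new Event("a", 3000L), new Event("b", 3000L),
                                 new Event("a", 4000L), new Event("b", 4000L));
        return join(s1, s2, 5_000L);
    }
}
```

All eight timestamps fall into the window [0, 5000), so each key yields 2 × 2 = 4 pairs, 8 in total; with a smaller window some of these pairs would no longer meet.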
Example: interval join of orders and clicks
```mermaid
%%{init: {'theme': 'default', 'themeVariables': { 'fontSize': '14px'}}}%%
classDiagram
direction TB
orderStream --|> stream3 : keyBy
stream3 --|> stream4 : intervalJoin
clickStream --|> stream4 : intervalJoin
stream4 --|> stream5 : process(ProcessJoinFunction)
%% class definitions
class orderStream{
("Alice", "order-2", 5000L)
("Bob", "order-3", 20000L)
("Alice", "order-4", 20000L)
}
class stream3{
<<KeyedStream>>
}
class clickStream{
("Bob", "./cart", 2000L)
("Alice", "./prod?id=100", 3000L)
("Alice", "./prod?id=200", 3500L)
("Bob", "./prod?id=2", 2500L)
("Alice", "./prod?id=300", 36000L)
("Bob", "./home", 30000L)
("Bob", "./prod?id=1", 23000L)
("Bob", "./prod?id=3", 33000L)
}
class stream4{
<<IntervalJoined>>
\.intervalJoin(clickStream.keyBy(data -> data.user))
\.between(Time.seconds(-5),Time.seconds(10))
}
class stream5{
<<DataStream>>
(Alice, ./prod?id=100, 00:03_00) => (Alice,order-2,5000)
(Alice, ./prod?id=200, 00:03_500) => (Alice,order-2,5000)
(Bob, ./home, 00:30_00)=> (Bob,order-3,20000)
(Bob, ./prod?id=1, 00:23_00) => (Bob,order-3,20000)
}
```
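The four results in the interval join diagram can be checked by hand: for an order at time t, a click of the same user matches when its timestamp lies in [t - 5000, t + 10000], both bounds inclusive, which is Flink's default for between(). A plain-Java simulation of that rule, not Flink code; `IntervalJoinSim`, `Order`, and `Click` are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of an interval join, NOT the Flink API.
// For each order (left) at time t, match clicks (right) of the same user whose
// timestamp lies in [t + lower, t + upper]; both bounds inclusive.
class IntervalJoinSim {
    static final class Order {
        final String user, id;
        final long ts;
        Order(String user, String id, long ts) { this.user = user; this.id = id; this.ts = ts; }
    }
    static final class Click {
        final String user, url;
        final long ts;
        Click(String user, String url, long ts) { this.user = user; this.url = url; this.ts = ts; }
    }

    static List<String> join(List<Order> orders, List<Click> clicks, long lower, long upper) {
        List<String> out = new ArrayList<>();
        for (Order o : orders) {
            for (Click c : clicks) {
                boolean sameKey = o.user.equals(c.user);
                boolean inRange = c.ts >= o.ts + lower && c.ts <= o.ts + upper;
                if (sameKey && inRange) {
                    // the "ProcessJoinFunction": emit click => order
                    out.add(c.user + "," + c.url + "," + c.ts + " => " + o.user + "," + o.id + "," + o.ts);
                }
            }
        }
        return out;
    }

    static List<String> demo() {
        List<Order> orders = List.of(new Order("Alice", "order-2", 5000L),
                new Order("Bob", "order-3", 20000L), new Order("Alice", "order-4", 20000L));
        List<Click> clicks = List.of(new Click("Bob", "./cart", 2000L),
                new Click("Alice", "./prod?id=100", 3000L), new Click("Alice", "./prod?id=200", 3500L),
                new Click("Bob", "./prod?id=2", 2500L), new Click("Alice", "./prod?id=300", 36000L),
                new Click("Bob", "./home", 30000L), new Click("Bob", "./prod?id=1", 23000L),
                new Click("Bob", "./prod?id=3", 33000L));
        return join(orders, clicks, -5_000L, 10_000L); // between(-5 s, +10 s)
    }
}
```

Note that Bob's click at exactly 30000 still matches order-3 at 20000 because the upper bound is inclusive, while Alice's order-4 finds no click in [15000, 30000].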
Unless otherwise stated, all posts on this blog are licensed under CC BY-NC-SA 4.0. Please credit xiaoling when reposting!
