Function

Function

 The most basic function interface; it contains nothing itself (a minimal sketch follows the list below).

  • SourceFunction
  • SinkFunction
  • AggregateFunction
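
 As an illustration only, a hedged sketch of a plain (non-rich) function: it defines nothing but its single operational method, with no lifecycle hooks and no runtime context. The class name ConsoleSink and the println logic are illustrative assumptions.

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// A plain Function subtype: one method, no open()/close(), no RuntimeContext.
public class ConsoleSink implements SinkFunction<String> {
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println(value);
    }
}

 Attached, for example, with stream.addSink(new ConsoleSink()).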

RichFunction

 It has no concrete execution logic of its own, so there are no I, O, or Collector type parameters; the actual functionality lives in the implementing classes.

 Implementing classes: RichMapFunction, RichFlatMapFunction

  • + lifecycle
    open(), close(): called only once per parallel subtask (a physical partition)
           State updates, by contrast, are scoped to logical partitions: state is bound to the key


  • + RuntimeContext
    Mainly exposes information about the task, the execution, and state (see the sketch below).
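
 A hedged sketch of the rich variant, assuming a trivial String-to-String map (the class name TaggingMap and the println calls are illustrative): open() and close() each run once per parallel subtask, and getRuntimeContext() gives access to task and execution information.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class TaggingMap extends RichMapFunction<String, String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        // Called once per parallel subtask, before the first map() call.
        System.out.println("open, subtask " + getRuntimeContext().getIndexOfThisSubtask());
    }

    @Override
    public String map(String value) throws Exception {
        // RuntimeContext exposes task / execution information.
        return getRuntimeContext().getTaskName() + ": " + value;
    }

    @Override
    public void close() throws Exception {
        // Called once per parallel subtask, after the last map() call.
        System.out.println("close, subtask " + getRuntimeContext().getIndexOfThisSubtask());
    }
}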



KeyedProcessFunction

  • + processElement( In, Context, Collector<Out> )
  • + onTimer( timestamp, OnTimerContext, Collector<Out> )
  • + Context: timestamp(), timerService(), output(), getCurrentKey()

  ProcessFunction: the same, just without the key information (see the sketch below)
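
 A sketch assuming a keyed stream of (key, timestamp) tuples: processElement() reads the current key and timestamp from the Context and registers an event-time timer; onTimer() fires once the watermark passes it. The class name TimeoutAlert and the 10-second delay are illustrative assumptions.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class TimeoutAlert extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {
    @Override
    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<String> out) throws Exception {
        out.collect("key=" + ctx.getCurrentKey() + " seen at " + ctx.timestamp());
        // Register an event-time timer 10 s after this element's timestamp.
        ctx.timerService().registerEventTimeTimer(value.f1 + 10_000L);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect("timer fired for key=" + ctx.getCurrentKey() + " at " + timestamp);
    }
}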



ProcessWindowFunction

 The focus is still on process(); the functionality is simply adapted to windows.

  • + process( Key, Context, Iterable<IN>, Collector<OUT> )
     Only the window variants receive their input as an Iterable
  • + Context
     Keeps output, drops the timer service, adds the window information
  • + clear(Context)

  ProcessAllWindowFunction: likewise without the key information (sketch below)
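
 A sketch assuming a keyed String stream in 5-second tumbling event-time windows: process() receives all elements of the window as an Iterable, and the Context exposes the window itself instead of a timer service. The class name CountPerWindow is an assumption.

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CountPerWindow extends ProcessWindowFunction<String, String, String, TimeWindow> {
    @Override
    public void process(String key, Context context, Iterable<String> elements, Collector<String> out) {
        long count = 0;
        for (String ignored : elements) {
            count++;
        }
        // context.window() replaces the timer service available in KeyedProcessFunction.
        out.collect("key=" + key
                + " window=[" + context.window().getStart() + "," + context.window().getEnd() + ")"
                + " count=" + count);
    }
}

 Attached, for example, as stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.seconds(5))).process(new CountPerWindow()).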



Multi-Stream Transformations

Splitting Streams

  • filter()
    Not recommended: every split has to filter the full stream again
  • ctx.output()
     Define an OutputTag and call ctx.output(tag, value) inside ProcessFunction.processElement();
     read the split back with getSideOutput(tag) (see the sketch below)
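 A sketch of splitting with side outputs (the tag name "warn" and the prefix check are illustrative assumptions): the main output goes through the Collector, the side output goes through ctx.output(tag, value).

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputSplit {
    // Anonymous subclass so the element type survives erasure.
    public static final OutputTag<String> WARN_TAG = new OutputTag<String>("warn") {};

    public static SingleOutputStreamOperator<String> split(DataStream<String> input) {
        return input.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                if (value.startsWith("warn")) {
                    ctx.output(WARN_TAG, value);   // side output
                } else {
                    out.collect(value);            // main output
                }
            }
        });
    }
}

 The warning stream is then obtained from the returned operator with getSideOutput(SideOutputSplit.WARN_TAG).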

Merging Streams

union

 DataStream union(DataStream… streams)
 The data types of all the streams must be identical (example below).
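
 A minimal sketch, assuming all inputs are DataStream<String> (the class and method names are illustrative):

import org.apache.flink.streaming.api.datastream.DataStream;

public class UnionExample {
    // union() takes any number of streams, but they must all share the same element type.
    public static DataStream<String> merge(DataStream<String> s1, DataStream<String> s2, DataStream<String> s3) {
        return s1.union(s2, s3);
    }
}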

Connect

  1. ConnectedStreams<T, R>: stream1.connect(stream2)
    This call only appears to merge the two streams; by itself it does no processing and is of no use on its own.

    • This step is syntactically optional; if the business logic calls for it, apply keyBy:
      connectedStreams.keyBy(keySelector1, keySelector2)

  2. => DataStream
    Note that the functions applied below are all Co* variants: each provides a separate handler for each of the two streams, and that is how streams with different contents get merged.

    • map( CoMapFunction )
    • flatMap( CoFlatMapFunction )
    • process( CoProcessFunction )

Example: Order Reconciliation

package chapter08_Streams;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Order reconciliation.
 */
public class BillCheck {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // appStream
        SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env
                .fromElements(
                        Tuple3.of("order-1", "app", 1000L),
                        Tuple3.of("order-2", "app", 2000L)
                )
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple3<String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner((order, ts) -> order.f2)
                );

        // thirdpartyStream
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartyStream = env
                .fromElements(
                        Tuple4.of("order-1", "third-party", "success", 3000L),
                        Tuple4.of("order-3", "third-party", "success", 4000L)
                )
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner((event, ts) -> event.f3)
                );

        // Check whether the same payment order shows up in both streams
        appStream.connect(thirdpartyStream)
                .keyBy(order -> order.f0, thirdparty -> thirdparty.f0)
                .process(new OrderMatchResult())
                .print();

        env.execute();
    }

    public static class OrderMatchResult
            extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String> {

        private ValueState<Tuple4<String, String, String, Long>> thirdpartyEventStat;
        private ValueState<Tuple3<String, String, Long>> appEventState;

        // Only instantiates the state fields declared above
        @Override
        public void open(Configuration parameters) throws Exception {
            thirdpartyEventStat = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple4<String, String, String, Long>>(
                            "thirdparty-event",
                            Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG)
                    )
            );

            appEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple3<String, String, Long>>(
                            "app-event",
                            Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)
                    )
            );
        }

        /**
         * This runs after keyBy, so with a 1-to-1 key mapping,
         * any element arriving here is exactly the one that matches the current key.
         */
        @Override
        public void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            if (thirdpartyEventStat.value() != null) {
                out.collect("Reconciliation succeeded: " + value + " " + thirdpartyEventStat.value());
                thirdpartyEventStat.clear();
            } else {
                appEventState.update(value);
                // Re-check the reconciliation 5 seconds later (event time)
                ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
            }
        }

        @Override
        public void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {
            if (appEventState.value() != null) {
                out.collect("Reconciliation succeeded: " + appEventState.value() + " " + value);
                appEventState.clear();
            } else {
                thirdpartyEventStat.update(value);
                ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            if (appEventState.value() != null) {
                out.collect("Reconciliation failed: " + appEventState.value() + " third-party event not received");
            }
            if (thirdpartyEventStat.value() != null) {
                out.collect("Reconciliation failed: " + thirdpartyEventStat.value() + " app event not received");
            }
            appEventState.clear();
            thirdpartyEventStat.clear();
        }
    }
}


Joins

Window Join

JoinedStreams can only be processed with the apply() method (sketch below).
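
 A sketch of a window join, assuming two Tuple2<String, Long> streams keyed by f0 and 5-second tumbling event-time windows (the class name WindowJoinExample is an assumption); the JoinedStreams pipeline is finished with apply() and a JoinFunction:

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinExample {
    public static DataStream<String> join(DataStream<Tuple2<String, Long>> left,
                                          DataStream<Tuple2<String, Long>> right) {
        return left.join(right)
                .where(e -> e.f0)
                .equalTo(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // apply() pairs up elements with the same key inside the same window.
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> first, Tuple2<String, Long> second) {
                        return first + " <=> " + second;
                    }
                });
    }
}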

Example: Order Reconciliation



Interval Join
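
 A hedged sketch, assuming the same Tuple2<String, Long> inputs already keyed by f0: each left element is joined with right elements whose timestamps fall within an interval around it, and the pipeline is finished with a ProcessJoinFunction. The class name and the [-2 s, +5 s] bounds are illustrative assumptions.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinExample {
    public static DataStream<String> join(KeyedStream<Tuple2<String, Long>, String> left,
                                          KeyedStream<Tuple2<String, Long>, String> right) {
        return left.intervalJoin(right)
                // Match right elements with timestamps in [left.ts - 2 s, left.ts + 5 s].
                .between(Time.seconds(-2), Time.seconds(5))
                .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Long> l, Tuple2<String, Long> r,
                                               Context ctx, Collector<String> out) {
                        out.collect(l + " => " + r);
                    }
                });
    }
}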



CoGroup
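
 A hedged sketch under the same assumed inputs (the class name CoGroupExample is an assumption): unlike join(), the CoGroupFunction receives all elements of both sides for a given key and window, even if one side is empty, so outer-join style logic is possible.

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class CoGroupExample {
    public static DataStream<String> coGroup(DataStream<Tuple2<String, Long>> left,
                                             DataStream<Tuple2<String, Long>> right) {
        return left.coGroup(right)
                .where(e -> e.f0)
                .equalTo(e -> e.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> first,
                                        Iterable<Tuple2<String, Long>> second,
                                        Collector<String> out) {
                        // Both groups arrive in full; either may be empty.
                        int leftCount = 0, rightCount = 0;
                        for (Tuple2<String, Long> ignored : first) leftCount++;
                        for (Tuple2<String, Long> ignored : second) rightCount++;
                        out.collect("left=" + leftCount + " right=" + rightCount);
                    }
                });
    }
}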