Flink Stream Processing API
Published: 2019-05-25


1. Environment

1.1 getExecutionEnvironment

Creates an execution environment that represents the context of the current program. If the program is invoked standalone, this method returns a local execution environment; if the program is submitted to a cluster from a command-line client, it returns that cluster's execution environment. In other words, getExecutionEnvironment decides which environment to return based on how the job is run, and it is the most common way to create an execution environment.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

If no parallelism is set, the configuration in flink-conf.yaml is used; the default is 1.

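If you prefer to configure that default cluster-wide instead of in code, the relevant flink-conf.yaml key is parallelism.default (the value shown is the shipped default; treat it as an example):

```yaml
# flink-conf.yaml — default parallelism applied when a job sets none of its own
parallelism.default: 1
```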

1.2 createLocalEnvironment

Returns a local execution environment; you specify the default parallelism when calling it.

LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

1.3 createRemoteEnvironment

Returns a cluster execution environment and submits the Jar to a remote server. You must specify the JobManager's IP and port when calling it, along with the Jar file to run on the cluster.

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123, "path/to/jarFile");

2. Source

2.1 Reading from a collection
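The examples that follow construct SensorReading objects, but the original article never shows that class. A minimal sketch of the POJO, with field names inferred from the getters and toString output used later, might be:

```java
// Minimal sketch of the SensorReading POJO the examples rely on
// (not shown in the original article; fields inferred from usage).
public class SensorReading {
    private String id;
    private Long timestamp;
    private Double temperature;

    // No-arg constructor, required for Flink POJO serialization
    public SensorReading() {}

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() { return id; }
    public Long getTimestamp() { return timestamp; }
    public Double getTemperature() { return temperature; }
    public void setId(String id) { this.id = id; }
    public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
    public void setTemperature(Double temperature) { this.temperature = temperature; }

    @Override
    public String toString() {
        // Matches the format seen in the printed results below
        return "SensorReading{id='" + id + "', timestamp=" + timestamp
                + ", temperature=" + temperature + "}";
    }
}
```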

public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read data from a collection (DataStreamSource could also be declared as DataStream)
        DataStream<SensorReading> dataStreamSource = env.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 154777324345L, 35.8),
                        new SensorReading("sensor_2", 154777324346L, 15.4),
                        new SensorReading("sensor_3", 154777324347L, 6.7),
                        new SensorReading("sensor_4", 154777324348L, 38.1)
                )
        );
        // Alternatively, read from individual elements
        DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 10, 2324);
        // Print
        dataStreamSource.print("data");
        integerDataStream.print("int");
        // Execute; the parameter is the job name and has a default
        env.execute();
    }
}

Output:

int:4> 2
int:1> 3
data:4> com.test.sun.apitest.beans.SensorReading@c721304
data:1> com.test.sun.apitest.beans.SensorReading@7aef3173
int:3> 1
data:2> com.test.sun.apitest.beans.SensorReading@1f657d1e
data:3> com.test.sun.apitest.beans.SensorReading@347daeb1
int:2> 10
int:3> 2324

As the output shows, the collection data is printed out of order. To keep the output in the order of the collection, call env.setParallelism(1); the complete code follows:

public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // To keep the output in order, set the parallelism to 1
        env.setParallelism(1);
        // Read data from a collection (DataStreamSource could also be declared as DataStream)
        DataStream<SensorReading> dataStreamSource = env.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 154777324345L, 35.8),
                        new SensorReading("sensor_2", 154777324346L, 15.4),
                        new SensorReading("sensor_3", 154777324347L, 6.7),
                        new SensorReading("sensor_4", 154777324348L, 38.1)
                )
        );
        // Alternatively, read from individual elements
        DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 10, 2324);
        // Print
        dataStreamSource.print("data");
        integerDataStream.print("int");
        // Execute; the parameter is the job name and has a default
        env.execute();
    }
}

Output:

int> 1
int> 2
int> 3
data> com.test.sun.apitest.beans.SensorReading@4754ae8e
int> 10
data> com.test.sun.apitest.beans.SensorReading@49935814
int> 2324
data> com.test.sun.apitest.beans.SensorReading@4dacc5fe
data> com.test.sun.apitest.beans.SensorReading@8ae522f

2.2 Reading from a file

public class SourceTest2_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read data from a file
        DataStream<String> dataStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");
        // Print
        dataStream.print();
        env.execute();
    }
}

Output:

4> sensor_1,154777324345L,35.8
2> sensor_4,154777324348L,38.1
1> sensor_3,154777324347L,6.7
4> sensor_2,154777324346L,15.4

2.3 Reading from a Kafka message queue

2.3.1 Add the Kafka connector dependency

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.9.0</version>
</dependency>

2.3.2 The SourceTest3_Kafka class

public class SourceTest3_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        // Read data from Kafka
        DataStream<String> dataStream = env.addSource(new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));
        // Print
        dataStream.print();
        env.execute();
    }
}

2.3.3 Start ZooKeeper

cd zookeeper-3.4.13
./bin/zkServer.sh start

The following message indicates ZooKeeper started successfully:

ZooKeeper JMX enabled by default
Using config: /Users/sundongping/Downloads/安装包/zookeeper-3.4.13/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 26287.

The first time you start ZooKeeper it usually fails, with an error like:

ZooKeeper JMX enabled by default
Using config: /Users/sundongping/Downloads/安装包/zookeeper-3.4.13/bin/../conf/zoo.cfg
grep: /Users/sundongping/Downloads/安装包/zookeeper-3.4.13/bin/../conf/zoo.cfg: No such file or directory
mkdir: : No such file or directory
Starting zookeeper ... ./bin/zkServer.sh: line 149: /zookeeper_server.pid: Permission denied
FAILED TO WRITE PID

The fix is to create a zoo.cfg file in the conf directory with the following content:

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/sundongping/Downloads/安装包/zookeeper-3.4.13
clientPort=2181

2.3.4 Start Kafka

cd kafka_2.11-2.1.0
./bin/kafka-server-start.sh -daemon ./config/server.properties
./bin/kafka-console-producer.sh --broker-list 192.168.1.5:9092 --topic sensor

⚠️ Note: the Kafka and ZooKeeper versions must match; otherwise Kafka reports an error like could not be established. Broker may not be available. (org.apache.kafka.cli. If you do not know which ZooKeeper version your Kafka expects, look at the ZooKeeper jar that ships in the kafka_2.11-2.1.0/libs directory.
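A simple directory listing is enough for that check (the path below is the install directory used in this article; adjust it for your machine — the || branch just keeps the command from failing when the directory is absent):

```shell
# The ZooKeeper jar shipped in Kafka's libs directory carries the version
# Kafka was built against, e.g. zookeeper-3.4.13.jar
ls "kafka_2.11-2.1.0/libs" 2>/dev/null | grep zookeeper || echo "libs dir not found"
```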

2.4 Custom test data source

All a custom data source needs is a SourceFunction passed to addSource.

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Read data from the custom source
    DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
    // Print
    dataStream.print();
    env.execute();
}

// A custom SourceFunction implementation
public static class MySensorSource implements SourceFunction<SensorReading> {
    // Flag that controls data generation
    private boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        // Random number generator
        Random random = new Random();
        // Set initial temperatures for 10 sensors
        HashMap<String, Double> sensorTempMap = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
        }
        while (running) {
            for (String sensorId : sensorTempMap.keySet()) {
                // Random fluctuation around the current temperature
                Double newtemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                sensorTempMap.put(sensorId, newtemp);
                ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newtemp));
            }
            // Control the output rate
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        // Stop the generation loop (left empty in the original, which would keep it running forever)
        running = false;
    }
}

3. Transform (transformation operators)

3.1 Basic transformation operators

3.1.1 map

The map operator processes elements one at a time: one record in, one record out.

3.1.2 flatMap

The flatMap operator is commonly used to flatten or split data, producing zero or more output elements per input.

3.1.3 filter

The filter operator, as its name suggests, filters data; the result type is the same as the input type.

Example code:

public class TransformTest1_Base {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");
        // 1. map: convert each String to its length
        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String s) throws Exception {
                return s.length();
            }
        });
        // 2. flatMap: split each line on commas
        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapIterator<String, String>() {
            @Override
            public Iterator<String> flatMap(String s) throws Exception {
                List<String> arrayList = new ArrayList<>();
                String[] fields = s.split(",");
                for (String field : fields) {
                    arrayList.add(field);
                }
                return arrayList.iterator();
            }
        });
        // 3. filter: keep records whose id starts with sensor_1
        DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.startsWith("sensor_1");
            }
        });
        // Print
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");
        env.execute();
    }
}

Output:

map> 27
flatMap> sensor_1
flatMap> 154777324345L
flatMap> 35.8
filter> sensor_1,154777324345L,35.8
map> 27
flatMap> sensor_2
flatMap> 154777324346L
flatMap> 15.4
map> 26
flatMap> sensor_3
flatMap> 154777324347L
flatMap> 6.7
map> 27
flatMap> sensor_4
flatMap> 154777324348L
flatMap> 38.1

3.2 Aggregation operators

KeyBy

DataStream -> KeyedStream: logically splits a stream into disjoint partitions, each containing the elements with the same key; internally this is implemented with hashing.
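A toy, non-Flink sketch of the idea (invented names; Flink's real implementation murmur-hashes the key's hashCode and maps it through key groups, but the invariant is the same: equal keys always land in the same partition):

```java
// Toy illustration of hash partitioning (NOT Flink's real algorithm):
// equal keys always map to the same partition index.
public class KeyBySketch {
    static int partitionFor(String key, int parallelism) {
        // Stable hash of the key, folded into [0, parallelism)
        return Math.abs(key.hashCode() % parallelism);
    }

    public static void main(String[] args) {
        // "sensor_1" always lands in the same partition, so a rolling
        // aggregate per key sees all of that key's records
        System.out.println(partitionFor("sensor_1", 4) == partitionFor("sensor_1", 4)); // true
    }
}
```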

3.2.1 Rolling Aggregation

These operators aggregate each keyed sub-stream of a KeyedStream:

  • sum()
  • min()
  • max()
  • minBy()
  • maxBy()
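The two runs below show the max()/maxBy() contrast; it can also be illustrated outside Flink. This is a toy sketch with invented names, not Flink API: max("temperature") rolls only the aggregated field forward and keeps the first record's other fields, while maxBy("temperature") keeps the whole record holding the maximum:

```java
// Toy contrast of max vs maxBy semantics (not Flink API).
public class MaxVsMaxBy {
    static class Reading {
        final String id; final long ts; final double temp;
        Reading(String id, long ts, double temp) { this.id = id; this.ts = ts; this.temp = temp; }
    }

    static Reading max(Reading a, Reading b) {
        // max("temperature"): keep a's other fields, roll only the aggregated field forward
        return new Reading(a.id, a.ts, Math.max(a.temp, b.temp));
    }

    static Reading maxBy(Reading a, Reading b) {
        // maxBy("temperature"): keep the whole record that holds the maximum
        return b.temp > a.temp ? b : a;
    }

    public static void main(String[] args) {
        Reading first = new Reading("sensor_1", 1L, 35.8);
        Reading second = new Reading("sensor_1", 2L, 36.9);
        System.out.println(max(first, second).ts);   // 1  (timestamp stays from the first record)
        System.out.println(maxBy(first, second).ts); // 2  (whole winning record is kept)
    }
}
```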

max() example:

public class TransformTest2_RollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");
        // Convert to SensorReading
//        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
//            @Override
//            public SensorReading map(String s) throws Exception {
//                String[] fields = s.split(",");
//                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
//            }
//        });
        // Java 8 lambda expression
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        // Group by key
//        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // Java 8 lambda
        KeyedStream<SensorReading, String> keyedStream = dataStream.keyBy(data -> data.getId());
        // Java 8 method reference
//        KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);
        // Rolling aggregation: current maximum temperature
        SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.max("temperature");
        // Print
        resultStream.print();
        env.execute();
    }
}

Output:

Only the maximum-temperature field changes; the other fields keep their original values.

maxBy() example:

public class TransformTest2_RollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");
        // Convert to SensorReading
//        DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
//            @Override
//            public SensorReading map(String s) throws Exception {
//                String[] fields = s.split(",");
//                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
//            }
//        });
        // Java 8 lambda expression
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        // Group by key
//        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // Java 8 lambda
        KeyedStream<SensorReading, String> keyedStream = dataStream.keyBy(data -> data.getId());
        // Java 8 method reference
//        KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);
        // Rolling aggregation: whole record with the current maximum temperature
        SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.maxBy("temperature");
        // Print
        resultStream.print();
        env.execute();
    }
}

Output:

When the maximum temperature changes, the other fields change with it, taking the values of the record that holds the maximum temperature.

3.2.2 Reduce

KeyedStream -> DataStream: an aggregation over a keyed stream that merges the current element with the previous aggregate to produce a new value. The returned stream contains the result of every aggregation step, not just the final one.

public class TransformTest3_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");
        // Convert to SensorReading (Java 8 lambda expression)
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        // Group by key
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // reduce: keep the maximum temperature together with the latest timestamp
//        keyedStream.reduce(new ReduceFunction<SensorReading>() {
//            @Override
//            public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
//                return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
//            }
//        });
        // Java 8 lambda expression
        SingleOutputStreamOperator<SensorReading> resultStr = keyedStream.reduce((curState, newData) -> {
            return new SensorReading(curState.getId(), newData.getTimestamp(), Math.max(curState.getTemperature(), newData.getTemperature()));
        });
        resultStr.print();
        env.execute();
    }
}

Output:

Each step emits the running maximum temperature for the key, paired with the most recent timestamp.

3.3 Multi-stream operators

3.3.1 Split and Select (used together)

The split operator divides one stream into two or more by some characteristic. DataStream -> SplitStream: tags the elements of a DataStream according to given criteria so that two or more streams can later be selected from it.

The select operator, SplitStream -> DataStream: retrieves one or more streams from a SplitStream by tag.
Example code:

public class TransformTest4_MultiplieStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");
        // Convert to SensorReading (Java 8 lambda expression)
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        // 1. Split into two streams at the 30-degree mark (note: split is deprecated)
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                return (sensorReading.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");
        env.execute();
    }
}

Output:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
all> SensorReading{id='sensor_1', timestamp=154777324345, temperature=35.8}
high> SensorReading{id='sensor_1', timestamp=154777324345, temperature=35.8}
low> SensorReading{id='sensor_2', timestamp=154777324346, temperature=15.4}
all> SensorReading{id='sensor_2', timestamp=154777324346, temperature=15.4}
low> SensorReading{id='sensor_3', timestamp=154777324347, temperature=6.7}
all> SensorReading{id='sensor_3', timestamp=154777324347, temperature=6.7}
all> SensorReading{id='sensor_4', timestamp=154777324348, temperature=38.1}
high> SensorReading{id='sensor_4', timestamp=154777324348, temperature=38.1}
all> SensorReading{id='sensor_1', timestamp=154777324349, temperature=36.9}
high> SensorReading{id='sensor_1', timestamp=154777324349, temperature=36.9}
all> SensorReading{id='sensor_1', timestamp=154777324350, temperature=32.9}
high> SensorReading{id='sensor_1', timestamp=154777324350, temperature=32.9}
all> SensorReading{id='sensor_1', timestamp=154777324351, temperature=37.1}
high> SensorReading{id='sensor_1', timestamp=154777324351, temperature=37.1}
low> SensorReading{id='sensor_6', timestamp=154777324351, temperature=15.4}
all> SensorReading{id='sensor_6', timestamp=154777324351, temperature=15.4}
low> SensorReading{id='sensor_7', timestamp=154777324350, temperature=6.7}
all> SensorReading{id='sensor_7', timestamp=154777324350, temperature=6.7}

3.3.2 Connect and CoMap (used together)

The connect operator, DataStream -> ConnectedStreams: connects two data streams while preserving their types. After connect, the two streams are merely placed in one container; internally each keeps its own data and form unchanged, and the two remain independent.

CoMap and CoFlatMap, ConnectedStreams -> DataStream: they work on a ConnectedStreams the same way map and flatMap work on a DataStream, applying a separate map or flatMap function to each of the two streams.
Example code:

public class TransformTest4_MultiplieStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");
        // Convert to SensorReading (Java 8 lambda expression)
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        // 1. Split into two streams at the 30-degree mark (note: split is deprecated)
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                return (sensorReading.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");
        // 2. Connect: map the high-temperature stream to tuples (so the two streams have
        //    different types), connect it with the low-temperature stream, and emit status info
        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading sensorReading) throws Exception {
                return new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature());
            }
        });
        // Connect the two streams
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);
        DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temp warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });
        resultStream.print();
        env.execute();
    }
}

Output:

high> SensorReading{id='sensor_1', timestamp=154777324345, temperature=35.8}
all> SensorReading{id='sensor_1', timestamp=154777324345, temperature=35.8}
low> SensorReading{id='sensor_2', timestamp=154777324346, temperature=15.4}
all> SensorReading{id='sensor_2', timestamp=154777324346, temperature=15.4}
low> SensorReading{id='sensor_3', timestamp=154777324347, temperature=6.7}
all> SensorReading{id='sensor_3', timestamp=154777324347, temperature=6.7}
high> SensorReading{id='sensor_4', timestamp=154777324348, temperature=38.1}
all> SensorReading{id='sensor_4', timestamp=154777324348, temperature=38.1}
high> SensorReading{id='sensor_1', timestamp=154777324349, temperature=36.9}
all> SensorReading{id='sensor_1', timestamp=154777324349, temperature=36.9}
high> SensorReading{id='sensor_1', timestamp=154777324350, temperature=32.9}
all> SensorReading{id='sensor_1', timestamp=154777324350, temperature=32.9}
high> SensorReading{id='sensor_1', timestamp=154777324351, temperature=37.1}
all> SensorReading{id='sensor_1', timestamp=154777324351, temperature=37.1}
low> SensorReading{id='sensor_6', timestamp=154777324351, temperature=15.4}
all> SensorReading{id='sensor_6', timestamp=154777324351, temperature=15.4}
low> SensorReading{id='sensor_7', timestamp=154777324350, temperature=6.7}
all> SensorReading{id='sensor_7', timestamp=154777324350, temperature=6.7}
(sensor_1,35.8,high temp warning)
(sensor_2,normal)
(sensor_4,38.1,high temp warning)
(sensor_3,normal)
(sensor_1,36.9,high temp warning)
(sensor_6,normal)
(sensor_1,32.9,high temp warning)
(sensor_7,normal)
(sensor_1,37.1,high temp warning)

Limitation: connect works on exactly two streams; it cannot combine more than two.

3.3.3 Union

DataStream -> DataStream: performs a union of two or more DataStreams of the same type, producing a new DataStream that contains all elements of all input streams.

Example code:

public class TransformTest4_MultiplieStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");
        // Convert to SensorReading (Java 8 lambda expression)
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        // 1. Split into two streams at the 30-degree mark (note: split is deprecated)
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                return (sensorReading.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });
        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");
        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");
        // 2. Connect: map the high-temperature stream to tuples (so the two streams have
        //    different types), connect it with the low-temperature stream, and emit status info
        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading sensorReading) throws Exception {
                return new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature());
            }
        });
        // Connect the two streams
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);
        DataStream<Object> resultStream = connectedStreams.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> value) throws Exception {
                return new Tuple3<>(value.f0, value.f1, "high temp warning");
            }

            @Override
            public Object map2(SensorReading value) throws Exception {
                return new Tuple2<>(value.getId(), "normal");
            }
        });
        resultStream.print();
        // 3. Union: combine multiple streams
        DataStream<SensorReading> unionStream = highTempStream.union(lowTempStream, allTempStream);
        unionStream.print("union");
        env.execute();
    }
}

Output:

high> SensorReading{id='sensor_1', timestamp=154777324345, temperature=35.8}
all> SensorReading{id='sensor_1', timestamp=154777324345, temperature=35.8}
all> SensorReading{id='sensor_2', timestamp=154777324346, temperature=15.4}
low> SensorReading{id='sensor_2', timestamp=154777324346, temperature=15.4}
all> SensorReading{id='sensor_3', timestamp=154777324347, temperature=6.7}
low> SensorReading{id='sensor_3', timestamp=154777324347, temperature=6.7}
high> SensorReading{id='sensor_4', timestamp=154777324348, temperature=38.1}
all> SensorReading{id='sensor_4', timestamp=154777324348, temperature=38.1}
high> SensorReading{id='sensor_1', timestamp=154777324349, temperature=36.9}
all> SensorReading{id='sensor_1', timestamp=154777324349, temperature=36.9}
high> SensorReading{id='sensor_1', timestamp=154777324350, temperature=32.9}
all> SensorReading{id='sensor_1', timestamp=154777324350, temperature=32.9}
high> SensorReading{id='sensor_1', timestamp=154777324351, temperature=37.1}
all> SensorReading{id='sensor_1', timestamp=154777324351, temperature=37.1}
all> SensorReading{id='sensor_6', timestamp=154777324351, temperature=15.4}
low> SensorReading{id='sensor_6', timestamp=154777324351, temperature=15.4}
all> SensorReading{id='sensor_7', timestamp=154777324350, temperature=6.7}
low> SensorReading{id='sensor_7', timestamp=154777324350, temperature=6.7}
union> SensorReading{id='sensor_1', timestamp=154777324345, temperature=35.8}
union> SensorReading{id='sensor_4', timestamp=154777324348, temperature=38.1}
union> SensorReading{id='sensor_1', timestamp=154777324349, temperature=36.9}
union> SensorReading{id='sensor_1', timestamp=154777324350, temperature=32.9}
union> SensorReading{id='sensor_1', timestamp=154777324351, temperature=37.1}
union> SensorReading{id='sensor_2', timestamp=154777324346, temperature=15.4}
union> SensorReading{id='sensor_3', timestamp=154777324347, temperature=6.7}
union> SensorReading{id='sensor_6', timestamp=154777324351, temperature=15.4}
union> SensorReading{id='sensor_7', timestamp=154777324350, temperature=6.7}
(sensor_1,35.8,high temp warning)
(sensor_2,normal)
(sensor_4,38.1,high temp warning)
(sensor_3,normal)
(sensor_1,36.9,high temp warning)
(sensor_6,normal)
(sensor_1,32.9,high temp warning)
(sensor_7,normal)
(sensor_1,37.1,high temp warning)

Reposted from: http://quxii.baihongyu.com/
