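getExecutionEnvironment creates an execution environment that represents the context in which the current program runs. If the program is invoked standalone, the method returns a local execution environment; if the program is submitted to a cluster from the command-line client, it returns that cluster's execution environment. In other words, getExecutionEnvironment decides which environment to return based on how the program is run, which makes it the most common way to create an execution environment.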
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
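If no parallelism is set, the value configured in flink-conf.yaml (parallelism.default) is used; its default is 1. When the job runs locally, for example from an IDE, the default is the number of CPU cores instead.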
createLocalEnvironment returns a local execution environment; the default parallelism is passed in the call.
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
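createRemoteEnvironment returns a cluster execution environment and submits the JAR to a remote server. The call must specify the JobManager's IP (or hostname) and port, as well as the JAR to run on the cluster.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123, "path/to/your.jar");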
import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.test.sun.apitest.beans.SensorReading;

public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read data from a collection; DataStreamSource can be replaced by the DataStream type
        DataStreamSource<SensorReading> dataStreamSource = env.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 154777324345L, 35.8),
                        new SensorReading("sensor_2", 154777324346L, 15.4),
                        new SensorReading("sensor_3", 154777324347L, 6.7),
                        new SensorReading("sensor_4", 154777324348L, 38.1)
                )
        );

        // Elements can also be passed directly
        DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 10, 2324);

        // Print
        dataStreamSource.print("data");
        integerDataStream.print("int");

        // Execute; the argument is the job name, which has a default value
        env.execute();
    }
}
Output:
int:4> 2
int:1> 3
data:4> com.test.sun.apitest.beans.SensorReading@c721304
data:1> com.test.sun.apitest.beans.SensorReading@7aef3173
int:3> 1
data:2> com.test.sun.apitest.beans.SensorReading@1f657d1e
data:3> com.test.sun.apitest.beans.SensorReading@347daeb1
int:2> 10
int:3> 2324
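As the output shows, the elements are not printed in collection order: print runs as several parallel subtasks, and the number before ">" is the index of the subtask that printed the line. To print in collection order, call env.setParallelism(1). The complete code is as follows: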
import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.test.sun.apitest.beans.SensorReading;

public class SourceTest1_Collection {
    public static void main(String[] args) throws Exception {
        // Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use a parallelism of 1 so that the output keeps the collection order
        env.setParallelism(1);

        // Read data from a collection; DataStreamSource can be replaced by the DataStream type
        DataStreamSource<SensorReading> dataStreamSource = env.fromCollection(
                Arrays.asList(
                        new SensorReading("sensor_1", 154777324345L, 35.8),
                        new SensorReading("sensor_2", 154777324346L, 15.4),
                        new SensorReading("sensor_3", 154777324347L, 6.7),
                        new SensorReading("sensor_4", 154777324348L, 38.1)
                )
        );

        // Elements can also be passed directly
        DataStream<Integer> integerDataStream = env.fromElements(1, 2, 3, 10, 2324);

        // Print
        dataStreamSource.print("data");
        integerDataStream.print("int");

        // Execute; the argument is the job name, which has a default value
        env.execute();
    }
}
Output:
int> 1
int> 2
int> 3
data> com.test.sun.apitest.beans.SensorReading@4754ae8e
int> 10
data> com.test.sun.apitest.beans.SensorReading@49935814
int> 2324
data> com.test.sun.apitest.beans.SensorReading@4dacc5fe
data> com.test.sun.apitest.beans.SensorReading@8ae522f
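One way to picture why the run without setParallelism(1) printed out of order: the collection source is not parallel, and its elements are handed to the parallel print subtasks in turn. The plain-Java sketch below imitates such a round-robin hand-off without using Flink. The class name, the method and the firstSubtask parameter are assumptions made for illustration; the real starting subtask is not fixed, and the order in which different subtasks print is not determined.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RoundRobinSketch {
    // Assigns each element to a 1-based subtask index in round-robin order.
    // firstSubtask is a hypothetical parameter: the subtask that receives the first element.
    static List<String> assign(List<Integer> elements, int parallelism, int firstSubtask) {
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < elements.size(); i++) {
            int subtask = (firstSubtask - 1 + i) % parallelism + 1;
            // Same "name:subtask> value" shape as the print("int") output
            lines.add("int:" + subtask + "> " + elements.get(i));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Four subtasks, first element sent to subtask 3
        assign(Arrays.asList(1, 2, 3, 10, 2324), 4, 3).forEach(System.out::println);
    }
}
```

With four subtasks and the first element going to subtask 3, every element gets the same subtask label as in the first run (1 on subtask 3, 2 on subtask 4, 3 on subtask 1, and so on). Since each subtask prints independently, the lines of different subtasks can appear in any order.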
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SourceTest2_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read data from a file
        DataStream<String> dataStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");

        // Print
        dataStream.print();

        env.execute();
    }
}
Output:
4> sensor_1,154777324345L,35.8
2> sensor_4,154777324348L,38.1
1> sensor_3,154777324347L,6.7
4> sensor_2,154777324346L,15.4
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.9.0</version>
</dependency>
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class SourceTest3_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka configuration
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        // Read data from the Kafka topic "sensor"
        DataStream<String> dataStream = env.addSource(
                new FlinkKafkaConsumer011<String>("sensor", new SimpleStringSchema(), properties));

        // Print
        dataStream.print();

        env.execute();
    }
}
cd zookeeper-3.4.13
./bin/zkServer.sh start
Output like the following means ZooKeeper is up (in this run it reports that the process is already running):
ZooKeeper JMX enabled by default
Using config: /Users/sundongping/Downloads/安装包/zookeeper-3.4.13/bin/../conf/zoo.cfg
Starting zookeeper ... already running as process 26287.
The first start of ZooKeeper usually fails, with output like this:
ZooKeeper JMX enabled by default
Using config: /Users/sundongping/Downloads/安装包/zookeeper-3.4.13/bin/../conf/zoo.cfg
grep: /Users/sundongping/Downloads/安装包/zookeeper-3.4.13/bin/../conf/zoo.cfg: No such file or directory
mkdir: : No such file or directory
Starting zookeeper ... ./bin/zkServer.sh: line 149: /zookeeper_server.pid: Permission denied
FAILED TO WRITE PID
To fix this error, create a zoo.cfg file in the conf directory with the following content:
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Users/sundongping/Downloads/安装包/zookeeper-3.4.13
clientPort=2181
cd kafka_2.11-2.1.0
./bin/kafka-server-start.sh -daemon ./config/server.properties
./bin/kafka-console-producer.sh --broker-list 192.168.1.5:9092 --topic sensor
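⚠️ Note: the Kafka and ZooKeeper versions must match; otherwise Kafka reports an error such as: could not be established. Broker may not be available. (org.apache.kafka.cli
If you do not know which ZooKeeper version corresponds to your Kafka version, look in the kafka_2.11-2.1.0/libs directory for the ZooKeeper JAR that ships with Kafka.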
For a custom data source, all you need to do is pass a SourceFunction to addSource.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Read data from the custom source
    DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());

    // Print
    dataStream.print();

    env.execute();
}

// Custom SourceFunction implementation
public static class MySensorSource implements SourceFunction<SensorReading> {
    // Flag that controls data generation; volatile because cancel() is called from another thread
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> ctx) throws Exception {
        // Random number generator
        Random random = new Random();

        // Initial temperatures of 10 sensors
        HashMap<String, Double> sensorTempMap = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
        }

        while (running) {
            for (String sensorId : sensorTempMap.keySet()) {
                // Random fluctuation around the current temperature
                Double newtemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                sensorTempMap.put(sensorId, newtemp);
                ctx.collect(new SensorReading(sensorId, System.currentTimeMillis(), newtemp));
            }
            // Throttle the output rate
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        // Stop the loop in run()
        running = false;
    }
}
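The running flag in the source only has an effect if cancel() actually sets it to false; with an empty cancel() the loop in run() never ends. The pattern can be tried without Flink. The sketch below is a minimal stand-in: CountingSource and stopsAfterCancel are hypothetical names, and the sleep intervals are arbitrary.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CancellableLoopSketch {
    // Hypothetical stand-in for a source: "emits" by counting until cancel() is called.
    static class CountingSource {
        // volatile so that the write in cancel() is visible to the thread running run()
        private volatile boolean running = true;
        final AtomicInteger emitted = new AtomicInteger();

        void run() throws InterruptedException {
            while (running) {
                emitted.incrementAndGet();
                Thread.sleep(10);
            }
        }

        void cancel() {
            running = false;
        }
    }

    // Starts the loop on a worker thread, cancels it, and reports whether it stopped.
    static boolean stopsAfterCancel() {
        CountingSource source = new CountingSource();
        Thread worker = new Thread(() -> {
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            worker.start();
            Thread.sleep(50);
            source.cancel();
            worker.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive() && source.emitted.get() > 0;
    }

    public static void main(String[] args) {
        System.out.println("stopped after cancel: " + stopsAfterCancel());
    }
}
```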
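The map operator processes records one at a time: each input record produces exactly one output record.
The flatMap operator is commonly used to break up or split data; one input record can produce several output records.
The filter operator, as the name suggests, filters data; the result has the same type as the input.
Example code: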
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FlatMapIterator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformTest1_Base {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");

        // 1. map: convert each String to its length
        DataStream<Integer> mapStream = inputStream.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String s) throws Exception {
                return s.length();
            }
        });

        // 2. flatMap: split each line into fields at the commas
        DataStream<String> flatMapStream = inputStream.flatMap(new FlatMapIterator<String, String>() {
            @Override
            public Iterator<String> flatMap(String s) throws Exception {
                List<String> arrayList = new ArrayList<>();
                String[] fields = s.split(",");
                for (String field : fields) {
                    arrayList.add(field);
                }
                return arrayList.iterator();
            }
        });

        // 3. filter: keep only the records whose id starts with sensor_1
        DataStream<String> filterStream = inputStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                return s.startsWith("sensor_1");
            }
        });

        // Print
        mapStream.print("map");
        flatMapStream.print("flatMap");
        filterStream.print("filter");

        env.execute();
    }
}
Output:
map> 27
flatMap> sensor_1
flatMap> 154777324345L
flatMap> 35.8
filter> sensor_1,154777324345L,35.8
map> 27
flatMap> sensor_2
flatMap> 154777324346L
flatMap> 15.4
map> 26
flatMap> sensor_3
flatMap> 154777324347L
flatMap> 6.7
map> 27
flatMap> sensor_4
flatMap> 154777324348L
flatMap> 38.1
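For readers who know java.util.stream, the three operators behave much like their namesakes there. The sketch below is only an analogy on a finite list, not Flink code: it applies the same three functions to the four lines of sensor.txt. The class and method names are made up for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class BasicTransformSketch {
    // The four lines of sensor.txt as they appear in the output above
    static final List<String> LINES = Arrays.asList(
            "sensor_1,154777324345L,35.8",
            "sensor_2,154777324346L,15.4",
            "sensor_3,154777324347L,6.7",
            "sensor_4,154777324348L,38.1");

    // map: exactly one output per input (here: the line length)
    static List<Integer> lengths(List<String> lines) {
        return lines.stream().map(String::length).collect(Collectors.toList());
    }

    // flatMap: several outputs per input (here: the comma-separated fields)
    static List<String> fields(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(",")))
                .collect(Collectors.toList());
    }

    // filter: same element type, fewer elements
    static List<String> onlySensor1(List<String> lines) {
        return lines.stream()
                .filter(line -> line.startsWith("sensor_1"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lengths(LINES));     // [27, 27, 26, 27]
        System.out.println(fields(LINES).size()); // 12
        System.out.println(onlySensor1(LINES)); // [sensor_1,154777324345L,35.8]
    }
}
```

The main difference from Flink is that a DataStream is unbounded and each record is processed as it arrives, which is why the Flink output interleaves the map, flatMap and filter lines record by record.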
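keyBy (DataStream -> KeyedStream): logically splits a stream into disjoint partitions, each containing the elements that share the same key. Internally this is implemented by hashing the key.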
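The idea of hash partitioning can be sketched in a few lines of plain Java. This is a deliberate simplification: the modulo over hashCode() below only illustrates the principle and is not Flink's actual assignment scheme. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class KeyPartitionSketch {
    // Simplified assumption: partition index = hash of the key modulo the parallelism.
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Groups keys by partition; equal keys always end up in the same partition.
    static Map<Integer, List<String>> partition(List<String> keys, int parallelism) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String key : keys) {
            partitions.computeIfAbsent(partitionFor(key, parallelism), p -> new ArrayList<>())
                    .add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(partition(
                Arrays.asList("sensor_1", "sensor_2", "sensor_1", "sensor_3"), 4));
    }
}
```

The partitions are disjoint, and two different keys may still share one partition; what is guaranteed is only that all elements with the same key land together.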
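The rolling aggregation operators, such as max() and maxBy() shown below, aggregate each substream of a KeyedStream separately.
max() example: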
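What "rolling" means, and how max() differs from maxBy(), can be imitated in plain Java. The sketch below is an assumption-laden simulation, not Flink code: it keeps one state record per key and emits a result for every input. With wholeRecord set to false it mimics max() (only the aggregated field is updated), with true it mimics maxBy() (the whole record with the maximum is kept). The Reading class and the sample values are stand-ins; the values are taken from the sensor data used in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RollingAggregationSketch {
    // Minimal stand-in for SensorReading
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return id + "," + timestamp + "," + temperature;
        }
    }

    static final List<Reading> SAMPLE = Arrays.asList(
            new Reading("sensor_1", 154777324345L, 35.8),
            new Reading("sensor_2", 154777324346L, 15.4),
            new Reading("sensor_1", 154777324349L, 36.9),
            new Reading("sensor_1", 154777324350L, 32.9),
            new Reading("sensor_1", 154777324351L, 37.1));

    // Emits one result per input record, based on per-key state.
    static List<Reading> rolling(List<Reading> input, boolean wholeRecord) {
        Map<String, Reading> state = new HashMap<>();
        List<Reading> out = new ArrayList<>();
        for (Reading r : input) {
            Reading cur = state.get(r.id);
            Reading next;
            if (cur == null) {
                next = r; // first record for this key
            } else if (r.temperature > cur.temperature) {
                // maxBy-like: take the whole new record; max-like: update only the temperature
                next = wholeRecord ? r : new Reading(cur.id, cur.timestamp, r.temperature);
            } else {
                next = cur; // maximum unchanged
            }
            state.put(r.id, next);
            out.add(next);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("max-like:");
        rolling(SAMPLE, false).forEach(System.out::println);
        System.out.println("maxBy-like:");
        rolling(SAMPLE, true).forEach(System.out::println);
    }
}
```

For sensor_1, the max-like run keeps the timestamp 154777324345 of the first record throughout while the temperature climbs to 37.1, whereas the maxBy-like run ends with the full record 154777324351, 37.1.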
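import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.test.sun.apitest.beans.SensorReading;

public class TransformTest2_RollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");

        // Convert to SensorReading with an anonymous MapFunction
        // DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
        //     @Override
        //     public SensorReading map(String s) throws Exception {
        //         String[] fields = s.split(",");
        //         return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        //     }
        // });

        // The same conversion as a Java 8 lambda expression
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // Group by key
        // KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // Java 8 lambda as key selector
        KeyedStream<SensorReading, String> keyedStream = dataStream.keyBy(data -> data.getId());
        // Java 8 method reference
        // KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);

        // Rolling aggregation: the current maximum temperature
        SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.max("temperature");

        // Print
        resultStream.print();

        env.execute();
    }
}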
Output: only the maximum-temperature field changes; the other fields keep their values.
maxBy() example:
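import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.test.sun.apitest.beans.SensorReading;

public class TransformTest2_RollingAggregation {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");

        // Convert to SensorReading with an anonymous MapFunction
        // DataStream<SensorReading> dataStream = inputStream.map(new MapFunction<String, SensorReading>() {
        //     @Override
        //     public SensorReading map(String s) throws Exception {
        //         String[] fields = s.split(",");
        //         return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        //     }
        // });

        // The same conversion as a Java 8 lambda expression
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // Group by key
        // KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");
        // Java 8 lambda as key selector
        KeyedStream<SensorReading, String> keyedStream = dataStream.keyBy(data -> data.getId());
        // Java 8 method reference
        // KeyedStream<SensorReading, String> keyedStream1 = dataStream.keyBy(SensorReading::getId);

        // Rolling aggregation: the record with the current maximum temperature
        SingleOutputStreamOperator<SensorReading> resultStream = keyedStream.maxBy("temperature");

        // Print
        resultStream.print();

        env.execute();
    }
}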
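Output: the maximum temperature changes, and the other fields change with it, taking the values of the record that holds the maximum temperature.
reduce (KeyedStream -> DataStream): an aggregation on a keyed stream that merges the current element with the result of the previous aggregation to produce a new value. The returned stream contains the result of every aggregation step, not only the final result of the last one.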
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.test.sun.apitest.beans.SensorReading;

public class TransformTest3_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");

        // Convert to SensorReading with a Java 8 lambda expression
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // Group by key
        KeyedStream<SensorReading, Tuple> keyedStream = dataStream.keyBy("id");

        // reduce: keep the maximum temperature together with the latest timestamp
        // keyedStream.reduce(new ReduceFunction<SensorReading>() {
        //     @Override
        //     public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
        //         return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemperature(), value2.getTemperature()));
        //     }
        // });

        // The same reduce as a Java 8 lambda expression
        SingleOutputStreamOperator<SensorReading> resultStr = keyedStream.reduce((curState, newData) -> {
            return new SensorReading(curState.getId(), newData.getTimestamp(), Math.max(curState.getTemperature(), newData.getTemperature()));
        });

        resultStr.print();

        env.execute();
    }
}
Output (not preserved in this copy).
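The behaviour of this reduce, merging each new element with the previous result and emitting every intermediate result, can be imitated with a small plain-Java loop. This is a simulation under assumptions, not Flink code: rollingReduce, Reading and the sample list are hypothetical names, and the sample values are the sensor_1 records that appear in this article's data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

class RollingReduceSketch {
    // Minimal stand-in for SensorReading
    static final class Reading {
        final String id;
        final long timestamp;
        final double temperature;

        Reading(String id, long timestamp, double temperature) {
            this.id = id;
            this.timestamp = timestamp;
            this.temperature = temperature;
        }

        @Override
        public String toString() {
            return id + "," + timestamp + "," + temperature;
        }
    }

    static final List<Reading> SAMPLE = Arrays.asList(
            new Reading("sensor_1", 154777324345L, 35.8),
            new Reading("sensor_1", 154777324349L, 36.9),
            new Reading("sensor_1", 154777324350L, 32.9),
            new Reading("sensor_1", 154777324351L, 37.1));

    // Per key: merge the previous result with the new element and emit every result.
    static <T, K> List<T> rollingReduce(List<T> input, Function<T, K> keyOf, BinaryOperator<T> reducer) {
        Map<K, T> state = new HashMap<>();
        List<T> out = new ArrayList<>();
        for (T value : input) {
            // merge() stores value if the key is new, otherwise reducer(previous, value)
            out.add(state.merge(keyOf.apply(value), value, reducer));
        }
        return out;
    }

    // Maximum temperature so far, combined with the latest timestamp
    static List<Reading> maxTempLatestTime(List<Reading> input) {
        return rollingReduce(input, r -> r.id,
                (cur, next) -> new Reading(cur.id, next.timestamp,
                        Math.max(cur.temperature, next.temperature)));
    }

    public static void main(String[] args) {
        maxTempLatestTime(SAMPLE).forEach(System.out::println);
    }
}
```

The third result illustrates the point: the timestamp advances to 154777324350 although the temperature stays at the earlier maximum of 36.9.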
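The split operator cuts one stream into two or more streams according to some criterion. DataStream -> SplitStream: based on some characteristic of the elements, a DataStream is split into two or more tagged streams.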
The select operator picks one or more of the tagged streams out of a SplitStream. Example code:
import java.util.Collections;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.test.sun.apitest.beans.SensorReading;

public class TransformTest4_MultiplieStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");

        // Convert to SensorReading with a Java 8 lambda expression
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // 1. Split into two streams at a temperature of 30 degrees (split is deprecated)
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                return (sensorReading.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });

        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");

        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");

        env.execute();
    }
}
Output:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
all> SensorReading{ id='sensor_1', timestamp=154777324345, temperature=35.8}
high> SensorReading{ id='sensor_1', timestamp=154777324345, temperature=35.8}
low> SensorReading{ id='sensor_2', timestamp=154777324346, temperature=15.4}
all> SensorReading{ id='sensor_2', timestamp=154777324346, temperature=15.4}
low> SensorReading{ id='sensor_3', timestamp=154777324347, temperature=6.7}
all> SensorReading{ id='sensor_3', timestamp=154777324347, temperature=6.7}
all> SensorReading{ id='sensor_4', timestamp=154777324348, temperature=38.1}
high> SensorReading{ id='sensor_4', timestamp=154777324348, temperature=38.1}
all> SensorReading{ id='sensor_1', timestamp=154777324349, temperature=36.9}
high> SensorReading{ id='sensor_1', timestamp=154777324349, temperature=36.9}
all> SensorReading{ id='sensor_1', timestamp=154777324350, temperature=32.9}
high> SensorReading{ id='sensor_1', timestamp=154777324350, temperature=32.9}
all> SensorReading{ id='sensor_1', timestamp=154777324351, temperature=37.1}
high> SensorReading{ id='sensor_1', timestamp=154777324351, temperature=37.1}
low> SensorReading{ id='sensor_6', timestamp=154777324351, temperature=15.4}
all> SensorReading{ id='sensor_6', timestamp=154777324351, temperature=15.4}
low> SensorReading{ id='sensor_7', timestamp=154777324350, temperature=6.7}
all> SensorReading{ id='sensor_7', timestamp=154777324350, temperature=6.7}
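The tagging-then-selecting logic can be tried out in plain Java on the nine temperature values from the output. The sketch below is an analogy rather than Flink code: tagOf plays the role of the OutputSelector, and select keeps every element whose tag is among the requested names. Both names, and the use of bare temperatures instead of SensorReading objects, are simplifications made for the illustration.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class SplitSelectSketch {
    // Temperatures in the order in which they appear in the output above
    static final List<Double> TEMPS = Arrays.asList(
            35.8, 15.4, 6.7, 38.1, 36.9, 32.9, 37.1, 15.4, 6.7);

    // Tagging rule: above 30 degrees is "high", everything else is "low"
    static String tagOf(double temperature) {
        return temperature > 30 ? "high" : "low";
    }

    // Keeps the elements whose tag is one of the requested names, in their original order
    static List<Double> select(List<Double> temps, String... names) {
        Set<String> wanted = new HashSet<>(Arrays.asList(names));
        return temps.stream()
                .filter(t -> wanted.contains(tagOf(t)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println("high: " + select(TEMPS, "high"));
        System.out.println("low:  " + select(TEMPS, "low"));
        System.out.println("all:  " + select(TEMPS, "high", "low"));
    }
}
```

As in the Flink run, five values end up in the "high" selection, four in "low", and selecting both tags returns all nine.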
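The connect operator (DataStream -> ConnectedStreams) connects two data streams while preserving their types. After two streams are connected they are merely placed in the same stream object; internally each keeps its own data and form unchanged, and the two streams remain independent of each other.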
The CoMap and CoFlatMap operators (ConnectedStreams -> DataStream) are applied to ConnectedStreams. They work like map and flatMap, applying a separate map or flatMap function to each of the two streams in the ConnectedStreams. Example code:
import java.util.Collections;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.test.sun.apitest.beans.SensorReading;

public class TransformTest4_MultiplieStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");

        // Convert to SensorReading with a Java 8 lambda expression
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
        });

        // 1. Split into two streams at a temperature of 30 degrees (split is deprecated)
        SplitStream<SensorReading> splitStream = dataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                return (sensorReading.getTemperature() > 30) ? Collections.singletonList("high") : Collections.singletonList("low");
            }
        });

        DataStream<SensorReading> highTempStream = splitStream.select("high");
        DataStream<SensorReading> lowTempStream = splitStream.select("low");
        DataStream<SensorReading> allTempStream = splitStream.select("high", "low");

        highTempStream.print("high");
        lowTempStream.print("low");
        allTempStream.print("all");

        // 2. connect: convert the high-temperature stream to two-element tuples (so that the two
        //    streams have different types), connect it with the low-temperature stream, and
        //    output status information
        DataStream<Tuple2<String, Double>> warningStream = highTempStream.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading sensorReading) throws Exception {
                return new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature());
            }
        });

        // The actual connect
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectedStreams = warningStream.connect(lowTempStream);

        // The remainder of this listing is cut off in the source after: DataStream
Output:
high> SensorReading{ id='sensor_1', timestamp=154777324345, temperature=35.8}
all> SensorReading{ id='sensor_1', timestamp=154777324345, temperature=35.8}
low> SensorReading{ id='sensor_2', timestamp=154777324346, temperature=15.4}
all> SensorReading{ id='sensor_2', timestamp=154777324346, temperature=15.4}
low> SensorReading{ id='sensor_3', timestamp=154777324347, temperature=6.7}
all> SensorReading{ id='sensor_3', timestamp=154777324347, temperature=6.7}
high> SensorReading{ id='sensor_4', timestamp=154777324348, temperature=38.1}
all> SensorReading{ id='sensor_4', timestamp=154777324348, temperature=38.1}
high> SensorReading{ id='sensor_1', timestamp=154777324349, temperature=36.9}
all> SensorReading{ id='sensor_1', timestamp=154777324349, temperature=36.9}
high> SensorReading{ id='sensor_1', timestamp=154777324350, temperature=32.9}
all> SensorReading{ id='sensor_1', timestamp=154777324350, temperature=32.9}
high> SensorReading{ id='sensor_1', timestamp=154777324351, temperature=37.1}
all> SensorReading{ id='sensor_1', timestamp=154777324351, temperature=37.1}
low> SensorReading{ id='sensor_6', timestamp=154777324351, temperature=15.4}
all> SensorReading{ id='sensor_6', timestamp=154777324351, temperature=15.4}
low> SensorReading{ id='sensor_7', timestamp=154777324350, temperature=6.7}
all> SensorReading{ id='sensor_7', timestamp=154777324350, temperature=6.7}
(sensor_1,35.8,high temp warning)
(sensor_2,normal)
(sensor_4,38.1,high temp warning)
(sensor_3,normal)
(sensor_1,36.9,high temp warning)
(sensor_6,normal)
(sensor_1,32.9,high temp warning)
(sensor_7,normal)
(sensor_1,37.1,high temp warning)
Drawback: connect works on exactly two streams; it cannot be used to combine more than two.
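The co-map step itself is not preserved in this copy of the listing, but the tuples in the output suggest what it does: one function handles elements of the first stream, another handles elements of the second, and both produce the same output type. The plain-Java sketch below illustrates only that idea. TwoInputMapper is a hypothetical interface written for this example, not Flink's API, and the string formatting is reverse-engineered from the printed tuples.

```java
import java.util.AbstractMap;
import java.util.Map;

class CoMapSketch {
    // Hypothetical two-input mapper: one method per input type, one shared output type
    interface TwoInputMapper<A, B, R> {
        R map1(A first);
        R map2(B second);
    }

    // First input: (id, temperature) pairs from the high-temperature stream;
    // second input: ids from the low-temperature stream (simplified to a String)
    static final TwoInputMapper<Map.Entry<String, Double>, String, String> STATUS =
            new TwoInputMapper<Map.Entry<String, Double>, String, String>() {
                @Override
                public String map1(Map.Entry<String, Double> warning) {
                    return "(" + warning.getKey() + "," + warning.getValue() + ",high temp warning)";
                }

                @Override
                public String map2(String sensorId) {
                    return "(" + sensorId + ",normal)";
                }
            };

    public static void main(String[] args) {
        System.out.println(STATUS.map1(new AbstractMap.SimpleEntry<>("sensor_1", 35.8)));
        System.out.println(STATUS.map2("sensor_2"));
    }
}
```

Because each input has its own method, the two streams may have different element types, which is exactly what distinguishes connect from union.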
union (DataStream -> DataStream): performs a union of two or more DataStreams of the same type, producing a new DataStream that contains all elements of all the input streams.
Example code: this copy of the source repeats the TransformTest4_MultiplieStreams listing from the connect example above verbatim and is cut off at the same point, so the union call itself is not preserved.
Output:
high> SensorReading{ id='sensor_1', timestamp=154777324345, temperature=35.8}
all> SensorReading{ id='sensor_1', timestamp=154777324345, temperature=35.8}
all> SensorReading{ id='sensor_2', timestamp=154777324346, temperature=15.4}
low> SensorReading{ id='sensor_2', timestamp=154777324346, temperature=15.4}
all> SensorReading{ id='sensor_3', timestamp=154777324347, temperature=6.7}
low> SensorReading{ id='sensor_3', timestamp=154777324347, temperature=6.7}
high> SensorReading{ id='sensor_4', timestamp=154777324348, temperature=38.1}
all> SensorReading{ id='sensor_4', timestamp=154777324348, temperature=38.1}
high> SensorReading{ id='sensor_1', timestamp=154777324349, temperature=36.9}
all> SensorReading{ id='sensor_1', timestamp=154777324349, temperature=36.9}
high> SensorReading{ id='sensor_1', timestamp=154777324350, temperature=32.9}
all> SensorReading{ id='sensor_1', timestamp=154777324350, temperature=32.9}
high> SensorReading{ id='sensor_1', timestamp=154777324351, temperature=37.1}
all> SensorReading{ id='sensor_1', timestamp=154777324351, temperature=37.1}
all> SensorReading{ id='sensor_6', timestamp=154777324351, temperature=15.4}
low> SensorReading{ id='sensor_6', timestamp=154777324351, temperature=15.4}
all> SensorReading{ id='sensor_7', timestamp=154777324350, temperature=6.7}
low> SensorReading{ id='sensor_7', timestamp=154777324350, temperature=6.7}
union> SensorReading{ id='sensor_1', timestamp=154777324345, temperature=35.8}
union> SensorReading{ id='sensor_4', timestamp=154777324348, temperature=38.1}
union> SensorReading{ id='sensor_1', timestamp=154777324349, temperature=36.9}
union> SensorReading{ id='sensor_1', timestamp=154777324350, temperature=32.9}
union> SensorReading{ id='sensor_1', timestamp=154777324351, temperature=37.1}
union> SensorReading{ id='sensor_2', timestamp=154777324346, temperature=15.4}
union> SensorReading{ id='sensor_3', timestamp=154777324347, temperature=6.7}
union> SensorReading{ id='sensor_6', timestamp=154777324351, temperature=15.4}
union> SensorReading{ id='sensor_7', timestamp=154777324350, temperature=6.7}
(sensor_1,35.8,high temp warning)
(sensor_2,normal)
(sensor_4,38.1,high temp warning)
(sensor_3,normal)
(sensor_1,36.9,high temp warning)
(sensor_6,normal)
(sensor_1,32.9,high temp warning)
(sensor_7,normal)
(sensor_1,37.1,high temp warning)
Reposted from: http://quxii.baihongyu.com/