8、Flink 在 source 处生成水位线 和 在 source 之后生成水位线案例

1、AtSourceGenerateWatermark
注意:从 Flink 1.17开始,FLIP-27 源框架支持拆分级别的水印对齐。

import java.time.Duration;

public class _02_AtSourceGenerateWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("my-broker")
                .setTopics("my-topic")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        DataStreamSource<String> source = env.fromSource(kafkaSource
                , WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
                , "kafka_source"
                , TypeInformation.of(new TypeHint<String>() {
                }));

        source.print();

        env.execute();
    }
}

2、在 source 之后生成水位线

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class _03_AfterSourceGenerateWatermark {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<_01_MyEvent> eventMap = source.map(new MapFunction<String, _01_MyEvent>() {
            @Override
            public _01_MyEvent map(String value) throws Exception {
                String[] fields = value.split(",");
                return new _01_MyEvent(Integer.parseInt(fields[0]),
                        fields[1],
                        Long.parseLong(fields[2]));
            }
        });

        SingleOutputStreamOperator<_01_MyEvent> timestampsAndWatermarks = eventMap.assignTimestampsAndWatermarks(WatermarkStrategy.<_01_MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withTimestampAssigner(new SerializableTimestampAssigner<_01_MyEvent>() {
                    @Override
                    public long extractTimestamp(_01_MyEvent element, long recordTimestamp) {
                        return element.getEventTime();
                    }
                }));

        timestampsAndWatermarks.print();

        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/582536.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Eclipse内存分析器 Java内存分析工具MAT(Memory Analyzer Tool)的介绍与使用

1.visualvm实时监测 2.Memory Analyzer Tool打开 3.工具的使用可以参考 Java内存分析工具MAT(Memory Analyzer Tool)的介绍与使用 ------------------------ 1.我远程发现是其中一个客户端A请求服务器页面响应&#xff0c;一直得不到响应&#xff0c;然后客户端A一直请求&am…

js 字符串 第一个斜杠前最后一次出现英文字母的位置并添加自定义值,返回新值

要找到字符串中第一个斜杠&#xff08;/&#xff09;前最后一次英文字母出现的位置&#xff0c;可以使用正则表达式配合lastIndexOf方法。以下是实现这一功能的示例代码&#xff1a; 如果是匹配第一个数字前的字母加值可以看这里 function findLastLetterIndexBeforeSlash(str…

外贸旺季外贸人如何做好时间管理

第1步 记住这些原则 50-30-20原则 你工作日里50%的时间应该花在有益于你长期发展目标的事情上&#xff0c;30%的时间应该用于你完成中期(两年左右)目标的事情&#xff0c;20%的时间用于完成未来90天以内需要完成的任务。 “一个篮子”原则 One Bucket 尽可能减少自己接收新任务…

《QT实用小工具·四十六》多边形窗口

1、概述 源码放在文章末尾 该项目实现了可以移动的多边形窗口&#xff0c;项目demo演示如下所示&#xff1a; 项目部分代码如下所示&#xff1a; #include "polygonwindow.h"#include <QBitmap> #include <QQuickItem> #include <QQmlFile> #in…

JAVASE->数据结构|顺序表底层逻辑

✅作者简介&#xff1a;大家好&#xff0c;我是橘橙黄又青&#xff0c;一个想要与大家共同进步的男人&#x1f609;&#x1f609; &#x1f34e;个人主页&#xff1a;再无B&#xff5e;U&#xff5e;G-CSDN博客 目标&#xff1a; 1. 什么是 List 2. List 常见接口介绍 3. …

第8章 软件工程

一、软件工程概述 &#xff08;一&#xff09;软件危机 1、含义&#xff1a;落后的软件生产方式无法满足迅速增长的计算机软件需求&#xff0c;从而导致软件开发与维护过程中出现一系列严重问题的现象。 2、解决方案&#xff1a;引入软件工程的思想。 &#xff08;二&#x…

Unity 问题之 开发应用在设备上运行闪屏花屏问题的分析处理

Unity 问题之 开发应用在设备上运行闪屏花屏问题的分析处理 目录 Unity 问题之 开发应用在设备上运行闪屏花屏问题的分析处理 一、简单介绍 二、问题现象 三、问题分析 四、使用空后处理&#xff0c;解决闪屏花屏的显示问题 五、空后处理完整代码 一、简单介绍 Unity 在…

国密SSL证书在等保、关保、密评合规建设中的应用

在等保、关保、密评等合规建设中&#xff0c;网络和通信安全方面的建设是非常重要的部分&#xff0c;需要实现加密保护和安全认证&#xff0c;确保传输数据机密性、完整性以及通信主体可信认证。国密SSL证书应用于等保、关保和密评合规建设中&#xff0c;不仅能够提升网络信息系…

API接口调用失败的常见原因?如何进行排查和处理?

API接口调用失败的常见原因有以下几种&#xff1a; 1. 无效的请求参数&#xff1a;可能是由于请求参数缺失、格式错误或者不符合接口要求导致的。解决方法是检查请求参数是否正确&#xff0c;并确保按照接口文档提供正确的参数。 2. 接口权限不足&#xff1a;有些接口需要特定…

JAVA自定义日期选择器

下载jar地址&#xff0c; https://toedter.com/jcalendar/ jar包下载地址 依赖包如下图所示&#xff1a; 整个项目代码已经上传到CSDN https://download.csdn.net/download/qq_30273575/89241601?ydrefereraHR0cHM6Ly9tcC5jc2RuLm5ldC9tcF9kb3dubG9hZC9tYW5hZ2UvZG93bmxvYWQ…

Swift-31-泛型和类型操作

泛型 Swift泛型(generics) 让我们写出的类型和函数可以使用对于我们或编译器都未知的类型。 很多内建类型(包括可空类型、数组和字典)都是用泛型实现的&#xff0c;比如数组和一些集合就是用泛型方式来实现的。 一种运行时进行类型检查的技术&#xff0c;效率高但是不安全。在…

Java零基础入门到精通_Day 8

1.API 应用程序接口 Java API:指的就是JDK 中提供的各种功能的Java类这些类将底层的实现封装了起来&#xff0c;我们不需要关心这些类是如何实现的&#xff0c;只需要学习这些类如何使用即可&#xff0c;我们可以通过帮助文档来学习这些API如何使用。 2. String String 类…

记录-执行Grad-CAM所遇问题

在执行Grad-CAM所遇问题 1&#xff09; 修改后解决 2&#xff09; 修改后解决&#xff0c;因为numpy需要在cpu上进行&#xff0c;所有需要加上.cpu() 3&#xff09;plt.matshow(heatmap)出错 原因是get_heatmap()中的mean_gradients torch.mean(gradients, dim[0, 2, 3]…

Spring IOC(一)

1. Spring IOC入门 1.1 什么是Spring IoC IoC&#xff08;Inversion of Control&#xff09;&#xff0c;即控制反转&#xff0c;是一种设计原则。简单来说&#xff0c;IoC就是将程序的某种传统控制流程反转了。 在Spring框架中&#xff0c;控制反转体现在对象的创建和管理上。…

面试:Redis(缓存穿透、缓存击穿、缓存雪崩、双写一致、Redis的持久化、Redis的过期策略、Redis的数据淘汰策略、Redis的分布式锁、Redis的集群方案、Redis网络模型)

目录 一、缓存穿透 1、解决方案一&#xff1a; 2、解决方案二&#xff1a; 二、缓存击穿 1、解决方案一&#xff1a; 2、解决方案二&#xff1a; 三、缓存雪崩 1、解决方案一&#xff1a; 2、解决方案二&#xff1a; 3、解决方案三&#xff1a; 4、解决方案四&#…

扭蛋机小程序带来了什么优势?扭蛋机收益攻略

在当下的潮流消费时代&#xff0c;人们对潮玩也日益个性化&#xff0c;扭蛋机作为一种新型的娱乐消费模式&#xff0c;深受大众喜爱。扭蛋机的价格低&#xff0c;各个年龄层的玩家都可以进行购买&#xff0c;潜在玩家量非常大。扭蛋机商品主打热门IP周边等&#xff0c;种类繁多…

Leetcode-面试题 02.02. 返回倒数第 k 个节点

目录 题目 图解 代码 面试题 02.02. 返回倒数第 k 个节点 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/kth-node-from-end-of-list-lcci/description/ 题目 实现一种算法&#xff0c;找出单向链表中倒数第 k 个节点。返回该节点的值。 注意&…

Q1季度方便速食行业线上市场(京东天猫淘宝)销售数据分析

方便食品行业作为快速消费品市场的重要组成部分&#xff0c;近几年表现出较为强劲的发展势头。当然&#xff0c;每年的食品安全问题也在一定程度上影响着市场的良性健康发展。那么&#xff0c;今年Q1季度方便食品的线上发展如何&#xff1f; 根据鲸参谋数据显示&#xff0c;Q1…

python程序设计语言超详细知识总结

Python 首先 python 并不是简单&#xff0c;什么语言都有基础和高级之分&#xff0c;要想掌握一门语言&#xff0c;必须把高级部分掌握才行。 HelloWorld helloWorld.py print(hello, world)数据类型与变量 变量的数据类型数据类型描述变量的定义方式整数型 (int)整数&…

【Java EE】MyBatis 入门

文章目录 &#x1f340;什么是MyBatis?&#x1f332;如何使用MyBatis&#x1f338;引人Mybatis的相关依赖&#x1f338;配置Mybatis(数据库连接信息)&#x1f338;编写SQL语句(注解/XML)&#x1f338;单元测试 &#x1f333;打印日志 &#x1f340;什么是MyBatis? MyBatis是…
最新文章