问答 2021-12-07 来自:开发者社区

flink sql 数据异常导致任务失败怎么办?

kafka source,一条异常数据就会导致任务失败,这种问题怎么解决呢? 以前用DatastreamAPI的话是自己解析,因此可以捕获异常,仅统计非法数据量作为flink的metric指标。 现在是基于flink sql直接基于kakfa创建动态表,查询动态表过程json解析失败,导致任务失败。*来自志愿者整理的flink邮件归档

问答 2021-12-07 来自:开发者社区

flink 1.11 SQL idea调试无数据也无报错是怎么回事?

我遇到个问题,请教一下: 环境 1.11 idea 参考的wuchong大神的demo想把client变成java的,第一个例子 统计每小时的成交量 数据可以读到但是没有输出结果,写入es没反应,后改为print sink 还是没反应 https://github.com/wuchong/flink-sql-demo/tree/v1.11-CN 求助,各位 下面是pom 和代码,以及运行结果 .....

问答 2021-12-07 来自:开发者社区

如何用 Flink SQL 做简单的数据去重?

最近我有一个使用 Flink SQL 做简单的数据去重的需求,想使用 Flink 的 ScalarFunction,通过阅读 API 发现 FunctionContext context 并不支持访问 state。 我准备使用 Guava cache 做,不知道小伙伴有没有更好的建议哈!感谢。 *来自志愿者整理的flink邮件归档

问答 2021-12-07 来自:开发者社区

Flink sql 主动使数据延时一段时间有什么方案

我们有这样一个场景,双流join,一个快流,一个慢流,想让快流等一段时间,目的是能提高join的命中率。 FLink sql有什么方案实现吗?*来自志愿者整理的flink邮件归档

问答 2021-12-06 来自:开发者社区

flink sql 1.11 kafka source with子句使用新参数,下游消费不到数据

根据文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#start-reading-position, 使用新参数创建kafka_table,下游消费不到数据,使用老参数下游可以消费到数据,是不是新参数的方式有坑啊 老参数: streamTableEnv.e.....

问答 2021-12-06 来自:开发者社区

大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据匹配

大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据类型匹配该字段的结果,数据类型的类名和需要的依赖是什么? *来自志愿者整理的flink邮件归档

问答 2021-12-06 来自:开发者社区

关于flink sql往postgres写数据遇到的timestamp问题

项目需求要向 postgres 中插入数据,用 catalog 之后,插入数据貌似需要和数据库表定义完全一致,而且没找到只插入部分字段的写法 在时间转 TIMESTAMP(6) WITH LOCAL TIME ZONE 时报了错,这个格式是 postgres 中的时间戳定义 select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZO....

问答 2021-12-06 来自:开发者社区

Flink SQL 1.11支持将数据写入到Hive吗?

看官网介绍是支持的: 但是找对应的连接器是没有Hive,是JDBC? *来自志愿者整理的flink邮件归档

问答 2021-12-06 来自:开发者社区

flink sql cdc 如果只处理一次全量数据问题

之前一直使用streaming api,这两天开始使用sql。 有个疑问,flink sql cdc读取mysql的数据时候,会处理 全量 + 增量数据。 那么如果同一个任务上线后,后续有变化,修改后再次上线,这个时候我并不希望处理之前过的数据。这个时候是怎么做呢? cdc里面有进行state保存消费过的changelog的位置吗?这样我重新上线的时候从savepoint或者checkpo...

问答 2021-12-04 来自:开发者社区

flink sql 不同job消费同一个kafka表(指定了groupId)时输出相同数据?

Hi,all 使用flink版本1.10.0,在hive catalog下建了映射kafka的表: CREATE TABLE x.log.yanfa_log ( dt TIMESTAMP(3), conn_id STRING, sequence STRING, trace_id STRING, span_info STRING, service_id STRING, msg_id STRING,....

本页面内关键词为智能算法引擎基于机器学习所生成,如有任何问题,可在页面下方点击"联系我们"与我们沟通。

产品推荐

数据库

分享数据库前沿,解构实战干货,推动数据库技术变革

+关注
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等

登录插画

登录以查看您的控制台资源

管理云资源
状态一览
快捷访问