Web开发

首页 » 常识 » 问答 » 通过ApacheKafka中的死信队列进
TUhjnbcbe - 2022/11/12 20:41:00
白癜风的前期症状 http://m.39.net/baidianfeng/a_4342820.html

ApacheKafka中用于错误处理的死信队列:来自Uber和Crowdstrike的替代方案、最佳实践和案例研究。

识别和处理错误对于任何可靠的数据流管道都是必不可少的。这篇博文探讨了在ApacheKafka基础架构中使用死信队列实现错误处理的最佳实践。这些选项包括自定义实现、KafkaStreams、KafkaConnect、Spring框架和并行消费者。真实案例研究展示了Uber、CrowdStrike和桑坦德银行如何以极端规模构建可靠的实时错误处理。

ApacheKafka成为许多企业架构最喜欢的集成中间件。即使对于云优先战略,企业也可以利用Kafka的数据流作为云原生集成平台即服务(iPaaS)。

ApacheKafka数据流中的消息队列模式

在我开始这篇文章之前,我想让你知道这个内容是关于“JMS、消息队列和ApacheKafka”的博客系列的一部分:

JMS消息代理与ApacheKafka数据流的10个比较标准这篇文章–通过ApacheKafka中的死信队列(DQL)进行错误处理的替代方法使用ApacheKafka实现请求-回复模式即将推出——用于选择正确消息系统的决策树(JMS与ApacheKafka)即将推出——从JMS消息代理到ApacheKafka:集成、迁移和/或替换

什么是死信队列集成模式(在ApacheKafka中)?

死信队列(DLQ)是消息系统或数据流平台内的一种服务实现,用于存储未成功处理的消息。系统不是被动地转储消息,而是将其移动到死信队列。

企业集成模式(EIP)改为调用设计模式死信通道。我们可以将两者用作同义词。

本文重点介绍数据流平台ApacheKafka。在Kafka中将消息放入DLQ的主要原因通常是消息格式错误或消息内容无效/缺失。例如,如果预期值是整数,但生产者发送了字符串,则会发生应用程序错误。在更动态的环境中,“主题不存在”异常可能是无法传递消息的另一个错误。

因此,通常不要使用现有中间件经验中的知识。MessageQueue中间件(如符合JMS的IBMMQ、TIBCOEMS或RabbitMQ)与分布式提交日志(如Kafka)的工作方式不同。由于许多其他原因,消息队列中的DLQ用于消息队列系统,这些原因不能一对一地映射到Kafka。例如,MQ系统中的消息由于每条消息的TTL(生存时间)而过期。

因此,在Kafka中将消息放入DLQ的主要原因是消息格式错误或消息内容无效/缺失。

ApacheKafka中死信队列的替代方案

Kafka中的死信队列是一个或多个Kafka主题,它们接收和存储由于错误而无法在另一个流管道中处理的消息。此概念允许使用以下传入消息继续消息流,而不会由于无效消息的错误而停止工作流。

KafkaBroker很笨——智能端点提供错误处理

Kafka架构不支持brokerr中的DLQ。有意地,Kafka建立在与现代微服务相同的原则上,使用“哑管道和智能端点”原则。这就是为什么与传统消息代理相比,Kafka的扩展性如此之好。过滤和错误处理发生在客户端应用程序中。

数据流平台的真正解耦可以实现更干净的领域驱动设计。每个微服务或应用程序都通过自己选择的技术、通信范式和错误处理来实现其逻辑。

在传统的中间件和消息队列中,代理提供了这种逻辑。结果是域中的可扩展性和灵活性较差,因为只有中间件团队才能实现集成逻辑。

用任何编程语言自定义实现Kafka死信队列

Kafka中的死信队列独立于您使用的框架。一些组件为错误处理和死信队列提供了开箱即用的功能。但是,使用Java、Go、C++、Python等任何编程语言为Kafka应用程序编写死信队列逻辑也很容易。

死信队列实现的源代码包含一个try-catch块来处理预期或意外异常。如果没有发生错误,则处理该消息。如果发生任何异常,请将消息发送到专用的DLQKafka主题。

失败原因应添加到Kafka消息的标头中。不应更改键和值,以便将来对历史事件进行重新处理和故障分析。

死信队列的开箱即用Kafka实现

你并不总是需要实现你的死信队列。许多组件和框架已经提供了它们的DLQ实现。

使用您自己的应用程序,您通常可以控制错误或在出现错误时修复代码。但是,与rd方应用程序的集成并不一定允许您处理可能跨集成障碍引入的错误。因此,DLQ变得更加重要,并被包含在某些框架中。

KafkaConnect内置死信队列

KafkaConnect是Kafka的集成框架。它包含在开源Kafka下载中。不需要其他依赖项(除了您部署到Connect集群中的连接器本身)。

默认情况下,如果由于使用无效消息而发生错误(例如使用错误的JSON转换器而不是正确的AVRO转换器时),KafkaConnect任务将停止。删除无效消息是另一种选择。后者容忍错误。

KafkaConnect中DLQ的配置很简单。只需将两个配置选项errors.tolerance和errors.deadletterqueue.topic.name的值设置为正确的值:

博客文章“KafkaConnectDeepDive–错误处理和死信队列”显示了使用DLQ的详细动手代码示例。

KafkaConnect甚至可以用于处理DLQ中的错误消息。只需部署另一个使用teDLQ主题的连接器。例如,如果您的应用程序处理Avro消息并且传入消息是JSON格式。然后连接器使用JSON消息并将其转换为AVRO消息以成功重新处理:

请注意,KafkaConnect没有用于源连接器的死信队列。

KafkaStreams应用程序中的错误处理

KafkaStreams是Kafka的流处理库。它可与其他流式传输框架相媲美,例如ApacheFlink、Storm、Beam和类似工具。但是,它是Kafka原生的。这意味着您可以在单个可扩展且可靠的基础架构中构建完整的端到端数据流。

如果您分别使用Java(JVM生态系统)来构建Kafka应用程序,建议几乎总是使用KafkaStreams而不是Kafka的标准Java客户端。为什么?

KafkaStreams“只是”一个围绕常规Java生产者和消费者API的包装器,以及许多内置的附加功能。两者都只是嵌入到Java应用程序中的库(JAR文件)。两者都是开源Kafka下载的一部分-没有额外的依赖项或许可证更改。许多问题已经开箱即用地解决,以构建成熟的流处理服务(流功能、有状态的嵌入式存储、滑动窗口、交互式查询、错误处理等等)。

KafkaStreams的内置功能之一是默认的反序列化异常处理程序。它允许您管理无法反序列化的记录异常。损坏的数据、不正确的序列化逻辑或未处理的记录类型都可能导致错误。该功能不称为死信队列,但开箱即用地解决了相同的问题。

SpringKafka和SpringCloudStream的错误处理

Spring框架对ApacheKafka有很好的支持。它提供了许多模板以避免自己编写样板代码。Spring-Kafka和SpringCloudStreamKafka支持各种重试和错误处理选项,包括基于时间/计数的重试、死信队列等。

尽管Spring框架功能非常丰富,但它有点重,并且有一个学习曲线。因此,它非常适合新建项目,或者如果您已经将Spring用于其他场景的项目。

有很多很棒的博客文章展示了不同的示例和配置选项。还有用于死信队列的官方SpringCloudStream示例。Spring允许使用简单的注释构建逻辑,例如DLQ。这种编程方法是一些开发人员钟爱的范例,而另一些则不喜欢它。只需了解选项并为自己选择合适的选项即可。

ApacheKafka并行消费者的可扩展处理和错误处理

在许多客户对话中,事实证明,请求死信队列的主要原因通常是处理连接到外部Web服务或数据库的失败。超时或Kafka无法并行发送各种请求会导致某些应用程序瘫痪。这个问题有一个很好的解决方案:

ApacheKafka的并行消费者是Apache2.0许可下的开源项目。它提供了一个带有客户端队列的并行ApacheKafka客户端包装器、一个具有关键并发性的更简单的消费者/生产者API,以及可扩展的非阻塞IO处理。

该库允许您通过单个KafkaConsumer并行处理消息,这意味着您可以在不增加要处理的主题中的分区数量的情况下增加KafkaConsumer并行度。对于许多用例,这通过减少Kafka代理的负载来提高吞吐量和延迟。它还开辟了新的用例,例如极端并行性、外部数据丰富和排队。

一个关键特性是在单个Kafka消费者应用程序中处理/重复Web服务和数据库调用。并行化避免了一次发送单个Web请求的需要:

ParallelConsumer客户端具有强大的重试逻辑。这包括可配置的延迟和动态错误或处理。错误也可以发送到死信队列。

使用死信队列中的消息

将错误发送到死信队列后,您还没有完成!坏消息需要被处理或至少被监控!

死信队列是从事件处理中带外处理数据错误处理的绝佳方式,这意味着错误处理程序可以与事件处理代码分开创建或演变。

存在大量使用死信队列的错误处理策略。DO和DONT探索最佳实践和经验教训。

错误处理策略

有几个选项可用于处理存储在死信队列中的消息:

重新处理:DLQ中的一些消息需要重新处理。但是,首先,需要解决这个问题。解决方案可以是自动脚本、编辑消息的人工交互,或向生产者返回错误,要求重新发送(更正的)消息。删除错误消息(经过进一步分析):根据您的设置,可能会出现错误消息。但是,在删除它们之前,业务流程应该检查它们。例如,仪表板应用程序可以使用错误消息并将它们可视化。高级分析:另一种选择是分析传入数据以获取实时洞察或问题,而不是处理DLQ中的每条消息。例如,一个简单的ksqlDB应用程序可以应用流处理进行计算,例如每小时错误消息的平均数量或任何其他有助于确定Kafka应用程序中的错误的见解。停止工作流:如果很少会出现坏消息,结果可能是停止整个业务流程。该动作可以是自动的,也可以由人决定。当然,停止工作流也可以在抛出错误的Kafka应用程序中完成。如果需要,DLQ将问题和决策外部化。忽略:这听起来可能是最糟糕的选择。只是让死信队列填满,什么都不做。然而,即使这样在某些用例中也很好,比如监控Kafka应用程序的整体行为。请记住,Kafka主题具有保留时间,并且在该时间之后从主题中删除消息。只需为您设置正确的方式即可。并监控DQL主题是否存在意外行为(例如填充太快)。

ApacheKafka中死信队列的最佳实践

以下是在Kafka应用程序中使用死信队列进行错误处理的一些最佳实践和经验教训:

定义处理无效消息的业务流程(自动与人工)现实:通常,根本没有人处理DLQ消息备选方案1:数据所有者需要接收警报,而不仅仅是基础架构团队备选方案2:警报应通知记录团队系统数据错误,他们将需要从记录系统重新发送/修复数据。如果没有人关心或抱怨,请考虑质疑和审查DLQ存在的必要性。相反,这些消息也可以在初始Kafka应用程序中被忽略。这节省了大量的网络负载、基础设施和资金。构建带有适当警报的仪表板并整合相关团队(例如,通过电子邮件或Slack警报)定义每个Kafka主题的错误处理优先级(停止、删除和重新处理)仅将不可重试的错误消息推送到DLQ-连接问题是消费者应用程序的责任。保留原始消息并将它们存储在DLQ中(带有额外的标头,例如错误消息、错误时间、发生错误的应用程序名称等)——这使得重新处理和故障排除变得更加容易。想想你需要多少DeadLetterQueueKafka主题。总是有取舍。但是将所有错误存储在单个DLQ中可能对进一步分析和重新处理没有意义。

请记住,DLQ会以有保证的顺序终止处理,并使任何类型的离线处理变得更加困难。因此,KafkaDQL并不适合每个用例。

何时不在Kafka中使用死信队列?

让我们探讨一下不应该将哪些类型的消息放入Kafka的死信队列中:

DLQ用于背压处理?由于大量消息的峰值而使用DLQ进行节流并不是一个好主意。Kafka日志背后的存储会自动处理背压。消费者以它可以按自己的速度获取数据的方式提取数据(或者配置错误)。如果可能的话,弹性地扩展消费者。即使您的存储空间已满,DLQ也无济于事。这是它的问题,与是否使用DLQ无关。连接失败的DLQ?由于连接失败而将消息放入DQL无济于事(即使在多次重试之后)。以下消息也无法连接到该系统。您需要解决连接问题。消息可以根据需要存储在常规主题中(取决于保留时间)。

用于数据治理和错误预防的模式注册表

最后但同样重要的是,让我们探讨在某些情况下减少甚至消除对死信队列的需求的可能性。

卡夫卡的SchemaRegistry是一种确保数据清理以防止生产者在负载中出错的方法。它在Kafka生产者中强制执行正确的消息结构:

模式注册表是模式的客户端检查。ConfluentServer等一些实现在代理端提供了额外的模式检查,以拒绝来自未使用模式注册表的生产者的无效或恶意消息。

Kafka死信队列的案例研究

让我们看看Uber、CrowdStrike和SantanderBank的三个案例研究,它们在Kafka基础设施中实际部署死信队列。请记住,这些都是非常成熟的例子。不是每个项目都需要那么复杂。

Uber-构建可靠的再处理和死信队列

在分布式系统中,重试是不可避免的。从网络错误到复制问题,甚至下游依赖关系的中断,大规模运行的服务必须准备好尽可能优雅地遇到、识别和处理故障。

鉴于Uber的运营范围和速度,它的系统必须具有容错能力,并且在智能失败时毫不妥协。Uber将ApacheKafka用于各种极端规模的用例以实现这一目标。

利用这些特性,Uber保险工程团队扩展了Kafka在其现有事件驱动架构中的作用,通过使用n个阻塞请求重新处理和死信队列来实现解耦、可观察的错误处理,而不会中断实时流量。该策略有助于他们选择加入的驾驶员伤害保护计划在多个城市可靠运行,并为注册驾驶员扣除每次行程的每英里保费。

这是Uber错误处理的示例。错误会降低重试主题的级别,直到登陆DLQ:

有关更多信息,请阅读Uber非常详细的技术文章:“使用ApacheKafka构建可靠的再处理和死信队列”。

CrowdStrike-处理数万亿事件的错误

CrowdStrike是一家位于德克萨斯州奥斯汀的网络安全技术公司。它提供云工作负载和端点安全、威胁情报和网络攻击响应服务。

CrowdStrike的基础设施每天使用ApacheKafka处理数万亿个事件。在我的“ApacheKaka网络安全博客系列”中,我介绍了以任何规模实时创建态势感知和威胁情报的相关用例。

CrowdStrike定义了三个最佳实践来成功实现死信队列和错误处理:

在正确的系统中存储错误消息:定义基础设施和代码以捕获和检索死信。CrowdStrike使用S对象存储来存储潜在的大量错误消息。请注意,Kafka的分层存储开箱即用地解决了这个问题,无需其他存储接口(例如,利用ConfluentCloud中的无限存储)。使用自动化:放置工具以使修复万无一失,因为手动完成错误处理可能非常容易出错。记录业务流程并聘请相关团队:标准化和记录流程以确保易于使用。并非所有工程师都熟悉组织处理死信消息的策略。

在像CrowdStrike这样的网络安全平台中,大规模实时数据处理至关重要。此要求也适用于错误处理。下一次网络攻击可能是故意包含不适当或无效内容的恶意消息(如JavaScript漏洞利用)。因此,必须通过死信队列实时处理错误。

桑坦德银行-用于重试和DLQ组合的邮箱2.0

桑坦德银行在邮箱应用程序中处理海量数据的同步数据处理面临巨大挑战。他们重新架构了他们的基础架构并构建了一个解耦且可扩展的架构,称为“SantanderMailbox2.0”。

Santander的工作负载并转移到由ApacheKafka提供支持的事件溯源:

新的基于异步事件的架构中的一个关键挑战是错误处理。Santander使用重试和DQLKafka主题构建的错误处理解决了这些问题:

查看来自Santander的集成合作伙伴Consdata的Kafka峰会演讲“基于重试策略和死信主题的ApacheKafka中的可靠事件传递”中的详细信息。

ApacheKafka中可靠且可扩展的错误处理

错误处理对于构建可靠的数据流管道和平台至关重要。存在不同的替代方案来解决这个问题。该解决方案包括死信队列的自定义实现或利用正在使用的框架,例如KafkaStreams、KafkaConnect、Spring框架或Kafka的并行消费者。

优步、CrowdStrike和桑坦德银行的案例研究表明,错误处理并不总是很容易实现。当您设计新的应用程序或架构时,需要从一开始就考虑到这一点。使用ApacheKafka进行实时数据流传输很有吸引力,但只有在您能够处理意外行为时才能成功。死信队列是许多场景的绝佳选择。

1
查看完整版本: 通过ApacheKafka中的死信队列进