Provide Official Mechanism for Skipping 'Bad' Events in Kafka Transport #14664

Open
hamishdh opened this issue Feb 19, 2025 · 1 comment
Labels: needs triage, type: enhancement 🐺

@hamishdh

hamishdh commented Feb 19, 2025

Is there an existing issue that is already proposing this?

  • I have searched the existing issues

Is your feature request related to a problem? Please describe it

When using event-handlers / event-based communication with the Kafka transport, any unhandled errors (and any thrown KafkaRetriableException errors) will result in the offset not being committed (docs), and the consumer will retry processing of that event per the configured KafkaJS retry logic (relevant KafkaJS docs).

For my use case it makes sense that event processing should be retried a number of times, due to potentially transient errors in the event-handler logic. However, this presents a problem: a consumer can essentially become 'stuck' on a bad event indefinitely when the issue is not transient. Because Kafka processes each partition in order, any events produced to the relevant partition / topic after that event would never be processed.
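For reference, a minimal sketch of where this retry behaviour is configured today with the Nest Kafka transport (the broker address, group id, and retry count below are illustrative):

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module'; // hypothetical root module

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: ['localhost:9092'],
      },
      consumer: {
        groupId: 'my-server',
        // KafkaJS retry options: a failing event handler is retried up to
        // `retries` times, after which the consumer crashes with
        // KafkaJSNumberOfRetriesExceeded, restarts, and re-consumes the same event.
        retry: { retries: 5 },
      },
    },
  });
  await app.listen();
}
bootstrap();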

I would like to implement a solution that allows event processing to be retried X times before the event is ultimately skipped (by having its offset committed). When an event is skipped, I would like to hook into this to perform custom logic, for example publishing it to a Dead Letter Queue and logging it to a monitoring platform.

As far as I can tell, Nest.JS / KafkaJS do not have a built-in mechanism to implement this cleanly. The currently advised solution appears to be a custom Exception Filter that counts the retry attempts for a given event in memory and manually commits the offset (see this discord post and this stackoverflow).
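To illustrate what that workaround involves, here is a rough sketch of such a filter. The class name, retry limit, and in-memory bookkeeping are illustrative, and whether a rethrown error behaves exactly like an unhandled one depends on transport internals, which is part of why an official solution would be valuable:

import { ArgumentsHost, Catch, ExceptionFilter } from '@nestjs/common';
import { KafkaContext } from '@nestjs/microservices';

// Rough sketch of the "roll your own" workaround: count failures per
// topic/partition/offset in process memory, and commit the offset once the
// retry budget is exhausted so the consumer can move past the bad event.
@Catch()
export class SkipBadEventFilter implements ExceptionFilter {
  private readonly attempts = new Map<string, number>();
  private readonly maxRetries = 5; // illustrative limit

  async catch(exception: unknown, host: ArgumentsHost) {
    const context = host.switchToRpc().getContext<KafkaContext>();
    const message = context.getMessage();
    const key = `${context.getTopic()}:${context.getPartition()}:${message.offset}`;

    const failures = (this.attempts.get(key) ?? 0) + 1;
    this.attempts.set(key, failures);

    if (failures < this.maxRetries) {
      // Propagate the error so the offset is not committed and the event is retried,
      // mirroring the community workarounds referenced above.
      throw exception;
    }

    // Retry budget exhausted: run custom skip logic (DLQ, alerting, ...) here,
    // then commit the next offset so processing of the partition can continue.
    this.attempts.delete(key);
    await context.getConsumer().commitOffsets([
      {
        topic: context.getTopic(),
        partition: context.getPartition(),
        offset: (Number(message.offset) + 1).toString(),
      },
    ]);
  }
}

Note that keeping the counts in process memory means they are lost on restart and are not shared across consumer replicas.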

Describe the solution you'd like

Given the complexity of correctly implementing the logic to safely retry an event a configured number of times before executing some custom logic and 'skipping' the event, I would like to see an officially provided and recommended reusable solution to this problem.

This could be provided in a number of ways, outlined in the next section; however, any solution should prioritise ease of use and abstract away the underlying Kafka transport internals.

Teachability, documentation, adoption, migration strategy

There are a number of ways this problem could be made easier for developers, each with different trade-offs. Below are my thoughts; I'm very open to discussion around any of them.

  1. Expose the KafkaJS retry information in the KafkaContext provided to event-handlers

I have observed that when the Kafka consumer restarts due to KafkaJSNumberOfRetriesExceeded, the logged error message includes a retryCount property, indicating that KafkaJS tracks the number of failed attempts for a particular offset somewhere. See the excerpt below:

ERROR [Consumer] Crash: KafkaJSNumberOfRetriesExceeded: invalid.relation.id {\"timestamp\":\"2025-02-19T11:43:47.417Z\",\"logger\":\"kafkajs\",\"groupId\":\"my-server\",\"retryCount\":5,\"stack\":\"KafkaJSNonRetriableError\n  Caused by: Error: ....

If this information could be included in the KafkaContext, it would allow developers to implement custom business logic in their @EventPattern consumers, such as the following:

@EventPattern('foobar-topic')
async handle(@Payload() data: FoobarData, @Ctx() context: KafkaContext) {
  if (context.getRetryCount() > this.retryLimit) {
    // Do whatever skip-event business logic you want here
    await sendToDeadLetterQueue(context.getMessage());
    return; // so that the event offset is successfully committed
  }
  // ...normal event processing otherwise
}

This would allow for relatively simple implementations and custom logic per event-handler; however, it is not as clean as the aspect-oriented approach of using exception filters.
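The sendToDeadLetterQueue helper used above is not an existing API; one possible shape for it, assuming a ClientKafka producer registered elsewhere (the injection token and DLQ topic name are illustrative):

import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { KafkaMessage } from 'kafkajs';
import { lastValueFrom } from 'rxjs';

// Hypothetical DLQ publisher: re-publishes the failed message to a dedicated
// topic so it can be inspected and replayed later.
@Injectable()
export class DeadLetterQueueService {
  constructor(@Inject('DLQ_KAFKA_CLIENT') private readonly client: ClientKafka) {}

  async sendToDeadLetterQueue(message: KafkaMessage): Promise<void> {
    await lastValueFrom(
      this.client.emit('foobar-topic.dlq', {
        key: message.key?.toString(),
        value: message.value?.toString(),
      }),
    );
  }
}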

  2. Provide a global mechanism via the transport configuration for skipping messages

When creating the Kafka transport, a global message skipping configuration hook could be provided in the following way:

{
  transport: Transport.KAFKA,
  options: {
    client: {
      retry: {
        skipAfterRetries: 5,
        skipHandler: (context: KafkaContext) => {
          // Perform required logic here
        }
      }
    }
  }
}

This has the nice advantage (and arguably disadvantage) of applying to all event-handlers in a service.

  3. Provide an 'official' Exception Filter that can perform required business logic and skip messages

As mentioned in the problem description, this can be solved by writing a custom Exception Filter that counts the retry attempts for a given event in memory and manually commits the offset. Such a filter can be built with the existing Nest.JS framework and fits established concepts. The following references (discord, related issue, stackoverflow) suggest that many developers are rolling their own solutions to this problem (including myself, which led me to create this issue). This is by no means a 'simple' Exception Filter to write; I am certain that I and other developers will fall into complicated, subtle pitfalls in doing so.

An officially written and maintained KafkaMaxRetryExceptionFilter (perhaps with a better name 😉) that could be easily extended to execute any required middleware logic before automatically committing the offset of the 'bad' event would be beneficial to the community.

Example usage could be simply:

@UseFilters(KafkaMaxRetryExceptionFilter)
export class MyEventHandler {
...

The Exception Filter itself could be made configurable via a dynamic module. I'm not exactly sure how this could look; perhaps:

@Module({
  imports: [
    KafkaMaxRetryExceptionFilter.forRoot({
      skipHandler: () => {}, // Custom logic here
      maxRetries: 5,
    }),
  ],
})
export class AppModule {}

Or, perhaps by extending the base class?

@UseFilters(MyKafkaMaxRetryExceptionFilter)
export class MyEventHandler {
...

with an exception filter like:

@Catch()
export class MyKafkaMaxRetryExceptionFilter extends KafkaMaxRetryExceptionFilter {
  catch(exception: unknown, host: ArgumentsHost) {
    // Business logic here

    // Superclass handles event offset committing
    super.catch(exception, host);
  }
}

What is the motivation / use case for changing the behavior?

'Bad' events that cannot be processed are an unfortunate inevitability in a large distributed system with complex business logic and edge cases. Retrying an event a configurable number of times before skipping it and moving it to a dead letter queue is an established and widely used pattern in event-driven architecture; however, it is unfortunately difficult to implement in the Nest.JS microservices framework.

This has led multiple developers to implement their own solutions to the problem, with tricky-to-write, complex implementations. My primary motivation behind this issue is to make it easier for future developers to take advantage of this mechanism in their implementations.

If you agree that this is a positive idea, I am happy to help by contributing my time and effort towards the solution.

@hamishdh added the needs triage and type: enhancement 🐺 labels on Feb 19, 2025
@hamishdh changed the title from Improve Kafka to Provide Official Mechanism for Skipping 'Bad' Events in Kafka Transport on Feb 19, 2025
@kamilmysliwiec
Member

Expose the KafkaJS retry information in the KafkaContext provided to event-handlers

Sounds good. PRs are more than welcome

Provide an 'official' Exception Filter that can perform required business logic and skip messages

As soon as retryCount can be retrieved from KafkaContext, implementing the exception filter should be pretty straightforward, no need to export a dedicated class from the package. We could add a sample implementation in the docs though
