
Invoking Services via a Kafka Trigger

The Kafka Trigger enables Martini to subscribe to Kafka topics and invoke a specified service when messages are received. This integration facilitates real-time data processing and event-driven architectures.

Configuration

Configuration of the Kafka Trigger is divided into Martini-specific and Kafka-specific properties, detailed below.

Kafka-Specific Configuration

Connection Properties

  • Bootstrap Servers: The list of host-port pairs for establishing the initial connection to the Kafka cluster. Defaults to localhost:9092.
  • Topics: The list of Kafka topics to subscribe to.
  • Group ID: Identifies the consumer group for this trigger.

Deserialization Properties

  • Key Deserializer: Specifies the deserializer for the message key. Options include byte, byte_buffer, double, integer, long, string, json, xml, or the fully qualified name of a class implementing org.apache.kafka.common.serialization.Deserializer.
  • Value Deserializer: Specifies the deserializer for the message value, with options similar to the key deserializer.
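
For readers familiar with the plain Kafka consumer API, these connection and deserialization properties correspond to the standard consumer configuration keys (bootstrap.servers, group.id, key.deserializer, value.deserializer). The sketch below is purely illustrative (the trigger assembles this configuration for you); the broker address, group ID, and topic name are placeholder values:

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TriggerConnectionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Bootstrap Servers -> bootstrap.servers
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Group ID -> group.id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        // Key Deserializer / Value Deserializer -> key.deserializer / value.deserializer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Topics -> the subscription list
            consumer.subscribe(List.of("example-topic"));
        }
    }
}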

Commit Strategy Configuration

| Name | Default | Choices | Description |
| --- | --- | --- | --- |
| Enable Auto Commit | true | true, false | Auto-commit offsets in the background. |
| Auto Commit Interval (ms) | 5000 | | Frequency in milliseconds for auto-commit. |
| Acknowledgement Mode | BATCH | RECORD, BATCH, TIME, COUNT, COUNT_TIME, MANUAL, MANUAL_IMMEDIATE | Strategy for offset commit. |
| Acknowledgement Count | 1 | 0,... | Number of messages after which offsets should be committed. |
| Acknowledgement Time (ms) | 1 | 0,... | Time after which offsets should be committed. |
| Acknowledgement On Error | false | true, false | Whether to commit offsets when exceptions occur. |
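
To make the commit settings concrete: with Enable Auto Commit set to true, offsets are committed in the background every Auto Commit Interval; with it disabled, offsets are committed only when explicitly acknowledged. The hedged sketch below shows the raw Kafka client equivalent of that manual pattern, using placeholder connection values; within Martini, the same decision is surfaced to your service through the ack parameter described under Service Invocation Parameters.

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Enable Auto Commit -> enable.auto.commit
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d key=%s%n", record.offset(), record.key());
            }
            // Commit only after the polled records have been processed successfully
            consumer.commitSync();
        }
    }
}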

Data Configuration

| Name | Default | Choices | Description |
| --- | --- | --- | --- |
| Fetch Max Bytes | 52428800 | 0,... | Maximum amount of data the server should return for a fetch request. |
| Fetch Min Bytes | 1 | 0,... | Minimum amount of data the server should return for a fetch request. |
| Max Partition Fetch Bytes | 1048576 | 0,... | Maximum amount of data per partition the server will return. |
| Receive Buffer Bytes | 65536 | 0,... | TCP receive buffer size. |
| Send Buffer Bytes | 131072 | 0,... | TCP send buffer size. |

Metrics Configuration

Configure metrics reporting for monitoring and debugging Kafka triggers.

| Name | Default | Choices | Description |
| --- | --- | --- | --- |
| Reporters | | | Comma-separated list of class names to be used as metrics reporters. Each class must implement MetricsReporter. |
| Recording Level | Info | Info, Debug | Specifies the highest level of metrics recording. |
| Number of Samples | 2 | 0,... | The number of samples to maintain for metrics computation. |
| Sample Window (ms) | 30000 | 0,... | The time window over which the metrics samples are computed. |
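
The Reporters property expects fully qualified class names that implement Kafka's MetricsReporter interface and are available on the classpath. As a rough, hypothetical sketch (the class name and logging are illustrative), a reporter could look like this:

import java.util.List;
import java.util.Map;

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

public class LoggingMetricsReporter implements MetricsReporter {

    @Override
    public void init(List<KafkaMetric> metrics) {
        // Called once with the metrics that already exist when the reporter is registered
        System.out.println("Reporter initialised with " + metrics.size() + " metric(s)");
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        // Called whenever a metric is added or updated
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        // Called whenever a metric is removed
    }

    @Override
    public void close() {
        // Release any resources held by the reporter
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Receives the client configuration when the reporter is instantiated
    }
}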

Networking Configuration

Adjust Kafka's networking behavior to optimize performance and reliability.

| Name | Default | Choices | Description |
| --- | --- | --- | --- |
| Isolation Level | Read Uncommitted | Read Uncommitted, Read Committed | Determines how transactionally written messages are read. |
| Connections Max Idle (ms) | 540000 | 0,... | The maximum amount of time a connection can remain idle before being closed. |
| Fetch Max Wait (ms) | 500 | 0,... | The maximum time to wait for a fetch response. |
| Heartbeat Interval (ms) | 3000 | 0,... | The expected interval between heartbeats to the consumer coordinator for group management. |
| Max Poll Interval (ms) | 300000 | 0,... | The maximum delay between invocations of poll when using consumer group management. |
| Max Poll Records | 500 | 0,... | The maximum number of records returned in a single call to poll. |
| Monitor Interval (ms) | 30000 | 0,... | Interval for checking non-responsive listeners. |
| Poll Timeout (ms) | 1000 | 0,... | The maximum time to block waiting for messages. |
| Request Timeout (ms) | 305000 | 0,... | The maximum amount of time the client will wait for the response of a request. |
| Session Timeout (ms) | 10000 | 0,... | The timeout used to detect consumer failures when using Kafka's group management facility. |
| Shutdown Timeout (ms) | 10000 | 0,... | The timeout for shutting down the Kafka trigger listener. |
| Batch | false | true, false | If true, the registered service is invoked with each batch of messages. If false, it is invoked for each message. |

Resiliency Configuration

Fine-tune how the trigger handles connection issues and offsets with Kafka.

| Name | Default | Choices | Description |
| --- | --- | --- | --- |
| Auto Offset Reset | Latest | Latest, Earliest, None | Defines the action when no initial offset is found or the current offset no longer exists. |
| Reconnect Backoff (ms) | 50 | 0,... | The time to wait before attempting to reconnect to a broker after a connection failure. |
| Reconnect Backoff Max (ms) | 1000 | 0,... | The maximum time to wait before attempting reconnection to a broker. |
| Retry Backoff (ms) | 100 | 0,... | The time to wait before retrying a failed request to a topic partition. |

SASL Configuration

Secure your Kafka trigger connections with SASL authentication.

| Name | Default | Choices | Description |
| --- | --- | --- | --- |
| JAAS Config | | | JAAS login context parameters for SASL connections. |
| Kerberos Kinit Command | /usr/bin/kinit | | The path to the Kerberos kinit command. |
| Kerberos Service Name | | | The Kerberos principal name Kafka runs as. |
| Mechanism | GSSAPI | | The SASL mechanism for client connections. |
| Kerberos Minimum Time Before Login (ms) | 60000 | 0,... | The sleep time for the login thread between refresh attempts. |
| Kerberos Ticket Renew Jitter | 0.05 | 0.00-1.00 | The jitter added to the renewal time. |
| Kerberos Ticket Renew Window Factor | 0.80 | 0,... | The factor of the ticket's lifetime before a renewal attempt. |
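
As a hedged illustration of how SASL settings of this kind translate into Kafka client properties, the sketch below assumes the PLAIN mechanism over SASL_SSL with placeholder credentials; with the default GSSAPI mechanism you would instead supply the Kerberos settings listed above.

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SaslConfigs;

public class SaslConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Security Protocol (see Advanced Configuration) -> security.protocol
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        // Mechanism -> sasl.mechanism (PLAIN assumed here; the trigger defaults to GSSAPI)
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // JAAS Config -> sasl.jaas.config (placeholder username and password)
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"alice\" password=\"secret\";");
        System.out.println(props);
    }
}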

SSL Configuration

Configure SSL for encrypted communication between Martini and Kafka.

| Name | Default | Description |
| --- | --- | --- |
| Cipher Suites | | A list of cipher suites for SSL connections. |
| Enabled Protocols | TLSv1,TLSv1.2,TLSv1.1 | Protocols enabled for SSL connections. |
| Endpoint Identification Algorithm | | The algorithm for server hostname verification. |
| Keymanager Algorithm | SunX509 | The key manager factory algorithm for SSL. |
| Key Password | | The password of the private key in the key store. |
| Keystore Location | | The location of the key store file. |
| Keystore Password | | The password for the key store file. |
| Keystore Type | JKS | The file format of the key store. |
| Protocol | TLS | The SSL protocol for generating the SSLContext. |
| Provider | | The security provider for SSL connections. |
| Secure Random Implementation | | The SecureRandom implementation for SSL cryptography. |
| Trustmanager Algorithm | PKIX | The trust manager factory algorithm for SSL. |
| Truststore Location | | The location of the trust store file. |
| Truststore Password | | The password for the trust store file. |
| Truststore Type | JKS | The file format of the trust store. |
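
The SSL properties likewise map onto standard Kafka client keys. The minimal sketch below uses placeholder file paths and passwords and assumes JKS stores; the key store entries are only needed when the broker requires client authentication (mutual TLS).

import java.util.Properties;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.SslConfigs;

public class SslConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
        // Truststore Location / Password / Type -> ssl.truststore.*
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks");
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
        props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");
        // Keystore Location / Password and Key Password -> ssl.keystore.* / ssl.key.password
        props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore.jks");
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "changeit");
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "changeit");
        System.out.println(props);
    }
}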

Advanced Configuration

Tailor advanced Kafka settings to optimize performance and behavior.

| Name | Default | Choices | Description |
| --- | --- | --- | --- |
| Partition Assignment Strategies | | | Class names for partition distribution strategies. Must be in the classpath. |
| Interceptor Classes | | | Fully qualified class names for interceptors. Classes must implement ConsumerInterceptor and must be in the classpath. |
| Client ID | | | An identifier for the client in server-side request logging. |
| Security Protocol | PLAINTEXT | PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL | The protocol used to communicate with brokers. |
| Metadata Max Age (ms) | 300000 | 0,... | The period after which a metadata refresh is forced, even without partition leadership changes, to proactively discover new brokers or partitions. |
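
Interceptor Classes must name classes that implement org.apache.kafka.clients.consumer.ConsumerInterceptor and are available on the classpath. As a rough sketch (the class name and logging are illustrative), a minimal interceptor that only reports how many records pass through could look like this:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CountingInterceptor implements ConsumerInterceptor<String, String> {

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        // Invoked before records are handed to the consumer; must return the (possibly modified) records
        System.out.println("Fetched " + records.count() + " record(s)");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // Invoked after offsets have been committed
    }

    @Override
    public void close() {
        // Release any resources held by the interceptor
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Receives the consumer configuration, including any custom keys
    }
}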


Service Invocation Parameters

When a Kafka trigger invokes a service, it passes specific parameters to the service, enabling it to process the messages received from Kafka. Below are the parameters specific to Kafka listener triggers:

Kafka Listener-Specific Parameters

| Name | Type | Description |
| --- | --- | --- |
| messages | java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord> | A list containing the key-value pairs received from Kafka. Each item in the list represents a single Kafka message, encapsulating both the message's key and value. |
| consumer | org.apache.kafka.clients.consumer.Consumer | The Kafka consumer client instance that fetched the messages. This parameter allows the service to access consumer-specific functionality, such as manual commit of offsets, if required. |
| ack | org.springframework.kafka.support.Acknowledgment | Provided when Enable Auto Commit is set to false and Acknowledgement Mode is MANUAL or MANUAL_IMMEDIATE. The service must explicitly call ack.acknowledge() to manually commit the message offset. If auto-commit is enabled or not required, this parameter can be disregarded. |
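
To make these parameters concrete, the hedged Java sketch below shows the general shape of a handler receiving the same inputs; the class and method names and the processing logic are hypothetical, and in practice the invoked service would typically be a Martini (Gloop or Groovy) service with equivalent inputs.

import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.Acknowledgment;

public class KafkaTriggerHandlerSketch {

    // Hypothetical handler with the same parameters the trigger passes to a service
    public void handle(List<ConsumerRecord<String, String>> messages,
                       Consumer<String, String> consumer,
                       Acknowledgment ack) {
        for (ConsumerRecord<String, String> message : messages) {
            // Process each record's metadata, key, and value
            System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
                    message.topic(), message.partition(), message.offset(),
                    message.key(), message.value());
        }

        // With auto-commit disabled and Acknowledgement Mode set to MANUAL or MANUAL_IMMEDIATE,
        // offsets are committed only when the service acknowledges explicitly.
        if (ack != null) {
            ack.acknowledge();
        }
    }
}

Whether messages contains a whole batch or a single record depends on the Batch setting described under Networking Configuration.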

Handling Deserialization Failures

Messages that fail deserialization are wrapped in an io.toro.martini.endpoint.DeserializationException object. Services should therefore check whether the message key or value is an instance of DeserializationException so that deserialization failures can be handled gracefully, for example by logging them or retrying.

// 'message' is a single ConsumerRecord taken from the 'messages' input list
if (message.key() instanceof DeserializationException || message.value() instanceof DeserializationException) {
    // Handle the deserialization failure, e.g. log it or route the record to an error handler
}

This setup keeps your Kafka-triggered services robust and able to handle a variety of scenarios, including deserialization failures, gracefully.