Martini Invoking Services via a Kafka Trigger
The Kafka Trigger enables Martini to subscribe to Kafka topics and invokes a specified service when messages are received. This integration facilitates real-time data processing and event-driven architectures.
Configuration
Configuration of the Kafka Trigger is divided into Martini-specific and Kafka-specific properties, detailed below.
Kafka-Specific Configuration
Connection Properties
- Bootstrap Servers: The list of host-port pairs for establishing the initial connection to the Kafka cluster. Defaults to
localhost:9092
. - Topics: The list of Kafka topics to subscribe to.
- Group ID: Identifies the consumer group for this trigger.
Deserialization Properties
- Key Deserializer: Specifies the deserializer for the message key. Options include
byte
,byte_buffer
,double
,integer
,long
,string
,json
,xml
, or the fully qualified name of a class implementingorg.apache.kafka.common.serialization.Deserializer
. - Value Deserializer: Specifies the deserializer for the message value, with options similar to the key deserializer.
Commit Strategy Configuration
Name | Default | Choices | Description |
---|---|---|---|
Enable Auto Commit | true |
true , false |
Auto-commit offsets in the background. |
Auto Commit Interval (ms) | 5000 |
Frequency in milliseconds for auto-commit. | |
Acknowledgement Mode | BATCH |
RECORD , BATCH , TIME , COUNT , COUNT_TIME , MANUAL , MANUAL_IMMEDIATE |
Strategy for offset commit. |
Acknowledgement Count | 1 |
0 ,... |
Number of messages after which offsets should be committed. |
Acknowledgement Time (ms) | 1 |
0 ,... |
Time after which offsets should be committed. |
Acknowledgement On Error | false |
true , false |
Whether to commit offsets when exceptions occur. |
Data Configuration
Name | Default | Choices | Description |
---|---|---|---|
Fetch Max Bytes | 52428800 |
0 ,... |
Maximum amount of data the server should return for a fetch request. |
Fetch Min Bytes | 1 |
0 ,... |
Minimum amount of data the server should return for a fetch request. |
Max Partition Fetch Bytes | 1048576 |
0 ,... |
Maximum amount of data per-partition the server will return. |
Receive Buffer Bytes | 65536 |
0 ,... |
TCP receive buffer size. |
Send Buffer Bytes | 131072 |
0 ,... |
TCP send buffer size. |
Metrics Configuration
Configure metrics reporting for monitoring and debugging Kafka triggers.
Name | Default | Choices | Description |
---|---|---|---|
Reporters | Comma-separated list of class names to be used as metrics reporters. Each class must implement MetricsReporter . |
||
Recording Level | Info |
Info , Debug |
Specifies the highest level of metrics recording. |
Number of Samples | 2 |
0 ,... |
The number of samples to maintain for metrics computation. |
Sample Window (ms) | 30000 |
0 ,... |
The time window over which the metrics samples are computed. |
Networking Configuration
Adjust Kafka's networking behavior to optimize performance and reliability.
Name | Default | Choices | Description |
---|---|---|---|
Isolation Level | Read Uncommitted |
Read Uncommitted , Read Committed |
Determines how transactionally written messages are read. |
Connections Max Idle (ms) | 540000 |
0 ,... |
The maximum amount of time a connection can remain idle before being closed. |
Fetch Max Wait (ms) | 500 |
0 ,... |
The maximum time to wait for a fetch response. |
Heartbeat Interval (ms) | 3000 |
0 ,... |
The expected interval between heartbeats to the consumer coordinator for group management. |
Max Poll Interval (ms) | 300000 |
0 ,... |
The maximum delay between invocations of poll when using consumer group management. |
Max Poll Records | 500 |
0 ,... |
The maximum number of records returned in a single call to poll. |
Monitor Interval (ms) | 30000 |
0 ,... |
Interval for checking non-responsive listeners. |
Poll Timeout (ms) | 1000 |
0 ,... |
The maximum time to block waiting for messages. |
Request Timeout (ms) | 305000 |
0 ,... |
The maximum amount of time the client will wait for the response of a request. |
Session Timeout (ms) | 10000 |
0 ,... |
The timeout used to detect consumer failures when using Kafka's group management facility. |
Shutdown Timeout (ms) | 10000 |
0 ,... |
The timeout for shutting down the Kafka trigger listener. |
Batch | false |
true , false |
If true , the registered service is invoked with each batch of messages. If false , it's invoked for each message. |
Resiliency Configuration
Fine-tune how the trigger handles connection issues and offsets with Kafka.
Name | Default | Choices | Description |
---|---|---|---|
Auto Offset Reset | Latest |
Latest , Earliest , None |
Defines the action when no initial offset is found or if the current offset does not exist anymore. |
Reconnect Backoff (ms) | 50 |
0 ,... |
The time to wait before attempting to reconnect to a broker after a connection failure. |
Reconnect Backoff Max (ms) | 1000 |
0 ,... |
The maximum time to wait before attempting reconnection to a broker. |
Retry Backoff (ms) | 100 |
0 ,... |
The time to wait before retrying a failed request to a topic partition. |
SASL Configuration
Secure your Kafka trigger connections with SASL authentication.
Name | Default | Choices | Description |
---|---|---|---|
JAAS Config | JAAS login context parameters for SASL connections. | ||
Kerberos Kinit Command | /usr/bin/kinit |
The path to the Kerberos kinit command. |
|
Kerberos Service Name | The Kerberos principal name Kafka runs as. | ||
Mechanism | GSSAPI |
The SASL mechanism for client connections. | |
Kerberos Minimum Time Before Login (ms) | 60000 |
0 ,... |
The sleep time for the login thread between refresh attempts. |
Kerberos Ticket Renew Jitter | 0.05 |
0.00-1.00 |
The jitter added to the renewal time. |
Kerberos Ticket Renew Window Factor | 0.80 |
0 ,... |
The factor of the ticket's lifetime before a renewal attempt. |
SSL Configuration
Configure SSL for encrypted communication between Martini and Kafka.
Name | Default | Description |
---|---|---|
Cipher Suites | A list of cipher suites for SSL connections. | |
Enabled Protocols | TLSv1,TLSv1.2,TLSv1.1 |
Protocols enabled for SSL connections. |
Endpoint Identification Algorithm | The algorithm for server hostname verification. | |
Keymanager Algorithm | SunX509 |
The key manager factory algorithm for SSL. |
Key Password | The password of the private key in the key store. | |
Keystore Location | The location of the key store file. | |
Keystore Password | The password for the key store file. | |
Keystore Type | JKS |
The file format of the key store. |
Protocol | TLS |
The SSL protocol for generating the SSLContext. |
Provider | The security provider for SSL connections. | |
Secure Random Implementation | The SecureRandom implementation for SSL cryptography. | |
Trustmanager Algorithm | PKIX |
The trust manager factory algorithm for SSL. |
Truststore Location | The location of the trust store file. | |
Truststore Password | The password for the trust store file. | |
Truststore Type | JKS |
The file format of the trust store. |
Advanced Configuration
Tailor advanced Kafka settings to optimize performance and behavior.
Name | Default | Choices | Description |
---|---|---|---|
Partition Assignment Strategies | Class names for partition distribution strategies. Must be in the classpath. | ||
Interceptor Classes | Fully qualified class names for interceptors. Classes must implement ConsumerInterceptor . Must be in the classpath. |
||
Client ID | An identifier for the client in server-side request logging. | ||
Security Protocol | PLAINTEXT |
PLAINTEXT , SSL , SASL_PLAINTEXT , SASL_SSL |
The protocol used to communicate with brokers. |
Metadata Max Age (ms) | 300000 |
0 ,... |
The |
Incorporating the details about service parameters specific to the Kafka trigger, here's how the section could be structured within the documentation:
Service Invocation Parameters
When a Kafka trigger invokes a service, it passes specific parameters to the service, enabling it to process the messages received from Kafka. Below are the parameters specific to Kafka listener triggers:
Kafka Listener-Specific Parameters
Name | Type | Description |
---|---|---|
messages |
java.util.List<org.apache.kafka.clients.consumer.ConsumerRecord> |
A list containing the key-value pairs received from Kafka. Each item in the list represents a single Kafka message, encapsulating both the message's key and value. |
consumer |
org.apache.kafka.clients.consumer.Consumer |
The Kafka consumer client instance that fetched the messages. This parameter allows the service to access consumer-specific functionalities, such as manual commit of offsets, if required. |
ack |
org.springframework.kafka.support.Acknowledgment |
Provided when Enable Auto Commit is set to false and Acknowledgment Mode is MANUAL or MANUAL_IMMEDIATE . The service must explicitly call ack.acknowledge() to manually commit the message offset. If auto-commit is enabled or not required, this parameter can be disregarded. |
Handling Deserialization Failures
Messages that fail deserialization are encapsulated in an io.toro.martini.endpoint.DeserializationException
object. It is crucial for services to check if the message key or value is an instance of DeserializationException
to properly handle deserialization failures. This approach ensures that your service can gracefully manage errors and implement logic to retry or log failures as appropriate.
1 2 3 |
|
This setup ensures that your Kafka-triggered services are robust and capable of handling various scenarios, including deserialization failures, efficiently.