Configuring Apache Kafka for Martini
Apache Kafka is an open-source distributed streaming platform for building real-time applications. Producers publish messages to topics, and each topic is divided into partitions. Within a partition, messages are ordered, and each one is identified by a sequential offset. Partitions are distributed and replicated across the brokers that make up a cluster, which makes the system scalable and fault tolerant. Consumers subscribe to topics and track their position using offsets; consumers working in a group can read a topic's partitions concurrently. This combination of partitioning and replication makes Apache Kafka suitable for low-latency messaging and real-time processing.
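To make the partition and offset model concrete, here is a minimal in-memory sketch in Python. It is not a Kafka client and does not talk to a broker: the `Topic` and `GroupConsumer` classes are hypothetical names invented for illustration, and CRC32 stands in for the murmur2 hash that the real Kafka Java client uses to map keys to partitions.

```python
import zlib
from collections import defaultdict


class Topic:
    """Toy topic: a fixed number of append-only partitions (illustration only)."""

    def __init__(self, name, num_partitions):
        self.name = name
        self.partitions = [[] for _ in range(num_partitions)]

    def send(self, key, value):
        # Messages with the same key always land in the same partition.
        # CRC32 is a stand-in; it is NOT the hash real Kafka clients use.
        partition = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        self.partitions[partition].append((key, value))
        # The offset is the message's position within its partition.
        offset = len(self.partitions[partition]) - 1
        return partition, offset


class GroupConsumer:
    """Tracks one read position per partition, as a consumer group would."""

    def __init__(self, topic):
        self.topic = topic
        self.position = defaultdict(int)

    def poll(self, partition, max_records=500):
        start = self.position[partition]
        records = self.topic.partitions[partition][start:start + max_records]
        # Advance the position so the next poll resumes after these records.
        self.position[partition] = start + len(records)
        return records


topic = Topic("martini-test", num_partitions=3)
first = topic.send("name", "martini")
second = topic.send("name", "lonti")
consumer = GroupConsumer(topic)
batch = consumer.poll(first[0])
```

Both messages share the key `name`, so they go to the same partition and receive consecutive offsets; a second `poll` on that partition returns nothing because the consumer's position has already advanced.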
Prerequisites
Before you begin, set up your Apache Kafka instance and verify that it is working correctly.
For an on-premises instance of Apache Kafka, you may refer to the Apache Kafka Quickstart.
For a cloud-hosted instance of Apache Kafka, you can provision from the following:
- Amazon Managed Streaming for Apache Kafka: A fully managed service from AWS that simplifies Apache Kafka cluster management, allowing automatic scaling, monitoring, and security integration with AWS services like IAM, S3, and CloudWatch.
- Google Cloud Managed Service for Apache Kafka: Google Cloud offers Apache Kafka through third-party partners such as Confluent and Aiven, providing fully managed Apache Kafka clusters with native GCP integration for seamless scaling and monitoring.
- Kora by Confluent Cloud: A cloud-native, fully managed Apache Kafka service from Confluent, supporting multi-cloud deployments (AWS, GCP, Azure) and providing additional tools for stream processing, security, and monitoring.
Connect to an Apache Kafka instance
To connect Martini to Apache Kafka, follow the steps below. A pre-made package containing functions for interacting with Apache Kafka is also available in Lonti Marketplace. With it, you can perform tasks such as creating producers and sending messages to a topic.
1. Create/Edit a Package and Kafka Listener Endpoint
The package configuration file is located at `<martini-home>/packages/<your-package>/conf/package.xml`.
- By specifying `enabled=true`, the endpoint will auto-start when Martini Server Runtime launches.
- Define the `[listener-name]`, `[service-name]`, `[groupId]`, `[IP]:[PORT]`, and `[topic-name]` according to your specific configuration. For the `[key]` and `[value]`, you can choose from the following types: `byte`, `byte_buffer`, `double`, `integer`, `long`, `string`, `json`, or `xml`. You may refer to Serialization, Deserialization, and Message Conversion by Spring to know more.
- To specify multiple topics, separate them with commas, like this: `topic=test-topic1,topic=test-topic2`. Similarly, for multiple bootstrapServers, use a comma to separate each entry, as shown: `0.0.0.0:8081,0.0.0.0:8082`.
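The comma-separated formats for topics and bootstrap servers are easy to mistype, so a small pre-flight check can help before restarting Martini. The sketch below is only an illustration of those two formats: `parse_topics` and `parse_bootstrap_servers` are hypothetical helpers, and this is not how Martini itself parses the values.

```python
def parse_topics(spec):
    """Split 'topic=a,topic=b' into ['a', 'b'], rejecting malformed entries."""
    topics = []
    for entry in spec.split(","):
        prefix, sep, name = entry.strip().partition("=")
        if prefix != "topic" or not sep or not name:
            raise ValueError(f"expected 'topic=<name>', got {entry!r}")
        topics.append(name)
    return topics


def parse_bootstrap_servers(spec):
    """Split 'host:port,host:port' into a list of (host, port) tuples."""
    servers = []
    for entry in spec.split(","):
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected '<host>:<port>', got {entry!r}")
        servers.append((host, int(port)))
    return servers


topics = parse_topics("topic=test-topic1,topic=test-topic2")
servers = parse_bootstrap_servers("0.0.0.0:8081,0.0.0.0:8082")
```

An entry such as `test-topic1` without the `topic=` prefix, or a server without a port, raises `ValueError` instead of silently producing a bad configuration.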
2. Start Martini Runtime
If Martini is already running, restart it. Your Kafka Listener Endpoint should auto-start if set to `enabled=true`; if set to `enabled=false`, you may start it using Martini CLI or API-Explorer.
3. Send a Message to the Topic
Publish a test message to a topic the endpoint subscribes to. This step validates the endpoint's ability to receive messages.
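One way to send a test message is the console producer script that ships with the Apache Kafka distribution. The sketch below only assembles the command line; it assumes a local Kafka installation, a broker at `localhost:9092`, and a topic named `martini-test`, all of which are placeholders to replace with your own values. The `console_producer_command` helper and its `kafka_home` parameter are hypothetical.

```python
import shlex


def console_producer_command(bootstrap_server, topic, kafka_home="."):
    """Build the argument list for Kafka's console producer (keyed messages)."""
    return [
        f"{kafka_home}/bin/kafka-console-producer.sh",
        "--bootstrap-server", bootstrap_server,
        "--topic", topic,
        # Treat each input line as '<key>:<value>' rather than a bare value.
        "--property", "parse.key=true",
        "--property", "key.separator=:",
    ]


# Placeholder broker address and topic; adjust to your environment.
cmd = console_producer_command("localhost:9092", "martini-test")
command_line = shlex.join(cmd)
```

Running the resulting command and typing a line such as `name:martini` sends a message with key `name` and value `martini`. To run it from Python instead, the same list can be passed to `subprocess.run(cmd, input="name:martini\n", text=True, check=True)`.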
4. Verify Message Receipt
After sending a message, check the Martini server logs to confirm that the endpoint received it. Receipt is indicated by a log entry listing the message's properties, which shows that the endpoint is operational.
Advanced endpoint properties
Parameter Name | Required | Default Value | Description |
---|---|---|---|
groupId | true | bytes-group | The ID of the consumer group. |
pollTimeout | false | 1000 | Time to wait for new data in milliseconds. |
saslKerberosTicketRenewJitter | false | 0.05 | Jitter for renewing the Kerberos ticket. |
reconnectBackoffMax | false | 1000 | Maximum time to wait before reconnecting. |
sslProvider | false | null | SSL provider to use. |
ackOnError | false | true | Whether to acknowledge on error. |
receiveBufferBytes | false | 65536 | Size of the receive buffer in bytes. |
metricsRecordingLevel | false | INFO | Level of metrics recording. |
valueDeserializer | true | null | Deserializer for the message value. |
saslKerberosServiceName | false | null | Service name for Kerberos. |
metadataMaxAge | false | 300000 | Time in milliseconds to cache metadata. |
shutdownTimeout | false | 10000 | Time to wait before shutting down. |
fetchMaxWait | false | 500 | Maximum time to wait for a fetch response. |
sslTruststoreType | false | JKS | Type of the SSL truststore. |
metricsReporters | false | null | Reporters for metrics. |
sendBufferBytes | false | 131072 | Size of the send buffer in bytes. |
heartbeatInterval | false | 3000 | Interval for heartbeat in milliseconds. |
interceptorClasses | false | null | Classes for interceptors. |
sslKeystoreType | false | JKS | Type of the SSL keystore. |
saslKerberosTicketRenewWindowFactor | false | 0.8 | Factor for the Kerberos ticket renew window. |
batch | false | false | Whether to use batch processing. |
saslMechanism | false | GSSAPI | SASL mechanism for authentication. |
connectionsMaxIdle | false | 540000 | Maximum idle time for connections. |
monitorInterval | false | 30 | Interval to monitor the consumer. |
sslKeystorePassword | false | null | Password for the SSL keystore. |
enableAutoCommit | false | true | Whether to enable auto-commit. |
isolationLevel | false | read_uncommitted | Isolation level for transactions. |
maxPollRecords | false | 500 | Maximum records to return in a single poll. |
sslProtocol | false | TLS | Protocol to use for SSL. |
sessionTimeout | false | 10000 | Session timeout in milliseconds. |
keyDeserializer | true | null | Deserializer for the message key. |
sslKeymanagerAlgorithm | false | SunX509 | Key manager algorithm for SSL. |
partitionAssignmentStrategy | false | null | Strategy for partition assignment. |
ackTime | false | 1 | Time to wait for acknowledgment. |
documentType | false | Kafka Listener | Type of the document. |
sslCipherSuites | false | null | Supported SSL cipher suites. |
logContainerConfig | false | true | Whether to log container configuration. |
saslJaasConfig | false | null | JAAS configuration for SASL. |
fetchMinBytes | false | 1 | Minimum bytes to fetch. |
sslKeyPassword | false | null | Password for the SSL key. |
saslKerberosKinitCmd | false | /usr/bin/kinit | Command to run for Kerberos authentication. |
syncCommits | false | true | Whether to commit synchronously. |
sslEndpointIdentificationAlgorithm | false | null | Algorithm for SSL endpoint identification. |
ackCount | false | 1 | Number of acknowledgments required. |
maxPartitionFetchBytes | false | 1048576 | Maximum bytes to fetch per partition. |
securityProtocol | true | plaintext | Security protocol to use. |
autoCommitInterval | false | 5000 | Interval for auto-commit in milliseconds. |
sslTruststorePassword | false | null | Password for the SSL truststore. |
bootstrapServers | true | localhost:8081 | List of bootstrap servers. |
sslEnabledProtocols | false | TLSv1,TLSv1.2,TLSv1.1 | Supported SSL protocols. |
track | false | false | Whether to track the listener. |
sslKeystoreLocation | false | null | Location of the SSL keystore. |
requestTimeout | false | 305000 | Timeout for requests in milliseconds. |
sslTruststoreLocation | false | null | Location of the SSL truststore. |
metricsNumSamples | false | 2 | Number of samples for metrics. |
clientId | false | null | Client identifier. |
reconnectBackoff | false | 50 | Time to wait before reconnecting. |
topics | true | topic=topic | List of topics to subscribe to. |
maxPollInterval | false | 300000 | Maximum interval for polling. |
retryBackoff | false | 100 | Backoff time for retries in milliseconds. |
sslTrustmanagerAlgorithm | false | PKIX | Algorithm for the trust manager. |
autoOffsetReset | false | latest | Behavior when there is no initial offset. |
concurrency | false | 1 | Number of concurrent consumers. |
saslKerberosMinTimeBeforeRelogin | false | 60000 | Minimum time before a relogin attempt. |
excludeInternalTopics | false | true | Whether to exclude internal topics. |
fetchMaxBytes | false | 52428800 | Maximum bytes to fetch in one request. |
ackMode | false | batch | Acknowledgment mode. |
sslSecureRandomImplementation | false | null | Implementation of secure random for SSL. |
metricsSampleWindow | false | 30000 | Sample window for metrics in milliseconds. |
replicated | false | true | Whether the listener is replicated. |
checkCrcs | false | true | Whether to check CRCs. |
commitLogLevel | false | debug | Log level for commits. |
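Some of these properties interact. As a hedged illustration, the sketch below checks two relationships, assuming that `heartbeatInterval`, `sessionTimeout`, and `isolationLevel` map onto the Kafka consumer settings `heartbeat.interval.ms`, `session.timeout.ms`, and `isolation.level` (that mapping is an assumption, not something this page states). Kafka's consumer documentation says the heartbeat interval must be lower than the session timeout and is typically no more than one third of it. The `check_consumer_settings` helper is hypothetical and is not part of Martini.

```python
# Defaults copied from the table above.
DEFAULTS = {
    "heartbeatInterval": 3000,
    "sessionTimeout": 10000,
    "isolationLevel": "read_uncommitted",
}


def check_consumer_settings(overrides=None):
    """Return a list of human-readable problems; an empty list means none found."""
    cfg = {**DEFAULTS, **(overrides or {})}
    problems = []
    if cfg["heartbeatInterval"] >= cfg["sessionTimeout"]:
        problems.append("heartbeatInterval must be lower than sessionTimeout")
    elif cfg["heartbeatInterval"] * 3 > cfg["sessionTimeout"]:
        problems.append("heartbeatInterval is usually at most 1/3 of sessionTimeout")
    if cfg["isolationLevel"] not in ("read_uncommitted", "read_committed"):
        problems.append("isolationLevel must be read_uncommitted or read_committed")
    return problems


default_problems = check_consumer_settings()
bad_problems = check_consumer_settings({"heartbeatInterval": 10000})
```

With the table's defaults the check finds nothing to report, while raising `heartbeatInterval` to the same value as `sessionTimeout` is flagged.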
Martini Events
The Kafka Listener Trigger enables developers to write applications that subscribe to Apache Kafka topics and invoke a registered service when messages are received.
- Navigate to and configure your Apache Kafka Function: Within your Apache Kafka Package, browse the pre-configured Apache Kafka Services and choose the one that fits your needs. This example uses the `sendString` service. Edit the `bootstrapServers` property within the Apache Kafka Function to match your broker's IP address. Change the `key` and `value` serializers to match the configuration of your Apache Kafka Listener Trigger.
- Run the Service: Once configuration is complete, start the service. You will be prompted for two properties: key and value. A message in your Console logs indicates that the operation succeeded. In this case, the key was `name`, the value was `martini`, and the topic is named `martini-test`.
INFO [Martini] ConsumerRecord(topic = martini-test, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1727670185056, serialized key size = 4, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = name, value = martini)
The message should also be visible in the topic on your Apache Kafka instance.
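The log entry shown above can also be checked programmatically, for example in an automated smoke test. The following sketch parses that exact line with a regular expression and confirms that the reported serialized sizes match the UTF-8 lengths of the key and value. It assumes string keys and values and the log layout shown above; `parse_consumer_record` is a hypothetical helper, and other Martini versions may format the line differently.

```python
import re

LOG_LINE = (
    "INFO [Martini] ConsumerRecord(topic = martini-test, partition = 0, "
    "leaderEpoch = 0, offset = 0, CreateTime = 1727670185056, "
    "serialized key size = 4, serialized value size = 7, "
    "headers = RecordHeaders(headers = [], isReadOnly = false), "
    "key = name, value = martini)"
)

RECORD = re.compile(
    r"ConsumerRecord\(topic = (?P<topic>[^,]+), partition = (?P<partition>\d+),"
    r".*?offset = (?P<offset>\d+),"
    r".*?serialized key size = (?P<key_size>-?\d+),"
    r" serialized value size = (?P<value_size>-?\d+),"
    r".*, key = (?P<key>.*?), value = (?P<value>.*)\)$"
)


def parse_consumer_record(line):
    """Extract the main fields of a logged ConsumerRecord, or return None."""
    match = RECORD.search(line)
    if match is None:
        return None
    fields = match.groupdict()
    # Convert the numeric fields from text to integers.
    for name in ("partition", "offset", "key_size", "value_size"):
        fields[name] = int(fields[name])
    return fields


record = parse_consumer_record(LOG_LINE)
# For string data, the serialized size equals the UTF-8 byte length.
sizes_match = (
    len(record["key"].encode("utf-8")) == record["key_size"]
    and len(record["value"].encode("utf-8")) == record["value_size"]
)
```

Here the key `name` is 4 bytes and the value `martini` is 7 bytes, which agrees with the `serialized key size` and `serialized value size` fields in the log entry.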