Configuring Apache Kafka for Martini

Apache Kafka is an open-source distributed streaming platform for building real-time applications. Producers write messages to topics, and topics are divided into partitions; within each partition, messages are strictly ordered and identified by an offset. The brokers that make up a Kafka cluster share these partitions among themselves, which makes the system redundant and scalable. Consumers subscribe to topics and track their position using offsets, and consumers working together in a group can read partitions concurrently. Kafka provides fault tolerance through data replication, making it well suited to low-latency messaging and real-time processing.
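
The concepts above (partitioned topics, ordered offsets, per-group consumer positions) can be sketched with a toy in-memory model. The class and method names below are illustrative only; a real Kafka cluster distributes partitions across brokers and persists them to disk:

```python
from collections import defaultdict

class MiniTopic:
    """Toy model of a partitioned topic: each partition is an ordered list
    of messages, and each consumer group tracks its own next offset per
    partition. Hypothetical names, for illustration only."""

    def __init__(self, partitions=2):
        self.partitions = [[] for _ in range(partitions)]
        # offsets[group][partition] -> next offset that group will read
        self.offsets = defaultdict(lambda: defaultdict(int))

    def produce(self, key, value):
        # Like Kafka's default partitioner: hash the key to pick a partition,
        # so all messages with the same key stay ordered within one partition.
        p = hash(key) % len(self.partitions)
        self.partitions[p].append((key, value))
        return p, len(self.partitions[p]) - 1  # (partition, offset)

    def consume(self, group, partition):
        off = self.offsets[group][partition]
        if off >= len(self.partitions[partition]):
            return None  # this group is caught up on this partition
        self.offsets[group][partition] = off + 1
        return self.partitions[partition][off]

topic = MiniTopic()
p, off = topic.produce("name", "martini")
print(topic.consume("bytes-group", p))  # -> ('name', 'martini')
```

Note that each group advances its own offset independently, which is why two consumer groups can each receive every message while consumers inside one group split the partitions between them.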

Prerequisites

To start things off, you must first configure your Apache Kafka setup and verify that all of its functionality works as expected.

For an on-premises instance of Apache Kafka, you may refer to the Apache Kafka Quickstart.

For a cloud-hosted instance of Apache Kafka, you can provision from the following:

  • Amazon Managed Streaming for Apache Kafka: A fully managed service from AWS that simplifies Apache Kafka cluster management, allowing automatic scaling, monitoring, and security integration with AWS services like IAM, S3, and CloudWatch.
  • Google Cloud Managed Service for Apache Kafka: A fully managed Apache Kafka service on Google Cloud, providing managed Kafka clusters with native GCP integration for seamless scaling and monitoring.
  • Confluent Cloud: A cloud-native, fully managed Apache Kafka service from Confluent, built on its Kora engine, supporting multi-cloud deployments (AWS, GCP, Azure) and providing additional tools for stream processing, security, and monitoring.

Connect to an Apache Kafka instance

To establish connectivity to Apache Kafka, follow the steps below. A pre-made package with Apache Kafka-related functions is also available in the Lonti Marketplace; with it, you can perform tasks such as creating producers and sending messages to a topic.

1. Create/Edit a Package and Kafka Listener Endpoint

The listener endpoint is defined in the package configuration file located at <martini-home>/packages/<your-package>/conf/package.xml

<endpoint type="kafka-listener" name="[listener-name]" service="gloop:[service-name]/[service-name]" enabled="[true/false]" modifiable="true">
    <properties>
        <property name="groupId">[groupId]</property>
        <property name="valueDeserializer">[value]</property>
        <property name="keyDeserializer">[key]</property>
        <property name="bootstrapServers">[IP]:[PORT]</property>
        <property name="topics">topic=[topic-name]</property>
    </properties>
</endpoint>
  • Setting enabled=true makes the endpoint auto-start when Martini Server Runtime launches.
  • Define the [listener-name], [service-name], [groupId], [IP]:[PORT], and [topic-name] according to your specific configuration. For the [key] and [value], you can choose from the following types: byte, byte_buffer, double, integer, long, string, json, or xml. You may refer to Serialization, Deserialization, and Message Conversion by Spring to know more.
  • To specify multiple topics, separate them with commas, like this: topic=test-topic1,topic=test-topic2. Similarly, for multiple bootstrapServers, use a comma to separate each entry, as shown: 0.0.0.0:8081,0.0.0.0:8082.
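
Because the topics and bootstrapServers values use specific comma-separated formats, it can help to validate them before editing package.xml. A minimal sketch, assuming nothing about Martini itself (the helper names below are not part of any Martini API):

```python
def parse_topics(raw):
    """Parse the listener's topics property, e.g.
    'topic=test-topic1,topic=test-topic2' -> ['test-topic1', 'test-topic2']."""
    names = []
    for entry in raw.split(","):
        prefix, sep, name = entry.strip().partition("=")
        if prefix != "topic" or not sep or not name:
            raise ValueError(f"malformed topics entry: {entry!r}")
        names.append(name)
    return names

def parse_bootstrap_servers(raw):
    """Parse '0.0.0.0:8081,0.0.0.0:8082' into (host, port) pairs."""
    servers = []
    for entry in raw.split(","):
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"malformed bootstrap server: {entry!r}")
        servers.append((host, int(port)))
    return servers

print(parse_topics("topic=test-topic1,topic=test-topic2"))
print(parse_bootstrap_servers("0.0.0.0:8081,0.0.0.0:8082"))
```

A malformed entry (for example a topic without the topic= prefix, or a server without a numeric port) raises a ValueError, which surfaces typos before the endpoint fails to start.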

2. Start Martini Runtime

If Martini is already running, restart it. Your Kafka Listener Endpoint should auto-start if set to enabled=true; if set to enabled=false you may start it using Martini CLI or API-Explorer.

3. Send a Message to the Topic

Publish a test message to one of the topics the listener subscribes to (for example, using a producer service from the Marketplace package). This step is crucial for validating the endpoint's ability to receive messages.

4. Verify Message Receipt

Upon sending a message, monitor the Martini server logs to confirm that the endpoint received it. Receipt is indicated by a log entry listing the properties of the message, demonstrating that the endpoint is operational.

Advanced endpoint properties

| Parameter Name | Required | Default Value | Description |
|---|---|---|---|
| groupId | true | bytes-group | The ID of the consumer group. |
| pollTimeout | false | 1000 | Time to wait for new data in milliseconds. |
| saslKerberosTicketRenewJitter | false | 0.05 | Jitter for renewing the Kerberos ticket. |
| reconnectBackoffMax | false | 1000 | Maximum time to wait before reconnecting. |
| sslProvider | false | null | SSL provider to use. |
| ackOnError | false | true | Whether to acknowledge on error. |
| receiveBufferBytes | false | 65536 | Size of the receive buffer in bytes. |
| metricsRecordingLevel | false | INFO | Level of metrics recording. |
| valueDeserializer | true | null | Deserializer for the message value. |
| saslKerberosServiceName | false | null | Service name for Kerberos. |
| metadataMaxAge | false | 300000 | Time in milliseconds to cache metadata. |
| shutdownTimeout | false | 10000 | Time to wait before shutting down. |
| fetchMaxWait | false | 500 | Maximum time to wait for a fetch response. |
| sslTruststoreType | false | JKS | Type of the SSL truststore. |
| metricsReporters | false | null | Reporters for metrics. |
| sendBufferBytes | false | 131072 | Size of the send buffer in bytes. |
| heartbeatInterval | false | 3000 | Interval for heartbeat in milliseconds. |
| interceptorClasses | false | null | Classes for interceptors. |
| sslKeystoreType | false | JKS | Type of the SSL keystore. |
| saslKerberosTicketRenewWindowFactor | false | 0.8 | Factor for the Kerberos ticket renew window. |
| batch | false | false | Whether to use batch processing. |
| saslMechanism | false | GSSAPI | SASL mechanism for authentication. |
| connectionsMaxIdle | false | 540000 | Maximum idle time for connections. |
| monitorInterval | false | 30 | Interval to monitor the consumer. |
| sslKeystorePassword | false | null | Password for the SSL keystore. |
| enableAutoCommit | false | true | Whether to enable auto-commit. |
| isolationLevel | false | read_uncommitted | Isolation level for transactions. |
| maxPollRecords | false | 500 | Maximum records to return in a single poll. |
| sslProtocol | false | TLS | Protocol to use for SSL. |
| sessionTimeout | false | 10000 | Session timeout in milliseconds. |
| keyDeserializer | true | null | Deserializer for the message key. |
| sslKeymanagerAlgorithm | false | SunX509 | Key manager algorithm for SSL. |
| partitionAssignmentStrategy | false | null | Strategy for partition assignment. |
| ackTime | false | 1 | Time to wait for acknowledgment. |
| documentType | false | Kafka Listener | Type of the document. |
| sslCipherSuites | false | null | Supported SSL cipher suites. |
| logContainerConfig | false | true | Whether to log container configuration. |
| saslJaasConfig | false | null | JAAS configuration for SASL. |
| fetchMinBytes | false | 1 | Minimum bytes to fetch. |
| sslKeyPassword | false | null | Password for the SSL key. |
| saslKerberosKinitCmd | false | /usr/bin/kinit | Command to run for Kerberos authentication. |
| syncCommits | false | true | Whether to commit synchronously. |
| sslEndpointIdentificationAlgorithm | false | null | Algorithm for SSL endpoint identification. |
| ackCount | false | 1 | Number of acknowledgments required. |
| maxPartitionFetchBytes | false | 1048576 | Maximum bytes to fetch per partition. |
| securityProtocol | true | plaintext | Security protocol to use. |
| autoCommitInterval | false | 5000 | Interval for auto-commit in milliseconds. |
| sslTruststorePassword | false | null | Password for the SSL truststore. |
| bootstrapServers | true | localhost:8081 | List of bootstrap servers. |
| sslEnabledProtocols | false | TLSv1,TLSv1.2,TLSv1.1 | Supported SSL protocols. |
| track | false | false | Whether to track the listener. |
| sslKeystoreLocation | false | null | Location of the SSL keystore. |
| requestTimeout | false | 305000 | Timeout for requests in milliseconds. |
| sslTruststoreLocation | false | null | Location of the SSL truststore. |
| metricsNumSamples | false | 2 | Number of samples for metrics. |
| clientId | false | null | Client identifier. |
| reconnectBackoff | false | 50 | Time to wait before reconnecting. |
| topics | true | topic=topic | List of topics to subscribe to. |
| maxPollInterval | false | 300000 | Maximum interval for polling. |
| retryBackoff | false | 100 | Backoff time for retries in milliseconds. |
| sslTrustmanagerAlgorithm | false | PKIX | Algorithm for the trust manager. |
| autoOffsetReset | false | latest | Behavior when there is no initial offset. |
| concurrency | false | 1 | Number of concurrent consumers. |
| saslKerberosMinTimeBeforeRelogin | false | 60000 | Minimum time before a relogin attempt. |
| excludeInternalTopics | false | true | Whether to exclude internal topics. |
| fetchMaxBytes | false | 52428800 | Maximum bytes to fetch in one request. |
| ackMode | false | batch | Acknowledgment mode. |
| sslSecureRandomImplementation | false | null | Implementation of secure random for SSL. |
| metricsSampleWindow | false | 30000 | Sample window for metrics in milliseconds. |
| replicated | false | true | Whether the listener is replicated. |
| checkCrcs | false | true | Whether to check CRCs. |
| commitLogLevel | false | debug | Log level for commits. |
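
Advanced properties are set the same way as the required ones: as additional <property> elements inside the endpoint definition. A minimal sketch, where the listener name, service, and property values are illustrative choices rather than defaults:

```xml
<endpoint type="kafka-listener" name="orders-listener" service="gloop:OrdersListener/OrdersListener" enabled="true" modifiable="true">
    <properties>
        <property name="groupId">orders-group</property>
        <property name="valueDeserializer">json</property>
        <property name="keyDeserializer">string</property>
        <property name="bootstrapServers">0.0.0.0:9092</property>
        <property name="topics">topic=orders</property>
        <!-- advanced properties from the table above (illustrative values) -->
        <property name="concurrency">3</property>
        <property name="maxPollRecords">200</property>
        <property name="autoOffsetReset">earliest</property>
        <property name="enableAutoCommit">false</property>
    </properties>
</endpoint>
```

Here concurrency raises the number of consumer threads, maxPollRecords caps each poll, autoOffsetReset controls where a new group starts reading, and enableAutoCommit defers offset commits to the container.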

Martini Events

The Kafka Listener Trigger enables developers to write applications that subscribe to Apache Kafka topics and invoke a registered service when messages are received.

  1. Navigate to and configure your Apache Kafka Function: Within your Apache Kafka package, browse the pre-configured Apache Kafka services and choose the one tailored to your needs; this example uses the sendString service. Edit the bootstrapServers property of the function to point at your broker's IP address, and change the key and value serializers to match the configuration of your Apache Kafka Listener Trigger.

  2. Run the Service: Once everything is configured, start the service. You will be prompted for two properties: key and value. A message in your console logs should then indicate that the operation succeeded. In this example, the key was name, the value was martini, and the topic was named martini-test.

    INFO  [Martini] ConsumerRecord(topic = martini-test, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1727670185056, serialized key size = 4, serialized value size = 7, headers = RecordHeaders(headers = [], isReadOnly = false), key = name, value = martini)
    

The message should also be visible in the topic on your Apache Kafka instance.
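
To confirm receipt programmatically, a log line like the ConsumerRecord example above can be parsed. A hedged sketch: the regex assumes the ConsumerRecord.toString() format that the Kafka client writes to the Martini logs, and the helper name is ours, not Martini's:

```python
import re

# Matches the Kafka client's ConsumerRecord.toString() output as it
# appears in the Martini server logs (see the example log line above).
RECORD_RE = re.compile(
    r"ConsumerRecord\(topic = (?P<topic>[^,]+), partition = (?P<partition>\d+), "
    r".*offset = (?P<offset>\d+), "
    r".*key = (?P<key>[^,]+), value = (?P<value>[^)]+)\)"
)

def parse_record_line(line):
    """Extract topic, partition, offset, key, and value from a log line,
    or return None if the line is not a ConsumerRecord entry."""
    m = RECORD_RE.search(line)
    if not m:
        return None
    record = m.groupdict()
    record["partition"] = int(record["partition"])
    record["offset"] = int(record["offset"])
    return record

line = ("INFO  [Martini] ConsumerRecord(topic = martini-test, partition = 0, "
        "leaderEpoch = 0, offset = 0, CreateTime = 1727670185056, "
        "serialized key size = 4, serialized value size = 7, "
        "headers = RecordHeaders(headers = [], isReadOnly = false), "
        "key = name, value = martini)")
print(parse_record_line(line))
```

Scanning the logs this way can back a simple smoke test: send a known key/value pair, then assert that a matching record line appears.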