
How to leverage Kafka Connect to develop an Open Source Data Pipeline for processing real-time data

This blog post by AICoreSpot demonstrates how to develop a real-time data pipeline using purely open source solutions. These include Apache Kafka, Kafka Connect, Elasticsearch, and Kibana.

Kafka Connect is an especially capable open source data streaming tool that makes it remarkably easy to couple Kafka with other data technologies. As a distributed technology, Kafka Connect offers high availability and elastic scaling independent of Kafka clusters. Using source and sink connectors to move data to and from Kafka topics, Kafka Connect enables integrations with many non-Kafka technologies with no code required.

Robust open source connectors are available for many popular data technologies, and you also have the option of writing your own. This article walks through a real-world, real-data use case showing how to use Kafka Connect to integrate real-time streaming data from Kafka with Elasticsearch (to enable scalable search of indexed Kafka records) and Kibana (to visualize those results).

For a fascinating use case that illustrates the benefits of Kafka and Kafka Connect, we were inspired by the CDC’s COVID-19 data tracker. The Kafka-enabled tracker gathers real-time COVID testing data from many locations, in many formats and over many protocols, and processes those events into easily consumable, visualized results. The tracker also has the data governance required to make sure results arrive quickly and can be trusted.

We started looking for a similarly complex and compelling use case, but ideally one less fraught than the pandemic. Ultimately we came upon a fascinating domain, one with publicly available streaming REST APIs and rich data in a simple JSON format: lunar tides.

Lunar Tide Data

Tides follow the lunar day, a 24-hour, 50-minute period during which the Earth rotates completely back to the same point beneath the orbiting moon. Every lunar day has two high tides created by the moon’s gravitational pull.

The National Oceanic and Atmospheric Administration (NOAA) provides a REST API that makes it simple to retrieve detailed sensor data from its global tidal stations.

For instance, the following REST call specifies the station ID, the data type (we chose water level), and the datum (mean sea level), and requests the single most recent result in metric units:

https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json

 

This call returns a JSON result containing the latitude and longitude of the station, the time, and the water level value. Note that you must remember what your request was in order to interpret the data type, datum, and units of the returned results!

{
  "metadata": {
    "id": "8724580",
    "name": "Key West",
    "lat": "24.5508",
    "lon": "-81.8081"
  },
  "data": [{
    "t": "2020-09-24 04:18",
    "v": "0.597",
    "s": "0.005",
    "f": "1,0,0,0",
    "q": "p"
  }]
}
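If you want to poke at these fields from the command line before building anything, the same call can be combined with a jq filter (a quick check; it assumes jq is installed on your machine):

# Fetch the latest reading for station 8724580 and extract just the water level value ("v")
curl -s "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8724580&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json" | jq '.data[0].v'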

 

Beginning the data pipeline (with a REST source connector)

To start building the Kafka Connect streaming data pipeline, we must first provision a Kafka cluster and a Kafka Connect cluster.

Next, we deploy a REST connector. We’ll upload it to an AWS S3 bucket, tell the Kafka Connect cluster to use the S3 bucket, sync it so it is visible within the cluster, configure the connector, and finally get it running. This BYOC (Bring Your Own Connector) approach ensures that you have virtually unlimited options for finding a connector that meets your particular requirements.
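Once the connector artifact has synced, a quick way to confirm the cluster can see it is to query the Kafka Connect REST API for the list of installed connector plugins (a sketch; substitute your own cluster address and credentials):

# List the connector plugins visible to the Kafka Connect cluster;
# the REST source connector class used below should appear in the output.
curl https://connectorClusterIP:8083/connector-plugins -k -u name:password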

The following example uses a curl command to configure a 100% open source Kafka Connect deployment to use the REST API. Note that you’ll need to change the URL, name, and password to match your own deployment:

curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d '
{
    "name": "source_rest_tide_1",
    "config": {
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "connector.class": "com.tm.kafka.connect.rest.RestSourceConnector",
      "tasks.max": "1",
      "rest.source.poll.interval.ms": "600000",
      "rest.source.method": "GET",
      "rest.source.url": "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?date=latest&station=8454000&product=water_level&datum=msl&units=metric&time_zone=gmt&application=instaclustr&format=json",
      "rest.source.headers": "Content-Type:application/json,Accept:application/json",
      "rest.source.topic.selector": "com.tm.kafka.connect.rest.selector.SimpleTopicSelector",
      "rest.source.destination.topics": "tides-topic"
    }
}'

 

The connector task created by this configuration polls the REST API at 10-minute intervals, writing the results to the “tides-topic” Kafka topic. After arbitrarily choosing five tidal sensors to gather data from in this fashion, tide data is now flowing into the tides topic via five configurations and five connectors.
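The Kafka Connect REST API also makes it easy to confirm that each of these connectors is actually running (again, substitute your own cluster address and credentials):

# List every connector registered on the Kafka Connect cluster
curl https://connectorClusterIP:8083/connectors -k -u name:password

# Check that the first tide connector and its task are in the RUNNING state
curl https://connectorClusterIP:8083/connectors/source_rest_tide_1/status -k -u name:password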

Ending the pipeline (with an Elasticsearch sink connector)

To give this tide data somewhere to go, we’ll deploy an Elasticsearch cluster and Kibana at the end of the pipeline. We’ll configure an open source Elasticsearch sink connector to send the data to Elasticsearch.

The following sample configuration specifies the sink name, connector class, Elasticsearch index, and our Kafka topic. If the index doesn’t already exist, one with default mappings is created.

curl https://connectorClusterIP:8083/connectors -k -u name:password -X POST -H 'Content-Type: application/json' -d '
{
  "name" : "elastic-sink-tides",
  "config" :
  {
    "connector.class" : "com.datamountaineer.streamreactor.connect.elastic7.ElasticSinkConnector",
    "tasks.max" : 3,
    "topics" : "tides-topic",
    "connect.elastic.hosts" : "ip",
    "connect.elastic.port" : 9201,
    "connect.elastic.kcql" : "INSERT INTO tides-index SELECT * FROM tides-topic",
    "connect.elastic.use.http.username" : "elasticName",
    "connect.elastic.use.http.password" : "elasticPassword"
  }
}'

The pipeline is now operational. However, all tide data arriving in the tides index is stored as strings, owing to the default index mappings.
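You can confirm this by asking Elasticsearch for the mappings it generated automatically (using the same placeholder credentials and address as in the sink configuration above):

# Show the automatically generated mappings for tides-index;
# the tide fields appear as "text" rather than dates or numbers.
curl -u elasticName:elasticPassword "elasticURL:9201/tides-index/_mapping"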

Custom mapping is needed to correctly graph our time series data. We’ll create this custom mapping for tides-index below, treating the JSON “t” field as a date, “v” as a double, and “name” as a keyword for aggregation:

curl -u elasticName:elasticPassword "elasticURL:9201/tides-index" -X PUT -H 'Content-Type: application/json' -d'
{
  "mappings" : {
    "properties" : {
      "data" : {
        "properties" : {
          "t" : { "type" : "date",
                  "format" : "yyyy-MM-dd HH:mm" },
          "v" : { "type" : "double" },
          "f" : { "type" : "text" },
          "q" : { "type" : "text" },
          "s" : { "type" : "text" }
        }
      },
      "metadata" : {
        "properties" : {
          "id" : { "type" : "text" },
          "lat" : { "type" : "text" },
          "lon" : { "type" : "text" },
          "name" : { "type" : "keyword" }
        }
      }
    }
  }
}'

 

Elasticsearch “reindexing” (deleting the index and reindexing all the data) is usually required whenever you change an Elasticsearch index mapping. The data can either be replayed from the existing Kafka sink connector, as we have done in this use case, or copied over using the Elasticsearch reindex operation.
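As a sketch of that second option, the Elasticsearch _reindex API can copy documents from a saved copy of the old index into the newly mapped tides-index. The name tides-index-old below is hypothetical; substitute whatever you renamed the original index to:

# Copy documents from a saved copy of the old index into the re-mapped index
curl -u elasticName:elasticPassword "elasticURL:9201/_reindex" -X POST -H 'Content-Type: application/json' -d'
{
  "source" : { "index" : "tides-index-old" },
  "dest" : { "index" : "tides-index" }
}'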

Visualizing the data with Kibana

To visualize the tide data, we’ll first create an index pattern in Kibana, with “t” configured as the time filter field. We’ll then create a visualization, choosing a line graph type. Lastly, we’ll configure the graph settings so that the y-axis shows the average tide level over 30 minutes and the x-axis displays that data over time.
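If you prefer scripting these steps, the index pattern can also be created through Kibana’s saved objects API. This is a sketch only, assuming Kibana 7.x and a placeholder kibanaURL; note that the time field’s full path in our mapping is data.t:

# Create a Kibana index pattern for tides-index with data.t as the time filter field
# (kibanaURL is a placeholder; the kbn-xsrf header is required by Kibana's API)
curl -u elasticName:elasticPassword "kibanaURL/api/saved_objects/index-pattern" -X POST -H 'kbn-xsrf: true' -H 'Content-Type: application/json' -d'
{
  "attributes" : { "title" : "tides-index", "timeFieldName" : "data.t" }
}'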

The result is a graph of the changes in the tides at the five sample stations the pipeline gathers data from:

Results

The periodic nature of the tides is easy to see in the visualization, with two high tides occurring every lunar day.

More interestingly, the range between high and low tides differs at each station. This is due to the influence of not only the moon, but also the sun, local geography, weather, and climate change. This example Kafka Connect pipeline uses Kafka, Elasticsearch, and Kibana to illustrate the power of visualizations: they can often reveal what raw data cannot.
