Apache Kafka and Flume installation guide (import data from Kafka to HDFS)



This article contains a complete guide on how to install Apache Kafka, create Kafka topics, and publish and subscribe to topic messages. In addition, it contains an Apache Flume installation guide and shows how to import Kafka topic messages into HDFS using Apache Flume.

1. Environment

·         Hadoop Version: 3.1.0
·         Apache Kafka Version: 1.1.1
·         Apache Flume Version: 1.8.0
·         Operating System: Ubuntu 16.04
·         Java Version: Java 8

2. Prerequisites

2.1. Install Java

Apache Kafka requires Java. To ensure that Java is installed, first update the operating system, then install Java 8:
sudo apt-get update
sudo apt-get upgrade
sudo add-apt-repository -y ppa:webupd8team/java
sudo apt-get install oracle-java8-installer
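Once the installer finishes, you can verify the installation (the exact build number will vary):
java -version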

2.2. Install Zookeeper

Apache Kafka requires the ZooKeeper service, which it uses to maintain node heartbeats, store its configuration, and elect leaders.
sudo apt-get install zookeeperd
By default, ZooKeeper uses port 2181 and runs automatically once it is installed. To check that ZooKeeper is running, use the telnet command:
telnet localhost 2181
When the telnet prompt opens, type the "are you okay" command:
ruok
If everything is okay, it will return an imok message.
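If you prefer a non-interactive check, you can pipe the same command through netcat instead (assuming nc is installed):
echo ruok | nc localhost 2181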

2.3. Create Non-root user account with sudo privilege

Because Kafka is a network application that may run across multiple nodes, it is better to create a dedicated service user:
sudo adduser --system --no-create-home --disabled-password --disabled-login kafkauser
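Note that this creates a locked-down system account with no login. If you also want to grant it sudo rights, as the section title suggests, you can add it to the sudo group:
sudo usermod -aG sudo kafkauser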

3. Install and Configure Kafka Server

3.1. Download Kafka

First, we need to download the Kafka binaries package.
wget http://www-eu.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz
Now, we need to extract the tgz package. We choose to install Kafka in the /opt/kafka directory:
sudo mkdir /opt/kafka
sudo tar xvf kafka_2.11-1.1.1.tgz --directory /opt/kafka --strip-components 1
By default, Kafka stores its logs on disk in the /tmp directory; it is better to create a new directory to store them:
sudo mkdir /var/lib/kafka
sudo mkdir /var/lib/kafka/data

3.2. Configure Kafka

Now, we need to edit the Kafka server configuration file.
sudo gedit /opt/kafka/config/server.properties
By default, Kafka doesn’t allow us to delete topics. To be able to delete topics, find the following line and change it (if it is not present, just add it):
delete.topic.enable = true
Now, we need to change the log directory:
log.dirs=/var/lib/kafka/data
Also, we can adjust the log retention policy; Kafka deletes logs after a particular time or once a partition exceeds a size limit. Note that server.properties does not support inline comments, so keep the values on their own lines:
log.retention.hours=168
log.retention.bytes=104857600
The first setting controls retention by time (168 hours = 7 days), the second by disk size (104857600 bytes = 100 MB).
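You can quickly double-check the values you changed without reopening the editor:
grep -E "delete.topic.enable|log.dirs|log.retention" /opt/kafka/config/server.properties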

3.3. Change directories permissions

We need to give kafkauser ownership of the logs directory and the Kafka installation directory:
sudo chown -R kafkauser:nogroup /opt/kafka
sudo chown -R kafkauser:nogroup /var/lib/kafka

3.4. Start Kafka

To start the Apache Kafka service, you can use the following command:
sudo /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
If the server has started successfully, you should see output similar to the following (the exact versions and timestamps will vary):
[2018-07-23 21:43:48,279] WARN No meta.properties file under dir /var/lib/kafka/data/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2018-07-23 21:43:48,516] INFO Kafka version : 0.10.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2018-07-23 21:43:48,525] INFO Kafka commitId : a7a17cdec9eaa6c5 (org.apache.kafka.common.utils.AppInfoParser)
[2018-07-23 21:43:48,527] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2018-07-23 21:43:48,555] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
To start Kafka as a background process, you can use the nohup command:
sudo nohup /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /var/lib/kafka/data/kafka.log 2>&1 &
You now have a Kafka server running and listening on port 9092.
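To confirm that the broker is actually listening, you can probe the port (a quick check, assuming netcat is installed):
nc -zv localhost 9092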

3.5. Launch Kafka as a service on startup

First, we need to create a service unit file in /etc/systemd/system
sudo gedit /etc/systemd/system/kafka.service
kafka.service
[Unit]
Description=Highly available, distributed message broker
After=network.target
[Service]
User=kafkauser
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
[Install]
WantedBy=multi-user.target
Also, you can redirect the broker output to a separate file so that your syslog stays clean. Note that systemd does not interpret shell redirection in ExecStart, so wrap the command in a shell:
ExecStart=/bin/sh -c '/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties > /opt/kafka/server.log 2>&1'
To start the created service, use the following command:
sudo systemctl start kafka.service
To start the service automatically on operating system startup:
sudo systemctl enable kafka.service
You can check your service status using the following command:
sudo systemctl status kafka.service
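To follow the broker's log output through systemd, journalctl works as usual:
sudo journalctl -u kafka.service -f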

3.6. Working with Kafka Topics

3.6.1. Create a Kafka Topic

To create a Kafka topic we use the kafka-topics.sh script, specifying the ZooKeeper address, replication factor, number of partitions, and the topic name:
/opt/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testKafka
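To inspect the partition count, replication factor, and leader of the topic you just created, you can describe it:
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testKafka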

3.6.2. List available Topics

To list all topics use the following command:
/opt/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181

3.6.3. Publishing and Subscribing messages 

To publish messages into a topic we use the kafka-console-producer.sh script, specifying the Kafka server address and the topic name:
/opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testKafka
Then, we need to open another terminal and create a subscriber using the kafka-console-consumer.sh script, again passing the Kafka server address and the topic name:
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testKafka --from-beginning
In the first terminal (the producer), enter any message (for example: Hello!!); it should appear in the second terminal.

3.6.4. Import a text file into a Kafka topic

To import a text file into a Kafka topic, pipe it into the console producer using the cat command:
cat filename.txt | /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testKafka
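Conversely, you can dump a topic back into a local file by redirecting the console consumer; --max-messages makes the command exit after a fixed count (adjust it to the size of your file):
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testKafka --from-beginning --max-messages 100 > output.txt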

3.7. Kafka multi-node cluster

In addition, we can run Kafka on multiple nodes for data redundancy and automatic failover. Assume that we have three nodes:

Hostname        IP          Description
Kafka-server    10.0.1.1    Main installation we've already done.
Kafka-node-1    10.0.1.2    Additional broker.
Kafka-node-2    10.0.1.3    Additional broker.
We need to follow the same steps we have mentioned before with some additions:
1.       In /opt/kafka/config/server.properties:
a.       Each node must have a unique broker.id property (the main server keeps the default broker.id=0):
for Kafka-node-1: broker.id=1
for Kafka-node-2: broker.id=2
b.      Change the zookeeper.connect value so that it lists all ZooKeeper hosts with their ports:
zookeeper.connect=10.0.1.1:2181,10.0.1.2:2181,10.0.1.3:2181
2.       We have to change the ZooKeeper settings. Open the zoo.cfg file with an editor (sudo gedit /etc/zookeeper/conf/zoo.cfg) on every node and add the following lines (each node also needs a matching myid file; see the note after step 3):
server.0=10.0.1.1:2888:3888
server.1=10.0.1.2:2888:3888
server.2=10.0.1.3:2888:3888
3.       Restart Zookeeper service
sudo systemctl restart zookeeper.service
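As mentioned in step 2, each ZooKeeper node must also know its own id before the restart; the id has to match the server.N entry for that host. A minimal sketch, assuming the zookeeperd default data directory of /var/lib/zookeeper:
echo 0 | sudo tee /var/lib/zookeeper/myid     # on 10.0.1.1
echo 1 | sudo tee /var/lib/zookeeper/myid     # on 10.0.1.2
echo 2 | sudo tee /var/lib/zookeeper/myid     # on 10.0.1.3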

4. Install and Configure Apache Flume

To read messages from a Kafka topic and store them in HDFS, we need to install Apache Flume, an application used to move unstructured and semi-structured data into HDFS.

4.1. Download Flume

First, we need to download the Apache Flume binaries package:
wget "http://www-eu.apache.org/dist/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz"
Now, we need to extract the .tar.gz package. We choose to install Flume in the /opt/flume directory:
sudo mkdir /opt/flume
sudo tar xvf apache-flume-1.8.0-bin.tar.gz --directory /opt/flume --strip-components 1

4.2. Configure Flume

You need to edit the "/etc/profile" and "~/.bashrc" files and add the following lines (note: no space after the equals sign):
export FLUME_HOME=/opt/flume
export PATH=$PATH:$FLUME_HOME/bin/

To apply these changes, run the source ~/.bashrc command.
Once finished, verify that Flume was installed successfully by running the flume-ng script:
flume-ng help
To test the import into HDFS, first create a log file in your home directory:
gedit ~/access.log
Write any data into it and save it; this file can later be published to the testKafka topic using the cat pipeline from section 3.6.4.
Create a file named "flume.conf" inside /opt/flume/conf and write the following data into it:
flume1.sources  = kafka-source-1
flume1.channels = hdfs-channel-1
flume1.sinks    = hdfs-sink-1
flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
flume1.sources.kafka-source-1.zookeeperConnect = localhost:2181
flume1.sources.kafka-source-1.topic = testKafka
flume1.sources.kafka-source-1.batchSize = 100
flume1.sources.kafka-source-1.channels = hdfs-channel-1

flume1.channels.hdfs-channel-1.type   = memory
flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
flume1.sinks.hdfs-sink-1.type = hdfs
flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
flume1.sinks.hdfs-sink-1.hdfs.path = /data/kafka/%{topic}/%y-%m-%d
flume1.sinks.hdfs-sink-1.hdfs.rollCount=100
flume1.sinks.hdfs-sink-1.hdfs.rollSize=0
flume1.channels.hdfs-channel-1.capacity = 10000
flume1.channels.hdfs-channel-1.transactionCapacity = 1000
Note that flume1 is the flume agent name.
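Note also that zookeeperConnect and topic are the legacy Kafka source property names; Flume 1.8 still accepts them for backward compatibility but marks them deprecated. A sketch of the equivalent current-style settings, assuming the broker runs locally on the default port:
flume1.sources.kafka-source-1.kafka.bootstrap.servers = localhost:9092
flume1.sources.kafka-source-1.kafka.topics = testKafka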
Below is a description of some of the parameters:

Property                                          Description
flume1.sources.kafka-source-1.zookeeperConnect    ZooKeeper address
flume1.sources.kafka-source-1.topic               Kafka topic name
flume1.sinks.hdfs-sink-1.hdfs.path                HDFS directory path
flume1.sinks.hdfs-sink-1.hdfs.writeFormat         Data format in HDFS
flume1.sources.kafka-source-1.type                Apache Flume source type
Now, you need to run the Flume agent to read data from the Kafka topic and write it to HDFS:
flume-ng agent -n flume1 -c conf -f /opt/flume/conf/flume.conf -Dflume.root.logger=INFO,console
Note: The agent name is specified by -n flume1 and must match the agent name defined in /opt/flume/conf/flume.conf.
Data will now be dumped into HDFS under the following path (matching the hdfs.path setting above):
/data/kafka/%{topic}/%y-%m-%d
%{topic} → Kafka topic name
%y-%m-%d → year-month-day
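Once the agent has been running for a while, you can verify the import from the HDFS side (assuming the testKafka topic name used above):
hdfs dfs -ls /data/kafka/testKafka/
hdfs dfs -cat /data/kafka/testKafka/*/test-events*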

