Apache Flume and Streaming Data:
Apache Flume, as its website describes it, is a distributed, reliable, and available system for efficiently collecting, aggregating and moving large amounts of log data from many different sources to a centralized data store such as Hadoop HDFS. Flume is not restricted to log data aggregation; it can be used to transport large volumes of data streams from virtually any data source.
In an organization, a number of data sources generate a variety of streaming data at high velocity and high volume, and processing this streaming data has become essential.
With Flume, we can not only capture and ingest streaming data into a data store such as Hadoop HDFS, but also perform a certain amount of in-flight processing on that data. We will discuss how to use Apache Flume with particular attention to this feature.
Components of Flume:
To process a data stream, we run a Flume agent, which is a long-running process. A unit of data flow is referred to as an event. A Flume agent hosts the components through which events flow from an external source to the next destination (sink).
To set up a Flume agent, we write a configuration file specifying the properties of the source, the channel and the sink. We can also specify properties of interceptors, which give us the ability to modify and/or drop events in flight.
Once the configuration is written, we can run a Flume agent using the flume-ng command, which comes with the Flume installation, with the syntax given below.
flume-ng agent -n <agent_name> -c conf -f <flume-conf.properties>
Following are the details of each component.
Agent: An agent receives events from clients or from other agents. A Flume agent is an independent daemon process (a JVM) installed on each node to collect events; it hosts the sources, channels, sinks and other components through which events are transferred from one place to the next.
Source: The source is the component of an agent that receives data from data generators and transfers it to one or more channels in the form of Flume events. Commonly used sources include the Avro, Exec, Spooling Directory (spooldir), NetCat, Syslog and HTTP sources.
Flume also allows us to specify advanced sources of data streams such as Twitter, Kafka and so on.
Example - properties of a source in a typical Flume configuration:
# Name the components on this agent
agent1.sources = r1
# Describe/configure the source - spooldir
agent1.sources.r1.type = spooldir
agent1.sources.r1.spoolDir = /home/hduser/Desktop/flume/input
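For comparison, a Kafka source (one of the advanced sources mentioned above) might be configured along these lines in Flume 1.7 and later; the broker address and topic name below are assumptions made for illustration:
# Kafka source - broker address and topic are placeholders
agent1.sources = r1
agent1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.r1.kafka.bootstrap.servers = localhost:9092
agent1.sources.r1.kafka.topics = weblogs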
Channel: Channels are communication bridges between sources and sinks within an agent. Flume provides several channel types, such as the Memory channel, the File channel, the JDBC channel and the Kafka channel.
Example - properties of a channel in a typical Flume configuration:
agent1.channels = c1
# Use a channel which buffers events in file
agent1.channels.c1.type = file
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
agent1.channels.c1.checkpointDir = /home/hduser/Desktop/flume/checkpoint/
agent1.channels.c1.dataDirs = /home/hduser/Desktop/flume/dataDir/
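Alternatively, a memory channel can be used when throughput matters more than durability; a minimal sketch reusing the same channel name:
# Use a channel which buffers events in memory
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100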
Sink: The sink delivers events from the channel to the final destination, for example HDFS. The HDFS sink supports multiple file formats such as text, Avro, JSON and compressed files. Flume ships with sinks that deliver data to a wide range of destinations, such as the HDFS, Hive, Logger, Avro, HBase, Kafka and File Roll sinks.
Example - properties of a sink in a typical Flume configuration:
# Describe the HDFS sink
agent1.sinks = k1
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hdfs.path = /user/hduser/regexpfilter
agent1.sinks.k1.hdfs.fileType = DataStream
agent1.sinks.k1.hdfs.writeFormat = Text
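The hdfs.path can also contain time-based escape sequences so that the sink buckets output by time. This relies on a timestamp header being present on each event (which the timestamp interceptor described below provides) or on hdfs.useLocalTimeStamp. A sketch, with an assumed path:
# Bucket output by year/month/day (path is an assumption)
agent1.sinks.k1.hdfs.path = /user/hduser/logs/%Y/%m/%d
# Either add a timestamp interceptor on the source, or use the local time:
# agent1.sinks.k1.hdfs.useLocalTimeStamp = true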
Flume Interceptors:
The events received by a source can be transformed or dropped by interceptors before they are written to the corresponding channels. Interceptors are simple components that sit between a source and its channels, and they are commonly used to inspect events and to filter or modify them in flight.
Types of Interceptors:
Timestamp Interceptor:
It inserts a timestamp into the Flume event headers under the timestamp key, which is the header that the HDFS sink uses for bucketing. If a timestamp header is already present, this interceptor will replace it unless the preserveExisting parameter is set to true. To add a timestamp interceptor, use the alias timestamp.
The configuration parameters are:
agent1.sources.source_name.interceptors = Interceptor_timestamp
agent1.sources.source_name.interceptors.Interceptor_timestamp.type = timestamp
agent1.sources.source_name.interceptors.Interceptor_timestamp.preserveExisting = false
Host Interceptor:
The host interceptor inserts the hostname or IP address of the server on which the agent is running into the Flume event headers. The header key is configurable through the hostHeader parameter and defaults to host. To insert the hostname instead of the IP address, set useIP to false.
agent1.sources.source_name.interceptors = Interceptor_host
agent1.sources.source_name.interceptors.Interceptor_host.type = host
agent1.sources.source_name.interceptors.Interceptor_host.useIP = false
agent1.sources.source_name.interceptors.Interceptor_host.preserveExisting = true
UUID Interceptor:
The UUID (Universally Unique Identifier) interceptor sets a unique identifier on all events: each intercepted event is assigned a 128-bit UUID. This makes it possible to deduplicate events that may be accidentally duplicated by replication and redelivery in a Flume topology designed for high availability and high performance.
agent1.sources.source_name.interceptors = Interceptor_uuid
agent1.sources.source_name.interceptors.Interceptor_uuid.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder
agent1.sources.source_name.interceptors.Interceptor_uuid.headerName = eventId
agent1.sources.source_name.interceptors.Interceptor_uuid.prefix = usingFlume
agent1.sources.source_name.interceptors.Interceptor_uuid.preserveExisting = false
Regex filtering interceptor:
The interceptors discussed so far pass every event on to the sink, which can result in a large volume of data reaching the sink. The regex filtering interceptor can be used to ensure that only relevant events pass through the Flume agent, reducing the volume of data pushed into HDFS.
agent1.sources.source_name.interceptors = include exclude
agent1.sources.source_name.interceptors.include.type = regex_filter
agent1.sources.source_name.interceptors.include.regex = .*Pattern1.*
agent1.sources.source_name.interceptors.include.excludeEvents = false
agent1.sources.source_name.interceptors.exclude.type = regex_filter
agent1.sources.source_name.interceptors.exclude.regex = .*Pattern2.*
agent1.sources.source_name.interceptors.exclude.excludeEvents = true
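Read as a chain: the include interceptor (excludeEvents = false) lets through only events matching Pattern1, and the exclude interceptor (excludeEvents = true) then drops any remaining events that match Pattern2.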
Use Case:
An American Internet Service Provider (ISP) offers internet service to clients across the globe. It has servers in every region, and at certain times heavy network traffic brings some of these servers down. The ISP wants to analyse the traffic congestion on its network using the log data generated by the servers, so that it can decide where to add servers in the future.
Each log entry records the client IP address, the date and time, the requested URL, the HTTP status code and response size, the referrer and the browser details.
123.223.223.123 - - [13/May/2016:00:23:50 -0400] "GET /wallpapers/flowers.jpeg HTTP/1.0" 200 1031 "https://www.LinkedIn.com/" "Chrome/68.0 (Macintosh; I; PPC)"
123.123.123.101 - - [13/May/2016:00:23:50 -0400] "GET /wallpapers/flowers.jpeg HTTP/1.0" 200 1031 "https://www.LinkedIn.com/" "Chrome/68.0 (Macintosh; I; PPC)"
123.123.123.100 - - [13/May/2016:00:23:48 -0400] "GET /pics/wpaper.gif HTTP/1.0" 200 6248 "https://www.LinkedIn.com/" "Chrome/68.0 (Macintosh; I; PPC)"
123.123.123.112 - - [13/May/2016:00:23:47 -0400] "GET /asctortf/ HTTP/1.0" 200 8130 "https://search.netscape.com/Computers/Data_Formats/Document/Text/RTF" "Chrome/68.0 (Macintosh; I; PPC)"
123.223.223.123 - - [13/May/2016:00:23:48 -0400] "GET /downloads/sample.gif HTTP/1.0" 200 4005 "https://www.nfl.com/news" "Chrome/68.0 (Macintosh; I; PPC)"
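Below is a sketch of what the spool_dir.conf used in the steps that follow might look like for this use case, assembled from the snippets above. The channel bindings and the IP-filtering regex (which keeps only requests from the 123.223.223.* range) are illustrative assumptions:
# Name the components on this agent
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

# Spooling directory source
agent1.sources.r1.type = spooldir
agent1.sources.r1.spoolDir = /home/hduser/Desktop/flume/input
agent1.sources.r1.channels = c1

# Regex filtering interceptor: keep only events from the 123.223.223.* range
agent1.sources.r1.interceptors = include
agent1.sources.r1.interceptors.include.type = regex_filter
agent1.sources.r1.interceptors.include.regex = ^123\.223\.223\..*
agent1.sources.r1.interceptors.include.excludeEvents = false

# File channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /home/hduser/Desktop/flume/checkpoint/
agent1.channels.c1.dataDirs = /home/hduser/Desktop/flume/dataDir/

# HDFS sink
agent1.sinks.k1.type = hdfs
agent1.sinks.k1.channel = c1
agent1.sinks.k1.hdfs.path = /user/hduser/regexpfilter
agent1.sinks.k1.hdfs.fileType = DataStream
agent1.sinks.k1.hdfs.writeFormat = Text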
How to run the Agents:
Step-1: Start the Flume agent.
flume-ng agent --conf conf --conf-file spool_dir.conf --name agent1 -Dflume.root.logger=INFO,console
Step-2: Simulate the input data stream.
Run one of the following shell scripts from the Linux shell prompt to simulate the data stream source relevant to each Flume agent; sample source configurations for the netcat and exec variants are sketched after these commands.
To simulate a data stream for the spooldir source:
$ ./spool_files.sh log.txt
To simulate a data stream for the netcat source:
$ ./socket_stream.sh log.txt | telnet localhost 7777
To simulate a data stream for the exec source (a continuously growing file):
$ ./tail_file.sh log.txt
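For reference, if the netcat or exec source is used in place of spooldir, the source section of the configuration might look roughly like the following sketches; the port matches the telnet command above, and the tailed file path is an assumption:
# NetCat source listening on the port used by the telnet command above
agent1.sources.r1.type = netcat
agent1.sources.r1.bind = localhost
agent1.sources.r1.port = 7777
# Exec source tailing a continuously growing file (path is an assumption)
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /home/hduser/Desktop/flume/log.txt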
Step-3: Check the output files in the HDFS location.
hadoop fs -ls /user/hduser/regexpfilter