
Demo: Kafka Connect – Fetching Tweets

In the previous segment, you learned what Kafka Connect is, along with some of its key concepts. In this segment, you will learn how to fetch data from Twitter and store it in a Kafka topic named 'twitter'.

Follow the steps in the document attached below to create a developer account and a Twitter app, if you have not done so already.

Before proceeding, you need to start your EC2 instance and log in to it. Next, you need to start the ZooKeeper and Kafka servers and create a topic named 'twitter'. Enter the following commands to complete these steps:

From your EC2 instance's home directory, go into the Kafka folder by entering the following command:
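For example, assuming Kafka was extracted into a folder named 'kafka' in the home directory (your folder name may differ, e.g. it may include the Kafka version number):

```shell
cd ~/kafka
```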

Once you are inside the folder, you need to start the ZooKeeper server. You can enter the following command to start it:
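A typical command, assuming you are inside the Kafka installation directory and using the default properties file shipped with Kafka:

```shell
# Start ZooKeeper using the default configuration bundled with Kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
```

Leave this terminal window running; ZooKeeper stays in the foreground.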

Next, open a new terminal window, log in to the EC2 instance and go to the Kafka folder again. Inside the folder, run the following command to start the Kafka server:
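A typical command, again assuming the default properties file from the Kafka distribution:

```shell
# Start the Kafka broker; it registers itself with the running ZooKeeper
bin/kafka-server-start.sh config/server.properties
```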

Now, create a topic named 'twitter'. You can do so by entering the following command:
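A typical topic-creation command, assuming a single broker running on localhost; older Kafka versions use `--zookeeper localhost:2181` in place of `--bootstrap-server localhost:9092`:

```shell
# Create the 'twitter' topic with one partition and no replication
bin/kafka-topics.sh --create \
  --topic twitter \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
```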

After completing all the steps, you can watch the next video. The Jupyter Notebooks used in the demonstration are attached below.

Before running the code below, you need to install the following Python packages on your EC2 instance.
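The demo uses a Twitter streaming client and a Kafka client for Python; assuming these are the `tweepy` and `kafka-python` packages, they can be installed with pip:

```shell
pip install tweepy kafka-python
```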

In the notebook named 'Kafka Connect Example', you need to enter your Twitter credentials before running the notebook.

In the variable named access_token, enter your access token, and in the variable named access_token_secret, enter the access token secret that you got from Twitter. Next, in the variable named consumer_key, enter the API key, and in the variable named consumer_secret, enter the API key secret. Once you have entered these credentials, you will be able to run the notebook successfully.
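The four variables would look like this, with hypothetical placeholder values that you replace with the credentials from your own Twitter developer app:

```python
# Placeholder values — substitute the credentials from your Twitter app.
# The variable names match those used in the notebook.
access_token = "YOUR_ACCESS_TOKEN"
access_token_secret = "YOUR_ACCESS_TOKEN_SECRET"
consumer_key = "YOUR_API_KEY"
consumer_secret = "YOUR_API_KEY_SECRET"
```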

In the video above, you saw the steps to pull data from Twitter. You need to specify your consumer key, consumer secret, access token and access token secret. After specifying these details, we created the class KafkaListener to send the data from Twitter to the Kafka topic named twitter. We then specified the bootstrap servers and created an instance of a producer.

Next, we created an instance of the KafkaListener class and an instance of the stream class.

We then filtered the stream based on the keyword ‘news’, after which all tweets with the word ‘news’ in them were pushed to the Kafka topic twitter.
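The producer side described above can be sketched as follows. This is a minimal illustration, assuming the tweepy (v3) and kafka-python libraries and a broker on localhost; the class and topic names follow the video, but the exact notebook code may differ:

```python
import tweepy
from kafka import KafkaProducer

# Placeholder credentials — replace with your Twitter app's values
auth = tweepy.OAuthHandler("YOUR_API_KEY", "YOUR_API_KEY_SECRET")
auth.set_access_token("YOUR_ACCESS_TOKEN", "YOUR_ACCESS_TOKEN_SECRET")

# Producer pointing at the local Kafka broker
producer = KafkaProducer(bootstrap_servers="localhost:9092")

class KafkaListener(tweepy.StreamListener):
    def on_data(self, data):
        # Push each raw tweet (a JSON string) to the 'twitter' topic
        producer.send("twitter", data.encode("utf-8"))
        return True

    def on_error(self, status_code):
        # Disconnect if Twitter rate-limits the connection
        if status_code == 420:
            return False

listener = KafkaListener()
stream = tweepy.Stream(auth, listener)

# Only tweets containing the keyword 'news' reach the topic
stream.filter(track=["news"])
```

Note that tweepy v4 replaced StreamListener with a different streaming API, so this sketch assumes the older v3 interface shown in the demonstration.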

Now, let's write a consumer program to check whether we were really able to push messages into this topic. Let's watch the next video to find out.

In the video above, you saw the Python code for the consumer that reads messages from the topic 'twitter'. We provided the bootstrap server address and the name of the topic from which the consumer program needs to read messages. We also set the property auto_offset_reset to 'earliest'. Next, we printed each message present in the topic using a for loop. For each message, we printed the topic and the partition in which the message was stored, the offset ID, and the key and value of the message.
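The consumer described above can be sketched as follows, assuming the kafka-python library and a broker on localhost; the exact notebook code may differ:

```python
from kafka import KafkaConsumer

# Read from the 'twitter' topic, starting at the earliest available offset
consumer = KafkaConsumer(
    "twitter",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
)

# Each record carries the topic, partition, offset, key and value
for message in consumer:
    print(message.topic, message.partition, message.offset,
          message.key, message.value)
```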
