Spark streaming use case with Python

Spark streaming use case with Python

Get twitter streaming tweets that using tweepy, a Python twitter API, download tweets and store into HIVE table using HQL (Hive SQL)

https://tweepy.readthedocs.io/en/latest/index.html

You must first install tweepy, make sure install in our virtualenv spark

conda activate spark
pip install tweepy

Test to make sure import successful, without error

python -c "import tweepy"

Then you can run it in Jupyter-notebook

Make sure you have changed the right IP address of the machine that runs HIVE, if you are running our CentOS VM, the IP address is 10.0.2.15, so no change to below. Otherwise, change the IP address to the machine that runs HIVE.

.config("hive.metastore.uris", "thrift://10.0.2.15:9083")

Next fill in your Twitter credential

CONSUMER_KEY = '<your consumer key>'
CONSUMER_SECRET = 'Your consumer Secret'
ACCESS_TOKEN = 'Your Access Token'
ACCESS_SECRET = 'Your Access Token Secret'

Then you can run below code with modified IP address and valid Twitter credential.

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
import datetime

spark = SparkSession \
          .builder \
          .config("spark.master", "local") \
          .appName("interfacing spark sql to hive metastore with no configuration file") \
          .config("hive.metastore.uris", "thrift://10.0.2.15:9083") \
          .enableHiveSupport() \
          .getOrCreate()

sc=spark.sparkContext

sc.setLogLevel("ERROR")

spark.sql("CREATE TABLE IF NOT EXISTS tweets_python (datetime STRING, text STRING) USING hive")

#Set Twitter credential

CONSUMER_KEY = '<your consumer key>'
CONSUMER_SECRET = 'Your consumer Secret'
ACCESS_TOKEN = 'Your Access Token'
ACCESS_SECRET = 'Your Access Token Secret'
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import json, time, sys

import tweepy
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

class MyListener(StreamListener):
    def __init__(self, api=None):
        super().__init__()

    def on_status(self, status):
        
        text = status.text
        created = str(status.created_at)
        record = {'Text': text, 'Created At': created}
#Save to HIVE table tweets        
        filtered_data="".join(list(filter(lambda x: x!="@" and x!="'" and x!='"' \
                                          and x!="\n" and x!='{' and x!='}' and x!='(' and x!=')',text)))
        print(filtered_data)
        spark.sql("INSERT INTO tweets values ('"+created+"','"+filtered_data+"')")


    def on_error(self, status):
        print ('Something wrong with status', status)

    def on_limit(self, status):
        print ('Over the threshold', status)

    def on_timeout(self, status):
        print ('Timeout...')


stream = Stream(auth=auth, listener=MyListener())
stream.filter(track=['stock market'])

'''
Run it:
Ciena CIEN Outpaces Stock Market Gains: What You Should Know 
https://t.co/XFAL2xutM8

RT AnnaApp91838450: BREAKING: Stock Market Soars After Trump Speech To The Nation 
https://t.co/0F9KOlHiesEat
 Your Heart Out CorruptDemoc…
Veeva Systems VEEV Outpaces Stock Market Gains: What You Should Know 
https://t.co/871ROpHNpT

RandyHedgehog Dolan__Ryan CaptainCons We are at Peace. Look at the stock market reaction. No casualties and we… 
https://t.co/JEfDOxkHI4

Innoviva INVA Outpaces Stock Market Gains: What You Should Know 
https://t.co/EIN4xgGC0y

Lrihendry ROHLL5 PRESIDENT TRUMP has been in OFFICE for 3 years and these five things are true. Lowest Unemploym… 
https://t.co/ht8cAE7Ifp




'''

Last updated