Spark streaming use case with Python

Stream tweets from Twitter with tweepy, a Python client for the Twitter API, and store them in a Hive table using HQL (Hive SQL).
First install tweepy. Make sure you install it in our spark virtualenv.
conda activate spark
# The code on this page imports StreamListener, which tweepy removed in 4.0, so pin to 3.x.
pip install "tweepy<4"
Check that the import succeeds without error:
python -c "import tweepy"
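The listing on this page imports StreamListener from tweepy.streaming, a class that exists only in tweepy 3.x and earlier. The sketch below is one way to check which version is installed before running the stream. The helper names supports_stream_listener and installed_tweepy_version are illustrative and not part of tweepy.

```python
from importlib import metadata


def supports_stream_listener(version: str) -> bool:
    """Return True for tweepy 3.x and earlier, which still ship StreamListener."""
    major = int(version.split(".")[0])
    return major < 4


def installed_tweepy_version():
    """Return the installed tweepy version string, or None if it is not installed."""
    try:
        return metadata.version("tweepy")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_tweepy_version()
    if version is None:
        print("tweepy is not installed in this environment")
    elif supports_stream_listener(version):
        print(f"tweepy {version}: StreamListener is available")
    else:
        print(f"tweepy {version}: StreamListener was removed in 4.0")
```

Run it inside the activated spark environment so that it inspects the same interpreter the notebook will use.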
You can then run the code in a Jupyter notebook.
Make sure the metastore URI points to the machine that runs Hive. If you are running our CentOS VM, the IP address is 10.0.2.15 and the line below needs no change. Otherwise, replace it with the IP address of the machine that runs Hive.
.config("hive.metastore.uris", "thrift://10.0.2.15:9083")
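A wrong IP address usually only shows up later as a slow connection failure inside Spark. As a quick sanity check, the following sketch tests whether the metastore's Thrift port accepts TCP connections. The function name metastore_reachable and the 3-second timeout are assumptions made for illustration; the host and port come from the URI above.

```python
import socket


def metastore_reachable(host: str, port: int = 9083, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


if __name__ == "__main__":
    host = "10.0.2.15"  # replace with the machine that runs Hive
    if metastore_reachable(host):
        print(f"Hive metastore port is open on {host}")
    else:
        print(f"Cannot reach {host}:9083; check the IP address and that the metastore is running")
```

An open port only shows that something is listening there. It does not prove that the listener is a Hive metastore.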
Next, fill in your Twitter credentials:
CONSUMER_KEY = '<your consumer key>'
CONSUMER_SECRET = '<your consumer secret>'
ACCESS_TOKEN = '<your access token>'
ACCESS_SECRET = '<your access token secret>'
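Hard-coding secrets in a notebook makes them easy to leak when the notebook is shared. One possible alternative, sketched here, reads the four values from environment variables. The variable names (TWITTER_CONSUMER_KEY and so on) and the helper load_twitter_credentials are assumptions for this example, not names that tweepy or Twitter define.

```python
import os

# Hypothetical environment variable names, in the order the listing expects them.
REQUIRED_VARS = (
    "TWITTER_CONSUMER_KEY",
    "TWITTER_CONSUMER_SECRET",
    "TWITTER_ACCESS_TOKEN",
    "TWITTER_ACCESS_SECRET",
)


def load_twitter_credentials(env=None):
    """Return (consumer_key, consumer_secret, access_token, access_secret)."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("Missing environment variables: " + ", ".join(missing))
    return tuple(env[name] for name in REQUIRED_VARS)
```

With the variables exported before starting Jupyter, the assignment becomes `CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET = load_twitter_credentials()`.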
Then run the code below with the correct IP address and valid Twitter credentials.
import findspark
findspark.init()

from pyspark.sql import SparkSession
from tweepy import OAuthHandler, Stream
from tweepy.streaming import StreamListener

# Connect to the Hive metastore directly, without a configuration file.
spark = SparkSession \
    .builder \
    .config("spark.master", "local") \
    .appName("interfacing spark sql to hive metastore with no configuration file") \
    .config("hive.metastore.uris", "thrift://10.0.2.15:9083") \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext
sc.setLogLevel("ERROR")

spark.sql("CREATE TABLE IF NOT EXISTS tweets_python (datetime STRING, text STRING) USING hive")

# Set Twitter credentials
CONSUMER_KEY = '<your consumer key>'
CONSUMER_SECRET = '<your consumer secret>'
ACCESS_TOKEN = '<your access token>'
ACCESS_SECRET = '<your access token secret>'

auth = OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

class MyListener(StreamListener):
    def on_status(self, status):
        text = status.text
        created = str(status.created_at)
        # Drop quotes, newlines, and a few other special characters
        # before building the SQL string.
        filtered_data = "".join(ch for ch in text if ch not in "@'\"\n{}()")
        print(filtered_data)
        # Save to the Hive table created above (tweets_python).
        spark.sql("INSERT INTO tweets_python VALUES ('" + created + "','" + filtered_data + "')")

    def on_error(self, status_code):
        print('Something wrong with status', status_code)

    def on_limit(self, track):
        print('Over the threshold', track)

    def on_timeout(self):
        print('Timeout...')

stream = Stream(auth=auth, listener=MyListener())
stream.filter(track=['stock market'])

'''
Run it:
Ciena CIEN Outpaces Stock Market Gains: What You Should Know
https://t.co/XFAL2xutM8

RT AnnaApp91838450: BREAKING: Stock Market Soars After Trump Speech To The Nation
https://t.co/0F9KOlHiesEat
Your Heart Out CorruptDemoc…
Veeva Systems VEEV Outpaces Stock Market Gains: What You Should Know
https://t.co/871ROpHNpT

RandyHedgehog Dolan__Ryan CaptainCons We are at Peace. Look at the stock market reaction. No casualties and we…
https://t.co/JEfDOxkHI4

Innoviva INVA Outpaces Stock Market Gains: What You Should Know
https://t.co/EIN4xgGC0y

Lrihendry ROHLL5 PRESIDENT TRUMP has been in OFFICE for 3 years and these five things are true. Lowest Unemploym…
https://t.co/ht8cAE7Ifp
…
'''
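The character filtering inside on_status can be pulled out into a small function that is easy to try without a Twitter connection. This is a minimal sketch that removes the same eight characters as the listing; the name clean_tweet_text is made up for this example.

```python
# Characters the listing strips before building the INSERT statement:
# @  '  "  newline  {  }  (  )
_STRIP_TABLE = str.maketrans("", "", "@'\"\n{}()")


def clean_tweet_text(text: str) -> str:
    """Remove the characters that the listing filters out of each tweet."""
    return text.translate(_STRIP_TABLE)


if __name__ == "__main__":
    print(clean_tweet_text("@user said 'buy' (now)"))
```

Removing the quote characters is what keeps the concatenated SQL string well formed. The other characters are dropped only because the original listing drops them.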
​
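The listing builds its INSERT statement by string concatenation, which is why it has to strip quote characters from each tweet first. A possible alternative, shown here as a sketch rather than as the original author's method, appends the row through the DataFrame API so that the text needs no escaping. The helper name append_tweet is hypothetical, and the default table name assumes the tweets_python table created by the CREATE TABLE statement in the listing.

```python
def append_tweet(spark, created, text, table="tweets_python"):
    """Append one (datetime, text) row to an existing Hive table.

    `spark` is an active SparkSession with Hive support enabled.
    """
    # Column names mirror the CREATE TABLE statement in the listing.
    # insertInto matches columns by position, not by name.
    row = spark.createDataFrame([(created, text)], ["datetime", "text"])
    row.write.insertInto(table)
```

Inside on_status, the spark.sql call could then be replaced with `append_tweet(spark, created, text)`, keeping the raw tweet text intact.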