# Spark streaming use case with Python

Use tweepy, a Python library for the Twitter API, to stream live tweets and store them in a Hive table using HQL (Hive SQL).

<https://tweepy.readthedocs.io/en/latest/index.html>

First install tweepy, making sure it is installed into our `spark` virtual environment:

```
conda activate spark
pip install tweepy
```

Confirm that the import succeeds without errors:

```
python -c "import tweepy"
```
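The listing further down imports `tweepy.streaming.StreamListener`, which only exists in tweepy releases before 4.0, so it can help to check the installed version as well as the import. The following is a minimal sketch, assuming Python 3.8 or later (for `importlib.metadata`). The helper names `installed_version` and `supports_stream_listener` are illustrative and are not part of tweepy.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


def supports_stream_listener(version):
    """True when the version string looks like a tweepy release older than 4.0."""
    if version is None:
        return False
    major = version.split(".")[0]
    return major.isdigit() and int(major) < 4


if __name__ == "__main__":
    version = installed_version("tweepy")
    print("tweepy version:", version)
    print("StreamListener available:", supports_stream_listener(version))
```

If the second line prints `False`, one option is to install an older release inside the `spark` environment, for example with `pip install "tweepy<4"`.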

You can then run the code in a Jupyter notebook.

Make sure the metastore URI points to the machine that runs Hive. If you are running our CentOS VM, the IP address is 10.0.2.15 and the line below needs no change. Otherwise, replace it with the IP address of your Hive machine.

```
.config("hive.metastore.uris", "thrift://10.0.2.15:9083")
```
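Before starting Spark, it may be worth confirming that something is actually listening at that address. The sketch below only opens a plain TCP connection to the host and port in the URI; it does not speak the Thrift protocol, so a `True` result means the port is open, not that the metastore is healthy. The function name `metastore_reachable` is a hypothetical helper, not part of Spark or Hive.

```python
import socket
from urllib.parse import urlparse


def metastore_reachable(uri, timeout=3.0):
    """Return True if a TCP connection to the host and port in `uri` succeeds."""
    parsed = urlparse(uri)
    if parsed.hostname is None or parsed.port is None:
        raise ValueError("expected a URI like thrift://host:port, got %r" % uri)
    try:
        # Opening and immediately closing the socket is enough for this check.
        with socket.create_connection((parsed.hostname, parsed.port), timeout=timeout):
            return True
    except OSError:
        return False


# Example call (commented out so importing this snippet has no side effects):
# print(metastore_reachable("thrift://10.0.2.15:9083"))
```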

Next, fill in your Twitter credentials:

```
CONSUMER_KEY = '<your consumer key>'
CONSUMER_SECRET = '<your consumer secret>'
ACCESS_TOKEN = '<your access token>'
ACCESS_SECRET = '<your access token secret>'
```
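As an alternative to pasting secrets into a notebook, the four values could be read from environment variables. This is a sketch under assumptions: the `TWITTER_` prefix and the helper name `load_twitter_credentials` are made up for illustration, and the variables would need to be exported before Jupyter is started.

```python
import os

REQUIRED_KEYS = ("CONSUMER_KEY", "CONSUMER_SECRET", "ACCESS_TOKEN", "ACCESS_SECRET")


def load_twitter_credentials(env=None, prefix="TWITTER_"):
    """Read the four credentials from environment variables.

    Raises KeyError naming every variable that is missing or empty.
    """
    env = os.environ if env is None else env
    missing = [prefix + key for key in REQUIRED_KEYS if not env.get(prefix + key)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {key: env[prefix + key] for key in REQUIRED_KEYS}


# Example use (assumes the TWITTER_* variables are set):
# creds = load_twitter_credentials()
# CONSUMER_KEY = creds["CONSUMER_KEY"]
```

Passing a plain dictionary as `env` makes the function easy to try out without touching the real environment.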

Then run the code below with the correct IP address and valid Twitter credentials.

```
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
import datetime

spark = SparkSession \
          .builder \
          .config("spark.master", "local") \
          .appName("interfacing spark sql to hive metastore with no configuration file") \
          .config("hive.metastore.uris", "thrift://10.0.2.15:9083") \
          .enableHiveSupport() \
          .getOrCreate()

sc=spark.sparkContext

sc.setLogLevel("ERROR")

spark.sql("CREATE TABLE IF NOT EXISTS tweets_python (datetime STRING, text STRING) USING hive")

# Set Twitter credentials

CONSUMER_KEY = '<your consumer key>'
CONSUMER_SECRET = '<your consumer secret>'
ACCESS_TOKEN = '<your access token>'
ACCESS_SECRET = '<your access token secret>'

# StreamListener is only available in tweepy releases before 4.0
import tweepy
from tweepy import Stream
from tweepy.streaming import StreamListener

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)

class MyListener(StreamListener):
    def __init__(self, api=None):
        super().__init__(api)

    def on_status(self, status):
        text = status.text
        created = str(status.created_at)
        # Strip characters that would break the quoted SQL string below
        filtered_data = "".join(ch for ch in text if ch not in "@'\"\n{}()")
        print(filtered_data)
        # Save to the Hive table tweets_python created above
        spark.sql("INSERT INTO tweets_python VALUES ('" + created + "','" + filtered_data + "')")


    def on_error(self, status):
        print('Something wrong with status', status)

    def on_limit(self, status):
        print('Over the threshold', status)

    # tweepy 3.x calls on_timeout() with no arguments
    def on_timeout(self):
        print('Timeout...')


stream = Stream(auth=auth, listener=MyListener())
stream.filter(track=['stock market'])

'''
Sample output when run:
Ciena CIEN Outpaces Stock Market Gains: What You Should Know 
https://t.co/XFAL2xutM8

RT AnnaApp91838450: BREAKING: Stock Market Soars After Trump Speech To The Nation 
https://t.co/0F9KOlHiesEat
 Your Heart Out CorruptDemoc…
Veeva Systems VEEV Outpaces Stock Market Gains: What You Should Know 
https://t.co/871ROpHNpT

RandyHedgehog Dolan__Ryan CaptainCons We are at Peace. Look at the stock market reaction. No casualties and we… 
https://t.co/JEfDOxkHI4

Innoviva INVA Outpaces Stock Market Gains: What You Should Know 
https://t.co/EIN4xgGC0y

Lrihendry ROHLL5 PRESIDENT TRUMP has been in OFFICE for 3 years and these five things are true. Lowest Unemploym… 
https://t.co/ht8cAE7Ifp
 …



'''

```
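The character filter in `on_status` can also be expressed with `str.translate`, which removes a fixed set of characters in a single pass. The sketch below is meant to be equivalent to the lambda-based filter in the listing; the names `STRIPPED_CHARS` and `clean_tweet_text` are illustrative. Note that stripping these characters only keeps the quoted string in the `INSERT` statement intact for typical tweets and should not be read as a general defence against malformed SQL.

```python
# Characters that the listing's filter removes from each tweet.
STRIPPED_CHARS = "@'\"\n{}()"
_STRIP_TABLE = str.maketrans("", "", STRIPPED_CHARS)


def clean_tweet_text(text):
    """Return `text` with every character in STRIPPED_CHARS removed."""
    return text.translate(_STRIP_TABLE)


if __name__ == "__main__":
    print(clean_tweet_text("@user said: \"it's (up)\"\n{ok}"))
```

Keeping the cleaning step in its own function also makes it possible to test it without a running Spark session or a Twitter connection.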