import re
import json
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')
import pyLDAvis
import pyLDAvis.sklearn
pyLDAvis.enable_notebook()
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.decomposition import LatentDirichletAllocation
cands = pd.read_csv('../data/candidates.txt', header=None)
cands = cands[0].to_list()
print(cands)
uri_re = re.compile(r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
twi_re = re.compile(r"pic\.twitter\.com/[a-zA-Z0-9]+")
def remove_uri(tweet):
    """Strip URLs and pic.twitter.com links from a tweet; non-string input yields ""."""
    try:
        tweet = re.sub(uri_re, "", tweet)
        tweet = re.sub(twi_re, "", tweet)
    except TypeError:
        # non-string content (e.g. NaN) falls back to an empty string
        tweet = ""
    return tweet
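# Quick sanity check of remove_uri on a made-up example tweet (illustrative string, not from the dataset):
# both the URL and the pic.twitter.com link should be stripped, and non-string input should return "".
print(remove_uri("Great event tonight! https://example.com/abc pic.twitter.com/XyZ123"))
print(remove_uri(float("nan")))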
# return a single string of a candidate's recent tweets (URLs stripped)
def get_recent_tweets_string(name, n=None):
    df_1 = pd.read_json(f"../data/{name}.json", lines=True)
    print("%s's %s tweets" % (name, len(df_1['tweet'])))
    return "\n".join(list(map(remove_uri, df_1['tweet'][0:n].to_list())))
# return a list of a candidate's recent tweets (URLs stripped)
def get_recent_tweets_list(name, n=None):
    df_1 = pd.read_json(f"../data/{name}.json", lines=True)
    print("%s's %s tweets" % (name, len(df_1['tweet'])))
    return list(map(remove_uri, df_1['tweet'][0:n].to_list()))
# ==== tokenizer ====
import unicodedata
def _is_whitespace(char):
    """Checks whether `char` is a whitespace character."""
    # \t, \n, and \r are technically control characters but we treat them
    # as whitespace since they are generally considered as such.
    if char == " " or char == "\t" or char == "\n" or char == "\r":
        return True
    cat = unicodedata.category(char)
    if cat == "Zs":
        return True
    return False
def _is_control(char):
    """Checks whether `char` is a control character."""
    # \t, \n, and \r are technically control characters, but we count them
    # as whitespace, so they are excluded here.
    if char == "\t" or char == "\n" or char == "\r":
        return False
    cat = unicodedata.category(char)
    if cat.startswith("C"):
        return True
    return False
def _is_punctuation(char):
    """Checks whether `char` is a punctuation character."""
    cp = ord(char)
    # We treat all non-letter/number ASCII as punctuation.
    # Characters such as "^", "$", and "`" are not in the Unicode
    # Punctuation class but we treat them as punctuation anyway, for
    # consistency.
    if ((cp >= 33 and cp <= 47) or (cp >= 58 and cp <= 64) or
            (cp >= 91 and cp <= 96) or (cp >= 123 and cp <= 126)):
        return True
    cat = unicodedata.category(char)
    if cat.startswith("P"):
        return True
    return False
def _clean_text(text):
    """Performs invalid character removal and whitespace cleanup on text."""
    output = []
    for char in text:
        cp = ord(char)
        if cp == 0 or cp == 0xfffd or _is_control(char):
            continue
        if _is_whitespace(char):
            output.append(" ")
        else:
            output.append(char)
    return "".join(output)
def _run_split_on_punc(text):
    """Splits punctuation on a piece of text."""
    chars = list(text)
    i = 0
    start_new_word = True
    output = []
    while i < len(chars):
        char = chars[i]
        if _is_punctuation(char):
            output.append([char])
            start_new_word = True
        else:
            if start_new_word:
                output.append([])
            start_new_word = False
            output[-1].append(char)
        i += 1
    return ["".join(x) for x in output]
def whitespace_tokenize(text):
    """Runs basic whitespace cleaning and splitting on a piece of text."""
    text = text.strip()
    if not text:
        return []
    tokens = text.split()
    return tokens
def tokenize(text):
    """Tokenizes a piece of text: clean, whitespace-split, lowercase, split off punctuation."""
    text = _clean_text(text)
    orig_tokens = whitespace_tokenize(text)
    split_tokens = []
    for token in orig_tokens:
        token = token.lower()
        # accent stripping (self._run_strip_accents in the class-based tokenizer this
        # was adapted from) is skipped here
        split_tokens.extend(_run_split_on_punc(token))
    return split_tokens
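# The tokenizer lowercases and splits punctuation into separate tokens; a quick check
# on a made-up input string (illustrative only):
print(tokenize("We can't wait -- join us at 7PM!"))
# -> ['we', 'can', "'", 't', 'wait', '-', '-', 'join', 'us', 'at', '7pm', '!']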
def lexical_diversity(text):
    """Return [vocab_size, word_count, vocab_size / word_count] for a piece of text."""
    data = tokenize(text)
    word_count = len(data) or 1  # avoid division by zero on empty input
    vocab_size = len(set(data))
    diversity_score = vocab_size / word_count
    return [vocab_size, word_count, diversity_score]
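# A quick check of lexical_diversity on a short made-up sentence (illustrative only):
# "the" repeats, so 8 unique tokens out of 9 give a diversity of about 0.889.
print(lexical_diversity("the quick brown fox jumps over the lazy dog"))
# -> [8, 9, 0.888...]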
# get_recent_tweets_list(cands[4])[10709]
lex = [lexical_diversity(get_recent_tweets_string(cand)) for cand in cands]
# docs_raw = [get_recent_tweets_string(name, 500) for name in cands]
for i, l in enumerate(lex):
    print("%s uses %s distinct words in %s tokens; diversity (#words/#tokens) is %s\n" % (cands[i], l[0], l[1], l[2]))
def plot(name, recent=None):
    """Fit LDA topic models on a candidate's tweets; return the TF pieces for pyLDAvis."""
    docs_raw = get_recent_tweets_list(name, recent)
    tf_vectorizer = CountVectorizer(strip_accents='unicode',
                                    stop_words='english',
                                    lowercase=True,
                                    token_pattern=r'\b[a-zA-Z]{3,}\b',
                                    max_df=0.5,
                                    min_df=10)
    dtm_tf = tf_vectorizer.fit_transform(docs_raw)
    # print(dtm_tf.shape)
    tfidf_vectorizer = TfidfVectorizer(**tf_vectorizer.get_params())
    dtm_tfidf = tfidf_vectorizer.fit_transform(docs_raw)
    # print(dtm_tfidf.shape)
    # LDA on the term-frequency DTM
    lda_tf = LatentDirichletAllocation(n_components=10, random_state=0)
    lda_tf.fit(dtm_tf)
    # LDA on the TF-IDF DTM (fitted for comparison; not returned)
    lda_tfidf = LatentDirichletAllocation(n_components=10, random_state=0)
    lda_tfidf.fit(dtm_tfidf)
    return tf_vectorizer, lda_tf, dtm_tf
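# A minimal sketch of a text-only view of the fitted topics: print_top_words is a hypothetical
# helper (not part of pyLDAvis or sklearn) that lists each topic's highest-weight terms,
# complementing the interactive pyLDAvis panels below. Assumes scikit-learn >= 1.0 for
# get_feature_names_out(); older versions expose get_feature_names() instead.
def print_top_words(lda_model, vectorizer, n_top=10):
    terms = vectorizer.get_feature_names_out()
    for topic_idx, weights in enumerate(lda_model.components_):
        top = weights.argsort()[::-1][:n_top]
        print("topic %d: %s" % (topic_idx, ", ".join(terms[i] for i in top)))
# e.g. after any of the cells below: print_top_words(lda_tf, tf_vectorizer)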
tf_vectorizer, lda_tf, dtm_tf = plot(cands[16])
pyLDAvis.sklearn.prepare(lda_tf, dtm_tf, tf_vectorizer)
tf_vectorizer, lda_tf, dtm_tf = plot(cands[1])
pyLDAvis.sklearn.prepare(lda_tf, dtm_tf, tf_vectorizer)
tf_vectorizer, lda_tf, dtm_tf = plot(cands[2])
pyLDAvis.sklearn.prepare(lda_tf, dtm_tf, tf_vectorizer)
tf_vectorizer, lda_tf, dtm_tf = plot(cands[3])
pyLDAvis.sklearn.prepare(lda_tf, dtm_tf, tf_vectorizer)
tf_vectorizer, lda_tf, dtm_tf = plot(cands[4])
pyLDAvis.sklearn.prepare(lda_tf, dtm_tf, tf_vectorizer)
tf_vectorizer, lda_tf, dtm_tf = plot(cands[7])
pyLDAvis.sklearn.prepare(lda_tf, dtm_tf, tf_vectorizer)
tf_vectorizer, lda_tf, dtm_tf = plot(cands[11])
pyLDAvis.sklearn.prepare(lda_tf, dtm_tf, tf_vectorizer)
tf_vectorizer, lda_tf, dtm_tf = plot(cands[14])
pyLDAvis.sklearn.prepare(lda_tf, dtm_tf, tf_vectorizer)