| """FastAPI endpoint | |
| To run locally use 'uvicorn app:app --host localhost --port 7860' | |
| or | |
| `python -m uvicorn app:app --reload --host localhost --port 7860` | |
| """ | |
import datetime as dt
import json
import logging
import os
import random
import sys
from typing import Dict, List

import nltk
import numpy as np
import pandas as pd
import spacy
import uvicorn
from fastapi import FastAPI, HTTPException, Request, Response
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from rouge_score import rouge_scorer

# sys.setrecursionlimit(20000)

# Project scripts
import scripts.utils as utils
from scripts import generative, sentiment, translation
from scripts import twitter_scraper as ts
from scripts.sentiment import twitter_sentiment_api_score
from scripts.summarization import bert_summarization
from scripts.twitter_scraper import get_latest_account_tweets
nltk.download("punkt")
punkt_download_location = nltk.data.path[0]

# Configure logging before the first logging call so the message is emitted.
logging.basicConfig(level=logging.INFO)
logging.info(f"punkt_download_location: {punkt_download_location}")
pd.set_option("display.max_colwidth", 20)

app = FastAPI()
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")

# Construct absolute path to the models folder
models_path = os.path.abspath("models")
username_list = [
    "alikarimi_ak8",
    "elonmusk",
    "BarackObama",
    "taylorlorenz",
    "cathiedwood",
    "ylecun",
]
## Static objects/paths
start_date = dt.date(year=2023, month=2, day=1)
end_date = dt.date(year=2023, month=3, day=22)
# Load the spaCy pipeline on app start
nlp = spacy.load("en_core_web_sm")
nlp.add_pipe("sentencizer")
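
# NOTE: The route decorators were evidently lost from this listing; the paths
# below are assumptions inferred from function names and log messages, not
# confirmed by the original source.
@app.get("/", response_class=HTMLResponse)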
async def webpage(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})
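
# Assumed route (path not confirmed by the original listing).
@app.get("/accounts")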
async def get_accounts() -> HTMLResponse:
    logging.info(f"Pulling account information on {username_list}")
    account_info_list = [
        ts.get_twitter_account_info(twitter_handle=account) for account in username_list
    ]
    df_account = pd.DataFrame(account_info_list)
    # After .style, df_account is a Styler, which does not accept DataFrame.to_html's
    # `classes`/`index` kwargs; set the table class and hide the index via the Styler API.
    styler = (
        df_account.style.bar(subset=["follower_count", "friends_count"], color="#d65f5f")
        .format({"follower_count": "{:,.0f}", "friends_count": "{:,.0f}"})
        .hide(axis="index")
        .set_table_attributes('class="center"')
    )
    html_table = styler.to_html()
    return HTMLResponse(content=html_table, status_code=200)
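
# Assumed route; the Twitter handle is taken as a path parameter.
@app.get("/tweets/{username}")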
def get_tweets_username(username: str) -> JSONResponse:
    # Method 2: use snscrape
    df_tweets = ts.get_tweets(handle=username)
    if isinstance(df_tweets, pd.DataFrame):
        df_tweets = df_tweets[
            ["handle", "created_at", "retweet_count", "view_count", "like_count", "full_text"]
        ]
        df_tweets["created_at"] = df_tweets["created_at"].dt.strftime("%Y-%m-%d %H:%M:%S")
        df_tweets = df_tweets.sort_values("created_at", ascending=False)
        # Additional processing
        logging.info("Running sentiment on tweets")
        sentiments = twitter_sentiment_api_score(
            df_tweets["full_text"].to_list(), use_api=False
        )
        df_tweets["sentiment"] = [s["argmax"] for s in sentiments]
        # if username == "alikarimi_ak8":
        #     p = translation.PersianTextProcessor()
        #     df_tweets["full_text_translated"] = df_tweets["full_text"].apply(
        #         lambda c: p.translate_text(persian_text=c)
        #     )
        df_tweets_html = df_tweets.to_html(classes="center", index=False, escape=False)
        # Use a context manager instead of leaking the file handle via an inline open().
        with open("df_tweets_html.html", "w") as f:
            df_tweets.to_html(f)
        df_tweets_data = df_tweets.to_dict(orient="records")
        response_data = {"html": df_tweets_html, "data": df_tweets_data}
        return JSONResponse(content=response_data, status_code=200)
    logging.error("Failed to retrieve tweets.")
    raise HTTPException(status_code=500, detail="Failed to retrieve tweets.")
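
# Assumed route (path not confirmed by the original listing).
@app.get("/audience/{username}")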
async def get_audience(username: str) -> dict:
    if username not in username_list:
        raise HTTPException(status_code=404, detail="Account not in scope of project.")
    query = f"from:{username} since:{start_date} until:{end_date}"
    tweets = ts.get_tweets(query=query)
    # Randomly sample n tweets from the user
    n_samples = 5
    tweets_sampled = random.sample(tweets, n_samples)
    # Get all replies to the sampled tweets
    tweet_threads = []
    for tweet in tweets_sampled:
        threads = ts.get_replies(
            username=tweet["username"],
            conversation_id=tweet["conversation_id"],
            max_tweets=100,
        )
        tweet_threads += threads
    # Usernames of the accounts replying in the sampled threads
    usernames = [t["username"] for t in tweet_threads]
    # Account info for each replier, e.g. {"follower_count": 1, "friends_count": 20, "verified": false}
    info_accounts = [
        ts.get_twitter_account_info(twitter_handle=account) for account in usernames
    ]
    # Stats for the followers/audience engaging with the sampled tweets
    follower_counts = [acct["follower_count"] for acct in info_accounts]
    friends_counts = [acct["friends_count"] for acct in info_accounts]
    verified_counts = [1 if acct["verified"] else 0 for acct in info_accounts]
    return {
        "sample_size": len(info_accounts),
        "mean_follower_count": round(np.mean(follower_counts), 3),
        "mean_friends_count": round(np.mean(friends_counts), 3),
        "mean_verified": round(np.mean(verified_counts), 3),
    }
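
# Assumed route (path not confirmed by the original listing).
@app.get("/sentiment/{username}")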
async def get_sentiment(username: str) -> Dict[str, Dict[str, float]]:
    if username not in username_list:
        raise HTTPException(status_code=404, detail="Account not in scope of project.")
    query = f"from:{username} since:{start_date} until:{end_date}"
    tweets = ts.get_tweets(query=query)
    n_samples = 5
    tweets_sampled = random.sample(tweets, n_samples)
    tweet_threads = []
    for tweet in tweets_sampled:
        threads = ts.get_replies(
            username=tweet["username"],
            conversation_id=tweet["conversation_id"],
            max_tweets=100,
        )
        tweet_threads += threads
    logging.info(
        f"Collected {len(tweet_threads)} replies to {n_samples} sampled tweets from {username}"
    )
    ## Sentiment scoring
    logging.info(f"Running sentiment scoring on {username}'s tweets")
    tweets_scores = sentiment.get_tweets_sentiment(tweets=tweets)
    mean_tweets_score = round(np.mean(tweets_scores), 2)
    ci_tweets = utils.wilson_score_interval(tweets_scores)
    # Sentiment of the reply threads to the sampled tweets
    logging.info(f"Running sentiment scoring on reply threads to {username}'s tweets")
    threads_scores = sentiment.get_tweets_sentiment(tweets=tweet_threads)
    mean_threads_score = round(np.mean(threads_scores), 2)
    ci_threads = utils.wilson_score_interval(threads_scores)
    return {
        "thread_level": {
            "mean": mean_threads_score,
            "confidence_interval": ci_threads,
        },
        "audience_level": {
            "mean": mean_tweets_score,
            "confidence_interval": ci_tweets,
        },
    }

## APIs: primarily called by the index page
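
# The log message inside the handler references "api/generate", so this path is
# grounded in the original; the decorator itself is reconstructed.
@app.post("/api/generate")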
async def generate_text(request: Request):
    """Generate text from a prompt.

    Args:
        request: The HTTP request.

    Returns:
        The generated text.
    """
    data = await request.json()
    logging.info("POST to api/generate received and processing")
    try:
        generated_text = generative.generate_account_text(
            prompt=data["text"], model_dir=os.path.join(models_path, data["account"])
        )
        logging.info("Successfully generated text from model.")
    except Exception as e:
        logging.error(f"Error generating text: {e}")
        return {"error": "Error generating text"}
    # Keep only the first returned example
    generated_text = generated_text[0]["generated_text"]
    ## Clean up the generated text:
    # drop duplicate sentences, then drop the (possibly truncated) final sentence.
    sentences = nltk.sent_tokenize(generated_text)
    unique_sentences = set()
    non_duplicate_sentences = []
    for sentence in sentences:
        if sentence not in unique_sentences:
            non_duplicate_sentences.append(sentence)
            unique_sentences.add(sentence)
    final_text = " ".join(non_duplicate_sentences[:-1])
    return {"generated_text": final_text}
async def generate_summary(request: Request):
    """Generate a summary from tweets.

    Args:
        request: The HTTP request.

    Returns:
        The generated summary.
    """
    data = await request.json()
    logging.debug("tweetsData: %s", data["tweetsData"])
    # Get the list of tweet texts
    tweets = [t["full_text"] for t in data["tweetsData"]]
    # Concatenate the tweets into a single string
    text = ". ".join(tweets)
    sentences = list(nlp(text).sents)
    # Sample roughly 10% of the sentences (at least one) to keep the summarizer input short
    n_sampled = max(1, int(0.1 * len(sentences)))
    sampled_sentences = [
        sentiment.tweet_cleaner(s.text) for s in random.sample(sentences, n_sampled)
    ]
    # Join the cleaned sentences into one text blob
    tweet_blob = " ".join(sampled_sentences)
    # Generate the summary
    summary = bert_summarization(tweet_blob)
    logging.info(f"Summary: {summary}")
    # Return the summary
    return {"tweets_summary": summary}

## Historical tweets pages
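
# The two chart endpoints below originally shared the name `read_examples`, so
# the second definition shadowed the first; they are renamed here, and their
# route paths are assumptions.
@app.get("/examples/sentiment_breakdown")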
async def sentiment_breakdown():
    with open("templates/charts/handle_sentiment_breakdown.html") as f:
        html = f.read()
    return HTMLResponse(content=html)
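
# Assumed route (see note above).
@app.get("/examples/sentiment_timesteps")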
async def sentiment_timesteps():
    with open("templates/charts/handle_sentiment_timesteps.html") as f:
        html = f.read()
    return HTMLResponse(content=html)

# uvicorn --workers=2 app:app
if __name__ == "__main__":
    # uvicorn.run(app, host="0.0.0.0", port=8000)
    uvicorn.run("app:app", host="127.0.0.1", port=5050, reload=True)