| | from fastapi import FastAPI, HTTPException |
| | from pydantic import BaseModel |
| | import pandas as pd |
| | import numpy as np |
| | import tensorflow as tf |
| | from yahoo_fin.stock_info import get_data |
| | from sklearn.preprocessing import MinMaxScaler |
| | from transformers import AutoTokenizer, AutoModelForSequenceClassification |
| | from pytorch_forecasting import TemporalFusionTransformer |
| | from bs4 import BeautifulSoup |
| | import requests |
| | from dotenv import load_dotenv |
| | import os |
| | from fastapi.middleware.cors import CORSMiddleware |
| |
|
# Set before torch is imported below; a value of "-1" is presumably meant to
# hide all CUDA devices so that inference runs on the CPU.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

import torch  # noqa: E402  (deliberately imported after the env tweak above)

# Device shared by the torch models loaded below.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Keras model used by the /predict-prices endpoint.
MODEL_PATH = "lib/20_lstm_model.h5"
model = tf.keras.models.load_model(MODEL_PATH)

# Hugging Face sequence classifier used to score news text.
model_name_news = "mrm8488/distilroberta-finetuned-financial-news-sentiment-analysis"
tokenizer = AutoTokenizer.from_pretrained(model_name_news)
sentiment_model = AutoModelForSequenceClassification.from_pretrained(
    model_name_news
).to(device)

# Temporal Fusion Transformer checkpoint used for the final prediction.
best_model_path = 'lib/tft_pred.ckpt'
best_tft = TemporalFusionTransformer.load_from_checkpoint(
    best_model_path
).to(device)

app = FastAPI()

# NOTE(review): wildcard origins are combined with allow_credentials=True.
# The FastAPI CORS documentation advises listing origins explicitly when
# credentials are allowed -- confirm whether credentials are needed at all.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)
| |
|
| |
|
class TickerRequest(BaseModel):
    """Request body describing a ticker and the period to fetch.

    All fields are plain strings; no format validation is applied here.
    """

    # Ticker symbol to look up.
    ticker: str
    # First and last date of the requested range.
    start_date: str
    end_date: str
    # Sampling interval; defaults to "1d".
    interval: str = "1d"
| |
|
def fetch_and_process_ticker_data(ticker, start_date, end_date, interval="1d"):
    """Download price history for one ticker and add derived columns.

    The data is requested from ``get_data`` with ``index_as_date=True``. The
    ``close`` column is dropped and two columns are added:

    * ``revenue``      = ``adjclose * volume``
    * ``daily_profit`` = ``adjclose - open``

    Args:
        ticker: Ticker symbol passed to ``get_data``.
        start_date: Start of the requested range, passed through unchanged.
        end_date: End of the requested range, passed through unchanged.
        interval: Sampling interval, passed through unchanged.

    Returns:
        The processed DataFrame.

    Raises:
        HTTPException: Status 500 if downloading or processing fails; the
            original exception is chained as ``__cause__``.
    """
    try:
        df = get_data(ticker, start_date=start_date, end_date=end_date,
                      index_as_date=True, interval=interval)
        # drop() returns a new frame, so the columns added below never touch
        # the object handed back by get_data.
        df = df.drop(columns="close")
        df["revenue"] = df["adjclose"] * df["volume"]
        df["daily_profit"] = df["adjclose"] - df["open"]
    except Exception as error:
        raise HTTPException(
            status_code=500,
            detail=f"Error processing ticker {ticker}: {error}") from error
    return df
| |
|
| |
|
def ticker_encoded(df):
    """Add an integer ``ticker_encoded`` column derived from ``df['ticker']``.

    Args:
        df: DataFrame with a ``ticker`` column. It is modified in place.

    Returns:
        The same DataFrame, with the new ``ticker_encoded`` column.

    Raises:
        KeyError: If any ticker is not one of the supported symbols.
    """
    label_map = {'ATOM': 0, 'HBIO': 1, 'IBEX': 2, 'MYFW': 3, 'NATH': 4}

    codes = df['ticker'].map(label_map)

    # map() yields NaN for anything outside label_map; report those tickers
    # explicitly instead of failing with a bare lookup error.
    unknown = codes.isna()
    if unknown.any():
        missing = sorted({str(name) for name in df.loc[unknown, 'ticker']})
        raise KeyError(
            f"Unsupported ticker(s): {', '.join(missing)}; "
            f"supported tickers are {sorted(label_map)}")

    df['ticker_encoded'] = codes.astype('int64')
    return df
| |
|
| |
|
def normalize(df):
    """Min-max scale the price, volume/revenue and profit columns in place.

    Each column group is fitted with its own ``MinMaxScaler`` on the data
    passed in.

    Args:
        df: DataFrame containing ``open``, ``high``, ``low``, ``adjclose``,
            ``volume``, ``revenue`` and ``daily_profit``. It is modified.

    Returns:
        A tuple ``(df, price_scaler)`` where ``price_scaler`` is the scaler
        fitted on ``open``, ``high``, ``low``, ``adjclose`` (in that order).
    """
    price_columns = ["open", "high", "low", "adjclose"]
    volume_revenue_columns = ["volume", "revenue"]
    profit_columns = ["daily_profit"]

    # Only the price scaler is returned; the other two are used once here.
    price_scaler = MinMaxScaler()
    df[price_columns] = price_scaler.fit_transform(df[price_columns])

    df[volume_revenue_columns] = MinMaxScaler().fit_transform(
        df[volume_revenue_columns])

    df[profit_columns] = MinMaxScaler().fit_transform(df[profit_columns])

    return df, price_scaler
| |
|
| |
|
def create_sequence(dataset, window_size=20):
    """Build fixed-length sliding windows and their next-row targets.

    For every row ``t >= window_size`` the preceding ``window_size`` rows form
    one sequence and row ``t`` supplies the label. A window is kept only when
    all of its rows and the target row share the same ``ticker_encoded``.

    Args:
        dataset: DataFrame with a ``date`` column (values must support
            ``strftime``), a ``ticker_encoded`` column, and ``open`` and
            ``adjclose`` columns. Every column except ``date`` is used as a
            feature.
        window_size: Number of rows per sequence. Defaults to 20.

    Returns:
        A tuple ``(sequences, labels, dates, stock)``:
        ``sequences`` is an array of the feature windows, ``labels`` an array
        of the target rows' ``open`` and ``adjclose``, ``dates`` the target
        dates as ``YYYY-MM-DD`` strings, and ``stock`` the target rows'
        ``ticker_encoded`` values as strings.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    features = dataset.drop(columns=["date"])
    # Read the ticker codes once instead of slicing the frame per window.
    ticker_codes = dataset["ticker_encoded"].to_numpy()

    sequences = []
    labels = []
    dates = []
    stock = []

    for stop_idx in range(window_size, len(dataset)):
        start_idx = stop_idx - window_size
        target_code = ticker_codes[stop_idx]

        # Skip windows that straddle two tickers or whose target row belongs
        # to a different ticker than the window.
        if (ticker_codes[start_idx:stop_idx] != target_code).any():
            continue

        sequences.append(features.iloc[start_idx:stop_idx].values)
        labels.append(features.iloc[stop_idx][["open", "adjclose"]])
        dates.append(dataset["date"].iloc[stop_idx].strftime('%Y-%m-%d'))
        stock.append(str(target_code))

    return np.array(sequences), np.array(labels), dates, stock
| |
|
| |
|
def scaling_predictions(price_scaler, combined_dataset_prediction):
    """Map scaled two-column predictions back to the original price scale.

    Only entries 0 and 3 of the scaler's ``min_`` and ``scale_`` are used.
    When the scaler comes from ``normalize`` these correspond to ``open`` and
    ``adjclose``. The inverse of min-max scaling is ``(x - min_) / scale_``.

    The scaler is left untouched, so it can be reused and this function can
    be called repeatedly with the same scaler.

    Args:
        price_scaler: Fitted scaler exposing ``min_`` and ``scale_`` with at
            least four entries.
        combined_dataset_prediction: Array-like whose last axis holds the two
            scaled predictions (first entry 0, then entry 3 of the scaler).

    Returns:
        A NumPy array of the same shape with the scaling undone.
    """
    selected = [0, 3]
    offsets = np.asarray(price_scaler.min_)[selected]
    factors = np.asarray(price_scaler.scale_)[selected]

    scaled = np.asarray(combined_dataset_prediction)
    combined_dataset_prediction_inverse = (scaled - offsets) / factors

    return combined_dataset_prediction_inverse
| |
|
| |
|
def storing_predictions(df, dates, stock, combined_dataset_prediction_inverse):
    """Attach predicted open/close prices to the rows they were made for.

    A prediction ``i`` belongs to a row when the row's date (formatted as
    ``YYYY-MM-DD``) equals ``dates[i]`` and ``str(row.ticker_encoded)`` equals
    ``stock[i]``. If several predictions share a key, the first one wins.

    Args:
        df: DataFrame with ``date`` and ``ticker_encoded`` columns. The
            ``pred_open`` and ``pred_closing`` columns are added in place.
        dates: Target dates as ``YYYY-MM-DD`` strings, one per prediction.
        stock: Ticker codes as strings, one per prediction.
        combined_dataset_prediction_inverse: Indexable of predictions where
            element ``[i][0]`` is the open and ``[i][1]`` the close price.

    Returns:
        A new DataFrame holding only the rows that received a prediction,
        with a fresh 0..n-1 index.
    """
    df['pred_open'] = np.nan
    df['pred_closing'] = np.nan

    # (date, ticker) -> position of the first matching prediction, so each
    # row needs a single lookup rather than a scan over all predictions.
    first_match = {}
    for position, key in enumerate(zip(dates, stock)):
        first_match.setdefault(key, position)

    for idx, row in df.iterrows():
        key = (row["date"].strftime('%Y-%m-%d'), str(row["ticker_encoded"]))
        position = first_match.get(key)
        if position is None:
            continue

        prediction = combined_dataset_prediction_inverse[position]
        df.loc[idx, 'pred_open'] = prediction[0]
        df.loc[idx, 'pred_closing'] = prediction[1]

    df = df.dropna(subset=['pred_open', 'pred_closing']).reset_index(drop=True)

    return df
| |
|
| |
|
def scrape_news(ticker_name, pages=2, timeout=10):
    """Scrape news headlines for a ticker from markets.businessinsider.com.

    Args:
        ticker_name: Ticker symbol inserted into the news URL.
        pages: Number of result pages to fetch, starting at page 1.
            Defaults to 2.
        timeout: Seconds to wait for each HTTP request before ``requests``
            raises a timeout error. Defaults to 10.

    Returns:
        A DataFrame with columns ``datatime``, ``title``, ``source``,
        ``link``, ``top_sentiment`` (always ``''``) and ``sentiment_score``
        (always ``0``). Rows appear in the reverse of the order in which they
        were scraped. The frame is empty when nothing was found.

    Raises:
        requests.RequestException: If a request fails or times out.
    """
    columns = ['datatime', 'title', 'source',
               'link', 'top_sentiment', 'sentiment_score']
    rows = []

    for page in range(1, pages + 1):
        url = f'https://markets.businessinsider.com/news/{ticker_name}-stock?p={page}'
        # Without a timeout a stalled server would hang the request forever.
        response = requests.get(url, timeout=timeout)
        soup = BeautifulSoup(response.text, 'lxml')

        for article in soup.find_all('div', class_='latest-news__story'):
            time_tag = article.find('time', class_='latest-news__date')
            link_tag = article.find('a', class_='news-link')
            source_tag = article.find('span', class_='latest-news__source')

            # Skip stories missing one of the expected elements rather than
            # aborting the whole scrape on a None dereference.
            if time_tag is None or link_tag is None or source_tag is None:
                continue

            rows.append([
                time_tag.get('datetime'),
                link_tag.text,
                source_tag.text,
                link_tag.get('href'),
                '',   # top_sentiment placeholder
                0,    # sentiment_score placeholder
            ])

    # Each article used to be prepended to the result, so the final order is
    # the reverse of the scrape order; keep that ordering.
    rows.reverse()

    return pd.DataFrame(rows, columns=columns)
| |
|
| |
|
def add_recent_news(main_df, news_df, lookback_days=10):
    """Add a ``news`` column with recent headlines for every row's date.

    For each row, the titles of all news items published strictly before the
    row's date and at most ``lookback_days`` days earlier are joined with
    spaces. If there are none, the last item in ``news_df`` order that was
    published before the row's date is used. If that also yields nothing,
    the previous row's text is carried forward (initially an empty string).

    Args:
        main_df: DataFrame with a ``date`` column. It is modified in place:
            ``date`` is converted with ``pd.to_datetime`` and ``news`` is
            added.
        news_df: DataFrame with ``datatime``, ``title``, ``top_sentiment``
            and ``sentiment_score`` columns. It is not modified.
        lookback_days: Maximum age in days of a news item. Defaults to 10.

    Returns:
        ``main_df`` with the added ``news`` column.
    """
    # Work on a copy so the caller's news frame keeps its columns and dtypes.
    news = news_df.drop(columns=['top_sentiment', 'sentiment_score'])
    news['datatime'] = pd.to_datetime(news['datatime'])

    main_df['date'] = pd.to_datetime(main_df['date'])

    # Materialize (published, title) pairs once instead of re-iterating the
    # frame row by row for every date.
    news_items = list(zip(news['datatime'], news['title']))

    news_list = []
    last_available_news = ''

    for current_date in main_df['date']:
        recent_titles = [
            title
            for published, title in news_items
            if (current_date - published).days <= lookback_days
            and published < current_date
        ]
        news_articles = " ".join(recent_titles)

        if not news_articles.strip():
            # Nothing inside the window: fall back to the last listed item
            # that precedes the current date, if any.
            news_articles = next(
                (title for published, title in reversed(news_items)
                 if published < current_date),
                '',
            )

        last_available_news = news_articles.strip() or last_available_news
        news_list.append(last_available_news)

    main_df['news'] = news_list

    return main_df
| |
|
| |
|
def news_sentiment(df):
    """Classify each row's ``news`` text and attach sentiment columns.

    Rows whose ``news`` value is null are not sent to the model; they get a
    null ``predicted_sentiment`` and ``sentiment_score``. If no row has news
    text, the model is not called at all.

    Args:
        df: DataFrame with a ``news`` column. ``predicted_sentiment`` and
            ``sentiment_score`` are added to it in place.

    Returns:
        A new DataFrame without the ``news`` column, containing
        ``predicted_sentiment`` ("negative" / "neutral" / "positive") and
        ``sentiment_score`` (-1 / 0 / 1).
    """
    news_column_name = 'news'
    # NOTE(review): assumes the model's class indices follow this order --
    # confirm against sentiment_model.config.id2label.
    labels = ["negative", "neutral", "positive"]
    sentiment_map = {
        'positive': 1,
        'neutral': 0,
        'negative': -1
    }

    # Select texts and target index with the same mask so they always have
    # matching lengths.
    has_news = df[news_column_name].notna()
    texts = df.loc[has_news, news_column_name].astype(str).tolist()

    predicted_labels = []
    if texts:
        inputs = tokenizer(texts, padding=True,
                           truncation=True, return_tensors="pt")
        inputs = {key: val.to(device) for key, val in inputs.items()}

        with torch.no_grad():
            logits = sentiment_model(**inputs).logits

        # argmax over logits equals argmax over their softmax.
        predictions = torch.argmax(logits, dim=-1).tolist()
        predicted_labels = [labels[class_id] for class_id in predictions]

    df['predicted_sentiment'] = pd.Series(
        predicted_labels, index=df.index[has_news], dtype=object)

    df['sentiment_score'] = df['predicted_sentiment'].map(sentiment_map)

    df = df.drop(columns=[news_column_name])

    return df
| |
|
| |
|
def get_tft_predictions(df):
    """Add 20 per-ticker lag features and run the TFT model on the result.

    For ``i`` in 1..20 the columns ``open_lag_i`` and ``adjclose_lag_i`` hold
    the value of ``open`` / ``adjclose`` from ``i`` rows earlier within the
    same ``ticker``. Rows lacking any lag value are dropped before predicting.

    The input frame is not modified.

    Args:
        df: DataFrame with ``ticker``, ``open`` and ``adjclose`` columns plus
            whatever other columns the loaded TFT checkpoint expects.

    Returns:
        The result of ``best_tft.predict(..., mode="quantiles")``.
    """
    grouped = df.groupby('ticker')

    # Collect all lag columns first and attach them in one concat; inserting
    # 40 columns one at a time fragments the frame.
    lag_columns = {}
    for i in range(1, 21):
        lag_columns[f'open_lag_{i}'] = grouped['open'].shift(i)
        lag_columns[f'adjclose_lag_{i}'] = grouped['adjclose'].shift(i)

    lag_names = list(lag_columns)
    lags = pd.DataFrame(lag_columns, index=df.index)

    # Drop stale lag columns (if any) so the concat cannot duplicate names.
    base = df.drop(columns=lag_names, errors="ignore")
    features = pd.concat([base, lags], axis=1).dropna(subset=lag_names)

    # A pandas DataFrame has no ``.to(device)``; the model itself was already
    # moved to the device when it was loaded, so pass the frame directly.
    predictions = best_tft.predict(features, mode="quantiles")

    return predictions
| |
|
| |
|
@app.get("/fetch-ticker-data/{ticker_name}/{start_date}/{end_date}/{interval}")
async def fetch_ticker_data(ticker_name: str, start_date: str, end_date: str, interval: str):
    """Return processed price history for a ticker as a list of records.

    Args:
        ticker_name: Ticker symbol taken from the URL path.
        start_date: Start of the range, passed through to the fetch helper.
        end_date: End of the range, passed through to the fetch helper.
        interval: Sampling interval, passed through to the fetch helper.

    Returns:
        One dict per row (``DataFrame.to_dict(orient="records")``).
        NOTE(review): this orientation omits the frame's index, so the dates
        are not part of the response -- confirm that this is intended.

    Raises:
        HTTPException: Status 500 on failure. An HTTPException raised by the
            helper is passed through unchanged.
    """
    from fastapi.concurrency import run_in_threadpool

    try:
        # The download is blocking; run it in a worker thread so the event
        # loop stays free to serve other requests.
        result_df = await run_in_threadpool(
            fetch_and_process_ticker_data,
            ticker=ticker_name,
            start_date=start_date,
            end_date=end_date,
            interval=interval,
        )
        return result_df.to_dict(orient="records")
    except HTTPException:
        # Already a well-formed HTTP error; re-wrapping would mangle its detail.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
| |
|
| |
|
@app.get("/predict-prices/{ticker_name}/{start_date}/{end_date}/{interval}")
async def predict_prices(ticker_name: str, start_date: str, end_date: str, interval: str):
    """Predict the next open and close price for a ticker.

    Pipeline: fetch history, keep the last 60 rows, encode the ticker,
    normalize, build sequences, run the Keras model, undo the scaling, attach
    the predictions to the rows, add scraped news and its sentiment, then run
    the TFT model on the result.

    Args:
        ticker_name: Ticker symbol taken from the URL path.
        start_date: Start of the history range.
        end_date: End of the history range.
        interval: Sampling interval of the history.

    Returns:
        ``{"open": <float>, "close": <float>}``.

    Raises:
        HTTPException: Status 400 for an unsupported ticker or too little
            history to build a sequence; status 500 for any other failure.
    """
    # NOTE(review): every step below is blocking (network and model
    # inference) and runs on the event loop -- consider offloading it.
    try:
        result_df = fetch_and_process_ticker_data(
            ticker=ticker_name,
            start_date=start_date,
            end_date=end_date,
            interval=interval
        )

        raw_data = result_df.tail(60)
        raw_data = raw_data.reset_index()
        raw_data.rename(columns={"index": "date"}, inplace=True)

        try:
            raw_data = ticker_encoded(raw_data)
        except KeyError as e:
            raise HTTPException(
                status_code=400,
                detail=f"Unsupported ticker {ticker_name}: {e}") from e

        # normalize() scales its argument in place, so keep an unscaled copy
        # for attaching the predictions afterwards.
        temp_df = raw_data.copy()

        normalized_data, scaler = normalize(raw_data)
        normalized_data = normalized_data.drop(columns=['ticker'])

        sequences, _, dates, stock = create_sequence(normalized_data)
        if len(sequences) == 0:
            raise HTTPException(
                status_code=400,
                detail=(f"Not enough history for {ticker_name} in the "
                        "requested range to build a prediction sequence"))

        combined_dataset_prediction = model.predict(sequences)
        combined_dataset_prediction_inverse = scaling_predictions(
            scaler, combined_dataset_prediction)

        lstm_pred_df = storing_predictions(
            temp_df, dates, stock, combined_dataset_prediction_inverse)

        news_df = scrape_news(ticker_name=ticker_name)
        combined_with_news_df = add_recent_news(lstm_pred_df, news_df)
        sentiment_df = news_sentiment(combined_with_news_df)

        sentiment_df['time_idx'] = range(1, len(sentiment_df) + 1)

        predicted_values = get_tft_predictions(sentiment_df)

        # NOTE(review): assumes elements 0 and 1 of the result are
        # single-value tensors for open and close -- confirm against the
        # checkpoint's output shape.
        final_pred_open_price = predicted_values[0].item()
        final_pred_closing_price = predicted_values[1].item()

        return {"open": final_pred_open_price, 'close': final_pred_closing_price}

    except HTTPException:
        # Keep the status code and detail chosen where the error was raised.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
| |
|
| |
|
| |
|
| |
|