Dacho688
Update app.py
c4db3fe
import threading
import time
from functools import partial
from bokeh.plotting import figure, curdoc
from bokeh.models import ColumnDataSource, LabelSet
from datetime import datetime
import random
import numpy as np
class BokehApp:
def __init__(self, doc):
self.doc = doc
# Create a threading.Event to signal the thread to shut down
self.stop_event = threading.Event()
# Create a sample plot
x = datetime.now()
y = random.uniform(30, 500)
self.source = ColumnDataSource(data=dict(x=[x], y=[y]))
self.label_source = self.source.clone()
p = figure(x_axis_type='datetime',
width = 1800,
height = 900,
x_axis_label="Time",
y_axis_label="Value",
title="Real Time Stock Price Streaming",
)
p.title.align = "center"
p.title.text_font_size="20px"
p.toolbar.autohide = True
p.line(x='x', y='y', source=self.source)
p.scatter(x='x', y='y', source=self.source, size=8)
#Create value Label
label = LabelSet(x = 'x',
y = 'y',
text= 'y',
source = self.label_source,
x_offset=5, y_offset=5,
text_font_size="10pt")
p.add_layout(label)
# Start the background thread
self.thread = threading.Thread(target=self.get_data, args=(self.stop_event,), daemon=True)
self.thread.start()
# Register the cleanup function
self.doc.on_session_destroyed(self.on_session_destroyed)
# Add the plot to the document
self.doc.add_root(p)
def get_data(self, stop_event):
"""
Function to run in a separate thread.
Infinite while loop until stop_event is set to True with on_session_destroyed().
Simulates a long-running data-fetching task.
"""
curr_value = self.source.data['y'][-1]
log_curr_value = np.log(curr_value)
while not stop_event.is_set():
x = datetime.now()
log_random_growth = random.normalvariate(mu=.00001, sigma=.001)
log_y = log_curr_value + log_random_growth
log_curr_value = log_y
y = np.round(np.exp(log_curr_value),2)
new_data = dict(x=[x], y=[y])
# Safely schedule the update using a next tick callback
self.doc.add_next_tick_callback(partial(self.update_plot, new_data))
time.sleep(1)
print("Background thread is shutting down gracefully.", flush=True)
def update_plot(self, new_data):
"""
This function is executed by the next tick callback on the main thread.
It safely updates the ColumnDataSource.
"""
self.source.stream(new_data, rollover=1000)
self.label_source.data = new_data
def on_session_destroyed(self, session_context):
"""
Callback function that runs when a user closes the document.
"""
print(f"Session {session_context.id} destroyed. Stopping background thread.", flush=True)
self.stop_event.set()
# Create an instance of the class when the script is run by the Bokeh server
BokehApp(curdoc())