-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathlive_chart_emulation.py
More file actions
52 lines (40 loc) Β· 1.61 KB
/
live_chart_emulation.py
File metadata and controls
52 lines (40 loc) Β· 1.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import polars as pl
import time
from chart_engine import Chart
def run_test():
# 1. Load data
print("π Loading data/1d.parquet...")
df = pl.read_parquet("data/1d.parquet")
print(f"β
Loaded {len(df)} rows.")
# 2. Initialize Chart
print("π Initializing Chart Engine...")
chart = Chart(title="Live Stream Emulation")
# 3. Create Series
series = chart.create_candlestick_series(name="SOLUSDT")
# 4. Set Initial Data (First 1000 bars)
initial_bars = 1000
print(f"π Setting initial {initial_bars} bars...")
series.set_data(df.head(initial_bars))
# 5. Emulate Live Stream
print("β± Starting live stream emulation...")
stream_df = df.slice(initial_bars)
try:
for i, row in enumerate(stream_df.iter_rows(named=True)):
# Update the chart series with the new bar
series.update(row)
# Sync the paper trader's internal price with the close of the new bar
chart.trader_update_price(row['close'])
if i % 10 == 0:
print(f"π‘ Streamed {i} updates... Last Price: {row['close']:.2f}")
# Artificial delay to mimic live updates
time.sleep(0.05)
# Exit if window closed (hypothetical check)
if i > 500: # Limit test to 500 bars for demonstration
break
except KeyboardInterrupt:
print("π Stream interrupted by user.")
finally:
chart.exit()
print("β¨ Test complete.")
if __name__ == "__main__":
run_test()