Skip to content

Commit 6a8d889

Browse files
committed
fix: bound repricing waits and exit after submit cycle
1 parent 96b6731 commit 6a8d889

2 files changed

Lines changed: 203 additions & 41 deletions

File tree

tests/test_portfolio_manager.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from types import SimpleNamespace
23

34
import pytest
@@ -230,6 +231,147 @@ async def fake_run_post_stages(deps, _account_summary, _portfolio_positions):
230231
]
231232
pm.get_portfolio_positions.assert_awaited_once()
232233

234+
@pytest.mark.asyncio
235+
async def test_manage_continues_if_order_submission_wait_times_out(
236+
self, mock_ib, mock_config, mocker
237+
):
238+
completion_future = mocker.Mock()
239+
pm = PortfolioManager(
240+
mock_config,
241+
mock_ib,
242+
completion_future,
243+
dry_run=False,
244+
run_stage_order=["equity_buy_rebalance"],
245+
)
246+
247+
pm.initialize_account = mocker.Mock()
248+
pm.summarize_account = mocker.AsyncMock(return_value=({}, {}))
249+
pm.get_portfolio_positions = mocker.AsyncMock(return_value={})
250+
pm.orders.print_summary = mocker.Mock()
251+
pm.submit_orders = mocker.Mock()
252+
pm.adjust_prices = mocker.AsyncMock()
253+
pm.trades = mocker.Mock()
254+
pm.trades.records = mocker.Mock(return_value=[mocker.Mock()])
255+
256+
mocker.patch(
257+
"thetagang.portfolio_manager.run_equity_rebalance_stages",
258+
new=mocker.AsyncMock(),
259+
)
260+
pm.ibkr.wait_for_submitting_orders = mocker.AsyncMock(
261+
side_effect=RuntimeError("timed out")
262+
)
263+
264+
await pm.manage()
265+
266+
pm.submit_orders.assert_called_once()
267+
assert pm.ibkr.wait_for_submitting_orders.await_count == 2
268+
pm.adjust_prices.assert_awaited_once()
269+
270+
@pytest.mark.asyncio
271+
async def test_manage_allows_incomplete_working_orders(
272+
self, mock_ib, mock_config, mocker
273+
):
274+
completion_future = mocker.Mock()
275+
pm = PortfolioManager(
276+
mock_config,
277+
mock_ib,
278+
completion_future,
279+
dry_run=False,
280+
run_stage_order=["equity_buy_rebalance"],
281+
)
282+
283+
pm.initialize_account = mocker.Mock()
284+
pm.summarize_account = mocker.AsyncMock(return_value=({}, {}))
285+
pm.get_portfolio_positions = mocker.AsyncMock(return_value={})
286+
pm.orders.print_summary = mocker.Mock()
287+
pm.submit_orders = mocker.Mock()
288+
pm.adjust_prices = mocker.AsyncMock()
289+
290+
trade = mocker.Mock()
291+
trade.contract = mocker.Mock(symbol="SPY")
292+
trade.order = mocker.Mock(orderId=123)
293+
trade.orderStatus = mocker.Mock(status="Submitted", filled=0.0, remaining=1.0)
294+
trade.isDone.return_value = False
295+
296+
pm.trades = mocker.Mock()
297+
pm.trades.records = mocker.Mock(return_value=[trade])
298+
299+
mocker.patch(
300+
"thetagang.portfolio_manager.run_equity_rebalance_stages",
301+
new=mocker.AsyncMock(),
302+
)
303+
pm.ibkr.wait_for_submitting_orders = mocker.AsyncMock(return_value=None)
304+
305+
await pm.manage()
306+
307+
@pytest.mark.asyncio
308+
async def test_adjust_prices_continues_if_midpoint_market_data_missing(
309+
self, portfolio_manager, mocker
310+
):
311+
from thetagang.ibkr import RequiredFieldValidationError
312+
313+
portfolio_manager.config.runtime.orders.price_update_delay = (1, 2)
314+
portfolio_manager.config.runtime.orders.minimum_credit = 0.01
315+
316+
trade = mocker.Mock()
317+
trade.contract = mocker.Mock(symbol="SPY")
318+
trade.contract.symbol = "SPY"
319+
trade.order = mocker.Mock(lmtPrice=1.23, action="SELL", totalQuantity=1)
320+
trade.orderId = 101
321+
trade.contract.secType = "OPT"
322+
trade.orderStatus = mocker.Mock(status="Submitted", filled=0.0, remaining=1.0)
323+
trade.isDone.return_value = False
324+
325+
portfolio_manager.trades = mocker.Mock()
326+
portfolio_manager.trades.records = mocker.Mock(return_value=[trade])
327+
portfolio_manager.trades.is_empty = mocker.Mock(return_value=False)
328+
329+
portfolio_manager.config.portfolio.symbols = {
330+
"SPY": mocker.Mock(adjust_price_after_delay=True)
331+
}
332+
portfolio_manager.ibkr.wait_for_orders_complete = mocker.AsyncMock(
333+
return_value=[trade]
334+
)
335+
portfolio_manager.ibkr.get_ticker_for_contract = mocker.AsyncMock(
336+
side_effect=RequiredFieldValidationError("market data unavailable")
337+
)
338+
339+
await portfolio_manager.adjust_prices()
340+
portfolio_manager.ibkr.get_ticker_for_contract.assert_awaited_once()
341+
342+
@pytest.mark.asyncio
343+
async def test_adjust_prices_continues_when_combo_bag_midpoint_times_out(
344+
self, portfolio_manager, mocker
345+
):
346+
portfolio_manager.config.runtime.orders.price_update_delay = (1, 2)
347+
portfolio_manager.config.runtime.orders.minimum_credit = 0.01
348+
349+
trade = mocker.Mock()
350+
trade.contract = mocker.Mock(symbol="QQQ")
351+
trade.contract.symbol = "QQQ"
352+
trade.contract.secType = "BAG"
353+
trade.order = mocker.Mock(lmtPrice=-1.25, action="BUY", totalQuantity=1)
354+
trade.orderStatus = mocker.Mock(status="Submitted", filled=0.0, remaining=1.0)
355+
trade.isDone.return_value = False
356+
357+
portfolio_manager.trades = mocker.Mock()
358+
portfolio_manager.trades.records = mocker.Mock(return_value=[trade])
359+
portfolio_manager.trades.is_empty = mocker.Mock(return_value=False)
360+
361+
portfolio_manager.config.portfolio.symbols = {
362+
"QQQ": mocker.Mock(adjust_price_after_delay=True)
363+
}
364+
portfolio_manager.ibkr.wait_for_orders_complete = mocker.AsyncMock(
365+
return_value=[trade]
366+
)
367+
portfolio_manager.ibkr.get_ticker_for_contract = mocker.AsyncMock(
368+
side_effect=asyncio.TimeoutError()
369+
)
370+
371+
await portfolio_manager.adjust_prices()
372+
373+
portfolio_manager.ibkr.get_ticker_for_contract.assert_awaited_once()
374+
233375
@pytest.mark.asyncio
234376
async def test_write_calls_respects_can_write_when_green_with_nan_close(
235377
self, portfolio_manager, mocker

thetagang/portfolio_manager.py

Lines changed: 61 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -759,46 +759,49 @@ async def manage(self) -> None:
759759

760760
try:
761761
await self.ibkr.wait_for_submitting_orders(self.trades.records())
762-
except RuntimeError:
763-
log.error("Submitting orders failed. Continuing anyway..")
764-
pass
762+
except RuntimeError as exc:
763+
# DAY orders can remain working at the broker after submission.
764+
# Keep running and let later status checks/logs report open orders.
765+
log.warning(f"Order submission wait timed out: {exc}")
765766

766767
await self.adjust_prices()
767768

768-
await self.ibkr.wait_for_submitting_orders(self.trades.records())
769-
incomplete_trades = await self.ibkr.wait_for_orders_complete(
770-
self.trades.records()
769+
try:
770+
await self.ibkr.wait_for_submitting_orders(self.trades.records())
771+
except RuntimeError as exc:
772+
log.warning(f"Post-adjust order submission wait timed out: {exc}")
773+
working_statuses = {"PendingSubmit", "PreSubmitted", "Submitted"}
774+
incomplete_trades = [
775+
trade
776+
for trade in self.trades.records()
777+
if trade and not trade.isDone()
778+
]
779+
still_working = [
780+
trade
781+
for trade in incomplete_trades
782+
if getattr(trade.orderStatus, "status", "") in working_statuses
783+
]
784+
unexpected_state = [
785+
trade for trade in incomplete_trades if trade not in still_working
786+
]
787+
open_orders = ", ".join(
788+
f"{trade.contract.symbol} (OrderId: {trade.order.orderId}, status={getattr(trade.orderStatus, 'status', 'UNKNOWN')})"
789+
for trade in still_working
771790
)
772-
if incomplete_trades:
773-
working_statuses = {"PendingSubmit", "PreSubmitted", "Submitted"}
774-
still_working = [
775-
trade
776-
for trade in incomplete_trades
777-
if getattr(trade.orderStatus, "status", "") in working_statuses
778-
]
779-
unexpected_state = [
780-
trade
781-
for trade in incomplete_trades
782-
if trade not in still_working
783-
]
784-
open_orders = ", ".join(
791+
if open_orders:
792+
log.info(
793+
"Run completed with working submitted orders still open at broker: "
794+
f"{open_orders}"
795+
)
796+
if unexpected_state:
797+
unexpected_orders = ", ".join(
785798
f"{trade.contract.symbol} (OrderId: {trade.order.orderId}, status={getattr(trade.orderStatus, 'status', 'UNKNOWN')})"
786-
for trade in still_working
799+
for trade in unexpected_state
800+
)
801+
log.warning(
802+
"Run completed with non-working incomplete orders at broker: "
803+
f"{unexpected_orders}"
787804
)
788-
if open_orders:
789-
log.info(
790-
"Run completed with working submitted orders still open at broker: "
791-
f"{open_orders}"
792-
)
793-
if unexpected_state:
794-
unexpected_orders = ", ".join(
795-
f"{trade.contract.symbol} (OrderId: {trade.order.orderId}, status={getattr(trade.orderStatus, 'status', 'UNKNOWN')})"
796-
for trade in unexpected_state
797-
)
798-
log.warning(
799-
"Run completed with non-working incomplete orders at broker: "
800-
f"{unexpected_orders}"
801-
)
802805

803806
log.info("ThetaGang is done, shutting down! Cya next time. :sparkles:")
804807
except:
@@ -1085,10 +1088,14 @@ async def adjust_prices(self) -> None:
10851088

10861089
for idx, trade in unfilled:
10871090
try:
1088-
ticker = await self.ibkr.get_ticker_for_contract(
1089-
trade.contract,
1090-
required_fields=[TickerField.MIDPOINT],
1091-
optional_fields=[TickerField.MARKET_PRICE],
1091+
# Bound midpoint price requests so repricing never blocks run termination.
1092+
ticker = await asyncio.wait_for(
1093+
self.ibkr.get_ticker_for_contract(
1094+
trade.contract,
1095+
required_fields=[TickerField.MIDPOINT],
1096+
optional_fields=[TickerField.MARKET_PRICE],
1097+
),
1098+
timeout=self.config.runtime.ib_async.api_response_wait_time,
10921099
)
10931100

10941101
(contract, order) = (trade.contract, trade.order)
@@ -1150,10 +1157,23 @@ async def adjust_prices(self) -> None:
11501157
self.trades.submit_order(contract, order, idx)
11511158

11521159
log.info(f"{contract.symbol}: Order updated, order={order}")
1153-
except (RuntimeError, RequiredFieldValidationError):
1154-
log.error(
1155-
f"Couldn't generate midpoint price for {trade.contract}, skipping"
1160+
except (
1161+
asyncio.TimeoutError,
1162+
RuntimeError,
1163+
RequiredFieldValidationError,
1164+
) as exc:
1165+
log.warning(
1166+
f"Couldn't generate midpoint price for {trade.contract}, skipping repricing"
11561167
)
1168+
if self.data_store:
1169+
self.data_store.record_event(
1170+
"order_price_adjustment_skipped",
1171+
{
1172+
"symbol": getattr(trade.contract, "symbol", ""),
1173+
"secType": getattr(trade.contract, "secType", ""),
1174+
"reason": type(exc).__name__,
1175+
},
1176+
)
11571177
continue
11581178

11591179
async def get_write_threshold(

0 commit comments

Comments
 (0)