Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions docs/docs/exosphere/triggers.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,15 @@ Define triggers in your graph template:
{
"type": "CRON",
"value": {
"expression": "0 9 * * 1-5"
"expression": "0 9 * * 1-5",
"timezone": "America/New_York"
}
},
{
"type": "CRON",
"value": {
"expression": "0 0 * * 0"
"expression": "0 0 * * 0",
"timezone": "UTC"
}
}
],
Expand All @@ -77,6 +79,8 @@ Define triggers in your graph template:
}
```

**Note:** The `timezone` field is optional and defaults to `"UTC"` if not specified. Use IANA timezone names (e.g., `"America/New_York"`, `"Europe/London"`, `"Asia/Tokyo"`).

### Python SDK Example

```python
Expand Down Expand Up @@ -109,8 +113,8 @@ async def create_scheduled_graph():

# Define triggers for automatic execution
triggers = [
CronTrigger(expression="0 2 * * *"), # Daily at 2:00 AM
CronTrigger(expression="0 */4 * * *") # Every 4 hours
CronTrigger(expression="0 2 * * *", timezone="America/New_York"), # Daily at 2:00 AM EST/EDT
CronTrigger(expression="0 */4 * * *", timezone="UTC") # Every 4 hours UTC
]

# Create the graph with triggers
Expand Down Expand Up @@ -158,7 +162,7 @@ asyncio.run(create_scheduled_graph())

1. **Avoid Peak Times**: Schedule resource-intensive workflows during off-peak hours
2. **Stagger Executions**: If you have multiple graphs, stagger their execution times
3. **Consider Time Zones**: Cron expressions use server time (UTC by default)
3. **Consider Time Zones**: Specify the `timezone` parameter to ensure your cron expressions run at the correct local time. If not specified, defaults to UTC.
4. **Resource Planning**: Ensure your infrastructure can handle scheduled workloads

### Error Handling
Expand Down Expand Up @@ -191,11 +195,31 @@ result = await state_manager.upsert_graph(
)
```

## Timezone Support

Triggers now support specifying a timezone for cron expressions, allowing you to schedule jobs in your local timezone:

```python
# Schedule a report to run at 9 AM New York time (handles DST automatically)
CronTrigger(expression="0 9 * * 1-5", timezone="America/New_York")

# Schedule a job at 5 PM London time
CronTrigger(expression="0 17 * * *", timezone="Europe/London")

# Schedule using UTC (default)
CronTrigger(expression="0 12 * * *", timezone="UTC")
```

**Important Notes:**
- Use IANA timezone names (e.g., `"America/New_York"`, `"Europe/London"`, `"Asia/Tokyo"`)
- Timezones automatically handle Daylight Saving Time (DST) transitions
- If no timezone is specified, defaults to `"UTC"`
- All trigger times are internally stored in UTC for consistency

## Limitations

- **CRON Only**: Currently only cron-based scheduling is supported
- **No Manual Override**: Scheduled executions cannot be manually cancelled once triggered
- **Time Zone**: All cron expressions are evaluated in server time (UTC)
- **Minimum Interval**: Avoid scheduling more frequently than every minute

## Next Steps
Expand Down
2 changes: 1 addition & 1 deletion python-sdk/exospherehost/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "0.0.3b1"
version = "0.0.3b2"
3 changes: 2 additions & 1 deletion python-sdk/exospherehost/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,5 @@ def validate_default_values(cls, v: dict[str, str]) -> dict[str, str]:
return normalized_dict

class CronTrigger(BaseModel):
expression: str = Field(..., description="Cron expression for scheduling automatic graph execution. Uses standard 5-field format: minute hour day-of-month month day-of-week. Example: '0 9 * * 1-5' for weekdays at 9 AM.")
expression: str = Field(..., description="Cron expression for scheduling automatic graph execution. Uses standard 5-field format: minute hour day-of-month month day-of-week. Example: '0 9 * * 1-5' for weekdays at 9 AM.")
timezone: str = Field(default="UTC", description="Timezone for the cron expression (e.g., 'America/New_York', 'Europe/London', 'UTC'). Defaults to 'UTC'.")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should add validation for timezone in SDK also, allowing to detect failures early. We can also take this as a separate PR and issue.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feel free to create an issue if we are moving this to different PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure

3 changes: 2 additions & 1 deletion python-sdk/exospherehost/statemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ async def upsert_graph(self, graph_name: str, graph_nodes: list[GraphNodeModel],
{
"type": "CRON",
"value": {
"expression": trigger.expression
"expression": trigger.expression,
"timezone": trigger.timezone
}
}
for trigger in triggers
Expand Down
2 changes: 1 addition & 1 deletion state-manager/app/controller/upsert_graph_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async def upsert_graph_template(namespace_name: str, graph_name: str, body: Upse
DatabaseTriggers.graph_name == graph_name,
DatabaseTriggers.trigger_status == TriggerStatusEnum.PENDING,
DatabaseTriggers.type == TriggerTypeEnum.CRON,
In(DatabaseTriggers.expression, [trigger.value["expression"] for trigger in old_triggers if trigger.type == TriggerTypeEnum.CRON])
In(DatabaseTriggers.expression, [trigger.value["expression"] for trigger in old_triggers if trigger.value.type == TriggerTypeEnum.CRON])
).delete_many()

background_tasks.add_task(verify_graph, graph_template)
Expand Down
1 change: 1 addition & 0 deletions state-manager/app/models/db/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
class DatabaseTriggers(Document):
type: TriggerTypeEnum = Field(..., description="Type of the trigger")
expression: Optional[str] = Field(default=None, description="Expression of the trigger")
timezone: Optional[str] = Field(default="UTC", description="Timezone for the trigger")
Comment thread
NiveditJain marked this conversation as resolved.
graph_name: str = Field(..., description="Name of the graph")
namespace: str = Field(..., description="Namespace of the graph")
trigger_time: datetime = Field(..., description="Trigger time of the trigger")
Expand Down
40 changes: 28 additions & 12 deletions state-manager/app/models/trigger_models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from pydantic import BaseModel, Field, field_validator, model_validator
from pydantic import BaseModel, Field, field_validator
from enum import Enum
from croniter import croniter
from typing import Self
from typing import Union, Annotated, Literal
from zoneinfo import available_timezones

# Cache available timezones at module level to avoid repeated filesystem queries
_AVAILABLE_TIMEZONES = available_timezones()

class TriggerTypeEnum(str, Enum):
CRON = "CRON"
Expand All @@ -14,7 +18,9 @@ class TriggerStatusEnum(str, Enum):
TRIGGERING = "TRIGGERING"

class CronTrigger(BaseModel):
type: Literal[TriggerTypeEnum.CRON] = Field(default=TriggerTypeEnum.CRON, description="Type of the trigger")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

type is added again here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This tells Pydantic: "Look at the type field to determine which union variant to instantiate." Without type in CronTrigger, Pydantic won't know how to deserialize the data. I have removed type from Trigger instead and refactored the code to access it via trigger.value.type

expression: str = Field(..., description="Cron expression for the trigger")
timezone: str = Field(default="UTC", description="Timezone for the cron expression (e.g., 'America/New_York', 'Europe/London', 'UTC')")

@field_validator("expression")
@classmethod
Expand All @@ -23,14 +29,24 @@ def validate_expression(cls, v: str) -> str:
raise ValueError("Invalid cron expression")
return v

@field_validator("timezone")
@classmethod
def validate_timezone(cls, v: str) -> str:
if v not in _AVAILABLE_TIMEZONES:
raise ValueError(f"Invalid timezone: {v}. Must be a valid IANA timezone (e.g., 'America/New_York', 'Europe/London', 'UTC')")
return v

# Union type for all trigger types - add new trigger types here
TriggerValue = Annotated[Union[CronTrigger], Field(discriminator="type")]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets suppose we have 2 triggers with identical values, I want to test how this approach will work.

Copy link
Copy Markdown
Contributor Author

@spa-raj spa-raj Oct 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added unit tests for this


class Trigger(BaseModel):
type: TriggerTypeEnum = Field(..., description="Type of the trigger")
value: dict = Field(default_factory=dict, description="Value of the trigger")

@model_validator(mode="after")
def validate_trigger(self) -> Self:
if self.type == TriggerTypeEnum.CRON:
CronTrigger.model_validate(self.value)
else:
raise ValueError(f"Unsupported trigger type: {self.type}")
return self
"""
Extensible trigger model using discriminated unions.
To add a new trigger type:
1. Add the enum value to TriggerTypeEnum
2. Create a new trigger class (e.g., WebhookTrigger) with type field
3. Add it to the TriggerValue Union

Note: Access trigger type via trigger.value.type
"""
value: TriggerValue = Field(..., description="Value of the trigger")
19 changes: 17 additions & 2 deletions state-manager/app/tasks/trigger_cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,13 @@
from pymongo import ReturnDocument
from pymongo.errors import DuplicateKeyError
from app.config.settings import get_settings
from zoneinfo import ZoneInfo
import croniter
import asyncio

# Cache UTC timezone at module level to avoid repeated instantiation
UTC = ZoneInfo("UTC")

logger = LogsManager().get_logger()

async def get_due_triggers(cron_time: datetime) -> DatabaseTriggers | None:
Expand Down Expand Up @@ -42,15 +46,26 @@ async def mark_as_failed(trigger: DatabaseTriggers):

async def create_next_triggers(trigger: DatabaseTriggers, cron_time: datetime):
assert trigger.expression is not None
iter = croniter.croniter(trigger.expression, trigger.trigger_time)

# Use the trigger's timezone, defaulting to UTC if not specified
tz = ZoneInfo(trigger.timezone or "UTC")

# Convert trigger_time to the specified timezone for croniter
trigger_time_tz = trigger.trigger_time.replace(tzinfo=UTC).astimezone(tz)
iter = croniter.croniter(trigger.expression, trigger_time_tz)

Comment on lines +55 to 61
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Back-compat: handle missing timezone attribute and avoid ZoneInfo(None).

Tests and older records may not have timezone. Also persist a normalized tz name for reuse later.

-    # Use the trigger's timezone, defaulting to UTC if not specified
-    tz = ZoneInfo(trigger.timezone or "UTC")
+    # Resolve timezone safely for old records/tests without the attribute
+    tz_name = getattr(trigger, "timezone", None) or "UTC"
+    tz = ZoneInfo(tz_name)
 
-    # Convert trigger_time to the specified timezone for croniter
-    trigger_time_tz = trigger.trigger_time.replace(tzinfo=UTC).astimezone(tz)
-    iter = croniter.croniter(trigger.expression, trigger_time_tz)
+    # Convert trigger_time to the specified timezone for croniter
+    trigger_time_tz = trigger.trigger_time.replace(tzinfo=UTC).astimezone(tz)
+    cron_iter = croniter.croniter(trigger.expression, trigger_time_tz)

Also avoids shadowing built-in iter.

🧰 Tools
🪛 GitHub Actions: State Manager Unit Tests

[error] 56-56: pytest failed with AttributeError: Mock object has no attribute 'timezone' during create_next_triggers; the test mocks for DatabaseTriggers without providing a timezone attribute on the trigger mock. Command: uv run pytest tests/ --cov=app --cov-report=xml --cov-report=term-missing --cov-report=html -v --junitxml=full-pytest-report.xml

🤖 Prompt for AI Agents
In state-manager/app/tasks/trigger_cron.py around lines 55–61, avoid calling
ZoneInfo(None) by reading the timezone safely (e.g. tz_name = getattr(trigger,
"timezone", None) or trigger.timezone) and defaulting tz_name to "UTC" if falsy,
then create ZoneInfo(tz_name); persist the normalized tz_name back onto the
trigger record for future reuse; compute trigger_time_tz by replacing UTC on
trigger.trigger_time and astimezone(tz) as before; and stop shadowing the
built-in iter by renaming the croniter instance variable (e.g. cron_iter).

while True:
next_trigger_time = iter.get_next(datetime)
# Get next trigger time in the specified timezone
next_trigger_time_tz = iter.get_next(datetime)

# Convert back to UTC for storage
next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)

try:
await DatabaseTriggers(
type=TriggerTypeEnum.CRON,
expression=trigger.expression,
timezone=trigger.timezone,
graph_name=trigger.graph_name,
namespace=trigger.namespace,
trigger_time=next_trigger_time,
Expand Down
36 changes: 28 additions & 8 deletions state-manager/app/tasks/verify_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from datetime import datetime
from json_schema_to_pydantic import create_model
from zoneinfo import ZoneInfo

from app.models.db.graph_template_model import GraphTemplate
from app.models.graph_template_validation_status import GraphTemplateValidationStatus
Expand All @@ -11,6 +12,9 @@
from app.models.trigger_models import TriggerStatusEnum, TriggerTypeEnum
from app.models.db.trigger import DatabaseTriggers

# Cache UTC timezone at module level to avoid repeated instantiation
UTC = ZoneInfo("UTC")

logger = LogsManager().get_logger()

async def verify_node_exists(graph_template: GraphTemplate, registered_nodes: list[RegisteredNode]) -> list[str]:
Expand Down Expand Up @@ -101,20 +105,36 @@ async def verify_inputs(graph_template: GraphTemplate, registered_nodes: list[Re
return errors

async def create_crons(graph_template: GraphTemplate):
expressions_to_create = set([trigger.value["expression"] for trigger in graph_template.triggers if trigger.type == TriggerTypeEnum.CRON])
# Build a map of (expression, timezone) -> CronTrigger for deduplication
triggers_to_create = {}
for trigger in graph_template.triggers:
if trigger.value.type == TriggerTypeEnum.CRON:
# trigger.value is already a validated CronTrigger instance
cron_trigger = trigger.value
triggers_to_create[(cron_trigger.expression, cron_trigger.timezone)] = cron_trigger

current_time = datetime.now(UTC).replace(tzinfo=None)

current_time = datetime.now()

new_db_triggers = []
for expression in expressions_to_create:
iter = croniter.croniter(expression, current_time)
for (expression, timezone), cron_trigger in triggers_to_create.items():
# Use the validated timezone (guaranteed to be valid IANA timezone, never None)
tz = ZoneInfo(timezone)

# Get current time in the specified timezone
current_time_tz = current_time.replace(tzinfo=UTC).astimezone(tz)
iter = croniter.croniter(expression, current_time_tz)

# Get next trigger time in the specified timezone
next_trigger_time_tz = iter.get_next(datetime)

# Convert back to UTC for storage (remove timezone info for storage)
next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
Comment on lines +127 to +135
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Name nit + consistency for expires_at.

Avoid shadowing iter and consider UTC-aware expires_at like in mark functions.

-    current_time_tz = current_time.replace(tzinfo=UTC).astimezone(tz)
-    iter = croniter.croniter(expression, current_time_tz)
+    current_time_tz = current_time.replace(tzinfo=UTC).astimezone(tz)
+    cron_iter = croniter.croniter(expression, current_time_tz)
@@
-    next_trigger_time_tz = iter.get_next(datetime)
+    next_trigger_time_tz = cron_iter.get_next(datetime)
@@
-    next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
-    expires_at = next_trigger_time + timedelta(hours=settings.trigger_retention_hours)
+    next_trigger_time = next_trigger_time_tz.astimezone(UTC).replace(tzinfo=None)
+    expires_at = next_trigger_time.replace(tzinfo=timezone.utc) + timedelta(hours=settings.trigger_retention_hours)

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In state-manager/app/tasks/verify_graph.py around lines 127 to 135, avoid
shadowing the built-in-like name "iter" by renaming it (e.g., cron_iter) and
keep the computed expiry timestamp UTC-aware to match other mark functions:
compute next_trigger_time_tz from cron_iter.get_next(datetime) then convert to
UTC but do NOT strip tzinfo (i.e., set tzinfo=UTC or use astimezone(UTC) and
keep tzinfo), and assign that UTC-aware datetime to expires_at (or
next_trigger_time) for consistent storage and comparisons.


next_trigger_time = iter.get_next(datetime)

new_db_triggers.append(
DatabaseTriggers(
type=TriggerTypeEnum.CRON,
expression=expression,
expression=cron_trigger.expression,
timezone=cron_trigger.timezone,
graph_name=graph_template.name,
namespace=graph_template.namespace,
trigger_status=TriggerStatusEnum.PENDING,
Expand Down
Loading
Loading