Any open-source tools or agent frameworks actually worth using here, or better to stay minimal?
Maybe pydantic is enough for this case…?
Design goal
Build a paper-trading-only pipeline that:
- Ingests posts from one X account + one Substack feed
- Uses an LLM only as a parser to extract a small schema (
BUY/SELL, ticker, and for options: call/put, strike, expiry)
- Applies deterministic validation and fail-closed rules
- Executes idempotent paper orders on Alpaca
- Produces an auditable, replayable ledger of what happened and why
This is the same architectural idea that makes payment pipelines reliable: append raw events, derive state, never “wing it”.
Core principle: the LLM is not trusted
Prompt injection is a top risk for LLM apps because models don’t reliably separate “instructions” from “data”. You mitigate this by constraining what the model can do and validating everything deterministically before it can trigger side effects. (OWASP Gen AI Security Project)
So: LLM output is treated as untrusted input, just like the original tweet/newsletter text.
Cleanest architecture
High-level pipeline (durable state machine)
[Poll X]      [Poll RSS]
     \           /
      v         v
   RAW_ITEM (append-only, deduped)
        |
        v
   PARSE (LLM -> strict JSON)
        |
        v
   VALIDATE (deterministic gates)
        |
        +--> REJECT (with codes, persisted)
        |
        v
   EXECUTE (Alpaca paper, idempotent)
        |
        v
   OBSERVE (SSE events -> reconcile)
Why this is clean
- Each step has a single job and produces a persisted artifact.
- Every transition is replayable (re-run parse/validate on historical RAW_ITEM).
- Fail-closed behavior is natural: if anything is unclear, you reject (no trade).
Data model (what you persist and why)
A minimal but production-grade approach is to persist four “layers”:
1) RawItem (source of truth)
- What you store:
  - source: x or substack
  - source_item_id: tweet id or RSS guid/link
  - published_at, ingested_at
  - raw_text (normalized)
  - raw_payload (full JSON/XML)
  - content_hash (detect edits/reposts)
- Why:
  - replayability + auditability
  - debugging "why did it trade?"
2) ParsedSignal (LLM output, untrusted)
- action: BUY|SELL|null
- ticker: string|null
- options: CALL|PUT|null, strike, expiry
- confidence: 0..1 (informational only)
- evidence spans: map each field to the substring(s) it came from (for auditing)
3) ValidatedSignal (trusted)
- normalized instrument (e.g., stock symbol or option contract id/symbol)
- normalized order request (internal canonical representation)
- reject_codes if invalid (fail-closed)
4) ExecutionRecord (side effect ledger)
- client_order_id (idempotency key)
- broker_order_id
- request/response payloads
- status
This is intentionally “boring”. Boring is reliable.
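If it helps to see the shape of these layers, here is a minimal sketch as Pydantic models. Field names follow the lists above; the enums and defaults are illustrative choices, not a fixed spec.

```python
# Sketch of the four persisted layers as Pydantic models (illustrative, not a spec).
from datetime import date, datetime
from enum import Enum
from typing import Optional

from pydantic import BaseModel


class Action(str, Enum):
    BUY = "BUY"
    SELL = "SELL"


class OptionType(str, Enum):
    CALL = "CALL"
    PUT = "PUT"


class RawItem(BaseModel):
    source: str                      # "x" or "substack"
    source_item_id: str              # tweet id or RSS guid/link
    published_at: datetime
    ingested_at: datetime
    raw_text: str                    # normalized text
    raw_payload: str                 # full JSON/XML as received
    content_hash: str                # detects edits/reposts


class ParsedSignal(BaseModel):
    action: Optional[Action] = None          # null when missing/ambiguous
    ticker: Optional[str] = None
    option_type: Optional[OptionType] = None
    strike: Optional[float] = None
    expiry: Optional[date] = None
    confidence: float = 0.0                  # informational only
    evidence: dict[str, str] = {}            # field -> source substring(s)


class ValidatedSignal(BaseModel):
    instrument: str                  # normalized symbol or option contract symbol
    order_request: dict              # internal canonical order representation
    reject_codes: list[str] = []     # non-empty => fail closed, no trade


class ExecutionRecord(BaseModel):
    client_order_id: str             # idempotency key
    broker_order_id: Optional[str] = None
    request_payload: dict = {}
    response_payload: dict = {}
    status: str = "PENDING"
```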
Ingestion details
X ingestion (poller)
Use X's "Get Posts" endpoint for authored posts:
GET /2/users/{id}/tweets
- Use since_id to fetch only newer posts; paginate with pagination_token.
- Handle rate limits and backoff. (X Developer Platform)
Practical guidance
- Track a checkpoint: since_id = last seen tweet id (monotonic)
- Persist the full API payload (not just the snippet text)
- Rate limiting (see the poller sketch below):
  - treat 429 as expected; use exponential backoff with jitter
  - keep polling frequency consistent with your plan limits and acceptable latency (X Developer Platform)
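A minimal poller sketch under these assumptions: httpx as the HTTP client, a bearer token, and an api.x.com base URL (verify the base URL and rate limits against your plan's docs). It keeps the since_id checkpoint, follows pagination_token, and backs off on 429.

```python
# Sketch of the X poller: since_id checkpoint, pagination, backoff with jitter.
import random
import time

import httpx

BASE = "https://api.x.com/2"   # assumption; use whatever base URL your plan documents


def fetch_new_posts(user_id: str, bearer_token: str, since_id: str | None) -> list[dict]:
    headers = {"Authorization": f"Bearer {bearer_token}"}
    params: dict = {"max_results": 100}
    if since_id:
        params["since_id"] = since_id

    posts, backoff = [], 1.0
    with httpx.Client(headers=headers, timeout=30) as client:
        while True:
            resp = client.get(f"{BASE}/users/{user_id}/tweets", params=params)
            if resp.status_code == 429:                    # expected under rate limits
                time.sleep(backoff + random.random())      # exponential backoff with jitter
                backoff = min(backoff * 2, 300)
                continue
            resp.raise_for_status()
            body = resp.json()
            posts.extend(body.get("data", []))
            token = body.get("meta", {}).get("next_token")
            if not token:
                return posts
            params["pagination_token"] = token
```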
Substack ingestion (RSS poller)
Substack provides an RSS feed at:
https://<publication>.substack.com/feed (Substack)
Practical guidance
- Use conditional requests when possible (ETag, If-Modified-Since) to reduce bandwidth; see the sketch below.
- RSS content may be truncated/paywalled; if required information is missing, you reject downstream.
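A minimal sketch of the RSS side, assuming feedparser, which supports conditional requests via its etag/modified arguments. The publication URL is a placeholder.

```python
# Sketch of the Substack poller with conditional requests (ETag / Last-Modified).
import feedparser

FEED_URL = "https://<publication>.substack.com/feed"  # placeholder publication


def poll_feed(etag: str | None, modified: str | None):
    feed = feedparser.parse(FEED_URL, etag=etag, modified=modified)
    if getattr(feed, "status", None) == 304:             # unchanged since last poll
        return [], etag, modified

    items = [
        {
            "source": "substack",
            "source_item_id": entry.get("id") or entry.get("link"),
            "published_at": entry.get("published"),
            "raw_text": entry.get("summary", ""),         # may be truncated/paywalled
            "raw_payload": dict(entry),                   # keep the full entry
        }
        for entry in feed.entries
    ]
    return items, getattr(feed, "etag", etag), getattr(feed, "modified", modified)
```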
Dedup and idempotency (the two keys to “no surprises”)
Ingestion dedup
Use a uniqueness constraint on (source, source_item_id).
For edits:
- Keep content_hash.
- If the same (source, source_item_id) arrives with a different hash, store it as a new "version" (or keep a history table). The important part: do not lose old content.
Trade idempotency
Alpaca supports querying orders by client-provided id; use this for exactly-once semantics under retries. (Alpaca API Docs)
Pattern (sketched below):
- Compute client_order_id = hash(source + source_item_id + normalized_instrument + action)
- Before submitting, call "Get Order by Client Order ID"
  - If found: do nothing (idempotent hit)
  - If not found: submit the order and persist an ExecutionRecord
“Get Order by Client Order ID” is documented in Alpaca’s API. (Alpaca API Docs)
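A sketch of that pattern with alpaca-py. TradingClient(paper=True), MarketOrderRequest, client_order_id, and get_order_by_client_id exist in the SDK, but verify the exact signatures and the client_order_id length limit against the current docs before relying on this.

```python
# Sketch: idempotent paper order submission keyed by a deterministic client_order_id.
import hashlib

from alpaca.common.exceptions import APIError
from alpaca.trading.client import TradingClient
from alpaca.trading.enums import OrderSide, TimeInForce
from alpaca.trading.requests import MarketOrderRequest

client = TradingClient("API_KEY", "SECRET_KEY", paper=True)   # paper-only


def execute_signal(source: str, source_item_id: str, instrument: str, action: str, qty: int):
    key = f"{source}|{source_item_id}|{instrument}|{action}"
    client_order_id = hashlib.sha256(key.encode()).hexdigest()[:48]  # Alpaca caps the length; check current limit

    try:
        # Idempotent hit: the order was already submitted on a previous attempt.
        return client.get_order_by_client_id(client_order_id)
    except APIError:
        pass  # not found; in production, distinguish 404 from other API errors

    order = MarketOrderRequest(
        symbol=instrument,
        qty=qty,
        side=OrderSide.BUY if action == "BUY" else OrderSide.SELL,
        time_in_force=TimeInForce.DAY,
        client_order_id=client_order_id,
    )
    return client.submit_order(order)   # persist request/response as an ExecutionRecord
```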
Parsing layer (LLM) — strict, schema-first
What “good” looks like
- The model returns only JSON matching your schema.
- Missing/ambiguous fields must be null, not guessed.
- Temperature 0.
- Evidence spans for each extracted field.
How to implement structured parsing
You have three practical options:
Option A (recommended for minimal + reliable): Instructor + Pydantic
Instructor is designed to extract structured outputs into Pydantic models and handle validation/retries. (Instructor)
Use it to guarantee:
- required fields are present when they should be
- types are correct (e.g., strike is numeric, expiry parses)
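A sketch of Option A, assuming the ParsedSignal Pydantic model from earlier and an OpenAI-compatible client patched by Instructor; the model name and import path are placeholders.

```python
# Sketch: structured parsing with Instructor + Pydantic at temperature 0.
import instructor
from openai import OpenAI

from models import ParsedSignal  # the Pydantic model sketched earlier (assumed module name)

client = instructor.from_openai(OpenAI())


def parse_post(raw_text: str) -> ParsedSignal:
    # Instructor validates/coerces the completion into ParsedSignal and can re-ask on failure.
    return client.chat.completions.create(
        model="gpt-4o-mini",            # placeholder model name
        response_model=ParsedSignal,
        temperature=0,
        messages=[
            {
                "role": "system",
                "content": (
                    "Extract the trade signal as JSON. "
                    "If any field is missing or ambiguous, return null for it. Do not guess."
                ),
            },
            {"role": "user", "content": raw_text},  # post text is untrusted data, never instructions
        ],
    )
```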
Option B: Guardrails (extra validation policies)
Guardrails provides validators for structured outputs and can enforce field-level rules. (guardrails)
Useful if you want:
- stricter “schema + validators + re-ask” behavior
- explicit, reusable validation logic close to the parsing boundary
Option C: Constrained decoding / grammar-based outputs (local models)
If you run local models, libraries like Outlines constrain generation to JSON schema/regex/grammars. (Dottxt AI)
If you use vLLM for serving, it supports structured outputs via Outlines or similar guided-decoding backends. (vLLM)
When this matters
- If you want maximum determinism and fewer “almost JSON” failures
- If you want to avoid network calls to an LLM provider
Deterministic validation gates (the heart of safety)
Gate 1: Schema completeness and ambiguity
Reject if:
- action missing or not one of {BUY, SELL}
- ticker missing or not a valid symbol format (define an allowed regex)
- multiple tickers mentioned but the parser cannot isolate a single one
- the post contains hedging words like "watch", "idea", "maybe", etc., unless your policy explicitly allows them
Fail-closed rule: when in doubt, reject.
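A sketch of Gate 1 as pure Python, assuming the ParsedSignal model from earlier. The regex and the hedge-word list are illustrative policy choices; the point is that every check is deterministic and every failure maps to a reject code.

```python
# Sketch: Gate 1 as deterministic checks that emit reject codes (empty list == pass).
import re

from models import ParsedSignal  # sketched earlier (assumed module name)

TICKER_RE = re.compile(r"^[A-Z]{1,5}$")   # example of an allowed symbol format
HEDGE_WORDS = {"watch", "watching", "idea", "maybe", "might", "considering"}


def gate1_reject_codes(sig: ParsedSignal, raw_text: str) -> list[str]:
    codes: list[str] = []
    if sig.action is None:
        codes.append("MISSING_ACTION")       # schema already restricts action to BUY/SELL/null
    if sig.ticker is None or not TICKER_RE.match(sig.ticker):
        codes.append("INVALID_TICKER")
    if any(word in raw_text.lower().split() for word in HEDGE_WORDS):
        codes.append("HEDGED_LANGUAGE")      # fail closed unless policy explicitly allows
    return codes
```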
Gate 2: Time rules
- expiry must be a valid date and not in the past
- if expiry is “this Friday” or ambiguous: reject unless your parser explicitly resolved it with evidence
Gate 3: Options contract resolvability (mandatory)
Never “construct” an option symbol from text and hope it matches.
Instead:
- query Alpaca option contracts endpoint with filters (underlying, expiry, strike, type)
- require exactly one match; otherwise reject.
Alpaca provides an option contracts endpoint for exactly this lookup. Community guidance from Alpaca also recommends using the contracts endpoint rather than "generating" symbols. (Alpaca Community Forum)
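A sketch of the resolvability check with alpaca-py's options-contracts support. GetOptionContractsRequest and its filter fields exist in the SDK, but treat the exact field names/types as something to verify; the important part is "exactly one match, otherwise reject".

```python
# Sketch: resolve the contract through Alpaca instead of constructing an OCC symbol by hand.
from alpaca.trading.client import TradingClient
from alpaca.trading.enums import ContractType
from alpaca.trading.requests import GetOptionContractsRequest

client = TradingClient("API_KEY", "SECRET_KEY", paper=True)


def resolve_contract(underlying: str, expiry, strike: float, option_type: str) -> str | None:
    req = GetOptionContractsRequest(
        underlying_symbols=[underlying],
        expiration_date=expiry,                 # a datetime.date
        strike_price_gte=str(strike),
        strike_price_lte=str(strike),
        type=ContractType.CALL if option_type == "CALL" else ContractType.PUT,
    )
    contracts = client.get_option_contracts(req).option_contracts or []
    if len(contracts) != 1:
        return None                             # zero or ambiguous matches -> reject (fail closed)
    return contracts[0].symbol                  # the tradable contract symbol
```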
Gate 4: Risk limits (paper safety still matters)
Examples (simple, deterministic): max notional per order, max open positions, a per-ticker allowlist, and a global kill switch flag.
Gate 5: Duplicate signal detection
Reject if:
- the same raw item already produced a VALIDATED+EXECUTED record
- or the same normalized order already exists (idempotency key hit)
Execution layer (Alpaca paper trading)
Paper environment
Alpaca paper accounts let you simulate trading without real money, and Alpaca notes paper-only accounts are entitled to IEX market data. (Alpaca API Docs)
Python client
alpaca-py supports paper trading by setting paper=True on TradingClient. (Alpaca)
Order lifecycle + querying
Alpaca’s order docs describe client-provided order IDs and querying by them. (Alpaca API Docs)
Observation and reconciliation (don’t rely on “submit = done”)
Polling order status works, but a streaming feed makes reconciliation simpler.
Prefer SSE for ordered events
Alpaca’s SSE documentation states that event ordering is guaranteed per account, which simplifies deterministic state transitions. (Alpaca API Docs)
In practice:
- Submit order
- Listen on SSE for trade updates
- Update ExecutionRecord on fills/cancels/rejects (see the listener sketch below)
- If your service restarts, you can re-sync by querying open orders + recent events
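A sketch of the reconciliation listener using alpaca-py's TradingStream (the SDK's websocket trade-updates stream for paper accounts; if you consume the Broker API SSE endpoint instead, the handler logic is the same). update_execution_record is a placeholder for your persistence code.

```python
# Sketch: fold broker trade updates back into ExecutionRecord.
from alpaca.trading.stream import TradingStream

stream = TradingStream("API_KEY", "SECRET_KEY", paper=True)


async def on_trade_update(data):
    # data.event is e.g. "fill", "partial_fill", "canceled", "rejected";
    # data.order carries the ids and current status.
    update_execution_record(                          # placeholder persistence function
        client_order_id=data.order.client_order_id,
        broker_order_id=str(data.order.id),
        status=str(data.event),
    )


stream.subscribe_trade_updates(on_trade_update)
stream.run()   # blocking; run it in its own process or task
```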
Minimal implementation plan (single process → scalable later)
Phase 1 (single process, SQLite)
- APScheduler triggers two jobs:
  - poll X every N seconds
  - poll RSS every M minutes
- A worker loop processes DB rows by status:
  - INGESTED → PARSED → VALIDATED → EXECUTED
- SQLite is fine at this stage.
APScheduler supports persistent job stores (including SQLite/Postgres) so schedules survive restarts. (GitHub)
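A Phase 1 scheduling sketch with APScheduler 3.x and a SQLite-backed job store. poll_x, poll_rss, and process_pending_rows are the poller/worker entry points assumed above.

```python
# Sketch: Phase 1 scheduling with a persistent job store so schedules survive restarts.
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.blocking import BlockingScheduler

from pollers import poll_rss, poll_x          # assumed module/function names
from pipeline import process_pending_rows     # drives INGESTED -> PARSED -> VALIDATED -> EXECUTED

scheduler = BlockingScheduler(
    jobstores={"default": SQLAlchemyJobStore(url="sqlite:///jobs.sqlite")}
)
scheduler.add_job(poll_x, "interval", seconds=60, id="poll_x", replace_existing=True)
scheduler.add_job(poll_rss, "interval", minutes=15, id="poll_rss", replace_existing=True)
scheduler.add_job(process_pending_rows, "interval", seconds=10, id="worker", replace_existing=True)
scheduler.start()
```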
Phase 2 (single VM, Postgres, 1–N workers)
- Swap SQLite for Postgres and run one or more workers against the same tables; the state machine itself does not change.
Phase 3 (orchestration if you truly need it)
- If you want visibility, retries, and scheduling in a more managed way:
  - Prefect provides Python-native orchestration with state tracking and retries. (docs.prefect.io)
- If you want "durable execution" semantics and long-running workflows:
  - Temporal's Python SDK supports durable timers and workflow execution patterns. (Temporal Docs)
Should you use an agent framework?
What “agent frameworks” add
Frameworks like LangChain define “agents” that loop, choose tools, and act until a stop condition. This is useful for open-ended tasks, but it increases the blast radius if an LLM is ever confused or manipulated. (LangChain Docs)
For this use case: usually “no”
Your problem is not “plan tasks dynamically”; it is:
- ingest → parse → validate → execute
- with strict determinism and auditability
That maps better to a state machine than an agent loop.
When an agent framework might be worth it
If later you want:
- interactive operator workflows (“ask for confirmation”, “explain rejects”, “summarize performance”)
- richer tool use (fetch context, check news, etc.)
Even then, keep the trade execution path non-agentic:
- agent can assist, but cannot bypass deterministic validation gates
Recommended open-source stack (minimal, high signal)
Ingestion + scheduling
- httpx or requests (HTTP client)
- feedparser (RSS parsing)
- APScheduler (simple scheduling; persistent job stores available) (GitHub)
Parsing (LLM → schema)
Choose one:
- Instructor (simple structured outputs into Pydantic) (Instructor)
- Guardrails (validators and structured validation policies) (guardrails)
- Outlines (constrained decoding; useful for local models) (Dottxt AI)
Validation
- Pure Python + Pydantic types + explicit reject codes
- Optional: JSON Schema validation (if you standardize on schemas)
Execution + streaming
- alpaca-py (TradingClient with paper=True; trade-update streaming for reconciliation) (Alpaca)
Observability
- structured logs (JSON)
- metrics (counts of rejects by code; parse failure rate; execution failure rate)
- a kill switch flag in DB/config
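A small sketch of the last two bullets: JSON-structured reject logging plus a DB-backed kill switch checked before any order is submitted. The config table and key name are illustrative.

```python
# Sketch: structured reject logs and a kill switch gate (table/key names illustrative).
import json
import logging
import sqlite3

log = logging.getLogger("pipeline")


def log_reject(raw_item_id: int, codes: list[str]) -> None:
    # One structured line per reject makes "rejects by code" a trivial query/metric.
    log.info(json.dumps({"event": "reject", "raw_item_id": raw_item_id, "codes": codes}))


def trading_enabled(conn: sqlite3.Connection) -> bool:
    # Trading is allowed only if the flag is absent or not set to "on".
    row = conn.execute("SELECT value FROM config WHERE key = 'kill_switch'").fetchone()
    return row is None or row[0] != "on"
```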
A concrete “clean” component breakdown
1) pollers/
- x_poller.py
  - fetch GET /2/users/{id}/tweets with since_id + pagination (X Developer Platform)
  - persist RawItem rows
- substack_poller.py
  - fetch /feed (Substack)
  - persist RawItem rows
2) pipeline/
- parse (LLM → ParsedSignal), validate (deterministic gates → ValidatedSignal or reject codes), execute (idempotent paper orders)
3) reconcile/
- trade-update listener that folds fills/cancels/rejects back into ExecutionRecord
4) storage/
- SQLite first, Postgres later
Summary decision: minimal vs “framework”
Best default (clean + minimal)
- DB-backed state machine + APScheduler + 1 worker
- LLM structured parsing via Instructor (or Guardrails)
- deterministic validation + Alpaca paper execution with client_order_id idempotency
Add orchestration only when you feel pain
- If you need dashboards/retries/scheduling at scale: Prefect (docs.prefect.io)
- If you need durable workflows and stronger exactly-once semantics across many steps: Temporal (Temporal Docs)
- Avoid agent loops in the execution path unless you have a strong reason; LangChain agents are explicitly tool-invoking loops. (LangChain Docs)