Insight Pipes
What insight pipes are for
Section titled “What insight pipes are for”Insight pipes are post-generation processors. Once an insight exists, pipes can:
- set market-entry prices
- size the quantity
- enforce trading windows
- apply stop-loss and take-profit logic
- reject expired or invalid ideas
- submit the insight
- manage filled positions
They run against the insight’s current state, which makes them a natural fit for runtime control logic.
Core trait
Section titled “Core trait”pub trait InsightPipe { fn version(&self) -> &str; fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult;}Each pipe returns InsightPipeResult, which tells AQE:
- whether the pipe passed
- whether execution succeeded
- what message should be recorded
If a pipe fails or returns passed = false, AQE can reject the insight automatically.
State targeting
Section titled “State targeting”Pipes are grouped by InsightState through InsightPipeline.
pub struct InsightPipeline { pub pipeline: HashMap<InsightState, VecDeque<WrappedInsightPipe>>,}That allows you to run different logic for:
NewExecutedFilledClosedCancelledRejected
Typical new-insight pipeline
Section titled “Typical new-insight pipeline”A common pattern for New insights is:
- convert market entry intent into an entry price
- size quantity
- set or validate stop loss
- set or validate take profit
- check reward-to-risk or time-window rules
- submit the insight
Real built-in examples
Section titled “Real built-in examples”Market order entry
Section titled “Market order entry”MarketOrderEntryPipe sets the entry price from the most recent close when the insight is intended to behave like a market order.
pub struct MarketOrderEntryPipe;
impl InsightPipe for MarketOrderEntryPipe { fn version(&self) -> &str { "1.0" }
fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult { if insight.limit_price.is_some() { return InsightPipeResult::new( true, true, Some("Insight already has limit price set".to_string()), self.name().to_string(), ); }
let history = ctx.history(); if let Some(df) = history.get(&insight.symbol) { if df.height() > 0 { if let Ok(close_col) = df.column("close") { if let Ok(close_vals) = close_col.f64() { if let Some(price) = close_vals.get(df.height() - 1) { insight.set_limit_price(Some(price)); return InsightPipeResult::new( true, true, Some(format!("Limit price set to close: {}", price)), self.name().to_string(), ); } } } } }
InsightPipeResult::new( false, false, Some("Failed to get close price for limit".to_string()), self.name().to_string(), ) }}Take profit
Section titled “Take profit”BasicTakeProfitPipe closes or partially closes a filled insight when price reaches the configured target.
pub struct BasicTakeProfitPipe;
impl InsightPipe for BasicTakeProfitPipe { fn version(&self) -> &str { "1.0" }
fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult { let Some(tp_levels) = insight.take_profit_levels() else { return InsightPipeResult::new( true, true, Some("Insight does not have take profit set".to_string()), self.name().to_string(), ); }; let Some(current_tp) = tp_levels.first().copied() else { return InsightPipeResult::new(true, true, None, self.name().to_string()); }; if insight.closing { return InsightPipeResult::new( false, true, Some("Insight is already being closed".to_string()), self.name().to_string(), ); }
let Some(df) = ctx.history().get(&insight.symbol) else { return InsightPipeResult::new( false, false, Some("No history found".to_string()), self.name().to_string(), ); }; let row = df.tail(Some(1)); let low = row.column("low").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0)); let high = row.column("high").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0)); let should_close = match insight.side { OrderSide::Buy => high.map(|v| v >= current_tp).unwrap_or(false), OrderSide::Sell => low.map(|v| v <= current_tp).unwrap_or(false), };
if should_close { if tp_levels.len() > 1 { let quantity_to_close = ctx .tools() .quantity_round(insight.quantity.unwrap_or(0.0) / 2.0, &insight.symbol); insight.close_partial(ctx, quantity_to_close, None); return InsightPipeResult::new( false, true, Some(format!("Partial take profit hit for {}", insight.symbol)), self.name().to_string(), ); }
insight.close(ctx); return InsightPipeResult::new( false, true, Some(format!("Take profit hit for {}", insight.symbol)), self.name().to_string(), ); }
InsightPipeResult::new(true, true, None, self.name().to_string()) }}Stop loss
Section titled “Stop loss”BasicStopLossPipe closes a filled insight when the latest bar violates the stop level.
pub struct BasicStopLossPipe;
impl InsightPipe for BasicStopLossPipe { fn version(&self) -> &str { "1.0" }
fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult { let Some(stop_loss) = insight.stop_loss() else { return InsightPipeResult::new( true, true, Some("Insight does not have stop loss set".to_string()), self.name().to_string(), ); }; if insight.closing { return InsightPipeResult::new( false, true, Some("Insight is already being closed".to_string()), self.name().to_string(), ); }
let Some(df) = ctx.history().get(&insight.symbol) else { return InsightPipeResult::new( false, false, Some("No history found".to_string()), self.name().to_string(), ); }; let row = df.tail(Some(1)); let low = row.column("low").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0)); let high = row.column("high").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0)); let should_close = match insight.side { OrderSide::Buy => low.map(|v| v <= stop_loss).unwrap_or(false), OrderSide::Sell => high.map(|v| v >= stop_loss).unwrap_or(false), };
if should_close { insight.close(ctx); return InsightPipeResult::new( false, true, Some(format!("Price broke stop loss for {}", insight.symbol)), self.name().to_string(), ); }
InsightPipeResult::new(true, true, None, self.name().to_string()) }}Submission
Section titled “Submission”InsightSubmitPipe hands the insight to the broker through the strategy context.
pub struct InsightSubmitPipe;
impl InsightPipe for InsightSubmitPipe { fn version(&self) -> &str { "1.0" }
fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult { if insight.state == InsightState::New { insight.submit(ctx); } InsightPipeResult::new( true, true, Some("Insight submitted".to_string()), self.name().to_string(), ) }}How to think about pipe design
Section titled “How to think about pipe design”Use pipes when the logic should be reusable across multiple strategies or alpha models.
Good examples:
- quantity-to-risk pipes
- trading window filters
- stop-loss / take-profit behaviour
- expiry enforcement
- scale-out logic
Less suitable examples:
- the core signal that decides whether a trade exists at all
That belongs in the strategy or alpha model.
Related source
Section titled “Related source”aq-engine/src/core/pipeline/mod.rsaq-engine/src/core/pipeline/market_order_entry.rsaq-engine/src/core/pipeline/basic_take_profit.rsaq-engine/src/core/pipeline/basic_stop_loss.rsaq-engine/src/core/pipeline/insight_submit.rs