
Insight Pipes

Insight pipes are post-generation processors. Once an insight exists, pipes can:

  • set market-entry prices
  • size the quantity
  • enforce trading windows
  • apply stop-loss and take-profit logic
  • reject expired or invalid ideas
  • submit the insight
  • manage filled positions

They run against the insight’s current state, which makes them a natural fit for runtime control logic.

aq-engine/src/core/pipeline/mod.rs
pub trait InsightPipe {
    fn version(&self) -> &str;
    fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult;
}

Each pipe returns an InsightPipeResult, which tells AQE:

  • whether the pipe passed
  • whether execution succeeded
  • what message should be recorded
  • which pipe produced the result

If a pipe fails or returns passed = false, AQE can reject the insight automatically.
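
The rejection decision can be sketched as follows. This is a minimal illustration, not AQE's implementation: PipeResult is a hypothetical stand-in for InsightPipeResult, its field names are inferred from the argument order the built-in pipes pass to InsightPipeResult::new, and rejection_reason is an assumed helper.

```rust
/// Hypothetical stand-in for AQE's `InsightPipeResult`.
/// Field names are assumptions inferred from the constructor's argument order.
#[derive(Debug, Clone, PartialEq)]
pub struct PipeResult {
    pub passed: bool,
    pub success: bool,
    pub message: Option<String>,
    pub pipe_name: String,
}

impl PipeResult {
    pub fn new(passed: bool, success: bool, message: Option<String>, pipe_name: String) -> Self {
        Self { passed, success, message, pipe_name }
    }
}

/// Assumed engine-side check: returns a rejection reason when the pipe
/// either failed to execute or executed but did not pass.
pub fn rejection_reason(result: &PipeResult) -> Option<String> {
    if result.success && result.passed {
        return None;
    }
    let detail = result.message.as_deref().unwrap_or("no message");
    Some(format!("{}: {}", result.pipe_name, detail))
}
```

Keeping passed and success separate lets a pipe report "I ran correctly, but this insight should not continue" without that being treated as an execution error.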

Pipes are grouped by InsightState through InsightPipeline.

aq-engine/src/core/strategy/types/mod.rs
pub struct InsightPipeline {
    pub pipeline: HashMap<InsightState, VecDeque<WrappedInsightPipe>>,
}

This lets you register different pipes for each insight state:

  • New
  • Executed
  • Filled
  • Closed
  • Cancelled
  • Rejected
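
The per-state grouping can be sketched with simplified types. Everything here is hypothetical: State, Idea, Pipe, Pipeline and Tag stand in for AQE's own types, and stopping at the first pipe that does not pass is an assumption about how the engine behaves.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical mirror of the insight states listed above.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum State {
    New,
    Executed,
    Filled,
    Closed,
    Cancelled,
    Rejected,
}

/// Hypothetical, simplified insight: a state plus a log of pipes that ran.
pub struct Idea {
    pub state: State,
    pub log: Vec<String>,
}

/// Simplified pipe trait: returns true when the pipe passes.
pub trait Pipe {
    fn run(&mut self, idea: &mut Idea) -> bool;
}

/// Pipes grouped by state, kept in registration order.
#[derive(Default)]
pub struct Pipeline {
    stages: HashMap<State, VecDeque<Box<dyn Pipe>>>,
}

impl Pipeline {
    pub fn add(&mut self, state: State, pipe: Box<dyn Pipe>) {
        self.stages.entry(state).or_default().push_back(pipe);
    }

    /// Runs only the pipes registered for the idea's state at call time.
    /// Stops at the first pipe that does not pass (an assumed policy).
    pub fn run(&mut self, idea: &mut Idea) -> bool {
        let Some(pipes) = self.stages.get_mut(&idea.state) else {
            return true; // nothing registered for this state
        };
        for pipe in pipes.iter_mut() {
            if !pipe.run(idea) {
                return false;
            }
        }
        true
    }
}

/// Test pipe that records its name and returns a fixed verdict.
pub struct Tag(pub &'static str, pub bool);

impl Pipe for Tag {
    fn run(&mut self, idea: &mut Idea) -> bool {
        idea.log.push(self.0.to_string());
        self.1
    }
}
```

A VecDeque per state preserves ordering, which matters because later pipes (such as submission) usually depend on earlier ones (such as pricing and sizing).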

A common pattern for New insights is:

  1. convert market entry intent into an entry price
  2. size quantity
  3. set or validate stop loss
  4. set or validate take profit
  5. check reward-to-risk or time-window rules
  6. submit the insight
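
Step 5 can be made concrete with a small calculation. The sketch below is illustrative only: Side, reward_to_risk and passes_min_rr are hypothetical names, and the ratio is defined here as the distance from entry to target divided by the distance from entry to stop.

```rust
/// Hypothetical order side, mirroring Buy/Sell.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Side {
    Buy,
    Sell,
}

/// Reward-to-risk ratio for an entry, stop loss and take profit.
/// Returns None when the stop or target sits on the wrong side of the entry.
pub fn reward_to_risk(side: Side, entry: f64, stop_loss: f64, take_profit: f64) -> Option<f64> {
    let (risk, reward) = match side {
        Side::Buy => (entry - stop_loss, take_profit - entry),
        Side::Sell => (stop_loss - entry, entry - take_profit),
    };
    if risk > 0.0 && reward > 0.0 {
        Some(reward / risk)
    } else {
        None
    }
}

/// A check a pipe could apply before submission: invalid levels never pass.
pub fn passes_min_rr(side: Side, entry: f64, stop_loss: f64, take_profit: f64, min_ratio: f64) -> bool {
    reward_to_risk(side, entry, stop_loss, take_profit)
        .map_or(false, |ratio| ratio >= min_ratio)
}
```

For example, a Buy at 100 with a stop at 95 and a target at 110 risks 5 to make 10, a ratio of 2.0. Running this check after the stop-loss and take-profit steps means it always sees the final levels.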

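MarketOrderEntryPipe makes an insight behave like a market order: if no limit price is set, it sets the limit price to the most recent close in the symbol's history. It passes without changes if a limit price already exists, and fails if no close price is available.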

aq-engine/src/core/pipeline/market_order_entry.rs
pub struct MarketOrderEntryPipe;
impl InsightPipe for MarketOrderEntryPipe {
    fn version(&self) -> &str {
        "1.0"
    }
    fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult {
        if insight.limit_price.is_some() {
            return InsightPipeResult::new(
                true,
                true,
                Some("Insight already has limit price set".to_string()),
                self.name().to_string(),
            );
        }
        let history = ctx.history();
        if let Some(df) = history.get(&insight.symbol) {
            if df.height() > 0 {
                if let Ok(close_col) = df.column("close") {
                    if let Ok(close_vals) = close_col.f64() {
                        if let Some(price) = close_vals.get(df.height() - 1) {
                            insight.set_limit_price(Some(price));
                            return InsightPipeResult::new(
                                true,
                                true,
                                Some(format!("Limit price set to close: {}", price)),
                                self.name().to_string(),
                            );
                        }
                    }
                }
            }
        }
        InsightPipeResult::new(
            false,
            false,
            Some("Failed to get close price for limit".to_string()),
            self.name().to_string(),
        )
    }
}

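BasicTakeProfitPipe closes a filled insight when the latest bar reaches the first configured take-profit level: the bar's high for a Buy, the bar's low for a Sell. If more than one level is configured, it closes half of the quantity instead of the whole position.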

aq-engine/src/core/pipeline/basic_take_profit.rs
pub struct BasicTakeProfitPipe;
impl InsightPipe for BasicTakeProfitPipe {
    fn version(&self) -> &str {
        "1.0"
    }
    fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult {
        let Some(tp_levels) = insight.take_profit_levels() else {
            return InsightPipeResult::new(
                true,
                true,
                Some("Insight does not have take profit set".to_string()),
                self.name().to_string(),
            );
        };
        let Some(current_tp) = tp_levels.first().copied() else {
            return InsightPipeResult::new(true, true, None, self.name().to_string());
        };
        if insight.closing {
            return InsightPipeResult::new(
                false,
                true,
                Some("Insight is already being closed".to_string()),
                self.name().to_string(),
            );
        }
        let Some(df) = ctx.history().get(&insight.symbol) else {
            return InsightPipeResult::new(
                false,
                false,
                Some("No history found".to_string()),
                self.name().to_string(),
            );
        };
        let row = df.tail(Some(1));
        let low = row.column("low").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0));
        let high = row.column("high").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0));
        let should_close = match insight.side {
            OrderSide::Buy => high.map(|v| v >= current_tp).unwrap_or(false),
            OrderSide::Sell => low.map(|v| v <= current_tp).unwrap_or(false),
        };
        if should_close {
            if tp_levels.len() > 1 {
                let quantity_to_close = ctx
                    .tools()
                    .quantity_round(insight.quantity.unwrap_or(0.0) / 2.0, &insight.symbol);
                insight.close_partial(ctx, quantity_to_close, None);
                return InsightPipeResult::new(
                    false,
                    true,
                    Some(format!("Partial take profit hit for {}", insight.symbol)),
                    self.name().to_string(),
                );
            }
            insight.close(ctx);
            return InsightPipeResult::new(
                false,
                true,
                Some(format!("Take profit hit for {}", insight.symbol)),
                self.name().to_string(),
            );
        }
        InsightPipeResult::new(true, true, None, self.name().to_string())
    }
}
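
The partial-versus-full decision in the pipe above can be isolated as a small function. This is a sketch under stated assumptions: CloseAction and take_profit_action are hypothetical names, and round_down_to_step is only a placeholder for the engine's quantity_round, whose actual rounding rules are not shown here.

```rust
/// What the take-profit logic decides once the target has been hit.
#[derive(Debug, PartialEq)]
pub enum CloseAction {
    /// Close only this quantity and keep the rest of the position open.
    Partial(f64),
    /// Close the whole position.
    Full,
}

/// Placeholder rounding (assumption): round down to a multiple of `step`.
pub fn round_down_to_step(quantity: f64, step: f64) -> f64 {
    (quantity / step).floor() * step
}

/// Mirrors the branch above: with more than one take-profit level,
/// close half of the quantity; with a single level, close everything.
pub fn take_profit_action(quantity: f64, level_count: usize, step: f64) -> CloseAction {
    if level_count > 1 {
        CloseAction::Partial(round_down_to_step(quantity / 2.0, step))
    } else {
        CloseAction::Full
    }
}
```

With a quantity of 5 and a step of 1, two levels yield a partial close of 2, because half of 5 is 2.5 and the placeholder rounds down.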

BasicStopLossPipe closes a filled insight when the latest bar violates the stop level: the bar's low at or below the stop for a Buy, the bar's high at or above it for a Sell.

aq-engine/src/core/pipeline/basic_stop_loss.rs
pub struct BasicStopLossPipe;
impl InsightPipe for BasicStopLossPipe {
    fn version(&self) -> &str {
        "1.0"
    }
    fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult {
        let Some(stop_loss) = insight.stop_loss() else {
            return InsightPipeResult::new(
                true,
                true,
                Some("Insight does not have stop loss set".to_string()),
                self.name().to_string(),
            );
        };
        if insight.closing {
            return InsightPipeResult::new(
                false,
                true,
                Some("Insight is already being closed".to_string()),
                self.name().to_string(),
            );
        }
        let Some(df) = ctx.history().get(&insight.symbol) else {
            return InsightPipeResult::new(
                false,
                false,
                Some("No history found".to_string()),
                self.name().to_string(),
            );
        };
        let row = df.tail(Some(1));
        let low = row.column("low").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0));
        let high = row.column("high").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0));
        let should_close = match insight.side {
            OrderSide::Buy => low.map(|v| v <= stop_loss).unwrap_or(false),
            OrderSide::Sell => high.map(|v| v >= stop_loss).unwrap_or(false),
        };
        if should_close {
            insight.close(ctx);
            return InsightPipeResult::new(
                false,
                true,
                Some(format!("Price broke stop loss for {}", insight.symbol)),
                self.name().to_string(),
            );
        }
        InsightPipeResult::new(true, true, None, self.name().to_string())
    }
}
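
The stop-loss and take-profit triggers are mirror images of each other, which is easier to see when they are written as pure functions over a single bar. The sketch below uses hypothetical Side and Bar types rather than AQE's OrderSide and DataFrame row; a missing high or low never triggers a close, matching the unwrap_or(false) in the pipes.

```rust
/// Hypothetical order side, mirroring Buy/Sell.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Side {
    Buy,
    Sell,
}

/// Hypothetical latest bar; values are optional because a column may be missing.
#[derive(Debug, Clone, Copy)]
pub struct Bar {
    pub high: Option<f64>,
    pub low: Option<f64>,
}

/// A long position is stopped out when the low trades at or below the stop;
/// a short position when the high trades at or above it.
pub fn stop_loss_hit(side: Side, bar: Bar, stop: f64) -> bool {
    match side {
        Side::Buy => bar.low.map_or(false, |low| low <= stop),
        Side::Sell => bar.high.map_or(false, |high| high >= stop),
    }
}

/// A long position reaches its target when the high trades at or above it;
/// a short position when the low trades at or below it.
pub fn take_profit_hit(side: Side, bar: Bar, target: f64) -> bool {
    match side {
        Side::Buy => bar.high.map_or(false, |high| high >= target),
        Side::Sell => bar.low.map_or(false, |low| low <= target),
    }
}
```

Checking the bar's extremes rather than its close means a level counts as hit if price touched it at any point during the bar.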

InsightSubmitPipe hands the insight to the broker through the strategy context. It only submits insights that are still in the New state.

aq-engine/src/core/pipeline/insight_submit.rs
pub struct InsightSubmitPipe;
impl InsightPipe for InsightSubmitPipe {
    fn version(&self) -> &str {
        "1.0"
    }
    fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult {
        if insight.state == InsightState::New {
            insight.submit(ctx);
        }
        InsightPipeResult::new(
            true,
            true,
            Some("Insight submitted".to_string()),
            self.name().to_string(),
        )
    }
}

Use pipes when the logic should be reusable across multiple strategies or alpha models.

Good examples:

  • quantity-to-risk pipes
  • trading window filters
  • stop-loss / take-profit behaviour
  • expiry enforcement
  • scale-out logic

Less suitable examples:

  • the core signal that decides whether a trade exists at all

That belongs in the strategy or alpha model.
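
As an illustration of reusable logic, a trading window filter could look roughly like this. The sketch is self-contained and deliberately simplified: Idea, Outcome and TradingWindowPipe are hypothetical, the pipe does not implement AQE's InsightPipe trait, and the current time is passed in as minutes since midnight instead of being read from the strategy context.

```rust
/// Hypothetical, simplified insight: only the field this sketch needs.
pub struct Idea {
    pub symbol: String,
}

/// Hypothetical result mirroring the passed / success / message shape.
#[derive(Debug, PartialEq)]
pub struct Outcome {
    pub passed: bool,
    pub success: bool,
    pub message: Option<String>,
}

/// Reusable filter: passes only inside [open_minute, close_minute).
/// Times are minutes since midnight in the exchange's local time (assumed).
pub struct TradingWindowPipe {
    pub open_minute: u32,
    pub close_minute: u32,
}

impl TradingWindowPipe {
    pub fn run(&self, now_minute: u32, idea: &Idea) -> Outcome {
        let inside = now_minute >= self.open_minute && now_minute < self.close_minute;
        if inside {
            Outcome { passed: true, success: true, message: None }
        } else {
            // The pipe executed correctly, but the insight must not continue.
            Outcome {
                passed: false,
                success: true,
                message: Some(format!("{} is outside the trading window", idea.symbol)),
            }
        }
    }
}
```

Because the window is plain configuration on the pipe, the same type can serve several strategies, for instance with open_minute 570 and close_minute 960 for a 09:30 to 16:00 session.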

  • aq-engine/src/core/pipeline/mod.rs
  • aq-engine/src/core/pipeline/market_order_entry.rs
  • aq-engine/src/core/pipeline/basic_take_profit.rs
  • aq-engine/src/core/pipeline/basic_stop_loss.rs
  • aq-engine/src/core/pipeline/insight_submit.rs