Aller au contenu

Pipelines d’insights

Les insight pipes sont des processeurs post-génération. Une fois qu’un insight existe, les pipes peuvent :

  • définir les prix d’entrée marché
  • dimensionner la quantité
  • appliquer des fenêtres de trading
  • appliquer la logique stop-loss et take-profit
  • rejeter les idées expirées ou invalides
  • soumettre l’insight
  • gérer les positions remplies

Ils s’exécutent contre l’état courant de l’insight, ce qui en fait un bon emplacement pour la logique de contrôle runtime.

aq-engine/src/core/pipeline/mod.rs
pub trait InsightPipe {
fn version(&self) -> &str;
fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult;
}

Chaque pipe retourne InsightPipeResult, qui indique à AQE :

  • si la pipe est passée
  • si l’exécution a réussi
  • quel message doit être enregistré

Si une pipe échoue ou retourne passed = false, AQE peut rejeter l’insight automatiquement.

Les pipes sont groupées par InsightState via InsightPipeline.

aq-engine/src/core/strategy/types/mod.rs
pub struct InsightPipeline {
pub pipeline: HashMap<InsightState, VecDeque<WrappedInsightPipe>>,
}

Cela permet d’exécuter une logique différente pour :

  • New
  • Executed
  • Filled
  • Closed
  • Cancelled
  • Rejected

Un pattern courant pour les insights New est :

  1. convertir l’intention d’entrée marché en prix d’entrée
  2. dimensionner la quantité
  3. définir ou valider le stop loss
  4. définir ou valider le take profit
  5. vérifier le reward-to-risk ou les règles de fenêtre de session
  6. soumettre l’insight

MarketOrderEntryPipe définit le prix d’entrée depuis la dernière clôture quand l’insight doit se comporter comme un ordre au marché.

aq-engine/src/core/pipeline/market_order_entry.rs
pub struct MarketOrderEntryPipe;
impl InsightPipe for MarketOrderEntryPipe {
fn version(&self) -> &str {
"1.0"
}
fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult {
if insight.limit_price.is_some() {
return InsightPipeResult::new(
true,
true,
Some("Insight already has limit price set".to_string()),
self.name().to_string(),
);
}
let history = ctx.history();
if let Some(df) = history.get(&insight.symbol) {
if df.height() > 0 {
if let Ok(close_col) = df.column("close") {
if let Ok(close_vals) = close_col.f64() {
if let Some(price) = close_vals.get(df.height() - 1) {
insight.set_limit_price(Some(price));
return InsightPipeResult::new(
true,
true,
Some(format!("Limit price set to close: {}", price)),
self.name().to_string(),
);
}
}
}
}
}
InsightPipeResult::new(
false,
false,
Some("Failed to get close price for limit".to_string()),
self.name().to_string(),
)
}
}

BasicTakeProfitPipe clôture ou clôture partiellement un insight filled quand le prix atteint la cible configurée.

aq-engine/src/core/pipeline/basic_take_profit.rs
pub struct BasicTakeProfitPipe;
impl InsightPipe for BasicTakeProfitPipe {
fn version(&self) -> &str {
"1.0"
}
fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult {
let Some(tp_levels) = insight.take_profit_levels() else {
return InsightPipeResult::new(
true,
true,
Some("Insight does not have take profit set".to_string()),
self.name().to_string(),
);
};
let Some(current_tp) = tp_levels.first().copied() else {
return InsightPipeResult::new(true, true, None, self.name().to_string());
};
if insight.closing {
return InsightPipeResult::new(
false,
true,
Some("Insight is already being closed".to_string()),
self.name().to_string(),
);
}
let Some(df) = ctx.history().get(&insight.symbol) else {
return InsightPipeResult::new(
false,
false,
Some("No history found".to_string()),
self.name().to_string(),
);
};
let row = df.tail(Some(1));
let low = row.column("low").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0));
let high = row.column("high").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0));
let should_close = match insight.side {
OrderSide::Buy => high.map(|v| v >= current_tp).unwrap_or(false),
OrderSide::Sell => low.map(|v| v <= current_tp).unwrap_or(false),
};
if should_close {
if tp_levels.len() > 1 {
let quantity_to_close = ctx
.tools()
.quantity_round(insight.quantity.unwrap_or(0.0) / 2.0, &insight.symbol);
insight.close_partial(ctx, quantity_to_close, None);
return InsightPipeResult::new(
false,
true,
Some(format!("Partial take profit hit for {}", insight.symbol)),
self.name().to_string(),
);
}
insight.close(ctx);
return InsightPipeResult::new(
false,
true,
Some(format!("Take profit hit for {}", insight.symbol)),
self.name().to_string(),
);
}
InsightPipeResult::new(true, true, None, self.name().to_string())
}
}

BasicStopLossPipe clôture un insight filled quand la dernière barre viole le niveau de stop.

aq-engine/src/core/pipeline/basic_stop_loss.rs
pub struct BasicStopLossPipe;
impl InsightPipe for BasicStopLossPipe {
fn version(&self) -> &str {
"1.0"
}
fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult {
let Some(stop_loss) = insight.stop_loss() else {
return InsightPipeResult::new(
true,
true,
Some("Insight does not have stop loss set".to_string()),
self.name().to_string(),
);
};
if insight.closing {
return InsightPipeResult::new(
false,
true,
Some("Insight is already being closed".to_string()),
self.name().to_string(),
);
}
let Some(df) = ctx.history().get(&insight.symbol) else {
return InsightPipeResult::new(
false,
false,
Some("No history found".to_string()),
self.name().to_string(),
);
};
let row = df.tail(Some(1));
let low = row.column("low").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0));
let high = row.column("high").ok().and_then(|c| c.f64().ok()).and_then(|c| c.get(0));
let should_close = match insight.side {
OrderSide::Buy => low.map(|v| v <= stop_loss).unwrap_or(false),
OrderSide::Sell => high.map(|v| v >= stop_loss).unwrap_or(false),
};
if should_close {
insight.close(ctx);
return InsightPipeResult::new(
false,
true,
Some(format!("Price broke stop loss for {}", insight.symbol)),
self.name().to_string(),
);
}
InsightPipeResult::new(true, true, None, self.name().to_string())
}
}

InsightSubmitPipe transmet l’insight au courtier via le contexte de stratégie.

aq-engine/src/core/pipeline/insight_submit.rs
pub struct InsightSubmitPipe;
impl InsightPipe for InsightSubmitPipe {
fn version(&self) -> &str {
"1.0"
}
fn run(&mut self, ctx: &mut dyn StrategyContext, insight: &mut Insight) -> InsightPipeResult {
if insight.state == InsightState::New {
insight.submit(ctx);
}
InsightPipeResult::new(
true,
true,
Some("Insight submitted".to_string()),
self.name().to_string(),
)
}
}

Utilisez des pipes quand la logique doit être réutilisable entre plusieurs stratégies ou modèles alpha.

Bons exemples :

  • pipes quantity-to-risk
  • filtres de fenêtre de trading
  • comportement stop-loss / take-profit
  • application de l’expiration
  • logique scale-out

Exemples moins adaptés :

  • le signal principal qui décide si un trade existe

Ce rôle appartient à la stratégie ou au modèle alpha.

  • aq-engine/src/core/pipeline/mod.rs
  • aq-engine/src/core/pipeline/market_order_entry.rs
  • aq-engine/src/core/pipeline/basic_take_profit.rs
  • aq-engine/src/core/pipeline/basic_stop_loss.rs
  • aq-engine/src/core/pipeline/insight_submit.rs