Real-time Streaming¶
Cargo Docs
Subscribe to live price updates from Yahoo Finance via WebSocket. The streaming API uses a Flow-like Stream interface compatible with Rust's futures ecosystem.
Quick Start¶
use finance_query::streaming::PriceStream;
use futures::StreamExt;
let mut stream = PriceStream::subscribe(&["AAPL", "NVDA", "TSLA"]).await?;
while let Some(price) = stream.next().await {
println!("{}: ${:.2} ({:+.2}%)",
price.id,
price.price,
price.change_percent
);
}
Subscribing¶
Simple Subscribe¶
use finance_query::streaming::PriceStream;
let mut stream = PriceStream::subscribe(&["AAPL", "GOOGL"]).await?;
Builder Pattern¶
use finance_query::streaming::PriceStreamBuilder;
use std::time::Duration;
let mut stream = PriceStreamBuilder::new()
.symbols(&["AAPL", "MSFT", "NVDA"])
.reconnect_delay(Duration::from_secs(5))
.build()
.await?;
Dynamic Subscriptions¶
Add or remove symbols after the stream is created:
use finance_query::streaming::PriceStream;
let stream = PriceStream::subscribe(&["AAPL"]).await?;
// Add more symbols
stream.add_symbols(&["NVDA", "TSLA"]).await;
// Remove symbols
stream.remove_symbols(&["AAPL"]).await;
Multiple Consumers¶
Use resubscribe() to create additional receivers sharing the same WebSocket connection:
use finance_query::streaming::PriceStream;
use futures::StreamExt;
let mut stream1 = PriceStream::subscribe(&["AAPL", "NVDA"]).await?;
let mut stream2 = stream1.resubscribe();
// Both streams receive the same updates
tokio::spawn(async move {
while let Some(price) = stream2.next().await {
println!("Consumer 2: {} ${:.2}", price.id, price.price);
}
});
while let Some(price) = stream1.next().await {
println!("Consumer 1: {} ${:.2}", price.id, price.price);
}
PriceUpdate Fields¶
Each update yielded by the stream contains:
| Field | Type | Description |
|---|---|---|
id |
String |
Ticker symbol (e.g., "AAPL") |
price |
f32 |
Current price |
change |
f32 |
Price change from previous close |
change_percent |
f32 |
Percent change from previous close |
day_high |
f32 |
Day's high price |
day_low |
f32 |
Day's low price |
day_volume |
i64 |
Day's trading volume |
open_price |
f32 |
Opening price |
previous_close |
f32 |
Previous close price |
short_name |
String |
Short name/description |
currency |
String |
Currency code (e.g., "USD") |
exchange |
String |
Exchange code (e.g., "NMS") |
quote_type |
QuoteType |
Asset type (Equity, Etf, Cryptocurrency, etc.) |
market_hours |
MarketHoursType |
Session (PreMarket, RegularMarket, PostMarket) |
time |
i64 |
Unix timestamp in milliseconds |
Filtering Updates¶
use finance_query::streaming::{PriceStream, MarketHoursType};
use futures::StreamExt;
let mut stream = PriceStream::subscribe(&["AAPL", "MSFT", "GOOGL"]).await?;
while let Some(price) = stream.next().await {
// Only process regular market updates
if price.market_hours == MarketHoursType::RegularMarket {
println!("{}: ${:.2}", price.id, price.price);
}
}
Closing the Stream¶
use finance_query::streaming::PriceStream;
let stream = PriceStream::subscribe(&["AAPL"]).await?;
// ... use stream ...
stream.close().await;
Notes
- Reconnection: The stream automatically reconnects with a 3-second backoff on connection loss.
- Heartbeats: Subscriptions are refreshed every 15 seconds to keep the connection alive.
- Market hours: Updates are sent during pre-market, regular, and post-market sessions.
- Data availability: Not all fields are populated for every update — Yahoo only sends changed values.