186 lines
7.9 KiB
Rust
186 lines
7.9 KiB
Rust
use crate::err_with_loc;
|
|
use crate::error::app::AppError;
|
|
use crate::error::Result;
|
|
use crate::handler::token::TokenMetadataHandlerOperator;
|
|
use crate::model::token::{MaxDepthReachedData, NewTokenCreatedData, TokenCexUpdatedData};
|
|
use crate::slint_ui::{MainWindow, NewTokenUiData};
|
|
use crate::storage::StorageEngine;
|
|
use futures_util::StreamExt;
|
|
use slint::Weak;
|
|
use std::sync::Arc;
|
|
use tokio::sync::mpsc;
|
|
use tokio::task::JoinHandle;
|
|
use tokio_util::sync::CancellationToken;
|
|
use tracing::{debug, error, info, warn};
|
|
|
|
// ✅ Simplified subscriber that creates its own PubSub connection
|
|
pub fn spawn_new_token_subscriber(
|
|
token_handler: Arc<TokenMetadataHandlerOperator>,
|
|
db: Arc<StorageEngine>,
|
|
cancellation_token: CancellationToken,
|
|
) -> JoinHandle<Result<()>> {
|
|
tokio::spawn(async move {
|
|
debug!("new_token_subscriber::starting");
|
|
|
|
// ✅ Create dedicated PubSub connection for this subscriber
|
|
let mut pubsub = db.redis.create_pubsub_connection().await?;
|
|
|
|
if let Err(e) = pubsub.subscribe("new_token_created").await {
|
|
error!("failed_to_subscribe_to_new_token_created: {}", e);
|
|
return Err(err_with_loc!(AppError::Redis(format!(
|
|
"failed_to_subscribe_to_new_token_created: {}",
|
|
e
|
|
))));
|
|
}
|
|
|
|
// Create a channel for buffering messages - with good capacity for performance
|
|
let (buffer_tx, mut buffer_rx) = mpsc::channel::<NewTokenCreatedData>(1000);
|
|
|
|
info!("new_token_subscriber::subscribed_successfully");
|
|
let mut msg_stream = pubsub.on_message();
|
|
let cancellation_token = cancellation_token.clone();
|
|
|
|
loop {
|
|
tokio::select! {
|
|
Some(token) = buffer_rx.recv() => {
|
|
// ✅ Just call the existing handler - no duplication!
|
|
if let Err(e) = token_handler.process_new_token(token.clone()).await {
|
|
error!("failed_to_send_token_to_token_handler::mint::{}::error::{}", token.mint.clone(), e);
|
|
}
|
|
},
|
|
Some(message) = msg_stream.next() => {
|
|
if let Ok(msg) = message.get_payload::<String>() {
|
|
if let Ok(token) = serde_json::from_str::<NewTokenCreatedData>(&msg) {
|
|
debug!("new_token_received::mint::{}::name::{}::creator::{}",
|
|
token.mint, token.name, token.creator);
|
|
if let Err(e) = buffer_tx.try_send(token.clone()) {
|
|
error!("failed_to_send_token_to_buffer::mint::{}::error::{}", token.mint, e);
|
|
}
|
|
}
|
|
}
|
|
},
|
|
_ = cancellation_token.cancelled() => {
|
|
warn!("new_token_subscriber::shutdown_signal_received");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
info!("new_token_subscriber::ended");
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
pub fn spawn_token_cex_updated_subscriber(
|
|
token_handler: Arc<TokenMetadataHandlerOperator>,
|
|
db: Arc<StorageEngine>,
|
|
cancellation_token: CancellationToken,
|
|
) -> JoinHandle<Result<()>> {
|
|
tokio::spawn(async move {
|
|
debug!("token_cex_updated_subscriber::starting");
|
|
|
|
// ✅ Create dedicated PubSub connection for this subscriber
|
|
let mut pubsub = db.redis.create_pubsub_connection().await?;
|
|
|
|
if let Err(e) = pubsub.subscribe("token_cex_updated").await {
|
|
error!("failed_to_subscribe_to_token_cex_updated: {}", e);
|
|
return Err(err_with_loc!(AppError::Redis(format!(
|
|
"failed_to_subscribe_to_token_cex_updated: {}",
|
|
e
|
|
))));
|
|
}
|
|
|
|
// Create a channel for buffering messages - with good capacity for performance
|
|
let (buffer_tx, mut buffer_rx) = mpsc::channel::<TokenCexUpdatedData>(1000);
|
|
|
|
info!("token_cex_updated_subscriber::subscribed_successfully");
|
|
let mut msg_stream = pubsub.on_message();
|
|
let cancellation_token = cancellation_token.clone();
|
|
|
|
loop {
|
|
tokio::select! {
|
|
Some(token) = buffer_rx.recv() => {
|
|
// ✅ Just call the existing handler - no duplication!
|
|
if let Err(e) = token_handler.process_cex_updated(token.clone()).await {
|
|
error!("failed_to_send_cex_updated_to_token_handler::mint::{}::error::{}", token.mint, e);
|
|
}
|
|
},
|
|
Some(message) = msg_stream.next() => {
|
|
if let Ok(msg) = message.get_payload::<String>() {
|
|
if let Ok(token) = serde_json::from_str::<TokenCexUpdatedData>(&msg) {
|
|
debug!("token_cex_updated_received::mint::{}::name::{}::cex::{}",
|
|
token.mint, token.name, token.cex_name);
|
|
if let Err(e) = buffer_tx.try_send(token.clone()) {
|
|
error!("failed_to_send_cex_updated_to_buffer::mint::{}::error::{}", token.mint, e);
|
|
}
|
|
}
|
|
}
|
|
},
|
|
_ = cancellation_token.cancelled() => {
|
|
warn!("token_cex_updated_subscriber::shutdown_signal_received");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
info!("token_cex_updated_subscriber::ended");
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
pub fn spawn_max_depth_reached_subscriber(
|
|
token_handler: Arc<TokenMetadataHandlerOperator>,
|
|
db: Arc<StorageEngine>,
|
|
cancellation_token: CancellationToken,
|
|
) -> JoinHandle<Result<()>> {
|
|
tokio::spawn(async move {
|
|
debug!("max_depth_reached_subscriber::starting");
|
|
|
|
// ✅ Create dedicated PubSub connection for this subscriber
|
|
let mut pubsub = db.redis.create_pubsub_connection().await?;
|
|
|
|
if let Err(e) = pubsub.subscribe("max_depth_reached").await {
|
|
error!("failed_to_subscribe_to_max_depth_reached: {}", e);
|
|
return Err(err_with_loc!(AppError::Redis(format!(
|
|
"failed_to_subscribe_to_max_depth_reached: {}",
|
|
e
|
|
))));
|
|
}
|
|
|
|
// Create a channel for buffering messages - with good capacity for performance
|
|
let (buffer_tx, mut buffer_rx) = mpsc::channel::<MaxDepthReachedData>(1000);
|
|
|
|
info!("max_depth_reached_subscriber::subscribed_successfully");
|
|
let mut msg_stream = pubsub.on_message();
|
|
let cancellation_token = cancellation_token.clone();
|
|
|
|
loop {
|
|
tokio::select! {
|
|
Some(token) = buffer_rx.recv() => {
|
|
// ✅ Just call the existing handler - no duplication!
|
|
if let Err(e) = token_handler.process_max_depth_reached(token.clone()).await {
|
|
error!("failed_to_send_max_depth_reached_to_token_handler::mint::{}::error::{}", token.mint, e);
|
|
}
|
|
},
|
|
Some(message) = msg_stream.next() => {
|
|
if let Ok(msg) = message.get_payload::<String>() {
|
|
if let Ok(token) = serde_json::from_str::<MaxDepthReachedData>(&msg) {
|
|
debug!("max_depth_reached_received::mint::{}::name::{}::nodes::{}::edges::{}",
|
|
token.mint, token.name, token.node_count, token.edge_count);
|
|
if let Err(e) = buffer_tx.try_send(token.clone()) {
|
|
error!("failed_to_send_max_depth_reached_to_buffer::mint::{}::error::{}", token.mint, e);
|
|
}
|
|
}
|
|
}
|
|
},
|
|
_ = cancellation_token.cancelled() => {
|
|
warn!("max_depth_reached_subscriber::shutdown_signal_received");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
info!("max_depth_reached_subscriber::ended");
|
|
Ok(())
|
|
})
|
|
}
|