use crate::err_with_loc; use crate::error::app::AppError; use crate::error::Result; use crate::handler::token::TokenMetadataHandlerOperator; use crate::model::token::{MaxDepthReachedData, NewTokenCreatedData, TokenCexUpdatedData}; use crate::slint_ui::{MainWindow, NewTokenUiData}; use crate::storage::StorageEngine; use futures_util::StreamExt; use slint::Weak; use std::sync::Arc; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info, warn}; // ✅ Simplified subscriber that creates its own PubSub connection pub fn spawn_new_token_subscriber( token_handler: Arc, db: Arc, cancellation_token: CancellationToken, ) -> JoinHandle> { tokio::spawn(async move { debug!("new_token_subscriber::starting"); // ✅ Create dedicated PubSub connection for this subscriber let mut pubsub = db.redis.create_pubsub_connection().await?; if let Err(e) = pubsub.subscribe("new_token_created").await { error!("failed_to_subscribe_to_new_token_created: {}", e); return Err(err_with_loc!(AppError::Redis(format!( "failed_to_subscribe_to_new_token_created: {}", e )))); } // Create a channel for buffering messages - with good capacity for performance let (buffer_tx, mut buffer_rx) = mpsc::channel::(1000); info!("new_token_subscriber::subscribed_successfully"); let mut msg_stream = pubsub.on_message(); let cancellation_token = cancellation_token.clone(); loop { tokio::select! { Some(token) = buffer_rx.recv() => { // ✅ Just call the existing handler - no duplication! if let Err(e) = token_handler.process_new_token(token.clone()).await { error!("failed_to_send_token_to_token_handler::mint::{}::error::{}", token.mint.clone(), e); } }, Some(message) = msg_stream.next() => { if let Ok(msg) = message.get_payload::() { if let Ok(token) = serde_json::from_str::(&msg) { debug!("new_token_received::mint::{}::name::{}::creator::{}", token.mint, token.name, token.creator); if let Err(e) = buffer_tx.try_send(token.clone()) { error!("failed_to_send_token_to_buffer::mint::{}::error::{}", token.mint, e); } } } }, _ = cancellation_token.cancelled() => { warn!("new_token_subscriber::shutdown_signal_received"); break; } } } info!("new_token_subscriber::ended"); Ok(()) }) } pub fn spawn_token_cex_updated_subscriber( token_handler: Arc, db: Arc, cancellation_token: CancellationToken, ) -> JoinHandle> { tokio::spawn(async move { debug!("token_cex_updated_subscriber::starting"); // ✅ Create dedicated PubSub connection for this subscriber let mut pubsub = db.redis.create_pubsub_connection().await?; if let Err(e) = pubsub.subscribe("token_cex_updated").await { error!("failed_to_subscribe_to_token_cex_updated: {}", e); return Err(err_with_loc!(AppError::Redis(format!( "failed_to_subscribe_to_token_cex_updated: {}", e )))); } // Create a channel for buffering messages - with good capacity for performance let (buffer_tx, mut buffer_rx) = mpsc::channel::(1000); info!("token_cex_updated_subscriber::subscribed_successfully"); let mut msg_stream = pubsub.on_message(); let cancellation_token = cancellation_token.clone(); loop { tokio::select! { Some(token) = buffer_rx.recv() => { // ✅ Just call the existing handler - no duplication! if let Err(e) = token_handler.process_cex_updated(token.clone()).await { error!("failed_to_send_cex_updated_to_token_handler::mint::{}::error::{}", token.mint, e); } }, Some(message) = msg_stream.next() => { if let Ok(msg) = message.get_payload::() { if let Ok(token) = serde_json::from_str::(&msg) { debug!("token_cex_updated_received::mint::{}::name::{}::cex::{}", token.mint, token.name, token.cex_name); if let Err(e) = buffer_tx.try_send(token.clone()) { error!("failed_to_send_cex_updated_to_buffer::mint::{}::error::{}", token.mint, e); } } } }, _ = cancellation_token.cancelled() => { warn!("token_cex_updated_subscriber::shutdown_signal_received"); break; } } } info!("token_cex_updated_subscriber::ended"); Ok(()) }) } pub fn spawn_max_depth_reached_subscriber( token_handler: Arc, db: Arc, cancellation_token: CancellationToken, ) -> JoinHandle> { tokio::spawn(async move { debug!("max_depth_reached_subscriber::starting"); // ✅ Create dedicated PubSub connection for this subscriber let mut pubsub = db.redis.create_pubsub_connection().await?; if let Err(e) = pubsub.subscribe("max_depth_reached").await { error!("failed_to_subscribe_to_max_depth_reached: {}", e); return Err(err_with_loc!(AppError::Redis(format!( "failed_to_subscribe_to_max_depth_reached: {}", e )))); } // Create a channel for buffering messages - with good capacity for performance let (buffer_tx, mut buffer_rx) = mpsc::channel::(1000); info!("max_depth_reached_subscriber::subscribed_successfully"); let mut msg_stream = pubsub.on_message(); let cancellation_token = cancellation_token.clone(); loop { tokio::select! { Some(token) = buffer_rx.recv() => { // ✅ Just call the existing handler - no duplication! if let Err(e) = token_handler.process_max_depth_reached(token.clone()).await { error!("failed_to_send_max_depth_reached_to_token_handler::mint::{}::error::{}", token.mint, e); } }, Some(message) = msg_stream.next() => { if let Ok(msg) = message.get_payload::() { if let Ok(token) = serde_json::from_str::(&msg) { debug!("max_depth_reached_received::mint::{}::name::{}::nodes::{}::edges::{}", token.mint, token.name, token.node_count, token.edge_count); if let Err(e) = buffer_tx.try_send(token.clone()) { error!("failed_to_send_max_depth_reached_to_buffer::mint::{}::error::{}", token.mint, e); } } } }, _ = cancellation_token.cancelled() => { warn!("max_depth_reached_subscriber::shutdown_signal_received"); break; } } } info!("max_depth_reached_subscriber::ended"); Ok(()) }) }