Ziya/src/task/subscriber.rs

use crate::err_with_loc;
use crate::error::app::AppError;
use crate::error::Result;
use crate::handler::token::TokenMetadataHandlerOperator;
use crate::model::token::{MaxDepthReachedData, NewTokenCreatedData, TokenCexUpdatedData};
use crate::slint_ui::{MainWindow, NewTokenUiData};
use crate::storage::StorageEngine;
use futures_util::StreamExt;
use slint::Weak;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, warn};

/// Simplified subscriber that creates its own PubSub connection.
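///
/// Spawns a Tokio task that subscribes to the `new_token_created` Redis
/// channel, buffers decoded payloads through a bounded mpsc channel, and
/// forwards each one to `TokenMetadataHandlerOperator::process_new_token`
/// until the cancellation token is cancelled.
///
/// A minimal wiring sketch (illustrative only; assumes a `handler` and
/// `storage` built elsewhere in the application):
///
/// ```ignore
/// let cancel = CancellationToken::new();
/// let task = spawn_new_token_subscriber(handler.clone(), storage.clone(), cancel.clone());
/// // ... later, during shutdown:
/// cancel.cancel();
/// task.await??;
/// ```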
pub fn spawn_new_token_subscriber(
    token_handler: Arc<TokenMetadataHandlerOperator>,
    db: Arc<StorageEngine>,
    cancellation_token: CancellationToken,
) -> JoinHandle<Result<()>> {
    tokio::spawn(async move {
        debug!("new_token_subscriber::starting");
        // ✅ Create dedicated PubSub connection for this subscriber
        let mut pubsub = db.redis.create_pubsub_connection().await?;
        if let Err(e) = pubsub.subscribe("new_token_created").await {
            error!("failed_to_subscribe_to_new_token_created: {}", e);
            return Err(err_with_loc!(AppError::Redis(format!(
                "failed_to_subscribe_to_new_token_created: {}",
                e
            ))));
        }
        // Bounded channel that buffers decoded messages so a slow handler call
        // does not block the Redis message stream
        let (buffer_tx, mut buffer_rx) = mpsc::channel::<NewTokenCreatedData>(1000);
        info!("new_token_subscriber::subscribed_successfully");
        let mut msg_stream = pubsub.on_message();
        loop {
            tokio::select! {
                Some(token) = buffer_rx.recv() => {
                    // ✅ Just call the existing handler - no duplication!
                    if let Err(e) = token_handler.process_new_token(token.clone()).await {
                        error!("failed_to_send_token_to_token_handler::mint::{}::error::{}", token.mint, e);
                    }
                },
                Some(message) = msg_stream.next() => {
                    if let Ok(msg) = message.get_payload::<String>() {
                        match serde_json::from_str::<NewTokenCreatedData>(&msg) {
                            Ok(token) => {
                                debug!("new_token_received::mint::{}::name::{}::creator::{}",
                                    token.mint, token.name, token.creator);
                                if let Err(e) = buffer_tx.try_send(token.clone()) {
                                    error!("failed_to_send_token_to_buffer::mint::{}::error::{}", token.mint, e);
                                }
                            }
                            Err(e) => warn!("failed_to_deserialize_new_token_created::error::{}", e),
                        }
                    }
                },
                _ = cancellation_token.cancelled() => {
                    warn!("new_token_subscriber::shutdown_signal_received");
                    break;
                }
            }
        }
        info!("new_token_subscriber::ended");
        Ok(())
    })
}
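
/// Subscribes to the `token_cex_updated` Redis channel and forwards each
/// decoded `TokenCexUpdatedData` payload to
/// `TokenMetadataHandlerOperator::process_cex_updated` until the
/// cancellation token is cancelled.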
pub fn spawn_token_cex_updated_subscriber(
    token_handler: Arc<TokenMetadataHandlerOperator>,
    db: Arc<StorageEngine>,
    cancellation_token: CancellationToken,
) -> JoinHandle<Result<()>> {
    tokio::spawn(async move {
        debug!("token_cex_updated_subscriber::starting");
        // ✅ Create dedicated PubSub connection for this subscriber
        let mut pubsub = db.redis.create_pubsub_connection().await?;
        if let Err(e) = pubsub.subscribe("token_cex_updated").await {
            error!("failed_to_subscribe_to_token_cex_updated: {}", e);
            return Err(err_with_loc!(AppError::Redis(format!(
                "failed_to_subscribe_to_token_cex_updated: {}",
                e
            ))));
        }
        // Bounded channel that buffers decoded messages so a slow handler call
        // does not block the Redis message stream
        let (buffer_tx, mut buffer_rx) = mpsc::channel::<TokenCexUpdatedData>(1000);
        info!("token_cex_updated_subscriber::subscribed_successfully");
        let mut msg_stream = pubsub.on_message();
        loop {
            tokio::select! {
                Some(token) = buffer_rx.recv() => {
                    // ✅ Just call the existing handler - no duplication!
                    if let Err(e) = token_handler.process_cex_updated(token.clone()).await {
                        error!("failed_to_send_cex_updated_to_token_handler::mint::{}::error::{}", token.mint, e);
                    }
                },
                Some(message) = msg_stream.next() => {
                    if let Ok(msg) = message.get_payload::<String>() {
                        match serde_json::from_str::<TokenCexUpdatedData>(&msg) {
                            Ok(token) => {
                                debug!("token_cex_updated_received::mint::{}::name::{}::cex::{}",
                                    token.mint, token.name, token.cex_name);
                                if let Err(e) = buffer_tx.try_send(token.clone()) {
                                    error!("failed_to_send_cex_updated_to_buffer::mint::{}::error::{}", token.mint, e);
                                }
                            }
                            Err(e) => warn!("failed_to_deserialize_token_cex_updated::error::{}", e),
                        }
                    }
                },
                _ = cancellation_token.cancelled() => {
                    warn!("token_cex_updated_subscriber::shutdown_signal_received");
                    break;
                }
            }
        }
        info!("token_cex_updated_subscriber::ended");
        Ok(())
    })
}
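
/// Subscribes to the `max_depth_reached` Redis channel and forwards each
/// decoded `MaxDepthReachedData` payload to
/// `TokenMetadataHandlerOperator::process_max_depth_reached` until the
/// cancellation token is cancelled.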
pub fn spawn_max_depth_reached_subscriber(
    token_handler: Arc<TokenMetadataHandlerOperator>,
    db: Arc<StorageEngine>,
    cancellation_token: CancellationToken,
) -> JoinHandle<Result<()>> {
    tokio::spawn(async move {
        debug!("max_depth_reached_subscriber::starting");
        // ✅ Create dedicated PubSub connection for this subscriber
        let mut pubsub = db.redis.create_pubsub_connection().await?;
        if let Err(e) = pubsub.subscribe("max_depth_reached").await {
            error!("failed_to_subscribe_to_max_depth_reached: {}", e);
            return Err(err_with_loc!(AppError::Redis(format!(
                "failed_to_subscribe_to_max_depth_reached: {}",
                e
            ))));
        }
        // Bounded channel that buffers decoded messages so a slow handler call
        // does not block the Redis message stream
        let (buffer_tx, mut buffer_rx) = mpsc::channel::<MaxDepthReachedData>(1000);
        info!("max_depth_reached_subscriber::subscribed_successfully");
        let mut msg_stream = pubsub.on_message();
        loop {
            tokio::select! {
                Some(token) = buffer_rx.recv() => {
                    // ✅ Just call the existing handler - no duplication!
                    if let Err(e) = token_handler.process_max_depth_reached(token.clone()).await {
                        error!("failed_to_send_max_depth_reached_to_token_handler::mint::{}::error::{}", token.mint, e);
                    }
                },
                Some(message) = msg_stream.next() => {
                    if let Ok(msg) = message.get_payload::<String>() {
                        match serde_json::from_str::<MaxDepthReachedData>(&msg) {
                            Ok(token) => {
                                debug!("max_depth_reached_received::mint::{}::name::{}::nodes::{}::edges::{}",
                                    token.mint, token.name, token.node_count, token.edge_count);
                                if let Err(e) = buffer_tx.try_send(token.clone()) {
                                    error!("failed_to_send_max_depth_reached_to_buffer::mint::{}::error::{}", token.mint, e);
                                }
                            }
                            Err(e) => warn!("failed_to_deserialize_max_depth_reached::error::{}", e),
                        }
                    }
                },
                _ = cancellation_token.cancelled() => {
                    warn!("max_depth_reached_subscriber::shutdown_signal_received");
                    break;
                }
            }
        }
        info!("max_depth_reached_subscriber::ended");
        Ok(())
    })
}