From 2bb2cd5ce2eacb86ae81c44e9b678f909018602b Mon Sep 17 00:00:00 2001 From: Adam Macdonald Date: Wed, 6 Aug 2025 20:47:06 +0100 Subject: [PATCH] implement tracker control messages channel & simplify scrape responses --- Cargo.toml | 2 +- src/constants.rs | 7 +-- src/meteoro.rs | 13 ++++-- src/tracker.rs | 117 +++++++++++++++++++++++++++++++++-------------- 4 files changed, 95 insertions(+), 44 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ec02230..70e42fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ edition = "2024" [dependencies] anyhow = "1.0.98" -flume = { version = "0.11.1", features = ["eventual-fairness"] } +flume = { version = "0.11.1", features = ["eventual-fairness", "select"] } heapless = "0.8.0" # log = { version = "...", features = ["release_max_level_info"] } log = "0.4.27" diff --git a/src/constants.rs b/src/constants.rs index 72f85a3..955dc10 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -8,7 +8,8 @@ OPTIONS: -A IPV6_ADDRESS Provide an IPv6 address to bind to (example: -A [::1]:6970) --h | --help Show this help text --v | --version Print the program's build & version information --debug Enable debug logging ---trace Enable trace logging (more verbose debug)"; +--trace Enable trace logging (more verbose debug) + +-h | --help Show this help text +-v | --version Print the program's build & version information"; diff --git a/src/meteoro.rs b/src/meteoro.rs index a9614f9..8aec265 100644 --- a/src/meteoro.rs +++ b/src/meteoro.rs @@ -1,5 +1,6 @@ use std::net::{SocketAddr, UdpSocket}; use std::thread; +use std::time::Duration; use anyhow::Result; use flume::Sender; @@ -13,7 +14,7 @@ use crate::udp_server; pub struct Meteoro {} #[repr(u8)] -pub enum ApplicationControlMessage { +pub enum TrackerControlMessage { /// Signal that the receiver should exit their thread of execution Exit = 0, } @@ -28,7 +29,7 @@ impl Meteoro { pub fn start(addrs: &[SocketAddr]) -> Result<()> { // Create channels - let (ctrl_send, ctrl_recv) = flume::unbounded::(); + let (ctrl_send, ctrl_recv) = flume::unbounded::(); let (srv_ctrl_send, srv_ctrl_recv) = flume::unbounded::(); let (reqs_send, reqs_recv) = flume::unbounded::(); let resp_chans: Vec<_> = (0..addrs.len()) @@ -86,12 +87,14 @@ impl Meteoro { // - a request message channel (recv), for receiving requests from the server(s) // - corresponding response channels (send), for providing handled requests - let resps: Vec> = - resp_chans.iter().map(|(a, _)| a.clone()).collect(); + let resps: Vec> = resp_chans + .iter() + .map(|(resp_chan, _)| resp_chan.clone()) + .collect(); thread::spawn(move || { let mut tracker = Tracker::new(); - let _ = tracker.start(reqs_recv, resps.as_slice()); + let _ = tracker.start(reqs_recv, resps.as_slice(), ctrl_recv); }) }; diff --git a/src/tracker.rs b/src/tracker.rs index 5b3e65d..daa5e00 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -1,9 +1,10 @@ use std::collections::HashMap; use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::time::{Duration, Instant}; use anyhow::Result; -use flume::{Receiver, Sender}; +use flume::{Receiver, Selector, Sender}; use rand::Rng; use rand::rngs::ThreadRng; @@ -13,6 +14,7 @@ use crate::bittorrent::udp::{ AnnounceResponse, AnnounceResponseCommon, AnnounceResponseV4, AnnounceResponseV6, ConnectResponse, ScrapeResponse, ScrapeStats, UdpRequest, UdpResponse, }; +use crate::meteoro::TrackerControlMessage; pub struct RequestMessage { pub(crate) server_id: usize, @@ -67,17 +69,47 @@ impl Tracker { &mut self, reqs: Receiver, resps: &[Sender], + ctrl: Receiver, ) -> Result<()> { - loop { - let request = reqs.recv()?; + let mut next_gc = Instant::now() + GARBAGE_COLLECTION_INTERVAL; + let should_exit = AtomicBool::new(false); - log::trace!("Request from {} | {:?}", request.src_addr, request.request); + while !should_exit.load(Ordering::Relaxed) { + let _ = Selector::new() + .recv(&reqs, |request| { + let request = request.unwrap(); - match self.handle_request(&request) { - Some(response) => resps[request.server_id].send(response)?, - None => (), + log::trace!("Request from {} | {:?}", request.src_addr, request.request); + + match self.handle_request(&request) { + Some(response) => { + resps[request.server_id].send(response).unwrap(); + } + None => (), + }; + }) + .recv(&ctrl, |msg| { + let msg = msg.unwrap(); + + match msg { + TrackerControlMessage::Exit => { + should_exit.store(true, Ordering::Relaxed); + } + }; + }) + .wait(); + + if Instant::now() > next_gc { + // Garbage collect + self.garbage_collect(); + + next_gc = Instant::now() + + GARBAGE_COLLECTION_INTERVAL + + Duration::from_millis(self.rng.random::() % 5000); } } + + return Ok(()); } fn connection_valid(&self, connection_id: i64, src_addr: SocketAddr) -> bool { @@ -281,36 +313,29 @@ impl Tracker { let info_hashes = &self.info_hashes; - let mut scrape_stats: Vec = Vec::new(); + let scrape_stats: Vec = scrape + .info_hashes + .iter() + .filter_map(|info_hash| { + if let Some(swarm) = info_hashes.get(info_hash) { + let n_seeders = swarm + .iter() + .filter(|status| { + status.last_event == Event::Completed || status.remaining == 0 + }) + .count(); + let n_leechers = swarm.len() - n_seeders; - for info_hash in &scrape.info_hashes { - match info_hashes.get(info_hash) { - Some(peers) => { - scrape_stats.push(ScrapeStats { - seeders: peers - .iter() - .filter(|&peer_status| { - peer_status.last_event == Event::Completed - || peer_status.remaining == 0 - && peer_status.socket_addr.is_ipv4() - == request.src_addr.is_ipv4() - }) - .count() as i32, - completed: 0, // TODO: keep track of this - leechers: peers - .iter() - .filter(|&peer_status| { - peer_status.last_event == Event::Started - || peer_status.remaining > 0 - && peer_status.socket_addr.is_ipv4() - == request.src_addr.is_ipv4() - }) - .count() as i32, - }); + Some(ScrapeStats { + seeders: n_seeders as i32, + completed: 0, // TODO + leechers: n_leechers as i32, + }) + } else { + None } - None => scrape_stats.push(ScrapeStats::default()), - }; - } + }) + .collect(); Some(ResponseMessage { response: UdpResponse::Scrape(ScrapeResponse { @@ -323,6 +348,28 @@ impl Tracker { }; } + fn garbage_collect(&mut self) { + let s1 = Instant::now(); + let n_purged_conns = self.purge_expired_connections(); + let d1 = s1.elapsed(); + + let s2 = Instant::now(); + let n_purged_swarm_peers = self.purge_expired_swarm_peers(); + let d2 = s2.elapsed(); + + log::debug!( + "Garbage collected {} expired connections (took: {} ns)", + n_purged_conns, + d1.as_nanos() + ); + log::debug!( + "Garbage collected {} inactive peers from the swarm (took: {} ns)", + n_purged_swarm_peers, + d2.as_nanos() + ); + log::debug!("Garbage collection took {} ns total", (d1 + d2).as_nanos()); + } + fn purge_expired_connections(&mut self) -> usize { let mut purged: usize = 0;