diff --git a/Cargo.toml b/Cargo.toml index b45b3d8..28cda7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,8 +5,9 @@ edition = "2024" [dependencies] anyhow = "1.0.98" -flume = { version = "0.11.1", features = ["eventual-fairness", "select"] } +flume = { version = "0.11.1", features = ["eventual-fairness"] } heapless = "0.8.0" -log = { version = "0.4.27", features = ["release_max_level_info"] } +# log = { version = "...", features = ["release_max_level_info"] } +log = "0.4.27" rand = "0.9.2" simplelog = "0.12.2" diff --git a/src/constants.rs b/src/constants.rs index 8173ad5..72f85a3 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -9,4 +9,6 @@ OPTIONS: (example: -A [::1]:6970) -h | --help Show this help text --v | --version Print the program's build & version information"; +-v | --version Print the program's build & version information +--debug Enable debug logging +--trace Enable trace logging (more verbose debug)"; diff --git a/src/main.rs b/src/main.rs index 0db3914..ce005ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,13 +12,31 @@ mod tracker; use tracker::Tracker; fn main() -> ExitCode { + let args: Vec = env::args().collect(); + + if args.contains(&String::from("--help")) || args.contains(&String::from("-h")) { + println!("{}", constants::METEORO_HELP_TEXT); + return ExitCode::SUCCESS; + } + + if args.contains(&String::from("--version")) || args.contains(&String::from("-v")) { + println!("{} {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")); + return ExitCode::SUCCESS; + } + #[cfg(debug_assertions)] - let log_level: LevelFilter = LevelFilter::Trace; + let mut log_level: LevelFilter = LevelFilter::Trace; #[cfg(not(debug_assertions))] - let log_level: LevelFilter = LevelFilter::Info; + let mut log_level: LevelFilter = LevelFilter::Info; + + if args.contains(&String::from("--debug")) { + log_level = LevelFilter::Debug; + } else if args.contains(&String::from("--trace")) { + log_level = LevelFilter::Trace; + } let logger_config = ConfigBuilder::new() - .set_target_level(LevelFilter::Off) + .set_target_level(LevelFilter::Trace) .set_location_level(LevelFilter::Off) .set_thread_level(LevelFilter::Off) .set_time_level(LevelFilter::Info) @@ -32,18 +50,6 @@ fn main() -> ExitCode { ) .expect("failed to initialise terminal logging"); - let args: Vec = env::args().collect(); - - if args.contains(&String::from("--help")) || args.contains(&String::from("-h")) { - println!("{}", constants::METEORO_HELP_TEXT); - return ExitCode::SUCCESS; - } - - if args.contains(&String::from("--version")) || args.contains(&String::from("-v")) { - println!("{} {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")); - return ExitCode::SUCCESS; - } - let ipv4_flag_idxs: Vec = args .iter() .enumerate() diff --git a/src/tracker/bittorrent/udp.rs b/src/tracker/bittorrent/udp.rs index 7186ba2..e6a1ffb 100644 --- a/src/tracker/bittorrent/udp.rs +++ b/src/tracker/bittorrent/udp.rs @@ -199,6 +199,7 @@ pub struct ErrorResponse { pub message: String, } +#[derive(Debug)] pub enum UdpRequest { Connect(ConnectRequest), Announce(AnnounceRequest), diff --git a/src/tracker/tracker.rs b/src/tracker/tracker.rs index 498b9f4..ad0f4d4 100644 --- a/src/tracker/tracker.rs +++ b/src/tracker/tracker.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; -use std::os::linux::raw::stat; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::thread; @@ -35,9 +34,12 @@ struct PeerMetadata { } struct PeerStatus { - connection_id: i64, + socket_addr: SocketAddr, last_event: Event, last_active: Instant, + downloaded: u64, + uploaded: u64, + remaining: u64, } pub const GARBAGE_COLLECTION_INTERVAL: Duration = Duration::from_secs(20); @@ -88,14 +90,15 @@ impl Tracker { self.server_threads.push(thread::spawn(move || { let server = UdpServer::new(a, server_id); - loop { - match server.start(req_chan.clone(), resp_recv.clone()) { - Err(e) => { - log::error!("Failed to start server {}: {}", server_id, e) - } - _ => (), - }; - } + + match server.start(req_chan.clone(), resp_recv.clone()) { + Err(e) => { + log::error!("Failed to start server {}: {}", server_id, e) + } + _ => (), + }; + + return; })); self.response_channels.push(resp_send); @@ -112,15 +115,28 @@ impl Tracker { loop { thread::sleep(GARBAGE_COLLECTION_INTERVAL); - let start = Instant::now(); + let s1 = Instant::now(); let expired = Tracker::purge_expired_connections(&mut peers.write().unwrap()); - let purged = - Tracker::purge_expired_swarm_peers(&mut info_hashes.write().unwrap(), &expired); - log::debug!("Garbage collected {} expired connections", expired.len()); - log::debug!("Garbage collected {} inactive peers from the swarm", purged); - log::debug!("Garbage collection took {} ns", start.elapsed().as_nanos()) + let d1 = s1.elapsed(); + let s2 = Instant::now(); + + let purged = Tracker::purge_expired_swarm_peers(&mut info_hashes.write().unwrap()); + + let d2 = s2.elapsed(); + + log::debug!( + "Garbage collected {} expired connections (took: {} ns)", + expired, + d1.as_nanos() + ); + log::debug!( + "Garbage collected {} inactive peers from the swarm (took: {} ns)", + purged, + d2.as_nanos() + ); + log::debug!("Garbage collection took {} ns total", (d1 + d2).as_nanos()); } }); @@ -135,14 +151,10 @@ impl Tracker { } }; + log::trace!("Request from {} | {:?}", request.src_addr, request.request); + match request.request { UdpRequest::Connect(connect) => { - log::trace!( - "(Server {}) Connect request: {:?}", - request.server_id, - connect - ); - let new_id: i64 = rng.random(); let mut peers = self.peers.write().unwrap(); @@ -171,17 +183,11 @@ impl Tracker { } UdpRequest::Announce(announce) => { - log::trace!( - "(Server {}) Announce request: {:?}", - request.server_id, - announce - ); - if !self.connection_valid(announce.connection_id, request.src_addr) { continue; } - // Ensure we honour the desired number of peers, within our bounaries + // Ensure we honour the desired number of peers, within our boundaries let n_announce_want: u32 = if let Some(n) = announce.num_want { if n < DEFAULT_ANNOUNCE_WANT { @@ -194,8 +200,7 @@ impl Tracker { }; let mut n_announce_entries: u32 = 0; - let mut n_leechers: i32 = 0; - let mut n_seeders: i32 = 0; + let mut n_seeders: u32 = 0; let mut v4_peers: Vec = Vec::new(); let mut v6_peers: Vec = Vec::new(); @@ -209,9 +214,12 @@ impl Tracker { info_hashes.insert( announce.info_hash, vec![PeerStatus { - connection_id: announce.connection_id, + socket_addr: request.src_addr, last_event: announce.event, last_active: Instant::now(), + downloaded: announce.downloaded as u64, + uploaded: announce.uploaded as u64, + remaining: announce.left as u64, }], ); } @@ -219,94 +227,109 @@ impl Tracker { // Insert into swarm if not already present // TODO: sort (?) - let this_swarm_idx: Option = - match swarm.iter_mut().enumerate().find(|(idx, status)| { - status.connection_id == announce.connection_id - }) { - Some((idx, peer)) => { - // Peer already exists in swarm, update state + let existing_swarm_idx: Option = match swarm + .iter_mut() + .enumerate() + .find(|(_, status)| status.socket_addr == request.src_addr) + { + Some((idx, peer)) => { + // Peer already exists in swarm, update state - peer.last_active = Instant::now(); - peer.last_event = announce.event; + peer.last_active = Instant::now(); + peer.last_event = announce.event; + peer.downloaded = announce.downloaded as u64; + peer.uploaded = announce.uploaded as u64; + peer.remaining = announce.left as u64; - Some(idx) - } - None => { - // Peer is not in swarm, add it + Some(idx) + } + None => { + // Peer is not in swarm, add it - swarm.push(PeerStatus { - connection_id: announce.connection_id, - last_event: announce.event, - last_active: Instant::now(), - }); + swarm.push(PeerStatus { + socket_addr: request.src_addr, + last_event: announce.event, + last_active: Instant::now(), + downloaded: announce.downloaded as u64, + uploaded: announce.uploaded as u64, + remaining: announce.left as u64, + }); - None - } - }; + None + } + }; if announce.event == Event::Stopped { - if let Some(idx) = this_swarm_idx { + if let Some(idx) = existing_swarm_idx { swarm.remove(idx); } } - let peers = self.peers.read().unwrap(); + // Iterate over all peers in the swarm for announce response for peer_status in swarm { - if peer_status.last_event == Event::Stopped { - // Skip any peers who have stopped their download - continue; - } - - if peer_status.connection_id == announce.connection_id { - continue; // don't give the peer their own connection ID's address - } - // Respect number of peers requested if n_announce_entries >= n_announce_want { break; } + // Don't provide useless peers + + let peer_invalid: bool = (peer_status.last_event == Event::Stopped) + || (peer_status.socket_addr == request.src_addr); + + if peer_invalid { + log::trace!( + "(src: {}) {} -> {:?} deemed invalid", + request.src_addr, + peer_status.socket_addr, + peer_status.last_event + ); + continue; + } + // Collect and add the relevant peer's address to the appropriate vector - if let Some(metadata) = peers.get(&peer_status.connection_id) { - if metadata.socket_addr == request.src_addr { - continue; // don't give the peer an expired ID with the same address - } + let is_seed: bool = peer_status.last_event == Event::Completed + || peer_status.remaining == 0; - match &metadata.socket_addr { - SocketAddr::V4(v4) => { - if request.src_addr.is_ipv4() { - v4_peers.push(*v4); - } + match &peer_status.socket_addr { + SocketAddr::V4(v4) => { + if request.src_addr.is_ipv4() { + v4_peers.push(*v4); + n_seeders += is_seed as u32; } - SocketAddr::V6(v6) => { - if request.src_addr.is_ipv6() { - v6_peers.push(*v6); - } - } - }; - - // Keep track of the seeders, leechers & announce entries gathered - - if peer_status.last_event == Event::Completed { - n_seeders += 1; - } else { - n_leechers += 1; } + SocketAddr::V6(v6) => { + if request.src_addr.is_ipv6() { + v6_peers.push(*v6); + n_seeders += is_seed as u32; + } + } + }; - n_announce_entries += 1; - } + // Keep track of the announce entries gathered + + n_announce_entries += 1; } } }; + log::trace!( + "(src: {}) IPv4s: {}, IPv6s: {}, leechers: {}, seeders: {}", + request.src_addr, + v4_peers.len(), + v6_peers.len(), + n_announce_entries - n_seeders, + n_seeders + ); + let resp_common = AnnounceResponseCommon { transaction_id: announce.transaction_id, interval: DEFAULT_ANNOUNCE_INTERVAL.as_secs() as i32, - leechers: n_leechers, - seeders: n_seeders, + leechers: (n_announce_entries - n_seeders) as i32, + seeders: n_seeders as i32, }; let response = if request.src_addr.is_ipv4() { // IPv4 @@ -335,12 +358,6 @@ impl Tracker { } UdpRequest::Scrape(scrape) => { - log::trace!( - "(Server {}) Scrape request: {:?}", - request.server_id, - scrape - ); - if !self.connection_valid(scrape.connection_id, request.src_addr) { continue; } @@ -356,6 +373,9 @@ impl Tracker { .iter() .filter(|&peer_status| { peer_status.last_event == Event::Completed + || peer_status.remaining == 0 + && peer_status.socket_addr.is_ipv4() + == request.src_addr.is_ipv4() }) .count() as i32, @@ -364,6 +384,9 @@ impl Tracker { .iter() .filter(|&peer_status| { peer_status.last_event == Event::Started + || peer_status.remaining > 0 + && peer_status.socket_addr.is_ipv4() + == request.src_addr.is_ipv4() }) .count() as i32, @@ -406,46 +429,39 @@ impl Tracker { }; } - fn purge_expired_connections(peers: &mut ConnectionIdMap) -> Vec { - let mut purged: Vec = Vec::new(); + fn purge_expired_connections(peers: &mut ConnectionIdMap) -> usize { + let mut purged: usize = 0; - for (id, metadata) in peers.iter() { - if metadata.last_active.elapsed() > CONNECTION_EXPIRE_TIME { - purged.push(*id); + peers.retain(|_, metadata| { + if metadata.last_active.elapsed() < CONNECTION_EXPIRE_TIME { + true + } else { + purged += 1; + false } - } - - for id in &purged { - peers.remove(id); - } + }); return purged; } - fn purge_expired_swarm_peers( - info_hashes: &mut InfoHashMap, - expired_connections: &[i64], - ) -> usize { + fn purge_expired_swarm_peers(info_hashes: &mut InfoHashMap) -> usize { let mut n_purged: usize = 0; - info_hashes.retain(|_, swarm| { - swarm.is_empty() - || swarm - .iter() - .find(|status| { - let cond = expired_connections.contains(&status.connection_id) - || status.last_active.elapsed() > DEFAULT_ANNOUNCE_INTERVAL * 5 - || status.last_event == Event::Stopped; - - if cond { - n_purged += 1; - } - - cond - }) - .is_some() + info_hashes.iter_mut().for_each(|(_, b)| { + b.retain(|status| { + if status.last_active.elapsed() < DEFAULT_ANNOUNCE_INTERVAL * 5 + && status.last_event != Event::Stopped + { + true + } else { + n_purged += 1; + false + } + }) }); + info_hashes.retain(|_, swarm| !swarm.is_empty()); + return n_purged; } diff --git a/src/tracker/udp_server.rs b/src/tracker/udp_server.rs index 2152b59..2824b23 100644 --- a/src/tracker/udp_server.rs +++ b/src/tracker/udp_server.rs @@ -59,7 +59,7 @@ impl UdpServer { match recv_socket.recv_from(&mut buf) { Ok((n_bytes, src)) => { - log::trace!("(Server {}) Received {} bytes from {}", id, n_bytes, src); + log::trace!("Received {} bytes from {}", n_bytes, src); match try_parse_packet(&buf[..n_bytes]) { Some(req) => { @@ -110,15 +110,10 @@ impl UdpServer { } }; - log::trace!("(Server {}) Tracker response", id); - let n_bytes = match response.response { UdpResponse::Connect(connect) => connect.write_to_buf(&mut buf), UdpResponse::Announce(announce) => match announce { - AnnounceResponse::V4(v4) => { - log::debug!("{:?}", v4); - v4.write_to_buf(&mut buf) - } + AnnounceResponse::V4(v4) => v4.write_to_buf(&mut buf), AnnounceResponse::V6(v6) => v6.write_to_buf(&mut buf), }, UdpResponse::Scrape(scrape) => scrape.write_to_buf(&mut buf), @@ -139,8 +134,7 @@ impl UdpServer { }; log::trace!( - "(Server {}) Sent {} bytes to {}: {:x?}", - id, + "Sent {} bytes to {}: {:x?}", n_sent, response.dst_addr, &buf[..n_bytes]