diff --git a/Cargo.lock b/Cargo.lock index a0c5737..c287a6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -115,6 +115,12 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "itoa" version = "1.0.15" @@ -163,6 +169,7 @@ dependencies = [ "log", "rand", "simplelog", + "threadpool", ] [[package]] @@ -180,6 +187,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "num_threads" version = "0.1.7" @@ -335,6 +352,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "time" version = "0.3.41" diff --git a/Cargo.toml b/Cargo.toml index 28cda7e..ec02230 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,7 @@ [package] name = "meteoro" +description = "A UDP BitTorrent tracker" +authors = ["Adam Macdonald "] version = "0.1.0" edition = "2024" @@ -11,3 +13,4 @@ heapless = "0.8.0" log = "0.4.27" rand = "0.9.2" simplelog = "0.12.2" +threadpool = "1.8.1" diff --git a/src/tracker/bittorrent/info_hash.rs b/src/bittorrent/info_hash.rs similarity index 100% rename from src/tracker/bittorrent/info_hash.rs rename to src/bittorrent/info_hash.rs diff --git a/src/tracker/bittorrent/mod.rs b/src/bittorrent/mod.rs similarity index 100% rename from src/tracker/bittorrent/mod.rs rename to src/bittorrent/mod.rs diff --git a/src/tracker/bittorrent/peer.rs b/src/bittorrent/peer.rs similarity index 86% rename from src/tracker/bittorrent/peer.rs rename to src/bittorrent/peer.rs index a86ca0e..b1dcf3c 100644 --- a/src/tracker/bittorrent/peer.rs +++ b/src/bittorrent/peer.rs @@ -9,7 +9,7 @@ pub struct PeerId { impl PeerId { pub fn to_string_lossy(&self) -> String { - String::from_utf8_lossy(&self.bytes).to_string() + String::from_utf8_lossy(&self.bytes).into() } } diff --git a/src/tracker/bittorrent/protocol.rs b/src/bittorrent/protocol.rs similarity index 100% rename from src/tracker/bittorrent/protocol.rs rename to src/bittorrent/protocol.rs diff --git a/src/tracker/bittorrent/udp.rs b/src/bittorrent/udp.rs similarity index 96% rename from src/tracker/bittorrent/udp.rs rename to src/bittorrent/udp.rs index e6a1ffb..ff463a9 100644 --- a/src/tracker/bittorrent/udp.rs +++ b/src/bittorrent/udp.rs @@ -1,10 +1,9 @@ use std::{ - fmt, io::{Cursor, Write}, net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, }; -use crate::tracker::bittorrent::{ +use crate::bittorrent::{ info_hash::InfoHash, peer::PeerId, protocol::{Action, Event}, @@ -76,6 +75,7 @@ pub struct AnnounceRequest { pub port: u16, } +#[derive(Debug)] pub enum AnnounceResponse { V4(AnnounceResponseV4), V6(AnnounceResponseV6), @@ -94,13 +94,13 @@ pub struct AnnounceResponseCommon { #[derive(Debug)] pub struct AnnounceResponseV4 { - pub inner: AnnounceResponseCommon, + pub common: AnnounceResponseCommon, pub peers: Vec, } #[derive(Debug)] pub struct AnnounceResponseV6 { - pub inner: AnnounceResponseCommon, + pub common: AnnounceResponseCommon, pub peers: Vec, } @@ -119,7 +119,7 @@ impl AnnounceResponseCommon { impl AnnounceResponseV4 { pub fn write_to_buf(&self, buf: &mut [u8]) -> usize { - let written = self.inner.write_to_buf(buf); + let written = self.common.write_to_buf(buf); for (offset, addr) in (written..buf.len()) .step_by(IPV4_ADDR_PAIR_SIZE) @@ -136,7 +136,7 @@ impl AnnounceResponseV4 { impl AnnounceResponseV6 { pub fn write_to_buf(&self, buf: &mut [u8]) -> usize { - let written = self.inner.write_to_buf(buf); + let written = self.common.write_to_buf(buf); for (offset, addr) in (written..buf.len()) .step_by(IPV6_ADDR_PAIR_SIZE) @@ -206,6 +206,7 @@ pub enum UdpRequest { Scrape(ScrapeRequest), } +#[derive(Debug)] pub enum UdpResponse { Connect(ConnectResponse), Announce(AnnounceResponse), diff --git a/src/main.rs b/src/main.rs index ce005ad..d950a2f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,9 +7,13 @@ use std::{ use log::LevelFilter; use simplelog::{ColorChoice, ConfigBuilder, TermLogger, TerminalMode}; +use crate::meteoro::Meteoro; + +mod bittorrent; mod constants; +mod meteoro; mod tracker; -use tracker::Tracker; +mod udp_server; fn main() -> ExitCode { let args: Vec = env::args().collect(); @@ -84,12 +88,10 @@ fn main() -> ExitCode { ))); } - let mut tracker = Tracker::new(); - return match tracker.start_on(&ip_addrs) { + // bufbuf():codingcodingbuf/ + + return match Meteoro::start(&ip_addrs) { Ok(_) => ExitCode::SUCCESS, - Err(e) => { - log::error!("Fatal error: {}", e); - ExitCode::FAILURE - } + Err(_) => ExitCode::FAILURE, }; } diff --git a/src/meteoro.rs b/src/meteoro.rs new file mode 100644 index 0000000..a9614f9 --- /dev/null +++ b/src/meteoro.rs @@ -0,0 +1,109 @@ +use std::net::{SocketAddr, UdpSocket}; +use std::thread; + +use anyhow::Result; +use flume::Sender; +use threadpool::ThreadPool; + +use crate::tracker::{self, RequestMessage, ResponseMessage, Tracker}; +use crate::udp_server; + +/// Instance of the meteoro BitTorrent tracker +/// Manages the tracker, networking & API +pub struct Meteoro {} + +#[repr(u8)] +pub enum ApplicationControlMessage { + /// Signal that the receiver should exit their thread of execution + Exit = 0, +} + +#[repr(u8)] +pub enum ServerControlMessage { + /// Server with `(id, _)` has bound to a socket, providing a clone of the socket + UdpSocketBound((usize, UdpSocket)), +} + +impl Meteoro { + pub fn start(addrs: &[SocketAddr]) -> Result<()> { + // Create channels + + let (ctrl_send, ctrl_recv) = flume::unbounded::(); + let (srv_ctrl_send, srv_ctrl_recv) = flume::unbounded::(); + let (reqs_send, reqs_recv) = flume::unbounded::(); + let resp_chans: Vec<_> = (0..addrs.len()) + .map(|_| flume::unbounded::()) + .collect(); + + // Start networking threads + + { + // Each receiving networking thread requires: + // - a socket address to bind to + // - a control message channel (send), for providing the bound socket + // - a request message channel (send), for providing requests to the tracker + + // Receive threads + + let recv_threads = ThreadPool::new(addrs.len()); + + for (id, addr) in addrs.iter().enumerate() { + let ctrl = srv_ctrl_send.clone(); + let reqs = reqs_send.clone(); + let addr = *addr; + + recv_threads.execute(move || { + let _ = udp_server::receive(id, addr, ctrl, reqs); + }); + } + + // Each sending networking thread requires: + // - a socket to send responses on + // - a response message channel (receive), for receiving handled requests + + // Send threads + + let send_threads = ThreadPool::new(addrs.len()); + + for _ in 0..addrs.len() { + match srv_ctrl_recv.recv()? { + ServerControlMessage::UdpSocketBound((id, socket)) => { + let (_, resps) = resp_chans[id].clone(); + + send_threads.execute(move || { + let _ = udp_server::send(id, socket, resps); + }); + } + _ => (), + }; + } + } + + // Start tracker thread + + let tracker_thread = { + // Tracker requires: + // - a request message channel (recv), for receiving requests from the server(s) + // - corresponding response channels (send), for providing handled requests + + let resps: Vec> = + resp_chans.iter().map(|(a, _)| a.clone()).collect(); + + thread::spawn(move || { + let mut tracker = Tracker::new(); + let _ = tracker.start(reqs_recv, resps.as_slice()); + }) + }; + + // Start API thread + + { + // API thread requires + // - a control message channel (recv) + } + + let _ = tracker_thread.join(); + + return Ok(()); + } +} diff --git a/src/tracker.rs b/src/tracker.rs new file mode 100644 index 0000000..5b3e65d --- /dev/null +++ b/src/tracker.rs @@ -0,0 +1,361 @@ +use std::collections::HashMap; +use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::time::{Duration, Instant}; + +use anyhow::Result; +use flume::{Receiver, Sender}; +use rand::Rng; +use rand::rngs::ThreadRng; + +use crate::bittorrent::info_hash::InfoHash; +use crate::bittorrent::protocol::Event; +use crate::bittorrent::udp::{ + AnnounceResponse, AnnounceResponseCommon, AnnounceResponseV4, AnnounceResponseV6, + ConnectResponse, ScrapeResponse, ScrapeStats, UdpRequest, UdpResponse, +}; + +pub struct RequestMessage { + pub(crate) server_id: usize, + pub(crate) src_addr: SocketAddr, + pub(crate) request: UdpRequest, +} + +pub struct ResponseMessage { + pub(crate) response: UdpResponse, + pub(crate) dst_addr: SocketAddr, +} + +struct PeerMetadata { + socket_addr: SocketAddr, + last_active: Instant, +} + +struct PeerStatus { + socket_addr: SocketAddr, + last_event: Event, + last_active: Instant, + downloaded: u64, + uploaded: u64, + remaining: u64, +} + +pub const GARBAGE_COLLECTION_INTERVAL: Duration = Duration::from_secs(20); + +pub const CONNECTION_EXPIRE_TIME: Duration = Duration::from_secs(90); +pub const DEFAULT_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(60); +pub const DEFAULT_ANNOUNCE_WANT: u32 = 80; + +type ConnectionIdMap = HashMap; +type InfoHashMap = HashMap>; + +pub struct Tracker { + peers: ConnectionIdMap, + info_hashes: InfoHashMap, + rng: ThreadRng, +} + +impl Tracker { + pub fn new() -> Self { + Self { + peers: ConnectionIdMap::new(), + info_hashes: InfoHashMap::new(), + rng: rand::rng(), + } + } + + pub fn start( + &mut self, + reqs: Receiver, + resps: &[Sender], + ) -> Result<()> { + loop { + let request = reqs.recv()?; + + log::trace!("Request from {} | {:?}", request.src_addr, request.request); + + match self.handle_request(&request) { + Some(response) => resps[request.server_id].send(response)?, + None => (), + } + } + } + + fn connection_valid(&self, connection_id: i64, src_addr: SocketAddr) -> bool { + let peers = &self.peers; + + return match peers.get(&connection_id) { + Some(metadata) => metadata.socket_addr == src_addr, + None => false, + }; + } + + fn handle_request(&mut self, request: &RequestMessage) -> Option { + return match &request.request { + UdpRequest::Connect(connect) => { + let new_id: i64 = self.rng.random(); + + let peers = &mut self.peers; + + peers.insert( + new_id, + PeerMetadata { + socket_addr: request.src_addr, + last_active: Instant::now(), + }, + ); + + Some(ResponseMessage { + response: UdpResponse::Connect(ConnectResponse { + transaction_id: connect.transaction_id, + connection_id: new_id, + }), + dst_addr: request.src_addr, + }) + } + + UdpRequest::Announce(announce) => { + if !self.connection_valid(announce.connection_id, request.src_addr) { + return None; + } + + // Ensure we honour the desired number of peers, within our boundaries + + let n_announce_want: u32 = if let Some(n) = announce.num_want { + if n < DEFAULT_ANNOUNCE_WANT { + n + } else { + DEFAULT_ANNOUNCE_WANT + } + } else { + DEFAULT_ANNOUNCE_WANT + }; + + let mut n_announce_entries: u32 = 0; + let mut n_seeders: u32 = 0; + let mut v4_peers: Vec = Vec::new(); + let mut v6_peers: Vec = Vec::new(); + + let info_hashes = &mut self.info_hashes; + + match info_hashes.get_mut(&announce.info_hash) { + None => { + // Info hash isn't currently tracked + // No relevant peers in the swarm + + info_hashes.insert( + announce.info_hash, + vec![PeerStatus { + socket_addr: request.src_addr, + last_event: announce.event, + last_active: Instant::now(), + downloaded: announce.downloaded as u64, + uploaded: announce.uploaded as u64, + remaining: announce.left as u64, + }], + ); + } + Some(swarm) => { + // Insert into swarm if not already present + // TODO: sort (?) + + let existing_swarm_idx: Option = match swarm + .iter_mut() + .enumerate() + .find(|(_, status)| status.socket_addr == request.src_addr) + { + Some((idx, peer)) => { + // Peer already exists in swarm, update state + + peer.last_active = Instant::now(); + peer.last_event = announce.event; + peer.downloaded = announce.downloaded as u64; + peer.uploaded = announce.uploaded as u64; + peer.remaining = announce.left as u64; + + Some(idx) + } + None => { + // Peer is not in swarm, add it + + swarm.push(PeerStatus { + socket_addr: request.src_addr, + last_event: announce.event, + last_active: Instant::now(), + downloaded: announce.downloaded as u64, + uploaded: announce.uploaded as u64, + remaining: announce.left as u64, + }); + + None + } + }; + + if announce.event == Event::Stopped { + if let Some(idx) = existing_swarm_idx { + swarm.remove(idx); + } + } + + // Iterate over all peers in the swarm for announce response + + for peer_status in swarm { + // Respect number of peers requested + + if n_announce_entries >= n_announce_want { + break; + } + + // Don't provide useless peers + // (peers who are no longer seeding or are from the source address) + + let peer_invalid: bool = (peer_status.last_event == Event::Stopped) + || (peer_status.socket_addr == request.src_addr); + + if peer_invalid { + log::trace!( + "(src: {}) {} with status \"{:?}\" deemed invalid", + request.src_addr, + peer_status.socket_addr, + peer_status.last_event + ); + continue; + } + + // Collect and add the relevant peer's address to the appropriate vector + + let is_seed: bool = peer_status.last_event == Event::Completed + || peer_status.remaining == 0; + + match &peer_status.socket_addr { + SocketAddr::V4(v4) => { + if request.src_addr.is_ipv4() { + v4_peers.push(*v4); + n_seeders += is_seed as u32; + } + } + SocketAddr::V6(v6) => { + if request.src_addr.is_ipv6() { + v6_peers.push(*v6); + n_seeders += is_seed as u32; + } + } + }; + + // Keep track of the announce entries gathered + + n_announce_entries += 1; + } + } + }; + + let resp_common = AnnounceResponseCommon { + transaction_id: announce.transaction_id, + interval: DEFAULT_ANNOUNCE_INTERVAL.as_secs() as i32, + leechers: (n_announce_entries - n_seeders) as i32, + seeders: n_seeders as i32, + }; + let response = if request.src_addr.is_ipv4() { + // IPv4 + AnnounceResponse::V4(AnnounceResponseV4 { + common: resp_common, + peers: v4_peers, + }) + } else { + // IPv6 + AnnounceResponse::V6(AnnounceResponseV6 { + common: resp_common, + peers: v6_peers, + }) + }; + + Some(ResponseMessage { + response: UdpResponse::Announce(response), + dst_addr: request.src_addr, + }) + } + + UdpRequest::Scrape(scrape) => { + if !self.connection_valid(scrape.connection_id, request.src_addr) { + return None; + } + + let info_hashes = &self.info_hashes; + + let mut scrape_stats: Vec = Vec::new(); + + for info_hash in &scrape.info_hashes { + match info_hashes.get(info_hash) { + Some(peers) => { + scrape_stats.push(ScrapeStats { + seeders: peers + .iter() + .filter(|&peer_status| { + peer_status.last_event == Event::Completed + || peer_status.remaining == 0 + && peer_status.socket_addr.is_ipv4() + == request.src_addr.is_ipv4() + }) + .count() as i32, + completed: 0, // TODO: keep track of this + leechers: peers + .iter() + .filter(|&peer_status| { + peer_status.last_event == Event::Started + || peer_status.remaining > 0 + && peer_status.socket_addr.is_ipv4() + == request.src_addr.is_ipv4() + }) + .count() as i32, + }); + } + None => scrape_stats.push(ScrapeStats::default()), + }; + } + + Some(ResponseMessage { + response: UdpResponse::Scrape(ScrapeResponse { + transaction_id: scrape.transaction_id, + stats: scrape_stats, + }), + dst_addr: request.src_addr, + }) + } + }; + } + + fn purge_expired_connections(&mut self) -> usize { + let mut purged: usize = 0; + + self.peers.retain(|_, metadata| { + if metadata.last_active.elapsed() < CONNECTION_EXPIRE_TIME { + true + } else { + purged += 1; + false + } + }); + + return purged; + } + + fn purge_expired_swarm_peers(&mut self) -> usize { + let mut n_purged: usize = 0; + + self.info_hashes.iter_mut().for_each(|(_, b)| { + b.retain(|status| { + if status.last_active.elapsed() < DEFAULT_ANNOUNCE_INTERVAL * 5 + && status.last_event != Event::Stopped + { + true + } else { + n_purged += 1; + false + } + }) + }); + + self.info_hashes.retain(|_, swarm| !swarm.is_empty()); + + return n_purged; + } +} diff --git a/src/tracker/mod.rs b/src/tracker/mod.rs deleted file mode 100644 index 5cc4e4e..0000000 --- a/src/tracker/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod bittorrent; -mod tracker; -mod udp_server; - -pub use tracker::Tracker; diff --git a/src/tracker/tracker.rs b/src/tracker/tracker.rs deleted file mode 100644 index ad0f4d4..0000000 --- a/src/tracker/tracker.rs +++ /dev/null @@ -1,471 +0,0 @@ -use std::collections::HashMap; -use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, RwLock}; -use std::thread; -use std::time::{Duration, Instant}; - -use anyhow::Result; -use flume::Sender; -use rand::Rng; - -use crate::tracker::bittorrent::info_hash::InfoHash; -use crate::tracker::bittorrent::protocol::Event; -use crate::tracker::bittorrent::udp::{ - AnnounceResponse, AnnounceResponseCommon, AnnounceResponseV4, AnnounceResponseV6, - ConnectResponse, ScrapeResponse, ScrapeStats, UdpRequest, UdpResponse, -}; -use crate::tracker::udp_server::UdpServer; - -pub struct RequestMessage { - pub(crate) request: UdpRequest, - pub(crate) src_addr: SocketAddr, - pub(crate) server_id: u32, -} - -pub struct ResponseMessage { - pub(crate) response: UdpResponse, - pub(crate) dst_addr: SocketAddr, -} - -struct PeerMetadata { - socket_addr: SocketAddr, - last_active: Instant, -} - -struct PeerStatus { - socket_addr: SocketAddr, - last_event: Event, - last_active: Instant, - downloaded: u64, - uploaded: u64, - remaining: u64, -} - -pub const GARBAGE_COLLECTION_INTERVAL: Duration = Duration::from_secs(20); - -pub const CONNECTION_EXPIRE_TIME: Duration = Duration::from_secs(90); -pub const DEFAULT_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(60); -pub const DEFAULT_ANNOUNCE_WANT: u32 = 80; - -type ConnectionIdMap = HashMap; -type InfoHashMap = HashMap>; - -pub static SHOULD_STOP_SIGNAL: AtomicBool = AtomicBool::new(false); - -pub struct Tracker { - server_threads: Vec>, - response_channels: Vec>, - /// Map of IPs & ports to peer connection IDs - peers: Arc>, - info_hashes: Arc>, -} - -impl Tracker { - pub fn new() -> Self { - Tracker { - server_threads: Vec::new(), - response_channels: Vec::new(), - peers: Arc::new(RwLock::new(HashMap::new())), - info_hashes: Arc::new(RwLock::new(HashMap::new())), - } - } - - pub fn start_on(&mut self, addrs: &[SocketAddr]) -> Result<()> { - if addrs.is_empty() { - log::warn!("No addresses provided for tracker to listen on"); - return Ok(()); - } - - // Only one request channel set is necessary as all the server threads send on the same one and the tracker is the only reader - let (req_send, req_recv) = flume::unbounded::(); - - let mut server_id: u32 = 0; - - for a in addrs { - // Each UDP server gets its own response channels - let (resp_send, resp_recv) = flume::unbounded::(); - let req_chan = req_send.clone(); - let a = a.clone(); - - self.server_threads.push(thread::spawn(move || { - let server = UdpServer::new(a, server_id); - - match server.start(req_chan.clone(), resp_recv.clone()) { - Err(e) => { - log::error!("Failed to start server {}: {}", server_id, e) - } - _ => (), - }; - - return; - })); - - self.response_channels.push(resp_send); - - server_id += 1; - } - - let gc_peers = self.peers.clone(); - let gc_info_hashes = self.info_hashes.clone(); - let _gc_thread = thread::spawn(move || { - let peers = gc_peers; - let info_hashes = gc_info_hashes; - - loop { - thread::sleep(GARBAGE_COLLECTION_INTERVAL); - - let s1 = Instant::now(); - - let expired = Tracker::purge_expired_connections(&mut peers.write().unwrap()); - - let d1 = s1.elapsed(); - let s2 = Instant::now(); - - let purged = Tracker::purge_expired_swarm_peers(&mut info_hashes.write().unwrap()); - - let d2 = s2.elapsed(); - - log::debug!( - "Garbage collected {} expired connections (took: {} ns)", - expired, - d1.as_nanos() - ); - log::debug!( - "Garbage collected {} inactive peers from the swarm (took: {} ns)", - purged, - d2.as_nanos() - ); - log::debug!("Garbage collection took {} ns total", (d1 + d2).as_nanos()); - } - }); - - let mut rng = rand::rng(); - - loop { - let request = match req_recv.recv() { - Ok(r) => r, - _ => { - log::error!("Internal tracker error: server request channel closed"); - break; - } - }; - - log::trace!("Request from {} | {:?}", request.src_addr, request.request); - - match request.request { - UdpRequest::Connect(connect) => { - let new_id: i64 = rng.random(); - - let mut peers = self.peers.write().unwrap(); - - peers.insert( - new_id, - PeerMetadata { - socket_addr: request.src_addr, - last_active: Instant::now(), - }, - ); - - match self.response_channels[request.server_id as usize].send(ResponseMessage { - response: UdpResponse::Connect(ConnectResponse { - transaction_id: connect.transaction_id, - connection_id: new_id, - }), - dst_addr: request.src_addr, - }) { - Ok(r) => r, - _ => { - log::error!("Internal tracker error: server response channel closed"); - break; - } - }; - } - - UdpRequest::Announce(announce) => { - if !self.connection_valid(announce.connection_id, request.src_addr) { - continue; - } - - // Ensure we honour the desired number of peers, within our boundaries - - let n_announce_want: u32 = if let Some(n) = announce.num_want { - if n < DEFAULT_ANNOUNCE_WANT { - n - } else { - DEFAULT_ANNOUNCE_WANT - } - } else { - DEFAULT_ANNOUNCE_WANT - }; - - let mut n_announce_entries: u32 = 0; - let mut n_seeders: u32 = 0; - let mut v4_peers: Vec = Vec::new(); - let mut v6_peers: Vec = Vec::new(); - - let mut info_hashes = self.info_hashes.write().unwrap(); - - match info_hashes.get_mut(&announce.info_hash) { - None => { - // Info hash isn't currently tracked - // No relevant peers in the swarm - - info_hashes.insert( - announce.info_hash, - vec![PeerStatus { - socket_addr: request.src_addr, - last_event: announce.event, - last_active: Instant::now(), - downloaded: announce.downloaded as u64, - uploaded: announce.uploaded as u64, - remaining: announce.left as u64, - }], - ); - } - Some(swarm) => { - // Insert into swarm if not already present - // TODO: sort (?) - - let existing_swarm_idx: Option = match swarm - .iter_mut() - .enumerate() - .find(|(_, status)| status.socket_addr == request.src_addr) - { - Some((idx, peer)) => { - // Peer already exists in swarm, update state - - peer.last_active = Instant::now(); - peer.last_event = announce.event; - peer.downloaded = announce.downloaded as u64; - peer.uploaded = announce.uploaded as u64; - peer.remaining = announce.left as u64; - - Some(idx) - } - None => { - // Peer is not in swarm, add it - - swarm.push(PeerStatus { - socket_addr: request.src_addr, - last_event: announce.event, - last_active: Instant::now(), - downloaded: announce.downloaded as u64, - uploaded: announce.uploaded as u64, - remaining: announce.left as u64, - }); - - None - } - }; - - if announce.event == Event::Stopped { - if let Some(idx) = existing_swarm_idx { - swarm.remove(idx); - } - } - - // Iterate over all peers in the swarm for announce response - - for peer_status in swarm { - // Respect number of peers requested - - if n_announce_entries >= n_announce_want { - break; - } - - // Don't provide useless peers - - let peer_invalid: bool = (peer_status.last_event == Event::Stopped) - || (peer_status.socket_addr == request.src_addr); - - if peer_invalid { - log::trace!( - "(src: {}) {} -> {:?} deemed invalid", - request.src_addr, - peer_status.socket_addr, - peer_status.last_event - ); - continue; - } - - // Collect and add the relevant peer's address to the appropriate vector - - let is_seed: bool = peer_status.last_event == Event::Completed - || peer_status.remaining == 0; - - match &peer_status.socket_addr { - SocketAddr::V4(v4) => { - if request.src_addr.is_ipv4() { - v4_peers.push(*v4); - n_seeders += is_seed as u32; - } - } - SocketAddr::V6(v6) => { - if request.src_addr.is_ipv6() { - v6_peers.push(*v6); - n_seeders += is_seed as u32; - } - } - }; - - // Keep track of the announce entries gathered - - n_announce_entries += 1; - } - } - }; - - log::trace!( - "(src: {}) IPv4s: {}, IPv6s: {}, leechers: {}, seeders: {}", - request.src_addr, - v4_peers.len(), - v6_peers.len(), - n_announce_entries - n_seeders, - n_seeders - ); - - let resp_common = AnnounceResponseCommon { - transaction_id: announce.transaction_id, - interval: DEFAULT_ANNOUNCE_INTERVAL.as_secs() as i32, - leechers: (n_announce_entries - n_seeders) as i32, - seeders: n_seeders as i32, - }; - let response = if request.src_addr.is_ipv4() { - // IPv4 - AnnounceResponse::V4(AnnounceResponseV4 { - inner: resp_common, - peers: v4_peers, - }) - } else { - // IPv6 - AnnounceResponse::V6(AnnounceResponseV6 { - inner: resp_common, - peers: v6_peers, - }) - }; - - match self.response_channels[request.server_id as usize].send(ResponseMessage { - response: UdpResponse::Announce(response), - dst_addr: request.src_addr, - }) { - Ok(r) => r, - _ => { - log::error!("Internal tracker error: server response channel closed"); - break; - } - }; - } - - UdpRequest::Scrape(scrape) => { - if !self.connection_valid(scrape.connection_id, request.src_addr) { - continue; - } - - let info_hashes = self.info_hashes.read().unwrap(); - - let mut scrape_stats: Vec = Vec::new(); - for info_hash in &scrape.info_hashes { - match info_hashes.get(info_hash) { - Some(peers) => { - scrape_stats.push(ScrapeStats { - seeders: peers - .iter() - .filter(|&peer_status| { - peer_status.last_event == Event::Completed - || peer_status.remaining == 0 - && peer_status.socket_addr.is_ipv4() - == request.src_addr.is_ipv4() - }) - .count() - as i32, - completed: 0, // TODO: keep track of this - leechers: peers - .iter() - .filter(|&peer_status| { - peer_status.last_event == Event::Started - || peer_status.remaining > 0 - && peer_status.socket_addr.is_ipv4() - == request.src_addr.is_ipv4() - }) - .count() - as i32, - }); - } - None => scrape_stats.push(ScrapeStats::default()), - }; - } - - match self.response_channels[request.server_id as usize].send(ResponseMessage { - response: UdpResponse::Scrape(ScrapeResponse { - transaction_id: scrape.transaction_id, - stats: scrape_stats, - }), - dst_addr: request.src_addr, - }) { - Ok(r) => r, - _ => { - log::error!("Internal tracker error: server response channel closed"); - break; - } - }; - } - } - } - - self.server_threads.drain(..).for_each(|t| { - let _ = t.join(); - }); - - return Ok(()); - } - - fn connection_valid(&self, connection_id: i64, src_addr: SocketAddr) -> bool { - let peers = self.peers.read().unwrap(); - - return match peers.get(&connection_id) { - Some(metadata) => metadata.socket_addr == src_addr, - None => false, - }; - } - - fn purge_expired_connections(peers: &mut ConnectionIdMap) -> usize { - let mut purged: usize = 0; - - peers.retain(|_, metadata| { - if metadata.last_active.elapsed() < CONNECTION_EXPIRE_TIME { - true - } else { - purged += 1; - false - } - }); - - return purged; - } - - fn purge_expired_swarm_peers(info_hashes: &mut InfoHashMap) -> usize { - let mut n_purged: usize = 0; - - info_hashes.iter_mut().for_each(|(_, b)| { - b.retain(|status| { - if status.last_active.elapsed() < DEFAULT_ANNOUNCE_INTERVAL * 5 - && status.last_event != Event::Stopped - { - true - } else { - n_purged += 1; - false - } - }) - }); - - info_hashes.retain(|_, swarm| !swarm.is_empty()); - - return n_purged; - } - - pub fn should_stop() -> bool { - SHOULD_STOP_SIGNAL.load(Ordering::Relaxed) - } -} diff --git a/src/tracker/udp_server.rs b/src/tracker/udp_server.rs deleted file mode 100644 index 2824b23..0000000 --- a/src/tracker/udp_server.rs +++ /dev/null @@ -1,254 +0,0 @@ -use std::net::{Ipv4Addr, SocketAddr, UdpSocket}; -use std::thread; - -use anyhow::Result; -use flume::{Receiver, Sender}; -use log; - -use crate::tracker::Tracker; -use crate::tracker::bittorrent::info_hash::InfoHash; -use crate::tracker::bittorrent::peer::PeerId; -use crate::tracker::bittorrent::protocol::Event; -use crate::tracker::bittorrent::udp::{ - AnnounceRequest, AnnounceResponse, MIN_ANNOUNCE_REQUEST_SIZE, MIN_SCRAPE_REQUEST_SIZE, - ScrapeRequest, UdpResponse, -}; -use crate::tracker::{ - bittorrent::{ - protocol::{Action, UDP_MAGIC}, - udp::{ConnectRequest, UdpRequest}, - }, - tracker::{RequestMessage, ResponseMessage}, -}; - -pub struct UdpServer { - addr: SocketAddr, - server_id: u32, -} - -const UDP_RECV_BUF_SIZE: usize = u16::MAX as usize; -const UDP_SEND_BUF_SIZE: usize = u16::MAX as usize; // Could also be 1500 for typical Ethernet MTU (?) - -impl UdpServer { - pub fn new(addr: SocketAddr, id: u32) -> Self { - UdpServer { - addr: addr, - server_id: id, - } - } - - pub fn start( - &self, - req_chan: Sender, - resp_chan: Receiver, - ) -> Result<()> { - let recv_socket = UdpSocket::bind(self.addr)?; - let send_socket = recv_socket.try_clone()?; - let local_addr = recv_socket.local_addr()?; - let id = self.server_id; - - log::info!("Starting UDP server on: {:?}", local_addr); - - let recv_thread = thread::spawn(move || { - let mut buf: [u8; UDP_RECV_BUF_SIZE] = [0; UDP_RECV_BUF_SIZE]; - - loop { - if Tracker::should_stop() { - break; - } - - match recv_socket.recv_from(&mut buf) { - Ok((n_bytes, src)) => { - log::trace!("Received {} bytes from {}", n_bytes, src); - - match try_parse_packet(&buf[..n_bytes]) { - Some(req) => { - if req_chan - .send(RequestMessage { - request: req, - src_addr: src, - server_id: id, - }) - .is_err() - { - log::error!( - "Internal tracker error: server {} request channel closed", - id - ); - break; - } - } - None => continue, - }; - } - Err(error) => { - log::error!("Failed to receive on socket: {}", error); - break; - } - } - } - - log::debug!("Server {} receive thread shutting exiting ...", id) - }); - - let send_thread = thread::spawn(move || { - let mut buf: [u8; UDP_SEND_BUF_SIZE] = [0; UDP_SEND_BUF_SIZE]; - - loop { - if Tracker::should_stop() { - break; - } - - let response = match resp_chan.recv() { - Ok(r) => r, - _ => { - log::error!( - "Internal tracker error: server {} response channel closed", - id - ); - break; - } - }; - - let n_bytes = match response.response { - UdpResponse::Connect(connect) => connect.write_to_buf(&mut buf), - UdpResponse::Announce(announce) => match announce { - AnnounceResponse::V4(v4) => v4.write_to_buf(&mut buf), - AnnounceResponse::V6(v6) => v6.write_to_buf(&mut buf), - }, - UdpResponse::Scrape(scrape) => scrape.write_to_buf(&mut buf), - UdpResponse::Error(_) => 0, - }; - - let n_sent = match send_socket.send_to(&buf[..n_bytes], response.dst_addr) { - Ok(n) => n, - Err(e) => { - log::error!( - "Failed to send {} bytes to {}: {}", - n_bytes, - response.dst_addr, - e - ); - 0 - } - }; - - log::trace!( - "Sent {} bytes to {}: {:x?}", - n_sent, - response.dst_addr, - &buf[..n_bytes] - ); - } - - log::debug!("Server {} send thread shutting exiting ...", id) - }); - - let _ = recv_thread.join(); - let _ = send_thread.join(); - - log::info!("Stopped UDP server on: {:?}", local_addr); - - return Ok(()); - } -} - -fn try_parse_packet(buf: &[u8]) -> Option { - if buf.len() < 16 { - return None; - } - - let action: Action = Action::from_i32(i32::from_be_bytes(buf[8..12].try_into().ok()?))?; - - let request = match action { - Action::Connect => UdpRequest::Connect(try_parse_connect(&buf)?), - Action::Announce => UdpRequest::Announce(try_parse_announce(&buf)?), - Action::Scrape => UdpRequest::Scrape(try_parse_scrape(&buf)?), - _ => return None, - }; - - return Some(request); -} - -fn try_parse_connect(buf: &[u8]) -> Option { - // Buffer length is checked to be at least 16 in `try_parse_packet` - - let protocol_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?); - - if protocol_id != UDP_MAGIC { - return None; - } - - Some(ConnectRequest { - protocol_id: protocol_id, - transaction_id: i32::from_be_bytes(buf[12..16].try_into().ok()?), - }) -} - -fn try_parse_announce(buf: &[u8]) -> Option { - if buf.len() < MIN_ANNOUNCE_REQUEST_SIZE { - return None; - } - - let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?); - let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?); - let info_hash: InfoHash = InfoHash { - bytes: buf[16..36].try_into().ok()?, - }; - let peer_id: PeerId = PeerId { - bytes: buf[36..56].try_into().unwrap_or_default(), - }; - let downloaded: i64 = i64::from_be_bytes(buf[56..64].try_into().ok()?); - let left: i64 = i64::from_be_bytes(buf[64..72].try_into().ok()?); - let uploaded: i64 = i64::from_be_bytes(buf[72..80].try_into().ok()?); - let event: Event = Event::from_i32(i32::from_be_bytes(buf[80..84].try_into().ok()?))?; - let ip_addr: Option = if buf[84..88].iter().all(|b| *b == 0x00) { - None - } else { - Some(Ipv4Addr::from_bits(u32::from_be_bytes( - buf[84..88].try_into().ok()?, - ))) - }; - let key: u32 = u32::from_be_bytes(buf[88..92].try_into().ok()?); - let num_want: Option = { - let want = i32::from_be_bytes(buf[92..96].try_into().ok()?); - if want < 0 { None } else { Some(want as u32) } - }; - let port: u16 = u16::from_be_bytes(buf[96..98].try_into().ok()?); - - Some(AnnounceRequest { - connection_id: connection_id, - transaction_id: transaction_id, - info_hash: info_hash, - peer_id: peer_id, - downloaded: downloaded, - left: left, - uploaded: uploaded, - event: event, - ipv4_address: ip_addr, - key: key, - num_want: num_want, - port: port, - }) -} - -fn try_parse_scrape(buf: &[u8]) -> Option { - if buf.len() < MIN_SCRAPE_REQUEST_SIZE { - return None; - } - - let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?); - let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?); - let info_hashes: Vec = buf[16..] - .chunks_exact(20) - .map(|b| InfoHash { - bytes: b.try_into().unwrap(), // unwrap: `chunks_exact` guarantees the size - }) - .collect(); - - Some(ScrapeRequest { - connection_id: connection_id, - transaction_id: transaction_id, - info_hashes: info_hashes, - }) -} diff --git a/src/udp_server.rs b/src/udp_server.rs new file mode 100644 index 0000000..48473e6 --- /dev/null +++ b/src/udp_server.rs @@ -0,0 +1,214 @@ +use std::net::{Ipv4Addr, SocketAddr, UdpSocket}; + +use anyhow::Result; +use flume::{Receiver, Sender}; +use log; + +use crate::bittorrent::info_hash::InfoHash; +use crate::bittorrent::peer::PeerId; +use crate::bittorrent::protocol::Event; +use crate::bittorrent::udp::{ + AnnounceRequest, AnnounceResponse, MIN_ANNOUNCE_REQUEST_SIZE, MIN_SCRAPE_REQUEST_SIZE, + ScrapeRequest, UdpResponse, +}; +use crate::meteoro::ServerControlMessage; +use crate::{ + bittorrent::{ + protocol::{Action, UDP_MAGIC}, + udp::{ConnectRequest, UdpRequest}, + }, + tracker::{RequestMessage, ResponseMessage}, +}; + +const UDP_RECV_BUF_SIZE: usize = u16::MAX as usize; +const UDP_SEND_BUF_SIZE: usize = u16::MAX as usize; // Could also be 1500 for typical Ethernet MTU (?) + +pub fn receive( + server_id: usize, + addr: SocketAddr, + ctrl: Sender, + reqs: Sender, +) -> Result<()> { + let socket = UdpSocket::bind(addr)?; + + // Provide cloned socket for networking send thread + + ctrl.send(ServerControlMessage::UdpSocketBound(( + server_id, + socket.try_clone()?, + )))?; + + log::debug!("Starting UDP server {} on {}", server_id, addr); + + let mut buf: [u8; UDP_RECV_BUF_SIZE] = [0; UDP_RECV_BUF_SIZE]; + + loop { + match socket.recv_from(&mut buf) { + Ok((n_bytes, src)) => { + log::trace!("Received {} bytes from {}", n_bytes, src); + + match try_parse_packet(&buf[..n_bytes]) { + Some(req) => reqs.send(RequestMessage { + request: req, + src_addr: src, + server_id: server_id, + })?, + None => (), + }; + } + Err(error) => { + log::error!( + "Server {} failed to receive on socket: {}", + server_id, + error + ); + continue; + } + } + } +} + +pub fn send(server_id: usize, socket: UdpSocket, resps: Receiver) -> Result<()> { + let mut buf: [u8; UDP_SEND_BUF_SIZE] = [0; UDP_SEND_BUF_SIZE]; + + loop { + let response = resps.recv()?; + + log::trace!( + "Response to {} | {:?}", + response.dst_addr, + response.response + ); + + let n_bytes = match response.response { + UdpResponse::Connect(connect) => connect.write_to_buf(&mut buf), + UdpResponse::Announce(announce) => match announce { + AnnounceResponse::V4(v4) => v4.write_to_buf(&mut buf), + AnnounceResponse::V6(v6) => v6.write_to_buf(&mut buf), + }, + UdpResponse::Scrape(scrape) => scrape.write_to_buf(&mut buf), + UdpResponse::Error(_) => 0, + }; + + let n_sent = match socket.send_to(&buf[..n_bytes], response.dst_addr) { + Ok(n) => n, + Err(e) => { + log::error!( + "Server {} failed to send {} bytes to {}: {}", + server_id, + n_bytes, + response.dst_addr, + e + ); + continue; + } + }; + + log::trace!( + "Sent {} bytes to {}: {:x?}", + n_sent, + response.dst_addr, + &buf[..n_bytes] + ); + } +} + +fn try_parse_packet(buf: &[u8]) -> Option { + if buf.len() < 16 { + return None; + } + + let action: Action = Action::from_i32(i32::from_be_bytes(buf[8..12].try_into().ok()?))?; + + let request = match action { + Action::Connect => UdpRequest::Connect(try_parse_connect(&buf)?), + Action::Announce => UdpRequest::Announce(try_parse_announce(&buf)?), + Action::Scrape => UdpRequest::Scrape(try_parse_scrape(&buf)?), + _ => return None, + }; + + return Some(request); +} + +fn try_parse_connect(buf: &[u8]) -> Option { + // Buffer length is checked to be at least 16 in `try_parse_packet` + + let protocol_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?); + + if protocol_id != UDP_MAGIC { + return None; + } + + Some(ConnectRequest { + protocol_id: protocol_id, + transaction_id: i32::from_be_bytes(buf[12..16].try_into().ok()?), + }) +} + +fn try_parse_announce(buf: &[u8]) -> Option { + if buf.len() < MIN_ANNOUNCE_REQUEST_SIZE { + return None; + } + + let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?); + let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?); + let info_hash: InfoHash = InfoHash { + bytes: buf[16..36].try_into().ok()?, + }; + let peer_id: PeerId = PeerId { + bytes: buf[36..56].try_into().unwrap_or_default(), + }; + let downloaded: i64 = i64::from_be_bytes(buf[56..64].try_into().ok()?); + let left: i64 = i64::from_be_bytes(buf[64..72].try_into().ok()?); + let uploaded: i64 = i64::from_be_bytes(buf[72..80].try_into().ok()?); + let event: Event = Event::from_i32(i32::from_be_bytes(buf[80..84].try_into().ok()?))?; + let ip_addr: Option = if buf[84..88].iter().all(|b| *b == 0x00) { + None + } else { + Some(Ipv4Addr::from_bits(u32::from_be_bytes( + buf[84..88].try_into().ok()?, + ))) + }; + let key: u32 = u32::from_be_bytes(buf[88..92].try_into().ok()?); + let num_want: Option = { + let want = i32::from_be_bytes(buf[92..96].try_into().ok()?); + if want < 0 { None } else { Some(want as u32) } + }; + let port: u16 = u16::from_be_bytes(buf[96..98].try_into().ok()?); + + Some(AnnounceRequest { + connection_id: connection_id, + transaction_id: transaction_id, + info_hash: info_hash, + peer_id: peer_id, + downloaded: downloaded, + left: left, + uploaded: uploaded, + event: event, + ipv4_address: ip_addr, + key: key, + num_want: num_want, + port: port, + }) +} + +fn try_parse_scrape(buf: &[u8]) -> Option { + if buf.len() < MIN_SCRAPE_REQUEST_SIZE { + return None; + } + + let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?); + let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?); + let info_hashes: Vec = buf[16..] + .chunks_exact(20) + .map(|b| InfoHash { + bytes: b.try_into().unwrap(), // unwrap: `chunks_exact` guarantees the size + }) + .collect(); + + Some(ScrapeRequest { + connection_id: connection_id, + transaction_id: transaction_id, + info_hashes: info_hashes, + }) +}