Compare commits

...

2 Commits

Author SHA1 Message Date
861590a04e working better 2025-08-04 02:21:17 +01:00
b3f06eeaea fruitless changes 2025-08-04 00:16:43 +01:00
6 changed files with 210 additions and 159 deletions

View File

@@ -5,8 +5,9 @@ edition = "2024"
 [dependencies]
 anyhow = "1.0.98"
-flume = { version = "0.11.1", features = ["eventual-fairness", "select"] }
+flume = { version = "0.11.1", features = ["eventual-fairness"] }
 heapless = "0.8.0"
-log = { version = "0.4.27", features = ["release_max_level_info"] }
+# log = { version = "...", features = ["release_max_level_info"] }
+log = "0.4.27"
 rand = "0.9.2"
 simplelog = "0.12.2"
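
Note on the log dependency change above: `release_max_level_info` is the log crate's compile-time cap, so with it enabled `debug!`/`trace!` call sites are compiled out of release builds and the runtime `--debug`/`--trace` flags added elsewhere in this diff could never take effect there. A minimal sketch of the two gates involved (the helper name is illustrative, not from this repository):

    use log::LevelFilter;

    // Illustrative helper, not part of the diff: a log statement can only fire if it
    // passes both the compile-time cap (log::STATIC_MAX_LEVEL, set by the
    // release_max_level_* features) and the runtime filter handed to the logger.
    fn debug_logging_possible() -> bool {
        log::STATIC_MAX_LEVEL >= LevelFilter::Debug && log::max_level() >= LevelFilter::Debug
    }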

View File

@@ -9,4 +9,6 @@ OPTIONS:
 (example: -A [::1]:6970)
 -h | --help Show this help text
--v | --version Print the program's build & version information";
+-v | --version Print the program's build & version information
+--debug Enable debug logging
+--trace Enable trace logging (more verbose debug)";

View File

@@ -12,13 +12,31 @@ mod tracker;
 use tracker::Tracker;
 
 fn main() -> ExitCode {
+    let args: Vec<String> = env::args().collect();
+    if args.contains(&String::from("--help")) || args.contains(&String::from("-h")) {
+        println!("{}", constants::METEORO_HELP_TEXT);
+        return ExitCode::SUCCESS;
+    }
+    if args.contains(&String::from("--version")) || args.contains(&String::from("-v")) {
+        println!("{} {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
+        return ExitCode::SUCCESS;
+    }
+
     #[cfg(debug_assertions)]
-    let log_level: LevelFilter = LevelFilter::Trace;
+    let mut log_level: LevelFilter = LevelFilter::Trace;
     #[cfg(not(debug_assertions))]
-    let log_level: LevelFilter = LevelFilter::Info;
+    let mut log_level: LevelFilter = LevelFilter::Info;
+
+    if args.contains(&String::from("--debug")) {
+        log_level = LevelFilter::Debug;
+    } else if args.contains(&String::from("--trace")) {
+        log_level = LevelFilter::Trace;
+    }
 
     let logger_config = ConfigBuilder::new()
-        .set_target_level(LevelFilter::Off)
+        .set_target_level(LevelFilter::Trace)
         .set_location_level(LevelFilter::Off)
         .set_thread_level(LevelFilter::Off)
         .set_time_level(LevelFilter::Info)
@@ -32,18 +50,6 @@ fn main() -> ExitCode {
     )
     .expect("failed to initialise terminal logging");
 
-    let args: Vec<String> = env::args().collect();
-    if args.contains(&String::from("--help")) || args.contains(&String::from("-h")) {
-        println!("{}", constants::METEORO_HELP_TEXT);
-        return ExitCode::SUCCESS;
-    }
-    if args.contains(&String::from("--version")) || args.contains(&String::from("-v")) {
-        println!("{} {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
-        return ExitCode::SUCCESS;
-    }
-
     let ipv4_flag_idxs: Vec<usize> = args
         .iter()
         .enumerate()
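
The two hunks above move argument parsing ahead of logger initialisation, so the new `--debug`/`--trace` flags are known by the time the logger is created. A self-contained sketch of that ordering with simplelog; it uses the default `Config` rather than the project's `ConfigBuilder` settings, and is illustrative only:

    use log::LevelFilter;
    use simplelog::{ColorChoice, Config, TermLogger, TerminalMode};
    use std::env;

    fn main() {
        // Parse flags first so the chosen level can be handed to the logger.
        let args: Vec<String> = env::args().collect();
        let mut log_level = LevelFilter::Info;
        if args.contains(&String::from("--debug")) {
            log_level = LevelFilter::Debug;
        } else if args.contains(&String::from("--trace")) {
            log_level = LevelFilter::Trace;
        }

        // Initialise terminal logging once, with the final level.
        TermLogger::init(
            log_level,
            Config::default(),
            TerminalMode::Mixed,
            ColorChoice::Auto,
        )
        .expect("failed to initialise terminal logging");

        log::info!("logging initialised at {}", log_level);
    }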

View File

@@ -199,6 +199,7 @@ pub struct ErrorResponse {
     pub message: String,
 }
 
+#[derive(Debug)]
 pub enum UdpRequest {
     Connect(ConnectRequest),
     Announce(AnnounceRequest),
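
The `#[derive(Debug)]` added above is what allows later hunks in this diff to log a whole request with `{:?}` (see the new `log::trace!("Request from {} | {:?}", ...)` in the tracker). A tiny stand-alone illustration using stand-in types rather than the crate's own:

    // `{:?}` requires a `Debug` impl; deriving it on the request enum makes every
    // variant printable without hand-written formatting code.
    #[derive(Debug)]
    enum Request {
        Connect { transaction_id: i32 },
        Announce { info_hash: [u8; 20] },
    }

    fn main() {
        let req = Request::Connect { transaction_id: 42 };
        println!("{:?}", req); // prints: Connect { transaction_id: 42 }
    }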

View File

@@ -34,9 +34,12 @@ struct PeerMetadata {
 }
 
 struct PeerStatus {
-    connection_id: i64,
+    socket_addr: SocketAddr,
     last_event: Event,
     last_active: Instant,
+    downloaded: u64,
+    uploaded: u64,
+    remaining: u64,
 }
 
 pub const GARBAGE_COLLECTION_INTERVAL: Duration = Duration::from_secs(20);
@@ -70,7 +73,8 @@ impl Tracker {
     pub fn start_on(&mut self, addrs: &[SocketAddr]) -> Result<()> {
         if addrs.is_empty() {
-            log::error!("No addresses provided for tracker to listen on");
+            log::warn!("No addresses provided for tracker to listen on");
+            return Ok(());
         }
 
         // Only one request channel set is necessary as all the server threads send on the same one and the tracker is the only reader
@@ -84,25 +88,18 @@ impl Tracker {
             let req_chan = req_send.clone();
             let a = a.clone();
-            self.server_threads.push(
-                thread::Builder::new()
-                    .name(format!(
-                        "{} server thread (ID {})",
-                        env!("CARGO_PKG_NAME"),
-                        server_id
-                    ))
-                    .spawn(move || {
-                        let server = UdpServer::new(a, server_id);
-                        loop {
-                            match server.start(req_chan.clone(), resp_recv.clone()) {
-                                Err(e) => {
-                                    log::error!("Failed to start server {}: {}", server_id, e)
-                                }
-                                _ => (),
-                            };
-                        }
-                    })?,
-            );
+            self.server_threads.push(thread::spawn(move || {
+                let server = UdpServer::new(a, server_id);
+
+                match server.start(req_chan.clone(), resp_recv.clone()) {
+                    Err(e) => {
+                        log::error!("Failed to start server {}: {}", server_id, e)
+                    }
+                    _ => (),
+                };
+
+                return;
+            }));
 
             self.response_channels.push(resp_send);
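
On the channel comment in the hunk above: the crate depends on flume, whose senders and receivers are both cloneable, which is what lets every server thread hold a clone of the single request sender while the tracker keeps the one receiver (and, symmetrically, lets a server clone its response receiver). A minimal sketch of that fan-in shape; the message type and names are illustrative, not the tracker's own:

    use std::thread;

    fn main() {
        // One request channel: every server thread gets a clone of the sender,
        // the tracker side keeps the single receiver.
        let (req_send, req_recv) = flume::unbounded::<String>();

        let mut handles = Vec::new();
        for server_id in 0..3 {
            let req_chan = req_send.clone();
            handles.push(thread::spawn(move || {
                req_chan
                    .send(format!("request from server {}", server_id))
                    .expect("receiver hung up");
            }));
        }
        drop(req_send); // only the clones held by the threads remain

        // The single reader drains everything the senders produced; recv()
        // errors once all senders are gone and the loop ends.
        while let Ok(req) = req_recv.recv() {
            println!("tracker got: {}", req);
        }

        for h in handles {
            h.join().unwrap();
        }
    }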
@@ -118,12 +115,28 @@ impl Tracker {
             loop {
                 thread::sleep(GARBAGE_COLLECTION_INTERVAL);
 
-                let start = Instant::now();
+                let s1 = Instant::now();
                 let expired = Tracker::purge_expired_connections(&mut peers.write().unwrap());
-                Tracker::purge_expired_swarm_peers(&mut info_hashes.write().unwrap(), &expired);
-                log::trace!("Garbage collection took {} ns", start.elapsed().as_nanos())
+                let d1 = s1.elapsed();
+
+                let s2 = Instant::now();
+                let purged = Tracker::purge_expired_swarm_peers(&mut info_hashes.write().unwrap());
+                let d2 = s2.elapsed();
+
+                log::debug!(
+                    "Garbage collected {} expired connections (took: {} ns)",
+                    expired,
+                    d1.as_nanos()
+                );
+                log::debug!(
+                    "Garbage collected {} inactive peers from the swarm (took: {} ns)",
+                    purged,
+                    d2.as_nanos()
+                );
+                log::debug!("Garbage collection took {} ns total", (d1 + d2).as_nanos());
             }
         });
@@ -138,14 +151,10 @@ impl Tracker {
                 }
             };
 
+            log::trace!("Request from {} | {:?}", request.src_addr, request.request);
+
             match request.request {
                 UdpRequest::Connect(connect) => {
-                    log::trace!(
-                        "(Server {}) Connect request: {:?}",
-                        request.server_id,
-                        connect
-                    );
                     let new_id: i64 = rng.random();
 
                     let mut peers = self.peers.write().unwrap();
@@ -174,17 +183,11 @@ impl Tracker {
                 }
                 UdpRequest::Announce(announce) => {
-                    log::trace!(
-                        "(Server {}) Announce request: {:?}",
-                        request.server_id,
-                        announce
-                    );
                     if !self.connection_valid(announce.connection_id, request.src_addr) {
                         continue;
                     }
 
-                    // Ensure we honour the desired number of peers, within our bounaries
+                    // Ensure we honour the desired number of peers, within our boundaries
                     let n_announce_want: u32 = if let Some(n) = announce.num_want {
                         if n < DEFAULT_ANNOUNCE_WANT {
@@ -197,81 +200,13 @@ impl Tracker {
                     };
 
                     let mut n_announce_entries: u32 = 0;
-                    let mut n_leechers: i32 = 0;
-                    let mut n_seeders: i32 = 0;
+                    let mut n_seeders: u32 = 0;
                     let mut v4_peers: Vec<SocketAddrV4> = Vec::new();
                     let mut v6_peers: Vec<SocketAddrV6> = Vec::new();
 
                     let mut info_hashes = self.info_hashes.write().unwrap();
                     match info_hashes.get_mut(&announce.info_hash) {
-                        Some(swarm) => {
-                            if announce.event == Event::Stopped {
-                                swarm.retain(|status| {
-                                    status.connection_id != announce.connection_id
-                                });
-                            } else {
-                                let peers = self.peers.read().unwrap();
-                                for peer_status in swarm {
-                                    if peer_status.last_event == Event::Stopped {
-                                        // Skip any peers who have stopped their download
-                                        continue;
-                                    }
-                                    // State update for ourselves
-                                    if peer_status.connection_id == announce.connection_id {
-                                        if peer_status.last_event != announce.event
-                                            && announce.event != Event::None
-                                        {
-                                            // Update last event if the event has changed and is not just a `None` for notifying presence
-                                            peer_status.last_event = announce.event
-                                        } else {
-                                            // Peer status has not changed
-                                            peer_status.last_active = Instant::now();
-                                        }
-                                        continue; // don't give the peer their own connection ID's address
-                                    }
-                                    if n_announce_entries >= n_announce_want {
-                                        break;
-                                    }
-                                    // Collect and add the relevant peer's address to the appropriate vector
-                                    if let Some(a) = peers.get(&peer_status.connection_id) {
-                                        if a.socket_addr == request.src_addr {
-                                            continue; // don't give the peer an expired ID with the same address
-                                        }
-                                        match &a.socket_addr {
-                                            SocketAddr::V4(v4) => {
-                                                if request.src_addr.is_ipv4() {
-                                                    v4_peers.push(*v4);
-                                                }
-                                            }
-                                            SocketAddr::V6(v6) => {
-                                                if request.src_addr.is_ipv6() {
-                                                    v6_peers.push(*v6);
-                                                }
-                                            }
-                                        };
-                                        // Keep track of the seeders, leechers & announce entries gathered
-                                        if peer_status.last_event == Event::Completed {
-                                            n_seeders += 1;
-                                        } else {
-                                            n_leechers += 1;
-                                        }
-                                        n_announce_entries += 1;
-                                    }
-                                }
-                            }
-                        }
                         None => {
                             // Info hash isn't currently tracked
                             // No relevant peers in the swarm
@@ -279,19 +214,122 @@ impl Tracker {
                             info_hashes.insert(
                                 announce.info_hash,
                                 vec![PeerStatus {
-                                    connection_id: announce.connection_id,
+                                    socket_addr: request.src_addr,
                                     last_event: announce.event,
                                     last_active: Instant::now(),
+                                    downloaded: announce.downloaded as u64,
+                                    uploaded: announce.uploaded as u64,
+                                    remaining: announce.left as u64,
                                 }],
                             );
                         }
+                        Some(swarm) => {
+                            // Insert into swarm if not already present
+                            // TODO: sort (?)
+                            let existing_swarm_idx: Option<usize> = match swarm
+                                .iter_mut()
+                                .enumerate()
+                                .find(|(_, status)| status.socket_addr == request.src_addr)
+                            {
+                                Some((idx, peer)) => {
+                                    // Peer already exists in swarm, update state
+                                    peer.last_active = Instant::now();
+                                    peer.last_event = announce.event;
+                                    peer.downloaded = announce.downloaded as u64;
+                                    peer.uploaded = announce.uploaded as u64;
+                                    peer.remaining = announce.left as u64;
+                                    Some(idx)
+                                }
+                                None => {
+                                    // Peer is not in swarm, add it
+                                    swarm.push(PeerStatus {
+                                        socket_addr: request.src_addr,
+                                        last_event: announce.event,
+                                        last_active: Instant::now(),
+                                        downloaded: announce.downloaded as u64,
+                                        uploaded: announce.uploaded as u64,
+                                        remaining: announce.left as u64,
+                                    });
+                                    None
+                                }
+                            };
+
+                            if announce.event == Event::Stopped {
+                                if let Some(idx) = existing_swarm_idx {
+                                    swarm.remove(idx);
+                                }
+                            }
+
+                            // Iterate over all peers in the swarm for announce response
+                            for peer_status in swarm {
+                                // Respect number of peers requested
+                                if n_announce_entries >= n_announce_want {
+                                    break;
+                                }
+                                // Don't provide useless peers
+                                let peer_invalid: bool = (peer_status.last_event == Event::Stopped)
+                                    || (peer_status.socket_addr == request.src_addr);
+                                if peer_invalid {
+                                    log::trace!(
+                                        "(src: {}) {} -> {:?} deemed invalid",
+                                        request.src_addr,
+                                        peer_status.socket_addr,
+                                        peer_status.last_event
+                                    );
+                                    continue;
+                                }
+                                // Collect and add the relevant peer's address to the appropriate vector
+                                let is_seed: bool = peer_status.last_event == Event::Completed
+                                    || peer_status.remaining == 0;
+                                match &peer_status.socket_addr {
+                                    SocketAddr::V4(v4) => {
+                                        if request.src_addr.is_ipv4() {
+                                            v4_peers.push(*v4);
+                                            n_seeders += is_seed as u32;
+                                        }
+                                    }
+                                    SocketAddr::V6(v6) => {
+                                        if request.src_addr.is_ipv6() {
+                                            v6_peers.push(*v6);
+                                            n_seeders += is_seed as u32;
+                                        }
+                                    }
+                                };
+                                // Keep track of the announce entries gathered
+                                n_announce_entries += 1;
+                            }
+                        }
                     };
 
+                    log::trace!(
+                        "(src: {}) IPv4s: {}, IPv6s: {}, leechers: {}, seeders: {}",
+                        request.src_addr,
+                        v4_peers.len(),
+                        v6_peers.len(),
+                        n_announce_entries - n_seeders,
+                        n_seeders
+                    );
+
                     let resp_common = AnnounceResponseCommon {
                         transaction_id: announce.transaction_id,
                         interval: DEFAULT_ANNOUNCE_INTERVAL.as_secs() as i32,
-                        leechers: n_leechers,
-                        seeders: n_seeders,
+                        leechers: (n_announce_entries - n_seeders) as i32,
+                        seeders: n_seeders as i32,
                     };
 
                     let response = if request.src_addr.is_ipv4() {
                         // IPv4
@@ -320,12 +358,6 @@
                 }
                 UdpRequest::Scrape(scrape) => {
-                    log::trace!(
-                        "(Server {}) Scrape request: {:?}",
-                        request.server_id,
-                        scrape
-                    );
                     if !self.connection_valid(scrape.connection_id, request.src_addr) {
                         continue;
                     }
@@ -341,6 +373,9 @@
                                 .iter()
                                 .filter(|&peer_status| {
                                     peer_status.last_event == Event::Completed
+                                        || peer_status.remaining == 0
+                                            && peer_status.socket_addr.is_ipv4()
+                                                == request.src_addr.is_ipv4()
                                 })
                                 .count()
                                 as i32,
@@ -349,6 +384,9 @@
                                 .iter()
                                 .filter(|&peer_status| {
                                     peer_status.last_event == Event::Started
+                                        || peer_status.remaining > 0
+                                            && peer_status.socket_addr.is_ipv4()
+                                                == request.src_addr.is_ipv4()
                                 })
                                 .count()
                                 as i32,
@@ -391,34 +429,40 @@
             };
         }
 
-    fn purge_expired_connections(peers: &mut ConnectionIdMap) -> Vec<i64> {
-        let mut purged: Vec<i64> = Vec::new();
-        for (id, metadata) in peers.iter() {
-            if metadata.last_active.elapsed() > CONNECTION_EXPIRE_TIME {
-                purged.push(*id);
+    fn purge_expired_connections(peers: &mut ConnectionIdMap) -> usize {
+        let mut purged: usize = 0;
+        peers.retain(|_, metadata| {
+            if metadata.last_active.elapsed() < CONNECTION_EXPIRE_TIME {
+                true
+            } else {
+                purged += 1;
+                false
             }
-        }
-        for id in &purged {
-            peers.remove(id);
-        }
+        });
 
         return purged;
     }
 
-    fn purge_expired_swarm_peers(info_hashes: &mut InfoHashMap, expired_connections: &[i64]) {
-        info_hashes.retain(|_, swarm| {
-            swarm.is_empty()
-                || swarm
-                    .iter()
-                    .find(|status| {
-                        expired_connections.contains(&status.connection_id)
-                            || status.last_active.elapsed() > DEFAULT_ANNOUNCE_INTERVAL * 2
-                            || status.last_event == Event::Stopped
-                    })
-                    .is_some()
+    fn purge_expired_swarm_peers(info_hashes: &mut InfoHashMap) -> usize {
+        let mut n_purged: usize = 0;
+
+        info_hashes.iter_mut().for_each(|(_, b)| {
+            b.retain(|status| {
+                if status.last_active.elapsed() < DEFAULT_ANNOUNCE_INTERVAL * 5
+                    && status.last_event != Event::Stopped
+                {
+                    true
+                } else {
+                    n_purged += 1;
+                    false
+                }
+            })
         });
+        info_hashes.retain(|_, swarm| !swarm.is_empty());
+
+        return n_purged;
     }
 
     pub fn should_stop() -> bool {
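
The reworked purge helpers above replace the old collect-then-remove pass with `retain` plus a counter captured by the closure, so each map is walked once and the caller gets back the number of evicted entries for the new garbage-collection debug logs. A condensed sketch of the same pattern on a toy map; the types and expiry constant are illustrative, not the tracker's own:

    use std::collections::HashMap;
    use std::time::{Duration, Instant};

    const EXPIRE_AFTER: Duration = Duration::from_secs(120);

    // Drop every entry whose timestamp has expired and report how many were
    // removed, mirroring the retain-and-count shape of the new purge functions.
    fn purge_expired(entries: &mut HashMap<i64, Instant>) -> usize {
        let mut purged = 0;
        entries.retain(|_, last_active| {
            if last_active.elapsed() < EXPIRE_AFTER {
                true
            } else {
                purged += 1;
                false
            }
        });
        purged
    }

    fn main() {
        let mut entries = HashMap::new();
        entries.insert(1, Instant::now());
        println!("purged {} entries", purge_expired(&mut entries));
    }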

View File

@@ -59,7 +59,7 @@ impl UdpServer {
             match recv_socket.recv_from(&mut buf) {
                 Ok((n_bytes, src)) => {
-                    log::trace!("(Server {}) Received {} bytes from {}", id, n_bytes, src);
+                    log::trace!("Received {} bytes from {}", n_bytes, src);
 
                     match try_parse_packet(&buf[..n_bytes]) {
                         Some(req) => {
@@ -110,8 +110,6 @@
                 }
             };
 
-            log::trace!("(Server {}) Tracker response", id);
-
             let n_bytes = match response.response {
                 UdpResponse::Connect(connect) => connect.write_to_buf(&mut buf),
                 UdpResponse::Announce(announce) => match announce {
@@ -136,8 +134,7 @@
             };
 
             log::trace!(
-                "(Server {}) Sent {} bytes to {}: {:x?}",
-                id,
+                "Sent {} bytes to {}: {:x?}",
                 n_sent,
                 response.dst_addr,
                 &buf[..n_bytes]