fruitless changes

This commit is contained in:
Adam 2025-08-04 00:16:43 +01:00
parent b32f00499a
commit b3f06eeaea
2 changed files with 124 additions and 93 deletions

View File

@ -1,5 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6}; use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
use std::os::linux::raw::stat;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::thread; use std::thread;
@ -70,7 +71,8 @@ impl Tracker {
pub fn start_on(&mut self, addrs: &[SocketAddr]) -> Result<()> { pub fn start_on(&mut self, addrs: &[SocketAddr]) -> Result<()> {
if addrs.is_empty() { if addrs.is_empty() {
log::error!("No addresses provided for tracker to listen on"); log::warn!("No addresses provided for tracker to listen on");
return Ok(());
} }
// Only one request channel set is necessary as all the server threads send on the same one and the tracker is the only reader // Only one request channel set is necessary as all the server threads send on the same one and the tracker is the only reader
@ -84,25 +86,17 @@ impl Tracker {
let req_chan = req_send.clone(); let req_chan = req_send.clone();
let a = a.clone(); let a = a.clone();
self.server_threads.push( self.server_threads.push(thread::spawn(move || {
thread::Builder::new() let server = UdpServer::new(a, server_id);
.name(format!( loop {
"{} server thread (ID {})", match server.start(req_chan.clone(), resp_recv.clone()) {
env!("CARGO_PKG_NAME"), Err(e) => {
server_id log::error!("Failed to start server {}: {}", server_id, e)
))
.spawn(move || {
let server = UdpServer::new(a, server_id);
loop {
match server.start(req_chan.clone(), resp_recv.clone()) {
Err(e) => {
log::error!("Failed to start server {}: {}", server_id, e)
}
_ => (),
};
} }
})?, _ => (),
); };
}
}));
self.response_channels.push(resp_send); self.response_channels.push(resp_send);
@ -121,9 +115,12 @@ impl Tracker {
let start = Instant::now(); let start = Instant::now();
let expired = Tracker::purge_expired_connections(&mut peers.write().unwrap()); let expired = Tracker::purge_expired_connections(&mut peers.write().unwrap());
Tracker::purge_expired_swarm_peers(&mut info_hashes.write().unwrap(), &expired); let purged =
Tracker::purge_expired_swarm_peers(&mut info_hashes.write().unwrap(), &expired);
log::trace!("Garbage collection took {} ns", start.elapsed().as_nanos()) log::debug!("Garbage collected {} expired connections", expired.len());
log::debug!("Garbage collected {} inactive peers from the swarm", purged);
log::debug!("Garbage collection took {} ns", start.elapsed().as_nanos())
} }
}); });
@ -205,73 +202,6 @@ impl Tracker {
let mut info_hashes = self.info_hashes.write().unwrap(); let mut info_hashes = self.info_hashes.write().unwrap();
match info_hashes.get_mut(&announce.info_hash) { match info_hashes.get_mut(&announce.info_hash) {
Some(swarm) => {
if announce.event == Event::Stopped {
swarm.retain(|status| {
status.connection_id != announce.connection_id
});
} else {
let peers = self.peers.read().unwrap();
for peer_status in swarm {
if peer_status.last_event == Event::Stopped {
// Skip any peers who have stopped their download
continue;
}
// State update for ourselves
if peer_status.connection_id == announce.connection_id {
if peer_status.last_event != announce.event
&& announce.event != Event::None
{
// Update last event if the event has changed and is not just a `None` for notifying presence
peer_status.last_event = announce.event
} else {
// Peer status has not changed
peer_status.last_active = Instant::now();
}
continue; // don't give the peer their own connection ID's address
}
if n_announce_entries >= n_announce_want {
break;
}
// Collect and add the relevant peer's address to the appropriate vector
if let Some(a) = peers.get(&peer_status.connection_id) {
if a.socket_addr == request.src_addr {
continue; // don't give the peer an expired ID with the same address
}
match &a.socket_addr {
SocketAddr::V4(v4) => {
if request.src_addr.is_ipv4() {
v4_peers.push(*v4);
}
}
SocketAddr::V6(v6) => {
if request.src_addr.is_ipv6() {
v6_peers.push(*v6);
}
}
};
// Keep track of the seeders, leechers & announce entries gathered
if peer_status.last_event == Event::Completed {
n_seeders += 1;
} else {
n_leechers += 1;
}
n_announce_entries += 1;
}
}
}
}
None => { None => {
// Info hash isn't currently tracked // Info hash isn't currently tracked
// No relevant peers in the swarm // No relevant peers in the swarm
@ -285,6 +215,91 @@ impl Tracker {
}], }],
); );
} }
Some(swarm) => {
// Insert into swarm if not already present
// TODO: sort (?)
let this_swarm_idx: Option<usize> =
match swarm.iter_mut().enumerate().find(|(idx, status)| {
status.connection_id == announce.connection_id
}) {
Some((idx, peer)) => {
// Peer already exists in swarm, update state
peer.last_active = Instant::now();
peer.last_event = announce.event;
Some(idx)
}
None => {
// Peer is not in swarm, add it
swarm.push(PeerStatus {
connection_id: announce.connection_id,
last_event: announce.event,
last_active: Instant::now(),
});
None
}
};
if announce.event == Event::Stopped {
if let Some(idx) = this_swarm_idx {
swarm.remove(idx);
}
}
let peers = self.peers.read().unwrap();
for peer_status in swarm {
if peer_status.last_event == Event::Stopped {
// Skip any peers who have stopped their download
continue;
}
if peer_status.connection_id == announce.connection_id {
continue; // don't give the peer their own connection ID's address
}
// Respect number of peers requested
if n_announce_entries >= n_announce_want {
break;
}
// Collect and add the relevant peer's address to the appropriate vector
if let Some(metadata) = peers.get(&peer_status.connection_id) {
if metadata.socket_addr == request.src_addr {
continue; // don't give the peer an expired ID with the same address
}
match &metadata.socket_addr {
SocketAddr::V4(v4) => {
if request.src_addr.is_ipv4() {
v4_peers.push(*v4);
}
}
SocketAddr::V6(v6) => {
if request.src_addr.is_ipv6() {
v6_peers.push(*v6);
}
}
};
// Keep track of the seeders, leechers & announce entries gathered
if peer_status.last_event == Event::Completed {
n_seeders += 1;
} else {
n_leechers += 1;
}
n_announce_entries += 1;
}
}
}
}; };
let resp_common = AnnounceResponseCommon { let resp_common = AnnounceResponseCommon {
@ -407,18 +422,31 @@ impl Tracker {
return purged; return purged;
} }
fn purge_expired_swarm_peers(info_hashes: &mut InfoHashMap, expired_connections: &[i64]) { fn purge_expired_swarm_peers(
info_hashes: &mut InfoHashMap,
expired_connections: &[i64],
) -> usize {
let mut n_purged: usize = 0;
info_hashes.retain(|_, swarm| { info_hashes.retain(|_, swarm| {
swarm.is_empty() swarm.is_empty()
|| swarm || swarm
.iter() .iter()
.find(|status| { .find(|status| {
expired_connections.contains(&status.connection_id) let cond = expired_connections.contains(&status.connection_id)
|| status.last_active.elapsed() > DEFAULT_ANNOUNCE_INTERVAL * 2 || status.last_active.elapsed() > DEFAULT_ANNOUNCE_INTERVAL * 5
|| status.last_event == Event::Stopped || status.last_event == Event::Stopped;
if cond {
n_purged += 1;
}
cond
}) })
.is_some() .is_some()
}); });
return n_purged;
} }
pub fn should_stop() -> bool { pub fn should_stop() -> bool {

View File

@ -115,7 +115,10 @@ impl UdpServer {
let n_bytes = match response.response { let n_bytes = match response.response {
UdpResponse::Connect(connect) => connect.write_to_buf(&mut buf), UdpResponse::Connect(connect) => connect.write_to_buf(&mut buf),
UdpResponse::Announce(announce) => match announce { UdpResponse::Announce(announce) => match announce {
AnnounceResponse::V4(v4) => v4.write_to_buf(&mut buf), AnnounceResponse::V4(v4) => {
log::debug!("{:?}", v4);
v4.write_to_buf(&mut buf)
}
AnnounceResponse::V6(v6) => v6.write_to_buf(&mut buf), AnnounceResponse::V6(v6) => v6.write_to_buf(&mut buf),
}, },
UdpResponse::Scrape(scrape) => scrape.write_to_buf(&mut buf), UdpResponse::Scrape(scrape) => scrape.write_to_buf(&mut buf),