refactor announce request handling

This commit is contained in:
Adam 2025-08-06 21:29:53 +01:00
parent 2bb2cd5ce2
commit b07a2847d9

View File

@ -45,7 +45,7 @@ pub const GARBAGE_COLLECTION_INTERVAL: Duration = Duration::from_secs(20);
pub const CONNECTION_EXPIRE_TIME: Duration = Duration::from_secs(90); pub const CONNECTION_EXPIRE_TIME: Duration = Duration::from_secs(90);
pub const DEFAULT_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(60); pub const DEFAULT_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(60);
pub const DEFAULT_ANNOUNCE_WANT: u32 = 80; pub const DEFAULT_ANNOUNCE_WANT: usize = 80;
type ConnectionIdMap = HashMap<i64, PeerMetadata>; type ConnectionIdMap = HashMap<i64, PeerMetadata>;
type InfoHashMap = HashMap<InfoHash, Vec<PeerStatus>>; type InfoHashMap = HashMap<InfoHash, Vec<PeerStatus>>;
@ -122,7 +122,7 @@ impl Tracker {
} }
fn handle_request(&mut self, request: &RequestMessage) -> Option<ResponseMessage> { fn handle_request(&mut self, request: &RequestMessage) -> Option<ResponseMessage> {
return match &request.request { match &request.request {
UdpRequest::Connect(connect) => { UdpRequest::Connect(connect) => {
let new_id: i64 = self.rng.random(); let new_id: i64 = self.rng.random();
@ -152,9 +152,9 @@ impl Tracker {
// Ensure we honour the desired number of peers, within our boundaries // Ensure we honour the desired number of peers, within our boundaries
let n_announce_want: u32 = if let Some(n) = announce.num_want { let n_announce_want: usize = if let Some(n) = announce.num_want {
if n < DEFAULT_ANNOUNCE_WANT { if (n as usize) < DEFAULT_ANNOUNCE_WANT {
n n as usize
} else { } else {
DEFAULT_ANNOUNCE_WANT DEFAULT_ANNOUNCE_WANT
} }
@ -162,14 +162,11 @@ impl Tracker {
DEFAULT_ANNOUNCE_WANT DEFAULT_ANNOUNCE_WANT
}; };
let mut n_announce_entries: u32 = 0; let mut n_swarm_peers: usize = 0;
let mut n_seeders: u32 = 0; let mut n_seeders: usize = 0;
let mut v4_peers: Vec<SocketAddrV4> = Vec::new();
let mut v6_peers: Vec<SocketAddrV6> = Vec::new();
let info_hashes = &mut self.info_hashes; let info_hashes = &mut self.info_hashes;
match info_hashes.get_mut(&announce.info_hash) { let swarm_addrs = match info_hashes.get_mut(&announce.info_hash) {
None => { None => {
// Info hash isn't currently tracked // Info hash isn't currently tracked
// No relevant peers in the swarm // No relevant peers in the swarm
@ -185,8 +182,13 @@ impl Tracker {
remaining: announce.left as u64, remaining: announce.left as u64,
}], }],
); );
None
} }
Some(swarm) => { Some(swarm) => {
n_swarm_peers = swarm.len();
n_seeders = Self::count_seeders(swarm);
// Insert into swarm if not already present // Insert into swarm if not already present
// TODO: sort (?) // TODO: sort (?)
@ -222,60 +224,45 @@ impl Tracker {
} }
}; };
if announce.event == Event::Stopped { if let Some(idx) = existing_swarm_idx
if let Some(idx) = existing_swarm_idx { && announce.event == Event::Stopped
swarm.remove(idx); {
} swarm.remove(idx);
return None;
} }
// Iterate over all peers in the swarm for announce response // Iterate over all peers in the swarm for announce response
for peer_status in swarm { let swarm_addrs: Vec<SocketAddr> = swarm
// Respect number of peers requested .iter()
.filter_map(|status| {
let peer_invalid: bool = (status.last_event == Event::Stopped)
|| (status.socket_addr == request.src_addr);
if n_announce_entries >= n_announce_want { if !peer_invalid {
break; if !status.socket_addr.is_ipv4() == request.src_addr.is_ipv4() {
} None
} else {
// Don't provide useless peers Some(status.socket_addr)
// (peers who are no longer seeding or are from the source address)
let peer_invalid: bool = (peer_status.last_event == Event::Stopped)
|| (peer_status.socket_addr == request.src_addr);
if peer_invalid {
log::trace!(
"(src: {}) {} with status \"{:?}\" deemed invalid",
request.src_addr,
peer_status.socket_addr,
peer_status.last_event
);
continue;
}
// Collect and add the relevant peer's address to the appropriate vector
let is_seed: bool = peer_status.last_event == Event::Completed
|| peer_status.remaining == 0;
match &peer_status.socket_addr {
SocketAddr::V4(v4) => {
if request.src_addr.is_ipv4() {
v4_peers.push(*v4);
n_seeders += is_seed as u32;
} }
} } else {
SocketAddr::V6(v6) => { log::trace!(
if request.src_addr.is_ipv6() { "(src: {}) {} with status \"{:?}\" deemed invalid",
v6_peers.push(*v6); request.src_addr,
n_seeders += is_seed as u32; status.socket_addr,
} status.last_event
} );
};
// Keep track of the announce entries gathered None
}
})
.take(n_announce_want as usize)
.collect();
n_announce_entries += 1; if !swarm_addrs.is_empty() {
Some(swarm_addrs)
} else {
None
} }
} }
}; };
@ -283,20 +270,40 @@ impl Tracker {
let resp_common = AnnounceResponseCommon { let resp_common = AnnounceResponseCommon {
transaction_id: announce.transaction_id, transaction_id: announce.transaction_id,
interval: DEFAULT_ANNOUNCE_INTERVAL.as_secs() as i32, interval: DEFAULT_ANNOUNCE_INTERVAL.as_secs() as i32,
leechers: (n_announce_entries - n_seeders) as i32, leechers: (n_swarm_peers - n_seeders) as i32,
seeders: n_seeders as i32, seeders: n_seeders as i32,
}; };
let response = if request.src_addr.is_ipv4() { let response = if request.src_addr.is_ipv4() {
// IPv4 // IPv4
AnnounceResponse::V4(AnnounceResponseV4 { AnnounceResponse::V4(AnnounceResponseV4 {
common: resp_common, common: resp_common,
peers: v4_peers, peers: swarm_addrs.map_or_else(
|| Vec::new(),
|v| {
v.iter()
.filter_map(|a| match a {
SocketAddr::V4(v4) => Some(*v4),
_ => None,
})
.collect()
},
),
}) })
} else { } else {
// IPv6 // IPv6
AnnounceResponse::V6(AnnounceResponseV6 { AnnounceResponse::V6(AnnounceResponseV6 {
common: resp_common, common: resp_common,
peers: v6_peers, peers: swarm_addrs.map_or_else(
|| Vec::new(),
|v| {
v.iter()
.filter_map(|a| match a {
SocketAddr::V6(v6) => Some(*v6),
_ => None,
})
.collect()
},
),
}) })
}; };
@ -318,12 +325,7 @@ impl Tracker {
.iter() .iter()
.filter_map(|info_hash| { .filter_map(|info_hash| {
if let Some(swarm) = info_hashes.get(info_hash) { if let Some(swarm) = info_hashes.get(info_hash) {
let n_seeders = swarm let n_seeders = Self::count_seeders(swarm);
.iter()
.filter(|status| {
status.last_event == Event::Completed || status.remaining == 0
})
.count();
let n_leechers = swarm.len() - n_seeders; let n_leechers = swarm.len() - n_seeders;
Some(ScrapeStats { Some(ScrapeStats {
@ -345,7 +347,7 @@ impl Tracker {
dst_addr: request.src_addr, dst_addr: request.src_addr,
}) })
} }
}; }
} }
fn garbage_collect(&mut self) { fn garbage_collect(&mut self) {
@ -405,4 +407,11 @@ impl Tracker {
return n_purged; return n_purged;
} }
fn count_seeders(swarm: &[PeerStatus]) -> usize {
swarm
.iter()
.filter(|status| status.last_event == Event::Completed || status.remaining == 0)
.count()
}
} }