implement tracker control messages channel & simplify scrape responses

This commit is contained in:
Adam 2025-08-06 20:47:06 +01:00
parent 07aaa8dc01
commit 2bb2cd5ce2
4 changed files with 95 additions and 44 deletions

View File

@ -7,7 +7,7 @@ edition = "2024"
[dependencies]
anyhow = "1.0.98"
flume = { version = "0.11.1", features = ["eventual-fairness"] }
flume = { version = "0.11.1", features = ["eventual-fairness", "select"] }
heapless = "0.8.0"
# log = { version = "...", features = ["release_max_level_info"] }
log = "0.4.27"

View File

@ -8,7 +8,8 @@ OPTIONS:
-A IPV6_ADDRESS Provide an IPv6 address to bind to
(example: -A [::1]:6970)
-h | --help Show this help text
-v | --version Print the program's build & version information
--debug Enable debug logging
--trace Enable trace logging (more verbose debug)";
--trace Enable trace logging (more verbose debug)
-h | --help Show this help text
-v | --version Print the program's build & version information";

View File

@ -1,5 +1,6 @@
use std::net::{SocketAddr, UdpSocket};
use std::thread;
use std::time::Duration;
use anyhow::Result;
use flume::Sender;
@ -13,7 +14,7 @@ use crate::udp_server;
pub struct Meteoro {}
#[repr(u8)]
pub enum ApplicationControlMessage {
pub enum TrackerControlMessage {
/// Signal that the receiver should exit their thread of execution
Exit = 0,
}
@ -28,7 +29,7 @@ impl Meteoro {
pub fn start(addrs: &[SocketAddr]) -> Result<()> {
// Create channels
let (ctrl_send, ctrl_recv) = flume::unbounded::<ApplicationControlMessage>();
let (ctrl_send, ctrl_recv) = flume::unbounded::<TrackerControlMessage>();
let (srv_ctrl_send, srv_ctrl_recv) = flume::unbounded::<ServerControlMessage>();
let (reqs_send, reqs_recv) = flume::unbounded::<RequestMessage>();
let resp_chans: Vec<_> = (0..addrs.len())
@ -86,12 +87,14 @@ impl Meteoro {
// - a request message channel (recv), for receiving requests from the server(s)
// - corresponding response channels (send), for providing handled requests
let resps: Vec<Sender<ResponseMessage>> =
resp_chans.iter().map(|(a, _)| a.clone()).collect();
let resps: Vec<Sender<ResponseMessage>> = resp_chans
.iter()
.map(|(resp_chan, _)| resp_chan.clone())
.collect();
thread::spawn(move || {
let mut tracker = Tracker::new();
let _ = tracker.start(reqs_recv, resps.as_slice());
let _ = tracker.start(reqs_recv, resps.as_slice(), ctrl_recv);
})
};

View File

@ -1,9 +1,10 @@
use std::collections::HashMap;
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use anyhow::Result;
use flume::{Receiver, Sender};
use flume::{Receiver, Selector, Sender};
use rand::Rng;
use rand::rngs::ThreadRng;
@ -13,6 +14,7 @@ use crate::bittorrent::udp::{
AnnounceResponse, AnnounceResponseCommon, AnnounceResponseV4, AnnounceResponseV6,
ConnectResponse, ScrapeResponse, ScrapeStats, UdpRequest, UdpResponse,
};
use crate::meteoro::TrackerControlMessage;
pub struct RequestMessage {
pub(crate) server_id: usize,
@ -67,17 +69,47 @@ impl Tracker {
&mut self,
reqs: Receiver<RequestMessage>,
resps: &[Sender<ResponseMessage>],
ctrl: Receiver<TrackerControlMessage>,
) -> Result<()> {
loop {
let request = reqs.recv()?;
let mut next_gc = Instant::now() + GARBAGE_COLLECTION_INTERVAL;
let should_exit = AtomicBool::new(false);
while !should_exit.load(Ordering::Relaxed) {
let _ = Selector::new()
.recv(&reqs, |request| {
let request = request.unwrap();
log::trace!("Request from {} | {:?}", request.src_addr, request.request);
match self.handle_request(&request) {
Some(response) => resps[request.server_id].send(response)?,
Some(response) => {
resps[request.server_id].send(response).unwrap();
}
None => (),
};
})
.recv(&ctrl, |msg| {
let msg = msg.unwrap();
match msg {
TrackerControlMessage::Exit => {
should_exit.store(true, Ordering::Relaxed);
}
};
})
.wait();
if Instant::now() > next_gc {
// Garbage collect
self.garbage_collect();
next_gc = Instant::now()
+ GARBAGE_COLLECTION_INTERVAL
+ Duration::from_millis(self.rng.random::<u64>() % 5000);
}
}
return Ok(());
}
fn connection_valid(&self, connection_id: i64, src_addr: SocketAddr) -> bool {
@ -281,36 +313,29 @@ impl Tracker {
let info_hashes = &self.info_hashes;
let mut scrape_stats: Vec<ScrapeStats> = Vec::new();
let scrape_stats: Vec<ScrapeStats> = scrape
.info_hashes
.iter()
.filter_map(|info_hash| {
if let Some(swarm) = info_hashes.get(info_hash) {
let n_seeders = swarm
.iter()
.filter(|status| {
status.last_event == Event::Completed || status.remaining == 0
})
.count();
let n_leechers = swarm.len() - n_seeders;
for info_hash in &scrape.info_hashes {
match info_hashes.get(info_hash) {
Some(peers) => {
scrape_stats.push(ScrapeStats {
seeders: peers
.iter()
.filter(|&peer_status| {
peer_status.last_event == Event::Completed
|| peer_status.remaining == 0
&& peer_status.socket_addr.is_ipv4()
== request.src_addr.is_ipv4()
Some(ScrapeStats {
seeders: n_seeders as i32,
completed: 0, // TODO
leechers: n_leechers as i32,
})
.count() as i32,
completed: 0, // TODO: keep track of this
leechers: peers
.iter()
.filter(|&peer_status| {
peer_status.last_event == Event::Started
|| peer_status.remaining > 0
&& peer_status.socket_addr.is_ipv4()
== request.src_addr.is_ipv4()
} else {
None
}
})
.count() as i32,
});
}
None => scrape_stats.push(ScrapeStats::default()),
};
}
.collect();
Some(ResponseMessage {
response: UdpResponse::Scrape(ScrapeResponse {
@ -323,6 +348,28 @@ impl Tracker {
};
}
fn garbage_collect(&mut self) {
let s1 = Instant::now();
let n_purged_conns = self.purge_expired_connections();
let d1 = s1.elapsed();
let s2 = Instant::now();
let n_purged_swarm_peers = self.purge_expired_swarm_peers();
let d2 = s2.elapsed();
log::debug!(
"Garbage collected {} expired connections (took: {} ns)",
n_purged_conns,
d1.as_nanos()
);
log::debug!(
"Garbage collected {} inactive peers from the swarm (took: {} ns)",
n_purged_swarm_peers,
d2.as_nanos()
);
log::debug!("Garbage collection took {} ns total", (d1 + d2).as_nanos());
}
fn purge_expired_connections(&mut self) -> usize {
let mut purged: usize = 0;