refactored overall tracker architecture, sans the garbage collector
This commit is contained in:
parent
861590a04e
commit
07aaa8dc01
26
Cargo.lock
generated
26
Cargo.lock
generated
@ -115,6 +115,12 @@ dependencies = [
|
|||||||
"stable_deref_trait",
|
"stable_deref_trait",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hermit-abi"
|
||||||
|
version = "0.5.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itoa"
|
name = "itoa"
|
||||||
version = "1.0.15"
|
version = "1.0.15"
|
||||||
@ -163,6 +169,7 @@ dependencies = [
|
|||||||
"log",
|
"log",
|
||||||
"rand",
|
"rand",
|
||||||
"simplelog",
|
"simplelog",
|
||||||
|
"threadpool",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -180,6 +187,16 @@ version = "0.1.0"
|
|||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
|
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "num_cpus"
|
||||||
|
version = "1.17.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b"
|
||||||
|
dependencies = [
|
||||||
|
"hermit-abi",
|
||||||
|
"libc",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "num_threads"
|
name = "num_threads"
|
||||||
version = "0.1.7"
|
version = "0.1.7"
|
||||||
@ -335,6 +352,15 @@ dependencies = [
|
|||||||
"winapi-util",
|
"winapi-util",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "threadpool"
|
||||||
|
version = "1.8.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa"
|
||||||
|
dependencies = [
|
||||||
|
"num_cpus",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "time"
|
name = "time"
|
||||||
version = "0.3.41"
|
version = "0.3.41"
|
||||||
|
@ -1,5 +1,7 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "meteoro"
|
name = "meteoro"
|
||||||
|
description = "A UDP BitTorrent tracker"
|
||||||
|
authors = ["Adam Macdonald <adam@2khz.xyz>"]
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2024"
|
edition = "2024"
|
||||||
|
|
||||||
@ -11,3 +13,4 @@ heapless = "0.8.0"
|
|||||||
log = "0.4.27"
|
log = "0.4.27"
|
||||||
rand = "0.9.2"
|
rand = "0.9.2"
|
||||||
simplelog = "0.12.2"
|
simplelog = "0.12.2"
|
||||||
|
threadpool = "1.8.1"
|
||||||
|
@ -9,7 +9,7 @@ pub struct PeerId {
|
|||||||
|
|
||||||
impl PeerId {
|
impl PeerId {
|
||||||
pub fn to_string_lossy(&self) -> String {
|
pub fn to_string_lossy(&self) -> String {
|
||||||
String::from_utf8_lossy(&self.bytes).to_string()
|
String::from_utf8_lossy(&self.bytes).into()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -1,10 +1,9 @@
|
|||||||
use std::{
|
use std::{
|
||||||
fmt,
|
|
||||||
io::{Cursor, Write},
|
io::{Cursor, Write},
|
||||||
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
|
net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
|
||||||
};
|
};
|
||||||
|
|
||||||
use crate::tracker::bittorrent::{
|
use crate::bittorrent::{
|
||||||
info_hash::InfoHash,
|
info_hash::InfoHash,
|
||||||
peer::PeerId,
|
peer::PeerId,
|
||||||
protocol::{Action, Event},
|
protocol::{Action, Event},
|
||||||
@ -76,6 +75,7 @@ pub struct AnnounceRequest {
|
|||||||
pub port: u16,
|
pub port: u16,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum AnnounceResponse {
|
pub enum AnnounceResponse {
|
||||||
V4(AnnounceResponseV4),
|
V4(AnnounceResponseV4),
|
||||||
V6(AnnounceResponseV6),
|
V6(AnnounceResponseV6),
|
||||||
@ -94,13 +94,13 @@ pub struct AnnounceResponseCommon {
|
|||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct AnnounceResponseV4 {
|
pub struct AnnounceResponseV4 {
|
||||||
pub inner: AnnounceResponseCommon,
|
pub common: AnnounceResponseCommon,
|
||||||
pub peers: Vec<SocketAddrV4>,
|
pub peers: Vec<SocketAddrV4>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct AnnounceResponseV6 {
|
pub struct AnnounceResponseV6 {
|
||||||
pub inner: AnnounceResponseCommon,
|
pub common: AnnounceResponseCommon,
|
||||||
pub peers: Vec<SocketAddrV6>,
|
pub peers: Vec<SocketAddrV6>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,7 +119,7 @@ impl AnnounceResponseCommon {
|
|||||||
|
|
||||||
impl AnnounceResponseV4 {
|
impl AnnounceResponseV4 {
|
||||||
pub fn write_to_buf(&self, buf: &mut [u8]) -> usize {
|
pub fn write_to_buf(&self, buf: &mut [u8]) -> usize {
|
||||||
let written = self.inner.write_to_buf(buf);
|
let written = self.common.write_to_buf(buf);
|
||||||
|
|
||||||
for (offset, addr) in (written..buf.len())
|
for (offset, addr) in (written..buf.len())
|
||||||
.step_by(IPV4_ADDR_PAIR_SIZE)
|
.step_by(IPV4_ADDR_PAIR_SIZE)
|
||||||
@ -136,7 +136,7 @@ impl AnnounceResponseV4 {
|
|||||||
|
|
||||||
impl AnnounceResponseV6 {
|
impl AnnounceResponseV6 {
|
||||||
pub fn write_to_buf(&self, buf: &mut [u8]) -> usize {
|
pub fn write_to_buf(&self, buf: &mut [u8]) -> usize {
|
||||||
let written = self.inner.write_to_buf(buf);
|
let written = self.common.write_to_buf(buf);
|
||||||
|
|
||||||
for (offset, addr) in (written..buf.len())
|
for (offset, addr) in (written..buf.len())
|
||||||
.step_by(IPV6_ADDR_PAIR_SIZE)
|
.step_by(IPV6_ADDR_PAIR_SIZE)
|
||||||
@ -206,6 +206,7 @@ pub enum UdpRequest {
|
|||||||
Scrape(ScrapeRequest),
|
Scrape(ScrapeRequest),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum UdpResponse {
|
pub enum UdpResponse {
|
||||||
Connect(ConnectResponse),
|
Connect(ConnectResponse),
|
||||||
Announce(AnnounceResponse),
|
Announce(AnnounceResponse),
|
16
src/main.rs
16
src/main.rs
@ -7,9 +7,13 @@ use std::{
|
|||||||
use log::LevelFilter;
|
use log::LevelFilter;
|
||||||
use simplelog::{ColorChoice, ConfigBuilder, TermLogger, TerminalMode};
|
use simplelog::{ColorChoice, ConfigBuilder, TermLogger, TerminalMode};
|
||||||
|
|
||||||
|
use crate::meteoro::Meteoro;
|
||||||
|
|
||||||
|
mod bittorrent;
|
||||||
mod constants;
|
mod constants;
|
||||||
|
mod meteoro;
|
||||||
mod tracker;
|
mod tracker;
|
||||||
use tracker::Tracker;
|
mod udp_server;
|
||||||
|
|
||||||
fn main() -> ExitCode {
|
fn main() -> ExitCode {
|
||||||
let args: Vec<String> = env::args().collect();
|
let args: Vec<String> = env::args().collect();
|
||||||
@ -84,12 +88,10 @@ fn main() -> ExitCode {
|
|||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut tracker = Tracker::new();
|
// bufbuf():codingcodingbuf/
|
||||||
return match tracker.start_on(&ip_addrs) {
|
|
||||||
|
return match Meteoro::start(&ip_addrs) {
|
||||||
Ok(_) => ExitCode::SUCCESS,
|
Ok(_) => ExitCode::SUCCESS,
|
||||||
Err(e) => {
|
Err(_) => ExitCode::FAILURE,
|
||||||
log::error!("Fatal error: {}", e);
|
|
||||||
ExitCode::FAILURE
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
109
src/meteoro.rs
Normal file
109
src/meteoro.rs
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
use std::net::{SocketAddr, UdpSocket};
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use flume::Sender;
|
||||||
|
use threadpool::ThreadPool;
|
||||||
|
|
||||||
|
use crate::tracker::{self, RequestMessage, ResponseMessage, Tracker};
|
||||||
|
use crate::udp_server;
|
||||||
|
|
||||||
|
/// Instance of the meteoro BitTorrent tracker
|
||||||
|
/// Manages the tracker, networking & API
|
||||||
|
pub struct Meteoro {}
|
||||||
|
|
||||||
|
#[repr(u8)]
|
||||||
|
pub enum ApplicationControlMessage {
|
||||||
|
/// Signal that the receiver should exit their thread of execution
|
||||||
|
Exit = 0,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[repr(u8)]
|
||||||
|
pub enum ServerControlMessage {
|
||||||
|
/// Server with `(id, _)` has bound to a socket, providing a clone of the socket
|
||||||
|
UdpSocketBound((usize, UdpSocket)),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Meteoro {
|
||||||
|
pub fn start(addrs: &[SocketAddr]) -> Result<()> {
|
||||||
|
// Create channels
|
||||||
|
|
||||||
|
let (ctrl_send, ctrl_recv) = flume::unbounded::<ApplicationControlMessage>();
|
||||||
|
let (srv_ctrl_send, srv_ctrl_recv) = flume::unbounded::<ServerControlMessage>();
|
||||||
|
let (reqs_send, reqs_recv) = flume::unbounded::<RequestMessage>();
|
||||||
|
let resp_chans: Vec<_> = (0..addrs.len())
|
||||||
|
.map(|_| flume::unbounded::<ResponseMessage>())
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Start networking threads
|
||||||
|
|
||||||
|
{
|
||||||
|
// Each receiving networking thread requires:
|
||||||
|
// - a socket address to bind to
|
||||||
|
// - a control message channel (send), for providing the bound socket
|
||||||
|
// - a request message channel (send), for providing requests to the tracker
|
||||||
|
|
||||||
|
// Receive threads
|
||||||
|
|
||||||
|
let recv_threads = ThreadPool::new(addrs.len());
|
||||||
|
|
||||||
|
for (id, addr) in addrs.iter().enumerate() {
|
||||||
|
let ctrl = srv_ctrl_send.clone();
|
||||||
|
let reqs = reqs_send.clone();
|
||||||
|
let addr = *addr;
|
||||||
|
|
||||||
|
recv_threads.execute(move || {
|
||||||
|
let _ = udp_server::receive(id, addr, ctrl, reqs);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Each sending networking thread requires:
|
||||||
|
// - a socket to send responses on
|
||||||
|
// - a response message channel (receive), for receiving handled requests
|
||||||
|
|
||||||
|
// Send threads
|
||||||
|
|
||||||
|
let send_threads = ThreadPool::new(addrs.len());
|
||||||
|
|
||||||
|
for _ in 0..addrs.len() {
|
||||||
|
match srv_ctrl_recv.recv()? {
|
||||||
|
ServerControlMessage::UdpSocketBound((id, socket)) => {
|
||||||
|
let (_, resps) = resp_chans[id].clone();
|
||||||
|
|
||||||
|
send_threads.execute(move || {
|
||||||
|
let _ = udp_server::send(id, socket, resps);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start tracker thread
|
||||||
|
|
||||||
|
let tracker_thread = {
|
||||||
|
// Tracker requires:
|
||||||
|
// - a request message channel (recv), for receiving requests from the server(s)
|
||||||
|
// - corresponding response channels (send), for providing handled requests
|
||||||
|
|
||||||
|
let resps: Vec<Sender<ResponseMessage>> =
|
||||||
|
resp_chans.iter().map(|(a, _)| a.clone()).collect();
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
let mut tracker = Tracker::new();
|
||||||
|
let _ = tracker.start(reqs_recv, resps.as_slice());
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
// Start API thread
|
||||||
|
|
||||||
|
{
|
||||||
|
// API thread requires
|
||||||
|
// - a control message channel (recv)
|
||||||
|
}
|
||||||
|
|
||||||
|
let _ = tracker_thread.join();
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
361
src/tracker.rs
Normal file
361
src/tracker.rs
Normal file
@ -0,0 +1,361 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use flume::{Receiver, Sender};
|
||||||
|
use rand::Rng;
|
||||||
|
use rand::rngs::ThreadRng;
|
||||||
|
|
||||||
|
use crate::bittorrent::info_hash::InfoHash;
|
||||||
|
use crate::bittorrent::protocol::Event;
|
||||||
|
use crate::bittorrent::udp::{
|
||||||
|
AnnounceResponse, AnnounceResponseCommon, AnnounceResponseV4, AnnounceResponseV6,
|
||||||
|
ConnectResponse, ScrapeResponse, ScrapeStats, UdpRequest, UdpResponse,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct RequestMessage {
|
||||||
|
pub(crate) server_id: usize,
|
||||||
|
pub(crate) src_addr: SocketAddr,
|
||||||
|
pub(crate) request: UdpRequest,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct ResponseMessage {
|
||||||
|
pub(crate) response: UdpResponse,
|
||||||
|
pub(crate) dst_addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct PeerMetadata {
|
||||||
|
socket_addr: SocketAddr,
|
||||||
|
last_active: Instant,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct PeerStatus {
|
||||||
|
socket_addr: SocketAddr,
|
||||||
|
last_event: Event,
|
||||||
|
last_active: Instant,
|
||||||
|
downloaded: u64,
|
||||||
|
uploaded: u64,
|
||||||
|
remaining: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub const GARBAGE_COLLECTION_INTERVAL: Duration = Duration::from_secs(20);
|
||||||
|
|
||||||
|
pub const CONNECTION_EXPIRE_TIME: Duration = Duration::from_secs(90);
|
||||||
|
pub const DEFAULT_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(60);
|
||||||
|
pub const DEFAULT_ANNOUNCE_WANT: u32 = 80;
|
||||||
|
|
||||||
|
type ConnectionIdMap = HashMap<i64, PeerMetadata>;
|
||||||
|
type InfoHashMap = HashMap<InfoHash, Vec<PeerStatus>>;
|
||||||
|
|
||||||
|
pub struct Tracker {
|
||||||
|
peers: ConnectionIdMap,
|
||||||
|
info_hashes: InfoHashMap,
|
||||||
|
rng: ThreadRng,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Tracker {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
peers: ConnectionIdMap::new(),
|
||||||
|
info_hashes: InfoHashMap::new(),
|
||||||
|
rng: rand::rng(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn start(
|
||||||
|
&mut self,
|
||||||
|
reqs: Receiver<RequestMessage>,
|
||||||
|
resps: &[Sender<ResponseMessage>],
|
||||||
|
) -> Result<()> {
|
||||||
|
loop {
|
||||||
|
let request = reqs.recv()?;
|
||||||
|
|
||||||
|
log::trace!("Request from {} | {:?}", request.src_addr, request.request);
|
||||||
|
|
||||||
|
match self.handle_request(&request) {
|
||||||
|
Some(response) => resps[request.server_id].send(response)?,
|
||||||
|
None => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connection_valid(&self, connection_id: i64, src_addr: SocketAddr) -> bool {
|
||||||
|
let peers = &self.peers;
|
||||||
|
|
||||||
|
return match peers.get(&connection_id) {
|
||||||
|
Some(metadata) => metadata.socket_addr == src_addr,
|
||||||
|
None => false,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_request(&mut self, request: &RequestMessage) -> Option<ResponseMessage> {
|
||||||
|
return match &request.request {
|
||||||
|
UdpRequest::Connect(connect) => {
|
||||||
|
let new_id: i64 = self.rng.random();
|
||||||
|
|
||||||
|
let peers = &mut self.peers;
|
||||||
|
|
||||||
|
peers.insert(
|
||||||
|
new_id,
|
||||||
|
PeerMetadata {
|
||||||
|
socket_addr: request.src_addr,
|
||||||
|
last_active: Instant::now(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
Some(ResponseMessage {
|
||||||
|
response: UdpResponse::Connect(ConnectResponse {
|
||||||
|
transaction_id: connect.transaction_id,
|
||||||
|
connection_id: new_id,
|
||||||
|
}),
|
||||||
|
dst_addr: request.src_addr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
UdpRequest::Announce(announce) => {
|
||||||
|
if !self.connection_valid(announce.connection_id, request.src_addr) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure we honour the desired number of peers, within our boundaries
|
||||||
|
|
||||||
|
let n_announce_want: u32 = if let Some(n) = announce.num_want {
|
||||||
|
if n < DEFAULT_ANNOUNCE_WANT {
|
||||||
|
n
|
||||||
|
} else {
|
||||||
|
DEFAULT_ANNOUNCE_WANT
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
DEFAULT_ANNOUNCE_WANT
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut n_announce_entries: u32 = 0;
|
||||||
|
let mut n_seeders: u32 = 0;
|
||||||
|
let mut v4_peers: Vec<SocketAddrV4> = Vec::new();
|
||||||
|
let mut v6_peers: Vec<SocketAddrV6> = Vec::new();
|
||||||
|
|
||||||
|
let info_hashes = &mut self.info_hashes;
|
||||||
|
|
||||||
|
match info_hashes.get_mut(&announce.info_hash) {
|
||||||
|
None => {
|
||||||
|
// Info hash isn't currently tracked
|
||||||
|
// No relevant peers in the swarm
|
||||||
|
|
||||||
|
info_hashes.insert(
|
||||||
|
announce.info_hash,
|
||||||
|
vec![PeerStatus {
|
||||||
|
socket_addr: request.src_addr,
|
||||||
|
last_event: announce.event,
|
||||||
|
last_active: Instant::now(),
|
||||||
|
downloaded: announce.downloaded as u64,
|
||||||
|
uploaded: announce.uploaded as u64,
|
||||||
|
remaining: announce.left as u64,
|
||||||
|
}],
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Some(swarm) => {
|
||||||
|
// Insert into swarm if not already present
|
||||||
|
// TODO: sort (?)
|
||||||
|
|
||||||
|
let existing_swarm_idx: Option<usize> = match swarm
|
||||||
|
.iter_mut()
|
||||||
|
.enumerate()
|
||||||
|
.find(|(_, status)| status.socket_addr == request.src_addr)
|
||||||
|
{
|
||||||
|
Some((idx, peer)) => {
|
||||||
|
// Peer already exists in swarm, update state
|
||||||
|
|
||||||
|
peer.last_active = Instant::now();
|
||||||
|
peer.last_event = announce.event;
|
||||||
|
peer.downloaded = announce.downloaded as u64;
|
||||||
|
peer.uploaded = announce.uploaded as u64;
|
||||||
|
peer.remaining = announce.left as u64;
|
||||||
|
|
||||||
|
Some(idx)
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
// Peer is not in swarm, add it
|
||||||
|
|
||||||
|
swarm.push(PeerStatus {
|
||||||
|
socket_addr: request.src_addr,
|
||||||
|
last_event: announce.event,
|
||||||
|
last_active: Instant::now(),
|
||||||
|
downloaded: announce.downloaded as u64,
|
||||||
|
uploaded: announce.uploaded as u64,
|
||||||
|
remaining: announce.left as u64,
|
||||||
|
});
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if announce.event == Event::Stopped {
|
||||||
|
if let Some(idx) = existing_swarm_idx {
|
||||||
|
swarm.remove(idx);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate over all peers in the swarm for announce response
|
||||||
|
|
||||||
|
for peer_status in swarm {
|
||||||
|
// Respect number of peers requested
|
||||||
|
|
||||||
|
if n_announce_entries >= n_announce_want {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't provide useless peers
|
||||||
|
// (peers who are no longer seeding or are from the source address)
|
||||||
|
|
||||||
|
let peer_invalid: bool = (peer_status.last_event == Event::Stopped)
|
||||||
|
|| (peer_status.socket_addr == request.src_addr);
|
||||||
|
|
||||||
|
if peer_invalid {
|
||||||
|
log::trace!(
|
||||||
|
"(src: {}) {} with status \"{:?}\" deemed invalid",
|
||||||
|
request.src_addr,
|
||||||
|
peer_status.socket_addr,
|
||||||
|
peer_status.last_event
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect and add the relevant peer's address to the appropriate vector
|
||||||
|
|
||||||
|
let is_seed: bool = peer_status.last_event == Event::Completed
|
||||||
|
|| peer_status.remaining == 0;
|
||||||
|
|
||||||
|
match &peer_status.socket_addr {
|
||||||
|
SocketAddr::V4(v4) => {
|
||||||
|
if request.src_addr.is_ipv4() {
|
||||||
|
v4_peers.push(*v4);
|
||||||
|
n_seeders += is_seed as u32;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SocketAddr::V6(v6) => {
|
||||||
|
if request.src_addr.is_ipv6() {
|
||||||
|
v6_peers.push(*v6);
|
||||||
|
n_seeders += is_seed as u32;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Keep track of the announce entries gathered
|
||||||
|
|
||||||
|
n_announce_entries += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let resp_common = AnnounceResponseCommon {
|
||||||
|
transaction_id: announce.transaction_id,
|
||||||
|
interval: DEFAULT_ANNOUNCE_INTERVAL.as_secs() as i32,
|
||||||
|
leechers: (n_announce_entries - n_seeders) as i32,
|
||||||
|
seeders: n_seeders as i32,
|
||||||
|
};
|
||||||
|
let response = if request.src_addr.is_ipv4() {
|
||||||
|
// IPv4
|
||||||
|
AnnounceResponse::V4(AnnounceResponseV4 {
|
||||||
|
common: resp_common,
|
||||||
|
peers: v4_peers,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
// IPv6
|
||||||
|
AnnounceResponse::V6(AnnounceResponseV6 {
|
||||||
|
common: resp_common,
|
||||||
|
peers: v6_peers,
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
Some(ResponseMessage {
|
||||||
|
response: UdpResponse::Announce(response),
|
||||||
|
dst_addr: request.src_addr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
UdpRequest::Scrape(scrape) => {
|
||||||
|
if !self.connection_valid(scrape.connection_id, request.src_addr) {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let info_hashes = &self.info_hashes;
|
||||||
|
|
||||||
|
let mut scrape_stats: Vec<ScrapeStats> = Vec::new();
|
||||||
|
|
||||||
|
for info_hash in &scrape.info_hashes {
|
||||||
|
match info_hashes.get(info_hash) {
|
||||||
|
Some(peers) => {
|
||||||
|
scrape_stats.push(ScrapeStats {
|
||||||
|
seeders: peers
|
||||||
|
.iter()
|
||||||
|
.filter(|&peer_status| {
|
||||||
|
peer_status.last_event == Event::Completed
|
||||||
|
|| peer_status.remaining == 0
|
||||||
|
&& peer_status.socket_addr.is_ipv4()
|
||||||
|
== request.src_addr.is_ipv4()
|
||||||
|
})
|
||||||
|
.count() as i32,
|
||||||
|
completed: 0, // TODO: keep track of this
|
||||||
|
leechers: peers
|
||||||
|
.iter()
|
||||||
|
.filter(|&peer_status| {
|
||||||
|
peer_status.last_event == Event::Started
|
||||||
|
|| peer_status.remaining > 0
|
||||||
|
&& peer_status.socket_addr.is_ipv4()
|
||||||
|
== request.src_addr.is_ipv4()
|
||||||
|
})
|
||||||
|
.count() as i32,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
None => scrape_stats.push(ScrapeStats::default()),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(ResponseMessage {
|
||||||
|
response: UdpResponse::Scrape(ScrapeResponse {
|
||||||
|
transaction_id: scrape.transaction_id,
|
||||||
|
stats: scrape_stats,
|
||||||
|
}),
|
||||||
|
dst_addr: request.src_addr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
fn purge_expired_connections(&mut self) -> usize {
|
||||||
|
let mut purged: usize = 0;
|
||||||
|
|
||||||
|
self.peers.retain(|_, metadata| {
|
||||||
|
if metadata.last_active.elapsed() < CONNECTION_EXPIRE_TIME {
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
purged += 1;
|
||||||
|
false
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return purged;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn purge_expired_swarm_peers(&mut self) -> usize {
|
||||||
|
let mut n_purged: usize = 0;
|
||||||
|
|
||||||
|
self.info_hashes.iter_mut().for_each(|(_, b)| {
|
||||||
|
b.retain(|status| {
|
||||||
|
if status.last_active.elapsed() < DEFAULT_ANNOUNCE_INTERVAL * 5
|
||||||
|
&& status.last_event != Event::Stopped
|
||||||
|
{
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
n_purged += 1;
|
||||||
|
false
|
||||||
|
}
|
||||||
|
})
|
||||||
|
});
|
||||||
|
|
||||||
|
self.info_hashes.retain(|_, swarm| !swarm.is_empty());
|
||||||
|
|
||||||
|
return n_purged;
|
||||||
|
}
|
||||||
|
}
|
@ -1,5 +0,0 @@
|
|||||||
mod bittorrent;
|
|
||||||
mod tracker;
|
|
||||||
mod udp_server;
|
|
||||||
|
|
||||||
pub use tracker::Tracker;
|
|
@ -1,471 +0,0 @@
|
|||||||
use std::collections::HashMap;
|
|
||||||
use std::net::{SocketAddr, SocketAddrV4, SocketAddrV6};
|
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
|
||||||
use std::sync::{Arc, RwLock};
|
|
||||||
use std::thread;
|
|
||||||
use std::time::{Duration, Instant};
|
|
||||||
|
|
||||||
use anyhow::Result;
|
|
||||||
use flume::Sender;
|
|
||||||
use rand::Rng;
|
|
||||||
|
|
||||||
use crate::tracker::bittorrent::info_hash::InfoHash;
|
|
||||||
use crate::tracker::bittorrent::protocol::Event;
|
|
||||||
use crate::tracker::bittorrent::udp::{
|
|
||||||
AnnounceResponse, AnnounceResponseCommon, AnnounceResponseV4, AnnounceResponseV6,
|
|
||||||
ConnectResponse, ScrapeResponse, ScrapeStats, UdpRequest, UdpResponse,
|
|
||||||
};
|
|
||||||
use crate::tracker::udp_server::UdpServer;
|
|
||||||
|
|
||||||
pub struct RequestMessage {
|
|
||||||
pub(crate) request: UdpRequest,
|
|
||||||
pub(crate) src_addr: SocketAddr,
|
|
||||||
pub(crate) server_id: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct ResponseMessage {
|
|
||||||
pub(crate) response: UdpResponse,
|
|
||||||
pub(crate) dst_addr: SocketAddr,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct PeerMetadata {
|
|
||||||
socket_addr: SocketAddr,
|
|
||||||
last_active: Instant,
|
|
||||||
}
|
|
||||||
|
|
||||||
struct PeerStatus {
|
|
||||||
socket_addr: SocketAddr,
|
|
||||||
last_event: Event,
|
|
||||||
last_active: Instant,
|
|
||||||
downloaded: u64,
|
|
||||||
uploaded: u64,
|
|
||||||
remaining: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub const GARBAGE_COLLECTION_INTERVAL: Duration = Duration::from_secs(20);
|
|
||||||
|
|
||||||
pub const CONNECTION_EXPIRE_TIME: Duration = Duration::from_secs(90);
|
|
||||||
pub const DEFAULT_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(60);
|
|
||||||
pub const DEFAULT_ANNOUNCE_WANT: u32 = 80;
|
|
||||||
|
|
||||||
type ConnectionIdMap = HashMap<i64, PeerMetadata>;
|
|
||||||
type InfoHashMap = HashMap<InfoHash, Vec<PeerStatus>>;
|
|
||||||
|
|
||||||
pub static SHOULD_STOP_SIGNAL: AtomicBool = AtomicBool::new(false);
|
|
||||||
|
|
||||||
pub struct Tracker {
|
|
||||||
server_threads: Vec<thread::JoinHandle<()>>,
|
|
||||||
response_channels: Vec<Sender<ResponseMessage>>,
|
|
||||||
/// Map of IPs & ports to peer connection IDs
|
|
||||||
peers: Arc<RwLock<ConnectionIdMap>>,
|
|
||||||
info_hashes: Arc<RwLock<InfoHashMap>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Tracker {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Tracker {
|
|
||||||
server_threads: Vec::new(),
|
|
||||||
response_channels: Vec::new(),
|
|
||||||
peers: Arc::new(RwLock::new(HashMap::new())),
|
|
||||||
info_hashes: Arc::new(RwLock::new(HashMap::new())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn start_on(&mut self, addrs: &[SocketAddr]) -> Result<()> {
|
|
||||||
if addrs.is_empty() {
|
|
||||||
log::warn!("No addresses provided for tracker to listen on");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Only one request channel set is necessary as all the server threads send on the same one and the tracker is the only reader
|
|
||||||
let (req_send, req_recv) = flume::unbounded::<RequestMessage>();
|
|
||||||
|
|
||||||
let mut server_id: u32 = 0;
|
|
||||||
|
|
||||||
for a in addrs {
|
|
||||||
// Each UDP server gets its own response channels
|
|
||||||
let (resp_send, resp_recv) = flume::unbounded::<ResponseMessage>();
|
|
||||||
let req_chan = req_send.clone();
|
|
||||||
let a = a.clone();
|
|
||||||
|
|
||||||
self.server_threads.push(thread::spawn(move || {
|
|
||||||
let server = UdpServer::new(a, server_id);
|
|
||||||
|
|
||||||
match server.start(req_chan.clone(), resp_recv.clone()) {
|
|
||||||
Err(e) => {
|
|
||||||
log::error!("Failed to start server {}: {}", server_id, e)
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
};
|
|
||||||
|
|
||||||
return;
|
|
||||||
}));
|
|
||||||
|
|
||||||
self.response_channels.push(resp_send);
|
|
||||||
|
|
||||||
server_id += 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
let gc_peers = self.peers.clone();
|
|
||||||
let gc_info_hashes = self.info_hashes.clone();
|
|
||||||
let _gc_thread = thread::spawn(move || {
|
|
||||||
let peers = gc_peers;
|
|
||||||
let info_hashes = gc_info_hashes;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
thread::sleep(GARBAGE_COLLECTION_INTERVAL);
|
|
||||||
|
|
||||||
let s1 = Instant::now();
|
|
||||||
|
|
||||||
let expired = Tracker::purge_expired_connections(&mut peers.write().unwrap());
|
|
||||||
|
|
||||||
let d1 = s1.elapsed();
|
|
||||||
let s2 = Instant::now();
|
|
||||||
|
|
||||||
let purged = Tracker::purge_expired_swarm_peers(&mut info_hashes.write().unwrap());
|
|
||||||
|
|
||||||
let d2 = s2.elapsed();
|
|
||||||
|
|
||||||
log::debug!(
|
|
||||||
"Garbage collected {} expired connections (took: {} ns)",
|
|
||||||
expired,
|
|
||||||
d1.as_nanos()
|
|
||||||
);
|
|
||||||
log::debug!(
|
|
||||||
"Garbage collected {} inactive peers from the swarm (took: {} ns)",
|
|
||||||
purged,
|
|
||||||
d2.as_nanos()
|
|
||||||
);
|
|
||||||
log::debug!("Garbage collection took {} ns total", (d1 + d2).as_nanos());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut rng = rand::rng();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let request = match req_recv.recv() {
|
|
||||||
Ok(r) => r,
|
|
||||||
_ => {
|
|
||||||
log::error!("Internal tracker error: server request channel closed");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
log::trace!("Request from {} | {:?}", request.src_addr, request.request);
|
|
||||||
|
|
||||||
match request.request {
|
|
||||||
UdpRequest::Connect(connect) => {
|
|
||||||
let new_id: i64 = rng.random();
|
|
||||||
|
|
||||||
let mut peers = self.peers.write().unwrap();
|
|
||||||
|
|
||||||
peers.insert(
|
|
||||||
new_id,
|
|
||||||
PeerMetadata {
|
|
||||||
socket_addr: request.src_addr,
|
|
||||||
last_active: Instant::now(),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
|
|
||||||
match self.response_channels[request.server_id as usize].send(ResponseMessage {
|
|
||||||
response: UdpResponse::Connect(ConnectResponse {
|
|
||||||
transaction_id: connect.transaction_id,
|
|
||||||
connection_id: new_id,
|
|
||||||
}),
|
|
||||||
dst_addr: request.src_addr,
|
|
||||||
}) {
|
|
||||||
Ok(r) => r,
|
|
||||||
_ => {
|
|
||||||
log::error!("Internal tracker error: server response channel closed");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
UdpRequest::Announce(announce) => {
|
|
||||||
if !self.connection_valid(announce.connection_id, request.src_addr) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure we honour the desired number of peers, within our boundaries
|
|
||||||
|
|
||||||
let n_announce_want: u32 = if let Some(n) = announce.num_want {
|
|
||||||
if n < DEFAULT_ANNOUNCE_WANT {
|
|
||||||
n
|
|
||||||
} else {
|
|
||||||
DEFAULT_ANNOUNCE_WANT
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
DEFAULT_ANNOUNCE_WANT
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut n_announce_entries: u32 = 0;
|
|
||||||
let mut n_seeders: u32 = 0;
|
|
||||||
let mut v4_peers: Vec<SocketAddrV4> = Vec::new();
|
|
||||||
let mut v6_peers: Vec<SocketAddrV6> = Vec::new();
|
|
||||||
|
|
||||||
let mut info_hashes = self.info_hashes.write().unwrap();
|
|
||||||
|
|
||||||
match info_hashes.get_mut(&announce.info_hash) {
|
|
||||||
None => {
|
|
||||||
// Info hash isn't currently tracked
|
|
||||||
// No relevant peers in the swarm
|
|
||||||
|
|
||||||
info_hashes.insert(
|
|
||||||
announce.info_hash,
|
|
||||||
vec![PeerStatus {
|
|
||||||
socket_addr: request.src_addr,
|
|
||||||
last_event: announce.event,
|
|
||||||
last_active: Instant::now(),
|
|
||||||
downloaded: announce.downloaded as u64,
|
|
||||||
uploaded: announce.uploaded as u64,
|
|
||||||
remaining: announce.left as u64,
|
|
||||||
}],
|
|
||||||
);
|
|
||||||
}
|
|
||||||
Some(swarm) => {
|
|
||||||
// Insert into swarm if not already present
|
|
||||||
// TODO: sort (?)
|
|
||||||
|
|
||||||
let existing_swarm_idx: Option<usize> = match swarm
|
|
||||||
.iter_mut()
|
|
||||||
.enumerate()
|
|
||||||
.find(|(_, status)| status.socket_addr == request.src_addr)
|
|
||||||
{
|
|
||||||
Some((idx, peer)) => {
|
|
||||||
// Peer already exists in swarm, update state
|
|
||||||
|
|
||||||
peer.last_active = Instant::now();
|
|
||||||
peer.last_event = announce.event;
|
|
||||||
peer.downloaded = announce.downloaded as u64;
|
|
||||||
peer.uploaded = announce.uploaded as u64;
|
|
||||||
peer.remaining = announce.left as u64;
|
|
||||||
|
|
||||||
Some(idx)
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
// Peer is not in swarm, add it
|
|
||||||
|
|
||||||
swarm.push(PeerStatus {
|
|
||||||
socket_addr: request.src_addr,
|
|
||||||
last_event: announce.event,
|
|
||||||
last_active: Instant::now(),
|
|
||||||
downloaded: announce.downloaded as u64,
|
|
||||||
uploaded: announce.uploaded as u64,
|
|
||||||
remaining: announce.left as u64,
|
|
||||||
});
|
|
||||||
|
|
||||||
None
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if announce.event == Event::Stopped {
|
|
||||||
if let Some(idx) = existing_swarm_idx {
|
|
||||||
swarm.remove(idx);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Iterate over all peers in the swarm for announce response
|
|
||||||
|
|
||||||
for peer_status in swarm {
|
|
||||||
// Respect number of peers requested
|
|
||||||
|
|
||||||
if n_announce_entries >= n_announce_want {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Don't provide useless peers
|
|
||||||
|
|
||||||
let peer_invalid: bool = (peer_status.last_event == Event::Stopped)
|
|
||||||
|| (peer_status.socket_addr == request.src_addr);
|
|
||||||
|
|
||||||
if peer_invalid {
|
|
||||||
log::trace!(
|
|
||||||
"(src: {}) {} -> {:?} deemed invalid",
|
|
||||||
request.src_addr,
|
|
||||||
peer_status.socket_addr,
|
|
||||||
peer_status.last_event
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect and add the relevant peer's address to the appropriate vector
|
|
||||||
|
|
||||||
let is_seed: bool = peer_status.last_event == Event::Completed
|
|
||||||
|| peer_status.remaining == 0;
|
|
||||||
|
|
||||||
match &peer_status.socket_addr {
|
|
||||||
SocketAddr::V4(v4) => {
|
|
||||||
if request.src_addr.is_ipv4() {
|
|
||||||
v4_peers.push(*v4);
|
|
||||||
n_seeders += is_seed as u32;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SocketAddr::V6(v6) => {
|
|
||||||
if request.src_addr.is_ipv6() {
|
|
||||||
v6_peers.push(*v6);
|
|
||||||
n_seeders += is_seed as u32;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// Keep track of the announce entries gathered
|
|
||||||
|
|
||||||
n_announce_entries += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
log::trace!(
|
|
||||||
"(src: {}) IPv4s: {}, IPv6s: {}, leechers: {}, seeders: {}",
|
|
||||||
request.src_addr,
|
|
||||||
v4_peers.len(),
|
|
||||||
v6_peers.len(),
|
|
||||||
n_announce_entries - n_seeders,
|
|
||||||
n_seeders
|
|
||||||
);
|
|
||||||
|
|
||||||
let resp_common = AnnounceResponseCommon {
|
|
||||||
transaction_id: announce.transaction_id,
|
|
||||||
interval: DEFAULT_ANNOUNCE_INTERVAL.as_secs() as i32,
|
|
||||||
leechers: (n_announce_entries - n_seeders) as i32,
|
|
||||||
seeders: n_seeders as i32,
|
|
||||||
};
|
|
||||||
let response = if request.src_addr.is_ipv4() {
|
|
||||||
// IPv4
|
|
||||||
AnnounceResponse::V4(AnnounceResponseV4 {
|
|
||||||
inner: resp_common,
|
|
||||||
peers: v4_peers,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
// IPv6
|
|
||||||
AnnounceResponse::V6(AnnounceResponseV6 {
|
|
||||||
inner: resp_common,
|
|
||||||
peers: v6_peers,
|
|
||||||
})
|
|
||||||
};
|
|
||||||
|
|
||||||
match self.response_channels[request.server_id as usize].send(ResponseMessage {
|
|
||||||
response: UdpResponse::Announce(response),
|
|
||||||
dst_addr: request.src_addr,
|
|
||||||
}) {
|
|
||||||
Ok(r) => r,
|
|
||||||
_ => {
|
|
||||||
log::error!("Internal tracker error: server response channel closed");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
UdpRequest::Scrape(scrape) => {
|
|
||||||
if !self.connection_valid(scrape.connection_id, request.src_addr) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
let info_hashes = self.info_hashes.read().unwrap();
|
|
||||||
|
|
||||||
let mut scrape_stats: Vec<ScrapeStats> = Vec::new();
|
|
||||||
for info_hash in &scrape.info_hashes {
|
|
||||||
match info_hashes.get(info_hash) {
|
|
||||||
Some(peers) => {
|
|
||||||
scrape_stats.push(ScrapeStats {
|
|
||||||
seeders: peers
|
|
||||||
.iter()
|
|
||||||
.filter(|&peer_status| {
|
|
||||||
peer_status.last_event == Event::Completed
|
|
||||||
|| peer_status.remaining == 0
|
|
||||||
&& peer_status.socket_addr.is_ipv4()
|
|
||||||
== request.src_addr.is_ipv4()
|
|
||||||
})
|
|
||||||
.count()
|
|
||||||
as i32,
|
|
||||||
completed: 0, // TODO: keep track of this
|
|
||||||
leechers: peers
|
|
||||||
.iter()
|
|
||||||
.filter(|&peer_status| {
|
|
||||||
peer_status.last_event == Event::Started
|
|
||||||
|| peer_status.remaining > 0
|
|
||||||
&& peer_status.socket_addr.is_ipv4()
|
|
||||||
== request.src_addr.is_ipv4()
|
|
||||||
})
|
|
||||||
.count()
|
|
||||||
as i32,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
None => scrape_stats.push(ScrapeStats::default()),
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
match self.response_channels[request.server_id as usize].send(ResponseMessage {
|
|
||||||
response: UdpResponse::Scrape(ScrapeResponse {
|
|
||||||
transaction_id: scrape.transaction_id,
|
|
||||||
stats: scrape_stats,
|
|
||||||
}),
|
|
||||||
dst_addr: request.src_addr,
|
|
||||||
}) {
|
|
||||||
Ok(r) => r,
|
|
||||||
_ => {
|
|
||||||
log::error!("Internal tracker error: server response channel closed");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
self.server_threads.drain(..).for_each(|t| {
|
|
||||||
let _ = t.join();
|
|
||||||
});
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connection_valid(&self, connection_id: i64, src_addr: SocketAddr) -> bool {
|
|
||||||
let peers = self.peers.read().unwrap();
|
|
||||||
|
|
||||||
return match peers.get(&connection_id) {
|
|
||||||
Some(metadata) => metadata.socket_addr == src_addr,
|
|
||||||
None => false,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
fn purge_expired_connections(peers: &mut ConnectionIdMap) -> usize {
|
|
||||||
let mut purged: usize = 0;
|
|
||||||
|
|
||||||
peers.retain(|_, metadata| {
|
|
||||||
if metadata.last_active.elapsed() < CONNECTION_EXPIRE_TIME {
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
purged += 1;
|
|
||||||
false
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return purged;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn purge_expired_swarm_peers(info_hashes: &mut InfoHashMap) -> usize {
|
|
||||||
let mut n_purged: usize = 0;
|
|
||||||
|
|
||||||
info_hashes.iter_mut().for_each(|(_, b)| {
|
|
||||||
b.retain(|status| {
|
|
||||||
if status.last_active.elapsed() < DEFAULT_ANNOUNCE_INTERVAL * 5
|
|
||||||
&& status.last_event != Event::Stopped
|
|
||||||
{
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
n_purged += 1;
|
|
||||||
false
|
|
||||||
}
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
info_hashes.retain(|_, swarm| !swarm.is_empty());
|
|
||||||
|
|
||||||
return n_purged;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn should_stop() -> bool {
|
|
||||||
SHOULD_STOP_SIGNAL.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,254 +0,0 @@
|
|||||||
use std::net::{Ipv4Addr, SocketAddr, UdpSocket};
|
|
||||||
use std::thread;
|
|
||||||
|
|
||||||
use anyhow::Result;
|
|
||||||
use flume::{Receiver, Sender};
|
|
||||||
use log;
|
|
||||||
|
|
||||||
use crate::tracker::Tracker;
|
|
||||||
use crate::tracker::bittorrent::info_hash::InfoHash;
|
|
||||||
use crate::tracker::bittorrent::peer::PeerId;
|
|
||||||
use crate::tracker::bittorrent::protocol::Event;
|
|
||||||
use crate::tracker::bittorrent::udp::{
|
|
||||||
AnnounceRequest, AnnounceResponse, MIN_ANNOUNCE_REQUEST_SIZE, MIN_SCRAPE_REQUEST_SIZE,
|
|
||||||
ScrapeRequest, UdpResponse,
|
|
||||||
};
|
|
||||||
use crate::tracker::{
|
|
||||||
bittorrent::{
|
|
||||||
protocol::{Action, UDP_MAGIC},
|
|
||||||
udp::{ConnectRequest, UdpRequest},
|
|
||||||
},
|
|
||||||
tracker::{RequestMessage, ResponseMessage},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub struct UdpServer {
|
|
||||||
addr: SocketAddr,
|
|
||||||
server_id: u32,
|
|
||||||
}
|
|
||||||
|
|
||||||
const UDP_RECV_BUF_SIZE: usize = u16::MAX as usize;
|
|
||||||
const UDP_SEND_BUF_SIZE: usize = u16::MAX as usize; // Could also be 1500 for typical Ethernet MTU (?)
|
|
||||||
|
|
||||||
impl UdpServer {
|
|
||||||
pub fn new(addr: SocketAddr, id: u32) -> Self {
|
|
||||||
UdpServer {
|
|
||||||
addr: addr,
|
|
||||||
server_id: id,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn start(
|
|
||||||
&self,
|
|
||||||
req_chan: Sender<RequestMessage>,
|
|
||||||
resp_chan: Receiver<ResponseMessage>,
|
|
||||||
) -> Result<()> {
|
|
||||||
let recv_socket = UdpSocket::bind(self.addr)?;
|
|
||||||
let send_socket = recv_socket.try_clone()?;
|
|
||||||
let local_addr = recv_socket.local_addr()?;
|
|
||||||
let id = self.server_id;
|
|
||||||
|
|
||||||
log::info!("Starting UDP server on: {:?}", local_addr);
|
|
||||||
|
|
||||||
let recv_thread = thread::spawn(move || {
|
|
||||||
let mut buf: [u8; UDP_RECV_BUF_SIZE] = [0; UDP_RECV_BUF_SIZE];
|
|
||||||
|
|
||||||
loop {
|
|
||||||
if Tracker::should_stop() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
match recv_socket.recv_from(&mut buf) {
|
|
||||||
Ok((n_bytes, src)) => {
|
|
||||||
log::trace!("Received {} bytes from {}", n_bytes, src);
|
|
||||||
|
|
||||||
match try_parse_packet(&buf[..n_bytes]) {
|
|
||||||
Some(req) => {
|
|
||||||
if req_chan
|
|
||||||
.send(RequestMessage {
|
|
||||||
request: req,
|
|
||||||
src_addr: src,
|
|
||||||
server_id: id,
|
|
||||||
})
|
|
||||||
.is_err()
|
|
||||||
{
|
|
||||||
log::error!(
|
|
||||||
"Internal tracker error: server {} request channel closed",
|
|
||||||
id
|
|
||||||
);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None => continue,
|
|
||||||
};
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
log::error!("Failed to receive on socket: {}", error);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log::debug!("Server {} receive thread shutting exiting ...", id)
|
|
||||||
});
|
|
||||||
|
|
||||||
let send_thread = thread::spawn(move || {
|
|
||||||
let mut buf: [u8; UDP_SEND_BUF_SIZE] = [0; UDP_SEND_BUF_SIZE];
|
|
||||||
|
|
||||||
loop {
|
|
||||||
if Tracker::should_stop() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let response = match resp_chan.recv() {
|
|
||||||
Ok(r) => r,
|
|
||||||
_ => {
|
|
||||||
log::error!(
|
|
||||||
"Internal tracker error: server {} response channel closed",
|
|
||||||
id
|
|
||||||
);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let n_bytes = match response.response {
|
|
||||||
UdpResponse::Connect(connect) => connect.write_to_buf(&mut buf),
|
|
||||||
UdpResponse::Announce(announce) => match announce {
|
|
||||||
AnnounceResponse::V4(v4) => v4.write_to_buf(&mut buf),
|
|
||||||
AnnounceResponse::V6(v6) => v6.write_to_buf(&mut buf),
|
|
||||||
},
|
|
||||||
UdpResponse::Scrape(scrape) => scrape.write_to_buf(&mut buf),
|
|
||||||
UdpResponse::Error(_) => 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
let n_sent = match send_socket.send_to(&buf[..n_bytes], response.dst_addr) {
|
|
||||||
Ok(n) => n,
|
|
||||||
Err(e) => {
|
|
||||||
log::error!(
|
|
||||||
"Failed to send {} bytes to {}: {}",
|
|
||||||
n_bytes,
|
|
||||||
response.dst_addr,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
0
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
log::trace!(
|
|
||||||
"Sent {} bytes to {}: {:x?}",
|
|
||||||
n_sent,
|
|
||||||
response.dst_addr,
|
|
||||||
&buf[..n_bytes]
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
log::debug!("Server {} send thread shutting exiting ...", id)
|
|
||||||
});
|
|
||||||
|
|
||||||
let _ = recv_thread.join();
|
|
||||||
let _ = send_thread.join();
|
|
||||||
|
|
||||||
log::info!("Stopped UDP server on: {:?}", local_addr);
|
|
||||||
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_parse_packet(buf: &[u8]) -> Option<UdpRequest> {
|
|
||||||
if buf.len() < 16 {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
let action: Action = Action::from_i32(i32::from_be_bytes(buf[8..12].try_into().ok()?))?;
|
|
||||||
|
|
||||||
let request = match action {
|
|
||||||
Action::Connect => UdpRequest::Connect(try_parse_connect(&buf)?),
|
|
||||||
Action::Announce => UdpRequest::Announce(try_parse_announce(&buf)?),
|
|
||||||
Action::Scrape => UdpRequest::Scrape(try_parse_scrape(&buf)?),
|
|
||||||
_ => return None,
|
|
||||||
};
|
|
||||||
|
|
||||||
return Some(request);
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_parse_connect(buf: &[u8]) -> Option<ConnectRequest> {
|
|
||||||
// Buffer length is checked to be at least 16 in `try_parse_packet`
|
|
||||||
|
|
||||||
let protocol_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?);
|
|
||||||
|
|
||||||
if protocol_id != UDP_MAGIC {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(ConnectRequest {
|
|
||||||
protocol_id: protocol_id,
|
|
||||||
transaction_id: i32::from_be_bytes(buf[12..16].try_into().ok()?),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_parse_announce(buf: &[u8]) -> Option<AnnounceRequest> {
|
|
||||||
if buf.len() < MIN_ANNOUNCE_REQUEST_SIZE {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?);
|
|
||||||
let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?);
|
|
||||||
let info_hash: InfoHash = InfoHash {
|
|
||||||
bytes: buf[16..36].try_into().ok()?,
|
|
||||||
};
|
|
||||||
let peer_id: PeerId = PeerId {
|
|
||||||
bytes: buf[36..56].try_into().unwrap_or_default(),
|
|
||||||
};
|
|
||||||
let downloaded: i64 = i64::from_be_bytes(buf[56..64].try_into().ok()?);
|
|
||||||
let left: i64 = i64::from_be_bytes(buf[64..72].try_into().ok()?);
|
|
||||||
let uploaded: i64 = i64::from_be_bytes(buf[72..80].try_into().ok()?);
|
|
||||||
let event: Event = Event::from_i32(i32::from_be_bytes(buf[80..84].try_into().ok()?))?;
|
|
||||||
let ip_addr: Option<Ipv4Addr> = if buf[84..88].iter().all(|b| *b == 0x00) {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(Ipv4Addr::from_bits(u32::from_be_bytes(
|
|
||||||
buf[84..88].try_into().ok()?,
|
|
||||||
)))
|
|
||||||
};
|
|
||||||
let key: u32 = u32::from_be_bytes(buf[88..92].try_into().ok()?);
|
|
||||||
let num_want: Option<u32> = {
|
|
||||||
let want = i32::from_be_bytes(buf[92..96].try_into().ok()?);
|
|
||||||
if want < 0 { None } else { Some(want as u32) }
|
|
||||||
};
|
|
||||||
let port: u16 = u16::from_be_bytes(buf[96..98].try_into().ok()?);
|
|
||||||
|
|
||||||
Some(AnnounceRequest {
|
|
||||||
connection_id: connection_id,
|
|
||||||
transaction_id: transaction_id,
|
|
||||||
info_hash: info_hash,
|
|
||||||
peer_id: peer_id,
|
|
||||||
downloaded: downloaded,
|
|
||||||
left: left,
|
|
||||||
uploaded: uploaded,
|
|
||||||
event: event,
|
|
||||||
ipv4_address: ip_addr,
|
|
||||||
key: key,
|
|
||||||
num_want: num_want,
|
|
||||||
port: port,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_parse_scrape(buf: &[u8]) -> Option<ScrapeRequest> {
|
|
||||||
if buf.len() < MIN_SCRAPE_REQUEST_SIZE {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?);
|
|
||||||
let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?);
|
|
||||||
let info_hashes: Vec<InfoHash> = buf[16..]
|
|
||||||
.chunks_exact(20)
|
|
||||||
.map(|b| InfoHash {
|
|
||||||
bytes: b.try_into().unwrap(), // unwrap: `chunks_exact` guarantees the size
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
Some(ScrapeRequest {
|
|
||||||
connection_id: connection_id,
|
|
||||||
transaction_id: transaction_id,
|
|
||||||
info_hashes: info_hashes,
|
|
||||||
})
|
|
||||||
}
|
|
214
src/udp_server.rs
Normal file
214
src/udp_server.rs
Normal file
@ -0,0 +1,214 @@
|
|||||||
|
use std::net::{Ipv4Addr, SocketAddr, UdpSocket};
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use flume::{Receiver, Sender};
|
||||||
|
use log;
|
||||||
|
|
||||||
|
use crate::bittorrent::info_hash::InfoHash;
|
||||||
|
use crate::bittorrent::peer::PeerId;
|
||||||
|
use crate::bittorrent::protocol::Event;
|
||||||
|
use crate::bittorrent::udp::{
|
||||||
|
AnnounceRequest, AnnounceResponse, MIN_ANNOUNCE_REQUEST_SIZE, MIN_SCRAPE_REQUEST_SIZE,
|
||||||
|
ScrapeRequest, UdpResponse,
|
||||||
|
};
|
||||||
|
use crate::meteoro::ServerControlMessage;
|
||||||
|
use crate::{
|
||||||
|
bittorrent::{
|
||||||
|
protocol::{Action, UDP_MAGIC},
|
||||||
|
udp::{ConnectRequest, UdpRequest},
|
||||||
|
},
|
||||||
|
tracker::{RequestMessage, ResponseMessage},
|
||||||
|
};
|
||||||
|
|
||||||
|
const UDP_RECV_BUF_SIZE: usize = u16::MAX as usize;
|
||||||
|
const UDP_SEND_BUF_SIZE: usize = u16::MAX as usize; // Could also be 1500 for typical Ethernet MTU (?)
|
||||||
|
|
||||||
|
pub fn receive(
|
||||||
|
server_id: usize,
|
||||||
|
addr: SocketAddr,
|
||||||
|
ctrl: Sender<ServerControlMessage>,
|
||||||
|
reqs: Sender<RequestMessage>,
|
||||||
|
) -> Result<()> {
|
||||||
|
let socket = UdpSocket::bind(addr)?;
|
||||||
|
|
||||||
|
// Provide cloned socket for networking send thread
|
||||||
|
|
||||||
|
ctrl.send(ServerControlMessage::UdpSocketBound((
|
||||||
|
server_id,
|
||||||
|
socket.try_clone()?,
|
||||||
|
)))?;
|
||||||
|
|
||||||
|
log::debug!("Starting UDP server {} on {}", server_id, addr);
|
||||||
|
|
||||||
|
let mut buf: [u8; UDP_RECV_BUF_SIZE] = [0; UDP_RECV_BUF_SIZE];
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match socket.recv_from(&mut buf) {
|
||||||
|
Ok((n_bytes, src)) => {
|
||||||
|
log::trace!("Received {} bytes from {}", n_bytes, src);
|
||||||
|
|
||||||
|
match try_parse_packet(&buf[..n_bytes]) {
|
||||||
|
Some(req) => reqs.send(RequestMessage {
|
||||||
|
request: req,
|
||||||
|
src_addr: src,
|
||||||
|
server_id: server_id,
|
||||||
|
})?,
|
||||||
|
None => (),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
log::error!(
|
||||||
|
"Server {} failed to receive on socket: {}",
|
||||||
|
server_id,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send(server_id: usize, socket: UdpSocket, resps: Receiver<ResponseMessage>) -> Result<()> {
|
||||||
|
let mut buf: [u8; UDP_SEND_BUF_SIZE] = [0; UDP_SEND_BUF_SIZE];
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let response = resps.recv()?;
|
||||||
|
|
||||||
|
log::trace!(
|
||||||
|
"Response to {} | {:?}",
|
||||||
|
response.dst_addr,
|
||||||
|
response.response
|
||||||
|
);
|
||||||
|
|
||||||
|
let n_bytes = match response.response {
|
||||||
|
UdpResponse::Connect(connect) => connect.write_to_buf(&mut buf),
|
||||||
|
UdpResponse::Announce(announce) => match announce {
|
||||||
|
AnnounceResponse::V4(v4) => v4.write_to_buf(&mut buf),
|
||||||
|
AnnounceResponse::V6(v6) => v6.write_to_buf(&mut buf),
|
||||||
|
},
|
||||||
|
UdpResponse::Scrape(scrape) => scrape.write_to_buf(&mut buf),
|
||||||
|
UdpResponse::Error(_) => 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
let n_sent = match socket.send_to(&buf[..n_bytes], response.dst_addr) {
|
||||||
|
Ok(n) => n,
|
||||||
|
Err(e) => {
|
||||||
|
log::error!(
|
||||||
|
"Server {} failed to send {} bytes to {}: {}",
|
||||||
|
server_id,
|
||||||
|
n_bytes,
|
||||||
|
response.dst_addr,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
log::trace!(
|
||||||
|
"Sent {} bytes to {}: {:x?}",
|
||||||
|
n_sent,
|
||||||
|
response.dst_addr,
|
||||||
|
&buf[..n_bytes]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_parse_packet(buf: &[u8]) -> Option<UdpRequest> {
|
||||||
|
if buf.len() < 16 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let action: Action = Action::from_i32(i32::from_be_bytes(buf[8..12].try_into().ok()?))?;
|
||||||
|
|
||||||
|
let request = match action {
|
||||||
|
Action::Connect => UdpRequest::Connect(try_parse_connect(&buf)?),
|
||||||
|
Action::Announce => UdpRequest::Announce(try_parse_announce(&buf)?),
|
||||||
|
Action::Scrape => UdpRequest::Scrape(try_parse_scrape(&buf)?),
|
||||||
|
_ => return None,
|
||||||
|
};
|
||||||
|
|
||||||
|
return Some(request);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_parse_connect(buf: &[u8]) -> Option<ConnectRequest> {
|
||||||
|
// Buffer length is checked to be at least 16 in `try_parse_packet`
|
||||||
|
|
||||||
|
let protocol_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?);
|
||||||
|
|
||||||
|
if protocol_id != UDP_MAGIC {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(ConnectRequest {
|
||||||
|
protocol_id: protocol_id,
|
||||||
|
transaction_id: i32::from_be_bytes(buf[12..16].try_into().ok()?),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_parse_announce(buf: &[u8]) -> Option<AnnounceRequest> {
|
||||||
|
if buf.len() < MIN_ANNOUNCE_REQUEST_SIZE {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?);
|
||||||
|
let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?);
|
||||||
|
let info_hash: InfoHash = InfoHash {
|
||||||
|
bytes: buf[16..36].try_into().ok()?,
|
||||||
|
};
|
||||||
|
let peer_id: PeerId = PeerId {
|
||||||
|
bytes: buf[36..56].try_into().unwrap_or_default(),
|
||||||
|
};
|
||||||
|
let downloaded: i64 = i64::from_be_bytes(buf[56..64].try_into().ok()?);
|
||||||
|
let left: i64 = i64::from_be_bytes(buf[64..72].try_into().ok()?);
|
||||||
|
let uploaded: i64 = i64::from_be_bytes(buf[72..80].try_into().ok()?);
|
||||||
|
let event: Event = Event::from_i32(i32::from_be_bytes(buf[80..84].try_into().ok()?))?;
|
||||||
|
let ip_addr: Option<Ipv4Addr> = if buf[84..88].iter().all(|b| *b == 0x00) {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(Ipv4Addr::from_bits(u32::from_be_bytes(
|
||||||
|
buf[84..88].try_into().ok()?,
|
||||||
|
)))
|
||||||
|
};
|
||||||
|
let key: u32 = u32::from_be_bytes(buf[88..92].try_into().ok()?);
|
||||||
|
let num_want: Option<u32> = {
|
||||||
|
let want = i32::from_be_bytes(buf[92..96].try_into().ok()?);
|
||||||
|
if want < 0 { None } else { Some(want as u32) }
|
||||||
|
};
|
||||||
|
let port: u16 = u16::from_be_bytes(buf[96..98].try_into().ok()?);
|
||||||
|
|
||||||
|
Some(AnnounceRequest {
|
||||||
|
connection_id: connection_id,
|
||||||
|
transaction_id: transaction_id,
|
||||||
|
info_hash: info_hash,
|
||||||
|
peer_id: peer_id,
|
||||||
|
downloaded: downloaded,
|
||||||
|
left: left,
|
||||||
|
uploaded: uploaded,
|
||||||
|
event: event,
|
||||||
|
ipv4_address: ip_addr,
|
||||||
|
key: key,
|
||||||
|
num_want: num_want,
|
||||||
|
port: port,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_parse_scrape(buf: &[u8]) -> Option<ScrapeRequest> {
|
||||||
|
if buf.len() < MIN_SCRAPE_REQUEST_SIZE {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?);
|
||||||
|
let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?);
|
||||||
|
let info_hashes: Vec<InfoHash> = buf[16..]
|
||||||
|
.chunks_exact(20)
|
||||||
|
.map(|b| InfoHash {
|
||||||
|
bytes: b.try_into().unwrap(), // unwrap: `chunks_exact` guarantees the size
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
Some(ScrapeRequest {
|
||||||
|
connection_id: connection_id,
|
||||||
|
transaction_id: transaction_id,
|
||||||
|
info_hashes: info_hashes,
|
||||||
|
})
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user