Initial commit
src/bittorrent/info_hash.rs (normal file, 22 lines)
@@ -0,0 +1,22 @@
use std::fmt;
use std::fmt::Write;

pub const INFO_HASH_SIZE: usize = 20;

#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub struct InfoHash {
    pub bytes: [u8; INFO_HASH_SIZE],
}

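// Renders the info hash as 40 lowercase hex characters. The buffer is a
// fixed-capacity, stack-allocated `heapless::String` (the `heapless` crate is a
// dependency of this file), so formatting never allocates on the heap.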
impl fmt::Display for InfoHash {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        const INFO_HASH_STR_LEN: usize = INFO_HASH_SIZE * 2;
        let mut hex_string: heapless::String<INFO_HASH_STR_LEN> = heapless::String::new();

        for b in self.bytes {
            let _ = write!(hex_string, "{:02x}", b);
        }

        write!(f, "{}", hex_string)
    }
}

src/bittorrent/mod.rs (normal file, 4 lines)
@@ -0,0 +1,4 @@
pub mod info_hash;
pub mod peer;
pub mod protocol;
pub mod udp;

src/bittorrent/peer.rs (normal file, 20 lines)
@@ -0,0 +1,20 @@
use std::fmt;

const MAX_PEER_ID_LENGTH: usize = 20;

#[derive(Debug)]
pub struct PeerId {
    pub bytes: [u8; MAX_PEER_ID_LENGTH],
}

impl PeerId {
    pub fn to_string_lossy(&self) -> String {
        String::from_utf8_lossy(&self.bytes).into()
    }
}

impl fmt::Display for PeerId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.to_string_lossy())
    }
}

src/bittorrent/protocol.rs (normal file, 62 lines)
@@ -0,0 +1,62 @@
/// BitTorrent's UDP tracker magic `protocol_id` constant
pub const UDP_MAGIC: i64 = 0x41727101980;

#[repr(i32)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Action {
    Connect = 0,
    Announce = 1,
    Scrape = 2,
    Error = 3,
}

impl Action {
    pub fn from_i32(val: i32) -> Option<Action> {
        match val {
            0 => Some(Action::Connect),
            1 => Some(Action::Announce),
            2 => Some(Action::Scrape),
            3 => Some(Action::Error),
            _ => None,
        }
    }

    pub fn to_i32(&self) -> i32 {
        match self {
            Action::Connect => 0,
            Action::Announce => 1,
            Action::Scrape => 2,
            Action::Error => 3,
        }
    }
}

#[repr(i32)]
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Event {
    None = 0,
    Completed = 1,
    Started = 2,
    Stopped = 3,
}

impl Event {
    pub fn from_i32(val: i32) -> Option<Event> {
        match val {
            0 => Some(Event::None),
            1 => Some(Event::Completed),
            2 => Some(Event::Started),
            3 => Some(Event::Stopped),
            _ => None,
        }
    }

    pub fn to_i32(&self) -> i32 {
        match self {
            Event::None => 0,
            Event::Completed => 1,
            Event::Started => 2,
            Event::Stopped => 3,
        }
    }
}

src/bittorrent/udp.rs (normal file, 215 lines)
@@ -0,0 +1,215 @@
use std::{
    io::{Cursor, Write},
    net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6},
};

use crate::bittorrent::{
    info_hash::InfoHash,
    peer::PeerId,
    protocol::{Action, Event},
};

pub const MIN_ANNOUNCE_REQUEST_SIZE: usize = 98;
pub const MIN_SCRAPE_REQUEST_SIZE: usize = 36;
pub const MIN_CONNECTION_RESPONSE_SIZE: usize = 16;
pub const MIN_ANNOUNCE_RESPONSE_SIZE: usize = 20;
pub const MIN_SCRAPE_RESPONSE_SIZE: usize = 8;
pub const SCRAPE_RESPONSE_ENTRY_SIZE: usize = 12;

pub const IPV4_SIZE: usize = Ipv4Addr::BITS as usize / 8;
pub const IPV6_SIZE: usize = Ipv6Addr::BITS as usize / 8;
pub const PORT_SIZE: usize = size_of::<u16>();
pub const IPV4_ADDR_PAIR_SIZE: usize = IPV4_SIZE + PORT_SIZE;
pub const IPV6_ADDR_PAIR_SIZE: usize = IPV6_SIZE + PORT_SIZE;

#[derive(Debug)]
pub struct ConnectRequest {
    pub protocol_id: i64,
    pub transaction_id: i32,
}

#[derive(Debug)]
pub struct ConnectResponse {
    pub transaction_id: i32,
    pub connection_id: i64,
}

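// Connect response wire format (16 bytes, per BEP 15):
//   0..4   action (0 = connect)
//   4..8   transaction_id
//   8..16  connection_id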
impl ConnectResponse {
    pub fn write_to_buf(&self, buf: &mut [u8]) -> usize {
        let mut c = Cursor::new(buf);
        let mut written: usize = 0;

        const ACTION: i32 = Action::Connect as i32;

        written += c.write(&ACTION.to_be_bytes()).unwrap();
        written += c.write(&self.transaction_id.to_be_bytes()).unwrap();
        written += c.write(&self.connection_id.to_be_bytes()).unwrap();

        return written;
    }
}

#[derive(Debug)]
pub struct AnnounceRequest {
    pub connection_id: i64,
    pub transaction_id: i32,
    /// Info hash of the torrent
    pub info_hash: InfoHash,
    /// Peer's ID
    pub peer_id: PeerId,
    /// Number of bytes the peer has downloaded this session
    pub downloaded: i64,
    /// Number of bytes the peer still needs to complete the download
    pub left: i64,
    /// Number of bytes the peer has uploaded this session
    pub uploaded: i64,
    /// One of the four announce events. `Event::None` updates the peer's presence without changing its state.
    pub event: Event,
    /// IPv4 address the peer wants the announce response sent to. `None` means the packet's source address should be used.
    pub ipv4_address: Option<Ipv4Addr>,
    /// Client's unique key for tracking a single client over multiple connections (sockets)
    pub key: u32,
    /// Maximum number of peers the client is requesting. `None` means a tracker-defined default should be used.
    pub num_want: Option<u32>,
    /// Port the peer wants the announce response sent to. If `ipv4_address` is `Some`, this port should be used with it.
    pub port: u16,
}

#[derive(Debug)]
pub enum AnnounceResponse {
    V4(AnnounceResponseV4),
    V6(AnnounceResponseV6),
}

#[derive(Debug)]
pub struct AnnounceResponseCommon {
    pub transaction_id: i32,
    /// Number of seconds the peer should wait before re-announcing
    pub interval: i32,
    /// Number of peers still downloading
    pub leechers: i32,
    /// Number of peers who have completed the download and are seeding
    pub seeders: i32,
}

#[derive(Debug)]
pub struct AnnounceResponseV4 {
    pub common: AnnounceResponseCommon,
    pub peers: Vec<SocketAddrV4>,
}

#[derive(Debug)]
pub struct AnnounceResponseV6 {
    pub common: AnnounceResponseCommon,
    pub peers: Vec<SocketAddrV6>,
}

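// Announce response wire format (per BEP 15): a 20-byte header
//   0..4   action (1 = announce)
//   4..8   transaction_id
//   8..12  interval
//   12..16 leechers
//   16..20 seeders
// followed by one (IP address, port) pair per returned peer.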
impl AnnounceResponseCommon {
    pub fn write_to_buf(&self, buf: &mut [u8]) -> usize {
        const ACTION: i32 = Action::Announce as i32;
        buf[0..4].copy_from_slice(&ACTION.to_be_bytes());
        buf[4..8].copy_from_slice(&self.transaction_id.to_be_bytes());
        buf[8..12].copy_from_slice(&self.interval.to_be_bytes());
        buf[12..16].copy_from_slice(&self.leechers.to_be_bytes());
        buf[16..20].copy_from_slice(&self.seeders.to_be_bytes());

        return MIN_ANNOUNCE_RESPONSE_SIZE;
    }
}

impl AnnounceResponseV4 {
    pub fn write_to_buf(&self, buf: &mut [u8]) -> usize {
        let written = self.common.write_to_buf(buf);

        for (offset, addr) in (written..buf.len())
            .step_by(IPV4_ADDR_PAIR_SIZE)
            .zip(&self.peers)
        {
            buf[offset..offset + IPV4_SIZE].copy_from_slice(&addr.ip().octets());
            buf[offset + IPV4_SIZE..offset + IPV4_ADDR_PAIR_SIZE]
                .copy_from_slice(&addr.port().to_be_bytes());
        }

        return written + self.peers.len() * IPV4_ADDR_PAIR_SIZE;
    }
}

impl AnnounceResponseV6 {
    pub fn write_to_buf(&self, buf: &mut [u8]) -> usize {
        let written = self.common.write_to_buf(buf);

        for (offset, addr) in (written..buf.len())
            .step_by(IPV6_ADDR_PAIR_SIZE)
            .zip(&self.peers)
        {
            buf[offset..offset + IPV6_SIZE].copy_from_slice(&addr.ip().octets());
            buf[offset + IPV6_SIZE..offset + IPV6_ADDR_PAIR_SIZE]
                .copy_from_slice(&addr.port().to_be_bytes());
        }

        return written + self.peers.len() * IPV6_ADDR_PAIR_SIZE;
    }
}

#[derive(Debug)]
pub struct ScrapeRequest {
    pub connection_id: i64,
    pub transaction_id: i32,
    pub info_hashes: Vec<InfoHash>,
}

#[derive(Debug, Default)]
pub struct ScrapeStats {
    /// Number of connected peers who have completed the download and are seeding
    pub seeders: i32,
    /// Number of peers who have ever completed the download
    pub completed: i32,
    /// Number of connected peers who have not yet completed the download (still leeching)
    pub leechers: i32,
}

#[derive(Debug)]
pub struct ScrapeResponse {
    pub transaction_id: i32,
    pub stats: Vec<ScrapeStats>,
}

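// Scrape response wire format (per BEP 15): an 8-byte header
//   0..4   action (2 = scrape)
//   4..8   transaction_id
// followed by 12 bytes per requested torrent: seeders, completed, leechers.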
impl ScrapeResponse {
    pub fn write_to_buf(&self, buf: &mut [u8]) -> usize {
        let mut c = Cursor::new(buf);
        let mut written: usize = 0;

        const ACTION: i32 = Action::Scrape as i32;
        written += c.write(&ACTION.to_be_bytes()).unwrap();
        written += c.write(&self.transaction_id.to_be_bytes()).unwrap();

        for s in &self.stats {
            written += c.write(&s.seeders.to_be_bytes()).unwrap();
            written += c.write(&s.completed.to_be_bytes()).unwrap();
            written += c.write(&s.leechers.to_be_bytes()).unwrap();
        }

        return written;
    }
}

#[derive(Debug)]
pub struct ErrorResponse {
    pub transaction_id: i32,
    pub message: String,
}

#[derive(Debug)]
pub enum UdpRequest {
    Connect(ConnectRequest),
    Announce(AnnounceRequest),
    Scrape(ScrapeRequest),
}

#[derive(Debug)]
pub enum UdpResponse {
    Connect(ConnectResponse),
    Announce(AnnounceResponse),
    Scrape(ScrapeResponse),
    Error(ErrorResponse),
}

src/constants.rs (normal file, 15 lines)
@@ -0,0 +1,15 @@
pub const METEORO_HELP_TEXT: &'static str = "Usage:\tmeteoro [OPTIONS]

OPTIONS:

    -a IPV4_ADDRESS    Provide an IPv4 address to bind to
                       (example: -a 192.168.1.2:6969)

    -A IPV6_ADDRESS    Provide an IPv6 address to bind to
                       (example: -A [::1]:6970)

    --debug            Enable debug logging
    --trace            Enable trace logging (more verbose debug)

    -h | --help        Show this help text
    -v | --version     Print the program's build & version information";

src/main.rs (normal file, 97 lines)
@@ -0,0 +1,97 @@
use std::{
    env,
    net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
    process::ExitCode,
};

use log::LevelFilter;
use simplelog::{ColorChoice, ConfigBuilder, TermLogger, TerminalMode};

use crate::meteoro::Meteoro;

mod bittorrent;
mod constants;
mod meteoro;
mod tracker;
mod udp_server;

fn main() -> ExitCode {
    let args: Vec<String> = env::args().collect();

    if args.contains(&String::from("--help")) || args.contains(&String::from("-h")) {
        println!("{}", constants::METEORO_HELP_TEXT);
        return ExitCode::SUCCESS;
    }

    if args.contains(&String::from("--version")) || args.contains(&String::from("-v")) {
        println!("{} {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION"));
        return ExitCode::SUCCESS;
    }

    #[cfg(debug_assertions)]
    let mut log_level: LevelFilter = LevelFilter::Trace;
    #[cfg(not(debug_assertions))]
    let mut log_level: LevelFilter = LevelFilter::Info;

    if args.contains(&String::from("--debug")) {
        log_level = LevelFilter::Debug;
    } else if args.contains(&String::from("--trace")) {
        log_level = LevelFilter::Trace;
    }

    let logger_config = ConfigBuilder::new()
        .set_target_level(LevelFilter::Trace)
        .set_location_level(LevelFilter::Off)
        .set_thread_level(LevelFilter::Off)
        .set_time_level(LevelFilter::Info)
        .build();

    TermLogger::init(
        log_level,
        logger_config,
        TerminalMode::Mixed,
        ColorChoice::Auto,
    )
    .expect("failed to initialise terminal logging");

    let ipv4_flag_idxs: Vec<usize> = args
        .iter()
        .enumerate()
        .filter_map(|(idx, s)| if s == "-a" { Some(idx) } else { None })
        .collect();
    let ipv6_flag_idxs: Vec<usize> = args
        .iter()
        .enumerate()
        .filter_map(|(idx, s)| if s == "-A" { Some(idx) } else { None })
        .collect();
    let mut ip_addrs: Vec<SocketAddr> = ipv4_flag_idxs
        .iter()
        .filter_map(|idx| {
            Some(SocketAddr::V4(
                args.get(*idx + 1)?.parse::<SocketAddrV4>().ok()?,
            ))
        })
        .collect();
    ip_addrs.extend(ipv6_flag_idxs.iter().filter_map(|idx| {
        Some(SocketAddr::V6(
            args.get(*idx + 1)?.parse::<SocketAddrV6>().ok()?,
        ))
    }));

    if ip_addrs.is_empty() {
        ip_addrs.push(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 6969)));
        ip_addrs.push(SocketAddr::V6(SocketAddrV6::new(
            Ipv6Addr::LOCALHOST,
            6969,
            0,
            0,
        )));
    }

    return match Meteoro::start(&ip_addrs) {
        Ok(_) => ExitCode::SUCCESS,
        Err(_) => ExitCode::FAILURE,
    };
}

src/meteoro.rs (normal file, 111 lines)
@@ -0,0 +1,111 @@
use std::net::{SocketAddr, UdpSocket};
use std::thread;

use anyhow::Result;
use flume::Sender;
use threadpool::ThreadPool;

use crate::tracker::{RequestMessage, ResponseMessage, Tracker};
use crate::udp_server;

/// Instance of the meteoro BitTorrent tracker
/// Manages the tracker, networking & API
pub struct Meteoro {}

#[repr(u8)]
pub enum TrackerControlMessage {
    /// Signal that the receiver should exit their thread of execution
    Exit = 0,
}

#[repr(u8)]
pub enum ServerControlMessage {
    /// Server with `(id, _)` has bound to a socket, providing a clone of the socket
    UdpSocketBound((usize, UdpSocket)),
}

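// Thread / channel layout wired up in `start` below: one UDP receive thread and
// one UDP send thread per bound address, plus a single tracker thread. Receive
// threads push parsed requests onto a shared `flume` channel; the tracker handles
// them and replies on a per-server response channel that the matching send thread
// drains.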
impl Meteoro {
    pub fn start(addrs: &[SocketAddr]) -> Result<()> {
        // Create channels

        let (ctrl_send, ctrl_recv) = flume::unbounded::<TrackerControlMessage>();
        let (srv_ctrl_send, srv_ctrl_recv) = flume::unbounded::<ServerControlMessage>();
        let (reqs_send, reqs_recv) = flume::unbounded::<RequestMessage>();
        let resp_chans: Vec<_> = (0..addrs.len())
            .map(|_| flume::unbounded::<ResponseMessage>())
            .collect();

        // Start networking threads

        {
            // Each receiving networking thread requires:
            // - a socket address to bind to
            // - a control message channel (send), for providing the bound socket
            // - a request message channel (send), for providing requests to the tracker

            // Receive threads

            let recv_threads = ThreadPool::new(addrs.len());

            for (id, addr) in addrs.iter().enumerate() {
                let ctrl = srv_ctrl_send.clone();
                let reqs = reqs_send.clone();
                let addr = *addr;

                recv_threads.execute(move || {
                    let _ = udp_server::receive(id, addr, ctrl, reqs);
                });
            }

            // Each sending networking thread requires:
            // - a socket to send responses on
            // - a response message channel (receive), for receiving handled requests

            // Send threads

            let send_threads = ThreadPool::new(addrs.len());

            for _ in 0..addrs.len() {
                match srv_ctrl_recv.recv()? {
                    ServerControlMessage::UdpSocketBound((id, socket)) => {
                        let (_, resps) = resp_chans[id].clone();

                        send_threads.execute(move || {
                            let _ = udp_server::send(id, socket, resps);
                        });
                    }
                    _ => (),
                };
            }
        }

        // Start tracker thread

        let tracker_thread = {
            // Tracker requires:
            // - a request message channel (recv), for receiving requests from the server(s)
            // - corresponding response channels (send), for providing handled requests

            let resps: Vec<Sender<ResponseMessage>> = resp_chans
                .iter()
                .map(|(resp_chan, _)| resp_chan.clone())
                .collect();

            thread::spawn(move || {
                let mut tracker = Tracker::new();
                let _ = tracker.start(reqs_recv, resps.as_slice(), ctrl_recv);
            })
        };

        // Start API thread

        {
            // API thread requires
            // - a control message channel (recv)
        }

        let _ = tracker_thread.join();

        return Ok(());
    }
}

src/tracker.rs (normal file, 424 lines)
@@ -0,0 +1,424 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};

use anyhow::Result;
use flume::{Receiver, Selector, Sender};
use rand::Rng;
use rand::rngs::ThreadRng;

use crate::bittorrent::info_hash::InfoHash;
use crate::bittorrent::protocol::Event;
use crate::bittorrent::udp::{
    AnnounceResponse, AnnounceResponseCommon, AnnounceResponseV4, AnnounceResponseV6,
    ConnectResponse, ScrapeResponse, ScrapeStats, UdpRequest, UdpResponse,
};
use crate::meteoro::TrackerControlMessage;

pub struct RequestMessage {
    pub(crate) server_id: usize,
    pub(crate) src_addr: SocketAddr,
    pub(crate) request: UdpRequest,
}

pub struct ResponseMessage {
    pub(crate) response: UdpResponse,
    pub(crate) dst_addr: SocketAddr,
}

struct PeerMetadata {
    socket_addr: SocketAddr,
    last_active: Instant,
}

struct PeerStatus {
    socket_addr: SocketAddr,
    last_event: Event,
    last_active: Instant,
    downloaded: u64,
    uploaded: u64,
    remaining: u64,
}

pub const GARBAGE_COLLECTION_INTERVAL: Duration = Duration::from_secs(20);

pub const CONNECTION_EXPIRE_TIME: Duration = Duration::from_secs(90);
pub const DEFAULT_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(60);
pub const DEFAULT_ANNOUNCE_WANT: usize = 80;

type ConnectionIdMap = HashMap<i64, PeerMetadata>;
type InfoHashMap = HashMap<InfoHash, Vec<PeerStatus>>;

pub struct Tracker {
    peers: ConnectionIdMap,
    info_hashes: InfoHashMap,
    rng: ThreadRng,
}

impl Tracker {
    pub fn new() -> Self {
        Self {
            peers: ConnectionIdMap::new(),
            info_hashes: InfoHashMap::new(),
            rng: rand::rng(),
        }
    }

    pub fn start(
        &mut self,
        reqs: Receiver<RequestMessage>,
        resps: &[Sender<ResponseMessage>],
        ctrl: Receiver<TrackerControlMessage>,
    ) -> Result<()> {
        let mut next_gc = Instant::now() + GARBAGE_COLLECTION_INTERVAL;
        let should_exit = AtomicBool::new(false);

        // Wait for messages from the networking threads / main application

        while !should_exit.load(Ordering::Relaxed) {
            let _ = Selector::new()
                .recv(&reqs, |request| {
                    // Tracker request

                    let request = request.unwrap();

                    log::trace!("Request from {} | {:?}", request.src_addr, request.request);

                    match self.handle_request(&request) {
                        Some(response) => {
                            resps[request.server_id].send(response).unwrap();
                        }
                        None => (),
                    };
                })
                .recv(&ctrl, |msg| {
                    // Control message

                    let msg = msg.unwrap();

                    match msg {
                        TrackerControlMessage::Exit => {
                            should_exit.store(true, Ordering::Relaxed);
                        }
                    };
                })
                .wait();

            // Garbage collection

            if Instant::now() > next_gc {
                self.garbage_collect();

                next_gc = Instant::now()
                    + GARBAGE_COLLECTION_INTERVAL
                    + Duration::from_millis(self.rng.random::<u64>() % 5000);
            }
        }

        return Ok(());
    }

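    // Per BEP 15, announce and scrape requests must echo a connection_id that was
    // previously handed out by a connect exchange; checking it against the stored
    // source address rejects trivially spoofed requests. Entries expire after
    // CONNECTION_EXPIRE_TIME via `garbage_collect`.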
    fn connection_valid(&self, connection_id: i64, src_addr: SocketAddr) -> bool {
        let peers = &self.peers;

        return match peers.get(&connection_id) {
            Some(metadata) => metadata.socket_addr == src_addr,
            None => false,
        };
    }

    fn handle_request(&mut self, request: &RequestMessage) -> Option<ResponseMessage> {
        match &request.request {
            UdpRequest::Connect(connect) => {
                let new_id: i64 = self.rng.random();

                let peers = &mut self.peers;

                peers.insert(
                    new_id,
                    PeerMetadata {
                        socket_addr: request.src_addr,
                        last_active: Instant::now(),
                    },
                );

                Some(ResponseMessage {
                    response: UdpResponse::Connect(ConnectResponse {
                        transaction_id: connect.transaction_id,
                        connection_id: new_id,
                    }),
                    dst_addr: request.src_addr,
                })
            }

            UdpRequest::Announce(announce) => {
                if !self.connection_valid(announce.connection_id, request.src_addr) {
                    return None;
                }

                // Ensure we honour the desired number of peers, within our boundaries

                let n_announce_want: usize = if let Some(n) = announce.num_want {
                    if (n as usize) < DEFAULT_ANNOUNCE_WANT {
                        n as usize
                    } else {
                        DEFAULT_ANNOUNCE_WANT
                    }
                } else {
                    DEFAULT_ANNOUNCE_WANT
                };

                let mut n_swarm_peers: usize = 0;
                let mut n_seeders: usize = 0;
                let info_hashes = &mut self.info_hashes;

                let swarm_addrs = match info_hashes.get_mut(&announce.info_hash) {
                    None => {
                        // Info hash isn't currently tracked
                        // No relevant peers in the swarm

                        info_hashes.insert(
                            announce.info_hash,
                            vec![PeerStatus {
                                socket_addr: request.src_addr,
                                last_event: announce.event,
                                last_active: Instant::now(),
                                downloaded: announce.downloaded as u64,
                                uploaded: announce.uploaded as u64,
                                remaining: announce.left as u64,
                            }],
                        );

                        None
                    }
                    Some(swarm) => {
                        n_swarm_peers = swarm.len();
                        n_seeders = Self::count_seeders(swarm);

                        // Insert into swarm if not already present
                        // TODO: sort (?)

                        let existing_swarm_idx: Option<usize> = match swarm
                            .iter_mut()
                            .enumerate()
                            .find(|(_, status)| status.socket_addr == request.src_addr)
                        {
                            Some((idx, peer)) => {
                                // Peer already exists in swarm, update state

                                peer.last_active = Instant::now();
                                peer.last_event = announce.event;
                                peer.downloaded = announce.downloaded as u64;
                                peer.uploaded = announce.uploaded as u64;
                                peer.remaining = announce.left as u64;

                                Some(idx)
                            }
                            None => {
                                // Peer is not in swarm, add it

                                swarm.push(PeerStatus {
                                    socket_addr: request.src_addr,
                                    last_event: announce.event,
                                    last_active: Instant::now(),
                                    downloaded: announce.downloaded as u64,
                                    uploaded: announce.uploaded as u64,
                                    remaining: announce.left as u64,
                                });

                                None
                            }
                        };

                        if let Some(idx) = existing_swarm_idx
                            && announce.event == Event::Stopped
                        {
                            swarm.remove(idx);
                            return None;
                        }

                        // Iterate over all peers in the swarm for the announce response

                        let swarm_addrs: Vec<SocketAddr> = swarm
                            .iter()
                            .filter_map(|status| {
                                let peer_invalid: bool = (status.last_event == Event::Stopped)
                                    || (status.socket_addr == request.src_addr);

                                if !peer_invalid {
                                    if status.socket_addr.is_ipv4() != request.src_addr.is_ipv4() {
                                        None
                                    } else {
                                        Some(status.socket_addr)
                                    }
                                } else {
                                    log::trace!(
                                        "(src: {}) {} with status \"{:?}\" deemed invalid",
                                        request.src_addr,
                                        status.socket_addr,
                                        status.last_event
                                    );

                                    None
                                }
                            })
                            .take(n_announce_want)
                            .collect();

                        if !swarm_addrs.is_empty() {
                            Some(swarm_addrs)
                        } else {
                            None
                        }
                    }
                };

                let resp_common = AnnounceResponseCommon {
                    transaction_id: announce.transaction_id,
                    interval: DEFAULT_ANNOUNCE_INTERVAL.as_secs() as i32,
                    leechers: (n_swarm_peers - n_seeders) as i32,
                    seeders: n_seeders as i32,
                };
                let response = if request.src_addr.is_ipv4() {
                    // IPv4
                    AnnounceResponse::V4(AnnounceResponseV4 {
                        common: resp_common,
                        peers: swarm_addrs.map_or_else(
                            || Vec::new(),
                            |v| {
                                v.iter()
                                    .filter_map(|a| match a {
                                        SocketAddr::V4(v4) => Some(*v4),
                                        _ => None,
                                    })
                                    .collect()
                            },
                        ),
                    })
                } else {
                    // IPv6
                    AnnounceResponse::V6(AnnounceResponseV6 {
                        common: resp_common,
                        peers: swarm_addrs.map_or_else(
                            || Vec::new(),
                            |v| {
                                v.iter()
                                    .filter_map(|a| match a {
                                        SocketAddr::V6(v6) => Some(*v6),
                                        _ => None,
                                    })
                                    .collect()
                            },
                        ),
                    })
                };

                Some(ResponseMessage {
                    response: UdpResponse::Announce(response),
                    dst_addr: request.src_addr,
                })
            }

            UdpRequest::Scrape(scrape) => {
                if !self.connection_valid(scrape.connection_id, request.src_addr) {
                    return None;
                }

                let info_hashes = &self.info_hashes;

                let scrape_stats: Vec<ScrapeStats> = scrape
                    .info_hashes
                    .iter()
                    .filter_map(|info_hash| {
                        if let Some(swarm) = info_hashes.get(info_hash) {
                            let n_seeders = Self::count_seeders(swarm);
                            let n_leechers = swarm.len() - n_seeders;

                            Some(ScrapeStats {
                                seeders: n_seeders as i32,
                                completed: 0, // TODO
                                leechers: n_leechers as i32,
                            })
                        } else {
                            None
                        }
                    })
                    .collect();

                Some(ResponseMessage {
                    response: UdpResponse::Scrape(ScrapeResponse {
                        transaction_id: scrape.transaction_id,
                        stats: scrape_stats,
                    }),
                    dst_addr: request.src_addr,
                })
            }
        }
    }

    fn garbage_collect(&mut self) {
        let s1 = Instant::now();
        let n_purged_conns = self.purge_expired_connections();
        let d1 = s1.elapsed();

        let s2 = Instant::now();
        let n_purged_swarm_peers = self.purge_expired_swarm_peers();
        let d2 = s2.elapsed();

        log::debug!(
            "Garbage collected {} expired connections (took: {} ns)",
            n_purged_conns,
            d1.as_nanos()
        );
        log::debug!(
            "Garbage collected {} inactive peers from the swarm (took: {} ns)",
            n_purged_swarm_peers,
            d2.as_nanos()
        );
        log::debug!("Garbage collection took {} ns total", (d1 + d2).as_nanos());
    }

    fn purge_expired_connections(&mut self) -> usize {
        let mut purged: usize = 0;

        self.peers.retain(|_, metadata| {
            if metadata.last_active.elapsed() < CONNECTION_EXPIRE_TIME {
                true
            } else {
                purged += 1;
                false
            }
        });

        return purged;
    }

    fn purge_expired_swarm_peers(&mut self) -> usize {
        let mut n_purged: usize = 0;

        self.info_hashes.iter_mut().for_each(|(_, b)| {
            b.retain(|status| {
                if status.last_active.elapsed() < DEFAULT_ANNOUNCE_INTERVAL * 5
                    && status.last_event != Event::Stopped
                {
                    true
                } else {
                    n_purged += 1;
                    false
                }
            })
        });

        self.info_hashes.retain(|_, swarm| !swarm.is_empty());

        return n_purged;
    }

    fn count_seeders(swarm: &[PeerStatus]) -> usize {
        swarm
            .iter()
            .filter(|status| status.last_event == Event::Completed || status.remaining == 0)
            .count()
    }
}

src/udp_server.rs (normal file, 214 lines)
@@ -0,0 +1,214 @@
use std::net::{Ipv4Addr, SocketAddr, UdpSocket};

use anyhow::Result;
use flume::{Receiver, Sender};
use log;

use crate::bittorrent::info_hash::InfoHash;
use crate::bittorrent::peer::PeerId;
use crate::bittorrent::protocol::Event;
use crate::bittorrent::udp::{
    AnnounceRequest, AnnounceResponse, MIN_ANNOUNCE_REQUEST_SIZE, MIN_SCRAPE_REQUEST_SIZE,
    ScrapeRequest, UdpResponse,
};
use crate::meteoro::ServerControlMessage;
use crate::{
    bittorrent::{
        protocol::{Action, UDP_MAGIC},
        udp::{ConnectRequest, UdpRequest},
    },
    tracker::{RequestMessage, ResponseMessage},
};

const UDP_RECV_BUF_SIZE: usize = u16::MAX as usize;
const UDP_SEND_BUF_SIZE: usize = u16::MAX as usize; // Could also be 1500 for typical Ethernet MTU (?)

pub fn receive(
    server_id: usize,
    addr: SocketAddr,
    ctrl: Sender<ServerControlMessage>,
    reqs: Sender<RequestMessage>,
) -> Result<()> {
    let socket = UdpSocket::bind(addr)?;

    // Provide cloned socket for networking send thread

    ctrl.send(ServerControlMessage::UdpSocketBound((
        server_id,
        socket.try_clone()?,
    )))?;

    log::debug!("Starting UDP server {} on {}", server_id, addr);

    let mut buf: [u8; UDP_RECV_BUF_SIZE] = [0; UDP_RECV_BUF_SIZE];

    loop {
        match socket.recv_from(&mut buf) {
            Ok((n_bytes, src)) => {
                log::trace!("Received {} bytes from {}", n_bytes, src);

                match try_parse_packet(&buf[..n_bytes]) {
                    Some(req) => reqs.send(RequestMessage {
                        request: req,
                        src_addr: src,
                        server_id: server_id,
                    })?,
                    None => (),
                };
            }
            Err(error) => {
                log::error!(
                    "Server {} failed to receive on socket: {}",
                    server_id,
                    error
                );
                continue;
            }
        }
    }
}

pub fn send(server_id: usize, socket: UdpSocket, resps: Receiver<ResponseMessage>) -> Result<()> {
    let mut buf: [u8; UDP_SEND_BUF_SIZE] = [0; UDP_SEND_BUF_SIZE];

    loop {
        let response = resps.recv()?;

        log::trace!(
            "Response to {} | {:?}",
            response.dst_addr,
            response.response
        );

        let n_bytes = match response.response {
            UdpResponse::Connect(connect) => connect.write_to_buf(&mut buf),
            UdpResponse::Announce(announce) => match announce {
                AnnounceResponse::V4(v4) => v4.write_to_buf(&mut buf),
                AnnounceResponse::V6(v6) => v6.write_to_buf(&mut buf),
            },
            UdpResponse::Scrape(scrape) => scrape.write_to_buf(&mut buf),
            UdpResponse::Error(_) => 0,
        };

        let n_sent = match socket.send_to(&buf[..n_bytes], response.dst_addr) {
            Ok(n) => n,
            Err(e) => {
                log::error!(
                    "Server {} failed to send {} bytes to {}: {}",
                    server_id,
                    n_bytes,
                    response.dst_addr,
                    e
                );
                continue;
            }
        };

        log::trace!(
            "Sent {} bytes to {}: {:x?}",
            n_sent,
            response.dst_addr,
            &buf[..n_bytes]
        );
    }
}

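// Every BEP 15 request starts with a 16-byte header:
//   0..8   connection_id (or the protocol_id magic for connect requests)
//   8..12  action
//   12..16 transaction_id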
fn try_parse_packet(buf: &[u8]) -> Option<UdpRequest> {
    if buf.len() < 16 {
        return None;
    }

    let action: Action = Action::from_i32(i32::from_be_bytes(buf[8..12].try_into().ok()?))?;

    let request = match action {
        Action::Connect => UdpRequest::Connect(try_parse_connect(&buf)?),
        Action::Announce => UdpRequest::Announce(try_parse_announce(&buf)?),
        Action::Scrape => UdpRequest::Scrape(try_parse_scrape(&buf)?),
        _ => return None,
    };

    return Some(request);
}

fn try_parse_connect(buf: &[u8]) -> Option<ConnectRequest> {
    // Buffer length is checked to be at least 16 in `try_parse_packet`

    let protocol_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?);

    if protocol_id != UDP_MAGIC {
        return None;
    }

    Some(ConnectRequest {
        protocol_id: protocol_id,
        transaction_id: i32::from_be_bytes(buf[12..16].try_into().ok()?),
    })
}

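// Announce request wire format (98 bytes minimum, per BEP 15):
//   0..8 connection_id, 8..12 action, 12..16 transaction_id, 16..36 info_hash,
//   36..56 peer_id, 56..64 downloaded, 64..72 left, 72..80 uploaded, 80..84 event,
//   84..88 IPv4 address (0 = use source address), 88..92 key,
//   92..96 num_want (-1 = default), 96..98 port.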
fn try_parse_announce(buf: &[u8]) -> Option<AnnounceRequest> {
    if buf.len() < MIN_ANNOUNCE_REQUEST_SIZE {
        return None;
    }

    let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?);
    let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?);
    let info_hash: InfoHash = InfoHash {
        bytes: buf[16..36].try_into().ok()?,
    };
    let peer_id: PeerId = PeerId {
        bytes: buf[36..56].try_into().unwrap_or_default(),
    };
    let downloaded: i64 = i64::from_be_bytes(buf[56..64].try_into().ok()?);
    let left: i64 = i64::from_be_bytes(buf[64..72].try_into().ok()?);
    let uploaded: i64 = i64::from_be_bytes(buf[72..80].try_into().ok()?);
    let event: Event = Event::from_i32(i32::from_be_bytes(buf[80..84].try_into().ok()?))?;
    let ip_addr: Option<Ipv4Addr> = if buf[84..88].iter().all(|b| *b == 0x00) {
        None
    } else {
        Some(Ipv4Addr::from_bits(u32::from_be_bytes(
            buf[84..88].try_into().ok()?,
        )))
    };
    let key: u32 = u32::from_be_bytes(buf[88..92].try_into().ok()?);
    let num_want: Option<u32> = {
        let want = i32::from_be_bytes(buf[92..96].try_into().ok()?);
        if want < 0 { None } else { Some(want as u32) }
    };
    let port: u16 = u16::from_be_bytes(buf[96..98].try_into().ok()?);

    Some(AnnounceRequest {
        connection_id: connection_id,
        transaction_id: transaction_id,
        info_hash: info_hash,
        peer_id: peer_id,
        downloaded: downloaded,
        left: left,
        uploaded: uploaded,
        event: event,
        ipv4_address: ip_addr,
        key: key,
        num_want: num_want,
        port: port,
    })
}

fn try_parse_scrape(buf: &[u8]) -> Option<ScrapeRequest> {
    if buf.len() < MIN_SCRAPE_REQUEST_SIZE {
        return None;
    }

    let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?);
    let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?);
    let info_hashes: Vec<InfoHash> = buf[16..]
        .chunks_exact(20)
        .map(|b| InfoHash {
            bytes: b.try_into().unwrap(), // unwrap: `chunks_exact` guarantees the size
        })
        .collect();

    Some(ScrapeRequest {
        connection_id: connection_id,
        transaction_id: transaction_id,
        info_hashes: info_hashes,
    })
}