meteoro/src/udp_server.rs

151 lines
4.3 KiB
Rust

use std::net::{Ipv4Addr, SocketAddr, UdpSocket};
use anyhow::Result;
use flume::{Receiver, Sender};
use log;
use crate::bittorrent::info_hash::InfoHash;
use crate::bittorrent::peer::PeerId;
use crate::bittorrent::protocol::Event;
use crate::bittorrent::udp::{
AnnounceRequest, AnnounceResponse, MIN_ANNOUNCE_REQUEST_SIZE, MIN_SCRAPE_REQUEST_SIZE,
ScrapeRequest, UdpResponse,
};
use crate::meteoro::ServerControlMessage;
use crate::{
bittorrent::{
protocol::{Action, UDP_MAGIC},
udp::{ConnectRequest, UdpRequest},
},
tracker::{RequestMessage, ResponseMessage},
};
const UDP_RECV_BUF_SIZE: usize = u16::MAX as usize;
const UDP_SEND_BUF_SIZE: usize = u16::MAX as usize; // Could also be 1500 for typical Ethernet MTU (?)
pub fn receive(
server_id: usize,
addr: SocketAddr,
ctrl: Sender<ServerControlMessage>,
reqs: Sender<RequestMessage>,
) -> Result<()> {
let socket = UdpSocket::bind(addr)?;
// Provide cloned socket for networking send thread
ctrl.send(ServerControlMessage::UdpSocketBound((
server_id,
socket.try_clone()?,
)))?;
log::debug!("Starting UDP server {} on {}", server_id, addr);
let mut buf: [u8; UDP_RECV_BUF_SIZE] = [0; UDP_RECV_BUF_SIZE];
loop {
match socket.recv_from(&mut buf) {
Ok((n_bytes, src)) => {
log::trace!("Received {} bytes from {}", n_bytes, src);
match try_parse_packet(&buf[..n_bytes]) {
Some(req) => reqs.send(RequestMessage {
request: req,
src_addr: src,
server_id: server_id,
})?,
None => (),
};
}
Err(error) => {
log::error!(
"Server {} failed to receive on socket: {}",
server_id,
error
);
continue;
}
}
}
}
pub fn send(server_id: usize, socket: UdpSocket, resps: Receiver<ResponseMessage>) -> Result<()> {
let mut buf: [u8; UDP_SEND_BUF_SIZE] = [0; UDP_SEND_BUF_SIZE];
loop {
let response = resps.recv()?;
log::trace!(
"Response to {} | {:?}",
response.dst_addr,
response.response
);
let n_bytes = match response.response {
UdpResponse::Connect(connect) => connect.write_to_buf(&mut buf),
UdpResponse::Announce(announce) => match announce {
AnnounceResponse::V4(v4) => v4.write_to_buf(&mut buf),
AnnounceResponse::V6(v6) => v6.write_to_buf(&mut buf),
},
UdpResponse::Scrape(scrape) => scrape.write_to_buf(&mut buf),
UdpResponse::Error(_) => 0,
};
let n_sent = match socket.send_to(&buf[..n_bytes], response.dst_addr) {
Ok(n) => n,
Err(e) => {
log::error!(
"Server {} failed to send {} bytes to {}: {}",
server_id,
n_bytes,
response.dst_addr,
e
);
continue;
}
};
log::trace!(
"Sent {} bytes to {}: {:x?}",
n_sent,
response.dst_addr,
&buf[..n_bytes]
);
}
}
fn try_parse_packet(buf: &[u8]) -> Option<UdpRequest> {
if buf.len() < 16 {
return None;
}
let action: Action = Action::from_i32(i32::from_be_bytes(buf[8..12].try_into().ok()?))?;
let request = match action {
Action::Connect => UdpRequest::Connect(try_parse_connect(&buf)?),
Action::Announce => UdpRequest::Announce(try_parse_announce(&buf)?),
Action::Scrape => UdpRequest::Scrape(try_parse_scrape(&buf)?),
_ => return None,
};
return Some(request);
}
fn try_parse_connect(buf: &[u8]) -> Option<ConnectRequest> {
let conn = ConnectRequest::from_bytes(buf)?;
// Disregard the request immediately if `protocol_id` is not the valid magic number
if conn.protocol_id != UDP_MAGIC {
return None;
}
Some(conn)
}
fn try_parse_announce(buf: &[u8]) -> Option<AnnounceRequest> {
AnnounceRequest::from_bytes(buf)
}
fn try_parse_scrape(buf: &[u8]) -> Option<ScrapeRequest> {
ScrapeRequest::from_bytes(buf)
}