commit 478ce597a7c53ff247d4450040b19d380f7c3b17 Author: Adam Macdonald Date: Sun Aug 3 20:10:54 2025 +0100 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bcbfeee --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.vscode/ +target/ diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..c287a6a --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,584 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "anyhow" +version = "1.0.98" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e16d2d3311acee920a9eb8d33b8cbc1787ce4a264e85f964c2404b969bdcd487" + +[[package]] +name = "autocfg" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" + +[[package]] +name = "bitflags" +version = "2.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" + +[[package]] +name = "bumpalo" +version = "3.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "cfg-if" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" + +[[package]] +name = "deranged" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c9e6a11ca8224451684bc0d7d5a7adbf8f2fd6887261a1cfc3c0432f9d4068e" +dependencies = [ + "powerfmt", +] + +[[package]] +name = "flume" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "js-sys", + "libc", + "wasi 0.11.1+wasi-snapshot-preview1", + "wasm-bindgen", +] + +[[package]] +name = "getrandom" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + +[[package]] +name = "hash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47d60b12902ba28e2730cd37e95b8c9223af2808df9e902d4df49588d1470606" +dependencies = [ + "byteorder", +] + +[[package]] +name = "heapless" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfb9eb618601c89945a70e254898da93b13be0388091d42117462b265bb3fad" +dependencies = [ + "hash32", + "stable_deref_trait", +] + +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + +[[package]] +name = "js-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + +[[package]] +name = "libc" +version = "0.2.174" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1171693293099992e19cddea4e8b849964e9846f4acee11b3948bcc337be8776" + +[[package]] +name = "lock_api" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96936507f153605bddfcda068dd804796c84324ed2510809e5b2a624c81da765" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" + +[[package]] +name = "meteoro" +version = "0.1.0" +dependencies = [ + "anyhow", + "flume", + "heapless", + "log", + "rand", + "simplelog", + "threadpool", +] + +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.16", +] + +[[package]] +name = "num-conv" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + +[[package]] +name = "num_cpus" +version = "1.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "num_threads" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9" +dependencies = [ + "libc", +] + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "r-efi" +version = "5.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" + +[[package]] +name = "rand" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6db2770f06117d490610c7488547d543617b21bfa07796d7a12f6f1bd53850d1" +dependencies = [ + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.3", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "serde" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "simplelog" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16257adbfaef1ee58b1363bdc0664c9b8e1e30aed86049635fb5f147d065a9c0" +dependencies = [ + "log", + "termcolor", + "time", +] + +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + +[[package]] +name = "syn" +version = "2.0.104" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + +[[package]] +name = "time" +version = "0.3.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" +dependencies = [ + "deranged", + "itoa", + "libc", + "num-conv", + "num_threads", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9e9a38711f559d9e3ce1cdb06dd7c5b8ea546bc90052da6d06bb76da74bb07c" + +[[package]] +name = "time-macros" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3526739392ec93fd8b359c8e98514cb3e8e021beb4e5f597b00a0221f8ed8a49" +dependencies = [ + "num-conv", + "time-core", +] + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "wasi" +version = "0.11.1+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" + +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + +[[package]] +name = "wasm-bindgen" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" +dependencies = [ + "bumpalo", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "winapi-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "windows-sys" +version = "0.59.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags", +] + +[[package]] +name = "zerocopy" +version = "0.8.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1039dd0d3c310cf05de012d8a39ff557cb0d23087fd44cad61df08fc31907a2f" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ecf5b4cc5364572d7f4c329661bcc82724222973f2cab6f050a4e5c22f75181" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..70e42fd --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "meteoro" +description = "A UDP BitTorrent tracker" +authors = ["Adam Macdonald "] +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = "1.0.98" +flume = { version = "0.11.1", features = ["eventual-fairness", "select"] } +heapless = "0.8.0" +# log = { version = "...", features = ["release_max_level_info"] } +log = "0.4.27" +rand = "0.9.2" +simplelog = "0.12.2" +threadpool = "1.8.1" diff --git a/README.md b/README.md new file mode 100644 index 0000000..db99368 --- /dev/null +++ b/README.md @@ -0,0 +1,13 @@ +# meteoro + +A BitTorrent tracker, written in Rust, implementing a BEP 15-compliant UDP tracker server. + +## Building + +`$ cargo build` + +## Usage + +After building the binary as described above, one can run the executable with the `--help` flag to see a listing of options the server software provides. + +The server software currently binds to the `localhost`, both IPv4 and IPv6, when run with no arguments. One can supply the `-a IPV4_ADDRESS:PORT` option or `-A IPV6_ADDRESS:PORT` option to specify interfaces for the server to bind to. Example: `$ meteoro -a 192.168.1.23:6969` to bind on 192.168.1.23, port 6969. diff --git a/TODO.md b/TODO.md new file mode 100644 index 0000000..8d4e913 --- /dev/null +++ b/TODO.md @@ -0,0 +1,3 @@ +# The Big To-Do List + +- **Metrics/management API**, for returning statistics and modifying configurations at runtime using HTTP & JSON diff --git a/src/bittorrent/info_hash.rs b/src/bittorrent/info_hash.rs new file mode 100644 index 0000000..e0b9918 --- /dev/null +++ b/src/bittorrent/info_hash.rs @@ -0,0 +1,22 @@ +use std::fmt; +use std::fmt::Write; + +pub const INFO_HASH_SIZE: usize = 20; + +#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)] +pub struct InfoHash { + pub bytes: [u8; INFO_HASH_SIZE], +} + +impl fmt::Display for InfoHash { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + const INFO_HASH_STR_LEN: usize = INFO_HASH_SIZE * 2; + let mut hex_string: heapless::String = heapless::String::new(); + + for b in self.bytes { + let _ = write!(hex_string, "{:02x}", b); + } + + write!(f, "{}", hex_string) + } +} diff --git a/src/bittorrent/mod.rs b/src/bittorrent/mod.rs new file mode 100644 index 0000000..6651b4a --- /dev/null +++ b/src/bittorrent/mod.rs @@ -0,0 +1,4 @@ +pub mod info_hash; +pub mod peer; +pub mod protocol; +pub mod udp; diff --git a/src/bittorrent/peer.rs b/src/bittorrent/peer.rs new file mode 100644 index 0000000..b1dcf3c --- /dev/null +++ b/src/bittorrent/peer.rs @@ -0,0 +1,20 @@ +use std::fmt; + +const MAX_PEER_ID_LENGTH: usize = 20; + +#[derive(Debug)] +pub struct PeerId { + pub bytes: [u8; MAX_PEER_ID_LENGTH], +} + +impl PeerId { + pub fn to_string_lossy(&self) -> String { + String::from_utf8_lossy(&self.bytes).into() + } +} + +impl fmt::Display for PeerId { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.to_string_lossy()) + } +} diff --git a/src/bittorrent/protocol.rs b/src/bittorrent/protocol.rs new file mode 100644 index 0000000..9258535 --- /dev/null +++ b/src/bittorrent/protocol.rs @@ -0,0 +1,62 @@ +/// BitTorrent's UDP magic `connection_id` constant +pub const UDP_MAGIC: i64 = 0x41727101980; + +#[repr(i32)] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum Action { + Connect = 0, + Announce = 1, + Scrape = 2, + Error = 3, +} + +impl Action { + pub fn from_i32(val: i32) -> Option { + match val { + 0 => Some(Action::Connect), + 1 => Some(Action::Announce), + 2 => Some(Action::Scrape), + 3 => Some(Action::Error), + _ => None, + } + } + + pub fn to_i32(&self) -> i32 { + match self { + Action::Connect => 0, + Action::Announce => 1, + Action::Scrape => 2, + Action::Error => 3, + } + } +} + +#[repr(i32)] +#[derive(Debug, Clone, Copy, Eq, PartialEq)] +pub enum Event { + None = 0, + Completed = 1, + Started = 2, + Stopped = 3, +} + +impl Event { + pub fn from_i32(val: i32) -> Option { + match val { + 0 => Some(Event::None), + 1 => Some(Event::Completed), + 2 => Some(Event::Started), + 3 => Some(Event::Stopped), + _ => None, + } + } + + pub fn to_i32(&self) -> i32 { + match self { + Event::None => 0, + Event::Completed => 1, + Event::Started => 2, + Event::Stopped => 3, + } + } +} diff --git a/src/bittorrent/udp.rs b/src/bittorrent/udp.rs new file mode 100644 index 0000000..ff463a9 --- /dev/null +++ b/src/bittorrent/udp.rs @@ -0,0 +1,215 @@ +use std::{ + io::{Cursor, Write}, + net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, +}; + +use crate::bittorrent::{ + info_hash::InfoHash, + peer::PeerId, + protocol::{Action, Event}, +}; + +pub const MIN_ANNOUNCE_REQUEST_SIZE: usize = 98; +pub const MIN_SCRAPE_REQUEST_SIZE: usize = 36; +pub const MIN_CONNECTION_RESPONSE_SIZE: usize = 16; +pub const MIN_ANNOUNCE_RESPONSE_SIZE: usize = 20; +pub const MIN_SCRAPE_RESPONSE_SIZE: usize = 8; +pub const SCRAPE_RESPONSE_ENTRY_SIZE: usize = 12; + +pub const IPV4_SIZE: usize = Ipv4Addr::BITS as usize / 8; +pub const IPV6_SIZE: usize = Ipv6Addr::BITS as usize / 8; +pub const PORT_SIZE: usize = size_of::(); +pub const IPV4_ADDR_PAIR_SIZE: usize = IPV4_SIZE + PORT_SIZE; +pub const IPV6_ADDR_PAIR_SIZE: usize = IPV6_SIZE + PORT_SIZE; + +#[derive(Debug)] +pub struct ConnectRequest { + pub protocol_id: i64, + pub transaction_id: i32, +} + +#[derive(Debug)] +pub struct ConnectResponse { + pub transaction_id: i32, + pub connection_id: i64, +} + +impl ConnectResponse { + pub fn write_to_buf(&self, buf: &mut [u8]) -> usize { + let mut c = Cursor::new(buf); + let mut written: usize = 0; + + const ACTION: i32 = Action::Connect as i32; + + written += c.write(&ACTION.to_be_bytes()).unwrap(); + written += c.write(&self.transaction_id.to_be_bytes()).unwrap(); + written += c.write(&self.connection_id.to_be_bytes()).unwrap(); + + return written; + } +} + +#[derive(Debug)] +pub struct AnnounceRequest { + pub connection_id: i64, + pub transaction_id: i32, + /// Info hash of the torrent + pub info_hash: InfoHash, + /// Peer's ID + pub peer_id: PeerId, + /// Number of bytes peer has downloaded this session + pub downloaded: i64, + /// Number of bytes peer has remaining to complete the download + pub left: i64, + /// Number of bytes peer has uploaded this session + pub uploaded: i64, + /// One of 4 announce events. `Event::None` is used to update presence but not change peer's state. + pub event: Event, + /// Peer's desired IPv4 address to be sent the announce response on. `None` implies that the packet's source address should be used. + pub ipv4_address: Option, + /// Client's unique key for tracking a single client over multiple connections (sockets) + pub key: u32, + /// Maximum number of info hashes the client is requesting. `None` implies a tracker-defined default value should be used. + pub num_want: Option, + /// Peer's desired port to be the announce response on. If `ipv4_address` is `Some` then this should be used. + pub port: u16, +} + +#[derive(Debug)] +pub enum AnnounceResponse { + V4(AnnounceResponseV4), + V6(AnnounceResponseV6), +} + +#[derive(Debug)] +pub struct AnnounceResponseCommon { + pub transaction_id: i32, + /// Number of seconds peer should wait before re-announcing + pub interval: i32, + /// Number of peers still downloading + pub leechers: i32, + /// Number of peers who have completed the download and are seeding + pub seeders: i32, +} + +#[derive(Debug)] +pub struct AnnounceResponseV4 { + pub common: AnnounceResponseCommon, + pub peers: Vec, +} + +#[derive(Debug)] +pub struct AnnounceResponseV6 { + pub common: AnnounceResponseCommon, + pub peers: Vec, +} + +impl AnnounceResponseCommon { + pub fn write_to_buf(&self, buf: &mut [u8]) -> usize { + const ACTION: i32 = Action::Announce as i32; + buf[0..4].copy_from_slice(&ACTION.to_be_bytes()); + buf[4..8].copy_from_slice(&self.transaction_id.to_be_bytes()); + buf[8..12].copy_from_slice(&self.interval.to_be_bytes()); + buf[12..16].copy_from_slice(&self.leechers.to_be_bytes()); + buf[16..20].copy_from_slice(&self.seeders.to_be_bytes()); + + return MIN_ANNOUNCE_RESPONSE_SIZE; + } +} + +impl AnnounceResponseV4 { + pub fn write_to_buf(&self, buf: &mut [u8]) -> usize { + let written = self.common.write_to_buf(buf); + + for (offset, addr) in (written..buf.len()) + .step_by(IPV4_ADDR_PAIR_SIZE) + .zip(&self.peers) + { + buf[offset..offset + IPV4_SIZE].copy_from_slice(&addr.ip().octets()); + buf[offset + IPV4_SIZE..offset + IPV4_ADDR_PAIR_SIZE] + .copy_from_slice(&addr.port().to_be_bytes()); + } + + return written + self.peers.len() * IPV4_ADDR_PAIR_SIZE; + } +} + +impl AnnounceResponseV6 { + pub fn write_to_buf(&self, buf: &mut [u8]) -> usize { + let written = self.common.write_to_buf(buf); + + for (offset, addr) in (written..buf.len()) + .step_by(IPV6_ADDR_PAIR_SIZE) + .zip(&self.peers) + { + buf[offset..offset + IPV6_SIZE].copy_from_slice(&addr.ip().octets()); + buf[offset + IPV6_SIZE..offset + IPV6_ADDR_PAIR_SIZE] + .copy_from_slice(&addr.port().to_be_bytes()); + } + + return written + self.peers.len() * IPV6_ADDR_PAIR_SIZE; + } +} + +#[derive(Debug)] +pub struct ScrapeRequest { + pub connection_id: i64, + pub transaction_id: i32, + pub info_hashes: Vec, +} + +#[derive(Debug, Default)] +pub struct ScrapeStats { + /// Number of connected peers who have completed the download and are seeding + pub seeders: i32, + /// Number of peers who have ever completed the download + pub completed: i32, + /// Number of connected peers who have not completed the download and are seeding + pub leechers: i32, +} + +#[derive(Debug)] +pub struct ScrapeResponse { + pub transaction_id: i32, + pub stats: Vec, +} + +impl ScrapeResponse { + pub fn write_to_buf(&self, buf: &mut [u8]) -> usize { + let mut c = Cursor::new(buf); + let mut written: usize = 0; + + const ACTION: i32 = Action::Scrape as i32; + written += c.write(&ACTION.to_be_bytes()).unwrap(); + written += c.write(&self.transaction_id.to_be_bytes()).unwrap(); + + for s in &self.stats { + written += c.write(&s.seeders.to_be_bytes()).unwrap(); + written += c.write(&s.completed.to_be_bytes()).unwrap(); + written += c.write(&s.leechers.to_be_bytes()).unwrap(); + } + + return written; + } +} + +#[derive(Debug)] +pub struct ErrorResponse { + pub transaction_id: i32, + pub message: String, +} + +#[derive(Debug)] +pub enum UdpRequest { + Connect(ConnectRequest), + Announce(AnnounceRequest), + Scrape(ScrapeRequest), +} + +#[derive(Debug)] +pub enum UdpResponse { + Connect(ConnectResponse), + Announce(AnnounceResponse), + Scrape(ScrapeResponse), + Error(ErrorResponse), +} diff --git a/src/constants.rs b/src/constants.rs new file mode 100644 index 0000000..955dc10 --- /dev/null +++ b/src/constants.rs @@ -0,0 +1,15 @@ +pub const METEORO_HELP_TEXT: &'static str = "Usage:\tmeteoro [OPTIONS] + +OPTIONS: + +-a IPV4_ADDRESS Provide an IPv4 address to bind to + (example: -a 192.168.1.2:6969) + +-A IPV6_ADDRESS Provide an IPv6 address to bind to + (example: -A [::1]:6970) + +--debug Enable debug logging +--trace Enable trace logging (more verbose debug) + +-h | --help Show this help text +-v | --version Print the program's build & version information"; diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..d950a2f --- /dev/null +++ b/src/main.rs @@ -0,0 +1,97 @@ +use std::{ + env, + net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, + process::ExitCode, +}; + +use log::LevelFilter; +use simplelog::{ColorChoice, ConfigBuilder, TermLogger, TerminalMode}; + +use crate::meteoro::Meteoro; + +mod bittorrent; +mod constants; +mod meteoro; +mod tracker; +mod udp_server; + +fn main() -> ExitCode { + let args: Vec = env::args().collect(); + + if args.contains(&String::from("--help")) || args.contains(&String::from("-h")) { + println!("{}", constants::METEORO_HELP_TEXT); + return ExitCode::SUCCESS; + } + + if args.contains(&String::from("--version")) || args.contains(&String::from("-v")) { + println!("{} {}", env!("CARGO_PKG_NAME"), env!("CARGO_PKG_VERSION")); + return ExitCode::SUCCESS; + } + + #[cfg(debug_assertions)] + let mut log_level: LevelFilter = LevelFilter::Trace; + #[cfg(not(debug_assertions))] + let mut log_level: LevelFilter = LevelFilter::Info; + + if args.contains(&String::from("--debug")) { + log_level = LevelFilter::Debug; + } else if args.contains(&String::from("--trace")) { + log_level = LevelFilter::Trace; + } + + let logger_config = ConfigBuilder::new() + .set_target_level(LevelFilter::Trace) + .set_location_level(LevelFilter::Off) + .set_thread_level(LevelFilter::Off) + .set_time_level(LevelFilter::Info) + .build(); + + TermLogger::init( + log_level, + logger_config, + TerminalMode::Mixed, + ColorChoice::Auto, + ) + .expect("failed to initialise terminal logging"); + + let ipv4_flag_idxs: Vec = args + .iter() + .enumerate() + .filter_map(|(idx, s)| if s == "-a" { Some(idx) } else { None }) + .collect(); + let ipv6_flag_idxs: Vec = args + .iter() + .enumerate() + .filter_map(|(idx, s)| if s == "-A" { Some(idx) } else { None }) + .collect(); + let mut ip_addrs: Vec = ipv4_flag_idxs + .iter() + .filter_map(|idx| { + Some(SocketAddr::V4( + args.get(*idx + 1)?.parse::().ok()?, + )) + }) + .collect(); + ip_addrs.extend(ipv6_flag_idxs.iter().filter_map(|idx| { + Some(SocketAddr::V6( + args.get(*idx + 1)?.parse::().ok()?, + )) + })); + + if ip_addrs.is_empty() { + ip_addrs.push(SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 6969))); + ip_addrs.push(SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::LOCALHOST, + 6969, + 0, + 0, + ))); + } + + // bufbuf():codingcodingbuf/ + + return match Meteoro::start(&ip_addrs) { + Ok(_) => ExitCode::SUCCESS, + Err(_) => ExitCode::FAILURE, + }; +} diff --git a/src/meteoro.rs b/src/meteoro.rs new file mode 100644 index 0000000..21e2511 --- /dev/null +++ b/src/meteoro.rs @@ -0,0 +1,111 @@ +use std::net::{SocketAddr, UdpSocket}; +use std::thread; + +use anyhow::Result; +use flume::Sender; +use threadpool::ThreadPool; + +use crate::tracker::{RequestMessage, ResponseMessage, Tracker}; +use crate::udp_server; + +/// Instance of the meteoro BitTorrent tracker +/// Manages the tracker, networking & API +pub struct Meteoro {} + +#[repr(u8)] +pub enum TrackerControlMessage { + /// Signal that the receiver should exit their thread of execution + Exit = 0, +} + +#[repr(u8)] +pub enum ServerControlMessage { + /// Server with `(id, _)` has bound to a socket, providing a clone of the socket + UdpSocketBound((usize, UdpSocket)), +} + +impl Meteoro { + pub fn start(addrs: &[SocketAddr]) -> Result<()> { + // Create channels + + let (ctrl_send, ctrl_recv) = flume::unbounded::(); + let (srv_ctrl_send, srv_ctrl_recv) = flume::unbounded::(); + let (reqs_send, reqs_recv) = flume::unbounded::(); + let resp_chans: Vec<_> = (0..addrs.len()) + .map(|_| flume::unbounded::()) + .collect(); + + // Start networking threads + + { + // Each receiving networking thread requires: + // - a socket address to bind to + // - a control message channel (send), for providing the bound socket + // - a request message channel (send), for providing requests to the tracker + + // Receive threads + + let recv_threads = ThreadPool::new(addrs.len()); + + for (id, addr) in addrs.iter().enumerate() { + let ctrl = srv_ctrl_send.clone(); + let reqs = reqs_send.clone(); + let addr = *addr; + + recv_threads.execute(move || { + let _ = udp_server::receive(id, addr, ctrl, reqs); + }); + } + + // Each sending networking thread requires: + // - a socket to send responses on + // - a response message channel (receive), for receiving handled requests + + // Send threads + + let send_threads = ThreadPool::new(addrs.len()); + + for _ in 0..addrs.len() { + match srv_ctrl_recv.recv()? { + ServerControlMessage::UdpSocketBound((id, socket)) => { + let (_, resps) = resp_chans[id].clone(); + + send_threads.execute(move || { + let _ = udp_server::send(id, socket, resps); + }); + } + _ => (), + }; + } + } + + // Start tracker thread + + let tracker_thread = { + // Tracker requires: + // - a request message channel (recv), for receiving requests from the server(s) + // - corresponding response channels (send), for providing handled requests + + let resps: Vec> = resp_chans + .iter() + .map(|(resp_chan, _)| resp_chan.clone()) + .collect(); + + thread::spawn(move || { + let mut tracker = Tracker::new(); + let _ = tracker.start(reqs_recv, resps.as_slice(), ctrl_recv); + }) + }; + + // Start API thread + + { + // API thread requires + // - a control message channel (recv) + } + + let _ = tracker_thread.join(); + + return Ok(()); + } +} diff --git a/src/tracker.rs b/src/tracker.rs new file mode 100644 index 0000000..3f3bbc9 --- /dev/null +++ b/src/tracker.rs @@ -0,0 +1,424 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::time::{Duration, Instant}; + +use anyhow::Result; +use flume::{Receiver, Selector, Sender}; +use rand::Rng; +use rand::rngs::ThreadRng; + +use crate::bittorrent::info_hash::InfoHash; +use crate::bittorrent::protocol::Event; +use crate::bittorrent::udp::{ + AnnounceResponse, AnnounceResponseCommon, AnnounceResponseV4, AnnounceResponseV6, + ConnectResponse, ScrapeResponse, ScrapeStats, UdpRequest, UdpResponse, +}; +use crate::meteoro::TrackerControlMessage; + +pub struct RequestMessage { + pub(crate) server_id: usize, + pub(crate) src_addr: SocketAddr, + pub(crate) request: UdpRequest, +} + +pub struct ResponseMessage { + pub(crate) response: UdpResponse, + pub(crate) dst_addr: SocketAddr, +} + +struct PeerMetadata { + socket_addr: SocketAddr, + last_active: Instant, +} + +struct PeerStatus { + socket_addr: SocketAddr, + last_event: Event, + last_active: Instant, + downloaded: u64, + uploaded: u64, + remaining: u64, +} + +pub const GARBAGE_COLLECTION_INTERVAL: Duration = Duration::from_secs(20); + +pub const CONNECTION_EXPIRE_TIME: Duration = Duration::from_secs(90); +pub const DEFAULT_ANNOUNCE_INTERVAL: Duration = Duration::from_secs(60); +pub const DEFAULT_ANNOUNCE_WANT: usize = 80; + +type ConnectionIdMap = HashMap; +type InfoHashMap = HashMap>; + +pub struct Tracker { + peers: ConnectionIdMap, + info_hashes: InfoHashMap, + rng: ThreadRng, +} + +impl Tracker { + pub fn new() -> Self { + Self { + peers: ConnectionIdMap::new(), + info_hashes: InfoHashMap::new(), + rng: rand::rng(), + } + } + + pub fn start( + &mut self, + reqs: Receiver, + resps: &[Sender], + ctrl: Receiver, + ) -> Result<()> { + let mut next_gc = Instant::now() + GARBAGE_COLLECTION_INTERVAL; + let should_exit = AtomicBool::new(false); + + // Wait for messages from the networking threads / main application + + while !should_exit.load(Ordering::Relaxed) { + let _ = Selector::new() + .recv(&reqs, |request| { + // Tracker request + + let request = request.unwrap(); + + log::trace!("Request from {} | {:?}", request.src_addr, request.request); + + match self.handle_request(&request) { + Some(response) => { + resps[request.server_id].send(response).unwrap(); + } + None => (), + }; + }) + .recv(&ctrl, |msg| { + // Control message + + let msg = msg.unwrap(); + + match msg { + TrackerControlMessage::Exit => { + should_exit.store(true, Ordering::Relaxed); + } + }; + }) + .wait(); + + // Garbage collection + + if Instant::now() > next_gc { + self.garbage_collect(); + + next_gc = Instant::now() + + GARBAGE_COLLECTION_INTERVAL + + Duration::from_millis(self.rng.random::() % 5000); + } + } + + return Ok(()); + } + + fn connection_valid(&self, connection_id: i64, src_addr: SocketAddr) -> bool { + let peers = &self.peers; + + return match peers.get(&connection_id) { + Some(metadata) => metadata.socket_addr == src_addr, + None => false, + }; + } + + fn handle_request(&mut self, request: &RequestMessage) -> Option { + match &request.request { + UdpRequest::Connect(connect) => { + let new_id: i64 = self.rng.random(); + + let peers = &mut self.peers; + + peers.insert( + new_id, + PeerMetadata { + socket_addr: request.src_addr, + last_active: Instant::now(), + }, + ); + + Some(ResponseMessage { + response: UdpResponse::Connect(ConnectResponse { + transaction_id: connect.transaction_id, + connection_id: new_id, + }), + dst_addr: request.src_addr, + }) + } + + UdpRequest::Announce(announce) => { + if !self.connection_valid(announce.connection_id, request.src_addr) { + return None; + } + + // Ensure we honour the desired number of peers, within our boundaries + + let n_announce_want: usize = if let Some(n) = announce.num_want { + if (n as usize) < DEFAULT_ANNOUNCE_WANT { + n as usize + } else { + DEFAULT_ANNOUNCE_WANT + } + } else { + DEFAULT_ANNOUNCE_WANT + }; + + let mut n_swarm_peers: usize = 0; + let mut n_seeders: usize = 0; + let info_hashes = &mut self.info_hashes; + + let swarm_addrs = match info_hashes.get_mut(&announce.info_hash) { + None => { + // Info hash isn't currently tracked + // No relevant peers in the swarm + + info_hashes.insert( + announce.info_hash, + vec![PeerStatus { + socket_addr: request.src_addr, + last_event: announce.event, + last_active: Instant::now(), + downloaded: announce.downloaded as u64, + uploaded: announce.uploaded as u64, + remaining: announce.left as u64, + }], + ); + + None + } + Some(swarm) => { + n_swarm_peers = swarm.len(); + n_seeders = Self::count_seeders(swarm); + + // Insert into swarm if not already present + // TODO: sort (?) + + let existing_swarm_idx: Option = match swarm + .iter_mut() + .enumerate() + .find(|(_, status)| status.socket_addr == request.src_addr) + { + Some((idx, peer)) => { + // Peer already exists in swarm, update state + + peer.last_active = Instant::now(); + peer.last_event = announce.event; + peer.downloaded = announce.downloaded as u64; + peer.uploaded = announce.uploaded as u64; + peer.remaining = announce.left as u64; + + Some(idx) + } + None => { + // Peer is not in swarm, add it + + swarm.push(PeerStatus { + socket_addr: request.src_addr, + last_event: announce.event, + last_active: Instant::now(), + downloaded: announce.downloaded as u64, + uploaded: announce.uploaded as u64, + remaining: announce.left as u64, + }); + + None + } + }; + + if let Some(idx) = existing_swarm_idx + && announce.event == Event::Stopped + { + swarm.remove(idx); + return None; + } + + // Iterate over all peers in the swarm for announce response + + let swarm_addrs: Vec = swarm + .iter() + .filter_map(|status| { + let peer_invalid: bool = (status.last_event == Event::Stopped) + || (status.socket_addr == request.src_addr); + + if !peer_invalid { + if !status.socket_addr.is_ipv4() == request.src_addr.is_ipv4() { + None + } else { + Some(status.socket_addr) + } + } else { + log::trace!( + "(src: {}) {} with status \"{:?}\" deemed invalid", + request.src_addr, + status.socket_addr, + status.last_event + ); + + None + } + }) + .take(n_announce_want as usize) + .collect(); + + if !swarm_addrs.is_empty() { + Some(swarm_addrs) + } else { + None + } + } + }; + + let resp_common = AnnounceResponseCommon { + transaction_id: announce.transaction_id, + interval: DEFAULT_ANNOUNCE_INTERVAL.as_secs() as i32, + leechers: (n_swarm_peers - n_seeders) as i32, + seeders: n_seeders as i32, + }; + let response = if request.src_addr.is_ipv4() { + // IPv4 + AnnounceResponse::V4(AnnounceResponseV4 { + common: resp_common, + peers: swarm_addrs.map_or_else( + || Vec::new(), + |v| { + v.iter() + .filter_map(|a| match a { + SocketAddr::V4(v4) => Some(*v4), + _ => None, + }) + .collect() + }, + ), + }) + } else { + // IPv6 + AnnounceResponse::V6(AnnounceResponseV6 { + common: resp_common, + peers: swarm_addrs.map_or_else( + || Vec::new(), + |v| { + v.iter() + .filter_map(|a| match a { + SocketAddr::V6(v6) => Some(*v6), + _ => None, + }) + .collect() + }, + ), + }) + }; + + Some(ResponseMessage { + response: UdpResponse::Announce(response), + dst_addr: request.src_addr, + }) + } + + UdpRequest::Scrape(scrape) => { + if !self.connection_valid(scrape.connection_id, request.src_addr) { + return None; + } + + let info_hashes = &self.info_hashes; + + let scrape_stats: Vec = scrape + .info_hashes + .iter() + .filter_map(|info_hash| { + if let Some(swarm) = info_hashes.get(info_hash) { + let n_seeders = Self::count_seeders(swarm); + let n_leechers = swarm.len() - n_seeders; + + Some(ScrapeStats { + seeders: n_seeders as i32, + completed: 0, // TODO + leechers: n_leechers as i32, + }) + } else { + None + } + }) + .collect(); + + Some(ResponseMessage { + response: UdpResponse::Scrape(ScrapeResponse { + transaction_id: scrape.transaction_id, + stats: scrape_stats, + }), + dst_addr: request.src_addr, + }) + } + } + } + + fn garbage_collect(&mut self) { + let s1 = Instant::now(); + let n_purged_conns = self.purge_expired_connections(); + let d1 = s1.elapsed(); + + let s2 = Instant::now(); + let n_purged_swarm_peers = self.purge_expired_swarm_peers(); + let d2 = s2.elapsed(); + + log::debug!( + "Garbage collected {} expired connections (took: {} ns)", + n_purged_conns, + d1.as_nanos() + ); + log::debug!( + "Garbage collected {} inactive peers from the swarm (took: {} ns)", + n_purged_swarm_peers, + d2.as_nanos() + ); + log::debug!("Garbage collection took {} ns total", (d1 + d2).as_nanos()); + } + + fn purge_expired_connections(&mut self) -> usize { + let mut purged: usize = 0; + + self.peers.retain(|_, metadata| { + if metadata.last_active.elapsed() < CONNECTION_EXPIRE_TIME { + true + } else { + purged += 1; + false + } + }); + + return purged; + } + + fn purge_expired_swarm_peers(&mut self) -> usize { + let mut n_purged: usize = 0; + + self.info_hashes.iter_mut().for_each(|(_, b)| { + b.retain(|status| { + if status.last_active.elapsed() < DEFAULT_ANNOUNCE_INTERVAL * 5 + && status.last_event != Event::Stopped + { + true + } else { + n_purged += 1; + false + } + }) + }); + + self.info_hashes.retain(|_, swarm| !swarm.is_empty()); + + return n_purged; + } + + fn count_seeders(swarm: &[PeerStatus]) -> usize { + swarm + .iter() + .filter(|status| status.last_event == Event::Completed || status.remaining == 0) + .count() + } +} diff --git a/src/udp_server.rs b/src/udp_server.rs new file mode 100644 index 0000000..48473e6 --- /dev/null +++ b/src/udp_server.rs @@ -0,0 +1,214 @@ +use std::net::{Ipv4Addr, SocketAddr, UdpSocket}; + +use anyhow::Result; +use flume::{Receiver, Sender}; +use log; + +use crate::bittorrent::info_hash::InfoHash; +use crate::bittorrent::peer::PeerId; +use crate::bittorrent::protocol::Event; +use crate::bittorrent::udp::{ + AnnounceRequest, AnnounceResponse, MIN_ANNOUNCE_REQUEST_SIZE, MIN_SCRAPE_REQUEST_SIZE, + ScrapeRequest, UdpResponse, +}; +use crate::meteoro::ServerControlMessage; +use crate::{ + bittorrent::{ + protocol::{Action, UDP_MAGIC}, + udp::{ConnectRequest, UdpRequest}, + }, + tracker::{RequestMessage, ResponseMessage}, +}; + +const UDP_RECV_BUF_SIZE: usize = u16::MAX as usize; +const UDP_SEND_BUF_SIZE: usize = u16::MAX as usize; // Could also be 1500 for typical Ethernet MTU (?) + +pub fn receive( + server_id: usize, + addr: SocketAddr, + ctrl: Sender, + reqs: Sender, +) -> Result<()> { + let socket = UdpSocket::bind(addr)?; + + // Provide cloned socket for networking send thread + + ctrl.send(ServerControlMessage::UdpSocketBound(( + server_id, + socket.try_clone()?, + )))?; + + log::debug!("Starting UDP server {} on {}", server_id, addr); + + let mut buf: [u8; UDP_RECV_BUF_SIZE] = [0; UDP_RECV_BUF_SIZE]; + + loop { + match socket.recv_from(&mut buf) { + Ok((n_bytes, src)) => { + log::trace!("Received {} bytes from {}", n_bytes, src); + + match try_parse_packet(&buf[..n_bytes]) { + Some(req) => reqs.send(RequestMessage { + request: req, + src_addr: src, + server_id: server_id, + })?, + None => (), + }; + } + Err(error) => { + log::error!( + "Server {} failed to receive on socket: {}", + server_id, + error + ); + continue; + } + } + } +} + +pub fn send(server_id: usize, socket: UdpSocket, resps: Receiver) -> Result<()> { + let mut buf: [u8; UDP_SEND_BUF_SIZE] = [0; UDP_SEND_BUF_SIZE]; + + loop { + let response = resps.recv()?; + + log::trace!( + "Response to {} | {:?}", + response.dst_addr, + response.response + ); + + let n_bytes = match response.response { + UdpResponse::Connect(connect) => connect.write_to_buf(&mut buf), + UdpResponse::Announce(announce) => match announce { + AnnounceResponse::V4(v4) => v4.write_to_buf(&mut buf), + AnnounceResponse::V6(v6) => v6.write_to_buf(&mut buf), + }, + UdpResponse::Scrape(scrape) => scrape.write_to_buf(&mut buf), + UdpResponse::Error(_) => 0, + }; + + let n_sent = match socket.send_to(&buf[..n_bytes], response.dst_addr) { + Ok(n) => n, + Err(e) => { + log::error!( + "Server {} failed to send {} bytes to {}: {}", + server_id, + n_bytes, + response.dst_addr, + e + ); + continue; + } + }; + + log::trace!( + "Sent {} bytes to {}: {:x?}", + n_sent, + response.dst_addr, + &buf[..n_bytes] + ); + } +} + +fn try_parse_packet(buf: &[u8]) -> Option { + if buf.len() < 16 { + return None; + } + + let action: Action = Action::from_i32(i32::from_be_bytes(buf[8..12].try_into().ok()?))?; + + let request = match action { + Action::Connect => UdpRequest::Connect(try_parse_connect(&buf)?), + Action::Announce => UdpRequest::Announce(try_parse_announce(&buf)?), + Action::Scrape => UdpRequest::Scrape(try_parse_scrape(&buf)?), + _ => return None, + }; + + return Some(request); +} + +fn try_parse_connect(buf: &[u8]) -> Option { + // Buffer length is checked to be at least 16 in `try_parse_packet` + + let protocol_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?); + + if protocol_id != UDP_MAGIC { + return None; + } + + Some(ConnectRequest { + protocol_id: protocol_id, + transaction_id: i32::from_be_bytes(buf[12..16].try_into().ok()?), + }) +} + +fn try_parse_announce(buf: &[u8]) -> Option { + if buf.len() < MIN_ANNOUNCE_REQUEST_SIZE { + return None; + } + + let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?); + let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?); + let info_hash: InfoHash = InfoHash { + bytes: buf[16..36].try_into().ok()?, + }; + let peer_id: PeerId = PeerId { + bytes: buf[36..56].try_into().unwrap_or_default(), + }; + let downloaded: i64 = i64::from_be_bytes(buf[56..64].try_into().ok()?); + let left: i64 = i64::from_be_bytes(buf[64..72].try_into().ok()?); + let uploaded: i64 = i64::from_be_bytes(buf[72..80].try_into().ok()?); + let event: Event = Event::from_i32(i32::from_be_bytes(buf[80..84].try_into().ok()?))?; + let ip_addr: Option = if buf[84..88].iter().all(|b| *b == 0x00) { + None + } else { + Some(Ipv4Addr::from_bits(u32::from_be_bytes( + buf[84..88].try_into().ok()?, + ))) + }; + let key: u32 = u32::from_be_bytes(buf[88..92].try_into().ok()?); + let num_want: Option = { + let want = i32::from_be_bytes(buf[92..96].try_into().ok()?); + if want < 0 { None } else { Some(want as u32) } + }; + let port: u16 = u16::from_be_bytes(buf[96..98].try_into().ok()?); + + Some(AnnounceRequest { + connection_id: connection_id, + transaction_id: transaction_id, + info_hash: info_hash, + peer_id: peer_id, + downloaded: downloaded, + left: left, + uploaded: uploaded, + event: event, + ipv4_address: ip_addr, + key: key, + num_want: num_want, + port: port, + }) +} + +fn try_parse_scrape(buf: &[u8]) -> Option { + if buf.len() < MIN_SCRAPE_REQUEST_SIZE { + return None; + } + + let connection_id: i64 = i64::from_be_bytes(buf[0..8].try_into().ok()?); + let transaction_id: i32 = i32::from_be_bytes(buf[12..16].try_into().ok()?); + let info_hashes: Vec = buf[16..] + .chunks_exact(20) + .map(|b| InfoHash { + bytes: b.try_into().unwrap(), // unwrap: `chunks_exact` guarantees the size + }) + .collect(); + + Some(ScrapeRequest { + connection_id: connection_id, + transaction_id: transaction_id, + info_hashes: info_hashes, + }) +}