Openssl + a bunch of other refactorings

This commit is contained in:
Igor Katson 2021-06-28 20:40:13 +01:00
parent b4b22ea9a4
commit 34dd074310
15 changed files with 151 additions and 26 deletions

1
Cargo.lock generated
View File

@ -514,6 +514,7 @@ dependencies = [
"byteorder",
"futures",
"log",
"openssl",
"parking_lot",
"rand",
"reqwest",

7
Makefile Normal file
View File

@ -0,0 +1,7 @@
all: sign-release sign-debug
sign-debug:
codesign -f --entitlements resources/debugging.entitlements -s - target/debug/rqbit
sign-release:
codesign -f --entitlements resources/debugging.entitlements -s - target/release/rqbit

View File

@ -20,6 +20,7 @@ parking_lot = "0.11"
log = "0.4"
size_format = "1"
rand = "0.8"
openssl = "*"
uuid = {version = "0.8", features = ["v4"]}
futures = "0.3"

View File

@ -12,6 +12,7 @@ use crate::{
buffers::ByteString,
lengths::{ChunkInfo, Lengths, ValidPieceIndex},
peer_binary_protocol::Piece,
sha1w::{self, ISha1},
torrent_metainfo::{FileIteratorName, TorrentMetaV1Owned},
type_aliases::{PeerHandle, BF},
};
@ -23,9 +24,9 @@ pub struct InitialCheckResults {
pub needed_bytes: u64,
}
pub fn update_hash_from_file(
pub fn update_hash_from_file<Sha1: ISha1>(
file: &mut File,
hash: &mut sha1::Sha1,
hash: &mut Sha1,
buf: &mut [u8],
mut bytes_to_read: usize,
) -> anyhow::Result<()> {
@ -120,7 +121,7 @@ impl<'a> FileOps<'a> {
let mut read_buffer = vec![0u8; 65536];
for piece_info in self.lengths.iter_piece_infos() {
let mut computed_hash = sha1::Sha1::new();
let mut computed_hash = sha1w::Sha1Openssl::new();
let mut piece_remaining = piece_info.len as usize;
let mut some_files_broken = false;
let mut at_least_one_file_required = current_file.full_file_required;
@ -180,7 +181,7 @@ impl<'a> FileOps<'a> {
if self
.torrent
.info
.compare_hash(piece_info.piece_index.get(), &computed_hash)
.compare_hash(piece_info.piece_index.get(), computed_hash.finish())
.unwrap()
{
trace!(
@ -220,7 +221,7 @@ impl<'a> FileOps<'a> {
piece_index: ValidPieceIndex,
last_received_chunk: &ChunkInfo,
) -> anyhow::Result<bool> {
let mut h = sha1::Sha1::new();
let mut h = sha1w::Sha1Openssl::new();
let piece_length = self.lengths.piece_length(piece_index);
let mut absolute_offset = self.lengths.piece_offset(piece_index);
let mut buf = vec![0u8; std::cmp::min(65536, piece_length as usize)];
@ -269,7 +270,11 @@ impl<'a> FileOps<'a> {
absolute_offset = 0;
}
match self.torrent.info.compare_hash(piece_index.get(), &h) {
match self
.torrent
.info
.compare_hash(piece_index.get(), h.finish())
{
Some(true) => {
debug!("piece={} hash matches", piece_index);
Ok(true)

View File

@ -1,4 +1,4 @@
use crate::{buffers::ByteString, constants::CHUNK_SIZE, peer_binary_protocol::Piece};
use crate::{constants::CHUNK_SIZE, peer_binary_protocol::Piece};
const fn is_power_of_two(x: u64) -> bool {
(x != 0) && ((x & (x - 1)) == 0)

View File

@ -9,6 +9,7 @@ pub mod peer_connection;
pub mod peer_id;
pub mod peer_state;
pub mod serde_bencode;
pub mod sha1w;
pub mod spawn_utils;
pub mod torrent_manager;
pub mod torrent_metainfo;

View File

@ -5,10 +5,16 @@ use serde::{Deserialize, Serialize};
use crate::{
buffers::{ByteBuf, ByteString},
clone_to_owned::CloneToOwned,
constants::CHUNK_SIZE,
lengths::ChunkInfo,
};
const PREAMBLE_LEN: usize = 5;
const INTEGER_LEN: usize = 4;
const MSGID_LEN: usize = 1;
const PREAMBLE_LEN: usize = INTEGER_LEN + MSGID_LEN;
const PIECE_MESSAGE_PREAMBLE_LEN: usize = PREAMBLE_LEN + INTEGER_LEN * 2;
pub const PIECE_MESSAGE_DEFAULT_LEN: usize = PIECE_MESSAGE_PREAMBLE_LEN + CHUNK_SIZE as usize;
const NO_PAYLOAD_MSG_LEN: usize = PREAMBLE_LEN;
const PSTR_BT1: &str = "BitTorrent protocol";
@ -56,7 +62,7 @@ pub fn serialize_piece_preamble(chunk: &ChunkInfo, mut buf: &mut [u8]) -> usize
BE::write_u32(&mut buf[0..4], chunk.piece_index.get());
BE::write_u32(&mut buf[4..8], chunk.offset);
PREAMBLE_LEN + 8
PIECE_MESSAGE_PREAMBLE_LEN
}
#[derive(Debug)]

View File

@ -15,10 +15,10 @@ use crate::{
lengths::ChunkInfo,
peer_binary_protocol::{
serialize_piece_preamble, Handshake, Message, MessageBorrowed, MessageDeserializeError,
MessageOwned, Piece, Request,
MessageOwned, Piece, Request, PIECE_MESSAGE_DEFAULT_LEN,
},
peer_id::try_decode_peer_id,
spawn_utils::spawn,
spawn_utils::{spawn, spawn_block_in_place},
torrent_state::{InflightRequest, TorrentState},
type_aliases::PeerHandle,
};
@ -57,7 +57,7 @@ impl PeerConnection {
conn.write_all(&handshake.serialize())
.await
.context("error writing handshake")?;
let mut read_buf = vec![0u8; 16384 * 2];
let mut read_buf = vec![0u8; PIECE_MESSAGE_DEFAULT_LEN * 2];
let read_bytes = conn
.read(&mut read_buf)
.await
@ -89,7 +89,7 @@ impl PeerConnection {
let this = self.clone();
let writer = async move {
let mut buf = Vec::<u8>::new();
let mut buf = Vec::<u8>::with_capacity(PIECE_MESSAGE_DEFAULT_LEN);
let keep_alive_interval = Duration::from_secs(120);
if this.state.stats.have.load(Ordering::Relaxed) > 0 {
@ -127,7 +127,7 @@ impl PeerConnection {
let preamble_len = serialize_piece_preamble(&chunk, &mut buf);
let full_len = preamble_len + chunk.size as usize;
buf.resize(full_len, 0);
tokio::task::block_in_place(|| {
spawn_block_in_place(|| {
this.state.file_ops().read_chunk(
handle,
&chunk,
@ -402,15 +402,15 @@ impl PeerConnection {
Some(next) => next,
None => {
if self.state.get_left_to_download() == 0 {
info!("{}: nothing left to download, closing requester", handle);
debug!("{}: nothing left to download, closing requester", handle);
return Ok(());
}
if let Some(piece) = self.state.try_steal_piece(handle) {
info!("{}: stole a piece {}", handle, piece);
debug!("{}: stole a piece {}", handle, piece);
piece
} else {
info!("no pieces to request from {}", handle);
debug!("no pieces to request from {}", handle);
#[allow(unused_must_use)]
{
timeout(Duration::from_secs(60), notify.notified()).await;
@ -534,7 +534,7 @@ impl PeerConnection {
// to prevent deadlocks.
drop(g);
tokio::task::block_in_place(move || {
spawn_block_in_place(move || {
let index = piece.index;
// TODO: in theory we should unmark the piece as downloaded here. But if there was a disk error, what

View File

@ -0,0 +1,49 @@
// Wrapper for sha1 libraries.
// Sha1 computation is the majority of CPU usage of this library.
// openssl seems 2-3x faster, so using it for now, but
// leaving the pure-rust impl here too. Maybe someday make them
// runtime swappable.
pub trait ISha1 {
fn new() -> Self;
fn update(&mut self, buf: &[u8]);
fn finish(self) -> [u8; 20];
}
pub struct Sha1Rust {
inner: sha1::Sha1,
}
impl ISha1 for Sha1Rust {
fn new() -> Self {
Sha1Rust {
inner: sha1::Sha1::new(),
}
}
fn update(&mut self, buf: &[u8]) {
self.inner.update(buf)
}
fn finish(self) -> [u8; 20] {
self.inner.digest().bytes()
}
}
pub struct Sha1Openssl {
inner: openssl::sha::Sha1,
}
impl ISha1 for Sha1Openssl {
fn new() -> Self {
Self {
inner: openssl::sha::Sha1::new(),
}
}
fn update(&mut self, buf: &[u8]) {
self.inner.update(buf)
}
fn finish(self) -> [u8; 20] {
self.inner.finish()
}
}

View File

@ -18,3 +18,10 @@ pub fn spawn<N: Display + 'static + Send>(
}
});
}
pub fn spawn_block_in_place<F: FnOnce() -> R, R>(f: F) -> R {
// Have this wrapper so that it's easy to switch to just f() when
// using tokio's single-threaded runtime. Single-threaded runtime is
// easier to read with time profilers.
tokio::task::block_in_place(f)
}

View File

@ -318,7 +318,7 @@ impl TorrentManager {
let peer_connection = PeerConnection::new(self.inner.clone());
spawn(format!("manage_peer({})", handle), async move {
if let Err(e) = peer_connection.manage_peer(addr, handle, out_rx).await {
error!("error managing peer {}: {:#}", handle, e)
debug!("error managing peer {}: {:#}", handle, e)
};
peer_connection.into_state().drop_peer(handle);
Ok::<_, anyhow::Error>(())

View File

@ -132,11 +132,11 @@ impl<BufType: Clone + Deref<Target = [u8]>> TorrentMetaV1Info<BufType> {
let expected_hash = self.pieces.deref().get(start..end)?;
Some(expected_hash)
}
pub fn compare_hash(&self, piece: u32, hash: &sha1::Sha1) -> Option<bool> {
pub fn compare_hash(&self, piece: u32, hash: [u8; 20]) -> Option<bool> {
let start = piece as usize * 20;
let end = start + 20;
let expected_hash = self.pieces.deref().get(start..end)?;
Some(expected_hash == hash.digest().bytes())
Some(expected_hash == hash)
}
pub fn iter_filenames_and_lengths(
&self,

View File

@ -276,7 +276,7 @@ impl TorrentState {
}
pub fn maybe_transmit_haves(&self, index: ValidPieceIndex) {
let mut unordered = FuturesUnordered::new();
let mut futures = Vec::new();
let g = self.locked.read();
for (handle, peer_state) in g.peers.states.iter() {
@ -300,7 +300,7 @@ impl TorrentState {
None => continue,
};
let tx = Arc::downgrade(tx);
unordered.push(async move {
futures.push(async move {
if let Some(tx) = tx.upgrade() {
if tx
.send(WriterRequest::Message(Message::Have(index.get())))
@ -316,11 +316,12 @@ impl TorrentState {
}
}
if unordered.is_empty() {
if futures.is_empty() {
trace!("no peers to transmit Have={} to, saving some work", index);
return;
}
let mut unordered: FuturesUnordered<_> = futures.into_iter().collect();
spawn(
format!("transmit_haves(piece={}, count={})", index, unordered.len()),
async move {

View File

@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>com.apple.security.cs.debugger</key>
<true/>
<key>com.apple.security.get-task-allow</key>
<true/>
</dict>
</plist>

View File

@ -42,6 +42,15 @@ fn torrent_from_file(filename: &str) -> anyhow::Result<TorrentMetaV1Owned> {
.clone_to_owned())
}
#[derive(Debug, Clap)]
enum LogLevel {
Trace,
Debug,
Info,
Warn,
Error,
}
#[derive(Clap)]
#[clap(version = "1.0", author = "Igor Katson <igor.katson@gmail.com>")]
struct Opts {
@ -61,6 +70,9 @@ struct Opts {
/// Only list the torrent metadata contents, don't do anything else.
#[clap(short, long)]
list: bool,
#[clap(arg_enum, short = 'v')]
log_level: Option<LogLevel>,
}
fn compute_only_files(
@ -87,15 +99,40 @@ fn compute_only_files(
Ok(only_files)
}
fn main() -> anyhow::Result<()> {
fn init_logging(opts: &Opts) {
match opts.log_level.as_ref() {
Some(level) => {
let level_str = match level {
LogLevel::Trace => "trace",
LogLevel::Debug => "debug",
LogLevel::Info => "info",
LogLevel::Warn => "warn",
LogLevel::Error => "error",
};
std::env::set_var("RUST_LOG", level_str);
}
None => {
if std::env::var_os("RUST_LOG").is_none() {
std::env::set_var("RUST_LOG", "info");
};
}
};
pretty_env_logger::init();
}
fn main() -> anyhow::Result<()> {
let opts = Opts::parse();
init_logging(&opts);
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_time()
.enable_io()
// the default is 512, it can get out of hand.
// the default is 512, it can get out of hand, as this program is CPU-bound on
// hash checking.
// note: we aren't using spawn_blocking() anymore, so this doesn't apply,
// however I'm still messing around, so in case we do, let's block the number of
// spawned threads.
.max_blocking_threads(8)
.build()?;