Skip to content

Commit

Permalink
feat: intercept http request to download task by p2p in proxy (dragon…
Browse files Browse the repository at this point in the history
…flyoss#234)

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi authored Jan 27, 2024
1 parent 0e336b3 commit 9f86bc9
Show file tree
Hide file tree
Showing 10 changed files with 245 additions and 186 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,4 @@ hyper-rustls = "0.26"
http-body-util = "0.1.0"
regex = "1.10.2"
http-range-header = "0.4.0"
futures-util = "0.3.30"
1 change: 1 addition & 0 deletions src/bin/dfdaemon/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ async fn main() -> Result<(), anyhow::Error> {
// Initialize proxy server.
let proxy = Proxy::new(
config.clone(),
task.clone(),
shutdown.clone(),
shutdown_complete_tx.clone(),
);
Expand Down
18 changes: 16 additions & 2 deletions src/grpc/dfdaemon_download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use crate::shutdown;
use crate::task;
use crate::utils::http::hashmap_to_headermap;
use crate::utils::http::{get_range, hashmap_to_headermap};
use crate::Result as ClientResult;
use dragonfly_api::common::v2::Task;
use dragonfly_api::dfdaemon::v2::{
Expand Down Expand Up @@ -150,7 +150,7 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
let request = request.into_inner();

// Check whether the download is empty.
let download = request
let mut download = request
.download
.ok_or(Status::invalid_argument("missing download"))?;

Expand Down Expand Up @@ -204,6 +204,20 @@ impl DfdaemonDownload for DfdaemonDownloadServerHandler {
Status::internal(e.to_string())
})?;

// Download's range priority is higher than the request header's range.
// If download protocol is http, use the range of the request header.
// If download protocol is not http, use the range of the download.
if download.range.is_none() {
let content_length = task
.content_length()
.ok_or(Status::internal("missing content length in the response"))?;

download.range = get_range(&request_header, content_length).map_err(|err| {
error!("get range failed: {}", err);
Status::failed_precondition(err.to_string())
})?;
}

// Clone the task.
let task_manager = self.task.clone();

Expand Down
7 changes: 6 additions & 1 deletion src/grpc/dfdaemon_upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,12 @@ impl DfdaemonUpload for DfdaemonUploadServerHandler {
let mut reader = self
.task
.piece
.upload_from_local_peer_into_async_read(task_id.as_str(), piece_number, piece.length)
.upload_from_local_peer_into_async_read(
task_id.as_str(),
piece_number,
piece.length,
false,
)
.await
.map_err(|err| {
error!("read piece content from local storage: {}", err);
Expand Down
15 changes: 1 addition & 14 deletions src/proxy/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/

use crate::config;
use crate::utils::http::parse_range_header;
use crate::Result;
use dragonfly_api::common::v2::{Priority, Range};
use dragonfly_api::common::v2::Priority;
use reqwest::header::HeaderMap;
use tracing::error;

Expand All @@ -43,17 +41,6 @@ pub const DRAGONFLY_FILTERED_QUERY_PARAMS_HEADER: &str = "X-Dragonfly-Filtered-Q
// it specifies the piece length of the task.
pub const DRAGONFLY_PIECE_LENGTH_HEADER: &str = "X-Dragonfly-Piece-Length";

// get_range gets the range from http header.
pub fn get_range(header: &HeaderMap, content_length: u64) -> Result<Option<Range>> {
match header.get(reqwest::header::RANGE) {
Some(range) => {
let range = range.to_str()?;
Ok(Some(parse_range_header(range, content_length)?))
}
None => Ok(None),
}
}

// get_tag gets the tag from http header.
pub fn get_tag(header: &HeaderMap) -> Option<String> {
match header.get(DRAGONFLY_TAG_HEADER) {
Expand Down
Loading

0 comments on commit 9f86bc9

Please sign in to comment.