content streaming through partial file requests

This commit is contained in:
Crispy 2024-03-24 18:15:39 +01:00
parent 1c5824a6fb
commit ccdaa1a966
2 changed files with 114 additions and 42 deletions

View file

@ -3,11 +3,11 @@ pub struct Request {
pub method: Method, pub method: Method,
pub path: String, pub path: String,
pub host: String, pub host: String,
pub range: Option<ContentRange>, pub range: Option<RequestRange>,
} }
#[derive(Debug)] #[derive(Debug)]
pub enum ContentRange { pub enum RequestRange {
From(usize), From(usize),
Full(usize, usize), Full(usize, usize),
Suffix(usize), Suffix(usize),
@ -36,6 +36,7 @@ pub enum Status {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Content { pub struct Content {
content_type: &'static str, content_type: &'static str,
range: Option<(usize, usize, usize)>,
bytes: Vec<u8>, bytes: Vec<u8>,
} }
@ -58,7 +59,7 @@ impl Request {
let (key, value) = line.split_once(": ")?; let (key, value) = line.split_once(": ")?;
match key { match key {
"host" => host = Some(value.to_owned()), "host" => host = Some(value.to_owned()),
"range" => range = ContentRange::parse(value), "range" => range = RequestRange::parse(value),
_ => (), _ => (),
} }
} }
@ -83,19 +84,29 @@ impl Response {
} }
} }
pub fn format(self, head_only: bool) -> Vec<u8> { pub fn format(mut self, head_only: bool) -> Vec<u8> {
if let Some(content) = self.content { if let Some(content) = self.content {
let mut data = format!( if content.range.is_some() {
"{}\r\nContent-Type: {}\r\nContent-Length: {}\r\n\r\n", self.status = Status::PartialContent;
}
//do i need accept-ranges?
let mut buffer = format!(
"{}\r\nContent-Type: {}\r\nAccept-Ranges: bytes\r\nContent-Length: {}\r\n",
self.status.header(), self.status.header(),
content.content_type, content.content_type,
content.bytes.len(), content.bytes.len(),
) )
.into_bytes(); .into_bytes();
if !head_only { if let Some((start, end, size)) = content.range {
data.extend_from_slice(&content.bytes); buffer.extend_from_slice(
format!("Content-Range: bytes {}-{}/{}\r\n", start, end, size).as_bytes(),
)
} }
data buffer.extend_from_slice(b"\r\n");
if !head_only {
buffer.extend_from_slice(&content.bytes);
}
buffer
} else { } else {
format!("{}\r\n\r\n", self.status.header()).into_bytes() format!("{}\r\n\r\n", self.status.header()).into_bytes()
} }
@ -158,9 +169,15 @@ impl Content {
}; };
Self { Self {
content_type, content_type,
range: None,
bytes, bytes,
} }
} }
pub fn with_range(mut self, range: Option<(usize, usize, usize)>) -> Self {
self.range = range;
self
}
} }
impl Status { impl Status {
@ -175,8 +192,8 @@ impl Status {
pub fn name(self) -> &'static str { pub fn name(self) -> &'static str {
match self { match self {
Status::Ok => "OK", Status::Ok => "OK",
Status::PartialContent => "", Status::PartialContent => "PARTIAL CONTENT",
Status::BadRequest => "", Status::BadRequest => "BAD REQUEST",
Status::NotFound => "NOT FOUND", Status::NotFound => "NOT FOUND",
} }
} }
@ -192,7 +209,7 @@ impl Method {
} }
} }
impl ContentRange { impl RequestRange {
fn parse(source: &str) -> Option<Self> { fn parse(source: &str) -> Option<Self> {
let source = source.strip_prefix("bytes=")?; let source = source.strip_prefix("bytes=")?;
let (start, end) = source.split_once('-')?; let (start, end) = source.split_once('-')?;

View file

@ -1,13 +1,15 @@
use std::{ use std::{
env, env,
fs::{self, File}, fs::{self, File},
io::{Read, Write}, io::{BufReader, Read, Seek, Write},
net::{TcpListener, TcpStream}, net::{TcpListener, TcpStream},
os::unix::fs::MetadataExt,
path::{Path, PathBuf}, path::{Path, PathBuf},
thread,
}; };
mod http; mod http;
use http::{Content, Method, Request, Response, Status}; use http::{Content, Method, Request, RequestRange, Response, Status};
fn main() { fn main() {
let args: Vec<String> = env::args().collect(); let args: Vec<String> = env::args().collect();
@ -21,47 +23,76 @@ fn main() {
let listener = TcpListener::bind(host).expect("Could not bind to address"); let listener = TcpListener::bind(host).expect("Could not bind to address");
let mut threads = Vec::new();
for stream in listener.incoming() { for stream in listener.incoming() {
match stream { match stream {
Ok(stream) => handle_connection(stream), Ok(stream) => threads.push(thread::spawn(|| handle_connection(stream))),
Err(err) => println!("Error with incoming stream: {}", err), Err(err) => println!("Error with incoming stream: {}", err),
} }
threads = threads.into_iter().filter(|j| !j.is_finished()).collect();
println!("{} threads open", threads.len());
} }
} }
fn handle_connection(mut stream: TcpStream) { fn handle_connection(mut stream: TcpStream) {
let mut buffer = vec![0; 2048]; let Ok(peer_addr) = stream.peer_addr() else {
let size = if let Ok(size) = stream.read(&mut buffer) {
size
} else {
return; return;
}; };
println!("#### new connection from {peer_addr}");
buffer.resize(size, 0); let mut buffer = Vec::with_capacity(2048);
loop {
let mut b = vec![0; 512];
let Ok(size) = stream.read(&mut b) else {
println!("failed to read ");
return;
};
if size == 0 {
println!("nothing read");
return;
}
b.truncate(size);
buffer.extend_from_slice(&b);
let request = String::from_utf8_lossy(&buffer); if buffer.len() > 4096 {
println!("request too long");
let peer_addr = stream.peer_addr().ok(); return;
println!( }
"Received {} bytes from {:?}\n=======\n{}=======\n\n", if buffer.ends_with(b"\r\n\r\n") {
size, let request = String::from_utf8_lossy(&buffer).to_string();
peer_addr, // println!("Received {} bytes from {}", buffer.len(), peer_addr);
request // println!(
.escape_debug() // "=======\n{}=======\n\n",
.collect::<String>() // request
.replace("\\r\\n", "\n") // .escape_debug()
.replace("\\n", "\n") // .collect::<String>()
); // .replace("\\r\\n", "\n")
// .replace("\\n", "\n")
// );
if handle_request(request, &mut stream) {
println!("closing connection");
return;
}
// println!("keeping connection");
buffer.clear();
}
}
}
fn handle_request(request: String, stream: &mut TcpStream) -> bool {
let request = Request::parse(&request); let request = Request::parse(&request);
let response; let response;
let mut end_connection = true;
if let Some(request) = request { if let Some(request) = request {
let head_only = request.method == Method::Head; let head_only = request.method == Method::Head;
let path = request.path.clone(); let path = request.path.clone();
response = get_file(request) response = get_file(request)
.map(|content| Response::new(Status::Ok).with_content(content)) .map(|(content, end_of_file)| {
end_connection = end_of_file;
Response::new(Status::Ok).with_content(content)
})
.unwrap_or_else(|| { .unwrap_or_else(|| {
Response::new(Status::NotFound) Response::new(Status::NotFound)
.with_content(Content::text(format!("FILE NOT FOUND - '{}'", path))) .with_content(Content::text(format!("FILE NOT FOUND - '{}'", path)))
@ -76,10 +107,13 @@ fn handle_connection(mut stream: TcpStream) {
.unwrap_or_else(|_| println!("failed to respond")); .unwrap_or_else(|_| println!("failed to respond"));
stream stream
.flush() .flush()
.unwrap_or_else(|_| println!("failed to respond")); .unwrap_or_else(|_| println!("failed to flush"));
end_connection
} }
fn get_file(request: Request) -> Option<Content> { fn get_file(request: Request) -> Option<(Content, bool)> {
const MAX_SIZE: usize = 1024 * 1024 * 8;
let path = PathBuf::from(format!("./{}", &request.path)) let path = PathBuf::from(format!("./{}", &request.path))
.canonicalize() .canonicalize()
.ok()?; .ok()?;
@ -90,15 +124,36 @@ fn get_file(request: Request) -> Option<Content> {
if path.is_dir() { if path.is_dir() {
let index_file = path.join("index.html"); let index_file = path.join("index.html");
if index_file.is_file() { if index_file.is_file() {
Some(Content::html(fs::read_to_string(index_file).ok()?)) Some((Content::html(fs::read_to_string(index_file).ok()?), true))
} else { } else {
generate_index(&request.path, &path) generate_index(&request.path, &path).map(|c| (c, true))
} }
} else if path.is_file() { } else if path.is_file() {
let ext = path.extension().unwrap_or_default().to_str()?; let ext = path.extension().unwrap_or_default().to_str()?;
let mut buf = Vec::new(); let file = File::open(&path).ok()?;
File::open(&path).ok()?.read_to_end(&mut buf).ok()?; let size = file.metadata().ok()?.size() as usize;
Some(Content::file(ext, buf))
let mut buf = vec![0; MAX_SIZE];
let mut reader = BufReader::new(file);
let start_pos = match request.range {
Some(RequestRange::From(p)) => p,
Some(RequestRange::Full(start, _end)) => start,
_ => 0,
};
reader
.seek(std::io::SeekFrom::Start(start_pos as u64))
.ok()?;
let size_read = reader.read(&mut buf).ok()?;
buf.truncate(size_read);
let mut end_of_file = false;
let range = if size_read < size {
end_of_file = start_pos + size_read == size;
Some((start_pos, start_pos + size_read - 1, size))
} else {
None
};
Some((Content::file(ext, buf).with_range(range), end_of_file))
} else { } else {
None None
} }