2024-08-31 00:03:43 +08:00
parent caacec8455
commit 68c774316f
17 changed files with 5019 additions and 46 deletions

3
.gitignore vendored

@@ -4,9 +4,6 @@
debug/
target/
# Remove Cargo.lock from gitignore if creating an executable, leave it for libraries
# More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html
Cargo.lock
# These are backup files generated by rustfmt
**/*.rs.bk

2245
Cargo.lock generated Normal file

File diff suppressed because it is too large

37
Cargo.toml Normal file

@@ -0,0 +1,37 @@
[package]
name = "rust-s3-server"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rand = "0.8"
lazy_static = "1.4"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
regex = "1.9"
tokio = { version = "1", features = ["full"]}
hyper = { version = "0.14", features = ["full"] }
http-body = "0.4"
http-body-util = "0.1.0-rc.3"
rocket = "=0.5.0-rc.3"
ubyte = "0.10"
chrono = "0.4"
urlencoding = "2.1"
httpdate = "1"
pin-project = "1.1"
md-5 = "0.10"
clap = { version = "4.3", features = ["derive"] }
futures = "0.3"
log = "0.4"
log4rs = { version="1.2.0", features = ["gzip", "background_rotation"] }
asyncio-utils = "0.4"
[[bin]]
name="rusts3"
path="src/main.rs"
[[bin]]
name="test"
path="src/test.rs"

210
LICENSE

@@ -1,73 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files.
"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:
(a) You must give any other recipients of the Work or Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License.
You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

README.md

@@ -1,3 +1,79 @@
# rust-s3-server
An S3-compatible object storage server written in Rust. Ideal for local test environments.
# Building
```bash
$ cargo build --release
```
# Running
## Getting help
```bash
$ target/release/rusts3 --help
Usage: rusts3 [OPTIONS]
Options:
-b, --base-dir <BASE_DIR> [default: ./rusts3-data]
--bind-address <BIND_ADDRESS> Bind IP address [default: 0.0.0.0]
--bind-port <BIND_PORT> Bind port number [default: 8000]
--log-conf <LOG_CONF> Log4rs config file [default: ]
-h, --help Print help
```
## Example usage
```bash
$ target/release/rusts3 -b "test-data" --bind-address "192.168.44.172" --bind-port 18000 --log-conf log4rs.yml
```
### Testing
Creating bucket:
```bash
$ aws --endpoint-url http://192.168.44.172:18000 s3 mb s3://new-bucket
make_bucket: new-bucket
```
Uploading object:
```bash
$ aws --endpoint-url http://192.168.44.172:18000 s3 cp ~/Downloads/zulu.dmg s3://new-bucket/some-path/zulu.dmg
upload: ./zulu.dmg to s3://new-bucket/some-path/zulu.dmg
```
Downloading object:
```bash
$ aws --endpoint-url http://192.168.44.172:18000 s3 cp s3://new-bucket/some-path/zulu.dmg ./new.dmg
download: s3://new-bucket/some-path/zulu.dmg to ./new.dmg
# files should be the same
$ diff ~/Downloads/zulu.dmg ./new.dmg
```
Listing objects:
```bash
$ aws --endpoint-url http://192.168.44.172:18000 s3 ls s3://new-bucket
PRE some-path/
$ aws --endpoint-url http://192.168.44.172:18000 s3 ls s3://new-bucket/some-path/
PRE some-path/
2023-08-13 11:45:25 615835 zulu.dmg
```
Deleting object:
```bash
$ aws --endpoint-url http://192.168.44.172:18000 s3 rm s3://new-bucket/some-path/zulu.dmg
delete: s3://new-bucket/some-path/zulu.dmg
```
-----
```shell
export AWS_ACCESS_KEY_ID=ABCD
export AWS_SECRET_ACCESS_KEY=EF1234
export AWS_ENDPOINT_URL=http://127.0.0.1:8001
```
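With these exported (the key pair above is only a placeholder, and `AWS_ENDPOINT_URL` needs a reasonably recent AWS CLI), the same commands can be run without repeating `--endpoint-url`, for example:
```bash
$ aws s3 mb s3://new-bucket
$ aws s3 ls s3://new-bucket
```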
S3 Server in Rust

32
log4rs.yml Normal file

@@ -0,0 +1,32 @@
refresh_rate: 60 seconds
appenders:
stdout:
kind: console
default:
kind: rolling_file
path: "/var/log/rusts3.log"
append: true
encoder:
pattern: "{d(%Y-%m-%d %H:%M:%S%.3f %Z)} {({l}):5.5} {f}:{L} - {m}{n}"
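# Illustrative line produced by this pattern (file/line/message values assumed):
# 2024-08-31 00:03:43.123 +08:00 INFO  src/main.rs:123 - put_object: object `new-bucket/some-path/zulu.dmg`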
policy:
kind: compound
trigger:
kind: size
limit: 10 mb
roller:
kind: fixed_window
pattern: "/var/log/rusts3.{}.log.gz"
count: 20
base: 1
root:
level: info
appenders:
- stdout
loggers:
rusts3:
level: info
appenders:
- default
additive: false

44
src/cachedfile.rs Normal file

@@ -0,0 +1,44 @@
use rocket::http::Status;
use std::time::SystemTime;
use asyncio_utils::LimitSeekerReader;
use tokio::fs::File;
//limit_reader::LimitedReader;
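/// Streams a (possibly range-limited) file back to the client with ETag,
/// Last-Modified, Cache-control and Content-Disposition headers; `partial`
/// switches the response status to 206 Partial Content.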
pub struct CachedFile {
pub reader:LimitSeekerReader<File>,
pub file_name:String,
pub size:usize,
pub modified_time:SystemTime,
pub etag:String,
pub partial: bool,
}
#[rocket::async_trait]
impl<'r> rocket::response::Responder<'r,'r> for CachedFile
{
fn respond_to(self, _req: &'r rocket::Request) -> rocket::response::Result<'r> {
//let etag = self.1.sha256sum().unwrap();
//let last_modified = self.1.meta().unwrap().modified().unwrap();
let etag = self.etag;
let htd = httpdate::fmt_http_date(self.modified_time);
let response = rocket::response::Response::build().sized_body(
self.size, self.reader).finalize();
let mut actual_builder = &mut rocket::response::Response::build_from(response);
actual_builder = actual_builder.raw_header("Cache-control", "max-age=86400")
.raw_header("Last-Modified", htd) // 24h (24*60*60)
.raw_header("ETag", etag)
.raw_header("Content-Type", "application-octetstream")
.raw_header("Content-Disposition", format!("attachment; filename=\"{}\"", self.file_name));
if self.partial {
actual_builder = actual_builder.status(Status::PartialContent);
}
actual_builder.raw_header("content-length", format!("{}", self.size));
return Ok(actual_builder.finalize());
}
}

133
src/chunk_to_raw.rs Normal file

@@ -0,0 +1,133 @@
use tokio::fs::File;
use std::error::Error;
use md5::{Md5, Digest};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use std::i64;
use asyncio_utils::UndoReader;
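/// Copies the request body into `output`, returning (bytes written, hex MD5).
/// When `chunked` is true the input is parsed as aws-chunked framing: each chunk
/// is "<hex-size>[;extensions]\r\n<payload>\r\n" and the stream ends with a
/// zero-size chunk; only the payload bytes are written and hashed.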
pub async fn copy_chunk_to_raw<D>(input_raw:&mut D, output: &mut File, chunked:bool) -> Result<(usize, String), Box<dyn Error>>
where D:AsyncRead + Unpin
{
let mut input_obj = UndoReader::new(input_raw, None);
let input = &mut input_obj;
let mut hasher:Md5 = Md5::new();
let mut total:usize = 0;
if chunked {
loop {
// it is chunked
let header = read_chunk_header(input).await?;
let mut tokens = header.split(";");
let size = tokens.next().expect("No size info!");
let size = i64::from_str_radix(size, 16)?;
if size == 0 {
break;
}
let size = size as usize;
let copied = copy_fixed_bytes(input, output, size, &mut hasher).await?;
skip_n_bytes(input, 2).await?;
total += copied;
}
let final_hash = format!("{:x}", hasher.finalize());
return Ok((total, final_hash));
} else {
// raw
let copied = copy_direct(input, output, &mut hasher).await?;
let final_hash = format!("{:x}", hasher.finalize());
return Ok((copied, final_hash));
}
}
async fn try_read_full<D>(input: &mut UndoReader<D>, buff:&mut[u8]) -> Result<usize, Box<dyn Error>>
where D:AsyncRead + Unpin
{
let target = buff.len();
let mut nread:usize = 0;
while nread < target {
let rr = input.read(&mut buff[nread..]).await?;
if rr == 0 {
return Ok(nread);
}
nread += rr;
}
return Ok(nread);
}
async fn skip_n_bytes<D>(input: &mut UndoReader<D>, how_many:usize) -> Result<(), Box<dyn Error>>
where D:AsyncRead + Unpin
{
let mut buf = vec![0u8; how_many];
let mut nread:usize = 0;
while nread < how_many {
let rr = input.read(&mut buf[nread..]).await?;
if rr == 0 {
panic!("Insufficient read!");
}
nread += rr;
}
Ok(())
}
async fn read_chunk_header<D>(input: &mut UndoReader<D>) -> Result<String, Box<dyn Error>>
where D:AsyncRead + Unpin
{
let mut buf = [0u8; 512];
let rr = try_read_full(input, &mut buf).await?;
for i in 0 .. rr.saturating_sub(1) {
if buf[i] == 0x0d && buf[i+1] == 0x0a {
if rr > i + 2 {
// only un-read bytes that were actually read, not the zeroed tail of the buffer
input.unread(&buf[i + 2..rr]);
}
// new line found!
return Ok(std::str::from_utf8(&buf[0..i]).unwrap().to_string());
}
}
if buf[1] == 0x3b && buf[0] == 0x30 {
// 0 bytes
return Ok(std::str::from_utf8(&buf[0..rr]).unwrap().to_string());
}
panic!("Expecing chunk header but not found!");
}
pub async fn copy_direct<D>(reader:&mut UndoReader<D>, out:&mut tokio::fs::File, hasher:&mut Md5) -> Result<usize, Box<dyn Error>>
where D:tokio::io::AsyncRead + Unpin
{
let mut buf = [0u8; 4096];
let mut copied: usize = 0;
loop {
let rr = reader.read(&mut buf).await?;
if rr == 0 {
break;
}
out.write_all(&mut buf[..rr]).await?;
hasher.update(&mut buf[..rr]);
copied += rr;
}
return Ok(copied);
}
async fn copy_fixed_bytes<D>(input: &mut UndoReader<D>, output: &mut File, bytes:usize, hasher:&mut Md5) -> Result<usize, Box<dyn Error>>
where D:tokio::io::AsyncRead + Unpin
{
let mut remaining = bytes;
let mut buffer = [0u8; 4096];
// Copy as whole chunk
while remaining > 0 {
let mut rr = input.read(&mut buffer).await?;
if rr == 0 {
panic!("Unexpected EOF while reading chunk body!");
}
if rr > remaining {
// overread, put it back
let to_put_back = &buffer[remaining..rr];
input.unread(to_put_back);
// only write up to remaining bytes
rr = remaining;
}
let wr = output.write(&mut buffer[..rr]).await?;
if wr != rr {
panic!("Incomplete write!");
}
hasher.update(&buffer[..rr]);
remaining = remaining - wr;
}
Ok(bytes)
}

987
src/fsapi.rs Normal file

@@ -0,0 +1,987 @@
use std::path::PathBuf;
use std::error::Error;
use std::fs::{Metadata, metadata, self};
use crate::s3error::S3Error;
use crate::sequencing::{self, Sequence};
use crate::chunk_to_raw::copy_chunk_to_raw;
use std::collections::HashMap;
use rand::{distributions::Alphanumeric, Rng};
use serde_json;
use md5::{Md5, Digest};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::fmt;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::fs as tokiofs;
use std::sync::Arc;
use std::io::Read;
use std::io::Write;
use chrono::offset::Utc;
use chrono::DateTime;
#[derive(Debug)]
pub enum S3FSErrorKind {
InvalidBucketName,
InvalidObjectKey,
BucketNotFound,
BucketAlreadyExists,
KeyNotFound,
InvalidMeta,
IncompleteWrite,
InputOutput,
ObjectTooLarge
}
impl std::fmt::Display for S3FSErrorKind {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{:?}", &self)
}
}
#[derive(Debug)]
pub struct S3FSError {
pub kind: S3FSErrorKind,
pub message: Option<String>
}
impl std::fmt::Display for S3FSError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let mut h = String::new();
match &self.message {
None => {},
Some(c) =>
{
let msg = format!(" Message: {}", c);
h.push_str(&msg);
}
};
write!(f, "S3FSError: Kind: {}{}", &self.kind, &h)
}
}
impl Error for S3FSError {
}
impl S3FSError {
pub fn io(msg: Option<String>) -> S3FSError {
S3FSError { kind: S3FSErrorKind::InputOutput, message: msg }
}
pub fn invalid_bucket_name (name: &str) -> S3FSError {
S3FSError { kind: S3FSErrorKind::InvalidBucketName, message: Some(name.to_string()) }
}
pub fn bucket_not_found(name: &str) -> S3FSError {
S3FSError {kind:S3FSErrorKind::BucketNotFound, message:Some(name.to_string())}
}
pub fn of_kind_and_message(kind:S3FSErrorKind, message:&str) -> S3FSError {
S3FSError {kind:kind, message:Some(message.to_string())}
}
pub fn of_kind(kind:S3FSErrorKind) -> S3FSError {
S3FSError {kind:kind, message: None}
}
pub fn boxed(self) -> Box<Self> {
return Box::new(self);
}
}
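/// Root of the on-disk layout: `base` contains one directory per bucket plus a
/// `_staging` directory used for multipart parts and in-flight uploads.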
pub struct FS {
base: PathBuf,
seq:Arc<sequencing::Sequence>,
}
impl FS {
pub fn new() -> FS {
let result = FS { base: PathBuf::from("."), seq:Arc::new(Default::default()) };
return result;
}
pub fn set_base(&mut self, new_base:&str) {
self.base = PathBuf::from(new_base);
}
pub fn initialize(&self) {
self.create_staging_directory();
}
pub fn create_staging_directory(&self) {
let staging_dir = self.get_staging_dir();
let _ = std::fs::create_dir_all(staging_dir);
}
pub fn make_bucket(&self,bucket:&str) -> Result<Bucket, Box<dyn Error>> {
if !Bucket::valid_bucket(&bucket) {
return Err(S3FSError::of_kind_and_message(
S3FSErrorKind::InvalidBucketName, bucket).into());
}
if self.get_bucket(&bucket).is_some() {
return Err(S3FSError::of_kind_and_message(
S3FSErrorKind::BucketAlreadyExists,
bucket).into());
}
let mut base_path = PathBuf::from(&self.base);
base_path.push(&bucket);
let mb_result = std::fs::create_dir_all(base_path);
if mb_result.is_err() {
return Err(S3FSError::of_kind_and_message(S3FSErrorKind::InputOutput,
&format!("Failed to create directory {}: `{}`",
bucket,
mb_result.err().unwrap())).into());
}
let result = self.get_bucket(bucket);
if result.is_none() {
return Err(S3FSError::of_kind_and_message(S3FSErrorKind::InputOutput,
"Bucket did not exist after creation").into());
}
return Ok(result.unwrap());
}
pub fn get_staging_dir(&self) -> String {
let mut target = self.base.clone();
target.push("_staging");
return target.to_str().unwrap().to_string();
}
pub fn get_bucket(&self, name:&str) -> Option<Bucket>{
if !Bucket::valid_bucket(name) {
return None;
}
let mut path_buf = PathBuf::from(&self.base);
path_buf.push(name);
let meta =
std::fs::metadata(path_buf.clone());
if meta.is_err() {
return None;
}
let meta = meta.unwrap();
if !meta.is_dir() {
return None;
}
let result = Bucket::from(path_buf.to_str().unwrap(),
&self.get_staging_dir(), Arc::clone(&self.seq));
if result.is_err() {
return None;
}
return Some(result.unwrap());
}
pub fn get_all_buckets(&self) -> HashMap<String, Bucket> {
let base_path: PathBuf = self.base.clone();
let mut staging_dir = base_path.clone();
staging_dir.push("_staging");
let _ = std::fs::create_dir_all(&staging_dir);
let mut result = HashMap::new();
let read_result = std::fs::read_dir(self.base.clone());
if read_result.is_err() {
return result;
}
let dir_entries = read_result.unwrap();
for next in dir_entries {
if next.is_err() {
continue;
}
let next = next.unwrap();
if !next.path().is_dir() {
continue;
}
let name = next.file_name();
let name = name.to_str();
if name.is_none() {
continue;
}
let name = name.unwrap();
if !Bucket::valid_bucket(name) {
continue;
}
let staging_dir_local = staging_dir.clone();
let bucket = Bucket::from(next.path().to_str().unwrap(),
staging_dir_local.to_str().unwrap(),
Arc::clone(&self.seq));
if bucket.is_err() {
continue;
}
let bucket = bucket.unwrap();
result.insert(name.to_string(), bucket);
}
return result;
}
}
#[derive(Debug, Clone)]
pub struct Bucket {
base: PathBuf,
base_path:String,
staging: PathBuf,
seq:Arc<Sequence>
}
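/// Object metadata stored as JSON in a `<key>@@META@@` sidecar file next to the object.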
#[derive(Deserialize, Serialize, Debug)]
pub struct FileMeta {
pub etag: String,
pub size: usize,
}
impl Bucket {
pub const META_SUFFIX:&str = "@@META@@";
pub fn from(path: &str, staging_path:&str, seq:Arc<sequencing::Sequence>) -> Result<Bucket, Box<dyn Error>> {
let pathb = PathBuf::from(path);
let pathbx = pathb.canonicalize()?;
let pathbxc = pathbx.clone();
let mut path_base = pathbxc.as_path().to_str().unwrap().to_string();
if !path_base.ends_with("/") {
path_base.push_str("/");
}
Ok(Bucket {
base: pathbx,
base_path: path_base,
staging: PathBuf::from(staging_path),
seq
})
}
fn get_sibling(&self, obj:S3Object) -> Vec<S3Object> {
let parent = obj.target.parent();
if parent.is_none() {
return vec!();
}
let parent = parent.unwrap().to_path_buf();
let mut parent_key = obj.key;
if parent_key.ends_with("/") {
parent_key = parent_key.trim_end_matches("/").into();
}
let last_slash = parent_key.rfind("/");
match last_slash {
Some(idx) => {
let self_key_str = &parent_key[0..idx];
parent_key = self_key_str.into();
},
None =>{
parent_key = "".into();
}
}
//self_key = self_key.trim_end_matches("").into();
let parent_obj = S3Object {
bucket:self,
target: parent,
kind: FileType::Directory,
key: parent_key,
};
return self.get_children(parent_obj);
}
fn get_children<'a>(&'a self, parent:S3Object) -> Vec<S3Object<'a>>{
let path = parent.target.clone();
let parent_key = parent.key;
let dir_iter = std::fs::read_dir(path.clone());
if dir_iter.is_err() {
return vec!();
}
let dir_iter = dir_iter.unwrap();
let mut result = Vec::new();
for next in dir_iter {
if next.is_err() {
continue;
}
let next = next.unwrap();
let name = next.file_name();
let name = name.to_str();
if name.is_none() {
// invalid name
continue;
}
let name = name.unwrap();
if name == "." || name == ".." || name.ends_with(Bucket::META_SUFFIX) {
continue;
}
let meta = next.metadata();
if meta.is_err() {
continue;
}
let meta = meta.unwrap();
if meta.is_dir() {
let mut this_path = path.clone();
this_path.push(name);
let entry = S3Object {
bucket: self,
target: this_path,
kind: FileType::Directory,
key: format!("{parent_key}/{name}")
};
result.push(entry);
} else if meta.is_file() {
let mut this_path = path.clone();
this_path.push(name);
let entry = S3Object {
bucket: self,
target: this_path,
kind: FileType::File,
key: format!("{parent_key}/{name}")
};
result.push(entry);
}
}
result.sort_by(|a, b| a.target.partial_cmp(&b.target).unwrap());
return result;
}
fn collect_children<'a, 'b>(&'b self, obj:S3Object, result:&mut Vec<S3Object<'a>>)
where 'b : 'a
{
let children = self.get_children(obj);
for next_child in children {
let child:S3Object = next_child.clone();
result.push(child);
if next_child.is_dir() {
self.collect_children(next_child, result);
}
}
}
pub fn list_objects(&self, prefix:&str, after:&str, limit: usize) -> Vec<S3Object> {
if !Bucket::valid_key(prefix) {
return vec!();
}
let target = self.file_for_key(prefix);
let file_name = target.get_short_name();
let parent = target.target.parent();
let mut result = Vec::new();
if parent.is_none() {
return vec!();
}
if target.is_dir() {
result.push(target.clone());
self.collect_children(target, &mut result);
} else {
let siblings = self.get_sibling(target);
for next in siblings {
let next_file_name = next.get_short_name();
if next_file_name.starts_with(&file_name) {
if next.is_dir() {
self.collect_children(next, &mut result);
} else {
result.push(next.clone());
}
}
}
}
return result.into_iter()
.filter(|x| x.kind == FileType::File)
.filter(|x| x.has_meta())
.filter(|x| x.object_key() > after)
.take(limit)
.collect();
}
pub fn list_objects_short(&self, key:&str, after:&str, limit:usize) -> Vec<S3Object> {
if !Bucket::valid_key(key) {
return vec!();
}
let mut path = self.base.clone();
path.push(key);
let target = self.file_for_key(key);
let file_name = target.get_short_name();
let parent = target.target.parent();
let mut result = Vec::new();
if parent.is_none() {
return vec!();
}
if target.is_dir() {
result.push(target.clone());
let children = self.get_children(target);
for next in children {
result.push(next);
}
} else {
let siblings = self.get_sibling(target);
for next in siblings {
let next_file_name = next.get_short_name();
if next_file_name.starts_with(&file_name) {
result.push(next);
}
}
}
return result.into_iter()
.filter(|x| x.kind == FileType::Directory || (x.kind == FileType::File && x.has_meta()))
.filter(|x| x.object_key() > after)
.take(limit)
.collect();
}
pub fn gen_upload_id(&self) -> String {
let s: String = rand::thread_rng()
.sample_iter(&Alphanumeric)
.take(10)
.map(char::from)
.collect();
return format!("upload_{s}");
}
pub fn get_creation_time(&self) -> Result<String, Box<dyn Error>> {
let meta = std::fs::metadata(&self.base)?;
let created = meta.created()?;
let datetime: DateTime<Utc> = created.into();
// 2009-10-12T17:50:30.000Z
return Ok(format!("{}", datetime.format("%Y-%m-%dT%H:%M:%S%.3f%:z")));
}
pub fn cleanup_upload_id(&self, id:&str) {
let ids = id.to_string();
//target.push(id.to_string());
let mut counter = 0;
loop {
counter +=1;
let mut file_target = self.staging.clone();
file_target.push(format!("{id}_{counter}"));
let mut meta_target = self.staging.clone();
meta_target.push(format!("{}_{}{}", ids, counter, Self::META_SUFFIX));
if meta_target.exists() {
let _ = std::fs::remove_file(meta_target);
}
if file_target.exists() {
let _ = std::fs::remove_file(file_target);
} else {
if counter > 10 {
break;
}
}
if counter > 1000 {
break;
}
}
}
pub fn valid_bucket(bucket:&str) -> bool {
if bucket == "_staging" {
return false;
}
let reg = "^[a-zA-Z0-9.\\-_]{1,255}$";
let pattern = Regex::new(reg).unwrap();
if pattern.is_match(bucket) {
return true;
}
return false;
}
fn valid_key(key:&str) -> bool {
let reg = ".*[\\\\><|:&\\$].*";
let pattern = Regex::new(reg).unwrap();
if pattern.is_match(key) {
return false;
}
if key.contains("/./") || key.contains("/../") || key.contains("//") {
return false;
}
if key.starts_with("/") {
return false;
}
if key.ends_with(Self::META_SUFFIX) {
return false;
}
return true;
}
pub fn list_all_objects(&self) -> Vec<S3Object> {
let mut files = Vec::new();
Self::scan(&self.base, &mut files);
let mut result = Vec::new();
let base = &self.base_path;
for item in files {
let full_path = String::from(item.to_str().unwrap());
let key = full_path.strip_prefix(base).unwrap();
if !full_path.ends_with(Self::META_SUFFIX) {
result.push(
S3Object {
bucket:&self,
target: item,
kind: FileType::File,
key:String::from(key)
}
);
}
}
return result;
}
fn scan(directory:&PathBuf, result: &mut Vec<PathBuf>) {
let paths = fs::read_dir(directory.as_path());
if let Err(_) = paths {
return;
}
let paths = paths.unwrap();
for next in paths {
match next {
Ok(dir) => {
let next_path = dir.path();
let meta = fs::metadata(next_path.as_path());
match meta {
Err(_) => continue,
Ok(metadata) => {
if metadata.is_dir() {
// directory
Self::scan(&next_path, result);
} else if metadata.is_file(){
result.push(next_path);
}
}
}
},
Err(_) => {
continue;
}
}
}
}
pub fn delete_object(&self, key:&str) -> bool {
if !Self::valid_key(key) {
return false;
}
let full_path = format!("{}{}", &self.base_path, &key);
let full_meta_path = format!("{}{}{}", &self.base_path, &key, Self::META_SUFFIX);
let pb = PathBuf::from(full_path);
let pbc = pb.clone();
let _ = std::fs::remove_file(&pb);
let _ = std::fs::remove_file(&full_meta_path);
let _ = std::fs::remove_dir(&pb);
if key.contains("/") {
let parent = pbc.parent().unwrap();
let _ = std::fs::remove_dir(parent);
}
return true;
}
pub fn read_object(&self, key:&str) -> Result<std::fs::File, Box<dyn Error>> {
if !Self::valid_key(key) {
return Err(S3FSError::of_kind_and_message(
S3FSErrorKind::InvalidObjectKey, key).into());
}
let file = self.file_for_key(key);
let path = file.target;
return Ok(std::fs::File::open(path.as_path())?);
}
pub fn get_object_meta(&self, key:&str) -> Result<FileMeta, Box<dyn Error>> {
if !Self::valid_key(key) {
return Err(S3FSError::of_kind_and_message(S3FSErrorKind::InvalidObjectKey, key).into());
}
let meta_file = self.meta_file_for_key(key);
let path = meta_file.target;
let mut input_file = std::fs::File::open(path.as_path())?;
let mut buffer = [0u8; 4096];
let mut nread:usize = 0;
loop {
let rr = input_file.read(&mut buffer[nread..])?;
if rr == 0 {
break;
}
nread += rr;
}
let string = std::str::from_utf8(&buffer[0..nread]).unwrap();
let result:FileMeta = serde_json::from_str(string)?;
return Ok(result);
}
pub async fn merge_part(&self, key:&str, upload_id:&str) -> Result<(usize, String), Box<dyn Error>> {
let staging_path = self.staging.clone();
let mut counter = 0;
let dest = self.file_for_key(key);
dest.ensure_parent();
let mut dest_file = tokio::fs::File::create(dest.target).await?;
let mut total:usize = 0;
let mut hasher = Md5::new();
loop {
counter = counter + 1;
let mut next_file_p = staging_path.clone();
next_file_p.push(format!("{}_{}", upload_id, counter));
let mut next_meta_file_p = staging_path.clone();
next_meta_file_p.push(format!("{}_{}{}", upload_id, counter, Self::META_SUFFIX));
if next_file_p.exists() {
let mut next_f = tokio::fs::File::open(next_file_p.as_path()).await?;
let copied = Self::merge_into_with_hash(&mut dest_file, &mut next_f, &mut hasher).await?;
total += copied;
let _ = std::fs::remove_file(next_file_p);
let _ = std::fs::remove_file(next_meta_file_p);
} else {
break;
}
}
let hash = format!("{:x}", hasher.finalize());
let meta_dest_f = self.meta_file_for_key(key);
let mut meta_dest = std::fs::File::create(meta_dest_f.path())?;
Self::save_meta(&mut meta_dest, FileMeta {
etag: hash.clone(),
size: total
})?;
return Ok((total, hash));
}
async fn merge_into_with_hash(dst:&mut tokio::fs::File, src:&mut tokio::fs::File, hasher:&mut Md5) ->Result<usize, Box<dyn Error>> {
let mut buf = [0u8; 4096];
let mut copied = 0;
loop {
let rr = src.read(&mut buf).await?;
if rr == 0 {
break;
}
hasher.update(&buf[..rr]);
let wr = dst.write(&mut buf[..rr]).await?;
if wr != rr {
return Err(Box::new(S3Error::io_error()))
}
copied += wr;
}
return Ok(copied);
}
pub async fn save_object_part<D>(&self, _key:&str, upload_id:&str, part_number:u32,reader: &mut D, chunked:bool) -> Result<(usize,String), Box<dyn Error>>
where D:tokio::io::AsyncRead + Unpin
{
let mut path = self.staging.clone();
path.push(format!("{upload_id}_{part_number}"));
let object_path = path;
let mut path_after = self.staging.clone();
path_after.push(format!("{upload_id}_{part_number}"));
let mut meta_path = self.staging.clone();
meta_path.push(format!("{}_{}{}", upload_id, part_number, Self::META_SUFFIX));
let object_meta_path = meta_path;
let mut tmp_file = tokiofs::File::create(object_path.clone()).await.unwrap();
let mut tmp_meta = std::fs::File::create(object_meta_path.clone()).unwrap();
let (copied, md5) = copy_chunk_to_raw(reader, &mut tmp_file, chunked).await?;
Self::save_meta(&mut tmp_meta, FileMeta{etag:md5.clone(), size:copied})?;
return Ok((copied, md5));
}
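/// Streams an object body into the staging directory first, then renames the data
/// file and its @@META@@ sidecar into place once the copy and MD5 digest are complete.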
pub async fn save_object<D>(&self, key:&str, reader: &mut D, chunked:bool) -> Result<(usize, String), Box<dyn Error>>
where D:tokio::io::AsyncRead + Unpin
{
if !Self::valid_key(key) {
return Err(S3FSError::of_kind_and_message(
S3FSErrorKind::InvalidObjectKey, key).into())
}
let file = self.file_for_key(key);
let meta = self.meta_file_for_key(key);
let seq = self.seq.next();
let staging = self.staging.clone();
let staging = staging.as_path().to_str().unwrap();
let object_tmp_name = format!("{staging}/{seq}");
let object_tmp_meta_name = format!("{}/{}{}", staging, seq, Self::META_SUFFIX);
let mut tmp_file = tokiofs::File::create(object_tmp_name.clone()).await.unwrap();
let mut tmp_meta = std::fs::File::create(object_tmp_meta_name.clone()).unwrap();
file.ensure_parent();
let (copied_size, md5) = copy_chunk_to_raw(
reader, &mut tmp_file, chunked).await?;
Self::save_meta(&mut tmp_meta, FileMeta{etag:md5.clone(), size:copied_size})?;
let _ = std::fs::rename(object_tmp_name, file.target);
let _ = std::fs::rename(object_tmp_meta_name, meta.target);
return Ok((copied_size, md5));
}
fn save_meta(out:&mut std::fs::File, meta:FileMeta) -> Result<usize, Box<dyn Error>>
{
let to_write = serde_json::to_string(&meta)?;
let written = out.write(to_write.as_bytes())?;
return Ok(written);
}
fn try_canicalize(input:PathBuf) ->PathBuf {
let can = input.clone().canonicalize();
if can.is_err() {
return input;
}
return can.unwrap();
}
fn file_for_key(&self, key:&str)->S3Object {
let full_path = format!("{}{}", self.base_path, key);
let path_buf = Self::try_canicalize(PathBuf::from(full_path));
let actual_key = path_buf.to_str().unwrap().to_string();
let base_string = self.base_path.clone();
let mut key_short:String = "".into();
if actual_key.len() > base_string.len() {
key_short = (&actual_key[base_string.len()..]).to_string();
}
let meta = std::fs::metadata(&path_buf);
let mut obj_type = FileType::Uninitialized;
match meta {
Err(_) => {
},
Ok(some_meta) => {
if some_meta.is_dir() {
obj_type = FileType::Directory;
} else if some_meta.is_file() {
obj_type = FileType::File;
}
}
}
return S3Object {
bucket: &self,
target:path_buf,
kind: obj_type,
key: key_short
}
}
pub fn get_object_by_key(&self, key:&str) -> Option<S3Object> {
let file = self.file_for_key(key);
if !file.exists() {
return None;
}
if !file.meta().unwrap().is_file() {
return None;
}
return Some(file);
}
fn meta_file_for_key(&self, key:&str) -> S3Object {
let full_path = format!("{}{}{}", self.base_path, key, Self::META_SUFFIX);
let path_buf = Self::try_canicalize(PathBuf::from(full_path));
let meta = std::fs::metadata(&path_buf);
let mut obj_type = FileType::Uninitialized;
match meta {
Err(_) => {
},
Ok(some_meta) => {
if some_meta.is_dir() {
obj_type = FileType::Directory;
} else if some_meta.is_file() {
obj_type = FileType::File;
}
}
}
return S3Object {
bucket: &self,
target: path_buf,
kind: obj_type,
key: String::from(key)
}
}
pub fn list_objects_old(&self, prefix:&str) -> Vec<S3Object> {
let mut result = Vec::new();
let all = self.list_all_objects();
for next in all {
if next.key.starts_with(prefix) {
result.push(next)
}
}
return result;
}
pub fn list_objects_short_old(&self, prefix:&str) -> Vec<S3Object> {
return vec!();
}
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum FileType {
Uninitialized,
File,
Directory
}
#[derive(Debug, Clone)]
pub struct S3Object<'a> {
bucket: &'a Bucket,
target: PathBuf,
kind: FileType,
key: String
}
impl<'a> S3Object<'a> {
pub async fn open_for_read(&self) -> Result<tokiofs::File, Box<dyn Error>> {
let meta = self.meta()?;
if meta.is_dir() {
return Err(S3FSError::of_kind_and_message(S3FSErrorKind::InvalidObjectKey,
"read target is folder").into());
}
let open_result = tokiofs::File::open(self.target.clone()).await?;
return Ok(open_result);
}
pub fn kind(&self) -> &FileType {
&self.kind
}
pub fn read_as_string(&self) -> Result<String, Box<dyn Error>>{
let bytes = self.read_as_bytes()?;
return Ok(String::from_utf8(bytes)?);
}
pub fn read_as_bytes(&self) ->Result<Vec<u8>, Box<dyn Error>> {
let mut file = std::fs::File::open(self.target.clone())?;
let meta = file.metadata()?;
if meta.len() > 10 * 1024 * 1024 {
return Err(Box::new(S3FSError::of_kind_and_message(S3FSErrorKind::ObjectTooLarge,
"Object larger than 10MiB can't be read as a whole!")));
}
let mut buf = Vec::new();
let _ = file.read_to_end(&mut buf)?;
return Ok(buf);
}
pub fn is_dir(&self) -> bool {
let meta = self.meta();
if meta.is_err() {
return false;
}
return meta.unwrap().is_dir();
}
pub fn is_file(&self) -> bool {
let meta = self.meta();
if meta.is_err() {
return false;
}
return meta.unwrap().is_file();
}
pub fn exists(&self) -> bool {
let meta = self.meta();
if meta.is_err() {
return false;
}
return true;
}
pub fn len(&self) -> Result<u64, Box<dyn Error>> {
let meta = self.meta()?;
Ok(meta.len())
}
pub fn bucket(&self) -> &Bucket {
return self.bucket;
}
pub fn metafile(&self) -> S3Object {
return self.bucket.meta_file_for_key(&self.key);
}
pub fn checksum(&self) -> Result<String, Box<dyn Error>> {
let meta = self.bucket.get_object_meta(&self.key)?;
return Ok(meta.etag);
}
pub fn last_modified_formatted(&self) -> Result<String, Box<dyn Error>> {
let modified = self.meta()?.modified()?;
let datetime: DateTime<Utc> = modified.into();
// 2009-10-12T17:50:30.000Z
return Ok(format!("{}", datetime.format("%Y-%m-%dT%H:%M:%S%.3f%:z")));
}
pub fn object_key(&self) -> &str {
return &self.key;
}
pub fn get_short_name(&self) -> String {
return self.target.file_name().unwrap().to_str().unwrap().to_string();
}
pub fn object_key_encoded(&self) -> String {
let to_encode = &self.key;
return urlencoding::encode(to_encode).to_string();
}
pub fn ensure_parent(&self) -> bool {
let parent = self.target.parent().unwrap();
let created = std::fs::create_dir_all(parent);
if created.is_err() {
return false;
}
return true;
}
fn type_for(path: &PathBuf) -> Option<FileType> {
let meta = std::fs::metadata(path);
if meta.is_err() {
return None;
}
let meta = meta.unwrap();
if meta.is_dir() {
return Some(FileType::Directory);
}
if meta.is_file() {
return Some(FileType::File);
}
return None;
}
pub fn meta(&self) -> Result<Metadata, Box<dyn Error>> {
Ok(metadata(self.target.as_path())?)
}
pub fn has_meta(&self) -> bool {
let meta_file = self.metafile();
return meta_file.is_file();
}
pub fn full_path(&self) -> Option<String> {
let target_str = self.target.to_str()?;
return Some(target_str.to_string());
}
pub fn path(&self) -> &PathBuf {
&self.target
}
pub async fn format(&self) -> String {
let object_key = self.object_key();
if self.kind == FileType::Directory {
let mut object_key = object_key.clone().to_string();
if !object_key.ends_with("/") {
object_key.push_str("/");
}
return format!(r#"<CommonPrefixes><Prefix>{object_key}</Prefix></CommonPrefixes>"#);
}
let object_last_modified = self.last_modified_formatted().unwrap();
let mut object_etag = self.checksum();
if object_etag.is_err() {
object_etag = Ok("".into());
}
let object_etag = object_etag.unwrap();
let object_size = self.len().unwrap();
let entry_xml = format!(r#"<Contents>
<Key>{object_key}</Key>
<LastModified>{object_last_modified}</LastModified>
<ETag>"{object_etag}"</ETag>
<Size>{object_size}</Size>
<StorageClass>STANDARD</StorageClass>
</Contents>"#);
return entry_xml;
}
}

9
src/ioutil.rs Normal file

@@ -0,0 +1,9 @@
use crate::fsapi::Bucket;
use std::error::Error;
pub fn delete_file_and_meta(path: &str) -> Result<(), Box<dyn Error>> {
let meta = format!("{}{}", path, Bucket::META_SUFFIX);
let _ = std::fs::remove_file(path);
let _ = std::fs::remove_file(meta);
return Ok(());
}

650
src/main.rs Normal file

@@ -0,0 +1,650 @@
pub mod fsapi;
pub mod sequencing;
pub mod cachedfile;
pub mod s3error;
pub mod s3resp;
pub mod chunk_to_raw;
pub mod request_guards;
pub mod ioutil;
pub mod states;
use asyncio_utils::LimitSeekerReader;
use lazy_static::__Deref;
use regex::Regex;
use states::CliArg;
use request_guards::{ChunkedEncoding, CreateBucket, AbortMultipartUpload, AuthCheckPassed, CompleteMultipartUpload, CreateMultipartUpload, GetObject};
use s3resp::S3Response;
use cachedfile::CachedFile;
use rocket::Data;
use rocket::response::Debug;
use ubyte::ToByteUnit;
use std::error::Error;
use std::io::SeekFrom;
use fsapi::{FS, Bucket};
#[macro_use]
extern crate rocket;
extern crate log;
#[allow(unused)]
use log::{info, debug, warn, error, trace};
use rocket::State;
#[derive(FromForm, Debug)]
struct ListQuery {
delimiter: Option<String>,
#[allow(unused)]
#[field(name = "encoding-type")]
encoding_type: Option<String>,
marker:Option<String>,
#[field(name = "max-keys")]
max_keys:Option<usize>,
prefix: Option<String>,
#[field(name="list-type")]
list_type: Option<i32>,
#[field(name="continuation-token")]
continuation_token: Option<String>
}
#[get("/<bucket>?<query..>", rank=99)]
async fn list_objects_ext(bucket:String, query:ListQuery, #[allow(unused)] marker:request_guards::ListObjects, fs:&State<FS>)
-> Option<String> {
let action = "list_objects_ext";
info!("{action}: bucket `{bucket}`, query `{query:?}`");
return list_objects(bucket, query, fs).await;
}
#[get("/<bucket>?<query..>", rank=98)]
async fn list_objects_ext_v2(bucket:String, query:ListQuery, #[allow(unused)] marker:request_guards::ListObjectsV2, fs:&State<FS>)
-> Option<String> {
let action = "list_objects_ext_v2";
info!("{action}: bucket `{bucket}`, query `{query:?}`");
return list_objects_v2(bucket, query, fs).await;
}
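// With a delimiter the listing is shallow (direct children only; directories are
// rendered as CommonPrefixes); without one it recurses over everything under the prefix.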
fn list_bucket<'a>(bucket:&'a Bucket, prefix:&str, delimiter:&Option<String>, after:&str, limit:usize) -> Vec<fsapi::S3Object<'a>> {
match delimiter {
Some(_str) => {
bucket.list_objects_short(prefix, after, limit)
},
None => {
bucket.list_objects(prefix, after, limit)
}
}
}
async fn list_objects_v2(bucket:String, query:ListQuery, fs:&State<FS>) -> Option<String> {
let action = "list_objects_v2";
let bucket_arg = bucket.clone();
let bucket = fs.get_bucket(&bucket);
match bucket {
None => {
return None;
}
_ => {
}
}
#[allow(unused)]
let list_type = query.list_type.unwrap_or(-1);
let bucket = bucket.unwrap();
let actual_prefix:String = query.prefix.unwrap_or(String::from(""));
let ct = query.continuation_token.unwrap_or(String::from(""));
let max_keys = query.max_keys.unwrap_or(100);
let objects:Vec<fsapi::S3Object> = list_bucket(&bucket, &actual_prefix, &query.delimiter, &ct, max_keys);
let mut is_truncated = false;
let delimiter = query.delimiter.unwrap_or(String::from(""));
if delimiter != "/" && delimiter != "" {
panic!("delimeter must be / or empty");
}
let mut objects_string = String::new();
info!("{action}: returned {} objects", objects.len());
let mut next_ct = "".to_string();
for next in objects.iter() {
let entry_xml = next.format().await;
objects_string.push_str(&entry_xml);
}
if objects.len() > 0 {
let last_obj = objects.last().unwrap();
next_ct = last_obj.object_key().to_string();
is_truncated = true;
next_ct = format!("<NextContinuationToken>{next_ct}</NextContinuationToken>");
}
let copy_count = objects.len();
let result = format!(r###"<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>{bucket_arg}</Name>
<Prefix>{actual_prefix}</Prefix>
<KeyCount>{copy_count}</KeyCount>
<ContinuationToken>{ct}</ContinuationToken>
{next_ct}
<MaxKeys>{max_keys}</MaxKeys>
<Delimiter>{delimiter}</Delimiter>
<IsTruncated>{is_truncated}</IsTruncated>
{objects_string}
<EncodingType>url</EncodingType>
</ListBucketResult>
"###);
println!("{result}");
return Some(result);
}
async fn list_objects(bucket:String, query:ListQuery, fs:&State<FS>) -> Option<String> {
let action = "list_objects";
let bucket_arg = bucket.clone();
let bucket = fs.get_bucket(&bucket);
match bucket {
None => {
return None;
}
_ => {
}
}
#[allow(unused)]
let list_type = query.list_type.unwrap_or(-1);
let bucket = bucket.unwrap();
let actual_prefix:String = query.prefix.unwrap_or(String::from(""));
let marker = query.marker.unwrap_or(String::from(""));
let max_keys = query.max_keys.unwrap_or(100);
let objects:Vec<fsapi::S3Object> = list_bucket(&bucket, &actual_prefix, &query.delimiter, &marker, max_keys);
let mut is_truncated = false;
let delimiter = query.delimiter.unwrap_or(String::from(""));
if delimiter != "/" && delimiter != "" {
panic!("delimeter must be / or empty");
}
let mut objects_string = String::new();
info!("{action}: returned {} objects", objects.len());
let mut next_marker = "".to_string();
for next in objects.iter() {
let entry_xml = next.format().await;
objects_string.push_str(&entry_xml);
}
if objects.len() > 0 {
let last_obj = objects.last().unwrap();
next_marker = last_obj.object_key().to_string();
is_truncated = true;
next_marker = format!("<NextMarker>{next_marker}</NextMarker>");
}
let result = format!(r###"<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>{bucket_arg}</Name>
<Prefix>{actual_prefix}</Prefix>
<Marker>{marker}</Marker>
{next_marker}
<MaxKeys>{max_keys}</MaxKeys>
<Delimiter>{delimiter}</Delimiter>
<IsTruncated>{is_truncated}</IsTruncated>
{objects_string}
<EncodingType>url</EncodingType>
</ListBucketResult>
"###);
println!("{result}");
return Some(result);
}
#[get("/<bucket>?<location>", rank=4)]
async fn get_bucket_location(bucket:String, #[allow(unused)] marker:request_guards::GetBucketLocation, #[allow(unused)] location:String) -> Option<String> {
let action = "get_bucket_location";
info!("{action}: bucket: `{bucket}`. using hardcoded `ap-southeast-1`");
return Some(String::from(
r#"<?xml version="1.0" encoding="UTF-8"?>
<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">ap-southeast-1</LocationConstraint>
"#
));
}
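// UploadPart: streams up to 5 GiB into a part keyed by uploadId/partNumber,
// forwarding the ChunkedEncoding flag to save_object_part, and returns the
// part's MD5 as the ETag header.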
#[allow(non_snake_case)]
#[put("/<bucket>/<key..>?<partNumber>&<uploadId>", data = "<data>", rank=3)]
async fn put_object_part<'a>(bucket:String, key:std::path::PathBuf, partNumber:u32, uploadId:String, chunked:ChunkedEncoding,
fs:&State<FS>, #[allow(unused)] marker:request_guards::PutObjectPart, data: rocket::data::Data<'a>)
-> Result<s3resp::S3Response<'a>, Debug<Box<dyn Error>>> {
let action = "put_object_part";
let key = key.to_str().unwrap();
info!("{action}: object `{bucket}/{key}` uploadId `{uploadId}`, partNumber `{partNumber}`, chunked `{chunked:?}`");
let bucket_obj = fs.get_bucket(&bucket);
if bucket_obj.is_none() {
info!("{action}: bucket `{bucket}` not found");
return Ok(S3Response::not_found())
}
let bucket_obj = bucket_obj.unwrap();
use rocket::data::ToByteUnit;
let mut data_stream = data.open(5.gigabytes());
let write_result = bucket_obj.save_object_part(&key, &uploadId, partNumber, &mut data_stream, chunked.0).await;
if write_result.is_err() {
info!("{action}: io error: {}", write_result.err().unwrap());
return Ok(S3Response::server_error());
}
let wr = write_result.unwrap();
info!("{action}: `{bucket}/{key}` `{uploadId}@{partNumber}` written {} bytes, md5 is `{}`", wr.0, wr.1);
//data_stream.stream_to(writer).await;
let mut ok = S3Response::ok();
ok.add_header("ETag", &format!("{}", wr.1));
return Ok(ok);
}
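// PutObject: same streaming path as put_object_part, but writes the object
// directly under its key and returns the MD5 of the stored body as ETag.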
#[put("/<bucket>/<key..>", data = "<data>", rank=2)]
async fn put_object<'a>(bucket:String, key:std::path::PathBuf,
chunked:ChunkedEncoding,
fs:&State<FS>,
#[allow(unused)]
marker:request_guards::PutObject,
data: rocket::data::Data<'a>) -> Result<s3resp::S3Response<'a>, Debug<Box<dyn Error>>> {
let action = "put_object";
let key = key.to_str().unwrap();
let bucket_obj = fs.get_bucket(&bucket);
if bucket_obj.is_none() {
warn!("{action}: bucket {bucket} does not exist");
return Ok(S3Response::not_found())
}
let bucket = bucket_obj.unwrap();
use rocket::data::ToByteUnit;
let mut data_stream = data.open(5.gigabytes());
//test(&data_stream);
let write_result = bucket.save_object(&key, &mut data_stream, chunked.0).await;
if write_result.is_err() {
warn!("{action}: io error: {}", write_result.err().unwrap());
return Ok(S3Response::server_error());
}
let wr = write_result.unwrap();
info!("{action}: `{key}` written {} bytes, md5 is `{}`", wr.0, wr.1);
//data_stream.stream_to(writer).await;
let mut ok = S3Response::ok();
ok.add_header("ETag", &format!("{}", wr.1));
return Ok(ok);
}
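// AbortMultipartUpload: matches `DELETE /<bucket>?uploadId=...` (bucket level,
// rank 2) and discards any state kept for that upload id.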
#[delete("/<bucket>?<uploadId>", rank=2)]
async fn cleanup_multipart_upload<'r>(bucket:String,
#[allow(unused)]
marker:AbortMultipartUpload,
fs:&State<FS>,
#[allow(non_snake_case)]
uploadId:String) -> Result<S3Response<'r>, Debug<Box<dyn Error>>> {
let action = "cleanup_multipart_upload";
let bucket_obj = fs.get_bucket(&bucket);
if bucket_obj.is_none() {
warn!("{action}: bucket `{bucket}` not found");
return Err(Debug(Box::new(s3error::S3Error::not_found())));
}
let bucket = bucket_obj.unwrap();
bucket.cleanup_upload_id(&uploadId);
info!("{action}: cleaned up upload id {uploadId}");
Ok(S3Response::ok())
}
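// CompleteMultipartUpload: merges all parts written under uploadId into the
// final object and returns a <CompleteMultipartUploadResult> body. The part
// list in the request body is currently ignored.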
#[allow(non_snake_case)]
#[post("/<bucket>/<key_full..>?<uploadId>", data="<data>", rank=5)]
async fn complete_multipart_upload<'a>(bucket:String, key_full:std::path::PathBuf, uploadId:String,
#[allow(unused)] data:rocket::data::Data<'a>, fs:&State<FS>,
#[allow(unused)] marker:CompleteMultipartUpload
) -> Result<S3Response<'a>, Debug<Box<dyn Error>>> {
let action = "complete_multipart_upload";
let key = key_full.to_str().unwrap().to_string();
let bucket_obj = fs.get_bucket(&bucket);
if bucket_obj.is_none() {
warn!("{action}: bucket `{bucket}` not found");
return Ok(S3Response::not_found());
}
let bucket_obj = bucket_obj.unwrap();
let (size, hash) = bucket_obj.merge_part(&key, &uploadId).await?;
info!("{action}: merged `{uploadId}` into `{bucket}/{key}`, {size}+`{hash}`");
let response = format!(r#"<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUploadResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Location>http://127.0.0.1:8000/{bucket}/{key}</Location>
<Bucket>{bucket}</Bucket>
<Key>{key}</Key>
<ETag>"{hash}"</ETag>
</CompleteMultipartUploadResult>"#);
let mut ok = S3Response::ok();
ok.body(&response);
return Ok(ok);
}
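// CreateMultipartUpload: `POST /<bucket>/<key>?uploads` allocates a fresh
// uploadId for the key and returns it in an <InitiateMultipartUploadResult>.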
#[post("/<bucket>/<key_full..>?<uploads>", data="<data>", rank=99)]
async fn create_multipart_upload<'a>(bucket:String, fs:&State<FS>, key_full:std::path::PathBuf,
#[allow(unused)] uploads:String,
#[allow(unused)] marker: CreateMultipartUpload,
#[allow(unused)] data:Data<'a>) -> Result<S3Response<'a>, Debug<Box<dyn Error>>> {
let action = "create_multipart_upload";
let key = key_full.to_str().unwrap().to_string();
let bucket_name = bucket.clone();
info!("{action}: object: `{bucket}/{key}`");
let bucket_obj = fs.get_bucket(&bucket);
if bucket_obj.is_none() {
warn!("{action}: bucket `{bucket}` not found");
return Err(Debug(Box::new(s3error::S3Error::not_found())));
}
let bucket = bucket_obj.unwrap();
let part = bucket.gen_upload_id();
info!("{action}: uploadId -> `{part}`");
let result = format!(r###"<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult>
<Bucket>{bucket_name}</Bucket>
<Key>{key}</Key>
<UploadId>{part}</UploadId>
</InitiateMultipartUploadResult>"###);
let mut ok = S3Response::ok();
ok.body(&result);
return Ok(ok);
}
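// GetObject: serves the object as a (possibly partial) stream. The RangeHeader
// guard understands `bytes=N-` and `bytes=N-M`; for range requests the file is
// seeked to the start offset and wrapped in a LimitSeekerReader so that only
// `content_length` bytes are sent.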
#[get("/<bucket>/<key_full..>")]
async fn get_object(bucket:String,
#[allow(unused)]
marker: GetObject,
key_full:std::path::PathBuf, fs:&State<FS>, range:request_guards::RangeHeader) -> Option<CachedFile> {
let action = "get_object";
info!("{action}: bucket: `{bucket}`, key: `{key_full:?}`, range: `{range:?}`");
let key = key_full.to_str().unwrap().to_string();
let bucket_obj = fs.get_bucket(&bucket);
if bucket_obj.is_none() {
warn!("{action}: bucket `{bucket}` not found");
return None;
}
let bucket_obj = bucket_obj.unwrap();
let obj = bucket_obj.get_object_by_key(&key);
if obj.is_none() {
warn!("{action}: object `{bucket}/{key}` not found");
return None;
}
let obj = obj.unwrap();
let path = obj.path().as_path();
// Propagate a missing or unreadable file as a 404 instead of panicking.
let mut tokiof = tokio::fs::File::open(path).await.ok()?;
let etag = obj.checksum().unwrap();
let last_modified = obj.meta().unwrap().modified().unwrap();
let file_size = obj.len().unwrap();
let mut content_length: usize = file_size as usize;
let begin = range.begin;
let end = range.end;
if begin.is_some() {
use tokio::io::AsyncSeekExt;
let seek_target = begin.unwrap();
let seek_result = tokiof.seek(SeekFrom::Start(seek_target as u64)).await;
match seek_result {
Err(some_err) => {
error!("{action}: seek to {seek_target} failed: {some_err:?}");
return None;
},
Ok(_) => {}
}
if end.is_some() {
// bytes=x-y: inclusive range
content_length = end.unwrap() - begin.unwrap() + 1;
} else {
content_length = file_size as usize - begin.unwrap() as usize;
}
}
info!("{action}: delegating to `LimitedReader` with content-length -> `{content_length}`");
let reader = LimitSeekerReader::new(tokiof, Some(content_length));
return Some(CachedFile {
reader: reader,
etag,
modified_time: last_modified,
size: content_length,
file_name: obj.get_short_name(),
partial: begin.is_some() || end.is_some(),
});
}
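// Extracts (Key, VersionId) pairs from a DeleteObjects request body with a
// regex instead of a full XML parser; VersionId is optional and returned as an
// empty string when absent.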
fn parse_delete_keys(data:&str) -> Vec<(String, String)> {
let data = data.replace("\r", "");
let data = data.replace("\n", "");
println!("Parsing {data}");
let p = r"<Object>\s*<Key>(\S+?)</Key>\s*(<VersionId>\s*(\S+?)\s*</VersionId>)?\s*</Object>";
let pattern = Regex::new(p).unwrap();
let mut start = 0;
let mut result = vec!();
loop {
let find_result = pattern.captures_at(&data, start);
if find_result.is_none() {
break;
}
let find_result = find_result.unwrap();
let match_full_length = find_result.get(0).unwrap().end();
start = match_full_length;
let key = find_result.get(1).unwrap().as_str();
let version = find_result.get(3);
let mut version_string = "";
if version.is_some() {
version_string = version.unwrap().as_str();
}
result.push((String::from(key), String::from(version_string)));
}
info!("{:?}", result);
return result;
}
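// DeleteObjects arrives either as `POST /<bucket>?delete` or with a trailing
// path segment; this variant simply forwards to delete_object_multi below.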
#[post("/<bucket>/<empty>?<delete>", data="<data>", rank=4)]
async fn delete_object_multi_alt<'r>(
bucket:String,
#[allow(unused)]
delete:String,
empty:String,
#[allow(unused)]
marker:request_guards::DeleteObjects, fs:&State<FS>,
data: Data<'_>) -> Result<S3Response<'r>, Debug<Box<dyn Error>>> {
return delete_object_multi(bucket, delete, marker, fs, data).await;
}
#[post("/<bucket>?<delete>", data="<data>", rank=3)]
#[allow(non_snake_case)]
async fn delete_object_multi<'r>(bucket:String,
#[allow(unused)]
delete:String,
#[allow(unused)]
marker:request_guards::DeleteObjects, fs:&State<FS>,
data: Data<'_>
) -> Result<S3Response<'r>, Debug<Box<dyn Error>>> {
let action = "delete_object_multi";
let bucket_obj = fs.get_bucket(&bucket);
if bucket_obj.is_none() {
warn!("{action}: bucket `{bucket}` not found");
return Ok(S3Response::not_found());
}
let bucket_obj = bucket_obj.unwrap();
use rocket::data::ToByteUnit;
let body = data.open(10.mebibytes()).into_string().await;
if body.is_err() {
warn!("{action}: body parse error: {}", body.err().unwrap());
return Ok(S3Response::invalid_request());
}
let body = body.unwrap();
let body = body.as_str();
let keys_and_versions = parse_delete_keys(body);
let mut result = true;
for (key, version) in &keys_and_versions {
if version != "" {
warn!("{action}: versioning not supported but `{version}` is requested.");
}
let local_result = bucket_obj.delete_object(&key);
if !local_result {
result = false;
}
}
let mut item_string = String::new();
for (key, version) in &keys_and_versions {
if version == "" {
let my_str = format!(r#"<Deleted>
<Key>{key}</Key>
</Deleted>"#, );
item_string.push_str(&my_str);
} else {
let my_str = format!(r#"<Deleted>
<DeleteMarker>false</DeleteMarker>
<Key>{key}</Key>
<VersionId>{version}</VersionId>
</Deleted>"#);
item_string.push_str(&my_str);
}
}
let response = format!(r###"<?xml version="1.0" encoding="UTF-8"?>
<DeleteResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
{item_string}
</DeleteResult>"###);
if result {
info!("{action}: delete `{bucket}/{keys_and_versions:?}` succeeded.");
} else {
warn!("{action}: delete `{bucket}/{keys_and_versions:?}` not found/error.");
}
let mut ok = S3Response::ok();
ok.body(&response);
return Ok(ok);
}
#[delete("/<bucket>/<full_key..>?<versionId>", rank=1)]
#[allow(non_snake_case)]
async fn delete_object<'r>(bucket:String, full_key:std::path::PathBuf,
#[allow(non_snake_case)]
#[allow(unused)]
versionId:Option<String>,
#[allow(unused)]
marker:request_guards::DeleteObject, fs:&State<FS>
) -> Result<S3Response<'r>, Debug<Box<dyn Error>>> {
let action = "delete_object";
let key = full_key.to_str().unwrap().to_string();
let bucket_obj = fs.get_bucket(&bucket);
if bucket_obj.is_none() {
warn!("{action}: bucket `{bucket}` not found");
return Ok(S3Response::not_found());
}
let bucket_obj = bucket_obj.unwrap();
let result = bucket_obj.delete_object(&key);
if result {
info!("{action}: delete `{bucket}/{key}` succeeded.");
return Ok(S3Response::ok());
} else {
warn!("{action}: delete `{bucket}/{key}` not found/error.");
return Ok(S3Response::not_found());
}
}
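// CreateBucket: `PUT /<bucket>` creates the backing directory; errors are
// logged but the handler still responds 200.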
#[put("/<bucket>", data = "<data>", rank=1)]
async fn create_bucket<'r>(bucket:String,
#[allow(unused)]
marker:CreateBucket, fs:&State<FS>,
#[allow(unused)]
data: rocket::data::Data<'r>
) -> Result<S3Response<'r>, Debug<Box<dyn Error>>> {
let action = "create_bucket";
info!("{action}: creating bucket `{bucket}`");
let cr = fs.make_bucket(&bucket);
match cr {
Ok(_) => {
info!("{action}: creating bucket `{bucket}` ok");
},
Err(some_err) => {
error!("{action}: creating bucket `{bucket}` err: `{some_err}`");
}
}
Ok(S3Response::ok())
}
#[get("/backdoor?<secret_token>")]
async fn backdoor(secret_token:String,
#[allow(unused)] auth:AuthCheckPassed) -> String {
let action = "backdoor";
info!("{action}: backdoor accessed `{secret_token}` revealed");
format!("Secret Revealed: {secret_token}")
}
#[get("/")]
async fn list_all_buckets<'r>(fs:&State<FS>) -> Result<S3Response<'r>, Debug<Box<dyn Error>>> {
let action = "list_all_buckets";
let mut buckets = String::new();
let bucket_list = fs.get_all_buckets();
info!("{action}: found {} buckets", bucket_list.len());
for (name, bucket) in bucket_list.iter() {
let creation_time = bucket.get_creation_time()?;
let next_bucket = format!(r#"
<Bucket>
<CreationDate>{creation_time}</CreationDate>
<Name>{name}</Name>
</Bucket>"#);
buckets.push_str(&next_bucket);
}
let body = format!(r#"<?xml version="1.0" encoding="UTF-8"?>
<ListAllMyBucketsResult><Buckets>{buckets}</Buckets></ListAllMyBucketsResult>"#);
let mut ok = S3Response::ok();
ok.body(&body);
Ok(ok)
}
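// Initializes log4rs from the given config file, or falls back to a plain
// stdout console appender at Info level when no file is configured.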
pub fn setup_logger(log_conf_file: &str) -> Result<(), Box<dyn Error>> {
if log_conf_file.len() > 0 {
log4rs::init_file(log_conf_file, Default::default())?;
println!("logs will be sent according to config file {log_conf_file}");
} else {
println!("no log config file specified.");
use log::LevelFilter;
use log4rs::append::console::ConsoleAppender;
use log4rs::config::{Appender, Config, Root};
let stdout = ConsoleAppender::builder().build();
let config = Config::builder()
.appender(Appender::builder().build("stdout", Box::new(stdout)))
.build(Root::builder().appender("stdout").build(LevelFilter::Info))
.unwrap();
let _ = log4rs::init_config(config)?;
}
Ok(())
}
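// Entry point: parses CLI arguments, sets up logging and the on-disk FS state,
// then mounts every route. Overlapping routes are disambiguated by their ranks
// together with the marker guards defined in src/request_guards.rs.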
#[launch]
fn rocket() -> _ {
use clap::Parser;
let args = CliArg::parse();
setup_logger(args.log4rs_config_file()).unwrap();
//trace!("I AM TRACE");
//debug!("I AM DEBUG");
//info!("I AM INFO");
//warn!("I AM WARN");
//error!("I AM ERROR");
let mut base_fs = FS::new();
base_fs.set_base(args.base_dir());
base_fs.initialize();
let figment = rocket::Config::figment()
.merge(("port", args.bind_port()))
.merge(("address", args.bind_address()))
.merge(("log_level", "normal"))
;
rocket::custom(figment)
.manage(args)
.manage(base_fs)
.mount("/", routes![list_objects_ext, list_objects_ext_v2, get_object,
put_object, put_object_part, create_multipart_upload,
cleanup_multipart_upload, list_all_buckets,
create_bucket, delete_object, delete_object_multi, delete_object_multi_alt, get_bucket_location,
complete_multipart_upload, backdoor])
}

409
src/request_guards.rs Normal file
View File

@@ -0,0 +1,409 @@
use std::convert::Infallible;
use lazy_static::lazy_static;
use regex::Regex;
use rocket::{request::FromRequest, http::Status};
#[derive(Debug, Clone)]
pub struct RangeHeader {
pub begin: Option<usize>,
pub end: Option<usize>,
}
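// Supported Range header forms: `bytes=N-` (open-ended) and `bytes=N-M`.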
lazy_static! {
pub static ref RANGE_START:Regex = Regex::new(r"^bytes=(\d+)-$").unwrap();
pub static ref RANGE_START_END:Regex = Regex::new(r"^bytes=(\d+)-(\d+)$").unwrap();
}
lazy_static! {
pub static ref CREATE_BUCKET_URI_PATTERN:Regex = {
Regex::new(r"^[a-zA-Z0-9.\\-_]{1,255}$").unwrap()
};
}
#[derive(Debug)]
pub struct ChunkedEncoding(pub bool);
impl RangeHeader {
fn match_str_and_get_group(input:&str, r:&Regex) -> Option<Vec<String>> {
match r.captures(input) {
Some(group) => {
let number_of_groups = group.len();
let mut result = Vec::with_capacity(number_of_groups);
for i in 0..number_of_groups {
result.push(group.get(i).unwrap().as_str().to_string());
}
Some(result)
},
None => None
}
}
fn parse_range(hdr:Option<&str>) -> (Option<usize>, Option<usize>) {
if hdr.is_none() {
return (None, None);
}
let hdr_str = hdr.unwrap();
let start_try = Self::match_str_and_get_group(hdr_str, &RANGE_START);
if start_try.is_some() {
let start_try = start_try.unwrap();
let g1 = start_try.get(1).unwrap();
let parse_result:usize = g1.parse().unwrap();
return (Some(parse_result), None);
}
let start_end_try = Self::match_str_and_get_group(hdr_str, &RANGE_START_END);
if start_end_try.is_some() {
let start_end_try = start_end_try.unwrap();
let g1 = start_end_try.get(1).unwrap();
let g2 = start_end_try.get(2).unwrap();
let parse1:usize = g1.parse().unwrap();
let parse2:usize = g2.parse().unwrap();
return (Some(parse1), Some(parse2));
}
(None, None)
}
}
#[rocket::async_trait]
impl<'r> FromRequest<'r> for ChunkedEncoding {
type Error = Infallible;
//Transfer-Encoding
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let range = request.headers().get_one("transfer-encoding");
let flag1 = range.is_some() && range.unwrap().to_ascii_lowercase() == "chunked";
let decoded_length = request.headers().get_one("x-amz-decoded-content-length");
let content_length = request.headers().get_one("content-length");
let mut flag2 = false;
if decoded_length.is_some() && content_length.is_some() {
let decoded_length = decoded_length.unwrap();
let content_length = content_length.unwrap();
if decoded_length.len() > 0 && content_length.len() > 0 {
// Use u64: content lengths for multi-gigabyte uploads overflow i32.
let decoded_length: u64 = decoded_length.parse().unwrap_or(0);
let content_length: u64 = content_length.parse().unwrap_or(0);
flag2 = content_length > decoded_length;
}
}
rocket::request::Outcome::Success(
ChunkedEncoding(flag1 || flag2)
)
}
}
#[rocket::async_trait]
impl<'r> FromRequest<'r> for RangeHeader {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let range = request.headers().get_one("range");
let (begin, end) = Self::parse_range(range);
rocket::request::Outcome::Success(
RangeHeader {
begin, end
}
)
}
}
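// The structs below are marker request guards: each one inspects the method,
// the number of path segments and the query string to claim exactly one S3
// operation, and forwards to lower-ranked routes otherwise.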
#[derive(Debug, Clone)]
pub struct CreateBucket(bool);
#[rocket::async_trait]
impl<'r> FromRequest<'r> for CreateBucket {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let method = request.method().as_str();
let is_put = method == "PUT";
let segments = request.uri().path().segments().len();
if is_put && segments == 1 {
rocket::request::Outcome::Success(
CreateBucket(true)
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
pub struct ListBucket;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for ListBucket {
type Error = Infallible;
#[allow(unused)]
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
rocket::request::Outcome::Success(
ListBucket
)
}
}
pub struct PutObject;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for PutObject {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let segments = request.uri().path().segments().len();
let method = request.method().as_str();
let is_put = method == "PUT";
let upload_id = request.query_value::<String>("uploadId");
let part_number = request.query_value::<String>("partNumber");
if is_put && part_number.is_none() && upload_id.is_none() && segments > 1 {
rocket::request::Outcome::Success(
PutObject
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
pub struct CreateMultipartUpload;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for CreateMultipartUpload {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let method = request.method().as_str();
let is_post = method == "POST";
let segments = request.uri().path().segments().len();
let uploads = request.query_value::<String>("uploads");
if is_post && segments > 1 && uploads.is_some() {
rocket::request::Outcome::Success(
CreateMultipartUpload
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
pub struct UploadMultipart;
pub struct AbortMultipartUpload;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for AbortMultipartUpload {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let method = request.method().as_str();
let is_delete = method == "DELETE";
let segments = request.uri().path().segments().len();
let upload_id = request.query_value::<String>("uploadId");
if is_delete && segments == 1 && upload_id.is_some() {
rocket::request::Outcome::Success(
AbortMultipartUpload
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
pub struct DeleteObjects;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for DeleteObjects {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let method = request.method().as_str();
let is_delete = method == "POST";
let segments = request.uri().path().segments().len();
let delete = request.query_value::<String>("delete");
if is_delete && segments <= 2 && delete.is_some(){
rocket::request::Outcome::Success(
DeleteObjects
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
pub struct DeleteObject;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for DeleteObject {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let method = request.method().as_str();
let is_delete = method == "DELETE";
let segments = request.uri().path().segments().len();
if is_delete && segments > 1 {
rocket::request::Outcome::Success(
DeleteObject
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
pub struct CompleteMultipartUpload;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for CompleteMultipartUpload {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let method = request.method().as_str();
let is_post = method == "POST";
let segments = request.uri().path().segments().len();
let upload_id = request.query_value::<String>("uploadId");
if is_post && segments > 1 && upload_id.is_some() {
rocket::request::Outcome::Success(
CompleteMultipartUpload
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
pub struct GetObject;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for GetObject {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let method = request.method().as_str();
let is_get = method == "GET";
let segments = request.uri().path().segments().len();
let location = request.query_value::<String>("location");
if is_get && location.is_none() && segments > 1{
rocket::request::Outcome::Success(
GetObject
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
pub struct GetBucketLocation;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for GetBucketLocation {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let method = request.method().as_str();
let is_delete = method == "GET";
let segments = request.uri().path().segments().len();
let location = request.query_value::<String>("location");
if is_delete && location.is_some() && segments == 1{
rocket::request::Outcome::Success(
GetBucketLocation
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
pub struct PutObjectPart;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for PutObjectPart {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let method = request.method().as_str();
let is_put = method == "PUT";
let segments = request.uri().path().segments().len();
let upload_id = request.query_value::<String>("uploadId");
let part_number = request.query_value::<String>("partNumber");
if is_put && part_number.is_some() && segments > 1 && upload_id.is_some() {
rocket::request::Outcome::Success(
PutObjectPart
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
pub struct ListObjects;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for ListObjects {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let method = request.method().as_str();
let is_get = method == "GET";
let segments = request.uri().path().segments().len();
let location = request.query_value::<String>("location");
let list_type = request.query_value::<String>("list-type");
if is_get && location.is_none() && segments > 0 && list_type.is_none(){
rocket::request::Outcome::Success(
ListObjects
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
pub struct ListObjectsV2;
#[rocket::async_trait]
impl<'r> FromRequest<'r> for ListObjectsV2 {
type Error = Infallible;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let method = request.method().as_str();
let is_get = method == "GET";
let segments = request.uri().path().segments().len();
let location = request.query_value::<String>("location");
let list_type = request.query_value::<String>("list-type");
if is_get && location.is_none() && segments > 0 && matches!(&list_type, Some(Ok(v)) if v.as_str() == "2") {
rocket::request::Outcome::Success(
ListObjectsV2
)
} else {
rocket::request::Outcome::Forward(())
}
}
}
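// Toy auth guard used by the /backdoor route: accepts the request only when
// the `x-api-key` header equals "secret", otherwise fails with 403.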
pub struct AuthCheckPassed;
#[derive(Debug)]
pub enum AuthError {
NoKeySupplied,
InvalidKey,
InvalidSignature,
}
#[rocket::async_trait]
impl<'r> FromRequest<'r> for AuthCheckPassed {
type Error = AuthError;
async fn from_request(request: &'r rocket::Request<'_>) ->
rocket::request::Outcome<Self, Self::Error> {
let api_key = request.headers().get_one("x-api-key");
if api_key.is_some() {
if api_key.unwrap() == "secret" {
return rocket::request::Outcome::Success(AuthCheckPassed);
} else {
return rocket::request::Outcome::Failure((Status::Forbidden, AuthError::InvalidKey));
}
} else {
return rocket::request::Outcome::Failure((Status::Forbidden, AuthError::NoKeySupplied));
}
}
}

50
src/s3error.rs Normal file
View File

@@ -0,0 +1,50 @@
use std::fmt;
#[derive(Debug)]
pub struct S3Error {
pub kind:ErrorKind,
pub message:String,
}
#[derive(Debug)]
pub enum ErrorKind {
InvalidKey,
NotFound,
InputOutput
}
impl S3Error {
pub fn invalid_key() -> Self {
S3Error {
kind: ErrorKind::InvalidKey,
message: String::from("")
}
}
pub fn not_found() -> Self {
S3Error {
kind: ErrorKind::NotFound,
message: String::from("")
}
}
pub fn io_error() -> Self {
S3Error {
kind: ErrorKind::InputOutput,
message: String::from("")
}
}
}
impl fmt::Display for ErrorKind {
fn fmt(&self, w: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
return write!(w, "{:?}", self);
}
}
impl fmt::Display for S3Error {
fn fmt(&self, w: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
return write!(w, "{:?}", self);
}
}
impl std::error::Error for S3Error {
}

77
src/s3resp.rs Normal file
View File

@@ -0,0 +1,77 @@
use rocket::{http::Status, response::Responder};
use std::{io::Cursor, convert::Infallible};
use rocket::http::Header;
use rocket::response::Debug;
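// Minimal response type carrying a body string, extra headers and a status
// code, turned into a rocket::Response by the Responder impl below.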
pub struct S3Response<'r> {
body: String,
headers: Vec<Header<'r>>,
status: Status,
}
impl<'r> Responder<'r, 'r> for S3Response<'r> {
fn respond_to(self, _req: &'r rocket::Request) -> Result<rocket::Response<'r>, Status> {
let size = self.body.len();
use rocket::response::Builder;
let mut resp:Builder<'r> = rocket::Response::build();
resp.status(self.status);
resp.sized_body(size, Cursor::new(self.body));
for next in self.headers {
resp.header(next);
}
return Ok(resp.finalize());
}
}
impl<'r> Into<Result<S3Response<'r>, Debug<Infallible>>> for S3Response<'r> {
fn into(self) -> Result<S3Response<'r>, Debug<Infallible>> {
return Ok(self);
}
}
impl <'r> S3Response<'r> {
pub fn ok() -> Self {
S3Response { body: "".to_string(), headers: vec![], status: Status::Ok }
}
pub fn not_found() -> Self {
S3Response {
body: "".to_string(),
headers: vec!(),
status: Status::NotFound
}
}
pub fn invalid_request() -> Self {
S3Response {
body: "".to_string(),
headers: vec![],
status: Status::BadRequest
}
}
pub fn server_error() -> Self {
S3Response {
body: "".to_string(),
headers: vec![],
status: Status::InternalServerError
}
}
pub fn add_header(&mut self, key:&str, value:&str) -> &mut Self {
let new_header = Header::new(key.to_string(), value.to_string());
self.headers.push(new_header);
return self;
}
pub fn status_code(&mut self, code: u16) -> &mut Self {
self.status = Status::new(code);
return self;
}
pub fn body(&mut self, text:&str) -> &mut Self {
self.body = String::from(text);
return self;
}
}

24
src/sequencing.rs Normal file
View File

@@ -0,0 +1,24 @@
use std::sync::atomic::AtomicU64;
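// Upload-id generator: millisecond timestamp plus an atomic counter, giving
// ids that are unique within a single process run.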
#[derive(Debug)]
pub struct Sequence {
seq:AtomicU64
}
impl Sequence {
pub fn next(&self) -> String {
let start = std::time::SystemTime::now();
let time_part = start
.duration_since(std::time::UNIX_EPOCH)
.expect("Time went backwards")
.as_millis();
return format!("{}_{}", time_part, self.seq.fetch_add(1, std::sync::atomic::Ordering::SeqCst));
}
}
impl Default for Sequence {
fn default() -> Self {
return Sequence {
seq: AtomicU64::new(0),
}
}
}

46
src/states.rs Normal file
View File

@@ -0,0 +1,46 @@
use clap::Parser;
#[cfg(debug_assertions)]
static BASE_DIR:&str = "./rusts3-data-debug";
#[cfg(not(debug_assertions))]
static BASE_DIR:&str = "./rusts3-data";
#[cfg(debug_assertions)]
static PORT:i32 = 8001;
#[cfg(not(debug_assertions))]
static PORT:i32 = 8000;
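// Command-line arguments. Assuming clap's default kebab-case derivation of the
// field names below, the flags are --base-dir (-b), --bind-address, --bind-port
// and --log-conf, e.g.:
//   rusts3 --base-dir ./rusts3-data --bind-address 0.0.0.0 --bind-port 8000 --log-conf log4rs.yaml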
#[derive(Parser, Debug, Clone)]
pub struct CliArg {
#[arg(short, long, default_value_t = String::from(BASE_DIR))]
base_dir: String,
#[arg(long, default_value_t = String::from("0.0.0.0"), help="Bind IP address")]
bind_address: String,
#[arg(long, default_value_t = PORT, help="Bind port number")]
bind_port: i32,
#[arg(long, default_value_t = String::from(""), help="Log4rs config file")]
log_conf:String
}
impl CliArg {
pub fn bind_port(&self) -> i32 {
self.bind_port
}
pub fn bind_address(&self) -> &str {
&self.bind_address
}
pub fn base_dir(&self) -> &str {
&self.base_dir
}
pub fn log4rs_config_file(&self) ->&str {
&self.log_conf
}
}

29
src/test.rs Normal file
View File

@@ -0,0 +1,29 @@
pub mod s3error;
pub mod chunk_to_raw;
pub mod sequencing;
pub mod fsapi;
use fsapi::{Bucket, FS};
use tokio::fs::File;
use std::path::PathBuf;
#[tokio::main]
async fn main() {
let mut fs = FS::new();
fs.set_base("rusts3-data-debug");
fs.initialize();
let b = fs.get_bucket("abcde").unwrap();
let result = b.list_objects("te", "", 10);
for next in result {
println!("{} -> {:?}", next.object_key(), next.kind());
}
let mut v = Vec::<i32>::new();
test(&mut v);
}
fn test(v:&mut Vec<i32>) {
v.push(1);
if v.len() < 5 {
test(v);
}
}