summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
-rw-r--r--src/wasm-wasi-component/src/lib.rs115
1 files changed, 84 insertions, 31 deletions
diff --git a/src/wasm-wasi-component/src/lib.rs b/src/wasm-wasi-component/src/lib.rs
index 54d99616..da78ea6e 100644
--- a/src/wasm-wasi-component/src/lib.rs
+++ b/src/wasm-wasi-component/src/lib.rs
@@ -9,7 +9,9 @@ use std::sync::OnceLock;
use tokio::sync::mpsc;
use wasmtime::component::{Component, InstancePre, Linker};
use wasmtime::{Config, Engine, Store};
-use wasmtime_wasi::preview2::{DirPerms, FilePerms, Table, WasiCtx, WasiCtxBuilder, WasiView};
+use wasmtime_wasi::preview2::{
+ DirPerms, FilePerms, Table, WasiCtx, WasiCtxBuilder, WasiView,
+};
use wasmtime_wasi::{ambient_authority, Dir};
use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};
@@ -70,11 +72,17 @@ unsafe extern "C" fn setup(
ptr::null_mut(),
);
for i in 0..bindings::nxt_conf_object_members_count(dirs_ptr) {
- let value = bindings::nxt_conf_get_array_element(dirs_ptr, i.try_into().unwrap());
+ let value = bindings::nxt_conf_get_array_element(
+ dirs_ptr,
+ i.try_into().unwrap(),
+ );
let mut s = bindings::nxt_string("");
bindings::nxt_conf_get_string(value, &mut s);
dirs.push(
- std::str::from_utf8(std::slice::from_raw_parts(s.start, s.length))?.to_string(),
+ std::str::from_utf8(std::slice::from_raw_parts(
+ s.start, s.length,
+ ))?
+ .to_string(),
);
}
}
@@ -94,13 +102,15 @@ unsafe extern "C" fn start(
) -> bindings::nxt_int_t {
handle_result(task, || {
let config = GLOBAL_CONFIG.get().unwrap();
- let state = GlobalState::new(&config).context("failed to create initial state")?;
+ let state = GlobalState::new(&config)
+ .context("failed to create initial state")?;
let res = GLOBAL_STATE.set(state);
assert!(res.is_ok());
let conf = (*data).app;
let mut wasm_init = MaybeUninit::uninit();
- let ret = bindings::nxt_unit_default_init(task, wasm_init.as_mut_ptr(), conf);
+ let ret =
+ bindings::nxt_unit_default_init(task, wasm_init.as_mut_ptr(), conf);
if ret != bindings::NXT_OK as bindings::nxt_int_t {
bail!("nxt_unit_default_init() failed");
}
@@ -144,7 +154,9 @@ unsafe fn handle_result(
}
}
-unsafe extern "C" fn request_handler(info: *mut bindings::nxt_unit_request_info_t) {
+unsafe extern "C" fn request_handler(
+ info: *mut bindings::nxt_unit_request_info_t,
+) {
// Enqueue this request to get processed by the Tokio event loop, and
// otherwise immediately return.
let state = GLOBAL_STATE.get().unwrap();
@@ -235,8 +247,15 @@ impl GlobalState {
cx.inherit_stderr();
for dir in self.global_config.dirs.iter() {
let fd = Dir::open_ambient_dir(dir, ambient_authority())
- .with_context(|| format!("failed to open directory '{dir}'"))?;
- cx.preopened_dir(fd, DirPerms::all(), FilePerms::all(), dir);
+ .with_context(|| {
+ format!("failed to open directory '{dir}'")
+ })?;
+ cx.preopened_dir(
+ fd,
+ DirPerms::all(),
+ FilePerms::all(),
+ dir,
+ );
}
cx.build()
},
@@ -262,10 +281,12 @@ impl GlobalState {
// generate headers, write those below, and then compute the body
// afterwards.
let task = tokio::spawn(async move {
- let (proxy, _) =
- wasmtime_wasi_http::proxy::Proxy::instantiate_pre(&mut store, &self.component)
- .await
- .context("failed to instantiate")?;
+ let (proxy, _) = wasmtime_wasi_http::proxy::Proxy::instantiate_pre(
+ &mut store,
+ &self.component,
+ )
+ .await
+ .context("failed to instantiate")?;
let req = store.data_mut().new_incoming_request(request)?;
let out = store.data_mut().new_response_outparam(sender)?;
proxy
@@ -308,7 +329,10 @@ impl GlobalState {
Ok(())
}
- fn to_request_builder(&self, info: &NxtRequestInfo) -> Result<http::request::Builder> {
+ fn to_request_builder(
+ &self,
+ info: &NxtRequestInfo,
+ ) -> Result<http::request::Builder> {
let mut request = http::Request::builder();
request = request.method(info.method());
@@ -338,12 +362,16 @@ impl GlobalState {
Ok(request)
}
- fn to_request_body(&self, info: &mut NxtRequestInfo) -> BoxBody<Bytes, anyhow::Error> {
- // TODO: should convert the body into a form of `Stream` to become an async
- // stream of frames. The return value can represent that here but for now
- // this slurps up the entire body into memory and puts it all in a single
- // `BytesMut` which is then converted to `Bytes`.
- let mut body = BytesMut::with_capacity(info.content_length().try_into().unwrap());
+ fn to_request_body(
+ &self,
+ info: &mut NxtRequestInfo,
+ ) -> BoxBody<Bytes, anyhow::Error> {
+ // TODO: should convert the body into a form of `Stream` to become an
+ // async stream of frames. The return value can represent that here
+ // but for now this slurps up the entire body into memory and puts it
+ // all in a single `BytesMut` which is then converted to `Bytes`.
+ let mut body =
+ BytesMut::with_capacity(info.content_length().try_into().unwrap());
// TODO: can this perform a partial read?
// TODO: how to make this async at the nxt level?
@@ -352,7 +380,11 @@ impl GlobalState {
Full::new(body.freeze()).map_err(|e| match e {}).boxed()
}
- fn send_response<T>(&self, info: &mut NxtRequestInfo, response: http::Response<T>) -> T {
+ fn send_response<T>(
+ &self,
+ info: &mut NxtRequestInfo,
+ response: http::Response<T>,
+ ) -> T {
info.init_response(
response.status().as_u16(),
response.headers().len().try_into().unwrap(),
@@ -378,9 +410,9 @@ impl GlobalState {
mut body: BoxBody<Bytes, anyhow::Error>,
) -> Result<()> {
loop {
- // Acquire the next frame, and because nothing is actually async at the
- // moment this should never block meaning that the `Pending` case
- // should not happen.
+ // Acquire the next frame, and because nothing is actually async
+ // at the moment this should never block meaning that the
+ // `Pending` case should not happen.
let frame = match body.frame().await {
Some(Ok(frame)) => frame,
Some(Err(e)) => break Err(e),
@@ -451,8 +483,10 @@ impl NxtRequestInfo {
let raw = (*self.info).request;
(0..(*raw).fields_count).map(move |i| {
let field = (*raw).fields.as_ptr().add(i as usize);
- let name = self.get_str(&(*field).name, (*field).name_length.into());
- let value = self.get_str(&(*field).value, (*field).value_length.into());
+ let name =
+ self.get_str(&(*field).name, (*field).name_length.into());
+ let value =
+ self.get_str(&(*field).value, (*field).value_length.into());
(name, value)
})
}
@@ -461,8 +495,11 @@ impl NxtRequestInfo {
fn request_read(&mut self, dst: &mut BytesMut) {
unsafe {
let rest = dst.spare_capacity_mut();
- let amt =
- bindings::nxt_unit_request_read(self.info, rest.as_mut_ptr().cast(), rest.len());
+ let amt = bindings::nxt_unit_request_read(
+ self.info,
+ rest.as_mut_ptr().cast(),
+ rest.len(),
+ );
// TODO: handle failure when `amt` is negative
let amt: usize = amt.try_into().unwrap();
dst.set_len(dst.len() + amt);
@@ -471,14 +508,23 @@ impl NxtRequestInfo {
fn response_write(&mut self, data: &[u8]) {
unsafe {
- let rc = bindings::nxt_unit_response_write(self.info, data.as_ptr().cast(), data.len());
+ let rc = bindings::nxt_unit_response_write(
+ self.info,
+ data.as_ptr().cast(),
+ data.len(),
+ );
assert_eq!(rc, 0);
}
}
fn init_response(&mut self, status: u16, headers: u32, headers_size: u32) {
unsafe {
- let rc = bindings::nxt_unit_response_init(self.info, status, headers, headers_size);
+ let rc = bindings::nxt_unit_response_init(
+ self.info,
+ status,
+ headers,
+ headers_size,
+ );
assert_eq!(rc, 0);
}
}
@@ -505,11 +551,18 @@ impl NxtRequestInfo {
fn request_done(self) {
unsafe {
- bindings::nxt_unit_request_done(self.info, bindings::NXT_UNIT_OK as i32);
+ bindings::nxt_unit_request_done(
+ self.info,
+ bindings::NXT_UNIT_OK as i32,
+ );
}
}
- unsafe fn get_str(&self, ptr: &bindings::nxt_unit_sptr_t, len: u32) -> &str {
+ unsafe fn get_str(
+ &self,
+ ptr: &bindings::nxt_unit_sptr_t,
+ len: u32,
+ ) -> &str {
let ptr = bindings::nxt_unit_sptr_get(ptr);
let slice = std::slice::from_raw_parts(ptr, len.try_into().unwrap());
std::str::from_utf8(slice).unwrap()