diff options
Diffstat (limited to '')
-rw-r--r-- | src/wasm-wasi-component/src/lib.rs | 115 |
1 files changed, 84 insertions, 31 deletions
diff --git a/src/wasm-wasi-component/src/lib.rs b/src/wasm-wasi-component/src/lib.rs index 54d99616..da78ea6e 100644 --- a/src/wasm-wasi-component/src/lib.rs +++ b/src/wasm-wasi-component/src/lib.rs @@ -9,7 +9,9 @@ use std::sync::OnceLock; use tokio::sync::mpsc; use wasmtime::component::{Component, InstancePre, Linker}; use wasmtime::{Config, Engine, Store}; -use wasmtime_wasi::preview2::{DirPerms, FilePerms, Table, WasiCtx, WasiCtxBuilder, WasiView}; +use wasmtime_wasi::preview2::{ + DirPerms, FilePerms, Table, WasiCtx, WasiCtxBuilder, WasiView, +}; use wasmtime_wasi::{ambient_authority, Dir}; use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView}; @@ -70,11 +72,17 @@ unsafe extern "C" fn setup( ptr::null_mut(), ); for i in 0..bindings::nxt_conf_object_members_count(dirs_ptr) { - let value = bindings::nxt_conf_get_array_element(dirs_ptr, i.try_into().unwrap()); + let value = bindings::nxt_conf_get_array_element( + dirs_ptr, + i.try_into().unwrap(), + ); let mut s = bindings::nxt_string(""); bindings::nxt_conf_get_string(value, &mut s); dirs.push( - std::str::from_utf8(std::slice::from_raw_parts(s.start, s.length))?.to_string(), + std::str::from_utf8(std::slice::from_raw_parts( + s.start, s.length, + ))? + .to_string(), ); } } @@ -94,13 +102,15 @@ unsafe extern "C" fn start( ) -> bindings::nxt_int_t { handle_result(task, || { let config = GLOBAL_CONFIG.get().unwrap(); - let state = GlobalState::new(&config).context("failed to create initial state")?; + let state = GlobalState::new(&config) + .context("failed to create initial state")?; let res = GLOBAL_STATE.set(state); assert!(res.is_ok()); let conf = (*data).app; let mut wasm_init = MaybeUninit::uninit(); - let ret = bindings::nxt_unit_default_init(task, wasm_init.as_mut_ptr(), conf); + let ret = + bindings::nxt_unit_default_init(task, wasm_init.as_mut_ptr(), conf); if ret != bindings::NXT_OK as bindings::nxt_int_t { bail!("nxt_unit_default_init() failed"); } @@ -144,7 +154,9 @@ unsafe fn handle_result( } } -unsafe extern "C" fn request_handler(info: *mut bindings::nxt_unit_request_info_t) { +unsafe extern "C" fn request_handler( + info: *mut bindings::nxt_unit_request_info_t, +) { // Enqueue this request to get processed by the Tokio event loop, and // otherwise immediately return. let state = GLOBAL_STATE.get().unwrap(); @@ -235,8 +247,15 @@ impl GlobalState { cx.inherit_stderr(); for dir in self.global_config.dirs.iter() { let fd = Dir::open_ambient_dir(dir, ambient_authority()) - .with_context(|| format!("failed to open directory '{dir}'"))?; - cx.preopened_dir(fd, DirPerms::all(), FilePerms::all(), dir); + .with_context(|| { + format!("failed to open directory '{dir}'") + })?; + cx.preopened_dir( + fd, + DirPerms::all(), + FilePerms::all(), + dir, + ); } cx.build() }, @@ -262,10 +281,12 @@ impl GlobalState { // generate headers, write those below, and then compute the body // afterwards. let task = tokio::spawn(async move { - let (proxy, _) = - wasmtime_wasi_http::proxy::Proxy::instantiate_pre(&mut store, &self.component) - .await - .context("failed to instantiate")?; + let (proxy, _) = wasmtime_wasi_http::proxy::Proxy::instantiate_pre( + &mut store, + &self.component, + ) + .await + .context("failed to instantiate")?; let req = store.data_mut().new_incoming_request(request)?; let out = store.data_mut().new_response_outparam(sender)?; proxy @@ -308,7 +329,10 @@ impl GlobalState { Ok(()) } - fn to_request_builder(&self, info: &NxtRequestInfo) -> Result<http::request::Builder> { + fn to_request_builder( + &self, + info: &NxtRequestInfo, + ) -> Result<http::request::Builder> { let mut request = http::Request::builder(); request = request.method(info.method()); @@ -338,12 +362,16 @@ impl GlobalState { Ok(request) } - fn to_request_body(&self, info: &mut NxtRequestInfo) -> BoxBody<Bytes, anyhow::Error> { - // TODO: should convert the body into a form of `Stream` to become an async - // stream of frames. The return value can represent that here but for now - // this slurps up the entire body into memory and puts it all in a single - // `BytesMut` which is then converted to `Bytes`. - let mut body = BytesMut::with_capacity(info.content_length().try_into().unwrap()); + fn to_request_body( + &self, + info: &mut NxtRequestInfo, + ) -> BoxBody<Bytes, anyhow::Error> { + // TODO: should convert the body into a form of `Stream` to become an + // async stream of frames. The return value can represent that here + // but for now this slurps up the entire body into memory and puts it + // all in a single `BytesMut` which is then converted to `Bytes`. + let mut body = + BytesMut::with_capacity(info.content_length().try_into().unwrap()); // TODO: can this perform a partial read? // TODO: how to make this async at the nxt level? @@ -352,7 +380,11 @@ impl GlobalState { Full::new(body.freeze()).map_err(|e| match e {}).boxed() } - fn send_response<T>(&self, info: &mut NxtRequestInfo, response: http::Response<T>) -> T { + fn send_response<T>( + &self, + info: &mut NxtRequestInfo, + response: http::Response<T>, + ) -> T { info.init_response( response.status().as_u16(), response.headers().len().try_into().unwrap(), @@ -378,9 +410,9 @@ impl GlobalState { mut body: BoxBody<Bytes, anyhow::Error>, ) -> Result<()> { loop { - // Acquire the next frame, and because nothing is actually async at the - // moment this should never block meaning that the `Pending` case - // should not happen. + // Acquire the next frame, and because nothing is actually async + // at the moment this should never block meaning that the + // `Pending` case should not happen. let frame = match body.frame().await { Some(Ok(frame)) => frame, Some(Err(e)) => break Err(e), @@ -451,8 +483,10 @@ impl NxtRequestInfo { let raw = (*self.info).request; (0..(*raw).fields_count).map(move |i| { let field = (*raw).fields.as_ptr().add(i as usize); - let name = self.get_str(&(*field).name, (*field).name_length.into()); - let value = self.get_str(&(*field).value, (*field).value_length.into()); + let name = + self.get_str(&(*field).name, (*field).name_length.into()); + let value = + self.get_str(&(*field).value, (*field).value_length.into()); (name, value) }) } @@ -461,8 +495,11 @@ impl NxtRequestInfo { fn request_read(&mut self, dst: &mut BytesMut) { unsafe { let rest = dst.spare_capacity_mut(); - let amt = - bindings::nxt_unit_request_read(self.info, rest.as_mut_ptr().cast(), rest.len()); + let amt = bindings::nxt_unit_request_read( + self.info, + rest.as_mut_ptr().cast(), + rest.len(), + ); // TODO: handle failure when `amt` is negative let amt: usize = amt.try_into().unwrap(); dst.set_len(dst.len() + amt); @@ -471,14 +508,23 @@ impl NxtRequestInfo { fn response_write(&mut self, data: &[u8]) { unsafe { - let rc = bindings::nxt_unit_response_write(self.info, data.as_ptr().cast(), data.len()); + let rc = bindings::nxt_unit_response_write( + self.info, + data.as_ptr().cast(), + data.len(), + ); assert_eq!(rc, 0); } } fn init_response(&mut self, status: u16, headers: u32, headers_size: u32) { unsafe { - let rc = bindings::nxt_unit_response_init(self.info, status, headers, headers_size); + let rc = bindings::nxt_unit_response_init( + self.info, + status, + headers, + headers_size, + ); assert_eq!(rc, 0); } } @@ -505,11 +551,18 @@ impl NxtRequestInfo { fn request_done(self) { unsafe { - bindings::nxt_unit_request_done(self.info, bindings::NXT_UNIT_OK as i32); + bindings::nxt_unit_request_done( + self.info, + bindings::NXT_UNIT_OK as i32, + ); } } - unsafe fn get_str(&self, ptr: &bindings::nxt_unit_sptr_t, len: u32) -> &str { + unsafe fn get_str( + &self, + ptr: &bindings::nxt_unit_sptr_t, + len: u32, + ) -> &str { let ptr = bindings::nxt_unit_sptr_get(ptr); let slice = std::slice::from_raw_parts(ptr, len.try_into().unwrap()); std::str::from_utf8(slice).unwrap() |