diff options
author | Alex Crichton <alex@alexcrichton.com> | 2023-10-29 15:13:56 -0700 |
---|---|---|
committer | Andrew Clayton <a.clayton@nginx.com> | 2024-02-21 16:20:32 +0000 |
commit | 20ada4b5c135862104ca724a6d9d17730286aa82 (patch) | |
tree | 12698fa89bd8c1fbff5fe1740279e2053fcb9660 | |
parent | f0782722654158c38193183e4c9b74925a0c52ef (diff) | |
download | unit-20ada4b5c135862104ca724a6d9d17730286aa82.tar.gz unit-20ada4b5c135862104ca724a6d9d17730286aa82.tar.bz2 |
Wasm-wc: Core of initial Wasm component model language module support
This is the work of Alex Crichton.
This is written in Rust because there is currently no support on the
C side of things for the component model, and supporting the component
model is the point of this module.
It talks to Unit via automatically generated bindings.
I've (Andrew) just made some minor tweaks to src/lib.rs, build.rs &
Cargo.toml to adjust some paths, where we get the language module
config from, the module name, and where the module is located in the
source tree.
I also removed the Cargo.lock file and disabled tracking of it (it is
constantly changing, and not tracking it seems right for 'libraries'),
and dropped the READMEs.
Other than that, I have tried to leave his work intact. Subsequent
commits will make some larger changes, but I didn't want to intermix
them with Alex's work.
One such commit will update the module to use wasmtime 17 which brings
WASI 0.2.0 support.
Signed-off-by: Andrew Clayton <a.clayton@nginx.com>
-rw-r--r-- | src/wasm-wasi-component/.gitignore | 3 | ||||
-rw-r--r-- | src/wasm-wasi-component/Cargo.toml | 30 | ||||
-rw-r--r-- | src/wasm-wasi-component/build.rs | 33 | ||||
-rw-r--r-- | src/wasm-wasi-component/src/lib.rs | 549 | ||||
-rw-r--r-- | src/wasm-wasi-component/wrapper.h | 5 |
5 files changed, 620 insertions, 0 deletions
diff --git a/src/wasm-wasi-component/.gitignore b/src/wasm-wasi-component/.gitignore new file mode 100644 index 00000000..159e3885 --- /dev/null +++ b/src/wasm-wasi-component/.gitignore @@ -0,0 +1,3 @@ +Cargo.lock + +target diff --git a/src/wasm-wasi-component/Cargo.toml b/src/wasm-wasi-component/Cargo.toml new file mode 100644 index 00000000..8f42d128 --- /dev/null +++ b/src/wasm-wasi-component/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "wasm-wasi-component" +version = "0.1.0" +edition = "2021" +publish = false + +[lib] +crate-type = ["cdylib"] + +[dependencies] +anyhow = "1.0.75" +bytes = "1.5.0" +futures-util = { version = "0.3.29", default-features = false } +http = "0.2.9" +http-body = { version = "1.0.0-rc.2", default-features = false } +http-body-util = "0.1.0-rc.2" +tokio = { version = "1.33.0", default-features = false } +wasmtime = "14.0.2" +wasmtime-wasi = "14.0.2" +wasmtime-wasi-http = "14.0.2" + +[build-dependencies] +bindgen = "0.68.1" +cc = "1.0.83" + +[profile.dev] +panic = 'abort' + +[profile.release] +panic = 'abort' diff --git a/src/wasm-wasi-component/build.rs b/src/wasm-wasi-component/build.rs new file mode 100644 index 00000000..5ea74f17 --- /dev/null +++ b/src/wasm-wasi-component/build.rs @@ -0,0 +1,33 @@ +use std::env; +use std::path::PathBuf; + +fn main() { + // Tell cargo to invalidate the built crate whenever the wrapper changes + println!("cargo:rerun-if-changed=wrapper.h"); + + let bindings = bindgen::Builder::default() + .clang_args(["-I", "../"]) + .clang_args(["-I", "../../build/include"]) + .header("./wrapper.h") + // only generate bindings for `nxt_*` header files + .allowlist_file(".*nxt_.*.h") + // generates an "improper_ctypes" warning and we don't need it anyway + .blocklist_function("nxt_vsprintf") + // Tell cargo to invalidate the built crate whenever any of the + // included header files changed. 
+ .parse_callbacks(Box::new(bindgen::CargoCallbacks)) + // disable some features which aren't necessary + .layout_tests(false) + .derive_debug(false) + .generate() + .expect("Unable to generate bindings"); + + cc::Build::new() + .object("../../build/src/nxt_unit.o") + .compile("nxt-unit"); + + let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()); + bindings + .write_to_file(out_path.join("bindings.rs")) + .expect("Couldn't write bindings!"); +} diff --git a/src/wasm-wasi-component/src/lib.rs b/src/wasm-wasi-component/src/lib.rs new file mode 100644 index 00000000..54d99616 --- /dev/null +++ b/src/wasm-wasi-component/src/lib.rs @@ -0,0 +1,549 @@ +use anyhow::{bail, Context, Result}; +use bytes::{Bytes, BytesMut}; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full}; +use std::ffi::{CStr, CString}; +use std::mem::MaybeUninit; +use std::ptr; +use std::sync::OnceLock; +use tokio::sync::mpsc; +use wasmtime::component::{Component, InstancePre, Linker}; +use wasmtime::{Config, Engine, Store}; +use wasmtime_wasi::preview2::{DirPerms, FilePerms, Table, WasiCtx, WasiCtxBuilder, WasiView}; +use wasmtime_wasi::{ambient_authority, Dir}; +use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView}; + +#[allow( + non_camel_case_types, + non_upper_case_globals, + non_snake_case, + dead_code +)] +mod bindings { + include!(concat!(env!("OUT_DIR"), "/bindings.rs")); + + pub const fn nxt_string(s: &'static str) -> nxt_str_t { + nxt_str_t { + start: s.as_ptr().cast_mut(), + length: s.len(), + } + } + + pub unsafe fn nxt_unit_sptr_get(sptr: &nxt_unit_sptr_t) -> *const u8 { + sptr.base.as_ptr().offset(sptr.offset as isize) + } +} + +#[no_mangle] +pub static mut nxt_app_module: bindings::nxt_app_module_t = { + const COMPAT: [u32; 2] = [bindings::NXT_VERNUM, bindings::NXT_DEBUG]; + let version = "0.1\0"; + bindings::nxt_app_module_t { + compat: COMPAT.as_ptr().cast_mut(), + compat_length: COMPAT.len() * 4, + mounts: ptr::null(), + nmounts: 0, + type_: 
bindings::nxt_string("wasm-wasi-component"), + version: version.as_ptr().cast(), + setup: Some(setup), + start: Some(start), + } +}; + +static GLOBAL_CONFIG: OnceLock<GlobalConfig> = OnceLock::new(); +static GLOBAL_STATE: OnceLock<GlobalState> = OnceLock::new(); + +unsafe extern "C" fn setup( + task: *mut bindings::nxt_task_t, + // TODO: should this get used? + _process: *mut bindings::nxt_process_t, + conf: *mut bindings::nxt_common_app_conf_t, +) -> bindings::nxt_int_t { + handle_result(task, || { + let wasm_conf = &(*conf).u.wasm_wc; + let component = CStr::from_ptr(wasm_conf.component).to_str()?; + let mut dirs = Vec::new(); + if !wasm_conf.access.is_null() { + let dirs_ptr = bindings::nxt_conf_get_object_member( + wasm_conf.access, + &mut bindings::nxt_string("filesystem"), + ptr::null_mut(), + ); + for i in 0..bindings::nxt_conf_object_members_count(dirs_ptr) { + let value = bindings::nxt_conf_get_array_element(dirs_ptr, i.try_into().unwrap()); + let mut s = bindings::nxt_string(""); + bindings::nxt_conf_get_string(value, &mut s); + dirs.push( + std::str::from_utf8(std::slice::from_raw_parts(s.start, s.length))?.to_string(), + ); + } + } + + let result = GLOBAL_CONFIG.set(GlobalConfig { + component: component.to_string(), + dirs, + }); + assert!(result.is_ok()); + Ok(()) + }) +} + +unsafe extern "C" fn start( + task: *mut bindings::nxt_task_t, + data: *mut bindings::nxt_process_data_t, +) -> bindings::nxt_int_t { + handle_result(task, || { + let config = GLOBAL_CONFIG.get().unwrap(); + let state = GlobalState::new(&config).context("failed to create initial state")?; + let res = GLOBAL_STATE.set(state); + assert!(res.is_ok()); + + let conf = (*data).app; + let mut wasm_init = MaybeUninit::uninit(); + let ret = bindings::nxt_unit_default_init(task, wasm_init.as_mut_ptr(), conf); + if ret != bindings::NXT_OK as bindings::nxt_int_t { + bail!("nxt_unit_default_init() failed"); + } + let mut wasm_init = wasm_init.assume_init(); + wasm_init.callbacks.request_handler 
= Some(request_handler); + + let unit_ctx = bindings::nxt_unit_init(&mut wasm_init); + if unit_ctx.is_null() { + bail!("nxt_unit_init() failed"); + } + + bindings::nxt_unit_run(unit_ctx); + bindings::nxt_unit_done(unit_ctx); + + Ok(()) + }) +} + +unsafe fn handle_result( + task: *mut bindings::nxt_task_t, + func: impl FnOnce() -> Result<()>, +) -> bindings::nxt_int_t { + let rc = match func() { + Ok(()) => bindings::NXT_OK as bindings::nxt_int_t, + Err(e) => { + alert(task, &format!("{e:?}")); + bindings::NXT_ERROR as bindings::nxt_int_t + } + }; + return rc; + + unsafe fn alert(task: *mut bindings::nxt_task_t, msg: &str) { + let log = (*task).log; + let msg = CString::new(msg).unwrap(); + ((*log).handler).unwrap()( + bindings::NXT_LOG_ALERT as bindings::nxt_uint_t, + log, + "%s\0".as_ptr().cast(), + msg.as_ptr(), + ); + } +} + +unsafe extern "C" fn request_handler(info: *mut bindings::nxt_unit_request_info_t) { + // Enqueue this request to get processed by the Tokio event loop, and + // otherwise immediately return. + let state = GLOBAL_STATE.get().unwrap(); + state.sender.blocking_send(NxtRequestInfo { info }).unwrap(); +} + +struct GlobalConfig { + component: String, + dirs: Vec<String>, +} + +struct GlobalState { + engine: Engine, + component: InstancePre<StoreState>, + global_config: &'static GlobalConfig, + sender: mpsc::Sender<NxtRequestInfo>, +} + +impl GlobalState { + fn new(global_config: &'static GlobalConfig) -> Result<GlobalState> { + // Configure Wasmtime, e.g. the component model and async support are + // enabled here. Other configuration can include: + // + // * Epochs/fuel - enables async yielding to prevent any one request + // starving others. + // * Pooling allocator - accelerates instantiation at the cost of a + // large virtual memory reservation. + // * Memory limits/etc. 
+ let mut config = Config::new(); + config.wasm_component_model(true); + config.async_support(true); + let engine = Engine::new(&config)?; + + // Compile the binary component on disk in Wasmtime. This is then + // pre-instantiated with host APIs defined by WASI. The result of + // this is a "pre-instantiated instance" which can be used to + // repeatedly instantiate later on. This will frontload + // compilation/linking/type-checking/etc to happen once rather than on + // each request. + let component = Component::from_file(&engine, &global_config.component) + .context("failed to compile component")?; + let mut linker = Linker::<StoreState>::new(&engine); + wasmtime_wasi_http::proxy::add_to_linker(&mut linker)?; + let component = linker + .instantiate_pre(&component) + .context("failed to pre-instantiate the provided component")?; + + // Spin up the Tokio async runtime in a separate thread with a + // communication channel into it. This thread will send requests to + // Tokio and the results will be calculated there. + let (sender, receiver) = mpsc::channel(10); + std::thread::spawn(|| GlobalState::run(receiver)); + + Ok(GlobalState { + engine, + component, + sender, + global_config, + }) + } + + /// Worker thread that executes the Tokio runtime, infinitely receiving + /// messages from the provided `receiver` and handling those requests. + /// + /// Each request is handled in a separate subtask so processing can all + /// happen concurrently. + fn run(mut receiver: mpsc::Receiver<NxtRequestInfo>) { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + while let Some(msg) = receiver.recv().await { + let state = GLOBAL_STATE.get().unwrap(); + tokio::task::spawn(async move { + state.handle(msg).await.expect("failed to handle request") + }); + } + }); + } + + async fn handle(&'static self, mut info: NxtRequestInfo) -> Result<()> { + // Create a "Store" which is the unit of per-request isolation in + // Wasmtime. 
+ let data = StoreState { + ctx: { + let mut cx = WasiCtxBuilder::new(); + // NB: while useful for debugging untrusted code probably + // shouldn't get raw access to stdout/stderr. + cx.inherit_stdout(); + cx.inherit_stderr(); + for dir in self.global_config.dirs.iter() { + let fd = Dir::open_ambient_dir(dir, ambient_authority()) + .with_context(|| format!("failed to open directory '{dir}'"))?; + cx.preopened_dir(fd, DirPerms::all(), FilePerms::all(), dir); + } + cx.build() + }, + table: Table::default(), + http: WasiHttpCtx, + }; + let mut store = Store::new(&self.engine, data); + + // Convert the `nxt_*` representation into the representation required + // by Wasmtime's `wasi-http` implementation using the Rust `http` + // crate. + let request = self.to_request_builder(&info)?; + let body = self.to_request_body(&mut info); + let request = request.body(body)?; + + let (sender, receiver) = tokio::sync::oneshot::channel(); + + // Instantiate the WebAssembly component and invoke its `handle` + // function which receives a request and where to put a response. + // + // Note that this is done in a sub-task to work concurrently with + // writing the response when it's available. This enables wasm to + // generate headers, write those below, and then compute the body + // afterwards. + let task = tokio::spawn(async move { + let (proxy, _) = + wasmtime_wasi_http::proxy::Proxy::instantiate_pre(&mut store, &self.component) + .await + .context("failed to instantiate")?; + let req = store.data_mut().new_incoming_request(request)?; + let out = store.data_mut().new_response_outparam(sender)?; + proxy + .wasi_http_incoming_handler() + .call_handle(&mut store, req, out) + .await + .context("failed to invoke wasm `handle`")?; + Ok::<_, anyhow::Error>(()) + }); + + // Wait for the wasm to produce the initial response. If this succeeds + // then propagate that failure. 
If this fails then wait for the above + // task to complete to see if it failed, otherwise panic since that's + // unexpected. + let response = match receiver.await { + Ok(response) => response.context("response generation failed")?, + Err(_) => { + task.await.unwrap()?; + panic!("sender of response disappeared"); + } + }; + + // Send the headers/status which will extract the body for the next + // phase. + let body = self.send_response(&mut info, response); + + // Send the body, a blocking operation, over time as it becomes + // available. + self.send_response_body(&mut info, body) + .await + .context("failed to write response body")?; + + // Join on completion of the wasm task which should be done by this + // point. + task.await.unwrap()?; + + // And finally signal that we're done. + info.request_done(); + + Ok(()) + } + + fn to_request_builder(&self, info: &NxtRequestInfo) -> Result<http::request::Builder> { + let mut request = http::Request::builder(); + + request = request.method(info.method()); + request = match info.version() { + "HTTP/0.9" => request.version(http::Version::HTTP_09), + "HTTP/1.0" => request.version(http::Version::HTTP_10), + "HTTP/1.1" => request.version(http::Version::HTTP_11), + "HTTP/2.0" => request.version(http::Version::HTTP_2), + "HTTP/3.0" => request.version(http::Version::HTTP_3), + version => { + println!("unknown version: {version}"); + request + } + }; + + let uri = http::Uri::builder() + .scheme(if info.tls() { "https" } else { "http" }) + .authority(info.server_name()) + .path_and_query(info.target()) + .build() + .context("failed to build URI")?; + request = request.uri(uri); + + for (name, value) in info.fields() { + request = request.header(name, value); + } + Ok(request) + } + + fn to_request_body(&self, info: &mut NxtRequestInfo) -> BoxBody<Bytes, anyhow::Error> { + // TODO: should convert the body into a form of `Stream` to become an async + // stream of frames. 
The return value can represent that here but for now + // this slurps up the entire body into memory and puts it all in a single + // `BytesMut` which is then converted to `Bytes`. + let mut body = BytesMut::with_capacity(info.content_length().try_into().unwrap()); + + // TODO: can this perform a partial read? + // TODO: how to make this async at the nxt level? + info.request_read(&mut body); + + Full::new(body.freeze()).map_err(|e| match e {}).boxed() + } + + fn send_response<T>(&self, info: &mut NxtRequestInfo, response: http::Response<T>) -> T { + info.init_response( + response.status().as_u16(), + response.headers().len().try_into().unwrap(), + response + .headers() + .iter() + .map(|(k, v)| k.as_str().len() + v.len()) + .sum::<usize>() + .try_into() + .unwrap(), + ); + for (k, v) in response.headers() { + info.add_field(k.as_str().as_bytes(), v.as_bytes()); + } + info.send_response(); + + response.into_body() + } + + async fn send_response_body( + &self, + info: &mut NxtRequestInfo, + mut body: BoxBody<Bytes, anyhow::Error>, + ) -> Result<()> { + loop { + // Acquire the next frame, and because nothing is actually async at the + // moment this should never block meaning that the `Pending` case + // should not happen. + let frame = match body.frame().await { + Some(Ok(frame)) => frame, + Some(Err(e)) => break Err(e), + None => break Ok(()), + }; + match frame.data_ref() { + Some(data) => { + info.response_write(&data); + } + None => { + // TODO: what to do with trailers? + } + } + } + } +} + +struct NxtRequestInfo { + info: *mut bindings::nxt_unit_request_info_t, +} + +// TODO: is this actually safe? 
+unsafe impl Send for NxtRequestInfo {} +unsafe impl Sync for NxtRequestInfo {} + +impl NxtRequestInfo { + fn method(&self) -> &str { + unsafe { + let raw = (*self.info).request; + self.get_str(&(*raw).method, (*raw).method_length.into()) + } + } + + fn tls(&self) -> bool { + unsafe { (*(*self.info).request).tls != 0 } + } + + fn version(&self) -> &str { + unsafe { + let raw = (*self.info).request; + self.get_str(&(*raw).version, (*raw).version_length.into()) + } + } + + fn server_name(&self) -> &str { + unsafe { + let raw = (*self.info).request; + self.get_str(&(*raw).server_name, (*raw).server_name_length.into()) + } + } + + fn target(&self) -> &str { + unsafe { + let raw = (*self.info).request; + self.get_str(&(*raw).target, (*raw).target_length.into()) + } + } + + fn content_length(&self) -> u64 { + unsafe { + let raw_request = (*self.info).request; + (*raw_request).content_length + } + } + + fn fields(&self) -> impl Iterator<Item = (&str, &str)> { + unsafe { + let raw = (*self.info).request; + (0..(*raw).fields_count).map(move |i| { + let field = (*raw).fields.as_ptr().add(i as usize); + let name = self.get_str(&(*field).name, (*field).name_length.into()); + let value = self.get_str(&(*field).value, (*field).value_length.into()); + (name, value) + }) + } + } + + fn request_read(&mut self, dst: &mut BytesMut) { + unsafe { + let rest = dst.spare_capacity_mut(); + let amt = + bindings::nxt_unit_request_read(self.info, rest.as_mut_ptr().cast(), rest.len()); + // TODO: handle failure when `amt` is negative + let amt: usize = amt.try_into().unwrap(); + dst.set_len(dst.len() + amt); + } + } + + fn response_write(&mut self, data: &[u8]) { + unsafe { + let rc = bindings::nxt_unit_response_write(self.info, data.as_ptr().cast(), data.len()); + assert_eq!(rc, 0); + } + } + + fn init_response(&mut self, status: u16, headers: u32, headers_size: u32) { + unsafe { + let rc = bindings::nxt_unit_response_init(self.info, status, headers, headers_size); + assert_eq!(rc, 0); + } + 
} + + fn add_field(&mut self, key: &[u8], val: &[u8]) { + unsafe { + let rc = bindings::nxt_unit_response_add_field( + self.info, + key.as_ptr().cast(), + key.len().try_into().unwrap(), + val.as_ptr().cast(), + val.len().try_into().unwrap(), + ); + assert_eq!(rc, 0); + } + } + + fn send_response(&mut self) { + unsafe { + let rc = bindings::nxt_unit_response_send(self.info); + assert_eq!(rc, 0); + } + } + + fn request_done(self) { + unsafe { + bindings::nxt_unit_request_done(self.info, bindings::NXT_UNIT_OK as i32); + } + } + + unsafe fn get_str(&self, ptr: &bindings::nxt_unit_sptr_t, len: u32) -> &str { + let ptr = bindings::nxt_unit_sptr_get(ptr); + let slice = std::slice::from_raw_parts(ptr, len.try_into().unwrap()); + std::str::from_utf8(slice).unwrap() + } +} + +struct StoreState { + ctx: WasiCtx, + http: WasiHttpCtx, + table: Table, +} + +impl WasiView for StoreState { + fn table(&self) -> &Table { + &self.table + } + fn table_mut(&mut self) -> &mut Table { + &mut self.table + } + fn ctx(&self) -> &WasiCtx { + &self.ctx + } + fn ctx_mut(&mut self) -> &mut WasiCtx { + &mut self.ctx + } +} + +impl WasiHttpView for StoreState { + fn ctx(&mut self) -> &mut WasiHttpCtx { + &mut self.http + } + fn table(&mut self) -> &mut Table { + &mut self.table + } +} + +impl StoreState {} diff --git a/src/wasm-wasi-component/wrapper.h b/src/wasm-wasi-component/wrapper.h new file mode 100644 index 00000000..93f3014a --- /dev/null +++ b/src/wasm-wasi-component/wrapper.h @@ -0,0 +1,5 @@ +#include <nxt_main.h> +#include <nxt_unit.h> +#include <nxt_unit_request.h> +#include <nxt_unit_typedefs.h> +#include <nxt_application.h> |