diff options
Diffstat (limited to 'src/java/nginx/unit/websocket/server')
17 files changed, 2217 insertions, 0 deletions
diff --git a/src/java/nginx/unit/websocket/server/Constants.java b/src/java/nginx/unit/websocket/server/Constants.java new file mode 100644 index 00000000..5210c4ba --- /dev/null +++ b/src/java/nginx/unit/websocket/server/Constants.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +/** + * Internal implementation constants. + */ +public class Constants { + + public static final String BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM = + "nginx.unit.websocket.binaryBufferSize"; + public static final String TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM = + "nginx.unit.websocket.textBufferSize"; + public static final String ENFORCE_NO_ADD_AFTER_HANDSHAKE_CONTEXT_INIT_PARAM = + "nginx.unit.websocket.noAddAfterHandshake"; + + public static final String SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE = + "javax.websocket.server.ServerContainer"; + + + private Constants() { + // Hide default constructor + } +} diff --git a/src/java/nginx/unit/websocket/server/DefaultServerEndpointConfigurator.java b/src/java/nginx/unit/websocket/server/DefaultServerEndpointConfigurator.java new file mode 100644 index 00000000..43ffe2bc --- /dev/null +++ b/src/java/nginx/unit/websocket/server/DefaultServerEndpointConfigurator.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.websocket.Extension; +import javax.websocket.HandshakeResponse; +import javax.websocket.server.HandshakeRequest; +import javax.websocket.server.ServerEndpointConfig; + +public class DefaultServerEndpointConfigurator + extends ServerEndpointConfig.Configurator { + + @Override + public <T> T getEndpointInstance(Class<T> clazz) + throws InstantiationException { + try { + return clazz.getConstructor().newInstance(); + } catch (InstantiationException e) { + throw e; + } catch (ReflectiveOperationException e) { + InstantiationException ie = new InstantiationException(); + ie.initCause(e); + throw ie; + } + } + + + @Override + public String getNegotiatedSubprotocol(List<String> supported, + List<String> requested) { + + for (String request : requested) { + if (supported.contains(request)) { + return request; + } + } + return ""; + } + + + @Override + public List<Extension> getNegotiatedExtensions(List<Extension> installed, + List<Extension> requested) { + Set<String> installedNames = new HashSet<>(); + for (Extension e : installed) { + installedNames.add(e.getName()); + } + List<Extension> result = new ArrayList<>(); + for (Extension request : requested) { + if (installedNames.contains(request.getName())) { + result.add(request); + } + } + return result; + } + + + @Override + public boolean checkOrigin(String originHeaderValue) { + return true; + } + + @Override + public void modifyHandshake(ServerEndpointConfig sec, + HandshakeRequest request, HandshakeResponse response) { + // NO-OP + } + +} diff --git a/src/java/nginx/unit/websocket/server/LocalStrings.properties b/src/java/nginx/unit/websocket/server/LocalStrings.properties new file mode 100644 index 00000000..5bc12501 --- /dev/null +++ b/src/java/nginx/unit/websocket/server/LocalStrings.properties @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +serverContainer.addNotAllowed=No further Endpoints may be registered once an attempt has been made to use one of the previously registered endpoints +serverContainer.configuratorFail=Failed to create configurator of type [{0}] for POJO of type [{1}] +serverContainer.duplicatePaths=Multiple Endpoints may not be deployed to the same path [{0}] : existing endpoint was [{1}] and new endpoint is [{2}] +serverContainer.encoderFail=Unable to create encoder of type [{0}] +serverContainer.endpointDeploy=Endpoint class [{0}] deploying to path [{1}] in ServletContext [{2}] +serverContainer.missingAnnotation=Cannot deploy POJO class [{0}] as it is not annotated with @ServerEndpoint +serverContainer.missingEndpoint=An Endpoint instance has been request for path [{0}] but no matching Endpoint class was found +serverContainer.pojoDeploy=POJO class [{0}] deploying to path [{1}] in ServletContext [{2}] +serverContainer.servletContextMismatch=Attempted to register a POJO annotated for WebSocket at path [{0}] in the ServletContext with context path [{1}] when the WebSocket ServerContainer is allocated to the ServletContext with context path [{2}] +serverContainer.servletContextMissing=No ServletContext was specified + +upgradeUtil.incompatibleRsv=Extensions were specified that have incompatible RSV bit usage + +uriTemplate.duplicateParameter=The parameter [{0}] appears more than once in the path which is not permitted +uriTemplate.emptySegment=The path [{0}] contains one or more empty segments which are is not permitted +uriTemplate.invalidPath=The path [{0}] is not valid. +uriTemplate.invalidSegment=The segment [{0}] is not valid in the provided path [{1}] + +wsFrameServer.bytesRead=Read [{0}] bytes into input buffer ready for processing +wsFrameServer.illegalReadState=Unexpected read state [{0}] +wsFrameServer.onDataAvailable=Method entry + +wsHttpUpgradeHandler.closeOnError=Closing WebSocket connection due to an error +wsHttpUpgradeHandler.destroyFailed=Failed to close WebConnection while destroying the WebSocket HttpUpgradeHandler +wsHttpUpgradeHandler.noPreInit=The preInit() method must be called to configure the WebSocket HttpUpgradeHandler before the container calls init(). Usually, this means the Servlet that created the WsHttpUpgradeHandler instance should also call preInit() +wsHttpUpgradeHandler.serverStop=The server is stopping + +wsRemoteEndpointServer.closeFailed=Failed to close the ServletOutputStream connection cleanly diff --git a/src/java/nginx/unit/websocket/server/UpgradeUtil.java b/src/java/nginx/unit/websocket/server/UpgradeUtil.java new file mode 100644 index 00000000..162f01c7 --- /dev/null +++ b/src/java/nginx/unit/websocket/server/UpgradeUtil.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Enumeration; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.websocket.Endpoint; +import javax.websocket.Extension; +import javax.websocket.HandshakeResponse; +import javax.websocket.server.ServerEndpointConfig; + +import nginx.unit.Request; + +import org.apache.tomcat.util.codec.binary.Base64; +import org.apache.tomcat.util.res.StringManager; +import org.apache.tomcat.util.security.ConcurrentMessageDigest; +import nginx.unit.websocket.Constants; +import nginx.unit.websocket.Transformation; +import nginx.unit.websocket.TransformationFactory; +import nginx.unit.websocket.Util; +import nginx.unit.websocket.WsHandshakeResponse; +import nginx.unit.websocket.pojo.PojoEndpointServer; + +public class UpgradeUtil { + + private static final StringManager sm = + StringManager.getManager(UpgradeUtil.class.getPackage().getName()); + private static final byte[] WS_ACCEPT = + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes( + StandardCharsets.ISO_8859_1); + + private UpgradeUtil() { + // Utility class. Hide default constructor. + } + + /** + * Checks to see if this is an HTTP request that includes a valid upgrade + * request to web socket. + * <p> + * Note: RFC 2616 does not limit HTTP upgrade to GET requests but the Java + * WebSocket spec 1.0, section 8.2 implies such a limitation and RFC + * 6455 section 4.1 requires that a WebSocket Upgrade uses GET. + * @param request The request to check if it is an HTTP upgrade request for + * a WebSocket connection + * @param response The response associated with the request + * @return <code>true</code> if the request includes a HTTP Upgrade request + * for the WebSocket protocol, otherwise <code>false</code> + */ + public static boolean isWebSocketUpgradeRequest(ServletRequest request, + ServletResponse response) { + + Request r = (Request) request.getAttribute(Request.BARE); + + return ((request instanceof HttpServletRequest) && + (response instanceof HttpServletResponse) && + (r != null) && + (r.isUpgrade())); + } + + + public static void doUpgrade(WsServerContainer sc, HttpServletRequest req, + HttpServletResponse resp, ServerEndpointConfig sec, + Map<String,String> pathParams) + throws ServletException, IOException { + + + // Origin check + String origin = req.getHeader(Constants.ORIGIN_HEADER_NAME); + + if (!sec.getConfigurator().checkOrigin(origin)) { + resp.sendError(HttpServletResponse.SC_FORBIDDEN); + return; + } + // Sub-protocols + List<String> subProtocols = getTokensFromHeader(req, + Constants.WS_PROTOCOL_HEADER_NAME); + String subProtocol = sec.getConfigurator().getNegotiatedSubprotocol( + sec.getSubprotocols(), subProtocols); + + // Extensions + // Should normally only be one header but handle the case of multiple + // headers + List<Extension> extensionsRequested = new ArrayList<>(); + Enumeration<String> extHeaders = req.getHeaders(Constants.WS_EXTENSIONS_HEADER_NAME); + while (extHeaders.hasMoreElements()) { + Util.parseExtensionHeader(extensionsRequested, extHeaders.nextElement()); + } + + // Negotiation phase 1. By default this simply filters out the + // extensions that the server does not support but applications could + // use a custom configurator to do more than this. + List<Extension> installedExtensions = null; + if (sec.getExtensions().size() == 0) { + installedExtensions = Constants.INSTALLED_EXTENSIONS; + } else { + installedExtensions = new ArrayList<>(); + installedExtensions.addAll(sec.getExtensions()); + installedExtensions.addAll(Constants.INSTALLED_EXTENSIONS); + } + List<Extension> negotiatedExtensionsPhase1 = sec.getConfigurator().getNegotiatedExtensions( + installedExtensions, extensionsRequested); + + // Negotiation phase 2. Create the Transformations that will be applied + // to this connection. Note than an extension may be dropped at this + // point if the client has requested a configuration that the server is + // unable to support. + List<Transformation> transformations = createTransformations(negotiatedExtensionsPhase1); + + List<Extension> negotiatedExtensionsPhase2; + if (transformations.isEmpty()) { + negotiatedExtensionsPhase2 = Collections.emptyList(); + } else { + negotiatedExtensionsPhase2 = new ArrayList<>(transformations.size()); + for (Transformation t : transformations) { + negotiatedExtensionsPhase2.add(t.getExtensionResponse()); + } + } + + WsHttpUpgradeHandler wsHandler = + req.upgrade(WsHttpUpgradeHandler.class); + + WsHandshakeRequest wsRequest = new WsHandshakeRequest(req, pathParams); + WsHandshakeResponse wsResponse = new WsHandshakeResponse(); + WsPerSessionServerEndpointConfig perSessionServerEndpointConfig = + new WsPerSessionServerEndpointConfig(sec); + sec.getConfigurator().modifyHandshake(perSessionServerEndpointConfig, + wsRequest, wsResponse); + //wsRequest.finished(); + + // Add any additional headers + for (Entry<String,List<String>> entry : + wsResponse.getHeaders().entrySet()) { + for (String headerValue: entry.getValue()) { + resp.addHeader(entry.getKey(), headerValue); + } + } + + Endpoint ep; + try { + Class<?> clazz = sec.getEndpointClass(); + if (Endpoint.class.isAssignableFrom(clazz)) { + ep = (Endpoint) sec.getConfigurator().getEndpointInstance( + clazz); + } else { + ep = new PojoEndpointServer(); + // Need to make path params available to POJO + perSessionServerEndpointConfig.getUserProperties().put( + nginx.unit.websocket.pojo.Constants.POJO_PATH_PARAM_KEY, pathParams); + } + } catch (InstantiationException e) { + throw new ServletException(e); + } + + wsHandler.preInit(ep, perSessionServerEndpointConfig, sc, wsRequest, + negotiatedExtensionsPhase2, subProtocol, null, pathParams, + req.isSecure()); + + wsHandler.init(null); + } + + + private static List<Transformation> createTransformations( + List<Extension> negotiatedExtensions) { + + TransformationFactory factory = TransformationFactory.getInstance(); + + LinkedHashMap<String,List<List<Extension.Parameter>>> extensionPreferences = + new LinkedHashMap<>(); + + // Result will likely be smaller than this + List<Transformation> result = new ArrayList<>(negotiatedExtensions.size()); + + for (Extension extension : negotiatedExtensions) { + List<List<Extension.Parameter>> preferences = + extensionPreferences.get(extension.getName()); + + if (preferences == null) { + preferences = new ArrayList<>(); + extensionPreferences.put(extension.getName(), preferences); + } + + preferences.add(extension.getParameters()); + } + + for (Map.Entry<String,List<List<Extension.Parameter>>> entry : + extensionPreferences.entrySet()) { + Transformation transformation = factory.create(entry.getKey(), entry.getValue(), true); + if (transformation != null) { + result.add(transformation); + } + } + return result; + } + + + private static void append(StringBuilder sb, Extension extension) { + if (extension == null || extension.getName() == null || extension.getName().length() == 0) { + return; + } + + sb.append(extension.getName()); + + for (Extension.Parameter p : extension.getParameters()) { + sb.append(';'); + sb.append(p.getName()); + if (p.getValue() != null) { + sb.append('='); + sb.append(p.getValue()); + } + } + } + + + /* + * This only works for tokens. Quoted strings need more sophisticated + * parsing. + */ + private static boolean headerContainsToken(HttpServletRequest req, + String headerName, String target) { + Enumeration<String> headers = req.getHeaders(headerName); + while (headers.hasMoreElements()) { + String header = headers.nextElement(); + String[] tokens = header.split(","); + for (String token : tokens) { + if (target.equalsIgnoreCase(token.trim())) { + return true; + } + } + } + return false; + } + + + /* + * This only works for tokens. Quoted strings need more sophisticated + * parsing. + */ + private static List<String> getTokensFromHeader(HttpServletRequest req, + String headerName) { + List<String> result = new ArrayList<>(); + Enumeration<String> headers = req.getHeaders(headerName); + while (headers.hasMoreElements()) { + String header = headers.nextElement(); + String[] tokens = header.split(","); + for (String token : tokens) { + result.add(token.trim()); + } + } + return result; + } + + + private static String getWebSocketAccept(String key) { + byte[] digest = ConcurrentMessageDigest.digestSHA1( + key.getBytes(StandardCharsets.ISO_8859_1), WS_ACCEPT); + return Base64.encodeBase64String(digest); + } +} diff --git a/src/java/nginx/unit/websocket/server/UriTemplate.java b/src/java/nginx/unit/websocket/server/UriTemplate.java new file mode 100644 index 00000000..7877fac9 --- /dev/null +++ b/src/java/nginx/unit/websocket/server/UriTemplate.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import javax.websocket.DeploymentException; + +import org.apache.tomcat.util.res.StringManager; + +/** + * Extracts path parameters from URIs used to create web socket connections + * using the URI template defined for the associated Endpoint. + */ +public class UriTemplate { + + private static final StringManager sm = StringManager.getManager(UriTemplate.class); + + private final String normalized; + private final List<Segment> segments = new ArrayList<>(); + private final boolean hasParameters; + + + public UriTemplate(String path) throws DeploymentException { + + if (path == null || path.length() ==0 || !path.startsWith("/")) { + throw new DeploymentException( + sm.getString("uriTemplate.invalidPath", path)); + } + + StringBuilder normalized = new StringBuilder(path.length()); + Set<String> paramNames = new HashSet<>(); + + // Include empty segments. + String[] segments = path.split("/", -1); + int paramCount = 0; + int segmentCount = 0; + + for (int i = 0; i < segments.length; i++) { + String segment = segments[i]; + if (segment.length() == 0) { + if (i == 0 || (i == segments.length - 1 && paramCount == 0)) { + // Ignore the first empty segment as the path must always + // start with '/' + // Ending with a '/' is also OK for instances used for + // matches but not for parameterised templates. + continue; + } else { + // As per EG discussion, all other empty segments are + // invalid + throw new IllegalArgumentException(sm.getString( + "uriTemplate.emptySegment", path)); + } + } + normalized.append('/'); + int index = -1; + if (segment.startsWith("{") && segment.endsWith("}")) { + index = segmentCount; + segment = segment.substring(1, segment.length() - 1); + normalized.append('{'); + normalized.append(paramCount++); + normalized.append('}'); + if (!paramNames.add(segment)) { + throw new IllegalArgumentException(sm.getString( + "uriTemplate.duplicateParameter", segment)); + } + } else { + if (segment.contains("{") || segment.contains("}")) { + throw new IllegalArgumentException(sm.getString( + "uriTemplate.invalidSegment", segment, path)); + } + normalized.append(segment); + } + this.segments.add(new Segment(index, segment)); + segmentCount++; + } + + this.normalized = normalized.toString(); + this.hasParameters = paramCount > 0; + } + + + public Map<String,String> match(UriTemplate candidate) { + + Map<String,String> result = new HashMap<>(); + + // Should not happen but for safety + if (candidate.getSegmentCount() != getSegmentCount()) { + return null; + } + + Iterator<Segment> candidateSegments = + candidate.getSegments().iterator(); + Iterator<Segment> targetSegments = segments.iterator(); + + while (candidateSegments.hasNext()) { + Segment candidateSegment = candidateSegments.next(); + Segment targetSegment = targetSegments.next(); + + if (targetSegment.getParameterIndex() == -1) { + // Not a parameter - values must match + if (!targetSegment.getValue().equals( + candidateSegment.getValue())) { + // Not a match. Stop here + return null; + } + } else { + // Parameter + result.put(targetSegment.getValue(), + candidateSegment.getValue()); + } + } + + return result; + } + + + public boolean hasParameters() { + return hasParameters; + } + + + public int getSegmentCount() { + return segments.size(); + } + + + public String getNormalizedPath() { + return normalized; + } + + + private List<Segment> getSegments() { + return segments; + } + + + private static class Segment { + private final int parameterIndex; + private final String value; + + public Segment(int parameterIndex, String value) { + this.parameterIndex = parameterIndex; + this.value = value; + } + + + public int getParameterIndex() { + return parameterIndex; + } + + + public String getValue() { + return value; + } + } +} diff --git a/src/java/nginx/unit/websocket/server/WsContextListener.java b/src/java/nginx/unit/websocket/server/WsContextListener.java new file mode 100644 index 00000000..07137856 --- /dev/null +++ b/src/java/nginx/unit/websocket/server/WsContextListener.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import javax.servlet.ServletContext; +import javax.servlet.ServletContextEvent; +import javax.servlet.ServletContextListener; + +/** + * In normal usage, this {@link ServletContextListener} does not need to be + * explicitly configured as the {@link WsSci} performs all the necessary + * bootstrap and installs this listener in the {@link ServletContext}. If the + * {@link WsSci} is disabled, this listener must be added manually to every + * {@link ServletContext} that uses WebSocket to bootstrap the + * {@link WsServerContainer} correctly. + */ +public class WsContextListener implements ServletContextListener { + + @Override + public void contextInitialized(ServletContextEvent sce) { + ServletContext sc = sce.getServletContext(); + // Don't trigger WebSocket initialization if a WebSocket Server + // Container is already present + if (sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE) == null) { + WsSci.init(sce.getServletContext(), false); + } + } + + @Override + public void contextDestroyed(ServletContextEvent sce) { + ServletContext sc = sce.getServletContext(); + Object obj = sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE); + if (obj instanceof WsServerContainer) { + ((WsServerContainer) obj).destroy(); + } + } +} diff --git a/src/java/nginx/unit/websocket/server/WsFilter.java b/src/java/nginx/unit/websocket/server/WsFilter.java new file mode 100644 index 00000000..abea71fc --- /dev/null +++ b/src/java/nginx/unit/websocket/server/WsFilter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.io.IOException; + +import javax.servlet.FilterChain; +import javax.servlet.GenericFilter; +import javax.servlet.ServletException; +import javax.servlet.ServletRequest; +import javax.servlet.ServletResponse; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * Handles the initial HTTP connection for WebSocket connections. + */ +public class WsFilter extends GenericFilter { + + private static final long serialVersionUID = 1L; + + private transient WsServerContainer sc; + + + @Override + public void init() throws ServletException { + sc = (WsServerContainer) getServletContext().getAttribute( + Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE); + } + + + @Override + public void doFilter(ServletRequest request, ServletResponse response, + FilterChain chain) throws IOException, ServletException { + + // This filter only needs to handle WebSocket upgrade requests + if (!sc.areEndpointsRegistered() || + !UpgradeUtil.isWebSocketUpgradeRequest(request, response)) { + chain.doFilter(request, response); + return; + } + + // HTTP request with an upgrade header for WebSocket present + HttpServletRequest req = (HttpServletRequest) request; + HttpServletResponse resp = (HttpServletResponse) response; + + // Check to see if this WebSocket implementation has a matching mapping + String path; + String pathInfo = req.getPathInfo(); + if (pathInfo == null) { + path = req.getServletPath(); + } else { + path = req.getServletPath() + pathInfo; + } + WsMappingResult mappingResult = sc.findMapping(path); + + if (mappingResult == null) { + // No endpoint registered for the requested path. Let the + // application handle it (it might redirect or forward for example) + chain.doFilter(request, response); + return; + } + + UpgradeUtil.doUpgrade(sc, req, resp, mappingResult.getConfig(), + mappingResult.getPathParams()); + } +} diff --git a/src/java/nginx/unit/websocket/server/WsHandshakeRequest.java b/src/java/nginx/unit/websocket/server/WsHandshakeRequest.java new file mode 100644 index 00000000..fa774302 --- /dev/null +++ b/src/java/nginx/unit/websocket/server/WsHandshakeRequest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.net.URI; +import java.net.URISyntaxException; +import java.security.Principal; +import java.util.Arrays; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import javax.servlet.http.HttpServletRequest; +import javax.websocket.server.HandshakeRequest; + +import org.apache.tomcat.util.collections.CaseInsensitiveKeyMap; +import org.apache.tomcat.util.res.StringManager; + +/** + * Represents the request that this session was opened under. + */ +public class WsHandshakeRequest implements HandshakeRequest { + + private static final StringManager sm = StringManager.getManager(WsHandshakeRequest.class); + + private final URI requestUri; + private final Map<String,List<String>> parameterMap; + private final String queryString; + private final Principal userPrincipal; + private final Map<String,List<String>> headers; + private final Object httpSession; + + private volatile HttpServletRequest request; + + + public WsHandshakeRequest(HttpServletRequest request, Map<String,String> pathParams) { + + this.request = request; + + queryString = request.getQueryString(); + userPrincipal = request.getUserPrincipal(); + httpSession = request.getSession(false); + requestUri = buildRequestUri(request); + + // ParameterMap + Map<String,String[]> originalParameters = request.getParameterMap(); + Map<String,List<String>> newParameters = + new HashMap<>(originalParameters.size()); + for (Entry<String,String[]> entry : originalParameters.entrySet()) { + newParameters.put(entry.getKey(), + Collections.unmodifiableList( + Arrays.asList(entry.getValue()))); + } + for (Entry<String,String> entry : pathParams.entrySet()) { + newParameters.put(entry.getKey(), + Collections.unmodifiableList( + Collections.singletonList(entry.getValue()))); + } + parameterMap = Collections.unmodifiableMap(newParameters); + + // Headers + Map<String,List<String>> newHeaders = new CaseInsensitiveKeyMap<>(); + + Enumeration<String> headerNames = request.getHeaderNames(); + while (headerNames.hasMoreElements()) { + String headerName = headerNames.nextElement(); + + newHeaders.put(headerName, Collections.unmodifiableList( + Collections.list(request.getHeaders(headerName)))); + } + + headers = Collections.unmodifiableMap(newHeaders); + } + + @Override + public URI getRequestURI() { + return requestUri; + } + + @Override + public Map<String,List<String>> getParameterMap() { + return parameterMap; + } + + @Override + public String getQueryString() { + return queryString; + } + + @Override + public Principal getUserPrincipal() { + return userPrincipal; + } + + @Override + public Map<String,List<String>> getHeaders() { + return headers; + } + + @Override + public boolean isUserInRole(String role) { + if (request == null) { + throw new IllegalStateException(); + } + + return request.isUserInRole(role); + } + + @Override + public Object getHttpSession() { + return httpSession; + } + + /** + * Called when the HandshakeRequest is no longer required. Since an instance + * of this class retains a reference to the current HttpServletRequest that + * reference needs to be cleared as the HttpServletRequest may be reused. + * + * There is no reason for instances of this class to be accessed once the + * handshake has been completed. + */ + void finished() { + request = null; + } + + + /* + * See RequestUtil.getRequestURL() + */ + private static URI buildRequestUri(HttpServletRequest req) { + + StringBuffer uri = new StringBuffer(); + String scheme = req.getScheme(); + int port = req.getServerPort(); + if (port < 0) { + // Work around java.net.URL bug + port = 80; + } + + if ("http".equals(scheme)) { + uri.append("ws"); + } else if ("https".equals(scheme)) { + uri.append("wss"); + } else { + // Should never happen + throw new IllegalArgumentException( + sm.getString("wsHandshakeRequest.unknownScheme", scheme)); + } + + uri.append("://"); + uri.append(req.getServerName()); + + if ((scheme.equals("http") && (port != 80)) + || (scheme.equals("https") && (port != 443))) { + uri.append(':'); + uri.append(port); + } + + uri.append(req.getRequestURI()); + + if (req.getQueryString() != null) { + uri.append("?"); + uri.append(req.getQueryString()); + } + + try { + return new URI(uri.toString()); + } catch (URISyntaxException e) { + // Should never happen + throw new IllegalArgumentException( + sm.getString("wsHandshakeRequest.invalidUri", uri.toString()), e); + } + } + + public Object getAttribute(String name) + { + return request != null ? request.getAttribute(name) : null; + } +} diff --git a/src/java/nginx/unit/websocket/server/WsHttpUpgradeHandler.java b/src/java/nginx/unit/websocket/server/WsHttpUpgradeHandler.java new file mode 100644 index 00000000..cc39ab73 --- /dev/null +++ b/src/java/nginx/unit/websocket/server/WsHttpUpgradeHandler.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import javax.servlet.http.HttpSession; +import javax.servlet.http.HttpUpgradeHandler; +import javax.servlet.http.WebConnection; +import javax.websocket.CloseReason; +import javax.websocket.CloseReason.CloseCodes; +import javax.websocket.DeploymentException; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.Extension; + +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.res.StringManager; + +import nginx.unit.websocket.Transformation; +import nginx.unit.websocket.WsIOException; +import nginx.unit.websocket.WsSession; + +import nginx.unit.Request; + +/** + * Servlet 3.1 HTTP upgrade handler for WebSocket connections. + */ +public class WsHttpUpgradeHandler implements HttpUpgradeHandler { + + private final Log log = LogFactory.getLog(WsHttpUpgradeHandler.class); // must not be static + private static final StringManager sm = StringManager.getManager(WsHttpUpgradeHandler.class); + + private final ClassLoader applicationClassLoader; + + private Endpoint ep; + private EndpointConfig endpointConfig; + private WsServerContainer webSocketContainer; + private WsHandshakeRequest handshakeRequest; + private List<Extension> negotiatedExtensions; + private String subProtocol; + private Transformation transformation; + private Map<String,String> pathParameters; + private boolean secure; + private WebConnection connection; + private WsRemoteEndpointImplServer wsRemoteEndpointServer; + private WsSession wsSession; + + + public WsHttpUpgradeHandler() { + applicationClassLoader = Thread.currentThread().getContextClassLoader(); + } + + public void preInit(Endpoint ep, EndpointConfig endpointConfig, + WsServerContainer wsc, WsHandshakeRequest handshakeRequest, + List<Extension> negotiatedExtensionsPhase2, String subProtocol, + Transformation transformation, Map<String,String> pathParameters, + boolean secure) { + this.ep = ep; + this.endpointConfig = endpointConfig; + this.webSocketContainer = wsc; + this.handshakeRequest = handshakeRequest; + this.negotiatedExtensions = negotiatedExtensionsPhase2; + this.subProtocol = subProtocol; + this.transformation = transformation; + this.pathParameters = pathParameters; + this.secure = secure; + } + + + @Override + public void init(WebConnection connection) { + if (ep == null) { + throw new IllegalStateException( + sm.getString("wsHttpUpgradeHandler.noPreInit")); + } + + String httpSessionId = null; + Object session = handshakeRequest.getHttpSession(); + if (session != null ) { + httpSessionId = ((HttpSession) session).getId(); + } + + nginx.unit.Context.trace("UpgradeHandler.init(" + connection + ")"); + +/* + // Need to call onOpen using the web application's class loader + // Create the frame using the application's class loader so it can pick + // up application specific config from the ServerContainerImpl + Thread t = Thread.currentThread(); + ClassLoader cl = t.getContextClassLoader(); + t.setContextClassLoader(applicationClassLoader); +*/ + try { + Request r = (Request) handshakeRequest.getAttribute(Request.BARE); + + wsRemoteEndpointServer = new WsRemoteEndpointImplServer(webSocketContainer); + wsSession = new WsSession(ep, wsRemoteEndpointServer, + webSocketContainer, handshakeRequest.getRequestURI(), + handshakeRequest.getParameterMap(), + handshakeRequest.getQueryString(), + handshakeRequest.getUserPrincipal(), httpSessionId, + negotiatedExtensions, subProtocol, pathParameters, secure, + endpointConfig, r); + + ep.onOpen(wsSession, endpointConfig); + webSocketContainer.registerSession(ep, wsSession); + } catch (DeploymentException e) { + throw new IllegalArgumentException(e); +/* + } finally { + t.setContextClassLoader(cl); +*/ + } + } + + + + @Override + public void destroy() { + if (connection != null) { + try { + connection.close(); + } catch (Exception e) { + log.error(sm.getString("wsHttpUpgradeHandler.destroyFailed"), e); + } + } + } + + + private void onError(Throwable throwable) { + // Need to call onError using the web application's class loader + Thread t = Thread.currentThread(); + ClassLoader cl = t.getContextClassLoader(); + t.setContextClassLoader(applicationClassLoader); + try { + ep.onError(wsSession, throwable); + } finally { + t.setContextClassLoader(cl); + } + } + + + private void close(CloseReason cr) { + /* + * Any call to this method is a result of a problem reading from the + * client. At this point that state of the connection is unknown. + * Attempt to send a close frame to the client and then close the socket + * immediately. There is no point in waiting for a close frame from the + * client because there is no guarantee that we can recover from + * whatever messed up state the client put the connection into. + */ + wsSession.onClose(cr); + } +} diff --git a/src/java/nginx/unit/websocket/server/WsMappingResult.java b/src/java/nginx/unit/websocket/server/WsMappingResult.java new file mode 100644 index 00000000..a7a4c022 --- /dev/null +++ b/src/java/nginx/unit/websocket/server/WsMappingResult.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.util.Map; + +import javax.websocket.server.ServerEndpointConfig; + +class WsMappingResult { + + private final ServerEndpointConfig config; + private final Map<String,String> pathParams; + + + WsMappingResult(ServerEndpointConfig config, + Map<String,String> pathParams) { + this.config = config; + this.pathParams = pathParams; + } + + + ServerEndpointConfig getConfig() { + return config; + } + + + Map<String,String> getPathParams() { + return pathParams; + } +} diff --git a/src/java/nginx/unit/websocket/server/WsPerSessionServerEndpointConfig.java b/src/java/nginx/unit/websocket/server/WsPerSessionServerEndpointConfig.java new file mode 100644 index 00000000..2be050cb --- /dev/null +++ b/src/java/nginx/unit/websocket/server/WsPerSessionServerEndpointConfig.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import javax.websocket.Decoder; +import javax.websocket.Encoder; +import javax.websocket.Extension; +import javax.websocket.server.ServerEndpointConfig; + +/** + * Wraps the provided {@link ServerEndpointConfig} and provides a per session + * view - the difference being that the map returned by {@link + * #getUserProperties()} is unique to this instance rather than shared with the + * wrapped {@link ServerEndpointConfig}. + */ +class WsPerSessionServerEndpointConfig implements ServerEndpointConfig { + + private final ServerEndpointConfig perEndpointConfig; + private final Map<String,Object> perSessionUserProperties = + new ConcurrentHashMap<>(); + + WsPerSessionServerEndpointConfig(ServerEndpointConfig perEndpointConfig) { + this.perEndpointConfig = perEndpointConfig; + perSessionUserProperties.putAll(perEndpointConfig.getUserProperties()); + } + + @Override + public List<Class<? extends Encoder>> getEncoders() { + return perEndpointConfig.getEncoders(); + } + + @Override + public List<Class<? extends Decoder>> getDecoders() { + return perEndpointConfig.getDecoders(); + } + + @Override + public Map<String,Object> getUserProperties() { + return perSessionUserProperties; + } + + @Override + public Class<?> getEndpointClass() { + return perEndpointConfig.getEndpointClass(); + } + + @Override + public String getPath() { + return perEndpointConfig.getPath(); + } + + @Override + public List<String> getSubprotocols() { + return perEndpointConfig.getSubprotocols(); + } + + @Override + public List<Extension> getExtensions() { + return perEndpointConfig.getExtensions(); + } + + @Override + public Configurator getConfigurator() { + return perEndpointConfig.getConfigurator(); + } +} diff --git a/src/java/nginx/unit/websocket/server/WsRemoteEndpointImplServer.java b/src/java/nginx/unit/websocket/server/WsRemoteEndpointImplServer.java new file mode 100644 index 00000000..6d10a3be --- /dev/null +++ b/src/java/nginx/unit/websocket/server/WsRemoteEndpointImplServer.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.io.EOFException; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.CompletionHandler; +import java.nio.channels.InterruptedByTimeoutException; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; + +import javax.websocket.SendHandler; +import javax.websocket.SendResult; + +import org.apache.juli.logging.Log; +import org.apache.juli.logging.LogFactory; +import org.apache.tomcat.util.res.StringManager; +import nginx.unit.websocket.Transformation; +import nginx.unit.websocket.WsRemoteEndpointImplBase; + +/** + * This is the server side {@link javax.websocket.RemoteEndpoint} implementation + * - i.e. what the server uses to send data to the client. + */ +public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase { + + private static final StringManager sm = + StringManager.getManager(WsRemoteEndpointImplServer.class); + private final Log log = LogFactory.getLog(WsRemoteEndpointImplServer.class); // must not be static + + private volatile SendHandler handler = null; + private volatile ByteBuffer[] buffers = null; + + private volatile long timeoutExpiry = -1; + private volatile boolean close; + + public WsRemoteEndpointImplServer( + WsServerContainer serverContainer) { + } + + + @Override + protected final boolean isMasked() { + return false; + } + + @Override + protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry, + ByteBuffer... buffers) { + } + + @Override + protected void doClose() { + if (handler != null) { + // close() can be triggered by a wide range of scenarios. It is far + // simpler just to always use a dispatch than it is to try and track + // whether or not this method was called by the same thread that + // triggered the write + clearHandler(new EOFException(), true); + } + } + + + protected long getTimeoutExpiry() { + return timeoutExpiry; + } + + + /* + * Currently this is only called from the background thread so we could just + * call clearHandler() with useDispatch == false but the method parameter + * was added in case other callers started to use this method to make sure + * that those callers think through what the correct value of useDispatch is + * for them. + */ + protected void onTimeout(boolean useDispatch) { + if (handler != null) { + clearHandler(new SocketTimeoutException(), useDispatch); + } + close(); + } + + + @Override + protected void setTransformation(Transformation transformation) { + // Overridden purely so it is visible to other classes in this package + super.setTransformation(transformation); + } + + + /** + * + * @param t The throwable associated with any error that + * occurred + * @param useDispatch Should {@link SendHandler#onResult(SendResult)} be + * called from a new thread, keeping in mind the + * requirements of + * {@link javax.websocket.RemoteEndpoint.Async} + */ + private void clearHandler(Throwable t, boolean useDispatch) { + // Setting the result marks this (partial) message as + // complete which means the next one may be sent which + // could update the value of the handler. Therefore, keep a + // local copy before signalling the end of the (partial) + // message. + SendHandler sh = handler; + handler = null; + buffers = null; + if (sh != null) { + if (useDispatch) { + OnResultRunnable r = new OnResultRunnable(sh, t); + } else { + if (t == null) { + sh.onResult(new SendResult()); + } else { + sh.onResult(new SendResult(t)); + } + } + } + } + + + private static class OnResultRunnable implements Runnable { + + private final SendHandler sh; + private final Throwable t; + + private OnResultRunnable(SendHandler sh, Throwable t) { + this.sh = sh; + this.t = t; + } + + @Override + public void run() { + if (t == null) { + sh.onResult(new SendResult()); + } else { + sh.onResult(new SendResult(t)); + } + } + } +} diff --git a/src/java/nginx/unit/websocket/server/WsSci.java b/src/java/nginx/unit/websocket/server/WsSci.java new file mode 100644 index 00000000..cdecce27 --- /dev/null +++ b/src/java/nginx/unit/websocket/server/WsSci.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.lang.reflect.Modifier; +import java.util.HashSet; +import java.util.Set; + +import javax.servlet.ServletContainerInitializer; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.annotation.HandlesTypes; +import javax.websocket.ContainerProvider; +import javax.websocket.DeploymentException; +import javax.websocket.Endpoint; +import javax.websocket.server.ServerApplicationConfig; +import javax.websocket.server.ServerEndpoint; +import javax.websocket.server.ServerEndpointConfig; + +/** + * Registers an interest in any class that is annotated with + * {@link ServerEndpoint} so that Endpoint can be published via the WebSocket + * server. + */ +@HandlesTypes({ServerEndpoint.class, ServerApplicationConfig.class, + Endpoint.class}) +public class WsSci implements ServletContainerInitializer { + + @Override + public void onStartup(Set<Class<?>> clazzes, ServletContext ctx) + throws ServletException { + + WsServerContainer sc = init(ctx, true); + + if (clazzes == null || clazzes.size() == 0) { + return; + } + + // Group the discovered classes by type + Set<ServerApplicationConfig> serverApplicationConfigs = new HashSet<>(); + Set<Class<? extends Endpoint>> scannedEndpointClazzes = new HashSet<>(); + Set<Class<?>> scannedPojoEndpoints = new HashSet<>(); + + try { + // wsPackage is "javax.websocket." + String wsPackage = ContainerProvider.class.getName(); + wsPackage = wsPackage.substring(0, wsPackage.lastIndexOf('.') + 1); + for (Class<?> clazz : clazzes) { + int modifiers = clazz.getModifiers(); + if (!Modifier.isPublic(modifiers) || + Modifier.isAbstract(modifiers)) { + // Non-public or abstract - skip it. + continue; + } + // Protect against scanning the WebSocket API JARs + if (clazz.getName().startsWith(wsPackage)) { + continue; + } + if (ServerApplicationConfig.class.isAssignableFrom(clazz)) { + serverApplicationConfigs.add( + (ServerApplicationConfig) clazz.getConstructor().newInstance()); + } + if (Endpoint.class.isAssignableFrom(clazz)) { + @SuppressWarnings("unchecked") + Class<? extends Endpoint> endpoint = + (Class<? extends Endpoint>) clazz; + scannedEndpointClazzes.add(endpoint); + } + if (clazz.isAnnotationPresent(ServerEndpoint.class)) { + scannedPojoEndpoints.add(clazz); + } + } + } catch (ReflectiveOperationException e) { + throw new ServletException(e); + } + + // Filter the results + Set<ServerEndpointConfig> filteredEndpointConfigs = new HashSet<>(); + Set<Class<?>> filteredPojoEndpoints = new HashSet<>(); + + if (serverApplicationConfigs.isEmpty()) { + filteredPojoEndpoints.addAll(scannedPojoEndpoints); + } else { + for (ServerApplicationConfig config : serverApplicationConfigs) { + Set<ServerEndpointConfig> configFilteredEndpoints = + config.getEndpointConfigs(scannedEndpointClazzes); + if (configFilteredEndpoints != null) { + filteredEndpointConfigs.addAll(configFilteredEndpoints); + } + Set<Class<?>> configFilteredPojos = + config.getAnnotatedEndpointClasses( + scannedPojoEndpoints); + if (configFilteredPojos != null) { + filteredPojoEndpoints.addAll(configFilteredPojos); + } + } + } + + try { + // Deploy endpoints + for (ServerEndpointConfig config : filteredEndpointConfigs) { + sc.addEndpoint(config); + } + // Deploy POJOs + for (Class<?> clazz : filteredPojoEndpoints) { + sc.addEndpoint(clazz); + } + } catch (DeploymentException e) { + throw new ServletException(e); + } + } + + + static WsServerContainer init(ServletContext servletContext, + boolean initBySciMechanism) { + + WsServerContainer sc = new WsServerContainer(servletContext); + + servletContext.setAttribute( + Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE, sc); + + servletContext.addListener(new WsSessionListener(sc)); + // Can't register the ContextListener again if the ContextListener is + // calling this method + if (initBySciMechanism) { + servletContext.addListener(new WsContextListener()); + } + + return sc; + } +} diff --git a/src/java/nginx/unit/websocket/server/WsServerContainer.java b/src/java/nginx/unit/websocket/server/WsServerContainer.java new file mode 100644 index 00000000..069fc54f --- /dev/null +++ b/src/java/nginx/unit/websocket/server/WsServerContainer.java @@ -0,0 +1,470 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; + +import javax.servlet.DispatcherType; +import javax.servlet.FilterRegistration; +import javax.servlet.ServletContext; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import javax.websocket.CloseReason; +import javax.websocket.CloseReason.CloseCodes; +import javax.websocket.DeploymentException; +import javax.websocket.Encoder; +import javax.websocket.Endpoint; +import javax.websocket.server.ServerContainer; +import javax.websocket.server.ServerEndpoint; +import javax.websocket.server.ServerEndpointConfig; +import javax.websocket.server.ServerEndpointConfig.Configurator; + +import org.apache.tomcat.InstanceManager; +import org.apache.tomcat.util.res.StringManager; +import nginx.unit.websocket.WsSession; +import nginx.unit.websocket.WsWebSocketContainer; +import nginx.unit.websocket.pojo.PojoMethodMapping; + +/** + * Provides a per class loader (i.e. per web application) instance of a + * ServerContainer. Web application wide defaults may be configured by setting + * the following servlet context initialisation parameters to the desired + * values. + * <ul> + * <li>{@link Constants#BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM}</li> + * <li>{@link Constants#TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM}</li> + * </ul> + */ +public class WsServerContainer extends WsWebSocketContainer + implements ServerContainer { + + private static final StringManager sm = StringManager.getManager(WsServerContainer.class); + + private static final CloseReason AUTHENTICATED_HTTP_SESSION_CLOSED = + new CloseReason(CloseCodes.VIOLATED_POLICY, + "This connection was established under an authenticated " + + "HTTP session that has ended."); + + private final ServletContext servletContext; + private final Map<String,ServerEndpointConfig> configExactMatchMap = + new ConcurrentHashMap<>(); + private final Map<Integer,SortedSet<TemplatePathMatch>> configTemplateMatchMap = + new ConcurrentHashMap<>(); + private volatile boolean enforceNoAddAfterHandshake = + nginx.unit.websocket.Constants.STRICT_SPEC_COMPLIANCE; + private volatile boolean addAllowed = true; + private final Map<String,Set<WsSession>> authenticatedSessions = new ConcurrentHashMap<>(); + private volatile boolean endpointsRegistered = false; + + WsServerContainer(ServletContext servletContext) { + + this.servletContext = servletContext; + setInstanceManager((InstanceManager) servletContext.getAttribute(InstanceManager.class.getName())); + + // Configure servlet context wide defaults + String value = servletContext.getInitParameter( + Constants.BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM); + if (value != null) { + setDefaultMaxBinaryMessageBufferSize(Integer.parseInt(value)); + } + + value = servletContext.getInitParameter( + Constants.TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM); + if (value != null) { + setDefaultMaxTextMessageBufferSize(Integer.parseInt(value)); + } + + value = servletContext.getInitParameter( + Constants.ENFORCE_NO_ADD_AFTER_HANDSHAKE_CONTEXT_INIT_PARAM); + if (value != null) { + setEnforceNoAddAfterHandshake(Boolean.parseBoolean(value)); + } + + FilterRegistration.Dynamic fr = servletContext.addFilter( + "Tomcat WebSocket (JSR356) Filter", new WsFilter()); + fr.setAsyncSupported(true); + + EnumSet<DispatcherType> types = EnumSet.of(DispatcherType.REQUEST, + DispatcherType.FORWARD); + + fr.addMappingForUrlPatterns(types, true, "/*"); + } + + + /** + * Published the provided endpoint implementation at the specified path with + * the specified configuration. {@link #WsServerContainer(ServletContext)} + * must be called before calling this method. + * + * @param sec The configuration to use when creating endpoint instances + * @throws DeploymentException if the endpoint cannot be published as + * requested + */ + @Override + public void addEndpoint(ServerEndpointConfig sec) + throws DeploymentException { + + if (enforceNoAddAfterHandshake && !addAllowed) { + throw new DeploymentException( + sm.getString("serverContainer.addNotAllowed")); + } + + if (servletContext == null) { + throw new DeploymentException( + sm.getString("serverContainer.servletContextMissing")); + } + String path = sec.getPath(); + + // Add method mapping to user properties + PojoMethodMapping methodMapping = new PojoMethodMapping(sec.getEndpointClass(), + sec.getDecoders(), path); + if (methodMapping.getOnClose() != null || methodMapping.getOnOpen() != null + || methodMapping.getOnError() != null || methodMapping.hasMessageHandlers()) { + sec.getUserProperties().put(nginx.unit.websocket.pojo.Constants.POJO_METHOD_MAPPING_KEY, + methodMapping); + } + + UriTemplate uriTemplate = new UriTemplate(path); + if (uriTemplate.hasParameters()) { + Integer key = Integer.valueOf(uriTemplate.getSegmentCount()); + SortedSet<TemplatePathMatch> templateMatches = + configTemplateMatchMap.get(key); + if (templateMatches == null) { + // Ensure that if concurrent threads execute this block they + // both end up using the same TreeSet instance + templateMatches = new TreeSet<>( + TemplatePathMatchComparator.getInstance()); + configTemplateMatchMap.putIfAbsent(key, templateMatches); + templateMatches = configTemplateMatchMap.get(key); + } + if (!templateMatches.add(new TemplatePathMatch(sec, uriTemplate))) { + // Duplicate uriTemplate; + throw new DeploymentException( + sm.getString("serverContainer.duplicatePaths", path, + sec.getEndpointClass(), + sec.getEndpointClass())); + } + } else { + // Exact match + ServerEndpointConfig old = configExactMatchMap.put(path, sec); + if (old != null) { + // Duplicate path mappings + throw new DeploymentException( + sm.getString("serverContainer.duplicatePaths", path, + old.getEndpointClass(), + sec.getEndpointClass())); + } + } + + endpointsRegistered = true; + } + + + /** + * Provides the equivalent of {@link #addEndpoint(ServerEndpointConfig)} + * for publishing plain old java objects (POJOs) that have been annotated as + * WebSocket endpoints. + * + * @param pojo The annotated POJO + */ + @Override + public void addEndpoint(Class<?> pojo) throws DeploymentException { + + ServerEndpoint annotation = pojo.getAnnotation(ServerEndpoint.class); + if (annotation == null) { + throw new DeploymentException( + sm.getString("serverContainer.missingAnnotation", + pojo.getName())); + } + String path = annotation.value(); + + // Validate encoders + validateEncoders(annotation.encoders()); + + // ServerEndpointConfig + ServerEndpointConfig sec; + Class<? extends Configurator> configuratorClazz = + annotation.configurator(); + Configurator configurator = null; + if (!configuratorClazz.equals(Configurator.class)) { + try { + configurator = annotation.configurator().getConstructor().newInstance(); + } catch (ReflectiveOperationException e) { + throw new DeploymentException(sm.getString( + "serverContainer.configuratorFail", + annotation.configurator().getName(), + pojo.getClass().getName()), e); + } + } + if (configurator == null) { + configurator = new nginx.unit.websocket.server.DefaultServerEndpointConfigurator(); + } + sec = ServerEndpointConfig.Builder.create(pojo, path). + decoders(Arrays.asList(annotation.decoders())). + encoders(Arrays.asList(annotation.encoders())). + subprotocols(Arrays.asList(annotation.subprotocols())). + configurator(configurator). + build(); + + addEndpoint(sec); + } + + + boolean areEndpointsRegistered() { + return endpointsRegistered; + } + + + /** + * Until the WebSocket specification provides such a mechanism, this Tomcat + * proprietary method is provided to enable applications to programmatically + * determine whether or not to upgrade an individual request to WebSocket. + * <p> + * Note: This method is not used by Tomcat but is used directly by + * third-party code and must not be removed. + * + * @param request The request object to be upgraded + * @param response The response object to be populated with the result of + * the upgrade + * @param sec The server endpoint to use to process the upgrade request + * @param pathParams The path parameters associated with the upgrade request + * + * @throws ServletException If a configuration error prevents the upgrade + * from taking place + * @throws IOException If an I/O error occurs during the upgrade process + */ + public void doUpgrade(HttpServletRequest request, + HttpServletResponse response, ServerEndpointConfig sec, + Map<String,String> pathParams) + throws ServletException, IOException { + UpgradeUtil.doUpgrade(this, request, response, sec, pathParams); + } + + + public WsMappingResult findMapping(String path) { + + // Prevent registering additional endpoints once the first attempt has + // been made to use one + if (addAllowed) { + addAllowed = false; + } + + // Check an exact match. Simple case as there are no templates. + ServerEndpointConfig sec = configExactMatchMap.get(path); + if (sec != null) { + return new WsMappingResult(sec, Collections.<String, String>emptyMap()); + } + + // No exact match. Need to look for template matches. + UriTemplate pathUriTemplate = null; + try { + pathUriTemplate = new UriTemplate(path); + } catch (DeploymentException e) { + // Path is not valid so can't be matched to a WebSocketEndpoint + return null; + } + + // Number of segments has to match + Integer key = Integer.valueOf(pathUriTemplate.getSegmentCount()); + SortedSet<TemplatePathMatch> templateMatches = + configTemplateMatchMap.get(key); + + if (templateMatches == null) { + // No templates with an equal number of segments so there will be + // no matches + return null; + } + + // List is in alphabetical order of normalised templates. + // Correct match is the first one that matches. + Map<String,String> pathParams = null; + for (TemplatePathMatch templateMatch : templateMatches) { + pathParams = templateMatch.getUriTemplate().match(pathUriTemplate); + if (pathParams != null) { + sec = templateMatch.getConfig(); + break; + } + } + + if (sec == null) { + // No match + return null; + } + + return new WsMappingResult(sec, pathParams); + } + + + + public boolean isEnforceNoAddAfterHandshake() { + return enforceNoAddAfterHandshake; + } + + + public void setEnforceNoAddAfterHandshake( + boolean enforceNoAddAfterHandshake) { + this.enforceNoAddAfterHandshake = enforceNoAddAfterHandshake; + } + + + /** + * {@inheritDoc} + * + * Overridden to make it visible to other classes in this package. + */ + @Override + protected void registerSession(Endpoint endpoint, WsSession wsSession) { + super.registerSession(endpoint, wsSession); + if (wsSession.isOpen() && + wsSession.getUserPrincipal() != null && + wsSession.getHttpSessionId() != null) { + registerAuthenticatedSession(wsSession, + wsSession.getHttpSessionId()); + } + } + + + /** + * {@inheritDoc} + * + * Overridden to make it visible to other classes in this package. + */ + @Override + protected void unregisterSession(Endpoint endpoint, WsSession wsSession) { + if (wsSession.getUserPrincipal() != null && + wsSession.getHttpSessionId() != null) { + unregisterAuthenticatedSession(wsSession, + wsSession.getHttpSessionId()); + } + super.unregisterSession(endpoint, wsSession); + } + + + private void registerAuthenticatedSession(WsSession wsSession, + String httpSessionId) { + Set<WsSession> wsSessions = authenticatedSessions.get(httpSessionId); + if (wsSessions == null) { + wsSessions = Collections.newSetFromMap( + new ConcurrentHashMap<WsSession,Boolean>()); + authenticatedSessions.putIfAbsent(httpSessionId, wsSessions); + wsSessions = authenticatedSessions.get(httpSessionId); + } + wsSessions.add(wsSession); + } + + + private void unregisterAuthenticatedSession(WsSession wsSession, + String httpSessionId) { + Set<WsSession> wsSessions = authenticatedSessions.get(httpSessionId); + // wsSessions will be null if the HTTP session has ended + if (wsSessions != null) { + wsSessions.remove(wsSession); + } + } + + + public void closeAuthenticatedSession(String httpSessionId) { + Set<WsSession> wsSessions = authenticatedSessions.remove(httpSessionId); + + if (wsSessions != null && !wsSessions.isEmpty()) { + for (WsSession wsSession : wsSessions) { + try { + wsSession.close(AUTHENTICATED_HTTP_SESSION_CLOSED); + } catch (IOException e) { + // Any IOExceptions during close will have been caught and the + // onError method called. + } + } + } + } + + + private static void validateEncoders(Class<? extends Encoder>[] encoders) + throws DeploymentException { + + for (Class<? extends Encoder> encoder : encoders) { + // Need to instantiate decoder to ensure it is valid and that + // deployment can be failed if it is not + @SuppressWarnings("unused") + Encoder instance; + try { + encoder.getConstructor().newInstance(); + } catch(ReflectiveOperationException e) { + throw new DeploymentException(sm.getString( + "serverContainer.encoderFail", encoder.getName()), e); + } + } + } + + + private static class TemplatePathMatch { + private final ServerEndpointConfig config; + private final UriTemplate uriTemplate; + + public TemplatePathMatch(ServerEndpointConfig config, + UriTemplate uriTemplate) { + this.config = config; + this.uriTemplate = uriTemplate; + } + + + public ServerEndpointConfig getConfig() { + return config; + } + + + public UriTemplate getUriTemplate() { + return uriTemplate; + } + } + + + /** + * This Comparator implementation is thread-safe so only create a single + * instance. + */ + private static class TemplatePathMatchComparator + implements Comparator<TemplatePathMatch> { + + private static final TemplatePathMatchComparator INSTANCE = + new TemplatePathMatchComparator(); + + public static TemplatePathMatchComparator getInstance() { + return INSTANCE; + } + + private TemplatePathMatchComparator() { + // Hide default constructor + } + + @Override + public int compare(TemplatePathMatch tpm1, TemplatePathMatch tpm2) { + return tpm1.getUriTemplate().getNormalizedPath().compareTo( + tpm2.getUriTemplate().getNormalizedPath()); + } + } +} diff --git a/src/java/nginx/unit/websocket/server/WsSessionListener.java b/src/java/nginx/unit/websocket/server/WsSessionListener.java new file mode 100644 index 00000000..fc2bc9c5 --- /dev/null +++ b/src/java/nginx/unit/websocket/server/WsSessionListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import javax.servlet.http.HttpSessionEvent; +import javax.servlet.http.HttpSessionListener; + +public class WsSessionListener implements HttpSessionListener{ + + private final WsServerContainer wsServerContainer; + + + public WsSessionListener(WsServerContainer wsServerContainer) { + this.wsServerContainer = wsServerContainer; + } + + + @Override + public void sessionDestroyed(HttpSessionEvent se) { + wsServerContainer.closeAuthenticatedSession(se.getSession().getId()); + } +} diff --git a/src/java/nginx/unit/websocket/server/WsWriteTimeout.java b/src/java/nginx/unit/websocket/server/WsWriteTimeout.java new file mode 100644 index 00000000..2dfc4ab2 --- /dev/null +++ b/src/java/nginx/unit/websocket/server/WsWriteTimeout.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package nginx.unit.websocket.server; + +import java.util.Comparator; +import java.util.Set; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicInteger; + +import nginx.unit.websocket.BackgroundProcess; +import nginx.unit.websocket.BackgroundProcessManager; + +/** + * Provides timeouts for asynchronous web socket writes. On the server side we + * only have access to {@link javax.servlet.ServletOutputStream} and + * {@link javax.servlet.ServletInputStream} so there is no way to set a timeout + * for writes to the client. + */ +public class WsWriteTimeout implements BackgroundProcess { + + private final Set<WsRemoteEndpointImplServer> endpoints = + new ConcurrentSkipListSet<>(new EndpointComparator()); + private final AtomicInteger count = new AtomicInteger(0); + private int backgroundProcessCount = 0; + private volatile int processPeriod = 1; + + @Override + public void backgroundProcess() { + // This method gets called once a second. + backgroundProcessCount ++; + + if (backgroundProcessCount >= processPeriod) { + backgroundProcessCount = 0; + + long now = System.currentTimeMillis(); + for (WsRemoteEndpointImplServer endpoint : endpoints) { + if (endpoint.getTimeoutExpiry() < now) { + // Background thread, not the thread that triggered the + // write so no need to use a dispatch + endpoint.onTimeout(false); + } else { + // Endpoints are ordered by timeout expiry so if this point + // is reached there is no need to check the remaining + // endpoints + break; + } + } + } + } + + + @Override + public void setProcessPeriod(int period) { + this.processPeriod = period; + } + + + /** + * {@inheritDoc} + * + * The default value is 1 which means asynchronous write timeouts are + * processed every 1 second. + */ + @Override + public int getProcessPeriod() { + return processPeriod; + } + + + public void register(WsRemoteEndpointImplServer endpoint) { + boolean result = endpoints.add(endpoint); + if (result) { + int newCount = count.incrementAndGet(); + if (newCount == 1) { + BackgroundProcessManager.getInstance().register(this); + } + } + } + + + public void unregister(WsRemoteEndpointImplServer endpoint) { + boolean result = endpoints.remove(endpoint); + if (result) { + int newCount = count.decrementAndGet(); + if (newCount == 0) { + BackgroundProcessManager.getInstance().unregister(this); + } + } + } + + + /** + * Note: this comparator imposes orderings that are inconsistent with equals + */ + private static class EndpointComparator implements + Comparator<WsRemoteEndpointImplServer> { + + @Override + public int compare(WsRemoteEndpointImplServer o1, + WsRemoteEndpointImplServer o2) { + + long t1 = o1.getTimeoutExpiry(); + long t2 = o2.getTimeoutExpiry(); + + if (t1 < t2) { + return -1; + } else if (t1 == t2) { + return 0; + } else { + return 1; + } + } + } +} diff --git a/src/java/nginx/unit/websocket/server/package-info.java b/src/java/nginx/unit/websocket/server/package-info.java new file mode 100644 index 00000000..87bc85a3 --- /dev/null +++ b/src/java/nginx/unit/websocket/server/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Server-side specific implementation classes. These are in a separate package + * to make packaging a pure client JAR simpler. + */ +package nginx.unit.websocket.server; |