summaryrefslogtreecommitdiffhomepage
path: root/src/java/nginx/unit/websocket/server
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--src/java/nginx/unit/websocket/server/Constants.java38
-rw-r--r--src/java/nginx/unit/websocket/server/DefaultServerEndpointConfigurator.java88
-rw-r--r--src/java/nginx/unit/websocket/server/LocalStrings.properties43
-rw-r--r--src/java/nginx/unit/websocket/server/UpgradeUtil.java285
-rw-r--r--src/java/nginx/unit/websocket/server/UriTemplate.java177
-rw-r--r--src/java/nginx/unit/websocket/server/WsContextListener.java51
-rw-r--r--src/java/nginx/unit/websocket/server/WsFilter.java81
-rw-r--r--src/java/nginx/unit/websocket/server/WsHandshakeRequest.java196
-rw-r--r--src/java/nginx/unit/websocket/server/WsHttpUpgradeHandler.java172
-rw-r--r--src/java/nginx/unit/websocket/server/WsMappingResult.java44
-rw-r--r--src/java/nginx/unit/websocket/server/WsPerSessionServerEndpointConfig.java84
-rw-r--r--src/java/nginx/unit/websocket/server/WsRemoteEndpointImplServer.java158
-rw-r--r--src/java/nginx/unit/websocket/server/WsSci.java145
-rw-r--r--src/java/nginx/unit/websocket/server/WsServerContainer.java470
-rw-r--r--src/java/nginx/unit/websocket/server/WsSessionListener.java36
-rw-r--r--src/java/nginx/unit/websocket/server/WsWriteTimeout.java128
-rw-r--r--src/java/nginx/unit/websocket/server/package-info.java21
17 files changed, 2217 insertions, 0 deletions
diff --git a/src/java/nginx/unit/websocket/server/Constants.java b/src/java/nginx/unit/websocket/server/Constants.java
new file mode 100644
index 00000000..5210c4ba
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/Constants.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+/**
+ * Internal implementation constants.
+ */
+public class Constants {
+
+ public static final String BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM =
+ "nginx.unit.websocket.binaryBufferSize";
+ public static final String TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM =
+ "nginx.unit.websocket.textBufferSize";
+ public static final String ENFORCE_NO_ADD_AFTER_HANDSHAKE_CONTEXT_INIT_PARAM =
+ "nginx.unit.websocket.noAddAfterHandshake";
+
+ public static final String SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE =
+ "javax.websocket.server.ServerContainer";
+
+
+ private Constants() {
+ // Hide default constructor
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/DefaultServerEndpointConfigurator.java b/src/java/nginx/unit/websocket/server/DefaultServerEndpointConfigurator.java
new file mode 100644
index 00000000..43ffe2bc
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/DefaultServerEndpointConfigurator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.websocket.Extension;
+import javax.websocket.HandshakeResponse;
+import javax.websocket.server.HandshakeRequest;
+import javax.websocket.server.ServerEndpointConfig;
+
+public class DefaultServerEndpointConfigurator
+ extends ServerEndpointConfig.Configurator {
+
+ @Override
+ public <T> T getEndpointInstance(Class<T> clazz)
+ throws InstantiationException {
+ try {
+ return clazz.getConstructor().newInstance();
+ } catch (InstantiationException e) {
+ throw e;
+ } catch (ReflectiveOperationException e) {
+ InstantiationException ie = new InstantiationException();
+ ie.initCause(e);
+ throw ie;
+ }
+ }
+
+
+ @Override
+ public String getNegotiatedSubprotocol(List<String> supported,
+ List<String> requested) {
+
+ for (String request : requested) {
+ if (supported.contains(request)) {
+ return request;
+ }
+ }
+ return "";
+ }
+
+
+ @Override
+ public List<Extension> getNegotiatedExtensions(List<Extension> installed,
+ List<Extension> requested) {
+ Set<String> installedNames = new HashSet<>();
+ for (Extension e : installed) {
+ installedNames.add(e.getName());
+ }
+ List<Extension> result = new ArrayList<>();
+ for (Extension request : requested) {
+ if (installedNames.contains(request.getName())) {
+ result.add(request);
+ }
+ }
+ return result;
+ }
+
+
+ @Override
+ public boolean checkOrigin(String originHeaderValue) {
+ return true;
+ }
+
+ @Override
+ public void modifyHandshake(ServerEndpointConfig sec,
+ HandshakeRequest request, HandshakeResponse response) {
+ // NO-OP
+ }
+
+}
diff --git a/src/java/nginx/unit/websocket/server/LocalStrings.properties b/src/java/nginx/unit/websocket/server/LocalStrings.properties
new file mode 100644
index 00000000..5bc12501
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/LocalStrings.properties
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+serverContainer.addNotAllowed=No further Endpoints may be registered once an attempt has been made to use one of the previously registered endpoints
+serverContainer.configuratorFail=Failed to create configurator of type [{0}] for POJO of type [{1}]
+serverContainer.duplicatePaths=Multiple Endpoints may not be deployed to the same path [{0}] : existing endpoint was [{1}] and new endpoint is [{2}]
+serverContainer.encoderFail=Unable to create encoder of type [{0}]
+serverContainer.endpointDeploy=Endpoint class [{0}] deploying to path [{1}] in ServletContext [{2}]
+serverContainer.missingAnnotation=Cannot deploy POJO class [{0}] as it is not annotated with @ServerEndpoint
+serverContainer.missingEndpoint=An Endpoint instance has been request for path [{0}] but no matching Endpoint class was found
+serverContainer.pojoDeploy=POJO class [{0}] deploying to path [{1}] in ServletContext [{2}]
+serverContainer.servletContextMismatch=Attempted to register a POJO annotated for WebSocket at path [{0}] in the ServletContext with context path [{1}] when the WebSocket ServerContainer is allocated to the ServletContext with context path [{2}]
+serverContainer.servletContextMissing=No ServletContext was specified
+
+upgradeUtil.incompatibleRsv=Extensions were specified that have incompatible RSV bit usage
+
+uriTemplate.duplicateParameter=The parameter [{0}] appears more than once in the path which is not permitted
+uriTemplate.emptySegment=The path [{0}] contains one or more empty segments which are is not permitted
+uriTemplate.invalidPath=The path [{0}] is not valid.
+uriTemplate.invalidSegment=The segment [{0}] is not valid in the provided path [{1}]
+
+wsFrameServer.bytesRead=Read [{0}] bytes into input buffer ready for processing
+wsFrameServer.illegalReadState=Unexpected read state [{0}]
+wsFrameServer.onDataAvailable=Method entry
+
+wsHttpUpgradeHandler.closeOnError=Closing WebSocket connection due to an error
+wsHttpUpgradeHandler.destroyFailed=Failed to close WebConnection while destroying the WebSocket HttpUpgradeHandler
+wsHttpUpgradeHandler.noPreInit=The preInit() method must be called to configure the WebSocket HttpUpgradeHandler before the container calls init(). Usually, this means the Servlet that created the WsHttpUpgradeHandler instance should also call preInit()
+wsHttpUpgradeHandler.serverStop=The server is stopping
+
+wsRemoteEndpointServer.closeFailed=Failed to close the ServletOutputStream connection cleanly
diff --git a/src/java/nginx/unit/websocket/server/UpgradeUtil.java b/src/java/nginx/unit/websocket/server/UpgradeUtil.java
new file mode 100644
index 00000000..162f01c7
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/UpgradeUtil.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.websocket.Endpoint;
+import javax.websocket.Extension;
+import javax.websocket.HandshakeResponse;
+import javax.websocket.server.ServerEndpointConfig;
+
+import nginx.unit.Request;
+
+import org.apache.tomcat.util.codec.binary.Base64;
+import org.apache.tomcat.util.res.StringManager;
+import org.apache.tomcat.util.security.ConcurrentMessageDigest;
+import nginx.unit.websocket.Constants;
+import nginx.unit.websocket.Transformation;
+import nginx.unit.websocket.TransformationFactory;
+import nginx.unit.websocket.Util;
+import nginx.unit.websocket.WsHandshakeResponse;
+import nginx.unit.websocket.pojo.PojoEndpointServer;
+
+public class UpgradeUtil {
+
+ private static final StringManager sm =
+ StringManager.getManager(UpgradeUtil.class.getPackage().getName());
+ private static final byte[] WS_ACCEPT =
+ "258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(
+ StandardCharsets.ISO_8859_1);
+
+ private UpgradeUtil() {
+ // Utility class. Hide default constructor.
+ }
+
+ /**
+ * Checks to see if this is an HTTP request that includes a valid upgrade
+ * request to web socket.
+ * <p>
+ * Note: RFC 2616 does not limit HTTP upgrade to GET requests but the Java
+ * WebSocket spec 1.0, section 8.2 implies such a limitation and RFC
+ * 6455 section 4.1 requires that a WebSocket Upgrade uses GET.
+ * @param request The request to check if it is an HTTP upgrade request for
+ * a WebSocket connection
+ * @param response The response associated with the request
+ * @return <code>true</code> if the request includes a HTTP Upgrade request
+ * for the WebSocket protocol, otherwise <code>false</code>
+ */
+ public static boolean isWebSocketUpgradeRequest(ServletRequest request,
+ ServletResponse response) {
+
+ Request r = (Request) request.getAttribute(Request.BARE);
+
+ return ((request instanceof HttpServletRequest) &&
+ (response instanceof HttpServletResponse) &&
+ (r != null) &&
+ (r.isUpgrade()));
+ }
+
+
+ public static void doUpgrade(WsServerContainer sc, HttpServletRequest req,
+ HttpServletResponse resp, ServerEndpointConfig sec,
+ Map<String,String> pathParams)
+ throws ServletException, IOException {
+
+
+ // Origin check
+ String origin = req.getHeader(Constants.ORIGIN_HEADER_NAME);
+
+ if (!sec.getConfigurator().checkOrigin(origin)) {
+ resp.sendError(HttpServletResponse.SC_FORBIDDEN);
+ return;
+ }
+ // Sub-protocols
+ List<String> subProtocols = getTokensFromHeader(req,
+ Constants.WS_PROTOCOL_HEADER_NAME);
+ String subProtocol = sec.getConfigurator().getNegotiatedSubprotocol(
+ sec.getSubprotocols(), subProtocols);
+
+ // Extensions
+ // Should normally only be one header but handle the case of multiple
+ // headers
+ List<Extension> extensionsRequested = new ArrayList<>();
+ Enumeration<String> extHeaders = req.getHeaders(Constants.WS_EXTENSIONS_HEADER_NAME);
+ while (extHeaders.hasMoreElements()) {
+ Util.parseExtensionHeader(extensionsRequested, extHeaders.nextElement());
+ }
+
+ // Negotiation phase 1. By default this simply filters out the
+ // extensions that the server does not support but applications could
+ // use a custom configurator to do more than this.
+ List<Extension> installedExtensions = null;
+ if (sec.getExtensions().size() == 0) {
+ installedExtensions = Constants.INSTALLED_EXTENSIONS;
+ } else {
+ installedExtensions = new ArrayList<>();
+ installedExtensions.addAll(sec.getExtensions());
+ installedExtensions.addAll(Constants.INSTALLED_EXTENSIONS);
+ }
+ List<Extension> negotiatedExtensionsPhase1 = sec.getConfigurator().getNegotiatedExtensions(
+ installedExtensions, extensionsRequested);
+
+ // Negotiation phase 2. Create the Transformations that will be applied
+ // to this connection. Note than an extension may be dropped at this
+ // point if the client has requested a configuration that the server is
+ // unable to support.
+ List<Transformation> transformations = createTransformations(negotiatedExtensionsPhase1);
+
+ List<Extension> negotiatedExtensionsPhase2;
+ if (transformations.isEmpty()) {
+ negotiatedExtensionsPhase2 = Collections.emptyList();
+ } else {
+ negotiatedExtensionsPhase2 = new ArrayList<>(transformations.size());
+ for (Transformation t : transformations) {
+ negotiatedExtensionsPhase2.add(t.getExtensionResponse());
+ }
+ }
+
+ WsHttpUpgradeHandler wsHandler =
+ req.upgrade(WsHttpUpgradeHandler.class);
+
+ WsHandshakeRequest wsRequest = new WsHandshakeRequest(req, pathParams);
+ WsHandshakeResponse wsResponse = new WsHandshakeResponse();
+ WsPerSessionServerEndpointConfig perSessionServerEndpointConfig =
+ new WsPerSessionServerEndpointConfig(sec);
+ sec.getConfigurator().modifyHandshake(perSessionServerEndpointConfig,
+ wsRequest, wsResponse);
+ //wsRequest.finished();
+
+ // Add any additional headers
+ for (Entry<String,List<String>> entry :
+ wsResponse.getHeaders().entrySet()) {
+ for (String headerValue: entry.getValue()) {
+ resp.addHeader(entry.getKey(), headerValue);
+ }
+ }
+
+ Endpoint ep;
+ try {
+ Class<?> clazz = sec.getEndpointClass();
+ if (Endpoint.class.isAssignableFrom(clazz)) {
+ ep = (Endpoint) sec.getConfigurator().getEndpointInstance(
+ clazz);
+ } else {
+ ep = new PojoEndpointServer();
+ // Need to make path params available to POJO
+ perSessionServerEndpointConfig.getUserProperties().put(
+ nginx.unit.websocket.pojo.Constants.POJO_PATH_PARAM_KEY, pathParams);
+ }
+ } catch (InstantiationException e) {
+ throw new ServletException(e);
+ }
+
+ wsHandler.preInit(ep, perSessionServerEndpointConfig, sc, wsRequest,
+ negotiatedExtensionsPhase2, subProtocol, null, pathParams,
+ req.isSecure());
+
+ wsHandler.init(null);
+ }
+
+
+ private static List<Transformation> createTransformations(
+ List<Extension> negotiatedExtensions) {
+
+ TransformationFactory factory = TransformationFactory.getInstance();
+
+ LinkedHashMap<String,List<List<Extension.Parameter>>> extensionPreferences =
+ new LinkedHashMap<>();
+
+ // Result will likely be smaller than this
+ List<Transformation> result = new ArrayList<>(negotiatedExtensions.size());
+
+ for (Extension extension : negotiatedExtensions) {
+ List<List<Extension.Parameter>> preferences =
+ extensionPreferences.get(extension.getName());
+
+ if (preferences == null) {
+ preferences = new ArrayList<>();
+ extensionPreferences.put(extension.getName(), preferences);
+ }
+
+ preferences.add(extension.getParameters());
+ }
+
+ for (Map.Entry<String,List<List<Extension.Parameter>>> entry :
+ extensionPreferences.entrySet()) {
+ Transformation transformation = factory.create(entry.getKey(), entry.getValue(), true);
+ if (transformation != null) {
+ result.add(transformation);
+ }
+ }
+ return result;
+ }
+
+
+ private static void append(StringBuilder sb, Extension extension) {
+ if (extension == null || extension.getName() == null || extension.getName().length() == 0) {
+ return;
+ }
+
+ sb.append(extension.getName());
+
+ for (Extension.Parameter p : extension.getParameters()) {
+ sb.append(';');
+ sb.append(p.getName());
+ if (p.getValue() != null) {
+ sb.append('=');
+ sb.append(p.getValue());
+ }
+ }
+ }
+
+
+ /*
+ * This only works for tokens. Quoted strings need more sophisticated
+ * parsing.
+ */
+ private static boolean headerContainsToken(HttpServletRequest req,
+ String headerName, String target) {
+ Enumeration<String> headers = req.getHeaders(headerName);
+ while (headers.hasMoreElements()) {
+ String header = headers.nextElement();
+ String[] tokens = header.split(",");
+ for (String token : tokens) {
+ if (target.equalsIgnoreCase(token.trim())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+
+ /*
+ * This only works for tokens. Quoted strings need more sophisticated
+ * parsing.
+ */
+ private static List<String> getTokensFromHeader(HttpServletRequest req,
+ String headerName) {
+ List<String> result = new ArrayList<>();
+ Enumeration<String> headers = req.getHeaders(headerName);
+ while (headers.hasMoreElements()) {
+ String header = headers.nextElement();
+ String[] tokens = header.split(",");
+ for (String token : tokens) {
+ result.add(token.trim());
+ }
+ }
+ return result;
+ }
+
+
+ private static String getWebSocketAccept(String key) {
+ byte[] digest = ConcurrentMessageDigest.digestSHA1(
+ key.getBytes(StandardCharsets.ISO_8859_1), WS_ACCEPT);
+ return Base64.encodeBase64String(digest);
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/UriTemplate.java b/src/java/nginx/unit/websocket/server/UriTemplate.java
new file mode 100644
index 00000000..7877fac9
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/UriTemplate.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.websocket.DeploymentException;
+
+import org.apache.tomcat.util.res.StringManager;
+
+/**
+ * Extracts path parameters from URIs used to create web socket connections
+ * using the URI template defined for the associated Endpoint.
+ */
+public class UriTemplate {
+
+ private static final StringManager sm = StringManager.getManager(UriTemplate.class);
+
+ private final String normalized;
+ private final List<Segment> segments = new ArrayList<>();
+ private final boolean hasParameters;
+
+
+ public UriTemplate(String path) throws DeploymentException {
+
+ if (path == null || path.length() ==0 || !path.startsWith("/")) {
+ throw new DeploymentException(
+ sm.getString("uriTemplate.invalidPath", path));
+ }
+
+ StringBuilder normalized = new StringBuilder(path.length());
+ Set<String> paramNames = new HashSet<>();
+
+ // Include empty segments.
+ String[] segments = path.split("/", -1);
+ int paramCount = 0;
+ int segmentCount = 0;
+
+ for (int i = 0; i < segments.length; i++) {
+ String segment = segments[i];
+ if (segment.length() == 0) {
+ if (i == 0 || (i == segments.length - 1 && paramCount == 0)) {
+ // Ignore the first empty segment as the path must always
+ // start with '/'
+ // Ending with a '/' is also OK for instances used for
+ // matches but not for parameterised templates.
+ continue;
+ } else {
+ // As per EG discussion, all other empty segments are
+ // invalid
+ throw new IllegalArgumentException(sm.getString(
+ "uriTemplate.emptySegment", path));
+ }
+ }
+ normalized.append('/');
+ int index = -1;
+ if (segment.startsWith("{") && segment.endsWith("}")) {
+ index = segmentCount;
+ segment = segment.substring(1, segment.length() - 1);
+ normalized.append('{');
+ normalized.append(paramCount++);
+ normalized.append('}');
+ if (!paramNames.add(segment)) {
+ throw new IllegalArgumentException(sm.getString(
+ "uriTemplate.duplicateParameter", segment));
+ }
+ } else {
+ if (segment.contains("{") || segment.contains("}")) {
+ throw new IllegalArgumentException(sm.getString(
+ "uriTemplate.invalidSegment", segment, path));
+ }
+ normalized.append(segment);
+ }
+ this.segments.add(new Segment(index, segment));
+ segmentCount++;
+ }
+
+ this.normalized = normalized.toString();
+ this.hasParameters = paramCount > 0;
+ }
+
+
+ public Map<String,String> match(UriTemplate candidate) {
+
+ Map<String,String> result = new HashMap<>();
+
+ // Should not happen but for safety
+ if (candidate.getSegmentCount() != getSegmentCount()) {
+ return null;
+ }
+
+ Iterator<Segment> candidateSegments =
+ candidate.getSegments().iterator();
+ Iterator<Segment> targetSegments = segments.iterator();
+
+ while (candidateSegments.hasNext()) {
+ Segment candidateSegment = candidateSegments.next();
+ Segment targetSegment = targetSegments.next();
+
+ if (targetSegment.getParameterIndex() == -1) {
+ // Not a parameter - values must match
+ if (!targetSegment.getValue().equals(
+ candidateSegment.getValue())) {
+ // Not a match. Stop here
+ return null;
+ }
+ } else {
+ // Parameter
+ result.put(targetSegment.getValue(),
+ candidateSegment.getValue());
+ }
+ }
+
+ return result;
+ }
+
+
+ public boolean hasParameters() {
+ return hasParameters;
+ }
+
+
+ public int getSegmentCount() {
+ return segments.size();
+ }
+
+
+ public String getNormalizedPath() {
+ return normalized;
+ }
+
+
+ private List<Segment> getSegments() {
+ return segments;
+ }
+
+
+ private static class Segment {
+ private final int parameterIndex;
+ private final String value;
+
+ public Segment(int parameterIndex, String value) {
+ this.parameterIndex = parameterIndex;
+ this.value = value;
+ }
+
+
+ public int getParameterIndex() {
+ return parameterIndex;
+ }
+
+
+ public String getValue() {
+ return value;
+ }
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/WsContextListener.java b/src/java/nginx/unit/websocket/server/WsContextListener.java
new file mode 100644
index 00000000..07137856
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/WsContextListener.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+
+/**
+ * In normal usage, this {@link ServletContextListener} does not need to be
+ * explicitly configured as the {@link WsSci} performs all the necessary
+ * bootstrap and installs this listener in the {@link ServletContext}. If the
+ * {@link WsSci} is disabled, this listener must be added manually to every
+ * {@link ServletContext} that uses WebSocket to bootstrap the
+ * {@link WsServerContainer} correctly.
+ */
+public class WsContextListener implements ServletContextListener {
+
+ @Override
+ public void contextInitialized(ServletContextEvent sce) {
+ ServletContext sc = sce.getServletContext();
+ // Don't trigger WebSocket initialization if a WebSocket Server
+ // Container is already present
+ if (sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE) == null) {
+ WsSci.init(sce.getServletContext(), false);
+ }
+ }
+
+ @Override
+ public void contextDestroyed(ServletContextEvent sce) {
+ ServletContext sc = sce.getServletContext();
+ Object obj = sc.getAttribute(Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE);
+ if (obj instanceof WsServerContainer) {
+ ((WsServerContainer) obj).destroy();
+ }
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/WsFilter.java b/src/java/nginx/unit/websocket/server/WsFilter.java
new file mode 100644
index 00000000..abea71fc
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/WsFilter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.io.IOException;
+
+import javax.servlet.FilterChain;
+import javax.servlet.GenericFilter;
+import javax.servlet.ServletException;
+import javax.servlet.ServletRequest;
+import javax.servlet.ServletResponse;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * Handles the initial HTTP connection for WebSocket connections.
+ */
+public class WsFilter extends GenericFilter {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient WsServerContainer sc;
+
+
+ @Override
+ public void init() throws ServletException {
+ sc = (WsServerContainer) getServletContext().getAttribute(
+ Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE);
+ }
+
+
+ @Override
+ public void doFilter(ServletRequest request, ServletResponse response,
+ FilterChain chain) throws IOException, ServletException {
+
+ // This filter only needs to handle WebSocket upgrade requests
+ if (!sc.areEndpointsRegistered() ||
+ !UpgradeUtil.isWebSocketUpgradeRequest(request, response)) {
+ chain.doFilter(request, response);
+ return;
+ }
+
+ // HTTP request with an upgrade header for WebSocket present
+ HttpServletRequest req = (HttpServletRequest) request;
+ HttpServletResponse resp = (HttpServletResponse) response;
+
+ // Check to see if this WebSocket implementation has a matching mapping
+ String path;
+ String pathInfo = req.getPathInfo();
+ if (pathInfo == null) {
+ path = req.getServletPath();
+ } else {
+ path = req.getServletPath() + pathInfo;
+ }
+ WsMappingResult mappingResult = sc.findMapping(path);
+
+ if (mappingResult == null) {
+ // No endpoint registered for the requested path. Let the
+ // application handle it (it might redirect or forward for example)
+ chain.doFilter(request, response);
+ return;
+ }
+
+ UpgradeUtil.doUpgrade(sc, req, resp, mappingResult.getConfig(),
+ mappingResult.getPathParams());
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/WsHandshakeRequest.java b/src/java/nginx/unit/websocket/server/WsHandshakeRequest.java
new file mode 100644
index 00000000..fa774302
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/WsHandshakeRequest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.Principal;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.websocket.server.HandshakeRequest;
+
+import org.apache.tomcat.util.collections.CaseInsensitiveKeyMap;
+import org.apache.tomcat.util.res.StringManager;
+
+/**
+ * Represents the request that this session was opened under.
+ */
+public class WsHandshakeRequest implements HandshakeRequest {
+
+ private static final StringManager sm = StringManager.getManager(WsHandshakeRequest.class);
+
+ private final URI requestUri;
+ private final Map<String,List<String>> parameterMap;
+ private final String queryString;
+ private final Principal userPrincipal;
+ private final Map<String,List<String>> headers;
+ private final Object httpSession;
+
+ private volatile HttpServletRequest request;
+
+
+ public WsHandshakeRequest(HttpServletRequest request, Map<String,String> pathParams) {
+
+ this.request = request;
+
+ queryString = request.getQueryString();
+ userPrincipal = request.getUserPrincipal();
+ httpSession = request.getSession(false);
+ requestUri = buildRequestUri(request);
+
+ // ParameterMap
+ Map<String,String[]> originalParameters = request.getParameterMap();
+ Map<String,List<String>> newParameters =
+ new HashMap<>(originalParameters.size());
+ for (Entry<String,String[]> entry : originalParameters.entrySet()) {
+ newParameters.put(entry.getKey(),
+ Collections.unmodifiableList(
+ Arrays.asList(entry.getValue())));
+ }
+ for (Entry<String,String> entry : pathParams.entrySet()) {
+ newParameters.put(entry.getKey(),
+ Collections.unmodifiableList(
+ Collections.singletonList(entry.getValue())));
+ }
+ parameterMap = Collections.unmodifiableMap(newParameters);
+
+ // Headers
+ Map<String,List<String>> newHeaders = new CaseInsensitiveKeyMap<>();
+
+ Enumeration<String> headerNames = request.getHeaderNames();
+ while (headerNames.hasMoreElements()) {
+ String headerName = headerNames.nextElement();
+
+ newHeaders.put(headerName, Collections.unmodifiableList(
+ Collections.list(request.getHeaders(headerName))));
+ }
+
+ headers = Collections.unmodifiableMap(newHeaders);
+ }
+
+ @Override
+ public URI getRequestURI() {
+ return requestUri;
+ }
+
+ @Override
+ public Map<String,List<String>> getParameterMap() {
+ return parameterMap;
+ }
+
+ @Override
+ public String getQueryString() {
+ return queryString;
+ }
+
+ @Override
+ public Principal getUserPrincipal() {
+ return userPrincipal;
+ }
+
+ @Override
+ public Map<String,List<String>> getHeaders() {
+ return headers;
+ }
+
+ @Override
+ public boolean isUserInRole(String role) {
+ if (request == null) {
+ throw new IllegalStateException();
+ }
+
+ return request.isUserInRole(role);
+ }
+
+ @Override
+ public Object getHttpSession() {
+ return httpSession;
+ }
+
+ /**
+ * Called when the HandshakeRequest is no longer required. Since an instance
+ * of this class retains a reference to the current HttpServletRequest that
+ * reference needs to be cleared as the HttpServletRequest may be reused.
+ *
+ * There is no reason for instances of this class to be accessed once the
+ * handshake has been completed.
+ */
+ void finished() {
+ request = null;
+ }
+
+
+ /*
+ * See RequestUtil.getRequestURL()
+ */
+ private static URI buildRequestUri(HttpServletRequest req) {
+
+ StringBuffer uri = new StringBuffer();
+ String scheme = req.getScheme();
+ int port = req.getServerPort();
+ if (port < 0) {
+ // Work around java.net.URL bug
+ port = 80;
+ }
+
+ if ("http".equals(scheme)) {
+ uri.append("ws");
+ } else if ("https".equals(scheme)) {
+ uri.append("wss");
+ } else {
+ // Should never happen
+ throw new IllegalArgumentException(
+ sm.getString("wsHandshakeRequest.unknownScheme", scheme));
+ }
+
+ uri.append("://");
+ uri.append(req.getServerName());
+
+ if ((scheme.equals("http") && (port != 80))
+ || (scheme.equals("https") && (port != 443))) {
+ uri.append(':');
+ uri.append(port);
+ }
+
+ uri.append(req.getRequestURI());
+
+ if (req.getQueryString() != null) {
+ uri.append("?");
+ uri.append(req.getQueryString());
+ }
+
+ try {
+ return new URI(uri.toString());
+ } catch (URISyntaxException e) {
+ // Should never happen
+ throw new IllegalArgumentException(
+ sm.getString("wsHandshakeRequest.invalidUri", uri.toString()), e);
+ }
+ }
+
+ public Object getAttribute(String name)
+ {
+ return request != null ? request.getAttribute(name) : null;
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/WsHttpUpgradeHandler.java b/src/java/nginx/unit/websocket/server/WsHttpUpgradeHandler.java
new file mode 100644
index 00000000..cc39ab73
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/WsHttpUpgradeHandler.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import javax.servlet.http.HttpSession;
+import javax.servlet.http.HttpUpgradeHandler;
+import javax.servlet.http.WebConnection;
+import javax.websocket.CloseReason;
+import javax.websocket.CloseReason.CloseCodes;
+import javax.websocket.DeploymentException;
+import javax.websocket.Endpoint;
+import javax.websocket.EndpointConfig;
+import javax.websocket.Extension;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.res.StringManager;
+
+import nginx.unit.websocket.Transformation;
+import nginx.unit.websocket.WsIOException;
+import nginx.unit.websocket.WsSession;
+
+import nginx.unit.Request;
+
+/**
+ * Servlet 3.1 HTTP upgrade handler for WebSocket connections.
+ */
+public class WsHttpUpgradeHandler implements HttpUpgradeHandler {
+
+ private final Log log = LogFactory.getLog(WsHttpUpgradeHandler.class); // must not be static
+ private static final StringManager sm = StringManager.getManager(WsHttpUpgradeHandler.class);
+
+ private final ClassLoader applicationClassLoader;
+
+ private Endpoint ep;
+ private EndpointConfig endpointConfig;
+ private WsServerContainer webSocketContainer;
+ private WsHandshakeRequest handshakeRequest;
+ private List<Extension> negotiatedExtensions;
+ private String subProtocol;
+ private Transformation transformation;
+ private Map<String,String> pathParameters;
+ private boolean secure;
+ private WebConnection connection;
+ private WsRemoteEndpointImplServer wsRemoteEndpointServer;
+ private WsSession wsSession;
+
+
+ public WsHttpUpgradeHandler() {
+ applicationClassLoader = Thread.currentThread().getContextClassLoader();
+ }
+
+ public void preInit(Endpoint ep, EndpointConfig endpointConfig,
+ WsServerContainer wsc, WsHandshakeRequest handshakeRequest,
+ List<Extension> negotiatedExtensionsPhase2, String subProtocol,
+ Transformation transformation, Map<String,String> pathParameters,
+ boolean secure) {
+ this.ep = ep;
+ this.endpointConfig = endpointConfig;
+ this.webSocketContainer = wsc;
+ this.handshakeRequest = handshakeRequest;
+ this.negotiatedExtensions = negotiatedExtensionsPhase2;
+ this.subProtocol = subProtocol;
+ this.transformation = transformation;
+ this.pathParameters = pathParameters;
+ this.secure = secure;
+ }
+
+
+ @Override
+ public void init(WebConnection connection) {
+ if (ep == null) {
+ throw new IllegalStateException(
+ sm.getString("wsHttpUpgradeHandler.noPreInit"));
+ }
+
+ String httpSessionId = null;
+ Object session = handshakeRequest.getHttpSession();
+ if (session != null ) {
+ httpSessionId = ((HttpSession) session).getId();
+ }
+
+ nginx.unit.Context.trace("UpgradeHandler.init(" + connection + ")");
+
+/*
+ // Need to call onOpen using the web application's class loader
+ // Create the frame using the application's class loader so it can pick
+ // up application specific config from the ServerContainerImpl
+ Thread t = Thread.currentThread();
+ ClassLoader cl = t.getContextClassLoader();
+ t.setContextClassLoader(applicationClassLoader);
+*/
+ try {
+ Request r = (Request) handshakeRequest.getAttribute(Request.BARE);
+
+ wsRemoteEndpointServer = new WsRemoteEndpointImplServer(webSocketContainer);
+ wsSession = new WsSession(ep, wsRemoteEndpointServer,
+ webSocketContainer, handshakeRequest.getRequestURI(),
+ handshakeRequest.getParameterMap(),
+ handshakeRequest.getQueryString(),
+ handshakeRequest.getUserPrincipal(), httpSessionId,
+ negotiatedExtensions, subProtocol, pathParameters, secure,
+ endpointConfig, r);
+
+ ep.onOpen(wsSession, endpointConfig);
+ webSocketContainer.registerSession(ep, wsSession);
+ } catch (DeploymentException e) {
+ throw new IllegalArgumentException(e);
+/*
+ } finally {
+ t.setContextClassLoader(cl);
+*/
+ }
+ }
+
+
+
+ @Override
+ public void destroy() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (Exception e) {
+ log.error(sm.getString("wsHttpUpgradeHandler.destroyFailed"), e);
+ }
+ }
+ }
+
+
+ private void onError(Throwable throwable) {
+ // Need to call onError using the web application's class loader
+ Thread t = Thread.currentThread();
+ ClassLoader cl = t.getContextClassLoader();
+ t.setContextClassLoader(applicationClassLoader);
+ try {
+ ep.onError(wsSession, throwable);
+ } finally {
+ t.setContextClassLoader(cl);
+ }
+ }
+
+
+ private void close(CloseReason cr) {
+ /*
+ * Any call to this method is a result of a problem reading from the
+ * client. At this point that state of the connection is unknown.
+ * Attempt to send a close frame to the client and then close the socket
+ * immediately. There is no point in waiting for a close frame from the
+ * client because there is no guarantee that we can recover from
+ * whatever messed up state the client put the connection into.
+ */
+ wsSession.onClose(cr);
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/WsMappingResult.java b/src/java/nginx/unit/websocket/server/WsMappingResult.java
new file mode 100644
index 00000000..a7a4c022
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/WsMappingResult.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.util.Map;
+
+import javax.websocket.server.ServerEndpointConfig;
+
+class WsMappingResult {
+
+ private final ServerEndpointConfig config;
+ private final Map<String,String> pathParams;
+
+
+ WsMappingResult(ServerEndpointConfig config,
+ Map<String,String> pathParams) {
+ this.config = config;
+ this.pathParams = pathParams;
+ }
+
+
+ ServerEndpointConfig getConfig() {
+ return config;
+ }
+
+
+ Map<String,String> getPathParams() {
+ return pathParams;
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/WsPerSessionServerEndpointConfig.java b/src/java/nginx/unit/websocket/server/WsPerSessionServerEndpointConfig.java
new file mode 100644
index 00000000..2be050cb
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/WsPerSessionServerEndpointConfig.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.websocket.Decoder;
+import javax.websocket.Encoder;
+import javax.websocket.Extension;
+import javax.websocket.server.ServerEndpointConfig;
+
+/**
+ * Wraps the provided {@link ServerEndpointConfig} and provides a per session
+ * view - the difference being that the map returned by {@link
+ * #getUserProperties()} is unique to this instance rather than shared with the
+ * wrapped {@link ServerEndpointConfig}.
+ */
+class WsPerSessionServerEndpointConfig implements ServerEndpointConfig {
+
+ private final ServerEndpointConfig perEndpointConfig;
+ private final Map<String,Object> perSessionUserProperties =
+ new ConcurrentHashMap<>();
+
+ WsPerSessionServerEndpointConfig(ServerEndpointConfig perEndpointConfig) {
+ this.perEndpointConfig = perEndpointConfig;
+ perSessionUserProperties.putAll(perEndpointConfig.getUserProperties());
+ }
+
+ @Override
+ public List<Class<? extends Encoder>> getEncoders() {
+ return perEndpointConfig.getEncoders();
+ }
+
+ @Override
+ public List<Class<? extends Decoder>> getDecoders() {
+ return perEndpointConfig.getDecoders();
+ }
+
+ @Override
+ public Map<String,Object> getUserProperties() {
+ return perSessionUserProperties;
+ }
+
+ @Override
+ public Class<?> getEndpointClass() {
+ return perEndpointConfig.getEndpointClass();
+ }
+
+ @Override
+ public String getPath() {
+ return perEndpointConfig.getPath();
+ }
+
+ @Override
+ public List<String> getSubprotocols() {
+ return perEndpointConfig.getSubprotocols();
+ }
+
+ @Override
+ public List<Extension> getExtensions() {
+ return perEndpointConfig.getExtensions();
+ }
+
+ @Override
+ public Configurator getConfigurator() {
+ return perEndpointConfig.getConfigurator();
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/WsRemoteEndpointImplServer.java b/src/java/nginx/unit/websocket/server/WsRemoteEndpointImplServer.java
new file mode 100644
index 00000000..6d10a3be
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/WsRemoteEndpointImplServer.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CompletionHandler;
+import java.nio.channels.InterruptedByTimeoutException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import javax.websocket.SendHandler;
+import javax.websocket.SendResult;
+
+import org.apache.juli.logging.Log;
+import org.apache.juli.logging.LogFactory;
+import org.apache.tomcat.util.res.StringManager;
+import nginx.unit.websocket.Transformation;
+import nginx.unit.websocket.WsRemoteEndpointImplBase;
+
+/**
+ * This is the server side {@link javax.websocket.RemoteEndpoint} implementation
+ * - i.e. what the server uses to send data to the client.
+ */
+public class WsRemoteEndpointImplServer extends WsRemoteEndpointImplBase {
+
+ private static final StringManager sm =
+ StringManager.getManager(WsRemoteEndpointImplServer.class);
+ private final Log log = LogFactory.getLog(WsRemoteEndpointImplServer.class); // must not be static
+
+ private volatile SendHandler handler = null;
+ private volatile ByteBuffer[] buffers = null;
+
+ private volatile long timeoutExpiry = -1;
+ private volatile boolean close;
+
+ public WsRemoteEndpointImplServer(
+ WsServerContainer serverContainer) {
+ }
+
+
+ @Override
+ protected final boolean isMasked() {
+ return false;
+ }
+
+ @Override
+ protected void doWrite(SendHandler handler, long blockingWriteTimeoutExpiry,
+ ByteBuffer... buffers) {
+ }
+
+ @Override
+ protected void doClose() {
+ if (handler != null) {
+ // close() can be triggered by a wide range of scenarios. It is far
+ // simpler just to always use a dispatch than it is to try and track
+ // whether or not this method was called by the same thread that
+ // triggered the write
+ clearHandler(new EOFException(), true);
+ }
+ }
+
+
+ protected long getTimeoutExpiry() {
+ return timeoutExpiry;
+ }
+
+
+ /*
+ * Currently this is only called from the background thread so we could just
+ * call clearHandler() with useDispatch == false but the method parameter
+ * was added in case other callers started to use this method to make sure
+ * that those callers think through what the correct value of useDispatch is
+ * for them.
+ */
+ protected void onTimeout(boolean useDispatch) {
+ if (handler != null) {
+ clearHandler(new SocketTimeoutException(), useDispatch);
+ }
+ close();
+ }
+
+
+ @Override
+ protected void setTransformation(Transformation transformation) {
+ // Overridden purely so it is visible to other classes in this package
+ super.setTransformation(transformation);
+ }
+
+
+ /**
+ *
+ * @param t The throwable associated with any error that
+ * occurred
+ * @param useDispatch Should {@link SendHandler#onResult(SendResult)} be
+ * called from a new thread, keeping in mind the
+ * requirements of
+ * {@link javax.websocket.RemoteEndpoint.Async}
+ */
+ private void clearHandler(Throwable t, boolean useDispatch) {
+ // Setting the result marks this (partial) message as
+ // complete which means the next one may be sent which
+ // could update the value of the handler. Therefore, keep a
+ // local copy before signalling the end of the (partial)
+ // message.
+ SendHandler sh = handler;
+ handler = null;
+ buffers = null;
+ if (sh != null) {
+ if (useDispatch) {
+ OnResultRunnable r = new OnResultRunnable(sh, t);
+ } else {
+ if (t == null) {
+ sh.onResult(new SendResult());
+ } else {
+ sh.onResult(new SendResult(t));
+ }
+ }
+ }
+ }
+
+
+ private static class OnResultRunnable implements Runnable {
+
+ private final SendHandler sh;
+ private final Throwable t;
+
+ private OnResultRunnable(SendHandler sh, Throwable t) {
+ this.sh = sh;
+ this.t = t;
+ }
+
+ @Override
+ public void run() {
+ if (t == null) {
+ sh.onResult(new SendResult());
+ } else {
+ sh.onResult(new SendResult(t));
+ }
+ }
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/WsSci.java b/src/java/nginx/unit/websocket/server/WsSci.java
new file mode 100644
index 00000000..cdecce27
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/WsSci.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.lang.reflect.Modifier;
+import java.util.HashSet;
+import java.util.Set;
+
+import javax.servlet.ServletContainerInitializer;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.annotation.HandlesTypes;
+import javax.websocket.ContainerProvider;
+import javax.websocket.DeploymentException;
+import javax.websocket.Endpoint;
+import javax.websocket.server.ServerApplicationConfig;
+import javax.websocket.server.ServerEndpoint;
+import javax.websocket.server.ServerEndpointConfig;
+
+/**
+ * Registers an interest in any class that is annotated with
+ * {@link ServerEndpoint} so that Endpoint can be published via the WebSocket
+ * server.
+ */
+@HandlesTypes({ServerEndpoint.class, ServerApplicationConfig.class,
+ Endpoint.class})
+public class WsSci implements ServletContainerInitializer {
+
+ @Override
+ public void onStartup(Set<Class<?>> clazzes, ServletContext ctx)
+ throws ServletException {
+
+ WsServerContainer sc = init(ctx, true);
+
+ if (clazzes == null || clazzes.size() == 0) {
+ return;
+ }
+
+ // Group the discovered classes by type
+ Set<ServerApplicationConfig> serverApplicationConfigs = new HashSet<>();
+ Set<Class<? extends Endpoint>> scannedEndpointClazzes = new HashSet<>();
+ Set<Class<?>> scannedPojoEndpoints = new HashSet<>();
+
+ try {
+ // wsPackage is "javax.websocket."
+ String wsPackage = ContainerProvider.class.getName();
+ wsPackage = wsPackage.substring(0, wsPackage.lastIndexOf('.') + 1);
+ for (Class<?> clazz : clazzes) {
+ int modifiers = clazz.getModifiers();
+ if (!Modifier.isPublic(modifiers) ||
+ Modifier.isAbstract(modifiers)) {
+ // Non-public or abstract - skip it.
+ continue;
+ }
+ // Protect against scanning the WebSocket API JARs
+ if (clazz.getName().startsWith(wsPackage)) {
+ continue;
+ }
+ if (ServerApplicationConfig.class.isAssignableFrom(clazz)) {
+ serverApplicationConfigs.add(
+ (ServerApplicationConfig) clazz.getConstructor().newInstance());
+ }
+ if (Endpoint.class.isAssignableFrom(clazz)) {
+ @SuppressWarnings("unchecked")
+ Class<? extends Endpoint> endpoint =
+ (Class<? extends Endpoint>) clazz;
+ scannedEndpointClazzes.add(endpoint);
+ }
+ if (clazz.isAnnotationPresent(ServerEndpoint.class)) {
+ scannedPojoEndpoints.add(clazz);
+ }
+ }
+ } catch (ReflectiveOperationException e) {
+ throw new ServletException(e);
+ }
+
+ // Filter the results
+ Set<ServerEndpointConfig> filteredEndpointConfigs = new HashSet<>();
+ Set<Class<?>> filteredPojoEndpoints = new HashSet<>();
+
+ if (serverApplicationConfigs.isEmpty()) {
+ filteredPojoEndpoints.addAll(scannedPojoEndpoints);
+ } else {
+ for (ServerApplicationConfig config : serverApplicationConfigs) {
+ Set<ServerEndpointConfig> configFilteredEndpoints =
+ config.getEndpointConfigs(scannedEndpointClazzes);
+ if (configFilteredEndpoints != null) {
+ filteredEndpointConfigs.addAll(configFilteredEndpoints);
+ }
+ Set<Class<?>> configFilteredPojos =
+ config.getAnnotatedEndpointClasses(
+ scannedPojoEndpoints);
+ if (configFilteredPojos != null) {
+ filteredPojoEndpoints.addAll(configFilteredPojos);
+ }
+ }
+ }
+
+ try {
+ // Deploy endpoints
+ for (ServerEndpointConfig config : filteredEndpointConfigs) {
+ sc.addEndpoint(config);
+ }
+ // Deploy POJOs
+ for (Class<?> clazz : filteredPojoEndpoints) {
+ sc.addEndpoint(clazz);
+ }
+ } catch (DeploymentException e) {
+ throw new ServletException(e);
+ }
+ }
+
+
+ static WsServerContainer init(ServletContext servletContext,
+ boolean initBySciMechanism) {
+
+ WsServerContainer sc = new WsServerContainer(servletContext);
+
+ servletContext.setAttribute(
+ Constants.SERVER_CONTAINER_SERVLET_CONTEXT_ATTRIBUTE, sc);
+
+ servletContext.addListener(new WsSessionListener(sc));
+ // Can't register the ContextListener again if the ContextListener is
+ // calling this method
+ if (initBySciMechanism) {
+ servletContext.addListener(new WsContextListener());
+ }
+
+ return sc;
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/WsServerContainer.java b/src/java/nginx/unit/websocket/server/WsServerContainer.java
new file mode 100644
index 00000000..069fc54f
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/WsServerContainer.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.servlet.DispatcherType;
+import javax.servlet.FilterRegistration;
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.websocket.CloseReason;
+import javax.websocket.CloseReason.CloseCodes;
+import javax.websocket.DeploymentException;
+import javax.websocket.Encoder;
+import javax.websocket.Endpoint;
+import javax.websocket.server.ServerContainer;
+import javax.websocket.server.ServerEndpoint;
+import javax.websocket.server.ServerEndpointConfig;
+import javax.websocket.server.ServerEndpointConfig.Configurator;
+
+import org.apache.tomcat.InstanceManager;
+import org.apache.tomcat.util.res.StringManager;
+import nginx.unit.websocket.WsSession;
+import nginx.unit.websocket.WsWebSocketContainer;
+import nginx.unit.websocket.pojo.PojoMethodMapping;
+
+/**
+ * Provides a per class loader (i.e. per web application) instance of a
+ * ServerContainer. Web application wide defaults may be configured by setting
+ * the following servlet context initialisation parameters to the desired
+ * values.
+ * <ul>
+ * <li>{@link Constants#BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM}</li>
+ * <li>{@link Constants#TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM}</li>
+ * </ul>
+ */
+public class WsServerContainer extends WsWebSocketContainer
+ implements ServerContainer {
+
+ private static final StringManager sm = StringManager.getManager(WsServerContainer.class);
+
+ private static final CloseReason AUTHENTICATED_HTTP_SESSION_CLOSED =
+ new CloseReason(CloseCodes.VIOLATED_POLICY,
+ "This connection was established under an authenticated " +
+ "HTTP session that has ended.");
+
+ private final ServletContext servletContext;
+ private final Map<String,ServerEndpointConfig> configExactMatchMap =
+ new ConcurrentHashMap<>();
+ private final Map<Integer,SortedSet<TemplatePathMatch>> configTemplateMatchMap =
+ new ConcurrentHashMap<>();
+ private volatile boolean enforceNoAddAfterHandshake =
+ nginx.unit.websocket.Constants.STRICT_SPEC_COMPLIANCE;
+ private volatile boolean addAllowed = true;
+ private final Map<String,Set<WsSession>> authenticatedSessions = new ConcurrentHashMap<>();
+ private volatile boolean endpointsRegistered = false;
+
+ WsServerContainer(ServletContext servletContext) {
+
+ this.servletContext = servletContext;
+ setInstanceManager((InstanceManager) servletContext.getAttribute(InstanceManager.class.getName()));
+
+ // Configure servlet context wide defaults
+ String value = servletContext.getInitParameter(
+ Constants.BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM);
+ if (value != null) {
+ setDefaultMaxBinaryMessageBufferSize(Integer.parseInt(value));
+ }
+
+ value = servletContext.getInitParameter(
+ Constants.TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM);
+ if (value != null) {
+ setDefaultMaxTextMessageBufferSize(Integer.parseInt(value));
+ }
+
+ value = servletContext.getInitParameter(
+ Constants.ENFORCE_NO_ADD_AFTER_HANDSHAKE_CONTEXT_INIT_PARAM);
+ if (value != null) {
+ setEnforceNoAddAfterHandshake(Boolean.parseBoolean(value));
+ }
+
+ FilterRegistration.Dynamic fr = servletContext.addFilter(
+ "Tomcat WebSocket (JSR356) Filter", new WsFilter());
+ fr.setAsyncSupported(true);
+
+ EnumSet<DispatcherType> types = EnumSet.of(DispatcherType.REQUEST,
+ DispatcherType.FORWARD);
+
+ fr.addMappingForUrlPatterns(types, true, "/*");
+ }
+
+
+ /**
+ * Published the provided endpoint implementation at the specified path with
+ * the specified configuration. {@link #WsServerContainer(ServletContext)}
+ * must be called before calling this method.
+ *
+ * @param sec The configuration to use when creating endpoint instances
+ * @throws DeploymentException if the endpoint cannot be published as
+ * requested
+ */
+ @Override
+ public void addEndpoint(ServerEndpointConfig sec)
+ throws DeploymentException {
+
+ if (enforceNoAddAfterHandshake && !addAllowed) {
+ throw new DeploymentException(
+ sm.getString("serverContainer.addNotAllowed"));
+ }
+
+ if (servletContext == null) {
+ throw new DeploymentException(
+ sm.getString("serverContainer.servletContextMissing"));
+ }
+ String path = sec.getPath();
+
+ // Add method mapping to user properties
+ PojoMethodMapping methodMapping = new PojoMethodMapping(sec.getEndpointClass(),
+ sec.getDecoders(), path);
+ if (methodMapping.getOnClose() != null || methodMapping.getOnOpen() != null
+ || methodMapping.getOnError() != null || methodMapping.hasMessageHandlers()) {
+ sec.getUserProperties().put(nginx.unit.websocket.pojo.Constants.POJO_METHOD_MAPPING_KEY,
+ methodMapping);
+ }
+
+ UriTemplate uriTemplate = new UriTemplate(path);
+ if (uriTemplate.hasParameters()) {
+ Integer key = Integer.valueOf(uriTemplate.getSegmentCount());
+ SortedSet<TemplatePathMatch> templateMatches =
+ configTemplateMatchMap.get(key);
+ if (templateMatches == null) {
+ // Ensure that if concurrent threads execute this block they
+ // both end up using the same TreeSet instance
+ templateMatches = new TreeSet<>(
+ TemplatePathMatchComparator.getInstance());
+ configTemplateMatchMap.putIfAbsent(key, templateMatches);
+ templateMatches = configTemplateMatchMap.get(key);
+ }
+ if (!templateMatches.add(new TemplatePathMatch(sec, uriTemplate))) {
+ // Duplicate uriTemplate;
+ throw new DeploymentException(
+ sm.getString("serverContainer.duplicatePaths", path,
+ sec.getEndpointClass(),
+ sec.getEndpointClass()));
+ }
+ } else {
+ // Exact match
+ ServerEndpointConfig old = configExactMatchMap.put(path, sec);
+ if (old != null) {
+ // Duplicate path mappings
+ throw new DeploymentException(
+ sm.getString("serverContainer.duplicatePaths", path,
+ old.getEndpointClass(),
+ sec.getEndpointClass()));
+ }
+ }
+
+ endpointsRegistered = true;
+ }
+
+
+ /**
+ * Provides the equivalent of {@link #addEndpoint(ServerEndpointConfig)}
+ * for publishing plain old java objects (POJOs) that have been annotated as
+ * WebSocket endpoints.
+ *
+ * @param pojo The annotated POJO
+ */
+ @Override
+ public void addEndpoint(Class<?> pojo) throws DeploymentException {
+
+ ServerEndpoint annotation = pojo.getAnnotation(ServerEndpoint.class);
+ if (annotation == null) {
+ throw new DeploymentException(
+ sm.getString("serverContainer.missingAnnotation",
+ pojo.getName()));
+ }
+ String path = annotation.value();
+
+ // Validate encoders
+ validateEncoders(annotation.encoders());
+
+ // ServerEndpointConfig
+ ServerEndpointConfig sec;
+ Class<? extends Configurator> configuratorClazz =
+ annotation.configurator();
+ Configurator configurator = null;
+ if (!configuratorClazz.equals(Configurator.class)) {
+ try {
+ configurator = annotation.configurator().getConstructor().newInstance();
+ } catch (ReflectiveOperationException e) {
+ throw new DeploymentException(sm.getString(
+ "serverContainer.configuratorFail",
+ annotation.configurator().getName(),
+ pojo.getClass().getName()), e);
+ }
+ }
+ if (configurator == null) {
+ configurator = new nginx.unit.websocket.server.DefaultServerEndpointConfigurator();
+ }
+ sec = ServerEndpointConfig.Builder.create(pojo, path).
+ decoders(Arrays.asList(annotation.decoders())).
+ encoders(Arrays.asList(annotation.encoders())).
+ subprotocols(Arrays.asList(annotation.subprotocols())).
+ configurator(configurator).
+ build();
+
+ addEndpoint(sec);
+ }
+
+
+ boolean areEndpointsRegistered() {
+ return endpointsRegistered;
+ }
+
+
+ /**
+ * Until the WebSocket specification provides such a mechanism, this Tomcat
+ * proprietary method is provided to enable applications to programmatically
+ * determine whether or not to upgrade an individual request to WebSocket.
+ * <p>
+ * Note: This method is not used by Tomcat but is used directly by
+ * third-party code and must not be removed.
+ *
+ * @param request The request object to be upgraded
+ * @param response The response object to be populated with the result of
+ * the upgrade
+ * @param sec The server endpoint to use to process the upgrade request
+ * @param pathParams The path parameters associated with the upgrade request
+ *
+ * @throws ServletException If a configuration error prevents the upgrade
+ * from taking place
+ * @throws IOException If an I/O error occurs during the upgrade process
+ */
+ public void doUpgrade(HttpServletRequest request,
+ HttpServletResponse response, ServerEndpointConfig sec,
+ Map<String,String> pathParams)
+ throws ServletException, IOException {
+ UpgradeUtil.doUpgrade(this, request, response, sec, pathParams);
+ }
+
+
+ public WsMappingResult findMapping(String path) {
+
+ // Prevent registering additional endpoints once the first attempt has
+ // been made to use one
+ if (addAllowed) {
+ addAllowed = false;
+ }
+
+ // Check an exact match. Simple case as there are no templates.
+ ServerEndpointConfig sec = configExactMatchMap.get(path);
+ if (sec != null) {
+ return new WsMappingResult(sec, Collections.<String, String>emptyMap());
+ }
+
+ // No exact match. Need to look for template matches.
+ UriTemplate pathUriTemplate = null;
+ try {
+ pathUriTemplate = new UriTemplate(path);
+ } catch (DeploymentException e) {
+ // Path is not valid so can't be matched to a WebSocketEndpoint
+ return null;
+ }
+
+ // Number of segments has to match
+ Integer key = Integer.valueOf(pathUriTemplate.getSegmentCount());
+ SortedSet<TemplatePathMatch> templateMatches =
+ configTemplateMatchMap.get(key);
+
+ if (templateMatches == null) {
+ // No templates with an equal number of segments so there will be
+ // no matches
+ return null;
+ }
+
+ // List is in alphabetical order of normalised templates.
+ // Correct match is the first one that matches.
+ Map<String,String> pathParams = null;
+ for (TemplatePathMatch templateMatch : templateMatches) {
+ pathParams = templateMatch.getUriTemplate().match(pathUriTemplate);
+ if (pathParams != null) {
+ sec = templateMatch.getConfig();
+ break;
+ }
+ }
+
+ if (sec == null) {
+ // No match
+ return null;
+ }
+
+ return new WsMappingResult(sec, pathParams);
+ }
+
+
+
+ public boolean isEnforceNoAddAfterHandshake() {
+ return enforceNoAddAfterHandshake;
+ }
+
+
+ public void setEnforceNoAddAfterHandshake(
+ boolean enforceNoAddAfterHandshake) {
+ this.enforceNoAddAfterHandshake = enforceNoAddAfterHandshake;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ *
+ * Overridden to make it visible to other classes in this package.
+ */
+ @Override
+ protected void registerSession(Endpoint endpoint, WsSession wsSession) {
+ super.registerSession(endpoint, wsSession);
+ if (wsSession.isOpen() &&
+ wsSession.getUserPrincipal() != null &&
+ wsSession.getHttpSessionId() != null) {
+ registerAuthenticatedSession(wsSession,
+ wsSession.getHttpSessionId());
+ }
+ }
+
+
+ /**
+ * {@inheritDoc}
+ *
+ * Overridden to make it visible to other classes in this package.
+ */
+ @Override
+ protected void unregisterSession(Endpoint endpoint, WsSession wsSession) {
+ if (wsSession.getUserPrincipal() != null &&
+ wsSession.getHttpSessionId() != null) {
+ unregisterAuthenticatedSession(wsSession,
+ wsSession.getHttpSessionId());
+ }
+ super.unregisterSession(endpoint, wsSession);
+ }
+
+
+ private void registerAuthenticatedSession(WsSession wsSession,
+ String httpSessionId) {
+ Set<WsSession> wsSessions = authenticatedSessions.get(httpSessionId);
+ if (wsSessions == null) {
+ wsSessions = Collections.newSetFromMap(
+ new ConcurrentHashMap<WsSession,Boolean>());
+ authenticatedSessions.putIfAbsent(httpSessionId, wsSessions);
+ wsSessions = authenticatedSessions.get(httpSessionId);
+ }
+ wsSessions.add(wsSession);
+ }
+
+
+ private void unregisterAuthenticatedSession(WsSession wsSession,
+ String httpSessionId) {
+ Set<WsSession> wsSessions = authenticatedSessions.get(httpSessionId);
+ // wsSessions will be null if the HTTP session has ended
+ if (wsSessions != null) {
+ wsSessions.remove(wsSession);
+ }
+ }
+
+
+ public void closeAuthenticatedSession(String httpSessionId) {
+ Set<WsSession> wsSessions = authenticatedSessions.remove(httpSessionId);
+
+ if (wsSessions != null && !wsSessions.isEmpty()) {
+ for (WsSession wsSession : wsSessions) {
+ try {
+ wsSession.close(AUTHENTICATED_HTTP_SESSION_CLOSED);
+ } catch (IOException e) {
+ // Any IOExceptions during close will have been caught and the
+ // onError method called.
+ }
+ }
+ }
+ }
+
+
+ private static void validateEncoders(Class<? extends Encoder>[] encoders)
+ throws DeploymentException {
+
+ for (Class<? extends Encoder> encoder : encoders) {
+ // Need to instantiate decoder to ensure it is valid and that
+ // deployment can be failed if it is not
+ @SuppressWarnings("unused")
+ Encoder instance;
+ try {
+ encoder.getConstructor().newInstance();
+ } catch(ReflectiveOperationException e) {
+ throw new DeploymentException(sm.getString(
+ "serverContainer.encoderFail", encoder.getName()), e);
+ }
+ }
+ }
+
+
+ private static class TemplatePathMatch {
+ private final ServerEndpointConfig config;
+ private final UriTemplate uriTemplate;
+
+ public TemplatePathMatch(ServerEndpointConfig config,
+ UriTemplate uriTemplate) {
+ this.config = config;
+ this.uriTemplate = uriTemplate;
+ }
+
+
+ public ServerEndpointConfig getConfig() {
+ return config;
+ }
+
+
+ public UriTemplate getUriTemplate() {
+ return uriTemplate;
+ }
+ }
+
+
+ /**
+ * This Comparator implementation is thread-safe so only create a single
+ * instance.
+ */
+ private static class TemplatePathMatchComparator
+ implements Comparator<TemplatePathMatch> {
+
+ private static final TemplatePathMatchComparator INSTANCE =
+ new TemplatePathMatchComparator();
+
+ public static TemplatePathMatchComparator getInstance() {
+ return INSTANCE;
+ }
+
+ private TemplatePathMatchComparator() {
+ // Hide default constructor
+ }
+
+ @Override
+ public int compare(TemplatePathMatch tpm1, TemplatePathMatch tpm2) {
+ return tpm1.getUriTemplate().getNormalizedPath().compareTo(
+ tpm2.getUriTemplate().getNormalizedPath());
+ }
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/WsSessionListener.java b/src/java/nginx/unit/websocket/server/WsSessionListener.java
new file mode 100644
index 00000000..fc2bc9c5
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/WsSessionListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import javax.servlet.http.HttpSessionEvent;
+import javax.servlet.http.HttpSessionListener;
+
+public class WsSessionListener implements HttpSessionListener{
+
+ private final WsServerContainer wsServerContainer;
+
+
+ public WsSessionListener(WsServerContainer wsServerContainer) {
+ this.wsServerContainer = wsServerContainer;
+ }
+
+
+ @Override
+ public void sessionDestroyed(HttpSessionEvent se) {
+ wsServerContainer.closeAuthenticatedSession(se.getSession().getId());
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/WsWriteTimeout.java b/src/java/nginx/unit/websocket/server/WsWriteTimeout.java
new file mode 100644
index 00000000..2dfc4ab2
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/WsWriteTimeout.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package nginx.unit.websocket.server;
+
+import java.util.Comparator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import nginx.unit.websocket.BackgroundProcess;
+import nginx.unit.websocket.BackgroundProcessManager;
+
+/**
+ * Provides timeouts for asynchronous web socket writes. On the server side we
+ * only have access to {@link javax.servlet.ServletOutputStream} and
+ * {@link javax.servlet.ServletInputStream} so there is no way to set a timeout
+ * for writes to the client.
+ */
+public class WsWriteTimeout implements BackgroundProcess {
+
+ private final Set<WsRemoteEndpointImplServer> endpoints =
+ new ConcurrentSkipListSet<>(new EndpointComparator());
+ private final AtomicInteger count = new AtomicInteger(0);
+ private int backgroundProcessCount = 0;
+ private volatile int processPeriod = 1;
+
+ @Override
+ public void backgroundProcess() {
+ // This method gets called once a second.
+ backgroundProcessCount ++;
+
+ if (backgroundProcessCount >= processPeriod) {
+ backgroundProcessCount = 0;
+
+ long now = System.currentTimeMillis();
+ for (WsRemoteEndpointImplServer endpoint : endpoints) {
+ if (endpoint.getTimeoutExpiry() < now) {
+ // Background thread, not the thread that triggered the
+ // write so no need to use a dispatch
+ endpoint.onTimeout(false);
+ } else {
+ // Endpoints are ordered by timeout expiry so if this point
+ // is reached there is no need to check the remaining
+ // endpoints
+ break;
+ }
+ }
+ }
+ }
+
+
+ @Override
+ public void setProcessPeriod(int period) {
+ this.processPeriod = period;
+ }
+
+
+ /**
+ * {@inheritDoc}
+ *
+ * The default value is 1 which means asynchronous write timeouts are
+ * processed every 1 second.
+ */
+ @Override
+ public int getProcessPeriod() {
+ return processPeriod;
+ }
+
+
+ public void register(WsRemoteEndpointImplServer endpoint) {
+ boolean result = endpoints.add(endpoint);
+ if (result) {
+ int newCount = count.incrementAndGet();
+ if (newCount == 1) {
+ BackgroundProcessManager.getInstance().register(this);
+ }
+ }
+ }
+
+
+ public void unregister(WsRemoteEndpointImplServer endpoint) {
+ boolean result = endpoints.remove(endpoint);
+ if (result) {
+ int newCount = count.decrementAndGet();
+ if (newCount == 0) {
+ BackgroundProcessManager.getInstance().unregister(this);
+ }
+ }
+ }
+
+
+ /**
+ * Note: this comparator imposes orderings that are inconsistent with equals
+ */
+ private static class EndpointComparator implements
+ Comparator<WsRemoteEndpointImplServer> {
+
+ @Override
+ public int compare(WsRemoteEndpointImplServer o1,
+ WsRemoteEndpointImplServer o2) {
+
+ long t1 = o1.getTimeoutExpiry();
+ long t2 = o2.getTimeoutExpiry();
+
+ if (t1 < t2) {
+ return -1;
+ } else if (t1 == t2) {
+ return 0;
+ } else {
+ return 1;
+ }
+ }
+ }
+}
diff --git a/src/java/nginx/unit/websocket/server/package-info.java b/src/java/nginx/unit/websocket/server/package-info.java
new file mode 100644
index 00000000..87bc85a3
--- /dev/null
+++ b/src/java/nginx/unit/websocket/server/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Server-side specific implementation classes. These are in a separate package
+ * to make packaging a pure client JAR simpler.
+ */
+package nginx.unit.websocket.server;