summaryrefslogtreecommitdiffhomepage
path: root/src/java/nginx/unit/websocket/WsFrameClient.java
blob: 3174c76619383576ec1ffec2cd1c83c512fc48ae (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package nginx.unit.websocket;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;

import javax.websocket.CloseReason;
import javax.websocket.CloseReason.CloseCodes;

import org.apache.juli.logging.Log;
import org.apache.juli.logging.LogFactory;
import org.apache.tomcat.util.res.StringManager;

public class WsFrameClient extends WsFrameBase {

    private final Log log = LogFactory.getLog(WsFrameClient.class); // must not be static
    private static final StringManager sm = StringManager.getManager(WsFrameClient.class);

    private final AsyncChannelWrapper channel;
    private final CompletionHandler<Integer, Void> handler;
    // Not final as it may need to be re-sized
    private volatile ByteBuffer response;

    public WsFrameClient(ByteBuffer response, AsyncChannelWrapper channel, WsSession wsSession,
            Transformation transformation) {
        super(wsSession, transformation);
        this.response = response;
        this.channel = channel;
        this.handler = new WsFrameClientCompletionHandler();
    }


    void startInputProcessing() {
        try {
            processSocketRead();
        } catch (IOException e) {
            close(e);
        }
    }


    private void processSocketRead() throws IOException {
        while (true) {
            switch (getReadState()) {
            case WAITING:
                if (!changeReadState(ReadState.WAITING, ReadState.PROCESSING)) {
                    continue;
                }
                while (response.hasRemaining()) {
                    if (isSuspended()) {
                        if (!changeReadState(ReadState.SUSPENDING_PROCESS, ReadState.SUSPENDED)) {
                            continue;
                        }
                        // There is still data available in the response buffer
                        // Return here so that the response buffer will not be
                        // cleared and there will be no data read from the
                        // socket. Thus when the read operation is resumed first
                        // the data left in the response buffer will be consumed
                        // and then a new socket read will be performed
                        return;
                    }
                    inputBuffer.mark();
                    inputBuffer.position(inputBuffer.limit()).limit(inputBuffer.capacity());

                    int toCopy = Math.min(response.remaining(), inputBuffer.remaining());

                    // Copy remaining bytes read in HTTP phase to input buffer used by
                    // frame processing

                    int orgLimit = response.limit();
                    response.limit(response.position() + toCopy);
                    inputBuffer.put(response);
                    response.limit(orgLimit);

                    inputBuffer.limit(inputBuffer.position()).reset();

                    // Process the data we have
                    processInputBuffer();
                }
                response.clear();

                // Get some more data
                if (isOpen()) {
                    channel.read(response, null, handler);
                } else {
                    changeReadState(ReadState.CLOSING);
                }
                return;
            case SUSPENDING_WAIT:
                if (!changeReadState(ReadState.SUSPENDING_WAIT, ReadState.SUSPENDED)) {
                    continue;
                }
                return;
            default:
                throw new IllegalStateException(
                        sm.getString("wsFrameServer.illegalReadState", getReadState()));
            }
        }
    }


    private final void close(Throwable t) {
        changeReadState(ReadState.CLOSING);
        CloseReason cr;
        if (t instanceof WsIOException) {
            cr = ((WsIOException) t).getCloseReason();
        } else {
            cr = new CloseReason(CloseCodes.CLOSED_ABNORMALLY, t.getMessage());
        }

        try {
            wsSession.close(cr);
        } catch (IOException ignore) {
            // Ignore
        }
    }


    @Override
    protected boolean isMasked() {
        // Data is from the server so it is not masked
        return false;
    }


    @Override
    protected Log getLog() {
        return log;
    }

    private class WsFrameClientCompletionHandler implements CompletionHandler<Integer, Void> {

        @Override
        public void completed(Integer result, Void attachment) {
            if (result.intValue() == -1) {
                // BZ 57762. A dropped connection will get reported as EOF
                // rather than as an error so handle it here.
                if (isOpen()) {
                    // No close frame was received
                    close(new EOFException());
                }
                // No data to process
                return;
            }
            response.flip();
            doResumeProcessing(true);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            if (exc instanceof ReadBufferOverflowException) {
                // response will be empty if this exception is thrown
                response = ByteBuffer
                        .allocate(((ReadBufferOverflowException) exc).getMinBufferSize());
                response.flip();
                doResumeProcessing(false);
            } else {
                close(exc);
            }
        }

        private void doResumeProcessing(boolean checkOpenOnError) {
            while (true) {
                switch (getReadState()) {
                case PROCESSING:
                    if (!changeReadState(ReadState.PROCESSING, ReadState.WAITING)) {
                        continue;
                    }
                    resumeProcessing(checkOpenOnError);
                    return;
                case SUSPENDING_PROCESS:
                    if (!changeReadState(ReadState.SUSPENDING_PROCESS, ReadState.SUSPENDED)) {
                        continue;
                    }
                    return;
                default:
                    throw new IllegalStateException(
                            sm.getString("wsFrame.illegalReadState", getReadState()));
                }
            }
        }
    }


    @Override
    protected void resumeProcessing() {
        resumeProcessing(true);
    }

    private void resumeProcessing(boolean checkOpenOnError) {
        try {
            processSocketRead();
        } catch (IOException e) {
            if (checkOpenOnError) {
                // Only send a close message on an IOException if the client
                // has not yet received a close control message from the server
                // as the IOException may be in response to the client
                // continuing to send a message after the server sent a close
                // control message.
                if (isOpen()) {
                    if (log.isDebugEnabled()) {
                        log.debug(sm.getString("wsFrameClient.ioe"), e);
                    }
                    close(e);
                }
            } else {
                close(e);
            }
        }
    }
}