Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #4154: adding a callback for stream consumption #5119

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import io.fabric8.kubernetes.client.http.BufferUtil;
import io.fabric8.kubernetes.client.http.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down Expand Up @@ -92,6 +94,7 @@ public void onOpen(java.net.http.WebSocket webSocket) {
}
}

private static final Logger LOG = LoggerFactory.getLogger(JdkWebSocketImpl.class);
private java.net.http.WebSocket webSocket;
private AtomicLong queueSize;

Expand All @@ -106,7 +109,12 @@ public boolean send(ByteBuffer buffer) {
final int size = buffer.remaining();
queueSize.addAndGet(size);
CompletableFuture<java.net.http.WebSocket> cf = webSocket.sendBinary(buffer, true);
cf.whenComplete((b, t) -> queueSize.addAndGet(-size));
cf.whenComplete((b, t) -> {
if (t != null) {
LOG.debug("Queued write did not succeed", t);
}
queueSize.addAndGet(-size);
});
return asBoolean(cf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.exceptions.UpgradeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand All @@ -38,6 +40,9 @@
import java.util.concurrent.locks.ReentrantLock;

public class JettyWebSocket implements WebSocket, WebSocketListener {

private static final Logger LOG = LoggerFactory.getLogger(JettyWebSocket.class);

private final WebSocket.Listener listener;
private final AtomicLong sendQueue;
private final Lock lock;
Expand Down Expand Up @@ -66,6 +71,7 @@ public boolean send(ByteBuffer buffer) {
webSocketSession.getRemote().sendBytes(buffer, new WriteCallback() {
@Override
public void writeFailed(Throwable x) {
LOG.debug("Queued write did not succeed", x);
sendQueue.addAndGet(-size);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClosedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

class VertxWebSocket implements WebSocket {

private static final Logger LOG = LoggerFactory.getLogger(VertxWebSocket.class);

private final io.vertx.core.http.WebSocket ws;
private final AtomicInteger pending = new AtomicInteger();
private final Listener listener;
Expand Down Expand Up @@ -72,7 +76,12 @@ public boolean send(ByteBuffer buffer) {
int len = vertxBuffer.length();
pending.addAndGet(len);
Future<Void> res = ws.writeBinaryMessage(vertxBuffer);
res.onComplete(ignore -> pending.addAndGet(-len));
res.onComplete(result -> {
if (result.cause() != null) {
LOG.debug("Queued write did not succeed", result.cause());
}
pending.addAndGet(-len);
});
return true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fabric8.kubernetes.client;

import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletionStage;

public interface StreamConsumer {

public static StreamConsumer newStreamConsumer(OutputStream os) {
if (os == null) {
return null;
}
checkForPiped(os);
return newBlockingStreamConsumer(Channels.newChannel(os));
}

public static StreamConsumer newBlockingStreamConsumer(WritableByteChannel channel) {
if (channel == null) {
return null;
}
return buffer -> {
int remaining = buffer.remaining();
if (channel.write(buffer) != remaining) {
throw new KubernetesClientException("Unsucessful blocking write");
}
return null;
};
}

public static void checkForPiped(Object object) {
if (object instanceof PipedOutputStream || object instanceof PipedInputStream) {
throw new KubernetesClientException("Piped streams should not be used");
}
}

/**
* A callback for consuming a stream as a series of {@link ByteBuffer}s
*
* @param buffer
* @return a {@link CompletionStage} that is completed when the buffer has been fully consumed,
* or null if it was already consumed
* @throws Exception
*/
CompletionStage<?> consume(ByteBuffer buffer) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,35 @@ public interface ExecWatch extends Closeable {
/**
* Gets the {@link OutputStream} for stdIn if {@link ContainerResource#redirectingInput()} has been called.
* <p>
* Closing this stream does not immediately force sending. You will typically call {@link #close()} after
* you are finished writing - the close message will not be sent until all pending messages have been sent.
*
* This is a standard blocking {@link OutputStream} with the exception that close/flush do not immediately force sending.
* Instead these operations force any pending data into the write queue.
* <p>
* Until Kubernetes supports half-closing a stream https://github.com/kubernetes/kubernetes/issues/89899 you will typically
* call {@link #close()} after closing this stream - that close message will not be sent until all pending messages have been
* sent.
* <p>
* If you want non-blocking like behavior, your logic may poll the {@link #willWriteBlock(int)} method
* prior to calling a write method on the stream.
*
* @return the stdIn stream
*/
OutputStream getInput();

/**
* @return true if writing the given number of bytes will block on the write queue
*/
boolean willWriteBlock(int bytes);

/**
* Gets the {@link InputStream} for stdOut if {@link TtyExecOutputErrorable#redirectingOutput()} has been called.
*
*
* @return the stdOut stream
*/
InputStream getOutput();

/**
* Gets the {@link InputStream} for stdErr if {@link TtyExecErrorable#redirectingError()} has been called.
*
*
* @return the stdErr stream
*/
InputStream getError();
Expand All @@ -52,7 +64,7 @@ public interface ExecWatch extends Closeable {
* could indicate abnormal termination.
* <p>
* See also {@link #exitCode()}
*
*
* @return the channel 3 stream
*/
InputStream getErrorChannel();
Expand All @@ -63,6 +75,9 @@ public interface ExecWatch extends Closeable {
@Override
void close();

/**
* This operation may block if the write queue is nearly full
*/
void resize(int cols, int rows);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package io.fabric8.kubernetes.client.dsl;

import io.fabric8.kubernetes.client.StreamConsumer;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedOutputStream;
Expand Down Expand Up @@ -70,7 +72,11 @@ public interface Loggable {
* @param out {@link OutputStream} for storing logs
* @return returns a Closeable interface for log watch
*/
LogWatch watchLog(OutputStream out);
default LogWatch watchLog(OutputStream out) {
return watchLog(StreamConsumer.newStreamConsumer(out), true);
}

LogWatch watchLog(StreamConsumer consumer, boolean blocking);

/**
* While waiting for Pod logs, how long shall we wait until a Pod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.fabric8.kubernetes.client.dsl;

import io.fabric8.kubernetes.client.StreamConsumer;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedOutputStream;
Expand All @@ -27,7 +29,11 @@ public interface TtyExecErrorable extends
* <p>
* In particular do no use a {@link PipedOutputStream} - use {@link #redirectingError()} instead
*/
TtyExecErrorChannelable writingError(OutputStream in);
default TtyExecErrorChannelable writingError(OutputStream in) {
return writingError(StreamConsumer.newStreamConsumer(in), true);
}

TtyExecErrorChannelable writingError(StreamConsumer consumer, boolean blocking);

/**
* If the {@link ExecWatch} should terminate when a stdErr message is received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.fabric8.kubernetes.client.dsl;

import io.fabric8.kubernetes.client.StreamConsumer;

import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedOutputStream;
Expand All @@ -27,7 +29,11 @@ public interface TtyExecOutputErrorable extends
* <p>
* In particular do no use a {@link PipedOutputStream} - use {@link #redirectingOutput()} instead
*/
TtyExecErrorable writingOutput(OutputStream in);
default TtyExecErrorable writingOutput(OutputStream in) {
return writingOutput(StreamConsumer.newStreamConsumer(in), true);
}

TtyExecErrorable writingOutput(StreamConsumer consumer, boolean blocking);

/**
* Will provide an {@link InputStream} via {@link ExecWatch#getOutput()}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,14 @@ public static ByteBuffer copy(ByteBuffer buffer) {

/**
* Very rudimentary method to check if the provided ByteBuffer contains text.
*
*
* @return true if the buffer contains text, false otherwise.
*/
public static boolean isPlainText(ByteBuffer originalBuffer) {
if (originalBuffer == null) {
return false;
}
final ByteBuffer buffer = copy(originalBuffer);
final ByteBuffer buffer = originalBuffer.asReadOnlyBuffer();
final CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
try {
decoder.decode(buffer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ default void onMessage(WebSocket webSocket, String text) {
/**
* Called once the full binary message has been built. {@link WebSocket#request()} must
* be called to receive more messages.
*
* @param bytes which will not further used nor modified by the {@link HttpClient}
*/
default void onMessage(WebSocket webSocket, ByteBuffer bytes) {
webSocket.request();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,20 @@ void consume(List<ByteBuffer> value) {
assert !complete || failed == null;
buffers.addAll(value);
buffers.notifyAll();
if ((currentBuffer != null ? currentBuffer.remaining() : 0)
+ buffers.stream().mapToInt(ByteBuffer::remaining).sum() < bufferSize) {
if (available() < bufferSize) {
request.run();
}
}
}

@Override
public int available() {
synchronized (buffers) {
return (currentBuffer != null ? currentBuffer.remaining() : 0)
+ buffers.stream().mapToInt(ByteBuffer::remaining).sum();
}
}

private ByteBuffer current() throws IOException {
synchronized (buffers) {
while (currentBuffer == null || !currentBuffer.hasRemaining()) {
Expand Down
Loading