123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- 'use strict';
- const {
- Duplex: DuplexStream,
- Readable: ReadableStream,
- Writable: WritableStream,
- } = require('stream');
- const {
- CHANNEL_EXTENDED_DATATYPE: { STDERR },
- } = require('./protocol/constants.js');
- const { bufferSlice } = require('./protocol/utils.js');
- const PACKET_SIZE = 32 * 1024;
- const MAX_WINDOW = 2 * 1024 * 1024;
- const WINDOW_THRESHOLD = MAX_WINDOW / 2;
- class ClientStderr extends ReadableStream {
- constructor(channel, streamOpts) {
- super(streamOpts);
- this._channel = channel;
- }
- _read(n) {
- if (this._channel._waitChanDrain) {
- this._channel._waitChanDrain = false;
- if (this._channel.incoming.window <= WINDOW_THRESHOLD)
- windowAdjust(this._channel);
- }
- }
- }
- class ServerStderr extends WritableStream {
- constructor(channel) {
- super({ highWaterMark: MAX_WINDOW });
- this._channel = channel;
- }
- _write(data, encoding, cb) {
- const channel = this._channel;
- const protocol = channel._client._protocol;
- const outgoing = channel.outgoing;
- const packetSize = outgoing.packetSize;
- const id = outgoing.id;
- let window = outgoing.window;
- const len = data.length;
- let p = 0;
- if (outgoing.state !== 'open')
- return;
- while (len - p > 0 && window > 0) {
- let sliceLen = len - p;
- if (sliceLen > window)
- sliceLen = window;
- if (sliceLen > packetSize)
- sliceLen = packetSize;
- if (p === 0 && sliceLen === len)
- protocol.channelExtData(id, data, STDERR);
- else
- protocol.channelExtData(id, bufferSlice(data, p, p + sliceLen), STDERR);
- p += sliceLen;
- window -= sliceLen;
- }
- outgoing.window = window;
- if (len - p > 0) {
- if (window === 0)
- channel._waitWindow = true;
- if (p > 0)
- channel._chunkErr = bufferSlice(data, p, len);
- else
- channel._chunkErr = data;
- channel._chunkcbErr = cb;
- return;
- }
- cb();
- }
- }
- class Channel extends DuplexStream {
- constructor(client, info, opts) {
- const streamOpts = {
- highWaterMark: MAX_WINDOW,
- allowHalfOpen: (!opts || (opts && opts.allowHalfOpen !== false)),
- emitClose: false,
- };
- super(streamOpts);
- this.allowHalfOpen = streamOpts.allowHalfOpen;
- const server = !!(opts && opts.server);
- this.server = server;
- this.type = info.type;
- this.subtype = undefined;
- /*
- incoming and outgoing contain these properties:
- {
- id: undefined,
- window: undefined,
- packetSize: undefined,
- state: 'closed'
- }
- */
- this.incoming = info.incoming;
- this.outgoing = info.outgoing;
- this._callbacks = [];
- this._client = client;
- this._hasX11 = false;
- this._exit = {
- code: undefined,
- signal: undefined,
- dump: undefined,
- desc: undefined,
- };
- this.stdin = this.stdout = this;
- if (server)
- this.stderr = new ServerStderr(this);
- else
- this.stderr = new ClientStderr(this, streamOpts);
- // Outgoing data
- this._waitWindow = false; // SSH-level backpressure
- // Incoming data
- this._waitChanDrain = false; // Channel Readable side backpressure
- this._chunk = undefined;
- this._chunkcb = undefined;
- this._chunkErr = undefined;
- this._chunkcbErr = undefined;
- this.on('finish', onFinish)
- .on('prefinish', onFinish); // For node v0.11+
- this.on('end', onEnd).on('close', onEnd);
- }
- _read(n) {
- if (this._waitChanDrain) {
- this._waitChanDrain = false;
- if (this.incoming.window <= WINDOW_THRESHOLD)
- windowAdjust(this);
- }
- }
- _write(data, encoding, cb) {
- const protocol = this._client._protocol;
- const outgoing = this.outgoing;
- const packetSize = outgoing.packetSize;
- const id = outgoing.id;
- let window = outgoing.window;
- const len = data.length;
- let p = 0;
- if (outgoing.state !== 'open')
- return;
- while (len - p > 0 && window > 0) {
- let sliceLen = len - p;
- if (sliceLen > window)
- sliceLen = window;
- if (sliceLen > packetSize)
- sliceLen = packetSize;
- if (p === 0 && sliceLen === len)
- protocol.channelData(id, data);
- else
- protocol.channelData(id, bufferSlice(data, p, p + sliceLen));
- p += sliceLen;
- window -= sliceLen;
- }
- outgoing.window = window;
- if (len - p > 0) {
- if (window === 0)
- this._waitWindow = true;
- if (p > 0)
- this._chunk = bufferSlice(data, p, len);
- else
- this._chunk = data;
- this._chunkcb = cb;
- return;
- }
- cb();
- }
- eof() {
- if (this.outgoing.state === 'open') {
- this.outgoing.state = 'eof';
- this._client._protocol.channelEOF(this.outgoing.id);
- }
- }
- close() {
- if (this.outgoing.state === 'open' || this.outgoing.state === 'eof') {
- this.outgoing.state = 'closing';
- this._client._protocol.channelClose(this.outgoing.id);
- }
- }
- destroy() {
- this.end();
- this.close();
- }
- // Session type-specific methods =============================================
- setWindow(rows, cols, height, width) {
- if (this.server)
- throw new Error('Client-only method called in server mode');
- if (this.type === 'session'
- && (this.subtype === 'shell' || this.subtype === 'exec')
- && this.writable
- && this.outgoing.state === 'open') {
- this._client._protocol.windowChange(this.outgoing.id,
- rows,
- cols,
- height,
- width);
- }
- }
- signal(signalName) {
- if (this.server)
- throw new Error('Client-only method called in server mode');
- if (this.type === 'session'
- && this.writable
- && this.outgoing.state === 'open') {
- this._client._protocol.signal(this.outgoing.id, signalName);
- }
- }
- exit(statusOrSignal, coreDumped, msg) {
- if (!this.server)
- throw new Error('Server-only method called in client mode');
- if (this.type === 'session'
- && this.writable
- && this.outgoing.state === 'open') {
- if (typeof statusOrSignal === 'number') {
- this._client._protocol.exitStatus(this.outgoing.id, statusOrSignal);
- } else {
- this._client._protocol.exitSignal(this.outgoing.id,
- statusOrSignal,
- coreDumped,
- msg);
- }
- }
- }
- }
- function onFinish() {
- this.eof();
- if (this.server || !this.allowHalfOpen)
- this.close();
- this.writable = false;
- }
- function onEnd() {
- this.readable = false;
- }
- function windowAdjust(self) {
- if (self.outgoing.state === 'closed')
- return;
- const amt = MAX_WINDOW - self.incoming.window;
- if (amt <= 0)
- return;
- self.incoming.window += amt;
- self._client._protocol.channelWindowAdjust(self.outgoing.id, amt);
- }
- module.exports = {
- Channel,
- MAX_WINDOW,
- PACKET_SIZE,
- windowAdjust,
- WINDOW_THRESHOLD,
- };
|