Channel.js 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. 'use strict';
  2. const {
  3. Duplex: DuplexStream,
  4. Readable: ReadableStream,
  5. Writable: WritableStream,
  6. } = require('stream');
  7. const {
  8. CHANNEL_EXTENDED_DATATYPE: { STDERR },
  9. } = require('./protocol/constants.js');
  10. const { bufferSlice } = require('./protocol/utils.js');
  11. const PACKET_SIZE = 32 * 1024;
  12. const MAX_WINDOW = 2 * 1024 * 1024;
  13. const WINDOW_THRESHOLD = MAX_WINDOW / 2;
  14. class ClientStderr extends ReadableStream {
  15. constructor(channel, streamOpts) {
  16. super(streamOpts);
  17. this._channel = channel;
  18. }
  19. _read(n) {
  20. if (this._channel._waitChanDrain) {
  21. this._channel._waitChanDrain = false;
  22. if (this._channel.incoming.window <= WINDOW_THRESHOLD)
  23. windowAdjust(this._channel);
  24. }
  25. }
  26. }
  27. class ServerStderr extends WritableStream {
  28. constructor(channel) {
  29. super({ highWaterMark: MAX_WINDOW });
  30. this._channel = channel;
  31. }
  32. _write(data, encoding, cb) {
  33. const channel = this._channel;
  34. const protocol = channel._client._protocol;
  35. const outgoing = channel.outgoing;
  36. const packetSize = outgoing.packetSize;
  37. const id = outgoing.id;
  38. let window = outgoing.window;
  39. const len = data.length;
  40. let p = 0;
  41. if (outgoing.state !== 'open')
  42. return;
  43. while (len - p > 0 && window > 0) {
  44. let sliceLen = len - p;
  45. if (sliceLen > window)
  46. sliceLen = window;
  47. if (sliceLen > packetSize)
  48. sliceLen = packetSize;
  49. if (p === 0 && sliceLen === len)
  50. protocol.channelExtData(id, data, STDERR);
  51. else
  52. protocol.channelExtData(id, bufferSlice(data, p, p + sliceLen), STDERR);
  53. p += sliceLen;
  54. window -= sliceLen;
  55. }
  56. outgoing.window = window;
  57. if (len - p > 0) {
  58. if (window === 0)
  59. channel._waitWindow = true;
  60. if (p > 0)
  61. channel._chunkErr = bufferSlice(data, p, len);
  62. else
  63. channel._chunkErr = data;
  64. channel._chunkcbErr = cb;
  65. return;
  66. }
  67. cb();
  68. }
  69. }
  70. class Channel extends DuplexStream {
  71. constructor(client, info, opts) {
  72. const streamOpts = {
  73. highWaterMark: MAX_WINDOW,
  74. allowHalfOpen: (!opts || (opts && opts.allowHalfOpen !== false)),
  75. emitClose: false,
  76. };
  77. super(streamOpts);
  78. this.allowHalfOpen = streamOpts.allowHalfOpen;
  79. const server = !!(opts && opts.server);
  80. this.server = server;
  81. this.type = info.type;
  82. this.subtype = undefined;
  83. /*
  84. incoming and outgoing contain these properties:
  85. {
  86. id: undefined,
  87. window: undefined,
  88. packetSize: undefined,
  89. state: 'closed'
  90. }
  91. */
  92. this.incoming = info.incoming;
  93. this.outgoing = info.outgoing;
  94. this._callbacks = [];
  95. this._client = client;
  96. this._hasX11 = false;
  97. this._exit = {
  98. code: undefined,
  99. signal: undefined,
  100. dump: undefined,
  101. desc: undefined,
  102. };
  103. this.stdin = this.stdout = this;
  104. if (server)
  105. this.stderr = new ServerStderr(this);
  106. else
  107. this.stderr = new ClientStderr(this, streamOpts);
  108. // Outgoing data
  109. this._waitWindow = false; // SSH-level backpressure
  110. // Incoming data
  111. this._waitChanDrain = false; // Channel Readable side backpressure
  112. this._chunk = undefined;
  113. this._chunkcb = undefined;
  114. this._chunkErr = undefined;
  115. this._chunkcbErr = undefined;
  116. this.on('finish', onFinish)
  117. .on('prefinish', onFinish); // For node v0.11+
  118. this.on('end', onEnd).on('close', onEnd);
  119. }
  120. _read(n) {
  121. if (this._waitChanDrain) {
  122. this._waitChanDrain = false;
  123. if (this.incoming.window <= WINDOW_THRESHOLD)
  124. windowAdjust(this);
  125. }
  126. }
  127. _write(data, encoding, cb) {
  128. const protocol = this._client._protocol;
  129. const outgoing = this.outgoing;
  130. const packetSize = outgoing.packetSize;
  131. const id = outgoing.id;
  132. let window = outgoing.window;
  133. const len = data.length;
  134. let p = 0;
  135. if (outgoing.state !== 'open')
  136. return;
  137. while (len - p > 0 && window > 0) {
  138. let sliceLen = len - p;
  139. if (sliceLen > window)
  140. sliceLen = window;
  141. if (sliceLen > packetSize)
  142. sliceLen = packetSize;
  143. if (p === 0 && sliceLen === len)
  144. protocol.channelData(id, data);
  145. else
  146. protocol.channelData(id, bufferSlice(data, p, p + sliceLen));
  147. p += sliceLen;
  148. window -= sliceLen;
  149. }
  150. outgoing.window = window;
  151. if (len - p > 0) {
  152. if (window === 0)
  153. this._waitWindow = true;
  154. if (p > 0)
  155. this._chunk = bufferSlice(data, p, len);
  156. else
  157. this._chunk = data;
  158. this._chunkcb = cb;
  159. return;
  160. }
  161. cb();
  162. }
  163. eof() {
  164. if (this.outgoing.state === 'open') {
  165. this.outgoing.state = 'eof';
  166. this._client._protocol.channelEOF(this.outgoing.id);
  167. }
  168. }
  169. close() {
  170. if (this.outgoing.state === 'open' || this.outgoing.state === 'eof') {
  171. this.outgoing.state = 'closing';
  172. this._client._protocol.channelClose(this.outgoing.id);
  173. }
  174. }
  175. destroy() {
  176. this.end();
  177. this.close();
  178. }
  179. // Session type-specific methods =============================================
  180. setWindow(rows, cols, height, width) {
  181. if (this.server)
  182. throw new Error('Client-only method called in server mode');
  183. if (this.type === 'session'
  184. && (this.subtype === 'shell' || this.subtype === 'exec')
  185. && this.writable
  186. && this.outgoing.state === 'open') {
  187. this._client._protocol.windowChange(this.outgoing.id,
  188. rows,
  189. cols,
  190. height,
  191. width);
  192. }
  193. }
  194. signal(signalName) {
  195. if (this.server)
  196. throw new Error('Client-only method called in server mode');
  197. if (this.type === 'session'
  198. && this.writable
  199. && this.outgoing.state === 'open') {
  200. this._client._protocol.signal(this.outgoing.id, signalName);
  201. }
  202. }
  203. exit(statusOrSignal, coreDumped, msg) {
  204. if (!this.server)
  205. throw new Error('Server-only method called in client mode');
  206. if (this.type === 'session'
  207. && this.writable
  208. && this.outgoing.state === 'open') {
  209. if (typeof statusOrSignal === 'number') {
  210. this._client._protocol.exitStatus(this.outgoing.id, statusOrSignal);
  211. } else {
  212. this._client._protocol.exitSignal(this.outgoing.id,
  213. statusOrSignal,
  214. coreDumped,
  215. msg);
  216. }
  217. }
  218. }
  219. }
  220. function onFinish() {
  221. this.eof();
  222. if (this.server || !this.allowHalfOpen)
  223. this.close();
  224. this.writable = false;
  225. }
  226. function onEnd() {
  227. this.readable = false;
  228. }
  229. function windowAdjust(self) {
  230. if (self.outgoing.state === 'closed')
  231. return;
  232. const amt = MAX_WINDOW - self.incoming.window;
  233. if (amt <= 0)
  234. return;
  235. self.incoming.window += amt;
  236. self._client._protocol.channelWindowAdjust(self.outgoing.id, amt);
  237. }
  238. module.exports = {
  239. Channel,
  240. MAX_WINDOW,
  241. PACKET_SIZE,
  242. windowAdjust,
  243. WINDOW_THRESHOLD,
  244. };