/**
 * Reads a byte stream to completion, passing each chunk to `onChunk` in the
 * order it is received.
 *
 * The reader's lock on the stream is released when reading finishes and also
 * when `reader.read()` or `onChunk` throws, so the caller can still cancel or
 * otherwise operate on the stream afterwards.
 *
 * @param stream - The stream to consume.
 * @param onChunk - Invoked synchronously with every chunk read from the stream.
 * @returns A promise that resolves once the stream reports it is done, and
 *   rejects with whatever error `reader.read()` or `onChunk` raised.
 */
export async function getBytes(stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array) => void): Promise<void> {
    const reader = stream.getReader();
    try {
        for (;;) {
            const result = await reader.read();
            if (result.done) {
                return;
            }
            onChunk(result.value);
        }
    } finally {
        // No read is pending at this point (each one is awaited), so the lock
        // can be given back without leaving the stream locked forever.
        reader.releaseLock();
    }
}
/**
 * Creates a line handler that assembles event-stream lines into messages.
 *
 * Each non-empty line updates the message currently being built; an empty
 * line hands that message to `onMessage` and starts a fresh one obtained from
 * `newMessage()`.
 *
 * @param onId - Called with the value of every `id` field encountered.
 * @param onRetry - Called with the value of every `retry` field that parses
 *   as an integer.
 * @param onMessage - Called with the accumulated message each time an empty
 *   line is received.
 * @returns A function that accepts one line (without its line terminator) and
 *   the number of bytes that precede the first colon in that line. Lines whose
 *   `fieldLength` is zero or negative (comments, or lines without a colon) are
 *   ignored.
 */
export function getMessages(
    onId: (id: string) => void,
    onRetry: (retry: number) => void,
    onMessage?: (msg: EventSourceMessage) => void
) {
    let message = newMessage();
    // Whether the current message has already received a `data` line. Checking
    // `message.data` alone is not enough: a first `data:` line with an empty
    // value leaves it falsy, and the following line would then overwrite it
    // instead of being appended after a newline.
    let hasData = false;
    const decoder = new TextDecoder();

    // return a function that can process each incoming line buffer:
    return function onLine(line: Uint8Array, fieldLength: number) {
        if (line.length === 0) {
            // empty line denotes end of message. Trigger the callback and start a new message:
            onMessage?.(message);
            message = newMessage();
            hasData = false;
        } else if (fieldLength > 0) { // exclude comments and lines with no values
            // line is of format "<field>:<value>" or "<field>: <value>"
            // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
            const field = decoder.decode(line.subarray(0, fieldLength));
            // Skip the colon, plus the single optional space that may follow it.
            const valueOffset = fieldLength + (line[fieldLength + 1] === ControlChars.Space ? 2 : 1);
            const value = decoder.decode(line.subarray(valueOffset));

            switch (field) {
                case 'data':
                    // if this message already has data (even an empty line),
                    // append the new value to the old, separated by a newline.
                    // otherwise, just set to the new value:
                    message.data = hasData || message.data
                        ? message.data + '\n' + value
                        : value;
                    hasData = true;
                    break;
                case 'event':
                    message.event = value;
                    break;
                case 'id':
                    onId(message.id = value);
                    break;
                case 'retry': {
                    const retry = parseInt(value, 10);
                    if (!isNaN(retry)) { // ignore retry values that are not numeric
                        onRetry(message.retry = retry);
                    }
                    break;
                }
            }
        }
    };
}
/**
 * Creates a chunk handler that splits incoming bytes into lines.
 *
 * A line ends at `\n`, `\r`, or `\r\n`; a `\r\n` pair counts as one terminator
 * even when the two bytes arrive in different chunks. Bytes after the last
 * terminator are kept and joined with the next chunk via `concat`.
 *
 * @param onLine - Called for every complete line with the line's bytes
 *   (terminator excluded) and the offset of the first colon within that line,
 *   or -1 when the line contains no colon.
 * @returns A function to be called with each chunk of the stream, in order.
 */
export function getLines(onLine: (line: Uint8Array, fieldLength: number) => void) {
    let pending: Uint8Array | undefined; // bytes that have not yet formed a complete line
    let cursor: number; // next index of `pending` to examine
    let colonOffset: number; // offset of the first colon in the current line, -1 if none seen yet
    let skipNextNewline = false; // set after a \r so that a directly following \n is swallowed

    return function onChunk(arr: Uint8Array) {
        if (pending !== undefined) {
            // a partial line is left over from earlier chunks: keep scanning where we stopped
            pending = concat(pending, arr);
        } else {
            pending = arr;
            cursor = 0;
            colonOffset = -1;
        }

        const total = pending.length;
        let start = 0; // index at which the line being scanned begins

        while (cursor < total) {
            if (skipNextNewline) {
                skipNextNewline = false;
                if (pending[cursor] === ControlChars.NewLine) {
                    // second half of a \r\n pair: the line starts after it
                    cursor += 1;
                    start = cursor;
                }
            }

            // advance until a terminator is found or the data runs out
            let end = -1;
            while (cursor < total && end === -1) {
                const byte = pending[cursor];
                if (byte === ControlChars.Colon) {
                    if (colonOffset === -1) {
                        // only the first colon of a line separates field from value
                        colonOffset = cursor - start;
                    }
                } else if (byte === ControlChars.CarriageReturn) {
                    skipNextNewline = true;
                    end = cursor;
                } else if (byte === ControlChars.NewLine) {
                    end = cursor;
                }
                cursor += 1;
            }

            if (end === -1) {
                // ran out of bytes mid-line; wait for the next chunk
                break;
            }

            onLine(pending.subarray(start, end), colonOffset);
            start = cursor; // the next line begins right after the terminator
            colonOffset = -1;
        }

        if (start === total) {
            // everything was consumed, nothing to carry over
            pending = undefined;
        } else if (start !== 0) {
            // drop the lines already emitted and rebase the cursor onto the remainder
            pending = pending.subarray(start);
            cursor -= start;
        }
    };
}