This code parses a series of Uint8Arrays that comprise a Transfer-Encoding: chunked request
function getChunkedData(u8) {
let crlfIndex = 0;
let chunkLength = "";
let chunkBuffer = new Uint8Array(0);
crlfIndex = u8.findIndex((v, k, array) => v === 13 && array[k + 1] === 10);
chunkLength = parseInt(
String.fromCodePoint(...u8.subarray(0, crlfIndex)),
16,
);
chunkBuffer = u8.subarray(crlfIndex + 2, crlfIndex + chunkLength + 2);
crlfIndex += chunkLength + 4;
if (isNaN(crlfIndex)) {
console.log({
crlfIndex,
chunkLength,
chunkBuffer,
inputBufferLength: u8.length,
});
}
return {
crlfIndex,
chunkLength,
chunkBuffer,
inputBufferLength: u8.length,
};
}
Server usage
if (!/(GET|POST|HEAD|OPTIONS|QUERY)/i.test(request) && !this.ws) {
if (pendingChunkLength) {
let rest = r.subarray(0, pendingChunkLength);
len += rest.length;
console.log(rest, len);
let {
crlfIndex,
chunkLength,
chunkBuffer,
inputBufferLength,
} = getChunkedData(r.subarray(pendingChunkLength - 1));
if (chunkBuffer.length) {
len += chunkBuffer.length;
console.log(chunkBuffer, len, inputBufferLength, crlfIndex);
}
pendingChunkLength = 0;
return;
}
let {
crlfIndex,
chunkLength,
chunkBuffer,
inputBufferLength,
} = getChunkedData(r);
len += chunkBuffer.length;
console.log(chunkBuffer, len);
if (chunkBuffer.length < chunkLength) {
pendingChunkLength = chunkLength - chunkBuffer.length;
}
}
Client usage
var abortable = new AbortController();
var {
readable,
writable
} = new TransformStream({
async transform(v, c) {
for (let i = 0; i < v.length; i += 4096) {
c.enqueue(v.subarray(i, i + 4096));
await scheduler.postTask(() => {}, {
delay: 20
});
}
},
flush() {
console.log("flush");
abortable.abort("");
}
}, {
highWaterMark: 1
});
var writer = writable.getWriter();
var response = fetch("http://localhost:44818", {
method: "post",
duplex: "half",
body: readable,
signal: abortable.signal,
allowHTTP1ForStreamingUpload: true
});
response.then((r) => {
console.log(...r.headers);
return r.body.pipeTo(
new WritableStream({
write(v) {
console.log(v);
},
close() {
console.log("close");
}
})
)
})
.then(() => {
console.log("Done streaming");
})
.catch(console.log);
await scheduler.postTask(() => {}, {
delay: 10
});
await writer.write(new Uint8Array(1024 ** 2));
await writer.ready;
await writer.close();