// deno.land / std@0.224.0 / csv / csv_parse_stream.ts
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.
import { convertRowToObject, defaultReadOptions, type LineReader, parseRecord, type ParseResult, type ReadOptions,} from "../csv/_io.ts";import { TextDelimiterStream } from "../streams/text_delimiter_stream.ts";
/**
 * Options accepted by {@linkcode CsvParseStream}, on top of the shared
 * {@linkcode ReadOptions}.
 */
export interface CsvParseStreamOptions extends ReadOptions {
  /**
   * Explicit column names used as header definitions. When set, these take
   * precedence over any header read from the input.
   */
  columns?: readonly string[];
  /**
   * Treat the first row of the input as a header row and do not emit it.
   *
   * - Together with `columns`: the first row is skipped and `columns` is used
   *   as the header definitions.
   * - Without `columns`: the first row is skipped and its values become the
   *   header definitions.
   */
  skipFirstRow?: boolean;
}
/**
 * Adapts a `ReadableStreamDefaultReader<string>` whose chunks are expected to
 * be individual lines to the `LineReader` interface used by `parseRecord`.
 */
class StreamLineReader implements LineReader {
  #reader: ReadableStreamDefaultReader<string>;
  /** Set once `readLine()` has observed the end of the stream. */
  #done = false;

  constructor(reader: ReadableStreamDefaultReader<string>) {
    this.#reader = reader;
  }

  /**
   * Reads the next chunk from the underlying reader.
   *
   * @returns The chunk with a single trailing `\r` removed, or `null` once the
   * stream is exhausted (after which {@linkcode isEOF} returns `true`).
   */
  async readLine(): Promise<string | null> {
    const result = await this.#reader.read();
    if (result.done) {
      this.#done = true;
      return null;
    }
    // NOTE: Remove trailing CR for compatibility with golang's `encoding/csv`
    return stripLastCR(result.value);
  }

  /** Whether `readLine()` has reached the end of the stream. */
  isEOF(): boolean {
    return this.#done;
  }

  /**
   * Cancels the underlying stream on a best-effort basis.
   *
   * `ReadableStreamDefaultReader.cancel()` returns a promise that rejects if
   * the stream has already errored. That error is already surfaced through
   * `read()`, so the rejection is deliberately ignored here rather than left
   * as an unhandled promise rejection.
   */
  cancel() {
    this.#reader.cancel().catch(() => {});
  }
}
/**
 * Removes exactly one trailing carriage return (`\r`) from `s`, if there is
 * one; every other string is returned unchanged.
 */
function stripLastCR(s: string): string {
  const lastIndex = s.length - 1;
  if (lastIndex < 0 || s.charAt(lastIndex) !== "\r") {
    return s;
  }
  return s.substring(0, lastIndex);
}
/**
 * Row return type: the type of each row yielded by the `readable` side of
 * {@linkcode CsvParseStream}.
 *
 * - When `T` is `undefined` (no options object given), each row is a
 *   `string[]`.
 * - Otherwise each row is the element type of
 *   `ParseResult<CsvParseStreamOptions, T>`, which is imported from
 *   `../csv/_io.ts`. Presumably it resolves to an object type when
 *   `skipFirstRow`/`columns` are set; TODO confirm against `_io.ts`.
 *
 * This is a conditional type on a naked type parameter, so it distributes
 * over unions in `T`. Do not wrap `T` (e.g. `[T] extends [undefined]`)
 * without intending to change that.
 */
export type RowType<T> = T extends undefined ? string[] : ParseResult<CsvParseStreamOptions, T>[number];
/**
 * Read data from a CSV-encoded stream or file. Provides an auto/custom mapper
 * for columns.
 *
 * A `CsvParseStream` expects input conforming to
 * {@link https://tools.ietf.org/html/rfc4180 | RFC 4180}.
 *
 * @typeParam T The options type; it determines the row type via
 * {@linkcode RowType}.
 *
 * @example
 * ```ts
 * import { CsvParseStream } from "https://deno.land/std@$STD_VERSION/csv/csv_parse_stream.ts";
 * const res = await fetch("https://example.com/data.csv");
 * const parts = res.body!
 *   .pipeThrough(new TextDecoderStream())
 *   .pipeThrough(new CsvParseStream());
 * ```
 */
export class CsvParseStream<
  const T extends CsvParseStreamOptions | undefined = undefined,
> implements TransformStream<string, RowType<T>> {
  readonly #readable: ReadableStream<
    string[] | Record<string, string | unknown>
  >;
  readonly #options: CsvParseStreamOptions;
  readonly #lineReader: StreamLineReader;
  readonly #lines: TextDelimiterStream;
  /**
   * Line counter passed to `parseRecord` and `convertRowToObject`. It is
   * advanced once per blank line and once per emitted or empty record.
   * Continuation lines that `parseRecord` may read through `#lineReader` are
   * not counted here.
   */
  #lineIndex = 0;
  /** True until the first non-empty record has been handled. */
  #isFirstRow = true;
  /** Header definitions used when rows are emitted as objects. */
  #headers: readonly string[] = [];

  /** Construct a new instance. */
  constructor(options?: T) {
    this.#options = {
      ...defaultReadOptions,
      ...options,
    };

    // Incoming text is split on "\n"; each chunk is then one line.
    this.#lines = new TextDelimiterStream("\n");
    this.#lineReader = new StreamLineReader(this.#lines.readable.getReader());
    this.#readable = new ReadableStream({
      pull: (controller) => this.#pull(controller),
      cancel: () => this.#lineReader.cancel(),
    });
  }

  /**
   * `pull` callback of the output stream. It reads lines until exactly one row
   * has been enqueued, or closes the stream once the input is exhausted.
   *
   * Blank lines and empty records are skipped but still advance `#lineIndex`.
   * Empty records are presumably comment lines, returned as `[]` by
   * `parseRecord`; TODO confirm. They are skipped *before* header detection,
   * so they can never be taken as the header row.
   */
  async #pull(
    controller: ReadableStreamDefaultController<
      string[] | Record<string, string | unknown>
    >,
  ): Promise<void> {
    const { skipFirstRow, columns } = this.#options;

    // Loop instead of recursing so long runs of skipped lines don't build up
    // a chain of pending promises.
    for (;;) {
      const line = await this.#lineReader.readLine();
      if (line === "") {
        // Found an empty line
        this.#lineIndex++;
        continue;
      }
      if (line === null) {
        // Reached EOF
        controller.close();
        this.#lineReader.cancel();
        return;
      }

      const record = await parseRecord(
        line,
        this.#lineReader,
        this.#options,
        this.#lineIndex,
      );
      if (record === null) {
        controller.close();
        this.#lineReader.cancel();
        return;
      }

      if (record.length === 0) {
        // Nothing to emit. Handle this before first-row logic so an empty
        // record is not mistaken for the header row.
        this.#lineIndex++;
        continue;
      }

      if (this.#isFirstRow) {
        this.#isFirstRow = false;
        // Explicit `columns` take precedence over a header read from input.
        if (columns) {
          this.#headers = columns;
        } else if (skipFirstRow) {
          this.#headers = record;
        }
        if (skipFirstRow) {
          // The header row itself is not emitted and does not advance
          // `#lineIndex`.
          continue;
        }
      }

      this.#lineIndex++;
      if (skipFirstRow || columns) {
        controller.enqueue(convertRowToObject(
          record,
          this.#headers,
          this.#lineIndex,
        ));
      } else {
        controller.enqueue(record);
      }
      return;
    }
  }

  /** The instance's {@linkcode ReadableStream}, yielding one parsed row per chunk. */
  get readable(): ReadableStream<RowType<T>> {
    return this.#readable as ReadableStream<RowType<T>>;
  }

  /** The instance's {@linkcode WritableStream}, accepting CSV text chunks. */
  get writable(): WritableStream<string> {
    return this.#lines.writable;
  }
}
Version Info