deno.land / std@0.224.0 / csv / csv_parse_stream.ts

csv_parse_stream.ts
View Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.
import { convertRowToObject, defaultReadOptions, type LineReader, parseRecord, type ParseResult, type ReadOptions,} from "../csv/_io.ts";import { TextDelimiterStream } from "../streams/text_delimiter_stream.ts";
/**
 * Configuration accepted by {@linkcode CsvParseStream}. Adds header handling
 * on top of the base {@linkcode ReadOptions}.
 */
export interface CsvParseStreamOptions extends ReadOptions {
  /**
   * Names to use as the header definition. When present, these names are used
   * instead of any names read from the first line.
   */
  columns?: readonly string[];
  /**
   * Whether the first line of the input is withheld from the output.
   *
   * - `skipFirstRow: true` together with `columns`: the first line is simply
   *   skipped.
   * - `skipFirstRow: true` without `columns`: the first line is skipped and
   *   its fields are used as the header definition.
   */
  skipFirstRow?: boolean;
}
/**
 * {@linkcode LineReader} implementation on top of a string stream reader.
 * Every chunk read from the underlying reader is treated as one line.
 */
class StreamLineReader implements LineReader {
  #source: ReadableStreamDefaultReader<string>;
  #exhausted = false;

  /** @param reader Reader whose chunks are handed out line by line. */
  constructor(reader: ReadableStreamDefaultReader<string>) {
    this.#source = reader;
  }

  /**
   * Reads the next chunk from the underlying reader.
   *
   * @returns The chunk with one trailing `\r` removed, or `null` once the
   * underlying reader reports that it is done.
   */
  async readLine(): Promise<string | null> {
    const chunk = await this.#source.read();
    if (!chunk.done) {
      // NOTE: Remove trailing CR for compatibility with golang's `encoding/csv`
      return stripLastCR(chunk.value);
    }
    this.#exhausted = true;
    return null;
  }

  /** @returns `true` once a read has reported the end of the stream. */
  isEOF(): boolean {
    return this.#exhausted;
  }

  /** Cancels the underlying reader; the returned promise is not awaited. */
  cancel() {
    this.#source.cancel();
  }
}
/**
 * Drops a single trailing carriage return from `s`, if there is one.
 *
 * @param s Text to trim.
 * @returns `s` without its final `\r`, or `s` unchanged when it does not end
 * with `\r` (including when it is empty).
 */
function stripLastCR(s: string): string {
  const lastIndex = s.length - 1;
  // For an empty string the index is -1 and the lookup yields `undefined`.
  if (s[lastIndex] !== "\r") {
    return s;
  }
  return s.substring(0, lastIndex);
}
/**
 * Row return type: the element type of the readable side of
 * {@linkcode CsvParseStream}.
 *
 * - When `T` is `undefined` (the stream's default type argument), a row is a
 *   plain `string[]` of field values.
 * - Otherwise a row is the element type of
 *   `ParseResult<CsvParseStreamOptions, T>`, which is declared in
 *   `../csv/_io.ts`. NOTE(review): presumably an object keyed by header name
 *   when `skipFirstRow` or `columns` is set; confirm against `ParseResult`.
 *
 * @typeParam T The options type the stream was constructed with.
 */
export type RowType<T> = T extends undefined ? string[] : ParseResult<CsvParseStreamOptions, T>[number];
/**
 * Transform stream that parses CSV text into rows, with automatic or custom
 * mapping of columns to object keys.
 *
 * Text written to {@linkcode CsvParseStream.writable} is split on `\n`, each
 * line is parsed, and the resulting rows come out of
 * {@linkcode CsvParseStream.readable}. Empty lines and records without any
 * fields are skipped. Rows are emitted as `string[]` unless `skipFirstRow` or
 * `columns` is set, in which case each row is converted to an object using the
 * header names.
 *
 * A `CsvParseStream` expects input conforming to
 * {@link https://tools.ietf.org/html/rfc4180 | RFC 4180}.
 *
 * @example
 * ```ts
 * import { CsvParseStream } from "https://deno.land/std@$STD_VERSION/csv/csv_parse_stream.ts";
 * const res = await fetch("https://example.com/data.csv");
 * const parts = res.body!
 *   .pipeThrough(new TextDecoderStream())
 *   .pipeThrough(new CsvParseStream());
 * ```
 *
 * @typeParam T Type of the options object; it determines the row type.
 */
export class CsvParseStream<
  const T extends CsvParseStreamOptions | undefined = undefined,
> implements TransformStream<string, RowType<T>> {
  readonly #readable: ReadableStream<
    string[] | Record<string, string | unknown>
  >;
  readonly #options: CsvParseStreamOptions;
  readonly #lineReader: StreamLineReader;
  readonly #lines: TextDelimiterStream;
  // Counter handed to `parseRecord` and `convertRowToObject`.
  // NOTE(review): presumably only used for error messages; confirm in _io.ts.
  #lineIndex = 0;
  // True until the first record has been parsed.
  #isFirstRow = true;
  // Header names used when rows are converted to objects.
  #headers: readonly string[] = [];

  /**
   * Construct a new instance.
   *
   * @param options Parsing options; they are merged over `defaultReadOptions`.
   */
  constructor(options?: T) {
    this.#options = { ...defaultReadOptions, ...options };

    const splitter = new TextDelimiterStream("\n");
    const lineReader = new StreamLineReader(splitter.readable.getReader());
    this.#lines = splitter;
    this.#lineReader = lineReader;

    this.#readable = new ReadableStream({
      pull: (controller) => this.#pull(controller),
      cancel: () => lineReader.cancel(),
    });
  }

  /** Whether rows are emitted as objects keyed by header name. */
  #emitsObjects(): boolean {
    return Boolean(this.#options.skipFirstRow || this.#options.columns);
  }

  /** Closes the output side and cancels the line reader. */
  #finish(
    controller: ReadableStreamDefaultController<
      string[] | Record<string, string | unknown>
    >,
  ): void {
    controller.close();
    this.#lineReader.cancel();
  }

  /**
   * Pull handler of the readable side. Keeps reading lines until one row has
   * been enqueued or the input has ended.
   */
  async #pull(
    controller: ReadableStreamDefaultController<
      string[] | Record<string, string | unknown>
    >,
  ): Promise<void> {
    while (true) {
      const line = await this.#lineReader.readLine();

      if (line === "") {
        // Found an empty line: count it and move on to the next one.
        this.#lineIndex++;
        continue;
      }
      if (line === null) {
        // Reached EOF.
        this.#finish(controller);
        return;
      }

      // The line reader is passed along as well.
      // NOTE(review): presumably so a record can span several lines; confirm
      // in `parseRecord`.
      const record = await parseRecord(
        line,
        this.#lineReader,
        this.#options,
        this.#lineIndex,
      );
      if (record === null) {
        this.#finish(controller);
        return;
      }

      if (this.#isFirstRow) {
        this.#isFirstRow = false;

        if (this.#emitsObjects()) {
          // Explicit `columns` take precedence; otherwise `skipFirstRow` is
          // set and the first record supplies the header names.
          this.#headers = this.#options.columns
            ? this.#options.columns
            : record;
        }

        if (this.#options.skipFirstRow) {
          // The first row is consumed without being emitted.
          continue;
        }
      }

      this.#lineIndex++;

      if (record.length === 0) {
        // A record without fields produces no output row.
        continue;
      }

      if (this.#emitsObjects()) {
        controller.enqueue(
          convertRowToObject(record, this.#headers, this.#lineIndex),
        );
      } else {
        controller.enqueue(record);
      }
      return;
    }
  }

  /** The instance's {@linkcode ReadableStream}. */
  get readable(): ReadableStream<RowType<T>> {
    return this.#readable as ReadableStream<RowType<T>>;
  }

  /** The instance's {@linkcode WritableStream}. */
  get writable(): WritableStream<string> {
    return this.#lines.writable;
  }
}
std

Version Info

Tagged at
6 months ago