Node.js Streams
Readable, Writable, Transform, pipeline — przetwarzanie dużych plików bez OOM. Web Streams API (ReadableStream) i SSE dla Next.js App Router.
6 typów streamów — API i zastosowanie
Readable, Writable, Duplex, Transform, Web ReadableStream i PassThrough — API, kierunek i kiedy używać.
| Typ | API | Kierunek | Kiedy |
|---|---|---|---|
| Readable | fs.createReadStream(), Readable.from() | Wyjscie | Odczyt pliku, HTTP request body, generatory |
| Writable | fs.createWriteStream(), res.write() | Wejscie | Zapis pliku, HTTP response streaming |
| Duplex | net.Socket, WebSocket | Dwukierunkowy | Sieciowe połączenia, WebSocket raw |
| Transform | zlib.createGzip(), csv-parse | Duplex + transform | Kompresja, parsing, szyfrowanie on-the-fly |
| Web ReadableStream | fetch().body, new ReadableStream() | Wyjscie (browser+Node) | Fetch API, Next.js Route Handlers, Edge |
| PassThrough | new PassThrough() | Proxy bez zmian | Forking, monitoring, proxy streaming |
Często zadawane pytania
Co to są Node.js Streams i kiedy ich używać?
Node.js Streams: przetważanie danych fragmentami (chunks). Nie ładuj całego pliku do RAM. Backpressure — nie przepełniaj bufora. EventEmitter-based. Cztery typy: Readable — tylko odczyt. Writable — tylko zapis. Duplex — odczyt i zapis. Transform — Duplex z transformacją danych. Kiedy streams: duże pliki (video, CSV z milionami wierszy). HTTP request body. File upload/download. Kompresja/dekompresja on-the-fly. Ciągłe dane (WebSocket, socket). Streaming response. Readable stream: import {Readable} from 'stream'. const readable = Readable.from(['chunk1', 'chunk2', 'chunk3']). readable.on('data', chunk => process(chunk)). readable.on('end', () => done()). Lub for await (const chunk of readable) { process(chunk) }. Writable stream: const writable = fs.createWriteStream('output.txt'). writable.write('chunk'). writable.end(). writable.on('finish', () => done()). pipe(): readable.pipe(writable) — najprostszy sposób. Automatyczna backpressure. readable.pipe(transform).pipe(writable). Łańcuch transformacji. fs.createReadStream: 64KB chunki domyślnie. highWaterMark — rozmiar chunka. encoding — string zamiast Buffer. FileSystem streaming: const rs = fs.createReadStream('huge-file.csv'). const ws = fs.createWriteStream('output.csv'). rs.pipe(transform).pipe(ws).
Transform streams i pipeline — zaawansowane użycie?
Transform stream: Duplex, który transformuje dane. import {Transform} from 'stream'. class UpperCaseTransform extends Transform { _transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()). callback() } }. Lub: Transform({transform(chunk, enc, cb) { cb(null, chunk.toString().toUpperCase()) }}). Użycie: readable.pipe(new UpperCaseTransform()).pipe(writable). Wbudowane Transform: zlib.createGzip() — kompresja gzip. zlib.createGunzip() — dekompresja. crypto.createCipher() — szyfrowanie. csv-parse stream mode. Streaming JSON parse: stream-json. JSONParser, StreamValues. Dla dużych JSON — nie JSON.parse(). pipeline(): import {pipeline} from 'stream/promises'. await pipeline(source, transform1, transform2, destination). Automatyczna obsługa błędów. Cleanup przy błędzie. Lepsza od .pipe() (obsługa błędów). pipeline z generators: async function* transform(source) { for await (const chunk of source) { yield processChunk(chunk) } }. await pipeline(readable, transform, writable). Web Streams API (globalny): ReadableStream, WritableStream, TransformStream. Node.js 18+. Natywnie w przeglądarce. fetch() zwraca ReadableStream. response.body — ReadableStream. const reader = stream.getReader(). const {value, done} = await reader.read(). Konwersja: Readable.toWeb(nodeReadable). Readable.fromWeb(webReadable). Streaming w Next.js: App Router streaming przez Suspense. ReadableStream w Route Handler. new Response(new ReadableStream({start(controller) { controller.enqueue(data). controller.close() }})). Transfer-Encoding: chunked.
Streaming HTTP i Server-Sent Events w Node.js?
HTTP streaming response: res.setHeader('Content-Type', 'text/plain'). res.setHeader('Transfer-Encoding', 'chunked'). res.write('chunk 1'). res.write('chunk 2'). res.end(). Klient odbiera fragmenty na bieżąco. Server-Sent Events (SSE): jednokierunkowy streaming. res.setHeader('Content-Type', 'text/event-stream'). res.setHeader('Cache-Control', 'no-cache'). res.setHeader('Connection', 'keep-alive'). setInterval(() => res.write('data: ' + JSON.stringify(event) + 'nn'), 1000). Klient: const es = new EventSource('/events'). es.onmessage = event => console.log(event.data). SSE vs WebSocket: SSE — jednokierunkowy, HTTP, automatyczny reconnect. WebSocket — dwukierunkowy, własny protokół. SSE dla: live updates, notifications, log streaming. AI streaming responses: OpenAI streaming: stream: true. async for await of response. Vercel AI SDK: createStreamableValue, StreamingTextResponse. Next.js Route Handler: return new Response(openAiStream). Pipe OpenAI stream do Response. Brotli compression streaming: import {createBrotliCompress} from 'zlib'. pipeline(readStream, createBrotliCompress(), writeStream). CSV streaming processing: import {parse} from 'csv-parse'. fs.createReadStream('data.csv').pipe(parse({columns: true})).on('data', row => processRow(row)). Miliony wierszy bez OOM. Backpressure: writable.write() zwraca false gdy bufor pełny. readable.pause() — zatrzymaj. writable.on('drain', () => readable.resume()). pipeline() obsługuje automatycznie.
Node.js Streams w praktyce — file processing i data pipelines?
Duże pliki CSV: 10GB CSV — nie JSON.parse. csv-parse stream mode. const parser = parse({columns: true, skip_empty_lines: true}). fs.createReadStream('data.csv').pipe(parser). const output = fs.createWriteStream('output.jsonl'). for await (const record of parser) { output.write(JSON.stringify(record) + 'n') }. Image processing pipeline: sharp — stream support. fs.createReadStream('input.jpg').pipe(sharp().resize(800, 600).webp()).pipe(fs.createWriteStream('output.webp')). Video transcoding: ffmpeg + streams. ffmpeg() .input(readable).outputFormat('mp4').pipe(writable). Axios streaming: const response = await axios.get(url, {responseType: 'stream'}). response.data.pipe(fs.createWriteStream('file.zip')). S3 streaming upload: s3.upload({Bucket, Key, Body: readStream}).promise(). Brak ładowania całego pliku. S3 streaming download: s3.getObject({Bucket, Key}).createReadStream(). PassThrough stream: const pass = new PassThrough(). Proxy bez transformacji. Fork stream do wielu destinations. readable.pipe(pass). pass.pipe(dest1). pass.pipe(dest2). Readline interface: import {createInterface} from 'readline'. const rl = createInterface({input: fs.createReadStream('file.txt')}). for await (const line of rl) { processLine(line) }. Streaming JSON stringify: json-stream-stringify. Nie JSON.stringify(bigArray) — OOM. Encodes incrementally. Object mode streams: highWaterMark: 16 (objects). chunk to obiekt nie Buffer. node --max-old-space-size nie potrzebny. Użyj streams.
Web Streams API i kompatybilność z Fetch API w Next.js?
Web Streams API: standard W3C. ReadableStream, WritableStream, TransformStream. Natywnie w przeglądarce, Node.js 18+, Deno, Bun, Cloudflare Workers. fetch() i Streams: const response = await fetch(url). const reader = response.body.getReader(). while (true) { const {done, value} = await reader.read(). if (done) break. processChunk(value) }. ReadableStream creation: new ReadableStream({start(controller) { controller.enqueue(new TextEncoder().encode('hello')). controller.close() }}). Async generator to ReadableStream: ReadableStream.from(asyncGenerator()). Node.js 18+. TransformStream: const {readable, writable} = new TransformStream({transform(chunk, controller) { controller.enqueue(chunk.toString().toUpperCase()) }}). fetch(url).then(r => r.body.pipeThrough(decoder).pipeTo(output)). Next.js App Router: Route Handler streaming. export async function GET() { const stream = new ReadableStream({async start(controller) { for (const chunk of data) { controller.enqueue(chunk). await delay(100) } controller.close() }}). return new Response(stream, {headers: {'Content-Type': 'text/plain'}}) }. AI SDK streaming: streamText() od Vercel AI SDK. toAIStreamResponse() — format dla klienta. useChat hook — auto konsumuje. TextDecoderStream: Web API do dekodowania UTF-8. ByteLengthQueuingStrategy: kontrola backpressure przez rozmiar bajtów. CountQueuingStrategy: przez liczbę chunks. Node kompatybilność: Readable.toWeb(nodeStream). Readable.fromWeb(webStream). Interop z node:stream.
Powiązane artykuły
Skontaktuj się z nami
Porozmawiajmy o Twoim projekcie. Bezpłatna wycena w ciągu 24 godzin.
Wyślij zapytanie
Telefon
+48 790 814 814
Pon-Pt: 9:00 - 18:00
adam@fotz.pl
Odpowiadamy w ciągu 24h
Adres
Plac Wolności 16
61-739 Poznań
Godziny pracy
Wolisz porozmawiać?
Zadzwoń teraz i porozmawiaj z naszym specjalistą o Twoim projekcie.
Zadzwoń teraz