Node.js / Streams / Performance

    Node.js Streams

    Readable, Writable, Transform, pipeline — przetwarzanie dużych plików bez OOM. Web Streams API (ReadableStream) i SSE dla Next.js App Router.

    Readable
    Odczyt danych
    Transform
    Transformacje
    pipeline()
    Backpressure
    Web Streams
    SSE/HTTP

    6 typów streamów — API i zastosowanie

    Readable, Writable, Duplex, Transform, Web ReadableStream i PassThrough — API, kierunek i kiedy używać.

    Typ API Kierunek Kiedy
    Readable fs.createReadStream(), Readable.from() Wyjscie Odczyt pliku, HTTP request body, generatory
    Writable fs.createWriteStream(), res.write() Wejscie Zapis pliku, HTTP response streaming
    Duplex net.Socket, WebSocket Dwukierunkowy Sieciowe połączenia, WebSocket raw
    Transform zlib.createGzip(), csv-parse Duplex + transform Kompresja, parsing, szyfrowanie on-the-fly
    Web ReadableStream fetch().body, new ReadableStream() Wyjscie (browser+Node) Fetch API, Next.js Route Handlers, Edge
    PassThrough new PassThrough() Proxy bez zmian Forking, monitoring, proxy streaming

    Często zadawane pytania

    Co to są Node.js Streams i kiedy ich używać?

    Node.js Streams: przetważanie danych fragmentami (chunks). Nie ładuj całego pliku do RAM. Backpressure — nie przepełniaj bufora. EventEmitter-based. Cztery typy: Readable — tylko odczyt. Writable — tylko zapis. Duplex — odczyt i zapis. Transform — Duplex z transformacją danych. Kiedy streams: duże pliki (video, CSV z milionami wierszy). HTTP request body. File upload/download. Kompresja/dekompresja on-the-fly. Ciągłe dane (WebSocket, socket). Streaming response. Readable stream: import {Readable} from 'stream'. const readable = Readable.from(['chunk1', 'chunk2', 'chunk3']). readable.on('data', chunk => process(chunk)). readable.on('end', () => done()). Lub for await (const chunk of readable) { process(chunk) }. Writable stream: const writable = fs.createWriteStream('output.txt'). writable.write('chunk'). writable.end(). writable.on('finish', () => done()). pipe(): readable.pipe(writable) — najprostszy sposób. Automatyczna backpressure. readable.pipe(transform).pipe(writable). Łańcuch transformacji. fs.createReadStream: 64KB chunki domyślnie. highWaterMark — rozmiar chunka. encoding — string zamiast Buffer. FileSystem streaming: const rs = fs.createReadStream('huge-file.csv'). const ws = fs.createWriteStream('output.csv'). rs.pipe(transform).pipe(ws).

    Transform streams i pipeline — zaawansowane użycie?

    Transform stream: Duplex, który transformuje dane. import {Transform} from 'stream'. class UpperCaseTransform extends Transform { _transform(chunk, encoding, callback) { this.push(chunk.toString().toUpperCase()). callback() } }. Lub: Transform({transform(chunk, enc, cb) { cb(null, chunk.toString().toUpperCase()) }}). Użycie: readable.pipe(new UpperCaseTransform()).pipe(writable). Wbudowane Transform: zlib.createGzip() — kompresja gzip. zlib.createGunzip() — dekompresja. crypto.createCipher() — szyfrowanie. csv-parse stream mode. Streaming JSON parse: stream-json. JSONParser, StreamValues. Dla dużych JSON — nie JSON.parse(). pipeline(): import {pipeline} from 'stream/promises'. await pipeline(source, transform1, transform2, destination). Automatyczna obsługa błędów. Cleanup przy błędzie. Lepsza od .pipe() (obsługa błędów). pipeline z generators: async function* transform(source) { for await (const chunk of source) { yield processChunk(chunk) } }. await pipeline(readable, transform, writable). Web Streams API (globalny): ReadableStream, WritableStream, TransformStream. Node.js 18+. Natywnie w przeglądarce. fetch() zwraca ReadableStream. response.body — ReadableStream. const reader = stream.getReader(). const {value, done} = await reader.read(). Konwersja: Readable.toWeb(nodeReadable). Readable.fromWeb(webReadable). Streaming w Next.js: App Router streaming przez Suspense. ReadableStream w Route Handler. new Response(new ReadableStream({start(controller) { controller.enqueue(data). controller.close() }})). Transfer-Encoding: chunked.

    Streaming HTTP i Server-Sent Events w Node.js?

    HTTP streaming response: res.setHeader('Content-Type', 'text/plain'). res.setHeader('Transfer-Encoding', 'chunked'). res.write('chunk 1'). res.write('chunk 2'). res.end(). Klient odbiera fragmenty na bieżąco. Server-Sent Events (SSE): jednokierunkowy streaming. res.setHeader('Content-Type', 'text/event-stream'). res.setHeader('Cache-Control', 'no-cache'). res.setHeader('Connection', 'keep-alive'). setInterval(() => res.write('data: ' + JSON.stringify(event) + 'nn'), 1000). Klient: const es = new EventSource('/events'). es.onmessage = event => console.log(event.data). SSE vs WebSocket: SSE — jednokierunkowy, HTTP, automatyczny reconnect. WebSocket — dwukierunkowy, własny protokół. SSE dla: live updates, notifications, log streaming. AI streaming responses: OpenAI streaming: stream: true. async for await of response. Vercel AI SDK: createStreamableValue, StreamingTextResponse. Next.js Route Handler: return new Response(openAiStream). Pipe OpenAI stream do Response. Brotli compression streaming: import {createBrotliCompress} from 'zlib'. pipeline(readStream, createBrotliCompress(), writeStream). CSV streaming processing: import {parse} from 'csv-parse'. fs.createReadStream('data.csv').pipe(parse({columns: true})).on('data', row => processRow(row)). Miliony wierszy bez OOM. Backpressure: writable.write() zwraca false gdy bufor pełny. readable.pause() — zatrzymaj. writable.on('drain', () => readable.resume()). pipeline() obsługuje automatycznie.

    Node.js Streams w praktyce — file processing i data pipelines?

    Duże pliki CSV: 10GB CSV — nie JSON.parse. csv-parse stream mode. const parser = parse({columns: true, skip_empty_lines: true}). fs.createReadStream('data.csv').pipe(parser). const output = fs.createWriteStream('output.jsonl'). for await (const record of parser) { output.write(JSON.stringify(record) + 'n') }. Image processing pipeline: sharp — stream support. fs.createReadStream('input.jpg').pipe(sharp().resize(800, 600).webp()).pipe(fs.createWriteStream('output.webp')). Video transcoding: ffmpeg + streams. ffmpeg() .input(readable).outputFormat('mp4').pipe(writable). Axios streaming: const response = await axios.get(url, {responseType: 'stream'}). response.data.pipe(fs.createWriteStream('file.zip')). S3 streaming upload: s3.upload({Bucket, Key, Body: readStream}).promise(). Brak ładowania całego pliku. S3 streaming download: s3.getObject({Bucket, Key}).createReadStream(). PassThrough stream: const pass = new PassThrough(). Proxy bez transformacji. Fork stream do wielu destinations. readable.pipe(pass). pass.pipe(dest1). pass.pipe(dest2). Readline interface: import {createInterface} from 'readline'. const rl = createInterface({input: fs.createReadStream('file.txt')}). for await (const line of rl) { processLine(line) }. Streaming JSON stringify: json-stream-stringify. Nie JSON.stringify(bigArray) — OOM. Encodes incrementally. Object mode streams: highWaterMark: 16 (objects). chunk to obiekt nie Buffer. node --max-old-space-size nie potrzebny. Użyj streams.

    Web Streams API i kompatybilność z Fetch API w Next.js?

    Web Streams API: standard W3C. ReadableStream, WritableStream, TransformStream. Natywnie w przeglądarce, Node.js 18+, Deno, Bun, Cloudflare Workers. fetch() i Streams: const response = await fetch(url). const reader = response.body.getReader(). while (true) { const {done, value} = await reader.read(). if (done) break. processChunk(value) }. ReadableStream creation: new ReadableStream({start(controller) { controller.enqueue(new TextEncoder().encode('hello')). controller.close() }}). Async generator to ReadableStream: ReadableStream.from(asyncGenerator()). Node.js 18+. TransformStream: const {readable, writable} = new TransformStream({transform(chunk, controller) { controller.enqueue(chunk.toString().toUpperCase()) }}). fetch(url).then(r => r.body.pipeThrough(decoder).pipeTo(output)). Next.js App Router: Route Handler streaming. export async function GET() { const stream = new ReadableStream({async start(controller) { for (const chunk of data) { controller.enqueue(chunk). await delay(100) } controller.close() }}). return new Response(stream, {headers: {'Content-Type': 'text/plain'}}) }. AI SDK streaming: streamText() od Vercel AI SDK. toAIStreamResponse() — format dla klienta. useChat hook — auto konsumuje. TextDecoderStream: Web API do dekodowania UTF-8. ByteLengthQueuingStrategy: kontrola backpressure przez rozmiar bajtów. CountQueuingStrategy: przez liczbę chunks. Node kompatybilność: Readable.toWeb(nodeStream). Readable.fromWeb(webStream). Interop z node:stream.

    Czytaj dalej

    Powiązane artykuły

    Kontakt

    Skontaktuj się z nami

    Porozmawiajmy o Twoim projekcie. Bezpłatna wycena w ciągu 24 godzin.

    Wyślij zapytanie

    Bezpłatna wycena w 24h
    Bez zobowiązań
    Indywidualne podejście
    Ekspresowa realizacja

    Telefon

    +48 790 814 814

    Pon-Pt: 9:00 - 18:00

    Email

    adam@fotz.pl

    Odpowiadamy w ciągu 24h

    Adres

    Plac Wolności 16

    61-739 Poznań

    Godziny pracy

    Pon - Pt9:00 - 18:00
    Sob - NdzZamknięte

    Wolisz porozmawiać?

    Zadzwoń teraz i porozmawiaj z naszym specjalistą o Twoim projekcie.

    Zadzwoń teraz