feat: streaming decode functionality with event-based parsing (closes #131)

This commit is contained in:
Johann Schopplich
2025-11-21 22:29:57 +01:00
parent 9ebad53ea3
commit 6c57a14009
19 changed files with 2220 additions and 431 deletions

View File

@@ -134,8 +134,9 @@ cat million-records.toon | toon --decode > output.json
**Memory efficiency:**
- **Encode (JSON → TOON)**: Streams TOON lines to output without full string in memory
- **Decode (TOON → JSON)**: Streams JSON tokens to output without full string in memory
- **Decode (TOON → JSON)**: Uses the same event-based streaming decoder as the `decodeStream` API in `@toon-format/toon`, streaming JSON tokens to output without full string in memory
- Peak memory usage scales with data depth, not total size
- When `--expand-paths safe` is enabled, decode falls back to non-streaming mode internally to apply deep-merge expansion before writing JSON
> [!NOTE]
> When using `--stats` with encode, the full output string is kept in memory for token counting. Omit `--stats` for maximum memory efficiency with very large datasets.

View File

@@ -1,14 +1,15 @@
import type { FileHandle } from 'node:fs/promises'
import type { DecodeOptions, EncodeOptions } from '../../toon/src'
import type { DecodeOptions, DecodeStreamOptions, EncodeOptions } from '../../toon/src'
import type { InputSource } from './types'
import * as fsp from 'node:fs/promises'
import * as path from 'node:path'
import process from 'node:process'
import { consola } from 'consola'
import { estimateTokenCount } from 'tokenx'
import { decode, encode, encodeLines } from '../../toon/src'
import { decode, decodeStream, encode, encodeLines } from '../../toon/src'
import { jsonStreamFromEvents } from './json-from-events'
import { jsonStringifyLines } from './json-stringify-stream'
import { formatInputLabel, readInput } from './utils'
import { formatInputLabel, readInput, readLinesFromSource } from './utils'
export async function encodeToToon(config: {
input: InputSource
@@ -80,22 +81,43 @@ export async function decodeToJson(config: {
strict: NonNullable<DecodeOptions['strict']>
expandPaths?: NonNullable<DecodeOptions['expandPaths']>
}): Promise<void> {
const toonContent = await readInput(config.input)
// Path expansion requires full value in memory, so use non-streaming path
if (config.expandPaths === 'safe') {
const toonContent = await readInput(config.input)
let data: unknown
try {
const decodeOptions: DecodeOptions = {
indent: config.indent,
strict: config.strict,
expandPaths: config.expandPaths,
let data: unknown
try {
const decodeOptions: DecodeOptions = {
indent: config.indent,
strict: config.strict,
expandPaths: config.expandPaths,
}
data = decode(toonContent, decodeOptions)
}
catch (error) {
throw new Error(`Failed to decode TOON: ${error instanceof Error ? error.message : String(error)}`)
}
data = decode(toonContent, decodeOptions)
}
catch (error) {
throw new Error(`Failed to decode TOON: ${error instanceof Error ? error.message : String(error)}`)
}
await writeStreamingJson(jsonStringifyLines(data, config.indent), config.output)
await writeStreamingJson(jsonStringifyLines(data, config.indent), config.output)
}
else {
try {
const lineSource = readLinesFromSource(config.input)
const decodeStreamOptions: DecodeStreamOptions = {
indent: config.indent,
strict: config.strict,
}
const events = decodeStream(lineSource, decodeStreamOptions)
const jsonChunks = jsonStreamFromEvents(events, config.indent)
await writeStreamingJson(jsonChunks, config.output)
}
catch (error) {
throw new Error(`Failed to decode TOON: ${error instanceof Error ? error.message : String(error)}`)
}
}
if (config.output) {
const relativeInputPath = formatInputLabel(config.input)
@@ -109,7 +131,7 @@ export async function decodeToJson(config: {
* Chunks are written one at a time without building the full string in memory.
*/
async function writeStreamingJson(
chunks: Iterable<string>,
chunks: AsyncIterable<string> | Iterable<string>,
outputPath?: string,
): Promise<void> {
// Stream to file using fs/promises API
@@ -119,7 +141,7 @@ async function writeStreamingJson(
try {
fileHandle = await fsp.open(outputPath, 'w')
for (const chunk of chunks) {
for await (const chunk of chunks) {
await fileHandle.write(chunk)
}
}
@@ -129,7 +151,7 @@ async function writeStreamingJson(
}
// Stream to stdout
else {
for (const chunk of chunks) {
for await (const chunk of chunks) {
process.stdout.write(chunk)
}

View File

@@ -0,0 +1,217 @@
import type { JsonStreamEvent } from '../../toon/src/types'
/**
* Context for tracking JSON structure state during event streaming.
*/
type JsonContext
= | { type: 'object', needsComma: boolean, expectValue: boolean }
| { type: 'array', needsComma: boolean }
/**
* Converts a stream of `JsonStreamEvent` into formatted JSON string chunks.
*
* Similar to `jsonStringifyLines` but driven by events instead of a value tree.
* Useful for streaming TOON decode directly to JSON output without building
* the full data structure in memory.
*
* @param events - Async iterable of JSON stream events
* @param indent - Number of spaces for indentation (0 = compact, >0 = pretty)
* @returns Async iterable of JSON string chunks
*
* @example
* ```ts
* const lines = readLinesFromSource(input)
* const events = decodeStream(lines)
* for await (const chunk of jsonStreamFromEvents(events, 2)) {
* process.stdout.write(chunk)
* }
* ```
*/
export async function* jsonStreamFromEvents(
events: AsyncIterable<JsonStreamEvent>,
indent: number = 2,
): AsyncIterable<string> {
const stack: JsonContext[] = []
let depth = 0
for await (const event of events) {
const parent = stack.length > 0 ? stack[stack.length - 1] : undefined
switch (event.type) {
case 'startObject': {
// Emit comma if needed (inside array or after previous object field value)
if (parent) {
if (parent.type === 'array' && parent.needsComma) {
yield ','
}
else if (parent.type === 'object' && !parent.expectValue) {
// Object field value already emitted, this is a nested object after a key
// The comma is handled by the key event
}
}
// Emit newline and indent for pretty printing
if (indent > 0 && parent) {
if (parent.type === 'array') {
yield '\n'
yield ' '.repeat(depth * indent)
}
}
yield '{'
stack.push({ type: 'object', needsComma: false, expectValue: false })
depth++
break
}
case 'endObject': {
const context = stack.pop()
if (!context || context.type !== 'object') {
throw new Error('Mismatched endObject event')
}
depth--
// Emit newline and indent for closing brace (pretty print)
if (indent > 0 && context.needsComma) {
yield '\n'
yield ' '.repeat(depth * indent)
}
yield '}'
// Mark parent as needing comma for next item
const newParent = stack.length > 0 ? stack[stack.length - 1] : undefined
if (newParent) {
if (newParent.type === 'object') {
newParent.expectValue = false
newParent.needsComma = true
}
else if (newParent.type === 'array') {
newParent.needsComma = true
}
}
break
}
case 'startArray': {
// Emit comma if needed
if (parent) {
if (parent.type === 'array' && parent.needsComma) {
yield ','
}
}
// Emit newline and indent for pretty printing
if (indent > 0 && parent) {
if (parent.type === 'array') {
yield '\n'
yield ' '.repeat(depth * indent)
}
}
yield '['
stack.push({
type: 'array',
needsComma: false,
})
depth++
break
}
case 'endArray': {
const context = stack.pop()
if (!context || context.type !== 'array') {
throw new Error('Mismatched endArray event')
}
depth--
// Emit newline and indent for closing bracket (pretty print)
if (indent > 0 && context.needsComma) {
yield '\n'
yield ' '.repeat(depth * indent)
}
yield ']'
// Mark parent as needing comma for next item
const newParent = stack.length > 0 ? stack[stack.length - 1] : undefined
if (newParent) {
if (newParent.type === 'object') {
newParent.expectValue = false
newParent.needsComma = true
}
else if (newParent.type === 'array') {
newParent.needsComma = true
}
}
break
}
case 'key': {
if (!parent || parent.type !== 'object') {
throw new Error('Key event outside of object context')
}
// Emit comma before this field if needed
if (parent.needsComma) {
yield ','
}
// Emit newline and indent (pretty print)
if (indent > 0) {
yield '\n'
yield ' '.repeat(depth * indent)
}
// Emit key
yield JSON.stringify(event.key)
yield indent > 0 ? ': ' : ':'
parent.expectValue = true
parent.needsComma = true
break
}
case 'primitive': {
// Emit comma if needed
if (parent) {
if (parent.type === 'array' && parent.needsComma) {
yield ','
}
else if (parent.type === 'object' && !parent.expectValue) {
// This shouldn't happen in well-formed events
throw new Error('Primitive event in object without preceding key')
}
}
// Emit newline and indent for array items (pretty print)
if (indent > 0 && parent && parent.type === 'array') {
yield '\n'
yield ' '.repeat(depth * indent)
}
// Emit primitive value
yield JSON.stringify(event.value)
// Update parent context
if (parent) {
if (parent.type === 'object') {
parent.expectValue = false
// needsComma already true from key event
}
else if (parent.type === 'array') {
parent.needsComma = true
}
}
break
}
}
}
// Ensure stack is empty
if (stack.length !== 0) {
throw new Error('Incomplete event stream: unclosed objects or arrays')
}
}

View File

@@ -1,4 +1,5 @@
import type { InputSource } from './types'
import { createReadStream } from 'node:fs'
import * as fsp from 'node:fs/promises'
import * as path from 'node:path'
import process from 'node:process'
@@ -77,3 +78,32 @@ function readFromStdin(): Promise<string> {
stdin.resume()
})
}
export async function* readLinesFromSource(source: InputSource): AsyncIterable<string> {
const stream = source.type === 'stdin'
? process.stdin
: createReadStream(source.path, { encoding: 'utf-8' })
// Explicitly set encoding for stdin
if (source.type === 'stdin') {
stream.setEncoding('utf-8')
}
let buffer = ''
for await (const chunk of stream) {
buffer += chunk
let index: number
while ((index = buffer.indexOf('\n')) !== -1) {
const line = buffer.slice(0, index)
buffer = buffer.slice(index + 1)
yield line
}
}
// Emit last line if buffer is not empty and doesn't end with newline
if (buffer.length > 0) {
yield buffer
}
}