From d6916978eef76b5d81a7194a98e54690a33d57f1 Mon Sep 17 00:00:00 2001 From: Victor Berchet Date: Tue, 3 Dec 2024 13:28:13 +0100 Subject: [PATCH 1/2] refactor: move StreamCreator to types/open-next And make onFinish optional --- .changeset/big-terms-boil.md | 5 +++++ .../src/adapters/image-optimization-adapter.ts | 7 +++++-- packages/open-next/src/core/requestHandler.ts | 3 ++- packages/open-next/src/core/routing/util.ts | 8 ++++++-- .../open-next/src/http/openNextResponse.ts | 18 +++--------------- .../overrides/wrappers/aws-lambda-streaming.ts | 6 +----- .../src/overrides/wrappers/aws-lambda.ts | 5 +---- .../src/overrides/wrappers/cloudflare-node.ts | 9 +++++---- .../open-next/src/overrides/wrappers/node.ts | 5 +---- packages/open-next/src/types/open-next.ts | 12 ++++++++++++ packages/open-next/src/types/overrides.ts | 2 +- 11 files changed, 42 insertions(+), 38 deletions(-) create mode 100644 .changeset/big-terms-boil.md diff --git a/.changeset/big-terms-boil.md b/.changeset/big-terms-boil.md new file mode 100644 index 000000000..e5a96330d --- /dev/null +++ b/.changeset/big-terms-boil.md @@ -0,0 +1,5 @@ +--- +"@opennextjs/aws": minor +--- + +refactor: move StreamCreator to types/open-next diff --git a/packages/open-next/src/adapters/image-optimization-adapter.ts b/packages/open-next/src/adapters/image-optimization-adapter.ts index 205006ce6..5ee98a7c5 100644 --- a/packages/open-next/src/adapters/image-optimization-adapter.ts +++ b/packages/open-next/src/adapters/image-optimization-adapter.ts @@ -9,7 +9,6 @@ import path from "node:path"; import type { Writable } from "node:stream"; import { loadBuildId, loadConfig } from "config/util.js"; -import type { StreamCreator } from "http/openNextResponse.js"; import { OpenNextNodeResponse } from "http/openNextResponse.js"; // @ts-ignore import { defaultConfig } from "next/dist/server/config-shared"; @@ -19,7 +18,11 @@ import { } from "next/dist/server/image-optimizer"; // @ts-ignore import type { NextUrlWithParsedQuery } from "next/dist/server/request-meta"; -import type { InternalEvent, InternalResult } from "types/open-next.js"; +import type { + InternalEvent, + InternalResult, + StreamCreator, +} from "types/open-next.js"; import { emptyReadableStream, toReadableStream } from "utils/stream.js"; import { createGenericHandler } from "../core/createGenericHandler.js"; diff --git a/packages/open-next/src/core/requestHandler.ts b/packages/open-next/src/core/requestHandler.ts index d938ba152..8d1c03944 100644 --- a/packages/open-next/src/core/requestHandler.ts +++ b/packages/open-next/src/core/requestHandler.ts @@ -1,11 +1,12 @@ import { AsyncLocalStorage } from "node:async_hooks"; -import type { OpenNextNodeResponse, StreamCreator } from "http/index.js"; +import type { OpenNextNodeResponse } from "http/index.js"; import { IncomingMessage } from "http/index.js"; import type { InternalEvent, InternalResult, RoutingResult, + StreamCreator, } from "types/open-next"; import { runWithOpenNextRequestContext } from "utils/promise"; diff --git a/packages/open-next/src/core/routing/util.ts b/packages/open-next/src/core/routing/util.ts index 6d5191b08..947d58b36 100644 --- a/packages/open-next/src/core/routing/util.ts +++ b/packages/open-next/src/core/routing/util.ts @@ -3,11 +3,15 @@ import type { OutgoingHttpHeaders } from "node:http"; import { Readable } from "node:stream"; import { BuildId, HtmlPages, NextConfig } from "config/index.js"; -import type { IncomingMessage, StreamCreator } from "http/index.js"; +import type { IncomingMessage } from "http/index.js"; import { OpenNextNodeResponse } from "http/openNextResponse.js"; import { parseHeaders } from "http/util.js"; import type { MiddlewareManifest } from "types/next-types"; -import type { InternalEvent, InternalResult } from "types/open-next.js"; +import type { + InternalEvent, + InternalResult, + StreamCreator, +} from "types/open-next.js"; import { debug, error } from "../../adapters/logger.js"; import { isBinaryContentType } from "../../utils/binary.js"; diff --git a/packages/open-next/src/http/openNextResponse.ts b/packages/open-next/src/http/openNextResponse.ts index 1cc8e777e..430b22424 100644 --- a/packages/open-next/src/http/openNextResponse.ts +++ b/packages/open-next/src/http/openNextResponse.ts @@ -8,23 +8,13 @@ import type { Socket } from "node:net"; import type { TransformCallback, Writable } from "node:stream"; import { Transform } from "node:stream"; +import type { StreamCreator } from "types/open-next"; import { debug } from "../adapters/logger"; import { parseCookies, parseHeaders } from "./util"; const SET_COOKIE_HEADER = "set-cookie"; const CANNOT_BE_USED = "This cannot be used in OpenNext"; -export interface StreamCreator { - writeHeaders(prelude: { - statusCode: number; - cookies: string[]; - headers: Record; - }): Writable; - // Just to fix an issue with aws lambda streaming with empty body - onWrite?: () => void; - onFinish: (length: number) => void; -} - // We only need to implement the methods that are used by next.js export class OpenNextNodeResponse extends Transform implements ServerResponse { statusCode!: number; @@ -92,9 +82,7 @@ export class OpenNextNodeResponse extends Transform implements ServerResponse { } get finished() { - return Boolean( - this.writableFinished && this.responseStream?.writableFinished, - ); + return this.writableFinished && (this.responseStream?.writableFinished ?? true); } setHeader(name: string, value: string | string[]): this { @@ -303,7 +291,7 @@ export class OpenNextNodeResponse extends Transform implements ServerResponse { ?.getStore() ?.pendingPromiseRunner.add(this.onEnd(this.headers)); const bodyLength = this.getBody().length; - this.streamCreator?.onFinish(bodyLength); + this.streamCreator?.onFinish?.(bodyLength); //This is only here because of aws broken streaming implementation. //Hopefully one day they will be able to give us a working streaming implementation in lambda for everyone diff --git a/packages/open-next/src/overrides/wrappers/aws-lambda-streaming.ts b/packages/open-next/src/overrides/wrappers/aws-lambda-streaming.ts index 315edacc1..bb48ba714 100644 --- a/packages/open-next/src/overrides/wrappers/aws-lambda-streaming.ts +++ b/packages/open-next/src/overrides/wrappers/aws-lambda-streaming.ts @@ -2,9 +2,9 @@ import { Readable, type Writable } from "node:stream"; import zlib from "node:zlib"; import type { APIGatewayProxyEventV2 } from "aws-lambda"; -import type { StreamCreator } from "http/index"; import type { Wrapper, WrapperHandler } from "types/overrides"; +import type { StreamCreator } from "types/open-next"; import { debug, error } from "../../adapters/logger"; import type { WarmerEvent, @@ -92,10 +92,6 @@ const handler: WrapperHandler = async (handler, converter) => return compressedStream ?? responseStream; }, - onWrite: () => { - // _hasWriten = true; - }, - onFinish: () => {}, }; const response = await handler(internalEvent, streamCreator); diff --git a/packages/open-next/src/overrides/wrappers/aws-lambda.ts b/packages/open-next/src/overrides/wrappers/aws-lambda.ts index eb1a67e41..e23135747 100644 --- a/packages/open-next/src/overrides/wrappers/aws-lambda.ts +++ b/packages/open-next/src/overrides/wrappers/aws-lambda.ts @@ -8,9 +8,9 @@ import type { CloudFrontRequestEvent, CloudFrontRequestResult, } from "aws-lambda"; -import type { StreamCreator } from "http/openNextResponse"; import type { WrapperHandler } from "types/overrides"; +import type { StreamCreator } from "types/open-next"; import type { WarmerEvent, WarmerResponse, @@ -59,9 +59,6 @@ const handler: WrapperHandler = }, }); }, - onFinish: () => { - // Do nothing - }, }; const response = await handler(internalEvent, fakeStream); diff --git a/packages/open-next/src/overrides/wrappers/cloudflare-node.ts b/packages/open-next/src/overrides/wrappers/cloudflare-node.ts index f48f81fd9..f27869362 100644 --- a/packages/open-next/src/overrides/wrappers/cloudflare-node.ts +++ b/packages/open-next/src/overrides/wrappers/cloudflare-node.ts @@ -1,8 +1,11 @@ -import type { InternalEvent, InternalResult } from "types/open-next"; +import type { + InternalEvent, + InternalResult, + StreamCreator, +} from "types/open-next"; import type { Wrapper, WrapperHandler } from "types/overrides"; import { Writable } from "node:stream"; -import type { StreamCreator } from "http/index"; const handler: WrapperHandler = async (handler, converter) => @@ -56,8 +59,6 @@ const handler: WrapperHandler = return Writable.fromWeb(writable); }, - onWrite: () => {}, - onFinish: (_length: number) => {}, }; ctx.waitUntil(handler(internalEvent, streamCreator)); diff --git a/packages/open-next/src/overrides/wrappers/node.ts b/packages/open-next/src/overrides/wrappers/node.ts index 6f53fd554..71768daae 100644 --- a/packages/open-next/src/overrides/wrappers/node.ts +++ b/packages/open-next/src/overrides/wrappers/node.ts @@ -1,8 +1,8 @@ import { createServer } from "node:http"; -import type { StreamCreator } from "http/index"; import type { Wrapper, WrapperHandler } from "types/overrides"; +import type { StreamCreator } from "types/open-next"; import { debug, error } from "../../adapters/logger"; const wrapper: WrapperHandler = async (handler, converter) => { @@ -16,9 +16,6 @@ const wrapper: WrapperHandler = async (handler, converter) => { res.uncork(); return res; }, - onFinish: () => { - // Is it necessary to do something here? - }, }; if (internalEvent.rawPath === "/__health") { res.writeHead(200, { diff --git a/packages/open-next/src/types/open-next.ts b/packages/open-next/src/types/open-next.ts index af0338deb..f24c318ac 100644 --- a/packages/open-next/src/types/open-next.ts +++ b/packages/open-next/src/types/open-next.ts @@ -1,5 +1,6 @@ import type { ReadableStream } from "node:stream/web"; +import type { Writable } from "node:stream"; import type { WarmerEvent, WarmerResponse } from "../adapters/warmer-function"; import type { Converter, @@ -35,6 +36,17 @@ export type InternalResult = { isBase64Encoded: boolean; } & BaseEventOrResult<"core">; +export interface StreamCreator { + writeHeaders(prelude: { + statusCode: number; + cookies: string[]; + headers: Record; + }): Writable; + // Just to fix an issue with aws lambda streaming with empty body + onWrite?: () => void; + onFinish?: (length: number) => void; +} + export interface DangerousOptions { /** * The tag cache is used for revalidateTags and revalidatePath. diff --git a/packages/open-next/src/types/overrides.ts b/packages/open-next/src/types/overrides.ts index b245b7b2e..f2de3bbd3 100644 --- a/packages/open-next/src/types/overrides.ts +++ b/packages/open-next/src/types/overrides.ts @@ -1,6 +1,5 @@ import type { Readable } from "node:stream"; -import type { StreamCreator } from "http/index"; import type { Meta } from "types/cache"; import type { @@ -9,6 +8,7 @@ import type { InternalEvent, InternalResult, Origin, + StreamCreator, } from "./open-next"; // Queue From 5eb848fcde091dfdb46fe4afd51f6e0fd12e1b7f Mon Sep 17 00:00:00 2001 From: Victor Berchet Date: Tue, 3 Dec 2024 13:35:05 +0100 Subject: [PATCH 2/2] fixup! format --- packages/open-next/src/http/openNextResponse.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/open-next/src/http/openNextResponse.ts b/packages/open-next/src/http/openNextResponse.ts index 430b22424..5db512ebf 100644 --- a/packages/open-next/src/http/openNextResponse.ts +++ b/packages/open-next/src/http/openNextResponse.ts @@ -82,7 +82,9 @@ export class OpenNextNodeResponse extends Transform implements ServerResponse { } get finished() { - return this.writableFinished && (this.responseStream?.writableFinished ?? true); + return this.responseStream + ? this.responseStream?.writableFinished + : this.writableFinished; } setHeader(name: string, value: string | string[]): this {