Skip to content

Commit 7454c07

Browse files
committed
Update streaming callable API
1 parent 9ed934d commit 7454c07

File tree

4 files changed

+83
-47
lines changed

4 files changed

+83
-47
lines changed

spec/common/providers/https.spec.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,7 @@ describe("onCallHandler", () => {
770770
cors: { origin: true, methods: "POST" },
771771
},
772772
(req, resp) => {
773-
resp.write("hello");
773+
resp.sendChunk("hello");
774774
return "world";
775775
},
776776
"gcfv2"
@@ -840,10 +840,10 @@ describe("onCallHandler", () => {
840840
{
841841
cors: { origin: true, methods: "POST" },
842842
},
843-
(req, resp) => {
844-
resp.write("initial message");
845-
mockReq.emit("close");
846-
resp.write("should not be sent");
843+
async (req, resp) => {
844+
await resp.sendChunk("initial message");
845+
await mockReq.emit("close");
846+
await resp.sendChunk("should not be sent");
847847
return "done";
848848
},
849849
"gcfv2"
@@ -908,7 +908,7 @@ describe("onCallHandler", () => {
908908
},
909909
async (resp, res) => {
910910
await new Promise((resolve) => setTimeout(resolve, 3_000));
911-
res.write("hello");
911+
res.sendChunk("hello");
912912
await new Promise((resolve) => setTimeout(resolve, 3_000));
913913
return "done";
914914
},

spec/helper.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,10 @@ export function runHandler(
8484
}
8585
}
8686

87-
public write(writeBody: any) {
87+
public write(writeBody: any, cb: () => void = () => {}) {
8888
this.sentBody += typeof writeBody === "object" ? JSON.stringify(writeBody) : writeBody;
89+
// N.B. setImmediate breaks sinon.
90+
setImmediate(cb);
8991
return true;
9092
}
9193

src/common/providers/https.ts

Lines changed: 46 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -141,23 +141,30 @@ export interface CallableRequest<T = any> {
141141
* The raw request handled by the callable.
142142
*/
143143
rawRequest: Request;
144+
145+
/**
146+
* Whether this is a streaming request.
147+
* Code can be optimized by not trying to generate a stream of chunks to
148+
* call response.sendChunk on if request.acceptsStreaming is false.
149+
* It is always safe, however, to call response.sendChunk as this will
150+
* noop if acceptsStreaming is false.
151+
*/
152+
acceptsStreaming: boolean;
144153
}
145154

146155
/**
147-
* CallableProxyResponse exposes subset of express.Response object
148-
* to allow writing partial, streaming responses back to the client.
156+
* CallableProxyResponse allows streaming response chunks and listening to signals
157+
* triggered in events such as a disconnect.
149158
*/
150-
export interface CallableProxyResponse {
159+
export interface CallableResponse<T = string> {
151160
/**
152161
* Writes a chunk of the response body to the client. This method can be called
153162
* multiple times to stream data progressively.
163+
* Returns a promise of whether the data was written. This can be false, for example,
164+
* if the request was not a streaming request. Rejects if there is a network error.
154165
*/
155-
write: express.Response["write"];
156-
/**
157-
* Indicates whether the client has requested and can handle streaming responses.
158-
* This should be checked before attempting to stream data to avoid compatibility issues.
159-
*/
160-
acceptsStreaming: boolean;
166+
sendChunk: (chunk: T) => Promise<boolean>;
167+
161168
/**
162169
* An AbortSignal that is triggered when the client disconnects or the
163170
* request is terminated prematurely.
@@ -586,13 +593,9 @@ async function checkTokens(
586593
auth: "INVALID",
587594
};
588595

589-
await Promise.all([
590-
Promise.resolve().then(async () => {
591-
verifications.auth = await checkAuthToken(req, ctx);
592-
}),
593-
Promise.resolve().then(async () => {
594-
verifications.app = await checkAppCheckToken(req, ctx, options);
595-
}),
596+
[verifications.auth, verifications.app] = await Promise.all([
597+
checkAuthToken(req, ctx),
598+
checkAppCheckToken(req, ctx, options),
596599
]);
597600

598601
const logPayload = {
@@ -697,9 +700,9 @@ async function checkAppCheckToken(
697700
}
698701

699702
type v1CallableHandler = (data: any, context: CallableContext) => any | Promise<any>;
700-
type v2CallableHandler<Req, Res> = (
703+
type v2CallableHandler<Req, Res, Stream> = (
701704
request: CallableRequest<Req>,
702-
response?: CallableProxyResponse
705+
response?: CallableResponse<Stream>
703706
) => Res;
704707

705708
/** @internal **/
@@ -717,9 +720,9 @@ export interface CallableOptions {
717720
}
718721

719722
/** @internal */
720-
export function onCallHandler<Req = any, Res = any>(
723+
export function onCallHandler<Req = any, Res = any, Stream = string>(
721724
options: CallableOptions,
722-
handler: v1CallableHandler | v2CallableHandler<Req, Res>,
725+
handler: v1CallableHandler | v2CallableHandler<Req, Res, Stream>,
723726
version: "gcfv1" | "gcfv2"
724727
): (req: Request, res: express.Response) => Promise<void> {
725728
const wrapped = wrapOnCallHandler(options, handler, version);
@@ -738,9 +741,9 @@ function encodeSSE(data: unknown): string {
738741
}
739742

740743
/** @internal */
741-
function wrapOnCallHandler<Req = any, Res = any>(
744+
function wrapOnCallHandler<Req = any, Res = any, Stream = string>(
742745
options: CallableOptions,
743-
handler: v1CallableHandler | v2CallableHandler<Req, Res>,
746+
handler: v1CallableHandler | v2CallableHandler<Req, Res, Stream>,
744747
version: "gcfv1" | "gcfv2"
745748
): (req: Request, res: express.Response) => Promise<void> {
746749
return async (req: Request, res: express.Response): Promise<void> => {
@@ -848,27 +851,41 @@ function wrapOnCallHandler<Req = any, Res = any>(
848851
const arg: CallableRequest<Req> = {
849852
...context,
850853
data,
854+
acceptsStreaming,
851855
};
852856

853-
const responseProxy: CallableProxyResponse = {
854-
write(chunk): boolean {
857+
const responseProxy: CallableResponse<Stream> = {
858+
sendChunk(chunk: Stream): Promise<boolean> {
855859
// if client doesn't accept sse-protocol, response.write() is no-op.
856860
if (!acceptsStreaming) {
857-
return false;
861+
return Promise.resolve(false);
858862
}
859863
// if connection is already closed, response.write() is no-op.
860864
if (abortController.signal.aborted) {
861-
return false;
865+
return Promise.resolve(false);
862866
}
863867
const formattedData = encodeSSE({ message: chunk });
864-
const wrote = res.write(formattedData);
868+
let resolve: (wrote: boolean) => void;
869+
let reject: (err: Error) => void;
870+
const p = new Promise<boolean>((res, rej) => {
871+
resolve = res;
872+
reject = rej;
873+
});
874+
const wrote = res.write(formattedData, (error) => {
875+
if (error) {
876+
reject(error);
877+
return;
878+
}
879+
resolve(wrote);
880+
});
881+
865882
// Reset heartbeat timer after successful write
866883
if (wrote && heartbeatInterval !== null && heartbeatSeconds > 0) {
867884
scheduleHeartbeat();
868885
}
869-
return wrote;
886+
887+
return p;
870888
},
871-
acceptsStreaming,
872889
signal: abortController.signal,
873890
};
874891
if (acceptsStreaming) {

src/v2/providers/https.ts

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { isDebugFeatureEnabled } from "../../common/debug";
3333
import { ResetValue } from "../../common/options";
3434
import {
3535
CallableRequest,
36-
CallableProxyResponse,
36+
CallableResponse,
3737
FunctionsErrorCode,
3838
HttpsError,
3939
onCallHandler,
@@ -226,13 +226,19 @@ export type HttpsFunction = ((
226226
/**
227227
* Creates a callable method for clients to call using a Firebase SDK.
228228
*/
229-
export interface CallableFunction<T, Return> extends HttpsFunction {
229+
export interface CallableFunction<T, Return, Stream = string> extends HttpsFunction {
230230
/** Executes the handler function with the provided data as input. Used for unit testing.
231231
* @param data - An input for the handler function.
232232
* @returns The output of the handler function.
233233
*/
234-
run(data: CallableRequest<T>): Return;
234+
run(request: CallableRequest<T>): Return;
235+
236+
stream(
237+
request: CallableRequest<T>,
238+
response: CallableResponse<Stream>
239+
): { stream: AsyncIterator<Stream>; output: Return };
235240
}
241+
236242
/**
237243
* Handles HTTPS requests.
238244
* @param opts - Options to set on this function
@@ -354,22 +360,22 @@ export function onRequest(
354360
* @param handler - A function that takes a {@link https.CallableRequest}.
355361
* @returns A function that you can export and deploy.
356362
*/
357-
export function onCall<T = any, Return = any | Promise<any>>(
363+
export function onCall<T = any, Return = any | Promise<any>, Stream = string>(
358364
opts: CallableOptions,
359-
handler: (request: CallableRequest<T>, response?: CallableProxyResponse) => Return
360-
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>>;
365+
handler: (request: CallableRequest<T>, response?: CallableResponse<Stream>) => Return
366+
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>, Stream>;
361367

362368
/**
363369
* Declares a callable method for clients to call using a Firebase SDK.
364370
* @param handler - A function that takes a {@link https.CallableRequest}.
365371
* @returns A function that you can export and deploy.
366372
*/
367-
export function onCall<T = any, Return = any | Promise<any>>(
368-
handler: (request: CallableRequest<T>, response?: CallableProxyResponse) => Return
373+
export function onCall<T = any, Return = any | Promise<any>, Stream = string>(
374+
handler: (request: CallableRequest<T>, response?: CallableResponse<Stream>) => Return
369375
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>>;
370-
export function onCall<T = any, Return = any | Promise<any>>(
376+
export function onCall<T = any, Return = any | Promise<any>, Stream = string>(
371377
optsOrHandler: CallableOptions | ((request: CallableRequest<T>) => Return),
372-
handler?: (request: CallableRequest<T>, response?: CallableProxyResponse) => Return
378+
handler?: (request: CallableRequest<T>, response?: CallableResponse<Stream>) => Return
373379
): CallableFunction<T, Return extends Promise<unknown> ? Return : Promise<Return>> {
374380
let opts: CallableOptions;
375381
if (arguments.length === 1) {
@@ -388,7 +394,7 @@ export function onCall<T = any, Return = any | Promise<any>>(
388394
}
389395

390396
// fix the length of handler to make the call to handler consistent
391-
const fixedLen = (req: CallableRequest<T>, resp?: CallableProxyResponse) =>
397+
const fixedLen = (req: CallableRequest<T>, resp?: CallableResponse<Stream>) =>
392398
withInit(handler)(req, resp);
393399
let func: any = onCallHandler(
394400
{
@@ -441,6 +447,17 @@ export function onCall<T = any, Return = any | Promise<any>>(
441447
callableTrigger: {},
442448
};
443449

450+
// TODO: in the next major version, do auth/appcheck in these helper methods too.
444451
func.run = withInit(handler);
452+
func.stream = () => {
453+
return {
454+
stream: {
455+
next(): Promise<IteratorResult<Stream>> {
456+
return Promise.reject("Coming soon");
457+
},
458+
},
459+
output: Promise.reject("Coming soon"),
460+
};
461+
};
445462
return func;
446463
}

0 commit comments

Comments
 (0)