Skip to content

feat: add streaming functions snippets #631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions functions/app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ dependencies {
// TODO(thatfiredev): remove the pinned dependency version when
// https://github.com/firebase/firebase-android-sdk/issues/6522 is fixed
implementation("com.google.firebase:firebase-functions:21.0.0")

// For streaming callable Cloud Functions
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:1.10.2")
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,16 @@
import com.google.android.gms.tasks.Task;
import com.google.firebase.functions.FirebaseFunctions;
import com.google.firebase.functions.FirebaseFunctionsException;
import com.google.firebase.functions.HttpsCallableReference;
import com.google.firebase.functions.HttpsCallableResult;
import com.google.firebase.functions.StreamResponse;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MainActivity extends AppCompatActivity {
Expand Down Expand Up @@ -127,22 +134,99 @@ public void onComplete(@NonNull Task<Integer> task) {
// [END call_add_numbers]
}

private void callAddMessage(String inputMessage) {
// [START call_add_message]
addMessage(inputMessage)
.addOnCompleteListener(new OnCompleteListener<String>() {
@Override
public void onComplete(@NonNull Task<String> task) {
if (!task.isSuccessful()) {
Exception e = task.getException();
if (e instanceof FirebaseFunctionsException) {
FirebaseFunctionsException ffe = (FirebaseFunctionsException) e;
FirebaseFunctionsException.Code code = ffe.getCode();
Object details = ffe.getDetails();
}
}
}
});
// [END call_add_message]
}
private void callAddMessage(String inputMessage) {
// [START call_add_message]
addMessage(inputMessage)
.addOnCompleteListener(new OnCompleteListener<String>() {
@Override
public void onComplete(@NonNull Task<String> task) {
if (!task.isSuccessful()) {
Exception e = task.getException();
if (e instanceof FirebaseFunctionsException) {
FirebaseFunctionsException ffe = (FirebaseFunctionsException) e;
FirebaseFunctionsException.Code code = ffe.getCode();
Object details = ffe.getDetails();
}
}
}
});
// [END call_add_message]
}

private void callStreamingFunctions() {
List<devrel.firebase.google.com.functions.kotlin.MainActivity.Location> favoriteLocations = new ArrayList<>();
favoriteLocations.add(new devrel.firebase.google.com.functions.kotlin.MainActivity.Location(
"The Googleplex",
37.4220199895279,
-122.08531347325561));
favoriteLocations.add(new devrel.firebase.google.com.functions.kotlin.MainActivity.Location(
"Yosemite Valley",
37.745192257741984,
-119.5945133017153));
favoriteLocations.add(new devrel.firebase.google.com.functions.kotlin.MainActivity.Location(
"Old Faithful",
44.46037818049411,
-110.82802255265777));
// [START stream_data_client]
// Get the callable by passing an initialized functions SDK.
HttpsCallableReference getForecast = mFunctions.getHttpsCallable("getForecast");
getForecast.stream(
new HashMap<String, Object>() {{
put("locations", favoriteLocations);
}}
).subscribe(new Subscriber<StreamResponse>() {
@Override
public void onSubscribe(Subscription subscription) {
subscription.request(Long.MAX_VALUE);
}

@Override
public void onNext(StreamResponse streamResponse) {
if (streamResponse instanceof StreamResponse.Message) {
// The flow will emit a [StreamResponse.Message] value every time the
// callable function calls `sendChunk()`.
StreamResponse.Message response = (StreamResponse.Message) streamResponse;
Map<String, Object> forecastDataChunk =
(Map<String, Object>) response.getMessage().getData();
// Update the UI every time a new chunk is received
// from the callable function
updateUI(
(double) forecastDataChunk.get("latitude"),
(double) forecastDataChunk.get("longitude"),
(double) forecastDataChunk.get("forecast")
);
} else if(streamResponse instanceof StreamResponse.Result) {
// The flow will emit a [StreamResponse.Result] value when the
// callable function completes.
StreamResponse.Result response = (StreamResponse.Result) streamResponse;
List<Map<String, Object>> allWeatherForecasts =
(List<Map<String, Object>>) response.getResult().getData();
finalizeUI();
}
}

@Override
public void onError(Throwable throwable) {
// an error occurred in the function
}

@Override
public void onComplete() {

}
});
// [END stream_data_client]
}

private void updateUI(
double latitude,
double longitude,
double forecast
) {

}

private void finalizeUI() {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.google.firebase.functions.FirebaseFunctions
import com.google.firebase.functions.FirebaseFunctionsException
import com.google.firebase.functions.functions
import com.google.firebase.Firebase
import com.google.firebase.functions.StreamResponse
import kotlinx.coroutines.reactive.asFlow

class MainActivity : AppCompatActivity() {

Expand Down Expand Up @@ -107,4 +109,56 @@ class MainActivity : AppCompatActivity() {
}
// [END call_add_message]
}

data class Location(val name: String, val latitude: Double, val longitude: Double)

private suspend fun callStreamingFunctions() {
val favoriteLocations = listOf(
Location("The Googleplex", 37.4220199895279, -122.08531347325561),
Location("Yosemite Valley", 37.745192257741984, -119.5945133017153),
Location("Old Faithful", 44.46037818049411, -110.82802255265777),
)
// [START stream_data_client]
// Get the callable by passing an initialized functions SDK.
val getForecast = functions.getHttpsCallable("getForecast");

// Call the function with the `.stream()` method and convert it to a flow
getForecast.stream(
mapOf("locations" to favoriteLocations)
).asFlow().collect { response ->
when (response) {
is StreamResponse.Message -> {
// The flow will emit a [StreamResponse.Message] value every time the
// callable function calls `sendChunk()`.
val forecastDataChunk = response.message.data as Map<String, Any>
// Update the UI every time a new chunk is received
// from the callable function
updateUI(
forecastDataChunk["latitude"] as Double,
forecastDataChunk["longitude"] as Double,
forecastDataChunk["forecast"] as Double,
)
}
is StreamResponse.Result -> {
// The flow will emit a [StreamResponse.Result] value when the
// callable function completes.
val allWeatherForecasts = response.result.data as List<Map<String, Any>>
finalizeUI(allWeatherForecasts)
}
}
}
// [END stream_data_client]
}

private fun updateUI(
latitude: Double,
longitude: Double,
forecast: Double
) {

}

private fun finalizeUI(results: List<Map<String, Any>>) {

}
}