diff --git a/functions/app/build.gradle.kts b/functions/app/build.gradle.kts index 45154cde6..c69347f21 100644 --- a/functions/app/build.gradle.kts +++ b/functions/app/build.gradle.kts @@ -43,4 +43,7 @@ dependencies { // TODO(thatfiredev): remove the pinned dependency version when // https://github.com/firebase/firebase-android-sdk/issues/6522 is fixed implementation("com.google.firebase:firebase-functions:21.0.0") + + // For streaming callable Cloud Functions + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:1.10.2") } diff --git a/functions/app/src/main/java/devrel/firebase/google/com/functions/MainActivity.java b/functions/app/src/main/java/devrel/firebase/google/com/functions/MainActivity.java index 9fdfc8a24..cded505c9 100644 --- a/functions/app/src/main/java/devrel/firebase/google/com/functions/MainActivity.java +++ b/functions/app/src/main/java/devrel/firebase/google/com/functions/MainActivity.java @@ -26,9 +26,16 @@ import com.google.android.gms.tasks.Task; import com.google.firebase.functions.FirebaseFunctions; import com.google.firebase.functions.FirebaseFunctionsException; +import com.google.firebase.functions.HttpsCallableReference; import com.google.firebase.functions.HttpsCallableResult; +import com.google.firebase.functions.StreamResponse; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; public class MainActivity extends AppCompatActivity { @@ -127,22 +134,99 @@ public void onComplete(@NonNull Task task) { // [END call_add_numbers] } - private void callAddMessage(String inputMessage) { - // [START call_add_message] - addMessage(inputMessage) - .addOnCompleteListener(new OnCompleteListener() { - @Override - public void onComplete(@NonNull Task task) { - if (!task.isSuccessful()) { - Exception e = task.getException(); - if (e instanceof FirebaseFunctionsException) { - FirebaseFunctionsException ffe = (FirebaseFunctionsException) e; - FirebaseFunctionsException.Code code = ffe.getCode(); - Object details = ffe.getDetails(); - } - } - } - }); - // [END call_add_message] - } + private void callAddMessage(String inputMessage) { + // [START call_add_message] + addMessage(inputMessage) + .addOnCompleteListener(new OnCompleteListener() { + @Override + public void onComplete(@NonNull Task task) { + if (!task.isSuccessful()) { + Exception e = task.getException(); + if (e instanceof FirebaseFunctionsException) { + FirebaseFunctionsException ffe = (FirebaseFunctionsException) e; + FirebaseFunctionsException.Code code = ffe.getCode(); + Object details = ffe.getDetails(); + } + } + } + }); + // [END call_add_message] + } + + private void callStreamingFunctions() { + List favoriteLocations = new ArrayList<>(); + favoriteLocations.add(new devrel.firebase.google.com.functions.kotlin.MainActivity.Location( + "The Googleplex", + 37.4220199895279, + -122.08531347325561)); + favoriteLocations.add(new devrel.firebase.google.com.functions.kotlin.MainActivity.Location( + "Yosemite Valley", + 37.745192257741984, + -119.5945133017153)); + favoriteLocations.add(new devrel.firebase.google.com.functions.kotlin.MainActivity.Location( + "Old Faithful", + 44.46037818049411, + -110.82802255265777)); + // [START stream_data_client] + // Get the callable by passing an initialized functions SDK. + HttpsCallableReference getForecast = mFunctions.getHttpsCallable("getForecast"); + getForecast.stream( + new HashMap() {{ + put("locations", favoriteLocations); + }} + ).subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.request(Long.MAX_VALUE); + } + + @Override + public void onNext(StreamResponse streamResponse) { + if (streamResponse instanceof StreamResponse.Message) { + // The flow will emit a [StreamResponse.Message] value every time the + // callable function calls `sendChunk()`. + StreamResponse.Message response = (StreamResponse.Message) streamResponse; + Map forecastDataChunk = + (Map) response.getMessage().getData(); + // Update the UI every time a new chunk is received + // from the callable function + updateUI( + (double) forecastDataChunk.get("latitude"), + (double) forecastDataChunk.get("longitude"), + (double) forecastDataChunk.get("forecast") + ); + } else if(streamResponse instanceof StreamResponse.Result) { + // The flow will emit a [StreamResponse.Result] value when the + // callable function completes. + StreamResponse.Result response = (StreamResponse.Result) streamResponse; + List> allWeatherForecasts = + (List>) response.getResult().getData(); + finalizeUI(); + } + } + + @Override + public void onError(Throwable throwable) { + // an error occurred in the function + } + + @Override + public void onComplete() { + + } + }); + // [END stream_data_client] + } + + private void updateUI( + double latitude, + double longitude, + double forecast + ) { + + } + + private void finalizeUI() { + + } } diff --git a/functions/app/src/main/java/devrel/firebase/google/com/functions/kotlin/MainActivity.kt b/functions/app/src/main/java/devrel/firebase/google/com/functions/kotlin/MainActivity.kt index 3ff9615ab..be985ce32 100644 --- a/functions/app/src/main/java/devrel/firebase/google/com/functions/kotlin/MainActivity.kt +++ b/functions/app/src/main/java/devrel/firebase/google/com/functions/kotlin/MainActivity.kt @@ -7,6 +7,8 @@ import com.google.firebase.functions.FirebaseFunctions import com.google.firebase.functions.FirebaseFunctionsException import com.google.firebase.functions.functions import com.google.firebase.Firebase +import com.google.firebase.functions.StreamResponse +import kotlinx.coroutines.reactive.asFlow class MainActivity : AppCompatActivity() { @@ -107,4 +109,56 @@ class MainActivity : AppCompatActivity() { } // [END call_add_message] } + + data class Location(val name: String, val latitude: Double, val longitude: Double) + + private suspend fun callStreamingFunctions() { + val favoriteLocations = listOf( + Location("The Googleplex", 37.4220199895279, -122.08531347325561), + Location("Yosemite Valley", 37.745192257741984, -119.5945133017153), + Location("Old Faithful", 44.46037818049411, -110.82802255265777), + ) + // [START stream_data_client] + // Get the callable by passing an initialized functions SDK. + val getForecast = functions.getHttpsCallable("getForecast"); + + // Call the function with the `.stream()` method and convert it to a flow + getForecast.stream( + mapOf("locations" to favoriteLocations) + ).asFlow().collect { response -> + when (response) { + is StreamResponse.Message -> { + // The flow will emit a [StreamResponse.Message] value every time the + // callable function calls `sendChunk()`. + val forecastDataChunk = response.message.data as Map + // Update the UI every time a new chunk is received + // from the callable function + updateUI( + forecastDataChunk["latitude"] as Double, + forecastDataChunk["longitude"] as Double, + forecastDataChunk["forecast"] as Double, + ) + } + is StreamResponse.Result -> { + // The flow will emit a [StreamResponse.Result] value when the + // callable function completes. + val allWeatherForecasts = response.result.data as List> + finalizeUI(allWeatherForecasts) + } + } + } + // [END stream_data_client] + } + + private fun updateUI( + latitude: Double, + longitude: Double, + forecast: Double + ) { + + } + + private fun finalizeUI(results: List>) { + + } }