Skip to content

Commit a37bf60

Browse files
authored
feat: add streaming functions snippets (#631)
1 parent 897a3b5 commit a37bf60

File tree

3 files changed

+159
-18
lines changed

3 files changed

+159
-18
lines changed

functions/app/build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,7 @@ dependencies {
4343
// TODO(thatfiredev): remove the pinned dependency version when
4444
// https://github.com/firebase/firebase-android-sdk/issues/6522 is fixed
4545
implementation("com.google.firebase:firebase-functions:21.0.0")
46+
47+
// For streaming callable Cloud Functions
48+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:1.10.2")
4649
}

functions/app/src/main/java/devrel/firebase/google/com/functions/MainActivity.java

Lines changed: 102 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,16 @@
2626
import com.google.android.gms.tasks.Task;
2727
import com.google.firebase.functions.FirebaseFunctions;
2828
import com.google.firebase.functions.FirebaseFunctionsException;
29+
import com.google.firebase.functions.HttpsCallableReference;
2930
import com.google.firebase.functions.HttpsCallableResult;
31+
import com.google.firebase.functions.StreamResponse;
3032

33+
import org.reactivestreams.Subscriber;
34+
import org.reactivestreams.Subscription;
35+
36+
import java.util.ArrayList;
3137
import java.util.HashMap;
38+
import java.util.List;
3239
import java.util.Map;
3340

3441
public class MainActivity extends AppCompatActivity {
@@ -127,22 +134,99 @@ public void onComplete(@NonNull Task<Integer> task) {
127134
// [END call_add_numbers]
128135
}
129136

130-
private void callAddMessage(String inputMessage) {
131-
// [START call_add_message]
132-
addMessage(inputMessage)
133-
.addOnCompleteListener(new OnCompleteListener<String>() {
134-
@Override
135-
public void onComplete(@NonNull Task<String> task) {
136-
if (!task.isSuccessful()) {
137-
Exception e = task.getException();
138-
if (e instanceof FirebaseFunctionsException) {
139-
FirebaseFunctionsException ffe = (FirebaseFunctionsException) e;
140-
FirebaseFunctionsException.Code code = ffe.getCode();
141-
Object details = ffe.getDetails();
142-
}
143-
}
144-
}
145-
});
146-
// [END call_add_message]
147-
}
137+
private void callAddMessage(String inputMessage) {
138+
// [START call_add_message]
139+
addMessage(inputMessage)
140+
.addOnCompleteListener(new OnCompleteListener<String>() {
141+
@Override
142+
public void onComplete(@NonNull Task<String> task) {
143+
if (!task.isSuccessful()) {
144+
Exception e = task.getException();
145+
if (e instanceof FirebaseFunctionsException) {
146+
FirebaseFunctionsException ffe = (FirebaseFunctionsException) e;
147+
FirebaseFunctionsException.Code code = ffe.getCode();
148+
Object details = ffe.getDetails();
149+
}
150+
}
151+
}
152+
});
153+
// [END call_add_message]
154+
}
155+
156+
private void callStreamingFunctions() {
157+
List<devrel.firebase.google.com.functions.kotlin.MainActivity.Location> favoriteLocations = new ArrayList<>();
158+
favoriteLocations.add(new devrel.firebase.google.com.functions.kotlin.MainActivity.Location(
159+
"The Googleplex",
160+
37.4220199895279,
161+
-122.08531347325561));
162+
favoriteLocations.add(new devrel.firebase.google.com.functions.kotlin.MainActivity.Location(
163+
"Yosemite Valley",
164+
37.745192257741984,
165+
-119.5945133017153));
166+
favoriteLocations.add(new devrel.firebase.google.com.functions.kotlin.MainActivity.Location(
167+
"Old Faithful",
168+
44.46037818049411,
169+
-110.82802255265777));
170+
// [START stream_data_client]
171+
// Get the callable by passing an initialized functions SDK.
172+
HttpsCallableReference getForecast = mFunctions.getHttpsCallable("getForecast");
173+
getForecast.stream(
174+
new HashMap<String, Object>() {{
175+
put("locations", favoriteLocations);
176+
}}
177+
).subscribe(new Subscriber<StreamResponse>() {
178+
@Override
179+
public void onSubscribe(Subscription subscription) {
180+
subscription.request(Long.MAX_VALUE);
181+
}
182+
183+
@Override
184+
public void onNext(StreamResponse streamResponse) {
185+
if (streamResponse instanceof StreamResponse.Message) {
186+
// The flow will emit a [StreamResponse.Message] value every time the
187+
// callable function calls `sendChunk()`.
188+
StreamResponse.Message response = (StreamResponse.Message) streamResponse;
189+
Map<String, Object> forecastDataChunk =
190+
(Map<String, Object>) response.getMessage().getData();
191+
// Update the UI every time a new chunk is received
192+
// from the callable function
193+
updateUI(
194+
(double) forecastDataChunk.get("latitude"),
195+
(double) forecastDataChunk.get("longitude"),
196+
(double) forecastDataChunk.get("forecast")
197+
);
198+
} else if(streamResponse instanceof StreamResponse.Result) {
199+
// The flow will emit a [StreamResponse.Result] value when the
200+
// callable function completes.
201+
StreamResponse.Result response = (StreamResponse.Result) streamResponse;
202+
List<Map<String, Object>> allWeatherForecasts =
203+
(List<Map<String, Object>>) response.getResult().getData();
204+
finalizeUI();
205+
}
206+
}
207+
208+
@Override
209+
public void onError(Throwable throwable) {
210+
// an error occurred in the function
211+
}
212+
213+
@Override
214+
public void onComplete() {
215+
216+
}
217+
});
218+
// [END stream_data_client]
219+
}
220+
221+
private void updateUI(
222+
double latitude,
223+
double longitude,
224+
double forecast
225+
) {
226+
227+
}
228+
229+
private void finalizeUI() {
230+
231+
}
148232
}

functions/app/src/main/java/devrel/firebase/google/com/functions/kotlin/MainActivity.kt

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import com.google.firebase.functions.FirebaseFunctions
77
import com.google.firebase.functions.FirebaseFunctionsException
88
import com.google.firebase.functions.functions
99
import com.google.firebase.Firebase
10+
import com.google.firebase.functions.StreamResponse
11+
import kotlinx.coroutines.reactive.asFlow
1012

1113
class MainActivity : AppCompatActivity() {
1214

@@ -107,4 +109,56 @@ class MainActivity : AppCompatActivity() {
107109
}
108110
// [END call_add_message]
109111
}
112+
113+
data class Location(val name: String, val latitude: Double, val longitude: Double)
114+
115+
private suspend fun callStreamingFunctions() {
116+
val favoriteLocations = listOf(
117+
Location("The Googleplex", 37.4220199895279, -122.08531347325561),
118+
Location("Yosemite Valley", 37.745192257741984, -119.5945133017153),
119+
Location("Old Faithful", 44.46037818049411, -110.82802255265777),
120+
)
121+
// [START stream_data_client]
122+
// Get the callable by passing an initialized functions SDK.
123+
val getForecast = functions.getHttpsCallable("getForecast");
124+
125+
// Call the function with the `.stream()` method and convert it to a flow
126+
getForecast.stream(
127+
mapOf("locations" to favoriteLocations)
128+
).asFlow().collect { response ->
129+
when (response) {
130+
is StreamResponse.Message -> {
131+
// The flow will emit a [StreamResponse.Message] value every time the
132+
// callable function calls `sendChunk()`.
133+
val forecastDataChunk = response.message.data as Map<String, Any>
134+
// Update the UI every time a new chunk is received
135+
// from the callable function
136+
updateUI(
137+
forecastDataChunk["latitude"] as Double,
138+
forecastDataChunk["longitude"] as Double,
139+
forecastDataChunk["forecast"] as Double,
140+
)
141+
}
142+
is StreamResponse.Result -> {
143+
// The flow will emit a [StreamResponse.Result] value when the
144+
// callable function completes.
145+
val allWeatherForecasts = response.result.data as List<Map<String, Any>>
146+
finalizeUI(allWeatherForecasts)
147+
}
148+
}
149+
}
150+
// [END stream_data_client]
151+
}
152+
153+
private fun updateUI(
154+
latitude: Double,
155+
longitude: Double,
156+
forecast: Double
157+
) {
158+
159+
}
160+
161+
private fun finalizeUI(results: List<Map<String, Any>>) {
162+
163+
}
110164
}

0 commit comments

Comments
 (0)