Skip to content

feat: implement tasks #54

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Mar 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
92c2d78
feat: tasks module
pr-Mais Jan 20, 2023
375b1e0
tasks using callable handler
pr-Mais Jan 20, 2023
0ae2b03
docs: on_task_dipached desc
pr-Mais Jan 20, 2023
cc67a74
feat: set required api
pr-Mais Jan 20, 2023
4b9b56f
fix: required apis
pr-Mais Jan 20, 2023
7de6439
fix: skip required apis if it doesn't exist
pr-Mais Jan 20, 2023
f949ecd
fix: required api type check
pr-Mais Jan 20, 2023
8870543
fix: spelling
pr-Mais Jan 20, 2023
e0ffd77
feat: tasks module sample
pr-Mais Jan 23, 2023
e65957a
Merge branch '8-storage' of https://github.com/invertase/firebase-fun…
Salakar Jan 23, 2023
bdad636
update venv
Salakar Jan 26, 2023
4c245bf
refactor: finalize and add missing options classes and manifest conve…
Salakar Jan 26, 2023
71f05fb
Merge branch '8-storage' of https://github.com/invertase/firebase-fun…
Salakar Jan 26, 2023
f69c8cf
Merge branch '8-storage' of https://github.com/invertase/firebase-fun…
Salakar Jan 26, 2023
d1bb30f
fix: tasks example
pr-Mais Jan 27, 2023
94ca397
ocs: added README and comments on the tasks sample
pr-Mais Jan 27, 2023
a067a91
fix: readme
pr-Mais Jan 27, 2023
da414f4
-
Salakar Feb 1, 2023
bf3ebdf
Merge branch 'main' of https://github.com/invertase/firebase-function…
Salakar Feb 1, 2023
f1c2c92
Merge branch 'types-readability' of https://github.com/invertase/fire…
Salakar Mar 20, 2023
80d4116
-
Salakar Mar 20, 2023
cd2f3e0
test(tasks): add unit tests
Salakar Mar 20, 2023
1ecf6ab
refactor: rename to tasks_fn
Salakar Mar 20, 2023
01b392b
Merge branch 'main' of https://github.com/invertase/firebase-function…
Salakar Mar 23, 2023
3ad5130
fix: merge APIs should merge reasons
Salakar Mar 27, 2023
28bf828
chore: update tasks example
Salakar Mar 27, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ exclude = build|setup.py|venv

# cloudevents package has no types
ignore_missing_imports = True
enable_incomplete_feature = Unpack

[mypy-yaml.*]
ignore_missing_imports = True
5 changes: 5 additions & 0 deletions samples/basic_tasks/.firebaserc
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"projects": {
"default": "python-functions-testing"
}
}
66 changes: 66 additions & 0 deletions samples/basic_tasks/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Logs
logs
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
firebase-debug.log*
firebase-debug.*.log*

# Firebase cache
.firebase/

# Firebase config

# Uncomment this if you'd like others to create their own Firebase project.
# For a team working on the same Firebase project(s), it is recommended to leave
# it commented so all members can deploy to the same project(s) in .firebaserc.
# .firebaserc

# Runtime data
pids
*.pid
*.seed
*.pid.lock

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
coverage

# nyc test coverage
.nyc_output

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt

# Bower dependency directory (https://bower.io/)
bower_components

# node-waf configuration
.lock-wscript

# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release

# Dependency directories
node_modules/

# Optional npm cache directory
.npm

# Optional eslint cache
.eslintcache

# Optional REPL history
.node_repl_history

# Output of 'npm pack'
*.tgz

# Yarn Integrity file
.yarn-integrity

# dotenv environment variables file
.env
3 changes: 3 additions & 0 deletions samples/basic_tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Required to avoid a 'duplicate modules' mypy error
# in monorepos that have multiple main.py files.
# https://github.com/python/mypy/issues/4008
11 changes: 11 additions & 0 deletions samples/basic_tasks/firebase.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"functions": [
{
"source": "functions",
"codebase": "default",
"ignore": [
"venv"
]
}
]
}
13 changes: 13 additions & 0 deletions samples/basic_tasks/functions/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# pyenv
.python-version

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Environments
.env
.venv
venv/
venv.bak/
__pycache__
69 changes: 69 additions & 0 deletions samples/basic_tasks/functions/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Firebase Cloud Functions for Tasks."""

import datetime
import json

from firebase_admin import initialize_app
from google.cloud import tasks_v2
from firebase_functions import tasks_fn, https_fn
from firebase_functions.options import SupportedRegion, RetryConfig, RateLimits

app = initialize_app()


# Once this function is deployed, a Task Queue will be created with the name
# `on_task_dispatched_example`. You can then enqueue tasks to this queue by
# calling the `enqueue_task` function.
@tasks_fn.on_task_dispatched(
retry_config=RetryConfig(max_attempts=5),
rate_limits=RateLimits(max_concurrent_dispatches=10),
region=SupportedRegion.US_CENTRAL1,
)
def ontaskdispatchedexample(req: tasks_fn.CallableRequest):
"""
The endpoint which will be executed by the enqueued task.
"""
print(req.data)


# To enqueue a task, you can use the following function.
# e.g.
# curl -X POST -H "Content-Type: application/json" \
# -d '{"data": "Hello World!"}' \
# https://enqueue-task-<projectHash>-<region>.a.run.app\
@https_fn.on_request()
def enqueuetask(req: https_fn.Request) -> https_fn.Response:
"""
Enqueues a task to the queue `on_task_dispatched_function`.
"""
client = tasks_v2.CloudTasksClient()

# The URL of the `on_task_dispatched_function` function.
# Must be set to the URL of the deployed function.

url = req.json.get("url") if req.json else None

body = {"data": req.json}

task: tasks_v2.Task = tasks_v2.Task(
**{
"http_request": {
"http_method": tasks_v2.HttpMethod.POST,
"url": url,
"headers": {
"Content-type": "application/json"
},
"body": json.dumps(body).encode(),
},
"schedule_time":
datetime.datetime.utcnow() + datetime.timedelta(minutes=1),
})

parent = client.queue_path(
app.project_id,
SupportedRegion.US_CENTRAL1,
"ontaskdispatchedexample2",
)

client.create_task(request={"parent": parent, "task": task})
return https_fn.Response("Task enqueued.")
8 changes: 8 additions & 0 deletions samples/basic_tasks/functions/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Not published yet,
# firebase-functions-python >= 0.0.1
# so we use a relative path during development:
./../../../
# Or switch to git ref for deployment testing:
# git+https://github.com/firebase/firebase-functions-python.git@main#egg=firebase-functions

firebase-admin >= 6.0.1
119 changes: 119 additions & 0 deletions src/firebase_functions/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,62 @@ class SupportedRegion(str, _enum.Enum):
US_WEST1 = "us-west1"


@_dataclasses.dataclass(frozen=True)
class RateLimits():
"""
How congestion control should be applied to the function.
"""
max_concurrent_dispatches: int | Expression[
int] | _util.Sentinel | None = None
"""
The maximum number of requests that can be outstanding at a time.
If left unspecified, will default to 1000.
"""

max_dispatches_per_second: int | Expression[
int] | _util.Sentinel | None = None
"""
The maximum number of requests that can be invoked per second.
If left unspecified, will default to 500.
"""


@_dataclasses.dataclass(frozen=True)
class RetryConfig():
"""
How a task should be retried in the event of a non-2xx return.
"""

max_attempts: int | Expression[int] | _util.Sentinel | None = None
"""
The maximum number of times a request should be attempted.
If left unspecified, will default to 3.
"""

max_retry_seconds: int | Expression[int] | _util.Sentinel | None = None
"""
The maximum amount of time for retrying failed task.
If left unspecified will retry indefinitely.
"""

max_backoff_seconds: int | Expression[int] | _util.Sentinel | None = None
"""
The maximum amount of time to wait between attempts.
If left unspecified will default to 1hr.
"""

max_doublings: int | Expression[int] | _util.Sentinel | None = None
"""
The maximum number of times to double the backoff between
retries. If left unspecified will default to 16.
"""

min_backoff_seconds: int | Expression[int] | _util.Sentinel | None = None
"""
The minimum time to wait between attempts.
"""


@_dataclasses.dataclass(frozen=True, kw_only=True)
class RuntimeOptions:
"""
Expand Down Expand Up @@ -318,6 +374,69 @@ def convert_secret(
return endpoint


@_dataclasses.dataclass(frozen=True, kw_only=True)
class TaskQueueOptions(RuntimeOptions):
"""
Options specific to Tasks function types.
"""

retry_config: RetryConfig | None = None
"""
How a task should be retried in the event of a non-2xx return.
"""

rate_limits: RateLimits | None = None
"""
How congestion control should be applied to the function.
"""

invoker: str | list[str] | _typing.Literal["private"] | None = None
"""
Who can enqueue tasks for this function.

Note:
If left unspecified, only service accounts which have
`roles/cloudtasks.enqueuer` and `roles/cloudfunctions.invoker`
will have permissions.
"""

def _endpoint(
self,
**kwargs,
) -> _manifest.ManifestEndpoint:
rate_limits: _manifest.RateLimits | None = _manifest.RateLimits(
maxConcurrentDispatches=self.rate_limits.max_concurrent_dispatches,
maxDispatchesPerSecond=self.rate_limits.max_dispatches_per_second,
) if self.rate_limits is not None else None

retry_config: _manifest.RetryConfig | None = _manifest.RetryConfig(
maxAttempts=self.retry_config.max_attempts,
maxRetrySeconds=self.retry_config.max_retry_seconds,
maxBackoffSeconds=self.retry_config.max_backoff_seconds,
maxDoublings=self.retry_config.max_doublings,
minBackoffSeconds=self.retry_config.min_backoff_seconds,
) if self.retry_config is not None else None

kwargs_merged = {
**_dataclasses.asdict(super()._endpoint(**kwargs)),
"taskQueueTrigger":
_manifest.TaskQueueTrigger(
rateLimits=rate_limits,
retryConfig=retry_config,
),
}
return _manifest.ManifestEndpoint(
**_typing.cast(_typing.Dict, kwargs_merged))

def _required_apis(self) -> list[_manifest.ManifestRequiredApi]:
return [
_manifest.ManifestRequiredApi(
api="cloudtasks.googleapis.com",
reason="Needed for task queue functions",
)
]


@_dataclasses.dataclass(frozen=True, kw_only=True)
class PubSubOptions(RuntimeOptions):
"""
Expand Down
41 changes: 33 additions & 8 deletions src/firebase_functions/private/manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,38 @@ class EventTrigger(_typing.TypedDict):


class RetryConfig(_typing.TypedDict):
retryCount: _typing_extensions.NotRequired[int | _params.Expression[int]]
maxRetrySeconds: _typing_extensions.NotRequired[str |
_params.Expression[str]]
minBackoffSeconds: _typing_extensions.NotRequired[str |
_params.Expression[str]]
maxBackoffSeconds: _typing_extensions.NotRequired[str |
_params.Expression[str]]
maxDoublings: _typing_extensions.NotRequired[int | _params.Expression[int]]
"""
Retry configuration for a endpoint.
"""
maxAttempts: _typing_extensions.NotRequired[int | _params.Expression[int] |
_util.Sentinel | None]
maxRetrySeconds: _typing_extensions.NotRequired[int |
_params.Expression[int] |
_util.Sentinel | None]
maxBackoffSeconds: _typing_extensions.NotRequired[int |
_params.Expression[int] |
_util.Sentinel | None]
maxDoublings: _typing_extensions.NotRequired[int | _params.Expression[int] |
_util.Sentinel | None]
minBackoffSeconds: _typing_extensions.NotRequired[int |
_params.Expression[int] |
_util.Sentinel | None]


class RateLimits(_typing.TypedDict):
maxConcurrentDispatches: int | _params.Expression[
int] | _util.Sentinel | None

maxDispatchesPerSecond: int | _params.Expression[int] | _util.Sentinel | None


class TaskQueueTrigger(_typing.TypedDict):
"""
Trigger definitions for RPCs servers using the HTTP protocol defined at
https://firebase.google.com/docs/functions/callable-reference
"""
retryConfig: RetryConfig | None
rateLimits: RateLimits | None


class ScheduleTrigger(_typing.TypedDict):
Expand Down Expand Up @@ -116,6 +140,7 @@ class ManifestEndpoint:
eventTrigger: EventTrigger | None = None
scheduleTrigger: ScheduleTrigger | None = None
blockingTrigger: BlockingTrigger | None = None
taskQueueTrigger: TaskQueueTrigger | None = None


class ManifestRequiredApi(_typing.TypedDict):
Expand Down
Loading