Skip to content

Replace APScheduler to Celery asynchronous tasks #229

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ See a preview of some of the screenshots
- [x] Follows Restful API specification
- [x] Global SQLAlchemy 2.0 syntax
- [x] Casbin RBAC access control model
- [x] APScheduler online timed tasks
- [x] Celery asynchronous tasks
- [x] JWT middleware whitelist authentication
- [x] Global customizable time zone time
- [x] Docker / Docker-compose deployment
Expand Down Expand Up @@ -85,6 +85,7 @@ TODO:
### BackEnd

1. Install dependencies

```shell
pip install -r requirements.txt
```
Expand Down Expand Up @@ -115,9 +116,17 @@ TODO:
# Execute the migration
alembic upgrade head
```
7. Modify the configuration file as needed
8. Execute the `backend/app/main.py` file to start the service
9. Browser access: http://127.0.0.1:8000/api/v1/docs
7. Start celery worker and beat

```shell
celery -A tasks worker --loglevel=INFO
# Optional, if you don't need to use the scheduled task
celery -A tasks beat --loglevel=INFO
```

8. Modify the configuration file as needed
9. Execute the `backend/app/main.py` file to start the service
10. Browser access: http://127.0.0.1:8000/api/v1/docs

---

Expand All @@ -127,6 +136,11 @@ Click [fastapi_best_architecture_ui](https://github.com/fastapi-practices/fastap

### Docker deploy

> [!WARNING]
> Default port conflict:8000,3306,6379,5672
>
> As a best practice, shut down on-premises services before deployment:mysql,redis,rabbitmq...

1. Go to the directory where the ``docker-compose.yml`` file is located and create the environment variable
file ``.env``

Expand All @@ -143,7 +157,7 @@ Click [fastapi_best_architecture_ui](https://github.com/fastapi-practices/fastap
3. Execute the one-click boot command

```shell
docker-compose up -d -build
docker-compose up -d --build
```

4. Wait for the command to complete automatically
Expand Down
24 changes: 19 additions & 5 deletions README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mvc 架构作为常规设计模式,在 python web 中也很常见,但是三
- [x] 遵循 Restful API 规范
- [x] 全局 SQLAlchemy 2.0 语法
- [x] Casbin RBAC 访问控制模型
- [x] APScheduler 在线定时任务
- [x] Celery 异步任务
- [x] JWT 中间件白名单认证
- [x] 全局自定义时区时间
- [x] Docker / Docker-compose 部署
Expand Down Expand Up @@ -79,6 +79,7 @@ TODO:
### 后端

1. 安装依赖项

```shell
pip install -r requirements.txt
```
Expand Down Expand Up @@ -110,9 +111,17 @@ TODO:
alembic upgrade head
```

7. 按需修改配置文件
8. 执行 `backend/app/main.py` 文件启动服务
9. 浏览器访问:http://127.0.0.1:8000/api/v1/docs
7. 启动 celery worker 和 beat

```shell
celery -A tasks worker --loglevel=INFO
# 可选,如果您不需要使用计划任务
celery -A tasks beat --loglevel=INFO
```

8. 按需修改配置文件
9. 执行 `backend/app/main.py` 文件启动服务
10. 浏览器访问:http://127.0.0.1:8000/api/v1/docs

---

Expand All @@ -122,6 +131,11 @@ TODO:

### Docker 部署

> [!WARNING]
> 默认端口冲突:8000,3306,6379,5672
>
> 最佳做法是在部署之前关闭本地服务:mysql,redis,rabbitmq...

1. 进入 `docker-compose.yml` 文件所在目录,创建环境变量文件`.env`

```shell
Expand All @@ -137,7 +151,7 @@ TODO:
3. 执行一键启动命令

```shell
docker-compose up -d -build
docker-compose up -d --build
```

4. 等待命令自动完成
Expand Down
10 changes: 5 additions & 5 deletions Dockerfile → backend.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,23 @@ WORKDIR /fba

COPY . .

RUN sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list \
&& sed -i s@/security.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list
RUN sed -i 's/deb.debian.org/mirrors.ustc.edu.cn/g' /etc/apt/sources.list.d/debian.sources \
&& sed -i 's|security.debian.org/debian-security|mirrors.ustc.edu.cn/debian-security|g' /etc/apt/sources.list.d/debian.sources

RUN apt-get update \
&& apt-get install -y --no-install-recommends gcc python3-dev \
&& rm -rf /var/lib/apt/lists/*

# 某些包可能存在同步不及时导致安装失败的情况,可选择备用源
# 清华源(国内快,也可能同步不及时):https://pypi.tuna.tsinghua.edu.cn/simple
# 官方源(国外慢,但永远都是最新的):https://pypi.org/simple
# 某些包可能存在同步不及时导致安装失败的情况,可更改为官方源:https://pypi.org/simple
RUN pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple \
&& pip install --no-cache-dir -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple

ENV TZ = Asia/Shanghai

RUN mkdir -p /var/log/fastapi_server

COPY ./deploy/fastapi_server.conf /etc/supervisor/conf.d/

EXPOSE 8001

CMD ["uvicorn", "backend.app.main:app", "--host", "127.0.0.1", "--port", "8000"]
16 changes: 11 additions & 5 deletions backend/app/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,17 @@ REDIS_HOST='127.0.0.1'
REDIS_PORT=6379
REDIS_PASSWORD=''
REDIS_DATABASE=0
# APScheduler
APS_REDIS_HOST='127.0.0.1'
APS_REDIS_PORT=6379
APS_REDIS_PASSWORD=''
APS_REDIS_DATABASE=1
# Celery
CELERY_REDIS_HOST='127.0.0.1'
CELERY_REDIS_PORT=6379
CELERY_REDIS_PASSWORD=''
CELERY_BROKER_REDIS_DATABASE=1
CELERY_BACKEND_REDIS_DATABASE=2
# Rabbitmq
RABBITMQ_HOST='127.0.0.1'
RABBITMQ_PORT=5672
RABBITMQ_USERNAME='guest'
RABBITMQ_PASSWORD='guest'
# Token
TOKEN_SECRET_KEY='1VkVF75nsNABBjK_7-qz7GtzNy3AMvktc9TCPwKczCk'
# Opera Log
Expand Down
45 changes: 0 additions & 45 deletions backend/app/api/v1/mixed/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,10 @@

from backend.app.common.rbac import DependsRBAC
from backend.app.common.response.response_schema import response_base
from backend.app.core.conf import settings

router = APIRouter()


@router.get('/configs', summary='获取系统配置', dependencies=[DependsRBAC])
async def get_sys_config():
return await response_base.success(
data={
'title': settings.TITLE,
'version': settings.VERSION,
'description': settings.DESCRIPTION,
'docs_url': settings.DOCS_URL,
'redocs_url': settings.REDOCS_URL,
'openapi_url': settings.OPENAPI_URL,
'environment': settings.ENVIRONMENT,
'static_files': settings.STATIC_FILES,
'uvicorn_host': settings.UVICORN_HOST,
'uvicorn_port': settings.UVICORN_PORT,
'uvicorn_reload': settings.UVICORN_RELOAD,
'db_host': settings.DB_HOST,
'db_port': settings.DB_PORT,
'db_user': settings.DB_USER,
'db_database': settings.DB_DATABASE,
'db_charset': settings.DB_CHARSET,
'redis_host': settings.REDIS_HOST,
'redis_port': settings.REDIS_PORT,
'redis_database': settings.REDIS_DATABASE,
'redis_timeout': settings.REDIS_TIMEOUT,
'aps_redis_host': settings.APS_REDIS_HOST,
'aps_redis_port': settings.APS_REDIS_PORT,
'aps_redis_database': settings.APS_REDIS_DATABASE,
'aps_redis_timeout': settings.APS_REDIS_TIMEOUT,
'aps_coalesce': settings.APS_COALESCE,
'aps_max_instances': settings.APS_MAX_INSTANCES,
'aps_misfire_grace_time': settings.APS_MISFIRE_GRACE_TIME,
'token_algorithm': settings.TOKEN_ALGORITHM,
'token_expire_seconds': settings.TOKEN_EXPIRE_SECONDS,
'token_swagger_url': settings.TOKEN_URL_SWAGGER,
'access_log_filename': settings.LOG_STDOUT_FILENAME,
'error_log_filename': settings.LOG_STDERR_FILENAME,
'middleware_cors': settings.MIDDLEWARE_CORS,
'middleware_gzip': settings.MIDDLEWARE_GZIP,
'middleware_access': settings.MIDDLEWARE_ACCESS,
}
)


@router.get('/routers', summary='获取所有路由', dependencies=[DependsRBAC])
async def get_all_route(request: Request):
data = []
Expand All @@ -64,7 +20,6 @@ async def get_all_route(request: Request):
'name': route.name,
'summary': route.summary,
'methods': route.methods,
'dependencies': route.dependencies,
}
)
return await response_base.success(data={'route_list': data})
37 changes: 5 additions & 32 deletions backend/app/api/v1/mixed/tests.py
Original file line number Diff line number Diff line change
@@ -1,43 +1,16 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime

from fastapi import APIRouter, File, UploadFile, Form

from backend.app.common.response.response_schema import response_base
from backend.app.common.task import scheduler
from backend.app.tasks import task_demo_async

router = APIRouter(prefix='/tests')


def task_demo():
print('普通任务')


async def task_demo_async():
print('异步任务')


@router.post('/sync', summary='测试添加同步任务')
async def task_demo_add():
scheduler.add_job(
task_demo, 'interval', seconds=1, id='task_demo', replace_existing=True, start_date=datetime.datetime.now()
)

return await response_base.success()


@router.post('/async', summary='测试添加异步任务')
async def task_demo_add_async():
scheduler.add_job(
task_demo_async,
'interval',
seconds=1,
id='task_demo_async',
replace_existing=True,
start_date=datetime.datetime.now(),
)
return await response_base.success()
@router.post('/send', summary='测试异步任务')
async def task_send():
result = task_demo_async.delay()
return {'msg': 'Success', 'data': result.id}


@router.post('/files', summary='测试文件上传')
Expand Down
61 changes: 26 additions & 35 deletions backend/app/api/v1/task.py
Original file line number Diff line number Diff line change
@@ -1,46 +1,37 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from fastapi import APIRouter
from typing import Annotated

from fastapi import APIRouter, Path, Body

from backend.app.common.rbac import DependsRBAC
from backend.app.common.jwt import DependsJwtAuth
from backend.app.common.rbac import DependsRBAC
from backend.app.common.response.response_code import CustomResponseCode
from backend.app.common.response.response_schema import response_base
from backend.app.services.task_service import TaskService

router = APIRouter()


@router.get('', summary='获取任务列表', dependencies=[DependsJwtAuth])
@router.get('', summary='获取所有可执行任务模块', dependencies=[DependsJwtAuth])
async def get_all_tasks():
tasks_list = await TaskService.get_task_list()
return await response_base.success(data=tasks_list)


@router.get('/{pk}', summary='获取任务详情', dependencies=[DependsJwtAuth])
async def get_task(pk: str):
task = await TaskService.get_task(pk=pk)
return await response_base.success(data=task)


@router.post('/{pk}/run', summary='执行任务', dependencies=[DependsRBAC])
async def run_task(pk: str):
task = await TaskService().run(pk=pk)
return await response_base.success(data=task)


@router.post('/{pk}/pause', summary='暂停任务', dependencies=[DependsRBAC])
async def pause_task(pk: str):
task = await TaskService().pause(pk=pk)
return await response_base.success(data=task)


@router.post('/{pk}/resume', summary='恢复任务', dependencies=[DependsRBAC])
async def resume_task(pk: str):
task = await TaskService().resume(pk=pk)
return await response_base.success(data=task)


@router.post('/{pk}/stop', summary='删除任务', dependencies=[DependsRBAC])
async def delete_task(pk: str):
task = await TaskService().delete(pk=pk)
return await response_base.success(data=task)
tasks = TaskService.gets()
return await response_base.success(data=tasks)


@router.get('/{pk}', summary='获取任务结果', dependencies=[DependsJwtAuth])
async def get_task_result(pk: str = Path(description='任务ID')):
task = TaskService.get(pk)
if not task:
return await response_base.fail(res=CustomResponseCode.HTTP_204, data=pk)
return await response_base.success(data=task.result)


@router.post('/{module}', summary='执行任务', dependencies=[DependsRBAC])
async def run_task(
module: Annotated[str, Path(description='任务模块')],
args: Annotated[list | None, Body()] = None,
kwargs: Annotated[dict | None, Body()] = None,
):
task = TaskService.run(module=module, args=args, kwargs=kwargs)
return await response_base.success(data=task.result)
3 changes: 3 additions & 0 deletions backend/app/celery-start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/usr/bin/env bash

celery -A tasks worker --loglevel=INFO -B
Loading