
Commit 275f9e5

Replace APScheduler with Celery asynchronous tasks (#229)

* Replace APScheduler with Celery task
* black format
* Add celery to run the script
* Update celery usage in README
* Update test task
* Add celery rabbitmq broker
* Fix dockerfiles
* Add task interface access authorization
* Update celery deploy run
* Fix dockerfiles
* Fix supervisor conf
* Update celery broker default to redis
* Force the pro env to use rabbitmq
* Update the task interface
* Add celery beat README description
* Update warning text style
* Revoke the default config comment content of the supervisor
1 parent 81c0d2c · commit 275f9e5

22 files changed · +334 −320 lines changed

README.md

Lines changed: 19 additions & 5 deletions
@@ -43,7 +43,7 @@ See a preview of some of the screenshots
 - [x] Follows Restful API specification
 - [x] Global SQLAlchemy 2.0 syntax
 - [x] Casbin RBAC access control model
-- [x] APScheduler online timed tasks
+- [x] Celery asynchronous tasks
 - [x] JWT middleware whitelist authentication
 - [x] Global customizable time zone time
 - [x] Docker / Docker-compose deployment
@@ -85,6 +85,7 @@ TODO:
 ### BackEnd
 
 1. Install dependencies
+
 ```shell
 pip install -r requirements.txt
 ```
@@ -115,9 +116,17 @@ TODO:
 # Execute the migration
 alembic upgrade head
 ```
-7. Modify the configuration file as needed
-8. Execute the `backend/app/main.py` file to start the service
-9. Browser access: http://127.0.0.1:8000/api/v1/docs
+7. Start celery worker and beat
+
+```shell
+celery -A tasks worker --loglevel=INFO
+# Optional, if you don't need to use the scheduled task
+celery -A tasks beat --loglevel=INFO
+```
+
+8. Modify the configuration file as needed
+9. Execute the `backend/app/main.py` file to start the service
+10. Browser access: http://127.0.0.1:8000/api/v1/docs
 
 ---
 
@@ -127,6 +136,11 @@ Click [fastapi_best_architecture_ui](https://github.com/fastapi-practices/fastap
 
 ### Docker deploy
 
+> [!WARNING]
+> Default port conflict:8000,3306,6379,5672
+>
+> As a best practice, shut down on-premises services before deployment:mysql,redis,rabbitmq...
+
 1. Go to the directory where the ``docker-compose.yml`` file is located and create the environment variable
 file ``.env``
 
@@ -143,7 +157,7 @@ Click [fastapi_best_architecture_ui](https://github.com/fastapi-practices/fastap
 3. Execute the one-click boot command
 
 ```shell
-docker-compose up -d -build
+docker-compose up -d --build
 ```
 
 4. Wait for the command to complete automatically
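The `-A tasks` option in the new README steps points Celery at an importable module named `tasks` that exposes the application instance (in this commit it lives under `backend/app`). As rough orientation only, a minimal sketch of such a module could look like the following; the broker/backend URLs and the beat schedule are illustrative placeholders, not the project's actual configuration:

```python
# tasks.py -- minimal illustrative sketch, NOT the project's real module
from celery import Celery

# Placeholder URLs; in the project these come from the .env values shown further below
app = Celery(
    'tasks',
    broker='redis://127.0.0.1:6379/1',
    backend='redis://127.0.0.1:6379/2',
)

# Only used when `celery -A tasks beat` (or `worker -B`) is running
app.conf.beat_schedule = {
    'demo-every-30s': {
        'task': 'tasks.task_demo_async',  # default task name: <module>.<function>
        'schedule': 30.0,
    },
}


@app.task
def task_demo_async() -> str:
    return 'demo'
```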

README.zh-CN.md

Lines changed: 19 additions & 5 deletions
@@ -32,7 +32,7 @@ mvc 架构作为常规设计模式,在 python web 中也很常见,但是三
 - [x] 遵循 Restful API 规范
 - [x] 全局 SQLAlchemy 2.0 语法
 - [x] Casbin RBAC 访问控制模型
-- [x] APScheduler 在线定时任务
+- [x] Celery 异步任务
 - [x] JWT 中间件白名单认证
 - [x] 全局自定义时区时间
 - [x] Docker / Docker-compose 部署
@@ -79,6 +79,7 @@ TODO:
 ### 后端
 
 1. 安装依赖项
+
 ```shell
 pip install -r requirements.txt
 ```
@@ -110,9 +111,17 @@ TODO:
 alembic upgrade head
 ```
 
-7. 按需修改配置文件
-8. 执行 `backend/app/main.py` 文件启动服务
-9. 浏览器访问:http://127.0.0.1:8000/api/v1/docs
+7. 启动 celery worker 和 beat
+
+```shell
+celery -A tasks worker --loglevel=INFO
+# 可选,如果您不需要使用计划任务
+celery -A tasks beat --loglevel=INFO
+```
+
+8. 按需修改配置文件
+9. 执行 `backend/app/main.py` 文件启动服务
+10. 浏览器访问:http://127.0.0.1:8000/api/v1/docs
 
 ---
 
@@ -122,6 +131,11 @@ TODO:
 
 ### Docker 部署
 
+> [!WARNING]
+> 默认端口冲突:8000,3306,6379,5672
+>
+> 最佳做法是在部署之前关闭本地服务:mysql,redis,rabbitmq...
+
 1. 进入 `docker-compose.yml` 文件所在目录,创建环境变量文件`.env`
 
 ```shell
@@ -137,7 +151,7 @@ TODO:
 3. 执行一键启动命令
 
 ```shell
-docker-compose up -d -build
+docker-compose up -d --build
 ```
 
 4. 等待命令自动完成

Dockerfile renamed to backend.dockerfile

Lines changed: 5 additions & 5 deletions
@@ -4,23 +4,23 @@ WORKDIR /fba
 
 COPY . .
 
-RUN sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list \
-    && sed -i s@/security.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list
+RUN sed -i 's/deb.debian.org/mirrors.ustc.edu.cn/g' /etc/apt/sources.list.d/debian.sources \
+    && sed -i 's|security.debian.org/debian-security|mirrors.ustc.edu.cn/debian-security|g' /etc/apt/sources.list.d/debian.sources
 
 RUN apt-get update \
     && apt-get install -y --no-install-recommends gcc python3-dev \
     && rm -rf /var/lib/apt/lists/*
 
-# 某些包可能存在同步不及时导致安装失败的情况,可选择备用源
-# 清华源(国内快,也可能同步不及时):https://pypi.tuna.tsinghua.edu.cn/simple
-# 官方源(国外慢,但永远都是最新的):https://pypi.org/simple
+# 某些包可能存在同步不及时导致安装失败的情况,可更改为官方源:https://pypi.org/simple
 RUN pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple \
     && pip install --no-cache-dir -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple
 
 ENV TZ = Asia/Shanghai
 
 RUN mkdir -p /var/log/fastapi_server
 
+COPY ./deploy/fastapi_server.conf /etc/supervisor/conf.d/
+
 EXPOSE 8001
 
 CMD ["uvicorn", "backend.app.main:app", "--host", "127.0.0.1", "--port", "8000"]

backend/app/.env.example

Lines changed: 11 additions & 5 deletions
@@ -10,11 +10,17 @@ REDIS_HOST='127.0.0.1'
 REDIS_PORT=6379
 REDIS_PASSWORD=''
 REDIS_DATABASE=0
-# APScheduler
-APS_REDIS_HOST='127.0.0.1'
-APS_REDIS_PORT=6379
-APS_REDIS_PASSWORD=''
-APS_REDIS_DATABASE=1
+# Celery
+CELERY_REDIS_HOST='127.0.0.1'
+CELERY_REDIS_PORT=6379
+CELERY_REDIS_PASSWORD=''
+CELERY_BROKER_REDIS_DATABASE=1
+CELERY_BACKEND_REDIS_DATABASE=2
+# Rabbitmq
+RABBITMQ_HOST='127.0.0.1'
+RABBITMQ_PORT=5672
+RABBITMQ_USERNAME='guest'
+RABBITMQ_PASSWORD='guest'
 # Token
 TOKEN_SECRET_KEY='1VkVF75nsNABBjK_7-qz7GtzNy3AMvktc9TCPwKczCk'
 # Opera Log
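The diff only changes the example environment file; how these values are turned into Celery connection URLs is not shown in this excerpt. A hedged sketch is given below (the variable handling and the `ENVIRONMENT` switch are assumptions based on the commit message, which says redis is the default broker and the pro environment is forced to rabbitmq):

```python
# Illustrative assembly of broker/result-backend URLs from the .env values above.
# Names and structure are assumptions; the real settings object is not part of this diff.
import os

redis_host = os.getenv('CELERY_REDIS_HOST', '127.0.0.1')
redis_port = os.getenv('CELERY_REDIS_PORT', '6379')
redis_password = os.getenv('CELERY_REDIS_PASSWORD', '')
broker_db = os.getenv('CELERY_BROKER_REDIS_DATABASE', '1')
backend_db = os.getenv('CELERY_BACKEND_REDIS_DATABASE', '2')

# Default (dev): redis as both broker and result backend
broker_url = f'redis://:{redis_password}@{redis_host}:{redis_port}/{broker_db}'
result_backend = f'redis://:{redis_password}@{redis_host}:{redis_port}/{backend_db}'

# Pro environment (per the commit message): rabbitmq as the broker
if os.getenv('ENVIRONMENT') == 'pro':
    broker_url = (
        f"amqp://{os.getenv('RABBITMQ_USERNAME', 'guest')}:{os.getenv('RABBITMQ_PASSWORD', 'guest')}"
        f"@{os.getenv('RABBITMQ_HOST', '127.0.0.1')}:{os.getenv('RABBITMQ_PORT', '5672')}"
    )
```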

backend/app/api/v1/mixed/config.py

Lines changed: 0 additions & 45 deletions
@@ -5,54 +5,10 @@
 
 from backend.app.common.rbac import DependsRBAC
 from backend.app.common.response.response_schema import response_base
-from backend.app.core.conf import settings
 
 router = APIRouter()
 
 
-@router.get('/configs', summary='获取系统配置', dependencies=[DependsRBAC])
-async def get_sys_config():
-    return await response_base.success(
-        data={
-            'title': settings.TITLE,
-            'version': settings.VERSION,
-            'description': settings.DESCRIPTION,
-            'docs_url': settings.DOCS_URL,
-            'redocs_url': settings.REDOCS_URL,
-            'openapi_url': settings.OPENAPI_URL,
-            'environment': settings.ENVIRONMENT,
-            'static_files': settings.STATIC_FILES,
-            'uvicorn_host': settings.UVICORN_HOST,
-            'uvicorn_port': settings.UVICORN_PORT,
-            'uvicorn_reload': settings.UVICORN_RELOAD,
-            'db_host': settings.DB_HOST,
-            'db_port': settings.DB_PORT,
-            'db_user': settings.DB_USER,
-            'db_database': settings.DB_DATABASE,
-            'db_charset': settings.DB_CHARSET,
-            'redis_host': settings.REDIS_HOST,
-            'redis_port': settings.REDIS_PORT,
-            'redis_database': settings.REDIS_DATABASE,
-            'redis_timeout': settings.REDIS_TIMEOUT,
-            'aps_redis_host': settings.APS_REDIS_HOST,
-            'aps_redis_port': settings.APS_REDIS_PORT,
-            'aps_redis_database': settings.APS_REDIS_DATABASE,
-            'aps_redis_timeout': settings.APS_REDIS_TIMEOUT,
-            'aps_coalesce': settings.APS_COALESCE,
-            'aps_max_instances': settings.APS_MAX_INSTANCES,
-            'aps_misfire_grace_time': settings.APS_MISFIRE_GRACE_TIME,
-            'token_algorithm': settings.TOKEN_ALGORITHM,
-            'token_expire_seconds': settings.TOKEN_EXPIRE_SECONDS,
-            'token_swagger_url': settings.TOKEN_URL_SWAGGER,
-            'access_log_filename': settings.LOG_STDOUT_FILENAME,
-            'error_log_filename': settings.LOG_STDERR_FILENAME,
-            'middleware_cors': settings.MIDDLEWARE_CORS,
-            'middleware_gzip': settings.MIDDLEWARE_GZIP,
-            'middleware_access': settings.MIDDLEWARE_ACCESS,
-        }
-    )
-
-
 @router.get('/routers', summary='获取所有路由', dependencies=[DependsRBAC])
 async def get_all_route(request: Request):
     data = []
@@ -64,7 +20,6 @@ async def get_all_route(request: Request):
                 'name': route.name,
                 'summary': route.summary,
                 'methods': route.methods,
-                'dependencies': route.dependencies,
             }
         )
     return await response_base.success(data={'route_list': data})

backend/app/api/v1/mixed/tests.py

Lines changed: 5 additions & 32 deletions
@@ -1,43 +1,16 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
-import datetime
-
 from fastapi import APIRouter, File, UploadFile, Form
 
-from backend.app.common.response.response_schema import response_base
-from backend.app.common.task import scheduler
+from backend.app.tasks import task_demo_async
 
 router = APIRouter(prefix='/tests')
 
 
-def task_demo():
-    print('普通任务')
-
-
-async def task_demo_async():
-    print('异步任务')
-
-
-@router.post('/sync', summary='测试添加同步任务')
-async def task_demo_add():
-    scheduler.add_job(
-        task_demo, 'interval', seconds=1, id='task_demo', replace_existing=True, start_date=datetime.datetime.now()
-    )
-
-    return await response_base.success()
-
-
-@router.post('/async', summary='测试添加异步任务')
-async def task_demo_add_async():
-    scheduler.add_job(
-        task_demo_async,
-        'interval',
-        seconds=1,
-        id='task_demo_async',
-        replace_existing=True,
-        start_date=datetime.datetime.now(),
-    )
-    return await response_base.success()
+@router.post('/send', summary='测试异步任务')
+async def task_send():
+    result = task_demo_async.delay()
+    return {'msg': 'Success', 'data': result.id}
 
 
 @router.post('/files', summary='测试文件上传')
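In the new endpoint, `task_demo_async.delay()` publishes the task to the broker and returns immediately with an `AsyncResult`; its `id` is what the route hands back to the caller, and the task's return value later becomes readable from the result backend. A small illustration of that round trip follows (only the import is taken from this diff; the surrounding Celery wiring is assumed):

```python
# Illustrative only: the producer/consumer round trip behind POST /tests/send
from celery.result import AsyncResult

from backend.app.tasks import task_demo_async  # task added by this commit

async_result = task_demo_async.delay()  # publish to the broker, non-blocking
task_id = async_result.id               # returned as 'data' by the endpoint above

# Later, e.g. from the task result endpoint, the state/result can be fetched by id
fetched = AsyncResult(task_id)
if fetched.ready():
    print(fetched.state, fetched.result)
```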

backend/app/api/v1/task.py

Lines changed: 26 additions & 35 deletions
@@ -1,46 +1,37 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
-from fastapi import APIRouter
+from typing import Annotated
+
+from fastapi import APIRouter, Path, Body
 
-from backend.app.common.rbac import DependsRBAC
 from backend.app.common.jwt import DependsJwtAuth
+from backend.app.common.rbac import DependsRBAC
+from backend.app.common.response.response_code import CustomResponseCode
 from backend.app.common.response.response_schema import response_base
 from backend.app.services.task_service import TaskService
 
 router = APIRouter()
 
 
-@router.get('', summary='获取任务列表', dependencies=[DependsJwtAuth])
+@router.get('', summary='获取所有可执行任务模块', dependencies=[DependsJwtAuth])
 async def get_all_tasks():
-    tasks_list = await TaskService.get_task_list()
-    return await response_base.success(data=tasks_list)
-
-
-@router.get('/{pk}', summary='获取任务详情', dependencies=[DependsJwtAuth])
-async def get_task(pk: str):
-    task = await TaskService.get_task(pk=pk)
-    return await response_base.success(data=task)
-
-
-@router.post('/{pk}/run', summary='执行任务', dependencies=[DependsRBAC])
-async def run_task(pk: str):
-    task = await TaskService().run(pk=pk)
-    return await response_base.success(data=task)
-
-
-@router.post('/{pk}/pause', summary='暂停任务', dependencies=[DependsRBAC])
-async def pause_task(pk: str):
-    task = await TaskService().pause(pk=pk)
-    return await response_base.success(data=task)
-
-
-@router.post('/{pk}/resume', summary='恢复任务', dependencies=[DependsRBAC])
-async def resume_task(pk: str):
-    task = await TaskService().resume(pk=pk)
-    return await response_base.success(data=task)
-
-
-@router.post('/{pk}/stop', summary='删除任务', dependencies=[DependsRBAC])
-async def delete_task(pk: str):
-    task = await TaskService().delete(pk=pk)
-    return await response_base.success(data=task)
+    tasks = TaskService.gets()
+    return await response_base.success(data=tasks)
+
+
+@router.get('/{pk}', summary='获取任务结果', dependencies=[DependsJwtAuth])
+async def get_task_result(pk: str = Path(description='任务ID')):
+    task = TaskService.get(pk)
+    if not task:
+        return await response_base.fail(res=CustomResponseCode.HTTP_204, data=pk)
+    return await response_base.success(data=task.result)
+
+
+@router.post('/{module}', summary='执行任务', dependencies=[DependsRBAC])
+async def run_task(
+    module: Annotated[str, Path(description='任务模块')],
+    args: Annotated[list | None, Body()] = None,
+    kwargs: Annotated[dict | None, Body()] = None,
+):
+    task = TaskService.run(module=module, args=args, kwargs=kwargs)
+    return await response_base.success(data=task.result)
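The new router delegates to `TaskService.gets`, `TaskService.get` and `TaskService.run`, which replace the old APScheduler wrappers; their implementation is not part of this excerpt. A purely hypothetical sketch of how such a service could wrap the Celery API is shown below (the `celery_app` import path, method bodies, and the PENDING check are all assumptions):

```python
# Hypothetical sketch of backend/app/services/task_service.py -- the real file is not in this excerpt.
from celery.result import AsyncResult

from backend.app.core.celery import celery_app  # assumed import path


class TaskService:
    @staticmethod
    def gets() -> list[str]:
        # Registered, non-internal task names ("executable task modules")
        return [name for name in celery_app.tasks if not name.startswith('celery.')]

    @staticmethod
    def get(pk: str) -> AsyncResult | None:
        # Look up a result by task id; treat unknown/unstarted ids as "not found"
        result = AsyncResult(id=pk, app=celery_app)
        return result if result.status != 'PENDING' else None

    @staticmethod
    def run(*, module: str, args: list | None = None, kwargs: dict | None = None) -> AsyncResult:
        # send_task enqueues by task name and returns as soon as the message is published
        return celery_app.send_task(module, args=args, kwargs=kwargs)
```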

backend/app/celery-start.sh

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+#!/usr/bin/env bash
+
+celery -A tasks worker --loglevel=INFO -B
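`celery-start.sh` uses `-B` to embed the beat scheduler in the worker process, a convenient single-process setup for a container, whereas the README runs worker and beat as two separate commands. A possible end-to-end smoke test of the new task flow is sketched below; the `/api/v1` mount prefix for these routers and the admin token are assumptions, not confirmed by this diff:

```shell
# Start the worker (with embedded beat) from the backend directory
bash celery-start.sh

# Enqueue the demo task; the response body contains the Celery task id
curl -X POST http://127.0.0.1:8000/api/v1/tests/send

# Query the task result by id (requires a valid JWT; endpoint added in task.py above)
curl -H "Authorization: Bearer <token>" http://127.0.0.1:8000/api/v1/tasks/<task_id>
```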
