diff --git a/README.md b/README.md index 3498645d..cca73d19 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ See a preview of some of the screenshots - [x] Follows Restful API specification - [x] Global SQLAlchemy 2.0 syntax - [x] Casbin RBAC access control model -- [x] APScheduler online timed tasks +- [x] Celery asynchronous tasks - [x] JWT middleware whitelist authentication - [x] Global customizable time zone time - [x] Docker / Docker-compose deployment @@ -85,6 +85,7 @@ TODO: ### BackEnd 1. Install dependencies + ```shell pip install -r requirements.txt ``` @@ -115,9 +116,17 @@ TODO: # Execute the migration alembic upgrade head ``` -7. Modify the configuration file as needed -8. Execute the `backend/app/main.py` file to start the service -9. Browser access: http://127.0.0.1:8000/api/v1/docs +7. Start celery worker and beat + + ```shell + celery -A tasks worker --loglevel=INFO + # Optional, if you don't need to use the scheduled task + celery -A tasks beat --loglevel=INFO + ``` + +8. Modify the configuration file as needed +9. Execute the `backend/app/main.py` file to start the service +10. Browser access: http://127.0.0.1:8000/api/v1/docs --- @@ -127,6 +136,11 @@ Click [fastapi_best_architecture_ui](https://github.com/fastapi-practices/fastap ### Docker deploy +> [!WARNING] +> Default port conflict:8000,3306,6379,5672 +> +> As a best practice, shut down on-premises services before deployment:mysql,redis,rabbitmq... + 1. Go to the directory where the ``docker-compose.yml`` file is located and create the environment variable file ``.env`` @@ -143,7 +157,7 @@ Click [fastapi_best_architecture_ui](https://github.com/fastapi-practices/fastap 3. Execute the one-click boot command ```shell - docker-compose up -d -build + docker-compose up -d --build ``` 4. Wait for the command to complete automatically diff --git a/README.zh-CN.md b/README.zh-CN.md index de53db58..f250b272 100644 --- a/README.zh-CN.md +++ b/README.zh-CN.md @@ -32,7 +32,7 @@ mvc 架构作为常规设计模式,在 python web 中也很常见,但是三 - [x] 遵循 Restful API 规范 - [x] 全局 SQLAlchemy 2.0 语法 - [x] Casbin RBAC 访问控制模型 -- [x] APScheduler 在线定时任务 +- [x] Celery 异步任务 - [x] JWT 中间件白名单认证 - [x] 全局自定义时区时间 - [x] Docker / Docker-compose 部署 @@ -79,6 +79,7 @@ TODO: ### 后端 1. 安装依赖项 + ```shell pip install -r requirements.txt ``` @@ -110,9 +111,17 @@ TODO: alembic upgrade head ``` -7. 按需修改配置文件 -8. 执行 `backend/app/main.py` 文件启动服务 -9. 浏览器访问:http://127.0.0.1:8000/api/v1/docs +7. 启动 celery worker 和 beat + + ```shell + celery -A tasks worker --loglevel=INFO + # 可选,如果您不需要使用计划任务 + celery -A tasks beat --loglevel=INFO + ``` + +8. 按需修改配置文件 +9. 执行 `backend/app/main.py` 文件启动服务 +10. 浏览器访问:http://127.0.0.1:8000/api/v1/docs --- @@ -122,6 +131,11 @@ TODO: ### Docker 部署 +> [!WARNING] +> 默认端口冲突:8000,3306,6379,5672 +> +> 最佳做法是在部署之前关闭本地服务:mysql,redis,rabbitmq... + 1. 进入 `docker-compose.yml` 文件所在目录,创建环境变量文件`.env` ```shell @@ -137,7 +151,7 @@ TODO: 3. 执行一键启动命令 ```shell - docker-compose up -d -build + docker-compose up -d --build ``` 4. 等待命令自动完成 diff --git a/Dockerfile b/backend.dockerfile similarity index 61% rename from Dockerfile rename to backend.dockerfile index efe01972..c5777def 100644 --- a/Dockerfile +++ b/backend.dockerfile @@ -4,16 +4,14 @@ WORKDIR /fba COPY . . -RUN sed -i s@/deb.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list \ - && sed -i s@/security.debian.org/@/mirrors.aliyun.com/@g /etc/apt/sources.list +RUN sed -i 's/deb.debian.org/mirrors.ustc.edu.cn/g' /etc/apt/sources.list.d/debian.sources \ + && sed -i 's|security.debian.org/debian-security|mirrors.ustc.edu.cn/debian-security|g' /etc/apt/sources.list.d/debian.sources RUN apt-get update \ && apt-get install -y --no-install-recommends gcc python3-dev \ && rm -rf /var/lib/apt/lists/* -# 某些包可能存在同步不及时导致安装失败的情况,可选择备用源 -# 清华源(国内快,也可能同步不及时):https://pypi.tuna.tsinghua.edu.cn/simple -# 官方源(国外慢,但永远都是最新的):https://pypi.org/simple +# 某些包可能存在同步不及时导致安装失败的情况,可更改为官方源:https://pypi.org/simple RUN pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple \ && pip install --no-cache-dir -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple @@ -21,6 +19,8 @@ ENV TZ = Asia/Shanghai RUN mkdir -p /var/log/fastapi_server +COPY ./deploy/fastapi_server.conf /etc/supervisor/conf.d/ + EXPOSE 8001 CMD ["uvicorn", "backend.app.main:app", "--host", "127.0.0.1", "--port", "8000"] diff --git a/backend/app/.env.example b/backend/app/.env.example index 93bfd659..1863b8c2 100644 --- a/backend/app/.env.example +++ b/backend/app/.env.example @@ -10,11 +10,17 @@ REDIS_HOST='127.0.0.1' REDIS_PORT=6379 REDIS_PASSWORD='' REDIS_DATABASE=0 -# APScheduler -APS_REDIS_HOST='127.0.0.1' -APS_REDIS_PORT=6379 -APS_REDIS_PASSWORD='' -APS_REDIS_DATABASE=1 +# Celery +CELERY_REDIS_HOST='127.0.0.1' +CELERY_REDIS_PORT=6379 +CELERY_REDIS_PASSWORD='' +CELERY_BROKER_REDIS_DATABASE=1 +CELERY_BACKEND_REDIS_DATABASE=2 +# Rabbitmq +RABBITMQ_HOST='127.0.0.1' +RABBITMQ_PORT=5672 +RABBITMQ_USERNAME='guest' +RABBITMQ_PASSWORD='guest' # Token TOKEN_SECRET_KEY='1VkVF75nsNABBjK_7-qz7GtzNy3AMvktc9TCPwKczCk' # Opera Log diff --git a/backend/app/api/v1/mixed/config.py b/backend/app/api/v1/mixed/config.py index 4b9c6f2c..bdc7dec1 100644 --- a/backend/app/api/v1/mixed/config.py +++ b/backend/app/api/v1/mixed/config.py @@ -5,54 +5,10 @@ from backend.app.common.rbac import DependsRBAC from backend.app.common.response.response_schema import response_base -from backend.app.core.conf import settings router = APIRouter() -@router.get('/configs', summary='获取系统配置', dependencies=[DependsRBAC]) -async def get_sys_config(): - return await response_base.success( - data={ - 'title': settings.TITLE, - 'version': settings.VERSION, - 'description': settings.DESCRIPTION, - 'docs_url': settings.DOCS_URL, - 'redocs_url': settings.REDOCS_URL, - 'openapi_url': settings.OPENAPI_URL, - 'environment': settings.ENVIRONMENT, - 'static_files': settings.STATIC_FILES, - 'uvicorn_host': settings.UVICORN_HOST, - 'uvicorn_port': settings.UVICORN_PORT, - 'uvicorn_reload': settings.UVICORN_RELOAD, - 'db_host': settings.DB_HOST, - 'db_port': settings.DB_PORT, - 'db_user': settings.DB_USER, - 'db_database': settings.DB_DATABASE, - 'db_charset': settings.DB_CHARSET, - 'redis_host': settings.REDIS_HOST, - 'redis_port': settings.REDIS_PORT, - 'redis_database': settings.REDIS_DATABASE, - 'redis_timeout': settings.REDIS_TIMEOUT, - 'aps_redis_host': settings.APS_REDIS_HOST, - 'aps_redis_port': settings.APS_REDIS_PORT, - 'aps_redis_database': settings.APS_REDIS_DATABASE, - 'aps_redis_timeout': settings.APS_REDIS_TIMEOUT, - 'aps_coalesce': settings.APS_COALESCE, - 'aps_max_instances': settings.APS_MAX_INSTANCES, - 'aps_misfire_grace_time': settings.APS_MISFIRE_GRACE_TIME, - 'token_algorithm': settings.TOKEN_ALGORITHM, - 'token_expire_seconds': settings.TOKEN_EXPIRE_SECONDS, - 'token_swagger_url': settings.TOKEN_URL_SWAGGER, - 'access_log_filename': settings.LOG_STDOUT_FILENAME, - 'error_log_filename': settings.LOG_STDERR_FILENAME, - 'middleware_cors': settings.MIDDLEWARE_CORS, - 'middleware_gzip': settings.MIDDLEWARE_GZIP, - 'middleware_access': settings.MIDDLEWARE_ACCESS, - } - ) - - @router.get('/routers', summary='获取所有路由', dependencies=[DependsRBAC]) async def get_all_route(request: Request): data = [] @@ -64,7 +20,6 @@ async def get_all_route(request: Request): 'name': route.name, 'summary': route.summary, 'methods': route.methods, - 'dependencies': route.dependencies, } ) return await response_base.success(data={'route_list': data}) diff --git a/backend/app/api/v1/mixed/tests.py b/backend/app/api/v1/mixed/tests.py index a228e1a4..9dc6049f 100644 --- a/backend/app/api/v1/mixed/tests.py +++ b/backend/app/api/v1/mixed/tests.py @@ -1,43 +1,16 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -import datetime - from fastapi import APIRouter, File, UploadFile, Form -from backend.app.common.response.response_schema import response_base -from backend.app.common.task import scheduler +from backend.app.tasks import task_demo_async router = APIRouter(prefix='/tests') -def task_demo(): - print('普通任务') - - -async def task_demo_async(): - print('异步任务') - - -@router.post('/sync', summary='测试添加同步任务') -async def task_demo_add(): - scheduler.add_job( - task_demo, 'interval', seconds=1, id='task_demo', replace_existing=True, start_date=datetime.datetime.now() - ) - - return await response_base.success() - - -@router.post('/async', summary='测试添加异步任务') -async def task_demo_add_async(): - scheduler.add_job( - task_demo_async, - 'interval', - seconds=1, - id='task_demo_async', - replace_existing=True, - start_date=datetime.datetime.now(), - ) - return await response_base.success() +@router.post('/send', summary='测试异步任务') +async def task_send(): + result = task_demo_async.delay() + return {'msg': 'Success', 'data': result.id} @router.post('/files', summary='测试文件上传') diff --git a/backend/app/api/v1/task.py b/backend/app/api/v1/task.py index 1a78c52d..47577209 100644 --- a/backend/app/api/v1/task.py +++ b/backend/app/api/v1/task.py @@ -1,46 +1,37 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from fastapi import APIRouter +from typing import Annotated + +from fastapi import APIRouter, Path, Body -from backend.app.common.rbac import DependsRBAC from backend.app.common.jwt import DependsJwtAuth +from backend.app.common.rbac import DependsRBAC +from backend.app.common.response.response_code import CustomResponseCode from backend.app.common.response.response_schema import response_base from backend.app.services.task_service import TaskService router = APIRouter() -@router.get('', summary='获取任务列表', dependencies=[DependsJwtAuth]) +@router.get('', summary='获取所有可执行任务模块', dependencies=[DependsJwtAuth]) async def get_all_tasks(): - tasks_list = await TaskService.get_task_list() - return await response_base.success(data=tasks_list) - - -@router.get('/{pk}', summary='获取任务详情', dependencies=[DependsJwtAuth]) -async def get_task(pk: str): - task = await TaskService.get_task(pk=pk) - return await response_base.success(data=task) - - -@router.post('/{pk}/run', summary='执行任务', dependencies=[DependsRBAC]) -async def run_task(pk: str): - task = await TaskService().run(pk=pk) - return await response_base.success(data=task) - - -@router.post('/{pk}/pause', summary='暂停任务', dependencies=[DependsRBAC]) -async def pause_task(pk: str): - task = await TaskService().pause(pk=pk) - return await response_base.success(data=task) - - -@router.post('/{pk}/resume', summary='恢复任务', dependencies=[DependsRBAC]) -async def resume_task(pk: str): - task = await TaskService().resume(pk=pk) - return await response_base.success(data=task) - - -@router.post('/{pk}/stop', summary='删除任务', dependencies=[DependsRBAC]) -async def delete_task(pk: str): - task = await TaskService().delete(pk=pk) - return await response_base.success(data=task) + tasks = TaskService.gets() + return await response_base.success(data=tasks) + + +@router.get('/{pk}', summary='获取任务结果', dependencies=[DependsJwtAuth]) +async def get_task_result(pk: str = Path(description='任务ID')): + task = TaskService.get(pk) + if not task: + return await response_base.fail(res=CustomResponseCode.HTTP_204, data=pk) + return await response_base.success(data=task.result) + + +@router.post('/{module}', summary='执行任务', dependencies=[DependsRBAC]) +async def run_task( + module: Annotated[str, Path(description='任务模块')], + args: Annotated[list | None, Body()] = None, + kwargs: Annotated[dict | None, Body()] = None, +): + task = TaskService.run(module=module, args=args, kwargs=kwargs) + return await response_base.success(data=task.result) diff --git a/backend/app/celery-start.sh b/backend/app/celery-start.sh new file mode 100644 index 00000000..a44f2cb3 --- /dev/null +++ b/backend/app/celery-start.sh @@ -0,0 +1,3 @@ +#!/usr/bin/env bash + +celery -A tasks worker --loglevel=INFO -B diff --git a/backend/app/common/task.py b/backend/app/common/task.py deleted file mode 100644 index 4efa7fb7..00000000 --- a/backend/app/common/task.py +++ /dev/null @@ -1,60 +0,0 @@ -# !/usr/bin/env python3 -# -*- coding: utf-8 -*- -from apscheduler.executors.asyncio import AsyncIOExecutor -from apscheduler.jobstores.redis import RedisJobStore -from apscheduler.schedulers.asyncio import AsyncIOScheduler - -from backend.app.common.log import log -from backend.app.core.conf import settings - - -def _scheduler_conf() -> dict: - """ - task conf - - :return: - """ - redis_conf = { - 'host': settings.APS_REDIS_HOST, - 'port': settings.APS_REDIS_PORT, - 'password': settings.APS_REDIS_PASSWORD, - 'db': settings.APS_REDIS_DATABASE, - 'socket_timeout': settings.APS_REDIS_TIMEOUT, - } - - end_conf = { - # 配置存储器 - 'jobstores': {'default': RedisJobStore(**redis_conf)}, - # 配置执行器 - 'executors': { - 'default': AsyncIOExecutor(), - }, - # 创建task时的默认参数 - 'job_defaults': { - 'coalesce': settings.APS_COALESCE, - 'max_instances': settings.APS_MAX_INSTANCES, - 'misfire_grace_time': settings.APS_MISFIRE_GRACE_TIME, - }, - # 时区 - 'timezone': settings.DATETIME_TIMEZONE, - } - - return end_conf - - -class Scheduler(AsyncIOScheduler): - def start(self, paused: bool = False): - try: - super().start(paused) - except Exception as e: - log.error(f'❌ 任务 scheduler 启动失败: {e}') - - def shutdown(self, wait: bool = True): - try: - super().shutdown(wait) - except Exception as e: - log.error(f'❌ 任务 scheduler 关闭失败: {e}') - - -# 调度器 -scheduler = Scheduler(**_scheduler_conf()) diff --git a/backend/app/core/celery.py b/backend/app/core/celery.py new file mode 100644 index 00000000..81ad2ea4 --- /dev/null +++ b/backend/app/core/celery.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +from celery import Celery + +from backend.app.core.conf import settings + +__all__ = ['celery_app'] + + +def make_celery(main_name: str) -> Celery: + """ + 创建 celery 应用 + + :param main_name: __main__ module name + :return: + """ + app = Celery(main_name) + + # Celery Config + app.conf.broker_url = ( + ( + f'redis://:{settings.CELERY_REDIS_PASSWORD}@{settings.CELERY_REDIS_HOST}:' + f'{settings.CELERY_REDIS_PORT}/{settings.CELERY_BROKER_REDIS_DATABASE}' + ) + if settings.CELERY_BROKER == 'redis' + else ( + f'amqp://{settings.RABBITMQ_USERNAME}:{settings.RABBITMQ_PASSWORD}@{settings.RABBITMQ_HOST}:' + f'{settings.RABBITMQ_PORT}' + ) + ) + app.conf.result_backend = ( + f'redis://:{settings.CELERY_REDIS_PASSWORD}@{settings.CELERY_REDIS_HOST}:' + f'{settings.CELERY_REDIS_PORT}/{settings.CELERY_BACKEND_REDIS_DATABASE}' + ) + app.conf.result_backend_transport_options = { + 'global_keyprefix': settings.CELERY_BACKEND_REDIS_PREFIX, + 'retry_policy': { + 'timeout': settings.CELERY_BACKEND_REDIS_TIMEOUT, + }, + 'result_chord_ordered': settings.CELERY_BACKEND_REDIS_ORDERED, + } + app.conf.timezone = settings.DATETIME_TIMEZONE + app.conf.task_track_started = True + app.autodiscover_tasks() + + # Celery Schedule Tasks + # https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html + app.conf.beat_schedule = settings.CELERY_BEAT_SCHEDULE + app.conf.beat_schedule_filename = settings.CELERY_BEAT_SCHEDULE_FILENAME + + return app + + +celery_app = make_celery('celery_app') diff --git a/backend/app/core/conf.py b/backend/app/core/conf.py index 22049635..bdf0bbdc 100644 --- a/backend/app/core/conf.py +++ b/backend/app/core/conf.py @@ -22,11 +22,19 @@ class Settings(BaseSettings): REDIS_PASSWORD: str REDIS_DATABASE: int - # Env APScheduler Redis - APS_REDIS_HOST: str - APS_REDIS_PORT: int - APS_REDIS_PASSWORD: str - APS_REDIS_DATABASE: int + # Env Celery + CELERY_REDIS_HOST: str + CELERY_REDIS_PORT: int + CELERY_REDIS_PASSWORD: str + CELERY_BROKER_REDIS_DATABASE: int # 仅当使用 redis 作为 broker 时生效, 更适用于测试环境 + CELERY_BACKEND_REDIS_DATABASE: int + + # Env Rabbitmq + # docker run -d --hostname fba-mq --name fba-mq -p 5672:5672 -p 15672:15672 rabbitmq:latest + RABBITMQ_HOST: str + RABBITMQ_PORT: int + RABBITMQ_USERNAME: str + RABBITMQ_PASSWORD: str # Env Token TOKEN_SECRET_KEY: str # 密钥 secrets.token_urlsafe(32) @@ -44,7 +52,7 @@ class Settings(BaseSettings): OPENAPI_URL: str | None = f'{API_V1_STR}/openapi' @root_validator - def validator_api_url(cls, values): + def validate_openapi_url(cls, values): if values['ENVIRONMENT'] == 'pro': values['OPENAPI_URL'] = None return values @@ -84,14 +92,6 @@ def validator_api_url(cls, values): # Redis REDIS_TIMEOUT: int = 5 - # APScheduler Redis - APS_REDIS_TIMEOUT: int = 10 - - # APScheduler Default - APS_COALESCE: bool = False # 是否合并运行 - APS_MAX_INSTANCES: int = 3 # 最大实例数 - APS_MISFIRE_GRACE_TIME: int = 60 # 任务错过执行时间后,最大容错时间,过期后不再执行,单位:秒 - # Token TOKEN_ALGORITHM: str = 'HS256' # 算法 TOKEN_EXPIRE_SECONDS: int = 60 * 60 * 24 * 1 # 过期时间,单位:秒 @@ -147,10 +147,29 @@ def validator_api_url(cls, values): OPERA_LOG_ENCRYPT: int = 1 # 0: AES (性能损耗); 1: md5; 2: ItsDangerous; 3: 不加密, others: 替换为 ****** OPERA_LOG_ENCRYPT_INCLUDE: list[str] = ['password', 'old_password', 'new_password', 'confirm_password'] - # ip location + # Ip location IP_LOCATION_REDIS_PREFIX: str = 'fba_ip_location' IP_LOCATION_EXPIRE_SECONDS: int = 60 * 60 * 24 * 1 # 过期时间,单位:秒 + # Celery + CELERY_BROKER: Literal['rabbitmq', 'redis'] = 'redis' + CELERY_BACKEND_REDIS_PREFIX: str = 'fba_celery' + CELERY_BACKEND_REDIS_TIMEOUT: float = 5.0 + CELERY_BACKEND_REDIS_ORDERED: bool = True + CELERY_BEAT_SCHEDULE_FILENAME: str = './log/celery_beat-schedule' + CELERY_BEAT_SCHEDULE: dict = { + 'task_demo_async': { + 'task': 'tasks.task_demo_async', + 'schedule': 5.0, + }, + } + + @root_validator + def validate_celery_broker(cls, values): + if values['ENVIRONMENT'] == 'pro': + values['CELERY_BROKER'] = 'rabbitmq' + return values + class Config: # https://docs.pydantic.dev/usage/settings/#dotenv-env-support env_file = '.env' diff --git a/backend/app/core/registrar.py b/backend/app/core/registrar.py index 71c67e24..67378417 100644 --- a/backend/app/core/registrar.py +++ b/backend/app/core/registrar.py @@ -10,7 +10,6 @@ from backend.app.api.routers import v1 from backend.app.common.exception.exception_handler import register_exception from backend.app.common.redis import redis_client -from backend.app.common.task import scheduler from backend.app.core.conf import settings from backend.app.database.db_mysql import create_table from backend.app.middleware.jwt_auth_middleware import JwtAuthMiddleware @@ -33,8 +32,6 @@ async def register_init(app: FastAPI): await redis_client.open() # 初始化 limiter await FastAPILimiter.init(redis_client, prefix=settings.LIMITER_REDIS_PREFIX, http_callback=http_limit_callback) - # 启动定时任务 - scheduler.start() yield @@ -42,8 +39,6 @@ async def register_init(app: FastAPI): await redis_client.close() # 关闭 limiter await FastAPILimiter.close() - # 关闭定时任务 - scheduler.shutdown() def register_app(): diff --git a/backend/app/schemas/task.py b/backend/app/schemas/task.py deleted file mode 100644 index c0a13412..00000000 --- a/backend/app/schemas/task.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -from datetime import datetime - -from backend.app.schemas.base import SchemaBase - - -class GetTask(SchemaBase): - id: str - func_name: str - trigger: str - executor: str - name: str - misfire_grace_time: str - coalesce: str - max_instances: str - next_run_time: datetime | None diff --git a/backend/app/services/task_service.py b/backend/app/services/task_service.py index a33f7eb2..0e95e5da 100644 --- a/backend/app/services/task_service.py +++ b/backend/app/services/task_service.py @@ -1,78 +1,33 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- -from datetime import datetime +from celery.exceptions import BackendGetMetaError, NotRegistered +from celery.result import AsyncResult -import pytz -from asgiref.sync import sync_to_async - -from backend.app.common.exception import errors -from backend.app.common.task import scheduler -from backend.app.core.conf import settings -from backend.app.schemas.task import GetTask +from backend.app.common.exception.errors import NotFoundError +from backend.app.core.celery import celery_app class TaskService: @staticmethod - @sync_to_async - def get_task_list(): - tasks = [] - for job in scheduler.get_jobs(): - tasks.append( - GetTask( - **{ - 'id': job.id, - 'func_name': job.func_ref, - 'trigger': str(job.trigger), - 'executor': job.executor, - 'name': job.name, - 'misfire_grace_time': job.misfire_grace_time, - 'coalesce': job.coalesce, - 'max_instances': job.max_instances, - 'next_run_time': job.next_run_time, - } - ).dict() - ) - return tasks + def get(pk: str) -> AsyncResult | None: + try: + result = celery_app.AsyncResult(pk) + except (BackendGetMetaError, NotRegistered): + raise NotFoundError(msg='任务不存在') + if result.failed(): + return None + return result @staticmethod - @sync_to_async - def get_task(pk: str): - job = scheduler.get_job(job_id=pk) - if not job: - raise errors.NotFoundError(msg='任务不存在') - task = GetTask( - **{ - 'id': job.id, - 'func_name': job.func_ref, - 'trigger': str(job.trigger), - 'executor': job.executor, - 'name': job.name, - 'misfire_grace_time': job.misfire_grace_time, - 'coalesce': job.coalesce, - 'max_instances': job.max_instances, - 'next_run_time': job.next_run_time, - } - ) - - return task - - async def run(self, pk: str): - task = await self.get_task(pk=pk) - # next_run_time 仅适用于 pytz 模块 - scheduler.modify_job(job_id=pk, next_run_time=datetime.now(pytz.timezone(settings.DATETIME_TIMEZONE))) - return task + def gets() -> dict: + filtered_tasks = {} + tasks = celery_app.tasks + for key, value in tasks.items(): + if not key.startswith('celery.'): + filtered_tasks[key] = value + return filtered_tasks - async def pause(self, pk: str): - task = await self.get_task(pk=pk) - scheduler.pause_job(job_id=pk) - return task - - async def resume(self, pk: str): - task = await self.get_task(pk=pk) - scheduler.resume_job(job_id=pk) - return task - - async def delete(self, pk: str): - task = await self.get_task(pk=pk) - scheduler.remove_job(job_id=pk) + @staticmethod + def run(*, module: str, args: list | None = None, kwargs: dict | None = None) -> AsyncResult: + task = celery_app.send_task(module, args, kwargs) return task diff --git a/backend/app/tasks.py b/backend/app/tasks.py new file mode 100644 index 00000000..7d18228b --- /dev/null +++ b/backend/app/tasks.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +import uuid +import sys + +sys.path.append('../../') + +from backend.app.core.celery import celery_app # noqa: E402 + + +@celery_app.task +def task_demo_async() -> str: + uid = uuid.uuid4().hex + print(f'异步任务 {uid} 执行成功') + return uid diff --git a/celery.dockerfile b/celery.dockerfile new file mode 100644 index 00000000..33fc1885 --- /dev/null +++ b/celery.dockerfile @@ -0,0 +1,27 @@ +FROM python:3.10-slim + +WORKDIR /fba + +COPY . . + +RUN sed -i 's/deb.debian.org/mirrors.ustc.edu.cn/g' /etc/apt/sources.list.d/debian.sources \ + && sed -i 's|security.debian.org/debian-security|mirrors.ustc.edu.cn/debian-security|g' /etc/apt/sources.list.d/debian.sources + +RUN apt-get update \ + && apt-get install -y --no-install-recommends gcc python3-dev \ + && rm -rf /var/lib/apt/lists/* + +RUN pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple \ + && pip install --no-cache-dir -r requirements.txt -i https://mirrors.aliyun.com/pypi/simple + +ENV TZ = Asia/Shanghai + +RUN mkdir -p /var/log/celery + +COPY ./deploy/celery.conf /etc/supervisor/conf.d/ + +WORKDIR /fba/backend/app + +RUN chmod +x celery-start.sh + +CMD ["./celery-start.sh"] diff --git a/deploy/celery.conf b/deploy/celery.conf new file mode 100644 index 00000000..ea68c063 --- /dev/null +++ b/deploy/celery.conf @@ -0,0 +1,19 @@ +[program:celery_worker] +directory=/fba/backend/app +command=/usr/local/bin/celery -A tasks worker --loglevel=INFO +user=root +autostart=true +autorestart=true +startretries=5 +redirect_stderr=true +stdout_logfile=/var/log/celery/fba_celery_worker.log + +[program:celery_beat] +directory=/fba/backend/app +command=/usr/local/bin/celery -A tasks beat --loglevel=INFO +user=root +autostart=true +autorestart=true +startretries=5 +redirect_stderr=true +stdout_logfile=/var/log/celery/fba_celery_beat.log diff --git a/deploy/docker-compose/.env.server b/deploy/docker-compose/.env.server index 3309a345..bccb885e 100644 --- a/deploy/docker-compose/.env.server +++ b/deploy/docker-compose/.env.server @@ -10,11 +10,17 @@ REDIS_HOST='fba_redis' REDIS_PORT=6379 REDIS_PASSWORD='' REDIS_DATABASE=0 -# APScheduler -APS_REDIS_HOST='fba_redis' -APS_REDIS_PORT=6379 -APS_REDIS_PASSWORD='' -APS_REDIS_DATABASE=1 +# Celery +CELERY_REDIS_HOST='fba_redis' +CELERY_REDIS_PORT=6379 +CELERY_REDIS_PASSWORD='' +CELERY_BROKER_REDIS_DATABASE=1 +CELERY_BACKEND_REDIS_DATABASE=2 +# Rabbitmq +RABBITMQ_HOST='fba_rabbitmq' +RABBITMQ_PORT=5672 +RABBITMQ_USERNAME='guest' +RABBITMQ_PASSWORD='guest' # Token TOKEN_SECRET_KEY='1VkVF75nsNABBjK_7-qz7GtzNy3AMvktc9TCPwKczCk' # Opera Log diff --git a/deploy/docker-compose/docker-compose.yml b/deploy/docker-compose/docker-compose.yml index 25cf1fb7..8656df2a 100644 --- a/deploy/docker-compose/docker-compose.yml +++ b/deploy/docker-compose/docker-compose.yml @@ -2,6 +2,7 @@ version: "3.10" networks: fba_network: + name: fba_network driver: bridge ipam: driver: default @@ -10,19 +11,25 @@ networks: volumes: fba_mysql: + name: fba_mysql fba_redis: + name: fba_redis fba_static: + name: fba_static + fba_rabbitmq: + name: fba_rabbitmq services: fba_server: build: context: ../../ - dockerfile: Dockerfile - container_name: "fba_server" + dockerfile: backend.dockerfile + container_name: fba_server restart: always depends_on: - fba_mysql - fba_redis + - fba_celery volumes: - fba_static:/fba/backend/app/static networks: @@ -32,13 +39,15 @@ services: - -c - | wait-for-it -s fba_mysql:3306 -s fba_redis:6379 -t 300 + mkdir -p /var/log/supervisor/ supervisord -c /fba/deploy/supervisor.conf + supervisorctl restart fastapi_server fba_mysql: image: mysql:8.0.29 ports: - "${DOCKER_DB_MAP_PORT:-3306}:3306" - container_name: "fba_mysql" + container_name: fba_mysql restart: always environment: MYSQL_DATABASE: fba @@ -58,7 +67,7 @@ services: image: redis:6.2.7 ports: - "${DOCKER_REDIS_MAP_PORT:-6379}:6379" - container_name: "fba_redis" + container_name: fba_redis restart: always environment: - TZ=Asia/Shanghai @@ -71,7 +80,7 @@ services: image: nginx ports: - "8000:80" - container_name: "fba_nginx" + container_name: fba_nginx restart: always depends_on: - fba_server @@ -80,3 +89,39 @@ services: - fba_static:/www/fba/backend/app/static networks: - fba_network + + fba_rabbitmq: + hostname: fba_rabbitmq + image: rabbitmq:3.12.7 + ports: + - "15672:15672" + - "5672:5672" + container_name: fba_rabbitmq + restart: always + environment: + - RABBITMQ_DEFAULT_USER=guest + - RABBITMQ_DEFAULT_PASS=guest + volumes: + - fba_rabbitmq:/var/lib/rabbitmq + networks: + - fba_network + + fba_celery: + build: + context: ../../ + dockerfile: celery.dockerfile + container_name: fba_celery + restart: always + depends_on: + - fba_rabbitmq + networks: + - fba_network + command: + - bash + - -c + - | + wait-for-it -s fba_rabbitmq:5672 -t 300 + mkdir -p /var/log/supervisor/ + supervisord -c /fba/deploy/supervisor.conf + supervisorctl restart celery_worker + supervisorctl restart celery_beat diff --git a/deploy/fastapi_server.conf b/deploy/fastapi_server.conf new file mode 100644 index 00000000..76bd9ba6 --- /dev/null +++ b/deploy/fastapi_server.conf @@ -0,0 +1,9 @@ +[program:fastapi_server] +directory=/fba +command=/usr/local/bin/gunicorn -c /fba/deploy/gunicorn.conf.py main:app +user=root +autostart=true +autorestart=true +startretries=5 +redirect_stderr=true +stdout_logfile=/var/log/fastapi_server/fba_server.log diff --git a/deploy/supervisor.conf b/deploy/supervisor.conf index ef9afbc7..c69c0ef7 100644 --- a/deploy/supervisor.conf +++ b/deploy/supervisor.conf @@ -42,7 +42,7 @@ file=/tmp/supervisor.sock ; the path to the socket file ;password=123 ; default is no password (open server) [supervisord] -logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log +logfile=/var/log/supervisor/supervisord.log ; main log file; default $CWD/supervisord.log logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB logfile_backups=10 ; # of main logfile backups; 0 means none, default 10 loglevel=info ; log level; default info; others: debug,warn,trace @@ -151,15 +151,5 @@ serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket ; interpreted as relative to this file. Included files *cannot* ; include files themselves. -;[include] -;files = relative/directory/*.ini - -[program:fastapi_server] -directory=/fba -command=/usr/local/bin/gunicorn -c /fba/deploy/gunicorn.conf.py main:app -user=root -autostart=true -autorestart=true -startretries=5 -redirect_stderr=true -stdout_logfile=/var/log/fastapi_server/fba_server.log +[include] +files = /etc/supervisor/conf.d/*.conf diff --git a/requirements.txt b/requirements.txt index 0c939481..da4f75a1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ asyncmy==0.2.5 bcrypt==3.2.2 casbin==1.23.0 casbin_async_sqlalchemy_adapter==1.3.0 +celery==5.3.4 cryptography==41.0.2 email-validator==1.1.3 Faker==9.7.1 @@ -34,6 +35,6 @@ SQLAlchemy==2.0.8 starlette==0.27.0 supervisor==4.2.5 user_agents==2.2.0 -uvicorn[standard]==0.13.4 +uvicorn[standard]==0.22.0 wait-for-it==2.2.1 XdbSearchIP==1.0.2