diff --git a/.gitignore b/.gitignore
index 3c277e0..ca95377 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,11 +1,135 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# VS code extension
+.vscode/
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+.python-version
+
+# pipenv
+# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+# However, in case of collaboration, if having platform-specific dependencies or dependencies
+# having no cross-platform support, pipenv may install dependencies that don't work, or not
+# install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
 .env
-env
-venv
-__pycache__
-
-*.pyc
-*.sqlite
-*.coverage
-.DS_Store
-env.sh
-migrations
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pre-commit
+packagename.egg-info/
diff --git a/.isort.cfg b/.isort.cfg
new file mode 100644
index 0000000..b7668d2
--- /dev/null
+++ b/.isort.cfg
@@ -0,0 +1,6 @@
+[settings]
+profile = black
+line_length = 88
+multi_line_output = 3
+include_trailing_comma = True
+known_third_party = base,flask,flask_bootstrap,flask_testing,pandas,pydantic,redis,requests,rq
\ No newline at end of file
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
new file mode 100644
index 0000000..48cecf3
--- /dev/null
+++ b/.pre-commit-config.yaml
@@ -0,0 +1,36 @@
+repos:
+- repo: https://github.com/asottile/seed-isort-config
+  rev: v2.2.0
+  hooks:
+  - id: seed-isort-config
+- repo: https://github.com/pre-commit/mirrors-isort
+  rev: v5.10.1
+  hooks:
+  - id: isort
+- repo: https://github.com/pre-commit/pre-commit-hooks
+  rev: v4.1.0  # Use the ref you want to point at
+  hooks:
+  - id: trailing-whitespace
+  - id: check-case-conflict
+  - id: check-added-large-files
+    exclude: ^pre-commit
+  - id: check-merge-conflict
+- repo: https://github.com/ambv/black
+  rev: 22.3.0
+  hooks:
+  - id: black
+- repo: https://github.com/PyCQA/flake8
+  rev: 4.0.1
+  hooks:
+  - id: flake8
+    args: [
+      "--max-line-length=89",
+      "--max-complexity=18",
+      "--config=setup.cfg",
+      "--ignore=T001,T003"
+    ]
+    exclude: __init__.py|manage.py|config.py|env.py
+    additional_dependencies: [
+      flake8-print, pep8-naming,
+      flake8-alfred, flake8-bugbear, flake8-todo, pydocstyle, Pygments,
+      # flake8-builtins,flake8-comprehensions, flake8-pytest-style, flake8-docstrings, flake8-eradicate, flake8-unused-arguments,
+    ]
diff --git a/.python-version b/.python-version
index 1981190..7b59a5c 100644
--- a/.python-version
+++ b/.python-version
@@ -1 +1 @@
-3.8.0
+3.10.2
diff --git a/Dockerfile b/Dockerfile
index c5b4822..5bcdd6b 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,15 +1,61 @@
 # base image
-FROM python:3.8.0-alpine
+FROM ubuntu:20.04
 
-# set working directory
+# Set location
+ENV TZ=Asia/Kolkata \
+    DEBIAN_FRONTEND=noninteractive
+
+# Update and install python, pip
+RUN apt-get update -y
+RUN apt-get install -y tzdata
+RUN apt-get install -y python3-pip python3-dev build-essential curl
+RUN apt-get update -y
+RUN apt-get upgrade -y
+RUN apt-get install -y wkhtmltopdf
+
+RUN apt-get update && \
+    apt-get install -y software-properties-common && \
+    rm -rf /var/lib/apt/lists/*
+RUN add-apt-repository -y ppa:alex-p/tesseract-ocr
+RUN apt-get update
+RUN apt-get -y install tesseract-ocr
+# Installing pytesseract via pip does not install the tesseract binaries.
+# Ref: https://stackoverflow.com/questions/50655738/how-do-i-resolve-a-tesseractnotfounderror
+# Ref: https://docs.ropensci.org/tesseract/
+RUN apt-get install -y libtesseract-dev libleptonica-dev tesseract-ocr-eng
+RUN apt-get install -y wget
+# A bare "RUN cd ..." does not persist across layers, so chain the directory
+# change with the downloads; use the raw URLs, since the GitHub "blob" pages
+# serve HTML rather than the trained-data files themselves.
+RUN cd /usr/share/tesseract-ocr/4.00/tessdata/ && \
+    wget https://raw.githubusercontent.com/tesseract-ocr/tessdata/main/eng.traineddata && \
+    wget https://raw.githubusercontent.com/tesseract-ocr/tessdata/main/osd.traineddata
+
+# LABEL about the custom image
+LABEL maintainer="vaibhav.hiwase@gmail.com"
+LABEL version="0.1"
+LABEL description="This is a custom Docker image."
+
+# Set working directory
 RUN mkdir -p /usr/src/app
 WORKDIR /usr/src/app
 
-# add requirements (to leverage Docker cache)
+# Install some requirements (not a part of this project)
+RUN pip3 install pip --upgrade
+RUN pip3 install pillow==8.3.2
+RUN pip3 install pytesseract==0.3.8
+RUN pip3 install spacy==3.2.4
+RUN python3 -m spacy download en_core_web_sm
+
+# Add requirements (to leverage Docker cache)
 ADD ./requirements.txt /usr/src/app/requirements.txt
 
-# install requirements
-RUN pip install -r requirements.txt
+# Install dependencies
+RUN pip3 install -r requirements.txt
 
-# copy project
+# Copy project
 COPY . /usr/src/app
+
+EXPOSE 5000
+CMD ["python3", "-u", "manage.py", "run", "-h", "0.0.0.0"]
\ No newline at end of file
diff --git a/LICENSE b/LICENSE
index a59e7e6..b3f8034 100644
--- a/LICENSE
+++ b/LICENSE
@@ -1,6 +1,6 @@
 MIT License
 
-Copyright (c) 2019 Michael Herman
+Copyright (c) 2022 Michael Herman
 
 Permission is hereby granted, free of charge, to any person obtaining a copy
 of this software and associated documentation files (the "Software"), to deal
diff --git a/README.md b/README.md
index 3ee1614..86769fe 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# Flask Redis Queue
+# Demo Flask Redis Docker templates
 
 Example of how to handle background processes with Flask, Redis Queue, and Docker
 
@@ -7,7 +7,77 @@ Example of how to handle background processes with Flask, Redis Queue, and Docke
 Spin up the containers:
 
 ```sh
-$ docker-compose up -d --build
+$ docker-compose up --build -V --scale worker=4
+```
+
+Spin up the containers in the background:
+
+```sh
+$ docker-compose up -d --build -V --scale worker=4
+```
+
+Stop all containers and workers:
+
+```sh
+$ docker-compose down -v
 ```
 
 Open your browser to http://localhost:5004
+
+Open the Redis dashboard at http://localhost:9181/
+
+Show logs from the master and worker containers:
+```sh
+docker-compose logs --tail=0 -f master
+docker-compose logs --tail=0 -f worker
+```
+
+You can view a list of all the allocated volumes in your system with:
+```sh
+docker volume ls
+```
+
+If you prefer a more automatic cleanup, the following command will remove any unused images or volumes, and any stopped containers that are still in the system.
+```sh
+docker system prune --volumes
+```
+
+## Some important Docker commands
+The command below removes the following:
+  - all stopped containers
+  - all networks not used by at least one container
+  - all dangling images
+  - all dangling build cache
+```sh
+docker system prune
+```
+The command below removes the following:
+  - all stopped containers
+  - all networks not used by at least one container
+  - all images without at least one container associated to them
+  - all build cache
+```sh
+docker system prune --all --force
+```
+The command below removes all Docker images:
+```sh
+docker rmi --force $(docker images --all --quiet)
+```
+
+# Contribution
+
+## Pre-commit
+The following command removes trailing whitespace and checks for case conflicts, added large files, and merge conflicts, using the isort, black, and flake8 automation tools:
+```sh
+python3 pre-commit-2.15.0.pyz run -a
+```
+
+## Delete `__pycache__` files
+```sh
+find . | grep -E "(__pycache__|\.pyc|\.pyo$)" | xargs rm -rf
+```
+
+## Upgrade
+This architecture has been upgraded in [this](https://github.com/vhiwase/react_native-python_flask-sql-nignx-with-authentication-boilerplate) repo.
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index 5b479bf..5e38a24 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,16 +1,63 @@
-version: '3.7'
+version: '3.8'
 
 services:
 
-  web:
+  master:
     build: .
-    image: web
-    container_name: web
+    image: master
+    container_name: master
     ports:
       - 5004:5000
-    command: python manage.py run -h 0.0.0.0
     volumes:
-      - .:/usr/src/app
+      - app-volume:/usr/src/app
+    # volumes:
+    #   - .:/usr/src/app
     environment:
       - FLASK_DEBUG=1
       - APP_SETTINGS=project.server.config.DevelopmentConfig
+      - REDIS_URL=redis://redis:6379/0
+    depends_on:
+      - redis
+
+  worker:
+    image: master
+    command: python3 -u manage.py run_worker
+    volumes:
+      - app-volume:/usr/src/app
+    # volumes:
+    #   - .:/usr/src/app
+    environment:
+      - APP_SETTINGS=project.server.config.DevelopmentConfig
+      - REDIS_URL=redis://redis:6379/0
+    depends_on:
+      - redis
+      - master
+
+  redis:
+    image: redis:6.2-alpine
+    ports:
+      - 6379:6379
+
+  dashboard:
+    build: ./project/dashboard
+    image: dashboard
+    container_name: dashboard
+    ports:
+      - 9181:9181
+    command: rq-dashboard -H redis
+    depends_on:
+      - redis
+
+  webhook:
+    build: ./webhook
+    image: webhook
+    container_name: webhook
+    ports:
+      - 8888:8888
+    environment:
+      - APP_SETTINGS=project.server.config.DevelopmentConfig
+      - WEBHOOK_ENDPOINT=http://webhook:8888
+
+volumes:
+  app-volume:
+    name: app-volume-001
\ No newline at end of file
diff --git a/manage.py b/manage.py
index f395c01..68342a0 100644
--- a/manage.py
+++ b/manage.py
@@ -3,11 +3,12 @@
 import unittest
 
+import redis
 from flask.cli import FlaskGroup
+from rq import Connection, Worker
 
 from project.server import create_app
 
-
 app = create_app()
 cli = FlaskGroup(create_app=create_app)
 
@@ -22,5 +23,14 @@ def test():
     return 1
 
 
+@cli.command("run_worker")
+def run_worker():
+    redis_url = app.config["REDIS_URL"]
+    redis_connection = redis.from_url(redis_url)
+    with Connection(redis_connection):
+        worker = Worker(app.config["QUEUES"])
+        worker.work()
+
+
 if __name__ == "__main__":
     cli()
diff --git a/postman-collection/Demo Flask Redis Docker templates.postman_collection.json b/postman-collection/Demo Flask Redis Docker templates.postman_collection.json
new file mode 100644
index 0000000..2a06ab6
--- /dev/null
+++ b/postman-collection/Demo Flask Redis Docker templates.postman_collection.json
@@ -0,0 +1,150 @@
+{
+  "info": {
+    "_postman_id": "b6992e83-bf22-461c-a075-5a863b64aff3",
+    "name": "Demo Flask Redis Docker templates",
+    "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
+  },
+  "item": [
+    {
+      "name": "POST sync",
+      "event": [
+        {
+          "listen": "test",
+          "script": {
+            "exec": [
+              "var jsonData = JSON.parse(responseBody);\r",
+              "pm.collectionVariables.set(\"job_id\", jsonData.data.job_id);"
+            ],
+            "type": "text/javascript"
+          }
+        }
+      ],
+      "request": {
+        "method": "POST",
+        "header": [],
+        "body": {
+          "mode": "formdata",
+          "formdata": [
+            {
+              "key": "webhook_endpoint",
+              "value": "http://52.172.224.181:5000",
+              "type": "text",
+              "disabled": true
+            },
+            {
+              "key": "delay",
+              "value": "5",
+              "type": "text"
+            },
+            {
+              "key": "file1",
+              "type": "file",
+              "src": []
+            },
+            {
+              "key": "file2",
+              "type": "file",
+              "src": []
+            }
+          ]
+        },
+        "url": {
+          "raw": "http://localhost:5004/sync/jobs",
+          "protocol": "http",
+          "host": [
+            "localhost"
+          ],
+          "port": "5004",
+          "path": [
+            "sync",
+            "jobs"
+          ]
+        }
+      },
+      "response": []
+    },
+    {
+      "name": "POST async",
+      "event": [
+        {
+          "listen": "test",
+          "script": {
+            "exec": [
+              "var jsonData = JSON.parse(responseBody);\r",
+              "pm.collectionVariables.set(\"job_id\", jsonData.data.job_id);"
+            ],
+            "type": "text/javascript"
+          }
+        }
+      ],
+      "request": {
+        "method": "POST",
+        "header": [],
+        "body": {
+          "mode": "formdata",
+          "formdata": [
+            {
"webhook_endpoint", + "value": "http://52.172.224.181:5000", + "type": "text" + }, + { + "key": "delay", + "value": "15", + "type": "text" + }, + { + "key": "file1", + "type": "file", + "src": [] + }, + { + "key": "file2", + "type": "file", + "src": [] + } + ] + }, + "url": { + "raw": "http://localhost:5004/async/jobs", + "protocol": "http", + "host": [ + "localhost" + ], + "port": "5004", + "path": [ + "async", + "jobs" + ] + } + }, + "response": [] + }, + { + "name": "GET", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "http://localhost:5004/jobs/{{job_id}}", + "protocol": "http", + "host": [ + "localhost" + ], + "port": "5004", + "path": [ + "jobs", + "{{job_id}}" + ] + } + }, + "response": [] + } + ], + "variable": [ + { + "key": "job_id", + "value": "" + } + ] +} \ No newline at end of file diff --git a/pre-commit-2.15.0.pyz b/pre-commit-2.15.0.pyz new file mode 100644 index 0000000..d1d9a72 Binary files /dev/null and b/pre-commit-2.15.0.pyz differ diff --git a/project/client/static/favicon.ico b/project/client/static/favicon.ico new file mode 100644 index 0000000..8f67463 Binary files /dev/null and b/project/client/static/favicon.ico differ diff --git a/project/client/static/main.js b/project/client/static/main.js deleted file mode 100644 index 61c8bdc..0000000 --- a/project/client/static/main.js +++ /dev/null @@ -1,43 +0,0 @@ -// custom javascript - -$( document ).ready(() => { - console.log('Sanity Check!'); -}); - -$('.btn').on('click', function() { - $.ajax({ - url: '/tasks', - data: { type: $(this).data('type') }, - method: 'POST' - }) - .done((res) => { - getStatus(res.data.task_id) - }) - .fail((err) => { - console.log(err) - }); -}); - -function getStatus(taskID) { - $.ajax({ - url: `/tasks/${taskID}`, - method: 'GET' - }) - .done((res) => { - const html = ` - - ${res.data.task_id} - ${res.data.task_status} - ${res.data.task_result} - ` - $('#tasks').prepend(html); - const taskStatus = res.data.task_status; - if (taskStatus === 'finished' || taskStatus === 'failed') return false; - setTimeout(function() { - getStatus(res.data.task_id); - }, 1000); - }) - .fail((err) => { - console.log(err) - }); -} diff --git a/project/client/templates/_base.html b/project/client/templates/_base.html index 31ad653..181c82b 100644 --- a/project/client/templates/_base.html +++ b/project/client/templates/_base.html @@ -3,13 +3,13 @@ - Flask + Redis Queue + Docker{% block title %}{% endblock %} + Demo Webpage using Flask Docker and Redis{% block title %}{% endblock %} - + - + {{ bootstrap.load_css() }} {% block css %}{% endblock %} @@ -22,14 +22,6 @@ {% include 'footer.html' %} - - - - - - - {% block js %}{% endblock %} - diff --git a/project/client/templates/main/home.html b/project/client/templates/main/home.html index 6e7023d..41235da 100644 --- a/project/client/templates/main/home.html +++ b/project/client/templates/main/home.html @@ -3,34 +3,8 @@ {% block content %}
-<div>
-  <h1>Flask + Redis Queue + Docker</h1>
-</div>
+<div>
+  <h1>Flask with redis demo application template</h1>
+</div>
 
-<div>
-  <h3>Tasks</h3>
-  <p>Pick a task length...</p>
-  <!-- task trigger buttons -->
-</div>
-
-<div>
-  <h3>Task Status</h3>
-  <table>
-    <thead>
-      <tr>
-        <th>ID</th>
-        <th>Status</th>
-        <th>Result</th>
-      </tr>
-    </thead>
-    <tbody id="tasks"></tbody>
-  </table>
-</div>
-
-{% endblock %}
+{% endblock %}
\ No newline at end of file
diff --git a/project/dashboard/Dockerfile b/project/dashboard/Dockerfile
new file mode 100644
index 0000000..3ba51e3
--- /dev/null
+++ b/project/dashboard/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.10-alpine
+
+RUN pip install rq-dashboard
+
+# https://github.com/rq/rq/issues/1469
+RUN pip uninstall -y click
+RUN pip install click==7.1.2
+
+EXPOSE 9181
+
+CMD ["rq-dashboard"]
diff --git a/project/server/__init__.py b/project/server/__init__.py
index 8e0bc2b..b6d6127 100644
--- a/project/server/__init__.py
+++ b/project/server/__init__.py
@@ -4,10 +4,7 @@
 import os
 
 from flask import Flask
-from flask_bootstrap import Bootstrap
-
-# instantiate the extensions
-bootstrap = Bootstrap()
+from flask_bootstrap import Bootstrap4
 
 
 def create_app(script_info=None):
@@ -24,7 +21,7 @@ def create_app(script_info=None):
     app.config.from_object(app_settings)
 
     # set up extensions
-    bootstrap.init_app(app)
+    Bootstrap4(app)
 
     # register blueprints
     from project.server.main.views import main_blueprint
diff --git a/project/server/config.py b/project/server/config.py
index 5ef9d9a..3602b9e 100644
--- a/project/server/config.py
+++ b/project/server/config.py
@@ -3,6 +3,9 @@ import os
 
 basedir = os.path.abspath(os.path.dirname(__file__))
+REDIS_URL = os.environ.get("REDIS_URL")  # e.g. REDIS_URL = "redis://redis:6379/0"
+
+from project.server.main.job_callbacks import report_failure, report_success
 
 
 class BaseConfig(object):
@@ -10,11 +13,67 @@ class BaseConfig(object):
 
     WTF_CSRF_ENABLED = True
 
+    # Reference: https://python-rq.org/docs
+    REDIS_URL = os.environ.get("REDIS_URL", REDIS_URL)
+    QUEUES = ["default"]
+
+    # job_timeout specifies the maximum runtime of the job before it's
+    # interrupted and marked as failed. Its default unit is seconds, and it
+    # can be an integer or a string representing an integer (e.g. 2, '2').
+    # It can also be a string with an explicit unit, including hour, minute,
+    # second (e.g. '1h', '3m', '5s').
+    JOB_TIMEOUT = "1h"
+
+    # result_ttl specifies how long (in seconds) successful jobs and their
+    # results are kept. Expired jobs will be automatically deleted.
+    # Defaults to 500 seconds.
+    JOB_RESULT_TTL = 3600 * 12
+
+    # ttl specifies the maximum queued time (in seconds) of the job before
+    # it's discarded. This argument defaults to None (infinite TTL).
+    JOB_TTL = None
+
+    # failure_ttl specifies how long failed jobs are kept (defaults to 1 year)
+    JOB_FAILURE_TTL = "300s"
+
+    # depends_on specifies another job (or list of jobs) that must complete
+    # before this job will be queued.
+    DEPENDS_ON = None
+
+    # job_id allows you to manually specify this job's job_id
+    JOB_ID = None
+
+    # at_front will place the job at the front of the queue, instead of the back
+    JOB_AT_FRONT = False
+
+    # description allows you to add an extra description to enqueued jobs.
+    JOB_DESCRIPTION = "This job is handled independently by a separate web worker"
+
+    # on_success allows you to run a function after a job completes successfully
+    JOB_ON_SUCCESS = report_success
+
+    # on_failure allows you to run a function after a job fails
+    JOB_ON_FAILURE = report_failure
+
+    # You can also enqueue multiple jobs in bulk with queue.enqueue_many()
+    # and Queue.prepare_data(), which will enqueue all the jobs in a single
+    # redis pipeline which you can optionally pass in yourself
+    JOB_PIPELINE = None
+
+    # Refer to https://python-rq.org/docs/ and
+    # https://github.com/rq/rq/blob/master/rq/queue.py
+    JOB_META = None
+    JOB_RETRY = None
+
+    # Webhook endpoint to send an automatic response after job success or failure
+    WEBHOOK_ENDPOINT = "http://webhook:8888"
+
 
 class DevelopmentConfig(BaseConfig):
     """Development configuration."""
 
     WTF_CSRF_ENABLED = False
+    REDIS_URL = os.environ.get("REDIS_URL", REDIS_URL)
 
 
 class TestingConfig(BaseConfig):
@@ -23,3 +82,4 @@ class TestingConfig(BaseConfig):
     TESTING = True
     WTF_CSRF_ENABLED = False
     PRESERVE_CONTEXT_ON_EXCEPTION = False
+    REDIS_URL = os.environ.get("REDIS_URL", REDIS_URL)
diff --git a/project/server/main/ds.py b/project/server/main/ds.py
new file mode 100644
index 0000000..729c08e
--- /dev/null
+++ b/project/server/main/ds.py
@@ -0,0 +1,21 @@
+import dataclasses
+from typing import List
+
+import pandas as pd
+from pydantic.dataclasses import dataclass
+
+
+@dataclasses.dataclass
+class DataFrames:
+    dataframe: pd.DataFrame = None
+
+
+@dataclass
+class OutputObject:
+    # Optional fields default to None (not to one-element tuples)
+    delay: int = None
+    time: float = None
+    texts: List[str] = None
+    file_names: List[str] = None
+    byte_data_lengths: List[int] = None
+    webhook_endpoint: str = None
+    tic: float = None
diff --git a/project/server/main/job_callbacks.py b/project/server/main/job_callbacks.py
new file mode 100644
index 0000000..e080ebf
--- /dev/null
+++ b/project/server/main/job_callbacks.py
@@ -0,0 +1,147 @@
+# Job callbacks
+
+import pathlib
+import sys
+import time
+
+import requests
+from redis import Redis
+from rq.job import Job
+
+try:
+    basedir = pathlib.os.path.abspath(pathlib.os.path.dirname(__file__))
+except NameError:
+    basedir = pathlib.os.path.abspath(pathlib.os.path.dirname("."))
+server_parentdir = pathlib.os.path.dirname(basedir)
+
+try:
+    from project.server.main.utils import unpacking
+except ModuleNotFoundError:
+    project_parentdir = pathlib.os.path.dirname(server_parentdir)
+    root_parentdir = pathlib.os.path.dirname(project_parentdir)
+    # Import root for project
+    pathlib.sys.path.insert(0, root_parentdir)
+    from project.server.main.utils import unpacking
+
+
+def report_success(job: Job, connection: Redis, result: object, *args, **kwargs):
+    """
+    Success callbacks must be a function that accepts job, connection and
+    result arguments. Your function should also accept *args and **kwargs
+    so your application doesn't break when additional parameters are added.
+
+    Success callbacks are executed after job execution is complete, before
+    dependents are enqueued. If an exception happens when your callback is
+    executed, the job status will be set to FAILED and dependents won't be
+    enqueued.
+
+    Callbacks are limited to 60 seconds of execution time. If you want to
+    execute a long-running job, consider using RQ's job dependency feature
+    instead.
+
+    Parameters
+    ----------
+    job : Job
+        Enqueued job object in rq.
+    connection : Redis
+        Redis connection object.
+    result : object
+        Returned value of the enqueued job in rq.
+    *args
+        Additional tuple parameter(s).
+    **kwargs
+        Additional mapped dictionary parameter(s).
+
+    Returns
+    -------
+    requests.Response
+        The response returned by the webhook endpoint.
+
+    """
+    # Run a function after a job completes successfully
+    unpacked_object = unpacking(job.result)
+    tic = unpacked_object and unpacked_object.tic
+    webhook_endpoint = unpacked_object.webhook_endpoint
+    toc = time.time()
+    payload = {
+        "status": "success",
+        "data": {
+            "time": toc - tic,
+            "job_id": job.get_id(),
+            "job_status": job.get_status(),
+            "texts": unpacked_object.texts,
+            "file_names": unpacked_object.file_names,
+            "byte_data_lengths": unpacked_object.byte_data_lengths,
+            "webhook_endpoint": unpacked_object.webhook_endpoint,
+            "delay": unpacked_object.delay,
+        },
+    }
+    url = webhook_endpoint
+    headers = {"Content-Type": "text/plain"}
+    response = requests.request(
+        "POST", url, headers=headers, data=str(payload).encode("utf-8")
+    )
+    return response
+
+
+def report_failure(job: Job, connection: Redis, type, value, traceback):
+    """
+    Failure callbacks are functions that accept job, connection, type, value
+    and traceback arguments. type, value and traceback are the values
+    returned by sys.exc_info(), i.e. they describe the exception raised while
+    executing your job.
+
+    Failure callbacks are limited to 60 seconds of execution time.
+
+    Parameters
+    ----------
+    job : Job
+        Enqueued job object in rq.
+    connection : Redis
+        Redis connection object.
+    type : type
+        The type of the exception being handled (a subclass of BaseException).
+    value : BaseException
+        The exception instance (an instance of the exception type).
+    traceback : traceback
+        A traceback object which encapsulates the call stack at the point
+        where the exception originally occurred.
+
+    Returns
+    -------
+    requests.Response
+        The response returned by the webhook endpoint.
+
+    """
+    # Run a function after a job fails. jobs.py attaches webhook_endpoint
+    # and tic to the exception before it propagates here.
+    webhook_endpoint = value.webhook_endpoint
+    tic = value.tic
+    if job:
+        # A failed job normally carries no result, so guard the unpacked
+        # attribute lookups below.
+        pickled_object = job.result
+        unpacked_object = unpacking(pickled_object)
+        toc = time.time()
+        payload = {
+            "status": "error",
+            "data": {
+                "time": toc - tic,
+                "job_id": job.get_id(),
+                "job_status": job.get_status(),
+                "texts": unpacked_object and unpacked_object.texts,
+                "file_names": unpacked_object and unpacked_object.file_names,
+                "byte_data_lengths": unpacked_object and unpacked_object.byte_data_lengths,
+                "webhook_endpoint": unpacked_object and unpacked_object.webhook_endpoint,
+                "delay": unpacked_object and unpacked_object.delay,
+            },
+        }
+        url = webhook_endpoint
+        headers = {"Content-Type": "text/plain"}
+        response = requests.request(
+            "POST", url, headers=headers, data=str(payload).encode("utf-8")
+        )
+        return response
diff --git a/project/server/main/jobs.py b/project/server/main/jobs.py
new file mode 100644
index 0000000..0f6a8e0
--- /dev/null
+++ b/project/server/main/jobs.py
@@ -0,0 +1,28 @@
+import time
+
+from project.server.main import ds
+from project.server.main.utils import packing
+
+
+def create_job(texts, file_names, byte_datas, webhook_endpoint, delay, tic):
+    try:
+        time.sleep(int(delay))
+        byte_data_lengths = []
+        for byte_data in byte_datas:
+            byte_data_lengths.append(len(byte_data))
+        toc = time.time()
+        output_object = ds.OutputObject(
+            time=toc - tic,
+            texts=texts,
+            file_names=file_names,
+            byte_data_lengths=byte_data_lengths,
+            webhook_endpoint=webhook_endpoint,
+            delay=delay,
+            tic=tic,
+        )
+        pickled_object = packing(source_object=output_object)
+        return pickled_object
+    except Exception as e:
+        # Attach context for report_failure, then re-raise so the job is
+        # actually marked as failed and the on_failure callback fires.
+        e.webhook_endpoint = webhook_endpoint
+        e.tic = tic
+        raise
diff --git a/project/server/main/utils.py b/project/server/main/utils.py
new file mode 100644
index 0000000..488b0b3
--- /dev/null
+++ b/project/server/main/utils.py
@@ -0,0 +1,43 @@
+import pickle
+
+
+def packing(source_object: object) -> bytes:
+    """
+    A pickle function to pack an object into bytes.
+
+    Parameters
+    ----------
+    source_object : object
+        Any object or dataclass.
+
+    Returns
+    -------
+    bytes
+        Pickled object in bytes.
+
+    """
+    if source_object is None:
+        return
+    pickled_object = pickle.dumps(source_object)
+    return pickled_object
+
+
+def unpacking(pickled_object: bytes) -> object:
+    """
+    A function to convert pickled bytes back into the original object.
+
+    Parameters
+    ----------
+    pickled_object : bytes
+        Pickled object in bytes.
+
+    Returns
+    -------
+    object
+        Object or dataclass.
+
+    """
+    if pickled_object is None:
+        return
+    unpacked_object = pickle.loads(pickled_object)
+    return unpacked_object
diff --git a/project/server/main/views.py b/project/server/main/views.py
index 365c6c1..3b1be93 100644
--- a/project/server/main/views.py
+++ b/project/server/main/views.py
@@ -1,9 +1,19 @@
 # project/server/main/views.py
 
-from flask import render_template, Blueprint, jsonify, request
+import time
 
-main_blueprint = Blueprint("main", __name__,)
+import redis
+from flask import Blueprint, current_app, jsonify, render_template, request
+from rq import Connection, Queue
+
+from project.server.main.jobs import create_job
+from project.server.main.utils import unpacking
+
+main_blueprint = Blueprint(
+    "main",
+    __name__,
+)
 
 
 @main_blueprint.route("/", methods=["GET"])
@@ -11,12 +21,132 @@
 def home():
     return render_template("main/home.html")
 
 
-@main_blueprint.route("/tasks", methods=["POST"])
-def run_task():
-    task_type = request.form["type"]
-    return jsonify(task_type), 202
+@main_blueprint.route("/sync/jobs", methods=["POST"])
+def run_sync_job():
+    tic = time.time()
+    webhook_endpoint = request.form.get(
+        "webhook_endpoint", current_app.config["WEBHOOK_ENDPOINT"]
+    )
+    delay = request.form.get("delay")
+    files = dict(request.files.to_dict(flat=True))
+    texts = []
+    file_names = []
+    byte_datas = []
+    for text, v in files.items():
+        texts.append(text)
+        file_names.append(v.filename)
+        byte_datas.append(v.read())
+    pickled_object = create_job(
+        texts, file_names, byte_datas, webhook_endpoint, delay, tic
+    )
+    toc = time.time()
+    unpacked_object = unpacking(pickled_object)
+    tic = unpacked_object and unpacked_object.tic
+    if tic:
+        time_elapsed = toc - tic
+    else:
+        time_elapsed = None
+    response_object = {
+        "status": "success",
+        "data": {
+            "texts": unpacked_object and unpacked_object.texts,
+            "file_names": unpacked_object and unpacked_object.file_names,
+            "byte_data_lengths": unpacked_object and unpacked_object.byte_data_lengths,
+            "webhook_endpoint": unpacked_object and unpacked_object.webhook_endpoint,
+            "time": unpacked_object and unpacked_object.time,
+            "delay": unpacked_object and unpacked_object.delay,
+            "time_elapsed": time_elapsed,
+        },
+    }
+
+    return jsonify(response_object), 200
+
+
+@main_blueprint.route("/async/jobs", methods=["POST"])
+def run_async_job():
+    tic = time.time()
+    webhook_endpoint = request.form.get(
+        "webhook_endpoint", current_app.config["WEBHOOK_ENDPOINT"]
+    )
+    delay = request.form.get("delay")
+    files = dict(request.files.to_dict(flat=True))
+    texts = []
+    file_names = []
+    byte_datas = []
+    for text, v in files.items():
+        texts.append(text)
+        file_names.append(v.filename)
+        byte_datas.append(v.read())
+    on_success = current_app.config["JOB_ON_SUCCESS"]
+    on_failure = current_app.config["JOB_ON_FAILURE"]
+    with Connection(redis.from_url(current_app.config["REDIS_URL"])):
+        q = Queue()
+        job = q.enqueue(
+            create_job,
+            texts,
+            file_names,
+            byte_datas,
+            webhook_endpoint,
+            delay,
+            tic,
+            job_timeout=current_app.config["JOB_TIMEOUT"],
+            description=current_app.config["JOB_DESCRIPTION"],
+            result_ttl=current_app.config["JOB_RESULT_TTL"],
+            ttl=current_app.config["JOB_TTL"],
+            failure_ttl=current_app.config["JOB_FAILURE_TTL"],
+            depends_on=current_app.config["DEPENDS_ON"],
+            job_id=current_app.config["JOB_ID"],
+            at_front=current_app.config["JOB_AT_FRONT"],
+            meta=current_app.config["JOB_META"],
+            retry=current_app.config["JOB_RETRY"],
+            on_success=on_success,
+            on_failure=on_failure,
+            pipeline=current_app.config["JOB_PIPELINE"],
+        )
+    toc = time.time()
+    response_object = {
+        "status": "success",
+        "data": {"job_id": job.get_id(), "time": toc - tic},
+    }
+    return jsonify(response_object), 202
 
 
-@main_blueprint.route("/tasks/<task_id>", methods=["GET"])
-def get_status(task_id):
-    return jsonify(task_id)
+@main_blueprint.route("/jobs/<job_id>", methods=["GET"])
+def get_status(job_id):
+    unpacked_object = None
+    with Connection(redis.from_url(current_app.config["REDIS_URL"])):
+        q = Queue()
+        job = q.fetch_job(job_id)
+    if job:
+        pickled_object = job.result
+        unpacked_object = unpacking(pickled_object)
+        toc = time.time()
+        tic = unpacked_object and unpacked_object.tic
+        if tic:
+            time_elapsed = toc - tic
+        else:
+            time_elapsed = None
+        response_object = {
+            "status": "success",
+            "data": {
+                "job_id": job.get_id(),
+                "job_status": job.get_status(),
+                "texts": unpacked_object and unpacked_object.texts,
+                "file_names": unpacked_object and unpacked_object.file_names,
+                "byte_data_lengths": (
+                    unpacked_object and unpacked_object.byte_data_lengths
+                ),
+                "webhook_endpoint": (
+                    unpacked_object and unpacked_object.webhook_endpoint
+                ),
+                "time": unpacked_object and unpacked_object.time,
+                "delay": unpacked_object and unpacked_object.delay,
+                "time_elapsed": time_elapsed,
+            },
+        }
+    else:
+        response_object = {"status": "error"}
+    if unpacked_object:
+        return jsonify(response_object), 200
+    else:
+        return jsonify(response_object), 206
diff --git a/requirements.txt b/requirements.txt
index d90a786..c0e8d7b 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,8 @@
-Flask==1.1.1
-Flask-Bootstrap==3.3.7.1
-Flask-Testing==0.7.1
-Flask-WTF==0.14.2
+Bootstrap-Flask==2.0.0
+Flask==2.0.2
+Flask-Testing==0.8.1
+Flask-WTF==1.0.0
+redis==4.1.1
+rq==1.10.1
+pydantic==1.8.2
+pandas
diff --git a/webhook/Dockerfile b/webhook/Dockerfile
new file mode 100644
index 0000000..f6b9697
--- /dev/null
+++ b/webhook/Dockerfile
@@ -0,0 +1,10 @@
+FROM python:3.10-alpine
+
+RUN pip install pip --upgrade
+RUN pip install Flask==2.0.3
+
+# Copy project
+COPY . .
+
+EXPOSE 8888
+CMD ["python3", "-u", "app.py"]
\ No newline at end of file
diff --git a/webhook/app.py b/webhook/app.py
new file mode 100644
index 0000000..c4a63b3
--- /dev/null
+++ b/webhook/app.py
@@ -0,0 +1,47 @@
+# Importing the flask module in the project is mandatory.
+# An object of the Flask class is our WSGI application.
+import ast
+import pprint
+from os import environ
+
+from flask import Flask, request
+
+# The Flask constructor takes the name of the
+# current module (__name__) as argument.
+app = Flask(__name__)
+
+# Default to the in-network endpoint when the variable is unset, so the
+# port parsing below cannot fail on None.
+WEBHOOK_ENDPOINT = environ.get("WEBHOOK_ENDPOINT", "http://webhook:8888")
+
+print("WEBHOOK_ENDPOINT:", WEBHOOK_ENDPOINT)
+
+# The route() function of the Flask class is a decorator,
+# which tells the application which URL should call
+# the associated function.
+
+
+@app.route("/", methods=["POST"])
+# The '/' URL is bound to webhook_function().
+def webhook_function():
+    try:
+        # The job callbacks POST str(dict) as text/plain, so parse it with
+        # ast.literal_eval rather than a bare eval().
+        data = ast.literal_eval(request.data.decode("utf-8"))
+    except Exception:
+        data = request.data
+    print()
+    print("Webhook received the following output ...")
+    print()
+    pprint.pprint(data)
+    print("----------" * 10)
+    print()
+    print()
+    return data
+
+
+# main driver function
+if __name__ == "__main__":
+
+    # The run() method of the Flask class runs the application
+    # on the local development server.
+    port = WEBHOOK_ENDPOINT.replace("http://webhook:", "")
+    app.run(host="0.0.0.0", port=int(port))
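
Quick end-to-end check of the patched stack — a minimal sketch: `sample.pdf` stands in for any local file (the demo job only records file names and byte lengths, so the contents don't matter), and the endpoints and `job_id` response path follow views.py and the Postman collection above:

```sh
# Bring up the stack: web API on 5004, rq-dashboard on 9181
docker-compose up -d --build -V --scale worker=4

# Enqueue a background job with a 5-second simulated delay
curl -X POST http://localhost:5004/async/jobs \
  -F "delay=5" \
  -F "file1=@sample.pdf"
# => {"status": "success", "data": {"job_id": "<id>", "time": ...}}

# Poll the job status with the returned job_id
curl http://localhost:5004/jobs/<job_id>
```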