A first look at Django’s new background tasks

Django 6.0 एक अंतर्निहित पृष्ठभूमि कार्य ढांचे का परिचय देता है django.tasks. लेकिन सेलेरी, ह्युई या अन्य पसंदीदा समाधानों को अभी चरणबद्ध तरीके से ख़त्म करने की उम्मीद न करें।

इस पर रिलीज़ नोट बिल्कुल स्पष्ट हैं:

Django कार्य निर्माण और कतारबद्धता को संभालता है, लेकिन कार्यों को चलाने के लिए कोई कार्यकर्ता तंत्र प्रदान नहीं करता है।
निष्पादन को बाहरी बुनियादी ढांचे, जैसे एक अलग प्रक्रिया या सेवा द्वारा प्रबंधित किया जाना चाहिए।

नये का मुख्य उद्देश्य django.tasks मॉड्यूल है कार्य कतारों के लिए एक सामान्य एपीआई प्रदान करें कार्यान्वयन. इस वृद्धि के पीछे जेक हावर्ड प्रेरक शक्ति है। Django फोरम पर परिचय देखें।

उनका संदर्भ कार्यान्वयन, और साथ ही Django के पुराने संस्करणों के लिए एक बैकपोर्ट, के रूप में उपलब्ध है django-tasks GitHub पर.

लेकिन आइए इसे अनदेखा करें और इसके बजाय Django 6.0 में शामिल अधिक न्यूनतम संस्करण के साथ खेलें। अपना स्वयं का बैकएंड और कार्यकर्ता बनाकर।

हमारा प्रोजेक्ट: सूचनाएं

हम ntfy.sh का उपयोग करके फ़ोन और अन्य उपकरणों पर सूचनाएं भेजने के लिए एक ऐप बनाने जा रहे हैं। (मैं एक प्रशंसक हूँ!)

यदि आप स्वयं कोड में गोता लगाना पसंद करते हैं, तो GitHub पर प्रोजेक्ट का अंतिम संस्करण देखें।

एनएफटीवाई का उपयोग करके आपके फोन पर अधिसूचना भेजने के लिए बस इतना ही आवश्यक है:

  1. एक खाते के लिए पंजीकरण करें
  2. एक विषय बनाएं.
  3. अपने फोन में ऐप इंस्टॉल करें और लॉग इन करें।
  4. HTTP अनुरोध भेजें https://ntfy.sh/

मुफ़्त संस्करण केवल सार्वजनिक विषय और संदेश प्रदान करता है। मतलब कि आप जो सामान भेज रहे हैं उसे कोई भी देख सकता है अगर वे विषय की सदस्यता लेते हैं। अपने उद्देश्य के लिए हम यूयूआईडी जैसे यादृच्छिक नाम से एक विषय बना सकते हैं।

प्रोजेक्ट की सेटिंग्स चरण 4 से यूआरएल को पर्यावरण चर के रूप में आपूर्ति किए जाने की अपेक्षा करती है। उदाहरण के लिए:

NTFY_URL=https://ntfy.sh/062519693d9c4913826f0a39aeea8a4c

यहां हमारा कार्य है जो भारी सामान उठाता है:

import httpx
from django.conf import settings

def send_notification(message: str, title: str | None):
    # Pass the title if specified.
    headers = {"title": title} if title else {}
    httpx.post(
        settings.NTFY_URL,
        content=message,
        headers=headers,
    )

वास्तव में। सूचनाएं भेजना और प्राप्त करना शुरू करने के लिए बस इतना ही है।

एनटीएफवाई सूचनाएं

एक त्वरित प्राइमर

आपको वास्तव में विवरण के लिए टास्क फ्रेमवर्क पर Django दस्तावेज़ पर एक नज़र डालनी चाहिए, लेकिन हम आपका थोड़ा समय बचाएंगे और एक त्वरित प्राइमर देंगे।

किसी कार्य को परिभाषित करना

यह नए ढांचे का मुख्य लक्ष्य है: कार्य कतार विशिष्ट डेकोरेटर या अन्य तरीकों का उपयोग करने के बजाय Django के मानक एपीआई का उपयोग करके कार्यों को परिभाषित करना।

तो यहाँ यह होता है:

# ...
from django.tasks import task

@task
def send_notification(message: str, title: str | None):
    # ...as before

हमारा कार्य अब एक कार्य है। वास्तव में यह एक है django.tasks.Task.

आप कॉल नहीं कर सकते send_notification अब सीधे तौर पर. का उपयोग करके ही कार्य चलाये जा सकते हैं enqueue तरीका। हो सकता है कि यह वह व्यवहार न हो जिसकी आप अपेक्षा करते हैं या चाहते हैं, लेकिन यह सबसे अच्छा विकल्प प्रतीत होता है। यह डिज़ाइन पृष्ठभूमि के बजाय प्रक्रिया में किसी कार्य को गलती से शुरू करने की संभावना को समाप्त कर देता है।

task डेकोरेटर आपको कार्य की प्राथमिकता, कतार का नाम और बैकएंड नाम निर्दिष्ट करने की अनुमति देता है। आप इन सेटिंग्स को इसके साथ ओवरराइड कर सकते हैं using विधि, जो एक नया लौटाती है django.tasks.Task उदाहरण।

यदि आपको कार्य व्यवहार पर अधिक नियंत्रण की आवश्यकता है, तो आप सेट कर सकते हैं takes_context को True डेकोरेटर में और जोड़ें context पहले तर्क के रूप में. यह संदर्भ वर्तमान में आपको कार्य परिणाम तक पहुंच प्रदान करता है और इस प्रकार प्रयासों की संख्या जैसी उपयोगी जानकारी प्रदान करता है।

पुनर्प्रयास और बैकऑफ़, या अन्य फैंसी चीज़ों को परिभाषित करने का कोई तरीका नहीं है जिनकी आप पूर्ण विकसित कार्य कतार कार्यान्वयन से अपेक्षा कर सकते हैं। लेकिन है कि नहीं यह क्या है। यदि आवश्यक हो तो कार्य संदर्भ का निरीक्षण करके आप आसानी से अपना स्वयं का पुनः प्रयास तर्क जोड़ सकते हैं।

किसी कार्य को कतारबद्ध करना

किसी कार्य को कतार में जोड़ना आसान है:

task_result = send_notification.enqueue(
    message="Season's greeting!", 
    title="Santa has something to tell you"
)

किसी कार्य का निष्पादन करना

यहीं से चीजें कम होने लगती हैं। कम से कम अभी. Django 6.0 के साथ भेजा जाएगा ImmediateBackend और यह DummyBackend. पहला कार्य को तुरंत निष्पादित करेगा, जबकि बाद वाला कार्य को बिल्कुल भी निष्पादित नहीं करेगा।

यही कारण है कि हमारी परियोजना में डेटाबेस और एक कार्यकर्ता प्रक्रिया द्वारा समर्थित एक (डेमो) बैकएंड शामिल है!

परिणाम लाया जा रहा है

यदि आप परिणाम के लिए इंतजार नहीं करना चाहते हैं, तो आप बाद में इसकी आईडी का उपयोग करके इसे प्राप्त कर सकते हैं। बस कॉल करें get_result(result_id) आपके कार्य पर.

हमारे प्रोजेक्ट में एक ऐसा दृश्य शामिल है जिसका htmx का उपयोग करके उत्कृष्ट परिणामों के लिए समय-समय पर सर्वेक्षण किया जाता है।

प्रोजेक्ट यूआई

फ़ॉर्म के नीचे की सूची हमारे प्रत्येक कार्य के निष्पादन के परिणाम दिखाती है। जब फॉर्म सबमिट हो जाता है, तो सूची के शीर्ष पर एक नया परिणाम जोड़ा जाता है। Htmx को निर्देश दिया गया है कि जब तक परिणाम की स्थिति ठीक न हो, तब तक परिवर्तनों के लिए मतदान जारी रखें FAILED या SUCCESSFUL.

def task_result(request, result_id, status):
    result = send_notification.get_result(result_id)
    if result.status == status:
        # No need to swap the result.
        return HttpResponse(status=204)
    return TemplateResponse(request, "index.html#result", {"result": result})

आश्चर्य है क्या? index.html#results कर रही है? Django 6.0 टेम्पलेट आंशिक भाग भी प्रस्तुत करता है। इस मामले में हमारा दृष्टिकोण प्रभावी रूप से एक प्रतिक्रिया भेजता है जिसमें केवल आंशिक नाम वाला टेम्पलेट होता है result.

पर्दे के पीछे

जब आप किसी कॉल करने योग्य को सजाते हैं taskकॉन्फ़िगर किया गया बैकएंड task_class कॉल करने योग्य को लपेटने के लिए उपयोग किया जाता है। डिफॉल्ट का django.task.Task.

वह क्लास का enqueue विधि बदले में कॉन्फ़िगर किए गए बैकएंड को लागू करेगी enqueue तरीका।

इसे बुला रहा है get_result विधि समान है: कॉन्फ़िगर किए गए बैकएंड को कॉल करें get_result विधि और परिणाम को आगे बढ़ाएँ।

चूँकि वहाँ कोई कर्मचारी नहीं है, मूल रूप से बैकएंड को बस इतना ही कार्य उपलब्ध कराने की आवश्यकता है। ठंडा। आइए एक जोड़ें, क्या हम?

एक कार्य डेटाबेस बैकएंड

हमारे लक्ष्य:

  • हमारे डेटाबेस द्वारा समर्थित एक बुनियादी कार्य बैकएंड।
  • हम “स्वचालित” पुनर्प्रयासों का समर्थन करना चाहते हैं

हमारा enqueue और get_result विधियाँ डिफ़ॉल्ट का एक उदाहरण लौटाएंगी django.tasks.TaskResult. यह डेटा की न्यूनतम मात्रा निर्धारित करता है जिसे हमें संग्रहीत करने की आवश्यकता है, और हम इसे एक मॉडल में करने जा रहे हैं जिसे कहा जाता है Task.

मॉडल

आइए अपना पहला ड्राफ्ट बनाएं Task मॉडल, के गुणों पर आधारित है TaskResult और Task में django.tasks (“डेटाक्लासेस”):

class Task(models.Model):
    priority = models.IntegerField(default=0)
    callable_path = models.CharField(max_length=255)
    backend = models.CharField(max_length=200)
    queue_name = models.CharField(max_length=100)
    run_after = models.DateTimeField(null=True, blank=True)
    takes_context = models.BooleanField(default=False)
    # Stores args and kwargs
    arguments = models.JSONField(null=True, blank=True)
    status = models.CharField(
        choices=TaskResultStatus.choices, max_length=10, default=TaskResultStatus.READY
    )
    enqueued_at = models.DateTimeField()
    started_at = models.DateTimeField(blank=True, null=True)
    finished_at = models.DateTimeField(blank=True, null=True)
    last_attempted_at = models.DateTimeField(blank=True, null=True)
    return_value = models.JSONField(null=True, blank=True)

क्या नहीं हैं? एक के लिए, TaskResult इसमें सामने आई त्रुटियों की सूची और कार्य को संसाधित करने वाले श्रमिकों की आईडी भी शामिल है। कुछ ऐसा जो हम कर सकते थे शायद अनदेखा करना।

सिवाय TaskResult.attempts संपत्ति कार्यकर्ता आईडी की संख्या पर आधारित है। और यदि आप किसी कार्य के भीतर कार्य संदर्भ का उपयोग कर रहे हैं, तो आप निश्चित रूप से उस प्रकार की जानकारी पर निर्भर रहेंगे।

हम इन विवरणों को इसमें जोड़ सकते हैं Task एक जोड़कर मॉडल JSONField प्रत्येक के लिए। संदर्भ कार्यान्वयन में यह वर्तमान दृष्टिकोण है।

लेकिन आइए अपने दृष्टिकोण को और अधिक स्पष्ट करें और इनके लिए मॉडल भी परिभाषित करें। हम किसी कार्य को निष्पादित करने के प्रत्येक प्रयास और उसकी संभावित त्रुटि को रिकॉर्ड करेंगे, उन्हें एक विदेशी कुंजी के साथ कार्य से जोड़ेंगे:

class Error(models.Model):
    exception_class_path = models.TextField()
    traceback = models.TextField()

class AttemptResultStatus(TextChoices):
    # Subset of TaskResultStatus.
    FAILED = TaskResultStatus.FAILED
    SUCCESSFUL = TaskResultStatus.SUCCESSFUL

class Attempt(models.Model):
    task = models.ForeignKey(Task, related_name="attempts", on_delete=models.CASCADE)
    error = models.OneToOneField(
        Error, related_name="attempt", on_delete=models.CASCADE, null=True, blank=True
    )
    worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID)
    started_at = models.DateTimeField()
    stopped_at = models.DateTimeField(blank=True, null=True)
    status = models.CharField(
        choices=AttemptResultStatus.choices, max_length=10, blank=True
    )

यह सेटअप सुनिश्चित करता है कि हमारे पास किसी कार्य को निष्पादित करने के लिए सभी आवश्यक जानकारी है, साथ ही हम हर एक विवरण प्रदान कर सकते हैं TaskResult अनुरोध है.

सब ठीक है, लेकिन हमें कार्यकर्ताओं की आवश्यकताओं के बारे में भी सोचना होगा। इसके लिए सक्षम होना आवश्यक है:

  1. बकाया कार्यों की तुरंत जांच करें
  2. उन कार्यों में से किसी एक का दावा करें
  3. उस कार्य को संसाधित करें और उसे विफल, सफल या तैयार के रूप में चिह्नित करें (बाद में पुनः प्रयास करने के लिए)

जिस तरह से इसे अभी स्थापित किया गया है, हम वह सब कर सकते हैं, लेकिन मैं चीजों को थोड़ा परिष्कृत करना चाहूंगा।

class Task(models.Model):
    # ...
    # This field is used to keep track of when to run a task (again).
    # run_after remains unchanged after enqueueing.
    available_after = models.DateTimeField()
    # Denormalized count of attempts.
    attempt_count = models.IntegerField(default=0)
    # Set when a worker starts processing this task.
    worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID, blank=True)
    # ...

available_after फ़ील्ड में वह प्रारंभिक समय शामिल होगा जिस पर कार्य निष्पादित किया जा सकता है। यदि कार्य है run_after निर्दिष्ट किया गया है (जिसे किसी कार्य का उपयोग करके किया जा सकता है… using() तरीका), available_after उस मान पर सेट है. अन्यथा हम वर्तमान दिनांक समय का उपयोग कर रहे हैं; सभी यूटीसी में।

एक बार किसी कार्य को पुनः प्रयास करने की आवश्यकता होती है, available_after कार्य को निष्पादित करने के लिए अगले संभावित समय पर सेट किया जाएगा। दूसरे शब्दों में: हम कर सकते हैं पीछे हटना।

attempt_count फ़ील्ड उपलब्ध कार्यों के लिए क्वेरी करना थोड़ा आसान बनाता है। किसी भी कार्य के साथ attempt_count अधिकतम अनुमत मान से अधिक को नजरअंदाज किया जा सकता है। हाँ, उनकी स्थिति निर्धारित की जानी चाहिए थी FAILED जिसका अर्थ है कि उन्हें डिफ़ॉल्ट रूप से बाहर रखा जाना चाहिए, लेकिन हम कॉन्फ़िगरेशन बदल सकते हैं और प्रयासों की अधिकतम संख्या में बदलाव कर सकते हैं।

worker_id जब कोई कार्यकर्ता किसी कार्य का दावा करता है तो फ़ील्ड भर जाती है। यह, अन्य बातों के अलावा, किसी अन्य कर्मचारी को कार्य लेने से रोकता है। यह मानते हुए कि कार्यकर्ता आईडी अद्वितीय है।

कतारबद्ध करना और परिणाम प्राप्त करना

किसी कार्य को कतारबद्ध करना इससे आसान नहीं हो सकता: एक बनाएं Task मॉडल उदाहरण से Task डेटाक्लास उदाहरणइसे सहेजें, हो गया! ठीक है, कम से कम अंतिम परिणाम को एक में बदलने के बाद TaskResult.

हम परिणाम की आईडी के रूप में मॉडल के डेटाबेस आईडी के स्ट्रिंग संस्करण का उपयोग करते हैं।

परिणाम प्राप्त करना इसी तरह कार्य और उसके प्रयासों को लोड करने और उसे एक में बदलने का मामला है TaskResult.

यहां हमारे कार्य बैकएंड का सरलीकृत संस्करण दिया गया है:

class DatabaseBackend(BaseTaskBackend):
    supports_defer = True
    supports_async_task = False
    supports_get_result = True
    supports_priority = True

    def enqueue(self, task: Task, args, kwargs):
        self.validate_task(task)
        model = self.queue_store.enqueue(task, args, kwargs)
        task_result = TaskResult(
            task=task,
            id=str(model.pk),
            # ...
            # More properties being set
            # ...
        )
        return task_result

    def get_result(self, result_id):
        return self.model_to_result(
            self.queue_store.get(result_id)
        )

    def model_to_result(self, model: models.Task) -> TaskResult:
        ...

बहुत सी कार्यक्षमताएँ इसके लिए स्थगित कर दी गई हैं queue_store संपत्ति। इससे पहले कि हम इसमें उतरें, हम इस बैकएंड के लिए कॉन्फ़िगरेशन विकल्पों की व्याख्या करेंगे।

विन्यास

हम इसके लिए डिफ़ॉल्ट निर्दिष्ट करने में सक्षम होना चाहते हैं:

  • प्रयासों की अधिकतम संख्या (पुनः प्रयास)
  • बैकऑफ़ कारक; यानी हम प्रयोग से पीछे हट जाएंगे math.pow(factor, attempts)

इन्हें प्रत्येक व्यक्तिगत कतार के लिए अनुकूलित किया जा सकता है। तो अंत में हम अपने अंदर कुछ ऐसा ही पाते हैं OPTIONS:

TASKS = {
    "default": {
        "BACKEND": "messagecenter.dbtasks.backend.DatabaseBackend",
        "OPTIONS": {
            "queues": {
                "low_priority": {
                    "max_attempts": 5,
                }
            },
            "max_attempts": 10,
            "backoff_factor": 3,
            "purge": {"finished": "10 days", "unfinished": "20 days"},
        },
    }
}

में एक कार्य जोड़ा गया low_priority बैकऑफ़ कारक के साथ, कतार में पाँच बार तक प्रयास किया जाएगा 3. अन्य कार्यों को समान बैकऑफ़ कारक के साथ दस बार तक करने का प्रयास किया जाएगा।

कतार की दुकान

QueueStore क्लास हमारे बैकएंड का साथी है। इसका ध्यान कार्यों को पुनः प्राप्त करने और कतारबद्ध करने, निष्पादित करने के लिए कार्यों की जाँच करने और कार्यों का दावा करने पर है।

हालाँकि इसे शामिल करने का मुख्य कारण कार्यकर्ता को सरल बनाना है। जैसा कि हम देखेंगे, कार्यकर्ता को कतार स्टोर की अपनी प्रति मिल जाती है, इसे संसाधित करने के लिए आवश्यक कतारों तक ही सीमित है।

कार्यकर्ता

कार्यकर्ता का काम, कम से कम इस परियोजना में, धावक को उत्कृष्ट कार्यों के बारे में जानकारी प्रदान करना और उन कार्यों के प्रसंस्करण को आगे बढ़ाना है बैकएंड द्वारा. इसका मतलब यह है कि यह इस तरह दिखता है:

class Worker:
    def __init__(
        self,
        id_: str | None,
        backend_name: str,
        only: set[str] | None,
        excluding: set[str] | None,
    ):
        # Grab the backend and its queue_store.
        self.backend = task_backends[backend_name]
        queue_store: QueueStore = self.backend.queue_store
        # Limit the queue_store to the select queues.
        if only or excluding:
            queue_store = queue_store.subset(only=only, excluding=excluding)
        self.queue_store = queue_store
        # Use or create and id. "Must" be unique.
        self.id = (
            id_ if id_ else create_id(backend_name, queues=queue_store.queue_names)
        )

    def has_more(self) -> bool:
        return self.queue_store.has_more()

    def process(self):
        with transaction.atomic():
            tm = self.queue_store.claim_first_available(worker_id=self.id)
        if tm is not None:
            self.backend.process_task(tm)

एक कार्यशील कार्यकर्ता धावक के लिए हमें बस इतना करना होगा:

  1. कार्यकर्ता का एक उदाहरण बनाएं.
  2. इससे पूछें कि क्या इसका उपयोग करके निष्पादित करने के लिए कोई कार्य हैं has_more.
  3. यदि हां: तो इसे बताएं process पहला उपलब्ध कार्य. यदि नहीं: 4 पर जाएँ.
  4. रुको, फिर 2 पर वापस आओ।

वही हमारा है dbtasks_worker आदेश करता है.

किसी कार्य का दावा करना

हमारा क्यू स्टोर एक प्रदान करता है peek वह विधि जो हमारी कतारों में कार्य की आईडी को सबसे अधिक तत्परता से लौटाती है; का एक संयोजन available_after, priority और attempt_count.

इससे धावक को पता चलता है कि प्रक्रिया करने के लिए और भी कार्य हैं या नहीं। अगला कदम उन कार्यों में से किसी एक पर दावा करना है। तो हम बुलाते हैं peek दोबारा और यदि यह एक कार्य आईडी लौटाता है, तो हम उस विशेष कार्य पर दावा करने का प्रयास करेंगे।

यहां हमारे प्रोजेक्ट में शामिल संस्करण से अधिक बुनियादी, स्पष्ट संस्करण है QueueStore:

def claim_first_available(
    self, worker_id: str, attempts: int = 3
) -> models.Task | None:
    qs = models.Task.filter(
        worker_id="", 
        status=TaskResultStatus.READY,
    )
    for _ in range(attempts):
        task_id = self.peek()
        if not task_id:
            return None
        count = qs.filter(pk=task_id).update(
            worker_id=self.id_,
            status=TaskResultStatus.RUNNING,
        )
        if count:
            return models.Task.objects.get(pk=task_id)
    return None

यदि count शून्य है, हम कार्य का दावा करने में विफल रहे। अन्यथा हम इसे डेटाबेस से पुनर्प्राप्त करते हैं और प्रसंस्करण शुरू कर सकते हैं।

लूप शामिल है क्योंकि हम पहचाने गए कार्य का दावा करने का प्रयास करने के बाद यहां समाप्त हुए peek. जिसे जाहिरा तौर पर पहले ही किसी अन्य कर्मचारी ने उठा लिया है। हम इसका अधिकतम लाभ उठा सकते हैं और कतार से कोई अन्य कार्य प्राप्त करने का प्रयास कर सकते हैं।

कार्य संसाधित किया जा रहा है

और अंततः वह चीज़ जो वास्तव में कुछ करती है!

process_task हमारे बैकएंड की विधि:

  1. एक बनाता है Attempt और धारा का निर्माण करता है TaskResult.
  2. किसी भी विस्तारित चीज़ को कैप्चर करते हुए, कार्य निष्पादित करता है BaseExceptionया वापस कर रहा हूँ return_value कार्य का जब सब कुछ योजना के अनुसार हुआ।
  3. या तो अद्यतन करता है Task मॉडल, Attempt और TaskResult सफल निष्पादन के अंतिम विवरण के साथ, या ऐसा करने में विफलता के विवरण के साथ।
  4. और बाद वाले मामले में: जांचें कि क्या कार्य का पुनः प्रयास किया जा सकता है।

दोबारा: यदि आप विवरणों में गोता लगाना चाहते हैं, तो रिपॉजिटरी पर एक नज़र डालें।

इतना ही

निःसंदेह यह डेमो प्रोजेक्ट उन सभी चीजों को छोड़ देता है जिनके बारे में आपको वास्तव में गंभीरता से सोचने की आवश्यकता है। कार्यकर्ता के लिए संकेत की तरह. या डेटाबेस लेनदेन तर्क। इसका मतलब यह नहीं है कि यह असंभव है। से बहुत दूर। यह इस लेख का लक्ष्य ही नहीं था.

Django में इस कार्यक्षमता को शामिल करने से निश्चित रूप से मौजूदा कार्य कतारों के लिए नई लाइब्रेरी या एडाप्टर को पॉप अप करने की अनुमति मिल जाएगी। और हम संभवतः जल्द ही कुछ शिकायतें देखेंगे django.tasks पर्याप्त व्यापक नहीं है.

क्योंकि, यदि आप वर्तमान में अपने कार्य कतार की अधिक उन्नत कार्यक्षमता का उपयोग कर रहे हैं, तो संभवतः कुछ चीजें हैं जो आप गायब हैं django.tasks.

जटिल ऑर्केस्ट्रेशन

सेलेरी जैसी कुछ कार्य कतार लाइब्रेरी, कार्यों के संयोजन के तरीके प्रदान करती हैं। आप एक कार्य के परिणाम को दूसरे में फ़ीड कर सकते हैं, एक सूची में प्रत्येक आइटम के लिए कार्यों को सूचीबद्ध कर सकते हैं, इत्यादि।

अब तक यह स्पष्ट हो जाना चाहिए कि इस प्रकार के आयोजन का समर्थन करना लक्ष्य नहीं है django.tasks. और मुझे बिल्कुल भी आपत्ति नहीं है. इसका समर्थन करने के लिए एकीकृत एपीआई बनाने का कोई व्यवहार्य तरीका नहीं है। मुझे पुस्तकालयों के साथ अपनी समस्याओं का सामना करना पड़ा है करना इसका समर्थन करने का दावा करें.

पुन: प्रयास करें

जैसा कि पहले उल्लेख किया गया है, वर्तमान में किसी विफल कार्य को स्वचालित रूप से पुनः प्रयास करने का कोई तरीका नहीं है, जब तक कि आपका बैकएंड भारी भारोत्तोलन न करे। जैसे हमारा करता है.

आपके बैकएंड के आधार पर इसे स्वयं संभालना काफी आसान हो सकता है। उदाहरण के लिए डेकोरेटर का उपयोग करना:

def retry(func):
    @functools.wraps(func)
    def wrapper(context: TaskContext, *args, **kwargs):
        try:
            return func(context, *args, **kwargs)
        except BaseException as e:
            result = context.task_result
            backoff = math.pow(2, result.attempts)
            run_after = datetime.now(tz=UTC) + timedelta(seconds=backoff)
            result.task.using(run_after=run_after).enqueue(*args, **kwargs)
            raise e
    return wrapper


@task(takes_context=True)
@retry
def send_email(context: TaskContext, to: str, subject: str, body: str):
    # Do your thing 
    ...

एक वास्तविक कार्यकर्ता तंत्र

सत्य। लेकिन संदर्भ कार्यान्वयन वास्तविक श्रमिक प्रदान करता है। धैर्य रखें, या इससे भी बेहतर: मदद करना शुरू करें!

कोई सटीक समाधान नहीं है

मेरा मानना ​​है django.tasks जल्द ही कम से कम 80% सबसे आम उपयोग के मामलों को कवर करने में परिणाम मिलेगा। हां, इसकी एपीआई सरल और सीमित है, लेकिन मेरे लिए यह गलती से ज्यादा फायदा है। मुझे लगता है कि यह मानकीकृत दृष्टिकोण के उतना करीब है जितना आप पा सकते हैं।



<a href

Leave a Comment