Django 6.0 एक अंतर्निहित पृष्ठभूमि कार्य ढांचे का परिचय देता है django.tasks. लेकिन सेलेरी, ह्युई या अन्य पसंदीदा समाधानों को अभी चरणबद्ध तरीके से ख़त्म करने की उम्मीद न करें।
इस पर रिलीज़ नोट बिल्कुल स्पष्ट हैं:
Django कार्य निर्माण और कतारबद्धता को संभालता है, लेकिन कार्यों को चलाने के लिए कोई कार्यकर्ता तंत्र प्रदान नहीं करता है।
निष्पादन को बाहरी बुनियादी ढांचे, जैसे एक अलग प्रक्रिया या सेवा द्वारा प्रबंधित किया जाना चाहिए।
नये का मुख्य उद्देश्य django.tasks मॉड्यूल है कार्य कतारों के लिए एक सामान्य एपीआई प्रदान करें कार्यान्वयन. इस वृद्धि के पीछे जेक हावर्ड प्रेरक शक्ति है। Django फोरम पर परिचय देखें।
उनका संदर्भ कार्यान्वयन, और साथ ही Django के पुराने संस्करणों के लिए एक बैकपोर्ट, के रूप में उपलब्ध है django-tasks GitHub पर.
लेकिन आइए इसे अनदेखा करें और इसके बजाय Django 6.0 में शामिल अधिक न्यूनतम संस्करण के साथ खेलें। अपना स्वयं का बैकएंड और कार्यकर्ता बनाकर।
हमारा प्रोजेक्ट: सूचनाएं
हम ntfy.sh का उपयोग करके फ़ोन और अन्य उपकरणों पर सूचनाएं भेजने के लिए एक ऐप बनाने जा रहे हैं। (मैं एक प्रशंसक हूँ!)
यदि आप स्वयं कोड में गोता लगाना पसंद करते हैं, तो GitHub पर प्रोजेक्ट का अंतिम संस्करण देखें।
एनएफटीवाई का उपयोग करके आपके फोन पर अधिसूचना भेजने के लिए बस इतना ही आवश्यक है:
- एक खाते के लिए पंजीकरण करें
- एक विषय बनाएं.
- अपने फोन में ऐप इंस्टॉल करें और लॉग इन करें।
- HTTP अनुरोध भेजें
https://ntfy.sh/
मुफ़्त संस्करण केवल सार्वजनिक विषय और संदेश प्रदान करता है। मतलब कि आप जो सामान भेज रहे हैं उसे कोई भी देख सकता है अगर वे विषय की सदस्यता लेते हैं। अपने उद्देश्य के लिए हम यूयूआईडी जैसे यादृच्छिक नाम से एक विषय बना सकते हैं।
प्रोजेक्ट की सेटिंग्स चरण 4 से यूआरएल को पर्यावरण चर के रूप में आपूर्ति किए जाने की अपेक्षा करती है। उदाहरण के लिए:
NTFY_URL=https://ntfy.sh/062519693d9c4913826f0a39aeea8a4c
यहां हमारा कार्य है जो भारी सामान उठाता है:
import httpx
from django.conf import settings
def send_notification(message: str, title: str | None):
# Pass the title if specified.
headers = {"title": title} if title else {}
httpx.post(
settings.NTFY_URL,
content=message,
headers=headers,
)
वास्तव में। सूचनाएं भेजना और प्राप्त करना शुरू करने के लिए बस इतना ही है।

एक त्वरित प्राइमर
आपको वास्तव में विवरण के लिए टास्क फ्रेमवर्क पर Django दस्तावेज़ पर एक नज़र डालनी चाहिए, लेकिन हम आपका थोड़ा समय बचाएंगे और एक त्वरित प्राइमर देंगे।
किसी कार्य को परिभाषित करना
यह नए ढांचे का मुख्य लक्ष्य है: कार्य कतार विशिष्ट डेकोरेटर या अन्य तरीकों का उपयोग करने के बजाय Django के मानक एपीआई का उपयोग करके कार्यों को परिभाषित करना।
तो यहाँ यह होता है:
# ...
from django.tasks import task
@task
def send_notification(message: str, title: str | None):
# ...as before
हमारा कार्य अब एक कार्य है। वास्तव में यह एक है django.tasks.Task.
आप कॉल नहीं कर सकते send_notification अब सीधे तौर पर. का उपयोग करके ही कार्य चलाये जा सकते हैं enqueue तरीका। हो सकता है कि यह वह व्यवहार न हो जिसकी आप अपेक्षा करते हैं या चाहते हैं, लेकिन यह सबसे अच्छा विकल्प प्रतीत होता है। यह डिज़ाइन पृष्ठभूमि के बजाय प्रक्रिया में किसी कार्य को गलती से शुरू करने की संभावना को समाप्त कर देता है।
task डेकोरेटर आपको कार्य की प्राथमिकता, कतार का नाम और बैकएंड नाम निर्दिष्ट करने की अनुमति देता है। आप इन सेटिंग्स को इसके साथ ओवरराइड कर सकते हैं using विधि, जो एक नया लौटाती है django.tasks.Task उदाहरण।
यदि आपको कार्य व्यवहार पर अधिक नियंत्रण की आवश्यकता है, तो आप सेट कर सकते हैं takes_context को True डेकोरेटर में और जोड़ें context पहले तर्क के रूप में. यह संदर्भ वर्तमान में आपको कार्य परिणाम तक पहुंच प्रदान करता है और इस प्रकार प्रयासों की संख्या जैसी उपयोगी जानकारी प्रदान करता है।
पुनर्प्रयास और बैकऑफ़, या अन्य फैंसी चीज़ों को परिभाषित करने का कोई तरीका नहीं है जिनकी आप पूर्ण विकसित कार्य कतार कार्यान्वयन से अपेक्षा कर सकते हैं। लेकिन है कि नहीं यह क्या है। यदि आवश्यक हो तो कार्य संदर्भ का निरीक्षण करके आप आसानी से अपना स्वयं का पुनः प्रयास तर्क जोड़ सकते हैं।
किसी कार्य को कतारबद्ध करना
किसी कार्य को कतार में जोड़ना आसान है:
task_result = send_notification.enqueue(
message="Season's greeting!",
title="Santa has something to tell you"
)
किसी कार्य का निष्पादन करना
यहीं से चीजें कम होने लगती हैं। कम से कम अभी. Django 6.0 के साथ भेजा जाएगा ImmediateBackend और यह DummyBackend. पहला कार्य को तुरंत निष्पादित करेगा, जबकि बाद वाला कार्य को बिल्कुल भी निष्पादित नहीं करेगा।
यही कारण है कि हमारी परियोजना में डेटाबेस और एक कार्यकर्ता प्रक्रिया द्वारा समर्थित एक (डेमो) बैकएंड शामिल है!
परिणाम लाया जा रहा है
यदि आप परिणाम के लिए इंतजार नहीं करना चाहते हैं, तो आप बाद में इसकी आईडी का उपयोग करके इसे प्राप्त कर सकते हैं। बस कॉल करें get_result(result_id) आपके कार्य पर.
हमारे प्रोजेक्ट में एक ऐसा दृश्य शामिल है जिसका htmx का उपयोग करके उत्कृष्ट परिणामों के लिए समय-समय पर सर्वेक्षण किया जाता है।

फ़ॉर्म के नीचे की सूची हमारे प्रत्येक कार्य के निष्पादन के परिणाम दिखाती है। जब फॉर्म सबमिट हो जाता है, तो सूची के शीर्ष पर एक नया परिणाम जोड़ा जाता है। Htmx को निर्देश दिया गया है कि जब तक परिणाम की स्थिति ठीक न हो, तब तक परिवर्तनों के लिए मतदान जारी रखें FAILED या SUCCESSFUL.
def task_result(request, result_id, status):
result = send_notification.get_result(result_id)
if result.status == status:
# No need to swap the result.
return HttpResponse(status=204)
return TemplateResponse(request, "index.html#result", {"result": result})
आश्चर्य है क्या? index.html#results कर रही है? Django 6.0 टेम्पलेट आंशिक भाग भी प्रस्तुत करता है। इस मामले में हमारा दृष्टिकोण प्रभावी रूप से एक प्रतिक्रिया भेजता है जिसमें केवल आंशिक नाम वाला टेम्पलेट होता है result.
पर्दे के पीछे
जब आप किसी कॉल करने योग्य को सजाते हैं taskकॉन्फ़िगर किया गया बैकएंड task_class कॉल करने योग्य को लपेटने के लिए उपयोग किया जाता है। डिफॉल्ट का django.task.Task.
वह क्लास का enqueue विधि बदले में कॉन्फ़िगर किए गए बैकएंड को लागू करेगी enqueue तरीका।
इसे बुला रहा है get_result विधि समान है: कॉन्फ़िगर किए गए बैकएंड को कॉल करें get_result विधि और परिणाम को आगे बढ़ाएँ।
चूँकि वहाँ कोई कर्मचारी नहीं है, मूल रूप से बैकएंड को बस इतना ही कार्य उपलब्ध कराने की आवश्यकता है। ठंडा। आइए एक जोड़ें, क्या हम?
एक कार्य डेटाबेस बैकएंड
हमारे लक्ष्य:
- हमारे डेटाबेस द्वारा समर्थित एक बुनियादी कार्य बैकएंड।
- हम “स्वचालित” पुनर्प्रयासों का समर्थन करना चाहते हैं
हमारा enqueue और get_result विधियाँ डिफ़ॉल्ट का एक उदाहरण लौटाएंगी django.tasks.TaskResult. यह डेटा की न्यूनतम मात्रा निर्धारित करता है जिसे हमें संग्रहीत करने की आवश्यकता है, और हम इसे एक मॉडल में करने जा रहे हैं जिसे कहा जाता है Task.
मॉडल
आइए अपना पहला ड्राफ्ट बनाएं Task मॉडल, के गुणों पर आधारित है TaskResult और Task में django.tasks (“डेटाक्लासेस”):
class Task(models.Model):
priority = models.IntegerField(default=0)
callable_path = models.CharField(max_length=255)
backend = models.CharField(max_length=200)
queue_name = models.CharField(max_length=100)
run_after = models.DateTimeField(null=True, blank=True)
takes_context = models.BooleanField(default=False)
# Stores args and kwargs
arguments = models.JSONField(null=True, blank=True)
status = models.CharField(
choices=TaskResultStatus.choices, max_length=10, default=TaskResultStatus.READY
)
enqueued_at = models.DateTimeField()
started_at = models.DateTimeField(blank=True, null=True)
finished_at = models.DateTimeField(blank=True, null=True)
last_attempted_at = models.DateTimeField(blank=True, null=True)
return_value = models.JSONField(null=True, blank=True)
क्या नहीं हैं? एक के लिए, TaskResult इसमें सामने आई त्रुटियों की सूची और कार्य को संसाधित करने वाले श्रमिकों की आईडी भी शामिल है। कुछ ऐसा जो हम कर सकते थे शायद अनदेखा करना।
सिवाय TaskResult.attempts संपत्ति कार्यकर्ता आईडी की संख्या पर आधारित है। और यदि आप किसी कार्य के भीतर कार्य संदर्भ का उपयोग कर रहे हैं, तो आप निश्चित रूप से उस प्रकार की जानकारी पर निर्भर रहेंगे।
हम इन विवरणों को इसमें जोड़ सकते हैं Task एक जोड़कर मॉडल JSONField प्रत्येक के लिए। संदर्भ कार्यान्वयन में यह वर्तमान दृष्टिकोण है।
लेकिन आइए अपने दृष्टिकोण को और अधिक स्पष्ट करें और इनके लिए मॉडल भी परिभाषित करें। हम किसी कार्य को निष्पादित करने के प्रत्येक प्रयास और उसकी संभावित त्रुटि को रिकॉर्ड करेंगे, उन्हें एक विदेशी कुंजी के साथ कार्य से जोड़ेंगे:
class Error(models.Model):
exception_class_path = models.TextField()
traceback = models.TextField()
class AttemptResultStatus(TextChoices):
# Subset of TaskResultStatus.
FAILED = TaskResultStatus.FAILED
SUCCESSFUL = TaskResultStatus.SUCCESSFUL
class Attempt(models.Model):
task = models.ForeignKey(Task, related_name="attempts", on_delete=models.CASCADE)
error = models.OneToOneField(
Error, related_name="attempt", on_delete=models.CASCADE, null=True, blank=True
)
worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID)
started_at = models.DateTimeField()
stopped_at = models.DateTimeField(blank=True, null=True)
status = models.CharField(
choices=AttemptResultStatus.choices, max_length=10, blank=True
)
यह सेटअप सुनिश्चित करता है कि हमारे पास किसी कार्य को निष्पादित करने के लिए सभी आवश्यक जानकारी है, साथ ही हम हर एक विवरण प्रदान कर सकते हैं TaskResult अनुरोध है.
सब ठीक है, लेकिन हमें कार्यकर्ताओं की आवश्यकताओं के बारे में भी सोचना होगा। इसके लिए सक्षम होना आवश्यक है:
- बकाया कार्यों की तुरंत जांच करें
- उन कार्यों में से किसी एक का दावा करें
- उस कार्य को संसाधित करें और उसे विफल, सफल या तैयार के रूप में चिह्नित करें (बाद में पुनः प्रयास करने के लिए)
जिस तरह से इसे अभी स्थापित किया गया है, हम वह सब कर सकते हैं, लेकिन मैं चीजों को थोड़ा परिष्कृत करना चाहूंगा।
class Task(models.Model):
# ...
# This field is used to keep track of when to run a task (again).
# run_after remains unchanged after enqueueing.
available_after = models.DateTimeField()
# Denormalized count of attempts.
attempt_count = models.IntegerField(default=0)
# Set when a worker starts processing this task.
worker_id = models.CharField(max_length=MAX_LENGTH_WORKER_ID, blank=True)
# ...
available_after फ़ील्ड में वह प्रारंभिक समय शामिल होगा जिस पर कार्य निष्पादित किया जा सकता है। यदि कार्य है run_after निर्दिष्ट किया गया है (जिसे किसी कार्य का उपयोग करके किया जा सकता है… using() तरीका), available_after उस मान पर सेट है. अन्यथा हम वर्तमान दिनांक समय का उपयोग कर रहे हैं; सभी यूटीसी में।
एक बार किसी कार्य को पुनः प्रयास करने की आवश्यकता होती है, available_after कार्य को निष्पादित करने के लिए अगले संभावित समय पर सेट किया जाएगा। दूसरे शब्दों में: हम कर सकते हैं पीछे हटना।
attempt_count फ़ील्ड उपलब्ध कार्यों के लिए क्वेरी करना थोड़ा आसान बनाता है। किसी भी कार्य के साथ attempt_count अधिकतम अनुमत मान से अधिक को नजरअंदाज किया जा सकता है। हाँ, उनकी स्थिति निर्धारित की जानी चाहिए थी FAILED जिसका अर्थ है कि उन्हें डिफ़ॉल्ट रूप से बाहर रखा जाना चाहिए, लेकिन हम कॉन्फ़िगरेशन बदल सकते हैं और प्रयासों की अधिकतम संख्या में बदलाव कर सकते हैं।
worker_id जब कोई कार्यकर्ता किसी कार्य का दावा करता है तो फ़ील्ड भर जाती है। यह, अन्य बातों के अलावा, किसी अन्य कर्मचारी को कार्य लेने से रोकता है। यह मानते हुए कि कार्यकर्ता आईडी अद्वितीय है।
कतारबद्ध करना और परिणाम प्राप्त करना
किसी कार्य को कतारबद्ध करना इससे आसान नहीं हो सकता: एक बनाएं Task मॉडल उदाहरण से Task डेटाक्लास उदाहरणइसे सहेजें, हो गया! ठीक है, कम से कम अंतिम परिणाम को एक में बदलने के बाद TaskResult.
हम परिणाम की आईडी के रूप में मॉडल के डेटाबेस आईडी के स्ट्रिंग संस्करण का उपयोग करते हैं।
परिणाम प्राप्त करना इसी तरह कार्य और उसके प्रयासों को लोड करने और उसे एक में बदलने का मामला है TaskResult.
यहां हमारे कार्य बैकएंड का सरलीकृत संस्करण दिया गया है:
class DatabaseBackend(BaseTaskBackend):
supports_defer = True
supports_async_task = False
supports_get_result = True
supports_priority = True
def enqueue(self, task: Task, args, kwargs):
self.validate_task(task)
model = self.queue_store.enqueue(task, args, kwargs)
task_result = TaskResult(
task=task,
id=str(model.pk),
# ...
# More properties being set
# ...
)
return task_result
def get_result(self, result_id):
return self.model_to_result(
self.queue_store.get(result_id)
)
def model_to_result(self, model: models.Task) -> TaskResult:
...
बहुत सी कार्यक्षमताएँ इसके लिए स्थगित कर दी गई हैं queue_store संपत्ति। इससे पहले कि हम इसमें उतरें, हम इस बैकएंड के लिए कॉन्फ़िगरेशन विकल्पों की व्याख्या करेंगे।
विन्यास
हम इसके लिए डिफ़ॉल्ट निर्दिष्ट करने में सक्षम होना चाहते हैं:
- प्रयासों की अधिकतम संख्या (पुनः प्रयास)
- बैकऑफ़ कारक; यानी हम प्रयोग से पीछे हट जाएंगे
math.pow(factor, attempts)
इन्हें प्रत्येक व्यक्तिगत कतार के लिए अनुकूलित किया जा सकता है। तो अंत में हम अपने अंदर कुछ ऐसा ही पाते हैं OPTIONS:
TASKS = {
"default": {
"BACKEND": "messagecenter.dbtasks.backend.DatabaseBackend",
"OPTIONS": {
"queues": {
"low_priority": {
"max_attempts": 5,
}
},
"max_attempts": 10,
"backoff_factor": 3,
"purge": {"finished": "10 days", "unfinished": "20 days"},
},
}
}
में एक कार्य जोड़ा गया low_priority बैकऑफ़ कारक के साथ, कतार में पाँच बार तक प्रयास किया जाएगा 3. अन्य कार्यों को समान बैकऑफ़ कारक के साथ दस बार तक करने का प्रयास किया जाएगा।
कतार की दुकान
QueueStore क्लास हमारे बैकएंड का साथी है। इसका ध्यान कार्यों को पुनः प्राप्त करने और कतारबद्ध करने, निष्पादित करने के लिए कार्यों की जाँच करने और कार्यों का दावा करने पर है।
हालाँकि इसे शामिल करने का मुख्य कारण कार्यकर्ता को सरल बनाना है। जैसा कि हम देखेंगे, कार्यकर्ता को कतार स्टोर की अपनी प्रति मिल जाती है, इसे संसाधित करने के लिए आवश्यक कतारों तक ही सीमित है।
कार्यकर्ता
कार्यकर्ता का काम, कम से कम इस परियोजना में, धावक को उत्कृष्ट कार्यों के बारे में जानकारी प्रदान करना और उन कार्यों के प्रसंस्करण को आगे बढ़ाना है बैकएंड द्वारा. इसका मतलब यह है कि यह इस तरह दिखता है:
class Worker:
def __init__(
self,
id_: str | None,
backend_name: str,
only: set[str] | None,
excluding: set[str] | None,
):
# Grab the backend and its queue_store.
self.backend = task_backends[backend_name]
queue_store: QueueStore = self.backend.queue_store
# Limit the queue_store to the select queues.
if only or excluding:
queue_store = queue_store.subset(only=only, excluding=excluding)
self.queue_store = queue_store
# Use or create and id. "Must" be unique.
self.id = (
id_ if id_ else create_id(backend_name, queues=queue_store.queue_names)
)
def has_more(self) -> bool:
return self.queue_store.has_more()
def process(self):
with transaction.atomic():
tm = self.queue_store.claim_first_available(worker_id=self.id)
if tm is not None:
self.backend.process_task(tm)
एक कार्यशील कार्यकर्ता धावक के लिए हमें बस इतना करना होगा:
- कार्यकर्ता का एक उदाहरण बनाएं.
- इससे पूछें कि क्या इसका उपयोग करके निष्पादित करने के लिए कोई कार्य हैं
has_more. - यदि हां: तो इसे बताएं
processपहला उपलब्ध कार्य. यदि नहीं: 4 पर जाएँ. - रुको, फिर 2 पर वापस आओ।
वही हमारा है dbtasks_worker आदेश करता है.
किसी कार्य का दावा करना
हमारा क्यू स्टोर एक प्रदान करता है peek वह विधि जो हमारी कतारों में कार्य की आईडी को सबसे अधिक तत्परता से लौटाती है; का एक संयोजन available_after, priority और attempt_count.
इससे धावक को पता चलता है कि प्रक्रिया करने के लिए और भी कार्य हैं या नहीं। अगला कदम उन कार्यों में से किसी एक पर दावा करना है। तो हम बुलाते हैं peek दोबारा और यदि यह एक कार्य आईडी लौटाता है, तो हम उस विशेष कार्य पर दावा करने का प्रयास करेंगे।
यहां हमारे प्रोजेक्ट में शामिल संस्करण से अधिक बुनियादी, स्पष्ट संस्करण है QueueStore:
def claim_first_available(
self, worker_id: str, attempts: int = 3
) -> models.Task | None:
qs = models.Task.filter(
worker_id="",
status=TaskResultStatus.READY,
)
for _ in range(attempts):
task_id = self.peek()
if not task_id:
return None
count = qs.filter(pk=task_id).update(
worker_id=self.id_,
status=TaskResultStatus.RUNNING,
)
if count:
return models.Task.objects.get(pk=task_id)
return None
यदि count शून्य है, हम कार्य का दावा करने में विफल रहे। अन्यथा हम इसे डेटाबेस से पुनर्प्राप्त करते हैं और प्रसंस्करण शुरू कर सकते हैं।
लूप शामिल है क्योंकि हम पहचाने गए कार्य का दावा करने का प्रयास करने के बाद यहां समाप्त हुए peek. जिसे जाहिरा तौर पर पहले ही किसी अन्य कर्मचारी ने उठा लिया है। हम इसका अधिकतम लाभ उठा सकते हैं और कतार से कोई अन्य कार्य प्राप्त करने का प्रयास कर सकते हैं।
कार्य संसाधित किया जा रहा है
और अंततः वह चीज़ जो वास्तव में कुछ करती है!
process_task हमारे बैकएंड की विधि:
- एक बनाता है
Attemptऔर धारा का निर्माण करता हैTaskResult. - किसी भी विस्तारित चीज़ को कैप्चर करते हुए, कार्य निष्पादित करता है
BaseExceptionया वापस कर रहा हूँreturn_valueकार्य का जब सब कुछ योजना के अनुसार हुआ। - या तो अद्यतन करता है
Taskमॉडल,AttemptऔरTaskResultसफल निष्पादन के अंतिम विवरण के साथ, या ऐसा करने में विफलता के विवरण के साथ। - और बाद वाले मामले में: जांचें कि क्या कार्य का पुनः प्रयास किया जा सकता है।
दोबारा: यदि आप विवरणों में गोता लगाना चाहते हैं, तो रिपॉजिटरी पर एक नज़र डालें।
इतना ही
निःसंदेह यह डेमो प्रोजेक्ट उन सभी चीजों को छोड़ देता है जिनके बारे में आपको वास्तव में गंभीरता से सोचने की आवश्यकता है। कार्यकर्ता के लिए संकेत की तरह. या डेटाबेस लेनदेन तर्क। इसका मतलब यह नहीं है कि यह असंभव है। से बहुत दूर। यह इस लेख का लक्ष्य ही नहीं था.
Django में इस कार्यक्षमता को शामिल करने से निश्चित रूप से मौजूदा कार्य कतारों के लिए नई लाइब्रेरी या एडाप्टर को पॉप अप करने की अनुमति मिल जाएगी। और हम संभवतः जल्द ही कुछ शिकायतें देखेंगे django.tasks पर्याप्त व्यापक नहीं है.
क्योंकि, यदि आप वर्तमान में अपने कार्य कतार की अधिक उन्नत कार्यक्षमता का उपयोग कर रहे हैं, तो संभवतः कुछ चीजें हैं जो आप गायब हैं django.tasks.
जटिल ऑर्केस्ट्रेशन
सेलेरी जैसी कुछ कार्य कतार लाइब्रेरी, कार्यों के संयोजन के तरीके प्रदान करती हैं। आप एक कार्य के परिणाम को दूसरे में फ़ीड कर सकते हैं, एक सूची में प्रत्येक आइटम के लिए कार्यों को सूचीबद्ध कर सकते हैं, इत्यादि।
अब तक यह स्पष्ट हो जाना चाहिए कि इस प्रकार के आयोजन का समर्थन करना लक्ष्य नहीं है django.tasks. और मुझे बिल्कुल भी आपत्ति नहीं है. इसका समर्थन करने के लिए एकीकृत एपीआई बनाने का कोई व्यवहार्य तरीका नहीं है। मुझे पुस्तकालयों के साथ अपनी समस्याओं का सामना करना पड़ा है करना इसका समर्थन करने का दावा करें.
पुन: प्रयास करें
जैसा कि पहले उल्लेख किया गया है, वर्तमान में किसी विफल कार्य को स्वचालित रूप से पुनः प्रयास करने का कोई तरीका नहीं है, जब तक कि आपका बैकएंड भारी भारोत्तोलन न करे। जैसे हमारा करता है.
आपके बैकएंड के आधार पर इसे स्वयं संभालना काफी आसान हो सकता है। उदाहरण के लिए डेकोरेटर का उपयोग करना:
def retry(func):
@functools.wraps(func)
def wrapper(context: TaskContext, *args, **kwargs):
try:
return func(context, *args, **kwargs)
except BaseException as e:
result = context.task_result
backoff = math.pow(2, result.attempts)
run_after = datetime.now(tz=UTC) + timedelta(seconds=backoff)
result.task.using(run_after=run_after).enqueue(*args, **kwargs)
raise e
return wrapper
@task(takes_context=True)
@retry
def send_email(context: TaskContext, to: str, subject: str, body: str):
# Do your thing
...
एक वास्तविक कार्यकर्ता तंत्र
सत्य। लेकिन संदर्भ कार्यान्वयन वास्तविक श्रमिक प्रदान करता है। धैर्य रखें, या इससे भी बेहतर: मदद करना शुरू करें!
कोई सटीक समाधान नहीं है
मेरा मानना है django.tasks जल्द ही कम से कम 80% सबसे आम उपयोग के मामलों को कवर करने में परिणाम मिलेगा। हां, इसकी एपीआई सरल और सीमित है, लेकिन मेरे लिए यह गलती से ज्यादा फायदा है। मुझे लगता है कि यह मानकीकृत दृष्टिकोण के उतना करीब है जितना आप पा सकते हैं।
<a href