zhongrj
2025-11-25 b89962006164a462404b79a738bee8cbb6d7fe7e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from celery import Celery
import os
 
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'webodm.settings')
 
app = Celery('tasks')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.result_backend_transport_options = {
    'retry_policy': {
       'timeout': 5.0
    }
}
 
app.conf.beat_schedule = {
    'update-nodes-info': {
        'task': 'worker.tasks.update_nodes_info',
        'schedule': 30,
        'options': {
            'expires': 14,
            'retry': False
        }
    },
    'cleanup-projects': {
        'task': 'worker.tasks.cleanup_projects',
        'schedule': 60,
        'options': {
            'expires': 29,
            'retry': False
        }
    },
    'cleanup-tasks': {
        'task': 'worker.tasks.cleanup_tasks',
        'schedule': 3600,
        'options': {
            'expires': 1799,
            'retry': False
        }
    },
    'cleanup-tmp-directory': {
        'task': 'worker.tasks.cleanup_tmp_directory',
        'schedule': 3600,
        'options': {
            'expires': 1799,
            'retry': False
        }
    },
    'process-pending-tasks': {
        'task': 'worker.tasks.process_pending_tasks',
        'schedule': 5,
        'options': {
            'expires': 2,
            'retry': False
        }
    },
    'check-quotas': {
        'task': 'worker.tasks.check_quotas',
        'schedule': 3600,
        'options': {
            'expires': 1799,
            'retry': False
        }
    },
}
 
# Mock class for handling async results during testing
class MockAsyncResult:
    def __init__(self, celery_task_id, result = None):
        self.celery_task_id = celery_task_id
        self.state = "PENDING"
 
        if result is None:
            if celery_task_id == 'bogus':
                self.result = None
            else:
                self.result = MockAsyncResult.results.get(celery_task_id)
        else:
            self.result = result
            MockAsyncResult.results[celery_task_id] = result
 
    def get(self):
        return self.result
 
    def ready(self):
        return self.result is not None
 
MockAsyncResult.results = {}
MockAsyncResult.set = lambda cti, r: MockAsyncResult(cti, r)
 
if __name__ == '__main__':
    app.start()