zhongrj
2025-11-24 276323dce9613867abb3f58a4cc2abbfb2fd0dea
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
import os
import shutil
import tempfile
import traceback
import json
import socket
 
import time
from threading import Event, Thread
from celery.utils.log import get_task_logger
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Count
from django.db.models import Q
from app.models import Profile
 
from app.models import Project
from app.models import Task
from nodeodm import status_codes
from nodeodm.models import ProcessingNode
from webodm import settings
import worker
from .celery import app
from app.raster_utils import export_raster as export_raster_sync, extension_for_export_format
from app.pointcloud_utils import export_pointcloud as export_pointcloud_sync
from django.utils import timezone
from datetime import timedelta
import redis
 
# Celery task logger shared by every task defined in this module
logger = get_task_logger("app.logger")
# Redis connection created from the Celery broker URL; process_task() stores its task locks here
redis_client = redis.Redis.from_url(settings.CELERY_BROKER_URL)

# What class to use for async results, since during testing we need to mock it
TestSafeAsyncResult = worker.celery.MockAsyncResult if settings.TESTING else app.AsyncResult
 
@app.task(ignore_result=True)
def update_nodes_info():
    """
    Call ``update_node_info()`` on every ProcessingNode.

    Does nothing when ``settings.NODE_OPTIMISTIC_MODE`` is enabled.

    Also applies a workaround for the "webodm_node-odm-1" vs
    "webodm-node-odm-1" hostname switcharoo seen on Mac: when a node using the
    underscore hostname is offline and that hostname does not resolve, the
    dash variant is tried and kept only if the node comes online with it.
    """
    if settings.NODE_OPTIMISTIC_MODE:
        return

    # Workaround for mysterious "webodm_node-odm-1" or "webodm-node-odm-1" hostname switcharoo on Mac
    # Technically we already check for the correct hostname during setup,
    # but sometimes that doesn't work?
    check_hostname = 'webodm_node-odm-1'
    fallback_hostname = 'webodm-node-odm-1'

    for processing_node in ProcessingNode.objects.all():
        processing_node.update_node_info()

        # Only offline nodes that use the suspicious hostname need the workaround
        if processing_node.hostname != check_hostname or processing_node.is_online():
            continue

        try:
            socket.gethostbyname(processing_node.hostname)
        except OSError:
            # Resolution failures (socket.gaierror / socket.herror) are OSError
            # subclasses. Catching only those keeps KeyboardInterrupt and
            # SystemExit from being swallowed, unlike a bare except.
            # Hostname was invalid, try renaming
            processing_node.hostname = fallback_hostname
            processing_node.update_node_info()
            if processing_node.is_online():
                logger.info("Found and fixed webodm_node-odm-1 hostname switcharoo")
            else:
                # The rename did not help, restore the original hostname
                processing_node.hostname = check_hostname
            processing_node.save()
 
@app.task(ignore_result=True)
def cleanup_projects():
    """
    Delete every project that is flagged with ``deleting=True`` and has
    no tasks left, then log how many projects were removed.
    """
    empty_doomed_projects = (
        Project.objects
        .filter(deleting=True)
        .annotate(tasks_count=Count('task'))
        .filter(tasks_count=0)
    )

    # delete() returns (total rows deleted, {model label: rows deleted})
    deleted_total, deleted_per_model = empty_doomed_projects.delete()

    if deleted_total <= 0 or 'app.Project' not in deleted_per_model:
        return

    logger.info("Deleted {} projects".format(deleted_per_model['app.Project']))
 
@app.task(ignore_result=True)
def cleanup_tasks():
    """
    Delete partial tasks that were created at least
    ``settings.CLEANUP_PARTIAL_TASKS`` hours ago.

    The cleanup is skipped entirely when that setting is None.
    """
    max_age_hours = settings.CLEANUP_PARTIAL_TASKS
    if max_age_hours is None:
        return

    cutoff = timezone.now() - timedelta(hours=max_age_hours)
    stale_partials = Task.objects.filter(partial=True, created_at__lte=cutoff)

    # Delete one by one (rather than in bulk) so each removal is logged
    for stale_task in stale_partials:
        logger.info("Cleaning up partial task {}".format(stale_task))
        stale_task.delete()
 
@app.task(ignore_result=True)
def cleanup_tmp_directory():
    """
    Delete files and folders in ``settings.MEDIA_TMP`` whose modification
    time is older than 24 hours.

    Entries are handled independently: if one cannot be inspected or removed
    (for example a broken symlink, or a file that another process deleted
    between listing and stat), a warning is logged and the sweep continues
    with the remaining entries instead of aborting.
    """
    tmpdir = settings.MEDIA_TMP
    time_limit = 60 * 60 * 24

    for f in os.listdir(tmpdir):
        filepath = os.path.join(tmpdir, f)

        try:
            modified = os.stat(filepath).st_mtime
            if modified >= time.time() - time_limit:
                # Recent enough, keep it
                continue

            if os.path.isfile(filepath):
                os.remove(filepath)
            else:
                shutil.rmtree(filepath, ignore_errors=True)
        except OSError as e:
            # Do not let a single problematic entry stop the whole cleanup
            logger.warning('Cannot clean up %s: %s' % (f, e))
            continue

        logger.info('Cleaned up: %s (%s)' % (f, modified))
 
 
# Based on https://stackoverflow.com/questions/22498038/improve-current-implementation-of-a-setinterval-python/22498708#22498708
def setInterval(interval, func, *args):
    """
    Call ``func(*args)`` every ``interval`` seconds on a background daemon thread.

    The first call happens after one full interval has elapsed.

    :param interval: seconds to wait between calls
    :param func: callable to invoke
    :param args: positional arguments forwarded to ``func``
    :return: a callable that stops the repetition when invoked
    """
    stop_signal = Event()

    def _tick_until_stopped():
        while True:
            # wait() returns True as soon as the stop signal is set,
            # and False when the interval elapses without it being set
            if stop_signal.wait(interval):
                break
            func(*args)

    ticker = Thread(target=_tick_until_stopped, daemon=True)
    ticker.start()
    return stop_signal.set
 
@app.task(ignore_result=True)
def process_task(taskId):
    """
    Run ``Task.process()`` for the task with the given id, guarded by a
    timestamp-based lock stored in Redis under ``task_lock_<taskId>``.

    If the lock was refreshed within the last 30 seconds the function returns
    without touching the task. Otherwise the lock timestamp is refreshed every
    5 seconds by a background thread while the task is being processed.

    Exceptions raised by ``task.process()`` are logged and swallowed, except
    when ``settings.TESTING`` is set, in which case they are re-raised.

    :param taskId: primary key of the Task to process
    """
    lock_id = 'task_lock_{}'.format(taskId)
    cancel_monitor = None
    delete_lock = True

    try:
        # Store the current time under the lock key and get back the previous value (None if unset)
        task_lock_last_update = redis_client.getset(lock_id, time.time())
        if task_lock_last_update is not None:
            # Check if lock has expired
            if time.time() - float(task_lock_last_update) <= 30:
                # Locked
                # The lock was not acquired by this call, so it must not be deleted in the finally block.
                # NOTE: getset above has already overwritten the stored timestamp with the current time.
                delete_lock = False
                return
            else:
                # Expired
                logger.warning("Task {} has an expired lock! This could mean that WebODM is running out of memory. Check your server configuration.".format(taskId))

        # Set lock
        def update_lock():
            redis_client.set(lock_id, time.time())
        # Refresh the lock timestamp every 5 seconds, well within the 30 second expiration checked above
        cancel_monitor = setInterval(5, update_lock)

        try:
            task = Task.objects.get(pk=taskId)
        except ObjectDoesNotExist:
            logger.info("Task {} has already been deleted.".format(taskId))
            return

        try:
            task.process()
        except Exception as e:
            logger.error(
                "Uncaught error! This is potentially bad. Please report it to http://github.com/OpenDroneMap/WebODM/issues: {} {}".format(
                    e, traceback.format_exc()))
            # Let the error surface during tests instead of only logging it
            if settings.TESTING: raise e
    finally:
        # Stop the background lock refresh, if it was started
        if cancel_monitor is not None:
            cancel_monitor()

        if delete_lock:
            try:
                redis_client.delete(lock_id)
            except redis.exceptions.RedisError:
                # Ignore errors, the lock will expire at some point
                pass
 
 
 
def get_pending_tasks():
    """
    Return the queryset of non-partial tasks that match at least one of:

    - no processing node assigned, but automatic node assignment is enabled
    - a processing node is assigned and the status is unset, QUEUED or RUNNING
    - a pending action is set
    """
    # Needs a processing node assigned (via auto)
    awaiting_node = Q(processing_node__isnull=True, auto_processing_node=True, partial=False)

    # Has a processing node and needs a status update
    unfinished_status = Q(status=None) | Q(status__in=[status_codes.QUEUED, status_codes.RUNNING])
    awaiting_status = Q(unfinished_status, processing_node__isnull=False, partial=False)

    # Has a pending action
    awaiting_action = Q(pending_action__isnull=False, partial=False)

    return Task.objects.filter(awaiting_node | awaiting_status | awaiting_action)
 
@app.task(ignore_result=True)
def process_pending_tasks():
    """Queue a ``process_task`` job for every task returned by ``get_pending_tasks()``."""
    for pending in get_pending_tasks():
        process_task.delay(pending.id)
 
 
@app.task(bind=True)
def export_raster(self, input, **opts):
    """
    Export a raster to a new file inside ``settings.MEDIA_TMP``.

    :param input: raster passed through to ``export_raster_sync``
    :param opts: export options, forwarded as-is; ``format`` (default 'gtiff')
        also determines the extension of the output file
    :return: ``{'file': <output path>}`` on success, ``{'error': <message>}``
        if any exception is raised
    """
    try:
        logger.info("Exporting raster {} with options: {}".format(input, json.dumps(opts)))

        export_format = opts.get('format', 'gtiff')
        suffix = '_raster.{}'.format(extension_for_export_format(export_format))
        # NOTE(review): mktemp only generates a name and is deprecated because of
        # the race between choosing the name and creating the file; confirm
        # whether the export helper tolerates a pre-created file before replacing it
        tmpfile = tempfile.mktemp(suffix, dir=settings.MEDIA_TMP)

        export_raster_sync(input, tmpfile, **opts)
        result = {'file': tmpfile}

        # During tests the result is also stored on the mocked async result class
        if settings.TESTING:
            TestSafeAsyncResult.set(self.request.id, result)
    except Exception as e:
        message = str(e)
        logger.error(message)
        return {'error': message}
    else:
        return result
 
@app.task(bind=True)
def export_pointcloud(self, input, **opts):
    """
    Export a point cloud to a new file inside ``settings.MEDIA_TMP``.

    :param input: point cloud passed through to ``export_pointcloud_sync``
    :param opts: export options, forwarded as-is; ``format`` (default 'laz')
        is also used as the extension of the output file
    :return: ``{'file': <output path>}`` on success, ``{'error': <message>}``
        if any exception is raised
    """
    try:
        logger.info("Exporting point cloud {} with options: {}".format(input, json.dumps(opts)))

        suffix = '_pointcloud.{}'.format(opts.get('format', 'laz'))
        # NOTE(review): mktemp only generates a name and is deprecated because of
        # the race between choosing the name and creating the file; confirm
        # whether the export helper tolerates a pre-created file before replacing it
        tmpfile = tempfile.mktemp(suffix, dir=settings.MEDIA_TMP)

        export_pointcloud_sync(input, tmpfile, **opts)
        result = {'file': tmpfile}

        # During tests the result is also stored on the mocked async result class
        if settings.TESTING:
            TestSafeAsyncResult.set(self.request.id, result)
    except Exception as e:
        message = str(e)
        logger.error(message)
        return {'error': message}
    else:
        return result
 
@app.task(ignore_result=True)
def check_quotas():
    """
    Enforce storage quotas for every profile whose quota is greater than -1.

    A profile that is within its quota gets its quota deadline cleared.
    A profile that exceeds its quota gets a deadline (set from
    ``settings.QUOTA_EXCEEDED_GRACE_PERIOD`` if none exists yet). Once the
    deadline has passed, the user's tasks are deleted one at a time, most
    recently created first, until the quota is no longer exceeded, no tasks
    remain, or as many attempts as the initial task count have been made.
    """
    profiles = Profile.objects.filter(quota__gt=-1)
    for p in profiles:
        if not p.has_exceeded_quota():
            p.clear_quota_deadline()
            continue

        deadline = p.get_quota_deadline()
        if deadline is None:
            deadline = p.set_quota_deadline(settings.QUOTA_EXCEEDED_GRACE_PERIOD)

        if time.time() <= deadline:
            # Still within the grace period
            continue

        # deadline passed, delete tasks until quota is met
        logger.info("Quota deadline expired for %s, deleting tasks" % str(p.user.username))
        task_count = Task.objects.filter(project__owner=p.user).count()
        c = 0

        while p.has_exceeded_quota():
            # Bound before the try block so the except handler can always
            # reference it, even when the query itself is what failed
            last_task = None
            try:
                last_task = Task.objects.filter(project__owner=p.user).order_by("-created_at").first()
                if last_task is None:
                    break
                logger.info("Deleting %s" % last_task)
                last_task.delete()
            except Exception as e:
                logger.warning("Cannot delete %s for %s: %s" % (str(last_task), str(p.user.username), str(e)))

            # Cap the number of attempts so a task that cannot be deleted
            # does not keep this loop spinning forever
            c += 1
            if c >= task_count:
                break