如果数据库发生变化,如何停止执行长进程?

我有一个向RabbitMQ队列发送消息的视图。

message = {'origin': 'Bytes CSV',
           'data': {'csv_key': str(csv_entry.key),
                    'csv_fields': csv_fields
                    'order_by': order_by,
                    'filters': filters}}

...

queue_service.send(message=message, headers={}, exchange_name=EXCHANGE_IN_NAME,
                   routing_key=MESSAGES_ROUTING_KEY.replace('#', 'bytes_counting.create'))

对于我的消费者,我有一个很长的过程来生成 CSV。

def create(self, data):
    csv_obj = self._get_object(key=data['csv_key'])
    if csv_obj.status == CSVRequestStatus.CANCELED:
        self.logger.info(f'CSV {csv_obj.key} was canceled by the user')
        return

    result = self.generate_result_data(filters=data['filters'], order_by=data['order_by'], csv_obj=csv_obj)
    csv_data = self._generate_csv(result=result, csv_fields=data['csv_fields'], csv_obj=csv_obj)
    file_key = self._post_csv(csv_data=csv_data, csv_obj=csv_obj)

    csv_obj.status = CSVRequestStatus.READY
    csv_obj.status_additional = CSVRequestStatusAdditional.SUCCESS
    csv_obj.file_key = file_key
    csv_obj.ready_at = timezone.now()
    csv_obj.save(update_fields=['status', 'status_additional', 'ready_at', 'file_key'])

    self.logger.info(f'CSV {csv_obj.name} created')

长过程发生在 内部self._generate_csv,因为self.generate_result_data返回 a queryset,这是惰性的。

如您所见,如果用户csv_request在消息开始使用之前通过端点更改状态,则不会评估过程。我的目标是在执行self._generate_csv.

到目前为止,我尝试使用Threading,但没有成功。

我怎样才能实现我的目标?

非常感谢!

以上是如果数据库发生变化,如何停止执行长进程?的全部内容。
THE END
分享
二维码
< <上一篇
下一篇>>