Commit 230efcc1 authored by Yuxin Wu's avatar Yuxin Wu

Use weakref for atexit handler in PrefetchDataZMQ (#562)

parent 91f3a441
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
# Author: Yuxin Wu <ppwwyyxx@gmail.com> # Author: Yuxin Wu <ppwwyyxx@gmail.com>
from __future__ import print_function from __future__ import print_function
import weakref
import threading import threading
from contextlib import contextmanager from contextlib import contextmanager
import multiprocessing as mp import multiprocessing as mp
...@@ -12,6 +13,7 @@ import errno ...@@ -12,6 +13,7 @@ import errno
import uuid import uuid
import os import os
import zmq import zmq
import atexit
from .base import DataFlow, ProxyDataFlow, DataFlowTerminated, DataFlowReentrantGuard from .base import DataFlow, ProxyDataFlow, DataFlowTerminated, DataFlowReentrantGuard
from ..utils.concurrency import (ensure_proc_terminate, from ..utils.concurrency import (ensure_proc_terminate,
...@@ -49,6 +51,12 @@ def _get_pipe_name(name): ...@@ -49,6 +51,12 @@ def _get_pipe_name(name):
return pipename return pipename
def del_weakref(x):
o = x()
if o is not None:
o.__del__()
@contextmanager @contextmanager
def _zmq_catch_error(name): def _zmq_catch_error(name):
try: try:
...@@ -82,8 +90,7 @@ class _MultiProcessZMQDataFlow(DataFlow): ...@@ -82,8 +90,7 @@ class _MultiProcessZMQDataFlow(DataFlow):
self._reset_done = True self._reset_done = True
# __del__ not guranteed to get called at exit # __del__ not guranteed to get called at exit
import atexit atexit.register(del_weakref, weakref.ref(self))
atexit.register(lambda x: x.__del__(), self)
self._reset_once() # build processes self._reset_once() # build processes
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment