#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
import sys
import threading
import weakref
from concurrent.futures import _base
try:  # Python3
  import queue
except Exception:  # Python2
  import Queue as queue
class _WorkItem(object):
  def __init__(self, future, fn, args, kwargs):
    self._future = future
    self._fn = fn
    self._fn_args = args
    self._fn_kwargs = kwargs
  def run(self):
    if self._future.set_running_or_notify_cancel():
      # If the future wasn't cancelled, then attempt to execute it.
      try:
        self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
      except BaseException as exc:
        # Even though Python 2 futures library has #set_exection(),
        # the way it generates the traceback doesn't align with
        # the way in which Python 3 does it so we provide alternative
        # implementations that match our test expectations.
        if sys.version_info.major >= 3:
          self._future.set_exception(exc)
        else:
          e, tb = sys.exc_info()[1:]
          self._future.set_exception_info(e, tb)
class _Worker(threading.Thread):
  def __init__(self, idle_worker_queue, permitted_thread_age_in_seconds,
               work_item):
    super(_Worker, self).__init__()
    self._idle_worker_queue = idle_worker_queue
    self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds
    self._work_item = work_item
    self._wake_event = threading.Event()
    self._lock = threading.Lock()
    self._shutdown = False
  def run(self):
    while True:
      self._work_item.run()
      self._work_item = None
      # If we are explicitly awake then don't add ourselves back to the
      # idle queue. This occurs in case 3 described below.
      if not self._wake_event.is_set():
        self._idle_worker_queue.put(self)
      self._wake_event.wait(self._permitted_thread_age_in_seconds)
      with self._lock:
        # When we are awoken, we may be in one of three states:
        #  1) _work_item is set and _shutdown is False.
        #     This represents the case when we have accepted work.
        #  2) _work_item is unset and _shutdown is True.
        #     This represents the case where either we timed out before
        #     accepting work or explicitly were shutdown without accepting
        #     any work.
        #  3) _work_item is set and _shutdown is True.
        #     This represents a race where we accepted work and also
        #     were shutdown before the worker thread started processing
        #     that work. In this case we guarantee to process the work
        #     but we don't clear the event ensuring that the next loop
        #     around through to the wait() won't block and we will exit
        #     since _work_item will be unset.
        # We only exit when _work_item is unset to prevent dropping of
        # submitted work.
        if self._work_item is None:
          self._shutdown = True
          return
        if not self._shutdown:
          self._wake_event.clear()
  def accepted_work(self, work_item):
    """Returns True if the work was accepted.
    This method must only be called while the worker is idle.
    """
    with self._lock:
      if self._shutdown:
        return False
      self._work_item = work_item
      self._wake_event.set()
      return True
  def shutdown(self):
    """Marks this thread as shutdown possibly waking it up if it is idle."""
    with self._lock:
      if self._shutdown:
        return
      self._shutdown = True
      self._wake_event.set()
[docs]class UnboundedThreadPoolExecutor(_base.Executor):
  def __init__(self, permitted_thread_age_in_seconds=30):
    self._permitted_thread_age_in_seconds = permitted_thread_age_in_seconds
    self._idle_worker_queue = queue.Queue()
    self._workers = weakref.WeakSet()
    self._shutdown = False
    self._lock = threading.Lock() # Guards access to _workers and _shutdown
[docs]  def submit(self, fn, *args, **kwargs):
    """Attempts to submit the work item.
    A runtime error is raised if the pool has been shutdown.
    """
    future = _base.Future()
    work_item = _WorkItem(future, fn, args, kwargs)
    try:
      # Keep trying to get an idle worker from the queue until we find one
      # that accepts the work.
      while not self._idle_worker_queue.get(
          block=False).accepted_work(work_item):
        pass
      return future
    except queue.Empty:
      with self._lock:
        if self._shutdown:
          raise RuntimeError('Cannot schedule new tasks after thread pool '
                             'has been shutdown.')
        worker = _Worker(
            self._idle_worker_queue, self._permitted_thread_age_in_seconds,
            work_item)
        worker.daemon = True
        worker.start()
        self._workers.add(worker)
        return future 
[docs]  def shutdown(self, wait=True):
    with self._lock:
      if self._shutdown:
        return
      self._shutdown = True
      for worker in self._workers:
        worker.shutdown()
      if wait:
        for worker in self._workers:
          worker.join()