#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pytype: skip-file
import queue
import threading
import weakref
from concurrent.futures import _base
class _WorkItem(object):
  """Pairs a future with the callable (and its arguments) that fulfils it."""

  def __init__(self, future, fn, args, kwargs):
    """Stores the future together with the call that will complete it.

    Args:
      future: the future that receives the call's result or exception.
      fn: the callable to invoke.
      args: positional arguments passed to ``fn``.
      kwargs: keyword arguments passed to ``fn``.
    """
    self._future = future
    self._fn = fn
    self._fn_args = args
    self._fn_kwargs = kwargs

  def run(self):
    """Invokes the callable and records its outcome on the future.

    Nothing is executed when the future reports that it was cancelled.
    """
    if not self._future.set_running_or_notify_cancel():
      # The future was cancelled before it started; skip the call entirely.
      return
    try:
      outcome = self._fn(*self._fn_args, **self._fn_kwargs)
      self._future.set_result(outcome)
    except BaseException as error:
      # Every exception, including non-Exception subclasses, is routed to the
      # future instead of escaping into the calling thread.
      self._future.set_exception(error)
class _Worker(threading.Thread):
  """Thread that runs work items one at a time and parks itself in between.

  After each item the worker adds itself to the idle queue and blocks on a
  semaphore until it is either handed another item or told to shut down.
  """

  def __init__(self, idle_worker_queue, work_item):
    """Creates a worker that starts out with ``work_item`` to execute.

    Args:
      idle_worker_queue: queue the worker puts itself on whenever it is idle.
      work_item: first item to execute; only its ``run()`` method is used.
    """
    super().__init__()
    self._work_item = work_item
    self._idle_worker_queue = idle_worker_queue
    self._wake_semaphore = threading.Semaphore(0)
    # The two attributes below are not referenced anywhere else in this class.
    self._lock = threading.Lock()
    self._shutdown = False

  def run(self):
    """Executes work items until woken up without a new one."""
    has_work = True
    while has_work:
      self._work_item.run()
      # Clear the slot before advertising idleness so that a wake-up without
      # an assignment can be told apart from a wake-up with new work.
      self._work_item = None
      self._idle_worker_queue.put(self)
      self._wake_semaphore.acquire()
      has_work = self._work_item is not None

  def assign_work(self, work_item):
    """Hands the next work item to the worker and wakes its thread.

    Callers must only invoke this while the worker is idle.
    """
    self._work_item = work_item
    self._wake_semaphore.release()

  def shutdown(self):
    """Wakes the thread without assigning work, which makes it exit.

    The worker finds its work item slot still set to None after waking up and
    returns from run().
    """
    self._wake_semaphore.release()
class UnboundedThreadPoolExecutor(_base.Executor):
  """Executor that starts a new thread whenever no idle worker is available.

  Submitted work is handed to an idle worker if one exists; otherwise a new
  daemon worker thread is started for it, so work is never queued behind other
  work. Idle workers beyond ``_max_idle_threads`` are retired, one per call to
  submit().
  """

  def __init__(self):
    # Workers that finished their item and are waiting for the next one.
    self._idle_worker_queue = queue.Queue()
    # Idle workers above this count get shut down by submit().
    self._max_idle_threads = 16
    # Weak references only: a finished worker thread may be garbage collected.
    self._workers = weakref.WeakSet()
    self._shutdown = False
    self._lock = threading.Lock()  # Guards access to _workers and _shutdown

  def submit(self, fn, *args, **kwargs):
    """Attempts to submit the work item.

    Args:
      fn: the callable to execute.
      *args: positional arguments passed to ``fn``.
      **kwargs: keyword arguments passed to ``fn``.

    Returns:
      A future that receives the result (or exception) of ``fn``.

    Raises:
      RuntimeError: if the pool has been shutdown.
    """
    future = _base.Future()
    work_item = _WorkItem(future, fn, args, kwargs)
    with self._lock:
      if self._shutdown:
        raise RuntimeError(
            'Cannot schedule new tasks after thread pool has been shutdown.')
      try:
        idle_worker = self._idle_worker_queue.get(block=False)
      except queue.Empty:
        # Nobody is idle: start a dedicated thread for this work item.
        worker = _Worker(self._idle_worker_queue, work_item)
        worker.daemon = True
        worker.start()
        self._workers.add(worker)
      else:
        idle_worker.assign_work(work_item)
        # If we have more idle threads then the max allowed, shutdown a thread.
        if self._idle_worker_queue.qsize() > self._max_idle_threads:
          try:
            self._idle_worker_queue.get(block=False).shutdown()
          except queue.Empty:
            # Nothing left to retire after all; that is fine.
            pass
    return future

  def shutdown(self, wait=True):
    """Stops accepting work and tells every worker thread to exit.

    Workers are signalled only on the first call; later calls do not signal
    them again. Workers that are busy finish their current work item before
    exiting.

    Args:
      wait: if True, block until all known worker threads have terminated.
        This is honored on every call, including repeated ones.
    """
    with self._lock:
      if not self._shutdown:
        self._shutdown = True
        for worker in self._workers:
          worker.shutdown()
      # Take a snapshot while holding the lock; the joins happen after it is
      # released.
      to_join = list(self._workers) if wait else []
    # Joining outside the lock keeps a running task (or any other thread) that
    # calls submit() or shutdown() from deadlocking against this call: it can
    # still take the lock, observe the shutdown flag and return or raise.
    for worker in to_join:
      worker.join()
 
class _SharedUnboundedThreadPoolExecutor(UnboundedThreadPoolExecutor):
  """Executor variant whose shutdown() does nothing."""

  def shutdown(self, wait=True):
    """Ignores the request so that the shared thread pool keeps running.

    Args:
      wait: accepted for interface compatibility; it has no effect.
    """
    # Prevent shutting down the shared thread pool
    return None
# Module-level executor instance, created once when this module is imported and
# handed out by shared_unbounded_instance(). Its shutdown() is a no-op.
_SHARED_UNBOUNDED_THREAD_POOL_EXECUTOR = _SharedUnboundedThreadPoolExecutor()
def shared_unbounded_instance():
  """Returns the module-level shared executor.

  Every call returns the same object, and calling shutdown() on it has no
  effect.
  """
  return _SHARED_UNBOUNDED_THREAD_POOL_EXECUTOR