1、创建线程中运行的方法体

public void Func(object state)
{
   lock(obj)
   {
        .....
    }
}

2、运行线程操作类

public void  FuncTask(object state)
{
    ManagedTreadPool.QueueUserWorkItem(new WaitCallback(Func),state)
}

3、附线程操作类库

using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace Als.Utils.Task
{
    /// <summary>
    ///用C#托管代码实现的线程池
    /// 它在管理线程的时候,有一个缓存线程的池,即一个ArrayList对象
    /// 它一开始就初始化了一定数量的线程,并通过ProcessQueuedItems方法保证异步执行进入池中的队列任务
    /// 这样在分配异步任务的时候,就省去了频繁去创建(new)一个线程
    /// </summary>
    public class ManagedThreadPool
    {
        #region Constants
        /// <summary>Maximum number of threads the thread pool has at its disposal.</summary>
        private const int _maxWorkerThreads = 10;
        #endregion

        #region Member Variables
        /// <summary>Queue of all the callbacks waiting to be executed.</summary>
        private static Queue _waitingCallbacks;
        /// <summary>
        /// Used to signal that a worker thread is needed for processing.  Note that multiple
        /// threads may be needed simultaneously and as such we use a semaphore instead of
        /// an auto reset event.
        /// </summary>
        private static Semaphore _workerThreadNeeded;
        /// <summary>List of all worker threads at the disposal of the thread pool.</summary>
        private static ArrayList _workerThreads;
        /// <summary>Number of threads currently active.</summary>
        private static int _inUseThreads;
        /// <summary>Lockable object for the pool.</summary>
        private static object _poolLock = new object();
        #endregion

        #region Construction and Finalization
        /// <summary>Initialize the thread pool.</summary>
        static ManagedThreadPool() { Initialize(); }

        /// <summary>Initializes the thread pool.</summary>
        private static void Initialize()
        {
            // Create our thread stores; we handle synchronization ourself
            // as we may run into situtations where multiple operations need to be atomic.
            // We keep track of the threads we've created just for good measure; not actually
            // needed for any core functionality.
            _waitingCallbacks = new Queue();
            _workerThreads = new ArrayList();
            _inUseThreads = 0;

            // Create our "thread needed" event
            _workerThreadNeeded = new Semaphore(0);

            // Create all of the worker threads
            for (int i = 0; i < _maxWorkerThreads; i++)
            {
                // Create a new thread and add it to the list of threads.
                Thread newThread = new Thread(new ThreadStart(ProcessQueuedItems));
                _workerThreads.Add(newThread);

                // Configure the new thread and start it
                newThread.Name = "ManagedPoolThread #" + i.ToString();
                newThread.IsBackground = true;
                newThread.Start();
            }
        }
        #endregion

        #region Public Methods
        /// <summary>Queues a user work item to the thread pool.</summary>
        /// <param name="callback">
        /// A WaitCallback representing the delegate to invoke when the thread in the 
        /// thread pool picks up the work item.
        /// </param>
        public static void QueueUserWorkItem(WaitCallback callback)
        {
            // Queue the delegate with no state
            //Logger.Debug(string.Format("添加线程操作方法:{0},Target:{1}", callback.Method.Name, callback.Target.ToString()));
            QueueUserWorkItem(callback, null);
        }

        /// <summary>Queues a user work item to the thread pool.</summary>
        /// <param name="callback">
        /// A WaitCallback representing the delegate to invoke when the thread in the 
        /// thread pool picks up the work item.
        /// </param>
        /// <param name="state">
        /// The object that is passed to the delegate when serviced from the thread pool.
        /// </param>
        public static void QueueUserWorkItem(WaitCallback callback, object state)
        {
            // Create a waiting callback that contains the delegate and its state.
            // At it to the processing queue, and signal that data is waiting.
            WaitingCallback waiting = new WaitingCallback(callback, state);
            lock (_poolLock) { _waitingCallbacks.Enqueue(waiting); }
            _workerThreadNeeded.AddOne();
        }

        /// <summary>Empties the work queue of any queued work items.  Resets all threads in the pool.</summary>
        public static void Reset()
        {
            lock (_poolLock)
            {
                // Cleanup any waiting callbacks
                try
                {
                    // Try to dispose of all remaining state
                    foreach (object obj in _waitingCallbacks)
                    {
                        WaitingCallback callback = (WaitingCallback)obj;
                        if (callback.State is IDisposable) ((IDisposable)callback.State).Dispose();
                    }
                }
                catch { }

                // Shutdown all existing threads
                try
                {
                    foreach (Thread thread in _workerThreads)
                    {
                        if (thread != null) thread.Abort("reset");
                    }
                }
                catch { }

                // Reinitialize the pool (create new threads, etc.)
                Initialize();
            }
        }
        #endregion

        #region Properties
        /// <summary>Gets the number of threads at the disposal of the thread pool.</summary>
        public static int MaxThreads { get { return _maxWorkerThreads; } }
        /// <summary>Gets the number of currently active threads in the thread pool.</summary>
        public static int ActiveThreads { get { return _inUseThreads; } }
        /// <summary>Gets the number of callback delegates currently waiting in the thread pool.</summary>
        public static int WaitingCallbacks { get { lock (_poolLock) { return _waitingCallbacks.Count; } } }
        #endregion

        #region Thread Processing
        /// <summary>Event raised when there is an exception on a threadpool thread.</summary>

#if NET1
        public static event UnhandledExceptionEventHandler UnhandledException;
#endif

        /// <summary>A thread worker function that processes items from the work queue.</summary>
        private static void ProcessQueuedItems()
        {
            // Process indefinitely
            while (true)
            {
                _workerThreadNeeded.WaitOne();

                // Get the next item in the queue.  If there is nothing there, go to sleep
                // for a while until we're woken up when a callback is waiting.
                WaitingCallback callback = null;

                // Try to get the next callback available.  We need to lock on the 
                // queue in order to make our count check and retrieval atomic.
                lock (_poolLock)
                {
                    if (_waitingCallbacks.Count > 0)
                    {
                        try { callback = (WaitingCallback)_waitingCallbacks.Dequeue(); }
                        catch { } // make sure not to fail here
                    }
                }

                if (callback != null)
                {
                    // We now have a callback.  Execute it.  Make sure to accurately
                    // record how many callbacks are currently executing.
                    try
                    {
                        Interlocked.Increment(ref _inUseThreads);
                        callback.Callback(callback.State);
                    }

                    catch (Exception)
                    {

                    }

                    finally
                    {
                        Interlocked.Decrement(ref _inUseThreads);
                    }
                }
            }
        }
        #endregion

        /// <summary>Used to hold a callback delegate and the state for that delegate.</summary>
        private class WaitingCallback
        {
            #region Member Variables
            /// <summary>Callback delegate for the callback.</summary>
            private WaitCallback _callback;
            /// <summary>State with which to call the callback delegate.</summary>
            private object _state;
            #endregion

            #region Construction
            /// <summary>Initialize the callback holding object.</summary>
            /// <param name="callback">Callback delegate for the callback.</param>
            /// <param name="state">State with which to call the callback delegate.</param>
            public WaitingCallback(WaitCallback callback, object state)
            {
                _callback = callback;
                _state = state;
            }
            #endregion

            #region Properties
            /// <summary>Gets the callback delegate for the callback.</summary>
            public WaitCallback Callback { get { return _callback; } }
            /// <summary>Gets the state with which to call the callback delegate.</summary>
            public object State { get { return _state; } }
            #endregion
        }
    }

    /// <summary>
    /// Implementation of Dijkstra's PV Semaphore based on the Monitor class.
    /// </summary>
    public class Semaphore
    {
        #region Member Variables
        /// <summary>The number of units alloted by this semaphore.</summary>
        private int _count;
        /// <summary>Lock for the semaphore.</summary>
        private object _semLock = new object();
        #endregion

        #region Construction
        /// <summary> Initialize the semaphore as a binary semaphore.</summary>
        public Semaphore()
            : this(1)
        {
        }

        /// <summary> Initialize the semaphore as a counting semaphore.</summary>
        /// <param name="count">Initial number of threads that can take out units from this semaphore.</param>
        /// <exception cref="ArgumentException">Throws if the count argument is less than 0.</exception>
        public Semaphore(int count)
        {
            if (count < 0) throw new ArgumentException("Semaphore must have a count of at least 0.", "count");
            _count = count;
        }
        #endregion

        #region Synchronization Operations
        /// <summary>V the semaphore (add 1 unit to it).</summary>
        public void AddOne() { V(); }

        /// <summary>P the semaphore (take out 1 unit from it).</summary>
        public void WaitOne() { P(); }

        /// <summary>P the semaphore (take out 1 unit from it).</summary>
        public void P()
        {
            // Lock so we can work in peace.  This works because lock is actually
            // built around Monitor.
            lock (_semLock)
            {
                // Wait until a unit becomes available.  We need to wait
                // in a loop in case someone else wakes up before us.  This could
                // happen if the Monitor.Pulse statements were changed to Monitor.PulseAll
                // statements in order to introduce some randomness into the order
                // in which threads are woken.
                while (_count <= 0) Monitor.Wait(_semLock, Timeout.Infinite);
                _count--;
            }
        }

        /// <summary>V the semaphore (add 1 unit to it).</summary>
        public void V()
        {
            // Lock so we can work in peace.  This works because lock is actually
            // built around Monitor.
            lock (_semLock)
            {
                // Release our hold on the unit of control.  Then tell everyone
                // waiting on this object that there is a unit available.
                _count++;
                Monitor.Pulse(_semLock);
            }
        }

        /// <summary>Resets the semaphore to the specified count.  Should be used cautiously.</summary>
        public void Reset(int count)
        {
            lock (_semLock) { _count = count; }
        }
        #endregion
    }

}
最后修改:2022 年 06 月 17 日 10 : 29 AM
如果觉得我的文章对你有用,请随意赞赏