分享

简易自制线程池(备忘)

 delphi_笔记 2018-07-20

unit ThreadPool;

interface

uses Classes,Windows,SysUtils,common,BaseThread;

type

 // TAddEvent = procedure(msg:string);


   TBaseTask = class   //基础任务
   protected
     Fid:byte;
     FSendType : byte; //发送信息类型:0:原ASCII 1 D131或者D101之类的需要有字符替换
     FNumber :string;
     FBuffer:Tbuffer;
     FMsg :string;
     FLog:Tlog;
     FProcessTaskThread:TThread;
     FSendTaskThread:TBaseThread;
     function GetData:string;
     Function GetCount : integer;
   public
      property id:Byte read Fid;
      property Number:string read FNumber;
      property SendType :byte read FSendtype write FSendType;
      property ProcessTaskThread : TThread read FProcessTaskThread write FProcessTaskThread;
      property SendTaskThread : TBaseThread read FSendTaskThread write FSendTaskThread;
      property Count : integer read GetCount;
      procedure Run;virtual;
      procedure Add(Msg:string);virtual;
      Constructor Create(Sid:Byte;Number:string;Log_switch:integer);virtual;    //对应POOL里面数据头的ID
      destructor Destroy; override;
   end;

   TBaseTaskThread = class(TThread)      //任务线程
   protected
     FName :string;
     Flog: Tlog;
     FList: TList;
     function GetCount:integer;
     function GetTask:TBaseTask;
     procedure Execute;   override;
     procedure Terminatedevent(sender:Tobject);
   public
     property Count:integer  read Getcount;
     property Name :string read FName;
     procedure Add(Item: Pointer);
     Constructor Create(Name:string;suspendflag:boolean;logSwitch:integer);
   end;

   TBasePoolThread = class(TThread)               //分派线程
   protected
        FName :string;
        FLog :Tlog;
        FThreadCount : integer;
        FBuffer: TBuffer;
        FThreadList : Tlist;
        FTaskList : Tlist;
        FOnThreadProcessEvent :TOnThreadProcessEvent;
        FOnThreadBeforeEvent :TOnThreadBeforeEvent;
        function SearchTask(Msg:string):TBaseTask;
        function SearchThread:TBaseTaskThread;
        procedure Execute;   override;
        procedure Terminatedevent(sender:Tobject);
   public
        property ChildThreadCount :integer read FThreadCount;
        procedure Add(Msg:string);     //传入 ID+MSG ,id是byte类型,一位0-255
        procedure AddTask(Task:TBaseTask);
        procedure CreateChildThread;
        property Name :string read FName;
        property OnThreadProcessEvent:TOnThreadProcessEvent read FOnThreadProcessEvent write FOnThreadProcessEvent;
        property OnThreadBeforeEvent :TOnThreadBeforeEvent read FOnThreadBeforeEvent write FOnThreadBeforeEvent;
        Constructor Create(Name:string;ThreadCount:integer;suspendflag:boolean;logSwitch:integer);
  end;


implementation

{ TBaseTask }

procedure TBaseTask.Add(Msg: string);
begin
  FBuffer.Add(Msg);
end;

constructor TBaseTask.Create(Sid: Byte; Number: string;Log_switch:integer);
begin
  Fid := Sid;
  FSendType := 1;
  FNumber := number;
  FLog := Tlog.Create(Number,Log_switch);
  FBuffer := TBuffer.Create;
end;

destructor TBaseTask.Destroy;
begin
  FBuffer.Free;
  Flog.Free;
  inherited;
end;

function TBaseTask.GetCount: integer;
begin
  result := Fbuffer.Count;
end;

function TBaseTask.GetData: string;
begin
  Result := Fbuffer.Getdata;
  FBuffer.DeleteData;
end;

procedure TBaseTask.Run;
begin
 if FBuffer.Count > 0 then begin
     fmsg:= GetData;
  {ToDosomething(msg);}
 end;
end;

{ TBaseTaskThread }

procedure TBaseTaskThread.Add(Item: Pointer);
begin
  Flist.Add(item);
  TBaseTask(item).ProcessTaskThread := self;
  self.Resume;
end;

constructor TBaseTaskThread.Create(Name: string; suspendflag: boolean;
  logSwitch: integer);
begin
  Fname := name;
  Flist := TList.Create;
  Flog := Tlog.Create(name,logswitch);
  onterminate := terminatedevent;
  FreeOnTerminate:=true;
  Flog.Log(name+'线程对象创建',log_all);
  inherited create(suspendflag);        //建立后先挂起
end;

procedure TBaseTaskThread.Execute;
//var temp :string;
begin
   while True do begin
      if Terminated then break;
      if Flist.Count > 0 then begin
        try
          GetTask.Run;
        except
          on e: Exception do
            FLog.Log('异常:'+e.Message,log_fail);
        end;
      end else
        suspend;
   end;
end;

function TBaseTaskThread.GetCount: integer;
begin
  result:=  Flist.Count;
end;

function TBaseTaskThread.GetTask: TBaseTask;
begin
  result := Flist.items[0];
  Flist.Delete(0);
end;

procedure TBaseTaskThread.Terminatedevent(sender: Tobject);
begin
  Flist.Free;
  Flog.log(FName+'线程释放',log_all);
  Flog.Free;
end;

{ TBasePoolThread }

procedure TBasePoolThread.Add(Msg: string);
begin
  Fbuffer.Add(msg);
  resume;
end;


procedure TBasePoolThread.AddTask(Task: TBaseTask);
begin
  FTaskList.Add(task);
end;

constructor TBasePoolThread.Create(Name: string; ThreadCount: integer;
  suspendflag: boolean; logSwitch: integer);
begin
  Fname := name;
  FBuffer := TBuffer.Create;
  FThreadCount := ThreadCount;
  FThreadList := Tlist.Create;
  Flog := Tlog.Create(name,logswitch);
  CreateChildThread;
  onterminate := terminatedevent;
  FreeOnTerminate:=true;
  Flog.Log(name+'线程对象创建',log_all);
  inherited create(suspendflag);        //建立后先挂起
end;

procedure TBasePoolThread.CreateChildThread;
var tempThread : TBaseTaskThread;
     i:integer;
begin
    for i:=0 to FThreadCount -1 do begin
      tempThread := TBaseTaskThread.Create(Fname+inttostr(i), true,log_all);
      FThreadList.Add(tempThread);
    end;
end;

procedure TBasePoolThread.Execute;
var temp :string;
   // temptask : TBaseTask;
   // tempThread : TBasePoolThread;
begin
   while True do begin
      if Terminated then break;
      if Fbuffer.Count > 0 then begin
         temp := Fbuffer.Getdata; Fbuffer.deletedata;
         if Assigned(FOnThreadBeforeEvent) then temp:=FOnThreadBeforeEvent(Flog,temp);
            SearchTask(temp);
           flog.Log('处理数据:'+temp,log_all);
         if Assigned(FOnThreadProcessEvent) then FOnThreadProcessEvent(Flog,temp);
      end else
        suspend;
   end;
end;

function TBasePoolThread.SearchTask(Msg: string): TBaseTask;
var fid  : byte;
    i:integer;
begin
  fid := byte(msg[1]);
  result := nil;
  for i:=0 to FTasklist.Count-1 do begin
    if TBaseTask(FTasklist.Items[i]).Fid = fid then begin
          if (TBaseTask(FTasklist.Items[i]).Count > 0and
               assigned( TBaseTask(FTasklist.Items[i]).ProcessTaskThread) then begin
                  TBaseTask(FTasklist.Items[i]).Add(copy(msg,2,length(msg)-1));
                 TBaseTaskThread(TBaseTask(FTasklist.Items[i]).ProcessTaskThread).Add(FTasklist.Items[i])   //只有原任务有线程,且有未完成的任务
               end  else begin
                 TBaseTask(FTasklist.Items[i]).Add(copy(msg,2,length(msg)-1));
                 SearchThread.Add(FTasklist.Items[i]);
               end;
       break;
    end;
  end;
end;

function TBasePoolThread.SearchThread: TBaseTaskThread;
var i,pole:integer;
begin
  result := nil;
  pole := 9999;
  for i:=0 to FThreadlist.Count -1 do begin
    if TBaseTaskThread(FThreadList.Items[i]).Count = 0 then begin
      result := FThreadList.Items[i];
      break;
    end;
    if pole < TBaseTaskThread(FThreadList.Items[i]).Count then
       result := FThreadList.Items[i];
  end;

end;

procedure TBasePoolThread.Terminatedevent(sender: Tobject);
var i:integer;
begin
    for i:= 0 to FThreadList.Count-1 do begin
       TBaseTaskThread(FThreadList.items[i]).Terminate;
       TBaseTaskThread(FThreadList.items[i]).Resume;
    end;
    FThreadList.Free;
    fbuffer.Free;
    Flog.log(FName+'线程释放',log_all);
    Flog.Free;
end;

end.

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章