unit ThreadPool; interface uses Classes,Windows,SysUtils,common,BaseThread; type // TAddEvent = procedure(msg:string); TBaseTask = class //基础任务 protected Fid:byte; FSendType : byte; //发送信息类型:0:原ASCII 1 D131或者D101之类的需要有字符替换 FNumber :string; FBuffer:Tbuffer; FMsg :string; FLog:Tlog; FProcessTaskThread:TThread; FSendTaskThread:TBaseThread; function GetData:string; Function GetCount : integer; public property id:Byte read Fid; property Number:string read FNumber; property SendType :byte read FSendtype write FSendType; property ProcessTaskThread : TThread read FProcessTaskThread write FProcessTaskThread; property SendTaskThread : TBaseThread read FSendTaskThread write FSendTaskThread; property Count : integer read GetCount; procedure Run;virtual; procedure Add(Msg:string);virtual; Constructor Create(Sid:Byte;Number:string;Log_switch:integer);virtual; //对应POOL里面数据头的ID destructor Destroy; override; end; TBaseTaskThread = class(TThread) //任务线程 protected FName :string; Flog: Tlog; FList: TList; function GetCount:integer; function GetTask:TBaseTask; procedure Execute; override; procedure Terminatedevent(sender:Tobject); public property Count:integer read Getcount; property Name :string read FName; procedure Add(Item: Pointer); Constructor Create(Name:string;suspendflag:boolean;logSwitch:integer); end; TBasePoolThread = class(TThread) //分派线程 protected FName :string; FLog :Tlog; FThreadCount : integer; FBuffer: TBuffer; FThreadList : Tlist; FTaskList : Tlist; FOnThreadProcessEvent :TOnThreadProcessEvent; FOnThreadBeforeEvent :TOnThreadBeforeEvent; function SearchTask(Msg:string):TBaseTask; function SearchThread:TBaseTaskThread; procedure Execute; override; procedure Terminatedevent(sender:Tobject); public property ChildThreadCount :integer read FThreadCount; procedure Add(Msg:string); //传入 ID+MSG ,id是byte类型,一位0-255 procedure AddTask(Task:TBaseTask); procedure CreateChildThread; property Name :string read FName; property OnThreadProcessEvent:TOnThreadProcessEvent read FOnThreadProcessEvent write FOnThreadProcessEvent; property OnThreadBeforeEvent :TOnThreadBeforeEvent read FOnThreadBeforeEvent write FOnThreadBeforeEvent; Constructor Create(Name:string;ThreadCount:integer;suspendflag:boolean;logSwitch:integer); end; implementation { TBaseTask } procedure TBaseTask.Add(Msg: string); begin FBuffer.Add(Msg); end; constructor TBaseTask.Create(Sid: Byte; Number: string;Log_switch:integer); begin Fid := Sid; FSendType := 1; FNumber := number; FLog := Tlog.Create(Number,Log_switch); FBuffer := TBuffer.Create; end; destructor TBaseTask.Destroy; begin FBuffer.Free; Flog.Free; inherited; end; function TBaseTask.GetCount: integer; begin result := Fbuffer.Count; end; function TBaseTask.GetData: string; begin Result := Fbuffer.Getdata; FBuffer.DeleteData; end; procedure TBaseTask.Run; begin if FBuffer.Count > 0 then begin fmsg:= GetData; {ToDosomething(msg);} end; end; { TBaseTaskThread } procedure TBaseTaskThread.Add(Item: Pointer); begin Flist.Add(item); TBaseTask(item).ProcessTaskThread := self; self.Resume; end; constructor TBaseTaskThread.Create(Name: string; suspendflag: boolean; logSwitch: integer); begin Fname := name; Flist := TList.Create; Flog := Tlog.Create(name,logswitch); onterminate := terminatedevent; FreeOnTerminate:=true; Flog.Log(name+'线程对象创建',log_all); inherited create(suspendflag); //建立后先挂起 end; procedure TBaseTaskThread.Execute; //var temp :string; begin while True do begin if Terminated then break; if Flist.Count > 0 then begin try GetTask.Run; except on e: Exception do FLog.Log('异常:'+e.Message,log_fail); end; end else suspend; end; end; function TBaseTaskThread.GetCount: integer; begin result:= Flist.Count; end; function TBaseTaskThread.GetTask: TBaseTask; begin result := Flist.items[0]; Flist.Delete(0); end; procedure TBaseTaskThread.Terminatedevent(sender: Tobject); begin Flist.Free; Flog.log(FName+'线程释放',log_all); Flog.Free; end; { TBasePoolThread } procedure TBasePoolThread.Add(Msg: string); begin Fbuffer.Add(msg); resume; end; procedure TBasePoolThread.AddTask(Task: TBaseTask); begin FTaskList.Add(task); end; constructor TBasePoolThread.Create(Name: string; ThreadCount: integer; suspendflag: boolean; logSwitch: integer); begin Fname := name; FBuffer := TBuffer.Create; FThreadCount := ThreadCount; FThreadList := Tlist.Create; Flog := Tlog.Create(name,logswitch); CreateChildThread; onterminate := terminatedevent; FreeOnTerminate:=true; Flog.Log(name+'线程对象创建',log_all); inherited create(suspendflag); //建立后先挂起 end; procedure TBasePoolThread.CreateChildThread; var tempThread : TBaseTaskThread; i:integer; begin for i:=0 to FThreadCount -1 do begin tempThread := TBaseTaskThread.Create(Fname+inttostr(i), true,log_all); FThreadList.Add(tempThread); end; end; procedure TBasePoolThread.Execute; var temp :string; // temptask : TBaseTask; // tempThread : TBasePoolThread; begin while True do begin if Terminated then break; if Fbuffer.Count > 0 then begin temp := Fbuffer.Getdata; Fbuffer.deletedata; if Assigned(FOnThreadBeforeEvent) then temp:=FOnThreadBeforeEvent(Flog,temp); SearchTask(temp); flog.Log('处理数据:'+temp,log_all); if Assigned(FOnThreadProcessEvent) then FOnThreadProcessEvent(Flog,temp); end else suspend; end; end; function TBasePoolThread.SearchTask(Msg: string): TBaseTask; var fid : byte; i:integer; begin fid := byte(msg[1]); result := nil; for i:=0 to FTasklist.Count-1 do begin if TBaseTask(FTasklist.Items[i]).Fid = fid then begin if (TBaseTask(FTasklist.Items[i]).Count > 0) and assigned( TBaseTask(FTasklist.Items[i]).ProcessTaskThread) then begin TBaseTask(FTasklist.Items[i]).Add(copy(msg,2,length(msg)-1)); TBaseTaskThread(TBaseTask(FTasklist.Items[i]).ProcessTaskThread).Add(FTasklist.Items[i]) //只有原任务有线程,且有未完成的任务 end else begin TBaseTask(FTasklist.Items[i]).Add(copy(msg,2,length(msg)-1)); SearchThread.Add(FTasklist.Items[i]); end; break; end; end; end; function TBasePoolThread.SearchThread: TBaseTaskThread; var i,pole:integer; begin result := nil; pole := 9999; for i:=0 to FThreadlist.Count -1 do begin if TBaseTaskThread(FThreadList.Items[i]).Count = 0 then begin result := FThreadList.Items[i]; break; end; if pole < TBaseTaskThread(FThreadList.Items[i]).Count then result := FThreadList.Items[i]; end; end; procedure TBasePoolThread.Terminatedevent(sender: Tobject); var i:integer; begin for i:= 0 to FThreadList.Count-1 do begin TBaseTaskThread(FThreadList.items[i]).Terminate; TBaseTaskThread(FThreadList.items[i]).Resume; end; FThreadList.Free; fbuffer.Free; Flog.log(FName+'线程释放',log_all); Flog.Free; end; end. |
|