DELPHI高性能大容量SOCKET并发_第1页
DELPHI高性能大容量SOCKET并发_第2页
DELPHI高性能大容量SOCKET并发_第3页
DELPHI高性能大容量SOCKET并发_第4页
DELPHI高性能大容量SOCKET并发_第5页
已阅读5页,还剩68页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1 (一):IOCP完成端口例子介绍IOCP、控制、SQL查询、上传、下载等协议实现,并包括一些初步的性能测试结果。界面截图如下:提供服务和桌面方式运行,桌面方式可直接打开程序,方便日常调试,可以使用命令行注册或卸载服务,emoSvrexeinstallCMDD:\DEMO\IOCPDemo\Bin\IOCPDemoSvr.exe-uninstall来卸载服务。界面截图如下:主要实现了服务端日志查看,服务端协议类表查看,SQL语句执行协议,上传、下载协议实现,其中对上传、下载实现了一个多线程同时传,用于测试服务器并发性能。2从测试结果可以看出随着发送包增大,速度变快。这里存在一个风险,就是SOCKET传输失败的次数也3 (二):IOCP完成端口控件封装时管理为数众多的套接字,而且希望随着系统内安装的CPU数量的增多,应用程序的性能也可以线性提升,采用完成端口模型,往往可以达到最佳的系统性能。个传文件连接,则每个连接能分到的速度2.56KB/S;如果是20000个命令交互连接,则每个连接分到的吞吐量是655B/S,这种速度的吞吐量对很多应用是不满足,这时就要考虑加大网卡的传输速度或实现水平扩展,会介绍。完成端口是由系统内核管理多个线程之间的切换,比外部实现线程池性能要高,CPU利用率上内核和用TCPServer传输速度要快,吞吐量更高。PostQueuedCompletionStatus。CreateIoCompletionPort的功能是:1、创建一个完成端口对象;2、将一个句柄和完成端口关联在一起;GetQueuedCompletionStatus是获取完成端口状态,是阻塞式调用,在指定时间内如果没有事件通知,会一直等待;PostQueuedCompletionStatus用于向完成端口投递一个完成事件通知。functionCreateIoCompletionPort(FileHandle,ExistingCompletionPort:THandle;CompletionKey,NumberOfConcurrentThreads:DWORD):THandle;stdcall;NumberOfConcurrentThreads参数定义了在一个频繁的线程场景切换。因此可以使用下列语句来创建一个完成端口FIocpHandle:=CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);执行线程个数创建完成端口后,就可以将套接字句柄与对象关联在一起,这时就需要创建工作者线程,以便在完成端口收到数据后,为完成端口提供处理数据线程。到底创建多少个线程为完成端口服务,这个是完成端口最为复杂个参数需要提供设置,并根据最终的应用场景反复测试得出一个结果。一般的经验值是设置为CPU的个数*2+4;IOCP完成端口一般使用步骤2、判断系统内安装了多少个处理器;个SOCKET套接字开始监听;5、使用Accept接收连接;6、调用CreateIoCompletionPort将连接和完成端口绑定在一起;7、投递接收数据请求8、工作者线程调用GetQueuedCompletionStatus获取事件通知,处理数据;IOCP控件核心代码第1步到第4步实现代码:[[delphi]viewplaincopy2.var3.WsaData:TWsaData;4.iNumberOfProcessors,i,iWorkThreadCount:Integer;5.WorkThread:TWorkThread;46.Addr:TSockAddr;7.begin8.ifWSAStartup($0202,WsaData)<>0then//初始化SOCKET10.FIocpHandle:=CreateIoCompletionPort(INVALID_HANDLE_VALUE,0,0,0);//创建一个完成端口11.ifFIocpHandle=0thentr13.FSocket:=WSASocket(PF_INET,SOCK_STREAM,0,nil,0,WSA_FLAG_OVERLAPPED);//创建一个SOCKET14.ifFSocket=INVALID_SOCKETthenrStr16.FillChar(Addr,SizeOf(Addr),0);17.Addr.sin_family:=AF_INET;18.Addr.sin_port:=htons(FPort);Str22.iflisten(FSocket,MaxInt)<>0then23.raiseESocketError.Create(GetLastWsaErrorStr);25.iWorkThreadCount:=iNumberOfProcessors*2+4;//由于服务器处理可能比较费时间,因此线程设为26.ifiWorkThreadCount<FMinWorkThrCountthen//限定最大工作者线程和最小工作者线程27.iWorkThreadCount:=FMinWorkThrCount;28.ifiWorkThreadCount>FMaxWorkThrCountthen29.iWorkThreadCount:=FMaxWorkThrCount;30.fori:=0toiWorkThreadCount-1do//创建工作者线程31.begin34.WorkThread.Resume;35.end;fTrue第5步和第6步实现代码:[[delphi]cedureTIocpServer.AcceptClient;2.var3.ClientSocket:TSocket;4.begin5.ClientSocket:=WSAAccept(FSocket,nil,nil,nil,0);//接收连接6.ifClientSocket<>INVALID_SOCKETthen7.begin8.ifnotFActivethen59.begin10.closesocket(ClientSocket);11.Exit;12.end;14.end;事件中判断是否允许连接。[[delphi]viewplaincopy2.var3.SocketHandle:TSocketHandle;4.iIndex:Integer;5.ClientSocket:PClientSocket;6.begin7.SocketHandle:=nil;8.ifnotDoConnect(ASocket,SocketHandle)then//如果不允许连接,则退出9.begin10.closesocket(ASocket);11.Exit;12.end;14.try17.finally18.FSocketHandles.UnLock;19.end;20.ifCreateIoCompletionPort(ASocket,FIOCPHandle,DWORD(ClientSocket),0)=0then//将连接和21.begin23.FSocketHandles.Lock;//如果投递到列表中失败,则删除24.try26.finally28.end;29.end30.else31.beginil33.end;6[[delphi]viewplaincopy2.varAAllowConnect:Boolean;varSocketHandle:TSocketHandle);3.var4.BaseSocket:TBaseSocket;5.chFlag:Char;7.functionGetSocket(constAEnable:Boolean;BaseSocketClass:TBaseSocketClass):TBaseSocket;8.begin9.ifAEnablethen11.else12.Result:=nil;13.end;andlesCountGIniOptionsMaxSockehen16.begin17.AAllowConnect:=False;18.Exit;19.end;S21.begin22.caseTSocketFlag(Byte(chFlag))of28.else29.BaseSocket:=nil;;30.end;31.ifBaseSocket<>nilthen32.begin33.SocketHandle:=BaseSocket;d',ketePortnt37.AAllowConnect:=True;38.end39.else40.AAllowConnect:=False;41.end742.else43.AAllowConnect:=False;其中ReadChar函数是用于判断指定时间是否有数据上来,函数实现过程用Select函数检测:[[delphi]viewplaincopy1.functionTIocpServer.ReadChar(constASocket:TSocket;varAChar:Char;constATimeOutMS:Inte2.var3.iRead:Integer;4.begin5.Result:=CheckTimeOut(ASocket,ATimeOutMS);6.ifResultthen7.begin8.iRead:=recv(ASocket,AChar,1,0);9.Result:=iRead=1;10.end;14.constATimeOutMS:Integer):Boolean;16.tmTo:TTimeVal;17.FDRead:TFDSet;19.FillChar(FDRead,SizeOf(FDRead),0);20.FDRead.fd_count:=1;21.FDRead.fd_array[0]:=ASocket;22.tmTo.tv_sec:=ATimeOutMSdiv1000;23.tmTo.tv_usec:=(ATimeOutMSmod1000)*1000;24.Result:=Select(0,@FDRead,nil,nil,@tmTO)=1;接收连接之后要投递接收数据请求,实现代码:[[delphi]viewplaincopy2.var3.iFlags,iTransfer:Cardinal;4.iErrCode:Integer;5.begin6.ifnotAssigned(AIocpRecord)then7.begin8.New(AIocpRecord);BufIZE11.FIocpRecv:=AIocpRecord;12.end;13.AIocpRecord.Overlapped.Internal:=0;814.AIocpRecord.Overlapped.InternalHigh:=0;15.AIocpRecord.Overlapped.Offset:=0;16.AIocpRecord.Overlapped.OffsetHigh:=0;17.AIocpRecord.Overlapped.hEvent:=0;18.//AIocpRecord.WsaBuf.buf:=@FIocpRecvBuf;19.//AIocpRecord.WsaBuf.len:=MAX_IOCPBUFSIZE;20.AIocpRecord.IocpOperate:=ioRead;21.iFlags:=0;23.nil)=SOCKET_ERRORthen24.begin25.iErrCode:=WSAGetLastError;26.ifiErrCode=WSAECONNRESETthen//客户端被关闭27.FConnected:=False;28.ifiErrCode<>ERROR_IO_PENDINGthen//不抛出异常,触发异常事件29.begin31.ProcessNetError(iErrCode);32.end;33.end;第8步实现代码:[[delphi]viewplaincopy1.functionTIocpServer.WorkClient:Boolean;2.var3.ClientSocket:PClientSocket;4.IocpRecord:PIocpRecord;5.iWorkCount:Cardinal;6.begin7.IocpRecord:=nil;8.iWorkCount:=0;9.ClientSocket:=nil;10.Result:=False;11.ifnotGetQueuedCompletionStatus(FIocpHandle,iWorkCount,DWORD(ClientSocket),enSocketHandle13.begin//客户端异常断开tHandlethen15.begin16.ClientSocket.SocketHandle.FConnected:=False;17.Exit;18.end;19.end;20.ifCardinal(IocpRecord)=SHUTDOWN_FLAGthen21.Exit;22.ifnotFActivethen23.Exit;924.24.Result:=True;25.ifAssigned(ClientSocket)andAssigned(ClientSocket.SocketHandle)then26.begin28.begin29.ifiWorkCount>0then30.begin31.try32.ClientSocket.Lock.Enter;33.try36.finallyave38.end;40.begin42.tryhen45.finally47.end;48.end;49.except50.onE:Exceptiondo52.end;53.end54.else//如果完成个数为0,且状态为接收,则释放连接55.begin57.begin58.ClientSocket.Lock.Enter;59.try62.finally64.end;65.end66.elseStrGetLastErrorMessage68.end;69.end70.else//断开连接71.begin72.ClientSocket.Lock.Enter;73.try76.finallyave78.end;79.end;80.end第8步主要是使用GetQueuedCompletionStatus函数来获取完成端口事件通知,如果有事件完成,则可这里有个技巧是我们在绑定的时候使用如下语句:CreateIoCompletionPort(ASocket,FIOCPHandle,DWORD(ClientSocket),0),用lpCompletionKey来传递了一个指针,这其中就包含了一个锁和服务对象,定[[delphi]viewplaincopy1.{*客户端对象和锁*}2.TClientSocket=record3.Lock:TCriticalSection;4.SocketHandle:TSocketHandle;5.end;6.PClientSocket=^TClientSocket;因而我们在收到事件通知后,就可以直接调用TSocketHandle对象来完成分包解包以及后续的逻辑处理。这样一个完成端口的整体骨架就有了,后续还有收发数据、分包解包和业务逻辑处理,以及对象和锁分离等细更详细代码见示例代码的IOCPSocket单元。 (三):接收、发送、缓存接收完成端口是结合重叠端口一起使用的,在接收数据之前,我们需要投递一个接收请求,使用functionWSARecv(s:TSocket;lpBuffers:LPWSABUF;dwBufferCount:DWORD;varlpNumberOfBytesRecvd,lpFlags:DWORD;lpOverlapped:LPWSAOVERLAPPED;lpCompletionRoutine:LPWSAOVERLAPPED_COMPLETION_ROUTINE):Integer函数,接收连接后,就需要投递一个请求,代码[[delphi]viewplaincopy2.var3.iFlags,iTransfer:Cardinal;4.iErrCode:Integer;5.begin6.ifnotAssigned(AIocpRecord)then7.begin8.New(AIocpRecord);BufIZE11.FIocpRecv:=AIocpRecord;12.end;13.AIocpRecord.Overlapped.Internal:=0;14.AIocpRecord.Overlapped.InternalHigh:=0;15.AIocpRecord.Overlapped.Offset:=0;16.AIocpRecord.Overlapped.OffsetHigh:=0;18.AIocpRecord.IocpOperate:=ioRead;19.iFlags:=0;21.nil)=SOCKET_ERRORthen22.begin23.iErrCode:=WSAGetLastError;24.ifiErrCode=WSAECONNRESETthen//客户端被关闭25.FConnected:=False;26.ifiErrCode<>ERROR_IO_PENDINGthen//不抛出异常,触发异常事件27.begin29.ProcessNetError(iErrCode);30.end;31.end;WSARecv会返回错误,可以帮助我们定位网络问题,如连接断了,具体函数代码如下:[[delphi]viewplaincopy2.begin3.ifAErrorCode=WSAECONNRESETthen//客户端断开连接4.FConnected:=False5.elseifAErrorCode=WSAEDISCONthen//客户端断开连接6.FConnected:=False7.elseifAErrorCode=WSAENETDOWNthen//网络异常8.FConnected:=False9.elseifAErrorCode=WSAENETRESETthen//心跳包拦截到的异常10.FConnected:=False11.elseifAErrorCode=WSAESHUTDOWNthen//连接被关闭12.FConnected:=False13.elseifAErrorCode=WSAETIMEDOUTthen//连接断掉或者网络异常14.FConnected:=False;WSARecv断开连接了,WSARecv会收到一个0字节的返回,我们可以根据这个释放连接对象。如果客户端直接拔网线,WSARecv是检测不到,这是我们要根据超时来检测。接收连接之后我们投递一次接收数据请求,然后收到数据处理后,再投递下一次请求,因而我们相当于每次只有一个接收请求,我们只需要定义一个接收结构体和一个固定大小的缓存就可以了,类似:[[delphi]viewplaincopy1.{*保存投递请求的地址信息*}2.FIocpRecv:PIocpRecord;3.FIocpRecvBuf:array[0..MAX_IOCPBUFSIZE-1]ofChar;接收的数据我们一起放在一个内存流中,主要代码如下:[[delphi]viewplaincopy2.constACount:Cardinal);3.begin4.caseAIocpRecord.IocpOperateof5.ioNone:Exit;6.ioRead://收到数据7.begin8.FActiveTime:=Now;10.ifFConnectedthen11.PreRecv(AIocpRecord);//投递请求12.end;13.ioWrite://发送数据完成,需要释放AIocpRecord的指针14.begin15.FActiveTime:=Now;17.end;18.ioStream:19.begin20.FActiveTime:=Now;22.WriteStream;//继续发送流23.end;24.end;inal30.Process;有很多完成端口为了提高效率,会对接收队列使用内存池,以减少内存的分配和释放次数,使用内存流会LPHI发送和接收是类似的,使用functionWSASend(s:TSocket;lpBuffers:LPWSABUF;dwBufferCount:DWORD;varlpNumberOfBytesSent:DWORD;dwFlags:DWORD;lpOverlapped:LPWSAOVERLAPPED;lpCompletionRoutine:LPWSAOVERLAPPED_COMPLETION_ROUTINE):Integer[[delphi]viewplaincopyuntInteger2.constAIocpOperate:TIocpOperate);3.var4.IocpRec:PIocpRecord;5.iErrCode:Integer;6.dSend,dFlag:DWORD;7.begin10.IocpRec.IocpOperate:=AIocpOperate;12.dFlag:=0;14.begin15.iErrCode:=WSAGetLastError;16.ifiErrCode<>ERROR_IO_PENDINGthen17.begin19.ProcessNetError(iErrCode);20.end;21.end;检测网络连接类似接收逻辑。发送时需要先申请一个缓存,发送完成后释放这个缓存,因而在WriteBuffer时需要先用FSendOverlapped申请一个缓存,然后用WSASend投递给完成端口,完成端口执行完成后,需要释放这块缓存,代码如下:[[delphi]viewplaincopy2.constACount:Cardinal);3.begin4.caseAIocpRecord.IocpOperateof5.ioNone:Exit;6.ioRead://收到数据7.begin8.FActiveTime:=Now;10.ifFConnectedthen11.PreRecv(AIocpRecord);//投递请求12.end;13.ioWrite://发送数据完成,需要释放AIocpRecord的指针14.begin15.FActiveTime:=Now;17.end;18.ioStream:19.begin20.FActiveTime:=Now;22.WriteStream;//继续发送流23.end;24.end;在这儿如果要加快速度和减少内存碎片,也可以使用内存池等技术。FSendOverlapped是一个列表管理,代码如下:[[delphi]viewplaincopy1.{*发送所申请的重叠端口缓冲*}2.TSendOverlapped=class3.private4.FList:TList;5.FLock:TCriticalSection;6.procedureClear;7.public8.constructorCreate;9.destructorDestroy;override;10.{*申请一个*}11.functionAllocate(ALen:Cardinal):PIocpRecord;12.{*释放一个*}13.procedureRelease(AValue:PIocpRecord);[[delphi]viewplaincopy1.{TSendOverlapped}3.procedureTSendOverlapped.Clear;4.var5.i:Integer;6.begin7.FLock.Enter;8.try10.begin14.end;15.FList.Clear;16.finally17.FLock.Leave;18.end;23.inheritedCreate;30.Clear;31.FList.Free;32.FLock.Free;33.inherited;38.FLock.Enter;39.try40.New(Result);41.Result.Overlapped.Internal:=0;42.Result.Overlapped.InternalHigh:=0;43.Result.Overlapped.Offset:=0;44.Result.Overlapped.OffsetHigh:=0;45.Result.Overlapped.hEvent:=0;46.Result.IocpOperate:=ioNone;47.Result.WsaBuf.buf:=GetMemory(ALen);48.Result.WsaBuf.len:=ALen;49.FList.Add(Result);50.finally51.FLock.Leave;52.end;57.i:Integer;59.FLock.Enter;60.try61.fori:=0toFList.Count-1do62.begin63.ifCardinal(AValue)=Cardinal(FList[i])then64.begin68.FList.Delete(i);69.Break;70.end;71.end;72.finally73.FLock.Leave;74.end;说,它是串行的,不会并行。 (四):粘包、分包、解包粘包使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从粘包可能由发送方造成,也可能由接收方造成。TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连。如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包。粘包一般的解决办法是制定通讯协议,由协议来规定如何分包解包。在IOCPDemo例子程序中,我们分包的逻辑是先发一个长度,然后紧接着是数据包内容,这样就可以把IOCPSocket分包处理主要代码,我们收到的数据都是在TSocketHandle.ProcessIOComplete方法中处[[delphi]viewplaincopy2.constACount:Cardinal);3.begin4.caseAIocpRecord.IocpOperateof5.ioNone:Exit;6.ioRead://收到数据7.begin8.FActiveTime:=Now;10.ifFConnectedthen11.PreRecv(AIocpRecord);//投递请求12.end;14.begin15.FActiveTime:=Now;17.end;18.ioStream:19.begin20.FActiveTime:=Now;22.WriteStream;//继续发送流23.end;24.end;Process分包。FInputBuf是一个内存流(FInputBuf:TMemoryStream),内存流的每次写入会造成一次内存内存管理方式。还有一种更好的解决方案是规定每次发包的大小,如每个包最大不超过64K,哪么缓冲区的最大大小可以设置为128K(缓存两个数据包),这的分配和释放比内存的读写效率要低)[[delphi]viewplaincopyal2.begin4.Process;5.end;[[delphi]viewplaincopy2.var3.AData,ALast,NewBuf:PByte;4.iLenOffset,iOffset,iReserveLen:Integer;6.functionReadLen:Integer;7.var8.wLen:Word;9.cLen:Cardinal;10.begin11.FInputBuf.Position:=iOffset;12.ifFLenType=ltWordthen13.begin15.//wLen:=ntohs(wLen);16.Result:=wLen;17.end18.else19.begin21.//cLen:=ntohl(cLen);22.Result:=cLen;23.end;24.end;26.caseFLenTypeof27.ltWord,ltCardinal:28.begin29.ifFLenType=ltWordthen30.iLenOffset:=231.else32.iLenOffset:=4;33.iReserveLen:=0;34.FPacketLen:=0;35.iOffset:=0;36.ifFPacketLen<=0then37.begin39.FInputBuf.Position:=0;//移动到最前面40.FPacketLen:=ReadLen;41.iOffset:=iLenOffset;43.ifFPacketLen>iReserveLenthen//不够一个包的长度44.begine46.FPacketLen:=0;47.Exit;48.end;49.end;50.while(FPacketLen>0)and(iReserveLen>=FPacketLen)do//如果数据够长,则处理51.begin//多个包循环处理et53.Execute(AData,FPacketLen);54.iOffset:=iOffset+FPacketLen;//移到下一个点55.FPacketLen:=0;57.ifiReserveLen>iLenOffsetthen//剩下的数据58.begin59.FPacketLen:=ReadLen;60.iOffset:=iOffset+iLenOffset;62.ifFPacketLen>iReserveLenthen//不够一个包的长度,需要把长度回退63.begin64.iOffset:=iOffset-iLenOffset;66.FPacketLen:=0;67.end;68.end69.else//不够长度字节数70.FPacketLen:=0;71.end;72.ifiReserveLen>0then//把剩下的自己缓存起来73.begin75.GetMem(NewBuf,iReserveLen);76.try77.CopyMemory(NewBuf,ALast,iReserveLen);en80.finally81.FreeMemory(NewBuf);82.end;83.end84.else85.begin86.FInputBuf.Clear;87.end;88.end;89.else90.begin95.end;96.end;解包处理,因而每个Socket服务对象都需要解包,我们解包的逻辑是放在TBaseSocket.DecodePacket中,命令al符号整数)令据这里和第一版公布的代码不同,这版的代码对命令进行了编码,采用UTF-8编码,代码如下:[[delphi]viewplaincopy1.functionTBaseSocket.DecodePacket(APacketData:PByte;2.constALen:Integer):Boolean;3.var4.CommandLen:Integer;5.UTF8Command:UTF8String;6.beginen8.begin9.CopyMemory(@CommandLen,APacketData,SizeOf(Cardinal));//获取命令长度10.Inc(APacketData,SizeOf(Cardinal));11.SetLength(UTF8Command,CommandLen);12.CopyMemory(PUTF8String(UTF8Command),APacketData,CommandLen);//读取命令13.Inc(APacketData,CommandLen);14.FRequestData:=APacketData;//数据15.FRequestDataLen:=ALen-SizeOf(Cardinal)-CommandLen;//数据长度17.Result:=True;18.end19.else20.Result:=False;成Execute方法,调用DecodePacket进行解包,然后根据命令进行协议逻辑处理,[delphi]viewplaincopy2.TSQLSocket=class(TBaseSocket)3.private4.{*开始事务创建TADOConnection,关闭事务时释放*}5.FBeginTrans:Boolean;6.FADOConn:TADOConnection;7.protected8.{*处理数据接口*}9.procedureExecute(AData:PByte;constALen:Cardinal);override;11.procedureDoCmdSQLOpen;13.procedureDoCmdSQLExec;14.{*开始事务*}15.procedureDoCmdBeginTrans;16.{*提交事务*}17.procedureDoCmdCommitTrans;18.{*回滚事务*}19.procedureDoCmdRollbackTrans;21.procedureDoCreate;override;22.destructorDestroy;override;24.functionGetSQL:string;25.propertyBeginTrans:BooleanreadFBeginTrans;Exceute是调用DecodePacket进行解包,然后获取命令分别调用不同的命令处理逻辑,代码如下:[[delphi]viewplaincopy2.var3.sErr:string;4.begin5.inherited;6.FRequest.Clear;8.try9.AddResponseHeader;10.ifALen=0then11.begin12.DoFailure(CIPackLenError);13.DoSendResult;14.Exit;15.end;16.ifDecodePacket(AData,ALen)then17.begin19.AddResponseHeader;220.caseStrToSQLCommand(Command)of21.scLogin:22.begin23.DoCmdLogin;24.DoSendResult;25.end;26.scActive:27.begin28.DoSuccess;29.DoSendResult;30.end;31.scSQLOpen:32.begin33.DoCmdSQLOpen;34.end;35.scSQLExec:36.begin37.DoCmdSQLExec;38.DoSendResult;39.end;40.scBeginTrans:41.begin42.DoCmdBeginTrans;43.DoSendResult;44.end;45.scCommitTrans:46.begin47.DoCmdCommitTrans;48.DoSendResult;49.end;50.scRollbackTrans:51.begin52.DoCmdRollbackTrans;53.DoSendResult;54.end;55.else56.DoFailure(CINoExistCommand,'UnknowCommand');57.DoSendResult;58.end;59.end60.else61.begincketMustIncludernrn63.DoSendResult;64.end;65.except66.onE:Exceptiondo//发生未知错误,断开连接67.begin68.sErr:=RemoteAddress+':'+IntToStr(RemotePort)+CSComma+'UnknowError:'+E.Mes69.WriteLogMsg(ltError,sErr);70.Disconnect;71.end;72.end; (五):锁和对象分离锁和对象一起封装的危险放锁,这样会造成死锁。我们写一个测试例子,我们创建一个锁,把锁锁住,然后再创建一个线程,一直不停的等待锁返回,然后我们把锁释放,这时线程就死锁,代码如下:定义接口:[[delphi]viewplaincopy1.type2.TLockObject=class;3.TLockTestThread=class;4.TForm1=class(TForm)5.btn1:TButton;6.mmoLockThreadTest:TMemo;7.procedurebtn1Click(Sender:TObject);8.procedureFormCreate(Sender:TObject);9.private10.FLockTestThread:TLockTestThread;11.FLockObject:TLockObject;12.public13.{Publicdeclarations}14.end;16.TLockTestThread=class(TThread)17.private18.FLockObject:TLockObject;19.cedureExecute;override;21.procedureAddLockLog;22.propertyLockObject:TLockObjectreadFLockObjectwriteFLockObject;23.end;25.TLockObject=class(TObject)26.private27.FLock:TCriticalSection;28.public29.constructorCreate;virtual;30.destructorDestroy;override;31.procedureLock;32.procedureUnLock;33.end;36.Form1:TForm1;在Form1创建的时候创建锁,并把锁锁住,创建一个线程等待锁返回:[[delphi]cedureTForm1.FormCreate(Sender:TObject);2.begin4.FLockObject.Lock;6.FLockTestThread.LockObject:=FLockObject;7.FLockTestThread.FreeOnTerminate:=True;8.FLockTestThread.Resume;9.end;线程的执行方法一直等待锁返回,并写一条日志。由于是窗体创建的时候,锁已经锁住了,因此线程会一[[delphi]cedureTLockTestThread.AddLockLog;2.begin3.Form1.mmoLockThreadTest.Lines.Add('Lock')4.end;6.procedureTLockTestThread.Execute;7.begin8.inherited;9.whilenotTerminateddo10.begin12.Synchronize(AddLockLog);13.end;这时线程会一直等待,如果我们把FLockObject释放,线程也会一直等待,造成死锁。[[delphi]cedureTForm1.btn1Click(Sender:TObject);2.begin3.FLockObject.Lock;4.FLockObject.Free;5.FLockObject:=nil;6.end;锁和对象分离有了上面的基础之后,我们就需要把锁和对象分离,在IOCPDemoSvr例子代码中TSocketHandle我们用一个结构体来管理锁和对象,锁在创建之后只有等TSocketHandle释放之后再释放,主要代码是TSocketHandles单元:[[delphi]viewplaincopyet2.TSocketHandles=class(TObject)3.private4.{*正在使用列表管理对象*}5.FList:TList;6.{*不再使用列表管理对象*}7.FIdleList:TList;8.{*锁*}9.FLock:TCriticalSection;10.{*获取某一个*}11.functionGetItems(constAIndex:Integer):PClientSocket;12.{*获取总个数*}13.functionGetCount:Integer;14.{*清除*}15.procedureClear;17.constructorCreate;virtual;18.destructorDestroy;override;19.{*加锁*}20.procedureLock;21.{*解锁*}22.procedureUnLock;23.{*添加一个对象*}24.functionAdd(ASocketHandle:TSocketHandle):Integer;25.{*删除*}26.procedureDelete(constAIndex:Integer);overload;27.procedureDelete(ASocketHandle:TSocketHandle);overload;28.propertyItems[constAIndex:Integer]:PClientSocketreadGetItems;default;29.propertyCount:IntegerreadGetCount;实现单元:[[delphi]viewplaincopy1.{TSocketHandles}4.beginte8.end;12.Clear;13.FList.Free;15.FLock.Free;16.inherited;21.Result:=FList[AIndex];26.Result:=FList.Count;31.i:Integer;32.ClientSocket:PClientSocket;34.fori:=0toCount-1do35.begin36.ClientSocket:=Items[i];39.Dispose(ClientSocket);40.end;41.FList.Clear;42.fori:=0toFIdleList.Count-1do43.begin44.ClientSocket:=FIdleList[i];46.Dispose(ClientSocket);47.end;53.FLock.Enter;58.FLock.Leave;63.ClientSocket:PClientSocket;66.begin67.ClientSocket:=FIdleList[0];68.FIdleList.Delete(0);69.end70.else//否则创建一个71.begin72.72.New(ClientSocket);74.end;75.ClientSocket.SocketHandle:=ASocketHandle;77.Result:=FList.Add(ClientSocket);82.ClientSocket:PClientSocket;84.ClientSocket:=FList[AIndex];85.ClientSocket.Lock.Enter;86.try89.finally91.end;92.FList.Delete(AIndex);94.Dispose(ClientSocket)95.else96.FIdleList.Add(ClientSocket);104.fori:=0toCount-1do105.begin106.ifItems[i].SocketHandle=ASocketHandlethen107.begin108.iIndex:=i;109.Break;110.end;111.end;112.ifiIndex<>-1then113.begin114.Delete(iIndex);115.end; (六):协议字符集Fson及其它语言(如日文,韩文)。使用UTF-8的好处是现在一些手机平台都是使用UTF-8,另外在一些嵌入式平台,如果不支持中文,只DELPHI默认支持的编码是ANSI,为了支持UTF-8,需要转换下,为了编程简便性,我们只在发包和解[[delphi]viewplaincopy2.ARequest,AResponse:TStrings;constAData:string):Boolean;3.var4.slBuff:TStringList;5.dwPacketLen,dwCommandLen,dwCode:Cardinal;6.sBuff:string;7.utf8Command,utf8Data:UTF8String;8.begin9.try11.try12.FClient.ReadTimeout:=FTimeOutMS;13.FClient.MaxLineLength:=1024*1024*1024;15.slBuff.Add(CSCommand+CSEqualSign+ACommand);16.ifAssigned(ARequest)then17.slBuff.AddStrings(ARequest);20.ifnotIsEmptyStr(AData)then22.dwPacketLen:=SizeOf(Cardinal)+Length(utf8Command)+Length(utf8Data);//总长度23.dwCommandLen:=Length(utf8Command);//命令长度24.FClient.WriteCardinal(dwPacketLen,False);//发送整个包长度25.FClient.WriteCardinal(dwCommandLen,False);//发送命令长度26.FClient.WriteBuffer(PUTF8String(utf8Command)^,dwCommandLen,True);//发送命令内容27.ifnotIsEmptyStr(AData)thengutfDataLengthutfDataTrue31.SetLength(utf8Command,dwPacketLen-SizeOf(Integer));32.FClient.ReadBuffer(PUTF8String(utf8Command)^,dwPacketLen-SizeOf(Integer));//读取后续33.slBuff.Clear;34.slBuff.Text:=Utf8ToAnsi(utf8Command);//转换为ANSI36.Result:=dwCode=0;37.ifnotResultthen38.FLastError:=GetErrorCodeString(dwCode)+':'+slBuff.Values[CSMessage];39.ifAssigned(AResponse)then40.AResponse.Text:=slBuff.Text;41.finally42.slBuff.Free;43.end;44.except45.onE:Exceptiondo46.beginge48.Result:=False;49.end;50.end;[[delphi]viewplaincopy1.functionTBaseSocket.DecodePacket(APacketData:PByte;2.constALen:Integer):Boolean;3.var4.CommandLen:Integer;5.UTF8Command:UTF8String;6.beginen8.begin9.CopyMemory(@CommandLen,APacketData,SizeOf(Cardinal));//获取命令长度10.Inc(APacketData,SizeOf(Cardinal));11.SetLength(UTF8Command,CommandLen);12.CopyMemory(PUTF8String(UTF8Command),APacketData,CommandLen);//读取命令13.Inc(APacketData,CommandLen);14.FRequestData:=APacketData;//数据15.FRequestDataLen:=ALen-SizeOf(Cardinal)-CommandLen;//数据长度17.Result:=True;18.end19.else20.Result:=False;这样好处就是外部编程接口就都是ANSI,和DELPHI保持一致,应用编写就可以不用注意这个细节。 (七):通讯协议协议种类开发Socket程序有两种协议类型,一种是用文本描述的,类似HTTP协议,定义字符集,好处是兼容性和调试方便,缺点是解析文本会损耗一些性能;一种是用Code加结构体,定义字节顺序,好处是性能高,缺点是兼容性和调试不方便。这个可以根据应用场景灵活选择,如果您的应用相对稳定,需求变化少,性能要求高,则可以使用Code加结构体的方式。如果您的应用需要不停的扩充功能,但是对性能要求不苛刻,则可以使用文本解析的方式。这两种协议有两个比较典型的应用场景,Code加结构体更多应用在中间件上,因为协议的封装都是透明的,不需要联调,而且性能要求较高;文本解析则更多应用在外部交互上,如和设备、手机通讯,需要联调,但是性能要求没那么高。我们Demo是采用文本解析的方式,具体可以根据应用灵活选择。定义协议有以下注意点(方便不同平台接入)。字节顺序不同硬件平台或操作系统下,字节顺序是不一致的,有的是高位在前,低位在后,有的则是低位在前。Windows是低位在前,高位在后,每个平台下都有函数实现字节转换。TCP/IP定义的字节顺序是高位在前、低位在后,可以使用[[delphi]viewplaincopy1.functionntohl(netlong:u_long):u_long;stdcall;2.functionntohs(netshort:u_short):u_short;stdcall;[delphi]viewplaincopy1.functionhtonl(hostlong:u_long):u_long;stdcall;2.functionhtons(hostshort:u_short):u_short;stdcall;来实现本地字节顺序转为网络字节顺序。更流行的做法是使用网络字节顺序,这样规范统一。我们这里使用Windows字节顺序,即低位在前、高位在后,和网络字节顺序刚好相反。字符集需要转换就是UTF-8格式,对于跨平台具有优势。数据包格式Code个4字节的长度,后 (八):断点续传点续传断点续传主要是用在上传或下载文件,一般做法是开始上传的时候,服务器返回上次已经上传的大小,如果上传完成,则返回-1;下载开始的时候,由客户端上报本地已经下载大小,服务器根据位置信息下发数据,Size大小,例如我们协议格式。客户端->服务器{[Request]Command=UploadFileName=FileName}服务器->客户端{[Response]Command=UploadCode=ErrorCodeMessage=MessageFileSize=FileSize}客户端->服务器{[Request]#目录,全路径名#文件名(不包括路径)#错误码#如果出错,返回错误描述信息#已上传文件的大小,用于续传Command=DownloadDir=Dir#目录,全路径名FileName=FileName#文件名(不包括路径)FileSize=FileSize#客户端本地文件大小,用于断点续传PacketSize=PacketSize#下发数据包大小,单位为KB,用于速度测试}服务器->客户端{[Response]Command=DownloadCode=ErrorCodeMessage=Message}多线程并发下载#错误码#如果出错,返回错误描述信息部分,全部下载完成后,把每个数据块合并为一个文件。这个服务端和客户端协议都不需要修改,只是需要做多线程并发上传这个需要定义通讯来支持这个逻辑,主要是服务器要提供合并多个数据文件为一个文件的协议逻辑。 (九):稳定性问题解决IOCP接收缓存导致的内存错乱检测,是本没有内存错误的地方,而且内存错误出现的地方也不固定。这是一个不可重现的Bug,后续通过打TSocketHandle释放引起的,具体原因是:在IOCP中,每个Socket连接需要投递一个接收请求,并给出数TSocketHandle销毁后,IOCP返回一个异步接收消息,会导致写入到已销毁的接收缓存,造成内存被重写,导致内存错误。解决办法,是用锁和对象分离相同的机制,把接收缓存和对象分离,在释放对象的时候不释放接收缓存,等待超过30分钟后,重新使用这个锁和接受缓存,这样做即可以解决内存错乱问题,也起到了锁和接收缓存的池化处理。具体代码处理:投递请求缓存和对象分开,采用是锁和对象分离相同的机制。[[delphi]viewplaincopy1.{*客户端对象和锁*}2.TClientSocket=record3.

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论