策略为王源代码分析-NetTs工程

Published

摘要:

此工程分三部分来解读,相互之间是独立的

1.发起请求部分:向行情服务器发送请求数据包后,发起请求部分就此结束,不用管了。

(1)接收StkNet发过来的单个请求

(2)启动了一个分笔数据请求线程,在交易时间内每隔3秒向服务器请求所有股票分笔数据行情。

2.数据接收部分:用事件机制接收行情服务器发送过来的数据

接收数据后存入类中的缓冲区,然后对此缓冲区的数据进行解码,解码后的数据包存入类中的另一个缓冲区后,不管了。

3.数据推送部分:数据分发线程

启动了一个数据分发线程,从缓冲区取出数据,用消息发送数据到StkNet的窗口句柄,不管了。

 

一、发起请求部分

(一)开启主动请求所有股票分笔数据的线程

//功能:1.每隔3秒向获取缓冲区的数据
// 2.接收数据空闲时,发送队列里缓存的行情数据请求命令。
// 缺陷:用线程发起请求,数据有延迟,应该改用websocket主动推送

条件:获取本地股票分笔成交文件,股票数量》0;

        只在交易时间内发起请求,可以修改本地时间。

UINT TWAutoReportThreadMain(LPVOID pParam)

 

请求数据

 CTWSocket的socket的事件机制把收到的数据先用CTSCache解码,CTSCache解码后存在了缓冲区,GetStockByCodeEx()利用CTSCache类直接从缓冲区中读取数据

int WINAPI 	GetStockByCodeEx(char * pszStockCode,int nMarket,RCV_REPORT_STRUCTEx * pBuf)
{
	return ( CTSCache::GetInstance().GetStockByCodeEx( pszStockCode, nMarket, pBuf ) ? 1 : -1 );
}

 

数据请求、接收的实现过程

NetTS对外提供

int WINAPI RequestStockData( int nDataType, STOCK_STRUCTEx* pStocks, int nSize, int nKType, int nDataCount );

由StkNet调用此函数发起请求行情。包括请求的数据类型、股票代码的集合等。

用socket的事件机制接收到原始数据后,先进行解码,然后存入变量缓冲区。

 

 

 

(二)分笔数请求过程

1.发起分笔数据请求

CTWSocket::RequestReport(TW_STOCK* pStock, int nSize)
			TW_ASK ask;
			int lenSend = ConstructAskReportBuffer(ask, stocks, putsize);
			if (lenSend > 0)
				//发送分笔数据请求 by freeman
				Send(&ask, lenSend);

 

二、数据接收部分

1.用CTWSocket::OnReceive(int nErrorCode)的事件机制接收数据

(1)接收到的数据先 存入NetTS\TWSocket.h的CTWSockt::m_rbuffer变量缓冲区(BYTE m_rbuffer[0x10000];  64k 字节接收缓冲区

 

//数据接收事件
void CTWSocket::OnReceive(int nErrorCode) 
{
	char szText[256];
	sprintf(szText, "→step 1.数据到达事件");
	g_pWinTrace->Debug()->Send("CTWSocket::OnReceive", szText);

	m_timeReceiveLast = time(NULL);

	//读取数据事件,存入m_rbuffer缓冲区
	int nReceive = Receive(m_rbuffer, sizeof(m_rbuffer));

	sprintf(szText, "→step 2.读取数据,调用Receive(m_rbuffer, sizeof(m_rbuffer)),存入m_rbuffer缓冲区,字节数:%d", nReceive);
	g_pWinTrace->Debug()->Send("CTWSocket::OnReceive", szText);

	if (nReceive > 0)
	{
		//调用程序对收到的数据解码
		sprintf(szText, "→step 3.读取数据长度>0,调用CTSCache::GetInstance().OnReceive(m_rbuffer, nReceive)处理");
		g_pWinTrace->Debug()->Send("CTWSocket::OnReceive", szText);
		//对收到的数据包进行解码
		CTSCache::GetInstance().OnReceive(m_rbuffer, nReceive);

		if (nReceive < 256 && TryGetLength(m_rbuffer, nReceive) < 256)	// 收到小块包,说明大包接收完毕
			m_bIsReceiving = FALSE;

		if (nReceive == sizeof(m_rbuffer))
			OnReceive(nErrorCode);
	}

	CSocket::OnReceive(nErrorCode);
}

(2) 用CTSCache::OnReceive( BYTE * buf, size_t len )函数对CTWSockt::m_rbuffer变量缓冲区数据进行解码

解码过程

首先将CTWSockt::m_rbuffer的数据拷贝到将数据拷贝到CTSCache::m_buffer 512k。然后开始解码

//对收到的数据包进行解码 by freeman
int CTSCache::OnReceive( BYTE * buf, size_t len )
{
	char szText[256];
	sprintf(szText, "→step 4.准备解码,准备调用DecodePacket() sizeof(m_buffer)=%d len=%d", sizeof(m_buffer), len);
	g_pWinTrace->Debug()->Send("CTSCache::OnReceive", szText);

	CSingleLock lock(&m_mutexBuffer,TRUE);

	if( NULL == buf || len <= 0 )
		return 0;
	if( len > sizeof(m_buffer) )
		return 0;
	if( m_nBufLen + len > sizeof(m_buffer) )
		m_nBufLen = 0;	// discard old

	//将数据拷贝到CTSCache::m_buffer 512k bytes buffer by freeman
	memcpy( m_buffer+m_nBufLen, buf, len ); 
	m_nBufLen += len;
	int packets = DecodePacket();
	while( packets > 0 )
		packets = DecodePacket();
	return len;
}

 

 

//解码数据包
int CTSCache::DecodePacket( )
{
	if( m_nBufLen <= 0 )
		return 0;

	int nPacketLen = FindFirstPacketLength();

	if( nPacketLen > 0 && nPacketLen <= (int)m_nBufLen )
	{
		TryGetPacket( nPacketLen );
		DiscardPacket( nPacketLen );
		return 1;
	}

	if( m_nBufLen > sizeof(m_buffer)/2 )
		m_nBufLen = 0;	// truncate if too big and no packets found.
	return 0;
}

 

int CTSCache::TryGetPacket( int nPacketLen )

 

尝试对CTSCache::m_buffer的数据解码,如果是分笔数据,转换为RCV_DATA结构,存入prcvdata 变量 by freeman
	nLen = TryGetReport( m_buffer, nPacketLen, prcvdata );

NetTS:解码的分笔数据存入了CTSCache::m_aReports变量,结构为RCV_REPORT_STRUCTEx

BOOL CTSCache::PushReport( char * pszStockCode, RCV_REPORT_STRUCTEx * pBuf )
{
	if( NULL == pszStockCode || strlen(pszStockCode) <= 0 || NULL == pBuf )
		return FALSE;

	CSingleLock lock(&m_mutexReports,TRUE);

	// look for it in map
	void * rValue = NULL;
	if( m_mapReports.Lookup( pszStockCode, rValue ) )
	{
		int nNo = (int)rValue;
		ASSERT( nNo >= 0 && nNo < m_aReports.GetSize() && 0 == strncmp(pszStockCode, m_aReports.ElementAt(nNo).m_szLabel, sizeof(m_aReports.ElementAt(nNo).m_szLabel)) );
		if( nNo >= 0 && nNo < m_aReports.GetSize()
			&& ( 0 == (m_aReports.ElementAt(nNo).m_szLabel[0] )
				|| 0 == strncmp(pszStockCode, m_aReports.ElementAt(nNo).m_szLabel, sizeof(m_aReports.ElementAt(nNo).m_szLabel)) ) )
			return PushReport( nNo, pBuf );
		else // something error
			m_mapReports.RemoveKey( pszStockCode );
	}

	ASSERT(	pBuf->m_cbSize == sizeof(RCV_REPORT_STRUCTEx) );
	pBuf->m_cbSize = sizeof(RCV_REPORT_STRUCTEx);
	//存入CTSCache::m_aReports变量,结构为RCV_REPORT_STRUCTEx
	int nNo = m_aReports.Add( *pBuf );
	//修改CTSCache::m_mapReports相关信息 
	m_mapReports.SetAt( pszStockCode, (void *)nNo );
	return TRUE;
}

如果通过解码,判断此数据包时分笔数据包,则:

(1)存入CTSCache::m_aReports变量,结构为RCV_REPORT_STRUCTEx

(2)消息号和数据存入CTSCache::m_aPackets,结构如下:

m_aPackets是一个TS_PACKET类型的数组

CTSPacketArray		m_aPackets;
typedef CArray< TS_PACKET, TS_PACKET &> CTSPacketArray;

TS_PACKET包含了消息号和数据体

typedef struct _ts_packet_t {
	UINT		m_nMsgType;
	PRCV_DATA	m_pRCV_DATA;
} TS_PACKET, PTS_PACKET;

数据体结构

typedef struct tagRCV_DATA
{
	int					m_wDataType;			// 文件类型
	int					m_nPacketNum;			// 记录数,参见注一
	RCV_FILE_HEADEx		m_File;					// 文件接口
	BOOL				m_bDISK;				// 文件是否已存盘的文件
	union
	{
		RCV_REPORT_STRUCTEx  *	m_pReport;
		RCV_HISTORY_STRUCTEx *	m_pDay;
		RCV_MINUTE_STRUCTEx  *	m_pMinute;
		RCV_POWER_STRUCTEx	 *	m_pPower;
		RCV_MULTISORT_STRUCTEx	*	m_pMultisort;
		void				 *	m_pData;		// 参见注二
	};
} RCV_DATA,*PRCV_DATA;
if( nLen > 0 )
	{
		//存入CTSCache::m_aReports变量,结构为RCV_REPORT_STRUCTEx
		PushReport( prcvdata->m_pReport, prcvdata->m_nPacketNum );
		//消息号和数据存入CTSCache::m_aPackets by freeman
		PushPacket( RCV_REPORT, prcvdata );
		prcvdata = NULL;
		return 1;
	}

CTSCache::PushPacket()

//消息号和数据存入CTSCache::m_aPackets by freeman
BOOL CTSCache::PushPacket( UINT nMsgType, PRCV_DATA pRCV_DATA )
{
	ASSERT( pRCV_DATA && pRCV_DATA->m_pData );

	CSingleLock lock(&m_mutexPackets,TRUE);

	TS_PACKET packet;
	packet.m_nMsgType = nMsgType;
	packet.m_pRCV_DATA = pRCV_DATA;
	//消息号和数据存入CTSCache::m_aPackets by freeman
	m_aPackets.Add( packet );
	return TRUE;
}

三、数据发送部分

(一)初始化该类时,启动数据分发线程。

//初始化该类时,启动数据分发线程。
CTSWnd::CTSWnd()
{
	CTSWnd::GetInstance().StartDispatchThread();
}

(二)数据分发线程,从缓冲区取出数据,用消息发送数据到StkNet的窗口句柄

// 分发数据线程,行情服务器主动推送线程 2019/06/08 by freeman
UINT TSDispatchThreadMain(LPVOID pParam)
{
	char szText[256];
	sprintf(szText, "→start CTSWnd::StartDispatchThread()");
	g_pWinTrace->Debug()->Send("NetTS:TSWnd.cpp", szText);

	while(TRUE)
	{
		UINT nMsgType = 0;
		PRCV_DATA pRCV_DATA = NULL;
		//从缓冲区中弹出数据,然后向已注册的窗口句柄发送消息。StkNet 向NetTS类注册了窗口句柄。 by freeman
		if (CTSCache::GetInstance().PopPacket(nMsgType, pRCV_DATA))
		{
			ASSERT(pRCV_DATA && pRCV_DATA->m_pData);
			if (pRCV_DATA && pRCV_DATA->m_pData)
				CTSWnd::GetInstance().SendMessage(nMsgType, (LPARAM)pRCV_DATA);

			char szText[256];
			sprintf(szText, "↑DataType:%d PacketNum:%d", pRCV_DATA->m_wDataType, pRCV_DATA->m_nPacketNum);
			g_pWinTrace->Debug()->Send("TSDispatchThreadMain", szText);

			CTSCache::GetInstance().FreePacket(pRCV_DATA);
		}

		Sleep(1);

		// User wants to quit program
		if (WAIT_OBJECT_0 == WaitForSingleObject(CTSWnd::m_hEventKillDispatchThread,0))
		{
			SetEvent(CTSWnd::m_hEventDispatchThreadKilled);
			AfxEndThread(0, TRUE);
			return 0;
		}
	}
}

 

从CTSCache::m_aPackets缓冲区(数组)中弹出数据

//从CTSCache::m_aPackets缓冲区(数组)中弹出数据
BOOL CTSCache::PopPacket( UINT & nMsgType, PRCV_DATA & pRCV_DATA )
{
	CSingleLock lock(&m_mutexPackets,TRUE);

	if( m_aPackets.GetSize() > 0 )
	{
		nMsgType = m_aPackets.ElementAt(0).m_nMsgType;
		pRCV_DATA = m_aPackets.ElementAt(0).m_pRCV_DATA;
		m_aPackets.RemoveAt(0);
		return TRUE;
	}
	return FALSE;
}