프로그래밍

TCP/IP 패킷 수신(패킷 분리 및 합침)

업글 2021. 12. 15. 00:37

안녕하세요 업글입니다!

 

기본적인 TCP/IP 통신에 대한 예제들은 구글링을 통해서 쉽게 찾을 수 있습니다. 그러나 기본적인 함수 사용에 대한 내용이므로 바로 프로젝트 또는 현업에 적용하는 것은 쉽지 않습니다.

해당 포스팅에서는 TCP/IP 통신에서 패킷 수신 시 고려사항과 해당 사항을 고려한 로직 및 C/C++ 기반의 코드에 대해서 설명드리겠습니다.

 

TCP/IP 통신은 MSS 사이즈, 윈도우 사이즈, 네이글 알고리즘 등과 같은 특징들에 의해서 상대측 어플리케이션 레벨에서 송신한 패킷이 분리되거나 합쳐져서 송신되어 수신 시에 분리되거나 합쳐져서 수신되는 경우가 있습니다.

 

패킷 분리되는 경우

abcdef 송신
abc, def로 분리

위의 그림은 패킷 분리에 대한 예시로 상대측 어플리케이션 레벨에서 한번에 abcdef를 송신하였는데 수신 함수를 통해서 첫번째 수신하였을 때는 abc, 두번째 수신하였을 때는 def가 수신되는 경우입니다. TCP 관련 Layor를 통해서 abc, def로 분리된 후 송신되어 분리된 패킷을 수신하게 됩니다.

 

패킷 합쳐지는 경우

abc, def로 2번 송신
abcdef 수신

위의 그림은 패킷 분리와 반대의 경우로 합쳐지는 경우입니다. 상대측에서 첫번째로 abc를 송신하고 두번째로 def를 송신하였을 때 수신하는 경우에는 abcdef가 한번에 들어오는 경우입니다. 위에서 설명드린 것과 같이 TCP 관련 Layor를 통해서 abc, def 패킷이 합쳐진 후 송신되어 1개의 패킷을 수신하게 됩니다.

 

위의 예시와 같이 여러가지 경우로 패킷이 분리되거나 합쳐질 수 있습니다. TCP/IP 통신 시에 데이터의 형태나 크기가 다른 여러 메시지를 송수신 하기 때문에 이를 고려하여 패킷 수신 로직을 구현 해주셔야 합니다. 구현에 있어서 정답은 없고 사람 및 회사에 따라서 방식에 차이가 있을 것으로 보입니다.

 

데이터의 앞부분에 헤더를 붙여서 통신하는 방식을 설명드리겠습니다. 헤더는 메시지 ID, 송수신 장비 번호, 메시지 크기 등과 같은 내용을 포함하게 됩니다. 패킷의 유효성 검사를 위해서 START/END CODE 및 CHECK SUM/CRC와 같은 항목을 추가하셔도 됩니다.

 

코드로 자세히 설명드리겠습니다. 위에서 설명되는 경우에 대한 처리는 main.c 코드의 ThreadParsing 부분에서 진행함으로 이 부분을 집중적으로 보시는 것을 추천드립니다. TCP/IP 통신 및 데이터 큐 코드는 중요도가 낮으므로 해당 로직에 대한 설명과 테스트 결과에 대한 설명 뒷부분에 게시하였습니다.

 

Visual Studio 속성 > 전처리기에서 _WINSOCK_DEPRECATED_NO_WARNINGS를 추가하셔야 에러없이 동작됨을 참고하시기 바랍니다.

 

main.c 코드

#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <WinSock2.h>

#include "dataQueue.h"
#include "tcpIpComm.h"

#include <windows.h>

#pragma pack(push, 1)
typedef struct {
    uint32_t msgId;
    uint32_t srcId;
    uint32_t dstId;
    size_t msgSize;
}ST_MsgHeader;

typedef struct {
    ST_MsgHeader msgHeader;
    int8_t data[10];
}ST_TestPacket;
#pragma pack(pop) 

typedef enum {
    CONNECT,
    RECV,
    SEND
}CommStat;

typedef enum {
    WAIT_HEADER_RECV,
    GET_HEADER,
    WAIT_DATA_RECV,
    GET_DATA,
    MSG_SEND
}ParsingStat;

ST_Queue g_queue;

DWORD WINAPI ThreadReceive(void * arg)
{
    SOCKET clntSocket;

    CommStat stat = CONNECT;
    while (TRUE) {
        switch (stat) {
            case CONNECT:
                if (ClntConnection(5000, &clntSocket, SERV_IP_ADDR) == FALSE) {
                    Sleep(1000);
                }
                else {
                    stat = RECV;
                }
                break;
            case RECV:
                if (ClntRecv(&clntSocket, &g_queue) == FALSE) {
                    stat = CONNECT;
                }
                break;
        }

        Sleep(0);
    }    
}

DWORD WINAPI ThreadParsing(void* arg)
{
    int32_t i;
    int8_t * msg = NULL;
    ST_MsgHeader msgHeader;
    ParsingStat stat = WAIT_HEADER_RECV;

    while (TRUE) {
        switch (stat) {
        case WAIT_HEADER_RECV:
            if (QueueSize(&g_queue) >= sizeof(ST_MsgHeader)) {
                stat = GET_HEADER;
            }
            break;
        case GET_HEADER:
            QueueAddrPop(&g_queue, (int8_t*)&msgHeader, sizeof(ST_MsgHeader));
            stat = WAIT_DATA_RECV;
            break;
        case WAIT_DATA_RECV:
            if (QueueSize(&g_queue) >= (msgHeader.msgSize - sizeof(ST_MsgHeader))) {
                stat = GET_DATA;
            }
            break;
        case GET_DATA:
            msg = (int8_t *)malloc(msgHeader.msgSize);
            memcpy(msg, &msgHeader, sizeof(ST_MsgHeader));
            QueueAddrPop(&g_queue, &msg[sizeof(ST_MsgHeader)], (msgHeader.msgSize - sizeof(ST_MsgHeader)));
            stat = MSG_SEND;
            break;

        case MSG_SEND:
            printf("msgId=%d, srcId=%d, dstId=%d, msgSize=%d\n", msgHeader.msgId, msgHeader.srcId, msgHeader.dstId, msgHeader.msgSize);
            for (i = 0; i < (msgHeader.msgSize - sizeof(ST_MsgHeader)); i++) {
                printf("%d,", msg[sizeof(ST_MsgHeader) + i]);
            }
            printf("\n");
            switch (msgHeader.msgId) {
                // msgId 별 처리 ex) 다른 쓰레드로 전달
            }
            free(msg);
            stat = WAIT_HEADER_RECV;
            break;
        }

        Sleep(0);
    }
}

DWORD WINAPI ThreadSend(void* arg)
{
    int32_t i, j;
    SOCKET servSocket, clntSocket;
    ST_TestPacket packet[3];

    for (j = 0; j < 3; j++) {
    
        packet[j].msgHeader.msgId = j+1;
        packet[j].msgHeader.srcId = 1;
        packet[j].msgHeader.dstId = 2;
        packet[j].msgHeader.msgSize = sizeof(ST_TestPacket);

        for (i = 0; i < 10; i++) {
            packet[j].data[i] = j+1;
        }
    }

    CommStat stat = CONNECT;
    while (TRUE) {
        switch (stat) {
        case CONNECT:
            if (ServConnection(5000, &servSocket , &clntSocket) == FALSE) {
                Sleep(1000);
            }
            else {
                stat = SEND;
            }
            break;
        case SEND:
            // case 1 분할
            send(clntSocket, &packet[0], sizeof(ST_MsgHeader) + 5, 0);
            send(clntSocket, ((int8_t *)&packet[0] + sizeof(ST_MsgHeader) + 5), 5, 0);

            // case 2 합
            send(clntSocket, &packet[1], sizeof(ST_TestPacket) * 2, 0);
            Sleep(10000);
            break;
        }

        Sleep(0);
    }
}


int main()
{
    if (QueueInit(&g_queue, QUEUE_LENGTH) == NO) {
        printf("queue init fail\n");
    }

    WSADATA wsaData;
    WSAStartup(MAKEWORD(2, 2), &wsaData);

    HANDLE hReceiveThrd, hParsingThrd, hSendThrd;
    DWORD threadId, threadParam;
    hReceiveThrd = CreateThread(NULL, 0, ThreadReceive, &threadParam, 0, &threadId);
    hParsingThrd = CreateThread(NULL, 0, ThreadParsing, &threadParam, 0, &threadId);
    hSendThrd = CreateThread(NULL, 0, ThreadSend, &threadParam, 0, &threadId);

    WaitForSingleObject(hReceiveThrd, INFINITE);
    WaitForSingleObject(hParsingThrd, INFINITE);
    WaitForSingleObject(hSendThrd, INFINITE);

    CloseHandle(hReceiveThrd);
    CloseHandle(hParsingThrd);
    CloseHandle(hSendThrd);

    return 0;
}

ThreadSend의 경우 테스트를 위한 쓰레드이고 ThreadReceive는 수신을 위한 쓰레드입니다. ThreadParsing이 분리, 합쳐진 패킷을 기존 패킷으로 파싱하는 쓰레드입니다. ThreadSend의 첫번째 케이스에서는 패킷이 분할되는 상황, 두번째 케이스에서는 패킷이 합쳐지는 상황에 대해서 모의하였습니다. ThreadReceive에서는 TCP/IP 수신 후 데이터를 큐에 Push합니다. ThreadParsing에서는 큐를 확인하고 Pop하여 파싱으로 기존 전송 패킷을 수신한 후 처리합니다. 아래의 그림은ThreadParsing에서 패킷을 수신하기 위한 파싱 로직입니다.

패킷 수신 파싱 로직

테스트 결과는 아래의 그림과 같습니다.

테스트 결과

 

TCP/IP 통신 코드 헤더

#pragma once

#include <stdint.h>
#include <WinSock2.h>
#include "dataQueue.h"

#define RECV_BUFFER_SIZE (16384)
#define SERV_IP_ADDR ("127.0.0.1")

int32_t ServConnection(int16_t port, SOCKET* servSocket, SOCKET* clntSocket);
int32_t ServRecv(SOCKET* servSocket, SOCKET* clntSocket, ST_Queue* queue);
int32_t ClntConnection(int16_t port, SOCKET* clntSocket, char* ipAddr);
int32_t ClntRecv(SOCKET* socket, ST_Queue* queue);

 

TCP/IP 통신 코드

#include "tcpIpComm.h"

int32_t ServConnection(int16_t port, SOCKET* servSocket, SOCKET* clntSocket)
{
	int32_t result = FALSE;
	int32_t option = 1;
	SOCKADDR_IN servAddr, clntAddr;
	int32_t szClntAddr;

	do {
		*servSocket = socket(PF_INET, SOCK_STREAM, 0);

		if (*servSocket == INVALID_SOCKET) {
			break;
		}

		memset(&servAddr, 0, sizeof(servAddr));

		servAddr.sin_family = AF_INET;
		servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
		servAddr.sin_port = htons(port);

		if (bind(*servSocket, (SOCKADDR*)&servAddr, sizeof(servAddr)) == SOCKET_ERROR) {
			closesocket(*servSocket);
			break;
		}

		if (listen(*servSocket, 5) == SOCKET_ERROR) {
			closesocket(*servSocket);
			break;
		}

		szClntAddr = sizeof(clntAddr);
		*clntSocket = accept(*servSocket, (SOCKADDR*)&clntAddr, &szClntAddr);   // 클라이언트와 통신 할

		if (*clntSocket == INVALID_SOCKET) {
			closesocket(*clntSocket);
			closesocket(*servSocket);
			break;
		}
		else {
			result = TRUE;
			break;
		}
	} while (0);
	
	return result;
}

int32_t ServRecv(SOCKET* servSocket, SOCKET* clntSocket, ST_Queue* queue)
{
	int32_t result, len;
	static int8_t recvBuf[RECV_BUFFER_SIZE];

	len = recv(*clntSocket, recvBuf, RECV_BUFFER_SIZE, 0);

	if (len <= 0) {
		closesocket(*servSocket);
		closesocket(*clntSocket);
		result = FALSE;
	}
	else {
		QueueAddrPush(queue, recvBuf, len);
		result = TRUE;
	}

	return result;
}

int32_t ClntConnection(int16_t port, SOCKET* clntSocket, char * ipAddr)
{
	int32_t result = FALSE;

	SOCKADDR_IN servAddr;

	do {
		*clntSocket = socket(PF_INET, SOCK_STREAM, 0);

		if (*clntSocket == INVALID_SOCKET) {
			break;
		}

		servAddr.sin_family = AF_INET;
		servAddr.sin_addr.s_addr = inet_addr(ipAddr);
		servAddr.sin_port = htons(port);
		
		if (connect(*clntSocket, (SOCKADDR*)&servAddr, sizeof(servAddr)) == SOCKET_ERROR) {
			closesocket(*clntSocket);
			break;
		}
		else {
			result = TRUE;
			break;
		}

	} while (0);

	return result;
}

int32_t ClntRecv(SOCKET* socket, ST_Queue* queue)
{
	int32_t result, len;
	static int8_t recvBuf[RECV_BUFFER_SIZE];

	len = recv(*socket, recvBuf, RECV_BUFFER_SIZE, 0);

	if (len <= 0) {
		closesocket(*socket);
		result = FALSE;
	}
	else {
		QueueAddrPush(queue, recvBuf, len);
		result = TRUE;
	}

	return result;
}

 

데이터 큐 코드 헤더

#pragma once

#include <stdint.h>
#include <windows.h>

#define NO (0)
#define YES (!NO)

#define QUEUE_LENGTH (65536)

typedef struct {
    int32_t front;
    int32_t back;
    int32_t size;
    int32_t bufSize;
    int8_t* buf;

    CRITICAL_SECTION lock;
}ST_Queue;

int32_t IsQueueEmpty(ST_Queue* queue);
int32_t QueueSize(ST_Queue* queue);
int32_t IsQueueFull(ST_Queue* queue);
int32_t QueueInit(ST_Queue* queue, int32_t queueLength);
void QueueDelete(ST_Queue* queue);
void QueuePush(ST_Queue* queue, int8_t data);
void QueueAddrPush(ST_Queue* queue, int8_t* data, size_t dataSize);
int32_t QueuePop(ST_Queue* queue);
void QueueAddrPop(ST_Queue* queue, int8_t* data, size_t dataSize);

 

데이터 큐 코드

#include "dataQueue.h"

int32_t IsQueueEmpty(ST_Queue* queue)
{
    int32_t result;

    EnterCriticalSection(&queue->lock);
    if (queue->size == 0) {
        result = YES;
    }
    else {
        result = NO;
    }
    LeaveCriticalSection(&queue->lock);

    return result;
}

int32_t QueueSize(ST_Queue* queue)
{
    int32_t result;

    EnterCriticalSection(&queue->lock);
    result = queue->size;
    LeaveCriticalSection(&queue->lock);

    return result;
}

int32_t IsQueueFull(ST_Queue* queue)
{
    int32_t result;

    EnterCriticalSection(&queue->lock);
    if (queue->size == queue->bufSize) {
        result = YES;
    }
    else {
        result = NO;
    }
    LeaveCriticalSection(&queue->lock);

    return result;
}

int32_t QueueInit(ST_Queue* queue, int32_t queueLength)
{
    int32_t result;

    queue->buf = (int8_t*)malloc(sizeof(int8_t) * queueLength);

    if (queue->buf != NULL) {
        queue->front = 0;
        queue->back = 0;
        queue->size = 0;
        queue->bufSize = queueLength;
        memset(queue->buf, 0, (sizeof(int8_t) * queueLength));
        InitializeCriticalSection(&queue->lock);

        result = YES;
    }
    else {
        result = NO;
    }

    return result;
}

void QueueDelete(ST_Queue* queue)
{
    free(queue->buf);
    DeleteCriticalSection(&queue->lock);
}

void QueuePush(ST_Queue* queue, int8_t data)
{
    if (IsQueueFull(queue) == YES) {
        printf("queue is full\n");
    }
    else {
        EnterCriticalSection(&queue->lock);
        queue->buf[queue->back] = data;
        queue->back++;
        queue->size++;

        if (queue->back == queue->bufSize) {
            queue->back = 0;
        }
        LeaveCriticalSection(&queue->lock);
    }
}

void QueueAddrPush(ST_Queue* queue, int8_t* data, size_t dataSize)
{
    int32_t i;
    if (IsQueueFull(queue) == YES) {
        printf("queue is full\n");
    }
    else {
        EnterCriticalSection(&queue->lock);
        for (i = 0; i < dataSize; i++) {
            queue->buf[queue->back] = data[i];
            queue->back++;
            queue->size++;

            if (queue->back == queue->bufSize) {
                queue->back = 0;
            }
        }
        LeaveCriticalSection(&queue->lock);
    }
}

int32_t QueuePop(ST_Queue* queue)
{
    int8_t result = 0;

    if (IsQueueEmpty(queue) == YES) {
        printf("queue is emtpy\n");
    }
    else {
        EnterCriticalSection(&queue->lock);
        result = queue->buf[queue->front];
        queue->front++;
        queue->size--;

        if (queue->front == queue->bufSize) {
            queue->front = 0;
        }
        LeaveCriticalSection(&queue->lock);
    }

    return result;
}

void QueueAddrPop(ST_Queue* queue, int8_t* data, size_t dataSize)
{
    int32_t i;
    if (IsQueueEmpty(queue) == YES) {
        printf("queue is emtpy\n");
    }
    else {
        EnterCriticalSection(&queue->lock);
        for (i = 0; i < dataSize; i++) {
            data[i] = queue->buf[queue->front];
            queue->front++;
            queue->size--;

            if (queue->front == queue->bufSize) {
                queue->front = 0;
            }
        }
        LeaveCriticalSection(&queue->lock);
    }
}

이상으로 패킷 분리 및 합침을 고려한 TCP/IP 통신 수신 로직에 대한 설명을 마치겠습니다!

'프로그래밍' 카테고리의 다른 글

[c/c++] 프로세스 우선순위 변경  (0) 2021.12.10
[c언어/c++] 분기문(switch) 제거  (0) 2021.01.21
[c언어] 스택(Stack)  (0) 2021.01.14
[c언어] 링크드리스트(Linked List)  (0) 2021.01.13
[c언어] 큐(Queue)  (0) 2021.01.01