C#/네트워크

[c#][서버] Command 패턴 / JobQueue

goliot 2024. 6. 14. 19:18
반응형

Command 패턴 개념

  • 지금까지는
    • 클라이언트 세션이 요청
    • 패킷 핸들러에서 함수를 호출해 작업 수행
      • 게임 룸에 접근하여 브로드캐스팅
        • 주방에 직접 들어가 주문을 하는 것과 비슷
        • 직원이 서빙, 요리 다함 -> 그런데 엄청 많은 직원
    • 주문하려는 손님이 몰리고, 직원도 좁은 주방에 들어가 있는 상태
    • 일이 밀리면, 직원만 더 늘어남
  • 직원의 업무를 분담시켜서 직원 수를 가장 최적화시켜야 한다
    • 서빙 담당이 주문서를 만들어 주방에 전달하게 하자
    • 한 명의 주방장만 주문을 처리하게 하자
  • Command 패턴은, 캡슐화(주문서 만들기) 하는 작업을 하는 것
    • 주문을 받는것과 처리하는 것을 분리하는 것이 가능
    • 손님이 주문을 바꾸는 경우에도, 쉽게 처리 가능(UNDO)

 Command 패턴 구현(JobQueue)

namespace ServerCore
{
    public interface IJobQueue
    {
        void Push(Action job);
    }

    public class JobQueue : IJobQueue
    {
        Queue<Action> _jobQueue = new Queue<Action>();
        object _lock = new object();
        bool _flush = false;

        public void Push(Action job)
        {
            bool flush = false;

            lock (_lock)
            {
                _jobQueue.Enqueue(job);
                if (_flush == false)
                    flush = _flush = true;
            }

            if (flush)
                Flush();
        }

        void Flush()
        {
            while (true)
            {
                Action action = Pop();
                if (action == null)
                    return;

                action.Invoke();
            }
        }

        Action Pop()
        {
            //pop을 하는 와중에도 작업이 들어올 수 있으니 락을 건다
            lock (_lock) 
            {
                if (_jobQueue.Count == 0)
                {
                    _flush = false;
                    return null;
                }
                return _jobQueue.Dequeue();
            }
        }
    }
}
  • 잡큐에서 락을 다 잡고있으므로, 잡큐를 사용하는 다른 클래스에서는 락을 걸 필요가 없음
  • 잡큐를 사용하면, 생성되는 쓰레드 수가 확연히 줄어든다

 충돌 상황

public static void C_ChatHandler(PacketSession session, IPacket packet)
{
    C_Chat chatPacket = packet as C_Chat;
    ClientSession clientSession = session as ClientSession;

    if (clientSession == null)
        return;

    //잡큐에 broadcast 작업 넣기
    clientSession.Room.Push(() => clientSession.Room.Broadcast(clientSession, chatPacket.chat));
}
  • 다음과 같은 push 작업 이후, 대기중일 때, clientSession이 접속이 종료된다던가 하는 상황으로 인해 null로 바뀌면, 충돌이 발생
GameRoom room = clientSession.Room;
room.Push(() => room.Broadcast(clientSession, chatPacket.chat));
  • 위와 같이, 한 번 꺼내서, Room의 상태가 바뀌더라도 room이 남게 하면 안정적으로 처리 가능
  • 람다가 없던 시절에는 클래스를 따로 파서 구현했다
interface ITask
{
    void Execute();
}

class BroadcastTask : ITask
{
    GameRoom _room;
    ClientSession _session;
    string _chat;

    BroadcastTask(GameRoom room, ClientSession session, string chat)
    {
        _room = room;
        _session = session;
        _chat = chat;
    }

    public void Execute()
    {
        _room.Broadcast(_session, _chat);
    }
}

class TaskQueue
{
    Queue<ITask> _queue = new Queue<ITask>();
}

 

누가 JobQueue를 처리하나?(쓰레드)

  • '바람의 나라' 같은 영역이 딱 정해진 게임에서는 영역마다 잡큐를 하나씩 둔다.
  • 오픈월드같은 영역 개념이 없는 곳에서는, 캐릭터, 스킬, 몬스터 등 사물마다 하나씩 둔다

패킷 모아 보내기

  • 잡큐를 사용하여, 쓰레드는 줄였지만, 여전히 O(N^2)를 벗어나지 못했다.
    • 500명만 Dummy Client를 넣어도 사용 메모리가 계속 늘어나고 힘들어한다
  • 요청이 들어올 때마다 보내는 작업을 큐에 넣고있기 때문.
  • 이를 좀 모아서 보내도록 해보자
public void Flush()
{
    // N ^ 2
    foreach (ClientSession s in _sessions)
        s.Send(_pendingList);

    Console.WriteLine($"Flushed {_pendingList.Count} items");
    _pendingList.Clear();
}

public void Broadcast(ClientSession session, string chat)
{
    S_Chat packet = new S_Chat();
    packet.playerId = session.SessionId;
    packet.chat = $"{chat} I am {packet.playerId}";
    ArraySegment<byte> segment = packet.Write();

    _pendingList.Add(segment);
}
  • 리스트에 넣어만 두고, 이를 처리하는 Flush는 메인에서 일정 시간마다 한번씩 수행하도록 수

JobTimer

  • Room이 여러개고, Flush를 하는 객체가 많아질 때를 대비해보자
  • 모든 객체를 동일한 시간의 텀으로 Flush할 수는 없다.
using ServerCore;

namespace Server
{
    struct JobTimerElem : IComparable<JobTimerElem>
    {
        public int execTick; // 실행 시간
        public Action action;

        public int CompareTo(JobTimerElem other)
        {
            return other.execTick - execTick;
        }
    }

    class JobTimer
    {
        PriorityQueue<JobTimerElem> _pq = new PriorityQueue<JobTimerElem>();
        object _lock = new object();

        public static JobTimer Instance { get; } = new JobTimer();

        public void Push(Action action, int tickAfter = 0)
        {
            JobTimerElem job;
            job.execTick = Environment.TickCount + tickAfter;
            job.action = action;

            lock (_lock)
            {
                _pq.Push(job);
            }
        }

        public void Flush()
        {
            while(true)
            {
                int now = Environment.TickCount;

                JobTimerElem job;

                lock (_lock)
                {
                    if (_pq.Count == 0)
                        break;

                    job = _pq.Peek();
                    if (job.execTick > now)
                        break;

                    _pq.Pop();
                }

                job.action.Invoke();
            }
        }
    }
}
  • 우선순위 큐를 사용해, 각 Job의 다음 실행 시간을 관리한다.
  • 메인에서는
static void FlushRoom()
{
    Room.Push(() => Room.Flush());
    JobTimer.Instance.Push(FlushRoom, 250);
}

static void Main(string[] args)
{
    //DNS 사용
    string host = Dns.GetHostName();
    IPHostEntry ipHost = Dns.GetHostEntry(host);
    IPAddress ipAddr = ipHost.AddressList[0];
    IPEndPoint endPoint = new IPEndPoint(ipAddr, 7777);

    _listener.Init(endPoint, () => { return SessionManager.Instance.Generate(); });
    Console.WriteLine("Listening...");

    //FlushRoom();
    JobTimer.Instance.Push(FlushRoom);

    while (true)
    {
        JobTimer.Instance.Flush();
    }
}

 

반응형