Browse Source

* dthread.cpp: replace TMegaPkt with DThreadStruct

* consistently use uint8_t for pnum
 * get rid of checks that never fail
 * use atomic_bool for dthread_running
* dthread: eliminate superfluous memory copies
* Pass correct-sized buffer to DeltaExportJunk
* dthread: Fix crash when list is empty
* DthreadHandler: eliminate delay
* Dthread: use SdlMutex and lock_guard
* dhtread: explicitly provide T for lock_guard
pull/2411/head
Vladimir Olteanu 5 years ago committed by GitHub
parent
commit
dd4b70a24b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 129
      Source/dthread.cpp
  2. 5
      Source/dthread.h
  3. 23
      Source/msg.cpp
  4. 8
      Source/multi.cpp

129
Source/dthread.cpp

@ -4,18 +4,37 @@
* Implementation of functions for updating game state from network commands.
*/
#include <list>
#include <atomic>
#include <mutex>
#include "nthread.h"
#include "storm/storm.h"
#include "utils/sdl_mutex.h"
#include "utils/thread.h"
namespace devilution {
struct DThreadPkt {
int pnum;
_cmd_id cmd;
std::unique_ptr<byte[]> data;
uint32_t len;
DThreadPkt(int pnum, _cmd_id(cmd), std::unique_ptr<byte[]> data, uint32_t len)
: pnum(pnum)
, cmd(cmd)
, data(std::move(data))
, len(len)
{
}
};
namespace {
CCritSect sgMemCrit;
SdlMutex DthreadMutex;
SDL_threadID glpDThreadId;
TMegaPkt *sgpInfoHead; /* may not be right struct */
bool dthread_running;
std::list<DThreadPkt> InfoList;
std::atomic_bool dthread_running;
event_emul *sghWorkToDoEvent;
/* rdata */
@ -23,37 +42,21 @@ SDL_Thread *sghThread = nullptr;
void DthreadHandler()
{
const char *errorBuf;
TMegaPkt *pkt;
DWORD dwMilliseconds;
while (dthread_running) {
if (sgpInfoHead == nullptr && WaitForEvent(sghWorkToDoEvent) == -1) {
errorBuf = SDL_GetError();
app_fatal("dthread4:\n%s", errorBuf);
}
if (InfoList.empty() && WaitForEvent(sghWorkToDoEvent) == -1)
app_fatal("dthread4:\n%s", SDL_GetError());
sgMemCrit.Enter();
pkt = sgpInfoHead;
if (sgpInfoHead != nullptr)
sgpInfoHead = sgpInfoHead->pNext;
else
DthreadMutex.lock();
if (InfoList.empty()) {
ResetEvent(sghWorkToDoEvent);
sgMemCrit.Leave();
if (pkt != nullptr) {
if (pkt->dwSpaceLeft != MAX_PLRS)
multi_send_zero_packet(pkt->dwSpaceLeft, static_cast<_cmd_id>(pkt->data[0]), &pkt->data[8], *(DWORD *)&pkt->data[4]);
dwMilliseconds = 1000 * *(DWORD *)&pkt->data[4] / gdwDeltaBytesSec;
if (dwMilliseconds >= 1)
dwMilliseconds = 1;
std::free(pkt);
if (dwMilliseconds != 0)
SDL_Delay(dwMilliseconds);
DthreadMutex.unlock();
continue;
}
DThreadPkt pkt = std::move(InfoList.front());
InfoList.pop_front();
DthreadMutex.unlock();
multi_send_zero_packet(pkt.pnum, pkt.cmd, pkt.data.get(), pkt.len);
}
}
@ -61,74 +64,38 @@ void DthreadHandler()
void dthread_remove_player(uint8_t pnum)
{
TMegaPkt *pkt;
sgMemCrit.Enter();
for (pkt = sgpInfoHead; pkt != nullptr; pkt = pkt->pNext) {
if (pkt->dwSpaceLeft == pnum)
pkt->dwSpaceLeft = MAX_PLRS;
}
sgMemCrit.Leave();
std::lock_guard<SdlMutex> lock(DthreadMutex);
InfoList.remove_if([&](auto &pkt) {
return pkt.pnum == pnum;
});
}
void dthread_send_delta(int pnum, _cmd_id cmd, byte *pbSrc, int dwLen)
void dthread_send_delta(int pnum, _cmd_id cmd, std::unique_ptr<byte[]> data, uint32_t len)
{
TMegaPkt *pkt;
TMegaPkt *p;
if (!gbIsMultiplayer) {
if (!gbIsMultiplayer)
return;
}
pkt = static_cast<TMegaPkt *>(std::malloc(dwLen + 20));
if (pkt == nullptr)
app_fatal("Failed to allocate memory");
pkt->pNext = nullptr;
pkt->dwSpaceLeft = pnum;
pkt->data[0] = static_cast<byte>(cmd);
*(DWORD *)&pkt->data[4] = dwLen;
memcpy(&pkt->data[8], pbSrc, dwLen);
sgMemCrit.Enter();
p = (TMegaPkt *)&sgpInfoHead;
while (p->pNext != nullptr) {
p = p->pNext;
}
p->pNext = pkt;
DThreadPkt pkt { pnum, cmd, std::move(data), len };
std::lock_guard<SdlMutex> lock(DthreadMutex);
InfoList.push_back(std::move(pkt));
SetEvent(sghWorkToDoEvent);
sgMemCrit.Leave();
}
void dthread_start()
{
const char *errorBuf;
if (!gbIsMultiplayer) {
if (!gbIsMultiplayer)
return;
}
sghWorkToDoEvent = StartEvent();
if (sghWorkToDoEvent == nullptr) {
errorBuf = SDL_GetError();
app_fatal("dthread:1\n%s", errorBuf);
}
dthread_running = true;
sghThread = CreateThread(DthreadHandler, &glpDThreadId);
if (sghThread == nullptr) {
errorBuf = SDL_GetError();
app_fatal("dthread2:\n%s", errorBuf);
}
}
void DThreadCleanup()
{
TMegaPkt *tmp;
if (sghWorkToDoEvent == nullptr) {
if (sghWorkToDoEvent == nullptr)
return;
}
dthread_running = false;
SetEvent(sghWorkToDoEvent);
@ -139,11 +106,7 @@ void DThreadCleanup()
EndEvent(sghWorkToDoEvent);
sghWorkToDoEvent = nullptr;
while (sgpInfoHead != nullptr) {
tmp = sgpInfoHead->pNext;
std::free(sgpInfoHead);
sgpInfoHead = tmp;
}
InfoList.clear();
}
} // namespace devilution

5
Source/dthread.h

@ -5,10 +5,13 @@
*/
#pragma once
#include <memory>
#include "utils/stdcompat/cstddef.hpp"
namespace devilution {
void dthread_remove_player(uint8_t pnum);
void dthread_send_delta(int pnum, _cmd_id cmd, byte *pbSrc, int dwLen);
void dthread_send_delta(int pnum, _cmd_id cmd, std::unique_ptr<byte[]> data, uint32_t len);
void dthread_start();
void DThreadCleanup();

23
Source/msg.cpp

@ -1863,24 +1863,25 @@ void run_delta_info()
void DeltaExportData(int pnum)
{
if (sgbDeltaChanged) {
int size;
std::unique_ptr<byte[]> dst { new byte[sizeof(DLevel) + 1] };
byte *dstEnd;
for (int i = 0; i < NUMLEVELS; i++) {
dstEnd = &dst[1];
std::unique_ptr<byte[]> dst { new byte[sizeof(DLevel) + 1] };
byte *dstEnd = &dst.get()[1];
dstEnd = DeltaExportItem(dstEnd, sgLevels[i].item);
dstEnd = DeltaExportObject(dstEnd, sgLevels[i].object);
dstEnd = DeltaExportMonster(dstEnd, sgLevels[i].monster);
size = CompressData(dst.get(), dstEnd);
dthread_send_delta(pnum, static_cast<_cmd_id>(i + CMD_DLEVEL_0), dst.get(), size);
int size = CompressData(dst.get(), dstEnd);
dthread_send_delta(pnum, static_cast<_cmd_id>(i + CMD_DLEVEL_0), std::move(dst), size);
}
dstEnd = &dst[1];
std::unique_ptr<byte[]> dst { new byte[sizeof(DJunk) + 1] };
byte *dstEnd = &dst.get()[1];
dstEnd = DeltaExportJunk(dstEnd);
size = CompressData(dst.get(), dstEnd);
dthread_send_delta(pnum, CMD_DLEVEL_JUNK, dst.get(), size);
int size = CompressData(dst.get(), dstEnd);
dthread_send_delta(pnum, CMD_DLEVEL_JUNK, std::move(dst), size);
}
byte src { 0 };
dthread_send_delta(pnum, CMD_DLEVEL_END, &src, 1);
std::unique_ptr<byte[]> src { new byte[1] { static_cast<byte>(0) } };
dthread_send_delta(pnum, CMD_DLEVEL_END, std::move(src), 1);
}
void delta_init()

8
Source/multi.cpp

@ -24,6 +24,7 @@
#include "tmsg.h"
#include "utils/endian.hpp"
#include "utils/language.h"
#include "utils/stdcompat/cstddef.hpp"
namespace devilution {
@ -311,10 +312,11 @@ void ProcessTmsgs()
void SendPlayerInfo(int pnum, _cmd_id cmd)
{
PkPlayerStruct pkplr;
static_assert(alignof(PkPlayerStruct) == 1, "Fix pkplr alignment");
std::unique_ptr<byte[]> pkplr { new byte[sizeof(PkPlayerStruct)] };
PackPlayer(&pkplr, Players[MyPlayerId], true);
dthread_send_delta(pnum, cmd, (byte *)&pkplr, sizeof(pkplr));
PackPlayer(reinterpret_cast<PkPlayerStruct *>(pkplr.get()), Players[MyPlayerId], true);
dthread_send_delta(pnum, cmd, std::move(pkplr), sizeof(PkPlayerStruct));
}
dungeon_type InitLevelType(int l)

Loading…
Cancel
Save