experimental universe.sendWorldMessage function
This commit is contained in:
parent
cb19eef701
commit
5fa97741e5
@ -97,6 +97,7 @@ SET (star_core_HEADERS
|
|||||||
StarRandomPoint.hpp
|
StarRandomPoint.hpp
|
||||||
StarRect.hpp
|
StarRect.hpp
|
||||||
StarRpcPromise.hpp
|
StarRpcPromise.hpp
|
||||||
|
StarRpcThreadPromise.hpp
|
||||||
StarSectorArray2D.hpp
|
StarSectorArray2D.hpp
|
||||||
StarSecureRandom.hpp
|
StarSecureRandom.hpp
|
||||||
StarSet.hpp
|
StarSet.hpp
|
||||||
|
164
source/core/StarRpcThreadPromise.hpp
Normal file
164
source/core/StarRpcThreadPromise.hpp
Normal file
@ -0,0 +1,164 @@
|
|||||||
|
#ifndef STAR_RPC_THREAD_PROMISE_HPP
|
||||||
|
#define STAR_RPC_THREAD_PROMISE_HPP
|
||||||
|
|
||||||
|
#include "StarEither.hpp"
|
||||||
|
#include "StarString.hpp"
|
||||||
|
#include "StarThread.hpp"
|
||||||
|
|
||||||
|
// A thread-safe version of RpcPromise.
|
||||||
|
// This is just a copy-paste with a Mutex tacked on. I don't like that, but it's 11 PM.
|
||||||
|
|
||||||
|
namespace Star {
|
||||||
|
|
||||||
|
STAR_EXCEPTION(RpcThreadPromiseException, StarException);
|
||||||
|
|
||||||
|
template <typename Result, typename Error = String>
|
||||||
|
class RpcThreadPromiseKeeper {
|
||||||
|
public:
|
||||||
|
void fulfill(Result result);
|
||||||
|
void fail(Error error);
|
||||||
|
|
||||||
|
private:
|
||||||
|
template <typename ResultT, typename ErrorT>
|
||||||
|
friend class RpcThreadPromise;
|
||||||
|
|
||||||
|
function<void(Result)> m_fulfill;
|
||||||
|
function<void(Error)> m_fail;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename Result, typename Error = String>
|
||||||
|
class RpcThreadPromise {
|
||||||
|
public:
|
||||||
|
static pair<RpcThreadPromise, RpcThreadPromiseKeeper<Result, Error>> createPair();
|
||||||
|
static RpcThreadPromise createFulfilled(Result result);
|
||||||
|
static RpcThreadPromise createFailed(Error error);
|
||||||
|
|
||||||
|
// Has the respoonse either failed or succeeded?
|
||||||
|
bool finished() const;
|
||||||
|
// Has the response finished with success?
|
||||||
|
bool succeeded() const;
|
||||||
|
// Has the response finished with failure?
|
||||||
|
bool failed() const;
|
||||||
|
|
||||||
|
// Returns the result of the rpc call on success, nothing on failure or when
|
||||||
|
// not yet finished.
|
||||||
|
Maybe<Result> result() const;
|
||||||
|
|
||||||
|
// Returns the error of a failed rpc call. Returns nothing if the call is
|
||||||
|
// successful or not yet finished.
|
||||||
|
Maybe<Error> error() const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
template <typename ResultT, typename ErrorT>
|
||||||
|
friend class RpcThreadPromise;
|
||||||
|
|
||||||
|
struct Value {
|
||||||
|
Mutex mutex;
|
||||||
|
|
||||||
|
Maybe<Result> result;
|
||||||
|
Maybe<Error> error;
|
||||||
|
};
|
||||||
|
|
||||||
|
RpcThreadPromise() = default;
|
||||||
|
|
||||||
|
function<Value*()> m_getValue;
|
||||||
|
};
|
||||||
|
|
||||||
|
template <typename Result, typename Error>
|
||||||
|
void RpcThreadPromiseKeeper<Result, Error>::fulfill(Result result) {
|
||||||
|
m_fulfill(move(result));
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Result, typename Error>
|
||||||
|
void RpcThreadPromiseKeeper<Result, Error>::fail(Error error) {
|
||||||
|
m_fail(move(error));
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Result, typename Error>
|
||||||
|
pair<RpcThreadPromise<Result, Error>, RpcThreadPromiseKeeper<Result, Error>> RpcThreadPromise<Result, Error>::createPair() {
|
||||||
|
auto valuePtr = make_shared<Value>();
|
||||||
|
|
||||||
|
RpcThreadPromise promise;
|
||||||
|
promise.m_getValue = [valuePtr]() {
|
||||||
|
return valuePtr.get();
|
||||||
|
};
|
||||||
|
|
||||||
|
RpcThreadPromiseKeeper<Result, Error> keeper;
|
||||||
|
keeper.m_fulfill = [valuePtr](Result result) {
|
||||||
|
MutexLocker lock(valuePtr->mutex);
|
||||||
|
if (valuePtr->result || valuePtr->error)
|
||||||
|
throw RpcThreadPromiseException("fulfill called on already finished RpcThreadPromise");
|
||||||
|
valuePtr->result = move(result);
|
||||||
|
};
|
||||||
|
keeper.m_fail = [valuePtr](Error error) {
|
||||||
|
MutexLocker lock(valuePtr->mutex);
|
||||||
|
if (valuePtr->result || valuePtr->error)
|
||||||
|
throw RpcThreadPromiseException("fail called on already finished RpcThreadPromise");
|
||||||
|
valuePtr->error = move(error);
|
||||||
|
};
|
||||||
|
|
||||||
|
return {move(promise), move(keeper)};
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Result, typename Error>
|
||||||
|
RpcThreadPromise<Result, Error> RpcThreadPromise<Result, Error>::createFulfilled(Result result) {
|
||||||
|
auto valuePtr = make_shared<Value>();
|
||||||
|
valuePtr->result = move(result);
|
||||||
|
|
||||||
|
RpcThreadPromise<Result, Error> promise;
|
||||||
|
promise.m_getValue = [valuePtr]() {
|
||||||
|
return valuePtr.get();
|
||||||
|
};
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Result, typename Error>
|
||||||
|
RpcThreadPromise<Result, Error> RpcThreadPromise<Result, Error>::createFailed(Error error) {
|
||||||
|
auto valuePtr = make_shared<Value>();
|
||||||
|
valuePtr->error = move(error);
|
||||||
|
|
||||||
|
RpcThreadPromise<Result, Error> promise;
|
||||||
|
promise.m_getValue = [valuePtr]() {
|
||||||
|
return valuePtr.get();
|
||||||
|
};
|
||||||
|
return promise;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Result, typename Error>
|
||||||
|
bool RpcThreadPromise<Result, Error>::finished() const {
|
||||||
|
auto val = m_getValue();
|
||||||
|
MutexLocker lock(val->mutex);
|
||||||
|
return val->result || val->error;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Result, typename Error>
|
||||||
|
bool RpcThreadPromise<Result, Error>::succeeded() const {
|
||||||
|
auto val = m_getValue();
|
||||||
|
MutexLocker lock(val->mutex);
|
||||||
|
return val->result.isValid();
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Result, typename Error>
|
||||||
|
bool RpcThreadPromise<Result, Error>::failed() const {
|
||||||
|
auto val = m_getValue();
|
||||||
|
MutexLocker lock(val->mutex);
|
||||||
|
return val->error.isValid();
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Result, typename Error>
|
||||||
|
Maybe<Result> RpcThreadPromise<Result, Error>::result() const {
|
||||||
|
auto val = m_getValue();
|
||||||
|
MutexLocker lock(val->mutex);
|
||||||
|
return val->result;
|
||||||
|
}
|
||||||
|
|
||||||
|
template <typename Result, typename Error>
|
||||||
|
Maybe<Error> RpcThreadPromise<Result, Error>::error() const {
|
||||||
|
auto val = m_getValue();
|
||||||
|
MutexLocker lock(val->mutex);
|
||||||
|
return val->error;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
#endif
|
@ -258,6 +258,13 @@ void UniverseServer::setPvp(ConnectionId clientId, bool pvp) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RpcThreadPromise<Json> UniverseServer::sendWorldMessage(WorldId const& worldId, String const& message, JsonArray const& args) {
|
||||||
|
auto pair = RpcThreadPromise<Json>::createPair();
|
||||||
|
RecursiveMutexLocker locker(m_mainLock);
|
||||||
|
m_pendingWorldMessages[worldId].push_back({ message, args, pair.second });
|
||||||
|
return pair.first;
|
||||||
|
}
|
||||||
|
|
||||||
void UniverseServer::clientWarpPlayer(ConnectionId clientId, WarpAction action, bool deploy) {
|
void UniverseServer::clientWarpPlayer(ConnectionId clientId, WarpAction action, bool deploy) {
|
||||||
RecursiveMutexLocker locker(m_mainLock);
|
RecursiveMutexLocker locker(m_mainLock);
|
||||||
m_pendingPlayerWarps[clientId] = pair<WarpAction, bool>(move(action), move(deploy));
|
m_pendingPlayerWarps[clientId] = pair<WarpAction, bool>(move(action), move(deploy));
|
||||||
@ -502,6 +509,7 @@ void UniverseServer::run() {
|
|||||||
sendClientContextUpdates();
|
sendClientContextUpdates();
|
||||||
respondToCelestialRequests();
|
respondToCelestialRequests();
|
||||||
clearBrokenWorlds();
|
clearBrokenWorlds();
|
||||||
|
handleWorldMessages();
|
||||||
shutdownInactiveWorlds();
|
shutdownInactiveWorlds();
|
||||||
doTriggeredStorage();
|
doTriggeredStorage();
|
||||||
} catch (std::exception const& e) {
|
} catch (std::exception const& e) {
|
||||||
@ -1029,6 +1037,23 @@ void UniverseServer::clearBrokenWorlds() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void UniverseServer::handleWorldMessages() {
|
||||||
|
RecursiveMutexLocker locker(m_mainLock);
|
||||||
|
ReadLocker clientsLocker(m_clientsLock);
|
||||||
|
|
||||||
|
auto it = m_pendingWorldMessages.begin();
|
||||||
|
while (it != m_pendingWorldMessages.end()) {
|
||||||
|
auto& worldId = it->first;
|
||||||
|
if (auto worldPtr = triggerWorldCreation(worldId).value()) {
|
||||||
|
worldPtr->passMessages(move(it->second));
|
||||||
|
it = m_pendingWorldMessages.erase(it);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
++it;
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void UniverseServer::shutdownInactiveWorlds() {
|
void UniverseServer::shutdownInactiveWorlds() {
|
||||||
RecursiveMutexLocker locker(m_mainLock);
|
RecursiveMutexLocker locker(m_mainLock);
|
||||||
ReadLocker clientsLocker(m_clientsLock);
|
ReadLocker clientsLocker(m_clientsLock);
|
||||||
@ -1040,7 +1065,7 @@ void UniverseServer::shutdownInactiveWorlds() {
|
|||||||
world->stop();
|
world->stop();
|
||||||
Logger::error("UniverseServer: World {} has stopped due to an error", worldId);
|
Logger::error("UniverseServer: World {} has stopped due to an error", worldId);
|
||||||
worldDiedWithError(world->worldId());
|
worldDiedWithError(world->worldId());
|
||||||
} else if (world->clients().empty()) {
|
} else if (world->noClients()) {
|
||||||
bool anyPendingWarps = false;
|
bool anyPendingWarps = false;
|
||||||
for (auto const& p : m_pendingPlayerWarps) {
|
for (auto const& p : m_pendingPlayerWarps) {
|
||||||
if (resolveWarpAction(p.second.first, p.first, p.second.second).world == world->worldId()) {
|
if (resolveWarpAction(p.second.first, p.first, p.second.second).world == world->worldId()) {
|
||||||
@ -1048,7 +1073,8 @@ void UniverseServer::shutdownInactiveWorlds() {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!anyPendingWarps) {
|
|
||||||
|
if (!anyPendingWarps && world->shouldExpire()) {
|
||||||
Logger::info("UniverseServer: Stopping idle world {}", worldId);
|
Logger::info("UniverseServer: Stopping idle world {}", worldId);
|
||||||
world->stop();
|
world->stop();
|
||||||
}
|
}
|
||||||
|
@ -78,6 +78,8 @@ public:
|
|||||||
bool isPvp(ConnectionId clientId) const;
|
bool isPvp(ConnectionId clientId) const;
|
||||||
void setPvp(ConnectionId clientId, bool pvp);
|
void setPvp(ConnectionId clientId, bool pvp);
|
||||||
|
|
||||||
|
RpcThreadPromise<Json> sendWorldMessage(WorldId const& worldId, String const& message, JsonArray const& args = {});
|
||||||
|
|
||||||
void clientWarpPlayer(ConnectionId clientId, WarpAction action, bool deploy = false);
|
void clientWarpPlayer(ConnectionId clientId, WarpAction action, bool deploy = false);
|
||||||
void clientFlyShip(ConnectionId clientId, Vec3I const& system, SystemLocation const& location);
|
void clientFlyShip(ConnectionId clientId, Vec3I const& system, SystemLocation const& location);
|
||||||
WorldId clientWorld(ConnectionId clientId) const;
|
WorldId clientWorld(ConnectionId clientId) const;
|
||||||
@ -127,6 +129,7 @@ private:
|
|||||||
void respondToCelestialRequests();
|
void respondToCelestialRequests();
|
||||||
void processChat();
|
void processChat();
|
||||||
void clearBrokenWorlds();
|
void clearBrokenWorlds();
|
||||||
|
void handleWorldMessages();
|
||||||
void shutdownInactiveWorlds();
|
void shutdownInactiveWorlds();
|
||||||
void doTriggeredStorage();
|
void doTriggeredStorage();
|
||||||
|
|
||||||
@ -242,6 +245,7 @@ private:
|
|||||||
List<pair<WorldId, UniverseFlagAction>> m_pendingFlagActions;
|
List<pair<WorldId, UniverseFlagAction>> m_pendingFlagActions;
|
||||||
HashMap<ConnectionId, List<pair<String, ChatSendMode>>> m_pendingChat;
|
HashMap<ConnectionId, List<pair<String, ChatSendMode>>> m_pendingChat;
|
||||||
Maybe<WorkerPoolPromise<CelestialCoordinate>> m_nextRandomizedStarterWorld;
|
Maybe<WorkerPoolPromise<CelestialCoordinate>> m_nextRandomizedStarterWorld;
|
||||||
|
Map<WorldId, List<WorldServerThread::Message>> m_pendingWorldMessages;
|
||||||
|
|
||||||
List<TimeoutBan> m_tempBans;
|
List<TimeoutBan> m_tempBans;
|
||||||
};
|
};
|
||||||
|
@ -41,6 +41,7 @@ WorldServer::WorldServer(WorldTemplatePtr const& worldTemplate, IODevicePtr stor
|
|||||||
m_tileProtectionEnabled = true;
|
m_tileProtectionEnabled = true;
|
||||||
m_universeSettings = make_shared<UniverseSettings>();
|
m_universeSettings = make_shared<UniverseSettings>();
|
||||||
m_worldId = worldTemplate->worldName();
|
m_worldId = worldTemplate->worldName();
|
||||||
|
m_expiryTimer = GameTimer(0.0f);
|
||||||
|
|
||||||
init(true);
|
init(true);
|
||||||
writeMetadata();
|
writeMetadata();
|
||||||
@ -514,6 +515,10 @@ List<PacketPtr> WorldServer::getOutgoingPackets(ConnectionId clientId) {
|
|||||||
return move(clientInfo->outgoingPackets);
|
return move(clientInfo->outgoingPackets);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Maybe<Json> WorldServer::receiveMessage(ConnectionId fromConnection, String const& message, JsonArray const& args) {
|
||||||
|
return "what a wonderful world";
|
||||||
|
}
|
||||||
|
|
||||||
WorldServerFidelity WorldServer::fidelity() const {
|
WorldServerFidelity WorldServer::fidelity() const {
|
||||||
return m_fidelity;
|
return m_fidelity;
|
||||||
}
|
}
|
||||||
@ -523,6 +528,19 @@ void WorldServer::setFidelity(WorldServerFidelity fidelity) {
|
|||||||
m_fidelityConfig = m_serverConfig.get("fidelitySettings").get(WorldServerFidelityNames.getRight(m_fidelity));
|
m_fidelityConfig = m_serverConfig.get("fidelitySettings").get(WorldServerFidelityNames.getRight(m_fidelity));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool WorldServer::shouldExpire() {
|
||||||
|
if (!m_clientInfo.empty()) {
|
||||||
|
m_expiryTimer.reset();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return m_expiryTimer.ready();
|
||||||
|
}
|
||||||
|
|
||||||
|
void WorldServer::setExpiryTime(float expiryTime) {
|
||||||
|
m_expiryTimer = GameTimer(expiryTime);
|
||||||
|
}
|
||||||
|
|
||||||
void WorldServer::update(float dt) {
|
void WorldServer::update(float dt) {
|
||||||
++m_currentStep;
|
++m_currentStep;
|
||||||
for (auto const& pair : m_clientInfo)
|
for (auto const& pair : m_clientInfo)
|
||||||
@ -627,6 +645,8 @@ void WorldServer::update(float dt) {
|
|||||||
for (auto& pair : m_clientInfo)
|
for (auto& pair : m_clientInfo)
|
||||||
pair.second->pendingForward = false;
|
pair.second->pendingForward = false;
|
||||||
|
|
||||||
|
m_expiryTimer.tick(dt);
|
||||||
|
|
||||||
LogMap::set(strf("server_{}_entities", m_worldId), strf("{} in {} sectors", m_entityMap->size(), m_tileArray->loadedSectorCount()));
|
LogMap::set(strf("server_{}_entities", m_worldId), strf("{} in {} sectors", m_entityMap->size(), m_tileArray->loadedSectorCount()));
|
||||||
LogMap::set(strf("server_{}_time", m_worldId), strf("age = {:4.2f}, day = {:4.2f}/{:4.2f}s", epochTime(), timeOfDay(), dayLength()));
|
LogMap::set(strf("server_{}_time", m_worldId), strf("age = {:4.2f}, day = {:4.2f}/{:4.2f}s", epochTime(), timeOfDay(), dayLength()));
|
||||||
LogMap::set(strf("server_{}_active_liquid", m_worldId), m_liquidEngine->activeCells());
|
LogMap::set(strf("server_{}_active_liquid", m_worldId), m_liquidEngine->activeCells());
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
#include "StarLuaRoot.hpp"
|
#include "StarLuaRoot.hpp"
|
||||||
#include "StarWorldRenderData.hpp"
|
#include "StarWorldRenderData.hpp"
|
||||||
#include "StarWarping.hpp"
|
#include "StarWarping.hpp"
|
||||||
|
#include "StarRpcThreadPromise.hpp"
|
||||||
|
|
||||||
namespace Star {
|
namespace Star {
|
||||||
|
|
||||||
@ -98,6 +99,8 @@ public:
|
|||||||
void handleIncomingPackets(ConnectionId clientId, List<PacketPtr> const& packets);
|
void handleIncomingPackets(ConnectionId clientId, List<PacketPtr> const& packets);
|
||||||
List<PacketPtr> getOutgoingPackets(ConnectionId clientId);
|
List<PacketPtr> getOutgoingPackets(ConnectionId clientId);
|
||||||
|
|
||||||
|
Maybe<Json> receiveMessage(ConnectionId fromConnection, String const& message, JsonArray const& args);
|
||||||
|
|
||||||
void startFlyingSky(bool enterHyperspace, bool startInWarp);
|
void startFlyingSky(bool enterHyperspace, bool startInWarp);
|
||||||
void stopFlyingSkyAt(SkyParameters const& destination);
|
void stopFlyingSkyAt(SkyParameters const& destination);
|
||||||
void setOrbitalSky(SkyParameters const& destination);
|
void setOrbitalSky(SkyParameters const& destination);
|
||||||
@ -106,6 +109,9 @@ public:
|
|||||||
WorldServerFidelity fidelity() const;
|
WorldServerFidelity fidelity() const;
|
||||||
void setFidelity(WorldServerFidelity fidelity);
|
void setFidelity(WorldServerFidelity fidelity);
|
||||||
|
|
||||||
|
bool shouldExpire();
|
||||||
|
void setExpiryTime(float expiryTime);
|
||||||
|
|
||||||
void update(float dt);
|
void update(float dt);
|
||||||
|
|
||||||
ConnectionId connection() const override;
|
ConnectionId connection() const override;
|
||||||
@ -379,6 +385,8 @@ private:
|
|||||||
List<PhysicsForceRegion> m_forceRegions;
|
List<PhysicsForceRegion> m_forceRegions;
|
||||||
|
|
||||||
String m_worldId;
|
String m_worldId;
|
||||||
|
|
||||||
|
GameTimer m_expiryTimer;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -13,7 +13,8 @@ WorldServerThread::WorldServerThread(WorldServerPtr server, WorldId worldId)
|
|||||||
m_worldServer(move(server)),
|
m_worldServer(move(server)),
|
||||||
m_worldId(move(worldId)),
|
m_worldId(move(worldId)),
|
||||||
m_stop(false),
|
m_stop(false),
|
||||||
m_errorOccurred(false) {
|
m_errorOccurred(false),
|
||||||
|
m_shouldExpire(true) {
|
||||||
if (m_worldServer)
|
if (m_worldServer)
|
||||||
m_worldServer->setWorldId(printWorldId(m_worldId));
|
m_worldServer->setWorldId(printWorldId(m_worldId));
|
||||||
}
|
}
|
||||||
@ -50,6 +51,10 @@ bool WorldServerThread::serverErrorOccurred() {
|
|||||||
return m_errorOccurred;
|
return m_errorOccurred;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool WorldServerThread::shouldExpire() {
|
||||||
|
return m_shouldExpire;
|
||||||
|
}
|
||||||
|
|
||||||
bool WorldServerThread::spawnTargetValid(SpawnTarget const& spawnTarget) {
|
bool WorldServerThread::spawnTargetValid(SpawnTarget const& spawnTarget) {
|
||||||
try {
|
try {
|
||||||
RecursiveMutexLocker locker(m_mutex);
|
RecursiveMutexLocker locker(m_mutex);
|
||||||
@ -115,6 +120,12 @@ bool WorldServerThread::hasClient(ConnectionId clientId) const {
|
|||||||
return m_clients.contains(clientId);
|
return m_clients.contains(clientId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool WorldServerThread::noClients() const {
|
||||||
|
RecursiveMutexLocker locker(m_mutex);
|
||||||
|
return m_clients.empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
List<ConnectionId> WorldServerThread::erroredClients() const {
|
List<ConnectionId> WorldServerThread::erroredClients() const {
|
||||||
RecursiveMutexLocker locker(m_mutex);
|
RecursiveMutexLocker locker(m_mutex);
|
||||||
auto unerroredClients = HashSet<ConnectionId>::from(m_worldServer->clientIds());
|
auto unerroredClients = HashSet<ConnectionId>::from(m_worldServer->clientIds());
|
||||||
@ -165,6 +176,11 @@ void WorldServerThread::setUpdateAction(WorldServerAction updateAction) {
|
|||||||
m_updateAction = updateAction;
|
m_updateAction = updateAction;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void WorldServerThread::passMessages(List<Message>&& messages) {
|
||||||
|
RecursiveMutexLocker locker(m_messageMutex);
|
||||||
|
m_messages.appendAll(move(messages));
|
||||||
|
}
|
||||||
|
|
||||||
WorldChunks WorldServerThread::readChunks() {
|
WorldChunks WorldServerThread::readChunks() {
|
||||||
try {
|
try {
|
||||||
RecursiveMutexLocker locker(m_mutex);
|
RecursiveMutexLocker locker(m_mutex);
|
||||||
@ -256,12 +272,26 @@ void WorldServerThread::update(WorldServerFidelity fidelity) {
|
|||||||
if (!m_pause || *m_pause == false)
|
if (!m_pause || *m_pause == false)
|
||||||
m_worldServer->update(dt);
|
m_worldServer->update(dt);
|
||||||
|
|
||||||
|
List<Message> messages;
|
||||||
|
{
|
||||||
|
RecursiveMutexLocker locker(m_messageMutex);
|
||||||
|
messages = move(m_messages);
|
||||||
|
}
|
||||||
|
for (auto& message : messages) {
|
||||||
|
if (auto resp = m_worldServer->receiveMessage(ServerConnectionId, message.message, message.args))
|
||||||
|
message.promise.fulfill(*resp);
|
||||||
|
else
|
||||||
|
message.promise.fail("Message not handled by world");
|
||||||
|
}
|
||||||
|
|
||||||
for (auto& clientId : unerroredClientIds) {
|
for (auto& clientId : unerroredClientIds) {
|
||||||
auto outgoingPackets = m_worldServer->getOutgoingPackets(clientId);
|
auto outgoingPackets = m_worldServer->getOutgoingPackets(clientId);
|
||||||
RecursiveMutexLocker queueLocker(m_queueMutex);
|
RecursiveMutexLocker queueLocker(m_queueMutex);
|
||||||
m_outgoingPacketQueue[clientId].appendAll(move(outgoingPackets));
|
m_outgoingPacketQueue[clientId].appendAll(move(outgoingPackets));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
m_shouldExpire = m_worldServer->shouldExpire();
|
||||||
|
|
||||||
if (m_updateAction)
|
if (m_updateAction)
|
||||||
m_updateAction(this, m_worldServer.get());
|
m_updateAction(this, m_worldServer.get());
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
#include "StarWorldServer.hpp"
|
#include "StarWorldServer.hpp"
|
||||||
#include "StarThread.hpp"
|
#include "StarThread.hpp"
|
||||||
|
#include "StarRpcThreadPromise.hpp"
|
||||||
|
|
||||||
namespace Star {
|
namespace Star {
|
||||||
|
|
||||||
@ -13,6 +14,12 @@ STAR_CLASS(WorldServerThread);
|
|||||||
// the error and trigger the WorldServerThread error state.
|
// the error and trigger the WorldServerThread error state.
|
||||||
class WorldServerThread : public Thread {
|
class WorldServerThread : public Thread {
|
||||||
public:
|
public:
|
||||||
|
struct Message {
|
||||||
|
String message;
|
||||||
|
JsonArray args;
|
||||||
|
RpcThreadPromiseKeeper<Json> promise;
|
||||||
|
};
|
||||||
|
|
||||||
typedef function<void(WorldServerThread*, WorldServer*)> WorldServerAction;
|
typedef function<void(WorldServerThread*, WorldServer*)> WorldServerAction;
|
||||||
|
|
||||||
WorldServerThread(WorldServerPtr server, WorldId worldId);
|
WorldServerThread(WorldServerPtr server, WorldId worldId);
|
||||||
@ -28,6 +35,7 @@ public:
|
|||||||
// An exception occurred from the actual WorldServer itself and the
|
// An exception occurred from the actual WorldServer itself and the
|
||||||
// WorldServerThread has stopped running.
|
// WorldServerThread has stopped running.
|
||||||
bool serverErrorOccurred();
|
bool serverErrorOccurred();
|
||||||
|
bool shouldExpire();
|
||||||
|
|
||||||
bool spawnTargetValid(SpawnTarget const& spawnTarget);
|
bool spawnTargetValid(SpawnTarget const& spawnTarget);
|
||||||
|
|
||||||
@ -37,6 +45,7 @@ public:
|
|||||||
|
|
||||||
List<ConnectionId> clients() const;
|
List<ConnectionId> clients() const;
|
||||||
bool hasClient(ConnectionId clientId) const;
|
bool hasClient(ConnectionId clientId) const;
|
||||||
|
bool noClients() const;
|
||||||
|
|
||||||
// Clients that have caused an error with incoming packets are removed from
|
// Clients that have caused an error with incoming packets are removed from
|
||||||
// the world and no further packets are handled from them. They are still
|
// the world and no further packets are handled from them. They are still
|
||||||
@ -61,6 +70,9 @@ public:
|
|||||||
// also in a thread safe context.
|
// also in a thread safe context.
|
||||||
void setUpdateAction(WorldServerAction updateAction);
|
void setUpdateAction(WorldServerAction updateAction);
|
||||||
|
|
||||||
|
//
|
||||||
|
void passMessages(List<Message>&& messages);
|
||||||
|
|
||||||
// Syncs all active sectors to disk and reads the full content of the world
|
// Syncs all active sectors to disk and reads the full content of the world
|
||||||
// into memory, useful for the ship.
|
// into memory, useful for the ship.
|
||||||
WorldChunks readChunks();
|
WorldChunks readChunks();
|
||||||
@ -84,9 +96,13 @@ private:
|
|||||||
Map<ConnectionId, List<PacketPtr>> m_incomingPacketQueue;
|
Map<ConnectionId, List<PacketPtr>> m_incomingPacketQueue;
|
||||||
Map<ConnectionId, List<PacketPtr>> m_outgoingPacketQueue;
|
Map<ConnectionId, List<PacketPtr>> m_outgoingPacketQueue;
|
||||||
|
|
||||||
|
mutable RecursiveMutex m_messageMutex;
|
||||||
|
List<Message> m_messages;
|
||||||
|
|
||||||
atomic<bool> m_stop;
|
atomic<bool> m_stop;
|
||||||
shared_ptr<const atomic<bool>> m_pause;
|
shared_ptr<const atomic<bool>> m_pause;
|
||||||
mutable atomic<bool> m_errorOccurred;
|
mutable atomic<bool> m_errorOccurred;
|
||||||
|
mutable atomic<bool> m_shouldExpire;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,7 @@
|
|||||||
#include "StarBehaviorState.hpp"
|
#include "StarBehaviorState.hpp"
|
||||||
#include "StarSystemWorld.hpp"
|
#include "StarSystemWorld.hpp"
|
||||||
#include "StarDrawable.hpp"
|
#include "StarDrawable.hpp"
|
||||||
|
#include "StarRpcThreadPromise.hpp"
|
||||||
|
|
||||||
namespace Star {
|
namespace Star {
|
||||||
|
|
||||||
@ -33,6 +34,14 @@ struct LuaUserDataMethods<RpcPromise<T>> {
|
|||||||
static LuaMethods<RpcPromise<T>> make();
|
static LuaMethods<RpcPromise<T>> make();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
struct LuaConverter<RpcThreadPromise<T>> : LuaUserDataConverter<RpcThreadPromise<T>> {};
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
struct LuaUserDataMethods<RpcThreadPromise<T>> {
|
||||||
|
static LuaMethods<RpcThreadPromise<T>> make();
|
||||||
|
};
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
struct LuaConverter<PlatformerAStar::Path> {
|
struct LuaConverter<PlatformerAStar::Path> {
|
||||||
static LuaValue from(LuaEngine& engine, PlatformerAStar::Path const& path);
|
static LuaValue from(LuaEngine& engine, PlatformerAStar::Path const& path);
|
||||||
@ -115,6 +124,16 @@ LuaMethods<RpcPromise<T>> LuaUserDataMethods<RpcPromise<T>>::make() {
|
|||||||
return methods;
|
return methods;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
template <typename T>
|
||||||
|
LuaMethods<RpcThreadPromise<T>> LuaUserDataMethods<RpcThreadPromise<T>>::make() {
|
||||||
|
LuaMethods<RpcThreadPromise<T>> methods;
|
||||||
|
methods.template registerMethodWithSignature<bool, RpcThreadPromise<T>&>("finished", mem_fn(&RpcThreadPromise<T>::finished));
|
||||||
|
methods.template registerMethodWithSignature<bool, RpcThreadPromise<T>&>("succeeded", mem_fn(&RpcThreadPromise<T>::succeeded));
|
||||||
|
methods.template registerMethodWithSignature<Maybe<T>, RpcThreadPromise<T>&>("result", mem_fn(&RpcThreadPromise<T>::result));
|
||||||
|
methods.template registerMethodWithSignature<Maybe<String>, RpcThreadPromise<T>&>("error", mem_fn(&RpcThreadPromise<T>::error));
|
||||||
|
return methods;
|
||||||
|
}
|
||||||
|
|
||||||
template <>
|
template <>
|
||||||
struct LuaConverter<Collection> {
|
struct LuaConverter<Collection> {
|
||||||
static LuaValue from(LuaEngine& engine, Collection const& c);
|
static LuaValue from(LuaEngine& engine, Collection const& c);
|
||||||
|
@ -18,6 +18,7 @@ LuaCallbacks LuaBindings::makeUniverseServerCallbacks(UniverseServer* universe)
|
|||||||
callbacks.registerCallbackWithSignature<bool, ConnectionId>("isAdmin", bind(UniverseServerCallbacks::isAdmin, universe, _1));
|
callbacks.registerCallbackWithSignature<bool, ConnectionId>("isAdmin", bind(UniverseServerCallbacks::isAdmin, universe, _1));
|
||||||
callbacks.registerCallbackWithSignature<bool, ConnectionId>("isPvp", bind(UniverseServerCallbacks::isPvp, universe, _1));
|
callbacks.registerCallbackWithSignature<bool, ConnectionId>("isPvp", bind(UniverseServerCallbacks::isPvp, universe, _1));
|
||||||
callbacks.registerCallbackWithSignature<void, ConnectionId, bool>("setPvp", bind(UniverseServerCallbacks::setPvp, universe, _1, _2));
|
callbacks.registerCallbackWithSignature<void, ConnectionId, bool>("setPvp", bind(UniverseServerCallbacks::setPvp, universe, _1, _2));
|
||||||
|
callbacks.registerCallbackWithSignature<RpcThreadPromise<Json>, LuaEngine&, String, String, LuaVariadic<Json>>("sendWorldMessage", bind(UniverseServerCallbacks::sendWorldMessage, universe, _1, _2, _3, _4));
|
||||||
|
|
||||||
return callbacks;
|
return callbacks;
|
||||||
}
|
}
|
||||||
@ -107,4 +108,8 @@ void LuaBindings::UniverseServerCallbacks::setPvp(UniverseServer* universe, Conn
|
|||||||
universe->setPvp(client, setPvpTo);
|
universe->setPvp(client, setPvpTo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RpcThreadPromise<Json> LuaBindings::UniverseServerCallbacks::sendWorldMessage(UniverseServer* universe, LuaEngine& engine, String const& worldId, String const& message, LuaVariadic<Json> args) {
|
||||||
|
return universe->sendWorldMessage(parseWorldId(worldId), message, JsonArray::from(move(args)));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
#include "StarLua.hpp"
|
#include "StarLua.hpp"
|
||||||
#include "StarGameTypes.hpp"
|
#include "StarGameTypes.hpp"
|
||||||
|
#include "StarRpcThreadPromise.hpp"
|
||||||
|
|
||||||
namespace Star {
|
namespace Star {
|
||||||
|
|
||||||
@ -22,6 +23,7 @@ namespace LuaBindings {
|
|||||||
bool isAdmin(UniverseServer* universe, ConnectionId arg1);
|
bool isAdmin(UniverseServer* universe, ConnectionId arg1);
|
||||||
bool isPvp(UniverseServer* universe, ConnectionId arg1);
|
bool isPvp(UniverseServer* universe, ConnectionId arg1);
|
||||||
void setPvp(UniverseServer* universe, ConnectionId arg1, Maybe<bool> arg2);
|
void setPvp(UniverseServer* universe, ConnectionId arg1, Maybe<bool> arg2);
|
||||||
|
RpcThreadPromise<Json> sendWorldMessage(UniverseServer* universe, LuaEngine& engine, String const& worldId, String const& message, LuaVariadic<Json> args);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -390,6 +390,9 @@ namespace LuaBindings {
|
|||||||
callbacks.registerCallbackWithSignature<void, double>("setSkyTime", [serverWorld](double skyTime) {
|
callbacks.registerCallbackWithSignature<void, double>("setSkyTime", [serverWorld](double skyTime) {
|
||||||
return serverWorld->sky()->setEpochTime(skyTime);
|
return serverWorld->sky()->setEpochTime(skyTime);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
callbacks.registerCallback("setExpiryTime", [serverWorld](float expiryTime) { serverWorld->setExpiryTime(expiryTime); });
|
||||||
|
|
||||||
callbacks.registerCallback("flyingType", [serverWorld]() -> String { return FlyingTypeNames.getRight(serverWorld->sky()->flyingType()); });
|
callbacks.registerCallback("flyingType", [serverWorld]() -> String { return FlyingTypeNames.getRight(serverWorld->sky()->flyingType()); });
|
||||||
callbacks.registerCallback("warpPhase", [serverWorld]() -> String { return WarpPhaseNames.getRight(serverWorld->sky()->warpPhase()); });
|
callbacks.registerCallback("warpPhase", [serverWorld]() -> String { return WarpPhaseNames.getRight(serverWorld->sky()->warpPhase()); });
|
||||||
callbacks.registerCallback("setUniverseFlag", [serverWorld](String flagName) { return serverWorld->universeSettings()->setFlag(flagName); });
|
callbacks.registerCallback("setUniverseFlag", [serverWorld](String flagName) { return serverWorld->universeSettings()->setFlag(flagName); });
|
||||||
|
Loading…
Reference in New Issue
Block a user