2024-04-29 Fred Gleason <fredg@paravelsystems.com>

* Fixed a race condition in the PAD XML feed that could cause corrupt
	updates.

Signed-off-by: Fred Gleason <fredg@paravelsystems.com>
This commit is contained in:
Fred Gleason
2024-04-29 17:20:50 -04:00
parent d26762493b
commit f8dfc52cf0
8 changed files with 231 additions and 138 deletions

View File

@@ -27,64 +27,6 @@
#include "repeater.h"
MetadataSource::MetadataSource(QTcpSocket *sock)
{
meta_socket=sock;
meta_curly_count=0;
meta_quoted=false;
meta_committed=true;
}
QByteArray MetadataSource::buffer() const
{
return meta_buffer;
}
bool MetadataSource::appendBuffer(const QByteArray &data)
{
if(meta_committed) {
meta_buffer.clear();
}
meta_buffer+=data;
for(int i=0;i<data.size();i++) {
if(data.at(i)=='"') {
meta_quoted=!meta_quoted;
}
if(!meta_quoted) {
if(data.at(i)=='{') {
meta_curly_count++;
}
if(data.at(i)=='}') {
meta_curly_count--;
}
}
}
meta_committed=(meta_curly_count==0);
if(meta_committed) {
meta_buffer+="\n\n";
}
return meta_committed;
}
bool MetadataSource::isCommitted() const
{
return meta_committed;
}
QTcpSocket *MetadataSource::socket() const
{
return meta_socket;
}
Repeater::Repeater(const QString &src_unix_addr,uint16_t serv_port,
QObject *parent)
: QObject(parent)
@@ -110,10 +52,6 @@ Repeater::Repeater(const QString &src_unix_addr,uint16_t serv_port,
//
// Source Server
//
pad_source_ready_mapper=new QSignalMapper(this);
connect(pad_source_ready_mapper,SIGNAL(mapped(int)),
this,SLOT(sourceReadyReadData(int)));
pad_source_disconnect_mapper=new QSignalMapper(this);
connect(pad_source_disconnect_mapper,SIGNAL(mapped(int)),
this,SLOT(sourceDisconnected(int)));
@@ -148,7 +86,10 @@ void Repeater::newClientConnectionData()
pad_client_disconnect_mapper->setMapping(sock,sock->socketDescriptor());
pad_client_sockets[sock->socketDescriptor()]=sock;
SendState(sock->socketDescriptor());
for(QMap<int,RDJsonFramer *>::const_iterator it=pad_framers.begin();
it!=pad_framers.end();it++) {
sock->write(it.value()->currentDocument());
}
}
@@ -174,35 +115,21 @@ void Repeater::newSourceConnectionData()
(const char *)pad_source_server->errorString().toUtf8());
exit(1);
}
connect(sock,SIGNAL(readyRead()),pad_source_ready_mapper,SLOT(map()));
pad_source_ready_mapper->setMapping(sock,sock->socketDescriptor());
connect(sock,SIGNAL(disconnected()),pad_source_disconnect_mapper,SLOT(map()));
pad_source_disconnect_mapper->setMapping(sock,sock->socketDescriptor());
pad_sources[sock->socketDescriptor()]=new MetadataSource(sock);
}
void Repeater::sourceReadyReadData(int id)
{
if(pad_sources[id]!=NULL) {
if(pad_sources[id]->appendBuffer(pad_sources[id]->socket()->readAll())) {
for(QMap<int,QTcpSocket *>::const_iterator it=pad_client_sockets.begin();
it!=pad_client_sockets.end();it++) {
it.value()->write(pad_sources[id]->buffer());
}
}
}
RDJsonFramer *framer=new RDJsonFramer(sock,this);
connect(framer,SIGNAL(documentReceived(const QByteArray &)),
this,SLOT(sendUpdate(const QByteArray &)));
pad_framers[sock->socketDescriptor()]=framer;
}
void Repeater::sourceDisconnected(int id)
{
if(pad_sources.value(id)!=NULL) {
pad_sources.value(id)->socket()->deleteLater();
delete pad_sources.value(id);
pad_sources.remove(id);
if(pad_framers.value(id)!=NULL) {
pad_framers.value(id)->deleteLater();
pad_framers.remove(id);
}
else {
fprintf(stderr,"unknown source connection %d attempted to close\n",id);
@@ -210,12 +137,10 @@ void Repeater::sourceDisconnected(int id)
}
void Repeater::SendState(int id)
void Repeater::sendUpdate(const QByteArray &jdoc)
{
for(QMap<int,MetadataSource *>::const_iterator it=pad_sources.begin();
it!=pad_sources.end();it++) {
if(it.value()->isCommitted()&&(!it.value()->buffer().trimmed().isEmpty())) {
pad_client_sockets.value(id)->write(it.value()->buffer());
}
for(QMap<int,QTcpSocket *>::const_iterator it=pad_client_sockets.begin();
it!=pad_client_sockets.end();it++) {
it.value()->write(jdoc);
}
}

View File

@@ -27,30 +27,11 @@
#include <QTcpServer>
#include <QTcpSocket>
#include <rdjsonframer.h>
#include <rdunixserver.h>
#include "repeater.h"
class MetadataSource
{
public:
MetadataSource(QTcpSocket *sock);
QByteArray buffer() const;
bool appendBuffer(const QByteArray &data);
bool isCommitted() const;
QTcpSocket *socket() const;
private:
QByteArray meta_buffer;
int meta_curly_count;
bool meta_quoted;
bool meta_committed;
QTcpSocket *meta_socket;
};
class Repeater : public QObject
{
Q_OBJECT
@@ -63,20 +44,18 @@ class Repeater : public QObject
void newClientConnectionData();
void clientDisconnected(int id);
void newSourceConnectionData();
void sourceReadyReadData(int id);
void sourceDisconnected(int id);
void sendUpdate(const QByteArray &jdoc);
private:
void SendState(int id);
uint16_t pad_server_port;
QString pad_source_unix_address;
QSignalMapper *pad_client_disconnect_mapper;
QTcpServer *pad_client_server;
QMap<int,QTcpSocket *> pad_client_sockets;
QSignalMapper *pad_source_ready_mapper;
QSignalMapper *pad_source_disconnect_mapper;
RDUnixServer *pad_source_server;
QMap<int,MetadataSource *> pad_sources;
QMap<int,RDJsonFramer *> pad_framers;
};