4 #include "classlib/iobase/InetServerSocket.h"
5 #include "classlib/iobase/LocalServerSocket.h"
6 #include "classlib/iobase/Filename.h"
7 #include "classlib/sysapi/InetSocket.h"
8 #include "classlib/utils/TimeInfo.h"
9 #include "classlib/utils/StringList.h"
10 #include "classlib/utils/StringFormat.h"
11 #include "classlib/utils/StringOps.h"
12 #include "classlib/utils/SystemError.h"
13 #include "classlib/utils/Regexp.h"
26 # define MESSAGE_SIZE_LIMIT (1*1024*1024)
27 # define SOCKET_BUF_SIZE (1*1024*1024)
29 # define MESSAGE_SIZE_LIMIT (8*1024*1024)
30 # define SOCKET_BUF_SIZE (8*1024*1024)
32 #define SOCKET_READ_SIZE (SOCKET_BUF_SIZE/8)
33 #define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
// Pattern for a tagged value string of the form <NAME>T=VALUE</NAME>, where
// T is one of the type codes i, f, s or qr.  Capture group 1 is the tag
// name, group 2 the type code and group 3 the value text; the closing tag is
// written with the back-reference \1 and so has to repeat group 1.
// NOTE(review): assumes the classlib Regexp engine gives \1 the usual
// back-reference meaning, and that the codes stand for int / float / string /
// quality report -- confirm against the users of this pattern.
37 static const Regexp
s_rxmeval(
"<(.*)>(i|f|s|qr)=(.*)</\\1>");
46 << now.format(
true,
"%Y-%m-%d %H:%M:%S.")
47 << now.nanoformat(3, 3)
48 <<
" " << appname_ <<
"[" << pid_ <<
"]: ";
56 (
const unsigned char *)data,
57 (
const unsigned char *)data + len);
85 << (err ?
"; error was: " + err->explain() :
std::string(
""))
90 for (WaitList::iterator
i = waiting_.begin(),
e = waiting_.end();
i !=
e; )
121 words[0] =
sizeof(words) + len;
122 words[1] = DQM_MSG_GET_OBJECT;
124 copydata(*msg, words,
sizeof(words));
125 copydata(*msg, name, len);
138 requestObjectData(owner, name.size() ? &name[0] : 0, name.size());
140 waiting_.push_back(wo);
155 releaseFromWait(*msg, *i, o);
157 assert(i->peer->waiting > 0);
166 for (WaitList::iterator
i = waiting_.begin(),
e = waiting_.end();
i !=
e; )
168 releaseFromWait(
i++, o);
180 std::ostringstream qrs;
181 QReports::const_iterator qi, qe;
182 for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi)
185 sprintf(buf,
"%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG+2, qi->qtresult);
188 << qi->qtname <<
'\0'
189 << qi->algorithm <<
'\0'
190 << qi->message <<
'\0'
201 const char *qdata = from;
208 while (*qdata) ++qdata; ++qdata;
209 while (*qdata) ++qdata; ++qdata;
210 while (*qdata) ++qdata; ++qdata;
211 while (*qdata) ++qdata; ++qdata;
212 while (*qdata) ++qdata; ++qdata;
223 qv.
code = atoi(qdata);
224 while (*qdata) ++qdata;
241 while (*qdata) ++qdata;
244 while (*qdata) ++qdata;
247 while (*qdata) ++qdata;
250 while (*qdata) ++qdata;
258 extractNextObject(TBufferFile &buf)
260 if (buf.Length() == buf.BufferSize())
264 Int_t pos = buf.Length();
265 TClass *
c = buf.ReadClass();
266 buf.SetBufferOffset(pos);
268 return c ? buf.ReadObject(c) : 0;
273 DQMNet::reconstructObject(Object &
o)
275 TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
279 if (! (o.object = extractNextObject(buf)))
283 o.reference = extractNextObject(buf);
286 unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
293 DQMNet::reinstateObject(
DQMStore *store, Object &o)
295 if (! reconstructObject (o))
301 switch (o.flags & DQM_PROP_TYPE_MASK)
303 case DQM_PROP_TYPE_INT:
304 obj = store->
bookInt(o.objname);
305 obj->
Fill(atoll(o.scalar.c_str()));
308 case DQM_PROP_TYPE_REAL:
310 obj->
Fill(atof(o.scalar.c_str()));
313 case DQM_PROP_TYPE_STRING:
317 case DQM_PROP_TYPE_TH1F:
318 obj = store->
book1D(
name, dynamic_cast<TH1F *>(o.object));
321 case DQM_PROP_TYPE_TH1S:
322 obj = store->
book1S(
name, dynamic_cast<TH1S *>(o.object));
325 case DQM_PROP_TYPE_TH1D:
326 obj = store->
book1DD(
name, dynamic_cast<TH1D *>(o.object));
329 case DQM_PROP_TYPE_TH2F:
330 obj = store->
book2D(
name, dynamic_cast<TH2F *>(o.object));
333 case DQM_PROP_TYPE_TH2S:
334 obj = store->
book2S(
name, dynamic_cast<TH2S *>(o.object));
337 case DQM_PROP_TYPE_TH2D:
338 obj = store->
book2DD(
name, dynamic_cast<TH2D *>(o.object));
341 case DQM_PROP_TYPE_TH3F:
342 obj = store->
book3D(
name, dynamic_cast<TH3F *>(o.object));
345 case DQM_PROP_TYPE_TH3S:
346 obj = store->book3S(
name, dynamic_cast<TH3S *>(o.object));
349 case DQM_PROP_TYPE_TH3D:
350 obj = store->book3DD(
name, dynamic_cast<TH3D *>(o.object));
353 case DQM_PROP_TYPE_PROF:
357 case DQM_PROP_TYPE_PROF2D:
363 <<
"ERROR: unexpected monitor element of type "
364 << (o.flags & DQM_PROP_TYPE_MASK) <<
" called '"
365 << *o.dirname <<
'/' << o.objname <<
"'\n";
395 sendObjectToPeer(msg, *o,
true);
399 words[0] =
sizeof(words) + w.
name.size();
400 words[1] = DQM_REPLY_NONE;
401 words[2] = w.
name.size();
403 msg->
data.reserve(msg->
data.size() + words[0]);
404 copydata(msg, &words[0],
sizeof(words));
405 copydata(msg, &w.
name[0], w.
name.size());
418 if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
419 objdata.insert(objdata.end(),
423 objdata.insert(objdata.end(),
429 uint32_t datalen = objdata.size();
430 uint32_t qlen = o.
qdata.size();
435 words[0] = 9*
sizeof(uint32_t) + namelen + datalen + qlen;
436 words[1] = DQM_REPLY_OBJECT;
438 words[3] = (o.
version >> 0 ) & 0xffffffff;
439 words[4] = (o.
version >> 32) & 0xffffffff;
445 msg->
data.reserve(msg->
data.size() + words[0]);
446 copydata(msg, &words[0], 9*
sizeof(uint32_t));
451 copydata(msg,
"/", 1);
455 copydata(msg, &objdata[0], datalen);
457 copydata(msg, &o.
qdata[0], qlen);
467 memcpy (&type, data +
sizeof(uint32_t),
sizeof (type));
470 case DQM_MSG_UPDATE_ME:
472 if (len != 2*
sizeof(uint32_t))
475 <<
"ERROR: corrupt 'UPDATE_ME' message of length " << len
476 <<
" from peer " << p->
peeraddr << std::endl;
482 <<
"DEBUG: received message 'UPDATE ME' from peer "
483 << p->
peeraddr <<
", size " << len << std::endl;
489 case DQM_MSG_LIST_OBJECTS:
493 <<
"DEBUG: received message 'LIST OBJECTS' from peer "
494 << p->
peeraddr <<
", size " << len << std::endl;
497 sendObjectListToPeer(msg,
true,
false);
501 case DQM_MSG_GET_OBJECT:
505 <<
"DEBUG: received message 'GET OBJECT' from peer "
506 << p->
peeraddr <<
", size " << len << std::endl;
508 if (len < 3*
sizeof(uint32_t))
511 <<
"ERROR: corrupt 'GET IMAGE' message of length " << len
512 <<
" from peer " << p->
peeraddr << std::endl;
517 memcpy (&namelen, data + 2*
sizeof(uint32_t),
sizeof(namelen));
518 if (len != 3*
sizeof(uint32_t) + namelen)
521 <<
"ERROR: corrupt 'GET OBJECT' message of length " << len
523 <<
", expected length " << (3*
sizeof(uint32_t))
524 <<
" + " << namelen << std::endl;
530 Object *o = findObject(0, name, &owner);
535 && (o->
flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
536 waitForData(p, name,
"", owner);
538 sendObjectToPeer(msg, *o,
true);
543 words[0] =
sizeof(words) + name.size();
544 words[1] = DQM_REPLY_NONE;
545 words[2] = name.size();
547 msg->
data.reserve(msg->
data.size() + words[0]);
548 copydata(msg, &words[0],
sizeof(words));
549 copydata(msg, &name[0], name.size());
554 case DQM_REPLY_LIST_BEGIN:
556 if (len != 4*
sizeof(uint32_t))
559 <<
"ERROR: corrupt 'LIST BEGIN' message of length " << len
560 <<
" from peer " << p->
peeraddr << std::endl;
566 memcpy(&flags, data + 3*
sizeof(uint32_t),
sizeof(uint32_t));
570 <<
"DEBUG: received message 'LIST BEGIN "
571 << (flags ?
"FULL" :
"INCREMENTAL")
573 <<
", size " << len << std::endl;
586 case DQM_REPLY_LIST_END:
588 if (len != 4*
sizeof(uint32_t))
591 <<
"ERROR: corrupt 'LIST END' message of length " << len
592 <<
" from peer " << p->
peeraddr << std::endl;
598 memcpy(&flags, data + 3*
sizeof(uint32_t),
sizeof(uint32_t));
609 <<
"DEBUG: received message 'LIST END "
610 << (flags ?
"FULL" :
"INCREMENTAL")
612 <<
", size " << len << std::endl;
621 case DQM_REPLY_OBJECT:
624 if (len <
sizeof(words))
627 <<
"ERROR: corrupt 'OBJECT' message of length " << len
628 <<
" from peer " << p->
peeraddr << std::endl;
632 memcpy (&words[0], data,
sizeof(words));
633 uint32_t &namelen = words[6];
634 uint32_t &datalen = words[7];
635 uint32_t &qlen = words[8];
637 if (len !=
sizeof(words) + namelen + datalen + qlen)
640 <<
"ERROR: corrupt 'OBJECT' message of length " << len
642 <<
", expected length " <<
sizeof(words)
650 unsigned char *namedata = data +
sizeof(words);
651 unsigned char *objdata = namedata + namelen;
652 unsigned char *qdata = objdata + datalen;
653 unsigned char *enddata = qdata + qlen;
655 assert (enddata == data + len);
659 <<
"DEBUG: received message 'OBJECT " << name
661 <<
", size " << len << std::endl;
667 Object *o = findObject(p, name);
669 o = makeObject(p, name);
671 o->
flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
676 if ((o->
flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
687 o->
flags |= DQM_PROP_STALE;
688 o->
qdata.insert(o->
qdata.end(), qdata, enddata);
694 && (o->
flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
695 requestObjectData(p, (namelen ? &name[0] : 0), namelen);
699 releaseWaiters(name, o);
706 if (len <
sizeof(words))
709 <<
"ERROR: corrupt 'NONE' message of length " << len
710 <<
" from peer " << p->
peeraddr << std::endl;
714 memcpy (&words[0], data,
sizeof(words));
715 uint32_t &namelen = words[2];
717 if (len !=
sizeof(words) + namelen)
720 <<
"ERROR: corrupt 'NONE' message of length " << len
722 <<
", expected length " <<
sizeof(words)
723 <<
" + " << namelen << std::endl;
727 unsigned char *namedata = data +
sizeof(words);
732 <<
"DEBUG: received message 'NONE " << name
734 <<
", size " << len << std::endl;
740 if (
Object *o = findObject(p, name))
742 o->flags |= DQM_PROP_DEAD;
747 releaseWaiters(name, 0);
753 <<
"ERROR: unrecognised message of length " << len
754 <<
" and type " << type <<
" from peer " << p->
peeraddr
766 assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
777 <<
"WARNING: connection to the DQM server at " << p->
peeraddr
778 <<
" lost (will attempt to reconnect in 15 seconds)\n";
782 losePeer(
"WARNING: lost peer connection ", p, ev);
794 const void *
data = (len ? (
const void *)&
b->data[p->
sendpos]
795 : (
const void *)&
data);
800 done = (len ? ev->source->write (data, len) : 0);
803 <<
"DEBUG: sent " << done <<
" bytes to peer "
808 losePeer(
"WARNING: unable to write to peer ", p, ev, &e);
841 if ((sz = ev->source->read(&buf[0], buf.size())))
845 <<
"DEBUG: received " << sz <<
" bytes from peer "
848 if (data.capacity () < data.size () + sz)
850 data.insert (data.end(), &buf[0], &buf[0] + sz);
852 while (sz ==
sizeof (buf));
856 SystemError *
next =
dynamic_cast<SystemError *
>(e.next());
857 if (next && next->portable() == SysErr::ErrTryAgain)
862 losePeer(
"WARNING: failed to read from peer ", p, ev, &e);
871 while (data.size()-consumed >=
sizeof(uint32_t)
872 && p->
waiting < MAX_PEER_WAITREQS)
875 memcpy (&msglen, &data[0]+consumed,
sizeof(msglen));
879 losePeer(
"WARNING: excessively large message from ", p, ev);
884 if (data.size()-consumed >= msglen)
887 if (msglen < 2*
sizeof(uint32_t))
890 <<
"ERROR: corrupt peer message of length " << msglen
891 <<
" from peer " << p->
peeraddr << std::endl;
899 valid = onMessage(&msg, p, &data[0]+consumed, msglen);
902 if (! msg.
data.empty())
906 prev = &(*prev)->
next;
916 losePeer(
"WARNING: data stream error with ", p, ev);
927 data.erase(data.begin(), data.begin()+consumed);
949 assert (ev->source == server_);
952 Socket *
s = server_->accept();
954 assert (! s->isBlocking());
958 Peer *
p = createPeer(s);
960 if (InetSocket *inet = dynamic_cast<InetSocket *>(s))
962 InetAddress peeraddr = inet->peername();
963 InetAddress myaddr = inet->sockname();
965 .arg(peeraddr.hostname())
966 .
arg(peeraddr.port());
967 localaddr = StringFormat(
"%1:%2")
968 .arg(myaddr.hostname())
971 else if (LocalSocket *local = dynamic_cast<LocalSocket *>(s))
973 p->
peeraddr = local->peername().path();
974 localaddr = local->sockname().path();
985 <<
"INFO: new peer " << p->
peeraddr <<
" is now connected to "
986 << localaddr << std::endl;
1009 unsigned char buf [1024];
1010 while ((sz = ev->source->read(buf,
sizeof(buf))))
1015 SystemError *
next =
dynamic_cast<SystemError *
>(e.next());
1016 if (next && next->portable() == SysErr::ErrTryAgain)
1020 <<
"WARNING: error reading from notification pipe: "
1021 << e.explain() << std::endl;
1040 unsigned oldmask = p->
mask;
1047 if (debug_ && oldmask != p->
mask)
1049 <<
"DEBUG: updating mask for " << p->
peeraddr <<
" to "
1050 << p->
mask <<
" from " << oldmask << std::endl;
1056 assert (! p->
sendq);
1058 logme() <<
"INFO: connection closed to " << p->
peeraddr << std::endl;
1066 appname_ (appname.
empty() ?
"DQMNet" : appname.c_str()),
1070 communicate_ ((pthread_t) -1),
1073 waitStale_ (0, 0, 0, 0, 500000000 ),
1074 waitMax_ (0, 0, 0, 5 , 0),
1129 logme() <<
"ERROR: DQM server was already started.\n";
1135 InetAddress addr(
"0.0.0.0", port);
1136 InetSocket *
s =
new InetSocket(SOCK_STREAM, 0, addr.family());
1141 s->setBlocking(
false);
1149 <<
"ERROR: Failed to start server at port " << port <<
": "
1150 << e.explain() << std::endl;
1152 raiseDQMError(
"DQMNet::startLocalServer",
"Failed to start server at port"
1153 " %d: %s", port, e.explain().c_str());
1156 logme() <<
"INFO: DQM server started at port " << port << std::endl;
1167 logme() <<
"ERROR: DQM server was already started.\n";
1173 server_ =
new LocalServerSocket(path, 10);
1184 <<
"ERROR: Failed to start server at path " << path <<
": "
1185 << e.explain() << std::endl;
1187 raiseDQMError(
"DQMNet::startLocalServer",
"Failed to start server at path"
1188 " %s: %s", path, e.explain().c_str());
1191 logme() <<
"INFO: DQM server started at path " << path << std::endl;
1203 <<
"ERROR: Already updating another collector at "
1222 <<
"ERROR: Already receiving data from another collector at "
1250 pthread_sigmask (SIG_BLOCK, &sigs, 0);
1260 pthread_mutex_lock(&
lock_);
1268 pthread_mutex_unlock(&
lock_);
1280 <<
"ERROR: DQM networking thread has already been started\n";
1284 pthread_mutex_init(&
lock_, 0);
1299 for (
int i = 0;
i < 2; ++
i)
1306 if (! ap->
host.empty()
1310 ap->
next = now + TimeSpan(0, 0, 0, 15 , 0);
1314 InetAddress addr(ap->
host.c_str(), ap->
port);
1315 s =
new InetSocket (SOCK_STREAM, 0, addr.family());
1316 s->setBlocking(
false);
1323 SystemError *sys =
dynamic_cast<SystemError *
>(e.next());
1324 if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
1343 InetAddress peeraddr = ((InetSocket *) s)->peername();
1344 InetAddress myaddr = ((InetSocket *) s)->sockname();
1345 p->
peeraddr = StringFormat(
"%1:%2")
1346 .arg(peeraddr.hostname())
1347 .
arg(peeraddr.port());
1365 <<
"INFO: now connected to " << p->
peeraddr <<
" from "
1366 << myaddr.hostname() <<
":" << myaddr.port() << std::endl;
1378 if (
flush_ && now > nextFlush)
1381 nextFlush = now + TimeSpan(0, 0, 0, 15 , 0);
1401 if (
i->time < waitold)
1404 <<
"WARNING: source not responding in " << (waitMax_.ns() * 1
e-9)
1405 <<
"s to retrieval, releasing '" <<
i->name <<
"' from wait, have "
1406 << (o ? o->
rawdata.size() : 0) <<
" data available\n";
1412 <<
"WARNING: source not responding in " << (waitStale_.ns() * 1
e-9)
1413 <<
"s to update, releasing '" <<
i->name <<
"' from wait, have "
1414 << o->
rawdata.size() <<
" data available\n";
1433 wakeup_.sink()->write(&byte, 1);
1449 local_->objs.resize(size);
1458 std::pair<ObjectMap::iterator, bool>
info(
local_->objs.insert(o));
1483 ObjectMap::iterator
i,
e;
1484 for (i =
local_->objs.begin(), e =
local_->objs.end(); i !=
e; )
1487 path.reserve(i->dirname->size() + i->objname.size() + 2);
1488 path += *i->dirname;
1493 if (! known.count(path))
1494 ++removed,
local_->objs.erase(i++);
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
MonitorElement * book2S(const char *name, const char *title, int nchX, double lowX, double highX, int nchY, double lowY, double highY)
Book 2S histogram.
virtual bool shouldStop(void)
void shutdown(void)
Stop the network layer and wait for it to finish.
void sendLocalChanges(void)
bool onLocalNotify(lat::IOSelectEvent *ev)
MonitorElement * book1D(const char *name, const char *title, int nchX, double lowX, double highX)
Book 1D histogram.
virtual void sendObjectListToPeers(bool all)=0
#define MESSAGE_SIZE_LIMIT
MonitorElement * book3D(const char *name, const char *title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, int nchZ, double lowZ, double highZ)
Book 3D histogram.
static void discard(Bucket *&b)
MonitorElement * book2DD(const char *name, const char *title, int nchX, double lowX, double highX, int nchY, double lowY, double highY)
Book 2D double histogram.
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
std::vector< Variable::Flags > flags
void staleObjectWaitLimit(lat::TimeSpan time)
const std::string * dirname
void lock(void)
Acquire a lock on the DQM net layer.
MonitorElement * book1DD(const char *name, const char *title, int nchX, double lowX, double highX)
Book 1D double histogram.
static void * communicate(void *obj)
MonitorElement * bookFloat(const char *name)
Book float.
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
MonitorElement * bookString(const char *name, const char *value)
Book string.
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
std::vector< unsigned char > DataBlob
std::ostream & logme(void)
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
static const uint32_t DQM_PROP_REPORT_ERROR
#define SOCKET_READ_GROWTH
MonitorElement * bookProfile(const char *name, const char *title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, const char *option="s")
static const uint32_t DQM_PROP_REPORT_OTHER
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
void unlock(void)
Release the lock on the DQM net layer.
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
unsigned long long uint64_t
bool removeLocalExcept(const std::set< std::string > &known)
bool onPeerConnect(lat::IOSelectEvent *ev)
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
static const uint32_t DQM_PROP_STALE
virtual void updatePeerMasks(void)=0
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>")
char data[epos_bytes_allocation]
void listenToCollector(const std::string &host, int port)
static const int STATUS_OK
static const uint32_t DQM_MSG_LIST_OBJECTS
volatile std::atomic< bool > shutdown_flag false
static void copydata(Bucket *b, const void *data, size_t len)
void updateLocalObject(Object &o)
std::vector< QValue > QReports
MonitorElement * bookInt(const char *name)
Book int.
MonitorElement * book2D(const char *name, const char *title, int nchX, double lowX, double highX, int nchY, double lowY, double highY)
Book 2D histogram.
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
void updateToCollector(const std::string &host, int port)
MonitorElement * book1S(const char *name, const char *title, int nchX, double lowX, double highX)
Book 1S histogram.
tuple size
Write out results.
DQMBasicNet(const std::string &appname="")
void setCurrentFolder(const std::string &fullpath)
virtual Peer * createPeer(lat::Socket *s)
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0
MonitorElement * bookProfile2D(const char *name, const char *title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, int nchZ, double lowZ, double highZ, const char *option="s")
void raiseDQMError(const char *context, const char *fmt,...)