4 #include "classlib/iobase/InetServerSocket.h"
5 #include "classlib/iobase/LocalServerSocket.h"
6 #include "classlib/iobase/Filename.h"
7 #include "classlib/sysapi/InetSocket.h"
8 #include "classlib/utils/TimeInfo.h"
9 #include "classlib/utils/StringList.h"
10 #include "classlib/utils/StringFormat.h"
11 #include "classlib/utils/StringOps.h"
12 #include "classlib/utils/SystemError.h"
13 #include "classlib/utils/Regexp.h"
26 # define MESSAGE_SIZE_LIMIT (1*1024*1024)
27 # define SOCKET_BUF_SIZE (1*1024*1024)
29 # define MESSAGE_SIZE_LIMIT (8*1024*1024)
30 # define SOCKET_BUF_SIZE (8*1024*1024)
32 #define SOCKET_READ_SIZE (SOCKET_BUF_SIZE/8)
33 #define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
37 static const Regexp
s_rxmeval(
"<(.*)>(i|f|s|qr)=(.*)</\\1>");
46 << now.format(
true,
"%Y-%m-%d %H:%M:%S.")
47 << now.nanoformat(3, 3)
48 <<
" " << appname_ <<
"[" << pid_ <<
"]: ";
56 (
const unsigned char *)data,
57 (
const unsigned char *)data + len);
85 << (err ?
"; error was: " + err->explain() :
std::string(
""))
90 for (WaitList::iterator
i = waiting_.begin(),
e = waiting_.end();
i !=
e; )
121 words[0] =
sizeof(words) + len;
122 words[1] = DQM_MSG_GET_OBJECT;
124 copydata(*msg, words,
sizeof(words));
125 copydata(*msg, name, len);
138 requestObjectData(owner, name.size() ? &name[0] : 0, name.size());
140 waiting_.push_back(wo);
155 releaseFromWait(*msg, *i, o);
157 assert(i->peer->waiting > 0);
166 for (WaitList::iterator
i = waiting_.begin(),
e = waiting_.end();
i !=
e; )
168 releaseFromWait(
i++, o);
180 std::ostringstream qrs;
181 QReports::const_iterator qi, qe;
182 for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi)
185 sprintf(buf,
"%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG+2, qi->qtresult);
188 << qi->qtname <<
'\0'
189 << qi->algorithm <<
'\0'
190 << qi->message <<
'\0'
201 const char *qdata = from;
208 while (*qdata) ++qdata; ++qdata;
209 while (*qdata) ++qdata; ++qdata;
210 while (*qdata) ++qdata; ++qdata;
211 while (*qdata) ++qdata; ++qdata;
212 while (*qdata) ++qdata; ++qdata;
223 qv.
code = atoi(qdata);
224 while (*qdata) ++qdata;
241 while (*qdata) ++qdata;
244 while (*qdata) ++qdata;
247 while (*qdata) ++qdata;
250 while (*qdata) ++qdata;
260 if (buf.Length() == buf.BufferSize())
264 Int_t pos = buf.Length();
265 TClass *
c = buf.ReadClass();
266 buf.SetBufferOffset(pos);
268 return c ? buf.ReadObject(c) : 0;
273 DQMNet::reconstructObject(Object &
o)
275 TBufferFile buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
286 unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
293 DQMNet::reinstateObject(
DQMStore *store, Object &o)
295 if (! reconstructObject (o))
300 store->setCurrentFolder(*o.dirname);
301 switch (o.flags & DQM_PROP_TYPE_MASK)
303 case DQM_PROP_TYPE_INT:
304 obj = store->bookInt(o.objname);
305 obj->
Fill(atoll(o.scalar.c_str()));
308 case DQM_PROP_TYPE_REAL:
309 obj = store->bookFloat(
name);
310 obj->
Fill(atof(o.scalar.c_str()));
313 case DQM_PROP_TYPE_STRING:
314 obj = store->bookString(
name, o.scalar);
317 case DQM_PROP_TYPE_TH1F:
318 obj = store->book1D(
name, dynamic_cast<TH1F *>(o.object));
321 case DQM_PROP_TYPE_TH1S:
322 obj = store->book1S(
name, dynamic_cast<TH1S *>(o.object));
325 case DQM_PROP_TYPE_TH1D:
326 obj = store->book1DD(
name, dynamic_cast<TH1D *>(o.object));
329 case DQM_PROP_TYPE_TH2F:
330 obj = store->book2D(
name, dynamic_cast<TH2F *>(o.object));
333 case DQM_PROP_TYPE_TH2S:
334 obj = store->book2S(
name, dynamic_cast<TH2S *>(o.object));
337 case DQM_PROP_TYPE_TH2D:
338 obj = store->book2DD(
name, dynamic_cast<TH2D *>(o.object));
341 case DQM_PROP_TYPE_TH3F:
342 obj = store->book3D(
name, dynamic_cast<TH3F *>(o.object));
345 case DQM_PROP_TYPE_TH3S:
346 obj = store->book3S(
name, dynamic_cast<TH3S *>(o.object));
349 case DQM_PROP_TYPE_TH3D:
350 obj = store->book3DD(
name, dynamic_cast<TH3D *>(o.object));
353 case DQM_PROP_TYPE_PROF:
354 obj = store->bookProfile(
name, dynamic_cast<TProfile *>(o.object));
357 case DQM_PROP_TYPE_PROF2D:
358 obj = store->bookProfile2D(
name, dynamic_cast<TProfile2D *>(o.object));
363 <<
"ERROR: unexpected monitor element of type "
364 << (o.flags & DQM_PROP_TYPE_MASK) <<
" called '"
365 << *o.dirname <<
'/' << o.objname <<
"'\n";
395 sendObjectToPeer(msg, *o,
true);
399 words[0] =
sizeof(words) + w.
name.size();
400 words[1] = DQM_REPLY_NONE;
401 words[2] = w.
name.size();
403 msg->
data.reserve(msg->
data.size() + words[0]);
404 copydata(msg, &words[0],
sizeof(words));
405 copydata(msg, &w.
name[0], w.
name.size());
418 if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
419 objdata.insert(objdata.end(),
423 objdata.insert(objdata.end(),
429 uint32_t datalen = objdata.size();
430 uint32_t qlen = o.
qdata.size();
435 words[0] = 9*
sizeof(uint32_t) + namelen + datalen + qlen;
436 words[1] = DQM_REPLY_OBJECT;
438 words[3] = (o.
version >> 0 ) & 0xffffffff;
439 words[4] = (o.
version >> 32) & 0xffffffff;
445 msg->
data.reserve(msg->
data.size() + words[0]);
446 copydata(msg, &words[0], 9*
sizeof(uint32_t));
451 copydata(msg,
"/", 1);
455 copydata(msg, &objdata[0], datalen);
457 copydata(msg, &o.
qdata[0], qlen);
467 memcpy (&type, data +
sizeof(uint32_t),
sizeof (type));
470 case DQM_MSG_UPDATE_ME:
472 if (len != 2*
sizeof(uint32_t))
475 <<
"ERROR: corrupt 'UPDATE_ME' message of length " << len
476 <<
" from peer " << p->
peeraddr << std::endl;
482 <<
"DEBUG: received message 'UPDATE ME' from peer "
483 << p->
peeraddr <<
", size " << len << std::endl;
489 case DQM_MSG_LIST_OBJECTS:
493 <<
"DEBUG: received message 'LIST OBJECTS' from peer "
494 << p->
peeraddr <<
", size " << len << std::endl;
497 sendObjectListToPeer(msg,
true,
false);
501 case DQM_MSG_GET_OBJECT:
505 <<
"DEBUG: received message 'GET OBJECT' from peer "
506 << p->
peeraddr <<
", size " << len << std::endl;
508 if (len < 3*
sizeof(uint32_t))
511 <<
"ERROR: corrupt 'GET IMAGE' message of length " << len
512 <<
" from peer " << p->
peeraddr << std::endl;
517 memcpy (&namelen, data + 2*
sizeof(uint32_t),
sizeof(namelen));
518 if (len != 3*
sizeof(uint32_t) + namelen)
521 <<
"ERROR: corrupt 'GET OBJECT' message of length " << len
523 <<
", expected length " << (3*
sizeof(uint32_t))
524 <<
" + " << namelen << std::endl;
530 Object *o = findObject(0, name, &owner);
535 && (o->
flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
536 waitForData(p, name,
"", owner);
538 sendObjectToPeer(msg, *o,
true);
543 words[0] =
sizeof(words) + name.size();
544 words[1] = DQM_REPLY_NONE;
545 words[2] = name.size();
547 msg->
data.reserve(msg->
data.size() + words[0]);
548 copydata(msg, &words[0],
sizeof(words));
549 copydata(msg, &name[0], name.size());
554 case DQM_REPLY_LIST_BEGIN:
556 if (len != 4*
sizeof(uint32_t))
559 <<
"ERROR: corrupt 'LIST BEGIN' message of length " << len
560 <<
" from peer " << p->
peeraddr << std::endl;
566 memcpy(&flags, data + 3*
sizeof(uint32_t),
sizeof(uint32_t));
570 <<
"DEBUG: received message 'LIST BEGIN "
571 << (flags ?
"FULL" :
"INCREMENTAL")
573 <<
", size " << len << std::endl;
586 case DQM_REPLY_LIST_END:
588 if (len != 4*
sizeof(uint32_t))
591 <<
"ERROR: corrupt 'LIST END' message of length " << len
592 <<
" from peer " << p->
peeraddr << std::endl;
598 memcpy(&flags, data + 3*
sizeof(uint32_t),
sizeof(uint32_t));
609 <<
"DEBUG: received message 'LIST END "
610 << (flags ?
"FULL" :
"INCREMENTAL")
612 <<
", size " << len << std::endl;
621 case DQM_REPLY_OBJECT:
624 if (len <
sizeof(words))
627 <<
"ERROR: corrupt 'OBJECT' message of length " << len
628 <<
" from peer " << p->
peeraddr << std::endl;
632 memcpy (&words[0], data,
sizeof(words));
633 uint32_t &namelen = words[6];
634 uint32_t &datalen = words[7];
635 uint32_t &qlen = words[8];
637 if (len !=
sizeof(words) + namelen + datalen + qlen)
640 <<
"ERROR: corrupt 'OBJECT' message of length " << len
642 <<
", expected length " <<
sizeof(words)
650 unsigned char *namedata = data +
sizeof(words);
651 unsigned char *objdata = namedata + namelen;
652 unsigned char *qdata = objdata + datalen;
653 unsigned char *enddata = qdata + qlen;
655 assert (enddata == data + len);
659 <<
"DEBUG: received message 'OBJECT " << name
661 <<
", size " << len << std::endl;
667 Object *o = findObject(p, name);
669 o = makeObject(p, name);
671 o->
flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
676 if ((o->
flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
687 o->
flags |= DQM_PROP_STALE;
688 o->
qdata.insert(o->
qdata.end(), qdata, enddata);
694 && (o->
flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
695 requestObjectData(p, (namelen ? &name[0] : 0), namelen);
699 releaseWaiters(name, o);
706 if (len <
sizeof(words))
709 <<
"ERROR: corrupt 'NONE' message of length " << len
710 <<
" from peer " << p->
peeraddr << std::endl;
714 memcpy (&words[0], data,
sizeof(words));
715 uint32_t &namelen = words[2];
717 if (len !=
sizeof(words) + namelen)
720 <<
"ERROR: corrupt 'NONE' message of length " << len
722 <<
", expected length " <<
sizeof(words)
723 <<
" + " << namelen << std::endl;
727 unsigned char *namedata = data +
sizeof(words);
732 <<
"DEBUG: received message 'NONE " << name
734 <<
", size " << len << std::endl;
740 if (
Object *o = findObject(p, name))
742 o->flags |= DQM_PROP_DEAD;
747 releaseWaiters(name, 0);
753 <<
"ERROR: unrecognised message of length " << len
754 <<
" and type " << type <<
" from peer " << p->
peeraddr
766 assert (getPeer(dynamic_cast<Socket *> (ev->source)) == p);
777 <<
"WARNING: connection to the DQM server at " << p->
peeraddr
778 <<
" lost (will attempt to reconnect in 15 seconds)\n";
782 losePeer(
"WARNING: lost peer connection ", p, ev);
794 const void *
data = (len ? (
const void *)&
b->data[p->
sendpos]
795 : (
const void *)&
data);
800 done = (len ? ev->source->write (data, len) : 0);
803 <<
"DEBUG: sent " << done <<
" bytes to peer "
808 losePeer(
"WARNING: unable to write to peer ", p, ev, &e);
841 if ((sz = ev->source->read(&buf[0], buf.size())))
845 <<
"DEBUG: received " << sz <<
" bytes from peer "
848 if (data.capacity () < data.size () + sz)
850 data.insert (data.end(), &buf[0], &buf[0] + sz);
852 while (sz ==
sizeof (buf));
856 SystemError *
next =
dynamic_cast<SystemError *
>(e.next());
857 if (next && next->portable() == SysErr::ErrTryAgain)
862 losePeer(
"WARNING: failed to read from peer ", p, ev, &e);
871 while (data.size()-consumed >=
sizeof(uint32_t)
872 && p->
waiting < MAX_PEER_WAITREQS)
875 memcpy (&msglen, &data[0]+consumed,
sizeof(msglen));
879 losePeer(
"WARNING: excessively large message from ", p, ev);
884 if (data.size()-consumed >= msglen)
887 if (msglen < 2*
sizeof(uint32_t))
890 <<
"ERROR: corrupt peer message of length " << msglen
891 <<
" from peer " << p->
peeraddr << std::endl;
899 valid = onMessage(&msg, p, &data[0]+consumed, msglen);
902 if (! msg.
data.empty())
906 prev = &(*prev)->
next;
916 losePeer(
"WARNING: data stream error with ", p, ev);
927 data.erase(data.begin(), data.begin()+consumed);
949 assert (ev->source == server_);
952 Socket *
s = server_->accept();
954 assert (! s->isBlocking());
958 Peer *
p = createPeer(s);
960 if (InetSocket *inet = dynamic_cast<InetSocket *>(s))
962 InetAddress peeraddr = inet->peername();
963 InetAddress myaddr = inet->sockname();
965 .arg(peeraddr.hostname())
966 .
arg(peeraddr.port());
967 localaddr = StringFormat(
"%1:%2")
968 .arg(myaddr.hostname())
971 else if (LocalSocket *local = dynamic_cast<LocalSocket *>(s))
973 p->
peeraddr = local->peername().path();
974 localaddr = local->sockname().path();
985 <<
"INFO: new peer " << p->
peeraddr <<
" is now connected to "
986 << localaddr << std::endl;
1009 unsigned char buf [1024];
1010 while ((sz = ev->source->read(buf,
sizeof(buf))))
1015 SystemError *
next =
dynamic_cast<SystemError *
>(e.next());
1016 if (next && next->portable() == SysErr::ErrTryAgain)
1020 <<
"WARNING: error reading from notification pipe: "
1021 << e.explain() << std::endl;
1040 unsigned oldmask = p->
mask;
1047 if (debug_ && oldmask != p->
mask)
1049 <<
"DEBUG: updating mask for " << p->
peeraddr <<
" to "
1050 << p->
mask <<
" from " << oldmask << std::endl;
1058 logme() <<
"INFO: connection closed to " << p->
peeraddr << std::endl;
1066 appname_ (appname.
empty() ?
"DQMNet" : appname.c_str()),
1070 communicate_ ((pthread_t) -1),
1073 waitStale_ (0, 0, 0, 0, 500000000 ),
1074 waitMax_ (0, 0, 0, 5 , 0),
1129 logme() <<
"ERROR: DQM server was already started.\n";
1135 InetAddress addr(
"0.0.0.0", port);
1136 InetSocket *
s =
new InetSocket(SOCK_STREAM, 0, addr.family());
1141 s->setBlocking(
false);
1149 <<
"ERROR: Failed to start server at port " << port <<
": "
1150 << e.explain() << std::endl;
1152 raiseDQMError(
"DQMNet::startLocalServer",
"Failed to start server at port"
1153 " %d: %s", port, e.explain().c_str());
1156 logme() <<
"INFO: DQM server started at port " << port << std::endl;
1167 logme() <<
"ERROR: DQM server was already started.\n";
1173 server_ =
new LocalServerSocket(path, 10);
1184 <<
"ERROR: Failed to start server at path " << path <<
": "
1185 << e.explain() << std::endl;
1187 raiseDQMError(
"DQMNet::startLocalServer",
"Failed to start server at path"
1188 " %s: %s", path, e.explain().c_str());
1191 logme() <<
"INFO: DQM server started at path " << path << std::endl;
1203 <<
"ERROR: Already updating another collector at "
1222 <<
"ERROR: Already receiving data from another collector at "
1250 pthread_sigmask (SIG_BLOCK, &sigs, 0);
1260 pthread_mutex_lock(&
lock_);
1268 pthread_mutex_unlock(&
lock_);
1280 <<
"ERROR: DQM networking thread has already been started\n";
1284 pthread_mutex_init(&
lock_, 0);
1299 for (
int i = 0;
i < 2; ++
i)
1306 if (! ap->
host.empty()
1310 ap->
next = now + TimeSpan(0, 0, 0, 15 , 0);
1314 InetAddress addr(ap->
host.c_str(), ap->
port);
1315 s =
new InetSocket (SOCK_STREAM, 0, addr.family());
1316 s->setBlocking(
false);
1323 SystemError *sys =
dynamic_cast<SystemError *
>(e.next());
1324 if (! sys || sys->portable() != SysErr::ErrOperationInProgress)
1343 InetAddress peeraddr = ((InetSocket *) s)->peername();
1344 InetAddress myaddr = ((InetSocket *) s)->sockname();
1345 p->
peeraddr = StringFormat(
"%1:%2")
1346 .arg(peeraddr.hostname())
1347 .
arg(peeraddr.port());
1365 <<
"INFO: now connected to " << p->
peeraddr <<
" from "
1366 << myaddr.hostname() <<
":" << myaddr.port() << std::endl;
1378 if (
flush_ && now > nextFlush)
1381 nextFlush = now + TimeSpan(0, 0, 0, 15 , 0);
1401 if (
i->time < waitold)
1404 <<
"WARNING: source not responding in " << (waitMax_.ns() * 1
e-9)
1405 <<
"s to retrieval, releasing '" <<
i->name <<
"' from wait, have "
1406 << (o ? o->
rawdata.size() : 0) <<
" data available\n";
1412 <<
"WARNING: source not responding in " << (waitStale_.ns() * 1
e-9)
1413 <<
"s to update, releasing '" <<
i->name <<
"' from wait, have "
1414 << o->
rawdata.size() <<
" data available\n";
1433 wakeup_.sink()->write(&byte, 1);
1449 local_->objs.resize(size);
1458 std::pair<ObjectMap::iterator, bool>
info(
local_->objs.insert(o));
1483 ObjectMap::iterator
i,
e;
1484 for (i =
local_->objs.begin(), e =
local_->objs.end(); i !=
e; )
1487 path.reserve(i->dirname->size() + i->objname.size() + 2);
1488 path += *i->dirname;
1493 if (! known.count(path))
1494 ++removed,
local_->objs.erase(i++);
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
virtual bool shouldStop(void)
void shutdown(void)
Stop the network layer and wait it to finish.
void sendLocalChanges(void)
bool onLocalNotify(lat::IOSelectEvent *ev)
virtual void sendObjectListToPeers(bool all)=0
#define MESSAGE_SIZE_LIMIT
TObject * extractNextObject(TBufferFile &buf)
static void discard(Bucket *&b)
virtual Peer * createPeer(lat::Socket *s)=0
void releaseWaiters(const std::string &name, Object *o)
std::vector< Variable::Flags > flags
void staleObjectWaitLimit(lat::TimeSpan time)
const std::string * dirname
void lock(void)
Acquire a lock on the DQM net layer.
static void * communicate(void *obj)
tuple path
else: Piece not in the list, fine.
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
std::vector< unsigned char > DataBlob
std::ostream & logme(void)
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
static const uint32_t DQM_PROP_REPORT_ERROR
#define SOCKET_READ_GROWTH
static const uint32_t DQM_PROP_REPORT_OTHER
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=0)
void unlock(void)
Release the lock on the DQM net layer.
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
unsigned long long uint64_t
bool removeLocalExcept(const std::set< std::string > &known)
bool onPeerConnect(lat::IOSelectEvent *ev)
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
static const uint32_t DQM_PROP_STALE
virtual void updatePeerMasks(void)=0
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>")
char data[epos_bytes_allocation]
void listenToCollector(const std::string &host, int port)
static const int STATUS_OK
static const uint32_t DQM_MSG_LIST_OBJECTS
volatile std::atomic< bool > shutdown_flag false
static void copydata(Bucket *b, const void *data, size_t len)
void updateLocalObject(Object &o)
std::vector< QValue > QReports
if(conf.exists("allCellsPositionCalc"))
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
void updateToCollector(const std::string &host, int port)
tuple size
Write out results.
DQMBasicNet(const std::string &appname="")
virtual Peer * createPeer(lat::Socket *s)
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=0)=0
void raiseDQMError(const char *context, const char *fmt,...)