2 #include "classlib/iobase/InetServerSocket.h"
3 #include "classlib/iobase/LocalServerSocket.h"
4 #include "classlib/iobase/Filename.h"
5 #include "classlib/sysapi/InetSocket.h"
6 #include "classlib/utils/TimeInfo.h"
7 #include "classlib/utils/StringList.h"
8 #include "classlib/utils/StringFormat.h"
9 #include "classlib/utils/StringOps.h"
10 #include "classlib/utils/SystemError.h"
11 #include "classlib/utils/Regexp.h"
26 #define MESSAGE_SIZE_LIMIT (1 * 1024 * 1024)
27 #define SOCKET_BUF_SIZE (1 * 1024 * 1024)
29 #define MESSAGE_SIZE_LIMIT (8 * 1024 * 1024)
30 #define SOCKET_BUF_SIZE (8 * 1024 * 1024)
32 #define SOCKET_READ_SIZE (SOCKET_BUF_SIZE / 8)
33 #define SOCKET_READ_GROWTH (SOCKET_BUF_SIZE)
37 static const Regexp
s_rxmeval(
"<(.*)>(i|f|s|qr)=(.*)</\\1>");
51 Time
now = Time::current();
52 return std::cout << now.format(
true,
"%Y-%m-%d %H:%M:%S.") << now.nanoformat(3, 3) <<
" " << appname_ <<
"[" << pid_
58 b->
data.insert(b->
data.end(), (
const unsigned char *)data, (
const unsigned char *)data + len);
80 for (
auto i = waiting_.begin(),
e = waiting_.end();
i !=
e;)
106 (*msg)->
next =
nullptr;
109 words[0] =
sizeof(words) + len;
110 words[1] = DQM_MSG_GET_OBJECT;
112 copydata(*msg, words,
sizeof(words));
113 copydata(*msg, name, len);
124 requestObjectData(owner, !name.empty() ? &name[0] :
nullptr, name.size());
126 waiting_.push_back(wo);
137 (*msg)->
next =
nullptr;
139 releaseFromWait(*msg, *i, o);
141 assert(i->peer->waiting > 0);
148 for (
auto i = waiting_.begin(),
e = waiting_.end();
i !=
e;)
150 releaseFromWait(
i++, o);
160 std::ostringstream qrs;
161 QReports::const_iterator qi, qe;
162 for (qi = qr.begin(), qe = qr.end(); qi != qe; ++qi) {
164 sprintf(buf,
"%d%c%n%.*g", qi->code, 0, &pos, DBL_DIG + 2, qi->qtresult);
165 qrs << buf <<
'\0' << buf + pos <<
'\0' << qi->qtname <<
'\0' << qi->algorithm <<
'\0' << qi->message <<
'\0'
174 const char *qdata = from;
204 qv.
code = atoi(qdata);
245 if (buf.Length() == buf.BufferSize())
249 Int_t pos = buf.Length();
250 TClass *
c = buf.ReadClass();
251 buf.SetBufferOffset(pos);
253 return c ? buf.ReadObject(c) : 0;
258 DQMNet::reconstructObject(Object &
o)
260 TBufferFile
buf(TBufferFile::kRead, o.rawdata.size(), &o.rawdata[0], kFALSE);
271 unpackQualityData(o.qreports, o.flags, o.qdata.c_str());
278 DQMNet::reinstateObject(
DQMStore *store, Object &o)
280 if (! reconstructObject (o))
286 switch (o.flags & DQM_PROP_TYPE_MASK)
288 case DQM_PROP_TYPE_INT:
289 obj = store->
bookInt(o.objname);
290 obj->
Fill(atoll(o.scalar.c_str()));
293 case DQM_PROP_TYPE_REAL:
295 obj->
Fill(atof(o.scalar.c_str()));
298 case DQM_PROP_TYPE_STRING:
302 case DQM_PROP_TYPE_TH1F:
303 obj = store->
book1D(
name, dynamic_cast<TH1F *>(o.object));
306 case DQM_PROP_TYPE_TH1S:
307 obj = store->
book1S(
name, dynamic_cast<TH1S *>(o.object));
310 case DQM_PROP_TYPE_TH1D:
311 obj = store->
book1DD(
name, dynamic_cast<TH1D *>(o.object));
314 case DQM_PROP_TYPE_TH2F:
315 obj = store->
book2D(
name, dynamic_cast<TH2F *>(o.object));
318 case DQM_PROP_TYPE_TH2S:
319 obj = store->
book2S(
name, dynamic_cast<TH2S *>(o.object));
322 case DQM_PROP_TYPE_TH2D:
323 obj = store->
book2DD(
name, dynamic_cast<TH2D *>(o.object));
326 case DQM_PROP_TYPE_TH3F:
327 obj = store->
book3D(
name, dynamic_cast<TH3F *>(o.object));
330 case DQM_PROP_TYPE_TH3S:
331 obj = store->book3S(
name, dynamic_cast<TH3S *>(o.object));
334 case DQM_PROP_TYPE_TH3D:
335 obj = store->book3DD(
name, dynamic_cast<TH3D *>(o.object));
338 case DQM_PROP_TYPE_PROF:
342 case DQM_PROP_TYPE_PROF2D:
348 <<
"ERROR: unexpected monitor element of type "
349 << (o.flags & DQM_PROP_TYPE_MASK) <<
" called '"
350 << o.dirname <<
'/' << o.objname <<
"'\n";
357 obj->data_.tag = o.tag;
358 obj->data_.qreports = o.qreports;
374 sendObjectToPeer(msg, *o,
true);
377 words[0] =
sizeof(words) + w.
name.size();
378 words[1] = DQM_REPLY_NONE;
379 words[2] = w.
name.size();
381 msg->
data.reserve(msg->
data.size() + words[0]);
382 copydata(msg, &words[0],
sizeof(words));
383 copydata(msg, &w.
name[0], w.
name.size());
391 uint32_t flags = o.
flags & ~DQM_PROP_DEAD;
394 if ((flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR)
401 uint32_t datalen = objdata.size();
402 uint32_t qlen = o.
qdata.size();
407 words[0] = 9 *
sizeof(uint32_t) + namelen + datalen + qlen;
408 words[1] = DQM_REPLY_OBJECT;
410 words[3] = (o.
version >> 0) & 0xffffffff;
411 words[4] = (o.
version >> 32) & 0xffffffff;
417 msg->
data.reserve(msg->
data.size() + words[0]);
418 copydata(msg, &words[0], 9 *
sizeof(uint32_t));
422 copydata(msg,
"/", 1);
426 copydata(msg, &objdata[0], datalen);
428 copydata(msg, &o.
qdata[0], qlen);
436 memcpy(&type, data +
sizeof(uint32_t),
sizeof(type));
438 case DQM_MSG_UPDATE_ME: {
439 if (len != 2 *
sizeof(uint32_t)) {
440 logme() <<
"ERROR: corrupt 'UPDATE_ME' message of length " << len <<
" from peer " << p->
peeraddr << std::endl;
445 logme() <<
"DEBUG: received message 'UPDATE ME' from peer " << p->
peeraddr <<
", size " << len << std::endl;
451 case DQM_MSG_LIST_OBJECTS: {
453 logme() <<
"DEBUG: received message 'LIST OBJECTS' from peer " << p->
peeraddr <<
", size " << len << std::endl;
456 sendObjectListToPeer(msg,
true,
false);
460 case DQM_MSG_GET_OBJECT: {
462 logme() <<
"DEBUG: received message 'GET OBJECT' from peer " << p->
peeraddr <<
", size " << len << std::endl;
464 if (len < 3 *
sizeof(uint32_t)) {
465 logme() <<
"ERROR: corrupt 'GET IMAGE' message of length " << len <<
" from peer " << p->
peeraddr << std::endl;
470 memcpy(&namelen, data + 2 *
sizeof(uint32_t),
sizeof(namelen));
471 if (len != 3 *
sizeof(uint32_t) + namelen) {
472 logme() <<
"ERROR: corrupt 'GET OBJECT' message of length " << len <<
" from peer " << p->
peeraddr
473 <<
", expected length " << (3 *
sizeof(uint32_t)) <<
" + " << namelen << std::endl;
478 Peer *owner =
nullptr;
479 Object *o = findObject(
nullptr, name, &owner);
481 o->
lastreq = Time::current().ns();
482 if ((o->
rawdata.empty() || (o->
flags & DQM_PROP_STALE)) &&
483 (o->
flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
484 waitForData(p, name,
"", owner);
486 sendObjectToPeer(msg, *o,
true);
489 words[0] =
sizeof(words) + name.size();
490 words[1] = DQM_REPLY_NONE;
491 words[2] = name.size();
493 msg->
data.reserve(msg->
data.size() + words[0]);
494 copydata(msg, &words[0],
sizeof(words));
495 copydata(msg, &name[0], name.size());
500 case DQM_REPLY_LIST_BEGIN: {
501 if (len != 4 *
sizeof(uint32_t)) {
502 logme() <<
"ERROR: corrupt 'LIST BEGIN' message of length " << len <<
" from peer " << p->
peeraddr << std::endl;
508 memcpy(&flags, data + 3 *
sizeof(uint32_t),
sizeof(uint32_t));
511 logme() <<
"DEBUG: received message 'LIST BEGIN " << (flags ?
"FULL" :
"INCREMENTAL") <<
"' from "
512 << p->
peeraddr <<
", size " << len << std::endl;
525 case DQM_REPLY_LIST_END: {
526 if (len != 4 *
sizeof(uint32_t)) {
527 logme() <<
"ERROR: corrupt 'LIST END' message of length " << len <<
" from peer " << p->
peeraddr << std::endl;
533 memcpy(&flags, data + 3 *
sizeof(uint32_t),
sizeof(uint32_t));
543 logme() <<
"DEBUG: received message 'LIST END " << (flags ?
"FULL" :
"INCREMENTAL") <<
"' from " << p->
peeraddr
544 <<
", size " << len << std::endl;
553 case DQM_REPLY_OBJECT: {
555 if (len <
sizeof(words)) {
556 logme() <<
"ERROR: corrupt 'OBJECT' message of length " << len <<
" from peer " << p->
peeraddr << std::endl;
560 memcpy(&words[0], data,
sizeof(words));
561 uint32_t &namelen = words[6];
562 uint32_t &datalen = words[7];
563 uint32_t &qlen = words[8];
565 if (len !=
sizeof(words) + namelen + datalen + qlen) {
566 logme() <<
"ERROR: corrupt 'OBJECT' message of length " << len <<
" from peer " << p->
peeraddr
567 <<
", expected length " <<
sizeof(words) <<
" + " << namelen <<
" + " << datalen <<
" + " << qlen
572 unsigned char *namedata = data +
sizeof(words);
573 unsigned char *objdata = namedata + namelen;
574 unsigned char *qdata = objdata + datalen;
575 unsigned char *enddata = qdata + qlen;
577 assert(enddata == data + len);
580 logme() <<
"DEBUG: received message 'OBJECT " << name <<
"' from " << p->
peeraddr <<
", size " << len
587 Object *o = findObject(p, name);
589 o = makeObject(p, name);
591 o->
flags = words[2] | DQM_PROP_NEW | DQM_PROP_RECEIVED;
596 if ((o->
flags & DQM_PROP_TYPE_MASK) <= DQM_PROP_TYPE_SCALAR) {
599 }
else if (datalen) {
602 }
else if (!o->
rawdata.empty())
603 o->
flags |= DQM_PROP_STALE;
604 o->
qdata.insert(o->
qdata.end(), qdata, enddata);
608 if (o->
lastreq && !datalen && (o->
flags & DQM_PROP_TYPE_MASK) > DQM_PROP_TYPE_SCALAR)
609 requestObjectData(p, (namelen ? &name[0] :
nullptr), namelen);
613 releaseWaiters(name, o);
617 case DQM_REPLY_NONE: {
619 if (len <
sizeof(words)) {
620 logme() <<
"ERROR: corrupt 'NONE' message of length " << len <<
" from peer " << p->
peeraddr << std::endl;
624 memcpy(&words[0], data,
sizeof(words));
625 uint32_t &namelen = words[2];
627 if (len !=
sizeof(words) + namelen) {
628 logme() <<
"ERROR: corrupt 'NONE' message of length " << len <<
" from peer " << p->
peeraddr
629 <<
", expected length " <<
sizeof(words) <<
" + " << namelen << std::endl;
633 unsigned char *namedata = data +
sizeof(words);
637 logme() <<
"DEBUG: received message 'NONE " << name <<
"' from " << p->
peeraddr <<
", size " << len
644 if (
Object *o = findObject(p, name)) {
645 o->flags |= DQM_PROP_DEAD;
650 releaseWaiters(name,
nullptr);
655 logme() <<
"ERROR: unrecognised message of length " << len <<
" and type " << type <<
" from peer " << p->
peeraddr
665 assert(getPeer(dynamic_cast<Socket *>(ev->source)) == p);
673 logme() <<
"WARNING: connection to the DQM server at " << p->
peeraddr
674 <<
" lost (will attempt to reconnect in 15 seconds)\n";
675 losePeer(
nullptr, p, ev);
677 losePeer(
"WARNING: lost peer connection ", p, ev);
687 const void *
data = (len ? (
const void *)&
b->data[p->
sendpos] : (
const void *)&
data);
691 done = (len ? ev->source->write(data, len) : 0);
693 logme() <<
"DEBUG: sent " << done <<
" bytes to peer " << p->
peeraddr << std::endl;
695 losePeer(
"WARNING: unable to write to peer ", p, ev, &e);
717 if (ev->events &
IORead) {
725 if ((sz = ev->source->read(&buf[0], buf.size()))) {
727 logme() <<
"DEBUG: received " << sz <<
" bytes from peer " << p->
peeraddr << std::endl;
729 if (data.capacity() < data.size() + sz)
731 data.insert(data.end(), &buf[0], &buf[0] + sz);
733 while (sz ==
sizeof(buf));
735 auto *
next =
dynamic_cast<SystemError *
>(e.next());
736 if (
next &&
next->portable() == SysErr::ErrTryAgain)
740 losePeer(
"WARNING: failed to read from peer ", p, ev, &e);
749 while (data.size() - consumed >=
sizeof(uint32_t) && p->
waiting < MAX_PEER_WAITREQS) {
751 memcpy(&msglen, &data[0] + consumed,
sizeof(msglen));
754 losePeer(
"WARNING: excessively large message from ", p, ev);
759 if (data.size() - consumed >= msglen) {
761 if (msglen < 2 *
sizeof(uint32_t)) {
762 logme() <<
"ERROR: corrupt peer message of length " << msglen <<
" from peer " << p->
peeraddr << std::endl;
768 valid = onMessage(&msg, p, &data[0] + consumed, msglen);
771 if (!msg.
data.empty()) {
774 prev = &(*prev)->
next;
777 (*prev)->
next =
nullptr;
783 losePeer(
"WARNING: data stream error with ", p, ev);
793 data.erase(data.begin(), data.begin() + consumed);
813 assert(ev->source == server_);
816 Socket *
s = server_->accept();
822 Peer *
p = createPeer(s);
824 if (
auto *inet = dynamic_cast<InetSocket *>(s)) {
825 InetAddress peeraddr = inet->peername();
826 InetAddress myaddr = inet->sockname();
827 p->
peeraddr = StringFormat(
"%1:%2").arg(peeraddr.hostname()).
arg(peeraddr.port()).
value();
828 localaddr = StringFormat(
"%1:%2").arg(myaddr.hostname()).
arg(myaddr.port()).
value();
829 }
else if (
auto *local = dynamic_cast<LocalSocket *>(s)) {
830 p->
peeraddr = local->peername().path();
831 localaddr = local->sockname().path();
840 logme() <<
"INFO: new peer " << p->
peeraddr <<
" is now connected to " << localaddr << std::endl;
860 unsigned char buf[1024];
861 while ((sz = ev->source->read(buf,
sizeof(buf))))
864 auto *
next =
dynamic_cast<SystemError *
>(e.next());
865 if (
next &&
next->portable() == SysErr::ErrTryAgain)
868 logme() <<
"WARNING: error reading from notification pipe: " << e.explain() << std::endl;
885 unsigned oldmask = p->
mask;
893 logme() <<
"DEBUG: updating mask for " << p->
peeraddr <<
" to " << p->
mask <<
" from " << oldmask << std::endl;
900 logme() <<
"INFO: connection closed to " << p->
peeraddr << std::endl;
901 losePeer(
nullptr, p,
nullptr);
908 appname_(appname.
empty() ?
"DQMNet" : appname.c_str()),
911 version_(Time::current()),
912 communicate_((pthread_t)-1),
915 waitStale_(0, 0, 0, 0, 500000000 ),
916 waitMax_(0, 0, 0, 5 , 0),
954 logme() <<
"ERROR: DQM server was already started.\n";
959 InetAddress
addr(
"0.0.0.0", port);
960 auto *
s =
new InetSocket(SOCK_STREAM, 0, addr.family());
965 s->setBlocking(
false);
970 logme() <<
"ERROR: Failed to start server at port " << port <<
": " << e.explain() << std::endl;
972 throw cms::Exception(
"DQMNet::startLocalServer") <<
"Failed to start server at port " <<
974 port <<
": " << e.explain().c_str();
977 logme() <<
"INFO: DQM server started at port " << port << std::endl;
985 logme() <<
"ERROR: DQM server was already started.\n";
990 server_ =
new LocalServerSocket(path, 10);
998 logme() <<
"ERROR: Failed to start server at path " << path <<
": " << e.explain() << std::endl;
1001 <<
"Failed to start server at path " << path <<
": " << e.explain().c_str();
1004 logme() <<
"INFO: DQM server started at path " << path << std::endl;
1052 pthread_sigmask(SIG_BLOCK, &sigs,
nullptr);
1060 pthread_mutex_lock(&
lock_);
1066 pthread_mutex_unlock(&
lock_);
1074 logme() <<
"ERROR: DQM networking thread has already been started\n";
1078 pthread_mutex_init(&
lock_,
nullptr);
1090 for (
auto ap : automatic) {
1094 if (!ap->host.empty() && !ap->peer && (now = Time::current()) > ap->next) {
1095 ap->
next = now + TimeSpan(0, 0, 0, 15 , 0);
1096 InetSocket *
s =
nullptr;
1098 InetAddress
addr(ap->host.c_str(), ap->port);
1099 s =
new InetSocket(SOCK_STREAM, 0,
addr.family());
1100 s->setBlocking(
false);
1105 auto *sys =
dynamic_cast<SystemError *
>(e.next());
1106 if (!sys || sys->portable() != SysErr::ErrOperationInProgress) {
1123 InetAddress peeraddr = ((InetSocket *)s)->peername();
1124 InetAddress myaddr = ((InetSocket *)s)->sockname();
1125 p->
peeraddr = StringFormat(
"%1:%2").arg(peeraddr.hostname()).
arg(peeraddr.port()).
value();
1140 logme() <<
"INFO: now connected to " << p->
peeraddr <<
" from " << myaddr.hostname() <<
":" << myaddr.port()
1148 now = Time::current();
1153 if (
flush_ && now > nextFlush) {
1155 nextFlush = now + TimeSpan(0, 0, 0, 15 , 0);
1174 if (
i->time < waitold) {
1175 logme() <<
"WARNING: source not responding in " << (waitMax_.ns() * 1
e-9) <<
"s to retrieval, releasing '"
1176 <<
i->name <<
"' from wait, have " << (o ? o->
rawdata.size() : 0) <<
" data available\n";
1179 logme() <<
"WARNING: source not responding in " << (waitStale_.ns() * 1
e-9) <<
"s to update, releasing '"
1180 <<
i->name <<
"' from wait, have " << o->
rawdata.size() <<
" data available\n";
1197 wakeup_.sink()->write(&byte, 1);
1214 std::pair<ObjectMap::iterator, bool>
info(
local_->objs.insert(o));
1219 auto &old =
const_cast<Object &
>(*
info.first);
1236 ObjectMap::iterator
i,
e;
1237 for (i =
local_->objs.begin(), e =
local_->objs.end(); i !=
e;) {
1239 path.reserve(i->dirname.size() + i->objname.size() + 2);
1245 if (!known.count(path))
1246 ++removed,
local_->objs.erase(i++);
edm::ErrorSummaryEntry Error
static void unpackQualityData(QReports &qr, uint32_t &flags, const char *from)
DQMNet(const std::string &appname="")
static const uint32_t DQM_PROP_REPORT_WARN
static const uint32_t DQM_MSG_UPDATE_ME
MonitorElement * bookFloat(TString const &name, FUNC onbooking=NOOP())
bool onLocalNotify(lat::IOSelectEvent *ev)
const edm::EventSetup & c
MonitorElement * bookProfile2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, double lowZ, double highZ, char const *option="s", FUNC onbooking=NOOP())
MonitorElement * book2S(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
virtual void sendObjectListToPeers(bool all)=0
void lock()
Acquire a lock on the DQM net layer.
#define MESSAGE_SIZE_LIMIT
TObject * extractNextObject(TBufferFile &buf)
static void discard(Bucket *&b)
virtual void updatePeerMasks()=0
virtual Peer * createPeer(lat::Socket *s)=0
void setCurrentFolder(std::string const &fullpath) override
void releaseWaiters(const std::string &name, Object *o)
void staleObjectWaitLimit(lat::TimeSpan time)
static void * communicate(void *obj)
void shutdown()
Stop the network layer and wait it to finish.
MonitorElement * bookString(TString const &name, TString const &value, FUNC onbooking=NOOP())
MonitorElement * book1DD(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
void waitForData(Peer *p, const std::string &name, const std::string &info, Peer *owner)
virtual bool shouldStop()
void requestObjectData(Peer *p, const char *name, size_t len)
Queue an object request to the data server.
MonitorElement * bookProfile(TString const &name, TString const &title, int nchX, double lowX, double highX, int, double lowY, double highY, char const *option="s", FUNC onbooking=NOOP())
void swap(edm::DataFrameContainer &lhs, edm::DataFrameContainer &rhs)
virtual Object * findObject(Peer *p, const std::string &name, Peer **owner=nullptr)=0
virtual bool onMessage(Bucket *msg, Peer *p, unsigned char *data, size_t len)
if(conf_.getParameter< bool >("UseStripCablingDB"))
bool onPeerData(lat::IOSelectEvent *ev, Peer *p)
Handle communication to a particular client.
MonitorElement * book1S(TString const &name, TString const &title, int nchX, double lowX, double highX, FUNC onbooking=NOOP())
static const uint32_t DQM_PROP_REPORT_ERROR
#define SOCKET_READ_GROWTH
static const uint32_t DQM_PROP_REPORT_OTHER
virtual void releaseFromWait(Bucket *msg, WaitObject &w, Object *o)
Peer * createPeer(lat::Socket *s) override
virtual void sendObjectToPeer(Bucket *msg, Object &o, bool data)
unsigned long long uint64_t
bool removeLocalExcept(const std::set< std::string > &known)
bool onPeerConnect(lat::IOSelectEvent *ev)
MonitorElement * book2D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
MonitorElement * bookInt(TString const &name, FUNC onbooking=NOOP())
void startLocalServer(int port)
static void packQualityData(std::string &into, const QReports &qr)
static const uint32_t DQM_PROP_STALE
MonitorElement * book2DD(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, FUNC onbooking=NOOP())
static const Regexp s_rxmeval("<(.*)>(i|f|s|qr)=(.*)</\\1>")
void losePeer(const char *reason, Peer *peer, lat::IOSelectEvent *event, lat::Error *err=nullptr)
char data[epos_bytes_allocation]
void unlock()
Release the lock on the DQM net layer.
void listenToCollector(const std::string &host, int port)
static const int STATUS_OK
static const uint32_t DQM_MSG_LIST_OBJECTS
static void copydata(Bucket *b, const void *data, size_t len)
void updateLocalObject(Object &o)
MonitorElement * book1D(TString const &name, TString const &title, int const nchX, double const lowX, double const highX, FUNC onbooking=NOOP())
std::vector< QValue > QReports
void reserveLocalSpace(uint32_t size)
Give a hint of how much capacity to allocate for local objects.
MonitorElement * book3D(TString const &name, TString const &title, int nchX, double lowX, double highX, int nchY, double lowY, double highY, int nchZ, double lowZ, double highZ, FUNC onbooking=NOOP())
void updateToCollector(const std::string &host, int port)
std::vector< unsigned char > DataBlob
tuple size
Write out results.
DQMBasicNet(const std::string &appname="")