This commit is contained in:
Dwight 2007-12-01 11:44:42 -05:00
parent 652bfc115a
commit fb7d5b718c
8 changed files with 34 additions and 25 deletions

View File

@ -143,7 +143,7 @@ void receivedDelete(Message& m) {
deleteObjects(ns, pattern, flags & 1);
}
void receivedQuery(Message& m) {
void receivedQuery(Message& m, stringstream& ss) {
DbMessage d(m);
const char *ns = d.getns();
int ntoreturn = d.pullInt();
@ -154,7 +154,7 @@ void receivedQuery(Message& m) {
d.nextJsObj().getFieldNames(*fields);
}
QueryResult* msgdata =
runQuery(ns, ntoreturn, query, fields);
runQuery(ns, ntoreturn, query, fields, ss);
Message resp;
resp.setData(msgdata, true);
dbMsgPort.reply(m, resp);
@ -186,7 +186,7 @@ void receivedInsert(Message& m) {
DbMessage d(m);
while( d.moreJSObjs() ) {
JSObj js = d.nextJsObj();
cout << " temp dbinsert: got js object, size=" << js.objsize() << " ns:" << d.getns() << endl;
// cout << " temp dbinsert: got js object, size=" << js.objsize() << " ns:" << d.getns() << endl;
theDataFileMgr.insert(d.getns(), (void*) js.objdata(), js.objsize());
}
}
@ -222,25 +222,31 @@ void run() {
testTheDb();
cout << curTimeMillis() % 10000 << " waiting for msg...\n" << endl;
Message m;
while( 1 ) {
m.reset();
//ss.clear();
stringstream ss;
// temp:
// sleepsecs(1);
cout << curTimeMillis() % 10000 << " waiting for msg..." << endl;
if( !dbMsgPort.recv(m) ) {
cout << "MessagingPort::recv() returned false" << endl;
break;
}
cout << curTimeMillis() % 10000 << " db.cpp got msg" << endl;
ss << curTimeMillis() % 10000 << ' ';
Timer t;
//cout << " got msg" << endl;
//cout << " op:" << m.data->operation << " len:" << m.data->len << endl;
if( m.data->operation == dbMsg ) {
ss << "msg ";
char *p = m.data->_data;
int len = strlen(p);
if( len > 400 )
@ -257,23 +263,30 @@ void run() {
}
}
else if( m.data->operation == dbQuery ) {
receivedQuery(m);
receivedQuery(m, ss);
}
else if( m.data->operation == dbInsert ) {
ss << "insert ";
receivedInsert(m);
}
else if( m.data->operation == dbUpdate ) {
ss << "update ";
receivedUpdate(m);
}
else if( m.data->operation == dbDelete ) {
ss << "remove ";
receivedDelete(m);
}
else if( m.data->operation == dbGetMore ) {
ss << "getmore ";
receivedGetMore(m);
}
else {
cout << " operation isn't supported ?" << endl;
}
ss << ' ' << t.millis() << "ms";
cout << ss.str().c_str() << endl;
}
}

View File

@ -328,7 +328,6 @@ void IndexDetails::getKeysFromObject(JSObj& obj, set<JSObj>& keys) {
JSObjBuilder b;
b.appendAs(e, f.fieldName());
JSObj o = b.doneAndDecouple();
// cout << "TEMP: got key " << o.toString() << endl;
keys.insert(o);
}
}
@ -396,11 +395,9 @@ void setDifference(set<JSObj>& l, set<JSObj>& r, vector<JSObj*> &diff) {
while( 1 ) {
if( i == l.end() )
break;
// cout << i->toString() << endl;
while( j != r.end() && *j < *i )
j++;
if( !i->woEqual(*j) ) {
// cout << "INDIFF:" << i->toString() << " j:" << j->toString() << endl;
const JSObj *j = &*i;
diff.push_back( (JSObj *) j );
}

View File

@ -165,10 +165,11 @@ void updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert)
theDataFileMgr.insert(ns, (void*) updateobj.objdata(), updateobj.objsize());
}
QueryResult* runQuery(const char *ns, int ntoreturn, JSObj jsobj, auto_ptr< set<string> > filter) {
cout << "runQuery ns:" << ns << " ntoreturn:" << ntoreturn << " queryobjsize:" <<
jsobj.objsize();
QueryResult* runQuery(const char *ns, int ntoreturn, JSObj jsobj,
auto_ptr< set<string> > filter, stringstream& ss) {
ss << "query:" << ns << " ntoreturn:" << ntoreturn;
if( jsobj.objsize() > 100 )
ss << " querysz:" << jsobj.objsize();
BufBuilder b(32768);
@ -237,6 +238,7 @@ QueryResult* runQuery(const char *ns, int ntoreturn, JSObj jsobj, auto_ptr< set<
qr->_data[2] = 0;
qr->_data[3] = 0;
qr->len = b.len();
ss << " resLen:" << b.len();
// qr->channel = 0;
qr->operation = opReply;
qr->cursorId = cursorid;
@ -244,14 +246,14 @@ QueryResult* runQuery(const char *ns, int ntoreturn, JSObj jsobj, auto_ptr< set<
qr->nReturned = n;
b.decouple();
cout << " nReturned:" << n << endl;
ss << " nReturned:" << n;
return qr;
}
QueryResult* getMore(const char *ns, int ntoreturn, long long cursorid) {
cout << "getMore ns:" << ns << " ntoreturn:" << ntoreturn << " cursorid:" <<
cursorid << endl;
// cout << "getMore ns:" << ns << " ntoreturn:" << ntoreturn << " cursorid:" <<
// cursorid << endl;
BufBuilder b(32768);

View File

@ -62,7 +62,8 @@ struct QueryResult : public MsgData {
QueryResult* getMore(const char *ns, int ntoreturn, long long cursorid);
QueryResult* runQuery(const char *ns, int ntoreturn,
JSObj j, auto_ptr< set<string> > fieldFilter);
JSObj j, auto_ptr< set<string> > fieldFilter,
stringstream&);
void updateObjects(const char *ns, JSObj updateobj, JSObj pattern, bool upsert);
void deleteObjects(const char *ns, JSObj pattern, bool justOne);

View File

@ -117,7 +117,7 @@ void MessagingPort::say(int channel, SockAddr& to, Message& toSend, int response
int mss = conn.mtu(to) - FragHeader;
int left = toSend.data->len;
cout << "say() len:" << left << endl;
// cout << "say() len:" << left << endl;
int i = 0;
char *p = (char *) toSend.data;
while( left>0 ) {

View File

@ -90,8 +90,6 @@ inline ProtocolConnection::ProtocolConnection(ProtocolConnection& par, EndPoint&
first = true;
// todo: LOCK
cout << "TEMP2: count:" << pcMap.count(farEnd) << ' ' << farEnd.toString() << endl;
assert(pcMap.count(farEnd) == 0);
// pcMap[myEnd] = this;
}
@ -325,9 +323,7 @@ void receiverThread() {
continue;
}
cout << ".New connection accepted from " << fromAddr.toString() << endl;
cout << "TEMP1: count:" << pcMap.count(fromAddr) << ' ' << fromAddr.toString() << endl;
mypc = new ProtocolConnection(*startingConn, fromAddr);
cout << "TEMP3: count:" << pcMap.count(fromAddr) << ' ' << fromAddr.toString() << endl;
pcMap[fromAddr] = mypc;
}
else

View File

@ -128,7 +128,7 @@ void gotACK(F* fr, ProtocolConnection *pc) {
void MS::send() {
/* flow control */
cout << "send() to:" << to.toString() << endl;
// cout << "send() to:" << to.toString() << endl;
ptrace( cout << "..MS::send() pending=" << pc.cs.pendingSend.size() << endl; )
lock lk(biglock);

View File

@ -89,9 +89,9 @@ typedef boost::mutex::scoped_lock lock;
class Timer {
public:
Timer() { old = curTimeMicros(); }
int millis() { return micros() / 1000; }
int micros() {
unsigned n = curTimeMicros();
cout << "old:" << old << " new:" << n << endl;
return tdiff(old, n);
}
private: