Skip to content

Commit

Permalink
clean up logging for navrecv so we can debug the dropped connection i…
Browse files Browse the repository at this point in the history
…ssue
  • Loading branch information
berthubert committed Dec 22, 2024
1 parent 461ae24 commit 6139ac9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 18 deletions.
24 changes: 11 additions & 13 deletions navrecv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,7 @@ int getfd(const char* path, int mode, int permission)
std::advance(end, toErase);
fds.erase(fds.begin(), end);
}




FileID fid({path, mode, permission});
// cout<<"Request for "<<path<<endl;
auto iter = fds.find(fid);
Expand All @@ -121,7 +119,7 @@ int getfd(const char* path, int mode, int permission)
if(fd < 0) {
throw FatalException("Unable to open file for storage: "+string(strerror(errno)));
}
cout<<"Opened fd "<<fd<<" for path "<<path<<endl;
// cout<<"Opened fd "<<fd<<" for path "<<path<<endl;
fds.emplace(fid, FDID(fd));
return fd;
}
Expand Down Expand Up @@ -231,7 +229,7 @@ ClientKeeper g_ckeeper;
void recvSession2(Socket&& uns, ComboAddress client, ClientKeeper::Sentinel& sentinel)
{
string secret = SRead(uns, 8); // ignored for now
cerr << "Entering compressed session for "<<client.toStringWithPort()<<endl;
cerr << client.toStringWithPort()<< " Entering compressed session"<<endl;
ZStdReader zsr(uns);
int s = zsr.getFD();
// time_t start = time(0);
Expand All @@ -242,7 +240,7 @@ void recvSession2(Socket&& uns, ComboAddress client, ClientKeeper::Sentinel& sen
// sleep(10);
string num=SRead(s, 4);
if(num.empty()) {
cerr<<"EOF from "<<client.toStringWithPort()<<endl;
cerr<<client.toStringWithPort()<<" EOF"<<endl;
break;
}
string out="bert";
Expand All @@ -268,10 +266,9 @@ void recvSession2(Socket&& uns, ComboAddress client, ClientKeeper::Sentinel& sen
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);

if(first) {
cerr<<"\tstation: "<<nmm.sourceid() << endl;
cerr<<client.toStringWithPort() <<" station: "<<nmm.sourceid() << endl;
first=false;
}


#ifdef __linux__
SSetsockopt(uns, IPPROTO_TCP, TCP_CORK, 1 );
Expand All @@ -287,15 +284,16 @@ void recvSession(int s, ComboAddress client)
try {
Socket sock(s); // this closes on destruction
SSetsockopt(s, SOL_SOCKET, SO_KEEPALIVE, 1); // saves file descriptors
cerr<<"Receiving messages from "<<client.toStringWithPort()<<endl;
cerr<<client.toStringWithPort()<<" New connection\n";
cerr.flush();
bool first=true;

ClientKeeper::Sentinel sentinel=g_ckeeper.reportClient(client);

for(int count=0;;++count) {
string part=SRead(sock, 4);
if(part.empty()) {
cerr<<"EOF from "<<client.toStringWithPort()<<endl;
cerr<<client.toStringWithPort()<<" EOF"<<endl;
break;
}
if(part != "bert") {
Expand Down Expand Up @@ -323,17 +321,17 @@ void recvSession(int s, ComboAddress client)
NavMonMessage nmm;
nmm.ParseFromString(part);
if(first) {
cerr<<"\tstation: "<<nmm.sourceid() << endl;
cerr<<client.toStringWithPort()<<" station "<<nmm.sourceid() << endl;
first=false;
}
sentinel.update(nmm.sourceid(), true);
writeToDisk(nmm.localutcseconds(), nmm.sourceid(), out);
}
}
catch(std::exception& e) {
cout<<"Error in receiving thread: "<<e.what()<<endl;
cout<<client.toStringWithPort()<<" error in receiving thread: "<<e.what()<<endl;
}
cout<<"Thread for "<<client.toStringWithPort()<< " exiting"<<endl;
cout<<client.toStringWithPort()<< " thread exiting"<<endl;
}

void recvListener(Socket&& s, ComboAddress local)
Expand Down
6 changes: 1 addition & 5 deletions zstdwrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void ZStdReader::worker()
input.pos=0;
int ret = read(d_sourcefd, (char*)input.src, inputcapacity);
if(ret <= 0) {
cerr<<"Got EOF on input fd "<<d_sourcefd<<", terminating thread"<<endl;
// cerr<<"Got EOF on input fd "<<d_sourcefd<<", terminating thread"<<endl;
break;
}
input.size = ret; // this is unsigned, so we need 'ret' to see the error
Expand Down Expand Up @@ -179,10 +179,6 @@ void ZStdReader::worker()

ZStdReader::~ZStdReader()
{
cerr<<"ZStdReader destructor called"<<endl;
int rc = close(d_readpipe);
cerr<<"Close rc = "<<rc<<endl;
cerr<<"Waiting on join"<<endl;
d_thread.join();
cerr<<"Done waiting on join"<<endl;
}

0 comments on commit 6139ac9

Please sign in to comment.