@@ -41,7 +41,9 @@ void AnchorClient::handle_query_result(class MMDSTableRequest *m)
::decode(ino, p);
::decode(trace, p);
- assert(pending_lookup.count(ino));
+ if (!pending_lookup.count(ino))
+ return;
+
list<_pending_lookup> ls;
ls.swap(pending_lookup[ino]);
pending_lookup.erase(ino);
@@ -80,9 +82,12 @@ void AnchorClient::lookup(inodeno_t ino, vector<Anchor>& trace, Context *onfinis
void AnchorClient::_lookup(inodeno_t ino)
{
+ int ts = mds->mdsmap->get_tableserver(); // fetch the tableserver id once; used for both the state check and the send below
+ if (mds->mdsmap->get_state(ts) < MDSMap::STATE_REJOIN) // tableserver state is below STATE_REJOIN: do not send a query now
+ return; // nothing is sent or logged here; NOTE(review): presumably the query is re-sent later via resend_queries() -- confirm
MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_QUERY, 0, 0);
::encode(ino, req->bl);
- mds->send_message_mds(req, mds->mdsmap->get_tableserver());
+ mds->send_message_mds(req, ts); // send the TABLESERVER_OP_QUERY carrying the encoded ino to the tableserver
}
@@ -1050,7 +1050,7 @@ void MDS::handle_mds_map(MMDSMap *m)
for (set<int>::iterator p = failed.begin(); p != failed.end(); ++p)
if (oldfailed.count(*p) == 0) {
messenger->mark_down(oldmap->get_inst(*p).addr);
- mdcache->handle_mds_failure(*p);
+ handle_mds_failure(*p);
}
// or down then up?
@@ -1061,7 +1061,7 @@ void MDS::handle_mds_map(MMDSMap *m)
if (oldmap->have_inst(*p) &&
oldmap->get_inst(*p) != mdsmap->get_inst(*p)) {
messenger->mark_down(oldmap->get_inst(*p).addr);
- mdcache->handle_mds_failure(*p);
+ handle_mds_failure(*p);
}
}
if (is_clientreplay() || is_active() || is_stopping()) {
@@ -1548,6 +1548,16 @@ void MDS::handle_mds_recovery(int who)
waiting_for_active_peer.erase(who);
}
+void MDS::handle_mds_failure(int who) // forwards the failure of MDS `who` to mdcache, anchorclient and snapclient
+{
+ dout(5) << "handle_mds_failure mds." << who << dendl;
+ // the cache is notified first, then the two clients
+ mdcache->handle_mds_failure(who);
+ // anchorclient and snapclient each receive the same failed-MDS id
+ anchorclient->handle_mds_failure(who);
+ snapclient->handle_mds_failure(who);
+}
+
void MDS::stopping_start()
{
dout(2) << "stopping_start" << dendl;
@@ -378,13 +378,15 @@ class MDS : public Dispatcher {
void rejoin_joint_start();
void rejoin_done();
void recovery_done();
- void handle_mds_recovery(int who);
void clientreplay_start();
void clientreplay_done();
void active_start();
void stopping_start();
void stopping_done();
+ void handle_mds_recovery(int who);
+ void handle_mds_failure(int who);
+
void suicide();
void respawn();
@@ -65,18 +65,15 @@ void MDSTableClient::handle_request(class MMDSTableRequest *m)
}
}
else if (pending_commit.count(tid)) {
- dout(10) << "stray agree on " << reqid
- << " tid " << tid
- << ", already committing, resending COMMIT"
- << dendl;
- MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_COMMIT, 0, tid);
- mds->send_message_mds(req, mds->mdsmap->get_tableserver());
+ dout(10) << "stray agree on " << reqid << " tid " << tid
+ << ", already committing, will resend COMMIT" << dendl;
+ assert(!server_ready);
+ // will re-send commit when receiving the server ready message
}
else {
- dout(10) << "stray agree on " << reqid
- << " tid " << tid
- << ", sending ROLLBACK"
- << dendl;
+ dout(10) << "stray agree on " << reqid << " tid " << tid
+ << ", sending ROLLBACK" << dendl;
+ assert(!server_ready);
MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_ROLLBACK, 0, tid);
mds->send_message_mds(req, mds->mdsmap->get_tableserver());
}
@@ -102,6 +99,9 @@ void MDSTableClient::handle_request(class MMDSTableRequest *m)
break;
case TABLESERVER_OP_SERVER_READY:
+ assert(!server_ready);
+ server_ready = true;
+
if (last_reqid == ~0ULL)
last_reqid = reqid;
@@ -144,26 +144,18 @@ void MDSTableClient::_prepare(bufferlist& mutation, version_t *ptid, bufferlist
uint64_t reqid = ++last_reqid;
dout(10) << "_prepare " << reqid << dendl;
- // send message
- MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_PREPARE, reqid);
- req->bl = mutation;
-
pending_prepare[reqid].mutation = mutation;
pending_prepare[reqid].ptid = ptid;
pending_prepare[reqid].pbl = pbl;
pending_prepare[reqid].onfinish = onfinish;
- send_to_tableserver(req);
-}
-
-void MDSTableClient::send_to_tableserver(MMDSTableRequest *req)
-{
- int ts = mds->mdsmap->get_tableserver();
- if (mds->mdsmap->get_state(ts) >= MDSMap::STATE_CLIENTREPLAY)
- mds->send_message_mds(req, ts);
- else {
- dout(10) << " deferring request to not-yet-active tableserver mds." << ts << dendl;
- }
+ if (server_ready) {
+ // send message
+ MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_PREPARE, reqid);
+ req->bl = mutation;
+ mds->send_message_mds(req, mds->mdsmap->get_tableserver());
+ } else
+ dout(10) << "tableserver is not ready yet, deferring request" << dendl;
}
void MDSTableClient::commit(version_t tid, LogSegment *ls)
@@ -176,9 +168,12 @@ void MDSTableClient::commit(version_t tid, LogSegment *ls)
assert(g_conf->mds_kill_mdstable_at != 4);
- // send message
- MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_COMMIT, 0, tid);
- send_to_tableserver(req);
+ if (server_ready) {
+ // send message
+ MMDSTableRequest *req = new MMDSTableRequest(table, TABLESERVER_OP_COMMIT, 0, tid);
+ mds->send_message_mds(req, mds->mdsmap->get_tableserver());
+ } else
+ dout(10) << "tableserver is not ready yet, deferring request" << dendl;
}
@@ -228,3 +223,12 @@ void MDSTableClient::resend_prepares()
mds->send_message_mds(req, mds->mdsmap->get_tableserver());
}
}
+
+void MDSTableClient::handle_mds_failure(int who) // clears server_ready when the failed MDS `who` is the current tableserver
+{
+ if (who != mds->mdsmap->get_tableserver())
+ return; // the failed MDS is not the tableserver: leave server_ready untouched
+ // from here on `who` is the tableserver
+ dout(7) << "tableserver mds." << who << " fails" << dendl;
+ server_ready = false; // while false, _prepare() and commit() log a deferral instead of sending; set back to true on TABLESERVER_OP_SERVER_READY
+}
@@ -30,6 +30,8 @@ protected:
uint64_t last_reqid;
+ bool server_ready;
+
// prepares
struct _pending_prepare {
Context *onfinish;
@@ -63,7 +65,8 @@ protected:
void _logged_ack(version_t tid);
public:
- MDSTableClient(MDS *m, int tab) : mds(m), table(tab), last_reqid(~0ULL) {}
+ MDSTableClient(MDS *m, int tab) :
+ mds(m), table(tab), last_reqid(~0ULL), server_ready(false) {}
virtual ~MDSTableClient() {}
void handle_request(MMDSTableRequest *m);
@@ -85,7 +88,7 @@ public:
ack_waiters[tid].push_back(c);
}
- void send_to_tableserver(MMDSTableRequest *req);
+ void handle_mds_failure(int mds);
// child must implement
virtual void resend_queries() = 0;