Please note: One day after squid 5.4 was released, this extensive patch followed, which I'm publishing separately. It seems to be important - perhaps we should consider taking this one into account.
Building and testing went fine - so far, no problems or crashes have occurred. Running normally.
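For reference, a minimal sketch of how the patch application can be reproduced by hand against an unpacked squid 5.4 tree, mirroring the lfs/squid sequence (the tarball name and the path to the patches directory are placeholders and need to be adjusted to the local checkout):

    # unpack squid 5.4, then apply the existing gcc11 fix and the new patch in order
    tar xaf squid-5.4.tar.xz && cd squid-5.4
    patch -Np1 < /path/to/src/patches/squid/01_squid-gcc11.patch
    patch -Np1 < /path/to/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch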
Signed-off-by: Matthias Fischer <matthias.fischer@ipfire.org>
---
 lfs/squid                                     |    1 +
 ...estinationsEnd_exception_opening_967.patch | 3825 +++++++++++++++++
 2 files changed, 3826 insertions(+)
 create mode 100644 src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch
diff --git a/lfs/squid b/lfs/squid
index 034f4a3e9..6762a0218 100644
--- a/lfs/squid
+++ b/lfs/squid
@@ -78,6 +78,7 @@ $(TARGET) : $(patsubst %,$(DIR_DL)/%,$(objects))
 	@rm -rf $(DIR_APP) && cd $(DIR_SRC) && tar xaf $(DIR_DL)/$(DL_FILE)
 	cd $(DIR_APP) && patch -Np1 < $(DIR_SRC)/src/patches/squid/01_squid-gcc11.patch
+	cd $(DIR_APP) && patch -Np1 < $(DIR_SRC)/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch
cd $(DIR_APP) && autoreconf -vfi cd $(DIR_APP)/libltdl && autoreconf -vfi diff --git a/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch b/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch new file mode 100644 index 000000000..9f592c7e8 --- /dev/null +++ b/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch @@ -0,0 +1,3825 @@ +commit 1332f8d606485b5a2f57277634c2f6f7855bd9a6 (refs/remotes/origin/v5, refs/remotes/github/v5, refs/heads/v5) +Author: Alex Rousskov rousskov@measurement-factory.com +Date: 2022-02-08 03:56:43 -0500 + + Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening (#967) + + * Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening + + The bug was caused by commit 25b0ce4. Other known symptoms are: + + assertion failed: store.cc:1793: "isEmpty()" + assertion failed: FwdState.cc:501: "serverConnection() == conn" + assertion failed: FwdState.cc:1037: "!opening()" + + This change has several overlapping parts. Unfortunately, merging + individual parts is both difficult and likely to cause crashes. + + ## Part 1: Bug 5055. + + FwdState used to check serverConn to decide whether to open a connection + to forward the request. Since commit 25b0ce4, a nil serverConn pointer + no longer implies that a new connection should be opened: FwdState + helper jobs may already be working on preparing an existing open + connection (e.g., sending a CONNECT request or negotiating encryption). + + Bad serverConn checks in both FwdState::noteDestination() and + FwdState::noteDestinationsEnd() methods led to extra connectStart() + calls creating two conflicting concurrent helper jobs. + + To fix this, we replaced direct serverConn inspection with a + usingDestination() call which also checks whether we are waiting for a + helper job. Testing that fix exposed another set of bugs: The helper job + pointers or in-job connections left stale/set after forwarding failures. + The changes described below addressed (most of) those problems. + + ## Part 2: Connection establishing helper jobs and their callbacks + + A proper fix for Bug 5055 required answering a difficult question: When + should a dying job call its callbacks? We only found one answer which + required cooperation from the job creator and led to the following + rules: + + * AsyncJob destructors must not call any callbacks. + + * AsyncJob::swanSong() is responsible for async-calling any remaining + (i.e. set, uncalled, and uncancelled) callbacks. + + * AsyncJob::swanSong() is called (only) for started jobs. + + * AsyncJob destructing sequence should validate that no callbacks remain + uncalled for started jobs. + + ... where an AsyncJob x is considered "started" if AsyncJob::Start(x) + has returned without throwing. + + A new JobWait class helps job creators follow these rules while keeping + track on in-progress helper jobs and killing no-longer-needed helpers. + + Also fixed very similar bugs in tunnel.cc code. + + ## Part 3: ConnOpener fixes + + 1. Many ConnOpener users are written to keep a ConnectionPointer to the + destination given to ConnOpener. This means that their connection + magically opens when ConnOpener successfully connects, before + ConnOpener has a chance to notify the user about the changes. Having + multiple concurrent connection owners is always dangerous, and the + user cannot even have a close handler registered for its now-open + connection. 
When something happens to ConnOpener or its answer, the + user job may be in trouble. Now, ConnOpener callers no longer pass + Connection objects they own, cloning them as needed. That adjustment + required adjustment 2: + + 2. Refactored ConnOpener users to stop assuming that the answer contains + a pointer to their connection object. After adjustment 1 above, it + does not. HappyConnOpener relied on that assumption quite a bit so we + had to refactor to use two custom callback methods instead of one + with a complicated if-statement distinguishing prime from spare + attempts. This refactoring is an overall improvement because it + simplifies the code. Other ConnOpener users just needed to remove a + few no longer valid paranoid assertions/Musts. + + 3. ConnOpener users were forced to remember to close params.conn when + processing negative answers. Some, naturally, forgot, triggering + warnings about orphaned connections (e.g., Ident and TcpLogger). + ConnOpener now closes its open connection before sending a negative + answer. + + 4. ConnOpener would trigger orphan connection warnings if the job ended + after opening the connection but without supplying the connection to + the requestor (e.g., because the requestor has gone away). Now, + ConnOpener explicitly closes its open connection if it has not been + sent to the requestor. + + Also fixed Comm::ConnOpener::cleanFd() debugging that was incorrectly + saying that the method closes the temporary descriptor. + + Also fixed ConnOpener callback's syncWithComm(): The stale + CommConnectCbParams override was testing unused (i.e. always negative) + CommConnectCbParams::fd and was trying to cancel the callback that most + (possibly all) recipients rely on: ConnOpener users expect a negative + answer rather than no answer at all. + + Also, after comparing the needs of two old/existing and a temporary + added ("clone everything") Connection cloning method callers, we decided + there is no need to maintain three different methods. All existing + callers should be fine with a single method because none of them suffers + from "extra" copying of members that others need. Right now, the new + cloneProfile() method copies everything except FD and a few + special-purpose members (with documented reasons for not copying). + + Also added Comm::Connection::cloneDestinationDetails() debugging to + simplify tracking dependencies between half-baked Connection objects + carrying destination/flags/other metadata and open Connection objects + created by ConnOpener using that metadata (which are later delivered to + ConnOpener users and, in some cases, replace those half-baked + connections mentioned earlier. Long-term, we need to find a better way + to express these and other Connection states/stages than comments and + debugging messages. + + ## Part 4: Comm::Connection closure callbacks + + We improved many closure callbacks to make sure (to the extent possible) + that Connection and other objects are in sync with Comm. There are lots + of small bugs, inconsistencies, and other problems in Connection closure + handlers. It is not clear whether any of those problems could result in + serious runtime errors or leaks. In theory, the rest of the code could + neutralize their negative side effects. However, even in that case, it + was just a matter of time before the next bug will bite us due to stale + Connection::fd and such. 
These changes themselves carry elevated risk, + but they get us closer to reliable code as far as Connection maintenance + is concerned; otherwise, we will keep chasing their deadly side effects. + + Long-term, all these manual efforts to keep things in sync should become + unnecessary with the introduction of appropriate Connection ownership + APIs that automatically maintain the corresponding environments (TODO). + + ## Part 5: Other notable improvements in the adjusted code + + Improved Security::PeerConnector::serverConn and + Http::Tunneler::connection management, especially when sending negative + answers. When sending a negative answer, we would set answer().conn to + an open connection, async-send that answer, and then hurry to close the + connection using our pointer to the shared Connection object. If + everything went according to the plan, the recipient would get a non-nil + but closed Connection object. Now, a negative answer simply means no + connection at all. Same for a tunneled answer. + + Refactored ICAP connection-establishing code to to delay Connection + ownership until the ICAP connection is fully ready. This change + addresses primary Connection ownership concerns (as they apply to this + ICAP code) except orphaning of the temporary Connection object by helper + job start exceptions (now an explicit XXX). For example, the transaction + no longer shares a Connection object with ConnOpener and + IcapPeerConnector jobs. + + Probably fixed a bug where PeerConnector::negotiate() assumed that a + sslFinalized() does not return true after callBack(). It may (e.g., when + CertValidationHelper::Submit() throws). Same for + PeekingPeerConnector::checkForPeekAndSpliceMatched(). + + Fixed FwdState::advanceDestination() bug that did not save + ERR_GATEWAY_FAILURE details and "lost" the address of that failed + destination, making it unavailable to future retries (if any). + + Polished PeerPoolMgr, Ident, and Gopher code to be able to fix similar + job callback and connection management issues. + + Polished AsyncJob::Start() API. Start() used to return a job pointer, + but that was a bad idea: + + * It implies that a failed Start() will return a nil pointer, and that + the caller should check the result. Neither is true. + + * It encourages callers to dereference the returned pointer to further + adjust the job. That technically works (today) but violates the rules + of communicating with an async job. The Start() method is the boundary + after which the job is deemed asynchronous. + + Also removed old "and wait for..." post-Start() comments because the + code itself became clear enough, and those comments were becoming + increasingly stale (because they duplicated the callback names above + them). + + Fix Tunneler and PeerConnector handling of last-resort callback + requirements. Events like handleStopRequest() and callException() stop + the job but should not be reported as a BUG (e.g., it would be up to the + callException() to decide how to report the caught exception). There + might (or there will) be other, similar cases where the job is stopped + prematurely for some non-BUG reason beyond swanSong() knowledge. The + existence of non-bug cases does not mean there could be no bugs worth + reporting here, but until they can be identified more reliably than all + these benign/irrelevant cases, reporting no BUGs is a (much) lesser + evil. + + TODO: Revise AsyncJob::doneAll(). Many of its overrides are written to + check for both positive (i.e. 
mission accomplished) and negative (i.e. + mission cancelled or cannot be accomplished) conditions, but the latter + is usually unnecessary, especially after we added handleStopRequest() + API to properly support external job cancellation events. Many doneAll() + overrides can probably be greatly simplified. + + ---- + + Cherry picked SQUID-568-premature-serverconn-use-v5 commit 22b5f78. + + * fixup: Cherry-picked SQUID-568-premature-serverconn-use PR-time fixes + + In git log order: + * e64a6c1: Undone an out-of-scope change and added a missing 'auto' + * aeaf83d: Fixed an 'unused parameter' error + * f49d009: fixup: No explicit destructors with implicit copying methods + * c30c37f: Removed an unnecessary explicit copy constructor + * 012f5ec: Excluded Connection::rfc931 from cloning + * 366c78a: ICAP: do not set connect_timeout on the established conn... + + This branch is now in sync with SQUID-568-premature-serverconn-use (S) + commit e64a6c1 (except for official changes merged from master/v6 into S + closer to the end of PR 877 work (i.e. S' merge commit 0a7432a). + + * Fix FATAL ServiceRep::putConnection exception: theBusyConns > 0 + + FATAL: check failed: theBusyConns > 0 + exception location: ServiceRep.cc(163) putConnection + + Since master/v6 commit 2b6b1bc, a timeout on a ready-to-shovel + Squid-service ICAP connection was decrementing theBusyConns level one + extra time because Adaptation::Icap::Xaction::noteCommTimedout() started + calling both noteConnectionFailed() and closeConnection(). Depending on + the actual theBusyConns level, the extra decrement could result in FATAL + errors later, when putConnection() was called (for a different ICAP + transaction) with zero theBusyConns in an exception-unprotected context. + + Throughout these changes, Xaction still counts the above timeouts as a + service failure. That is done by calling ServiceRep::noteFailure() from + Xaction::callException(), including in timeout cases described above. + + ---- + + Cherry-picked master/v6 commit a8ac892. + +diff --git a/src/BodyPipe.cc b/src/BodyPipe.cc +index 9abe7d48b..37a3b32cd 100644 +--- a/src/BodyPipe.cc ++++ b/src/BodyPipe.cc +@@ -335,6 +335,7 @@ BodyPipe::startAutoConsumptionIfNeeded() + return; + + theConsumer = new BodySink(this); ++ AsyncJob::Start(theConsumer); + debugs(91,7, HERE << "starting auto consumption" << status()); + scheduleBodyDataNotification(); + } +diff --git a/src/CommCalls.cc b/src/CommCalls.cc +index 4cd503dcb..cca23f6e3 100644 +--- a/src/CommCalls.cc ++++ b/src/CommCalls.cc +@@ -53,6 +53,9 @@ CommAcceptCbParams::CommAcceptCbParams(void *aData): + { + } + ++// XXX: Add CommAcceptCbParams::syncWithComm(). Adjust syncWithComm() API if all ++// implementations always return true. 
++ + void + CommAcceptCbParams::print(std::ostream &os) const + { +@@ -72,13 +75,24 @@ CommConnectCbParams::CommConnectCbParams(void *aData): + bool + CommConnectCbParams::syncWithComm() + { +- // drop the call if the call was scheduled before comm_close but +- // is being fired after comm_close +- if (fd >= 0 && fd_table[fd].closing()) { +- debugs(5, 3, HERE << "dropping late connect call: FD " << fd); +- return false; ++ assert(conn); ++ ++ // change parameters if this is a successful callback that was scheduled ++ // after its Comm-registered connection started to close ++ ++ if (flag != Comm::OK) { ++ assert(!conn->isOpen()); ++ return true; // not a successful callback; cannot go out of sync + } +- return true; // now we are in sync and can handle the call ++ ++ assert(conn->isOpen()); ++ if (!fd_table[conn->fd].closing()) ++ return true; // no closing in progress; in sync (for now) ++ ++ debugs(5, 3, "converting to Comm::ERR_CLOSING: " << conn); ++ conn->noteClosure(); ++ flag = Comm::ERR_CLOSING; ++ return true; // now the callback is in sync with Comm again + } + + /* CommIoCbParams */ +diff --git a/src/Downloader.cc b/src/Downloader.cc +index 298c3a836..012387fb8 100644 +--- a/src/Downloader.cc ++++ b/src/Downloader.cc +@@ -81,6 +81,10 @@ void + Downloader::swanSong() + { + debugs(33, 6, this); ++ ++ if (callback_) // job-ending emergencies like handleStopRequest() or callException() ++ callBack(Http::scInternalServerError); ++ + if (context_) { + context_->finished(); + context_ = nullptr; +@@ -251,6 +255,7 @@ Downloader::downloadFinished() + void + Downloader::callBack(Http::StatusCode const statusCode) + { ++ assert(callback_); + CbDialer *dialer = dynamic_cast<CbDialer*>(callback_->getDialer()); + Must(dialer); + dialer->status = statusCode; +diff --git a/src/FwdState.cc b/src/FwdState.cc +index b721017c3..b69e60c7c 100644 +--- a/src/FwdState.cc ++++ b/src/FwdState.cc +@@ -207,26 +207,22 @@ FwdState::stopAndDestroy(const char *reason) + { + debugs(17, 3, "for " << reason); + +- if (opening()) +- cancelOpening(reason); ++ cancelStep(reason); + + PeerSelectionInitiator::subscribed = false; // may already be false + self = nullptr; // we hope refcounting destroys us soon; may already be nil + /* do not place any code here as this object may be gone by now */ + } + +-/// Notify connOpener that we no longer need connections. We do not have to do +-/// this -- connOpener would eventually notice on its own, but notifying reduces +-/// waste and speeds up spare connection opening for other transactions (that +-/// could otherwise wait for this transaction to use its spare allowance). ++/// Notify a pending subtask, if any, that we no longer need its help. We do not ++/// have to do this -- the subtask job will eventually end -- but ending it ++/// earlier reduces waste and may reduce DoS attack surface. 
+ void +-FwdState::cancelOpening(const char *reason) ++FwdState::cancelStep(const char *reason) + { +- assert(calls.connector); +- calls.connector->cancel(reason); +- calls.connector = nullptr; +- notifyConnOpener(); +- connOpener.clear(); ++ transportWait.cancel(reason); ++ encryptionWait.cancel(reason); ++ peerWait.cancel(reason); + } + + #if STRICT_ORIGINAL_DST +@@ -348,8 +344,7 @@ FwdState::~FwdState() + + entry = NULL; + +- if (opening()) +- cancelOpening("~FwdState"); ++ cancelStep("~FwdState"); + + if (Comm::IsConnOpen(serverConn)) + closeServerConnection("~FwdState"); +@@ -501,8 +496,17 @@ FwdState::fail(ErrorState * errorState) + if (!errorState->request) + errorState->request = request; + +- if (err->type != ERR_ZERO_SIZE_OBJECT) +- return; ++ if (err->type == ERR_ZERO_SIZE_OBJECT) ++ reactToZeroSizeObject(); ++ ++ destinationReceipt = nullptr; // may already be nil ++} ++ ++/// ERR_ZERO_SIZE_OBJECT requires special adjustments ++void ++FwdState::reactToZeroSizeObject() ++{ ++ assert(err->type == ERR_ZERO_SIZE_OBJECT); + + if (pconnRace == racePossible) { + debugs(17, 5, HERE << "pconn race happened"); +@@ -566,6 +570,8 @@ FwdState::complete() + + if (Comm::IsConnOpen(serverConn)) + unregister(serverConn); ++ serverConn = nullptr; ++ destinationReceipt = nullptr; + + storedWholeReply_ = nullptr; + entry->reset(); +@@ -584,6 +590,12 @@ FwdState::complete() + } + } + ++bool ++FwdState::usingDestination() const ++{ ++ return encryptionWait || peerWait || Comm::IsConnOpen(serverConn); ++} ++ + void + FwdState::markStoredReplyAsWhole(const char * const whyWeAreSure) + { +@@ -613,19 +625,19 @@ FwdState::noteDestination(Comm::ConnectionPointer path) + + destinations->addPath(path); + +- if (Comm::IsConnOpen(serverConn)) { ++ if (usingDestination()) { + // We are already using a previously opened connection, so we cannot be +- // waiting for connOpener. We still receive destinations for backup. +- Must(!opening()); ++ // waiting for it. We still receive destinations for backup. ++ Must(!transportWait); + return; + } + +- if (opening()) { ++ if (transportWait) { + notifyConnOpener(); + return; // and continue to wait for FwdState::noteConnection() callback + } + +- // This is the first path candidate we have seen. Create connOpener. ++ // This is the first path candidate we have seen. Use it. + useDestinations(); + } + +@@ -653,19 +665,19 @@ FwdState::noteDestinationsEnd(ErrorState *selectionError) + // if all of them fail, forwarding as whole will fail + Must(!selectionError); // finding at least one path means selection succeeded + +- if (Comm::IsConnOpen(serverConn)) { ++ if (usingDestination()) { + // We are already using a previously opened connection, so we cannot be +- // waiting for connOpener. We were receiving destinations for backup. +- Must(!opening()); ++ // waiting for it. We were receiving destinations for backup. 
++ Must(!transportWait); + return; + } + +- Must(opening()); // or we would be stuck with nothing to do or wait for ++ Must(transportWait); // or we would be stuck with nothing to do or wait for + notifyConnOpener(); + // and continue to wait for FwdState::noteConnection() callback + } + +-/// makes sure connOpener knows that destinations have changed ++/// makes sure connection opener knows that the destinations have changed + void + FwdState::notifyConnOpener() + { +@@ -674,7 +686,7 @@ FwdState::notifyConnOpener() + } else { + debugs(17, 7, "notifying about " << *destinations); + destinations->notificationPending = true; +- CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange); ++ CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange); + } + } + +@@ -684,7 +696,7 @@ static void + fwdServerClosedWrapper(const CommCloseCbParams ¶ms) + { + FwdState *fwd = (FwdState *)params.data; +- fwd->serverClosed(params.fd); ++ fwd->serverClosed(); + } + + /**** PRIVATE *****************************************************************/ +@@ -754,13 +766,23 @@ FwdState::checkRetriable() + } + + void +-FwdState::serverClosed(int fd) ++FwdState::serverClosed() + { +- // XXX: fd is often -1 here +- debugs(17, 2, "FD " << fd << " " << entry->url() << " after " << +- (fd >= 0 ? fd_table[fd].pconn.uses : -1) << " requests"); +- if (fd >= 0 && serverConnection()->fd == fd) +- fwdPconnPool->noteUses(fd_table[fd].pconn.uses); ++ // XXX: This method logic attempts to tolerate Connection::close() called ++ // for serverConn earlier, by one of our dispatch()ed jobs. If that happens, ++ // serverConn will already be closed here or, worse, it will already be open ++ // for the next forwarding attempt. The current code prevents us getting ++ // stuck, but the long term solution is to stop sharing serverConn. ++ debugs(17, 2, serverConn); ++ if (Comm::IsConnOpen(serverConn)) { ++ const auto uses = fd_table[serverConn->fd].pconn.uses; ++ debugs(17, 3, "prior uses: " << uses); ++ fwdPconnPool->noteUses(uses); // XXX: May not have come from fwdPconnPool ++ serverConn->noteClosure(); ++ } ++ serverConn = nullptr; ++ closeHandler = nullptr; ++ destinationReceipt = nullptr; + retryOrBail(); + } + +@@ -802,6 +824,8 @@ FwdState::handleUnregisteredServerEnd() + { + debugs(17, 2, HERE << "self=" << self << " err=" << err << ' ' << entry->url()); + assert(!Comm::IsConnOpen(serverConn)); ++ serverConn = nullptr; ++ destinationReceipt = nullptr; + retryOrBail(); + } + +@@ -819,6 +843,8 @@ FwdState::advanceDestination(const char *stepDescription, const Comm::Connection + } catch (...) { + debugs (17, 2, "exception while trying to " << stepDescription << ": " << CurrentException); + closePendingConnection(conn, "connection preparation exception"); ++ if (!err) ++ fail(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request, al)); + retryOrBail(); + } + } +@@ -830,8 +856,7 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer) + { + assert(!destinationReceipt); + +- calls.connector = nullptr; +- connOpener.clear(); ++ transportWait.finish(); + + Must(n_tries <= answer.n_tries); // n_tries cannot decrease + n_tries = answer.n_tries; +@@ -843,6 +868,8 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer) + Must(!Comm::IsConnOpen(answer.conn)); + answer.error.clear(); // preserve error for errorSendComplete() + } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { ++ // The socket could get closed while our callback was queued. 
Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here. + // We do not know exactly why the connection got closed, so we play it + // safe, allowing retries only for persistent (reused) connections + if (answer.reused) { +@@ -912,23 +939,24 @@ FwdState::establishTunnelThruProxy(const Comm::ConnectionPointer &conn) + if (!conn->getPeer()->options.no_delay) + tunneler->setDelayId(entry->mem_obj->mostBytesAllowed()); + #endif +- AsyncJob::Start(tunneler); +- // and wait for the tunnelEstablishmentDone() call ++ peerWait.start(tunneler, callback); + } + + /// resumes operations after the (possibly failed) HTTP CONNECT exchange + void + FwdState::tunnelEstablishmentDone(Http::TunnelerAnswer &answer) + { ++ peerWait.finish(); ++ + ErrorState *error = nullptr; + if (!answer.positive()) { +- Must(!Comm::IsConnOpen(answer.conn)); ++ Must(!answer.conn); + error = answer.squidError.get(); + Must(error); + answer.squidError.clear(); // preserve error for fail() + } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { +- // The socket could get closed while our callback was queued. +- // We close Connection here to sync Connection::fd. ++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here. + closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone"); + error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al); + } else if (!answer.leftovers.isEmpty()) { +@@ -998,18 +1026,21 @@ FwdState::secureConnectionToPeer(const Comm::ConnectionPointer &conn) + #endif + connector = new Security::BlindPeerConnector(requestPointer, conn, callback, al, sslNegotiationTimeout); + connector->noteFwdPconnUse = true; +- AsyncJob::Start(connector); // will call our callback ++ encryptionWait.start(connector, callback); + } + + /// called when all negotiations with the TLS-speaking peer have been completed + void + FwdState::connectedToPeer(Security::EncryptorAnswer &answer) + { ++ encryptionWait.finish(); ++ + ErrorState *error = nullptr; + if ((error = answer.error.get())) { +- Must(!Comm::IsConnOpen(answer.conn)); ++ assert(!answer.conn); + answer.error.clear(); // preserve error for errorSendComplete() + } else if (answer.tunneled) { ++ assert(!answer.conn); + // TODO: When ConnStateData establishes tunnels, its state changes + // [in ways that may affect logging?]. Consider informing + // ConnStateData about our tunnel or otherwise unifying tunnel +@@ -1019,6 +1050,8 @@ FwdState::connectedToPeer(Security::EncryptorAnswer &answer) + complete(); // destroys us + return; + } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { ++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here. + closePendingConnection(answer.conn, "conn was closed while waiting for connectedToPeer"); + error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al); + } +@@ -1091,22 +1124,20 @@ FwdState::connectStart() + Must(!request->pinnedConnection()); + + assert(!destinations->empty()); +- assert(!opening()); ++ assert(!usingDestination()); + + // Ditch error page if it was created before. 
+ // A new one will be created if there's another problem + delete err; + err = nullptr; + request->clearError(); +- serverConn = nullptr; +- destinationReceipt = nullptr; + + request->hier.startPeerClock(); + +- calls.connector = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this)); ++ AsyncCall::Pointer callback = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this)); + + HttpRequest::Pointer cause = request; +- const auto cs = new HappyConnOpener(destinations, calls.connector, cause, start_t, n_tries, al); ++ const auto cs = new HappyConnOpener(destinations, callback, cause, start_t, n_tries, al); + cs->setHost(request->url.host()); + bool retriable = checkRetriable(); + if (!retriable && Config.accessList.serverPconnForNonretriable) { +@@ -1118,8 +1149,7 @@ FwdState::connectStart() + cs->setRetriable(retriable); + cs->allowPersistent(pconnRace != raceHappened); + destinations->notificationPending = true; // start() is async +- connOpener = cs; +- AsyncJob::Start(cs); ++ transportWait.start(cs, callback); + } + + /// send request on an existing connection dedicated to the requesting client +diff --git a/src/FwdState.h b/src/FwdState.h +index de75f33cc..ebf5f82b1 100644 +--- a/src/FwdState.h ++++ b/src/FwdState.h +@@ -9,13 +9,12 @@ + #ifndef SQUID_FORWARD_H + #define SQUID_FORWARD_H + +-#include "base/CbcPointer.h" + #include "base/forward.h" ++#include "base/JobWait.h" + #include "base/RefCount.h" + #include "clients/forward.h" + #include "comm.h" + #include "comm/Connection.h" +-#include "comm/ConnOpener.h" + #include "error/forward.h" + #include "fde.h" + #include "http/StatusCode.h" +@@ -38,7 +37,6 @@ class ResolvedPeers; + typedef RefCount<ResolvedPeers> ResolvedPeersPointer; + + class HappyConnOpener; +-typedef CbcPointer<HappyConnOpener> HappyConnOpenerPointer; + class HappyConnOpenerAnswer; + + /// Sets initial TOS value and Netfilter for the future outgoing connection. +@@ -87,7 +85,7 @@ public: + void handleUnregisteredServerEnd(); + int reforward(); + bool reforwardableStatus(const Http::StatusCode s) const; +- void serverClosed(int fd); ++ void serverClosed(); + void connectStart(); + void connectDone(const Comm::ConnectionPointer & conn, Comm::Flag status, int xerrno); + bool checkRetry(); +@@ -115,6 +113,9 @@ private: + /* PeerSelectionInitiator API */ + virtual void noteDestination(Comm::ConnectionPointer conn) override; + virtual void noteDestinationsEnd(ErrorState *selectionError) override; ++ /// whether the successfully selected path destination or the established ++ /// server connection is still in use ++ bool usingDestination() const; + + void noteConnection(HappyConnOpenerAnswer &); + +@@ -157,13 +158,10 @@ private: + /// \returns the time left for this connection to become connected or 1 second if it is less than one second left + time_t connectingTimeout(const Comm::ConnectionPointer &conn) const; + +- /// whether we are waiting for HappyConnOpener +- /// same as calls.connector but may differ from connOpener.valid() +- bool opening() const { return connOpener.set(); } +- +- void cancelOpening(const char *reason); ++ void cancelStep(const char *reason); + + void notifyConnOpener(); ++ void reactToZeroSizeObject(); + + void updateAleWithFinalError(); + +@@ -182,11 +180,6 @@ private: + time_t start_t; + int n_tries; ///< the number of forwarding attempts so far + +- // AsyncCalls which we set and may need cancelling. 
+- struct { +- AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn. +- } calls; +- + struct { + bool connected_okay; ///< TCP link ever opened properly. This affects retry of POST,PUT,CONNECT,etc + bool dont_retry; +@@ -194,7 +187,16 @@ private: + bool destinationsFound; ///< at least one candidate path found + } flags; + +- HappyConnOpenerPointer connOpener; ///< current connection opening job ++ /// waits for a transport connection to the peer to be established/opened ++ JobWait<HappyConnOpener> transportWait; ++ ++ /// waits for the established transport connection to be secured/encrypted ++ JobWaitSecurity::PeerConnector encryptionWait; ++ ++ /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated ++ /// over the (encrypted, if needed) transport connection to that cache_peer ++ JobWaitHttp::Tunneler peerWait; ++ + ResolvedPeersPointer destinations; ///< paths for forwarding the request + Comm::ConnectionPointer serverConn; ///< a successfully opened connection to a server. + PeerConnectionPointer destinationReceipt; ///< peer selection result (or nil) +diff --git a/src/HappyConnOpener.cc b/src/HappyConnOpener.cc +index 6e0872506..6d83ff14f 100644 +--- a/src/HappyConnOpener.cc ++++ b/src/HappyConnOpener.cc +@@ -18,6 +18,7 @@ + #include "neighbors.h" + #include "pconn.h" + #include "PeerPoolMgr.h" ++#include "sbuf/Stream.h" + #include "SquidConfig.h" + + CBDATA_CLASS_INIT(HappyConnOpener); +@@ -330,6 +331,8 @@ HappyConnOpener::HappyConnOpener(const ResolvedPeers::Pointer &dests, const Asyn + fwdStart(aFwdStart), + callback_(aCall), + destinations(dests), ++ prime(&HappyConnOpener::notePrimeConnectDone, "HappyConnOpener::notePrimeConnectDone"), ++ spare(&HappyConnOpener::noteSpareConnectDone, "HappyConnOpener::noteSpareConnectDone"), + ale(anAle), + cause(request), + n_tries(tries) +@@ -410,33 +413,43 @@ HappyConnOpener::swanSong() + AsyncJob::swanSong(); + } + ++/// HappyConnOpener::Attempt printer for debugging ++std::ostream & ++operator <<(std::ostream &os, const HappyConnOpener::Attempt &attempt) ++{ ++ if (!attempt.path) ++ os << '-'; ++ else if (attempt.path->isOpen()) ++ os << "FD " << attempt.path->fd; ++ else if (attempt.connWait) ++ os << attempt.connWait; ++ else // destination is known; connection closed (and we are not opening any) ++ os << attempt.path->id; ++ return os; ++} ++ + const char * + HappyConnOpener::status() const + { +- static MemBuf buf; +- buf.reset(); ++ // TODO: In a redesigned status() API, the caller may mimic this approach. 
++ static SBuf buf; ++ buf.clear(); + +- buf.append(" [", 2); ++ SBufStream os(buf); ++ ++ os.write(" [", 2); + if (stopReason) +- buf.appendf("Stopped, reason:%s", stopReason); +- if (prime) { +- if (prime.path && prime.path->isOpen()) +- buf.appendf(" prime FD %d", prime.path->fd); +- else if (prime.connector) +- buf.appendf(" prime call%ud", prime.connector->id.value); +- } +- if (spare) { +- if (spare.path && spare.path->isOpen()) +- buf.appendf(" spare FD %d", spare.path->fd); +- else if (spare.connector) +- buf.appendf(" spare call%ud", spare.connector->id.value); +- } ++ os << "Stopped:" << stopReason; ++ if (prime) ++ os << "prime:" << prime; ++ if (spare) ++ os << "spare:" << spare; + if (n_tries) +- buf.appendf(" tries %d", n_tries); +- buf.appendf(" %s%u]", id.prefix(), id.value); +- buf.terminate(); ++ os << " tries:" << n_tries; ++ os << ' ' << id << ']'; + +- return buf.content(); ++ buf = os.buf(); ++ return buf.c_str(); + } + + /// Create "503 Service Unavailable" or "504 Gateway Timeout" error depending +@@ -516,7 +529,7 @@ void + HappyConnOpener::startConnecting(Attempt &attempt, PeerConnectionPointer &dest) + { + Must(!attempt.path); +- Must(!attempt.connector); ++ Must(!attempt.connWait); + Must(dest); + + const auto bumpThroughPeer = cause->flags.sslBumped && dest->getPeer(); +@@ -552,51 +565,52 @@ HappyConnOpener::openFreshConnection(Attempt &attempt, PeerConnectionPointer &de + entry->mem_obj->checkUrlChecksum(); + #endif + +- GetMarkingsToServer(cause.getRaw(), *dest); ++ const auto conn = dest->cloneProfile(); ++ GetMarkingsToServer(cause.getRaw(), *conn); + +- // ConnOpener modifies its destination argument so we reset the source port +- // in case we are reusing the destination already used by our predecessor. +- dest->local.port(0); + ++n_tries; + + typedef CommCbMemFunT<HappyConnOpener, CommConnectCbParams> Dialer; +- AsyncCall::Pointer callConnect = JobCallback(48, 5, Dialer, this, HappyConnOpener::connectDone); ++ AsyncCall::Pointer callConnect = asyncCall(48, 5, attempt.callbackMethodName, ++ Dialer(this, attempt.callbackMethod)); + const time_t connTimeout = dest->connectTimeout(fwdStart); +- Comm::ConnOpener *cs = new Comm::ConnOpener(dest, callConnect, connTimeout); +- if (!dest->getPeer()) ++ auto cs = new Comm::ConnOpener(conn, callConnect, connTimeout); ++ if (!conn->getPeer()) + cs->setHost(host_); + +- attempt.path = dest; +- attempt.connector = callConnect; +- attempt.opener = cs; ++ attempt.path = dest; // but not the being-opened conn! 
++ attempt.connWait.start(cs, callConnect); ++} + +- AsyncJob::Start(cs); ++/// Comm::ConnOpener callback for the prime connection attempt ++void ++HappyConnOpener::notePrimeConnectDone(const CommConnectCbParams ¶ms) ++{ ++ handleConnOpenerAnswer(prime, params, "new prime connection"); + } + +-/// called by Comm::ConnOpener objects after a prime or spare connection attempt +-/// completes (successfully or not) ++/// Comm::ConnOpener callback for the spare connection attempt + void +-HappyConnOpener::connectDone(const CommConnectCbParams ¶ms) ++HappyConnOpener::noteSpareConnectDone(const CommConnectCbParams ¶ms) + { +- Must(params.conn); +- const bool itWasPrime = (params.conn == prime.path); +- const bool itWasSpare = (params.conn == spare.path); +- Must(itWasPrime != itWasSpare); +- +- PeerConnectionPointer handledPath; +- if (itWasPrime) { +- handledPath = prime.path; +- prime.finish(); +- } else { +- handledPath = spare.path; +- spare.finish(); +- if (gotSpareAllowance) { +- TheSpareAllowanceGiver.jobUsedAllowance(); +- gotSpareAllowance = false; +- } ++ if (gotSpareAllowance) { ++ TheSpareAllowanceGiver.jobUsedAllowance(); ++ gotSpareAllowance = false; + } ++ handleConnOpenerAnswer(spare, params, "new spare connection"); ++} ++ ++/// prime/spare-agnostic processing of a Comm::ConnOpener result ++void ++HappyConnOpener::handleConnOpenerAnswer(Attempt &attempt, const CommConnectCbParams ¶ms, const char *what) ++{ ++ Must(params.conn); ++ ++ // finalize the previously selected path before attempt.finish() forgets it ++ auto handledPath = attempt.path; ++ handledPath.finalize(params.conn); // closed on errors ++ attempt.finish(); + +- const char *what = itWasPrime ? "new prime connection" : "new spare connection"; + if (params.flag == Comm::OK) { + sendSuccess(handledPath, false, what); + return; +@@ -605,7 +619,6 @@ HappyConnOpener::connectDone(const CommConnectCbParams ¶ms) + debugs(17, 8, what << " failed: " << params.conn); + if (const auto peer = params.conn->getPeer()) + peerConnectFailed(peer); +- params.conn->close(); // TODO: Comm::ConnOpener should do this instead. 
+ + // remember the last failure (we forward it if we cannot connect anywhere) + lastFailedConnection = handledPath; +@@ -881,13 +894,23 @@ HappyConnOpener::ranOutOfTimeOrAttempts() const + return false; + } + ++HappyConnOpener::Attempt::Attempt(const CallbackMethod method, const char *methodName): ++ callbackMethod(method), ++ callbackMethodName(methodName) ++{ ++} ++ ++void ++HappyConnOpener::Attempt::finish() ++{ ++ connWait.finish(); ++ path = nullptr; ++} ++ + void + HappyConnOpener::Attempt::cancel(const char *reason) + { +- if (connector) { +- connector->cancel(reason); +- CallJobHere(17, 3, opener, Comm::ConnOpener, noteAbort); +- } +- clear(); ++ connWait.cancel(reason); ++ path = nullptr; + } + +diff --git a/src/HappyConnOpener.h b/src/HappyConnOpener.h +index 4f3135c60..c57c431ad 100644 +--- a/src/HappyConnOpener.h ++++ b/src/HappyConnOpener.h +@@ -156,22 +156,28 @@ private: + /// a connection opening attempt in progress (or falsy) + class Attempt { + public: ++ /// HappyConnOpener method implementing a ConnOpener callback ++ using CallbackMethod = void (HappyConnOpener::*)(const CommConnectCbParams &); ++ ++ Attempt(const CallbackMethod method, const char *methodName); ++ + explicit operator bool() const { return static_cast<bool>(path); } + + /// reacts to a natural attempt completion (successful or otherwise) +- void finish() { clear(); } ++ void finish(); + + /// aborts an in-progress attempt + void cancel(const char *reason); + + PeerConnectionPointer path; ///< the destination we are connecting to +- AsyncCall::Pointer connector; ///< our opener callback +- Comm::ConnOpener::Pointer opener; ///< connects to path and calls us + +- private: +- /// cleans up after the attempt ends (successfully or otherwise) +- void clear() { path = nullptr; connector = nullptr; opener = nullptr; } ++ /// waits for a connection to the peer to be established/opened ++ JobWaitComm::ConnOpener connWait; ++ ++ const CallbackMethod callbackMethod; ///< ConnOpener calls this method ++ const char * const callbackMethodName; ///< for callbackMethod debugging + }; ++ friend std::ostream &operator <<(std::ostream &, const Attempt &); + + /* AsyncJob API */ + virtual void start() override; +@@ -190,7 +196,9 @@ private: + void openFreshConnection(Attempt &, PeerConnectionPointer &); + bool reuseOldConnection(PeerConnectionPointer &); + +- void connectDone(const CommConnectCbParams &); ++ void notePrimeConnectDone(const CommConnectCbParams &); ++ void noteSpareConnectDone(const CommConnectCbParams &); ++ void handleConnOpenerAnswer(Attempt &, const CommConnectCbParams &, const char *connDescription); + + void checkForNewConnection(); + +diff --git a/src/PeerPoolMgr.cc b/src/PeerPoolMgr.cc +index 2caa09f44..7423dd669 100644 +--- a/src/PeerPoolMgr.cc ++++ b/src/PeerPoolMgr.cc +@@ -43,9 +43,8 @@ public: + PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"), + peer(cbdataReference(aPeer)), + request(), +- opener(), +- securer(), +- closer(), ++ transportWait(), ++ encryptionWait(), + addrUsed(0) + { + } +@@ -90,7 +89,7 @@ PeerPoolMgr::doneAll() const + void + PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms) + { +- opener = NULL; ++ transportWait.finish(); + + if (!validPeer()) { + debugs(48, 3, "peer gone"); +@@ -100,9 +99,6 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms) + } + + if (params.flag != Comm::OK) { +- /* it might have been a timeout with a partially open link */ +- if (params.conn != NULL) +- params.conn->close(); + peerConnectFailed(peer); + 
checkpoint("conn opening failure"); // may retry + return; +@@ -112,20 +108,16 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms) + + // Handle TLS peers. + if (peer->secure.encryptTransport) { +- typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer; +- closer = JobCallback(48, 3, CloserDialer, this, +- PeerPoolMgr::handleSecureClosure); +- comm_add_close_handler(params.conn->fd, closer); +- +- securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer", +- MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer)); ++ // XXX: Exceptions orphan params.conn ++ AsyncCall::Pointer callback = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer", ++ MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer)); + + const int peerTimeout = peerConnectTimeout(peer); + const int timeUsed = squid_curtime - params.conn->startTime(); + // Use positive timeout when less than one second is left for conn. + const int timeLeft = positiveTimeout(peerTimeout - timeUsed); +- auto *connector = new Security::BlindPeerConnector(request, params.conn, securer, nullptr, timeLeft); +- AsyncJob::Start(connector); // will call our callback ++ const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft); ++ encryptionWait.start(connector, callback); + return; + } + +@@ -144,16 +136,7 @@ PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn) + void + PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer) + { +- Must(securer != NULL); +- securer = NULL; +- +- if (closer != NULL) { +- if (answer.conn != NULL) +- comm_remove_close_handler(answer.conn->fd, closer); +- else +- closer->cancel("securing completed"); +- closer = NULL; +- } ++ encryptionWait.finish(); + + if (!validPeer()) { + debugs(48, 3, "peer gone"); +@@ -162,35 +145,33 @@ PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer) + return; + } + ++ assert(!answer.tunneled); + if (answer.error.get()) { +- if (answer.conn != NULL) +- answer.conn->close(); ++ assert(!answer.conn); + // PeerConnector calls peerConnectFailed() for us; + checkpoint("conn securing failure"); // may retry + return; + } + +- pushNewConnection(answer.conn); +-} ++ assert(answer.conn); + +-void +-PeerPoolMgr::handleSecureClosure(const CommCloseCbParams ¶ms) +-{ +- Must(closer != NULL); +- Must(securer != NULL); +- securer->cancel("conn closed by a 3rd party"); +- securer = NULL; +- closer = NULL; +- // allow the closing connection to fully close before we check again +- Checkpoint(this, "conn closure while securing"); ++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here. ++ if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) { ++ answer.conn->noteClosure(); ++ checkpoint("external connection closure"); // may retry ++ return; ++ } ++ ++ pushNewConnection(answer.conn); + } + + void + PeerPoolMgr::openNewConnection() + { + // KISS: Do nothing else when we are already doing something. 
+- if (opener != NULL || securer != NULL || shutting_down) { +- debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down); ++ if (transportWait || encryptionWait || shutting_down) { ++ debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down); + return; // there will be another checkpoint when we are done opening/securing + } + +@@ -227,9 +208,9 @@ PeerPoolMgr::openNewConnection() + + const int ctimeout = peerConnectTimeout(peer); + typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer; +- opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection); +- Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout); +- AsyncJob::Start(cs); ++ AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection); ++ const auto cs = new Comm::ConnOpener(conn, callback, ctimeout); ++ transportWait.start(cs, callback); + } + + void +diff --git a/src/PeerPoolMgr.h b/src/PeerPoolMgr.h +index 217994393..6da61b10b 100644 +--- a/src/PeerPoolMgr.h ++++ b/src/PeerPoolMgr.h +@@ -10,6 +10,7 @@ + #define SQUID_PEERPOOLMGR_H + + #include "base/AsyncJob.h" ++#include "base/JobWait.h" + #include "comm/forward.h" + #include "security/forward.h" + +@@ -54,18 +55,19 @@ protected: + /// Security::PeerConnector callback + void handleSecuredPeer(Security::EncryptorAnswer &answer); + +- /// called when the connection we are trying to secure is closed by a 3rd party +- void handleSecureClosure(const CommCloseCbParams ¶ms); +- + /// the final step in connection opening (and, optionally, securing) sequence + void pushNewConnection(const Comm::ConnectionPointer &conn); + + private: + CachePeer *peer; ///< the owner of the pool we manage + RefCount<HttpRequest> request; ///< fake HTTP request for conn opening code +- AsyncCall::Pointer opener; ///< whether we are opening a connection +- AsyncCall::Pointer securer; ///< whether we are securing a connection +- AsyncCall::Pointer closer; ///< monitors conn while we are securing it ++ ++ /// waits for a transport connection to the peer to be established/opened ++ JobWaitComm::ConnOpener transportWait; ++ ++ /// waits for the established transport connection to be secured/encrypted ++ JobWaitSecurity::BlindPeerConnector encryptionWait; ++ + unsigned int addrUsed; ///< counter for cycling through peer addresses + }; + +diff --git a/src/ResolvedPeers.cc b/src/ResolvedPeers.cc +index 5b2c6740d..06ab02d23 100644 +--- a/src/ResolvedPeers.cc ++++ b/src/ResolvedPeers.cc +@@ -151,7 +151,7 @@ ResolvedPeers::extractFound(const char *description, const Paths::iterator &foun + while (++pathsToSkip < paths_.size() && !paths_[pathsToSkip].available) {} + } + +- const auto cleanPath = path.connection->cloneDestinationDetails(); ++ const auto cleanPath = path.connection->cloneProfile(); + return PeerConnectionPointer(cleanPath, found - paths_.begin()); + } + +diff --git a/src/adaptation/icap/ModXact.cc b/src/adaptation/icap/ModXact.cc +index bbeebdbb5..982f38f8d 100644 +--- a/src/adaptation/icap/ModXact.cc ++++ b/src/adaptation/icap/ModXact.cc +@@ -187,8 +187,7 @@ void Adaptation::Icap::ModXact::startWriting() + openConnection(); + } + +-// connection with the ICAP service established +-void Adaptation::Icap::ModXact::handleCommConnected() ++void Adaptation::Icap::ModXact::startShoveling() + { + Must(state.writing == State::writingConnect); + +diff --git a/src/adaptation/icap/ModXact.h b/src/adaptation/icap/ModXact.h +index f8988ac29..2fda1318b 100644 +--- 
a/src/adaptation/icap/ModXact.h ++++ b/src/adaptation/icap/ModXact.h +@@ -155,10 +155,11 @@ public: + virtual void noteBodyProductionEnded(BodyPipe::Pointer); + virtual void noteBodyProducerAborted(BodyPipe::Pointer); + +- // comm handlers +- virtual void handleCommConnected(); ++ /* Xaction API */ ++ virtual void startShoveling(); + virtual void handleCommWrote(size_t size); + virtual void handleCommRead(size_t size); ++ + void handleCommWroteHeaders(); + void handleCommWroteBody(); + +diff --git a/src/adaptation/icap/OptXact.cc b/src/adaptation/icap/OptXact.cc +index 5ed4fbfbb..2464ea533 100644 +--- a/src/adaptation/icap/OptXact.cc ++++ b/src/adaptation/icap/OptXact.cc +@@ -37,7 +37,7 @@ void Adaptation::Icap::OptXact::start() + openConnection(); + } + +-void Adaptation::Icap::OptXact::handleCommConnected() ++void Adaptation::Icap::OptXact::startShoveling() + { + scheduleRead(); + +diff --git a/src/adaptation/icap/OptXact.h b/src/adaptation/icap/OptXact.h +index 3811ab48f..725cd6225 100644 +--- a/src/adaptation/icap/OptXact.h ++++ b/src/adaptation/icap/OptXact.h +@@ -30,8 +30,9 @@ public: + OptXact(ServiceRep::Pointer &aService); + + protected: ++ /* Xaction API */ + virtual void start(); +- virtual void handleCommConnected(); ++ virtual void startShoveling(); + virtual void handleCommWrote(size_t size); + virtual void handleCommRead(size_t size); + +diff --git a/src/adaptation/icap/ServiceRep.cc b/src/adaptation/icap/ServiceRep.cc +index 6df475712..fbd896888 100644 +--- a/src/adaptation/icap/ServiceRep.cc ++++ b/src/adaptation/icap/ServiceRep.cc +@@ -112,9 +112,10 @@ void Adaptation::Icap::ServiceRep::noteFailure() + // should be configurable. + } + +-// returns a persistent or brand new connection; negative int on failures ++// TODO: getIdleConnection() and putConnection()/noteConnectionFailed() manage a ++// "used connection slot" resource. Automate that resource tracking (RAII/etc.). + Comm::ConnectionPointer +-Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused) ++Adaptation::Icap::ServiceRep::getIdleConnection(const bool retriableXact) + { + Comm::ConnectionPointer connection; + +@@ -137,7 +138,6 @@ Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused) + else + theIdleConns->closeN(1); + +- reused = Comm::IsConnOpen(connection); + ++theBusyConns; + debugs(93,3, HERE << "got connection: " << connection); + return connection; +@@ -150,7 +150,6 @@ void Adaptation::Icap::ServiceRep::putConnection(const Comm::ConnectionPointer & + // do not pool an idle connection if we owe connections + if (isReusable && excessConnections() == 0) { + debugs(93, 3, HERE << "pushing pconn" << comment); +- commUnsetConnTimeout(conn); + theIdleConns->push(conn); + } else { + debugs(93, 3, HERE << (sendReset ? 
"RST" : "FIN") << "-closing " << +diff --git a/src/adaptation/icap/ServiceRep.h b/src/adaptation/icap/ServiceRep.h +index f2e893244..cca2df4ab 100644 +--- a/src/adaptation/icap/ServiceRep.h ++++ b/src/adaptation/icap/ServiceRep.h +@@ -85,7 +85,8 @@ public: + bool wantsPreview(const SBuf &urlPath, size_t &wantedSize) const; + bool allows204() const; + bool allows206() const; +- Comm::ConnectionPointer getConnection(bool isRetriable, bool &isReused); ++ /// \returns an idle persistent ICAP connection or nil ++ Comm::ConnectionPointer getIdleConnection(bool isRetriable); + void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, bool sendReset, const char *comment); + void noteConnectionUse(const Comm::ConnectionPointer &conn); + void noteConnectionFailed(const char *comment); +diff --git a/src/adaptation/icap/Xaction.cc b/src/adaptation/icap/Xaction.cc +index 9aacf2e1b..962ed1703 100644 +--- a/src/adaptation/icap/Xaction.cc ++++ b/src/adaptation/icap/Xaction.cc +@@ -13,6 +13,7 @@ + #include "adaptation/icap/Config.h" + #include "adaptation/icap/Launcher.h" + #include "adaptation/icap/Xaction.h" ++#include "base/JobWait.h" + #include "base/TextException.h" + #include "comm.h" + #include "comm/Connection.h" +@@ -79,7 +80,6 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv + icapRequest(NULL), + icapReply(NULL), + attempts(0), +- connection(NULL), + theService(aService), + commEof(false), + reuseConnection(true), +@@ -87,14 +87,8 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv + isRepeatable(true), + ignoreLastWrite(false), + waitingForDns(false), +- stopReason(NULL), +- connector(NULL), +- reader(NULL), +- writer(NULL), +- closer(NULL), + alep(new AccessLogEntry), +- al(*alep), +- cs(NULL) ++ al(*alep) + { + debugs(93,3, typeName << " constructed, this=" << this << + " [icapx" << id << ']'); // we should not call virtual status() here +@@ -150,6 +144,8 @@ static void + icapLookupDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &, void *data) + { + Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data); ++ /// TODO: refactor with CallJobHere1, passing either std::optional (after upgrading to C++17) ++ /// or OptionalIp::Address (when it can take non-trivial types) + xa->dnsLookupDone(ia); + } + +@@ -164,21 +160,8 @@ Adaptation::Icap::Xaction::openConnection() + if (!TheConfig.reuse_connections) + disableRetries(); // this will also safely drain pconn pool + +- bool wasReused = false; +- connection = s.getConnection(isRetriable, wasReused); +- +- if (wasReused && Comm::IsConnOpen(connection)) { +- // Set comm Close handler +- // fake the connect callback +- // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead? 
+- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer; +- CbcPointer<Xaction> self(this); +- Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected); +- dialer.params.conn = connection; +- dialer.params.flag = Comm::OK; +- // fake other parameters by copying from the existing connection +- connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer); +- ScheduleCallHere(connector); ++ if (const auto pconn = s.getIdleConnection(isRetriable)) { ++ useTransportConnection(pconn); + return; + } + +@@ -207,30 +190,22 @@ Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia) + #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS + dieOnConnectionFailure(); // throws + #else // take a step back into protected Async call dialing. +- // fake the connect callback +- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer; +- CbcPointer<Xaction> self(this); +- Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected); +- dialer.params.conn = connection; +- dialer.params.flag = Comm::COMM_ERROR; +- // fake other parameters by copying from the existing connection +- connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer); +- ScheduleCallHere(connector); ++ CallJobHere(93, 3, this, Xaction, Xaction::dieOnConnectionFailure); + #endif + return; + } + +- connection = new Comm::Connection; +- connection->remote = ia->current(); +- connection->remote.port(s.cfg().port); +- getOutgoingAddress(NULL, connection); ++ const Comm::ConnectionPointer conn = new Comm::Connection(); ++ conn->remote = ia->current(); ++ conn->remote.port(s.cfg().port); ++ getOutgoingAddress(nullptr, conn); + + // TODO: service bypass status may differ from that of a transaction + typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer; +- connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected); +- cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass)); ++ AsyncCall::Pointer callback = JobCallback(93, 3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected); ++ const auto cs = new Comm::ConnOpener(conn, callback, TheConfig.connect_timeout(service().cfg().bypass)); + cs->setHost(s.cfg().host.termedBuf()); +- AsyncJob::Start(cs.get()); ++ transportWait.start(cs, callback); + } + + /* +@@ -256,6 +231,8 @@ void Adaptation::Icap::Xaction::closeConnection() + closer = NULL; + } + ++ commUnsetConnTimeout(connection); ++ + cancelRead(); // may not work + + if (reuseConnection && !doneWithIo()) { +@@ -275,54 +252,65 @@ void Adaptation::Icap::Xaction::closeConnection() + + writer = NULL; + reader = NULL; +- connector = NULL; + connection = NULL; + } + } + +-// connection with the ICAP service established ++/// called when the connection attempt to an ICAP service completes (successfully or not) + void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io) + { +- cs = NULL; ++ transportWait.finish(); + +- if (io.flag == Comm::TIMEOUT) { +- handleCommTimedout(); ++ if (io.flag != Comm::OK) { ++ dieOnConnectionFailure(); // throws + return; + } + +- Must(connector != NULL); +- connector = NULL; +- +- if (io.flag != Comm::OK) +- dieOnConnectionFailure(); // throws +- +- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer; +- AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout", +- 
TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout)); +- commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall); ++ useTransportConnection(io.conn); ++} + +- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer; +- closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", +- CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); +- comm_add_close_handler(io.conn->fd, closer); ++/// React to the availability of a transport connection to the ICAP service. ++/// The given connection may (or may not) be secured already. ++void ++Adaptation::Icap::Xaction::useTransportConnection(const Comm::ConnectionPointer &conn) ++{ ++ assert(Comm::IsConnOpen(conn)); ++ assert(!connection); + + // If it is a reused connection and the TLS object is built + // we should not negotiate new TLS session +- const auto &ssl = fd_table[io.conn->fd].ssl; ++ const auto &ssl = fd_table[conn->fd].ssl; + if (!ssl && service().cfg().secure.encryptTransport) { ++ // XXX: Exceptions orphan conn. + CbcPointerAdaptation::Icap::Xaction me(this); +- securer = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer", +- MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer)); ++ AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer", ++ MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer)); + +- auto *sslConnector = new Ssl::IcapPeerConnector(theService, io.conn, securer, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass)); +- AsyncJob::Start(sslConnector); // will call our callback ++ const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass)); ++ ++ encryptionWait.start(sslConnector, callback); + return; + } + +-// ?? fd_table[io.conn->fd].noteUse(icapPconnPool); ++ useIcapConnection(conn); ++} ++ ++/// react to the availability of a fully-ready ICAP connection ++void ++Adaptation::Icap::Xaction::useIcapConnection(const Comm::ConnectionPointer &conn) ++{ ++ assert(!connection); ++ assert(conn); ++ assert(Comm::IsConnOpen(conn)); ++ connection = conn; + service().noteConnectionUse(connection); + +- handleCommConnected(); ++ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer; ++ closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", ++ CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); ++ comm_add_close_handler(connection->fd, closer); ++ ++ startShoveling(); + } + + void Adaptation::Icap::Xaction::dieOnConnectionFailure() +@@ -367,40 +355,25 @@ void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io) + + // communication timeout with the ICAP service + void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &) +-{ +- handleCommTimedout(); +-} +- +-void Adaptation::Icap::Xaction::handleCommTimedout() + { + debugs(93, 2, HERE << typeName << " failed: timeout with " << + theService->cfg().methodStr() << " " << + theService->cfg().uri << status()); + reuseConnection = false; +- const bool whileConnecting = connector != NULL; +- if (whileConnecting) { +- assert(!haveConnection()); +- theService->noteConnectionFailed("timedout"); +- } else +- closeConnection(); // so that late Comm callbacks do not disturb bypass +- throw TexcHere(whileConnecting ? 
+- "timed out while connecting to the ICAP service" : +- "timed out while talking to the ICAP service"); ++ assert(haveConnection()); ++ closeConnection(); ++ throw TextException("timed out while talking to the ICAP service", Here()); + } + + // unexpected connection close while talking to the ICAP service + void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &) + { +- if (securer != NULL) { +- securer->cancel("Connection closed before SSL negotiation finished"); +- securer = NULL; ++ if (connection) { ++ connection->noteClosure(); ++ connection = nullptr; + } + closer = NULL; +- handleCommClosed(); +-} + +-void Adaptation::Icap::Xaction::handleCommClosed() +-{ + static const auto d = MakeNamedErrorDetail("ICAP_XACT_CLOSE"); + detailError(d); + mustStop("ICAP service connection externally closed"); +@@ -424,7 +397,8 @@ void Adaptation::Icap::Xaction::callEnd() + + bool Adaptation::Icap::Xaction::doneAll() const + { +- return !waitingForDns && !connector && !securer && !reader && !writer && ++ return !waitingForDns && !transportWait && !encryptionWait && ++ !reader && !writer && + Adaptation::Initiate::doneAll(); + } + +@@ -568,7 +542,7 @@ bool Adaptation::Icap::Xaction::doneWriting() const + bool Adaptation::Icap::Xaction::doneWithIo() const + { + return haveConnection() && +- !connector && !reader && !writer && // fast checks, some redundant ++ !transportWait && !reader && !writer && // fast checks, some redundant + doneReading() && doneWriting(); + } + +@@ -608,10 +582,7 @@ void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome & + void Adaptation::Icap::Xaction::swanSong() + { + // kids should sing first and then call the parent method. +- if (cs.valid()) { +- debugs(93,6, HERE << id << " about to notify ConnOpener!"); +- CallJobHere(93, 3, cs, Comm::ConnOpener, noteAbort); +- cs = NULL; ++ if (transportWait || encryptionWait) { + service().noteConnectionFailed("abort"); + } + +@@ -750,20 +721,12 @@ Ssl::IcapPeerConnector::noteNegotiationDone(ErrorState *error) + void + Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer) + { +- Must(securer != NULL); +- securer = NULL; +- +- if (closer != NULL) { +- if (Comm::IsConnOpen(answer.conn)) +- comm_remove_close_handler(answer.conn->fd, closer); +- else +- closer->cancel("securing completed"); +- closer = NULL; +- } ++ encryptionWait.finish(); + ++ assert(!answer.tunneled); + if (answer.error.get()) { +- if (answer.conn != NULL) +- answer.conn->close(); ++ assert(!answer.conn); ++ // TODO: Refactor dieOnConnectionFailure() to be usable here as well. + debugs(93, 2, typeName << + " TLS negotiation to " << service().cfg().uri << " failed"); + service().noteConnectionFailed("failure"); +@@ -774,8 +737,18 @@ Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer) + + debugs(93, 5, "TLS negotiation to " << service().cfg().uri << " complete"); + +- service().noteConnectionUse(answer.conn); ++ assert(answer.conn); ++ ++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here. 
++ if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) { ++ answer.conn->noteClosure(); ++ service().noteConnectionFailed("external TLS connection closure"); ++ static const auto d = MakeNamedErrorDetail("ICAP_XACT_SSL_CLOSE"); ++ detailError(d); ++ throw TexcHere("external closure of the TLS ICAP service connection"); ++ } + +- handleCommConnected(); ++ useIcapConnection(answer.conn); + } + +diff --git a/src/adaptation/icap/Xaction.h b/src/adaptation/icap/Xaction.h +index b66044594..31a6e22fc 100644 +--- a/src/adaptation/icap/Xaction.h ++++ b/src/adaptation/icap/Xaction.h +@@ -12,6 +12,7 @@ + #include "AccessLogEntry.h" + #include "adaptation/icap/ServiceRep.h" + #include "adaptation/Initiate.h" ++#include "base/JobWait.h" + #include "comm/ConnOpener.h" + #include "error/forward.h" + #include "HttpReply.h" +@@ -20,6 +21,10 @@ + + class MemBuf; + ++namespace Ssl { ++class IcapPeerConnector; ++} ++ + namespace Adaptation + { + namespace Icap +@@ -65,12 +70,12 @@ protected: + virtual void start(); + virtual void noteInitiatorAborted(); // TODO: move to Adaptation::Initiate + ++ /// starts sending/receiving ICAP messages ++ virtual void startShoveling() = 0; ++ + // comm hanndlers; called by comm handler wrappers +- virtual void handleCommConnected() = 0; + virtual void handleCommWrote(size_t sz) = 0; + virtual void handleCommRead(size_t sz) = 0; +- virtual void handleCommTimedout(); +- virtual void handleCommClosed(); + + void handleSecuredPeer(Security::EncryptorAnswer &answer); + /// record error detail if possible +@@ -78,7 +83,6 @@ protected: + + void openConnection(); + void closeConnection(); +- void dieOnConnectionFailure(); + bool haveConnection() const; + + void scheduleRead(); +@@ -124,11 +128,13 @@ public: + ServiceRep &service(); + + private: ++ void useTransportConnection(const Comm::ConnectionPointer &); ++ void useIcapConnection(const Comm::ConnectionPointer &); ++ void dieOnConnectionFailure(); + void tellQueryAborted(); + void maybeLog(); + + protected: +- Comm::ConnectionPointer connection; ///< ICAP server connection + Adaptation::Icap::ServiceRep::Pointer theService; + + SBuf readBuf; +@@ -139,13 +145,8 @@ protected: + bool ignoreLastWrite; + bool waitingForDns; ///< expecting a ipcache_nbgethostbyname() callback + +- const char *stopReason; +- +- // active (pending) comm callbacks for the ICAP server connection +- AsyncCall::Pointer connector; + AsyncCall::Pointer reader; + AsyncCall::Pointer writer; +- AsyncCall::Pointer closer; + + AccessLogEntry::Pointer alep; ///< icap.log entry + AccessLogEntry &al; ///< short for *alep +@@ -155,8 +156,16 @@ protected: + timeval icap_tio_finish; /*time when the last byte of the ICAP responsewas received*/ + + private: +- Comm::ConnOpener::Pointer cs; +- AsyncCall::Pointer securer; ///< whether we are securing a connection ++ /// waits for a transport connection to the ICAP server to be established/opened ++ JobWait<Comm::ConnOpener> transportWait; ++ ++ /// waits for the established transport connection to be secured/encrypted ++ JobWait<Ssl::IcapPeerConnector> encryptionWait; ++ ++ /// open and, if necessary, secured connection to the ICAP server (or nil) ++ Comm::ConnectionPointer connection; ++ ++ AsyncCall::Pointer closer; + }; + + } // namespace Icap +diff --git a/src/base/AsyncJob.cc b/src/base/AsyncJob.cc +index 3b9161e4a..111667d5d 100644 +--- a/src/base/AsyncJob.cc ++++ b/src/base/AsyncJob.cc +@@ -20,11 +20,11 @@ + + InstanceIdDefinitions(AsyncJob, "job"); + +-AsyncJob::Pointer AsyncJob::Start(AsyncJob *j) ++void
++AsyncJob::Start(const Pointer &job) + { +- AsyncJob::Pointer job(j); + CallJobHere(93, 5, job, AsyncJob, start); +- return job; ++ job->started_ = true; // it is the attempt that counts + } + + AsyncJob::AsyncJob(const char *aTypeName) : +@@ -38,6 +38,7 @@ AsyncJob::~AsyncJob() + { + debugs(93,5, "AsyncJob destructed, this=" << this << + " type=" << typeName << " [" << id << ']'); ++ assert(!started_ || swanSang_); + } + + void AsyncJob::start() +@@ -141,9 +142,16 @@ void AsyncJob::callEnd() + AsyncCall::Pointer inCallSaved = inCall; + void *thisSaved = this; + ++ // TODO: Swallow swanSong() exceptions to reduce memory leaks. ++ ++ // Job callback invariant: swanSong() is (only) called for started jobs. ++ // Here to detect violations in kids that forgot to call our swanSong(). ++ assert(started_); ++ ++ swanSang_ = true; // it is the attempt that counts + swanSong(); + +- delete this; // this is the only place where the object is deleted ++ delete this; // this is the only place where a started job is deleted + + // careful: this object does not exist any more + debugs(93, 6, HERE << *inCallSaved << " ended " << thisSaved); +diff --git a/src/base/AsyncJob.h b/src/base/AsyncJob.h +index 4d685dff5..a46e30071 100644 +--- a/src/base/AsyncJob.h ++++ b/src/base/AsyncJob.h +@@ -36,8 +36,13 @@ public: + public: + AsyncJob(const char *aTypeName); + +- /// starts a freshly created job (i.e., makes the job asynchronous) +- static Pointer Start(AsyncJob *job); ++ /// Promises to start the configured job (eventually). The job is deemed to ++ /// be running asynchronously beyond this point, so the caller should only ++ /// access the job object via AsyncCalls rather than directly. ++ /// ++ /// swanSong() is only called for jobs for which this method has returned ++ /// successfully (i.e. without throwing). ++ static void Start(const Pointer &job); + + protected: + // XXX: temporary method to replace "delete this" in jobs-in-transition. +@@ -62,6 +67,11 @@ public: + /// called when the job throws during an async call + virtual void callException(const std::exception &e); + ++ /// process external request to terminate now (i.e. during this async call) ++ void handleStopRequest() { mustStop("externally aborted"); } ++ ++ const InstanceId<AsyncJob> id; ///< job identifier ++ + protected: + // external destruction prohibited to ensure swanSong() is called + virtual ~AsyncJob(); +@@ -69,7 +79,9 @@ protected: + const char *stopReason; ///< reason for forcing done() to be true + const char *typeName; ///< kid (leaf) class name, for debugging + AsyncCall::Pointer inCall; ///< the asynchronous call being handled, if any +- const InstanceId<AsyncJob> id; ///< job identifier ++ ++ bool started_ = false; ///< Start() has finished successfully ++ bool swanSang_ = false; ///< swanSong() was called + }; + + #endif /* SQUID_ASYNC_JOB_H */ +diff --git a/src/base/JobWait.cc b/src/base/JobWait.cc +new file mode 100644 +index 000000000..ba6832415 +--- /dev/null ++++ b/src/base/JobWait.cc +@@ -0,0 +1,81 @@ ++/* ++ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors ++ * ++ * Squid software is distributed under GPLv2+ license and includes ++ * contributions from numerous individuals and organizations. ++ * Please see the COPYING and CONTRIBUTORS files for details. 
++ */ ++ ++#include "squid.h" ++#include "base/AsyncJobCalls.h" ++#include "base/JobWait.h" ++ ++#include <cassert> ++#include <iostream> ++ ++JobWaitBase::JobWaitBase() = default; ++ ++JobWaitBase::~JobWaitBase() ++{ ++ cancel("owner gone"); ++} ++ ++void ++JobWaitBase::start_(const AsyncJob::Pointer aJob, const AsyncCall::Pointer aCall) ++{ ++ // Invariant: The wait will be over. We cannot guarantee that the job will ++ // call the callback, of course, but we can check these prerequisites. ++ assert(aCall); ++ assert(aJob.valid()); ++ ++ // "Double" waiting state leads to conflicting/mismatching callbacks ++ // detailed in finish(). Detect that bug ASAP. ++ assert(!waiting()); ++ ++ assert(!callback_); ++ assert(!job_); ++ callback_ = aCall; ++ job_ = aJob; ++ ++ AsyncJob::Start(job_.get()); ++} ++ ++void ++JobWaitBase::finish() ++{ ++ // Unexpected callbacks might result in disasters like secrets exposure, ++ // data corruption, or expensive message routing mistakes when the callback ++ // info is applied to the wrong message part or acted upon prematurely. ++ assert(waiting()); ++ clear(); ++} ++ ++void ++JobWaitBase::cancel(const char *reason) ++{ ++ if (callback_) { ++ callback_->cancel(reason); ++ ++ // Instead of AsyncJob, the class parameter could be Job. That would ++ // avoid runtime child-to-parent CbcPointer conversion overheads, but ++ // complicate support for Jobs with virtual AsyncJob bases (GCC error: ++ // "pointer to member conversion via virtual base AsyncJob") and also ++ // cache-log "Job::handleStopRequest()" with a non-existent class name. ++ CallJobHere(callback_->debugSection, callback_->debugLevel, job_, AsyncJob, handleStopRequest); ++ ++ clear(); ++ } ++} ++ ++void ++JobWaitBase::print(std::ostream &os) const ++{ ++ // use a backarrow to emphasize that this is a callback: call24<-job6 ++ if (callback_) ++ os << callback_->id << "<-"; ++ if (const auto rawJob = job_.get()) ++ os << rawJob->id; ++ else ++ os << job_; // raw pointer of a gone job may still be useful for triage ++} ++ +diff --git a/src/base/JobWait.h b/src/base/JobWait.h +new file mode 100644 +index 000000000..6b1131331 +--- /dev/null ++++ b/src/base/JobWait.h +@@ -0,0 +1,91 @@ ++/* ++ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors ++ * ++ * Squid software is distributed under GPLv2+ license and includes ++ * contributions from numerous individuals and organizations. ++ * Please see the COPYING and CONTRIBUTORS files for details. ++ */ ++ ++#ifndef SQUID_BASE_JOBWAIT_H ++#define SQUID_BASE_JOBWAIT_H ++ ++#include "base/AsyncJob.h" ++#include "base/CbcPointer.h" ++ ++#include <iosfwd> ++ ++/// Manages waiting for an AsyncJob callback. Use type-safe JobWait instead. ++/// This base class does not contain code specific to the actual Job type. 
++class JobWaitBase ++{ ++public: ++ JobWaitBase(); ++ ~JobWaitBase(); ++ ++ /// no copying of any kind: each waiting context needs a dedicated AsyncCall ++ JobWaitBase(JobWaitBase &&) = delete; ++ ++ explicit operator bool() const { return waiting(); } ++ ++ /// whether we are currently waiting for the job to call us back ++ /// the job itself may be gone even if this returns true ++ bool waiting() const { return bool(callback_); } ++ ++ /// ends wait (after receiving the call back) ++ /// forgets the job which is likely to be gone by now ++ void finish(); ++ ++ /// aborts wait (if any) before receiving the call back ++ /// does nothing if we are not waiting ++ void cancel(const char *reason); ++ ++ /// summarizes what we are waiting for (for debugging) ++ void print(std::ostream &) const; ++ ++protected: ++ /// starts waiting for the given job to call the given callback ++ void start_(AsyncJob::Pointer, AsyncCall::Pointer); ++ ++private: ++ /// the common part of finish() and cancel() ++ void clear() { job_.clear(); callback_ = nullptr; } ++ ++ /// the job that we are waiting to call us back (or nil) ++ AsyncJob::Pointer job_; ++ ++ /// the call we are waiting for the job_ to make (or nil) ++ AsyncCall::Pointer callback_; ++}; ++ ++/// Manages waiting for an AsyncJob callback. ++/// Completes JobWaitBase by providing Job type-specific members. ++template <class Job> ++class JobWait: public JobWaitBase ++{ ++public: ++ typedef CbcPointer<Job> JobPointer; ++ ++ /// starts waiting for the given job to call the given callback ++ void start(const JobPointer &aJob, const AsyncCall::Pointer &aCallback) { ++ start_(aJob, aCallback); ++ typedJob_ = aJob; ++ } ++ ++ /// \returns a cbdata pointer to the job we are waiting for (or nil) ++ /// the returned pointer may be falsy, even if we are still waiting() ++ JobPointer job() const { return waiting() ? typedJob_ : nullptr; } ++ ++private: ++ /// nearly duplicates JobWaitBase::job_ but exposes the actual job type ++ JobPointer typedJob_; ++}; ++ ++inline ++std::ostream &operator <<(std::ostream &os, const JobWaitBase &wait) ++{ ++ wait.print(os); ++ return os; ++} ++ ++#endif /* SQUID_BASE_JOBWAIT_H */ ++ +diff --git a/src/base/Makefile.am b/src/base/Makefile.am +index c9564d755..374ea8798 100644 +--- a/src/base/Makefile.am ++++ b/src/base/Makefile.am +@@ -34,6 +34,8 @@ libbase_la_SOURCES = \ + Here.h \ + InstanceId.cc \ + InstanceId.h \ ++ JobWait.cc \ ++ JobWait.h \ + Lock.h \ + LookupTable.h \ + LruMap.h \ +diff --git a/src/base/forward.h b/src/base/forward.h +index 3803e1ea0..46de97c8d 100644 +--- a/src/base/forward.h ++++ b/src/base/forward.h +@@ -17,6 +17,7 @@ class ScopedId; + + template<class Cbc> class CbcPointer; + template<class RefCountableKid> class RefCount; ++template<class Job> class JobWait; + + typedef CbcPointer<AsyncJob> AsyncJobPointer; + typedef RefCount<CodeContext> CodeContextPointer; +diff --git a/src/client_side.cc b/src/client_side.cc +index b8d786423..4eb697696 100644 +--- a/src/client_side.cc ++++ b/src/client_side.cc +@@ -503,6 +503,10 @@ httpRequestFree(void *data) + /* This is a handler normally called by comm_close() */ + void ConnStateData::connStateClosed(const CommCloseCbParams &) + { ++ if (clientConnection) { ++ clientConnection->noteClosure(); ++ // keep closed clientConnection for logging, clientdb cleanup, etc. 
++ } + deleteThis("ConnStateData::connStateClosed"); + } + +diff --git a/src/clients/FtpClient.cc b/src/clients/FtpClient.cc +index 7874db121..afe7827dc 100644 +--- a/src/clients/FtpClient.cc ++++ b/src/clients/FtpClient.cc +@@ -201,10 +201,6 @@ Ftp::Client::Client(FwdState *fwdState): + + Ftp::Client::~Client() + { +- if (data.opener != NULL) { +- data.opener->cancel("Ftp::Client destructed"); +- data.opener = NULL; +- } + data.close(); + + safe_free(old_request); +@@ -786,10 +782,10 @@ Ftp::Client::connectDataChannel() + debugs(9, 3, "connecting to " << conn->remote); + + typedef CommCbMemFunT<Client, CommConnectCbParams> Dialer; +- data.opener = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected); +- Comm::ConnOpener *cs = new Comm::ConnOpener(conn, data.opener, Config.Timeout.connect); ++ AsyncCall::Pointer callback = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected); ++ const auto cs = new Comm::ConnOpener(conn, callback, Config.Timeout.connect); + cs->setHost(data.host); +- AsyncJob::Start(cs); ++ dataConnWait.start(cs, callback); + } + + bool +@@ -811,10 +807,11 @@ void + Ftp::Client::dataClosed(const CommCloseCbParams &) + { + debugs(9, 4, status()); ++ if (data.conn) ++ data.conn->noteClosure(); + if (data.listenConn != NULL) { + data.listenConn->close(); + data.listenConn = NULL; +- // NP clear() does the: data.fd = -1; + } + data.clear(); + } +@@ -879,6 +876,8 @@ void + Ftp::Client::ctrlClosed(const CommCloseCbParams &) + { + debugs(9, 4, status()); ++ if (ctrl.conn) ++ ctrl.conn->noteClosure(); + ctrl.clear(); + doneWithFwd = "ctrlClosed()"; // assume FwdState is monitoring too + mustStop("Ftp::Client::ctrlClosed"); +diff --git a/src/clients/FtpClient.h b/src/clients/FtpClient.h +index ac8d22e18..4b2dd61d5 100644 +--- a/src/clients/FtpClient.h ++++ b/src/clients/FtpClient.h +@@ -64,7 +64,6 @@ public: + */ + Comm::ConnectionPointer listenConn; + +- AsyncCall::Pointer opener; ///< Comm opener handler callback. + private: + AsyncCall::Pointer closer; ///< Comm close handler callback + }; +@@ -205,6 +204,10 @@ protected: + virtual void sentRequestBody(const CommIoCbParams &io); + virtual void doneSendingRequestBody(); + ++ /// Waits for an FTP data connection to the server to be established/opened. ++ /// This wait only happens in FTP passive mode (via PASV or EPSV). ++ JobWait<Comm::ConnOpener> dataConnWait; ++ + private: + bool parseControlReply(size_t &bytesUsed); + +diff --git a/src/clients/FtpGateway.cc b/src/clients/FtpGateway.cc +index 6a7d139ca..aeeaedb48 100644 +--- a/src/clients/FtpGateway.cc ++++ b/src/clients/FtpGateway.cc +@@ -486,11 +486,9 @@ Ftp::Gateway::timeout(const CommTimeoutCbParams &io) + flags.pasv_supported = false; + debugs(9, DBG_IMPORTANT, "FTP Gateway timeout in SENT_PASV state"); + +- // cancel the data connection setup. +- if (data.opener != NULL) { +- data.opener->cancel("timeout"); +- data.opener = NULL; +- } ++ // cancel the data connection setup, if any ++ dataConnWait.cancel("timeout"); ++ + data.close(); + } + +@@ -1723,7 +1721,7 @@ void + Ftp::Gateway::dataChannelConnected(const CommConnectCbParams &io) + { + debugs(9, 3, HERE); +- data.opener = NULL; ++ dataConnWait.finish(); + + if (io.flag != Comm::OK) { + debugs(9, 2, HERE << "Failed to connect. 
Retrying via another method."); +@@ -2727,9 +2725,9 @@ Ftp::Gateway::mayReadVirginReplyBody() const + return !doneWithServer(); + } + +-AsyncJob::Pointer ++void + Ftp::StartGateway(FwdState *const fwdState) + { +- return AsyncJob::Start(new Ftp::Gateway(fwdState)); ++ AsyncJob::Start(new Ftp::Gateway(fwdState)); + } + +diff --git a/src/clients/FtpRelay.cc b/src/clients/FtpRelay.cc +index 8796a74c8..ce6ba3ba6 100644 +--- a/src/clients/FtpRelay.cc ++++ b/src/clients/FtpRelay.cc +@@ -739,7 +739,7 @@ void + Ftp::Relay::dataChannelConnected(const CommConnectCbParams &io) + { + debugs(9, 3, status()); +- data.opener = NULL; ++ dataConnWait.finish(); + + if (io.flag != Comm::OK) { + debugs(9, 2, "failed to connect FTP server data channel"); +@@ -804,9 +804,9 @@ Ftp::Relay::HandleStoreAbort(Relay *ftpClient) + ftpClient->dataComplete(); + } + +-AsyncJob::Pointer ++void + Ftp::StartRelay(FwdState *const fwdState) + { +- return AsyncJob::Start(new Ftp::Relay(fwdState)); ++ AsyncJob::Start(new Ftp::Relay(fwdState)); + } + +diff --git a/src/clients/HttpTunneler.cc b/src/clients/HttpTunneler.cc +index 023a8586c..67a52e662 100644 +--- a/src/clients/HttpTunneler.cc ++++ b/src/clients/HttpTunneler.cc +@@ -101,6 +101,11 @@ void + Http::Tunneler::handleConnectionClosure(const CommCloseCbParams ¶ms) + { + closer = nullptr; ++ if (connection) { ++ countFailingConnection(); ++ connection->noteClosure(); ++ connection = nullptr; ++ } + bailWith(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw(), al)); + } + +@@ -360,50 +365,64 @@ Http::Tunneler::bailWith(ErrorState *error) + Must(error); + answer().squidError = error; + +- if (const auto p = connection->getPeer()) +- peerConnectFailed(p); ++ if (const auto failingConnection = connection) { ++ // TODO: Reuse to-peer connections after a CONNECT error response. ++ countFailingConnection(); ++ disconnect(); ++ failingConnection->close(); ++ } + + callBack(); +- disconnect(); +- +- if (noteFwdPconnUse) +- fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses); +- // TODO: Reuse to-peer connections after a CONNECT error response. 
+- connection->close(); +- connection = nullptr; + } + + void + Http::Tunneler::sendSuccess() + { + assert(answer().positive()); +- callBack(); ++ assert(Comm::IsConnOpen(connection)); ++ answer().conn = connection; + disconnect(); ++ callBack(); ++} ++ ++void ++Http::Tunneler::countFailingConnection() ++{ ++ assert(connection); ++ if (const auto p = connection->getPeer()) ++ peerConnectFailed(p); ++ if (noteFwdPconnUse && connection->isOpen()) ++ fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses); + } + + void + Http::Tunneler::disconnect() + { ++ const auto stillOpen = Comm::IsConnOpen(connection); ++ + if (closer) { +- comm_remove_close_handler(connection->fd, closer); ++ if (stillOpen) ++ comm_remove_close_handler(connection->fd, closer); + closer = nullptr; + } + + if (reader) { +- Comm::ReadCancel(connection->fd, reader); ++ if (stillOpen) ++ Comm::ReadCancel(connection->fd, reader); + reader = nullptr; + } + +- // remove connection timeout handler +- commUnsetConnTimeout(connection); ++ if (stillOpen) ++ commUnsetConnTimeout(connection); ++ ++ connection = nullptr; // may still be open + } + + void + Http::Tunneler::callBack() + { +- debugs(83, 5, connection << status()); +- if (answer().positive()) +- answer().conn = connection; ++ debugs(83, 5, answer().conn << status()); ++ assert(!connection); // returned inside answer() or gone + auto cb = callback; + callback = nullptr; + ScheduleCallHere(cb); +@@ -415,11 +434,10 @@ Http::Tunneler::swanSong() + AsyncJob::swanSong(); + + if (callback) { +- if (requestWritten && tunnelEstablished) { ++ if (requestWritten && tunnelEstablished && Comm::IsConnOpen(connection)) { + sendSuccess(); + } else { +- // we should have bailed when we discovered the job-killing problem +- debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while establishing a CONNECT tunnel " << connection << status()); ++ // job-ending emergencies like handleStopRequest() or callException() + bailWith(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al)); + } + assert(!callback); +diff --git a/src/clients/HttpTunneler.h b/src/clients/HttpTunneler.h +index c4c096fe7..e88d17610 100644 +--- a/src/clients/HttpTunneler.h ++++ b/src/clients/HttpTunneler.h +@@ -87,6 +87,7 @@ protected: + void handleResponse(const bool eof); + void bailOnResponseError(const char *error, HttpReply *); + ++private: + /// sends the given error to the initiator + void bailWith(ErrorState*); + +@@ -96,12 +97,14 @@ protected: + /// a bailWith(), sendSuccess() helper: sends results to the initiator + void callBack(); + +- /// a bailWith(), sendSuccess() helper: stops monitoring the connection ++ /// stops monitoring the connection + void disconnect(); + ++ /// updates connection usage history before the connection is closed ++ void countFailingConnection(); ++ + TunnelerAnswer &answer(); + +-private: + AsyncCall::Pointer writer; ///< called when the request has been written + AsyncCall::Pointer reader; ///< called when the response should be read + AsyncCall::Pointer closer; ///< called when the connection is being closed +diff --git a/src/clients/forward.h b/src/clients/forward.h +index 0351557da..82e5eb9bd 100644 +--- a/src/clients/forward.h ++++ b/src/clients/forward.h +@@ -28,10 +28,10 @@ namespace Ftp + { + + /// A new FTP Gateway job +-AsyncJobPointer StartGateway(FwdState *const fwdState); ++void StartGateway(FwdState *const fwdState); + + /// A new FTP Relay job +-AsyncJobPointer StartRelay(FwdState *const fwdState); ++void StartRelay(FwdState *const 
fwdState); + + /** Construct an URI with leading / in PATH portion for use by CWD command + * possibly others. FTP encodes absolute paths as beginning with '/' +diff --git a/src/comm.cc b/src/comm.cc +index 54ba16271..a0913f324 100644 +--- a/src/comm.cc ++++ b/src/comm.cc +@@ -743,6 +743,10 @@ commCallCloseHandlers(int fd) + // If call is not canceled schedule it for execution else ignore it + if (!call->canceled()) { + debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call); ++ // XXX: Without the following code, callback fd may be -1. ++ // typedef CommCloseCbParams Params; ++ // auto ¶ms = GetCommParams<Params>(call); ++ // params.fd = fd; + ScheduleCallHere(call); + } + } +@@ -1787,6 +1791,10 @@ DeferredReadManager::CloseHandler(const CommCloseCbParams ¶ms) + CbDataList<DeferredRead> *temp = (CbDataList<DeferredRead> *)params.data; + + temp->element.closer = NULL; ++ if (temp->element.theRead.conn) { ++ temp->element.theRead.conn->noteClosure(); ++ temp->element.theRead.conn = nullptr; ++ } + temp->element.markCancelled(); + } + +@@ -1860,6 +1868,11 @@ DeferredReadManager::kickARead(DeferredRead const &aRead) + if (aRead.cancelled) + return; + ++ // TODO: This check still allows theReader call with a closed theRead.conn. ++ // If a delayRead() caller has a close connection handler, then such a call ++ // would be useless and dangerous. If a delayRead() caller does not have it, ++ // then the caller will get stuck when an external connection closure makes ++ // aRead.cancelled (checked above) true. + if (Comm::IsConnOpen(aRead.theRead.conn) && fd_table[aRead.theRead.conn->fd].closing()) + return; + +diff --git a/src/comm/ConnOpener.cc b/src/comm/ConnOpener.cc +index dc376b778..19c1237ae 100644 +--- a/src/comm/ConnOpener.cc ++++ b/src/comm/ConnOpener.cc +@@ -41,6 +41,14 @@ Comm::ConnOpener::ConnOpener(const Comm::ConnectionPointer &c, const AsyncCall:: + deadline_(squid_curtime + static_cast<time_t>(ctimeout)) + { + debugs(5, 3, "will connect to " << c << " with " << ctimeout << " timeout"); ++ assert(conn_); // we know where to go ++ ++ // Sharing a being-modified Connection object with the caller is dangerous, ++ // but we cannot ban (or even check for) that using existing APIs. We do not ++ // want to clone "just in case" because cloning is a bit expensive, and most ++ // callers already have a non-owned Connection object to give us. Until the ++ // APIs improve, we can only check that the connection is not open. ++ assert(!conn_->isOpen()); + } + + Comm::ConnOpener::~ConnOpener() +@@ -78,6 +86,10 @@ Comm::ConnOpener::swanSong() + if (temporaryFd_ >= 0) + closeFd(); + ++ // did we abort while owning an open connection? ++ if (conn_ && conn_->isOpen()) ++ conn_->close(); ++ + // did we abort while waiting between retries? + if (calls_.sleep_) + cancelSleep(); +@@ -131,9 +143,18 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why) + " [" << callback_->id << ']' ); + // TODO save the pconn to the pconnPool ? 
+ } else { ++ assert(conn_); ++ ++ // free resources earlier and simplify recipients ++ if (errFlag != Comm::OK) ++ conn_->close(); // may not be opened ++ else ++ assert(conn_->isOpen()); ++ + typedef CommConnectCbParams Params; + Params ¶ms = GetCommParams<Params>(callback_); + params.conn = conn_; ++ conn_ = nullptr; // release ownership; prevent closure by us + params.flag = errFlag; + params.xerrno = xerrno; + ScheduleCallHere(callback_); +@@ -152,7 +173,7 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why) + void + Comm::ConnOpener::cleanFd() + { +- debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_); ++ debugs(5, 4, conn_ << "; temp FD " << temporaryFd_); + + Must(temporaryFd_ >= 0); + fde &f = fd_table[temporaryFd_]; +@@ -258,6 +279,7 @@ bool + Comm::ConnOpener::createFd() + { + Must(temporaryFd_ < 0); ++ assert(conn_); + + // our initators signal abort by cancelling their callbacks + if (callback_ == NULL || callback_->canceled()) +diff --git a/src/comm/ConnOpener.h b/src/comm/ConnOpener.h +index 329d8a3c8..e1e60649d 100644 +--- a/src/comm/ConnOpener.h ++++ b/src/comm/ConnOpener.h +@@ -19,16 +19,13 @@ + namespace Comm + { + +-/** +- * Async-opener of a Comm connection. +- */ ++/// Asynchronously opens a TCP connection. Returns CommConnectCbParams: either ++/// Comm::OK with an open connection or another Comm::Flag with a closed one. + class ConnOpener : public AsyncJob + { + CBDATA_CLASS(ConnOpener); + + public: +- void noteAbort() { mustStop("externally aborted"); } +- + typedef CbcPointer<ConnOpener> Pointer; + + virtual bool doneAll() const; +diff --git a/src/comm/Connection.cc b/src/comm/Connection.cc +index 4e9719a01..9bc08da05 100644 +--- a/src/comm/Connection.cc ++++ b/src/comm/Connection.cc +@@ -7,6 +7,7 @@ + */ + + #include "squid.h" ++#include "base/JobWait.h" + #include "CachePeer.h" + #include "cbdata.h" + #include "comm.h" +@@ -60,26 +61,44 @@ Comm::Connection::~Connection() + } + + Comm::ConnectionPointer +-Comm::Connection::cloneDestinationDetails() const ++Comm::Connection::cloneProfile() const + { +- const ConnectionPointer c = new Comm::Connection; +- c->setAddrs(local, remote); +- c->peerType = peerType; +- c->flags = flags; +- c->peer_ = cbdataReference(getPeer()); +- assert(!c->isOpen()); +- return c; +-} ++ const ConnectionPointer clone = new Comm::Connection; ++ auto &c = *clone; // optimization ++ ++ /* ++ * Copy or excuse each data member. Excused members do not belong to a ++ * Connection configuration profile because their values cannot be reused ++ * across (co-existing) Connection objects and/or are tied to their own ++ * object lifetime. ++ */ ++ ++ c.setAddrs(local, remote); ++ c.peerType = peerType; ++ // fd excused ++ c.tos = tos; ++ c.nfmark = nfmark; ++ c.nfConnmark = nfConnmark; ++ // COMM_ORPHANED is not a part of connection opening instructions ++ c.flags = flags & ~COMM_ORPHANED; ++ // rfc931 is excused ++ ++#if USE_SQUID_EUI ++ // These are currently only set when accepting connections and never used ++ // for establishing new ones, so this copying is currently in vain, but, ++ // technically, they can be a part of connection opening instructions. 
++ c.remoteEui48 = remoteEui48; ++ c.remoteEui64 = remoteEui64; ++#endif + +-Comm::ConnectionPointer +-Comm::Connection::cloneIdentDetails() const +-{ +- auto c = cloneDestinationDetails(); +- c->tos = tos; +- c->nfmark = nfmark; +- c->nfConnmark = nfConnmark; +- c->startTime_ = startTime_; +- return c; ++ // id excused ++ c.peer_ = cbdataReference(getPeer()); ++ // startTime_ excused ++ // tlsHistory excused ++ ++ debugs(5, 5, this << " made " << c); ++ assert(!c.isOpen()); ++ return clone; + } + + void +diff --git a/src/comm/Connection.h b/src/comm/Connection.h +index 5b66943ba..40c22491d 100644 +--- a/src/comm/Connection.h ++++ b/src/comm/Connection.h +@@ -77,13 +77,12 @@ public: + /** Clear the connection properties and close any open socket. */ + virtual ~Connection(); + +- /// Create a new (closed) IDENT Connection object based on our from-Squid +- /// connection properties. +- ConnectionPointer cloneIdentDetails() const; ++ /// To prevent accidental copying of Connection objects that we started to ++ /// open or that are open, use cloneProfile() instead. ++ Connection(const Connection &&) = delete; + +- /// Create a new (closed) Connection object pointing to the same destination +- /// as this from-Squid connection. +- ConnectionPointer cloneDestinationDetails() const; ++ /// Create a new closed Connection with the same configuration as this one. ++ ConnectionPointer cloneProfile() const; + + /// close the still-open connection when its last reference is gone + void enterOrphanage() { flags |= COMM_ORPHANED; } +@@ -140,17 +139,6 @@ public: + virtual ScopedId codeContextGist() const override; + virtual std::ostream &detailCodeContext(std::ostream &os) const override; + +-private: +- /** These objects may not be exactly duplicated. Use cloneIdentDetails() or +- * cloneDestinationDetails() instead. +- */ +- Connection(const Connection &c); +- +- /** These objects may not be exactly duplicated. Use cloneIdentDetails() or +- * cloneDestinationDetails() instead. +- */ +- Connection & operator =(const Connection &c); +- + public: + /** Address/Port for the Squid end of a TCP link. 
*/ + Ip::Address local; +diff --git a/src/comm/TcpAcceptor.cc b/src/comm/TcpAcceptor.cc +index 341622e0f..510efa94f 100644 +--- a/src/comm/TcpAcceptor.cc ++++ b/src/comm/TcpAcceptor.cc +@@ -206,7 +206,10 @@ void + Comm::TcpAcceptor::handleClosure(const CommCloseCbParams &) + { + closer_ = NULL; +- conn = NULL; ++ if (conn) { ++ conn->noteClosure(); ++ conn = nullptr; ++ } + Must(done()); + } + +diff --git a/src/dns_internal.cc b/src/dns_internal.cc +index e1f4d3ee7..72078c64e 100644 +--- a/src/dns_internal.cc ++++ b/src/dns_internal.cc +@@ -872,6 +872,10 @@ static void + idnsVCClosed(const CommCloseCbParams ¶ms) + { + nsvc * vc = (nsvc *)params.data; ++ if (vc->conn) { ++ vc->conn->noteClosure(); ++ vc->conn = nullptr; ++ } + delete vc; + } + +diff --git a/src/eui/Eui48.h b/src/eui/Eui48.h +index 11f4e51b1..9aab5bbb7 100644 +--- a/src/eui/Eui48.h ++++ b/src/eui/Eui48.h +@@ -29,10 +29,8 @@ class Eui48 + + public: + Eui48() { clear(); } +- Eui48(const Eui48 &t) { memcpy(this, &t, sizeof(Eui48)); } + bool operator== (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) == 0; } + bool operator< (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) < 0; } +- ~Eui48() {} + + const unsigned char *get(void); + +diff --git a/src/gopher.cc b/src/gopher.cc +index 455220c2e..576a3f7b1 100644 +--- a/src/gopher.cc ++++ b/src/gopher.cc +@@ -98,11 +98,8 @@ public: + entry->lock("gopherState"); + *replybuf = 0; + } +- ~GopherStateData() {if(buf) swanSong();} + +- /* AsyncJob API emulated */ +- void deleteThis(const char *aReason); +- void swanSong(); ++ ~GopherStateData(); + + public: + StoreEntry *entry; +@@ -156,30 +153,18 @@ static void + gopherStateFree(const CommCloseCbParams ¶ms) + { + GopherStateData *gopherState = (GopherStateData *)params.data; +- +- if (gopherState == NULL) +- return; +- +- gopherState->deleteThis("gopherStateFree"); ++ // Assume that FwdState is monitoring and calls noteClosure(). See XXX about ++ // Connection sharing with FwdState in gopherStart(). ++ delete gopherState; + } + +-void +-GopherStateData::deleteThis(const char *) +-{ +- swanSong(); +- delete this; +-} +- +-void +-GopherStateData::swanSong() ++GopherStateData::~GopherStateData() + { + if (entry) + entry->unlock("gopherState"); + +- if (buf) { ++ if (buf) + memFree(buf, MEM_4K_BUF); +- buf = nullptr; +- } + } + + /** +@@ -986,6 +971,7 @@ gopherStart(FwdState * fwd) + return; + } + ++ // XXX: Sharing open Connection with FwdState that has its own handlers/etc. 
+ gopherState->serverConn = fwd->serverConnection(); + gopherSendRequest(fwd->serverConnection()->fd, gopherState); + AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "gopherTimeout", +diff --git a/src/ident/Ident.cc b/src/ident/Ident.cc +index 50dc6baeb..3c3ae5e2f 100644 +--- a/src/ident/Ident.cc ++++ b/src/ident/Ident.cc +@@ -11,6 +11,7 @@ + #include "squid.h" + + #if USE_IDENT ++#include "base/JobWait.h" + #include "comm.h" + #include "comm/Connection.h" + #include "comm/ConnOpener.h" +@@ -53,8 +54,15 @@ public: + + Comm::ConnectionPointer conn; + MemBuf queryMsg; ///< the lookup message sent to IDENT server +- IdentClient *clients; ++ IdentClient *clients = nullptr; + char buf[IDENT_BUFSIZE]; ++ ++ /// waits for a connection to the IDENT server to be established/opened ++ JobWait<Comm::ConnOpener> connWait; ++ ++private: ++ // use deleteThis() to destroy ++ ~IdentStateData(); + }; + + CBDATA_CLASS_INIT(IdentStateData); +@@ -73,8 +81,9 @@ static void ClientAdd(IdentStateData * state, IDCB * callback, void *callback_da + Ident::IdentConfig Ident::TheConfig; + + void +-Ident::IdentStateData::deleteThis(const char *) ++Ident::IdentStateData::deleteThis(const char *reason) + { ++ debugs(30, 3, reason); + swanSong(); + delete this; + } +@@ -84,6 +93,10 @@ Ident::IdentStateData::swanSong() + { + if (clients != NULL) + notify(NULL); ++} ++ ++Ident::IdentStateData::~IdentStateData() { ++ assert(!clients); + + if (Comm::IsConnOpen(conn)) { + comm_remove_close_handler(conn->fd, Ident::Close, this); +@@ -112,6 +125,10 @@ void + Ident::Close(const CommCloseCbParams &params) + { + IdentStateData *state = (IdentStateData *)params.data; ++ if (state->conn) { ++ state->conn->noteClosure(); ++ state->conn = nullptr; ++ } + state->deleteThis("connection closed"); + } + +@@ -127,6 +144,16 @@ void + Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, void *data) + { + IdentStateData *state = (IdentStateData *)data; ++ state->connWait.finish(); ++ ++ // Start owning the supplied connection (so that it is not orphaned if this ++ // function bails early). As a (tiny) optimization or perhaps just diff ++ // minimization, the close handler is added later, when we know we are not ++ // bailing. This delay is safe because comm_remove_close_handler() forgives ++ // missing handlers. ++ assert(conn); // but may be closed ++ assert(!state->conn); ++ state->conn = conn; + + if (status != Comm::OK) { + if (status == Comm::TIMEOUT) +@@ -149,8 +176,8 @@ Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, + return; + } + +- assert(conn != NULL && conn == state->conn); +- comm_add_close_handler(conn->fd, Ident::Close, state); ++ assert(state->conn->isOpen()); ++ comm_add_close_handler(state->conn->fd, Ident::Close, state); + + AsyncCall::Pointer writeCall = commCbCall(5,4, "Ident::WriteFeedback", + CommIoCbPtrFun(Ident::WriteFeedback, state)); +@@ -259,10 +286,10 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data) + state->hash.key = xstrdup(key); + + // copy the conn details. We do not want the original FD to be re-used by IDENT. 
+- state->conn = conn->cloneIdentDetails(); ++ const auto identConn = conn->cloneProfile(); + // NP: use random port for secure outbound to IDENT_PORT +- state->conn->local.port(0); +- state->conn->remote.port(IDENT_PORT); ++ identConn->local.port(0); ++ identConn->remote.port(IDENT_PORT); + + // build our query from the original connection details + state->queryMsg.init(); +@@ -272,7 +299,8 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data) + hash_join(ident_hash, &state->hash); + + AsyncCall::Pointer call = commCbCall(30,3, "Ident::ConnectDone", CommConnectCbPtrFun(Ident::ConnectDone, state)); +- AsyncJob::Start(new Comm::ConnOpener(state->conn, call, Ident::TheConfig.timeout)); ++ const auto connOpener = new Comm::ConnOpener(identConn, call, Ident::TheConfig.timeout); ++ state->connWait.start(connOpener, call); + } + + void +diff --git a/src/log/TcpLogger.cc b/src/log/TcpLogger.cc +index 2be2f6aea..3be1cc5db 100644 +--- a/src/log/TcpLogger.cc ++++ b/src/log/TcpLogger.cc +@@ -256,13 +256,16 @@ Log::TcpLogger::doConnect() + + typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer; + AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone); +- AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2)); ++ const auto cs = new Comm::ConnOpener(futureConn, call, 2); ++ connWait.start(cs, call); + } + + /// Comm::ConnOpener callback + void + Log::TcpLogger::connectDone(const CommConnectCbParams &params) + { ++ connWait.finish(); ++ + if (params.flag != Comm::OK) { + const double delay = 0.5; // seconds + if (connectFailures++ % 100 == 0) { +@@ -367,7 +370,10 @@ Log::TcpLogger::handleClosure(const CommCloseCbParams &) + { + assert(inCall != NULL); + closer = NULL; +- conn = NULL; ++ if (conn) { ++ conn->noteClosure(); ++ conn = nullptr; ++ } + // in all current use cases, we should not try to reconnect + mustStop("Log::TcpLogger::handleClosure"); + } +diff --git a/src/log/TcpLogger.h b/src/log/TcpLogger.h +index 1be2113fe..ec7ca5f7b 100644 +--- a/src/log/TcpLogger.h ++++ b/src/log/TcpLogger.h +@@ -10,6 +10,8 @@ + #define _SQUID_SRC_LOG_TCPLOGGER_H + + #include "base/AsyncJob.h" ++#include "base/JobWait.h" ++#include "comm/forward.h" + #include "ip/Address.h" + + #include <list> +@@ -103,6 +105,9 @@ private: + Ip::Address remote; ///< where the remote logger expects our records + AsyncCall::Pointer closer; ///< handles unexpected/external conn closures + ++ /// waits for a connection to the remote logger to be established/opened ++ JobWait<Comm::ConnOpener> connWait; ++ + uint64_t connectFailures; ///< number of sequential connection failures + uint64_t drops; ///< number of records dropped during the current outage + }; +diff --git a/src/mgr/Forwarder.cc b/src/mgr/Forwarder.cc +index 8edba1d78..e4da4ca08 100644 +--- a/src/mgr/Forwarder.cc ++++ b/src/mgr/Forwarder.cc +@@ -100,7 +100,11 @@ void + Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &) + { + debugs(16, 5, HERE); +- conn = NULL; // needed? 
++ closer = nullptr; ++ if (conn) { ++ conn->noteClosure(); ++ conn = nullptr; ++ } + mustStop("commClosed"); + } + +diff --git a/src/mgr/Inquirer.cc b/src/mgr/Inquirer.cc +index ba41d7aa6..dea0452de 100644 +--- a/src/mgr/Inquirer.cc ++++ b/src/mgr/Inquirer.cc +@@ -107,11 +107,14 @@ Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params) + + /// called when the HTTP client or some external force closed our socket + void +-Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params) ++Mgr::Inquirer::noteCommClosed(const CommCloseCbParams &) + { + debugs(16, 5, HERE); +- Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw()); +- conn = NULL; ++ closer = nullptr; ++ if (conn) { ++ conn->noteClosure(); ++ conn = nullptr; ++ } + mustStop("commClosed"); + } + +diff --git a/src/mgr/StoreToCommWriter.cc b/src/mgr/StoreToCommWriter.cc +index e2cccec2f..d3cf73888 100644 +--- a/src/mgr/StoreToCommWriter.cc ++++ b/src/mgr/StoreToCommWriter.cc +@@ -131,7 +131,11 @@ void + Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams &) + { + debugs(16, 6, HERE); +- Must(!Comm::IsConnOpen(clientConnection)); ++ if (clientConnection) { ++ clientConnection->noteClosure(); ++ clientConnection = nullptr; ++ } ++ closer = nullptr; + mustStop("commClosed"); + } + +diff --git a/src/security/PeerConnector.cc b/src/security/PeerConnector.cc +index 4c1401f6e..494edabb8 100644 +--- a/src/security/PeerConnector.cc ++++ b/src/security/PeerConnector.cc +@@ -89,9 +89,15 @@ Security::PeerConnector::start() + void + Security::PeerConnector::commCloseHandler(const CommCloseCbParams ¶ms) + { ++ debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data); ++ + closeHandler = nullptr; ++ if (serverConn) { ++ countFailingConnection(); ++ serverConn->noteClosure(); ++ serverConn = nullptr; ++ } + +- debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data); + const auto err = new ErrorState(ERR_SECURE_CONNECT_FAIL, Http::scServiceUnavailable, request.getRaw(), al); + static const auto d = MakeNamedErrorDetail("TLS_CONNECT_CLOSE"); + err->detailError(d); +@@ -111,6 +117,8 @@ Security::PeerConnector::commTimeoutHandler(const CommTimeoutCbParams &) + bool + Security::PeerConnector::initialize(Security::SessionPointer &serverSession) + { ++ Must(Comm::IsConnOpen(serverConnection())); ++ + Security::ContextPointer ctx(getTlsContext()); + debugs(83, 5, serverConnection() << ", ctx=" << (void*)ctx.get()); + +@@ -162,6 +170,8 @@ Security::PeerConnector::initialize(Security::SessionPointer &serverSession) + void + Security::PeerConnector::recordNegotiationDetails() + { ++ Must(Comm::IsConnOpen(serverConnection())); ++ + const int fd = serverConnection()->fd; + Security::SessionPointer session(fd_table[fd].ssl); + +@@ -180,6 +190,8 @@ Security::PeerConnector::recordNegotiationDetails() + void + Security::PeerConnector::negotiate() + { ++ Must(Comm::IsConnOpen(serverConnection())); ++ + const int fd = serverConnection()->fd; + if (fd_table[fd].closing()) + return; +@@ -224,7 +236,7 @@ Security::PeerConnector::handleNegotiationResult(const Security::IoResult &resul + switch (result.category) { + case Security::IoResult::ioSuccess: + recordNegotiationDetails(); +- if (sslFinalized()) ++ if (sslFinalized() && callback) + sendSuccess(); + return; // we may be gone by now + +@@ -252,6 +264,7 @@ Security::PeerConnector::sslFinalized() + { + #if USE_OPENSSL + if (Ssl::TheConfig.ssl_crt_validator && useCertValidator_) { ++ Must(Comm::IsConnOpen(serverConnection())); + const int 
fd = serverConnection()->fd; + Security::SessionPointer session(fd_table[fd].ssl); + +@@ -295,6 +308,7 @@ void + Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointer validationResponse) + { + Must(validationResponse != NULL); ++ Must(Comm::IsConnOpen(serverConnection())); + + ErrorDetail::Pointer errDetails; + bool validatorFailed = false; +@@ -317,7 +331,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe + + if (!errDetails && !validatorFailed) { + noteNegotiationDone(NULL); +- sendSuccess(); ++ if (callback) ++ sendSuccess(); + return; + } + +@@ -343,6 +358,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe + Security::CertErrors * + Security::PeerConnector::sslCrtvdCheckForErrors(Ssl::CertValidationResponse const &resp, ErrorDetail::Pointer &errDetails) + { ++ Must(Comm::IsConnOpen(serverConnection())); ++ + ACLFilledChecklist *check = NULL; + Security::SessionPointer session(fd_table[serverConnection()->fd].ssl); + +@@ -418,9 +435,11 @@ Security::PeerConnector::negotiateSsl() + void + Security::PeerConnector::noteWantRead() + { +- const int fd = serverConnection()->fd; + debugs(83, 5, serverConnection()); + ++ Must(Comm::IsConnOpen(serverConnection())); ++ const int fd = serverConnection()->fd; ++ + // read timeout to avoid getting stuck while reading from a silent server + typedef CommCbMemFunT<Security::PeerConnector, CommTimeoutCbParams> TimeoutDialer; + AsyncCall::Pointer timeoutCall = JobCallback(83, 5, +@@ -434,8 +453,10 @@ Security::PeerConnector::noteWantRead() + void + Security::PeerConnector::noteWantWrite() + { +- const int fd = serverConnection()->fd; + debugs(83, 5, serverConnection()); ++ Must(Comm::IsConnOpen(serverConnection())); ++ ++ const int fd = serverConnection()->fd; + Comm::SetSelect(fd, COMM_SELECT_WRITE, &NegotiateSsl, new Pointer(this), 0); + return; + } +@@ -452,57 +473,76 @@ Security::PeerConnector::noteNegotiationError(const Security::ErrorDetailPointer + bail(anErr); + } + ++Security::EncryptorAnswer & ++Security::PeerConnector::answer() ++{ ++ assert(callback); ++ const auto dialer = dynamic_cast<CbDialer*>(callback->getDialer()); ++ assert(dialer); ++ return dialer->answer(); ++} ++ + void + Security::PeerConnector::bail(ErrorState *error) + { + Must(error); // or the recepient will not know there was a problem +- Must(callback != NULL); +- CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer()); +- Must(dialer); +- dialer->answer().error = error; ++ answer().error = error; + +- if (const auto p = serverConnection()->getPeer()) +- peerConnectFailed(p); ++ if (const auto failingConnection = serverConn) { ++ countFailingConnection(); ++ disconnect(); ++ failingConnection->close(); ++ } + + callBack(); +- disconnect(); +- +- if (noteFwdPconnUse) +- fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses); +- serverConn->close(); +- serverConn = nullptr; + } + + void + Security::PeerConnector::sendSuccess() + { +- callBack(); ++ assert(Comm::IsConnOpen(serverConn)); ++ answer().conn = serverConn; + disconnect(); ++ callBack(); ++} ++ ++void ++Security::PeerConnector::countFailingConnection() ++{ ++ assert(serverConn); ++ if (const auto p = serverConn->getPeer()) ++ peerConnectFailed(p); ++ // TODO: Calling PconnPool::noteUses() should not be our responsibility. 
++ if (noteFwdPconnUse && serverConn->isOpen()) ++ fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses); + } + + void + Security::PeerConnector::disconnect() + { ++ const auto stillOpen = Comm::IsConnOpen(serverConn); ++ + if (closeHandler) { +- comm_remove_close_handler(serverConnection()->fd, closeHandler); ++ if (stillOpen) ++ comm_remove_close_handler(serverConn->fd, closeHandler); + closeHandler = nullptr; + } + +- commUnsetConnTimeout(serverConnection()); ++ if (stillOpen) ++ commUnsetConnTimeout(serverConn); ++ ++ serverConn = nullptr; + } + + void + Security::PeerConnector::callBack() + { +- debugs(83, 5, "TLS setup ended for " << serverConnection()); ++ debugs(83, 5, "TLS setup ended for " << answer().conn); + + AsyncCall::Pointer cb = callback; + // Do this now so that if we throw below, swanSong() assert that we _tried_ + // to call back holds. + callback = NULL; // this should make done() true +- CbDialer *dialer = dynamic_cast<CbDialer*>(cb->getDialer()); +- Must(dialer); +- dialer->answer().conn = serverConnection(); + ScheduleCallHere(cb); + } + +@@ -511,8 +551,9 @@ Security::PeerConnector::swanSong() + { + // XXX: unregister fd-closure monitoring and CommSetSelect interest, if any + AsyncJob::swanSong(); +- if (callback != NULL) { // paranoid: we have left the caller waiting +- debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while connecting to a cache_peer or origin server"); ++ ++ if (callback) { ++ // job-ending emergencies like handleStopRequest() or callException() + const auto anErr = new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al); + bail(anErr); + assert(!callback); +@@ -533,7 +574,7 @@ Security::PeerConnector::status() const + buf.append("Stopped, reason:", 16); + buf.appendf("%s",stopReason); + } +- if (serverConn != NULL) ++ if (Comm::IsConnOpen(serverConn)) + buf.appendf(" FD %d", serverConn->fd); + buf.appendf(" %s%u]", id.prefix(), id.value); + buf.terminate(); +@@ -581,15 +622,18 @@ Security::PeerConnector::startCertDownloading(SBuf &url) + PeerConnectorCertDownloaderDialer(&Security::PeerConnector::certDownloadingDone, this)); + + const auto dl = new Downloader(url, certCallback, XactionInitiator::initCertFetcher, certDownloadNestingLevel() + 1); +- AsyncJob::Start(dl); ++ certDownloadWait.start(dl, certCallback); + } + + void + Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus) + { ++ certDownloadWait.finish(); ++ + ++certsDownloads; + debugs(81, 5, "Certificate downloading status: " << downloadStatus << " certificate size: " << obj.length()); + ++ Must(Comm::IsConnOpen(serverConnection())); + const auto &sconn = *fd_table[serverConnection()->fd].ssl; + + // Parse Certificate. Assume that it is in DER format. +@@ -642,6 +686,7 @@ Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus) + void + Security::PeerConnector::handleMissingCertificates(const Security::IoResult &ioResult) + { ++ Must(Comm::IsConnOpen(serverConnection())); + auto &sconn = *fd_table[serverConnection()->fd].ssl; + + // We download the missing certificate(s) once. 
We would prefer to clear +diff --git a/src/security/PeerConnector.h b/src/security/PeerConnector.h +index 942f8627a..10700d796 100644 +--- a/src/security/PeerConnector.h ++++ b/src/security/PeerConnector.h +@@ -12,6 +12,7 @@ + #include "acl/Acl.h" + #include "base/AsyncCbdataCalls.h" + #include "base/AsyncJob.h" ++#include "base/JobWait.h" + #include "CommCalls.h" + #include "http/forward.h" + #include "security/EncryptorAnswer.h" +@@ -24,6 +25,7 @@ + #include <queue> + + class ErrorState; ++class Downloader; + class AccessLogEntry; + typedef RefCount<AccessLogEntry> AccessLogEntryPointer; + +@@ -152,6 +154,9 @@ protected: + /// a bail(), sendSuccess() helper: stops monitoring the connection + void disconnect(); + ++ /// updates connection usage history before the connection is closed ++ void countFailingConnection(); ++ + /// If called the certificates validator will not used + void bypassCertValidator() {useCertValidator_ = false;} + +@@ -159,6 +164,9 @@ protected: + /// logging + void recordNegotiationDetails(); + ++ /// convenience method to get to the answer fields ++ EncryptorAnswer &answer(); ++ + HttpRequestPointer request; ///< peer connection trigger or cause + Comm::ConnectionPointer serverConn; ///< TCP connection to the peer + AccessLogEntryPointer al; ///< info for the future access.log entry +@@ -203,6 +211,8 @@ private: + + /// outcome of the last (failed and) suspended negotiation attempt (or nil) + Security::IoResultPointer suspendedError_; ++ ++ JobWait<Downloader> certDownloadWait; ///< waits for the missing certificate to be downloaded + }; + + } // namespace Security +diff --git a/src/security/forward.h b/src/security/forward.h +index dce66e397..26225aae0 100644 +--- a/src/security/forward.h ++++ b/src/security/forward.h +@@ -172,6 +172,7 @@ class ParsedOptions {}; // we never parse/use TLS options in this case + typedef long ParsedPortFlags; + + class PeerConnector; ++class BlindPeerConnector; + class PeerOptions; + + #if USE_OPENSSL +diff --git a/src/servers/FtpServer.cc b/src/servers/FtpServer.cc +index 53f822d2f..e0a10b6b5 100644 +--- a/src/servers/FtpServer.cc ++++ b/src/servers/FtpServer.cc +@@ -61,7 +61,7 @@ Ftp::Server::Server(const MasterXaction::Pointer &xact): + dataConn(), + uploadAvailSize(0), + listener(), +- connector(), ++ dataConnWait(), + reader(), + waitingForOrigin(false), + originDataDownloadAbortedOnError(false) +@@ -1676,11 +1676,11 @@ Ftp::Server::checkDataConnPre() + + // active transfer: open a data connection from Squid to client + typedef CommCbMemFunT<Server, CommConnectCbParams> Dialer; +- connector = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData); +- Comm::ConnOpener *cs = new Comm::ConnOpener(dataConn, connector, +- Config.Timeout.connect); +- AsyncJob::Start(cs); +- return false; // ConnStateData::processFtpRequest waits handleConnectDone ++ AsyncCall::Pointer callback = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData); ++ const auto cs = new Comm::ConnOpener(dataConn->cloneProfile(), callback, ++ Config.Timeout.connect); ++ dataConnWait.start(cs, callback); ++ return false; + } + + /// Check that client data connection is ready for immediate I/O. 
+@@ -1698,18 +1698,22 @@ + void + Ftp::Server::connectedForData(const CommConnectCbParams &params) + { +- connector = NULL; ++ dataConnWait.finish(); + + if (params.flag != Comm::OK) { +- /* it might have been a timeout with a partially open link */ +- if (params.conn != NULL) +- params.conn->close(); + setReply(425, "Cannot open data connection."); + Http::StreamPointer context = pipeline.front(); + Must(context->http); + Must(context->http->storeEntry() != NULL); ++ // TODO: call closeDataConnection() to reset data conn processing? + } else { +- Must(dataConn == params.conn); ++ // Finalize the details and start owning the supplied connection. ++ assert(params.conn); ++ assert(dataConn); ++ assert(!dataConn->isOpen()); ++ dataConn = params.conn; ++ // XXX: Missing comm_add_close_handler() to track external closures. ++ + Must(Comm::IsConnOpen(params.conn)); + fd_note(params.conn->fd, "active client ftp data"); + } +diff --git a/src/servers/FtpServer.h b/src/servers/FtpServer.h +index fb9ef9081..d67799827 100644 +--- a/src/servers/FtpServer.h ++++ b/src/servers/FtpServer.h +@@ -11,8 +11,10 @@ + #ifndef SQUID_SERVERS_FTP_SERVER_H + #define SQUID_SERVERS_FTP_SERVER_H + ++#include "base/JobWait.h" + #include "base/Lock.h" + #include "client_side.h" ++#include "comm/forward.h" + + namespace Ftp + { +@@ -188,7 +190,11 @@ private: + size_t uploadAvailSize; ///< number of yet unused uploadBuf bytes + + AsyncCall::Pointer listener; ///< set when we are passively listening +- AsyncCall::Pointer connector; ///< set when we are actively connecting ++ ++ /// Waits for an FTP data connection to the client to be established/opened. ++ /// This wait only happens in FTP active mode (via PORT or EPRT). ++ JobWait<Comm::ConnOpener> dataConnWait; ++ + AsyncCall::Pointer reader; ///< set when we are reading FTP data + + /// whether we wait for the origin data transfer to end +diff --git a/src/snmp/Forwarder.cc b/src/snmp/Forwarder.cc +index 836eb5772..259bb0441 100644 +--- a/src/snmp/Forwarder.cc ++++ b/src/snmp/Forwarder.cc +@@ -53,6 +53,7 @@ Snmp::Forwarder::noteCommClosed(const CommCloseCbParams& params) + { + debugs(49, 5, HERE); + Must(fd == params.fd); ++ closer = nullptr; + fd = -1; + mustStop("commClosed"); + } +@@ -68,8 +69,7 @@ void + Snmp::Forwarder::handleException(const std::exception& e) + { + debugs(49, 3, HERE << e.what()); +- if (fd >= 0) +- sendError(SNMP_ERR_GENERR); ++ sendError(SNMP_ERR_GENERR); + Ipc::Forwarder::handleException(e); + } + +@@ -78,6 +78,10 @@ void + Snmp::Forwarder::sendError(int error) + { + debugs(49, 3, HERE); ++ ++ if (fd < 0) ++ return; // client gone ++ + Snmp::Request& req = static_cast<Snmp::Request&>(*request); + req.pdu.command = SNMP_PDU_RESPONSE; + req.pdu.errstat = error; +diff --git a/src/snmp/Inquirer.cc b/src/snmp/Inquirer.cc +index 9b6e34405..82f8c4475 100644 +--- a/src/snmp/Inquirer.cc ++++ b/src/snmp/Inquirer.cc +@@ -88,7 +88,11 @@ Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params) + { + debugs(49, 5, HERE); + Must(!Comm::IsConnOpen(conn) || conn->fd == params.conn->fd); +- conn = NULL; ++ closer = nullptr; ++ if (conn) { ++ conn->noteClosure(); ++ conn = nullptr; ++ } + mustStop("commClosed"); + } + +@@ -102,6 +106,10 @@ void + Snmp::Inquirer::sendResponse() + { + debugs(49, 5, HERE); ++ ++ if (!Comm::IsConnOpen(conn)) ++ return; // client gone ++ + aggrPdu.fixAggregate(); + aggrPdu.command = SNMP_PDU_RESPONSE; + u_char buffer[SNMP_REQUEST_SIZE]; +diff --git a/src/ssl/PeekingPeerConnector.cc 
b/src/ssl/PeekingPeerConnector.cc +index 9a4055355..89e45435b 100644 +--- a/src/ssl/PeekingPeerConnector.cc ++++ b/src/ssl/PeekingPeerConnector.cc +@@ -27,18 +27,18 @@ CBDATA_NAMESPACED_CLASS_INIT(Ssl, PeekingPeerConnector); + void switchToTunnel(HttpRequest *request, const Comm::ConnectionPointer &clientConn, const Comm::ConnectionPointer &srvConn, const SBuf &preReadServerData); + + void +-Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data) ++Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(const Acl::Answer aclAnswer, void *data) + { + Ssl::PeekingPeerConnector *peerConnect = (Ssl::PeekingPeerConnector *) data; + // Use job calls to add done() checks and other job logic/protections. +- CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, answer); ++ CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, aclAnswer); + } + + void +-Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(Acl::Answer answer) ++Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(const Acl::Answer aclAnswer) + { +- const Ssl::BumpMode finalAction = answer.allowed() ? +- static_castSsl::BumpMode(answer.kind): ++ const Ssl::BumpMode finalAction = aclAnswer.allowed() ? ++ static_castSsl::BumpMode(aclAnswer.kind): + checkForPeekAndSpliceGuess(); + checkForPeekAndSpliceMatched(finalAction); + } +@@ -106,10 +106,8 @@ Ssl::PeekingPeerConnector::checkForPeekAndSpliceMatched(const Ssl::BumpMode acti + splice = true; + // Ssl Negotiation stops here. Last SSL checks for valid certificates + // and if done, switch to tunnel mode +- if (sslFinalized()) { +- debugs(83,5, "Abort NegotiateSSL on FD " << serverConn->fd << " and splice the connection"); ++ if (sslFinalized() && callback) + callBack(); +- } + } + } + +@@ -272,8 +270,11 @@ Ssl::PeekingPeerConnector::startTunneling() + auto b = SSL_get_rbio(session.get()); + auto srvBio = static_castSsl::ServerBio*(BIO_get_data(b)); + ++ debugs(83, 5, "will tunnel instead of negotiating TLS"); + switchToTunnel(request.getRaw(), clientConn, serverConn, srvBio->rBufData()); +- tunnelInsteadOfNegotiating(); ++ answer().tunneled = true; ++ disconnect(); ++ callBack(); + } + + void +@@ -397,13 +398,3 @@ Ssl::PeekingPeerConnector::serverCertificateVerified() + } + } + +-void +-Ssl::PeekingPeerConnector::tunnelInsteadOfNegotiating() +-{ +- Must(callback != NULL); +- CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer()); +- Must(dialer); +- dialer->answer().tunneled = true; +- debugs(83, 5, "The SSL negotiation with server aborted"); +-} +- +diff --git a/src/ssl/PeekingPeerConnector.h b/src/ssl/PeekingPeerConnector.h +index 3c86b887d..0145e77f9 100644 +--- a/src/ssl/PeekingPeerConnector.h ++++ b/src/ssl/PeekingPeerConnector.h +@@ -51,7 +51,7 @@ public: + void checkForPeekAndSplice(); + + /// Callback function for ssl_bump acl check in step3 SSL bump step. +- void checkForPeekAndSpliceDone(Acl::Answer answer); ++ void checkForPeekAndSpliceDone(Acl::Answer); + + /// Handles the final bumping decision. 
+ void checkForPeekAndSpliceMatched(const Ssl::BumpMode finalMode); +@@ -67,7 +67,7 @@ public: + void startTunneling(); + + /// A wrapper function for checkForPeekAndSpliceDone for use with acl +- static void cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data); ++ static void cbCheckForPeekAndSpliceDone(Acl::Answer, void *data); + + private: + +diff --git a/src/tests/stub_libcomm.cc b/src/tests/stub_libcomm.cc +index 45aa6deb2..8fdb40bbd 100644 +--- a/src/tests/stub_libcomm.cc ++++ b/src/tests/stub_libcomm.cc +@@ -22,9 +22,9 @@ void Comm::AcceptLimiter::kick() STUB + #include "comm/Connection.h" + Comm::Connection::Connection() STUB + Comm::Connection::~Connection() STUB +-Comm::ConnectionPointer Comm::Connection::cloneIdentDetails() const STUB_RETVAL(nullptr) +-Comm::ConnectionPointer Comm::Connection::cloneDestinationDetails() const STUB_RETVAL(nullptr) ++Comm::ConnectionPointer Comm::Connection::cloneProfile() const STUB_RETVAL(nullptr) + void Comm::Connection::close() STUB ++void Comm::Connection::noteClosure() STUB + CachePeer * Comm::Connection::getPeer() const STUB_RETVAL(NULL) + void Comm::Connection::setPeer(CachePeer * p) STUB + ScopedId Comm::Connection::codeContextGist() const STUB_RETVAL(id.detach()) +diff --git a/src/tests/stub_libsecurity.cc b/src/tests/stub_libsecurity.cc +index aa9cf0b36..176cf337e 100644 +--- a/src/tests/stub_libsecurity.cc ++++ b/src/tests/stub_libsecurity.cc +@@ -9,6 +9,7 @@ + #include "squid.h" + #include "AccessLogEntry.h" + #include "comm/Connection.h" ++#include "Downloader.h" + #include "HttpRequest.h" + + #define STUB_API "security/libsecurity.la" +@@ -88,7 +89,9 @@ void PeerConnector::bail(ErrorState *) STUB + void PeerConnector::sendSuccess() STUB + void PeerConnector::callBack() STUB + void PeerConnector::disconnect() STUB ++void PeerConnector::countFailingConnection() STUB + void PeerConnector::recordNegotiationDetails() STUB ++EncryptorAnswer &PeerConnector::answer() STUB_RETREF(EncryptorAnswer) + } + + #include "security/PeerOptions.h" +diff --git a/src/tunnel.cc b/src/tunnel.cc +index 386a0ecd6..4fc5abdef 100644 +--- a/src/tunnel.cc ++++ b/src/tunnel.cc +@@ -11,6 +11,7 @@ + #include "squid.h" + #include "acl/FilledChecklist.h" + #include "base/CbcPointer.h" ++#include "base/JobWait.h" + #include "CachePeer.h" + #include "cbdata.h" + #include "client_side.h" +@@ -180,9 +181,6 @@ public: + SBuf preReadClientData; + SBuf preReadServerData; + time_t startTime; ///< object creation time, before any peer selection/connection attempts +- /// Whether we are waiting for the CONNECT request/response exchange with the peer. +- bool waitingForConnectExchange; +- HappyConnOpenerPointer connOpener; ///< current connection opening job + ResolvedPeersPointer destinations; ///< paths for forwarding the request + bool destinationsFound; ///< At least one candidate path found + /// whether another destination may be still attempted if the TCP connection +@@ -191,10 +189,15 @@ public: + // TODO: remove after fixing deferred reads in TunnelStateData::copyRead() + CodeContext::Pointer codeContext; ///< our creator context + +- // AsyncCalls which we set and may need cancelling. +- struct { +- AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn. 
+- } calls; ++ /// waits for a transport connection to the peer to be established/opened ++ JobWait<HappyConnOpener> transportWait; ++ ++ /// waits for the established transport connection to be secured/encrypted ++ JobWaitSecurity::PeerConnector encryptionWait; ++ ++ /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated ++ /// over the (encrypted, if needed) transport connection to that cache_peer ++ JobWaitHttp::Tunneler peerWait; + + void copyRead(Connection &from, IOCB *completion); + +@@ -212,12 +215,6 @@ public: + /// when all candidate destinations have been tried and all have failed + void noteConnection(HappyConnOpenerAnswer &); + +- /// whether we are waiting for HappyConnOpener +- /// same as calls.connector but may differ from connOpener.valid() +- bool opening() const { return connOpener.set(); } +- +- void cancelOpening(const char *reason); +- + /// Start using an established connection + void connectDone(const Comm::ConnectionPointer &conn, const char *origin, const bool reused); + +@@ -266,6 +263,9 @@ private: + + /// \returns whether the request should be retried (nil) or the description why it should not + const char *checkRetry(); ++ /// whether the successfully selected path destination or the established ++ /// server connection is still in use ++ bool usingDestination() const; + + /// details of the "last tunneling attempt" failure (if it failed) + ErrorState *savedError = nullptr; +@@ -275,6 +275,8 @@ private: + + void deleteThis(); + ++ void cancelStep(const char *reason); ++ + public: + bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to); + void copy(size_t len, Connection &from, Connection &to, IOCB *); +@@ -357,7 +359,6 @@ TunnelStateData::deleteThis() + + TunnelStateData::TunnelStateData(ClientHttpRequest *clientRequest) : + startTime(squid_curtime), +- waitingForConnectExchange(false), + destinations(new ResolvedPeers()), + destinationsFound(false), + retriable(true), +@@ -390,8 +391,7 @@ TunnelStateData::~TunnelStateData() + debugs(26, 3, "TunnelStateData destructed this=" << this); + assert(noConnections()); + xfree(url); +- if (opening()) +- cancelOpening("~TunnelStateData"); ++ cancelStep("~TunnelStateData"); + delete savedError; + } + +@@ -904,7 +904,10 @@ TunnelStateData::copyServerBytes() + static void + tunnelStartShoveling(TunnelStateData *tunnelState) + { +- assert(!tunnelState->waitingForConnectExchange); ++ assert(!tunnelState->transportWait); ++ assert(!tunnelState->encryptionWait); ++ assert(!tunnelState->peerWait); ++ + assert(tunnelState->server.conn); + AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", + CommTimeoutCbPtrFun(tunnelTimeout, tunnelState)); +@@ -962,6 +965,7 @@ tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *, size_t len + void + TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer) + { ++ peerWait.finish(); + server.len = 0; + + if (logTag_ptr) +@@ -970,13 +974,11 @@ TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer) + if (answer.peerResponseStatus != Http::scNone) + *status_ptr = answer.peerResponseStatus; + +- waitingForConnectExchange = false; +- + auto sawProblem = false; + + if (!answer.positive()) { + sawProblem = true; +- Must(!Comm::IsConnOpen(answer.conn)); ++ assert(!answer.conn); + } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { + sawProblem = true; + closePendingConnection(answer.conn, "conn was closed while waiting for 
tunnelEstablishmentDone"); +@@ -1042,8 +1044,7 @@ tunnelErrorComplete(int fd/*const Comm::ConnectionPointer &*/, void *data, size_ + void + TunnelStateData::noteConnection(HappyConnOpener::Answer &answer) + { +- calls.connector = nullptr; +- connOpener.clear(); ++ transportWait.finish(); + + ErrorState *error = nullptr; + if ((error = answer.error.get())) { +@@ -1167,7 +1168,7 @@ TunnelStateData::secureConnectionToPeer(const Comm::ConnectionPointer &conn) + AsyncCall::Pointer callback = asyncCall(5,4, "TunnelStateData::noteSecurityPeerConnectorAnswer", + MyAnswerDialer(&TunnelStateData::noteSecurityPeerConnectorAnswer, this)); + const auto connector = new Security::BlindPeerConnector(request, conn, callback, al); +- AsyncJob::Start(connector); // will call our callback ++ encryptionWait.start(connector, callback); + } + + /// starts a preparation step for an established connection; retries on failures +@@ -1194,9 +1195,12 @@ TunnelStateData::advanceDestination(const char *stepDescription, const Comm::Con + void + TunnelStateData::noteSecurityPeerConnectorAnswer(Security::EncryptorAnswer &answer) + { ++ encryptionWait.finish(); ++ + ErrorState *error = nullptr; ++ assert(!answer.tunneled); + if ((error = answer.error.get())) { +- Must(!Comm::IsConnOpen(answer.conn)); ++ assert(!answer.conn); + answer.error.clear(); + } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { + error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw(), al); +@@ -1223,8 +1227,6 @@ TunnelStateData::connectedToPeer(const Comm::ConnectionPointer &conn) + void + TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn) + { +- assert(!waitingForConnectExchange); +- + AsyncCall::Pointer callback = asyncCall(5,4, + "TunnelStateData::tunnelEstablishmentDone", + Http::Tunneler::CbDialer<TunnelStateData>(&TunnelStateData::tunnelEstablishmentDone, this)); +@@ -1232,9 +1234,7 @@ TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn) + #if USE_DELAY_POOLS + tunneler->setDelayId(server.delayId); + #endif +- AsyncJob::Start(tunneler); +- waitingForConnectExchange = true; +- // and wait for the tunnelEstablishmentDone() call ++ peerWait.start(tunneler, callback); + } + + void +@@ -1252,14 +1252,14 @@ TunnelStateData::noteDestination(Comm::ConnectionPointer path) + + destinations->addPath(path); + +- if (Comm::IsConnOpen(server.conn)) { ++ if (usingDestination()) { + // We are already using a previously opened connection but also + // receiving destinations in case we need to re-forward. +- Must(!opening()); ++ Must(!transportWait); + return; + } + +- if (opening()) { ++ if (transportWait) { + notifyConnOpener(); + return; // and continue to wait for tunnelConnectDone() callback + } +@@ -1289,17 +1289,23 @@ TunnelStateData::noteDestinationsEnd(ErrorState *selectionError) + // if all of them fail, tunneling as whole will fail + Must(!selectionError); // finding at least one path means selection succeeded + +- if (Comm::IsConnOpen(server.conn)) { ++ if (usingDestination()) { + // We are already using a previously opened connection but also + // receiving destinations in case we need to re-forward. 
+- Must(!opening()); ++ Must(!transportWait); + return; + } + +- Must(opening()); // or we would be stuck with nothing to do or wait for ++ Must(transportWait); // or we would be stuck with nothing to do or wait for + notifyConnOpener(); + } + ++bool ++TunnelStateData::usingDestination() const ++{ ++ return encryptionWait || peerWait || Comm::IsConnOpen(server.conn); ++} ++ + /// remembers an error to be used if there will be no more connection attempts + void + TunnelStateData::saveError(ErrorState *error) +@@ -1320,8 +1326,7 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason) + if (request) + request->hier.stopPeerClock(false); + +- if (opening()) +- cancelOpening(reason); ++ cancelStep(reason); + + assert(finalError); + +@@ -1339,18 +1344,15 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason) + errorSend(client.conn, finalError); + } + +-/// Notify connOpener that we no longer need connections. We do not have to do +-/// this -- connOpener would eventually notice on its own, but notifying reduces +-/// waste and speeds up spare connection opening for other transactions (that +-/// could otherwise wait for this transaction to use its spare allowance). ++/// Notify a pending subtask, if any, that we no longer need its help. We do not ++/// have to do this -- the subtask job will eventually end -- but ending it ++/// earlier reduces waste and may reduce DoS attack surface. + void +-TunnelStateData::cancelOpening(const char *reason) ++TunnelStateData::cancelStep(const char *reason) + { +- assert(calls.connector); +- calls.connector->cancel(reason); +- calls.connector = nullptr; +- notifyConnOpener(); +- connOpener.clear(); ++ transportWait.cancel(reason); ++ encryptionWait.cancel(reason); ++ peerWait.cancel(reason); + } + + void +@@ -1360,15 +1362,14 @@ TunnelStateData::startConnecting() + request->hier.startPeerClock(); + + assert(!destinations->empty()); +- assert(!opening()); +- calls.connector = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this)); +- const auto cs = new HappyConnOpener(destinations, calls.connector, request, startTime, 0, al); ++ assert(!usingDestination()); ++ AsyncCall::Pointer callback = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this)); ++ const auto cs = new HappyConnOpener(destinations, callback, request, startTime, 0, al); + cs->setHost(request->url.host()); + cs->setRetriable(false); + cs->allowPersistent(false); + destinations->notificationPending = true; // start() is async +- connOpener = cs; +- AsyncJob::Start(cs); ++ transportWait.start(cs, callback); + } + + /// send request on an existing connection dedicated to the requesting client +@@ -1417,7 +1418,7 @@ TunnelStateData::Connection::setDelayId(DelayId const &newDelay) + + #endif + +-/// makes sure connOpener knows that destinations have changed ++/// makes sure connection opener knows that the destinations have changed + void + TunnelStateData::notifyConnOpener() + { +@@ -1425,7 +1426,7 @@ TunnelStateData::notifyConnOpener() + debugs(17, 7, "reusing pending notification"); + } else { + destinations->notificationPending = true; +- CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange); ++ CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange); + } + } + +diff --git a/src/whois.cc b/src/whois.cc +index 38dfbacde..306ea5c62 100644 +--- a/src/whois.cc ++++ 
b/src/whois.cc +@@ -182,6 +182,7 @@ whoisClose(const CommCloseCbParams ¶ms) + { + WhoisState *p = (WhoisState *)params.data; + debugs(75, 3, "whoisClose: FD " << params.fd); ++ // We do not own a Connection. Assume that FwdState is also monitoring. + p->entry->unlock("whoisClose"); + delete p; + }
Reviewed-by: Peter Müller peter.mueller@ipfire.org
diff --git a/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch b/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch new file mode 100644 index 000000000..9f592c7e8 --- /dev/null +++ b/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch @@ -0,0 +1,3825 @@ +commit 1332f8d606485b5a2f57277634c2f6f7855bd9a6 (refs/remotes/origin/v5, refs/remotes/github/v5, refs/heads/v5) +Author: Alex Rousskov rousskov@measurement-factory.com +Date: 2022-02-08 03:56:43 -0500
Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening (#967)

* Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening

The bug was caused by commit 25b0ce4. Other known symptoms are:

    assertion failed: store.cc:1793: "isEmpty()"
    assertion failed: FwdState.cc:501: "serverConnection() == conn"
    assertion failed: FwdState.cc:1037: "!opening()"

This change has several overlapping parts. Unfortunately, merging
individual parts is both difficult and likely to cause crashes.

## Part 1: Bug 5055.

FwdState used to check serverConn to decide whether to open a connection
to forward the request. Since commit 25b0ce4, a nil serverConn pointer
no longer implies that a new connection should be opened: FwdState
helper jobs may already be working on preparing an existing open
connection (e.g., sending a CONNECT request or negotiating encryption).

Bad serverConn checks in both FwdState::noteDestination() and
FwdState::noteDestinationsEnd() methods led to extra connectStart()
calls creating two conflicting concurrent helper jobs.

To fix this, we replaced direct serverConn inspection with a
usingDestination() call which also checks whether we are waiting for a
helper job. Testing that fix exposed another set of bugs: The helper job
pointers or in-job connections left stale/set after forwarding failures.
The changes described below addressed (most of) those problems.
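
The idea behind the new check can be reduced to a small standalone toy
(this is not Squid code; the member names are invented, only the shape
of the usingDestination() test mirrors the FwdState/tunnel hunks further
below):

    #include <iostream>

    // Toy model: why "no open server connection" no longer means "idle".
    struct Waiter {
        bool waiting = false;
        explicit operator bool() const { return waiting; }
    };

    struct Forwarder {
        Waiter encryptionWait;        // a TLS-negotiation helper job is running
        Waiter peerWait;              // a CONNECT-through-peer helper job is running
        bool serverConnOpen = false;  // an already-established server connection

        // old check: a helper job in progress still looked "idle", so a
        // second, conflicting connection attempt could be started
        bool busyOld() const { return serverConnOpen; }

        // new check (the usingDestination() idea): any in-flight step counts
        bool usingDestination() const { return encryptionWait || peerWait || serverConnOpen; }
    };

    int main() {
        Forwarder fwd;
        fwd.encryptionWait.waiting = true;                             // pretend a PeerConnector job is active
        std::cout << "old check says busy: " << fwd.busyOld() << "\n";          // 0 -> duplicate job risk
        std::cout << "new check says busy: " << fwd.usingDestination() << "\n"; // 1 -> no duplicate job
    }
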
## Part 2: Connection establishing helper jobs and their callbacks

A proper fix for Bug 5055 required answering a difficult question: When
should a dying job call its callbacks? We only found one answer which
required cooperation from the job creator and led to the following
rules:

* AsyncJob destructors must not call any callbacks.

* AsyncJob::swanSong() is responsible for async-calling any remaining
  (i.e. set, uncalled, and uncancelled) callbacks.

* AsyncJob::swanSong() is called (only) for started jobs.

* AsyncJob destructing sequence should validate that no callbacks remain
  uncalled for started jobs.

... where an AsyncJob x is considered "started" if AsyncJob::Start(x)
has returned without throwing.
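
To make these rules concrete, here is a minimal standalone toy (not the
real AsyncJob API; the names are invented) of a job that owns a single
callback and fires any still-pending callback from swanSong(), never
from its destructor:

    #include <functional>
    #include <iostream>

    class ToyJob {
    public:
        explicit ToyJob(std::function<void(bool ok)> cb): callback_(std::move(cb)) {}
        ~ToyJob() {}                 // rule: destructors must not call callbacks

        void swanSong() {            // called (only) for started jobs
            if (callback_) {         // rule: fire any remaining callback here
                callback_(false);    // a real job would *schedule* an async call
                callback_ = nullptr;
            }
        }

        void finishSuccessfully() {
            if (callback_) { callback_(true); callback_ = nullptr; }
        }

    private:
        std::function<void(bool)> callback_;
    };

    int main() {
        ToyJob job([](bool ok){ std::cout << "creator notified, ok=" << ok << "\n"; });
        job.swanSong();              // the creator is still notified, exactly once
    }
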
A new JobWait class helps job creators follow these rules while keeping
track of in-progress helper jobs and killing no-longer-needed helpers.
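
A rough idea of what such a JobWait-style wrapper gives the job creator,
as a standalone toy (this is not the actual base/JobWait.h interface;
the class and method names below are invented for illustration):

    #include <iostream>
    #include <memory>
    #include <string>

    // Toy stand-ins for a helper job and its callback (not Squid's types).
    struct ToyCallback {
        void cancel(const std::string &reason) { std::cout << "callback cancelled: " << reason << "\n"; }
    };
    struct ToyHelperJob {
        void stop(const std::string &reason) { std::cout << "helper told to stop: " << reason << "\n"; }
    };

    // Minimal JobWait-like bookkeeping: remembers the helper and its callback
    // while the helper runs, and always clears (or cancels) both together.
    class ToyJobWait {
    public:
        explicit operator bool() const { return bool(job_); }  // "are we still waiting?"
        void start(std::shared_ptr<ToyHelperJob> job, std::shared_ptr<ToyCallback> cb) { job_ = job; callback_ = cb; }
        void finish() { job_.reset(); callback_.reset(); }     // the helper called us back
        void cancel(const std::string &reason) {                // we no longer need the helper
            if (!job_)
                return;
            callback_->cancel(reason);
            job_->stop(reason);
            finish();
        }
    private:
        std::shared_ptr<ToyHelperJob> job_;
        std::shared_ptr<ToyCallback> callback_;
    };

    int main() {
        ToyJobWait wait;
        wait.start(std::make_shared<ToyHelperJob>(), std::make_shared<ToyCallback>());
        std::cout << "waiting: " << bool(wait) << "\n"; // 1
        wait.cancel("creator is going away");           // kills the no-longer-needed helper
        std::cout << "waiting: " << bool(wait) << "\n"; // 0
    }
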
Also fixed very similar bugs in tunnel.cc code.

## Part 3: ConnOpener fixes

1. Many ConnOpener users are written to keep a ConnectionPointer to the
   destination given to ConnOpener. This means that their connection
   magically opens when ConnOpener successfully connects, before
   ConnOpener has a chance to notify the user about the changes. Having
   multiple concurrent connection owners is always dangerous, and the
   user cannot even have a close handler registered for its now-open
   connection. When something happens to ConnOpener or its answer, the
   user job may be in trouble. Now, ConnOpener callers no longer pass
   Connection objects they own, cloning them as needed. That adjustment
   required adjustment 2:

2. Refactored ConnOpener users to stop assuming that the answer contains
   a pointer to their connection object. After adjustment 1 above, it
   does not. HappyConnOpener relied on that assumption quite a bit so we
   had to refactor to use two custom callback methods instead of one
   with a complicated if-statement distinguishing prime from spare
   attempts. This refactoring is an overall improvement because it
   simplifies the code. Other ConnOpener users just needed to remove a
   few no longer valid paranoid assertions/Musts.

3. ConnOpener users were forced to remember to close params.conn when
   processing negative answers. Some, naturally, forgot, triggering
   warnings about orphaned connections (e.g., Ident and TcpLogger).
   ConnOpener now closes its open connection before sending a negative
   answer (a caller-side sketch follows this list).

4. ConnOpener would trigger orphan connection warnings if the job ended
   after opening the connection but without supplying the connection to
   the requestor (e.g., because the requestor has gone away). Now,
   ConnOpener explicitly closes its open connection if it has not been
   sent to the requestor.
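
Taken together, items 1-3 change the caller-side pattern roughly as in
the following schematic sketch (not Squid code; real callers use
Comm::ConnOpener, JobCallback, and Connection::cloneProfile() as the
hunks below show):

    #include <iostream>
    #include <memory>
    #include <string>

    // Schematic destination object (not Comm::Connection).
    struct ToyConn {
        std::string host;
        int fd = -1;                         // -1 means "not open"
        std::shared_ptr<ToyConn> cloneProfile() const {
            auto clone = std::make_shared<ToyConn>();
            clone->host = host;              // copy where to connect, not the socket
            return clone;
        }
    };

    // Schematic opener answer: on failure the opener has already closed
    // (or never opened) the connection, so it delivers no connection at all.
    struct ToyAnswer {
        bool ok = false;
        std::shared_ptr<ToyConn> conn;
    };

    void callerOnOpened(const ToyAnswer &answer) {
        if (!answer.ok) {
            // no answer.conn->close() here anymore; nothing was handed over
            std::cout << "open failed; nothing to clean up\n";
            return;
        }
        std::cout << "now owning fd=" << answer.conn->fd << " to " << answer.conn->host << "\n";
    }

    int main() {
        auto destination = std::make_shared<ToyConn>();
        destination->host = "peer.example.com";
        auto givenToOpener = destination->cloneProfile(); // the opener gets a clone,
        (void)givenToOpener;                              // not the caller-owned object
        ToyAnswer failure;
        callerOnOpened(failure);
    }
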
Also fixed Comm::ConnOpener::cleanFd() debugging that was incorrectly
saying that the method closes the temporary descriptor.

Also fixed ConnOpener callback's syncWithComm(): The stale
CommConnectCbParams override was testing unused (i.e. always negative)
CommConnectCbParams::fd and was trying to cancel the callback that most
(possibly all) recipients rely on: ConnOpener users expect a negative
answer rather than no answer at all.

Also, after comparing the needs of two old/existing and a temporary
added ("clone everything") Connection cloning method callers, we decided
there is no need to maintain three different methods. All existing
callers should be fine with a single method because none of them suffers
from "extra" copying of members that others need. Right now, the new
cloneProfile() method copies everything except FD and a few
special-purpose members (with documented reasons for not copying).
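
As a rough illustration of that consolidated cloning (a toy with
invented member names; the real list of copied and skipped members lives
in Comm::Connection::cloneProfile()):

    #include <iostream>
    #include <memory>
    #include <string>

    // Toy connection: destination metadata is copied, the open socket is not.
    struct ToyConn {
        // destination/profile members (copied by cloneProfile)
        std::string remoteHost;
        int remotePort = 0;
        int tos = 0;
        // per-instance members (deliberately not copied)
        int fd = -1;

        std::shared_ptr<ToyConn> cloneProfile() const {
            auto clone = std::make_shared<ToyConn>();
            clone->remoteHost = remoteHost;  // where to connect
            clone->remotePort = remotePort;
            clone->tos = tos;                // how to mark the new socket
            // clone->fd stays -1: the clone never shares the open descriptor
            return clone;
        }
    };

    int main() {
        ToyConn open;
        open.remoteHost = "peer.example.com";
        open.remotePort = 3128;
        open.fd = 42;
        auto fresh = open.cloneProfile();
        std::cout << fresh->remoteHost << ":" << fresh->remotePort << " fd=" << fresh->fd << "\n"; // fd=-1
    }
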
Also added Comm::Connection::cloneDestinationDetails() debugging to
simplify tracking dependencies between half-baked Connection objects
carrying destination/flags/other metadata and open Connection objects
created by ConnOpener using that metadata (which are later delivered to
ConnOpener users and, in some cases, replace those half-baked
connections mentioned earlier). Long-term, we need to find a better way
to express these and other Connection states/stages than comments and
debugging messages.

## Part 4: Comm::Connection closure callbacks

We improved many closure callbacks to make sure (to the extent possible)
that Connection and other objects are in sync with Comm. There are lots
of small bugs, inconsistencies, and other problems in Connection closure
handlers. It is not clear whether any of those problems could result in
serious runtime errors or leaks. In theory, the rest of the code could
neutralize their negative side effects. However, even in that case, it
was just a matter of time before the next bug will bite us due to stale
Connection::fd and such. These changes themselves carry elevated risk,
but they get us closer to reliable code as far as Connection maintenance
is concerned; otherwise, we will keep chasing their deadly side effects.

Long-term, all these manual efforts to keep things in sync should become
unnecessary with the introduction of appropriate Connection ownership
APIs that automatically maintain the corresponding environments (TODO).
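
The recurring closure-handler pattern that these fixes converge on looks
roughly like this standalone sketch (not Squid code; the real handlers
also clear their registered close-handler pointer, as the snmp and
FwdState hunks below show):

    #include <iostream>
    #include <memory>

    // Toy connection (not Comm::Connection): noteClosure() lets the object
    // forget a descriptor that Comm has already closed behind our back.
    struct ToyConn {
        int fd = -1;
        bool isOpen() const { return fd >= 0; }
        void noteClosure() { fd = -1; }   // sync with Comm; do not close() again
    };

    struct ToyUser {
        std::shared_ptr<ToyConn> conn;

        // Comm-style "connection closed" callback
        void noteCommClosed() {
            if (conn) {
                conn->noteClosure();      // keep Connection::fd in sync with Comm
                conn.reset();             // and stop referring to the dead connection
            }
            std::cout << "stopping because the connection closed\n";
        }
    };

    int main() {
        ToyUser user{std::make_shared<ToyConn>()};
        user.conn->fd = 7;                // pretend the connection was open
        user.noteCommClosed();            // Comm closed it; we stay in sync
    }
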
## Part 5: Other notable improvements in the adjusted code

Improved Security::PeerConnector::serverConn and
Http::Tunneler::connection management, especially when sending negative
answers. When sending a negative answer, we would set answer().conn to
an open connection, async-send that answer, and then hurry to close the
connection using our pointer to the shared Connection object. If
everything went according to the plan, the recipient would get a non-nil
but closed Connection object. Now, a negative answer simply means no
connection at all. Same for a tunneled answer.
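
In answer-recipient terms, the new invariant can be sketched like this
(schematic types only; the corresponding real checks are the
Must(!answer.conn)/assert(!answer.conn) changes visible in the FwdState
and tunnel hunks below):

    #include <cassert>
    #include <iostream>
    #include <memory>

    struct ToyConn { int fd = -1; };

    // Schematic encryptor/tunneler answer: an error (or a "tunneled" outcome)
    // now implies that no connection is handed over at all.
    struct ToyAnswer {
        bool error = false;
        bool tunneled = false;
        std::shared_ptr<ToyConn> conn;
    };

    void recipient(const ToyAnswer &answer) {
        if (answer.error || answer.tunneled) {
            assert(!answer.conn);          // no half-closed leftovers to worry about
            std::cout << "no connection delivered\n";
            return;
        }
        assert(answer.conn);
        std::cout << "got an open connection, fd=" << answer.conn->fd << "\n";
    }

    int main() {
        ToyAnswer bad;
        bad.error = true;
        recipient(bad);                    // prints "no connection delivered"
    }
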
Refactored ICAP connection-establishing code to delay Connection
ownership until the ICAP connection is fully ready. This change
addresses primary Connection ownership concerns (as they apply to this
ICAP code) except orphaning of the temporary Connection object by helper
job start exceptions (now an explicit XXX). For example, the transaction
no longer shares a Connection object with ConnOpener and
IcapPeerConnector jobs.

Probably fixed a bug where PeerConnector::negotiate() assumed that
sslFinalized() does not return true after callBack(). It may (e.g., when
CertValidationHelper::Submit() throws). Same for
PeekingPeerConnector::checkForPeekAndSpliceMatched().

Fixed FwdState::advanceDestination() bug that did not save
ERR_GATEWAY_FAILURE details and "lost" the address of that failed
destination, making it unavailable to future retries (if any).
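
The shape of that fix (see the FwdState::advanceDestination() hunk
further down) is roughly: record a gateway failure before retrying so
the error details are not lost. A schematic, self-contained version:

    #include <iostream>
    #include <stdexcept>
    #include <string>

    // Schematic error bookkeeping (not Squid's ErrorState/FwdState).
    struct ToyForwarder {
        std::string savedError;             // empty means "no error recorded yet"

        void fail(const std::string &err) { savedError = err; }

        void advanceDestination(const std::string &step) {
            try {
                throw std::runtime_error("preparation failed"); // simulate a step failure
            } catch (const std::exception &e) {
                std::cout << "exception while trying to " << step << ": " << e.what() << "\n";
                if (savedError.empty())
                    fail("ERR_GATEWAY_FAILURE");   // remember why this destination failed
                // ... then retry with another destination or bail ...
            }
        }
    };

    int main() {
        ToyForwarder fwd;
        fwd.advanceDestination("secure the connection");
        std::cout << "recorded: " << fwd.savedError << "\n";
    }
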
Polished PeerPoolMgr, Ident, and Gopher code to be able to fix similar
job callback and connection management issues.

Polished AsyncJob::Start() API. Start() used to return a job pointer,
but that was a bad idea (a usage sketch follows this list):

* It implies that a failed Start() will return a nil pointer, and that
  the caller should check the result. Neither is true.

* It encourages callers to dereference the returned pointer to further
  adjust the job. That technically works (today) but violates the rules
  of communicating with an async job. The Start() method is the boundary
  after which the job is deemed asynchronous.
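
In caller terms the guidance boils down to configuring the job fully
before starting it and ignoring Start()'s (former) return value. A
schematic before/after, using toy types rather than the real AsyncJob:

    #include <iostream>
    #include <memory>

    struct ToyJob {
        int timeout = 0;
        void start() { std::cout << "job started, timeout=" << timeout << "\n"; }
    };

    namespace ToyAsyncJob {
        // returns nothing: callers must not rely on (or dereference) a result
        void Start(const std::shared_ptr<ToyJob> &job) { job->start(); }
    }

    int main() {
        auto job = std::make_shared<ToyJob>();
        job->timeout = 30;            // configure *before* Start(); after Start()
        ToyAsyncJob::Start(job);      // the job is async and off-limits to the caller
    }
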
Also removed old "and wait for..." post-Start() comments because the
code itself became clear enough, and those comments were becoming
increasingly stale (because they duplicated the callback names above
them).

Fix Tunneler and PeerConnector handling of last-resort callback
requirements. Events like handleStopRequest() and callException() stop
the job but should not be reported as a BUG (e.g., it would be up to the
callException() to decide how to report the caught exception). There
might (or there will) be other, similar cases where the job is stopped
prematurely for some non-BUG reason beyond swanSong() knowledge. The
existence of non-bug cases does not mean there could be no bugs worth
reporting here, but until they can be identified more reliably than all
these benign/irrelevant cases, reporting no BUGs is a (much) lesser
evil.

TODO: Revise AsyncJob::doneAll(). Many of its overrides are written to
check for both positive (i.e. mission accomplished) and negative (i.e.
mission cancelled or cannot be accomplished) conditions, but the latter
is usually unnecessary, especially after we added handleStopRequest()
API to properly support external job cancellation events. Many doneAll()
overrides can probably be greatly simplified.

* Cherry picked SQUID-568-premature-serverconn-use-v5 commit 22b5f78.

* fixup: Cherry-picked SQUID-568-premature-serverconn-use PR-time fixes

In git log order:

e64a6c1: Undone an out-of-scope change and added a missing 'auto'
aeaf83d: Fixed an 'unused parameter' error
f49d009: fixup: No explicit destructors with implicit copying methods
c30c37f: Removed an unnecessary explicit copy constructor
012f5ec: Excluded Connection::rfc931 from cloning
366c78a: ICAP: do not set connect_timeout on the established conn...

This branch is now in sync with SQUID-568-premature-serverconn-use (S)
commit e64a6c1 (except for official changes merged from master/v6 into S
closer to the end of PR 877 work (i.e. S' merge commit 0a7432a)).

* Fix FATAL ServiceRep::putConnection exception: theBusyConns > 0

    FATAL: check failed: theBusyConns > 0
    exception location: ServiceRep.cc(163) putConnection

Since master/v6 commit 2b6b1bc, a timeout on a ready-to-shovel
Squid-service ICAP connection was decrementing theBusyConns level one
extra time because Adaptation::Icap::Xaction::noteCommTimedout() started
calling both noteConnectionFailed() and closeConnection(). Depending on
the actual theBusyConns level, the extra decrement could result in FATAL
errors later, when putConnection() was called (for a different ICAP
transaction) with zero theBusyConns in an exception-unprotected context.
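
The accounting problem reduces to a counter whose decrement must happen
exactly once per busy connection; calling both cleanup paths decrements
it twice (a toy model, not the real Adaptation::Icap::ServiceRep code):

    #include <cassert>
    #include <iostream>

    struct ToyServiceRep {
        int theBusyConns = 0;

        void noteConnectionUse() { ++theBusyConns; }
        void noteConnectionFailed() { assert(theBusyConns > 0); --theBusyConns; }
        void putConnection() { assert(theBusyConns > 0); --theBusyConns; }
    };

    int main() {
        ToyServiceRep service;
        service.noteConnectionUse();      // one ICAP connection is busy

        // Buggy timeout handling: reporting the failure *and* returning the
        // connection decrements the same counter twice...
        service.noteConnectionFailed();
        // service.putConnection();       // ...which would trip "theBusyConns > 0"
                                          // (the FATAL above) on a later call

        std::cout << "busy connections: " << service.theBusyConns << "\n";
    }
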
Throughout these changes, Xaction still counts the above timeouts as a
service failure. That is done by calling ServiceRep::noteFailure() from
Xaction::callException(), including in timeout cases described above.

Cherry-picked master/v6 commit a8ac892.

+diff --git a/src/BodyPipe.cc b/src/BodyPipe.cc +index 9abe7d48b..37a3b32cd 100644 +--- a/src/BodyPipe.cc ++++ b/src/BodyPipe.cc +@@ -335,6 +335,7 @@ BodyPipe::startAutoConsumptionIfNeeded()
return;
theConsumer = new BodySink(this);
++ AsyncJob::Start(theConsumer);
debugs(91,7, HERE << "starting auto consumption" << status());
scheduleBodyDataNotification();
- }
+diff --git a/src/CommCalls.cc b/src/CommCalls.cc +index 4cd503dcb..cca23f6e3 100644 +--- a/src/CommCalls.cc ++++ b/src/CommCalls.cc +@@ -53,6 +53,9 @@ CommAcceptCbParams::CommAcceptCbParams(void *aData):
- {
- }
++// XXX: Add CommAcceptCbParams::syncWithComm(). Adjust syncWithComm() API if all ++// implementations always return true. ++
- void
- CommAcceptCbParams::print(std::ostream &os) const
- {
+@@ -72,13 +75,24 @@ CommConnectCbParams::CommConnectCbParams(void *aData):
- bool
- CommConnectCbParams::syncWithComm()
- {
+- // drop the call if the call was scheduled before comm_close but +- // is being fired after comm_close +- if (fd >= 0 && fd_table[fd].closing()) { +- debugs(5, 3, HERE << "dropping late connect call: FD " << fd); +- return false; ++ assert(conn); ++ ++ // change parameters if this is a successful callback that was scheduled ++ // after its Comm-registered connection started to close ++ ++ if (flag != Comm::OK) { ++ assert(!conn->isOpen()); ++ return true; // not a successful callback; cannot go out of sync
}
+- return true; // now we are in sync and can handle the call ++ ++ assert(conn->isOpen()); ++ if (!fd_table[conn->fd].closing()) ++ return true; // no closing in progress; in sync (for now) ++ ++ debugs(5, 3, "converting to Comm::ERR_CLOSING: " << conn); ++ conn->noteClosure(); ++ flag = Comm::ERR_CLOSING; ++ return true; // now the callback is in sync with Comm again
- }
- /* CommIoCbParams */
+diff --git a/src/Downloader.cc b/src/Downloader.cc +index 298c3a836..012387fb8 100644 +--- a/src/Downloader.cc ++++ b/src/Downloader.cc +@@ -81,6 +81,10 @@ void
- Downloader::swanSong()
- {
debugs(33, 6, this);
++ ++ if (callback_) // job-ending emergencies like handleStopRequest() or callException() ++ callBack(Http::scInternalServerError); ++
if (context_) {
context_->finished();
context_ = nullptr;
+@@ -251,6 +255,7 @@ Downloader::downloadFinished()
- void
- Downloader::callBack(Http::StatusCode const statusCode)
- {
++ assert(callback_);
CbDialer *dialer = dynamic_cast<CbDialer*>(callback_->getDialer());
Must(dialer);
dialer->status = statusCode;
+diff --git a/src/FwdState.cc b/src/FwdState.cc +index b721017c3..b69e60c7c 100644 +--- a/src/FwdState.cc ++++ b/src/FwdState.cc +@@ -207,26 +207,22 @@ FwdState::stopAndDestroy(const char *reason)
- {
debugs(17, 3, "for " << reason);
+- if (opening()) +- cancelOpening(reason); ++ cancelStep(reason);
PeerSelectionInitiator::subscribed = false; // may already be false
self = nullptr; // we hope refcounting destroys us soon; may already be nil
/* do not place any code here as this object may be gone by now */
- }
+-/// Notify connOpener that we no longer need connections. We do not have to do +-/// this -- connOpener would eventually notice on its own, but notifying reduces +-/// waste and speeds up spare connection opening for other transactions (that +-/// could otherwise wait for this transaction to use its spare allowance). ++/// Notify a pending subtask, if any, that we no longer need its help. We do not ++/// have to do this -- the subtask job will eventually end -- but ending it ++/// earlier reduces waste and may reduce DoS attack surface.
- void
+-FwdState::cancelOpening(const char *reason) ++FwdState::cancelStep(const char *reason)
- {
+- assert(calls.connector); +- calls.connector->cancel(reason); +- calls.connector = nullptr; +- notifyConnOpener(); +- connOpener.clear(); ++ transportWait.cancel(reason); ++ encryptionWait.cancel(reason); ++ peerWait.cancel(reason);
- }
- #if STRICT_ORIGINAL_DST
+@@ -348,8 +344,7 @@ FwdState::~FwdState()
entry = NULL;
+- if (opening()) +- cancelOpening("~FwdState"); ++ cancelStep("~FwdState");
if (Comm::IsConnOpen(serverConn))
closeServerConnection("~FwdState");
+@@ -501,8 +496,17 @@ FwdState::fail(ErrorState * errorState)
if (!errorState->request)
errorState->request = request;
+- if (err->type != ERR_ZERO_SIZE_OBJECT) +- return; ++ if (err->type == ERR_ZERO_SIZE_OBJECT) ++ reactToZeroSizeObject(); ++ ++ destinationReceipt = nullptr; // may already be nil ++} ++ ++/// ERR_ZERO_SIZE_OBJECT requires special adjustments ++void ++FwdState::reactToZeroSizeObject() ++{ ++ assert(err->type == ERR_ZERO_SIZE_OBJECT);
if (pconnRace == racePossible) {
debugs(17, 5, HERE << "pconn race happened");
+@@ -566,6 +570,8 @@ FwdState::complete()
if (Comm::IsConnOpen(serverConn))
unregister(serverConn);
++ serverConn = nullptr; ++ destinationReceipt = nullptr;
storedWholeReply_ = nullptr;
entry->reset();
+@@ -584,6 +590,12 @@ FwdState::complete()
}
- }
++bool ++FwdState::usingDestination() const ++{ ++ return encryptionWait || peerWait || Comm::IsConnOpen(serverConn); ++} ++
- void
- FwdState::markStoredReplyAsWhole(const char * const whyWeAreSure)
- {
+@@ -613,19 +625,19 @@ FwdState::noteDestination(Comm::ConnectionPointer path)
destinations->addPath(path);
+- if (Comm::IsConnOpen(serverConn)) { ++ if (usingDestination()) {
// We are already using a previously opened connection, so we cannot be
+- // waiting for connOpener. We still receive destinations for backup. +- Must(!opening()); ++ // waiting for it. We still receive destinations for backup. ++ Must(!transportWait);
return;
}
+- if (opening()) { ++ if (transportWait) {
notifyConnOpener();
return; // and continue to wait for FwdState::noteConnection() callback
}
+- // This is the first path candidate we have seen. Create connOpener. ++ // This is the first path candidate we have seen. Use it.
useDestinations();
- }
+@@ -653,19 +665,19 @@ FwdState::noteDestinationsEnd(ErrorState *selectionError)
// if all of them fail, forwarding as whole will fail
Must(!selectionError); // finding at least one path means selection succeeded
+- if (Comm::IsConnOpen(serverConn)) { ++ if (usingDestination()) {
// We are already using a previously opened connection, so we cannot be
+- // waiting for connOpener. We were receiving destinations for backup. +- Must(!opening()); ++ // waiting for it. We were receiving destinations for backup. ++ Must(!transportWait);
return;
}
+- Must(opening()); // or we would be stuck with nothing to do or wait for ++ Must(transportWait); // or we would be stuck with nothing to do or wait for
notifyConnOpener();
// and continue to wait for FwdState::noteConnection() callback
- }
+-/// makes sure connOpener knows that destinations have changed ++/// makes sure connection opener knows that the destinations have changed
- void
- FwdState::notifyConnOpener()
- {
+@@ -674,7 +686,7 @@ FwdState::notifyConnOpener()
} else {
debugs(17, 7, "notifying about " << *destinations);
destinations->notificationPending = true;
+- CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange); ++ CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
}
- }
+@@ -684,7 +696,7 @@ static void
- fwdServerClosedWrapper(const CommCloseCbParams &params)
- {
FwdState *fwd = (FwdState *)params.data;
+- fwd->serverClosed(params.fd); ++ fwd->serverClosed();
- }
- /**** PRIVATE *****************************************************************/
+@@ -754,13 +766,23 @@ FwdState::checkRetriable()
- }
- void
+-FwdState::serverClosed(int fd) ++FwdState::serverClosed()
- {
+- // XXX: fd is often -1 here +- debugs(17, 2, "FD " << fd << " " << entry->url() << " after " << +- (fd >= 0 ? fd_table[fd].pconn.uses : -1) << " requests"); +- if (fd >= 0 && serverConnection()->fd == fd) +- fwdPconnPool->noteUses(fd_table[fd].pconn.uses); ++ // XXX: This method logic attempts to tolerate Connection::close() called ++ // for serverConn earlier, by one of our dispatch()ed jobs. If that happens, ++ // serverConn will already be closed here or, worse, it will already be open ++ // for the next forwarding attempt. The current code prevents us getting ++ // stuck, but the long term solution is to stop sharing serverConn. ++ debugs(17, 2, serverConn); ++ if (Comm::IsConnOpen(serverConn)) { ++ const auto uses = fd_table[serverConn->fd].pconn.uses; ++ debugs(17, 3, "prior uses: " << uses); ++ fwdPconnPool->noteUses(uses); // XXX: May not have come from fwdPconnPool ++ serverConn->noteClosure(); ++ } ++ serverConn = nullptr; ++ closeHandler = nullptr; ++ destinationReceipt = nullptr;
retryOrBail();
- }
+@@ -802,6 +824,8 @@ FwdState::handleUnregisteredServerEnd()
- {
debugs(17, 2, HERE << "self=" << self << " err=" << err << ' ' << entry->url());
assert(!Comm::IsConnOpen(serverConn));
++ serverConn = nullptr; ++ destinationReceipt = nullptr;
retryOrBail();
- }
+@@ -819,6 +843,8 @@ FwdState::advanceDestination(const char *stepDescription, const Comm::Connection
} catch (...) {
debugs (17, 2, "exception while trying to " << stepDescription << ": " << CurrentException);
closePendingConnection(conn, "connection preparation exception");
++ if (!err) ++ fail(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request, al));
retryOrBail();
}
- }
+@@ -830,8 +856,7 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer)
- {
assert(!destinationReceipt);
+- calls.connector = nullptr; +- connOpener.clear(); ++ transportWait.finish();
Must(n_tries <= answer.n_tries); // n_tries cannot decrease
n_tries = answer.n_tries;
+@@ -843,6 +868,8 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer)
Must(!Comm::IsConnOpen(answer.conn));
answer.error.clear(); // preserve error for errorSendComplete()
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here.
// We do not know exactly why the connection got closed, so we play it
// safe, allowing retries only for persistent (reused) connections
if (answer.reused) {
+@@ -912,23 +939,24 @@ FwdState::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
if (!conn->getPeer()->options.no_delay)
tunneler->setDelayId(entry->mem_obj->mostBytesAllowed());
- #endif
+- AsyncJob::Start(tunneler); +- // and wait for the tunnelEstablishmentDone() call ++ peerWait.start(tunneler, callback);
- }
- /// resumes operations after the (possibly failed) HTTP CONNECT exchange
- void
- FwdState::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
- {
++ peerWait.finish(); ++
ErrorState *error = nullptr;
if (!answer.positive()) {
+- Must(!Comm::IsConnOpen(answer.conn)); ++ Must(!answer.conn);
error = answer.squidError.get();
Must(error);
answer.squidError.clear(); // preserve error for fail()
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+- // The socket could get closed while our callback was queued. +- // We close Connection here to sync Connection::fd. ++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here.
closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
} else if (!answer.leftovers.isEmpty()) {
+@@ -998,18 +1026,21 @@ FwdState::secureConnectionToPeer(const Comm::ConnectionPointer &conn)
- #endif
connector = new Security::BlindPeerConnector(requestPointer, conn, callback, al, sslNegotiationTimeout);
connector->noteFwdPconnUse = true;
+- AsyncJob::Start(connector); // will call our callback ++ encryptionWait.start(connector, callback);
- }
- /// called when all negotiations with the TLS-speaking peer have been completed
- void
- FwdState::connectedToPeer(Security::EncryptorAnswer &answer)
- {
++ encryptionWait.finish(); ++
ErrorState *error = nullptr;
if ((error = answer.error.get())) {
+- Must(!Comm::IsConnOpen(answer.conn)); ++ assert(!answer.conn);
answer.error.clear(); // preserve error for errorSendComplete()
} else if (answer.tunneled) {
++ assert(!answer.conn);
// TODO: When ConnStateData establishes tunnels, its state changes
// [in ways that may affect logging?]. Consider informing
// ConnStateData about our tunnel or otherwise unifying tunnel
+@@ -1019,6 +1050,8 @@ FwdState::connectedToPeer(Security::EncryptorAnswer &answer)
complete(); // destroys us
return;
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here.
closePendingConnection(answer.conn, "conn was closed while waiting for connectedToPeer");
error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
}
+@@ -1091,22 +1124,20 @@ FwdState::connectStart()
Must(!request->pinnedConnection());
assert(!destinations->empty());
+- assert(!opening()); ++ assert(!usingDestination());
// Ditch error page if it was created before.
// A new one will be created if there's another problem
delete err;
err = nullptr;
request->clearError();
+- serverConn = nullptr; +- destinationReceipt = nullptr;
request->hier.startPeerClock();
+- calls.connector = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this)); ++ AsyncCall::Pointer callback = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
HttpRequest::Pointer cause = request;
+- const auto cs = new HappyConnOpener(destinations, calls.connector, cause, start_t, n_tries, al); ++ const auto cs = new HappyConnOpener(destinations, callback, cause, start_t, n_tries, al);
cs->setHost(request->url.host());
bool retriable = checkRetriable();
if (!retriable && Config.accessList.serverPconnForNonretriable) {
+@@ -1118,8 +1149,7 @@ FwdState::connectStart()
cs->setRetriable(retriable);
cs->allowPersistent(pconnRace != raceHappened);
destinations->notificationPending = true; // start() is async
+- connOpener = cs; +- AsyncJob::Start(cs); ++ transportWait.start(cs, callback);
- }
- /// send request on an existing connection dedicated to the requesting client
+diff --git a/src/FwdState.h b/src/FwdState.h +index de75f33cc..ebf5f82b1 100644 +--- a/src/FwdState.h ++++ b/src/FwdState.h +@@ -9,13 +9,12 @@
- #ifndef SQUID_FORWARD_H
- #define SQUID_FORWARD_H
+-#include "base/CbcPointer.h"
- #include "base/forward.h"
++#include "base/JobWait.h"
- #include "base/RefCount.h"
- #include "clients/forward.h"
- #include "comm.h"
- #include "comm/Connection.h"
+-#include "comm/ConnOpener.h"
- #include "error/forward.h"
- #include "fde.h"
- #include "http/StatusCode.h"
+@@ -38,7 +37,6 @@ class ResolvedPeers;
- typedef RefCount<ResolvedPeers> ResolvedPeersPointer;
- class HappyConnOpener;
+-typedef CbcPointer<HappyConnOpener> HappyConnOpenerPointer;
- class HappyConnOpenerAnswer;
- /// Sets initial TOS value and Netfilter for the future outgoing connection.
+@@ -87,7 +85,7 @@ public:
void handleUnregisteredServerEnd();
int reforward();
bool reforwardableStatus(const Http::StatusCode s) const;
+- void serverClosed(int fd); ++ void serverClosed();
void connectStart();
void connectDone(const Comm::ConnectionPointer & conn, Comm::Flag status, int xerrno);
bool checkRetry();
+@@ -115,6 +113,9 @@ private:
/* PeerSelectionInitiator API */
virtual void noteDestination(Comm::ConnectionPointer conn) override;
virtual void noteDestinationsEnd(ErrorState *selectionError) override;
++ /// whether the successfully selected path destination or the established ++ /// server connection is still in use ++ bool usingDestination() const;
void noteConnection(HappyConnOpenerAnswer &);
+@@ -157,13 +158,10 @@ private:
/// \returns the time left for this connection to become connected or 1 second if it is less than one second left
time_t connectingTimeout(const Comm::ConnectionPointer &conn) const;
+- /// whether we are waiting for HappyConnOpener +- /// same as calls.connector but may differ from connOpener.valid() +- bool opening() const { return connOpener.set(); } +- +- void cancelOpening(const char *reason); ++ void cancelStep(const char *reason);
void notifyConnOpener();
++ void reactToZeroSizeObject();
void updateAleWithFinalError();
+@@ -182,11 +180,6 @@ private:
time_t start_t;
int n_tries; ///< the number of forwarding attempts so far
+- // AsyncCalls which we set and may need cancelling. +- struct { +- AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn. +- } calls; +-
struct {
bool connected_okay; ///< TCP link ever opened properly. This affects retry of POST,PUT,CONNECT,etc
bool dont_retry;
+@@ -194,7 +187,16 @@ private:
bool destinationsFound; ///< at least one candidate path found
} flags;
+- HappyConnOpenerPointer connOpener; ///< current connection opening job ++ /// waits for a transport connection to the peer to be established/opened ++ JobWait<HappyConnOpener> transportWait; ++ ++ /// waits for the established transport connection to be secured/encrypted ++ JobWait<Security::PeerConnector> encryptionWait; ++ ++ /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated ++ /// over the (encrypted, if needed) transport connection to that cache_peer ++ JobWait<Http::Tunneler> peerWait; ++
ResolvedPeersPointer destinations; ///< paths for forwarding the request
Comm::ConnectionPointer serverConn; ///< a successfully opened connection to a server.
PeerConnectionPointer destinationReceipt; ///< peer selection result (or nil)
+diff --git a/src/HappyConnOpener.cc b/src/HappyConnOpener.cc +index 6e0872506..6d83ff14f 100644 +--- a/src/HappyConnOpener.cc ++++ b/src/HappyConnOpener.cc +@@ -18,6 +18,7 @@
- #include "neighbors.h"
- #include "pconn.h"
- #include "PeerPoolMgr.h"
++#include "sbuf/Stream.h"
- #include "SquidConfig.h"
- CBDATA_CLASS_INIT(HappyConnOpener);
+@@ -330,6 +331,8 @@ HappyConnOpener::HappyConnOpener(const ResolvedPeers::Pointer &dests, const Asyn
fwdStart(aFwdStart),
callback_(aCall),
destinations(dests),
++ prime(&HappyConnOpener::notePrimeConnectDone, "HappyConnOpener::notePrimeConnectDone"), ++ spare(&HappyConnOpener::noteSpareConnectDone, "HappyConnOpener::noteSpareConnectDone"),
ale(anAle),
cause(request),
n_tries(tries)
+@@ -410,33 +413,43 @@ HappyConnOpener::swanSong()
AsyncJob::swanSong();
- }
++/// HappyConnOpener::Attempt printer for debugging ++std::ostream & ++operator <<(std::ostream &os, const HappyConnOpener::Attempt &attempt) ++{ ++ if (!attempt.path) ++ os << '-'; ++ else if (attempt.path->isOpen()) ++ os << "FD " << attempt.path->fd; ++ else if (attempt.connWait) ++ os << attempt.connWait; ++ else // destination is known; connection closed (and we are not opening any) ++ os << attempt.path->id; ++ return os; ++} ++
- const char *
- HappyConnOpener::status() const
- {
+- static MemBuf buf; +- buf.reset(); ++ // TODO: In a redesigned status() API, the caller may mimic this approach. ++ static SBuf buf; ++ buf.clear();
+- buf.append(" [", 2); ++ SBufStream os(buf); ++ ++ os.write(" [", 2);
if (stopReason)
+- buf.appendf("Stopped, reason:%s", stopReason); +- if (prime) { +- if (prime.path && prime.path->isOpen()) +- buf.appendf(" prime FD %d", prime.path->fd); +- else if (prime.connector) +- buf.appendf(" prime call%ud", prime.connector->id.value); +- } +- if (spare) { +- if (spare.path && spare.path->isOpen()) +- buf.appendf(" spare FD %d", spare.path->fd); +- else if (spare.connector) +- buf.appendf(" spare call%ud", spare.connector->id.value); +- } ++ os << "Stopped:" << stopReason; ++ if (prime) ++ os << "prime:" << prime; ++ if (spare) ++ os << "spare:" << spare;
if (n_tries)
+- buf.appendf(" tries %d", n_tries); +- buf.appendf(" %s%u]", id.prefix(), id.value); +- buf.terminate(); ++ os << " tries:" << n_tries; ++ os << ' ' << id << ']';
+- return buf.content(); ++ buf = os.buf(); ++ return buf.c_str();
- }
- /// Create "503 Service Unavailable" or "504 Gateway Timeout" error depending
+@@ -516,7 +529,7 @@ void
- HappyConnOpener::startConnecting(Attempt &attempt, PeerConnectionPointer &dest)
- {
Must(!attempt.path);
+- Must(!attempt.connector); ++ Must(!attempt.connWait);
Must(dest);
const auto bumpThroughPeer = cause->flags.sslBumped && dest->getPeer();
+@@ -552,51 +565,52 @@ HappyConnOpener::openFreshConnection(Attempt &attempt, PeerConnectionPointer &de
entry->mem_obj->checkUrlChecksum();
- #endif
+- GetMarkingsToServer(cause.getRaw(), *dest); ++ const auto conn = dest->cloneProfile(); ++ GetMarkingsToServer(cause.getRaw(), *conn);
+- // ConnOpener modifies its destination argument so we reset the source port +- // in case we are reusing the destination already used by our predecessor. +- dest->local.port(0);
++n_tries;
typedef CommCbMemFunT<HappyConnOpener, CommConnectCbParams> Dialer;
+- AsyncCall::Pointer callConnect = JobCallback(48, 5, Dialer, this, HappyConnOpener::connectDone); ++ AsyncCall::Pointer callConnect = asyncCall(48, 5, attempt.callbackMethodName, ++ Dialer(this, attempt.callbackMethod));
const time_t connTimeout = dest->connectTimeout(fwdStart);
+- Comm::ConnOpener *cs = new Comm::ConnOpener(dest, callConnect, connTimeout); +- if (!dest->getPeer()) ++ auto cs = new Comm::ConnOpener(conn, callConnect, connTimeout); ++ if (!conn->getPeer())
cs->setHost(host_);
+- attempt.path = dest; +- attempt.connector = callConnect; +- attempt.opener = cs; ++ attempt.path = dest; // but not the being-opened conn! ++ attempt.connWait.start(cs, callConnect); ++}
+- AsyncJob::Start(cs); ++/// Comm::ConnOpener callback for the prime connection attempt ++void ++HappyConnOpener::notePrimeConnectDone(const CommConnectCbParams ¶ms) ++{ ++ handleConnOpenerAnswer(prime, params, "new prime connection");
- }
+-/// called by Comm::ConnOpener objects after a prime or spare connection attempt +-/// completes (successfully or not) ++/// Comm::ConnOpener callback for the spare connection attempt
- void
+-HappyConnOpener::connectDone(const CommConnectCbParams ¶ms) ++HappyConnOpener::noteSpareConnectDone(const CommConnectCbParams ¶ms)
- {
+- Must(params.conn); +- const bool itWasPrime = (params.conn == prime.path); +- const bool itWasSpare = (params.conn == spare.path); +- Must(itWasPrime != itWasSpare); +- +- PeerConnectionPointer handledPath; +- if (itWasPrime) { +- handledPath = prime.path; +- prime.finish(); +- } else { +- handledPath = spare.path; +- spare.finish(); +- if (gotSpareAllowance) { +- TheSpareAllowanceGiver.jobUsedAllowance(); +- gotSpareAllowance = false; +- } ++ if (gotSpareAllowance) { ++ TheSpareAllowanceGiver.jobUsedAllowance(); ++ gotSpareAllowance = false;
}
++ handleConnOpenerAnswer(spare, params, "new spare connection"); ++} ++ ++/// prime/spare-agnostic processing of a Comm::ConnOpener result ++void ++HappyConnOpener::handleConnOpenerAnswer(Attempt &attempt, const CommConnectCbParams ¶ms, const char *what) ++{ ++ Must(params.conn); ++ ++ // finalize the previously selected path before attempt.finish() forgets it ++ auto handledPath = attempt.path; ++ handledPath.finalize(params.conn); // closed on errors ++ attempt.finish();
+- const char *what = itWasPrime ? "new prime connection" : "new spare connection";
if (params.flag == Comm::OK) {
sendSuccess(handledPath, false, what);
return;
+@@ -605,7 +619,6 @@ HappyConnOpener::connectDone(const CommConnectCbParams &params)
debugs(17, 8, what << " failed: " << params.conn);
if (const auto peer = params.conn->getPeer())
peerConnectFailed(peer);
+- params.conn->close(); // TODO: Comm::ConnOpener should do this instead.
// remember the last failure (we forward it if we cannot connect anywhere)
lastFailedConnection = handledPath;
+@@ -881,13 +894,23 @@ HappyConnOpener::ranOutOfTimeOrAttempts() const
return false;
- }
++HappyConnOpener::Attempt::Attempt(const CallbackMethod method, const char *methodName): ++ callbackMethod(method), ++ callbackMethodName(methodName) ++{ ++} ++ ++void ++HappyConnOpener::Attempt::finish() ++{ ++ connWait.finish(); ++ path = nullptr; ++} ++
- void
- HappyConnOpener::Attempt::cancel(const char *reason)
- {
+- if (connector) { +- connector->cancel(reason); +- CallJobHere(17, 3, opener, Comm::ConnOpener, noteAbort); +- } +- clear(); ++ connWait.cancel(reason); ++ path = nullptr;
- }
+diff --git a/src/HappyConnOpener.h b/src/HappyConnOpener.h +index 4f3135c60..c57c431ad 100644 +--- a/src/HappyConnOpener.h ++++ b/src/HappyConnOpener.h +@@ -156,22 +156,28 @@ private:
/// a connection opening attempt in progress (or falsy)
class Attempt {
public:
++ /// HappyConnOpener method implementing a ConnOpener callback ++ using CallbackMethod = void (HappyConnOpener::*)(const CommConnectCbParams &); ++ ++ Attempt(const CallbackMethod method, const char *methodName); ++
explicit operator bool() const { return static_cast<bool>(path); }
/// reacts to a natural attempt completion (successful or otherwise)
+- void finish() { clear(); } ++ void finish();
/// aborts an in-progress attempt
void cancel(const char *reason);
PeerConnectionPointer path; ///< the destination we are connecting to
+- AsyncCall::Pointer connector; ///< our opener callback +- Comm::ConnOpener::Pointer opener; ///< connects to path and calls us
+- private: +- /// cleans up after the attempt ends (successfully or otherwise) +- void clear() { path = nullptr; connector = nullptr; opener = nullptr; } ++ /// waits for a connection to the peer to be established/opened ++ JobWait<Comm::ConnOpener> connWait; ++ ++ const CallbackMethod callbackMethod; ///< ConnOpener calls this method ++ const char * const callbackMethodName; ///< for callbackMethod debugging
};
++ friend std::ostream &operator <<(std::ostream &, const Attempt &);
/* AsyncJob API */
virtual void start() override;
+@@ -190,7 +196,9 @@ private:
void openFreshConnection(Attempt &, PeerConnectionPointer &);
bool reuseOldConnection(PeerConnectionPointer &);
+- void connectDone(const CommConnectCbParams &); ++ void notePrimeConnectDone(const CommConnectCbParams &); ++ void noteSpareConnectDone(const CommConnectCbParams &); ++ void handleConnOpenerAnswer(Attempt &, const CommConnectCbParams &, const char *connDescription);
void checkForNewConnection();
+diff --git a/src/PeerPoolMgr.cc b/src/PeerPoolMgr.cc +index 2caa09f44..7423dd669 100644 +--- a/src/PeerPoolMgr.cc ++++ b/src/PeerPoolMgr.cc +@@ -43,9 +43,8 @@ public:
- PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
peer(cbdataReference(aPeer)),
request(),
+- opener(), +- securer(), +- closer(), ++ transportWait(), ++ encryptionWait(),
addrUsed(0)
- {
- }
+@@ -90,7 +89,7 @@ PeerPoolMgr::doneAll() const
- void
- PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
- {
+- opener = NULL;
++ transportWait.finish();
if (!validPeer()) {
debugs(48, 3, "peer gone");
+@@ -100,9 +99,6 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
}
if (params.flag != Comm::OK) {
+- /* it might have been a timeout with a partially open link */
+- if (params.conn != NULL)
+- params.conn->close();
peerConnectFailed(peer);
checkpoint("conn opening failure"); // may retry
return;
+@@ -112,20 +108,16 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
// Handle TLS peers.
if (peer->secure.encryptTransport) {
+- typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
+- closer = JobCallback(48, 3, CloserDialer, this,
+- PeerPoolMgr::handleSecureClosure);
+- comm_add_close_handler(params.conn->fd, closer);
+-
+- securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
+- MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
++ // XXX: Exceptions orphan params.conn
++ AsyncCall::Pointer callback = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
++ MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
const int peerTimeout = peerConnectTimeout(peer);
const int timeUsed = squid_curtime - params.conn->startTime();
// Use positive timeout when less than one second is left for conn.
const int timeLeft = positiveTimeout(peerTimeout - timeUsed);
+- auto *connector = new Security::BlindPeerConnector(request, params.conn, securer, nullptr, timeLeft);
+- AsyncJob::Start(connector); // will call our callback
++ const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft);
++ encryptionWait.start(connector, callback);
return;
}
+@@ -144,16 +136,7 @@ PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
- void
- PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
- {
+- Must(securer != NULL);
+- securer = NULL;
+-
+- if (closer != NULL) {
+- if (answer.conn != NULL)
+- comm_remove_close_handler(answer.conn->fd, closer);
+- else
+- closer->cancel("securing completed");
+- closer = NULL;
+- }
++ encryptionWait.finish();
if (!validPeer()) {
debugs(48, 3, "peer gone");
+@@ -162,35 +145,33 @@ PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
return;
}
++ assert(!answer.tunneled);
if (answer.error.get()) {
+- if (answer.conn != NULL)
+- answer.conn->close();
++ assert(!answer.conn);
// PeerConnector calls peerConnectFailed() for us;
checkpoint("conn securing failure"); // may retry
return;
}
+- pushNewConnection(answer.conn);
+-}
++ assert(answer.conn);
+-void
+-PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &params)
+-{
+- Must(closer != NULL);
+- Must(securer != NULL);
+- securer->cancel("conn closed by a 3rd party");
+- securer = NULL;
+- closer = NULL;
+- // allow the closing connection to fully close before we check again
+- Checkpoint(this, "conn closure while securing");
++ // The socket could get closed while our callback was queued. Sync
++ // Connection. XXX: Connection::fd may already be stale/invalid here.
++ if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
++ answer.conn->noteClosure();
++ checkpoint("external connection closure"); // may retry
++ return;
++ }
++
++ pushNewConnection(answer.conn);
- }
- void
- PeerPoolMgr::openNewConnection()
- {
// KISS: Do nothing else when we are already doing something.
+- if (opener != NULL || securer != NULL || shutting_down) {
+- debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
++ if (transportWait || encryptionWait || shutting_down) {
++ debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down);
return; // there will be another checkpoint when we are done opening/securing
}
+@@ -227,9 +208,9 @@ PeerPoolMgr::openNewConnection()
const int ctimeout = peerConnectTimeout(peer);
typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
+- opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
+- Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
+- AsyncJob::Start(cs);
++ AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
++ const auto cs = new Comm::ConnOpener(conn, callback, ctimeout);
++ transportWait.start(cs, callback);
- }
- void
+diff --git a/src/PeerPoolMgr.h b/src/PeerPoolMgr.h
+index 217994393..6da61b10b 100644
+--- a/src/PeerPoolMgr.h
++++ b/src/PeerPoolMgr.h
+@@ -10,6 +10,7 @@
- #define SQUID_PEERPOOLMGR_H
- #include "base/AsyncJob.h"
++#include "base/JobWait.h"
- #include "comm/forward.h"
- #include "security/forward.h"
+@@ -54,18 +55,19 @@ protected:
/// Security::PeerConnector callback
void handleSecuredPeer(Security::EncryptorAnswer &answer);
+- /// called when the connection we are trying to secure is closed by a 3rd party
+- void handleSecureClosure(const CommCloseCbParams &params);
+-
/// the final step in connection opening (and, optionally, securing) sequence
void pushNewConnection(const Comm::ConnectionPointer &conn);
- private:
CachePeer *peer; ///< the owner of the pool we manage
RefCount<HttpRequest> request; ///< fake HTTP request for conn opening code
+- AsyncCall::Pointer opener; ///< whether we are opening a connection
+- AsyncCall::Pointer securer; ///< whether we are securing a connection
+- AsyncCall::Pointer closer; ///< monitors conn while we are securing it
++
++ /// waits for a transport connection to the peer to be established/opened
++ JobWait<Comm::ConnOpener> transportWait;
++
++ /// waits for the established transport connection to be secured/encrypted
++ JobWait<Security::BlindPeerConnector> encryptionWait;
++
unsigned int addrUsed; ///< counter for cycling through peer addresses
- };
+diff --git a/src/ResolvedPeers.cc b/src/ResolvedPeers.cc
+index 5b2c6740d..06ab02d23 100644
+--- a/src/ResolvedPeers.cc
++++ b/src/ResolvedPeers.cc
+@@ -151,7 +151,7 @@ ResolvedPeers::extractFound(const char *description, const Paths::iterator &foun
while (++pathsToSkip < paths_.size() && !paths_[pathsToSkip].available) {}
}
+- const auto cleanPath = path.connection->cloneDestinationDetails();
++ const auto cleanPath = path.connection->cloneProfile();
return PeerConnectionPointer(cleanPath, found - paths_.begin());
- }
+diff --git a/src/adaptation/icap/ModXact.cc b/src/adaptation/icap/ModXact.cc +index bbeebdbb5..982f38f8d 100644 +--- a/src/adaptation/icap/ModXact.cc ++++ b/src/adaptation/icap/ModXact.cc +@@ -187,8 +187,7 @@ void Adaptation::Icap::ModXact::startWriting()
openConnection();
- }
+-// connection with the ICAP service established +-void Adaptation::Icap::ModXact::handleCommConnected() ++void Adaptation::Icap::ModXact::startShoveling()
- {
Must(state.writing == State::writingConnect);
+diff --git a/src/adaptation/icap/ModXact.h b/src/adaptation/icap/ModXact.h +index f8988ac29..2fda1318b 100644 +--- a/src/adaptation/icap/ModXact.h ++++ b/src/adaptation/icap/ModXact.h +@@ -155,10 +155,11 @@ public:
virtual void noteBodyProductionEnded(BodyPipe::Pointer);
virtual void noteBodyProducerAborted(BodyPipe::Pointer);
+- // comm handlers +- virtual void handleCommConnected(); ++ /* Xaction API */ ++ virtual void startShoveling();
virtual void handleCommWrote(size_t size);
virtual void handleCommRead(size_t size);
++
void handleCommWroteHeaders();
void handleCommWroteBody();
+diff --git a/src/adaptation/icap/OptXact.cc b/src/adaptation/icap/OptXact.cc +index 5ed4fbfbb..2464ea533 100644 +--- a/src/adaptation/icap/OptXact.cc ++++ b/src/adaptation/icap/OptXact.cc +@@ -37,7 +37,7 @@ void Adaptation::Icap::OptXact::start()
openConnection();
- }
+-void Adaptation::Icap::OptXact::handleCommConnected() ++void Adaptation::Icap::OptXact::startShoveling()
- {
scheduleRead();
+diff --git a/src/adaptation/icap/OptXact.h b/src/adaptation/icap/OptXact.h +index 3811ab48f..725cd6225 100644 +--- a/src/adaptation/icap/OptXact.h ++++ b/src/adaptation/icap/OptXact.h +@@ -30,8 +30,9 @@ public:
OptXact(ServiceRep::Pointer &aService);
- protected:
++ /* Xaction API */
virtual void start();
+- virtual void handleCommConnected(); ++ virtual void startShoveling();
virtual void handleCommWrote(size_t size);
virtual void handleCommRead(size_t size);
+diff --git a/src/adaptation/icap/ServiceRep.cc b/src/adaptation/icap/ServiceRep.cc +index 6df475712..fbd896888 100644 +--- a/src/adaptation/icap/ServiceRep.cc ++++ b/src/adaptation/icap/ServiceRep.cc +@@ -112,9 +112,10 @@ void Adaptation::Icap::ServiceRep::noteFailure()
// should be configurable.
- }
+-// returns a persistent or brand new connection; negative int on failures ++// TODO: getIdleConnection() and putConnection()/noteConnectionFailed() manage a ++// "used connection slot" resource. Automate that resource tracking (RAII/etc.).
- Comm::ConnectionPointer
+-Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused) ++Adaptation::Icap::ServiceRep::getIdleConnection(const bool retriableXact)
- {
Comm::ConnectionPointer connection;
+@@ -137,7 +138,6 @@ Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
else
theIdleConns->closeN(1);
+- reused = Comm::IsConnOpen(connection);
++theBusyConns;
debugs(93,3, HERE << "got connection: " << connection);
return connection;
+@@ -150,7 +150,6 @@ void Adaptation::Icap::ServiceRep::putConnection(const Comm::ConnectionPointer &
// do not pool an idle connection if we owe connections
if (isReusable && excessConnections() == 0) {
debugs(93, 3, HERE << "pushing pconn" << comment);
+- commUnsetConnTimeout(conn);
theIdleConns->push(conn);
} else {
debugs(93, 3, HERE << (sendReset ? "RST" : "FIN") << "-closing " <<
+diff --git a/src/adaptation/icap/ServiceRep.h b/src/adaptation/icap/ServiceRep.h +index f2e893244..cca2df4ab 100644 +--- a/src/adaptation/icap/ServiceRep.h ++++ b/src/adaptation/icap/ServiceRep.h +@@ -85,7 +85,8 @@ public:
bool wantsPreview(const SBuf &urlPath, size_t &wantedSize) const;
bool allows204() const;
bool allows206() const;
+- Comm::ConnectionPointer getConnection(bool isRetriable, bool &isReused); ++ /// \returns an idle persistent ICAP connection or nil ++ Comm::ConnectionPointer getIdleConnection(bool isRetriable);
void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, bool sendReset, const char *comment);
void noteConnectionUse(const Comm::ConnectionPointer &conn);
void noteConnectionFailed(const char *comment);
+diff --git a/src/adaptation/icap/Xaction.cc b/src/adaptation/icap/Xaction.cc +index 9aacf2e1b..962ed1703 100644 +--- a/src/adaptation/icap/Xaction.cc ++++ b/src/adaptation/icap/Xaction.cc +@@ -13,6 +13,7 @@
- #include "adaptation/icap/Config.h"
- #include "adaptation/icap/Launcher.h"
- #include "adaptation/icap/Xaction.h"
++#include "base/JobWait.h"
- #include "base/TextException.h"
- #include "comm.h"
- #include "comm/Connection.h"
+@@ -79,7 +80,6 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv
icapRequest(NULL),
icapReply(NULL),
attempts(0),
+- connection(NULL),
theService(aService),
commEof(false),
reuseConnection(true),
+@@ -87,14 +87,8 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv
isRepeatable(true),
ignoreLastWrite(false),
waitingForDns(false),
+- stopReason(NULL), +- connector(NULL), +- reader(NULL), +- writer(NULL), +- closer(NULL),
alep(new AccessLogEntry),
+- al(*alep), +- cs(NULL) ++ al(*alep)
- {
debugs(93,3, typeName << " constructed, this=" << this <<
" [icapx" << id << ']'); // we should not call virtual status() here
+@@ -150,6 +144,8 @@ static void
- icapLookupDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &, void *data)
- {
Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data);
++ /// TODO: refactor with CallJobHere1, passing either std::optional (after upgrading to C++17)
++ /// or Optional<Ip::Address> (when it can take non-trivial types)
xa->dnsLookupDone(ia);
- }
+@@ -164,21 +160,8 @@ Adaptation::Icap::Xaction::openConnection()
if (!TheConfig.reuse_connections)
disableRetries(); // this will also safely drain pconn pool
+- bool wasReused = false; +- connection = s.getConnection(isRetriable, wasReused); +- +- if (wasReused && Comm::IsConnOpen(connection)) { +- // Set comm Close handler +- // fake the connect callback +- // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead? +- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer; +- CbcPointer<Xaction> self(this); +- Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected); +- dialer.params.conn = connection; +- dialer.params.flag = Comm::OK; +- // fake other parameters by copying from the existing connection +- connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer); +- ScheduleCallHere(connector); ++ if (const auto pconn = s.getIdleConnection(isRetriable)) { ++ useTransportConnection(pconn);
return;
}
+@@ -207,30 +190,22 @@ Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia)
- #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
dieOnConnectionFailure(); // throws
- #else // take a step back into protected Async call dialing.
+- // fake the connect callback +- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer; +- CbcPointer<Xaction> self(this); +- Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected); +- dialer.params.conn = connection; +- dialer.params.flag = Comm::COMM_ERROR; +- // fake other parameters by copying from the existing connection +- connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer); +- ScheduleCallHere(connector); ++ CallJobHere(93, 3, this, Xaction, Xaction::dieOnConnectionFailure);
- #endif
return;
}
+- connection = new Comm::Connection; +- connection->remote = ia->current(); +- connection->remote.port(s.cfg().port); +- getOutgoingAddress(NULL, connection); ++ const Comm::ConnectionPointer conn = new Comm::Connection(); ++ conn->remote = ia->current(); ++ conn->remote.port(s.cfg().port); ++ getOutgoingAddress(nullptr, conn);
// TODO: service bypass status may differ from that of a transaction
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
+- connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected); +- cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass)); ++ AsyncCall::Pointer callback = JobCallback(93, 3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected); ++ const auto cs = new Comm::ConnOpener(conn, callback, TheConfig.connect_timeout(service().cfg().bypass));
cs->setHost(s.cfg().host.termedBuf());
+- AsyncJob::Start(cs.get()); ++ transportWait.start(cs, callback);
- }
- /*
+@@ -256,6 +231,8 @@ void Adaptation::Icap::Xaction::closeConnection()
closer = NULL;
}
++ commUnsetConnTimeout(connection); ++
cancelRead(); // may not work
if (reuseConnection && !doneWithIo()) {
+@@ -275,54 +252,65 @@ void Adaptation::Icap::Xaction::closeConnection()
writer = NULL;
reader = NULL;
+- connector = NULL;
connection = NULL;
}
- }
+-// connection with the ICAP service established ++/// called when the connection attempt to an ICAP service completes (successfully or not)
- void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
- {
+- cs = NULL; ++ transportWait.finish();
+- if (io.flag == Comm::TIMEOUT) { +- handleCommTimedout(); ++ if (io.flag != Comm::OK) { ++ dieOnConnectionFailure(); // throws
return;
}
+- Must(connector != NULL); +- connector = NULL; +- +- if (io.flag != Comm::OK) +- dieOnConnectionFailure(); // throws +- +- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer; +- AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout", +- TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout)); +- commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall); ++ useTransportConnection(io.conn); ++}
+- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer; +- closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", +- CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); +- comm_add_close_handler(io.conn->fd, closer); ++/// React to the availability of a transport connection to the ICAP service. ++/// The given connection may (or may not) be secured already. ++void ++Adaptation::Icap::Xaction::useTransportConnection(const Comm::ConnectionPointer &conn) ++{ ++ assert(Comm::IsConnOpen(conn)); ++ assert(!connection);
// If it is a reused connection and the TLS object is built
// we should not negotiate new TLS session
+- const auto &ssl = fd_table[io.conn->fd].ssl; ++ const auto &ssl = fd_table[conn->fd].ssl;
if (!ssl && service().cfg().secure.encryptTransport) {
++ // XXX: Exceptions orphan conn.
CbcPointer<Adaptation::Icap::Xaction> me(this);
+- securer = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer", +- MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer)); ++ AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer", ++ MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
+- auto *sslConnector = new Ssl::IcapPeerConnector(theService, io.conn, securer, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass)); +- AsyncJob::Start(sslConnector); // will call our callback ++ const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass)); ++ ++ encryptionWait.start(sslConnector, callback);
return;
}
+-// ?? fd_table[io.conn->fd].noteUse(icapPconnPool); ++ useIcapConnection(conn); ++} ++ ++/// react to the availability of a fully-ready ICAP connection ++void ++Adaptation::Icap::Xaction::useIcapConnection(const Comm::ConnectionPointer &conn) ++{ ++ assert(!connection); ++ assert(conn); ++ assert(Comm::IsConnOpen(conn)); ++ connection = conn;
service().noteConnectionUse(connection);
+- handleCommConnected(); ++ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer; ++ closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", ++ CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); ++ comm_add_close_handler(connection->fd, closer); ++ ++ startShoveling();
- }
- void Adaptation::Icap::Xaction::dieOnConnectionFailure()
+@@ -367,40 +355,25 @@ void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io)
- // communication timeout with the ICAP service
- void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &)
+-{ +- handleCommTimedout(); +-} +- +-void Adaptation::Icap::Xaction::handleCommTimedout()
- {
debugs(93, 2, HERE << typeName << " failed: timeout with " <<
theService->cfg().methodStr() << " " <<
theService->cfg().uri << status());
reuseConnection = false;
+- const bool whileConnecting = connector != NULL; +- if (whileConnecting) { +- assert(!haveConnection()); +- theService->noteConnectionFailed("timedout"); +- } else +- closeConnection(); // so that late Comm callbacks do not disturb bypass +- throw TexcHere(whileConnecting ? +- "timed out while connecting to the ICAP service" : +- "timed out while talking to the ICAP service"); ++ assert(haveConnection()); ++ closeConnection(); ++ throw TextException("timed out while talking to the ICAP service", Here());
- }
- // unexpected connection close while talking to the ICAP service
- void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &)
- {
+- if (securer != NULL) { +- securer->cancel("Connection closed before SSL negotiation finished"); +- securer = NULL; ++ if (connection) { ++ connection->noteClosure(); ++ connection = nullptr;
}
closer = NULL;
+- handleCommClosed(); +-}
+-void Adaptation::Icap::Xaction::handleCommClosed() +-{
static const auto d = MakeNamedErrorDetail("ICAP_XACT_CLOSE");
detailError(d);
mustStop("ICAP service connection externally closed");
+@@ -424,7 +397,8 @@ void Adaptation::Icap::Xaction::callEnd()
- bool Adaptation::Icap::Xaction::doneAll() const
- {
+- return !waitingForDns && !connector && !securer && !reader && !writer && ++ return !waitingForDns && !transportWait && !encryptionWait && ++ !reader && !writer &&
Adaptation::Initiate::doneAll();
- }
+@@ -568,7 +542,7 @@ bool Adaptation::Icap::Xaction::doneWriting() const
- bool Adaptation::Icap::Xaction::doneWithIo() const
- {
return haveConnection() &&
+- !connector && !reader && !writer && // fast checks, some redundant ++ !transportWait && !reader && !writer && // fast checks, some redundant
doneReading() && doneWriting();
- }
+@@ -608,10 +582,7 @@ void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome &
- void Adaptation::Icap::Xaction::swanSong()
- {
// kids should sing first and then call the parent method.
+- if (cs.valid()) { +- debugs(93,6, HERE << id << " about to notify ConnOpener!"); +- CallJobHere(93, 3, cs, Comm::ConnOpener, noteAbort); +- cs = NULL; ++ if (transportWait || encryptionWait) {
service().noteConnectionFailed("abort");
}
+@@ -750,20 +721,12 @@ Ssl::IcapPeerConnector::noteNegotiationDone(ErrorState *error)
- void
- Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer)
- {
+- Must(securer != NULL); +- securer = NULL; +- +- if (closer != NULL) { +- if (Comm::IsConnOpen(answer.conn)) +- comm_remove_close_handler(answer.conn->fd, closer); +- else +- closer->cancel("securing completed"); +- closer = NULL; +- } ++ encryptionWait.finish();
++ assert(!answer.tunneled);
if (answer.error.get()) {
+- if (answer.conn != NULL) +- answer.conn->close(); ++ assert(!answer.conn); ++ // TODO: Refactor dieOnConnectionFailure() to be usable here as well.
debugs(93, 2, typeName <<
" TLS negotiation to " << service().cfg().uri << " failed");
service().noteConnectionFailed("failure");
+@@ -774,8 +737,18 @@ Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer)
debugs(93, 5, "TLS negotiation to " << service().cfg().uri << " complete");
+- service().noteConnectionUse(answer.conn); ++ assert(answer.conn); ++ ++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here. ++ if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) { ++ answer.conn->noteClosure(); ++ service().noteConnectionFailed("external TLS connection closure"); ++ static const auto d = MakeNamedErrorDetail("ICAP_XACT_SSL_CLOSE"); ++ detailError(d); ++ throw TexcHere("external closure of the TLS ICAP service connection"); ++ }
+- handleCommConnected(); ++ useIcapConnection(answer.conn);
- }
+diff --git a/src/adaptation/icap/Xaction.h b/src/adaptation/icap/Xaction.h +index b66044594..31a6e22fc 100644 +--- a/src/adaptation/icap/Xaction.h ++++ b/src/adaptation/icap/Xaction.h +@@ -12,6 +12,7 @@
- #include "AccessLogEntry.h"
- #include "adaptation/icap/ServiceRep.h"
- #include "adaptation/Initiate.h"
++#include "base/JobWait.h"
- #include "comm/ConnOpener.h"
- #include "error/forward.h"
- #include "HttpReply.h"
+@@ -20,6 +21,10 @@
- class MemBuf;
++namespace Ssl { ++class IcapPeerConnector; ++} ++
- namespace Adaptation
- {
- namespace Icap
+@@ -65,12 +70,12 @@ protected:
virtual void start();
virtual void noteInitiatorAborted(); // TODO: move to Adaptation::Initiate
++ /// starts sending/receiving ICAP messages ++ virtual void startShoveling() = 0; ++
// comm hanndlers; called by comm handler wrappers
+- virtual void handleCommConnected() = 0;
virtual void handleCommWrote(size_t sz) = 0;
virtual void handleCommRead(size_t sz) = 0;
+- virtual void handleCommTimedout(); +- virtual void handleCommClosed();
void handleSecuredPeer(Security::EncryptorAnswer &answer);
/// record error detail if possible
+@@ -78,7 +83,6 @@ protected:
void openConnection();
void closeConnection();
+- void dieOnConnectionFailure();
bool haveConnection() const;
void scheduleRead();
+@@ -124,11 +128,13 @@ public:
ServiceRep &service();
- private:
++ void useTransportConnection(const Comm::ConnectionPointer &); ++ void useIcapConnection(const Comm::ConnectionPointer &); ++ void dieOnConnectionFailure();
void tellQueryAborted();
void maybeLog();
- protected:
+- Comm::ConnectionPointer connection; ///< ICAP server connection
Adaptation::Icap::ServiceRep::Pointer theService;
SBuf readBuf;
+@@ -139,13 +145,8 @@ protected:
bool ignoreLastWrite;
bool waitingForDns; ///< expecting a ipcache_nbgethostbyname() callback
+- const char *stopReason; +- +- // active (pending) comm callbacks for the ICAP server connection +- AsyncCall::Pointer connector;
AsyncCall::Pointer reader;
AsyncCall::Pointer writer;
+- AsyncCall::Pointer closer;
AccessLogEntry::Pointer alep; ///< icap.log entry
AccessLogEntry &al; ///< short for *alep
+@@ -155,8 +156,16 @@ protected:
timeval icap_tio_finish; /*time when the last byte of the ICAP responsewas received*/
- private:
+- Comm::ConnOpener::Pointer cs;
+- AsyncCall::Pointer securer; ///< whether we are securing a connection
++ /// waits for a transport connection to the ICAP server to be established/opened
++ JobWait<Comm::ConnOpener> transportWait;
++
++ /// waits for the established transport connection to be secured/encrypted
++ JobWait<Ssl::IcapPeerConnector> encryptionWait;
++
++ /// open and, if necessary, secured connection to the ICAP server (or nil)
++ Comm::ConnectionPointer connection;
++
++ AsyncCall::Pointer closer;
- };
- } // namespace Icap
+diff --git a/src/base/AsyncJob.cc b/src/base/AsyncJob.cc
+index 3b9161e4a..111667d5d 100644
+--- a/src/base/AsyncJob.cc
++++ b/src/base/AsyncJob.cc
+@@ -20,11 +20,11 @@
- InstanceIdDefinitions(AsyncJob, "job");
+-AsyncJob::Pointer AsyncJob::Start(AsyncJob *j)
++void
++AsyncJob::Start(const Pointer &job)
- {
+- AsyncJob::Pointer job(j);
CallJobHere(93, 5, job, AsyncJob, start);
+- return job;
++ job->started_ = true; // it is the attempt that counts
- }
- AsyncJob::AsyncJob(const char *aTypeName) :
+@@ -38,6 +38,7 @@ AsyncJob::~AsyncJob()
- {
debugs(93,5, "AsyncJob destructed, this=" << this <<
" type=" << typeName << " [" << id << ']');
++ assert(!started_ || swanSang_);
- }
- void AsyncJob::start()
+@@ -141,9 +142,16 @@ void AsyncJob::callEnd()
AsyncCall::Pointer inCallSaved = inCall;
void *thisSaved = this;
++ // TODO: Swallow swanSong() exceptions to reduce memory leaks.
++
++ // Job callback invariant: swanSong() is (only) called for started jobs.
++ // Here to detect violations in kids that forgot to call our swanSong().
++ assert(started_);
++
++ swanSang_ = true; // it is the attempt that counts
swanSong();
+- delete this; // this is the only place where the object is deleted
++ delete this; // this is the only place where a started job is deleted
// careful: this object does not exist any more
debugs(93, 6, HERE << *inCallSaved << " ended " << thisSaved);
+diff --git a/src/base/AsyncJob.h b/src/base/AsyncJob.h
+index 4d685dff5..a46e30071 100644
+--- a/src/base/AsyncJob.h
++++ b/src/base/AsyncJob.h
+@@ -36,8 +36,13 @@ public:
- public:
AsyncJob(const char *aTypeName);
+- /// starts a freshly created job (i.e., makes the job asynchronous)
+- static Pointer Start(AsyncJob *job);
++ /// Promises to start the configured job (eventually). The job is deemed to
++ /// be running asynchronously beyond this point, so the caller should only
++ /// access the job object via AsyncCalls rather than directly.
++ ///
++ /// swanSong() is only called for jobs for which this method has returned
++ /// successfully (i.e. without throwing).
++ static void Start(const Pointer &job);
- protected:
// XXX: temporary method to replace "delete this" in jobs-in-transition.
+@@ -62,6 +67,11 @@ public:
/// called when the job throws during an async call
virtual void callException(const std::exception &e);
++ /// process external request to terminate now (i.e. during this async call)
++ void handleStopRequest() { mustStop("externally aborted"); }
++
++ const InstanceId<AsyncJob> id; ///< job identifier
++
- protected:
// external destruction prohibited to ensure swanSong() is called
virtual ~AsyncJob();
+@@ -69,7 +79,9 @@ protected:
const char *stopReason; ///< reason for forcing done() to be true
const char *typeName; ///< kid (leaf) class name, for debugging
AsyncCall::Pointer inCall; ///< the asynchronous call being handled, if any
+- const InstanceId<AsyncJob> id; ///< job identifier
++
++ bool started_ = false; ///< Start() has finished successfully
++ bool swanSang_ = false; ///< swanSong() was called
- };
- #endif /* SQUID_ASYNC_JOB_H */
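The started_/swanSang_ flags added above encode the job lifecycle rules this patch relies on: only a successfully started job gets swanSong(), and a started job must not be destroyed before swanSong() ran. A minimal toy model of that contract (illustrative only, not Squid code; ToyJob and its members are invented names):

    #include <cassert>
    #include <iostream>

    class ToyJob {
    public:
        // started jobs must have sung before destruction
        virtual ~ToyJob() { assert(!started || swanSang); }

        static void Start(ToyJob *job) {
            job->start();          // may throw; then the job never counts as started
            job->started = true;   // it is the (successful) attempt that counts
        }

        void done() {              // analogous to the end of the job's last async call
            assert(started);       // swanSong() is (only) called for started jobs
            swanSang = true;       // it is the attempt that counts
            swanSong();
            delete this;           // the only place a started job is deleted
        }

    protected:
        virtual void start() { std::cout << "job started\n"; }
        virtual void swanSong() { std::cout << "remaining callbacks would be called here\n"; }

    private:
        bool started = false;
        bool swanSang = false;
    };

    int main() {
        auto *job = new ToyJob;
        ToyJob::Start(job);
        job->done(); // deletes the job after swanSong()
    }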
+diff --git a/src/base/JobWait.cc b/src/base/JobWait.cc
+new file mode 100644
+index 000000000..ba6832415
+--- /dev/null
++++ b/src/base/JobWait.cc
+@@ -0,0 +1,81 @@
++/*
++ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
++ *
++ * Squid software is distributed under GPLv2+ license and includes
++ * contributions from numerous individuals and organizations.
++ * Please see the COPYING and CONTRIBUTORS files for details.
++ */
++
++#include "squid.h"
++#include "base/AsyncJobCalls.h"
++#include "base/JobWait.h"
++
++#include <cassert>
++#include <iostream>
++
++JobWaitBase::JobWaitBase() = default;
++
++JobWaitBase::~JobWaitBase()
++{
++ cancel("owner gone");
++}
++
++void
++JobWaitBase::start_(const AsyncJob::Pointer aJob, const AsyncCall::Pointer aCall)
++{
++ // Invariant: The wait will be over. We cannot guarantee that the job will
++ // call the callback, of course, but we can check these prerequisites.
++ assert(aCall);
++ assert(aJob.valid());
++
++ // "Double" waiting state leads to conflicting/mismatching callbacks
++ // detailed in finish(). Detect that bug ASAP.
++ assert(!waiting());
++
++ assert(!callback_);
++ assert(!job_);
++ callback_ = aCall;
++ job_ = aJob;
++
++ AsyncJob::Start(job_.get());
++}
++
++void
++JobWaitBase::finish()
++{
++ // Unexpected callbacks might result in disasters like secrets exposure,
++ // data corruption, or expensive message routing mistakes when the callback
++ // info is applied to the wrong message part or acted upon prematurely.
++ assert(waiting());
++ clear();
++}
++
++void
++JobWaitBase::cancel(const char *reason)
++{
++ if (callback_) {
++ callback_->cancel(reason);
++
++ // Instead of AsyncJob, the class parameter could be Job. That would
++ // avoid runtime child-to-parent CbcPointer conversion overheads, but
++ // complicate support for Jobs with virtual AsyncJob bases (GCC error:
++ // "pointer to member conversion via virtual base AsyncJob") and also
++ // cache-log "Job::handleStopRequest()" with a non-existent class name.
++ CallJobHere(callback_->debugSection, callback_->debugLevel, job_, AsyncJob, handleStopRequest);
++
++ clear();
++ }
++}
++
++void
++JobWaitBase::print(std::ostream &os) const
++{
++ // use a backarrow to emphasize that this is a callback: call24<-job6
++ if (callback_)
++ os << callback_->id << "<-";
++ if (const auto rawJob = job_.get())
++ os << rawJob->id;
++ else
++ os << job_; // raw pointer of a gone job may still be useful for triage
++}
++
+diff --git a/src/base/JobWait.h b/src/base/JobWait.h
+new file mode 100644
+index 000000000..6b1131331
+--- /dev/null
++++ b/src/base/JobWait.h
+@@ -0,0 +1,91 @@
++/*
++ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
++ *
++ * Squid software is distributed under GPLv2+ license and includes
++ * contributions from numerous individuals and organizations.
++ * Please see the COPYING and CONTRIBUTORS files for details.
++ */
++
++#ifndef SQUID_BASE_JOBWAIT_H
++#define SQUID_BASE_JOBWAIT_H
++
++#include "base/AsyncJob.h"
++#include "base/CbcPointer.h"
++
++#include <iosfwd>
++
++/// Manages waiting for an AsyncJob callback. Use type-safe JobWait instead.
++/// This base class does not contain code specific to the actual Job type.
++class JobWaitBase
++{
++public:
++ JobWaitBase();
++ ~JobWaitBase();
++
++ /// no copying of any kind: each waiting context needs a dedicated AsyncCall
++ JobWaitBase(JobWaitBase &&) = delete;
++
++ explicit operator bool() const { return waiting(); }
++
++ /// whether we are currently waiting for the job to call us back
++ /// the job itself may be gone even if this returns true
++ bool waiting() const { return bool(callback_); }
++
++ /// ends wait (after receiving the call back)
++ /// forgets the job which is likely to be gone by now
++ void finish();
++
++ /// aborts wait (if any) before receiving the call back
++ /// does nothing if we are not waiting
++ void cancel(const char *reason);
++
++ /// summarizes what we are waiting for (for debugging)
++ void print(std::ostream &) const;
++
++protected:
++ /// starts waiting for the given job to call the given callback
++ void start_(AsyncJob::Pointer, AsyncCall::Pointer);
++
++private:
++ /// the common part of finish() and cancel()
++ void clear() { job_.clear(); callback_ = nullptr; }
++
++ /// the job that we are waiting to call us back (or nil)
++ AsyncJob::Pointer job_;
++
++ /// the call we are waiting for the job_ to make (or nil)
++ AsyncCall::Pointer callback_;
++};
++
++/// Manages waiting for an AsyncJob callback.
++/// Completes JobWaitBase by providing Job type-specific members.
++template <class Job>
++class JobWait: public JobWaitBase
++{
++public:
++ typedef CbcPointer<Job> JobPointer;
++
++ /// starts waiting for the given job to call the given callback
++ void start(const JobPointer &aJob, const AsyncCall::Pointer &aCallback) {
++ start_(aJob, aCallback);
++ typedJob_ = aJob;
++ }
++
++ /// \returns a cbdata pointer to the job we are waiting for (or nil)
++ /// the returned pointer may be falsy, even if we are still waiting()
++ JobPointer job() const { return waiting() ? typedJob_ : nullptr; }
++
++private:
++ /// nearly duplicates JobWaitBase::job_ but exposes the actual job type
++ JobPointer typedJob_;
++};
++
++inline
++std::ostream &operator <<(std::ostream &os, const JobWaitBase &wait)
++{
++ wait.print(os);
++ return os;
++}
++
++#endif /* SQUID_BASE_JOBWAIT_H */
++
+diff --git a/src/base/Makefile.am b/src/base/Makefile.am
+index c9564d755..374ea8798 100644
+--- a/src/base/Makefile.am
++++ b/src/base/Makefile.am
+@@ -34,6 +34,8 @@ libbase_la_SOURCES = \
- Here.h \
- InstanceId.cc \
- InstanceId.h \
++ JobWait.cc \
++ JobWait.h \
- Lock.h \
- LookupTable.h \
- LruMap.h \
+diff --git a/src/base/forward.h b/src/base/forward.h
+index 3803e1ea0..46de97c8d 100644
+--- a/src/base/forward.h
++++ b/src/base/forward.h
+@@ -17,6 +17,7 @@ class ScopedId;
- template<class Cbc> class CbcPointer;
- template<class RefCountableKid> class RefCount;
++template<class Job> class JobWait;
- typedef CbcPointer<AsyncJob> AsyncJobPointer;
- typedef RefCount<CodeContext> CodeContextPointer;
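The JobWait helper introduced above is used the same way throughout the rest of the patch (PeerPoolMgr, ICAP Xaction, FtpClient, Ident, TcpLogger): remember the callback and the helper job in start(), call finish() when the callback arrives, and let cancel() or the destructor stop a helper that is no longer wanted. A loose standalone analogue of that wait-tracking idea, using std::function instead of Squid's AsyncCall (illustrative only; MiniWait is a made-up name):

    #include <cassert>
    #include <functional>
    #include <iostream>
    #include <utility>

    // A stand-in for the wait-tracking idea: remember the pending callback,
    // require exactly one finish() per started wait, and cancel() on teardown.
    class MiniWait {
    public:
        ~MiniWait() { cancel("owner gone"); }

        void start(std::function<void()> callback) {
            assert(!waiting());           // no "double" waits
            callback_ = std::move(callback);
        }

        bool waiting() const { return static_cast<bool>(callback_); }

        void finish() {                   // the expected callback arrived
            assert(waiting());
            callback_ = nullptr;
        }

        void cancel(const char *reason) { // wait abandoned before the callback
            if (waiting()) {
                std::cout << "cancelling wait: " << reason << "\n";
                callback_ = nullptr;
            }
        }

    private:
        std::function<void()> callback_;
    };

    int main() {
        MiniWait transportWait;
        transportWait.start([] { std::cout << "connection opened\n"; });
        // ... later, when the helper job calls us back:
        transportWait.finish();

        MiniWait abandoned;
        abandoned.start([] { std::cout << "never runs\n"; });
        // the destructor cancels the still-pending wait
    }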
+diff --git a/src/client_side.cc b/src/client_side.cc
+index b8d786423..4eb697696 100644
+--- a/src/client_side.cc
++++ b/src/client_side.cc
+@@ -503,6 +503,10 @@ httpRequestFree(void *data)
- /* This is a handler normally called by comm_close() */
- void ConnStateData::connStateClosed(const CommCloseCbParams &)
- {
++ if (clientConnection) {
++ clientConnection->noteClosure();
++ // keep closed clientConnection for logging, clientdb cleanup, etc.
++ }
deleteThis("ConnStateData::connStateClosed");
- }
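This is the first of many hunks that make a comm close handler call noteClosure() on its Connection before forgetting it, so the object stops pretending to own an already-closed descriptor. A small standalone sketch of that bookkeeping pattern (illustrative only; Handle and onClosed are invented names):

    #include <iostream>
    #include <memory>

    // Stand-in for a Comm::Connection-like handle that tracks an open descriptor.
    struct Handle {
        int fd = -1;
        bool isOpen() const { return fd >= 0; }
        void noteClosure() { fd = -1; }   // sync state: somebody else closed the socket
    };

    // Stand-in for a close handler: sync the handle first, then react.
    void onClosed(std::shared_ptr<Handle> &conn) {
        if (conn) {
            conn->noteClosure();          // do not try to close it again later
            conn.reset();                 // or keep it around for logging/cleanup
        }
        std::cout << "connection closed externally\n";
    }

    int main() {
        auto conn = std::make_shared<Handle>();
        conn->fd = 42;                    // pretend the socket was opened
        onClosed(conn);                   // third-party closure notification
    }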
+diff --git a/src/clients/FtpClient.cc b/src/clients/FtpClient.cc +index 7874db121..afe7827dc 100644 +--- a/src/clients/FtpClient.cc ++++ b/src/clients/FtpClient.cc +@@ -201,10 +201,6 @@ Ftp::Client::Client(FwdState *fwdState):
- Ftp::Client::~Client()
- {
+- if (data.opener != NULL) { +- data.opener->cancel("Ftp::Client destructed"); +- data.opener = NULL; +- }
data.close();
safe_free(old_request);
+@@ -786,10 +782,10 @@ Ftp::Client::connectDataChannel()
debugs(9, 3, "connecting to " << conn->remote);
typedef CommCbMemFunT<Client, CommConnectCbParams> Dialer;
+- data.opener = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected); +- Comm::ConnOpener *cs = new Comm::ConnOpener(conn, data.opener, Config.Timeout.connect); ++ AsyncCall::Pointer callback = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected); ++ const auto cs = new Comm::ConnOpener(conn, callback, Config.Timeout.connect);
cs->setHost(data.host);
+- AsyncJob::Start(cs); ++ dataConnWait.start(cs, callback);
- }
- bool
+@@ -811,10 +807,11 @@ void
- Ftp::Client::dataClosed(const CommCloseCbParams &)
- {
debugs(9, 4, status());
++ if (data.conn) ++ data.conn->noteClosure();
if (data.listenConn != NULL) {
data.listenConn->close();
data.listenConn = NULL;
+- // NP clear() does the: data.fd = -1;
}
data.clear();
- }
+@@ -879,6 +876,8 @@ void
- Ftp::Client::ctrlClosed(const CommCloseCbParams &)
- {
debugs(9, 4, status());
++ if (ctrl.conn) ++ ctrl.conn->noteClosure();
ctrl.clear();
doneWithFwd = "ctrlClosed()"; // assume FwdState is monitoring too
mustStop("Ftp::Client::ctrlClosed");
+diff --git a/src/clients/FtpClient.h b/src/clients/FtpClient.h +index ac8d22e18..4b2dd61d5 100644 +--- a/src/clients/FtpClient.h ++++ b/src/clients/FtpClient.h +@@ -64,7 +64,6 @@ public:
*/
Comm::ConnectionPointer listenConn;
+- AsyncCall::Pointer opener; ///< Comm opener handler callback.
- private:
AsyncCall::Pointer closer; ///< Comm close handler callback
- };
+@@ -205,6 +204,10 @@ protected:
virtual void sentRequestBody(const CommIoCbParams &io);
virtual void doneSendingRequestBody();
++ /// Waits for an FTP data connection to the server to be established/opened.
++ /// This wait only happens in FTP passive mode (via PASV or EPSV).
++ JobWait<Comm::ConnOpener> dataConnWait;
++
- private:
bool parseControlReply(size_t &bytesUsed);
+diff --git a/src/clients/FtpGateway.cc b/src/clients/FtpGateway.cc +index 6a7d139ca..aeeaedb48 100644 +--- a/src/clients/FtpGateway.cc ++++ b/src/clients/FtpGateway.cc +@@ -486,11 +486,9 @@ Ftp::Gateway::timeout(const CommTimeoutCbParams &io)
flags.pasv_supported = false;
debugs(9, DBG_IMPORTANT, "FTP Gateway timeout in SENT_PASV state");
+- // cancel the data connection setup. +- if (data.opener != NULL) { +- data.opener->cancel("timeout"); +- data.opener = NULL; +- } ++ // cancel the data connection setup, if any ++ dataConnWait.cancel("timeout"); ++
data.close();
}
+@@ -1723,7 +1721,7 @@ void
- Ftp::Gateway::dataChannelConnected(const CommConnectCbParams &io)
- {
debugs(9, 3, HERE);
+- data.opener = NULL; ++ dataConnWait.finish();
if (io.flag != Comm::OK) {
debugs(9, 2, HERE << "Failed to connect. Retrying via another method.");
+@@ -2727,9 +2725,9 @@ Ftp::Gateway::mayReadVirginReplyBody() const
return !doneWithServer();
- }
+-AsyncJob::Pointer ++void
- Ftp::StartGateway(FwdState *const fwdState)
- {
+- return AsyncJob::Start(new Ftp::Gateway(fwdState)); ++ AsyncJob::Start(new Ftp::Gateway(fwdState));
- }
+diff --git a/src/clients/FtpRelay.cc b/src/clients/FtpRelay.cc +index 8796a74c8..ce6ba3ba6 100644 +--- a/src/clients/FtpRelay.cc ++++ b/src/clients/FtpRelay.cc +@@ -739,7 +739,7 @@ void
- Ftp::Relay::dataChannelConnected(const CommConnectCbParams &io)
- {
debugs(9, 3, status());
+- data.opener = NULL; ++ dataConnWait.finish();
if (io.flag != Comm::OK) {
debugs(9, 2, "failed to connect FTP server data channel");
+@@ -804,9 +804,9 @@ Ftp::Relay::HandleStoreAbort(Relay *ftpClient)
ftpClient->dataComplete();
- }
+-AsyncJob::Pointer ++void
- Ftp::StartRelay(FwdState *const fwdState)
- {
+- return AsyncJob::Start(new Ftp::Relay(fwdState)); ++ AsyncJob::Start(new Ftp::Relay(fwdState));
- }
+diff --git a/src/clients/HttpTunneler.cc b/src/clients/HttpTunneler.cc +index 023a8586c..67a52e662 100644 +--- a/src/clients/HttpTunneler.cc ++++ b/src/clients/HttpTunneler.cc +@@ -101,6 +101,11 @@ void
- Http::Tunneler::handleConnectionClosure(const CommCloseCbParams &params)
- {
closer = nullptr;
++ if (connection) { ++ countFailingConnection(); ++ connection->noteClosure(); ++ connection = nullptr; ++ }
bailWith(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw(), al));
- }
+@@ -360,50 +365,64 @@ Http::Tunneler::bailWith(ErrorState *error)
Must(error);
answer().squidError = error;
+- if (const auto p = connection->getPeer()) +- peerConnectFailed(p); ++ if (const auto failingConnection = connection) { ++ // TODO: Reuse to-peer connections after a CONNECT error response. ++ countFailingConnection(); ++ disconnect(); ++ failingConnection->close(); ++ }
callBack();
+- disconnect(); +- +- if (noteFwdPconnUse) +- fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses); +- // TODO: Reuse to-peer connections after a CONNECT error response. +- connection->close(); +- connection = nullptr;
- }
- void
- Http::Tunneler::sendSuccess()
- {
assert(answer().positive());
+- callBack(); ++ assert(Comm::IsConnOpen(connection)); ++ answer().conn = connection;
disconnect();
++ callBack(); ++} ++ ++void ++Http::Tunneler::countFailingConnection() ++{ ++ assert(connection); ++ if (const auto p = connection->getPeer()) ++ peerConnectFailed(p); ++ if (noteFwdPconnUse && connection->isOpen()) ++ fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses);
- }
- void
- Http::Tunneler::disconnect()
- {
++ const auto stillOpen = Comm::IsConnOpen(connection); ++
if (closer) {
+- comm_remove_close_handler(connection->fd, closer); ++ if (stillOpen) ++ comm_remove_close_handler(connection->fd, closer);
closer = nullptr;
}
if (reader) {
+- Comm::ReadCancel(connection->fd, reader); ++ if (stillOpen) ++ Comm::ReadCancel(connection->fd, reader);
reader = nullptr;
}
+- // remove connection timeout handler +- commUnsetConnTimeout(connection); ++ if (stillOpen) ++ commUnsetConnTimeout(connection); ++ ++ connection = nullptr; // may still be open
- }
- void
- Http::Tunneler::callBack()
- {
+- debugs(83, 5, connection << status()); +- if (answer().positive()) +- answer().conn = connection; ++ debugs(83, 5, answer().conn << status()); ++ assert(!connection); // returned inside answer() or gone
auto cb = callback;
callback = nullptr;
ScheduleCallHere(cb);
+@@ -415,11 +434,10 @@ Http::Tunneler::swanSong()
AsyncJob::swanSong();
if (callback) {
+- if (requestWritten && tunnelEstablished) { ++ if (requestWritten && tunnelEstablished && Comm::IsConnOpen(connection)) {
sendSuccess();
} else {
+- // we should have bailed when we discovered the job-killing problem +- debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while establishing a CONNECT tunnel " << connection << status()); ++ // job-ending emergencies like handleStopRequest() or callException()
bailWith(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al));
}
assert(!callback);
+diff --git a/src/clients/HttpTunneler.h b/src/clients/HttpTunneler.h +index c4c096fe7..e88d17610 100644 +--- a/src/clients/HttpTunneler.h ++++ b/src/clients/HttpTunneler.h +@@ -87,6 +87,7 @@ protected:
void handleResponse(const bool eof);
void bailOnResponseError(const char *error, HttpReply *);
++private:
/// sends the given error to the initiator
void bailWith(ErrorState*);
+@@ -96,12 +97,14 @@ protected:
/// a bailWith(), sendSuccess() helper: sends results to the initiator
void callBack();
+- /// a bailWith(), sendSuccess() helper: stops monitoring the connection ++ /// stops monitoring the connection
void disconnect();
++ /// updates connection usage history before the connection is closed ++ void countFailingConnection(); ++
TunnelerAnswer &answer();
+-private:
AsyncCall::Pointer writer; ///< called when the request has been written
AsyncCall::Pointer reader; ///< called when the response should be read
AsyncCall::Pointer closer; ///< called when the connection is being closed
+diff --git a/src/clients/forward.h b/src/clients/forward.h +index 0351557da..82e5eb9bd 100644 +--- a/src/clients/forward.h ++++ b/src/clients/forward.h +@@ -28,10 +28,10 @@ namespace Ftp
- {
- /// A new FTP Gateway job
+-AsyncJobPointer StartGateway(FwdState *const fwdState); ++void StartGateway(FwdState *const fwdState);
- /// A new FTP Relay job
+-AsyncJobPointer StartRelay(FwdState *const fwdState); ++void StartRelay(FwdState *const fwdState);
- /** Construct an URI with leading / in PATH portion for use by CWD command
- possibly others. FTP encodes absolute paths as beginning with '/'
+diff --git a/src/comm.cc b/src/comm.cc +index 54ba16271..a0913f324 100644 +--- a/src/comm.cc ++++ b/src/comm.cc +@@ -743,6 +743,10 @@ commCallCloseHandlers(int fd)
// If call is not canceled schedule it for execution else ignore it
if (!call->canceled()) {
debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call);
++ // XXX: Without the following code, callback fd may be -1.
++ // typedef CommCloseCbParams Params;
++ // auto &params = GetCommParams<Params>(call);
++ // params.fd = fd;
ScheduleCallHere(call);
}
}
+@@ -1787,6 +1791,10 @@ DeferredReadManager::CloseHandler(const CommCloseCbParams &params)
CbDataList<DeferredRead> *temp = (CbDataList<DeferredRead> *)params.data;
temp->element.closer = NULL;
++ if (temp->element.theRead.conn) { ++ temp->element.theRead.conn->noteClosure(); ++ temp->element.theRead.conn = nullptr; ++ }
temp->element.markCancelled();
- }
+@@ -1860,6 +1868,11 @@ DeferredReadManager::kickARead(DeferredRead const &aRead)
if (aRead.cancelled)
return;
++ // TODO: This check still allows theReader call with a closed theRead.conn. ++ // If a delayRead() caller has a close connection handler, then such a call ++ // would be useless and dangerous. If a delayRead() caller does not have it, ++ // then the caller will get stuck when an external connection closure makes ++ // aRead.cancelled (checked above) true.
if (Comm::IsConnOpen(aRead.theRead.conn) && fd_table[aRead.theRead.conn->fd].closing())
return;
+diff --git a/src/comm/ConnOpener.cc b/src/comm/ConnOpener.cc +index dc376b778..19c1237ae 100644 +--- a/src/comm/ConnOpener.cc ++++ b/src/comm/ConnOpener.cc +@@ -41,6 +41,14 @@ Comm::ConnOpener::ConnOpener(const Comm::ConnectionPointer &c, const AsyncCall::
deadline_(squid_curtime + static_cast<time_t>(ctimeout))
- {
debugs(5, 3, "will connect to " << c << " with " << ctimeout << " timeout");
++ assert(conn_); // we know where to go ++ ++ // Sharing a being-modified Connection object with the caller is dangerous, ++ // but we cannot ban (or even check for) that using existing APIs. We do not ++ // want to clone "just in case" because cloning is a bit expensive, and most ++ // callers already have a non-owned Connection object to give us. Until the ++ // APIs improve, we can only check that the connection is not open. ++ assert(!conn_->isOpen());
- }
- Comm::ConnOpener::~ConnOpener()
+@@ -78,6 +86,10 @@ Comm::ConnOpener::swanSong()
if (temporaryFd_ >= 0)
closeFd();
++ // did we abort while owning an open connection? ++ if (conn_ && conn_->isOpen()) ++ conn_->close(); ++
// did we abort while waiting between retries?
if (calls_.sleep_)
cancelSleep();
+@@ -131,9 +143,18 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why)
" [" << callback_->id << ']' );
// TODO save the pconn to the pconnPool ?
} else {
++ assert(conn_); ++ ++ // free resources earlier and simplify recipients ++ if (errFlag != Comm::OK) ++ conn_->close(); // may not be opened ++ else ++ assert(conn_->isOpen()); ++
typedef CommConnectCbParams Params;
Params &params = GetCommParams<Params>(callback_);
params.conn = conn_;
++ conn_ = nullptr; // release ownership; prevent closure by us
params.flag = errFlag;
params.xerrno = xerrno;
ScheduleCallHere(callback_);
+@@ -152,7 +173,7 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why)
- void
- Comm::ConnOpener::cleanFd()
- {
+- debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_); ++ debugs(5, 4, conn_ << "; temp FD " << temporaryFd_);
Must(temporaryFd_ >= 0);
fde &f = fd_table[temporaryFd_];
+@@ -258,6 +279,7 @@ bool
- Comm::ConnOpener::createFd()
- {
Must(temporaryFd_ < 0);
++ assert(conn_);
// our initators signal abort by cancelling their callbacks
if (callback_ == NULL || callback_->canceled())
+diff --git a/src/comm/ConnOpener.h b/src/comm/ConnOpener.h +index 329d8a3c8..e1e60649d 100644 +--- a/src/comm/ConnOpener.h ++++ b/src/comm/ConnOpener.h +@@ -19,16 +19,13 @@
- namespace Comm
- {
+-/** +- * Async-opener of a Comm connection. +- */ ++/// Asynchronously opens a TCP connection. Returns CommConnectCbParams: either ++/// Comm::OK with an open connection or another Comm::Flag with a closed one.
- class ConnOpener : public AsyncJob
- {
CBDATA_CLASS(ConnOpener);
- public:
+- void noteAbort() { mustStop("externally aborted"); } +-
typedef CbcPointer<ConnOpener> Pointer;
virtual bool doneAll() const;
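The ConnOpener hunks above also tighten the answer contract: on failure the opener closes the connection itself before calling back, and on success it releases ownership of the now-open connection to the recipient. A toy sketch of what a recipient can therefore assume (illustrative only; ToyConn and ToyAnswer are invented names, not Squid's CommConnectCbParams):

    #include <iostream>

    struct ToyConn { bool open; };
    struct ToyAnswer { ToyConn conn; bool ok; };

    // Under the new contract the recipient never closes the connection on errors:
    // an error answer already carries a closed connection.
    void handleAnswer(const ToyAnswer &answer) {
        if (!answer.ok) {
            std::cout << "connect failed; connection already closed: "
                      << std::boolalpha << !answer.conn.open << "\n";
            return; // just record the failure and maybe retry elsewhere
        }
        std::cout << "connected; the recipient now owns the open connection\n";
    }

    int main() {
        handleAnswer(ToyAnswer{ToyConn{false}, false}); // error case
        handleAnswer(ToyAnswer{ToyConn{true}, true});   // success case
    }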
+diff --git a/src/comm/Connection.cc b/src/comm/Connection.cc +index 4e9719a01..9bc08da05 100644 +--- a/src/comm/Connection.cc ++++ b/src/comm/Connection.cc +@@ -7,6 +7,7 @@
- */
- #include "squid.h"
++#include "base/JobWait.h"
- #include "CachePeer.h"
- #include "cbdata.h"
- #include "comm.h"
+@@ -60,26 +61,44 @@ Comm::Connection::~Connection()
- }
- Comm::ConnectionPointer
+-Comm::Connection::cloneDestinationDetails() const ++Comm::Connection::cloneProfile() const
- {
+- const ConnectionPointer c = new Comm::Connection; +- c->setAddrs(local, remote); +- c->peerType = peerType; +- c->flags = flags; +- c->peer_ = cbdataReference(getPeer()); +- assert(!c->isOpen()); +- return c; +-} ++ const ConnectionPointer clone = new Comm::Connection; ++ auto &c = *clone; // optimization ++ ++ /* ++ * Copy or excuse each data member. Excused members do not belong to a ++ * Connection configuration profile because their values cannot be reused ++ * across (co-existing) Connection objects and/or are tied to their own ++ * object lifetime. ++ */ ++ ++ c.setAddrs(local, remote); ++ c.peerType = peerType; ++ // fd excused ++ c.tos = tos; ++ c.nfmark = nfmark; ++ c.nfConnmark = nfConnmark; ++ // COMM_ORPHANED is not a part of connection opening instructions ++ c.flags = flags & ~COMM_ORPHANED; ++ // rfc931 is excused ++ ++#if USE_SQUID_EUI ++ // These are currently only set when accepting connections and never used ++ // for establishing new ones, so this copying is currently in vain, but, ++ // technically, they can be a part of connection opening instructions. ++ c.remoteEui48 = remoteEui48; ++ c.remoteEui64 = remoteEui64; ++#endif
+-Comm::ConnectionPointer +-Comm::Connection::cloneIdentDetails() const +-{ +- auto c = cloneDestinationDetails(); +- c->tos = tos; +- c->nfmark = nfmark; +- c->nfConnmark = nfConnmark; +- c->startTime_ = startTime_; +- return c; ++ // id excused ++ c.peer_ = cbdataReference(getPeer()); ++ // startTime_ excused ++ // tlsHistory excused ++ ++ debugs(5, 5, this << " made " << c); ++ assert(!c.isOpen()); ++ return clone;
- }
- void
+diff --git a/src/comm/Connection.h b/src/comm/Connection.h +index 5b66943ba..40c22491d 100644 +--- a/src/comm/Connection.h ++++ b/src/comm/Connection.h +@@ -77,13 +77,12 @@ public:
/** Clear the connection properties and close any open socket. */
virtual ~Connection();
+- /// Create a new (closed) IDENT Connection object based on our from-Squid +- /// connection properties. +- ConnectionPointer cloneIdentDetails() const; ++ /// To prevent accidental copying of Connection objects that we started to ++ /// open or that are open, use cloneProfile() instead. ++ Connection(const Connection &&) = delete;
+- /// Create a new (closed) Connection object pointing to the same destination +- /// as this from-Squid connection. +- ConnectionPointer cloneDestinationDetails() const; ++ /// Create a new closed Connection with the same configuration as this one. ++ ConnectionPointer cloneProfile() const;
/// close the still-open connection when its last reference is gone
void enterOrphanage() { flags |= COMM_ORPHANED; }
+@@ -140,17 +139,6 @@ public:
virtual ScopedId codeContextGist() const override;
virtual std::ostream &detailCodeContext(std::ostream &os) const override;
+-private: +- /** These objects may not be exactly duplicated. Use cloneIdentDetails() or +- * cloneDestinationDetails() instead. +- */ +- Connection(const Connection &c); +- +- /** These objects may not be exactly duplicated. Use cloneIdentDetails() or +- * cloneDestinationDetails() instead. +- */ +- Connection & operator =(const Connection &c); +-
- public:
/** Address/Port for the Squid end of a TCP link. */
Ip::Address local;
+diff --git a/src/comm/TcpAcceptor.cc b/src/comm/TcpAcceptor.cc +index 341622e0f..510efa94f 100644 +--- a/src/comm/TcpAcceptor.cc ++++ b/src/comm/TcpAcceptor.cc +@@ -206,7 +206,10 @@ void
- Comm::TcpAcceptor::handleClosure(const CommCloseCbParams &)
- {
closer_ = NULL;
+- conn = NULL; ++ if (conn) { ++ conn->noteClosure(); ++ conn = nullptr; ++ }
Must(done());
- }
+diff --git a/src/dns_internal.cc b/src/dns_internal.cc +index e1f4d3ee7..72078c64e 100644 +--- a/src/dns_internal.cc ++++ b/src/dns_internal.cc +@@ -872,6 +872,10 @@ static void
- idnsVCClosed(const CommCloseCbParams &params)
- {
nsvc * vc = (nsvc *)params.data;
++ if (vc->conn) { ++ vc->conn->noteClosure(); ++ vc->conn = nullptr; ++ }
delete vc;
- }
+diff --git a/src/eui/Eui48.h b/src/eui/Eui48.h +index 11f4e51b1..9aab5bbb7 100644 +--- a/src/eui/Eui48.h ++++ b/src/eui/Eui48.h +@@ -29,10 +29,8 @@ class Eui48
- public:
Eui48() { clear(); }
+- Eui48(const Eui48 &t) { memcpy(this, &t, sizeof(Eui48)); }
bool operator== (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) == 0; }
bool operator< (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) < 0; }
+- ~Eui48() {}
const unsigned char *get(void);
+diff --git a/src/gopher.cc b/src/gopher.cc +index 455220c2e..576a3f7b1 100644 +--- a/src/gopher.cc ++++ b/src/gopher.cc +@@ -98,11 +98,8 @@ public:
entry->lock("gopherState");
*replybuf = 0;
}
+- ~GopherStateData() {if(buf) swanSong();}
+- /* AsyncJob API emulated */ +- void deleteThis(const char *aReason); +- void swanSong(); ++ ~GopherStateData();
- public:
StoreEntry *entry;
+@@ -156,30 +153,18 @@ static void
- gopherStateFree(const CommCloseCbParams &params)
- {
GopherStateData *gopherState = (GopherStateData *)params.data;
+- +- if (gopherState == NULL) +- return; +- +- gopherState->deleteThis("gopherStateFree"); ++ // Assume that FwdState is monitoring and calls noteClosure(). See XXX about ++ // Connection sharing with FwdState in gopherStart(). ++ delete gopherState;
- }
+-void +-GopherStateData::deleteThis(const char *) +-{ +- swanSong(); +- delete this; +-} +- +-void +-GopherStateData::swanSong() ++GopherStateData::~GopherStateData()
- {
if (entry)
entry->unlock("gopherState");
+- if (buf) { ++ if (buf)
memFree(buf, MEM_4K_BUF);
+- buf = nullptr; +- }
- }
- /**
+@@ -986,6 +971,7 @@ gopherStart(FwdState * fwd)
return;
}
++ // XXX: Sharing open Connection with FwdState that has its own handlers/etc.
gopherState->serverConn = fwd->serverConnection();
gopherSendRequest(fwd->serverConnection()->fd, gopherState);
AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "gopherTimeout",
+diff --git a/src/ident/Ident.cc b/src/ident/Ident.cc +index 50dc6baeb..3c3ae5e2f 100644 +--- a/src/ident/Ident.cc ++++ b/src/ident/Ident.cc +@@ -11,6 +11,7 @@
- #include "squid.h"
- #if USE_IDENT
++#include "base/JobWait.h"
- #include "comm.h"
- #include "comm/Connection.h"
- #include "comm/ConnOpener.h"
+@@ -53,8 +54,15 @@ public:
Comm::ConnectionPointer conn;
MemBuf queryMsg; ///< the lookup message sent to IDENT server
+- IdentClient *clients; ++ IdentClient *clients = nullptr;
char buf[IDENT_BUFSIZE];
++
++ /// waits for a connection to the IDENT server to be established/opened
++ JobWait<Comm::ConnOpener> connWait;
++
++private:
++ // use deleteThis() to destroy
++ ~IdentStateData();
- };
- CBDATA_CLASS_INIT(IdentStateData);
+@@ -73,8 +81,9 @@ static void ClientAdd(IdentStateData * state, IDCB * callback, void *callback_da
- Ident::IdentConfig Ident::TheConfig;
- void
+-Ident::IdentStateData::deleteThis(const char *) ++Ident::IdentStateData::deleteThis(const char *reason)
- {
++ debugs(30, 3, reason);
swanSong();
delete this;
- }
+@@ -84,6 +93,10 @@ Ident::IdentStateData::swanSong()
- {
if (clients != NULL)
notify(NULL);
++} ++ ++Ident::IdentStateData::~IdentStateData() { ++ assert(!clients);
if (Comm::IsConnOpen(conn)) {
comm_remove_close_handler(conn->fd, Ident::Close, this);
+@@ -112,6 +125,10 @@ void
- Ident::Close(const CommCloseCbParams &params)
- {
IdentStateData *state = (IdentStateData *)params.data;
++ if (state->conn) { ++ state->conn->noteClosure(); ++ state->conn = nullptr; ++ }
state->deleteThis("connection closed");
- }
+@@ -127,6 +144,16 @@ void
- Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, void *data)
- {
IdentStateData *state = (IdentStateData *)data;
++ state->connWait.finish(); ++ ++ // Start owning the supplied connection (so that it is not orphaned if this ++ // function bails early). As a (tiny) optimization or perhaps just diff ++ // minimization, the close handler is added later, when we know we are not ++ // bailing. This delay is safe because comm_remove_close_handler() forgives ++ // missing handlers. ++ assert(conn); // but may be closed ++ assert(!state->conn); ++ state->conn = conn;
if (status != Comm::OK) {
if (status == Comm::TIMEOUT)
+@@ -149,8 +176,8 @@ Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int,
return;
}
+- assert(conn != NULL && conn == state->conn);
+- comm_add_close_handler(conn->fd, Ident::Close, state);
++ assert(state->conn->isOpen());
++ comm_add_close_handler(state->conn->fd, Ident::Close, state);
AsyncCall::Pointer writeCall = commCbCall(5,4, "Ident::WriteFeedback",
CommIoCbPtrFun(Ident::WriteFeedback, state));
+@@ -259,10 +286,10 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data)
state->hash.key = xstrdup(key);
// copy the conn details. We do not want the original FD to be re-used by IDENT.
+- state->conn = conn->cloneIdentDetails();
++ const auto identConn = conn->cloneProfile();
// NP: use random port for secure outbound to IDENT_PORT
+- state->conn->local.port(0);
+- state->conn->remote.port(IDENT_PORT);
++ identConn->local.port(0);
++ identConn->remote.port(IDENT_PORT);
// build our query from the original connection details
state->queryMsg.init();
+@@ -272,7 +299,8 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data)
hash_join(ident_hash, &state->hash);
AsyncCall::Pointer call = commCbCall(30,3, "Ident::ConnectDone", CommConnectCbPtrFun(Ident::ConnectDone, state));
+- AsyncJob::Start(new Comm::ConnOpener(state->conn, call, Ident::TheConfig.timeout));
++ const auto connOpener = new Comm::ConnOpener(identConn, call, Ident::TheConfig.timeout);
++ state->connWait.start(connOpener, call);
- }
- void
+diff --git a/src/log/TcpLogger.cc b/src/log/TcpLogger.cc
+index 2be2f6aea..3be1cc5db 100644
+--- a/src/log/TcpLogger.cc
++++ b/src/log/TcpLogger.cc
+@@ -256,13 +256,16 @@ Log::TcpLogger::doConnect()
typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer;
AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone);
+- AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2));
++ const auto cs = new Comm::ConnOpener(futureConn, call, 2);
++ connWait.start(cs, call);
- }
- /// Comm::ConnOpener callback
- void
- Log::TcpLogger::connectDone(const CommConnectCbParams &params)
- {
++ connWait.finish();
++
if (params.flag != Comm::OK) {
const double delay = 0.5; // seconds
if (connectFailures++ % 100 == 0) {
+@@ -367,7 +370,10 @@ Log::TcpLogger::handleClosure(const CommCloseCbParams &)
- {
assert(inCall != NULL);
closer = NULL;
+- conn = NULL;
++ if (conn) {
++ conn->noteClosure();
++ conn = nullptr;
++ }
// in all current use cases, we should not try to reconnect
mustStop("Log::TcpLogger::handleClosure");
- }
+diff --git a/src/log/TcpLogger.h b/src/log/TcpLogger.h
+index 1be2113fe..ec7ca5f7b 100644
+--- a/src/log/TcpLogger.h
++++ b/src/log/TcpLogger.h
+@@ -10,6 +10,8 @@
- #define _SQUID_SRC_LOG_TCPLOGGER_H
- #include "base/AsyncJob.h"
++#include "base/JobWait.h" ++#include "comm/forward.h"
- #include "ip/Address.h"
- #include <list>
+@@ -103,6 +105,9 @@ private:
Ip::Address remote; ///< where the remote logger expects our records
AsyncCall::Pointer closer; ///< handles unexpected/external conn closures
++ /// waits for a connection to the remote logger to be established/opened
++ JobWait<Comm::ConnOpener> connWait;
++
uint64_t connectFailures; ///< number of sequential connection failures
uint64_t drops; ///< number of records dropped during the current outage
- };
+diff --git a/src/mgr/Forwarder.cc b/src/mgr/Forwarder.cc
+index 8edba1d78..e4da4ca08 100644
+--- a/src/mgr/Forwarder.cc
++++ b/src/mgr/Forwarder.cc
+@@ -100,7 +100,11 @@ void
- Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &)
- {
debugs(16, 5, HERE);
+- conn = NULL; // needed?
++ closer = nullptr;
++ if (conn) {
++ conn->noteClosure();
++ conn = nullptr;
++ }
mustStop("commClosed");
- }
+diff --git a/src/mgr/Inquirer.cc b/src/mgr/Inquirer.cc
+index ba41d7aa6..dea0452de 100644
+--- a/src/mgr/Inquirer.cc
++++ b/src/mgr/Inquirer.cc
+@@ -107,11 +107,14 @@ Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params)
- /// called when the HTTP client or some external force closed our socket
- void
+-Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
++Mgr::Inquirer::noteCommClosed(const CommCloseCbParams &)
- {
debugs(16, 5, HERE);
+- Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
+- conn = NULL;
++ closer = nullptr;
++ if (conn) {
++ conn->noteClosure();
++ conn = nullptr;
++ }
mustStop("commClosed");
- }
+diff --git a/src/mgr/StoreToCommWriter.cc b/src/mgr/StoreToCommWriter.cc
+index e2cccec2f..d3cf73888 100644
+--- a/src/mgr/StoreToCommWriter.cc
++++ b/src/mgr/StoreToCommWriter.cc
+@@ -131,7 +131,11 @@ void
- Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams &)
- {
debugs(16, 6, HERE);
+- Must(!Comm::IsConnOpen(clientConnection));
++ if (clientConnection) {
++ clientConnection->noteClosure();
++ clientConnection = nullptr;
++ }
++ closer = nullptr;
mustStop("commClosed");
- }
+diff --git a/src/security/PeerConnector.cc b/src/security/PeerConnector.cc
+index 4c1401f6e..494edabb8 100644
+--- a/src/security/PeerConnector.cc
++++ b/src/security/PeerConnector.cc
+@@ -89,9 +89,15 @@ Security::PeerConnector::start()
- void
- Security::PeerConnector::commCloseHandler(const CommCloseCbParams &params)
- {
++ debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data); ++
closeHandler = nullptr;
++ if (serverConn) {
++ countFailingConnection();
++ serverConn->noteClosure();
++ serverConn = nullptr;
++ }
+- debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data);
const auto err = new ErrorState(ERR_SECURE_CONNECT_FAIL, Http::scServiceUnavailable, request.getRaw(), al);
static const auto d = MakeNamedErrorDetail("TLS_CONNECT_CLOSE");
err->detailError(d);
+@@ -111,6 +117,8 @@ Security::PeerConnector::commTimeoutHandler(const CommTimeoutCbParams &)
- bool
- Security::PeerConnector::initialize(Security::SessionPointer &serverSession)
- {
++ Must(Comm::IsConnOpen(serverConnection()));
++
Security::ContextPointer ctx(getTlsContext());
debugs(83, 5, serverConnection() << ", ctx=" << (void*)ctx.get());
+@@ -162,6 +170,8 @@ Security::PeerConnector::initialize(Security::SessionPointer &serverSession)
- void
- Security::PeerConnector::recordNegotiationDetails()
- {
++ Must(Comm::IsConnOpen(serverConnection()));
++
const int fd = serverConnection()->fd;
Security::SessionPointer session(fd_table[fd].ssl);
+@@ -180,6 +190,8 @@ Security::PeerConnector::recordNegotiationDetails()
- void
- Security::PeerConnector::negotiate()
- {
++ Must(Comm::IsConnOpen(serverConnection()));
++
const int fd = serverConnection()->fd;
if (fd_table[fd].closing())
return;
+@@ -224,7 +236,7 @@ Security::PeerConnector::handleNegotiationResult(const Security::IoResult &resul
switch (result.category) {
case Security::IoResult::ioSuccess:
recordNegotiationDetails();
+- if (sslFinalized())
++ if (sslFinalized() && callback)
sendSuccess();
return; // we may be gone by now
+@@ -252,6 +264,7 @@ Security::PeerConnector::sslFinalized()
- {
- #if USE_OPENSSL
if (Ssl::TheConfig.ssl_crt_validator && useCertValidator_) {
++ Must(Comm::IsConnOpen(serverConnection()));
const int fd = serverConnection()->fd;
Security::SessionPointer session(fd_table[fd].ssl);
+@@ -295,6 +308,7 @@ void
- Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointer validationResponse)
- {
Must(validationResponse != NULL);
++ Must(Comm::IsConnOpen(serverConnection()));
ErrorDetail::Pointer errDetails;
bool validatorFailed = false;
+@@ -317,7 +331,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe
if (!errDetails && !validatorFailed) {
noteNegotiationDone(NULL);
+- sendSuccess();
++ if (callback)
++ sendSuccess();
return;
}
+@@ -343,6 +358,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe
- Security::CertErrors *
- Security::PeerConnector::sslCrtvdCheckForErrors(Ssl::CertValidationResponse const &resp, ErrorDetail::Pointer &errDetails)
- {
++ Must(Comm::IsConnOpen(serverConnection()));
++
ACLFilledChecklist *check = NULL;
Security::SessionPointer session(fd_table[serverConnection()->fd].ssl);
+@@ -418,9 +435,11 @@ Security::PeerConnector::negotiateSsl()
- void
- Security::PeerConnector::noteWantRead()
- {
+- const int fd = serverConnection()->fd;
debugs(83, 5, serverConnection());
++ Must(Comm::IsConnOpen(serverConnection()));
++ const int fd = serverConnection()->fd;
++
// read timeout to avoid getting stuck while reading from a silent server
typedef CommCbMemFunT<Security::PeerConnector, CommTimeoutCbParams> TimeoutDialer;
AsyncCall::Pointer timeoutCall = JobCallback(83, 5,
+@@ -434,8 +453,10 @@ Security::PeerConnector::noteWantRead()
- void
- Security::PeerConnector::noteWantWrite()
- {
+- const int fd = serverConnection()->fd;
debugs(83, 5, serverConnection());
++ Must(Comm::IsConnOpen(serverConnection()));
++
++ const int fd = serverConnection()->fd;
Comm::SetSelect(fd, COMM_SELECT_WRITE, &NegotiateSsl, new Pointer(this), 0);
return;
- }
+@@ -452,57 +473,76 @@ Security::PeerConnector::noteNegotiationError(const Security::ErrorDetailPointer
bail(anErr);
- }
++Security::EncryptorAnswer &
++Security::PeerConnector::answer()
++{
++ assert(callback);
++ const auto dialer = dynamic_cast<CbDialer*>(callback->getDialer());
++ assert(dialer);
++ return dialer->answer();
++}
++
- void
- Security::PeerConnector::bail(ErrorState *error)
- {
Must(error); // or the recepient will not know there was a problem
+- Must(callback != NULL);
+- CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
+- Must(dialer);
+- dialer->answer().error = error;
++ answer().error = error;
+- if (const auto p = serverConnection()->getPeer())
+- peerConnectFailed(p);
++ if (const auto failingConnection = serverConn) {
++ countFailingConnection();
++ disconnect();
++ failingConnection->close();
++ }
callBack();
+- disconnect();
+-
+- if (noteFwdPconnUse)
+- fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
+- serverConn->close();
+- serverConn = nullptr;
- }
- void
- Security::PeerConnector::sendSuccess()
- {
+- callBack();
++ assert(Comm::IsConnOpen(serverConn));
++ answer().conn = serverConn;
disconnect();
++ callBack();
++}
++
++void
++Security::PeerConnector::countFailingConnection()
++{
++ assert(serverConn);
++ if (const auto p = serverConn->getPeer())
++ peerConnectFailed(p);
++ // TODO: Calling PconnPool::noteUses() should not be our responsibility.
++ if (noteFwdPconnUse && serverConn->isOpen())
++ fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
- }
- void
- Security::PeerConnector::disconnect()
- {
++ const auto stillOpen = Comm::IsConnOpen(serverConn);
++
if (closeHandler) {
+- comm_remove_close_handler(serverConnection()->fd, closeHandler);
++ if (stillOpen)
++ comm_remove_close_handler(serverConn->fd, closeHandler);
closeHandler = nullptr;
}
+- commUnsetConnTimeout(serverConnection());
++ if (stillOpen)
++ commUnsetConnTimeout(serverConn);
++
++ serverConn = nullptr;
- }
- void
- Security::PeerConnector::callBack()
- {
+- debugs(83, 5, "TLS setup ended for " << serverConnection()); ++ debugs(83, 5, "TLS setup ended for " << answer().conn);
AsyncCall::Pointer cb = callback;
// Do this now so that if we throw below, swanSong() assert that we _tried_
// to call back holds.
callback = NULL; // this should make done() true
+- CbDialer *dialer = dynamic_cast<CbDialer*>(cb->getDialer());
+- Must(dialer);
+- dialer->answer().conn = serverConnection();
ScheduleCallHere(cb);
- }
+@@ -511,8 +551,9 @@ Security::PeerConnector::swanSong()
- {
// XXX: unregister fd-closure monitoring and CommSetSelect interest, if any
AsyncJob::swanSong();
+- if (callback != NULL) { // paranoid: we have left the caller waiting
+- debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while connecting to a cache_peer or origin server");
++
++ if (callback) {
++ // job-ending emergencies like handleStopRequest() or callException()
const auto anErr = new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al);
bail(anErr);
assert(!callback);
+@@ -533,7 +574,7 @@ Security::PeerConnector::status() const
buf.append("Stopped, reason:", 16);
buf.appendf("%s",stopReason);
}
+- if (serverConn != NULL)
++ if (Comm::IsConnOpen(serverConn))
buf.appendf(" FD %d", serverConn->fd);
buf.appendf(" %s%u]", id.prefix(), id.value);
buf.terminate();
+@@ -581,15 +622,18 @@ Security::PeerConnector::startCertDownloading(SBuf &url)
PeerConnectorCertDownloaderDialer(&Security::PeerConnector::certDownloadingDone, this));
const auto dl = new Downloader(url, certCallback, XactionInitiator::initCertFetcher, certDownloadNestingLevel() + 1);
+- AsyncJob::Start(dl);
++ certDownloadWait.start(dl, certCallback);
- }
- void
- Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus)
- {
++ certDownloadWait.finish();
++
++certsDownloads;
debugs(81, 5, "Certificate downloading status: " << downloadStatus << " certificate size: " << obj.length());
++ Must(Comm::IsConnOpen(serverConnection()));
const auto &sconn = *fd_table[serverConnection()->fd].ssl;
// Parse Certificate. Assume that it is in DER format.
+@@ -642,6 +686,7 @@ Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus)
- void
- Security::PeerConnector::handleMissingCertificates(const Security::IoResult &ioResult)
- {
++ Must(Comm::IsConnOpen(serverConnection()));
auto &sconn = *fd_table[serverConnection()->fd].ssl;
// We download the missing certificate(s) once. We would prefer to clear
+diff --git a/src/security/PeerConnector.h b/src/security/PeerConnector.h
+index 942f8627a..10700d796 100644
+--- a/src/security/PeerConnector.h
++++ b/src/security/PeerConnector.h
+@@ -12,6 +12,7 @@
- #include "acl/Acl.h"
- #include "base/AsyncCbdataCalls.h"
- #include "base/AsyncJob.h"
++#include "base/JobWait.h"
- #include "CommCalls.h"
- #include "http/forward.h"
- #include "security/EncryptorAnswer.h"
+@@ -24,6 +25,7 @@
- #include <queue>
- class ErrorState;
++class Downloader;
- class AccessLogEntry;
- typedef RefCount<AccessLogEntry> AccessLogEntryPointer;
+@@ -152,6 +154,9 @@ protected:
/// a bail(), sendSuccess() helper: stops monitoring the connection
void disconnect();
++ /// updates connection usage history before the connection is closed
++ void countFailingConnection();
++
/// If called the certificates validator will not used
void bypassCertValidator() {useCertValidator_ = false;}
+@@ -159,6 +164,9 @@ protected:
/// logging
void recordNegotiationDetails();
++ /// convenience method to get to the answer fields
++ EncryptorAnswer &answer();
++
HttpRequestPointer request; ///< peer connection trigger or cause
Comm::ConnectionPointer serverConn; ///< TCP connection to the peer
AccessLogEntryPointer al; ///< info for the future access.log entry
+@@ -203,6 +211,8 @@ private:
/// outcome of the last (failed and) suspended negotiation attempt (or nil)
Security::IoResultPointer suspendedError_;
++
++ JobWait<Downloader> certDownloadWait; ///< waits for the missing certificate to be downloaded
- };
- } // namespace Security
+diff --git a/src/security/forward.h b/src/security/forward.h
+index dce66e397..26225aae0 100644
+--- a/src/security/forward.h
++++ b/src/security/forward.h
+@@ -172,6 +172,7 @@ class ParsedOptions {}; // we never parse/use TLS options in this case
- typedef long ParsedPortFlags;
- class PeerConnector;
++class BlindPeerConnector;
- class PeerOptions;
- #if USE_OPENSSL
+diff --git a/src/servers/FtpServer.cc b/src/servers/FtpServer.cc
+index 53f822d2f..e0a10b6b5 100644
+--- a/src/servers/FtpServer.cc
++++ b/src/servers/FtpServer.cc
+@@ -61,7 +61,7 @@ Ftp::Server::Server(const MasterXaction::Pointer &xact):
dataConn(),
uploadAvailSize(0),
listener(),
+- connector(),
++ dataConnWait(),
reader(),
waitingForOrigin(false),
originDataDownloadAbortedOnError(false)
+@@ -1676,11 +1676,11 @@ Ftp::Server::checkDataConnPre()
// active transfer: open a data connection from Squid to client
typedef CommCbMemFunT<Server, CommConnectCbParams> Dialer;
+- connector = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
+- Comm::ConnOpener *cs = new Comm::ConnOpener(dataConn, connector,
+- Config.Timeout.connect);
+- AsyncJob::Start(cs);
+- return false; // ConnStateData::processFtpRequest waits handleConnectDone
++ AsyncCall::Pointer callback = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
++ const auto cs = new Comm::ConnOpener(dataConn->cloneProfile(), callback,
++ Config.Timeout.connect);
++ dataConnWait.start(cs, callback);
++ return false;
- }
- /// Check that client data connection is ready for immediate I/O.
+@@ -1698,18 +1698,22 @@ Ftp::Server::checkDataConnPost() const
- void
- Ftp::Server::connectedForData(const CommConnectCbParams &params)
- {
+- connector = NULL;
++ dataConnWait.finish();
if (params.flag != Comm::OK) {
+- /* it might have been a timeout with a partially open link */
+- if (params.conn != NULL)
+- params.conn->close();
setReply(425, "Cannot open data connection.");
Http::StreamPointer context = pipeline.front();
Must(context->http);
Must(context->http->storeEntry() != NULL);
++ // TODO: call closeDataConnection() to reset data conn processing?
} else {
+- Must(dataConn == params.conn);
++ // Finalize the details and start owning the supplied connection.
++ assert(params.conn);
++ assert(dataConn);
++ assert(!dataConn->isOpen());
++ dataConn = params.conn;
++ // XXX: Missing comm_add_close_handler() to track external closures.
++
Must(Comm::IsConnOpen(params.conn));
fd_note(params.conn->fd, "active client ftp data");
}
+diff --git a/src/servers/FtpServer.h b/src/servers/FtpServer.h
+index fb9ef9081..d67799827 100644
+--- a/src/servers/FtpServer.h
++++ b/src/servers/FtpServer.h
+@@ -11,8 +11,10 @@
- #ifndef SQUID_SERVERS_FTP_SERVER_H
- #define SQUID_SERVERS_FTP_SERVER_H
++#include "base/JobWait.h"
- #include "base/Lock.h"
- #include "client_side.h"
++#include "comm/forward.h"
- namespace Ftp
- {
+@@ -188,7 +190,11 @@ private:
size_t uploadAvailSize; ///< number of yet unused uploadBuf bytes
AsyncCall::Pointer listener; ///< set when we are passively listening
+- AsyncCall::Pointer connector; ///< set when we are actively connecting
++
++ /// Waits for an FTP data connection to the client to be established/opened.
++ /// This wait only happens in FTP active mode (via PORT or EPRT).
++ JobWait<Comm::ConnOpener> dataConnWait;
++
AsyncCall::Pointer reader; ///< set when we are reading FTP data
/// whether we wait for the origin data transfer to end
+diff --git a/src/snmp/Forwarder.cc b/src/snmp/Forwarder.cc
+index 836eb5772..259bb0441 100644
+--- a/src/snmp/Forwarder.cc
++++ b/src/snmp/Forwarder.cc
+@@ -53,6 +53,7 @@ Snmp::Forwarder::noteCommClosed(const CommCloseCbParams& params)
- {
debugs(49, 5, HERE);
Must(fd == params.fd);
++ closer = nullptr;
fd = -1;
mustStop("commClosed");
- }
+@@ -68,8 +69,7 @@ void
- Snmp::Forwarder::handleException(const std::exception& e)
- {
debugs(49, 3, HERE << e.what());
+- if (fd >= 0)
+- sendError(SNMP_ERR_GENERR);
++ sendError(SNMP_ERR_GENERR);
Ipc::Forwarder::handleException(e);
- }
+@@ -78,6 +78,10 @@ void
- Snmp::Forwarder::sendError(int error)
- {
debugs(49, 3, HERE);
++
++ if (fd < 0)
++ return; // client gone
++
Snmp::Request& req = static_cast<Snmp::Request&>(*request);
req.pdu.command = SNMP_PDU_RESPONSE;
req.pdu.errstat = error;
+diff --git a/src/snmp/Inquirer.cc b/src/snmp/Inquirer.cc
+index 9b6e34405..82f8c4475 100644
+--- a/src/snmp/Inquirer.cc
++++ b/src/snmp/Inquirer.cc
+@@ -88,7 +88,11 @@ Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params)
- {
debugs(49, 5, HERE);
Must(!Comm::IsConnOpen(conn) || conn->fd == params.conn->fd);
+- conn = NULL;
++ closer = nullptr;
++ if (conn) {
++ conn->noteClosure();
++ conn = nullptr;
++ }
mustStop("commClosed");
- }
+@@ -102,6 +106,10 @@ void
- Snmp::Inquirer::sendResponse()
- {
debugs(49, 5, HERE);
++
++ if (!Comm::IsConnOpen(conn))
++ return; // client gone
++
aggrPdu.fixAggregate();
aggrPdu.command = SNMP_PDU_RESPONSE;
u_char buffer[SNMP_REQUEST_SIZE];
+diff --git a/src/ssl/PeekingPeerConnector.cc b/src/ssl/PeekingPeerConnector.cc
+index 9a4055355..89e45435b 100644
+--- a/src/ssl/PeekingPeerConnector.cc
++++ b/src/ssl/PeekingPeerConnector.cc
+@@ -27,18 +27,18 @@ CBDATA_NAMESPACED_CLASS_INIT(Ssl, PeekingPeerConnector);
- void switchToTunnel(HttpRequest *request, const Comm::ConnectionPointer &clientConn, const Comm::ConnectionPointer &srvConn, const SBuf &preReadServerData);
- void
+-Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data)
++Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(const Acl::Answer aclAnswer, void *data)
- {
Ssl::PeekingPeerConnector *peerConnect = (Ssl::PeekingPeerConnector *) data;
// Use job calls to add done() checks and other job logic/protections.
+- CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, answer);
++ CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, aclAnswer);
- }
- void
+-Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(Acl::Answer answer)
++Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(const Acl::Answer aclAnswer)
- {
+- const Ssl::BumpMode finalAction = answer.allowed() ?
+- static_cast<Ssl::BumpMode>(answer.kind):
++ const Ssl::BumpMode finalAction = aclAnswer.allowed() ?
++ static_cast<Ssl::BumpMode>(aclAnswer.kind):
checkForPeekAndSpliceGuess();
checkForPeekAndSpliceMatched(finalAction);
- }
+@@ -106,10 +106,8 @@ Ssl::PeekingPeerConnector::checkForPeekAndSpliceMatched(const Ssl::BumpMode acti
splice = true;
// Ssl Negotiation stops here. Last SSL checks for valid certificates
// and if done, switch to tunnel mode
+- if (sslFinalized()) {
+- debugs(83,5, "Abort NegotiateSSL on FD " << serverConn->fd << " and splice the connection");
++ if (sslFinalized() && callback)
callBack();
+- }
}
- }
+@@ -272,8 +270,11 @@ Ssl::PeekingPeerConnector::startTunneling()
auto b = SSL_get_rbio(session.get());
auto srvBio = static_cast<Ssl::ServerBio*>(BIO_get_data(b));
++ debugs(83, 5, "will tunnel instead of negotiating TLS");
switchToTunnel(request.getRaw(), clientConn, serverConn, srvBio->rBufData());
+- tunnelInsteadOfNegotiating();
++ answer().tunneled = true;
++ disconnect();
++ callBack();
- }
- void
+@@ -397,13 +398,3 @@ Ssl::PeekingPeerConnector::serverCertificateVerified()
}
- }
+-void
+-Ssl::PeekingPeerConnector::tunnelInsteadOfNegotiating()
+-{
+- Must(callback != NULL);
+- CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
+- Must(dialer);
+- dialer->answer().tunneled = true;
+- debugs(83, 5, "The SSL negotiation with server aborted");
+-}
+-
+diff --git a/src/ssl/PeekingPeerConnector.h b/src/ssl/PeekingPeerConnector.h
+index 3c86b887d..0145e77f9 100644
+--- a/src/ssl/PeekingPeerConnector.h
++++ b/src/ssl/PeekingPeerConnector.h
+@@ -51,7 +51,7 @@ public:
void checkForPeekAndSplice();
/// Callback function for ssl_bump acl check in step3 SSL bump step.
+- void checkForPeekAndSpliceDone(Acl::Answer answer);
++ void checkForPeekAndSpliceDone(Acl::Answer);
/// Handles the final bumping decision.
void checkForPeekAndSpliceMatched(const Ssl::BumpMode finalMode);
+@@ -67,7 +67,7 @@ public:
void startTunneling();
/// A wrapper function for checkForPeekAndSpliceDone for use with acl
+- static void cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data);
++ static void cbCheckForPeekAndSpliceDone(Acl::Answer, void *data);
- private:
+diff --git a/src/tests/stub_libcomm.cc b/src/tests/stub_libcomm.cc
+index 45aa6deb2..8fdb40bbd 100644
+--- a/src/tests/stub_libcomm.cc
++++ b/src/tests/stub_libcomm.cc
+@@ -22,9 +22,9 @@ void Comm::AcceptLimiter::kick() STUB
- #include "comm/Connection.h"
- Comm::Connection::Connection() STUB
- Comm::Connection::~Connection() STUB
+-Comm::ConnectionPointer Comm::Connection::cloneIdentDetails() const STUB_RETVAL(nullptr)
+-Comm::ConnectionPointer Comm::Connection::cloneDestinationDetails() const STUB_RETVAL(nullptr)
++Comm::ConnectionPointer Comm::Connection::cloneProfile() const STUB_RETVAL(nullptr)
- void Comm::Connection::close() STUB
++void Comm::Connection::noteClosure() STUB
- CachePeer * Comm::Connection::getPeer() const STUB_RETVAL(NULL)
- void Comm::Connection::setPeer(CachePeer * p) STUB
- ScopedId Comm::Connection::codeContextGist() const STUB_RETVAL(id.detach())
+diff --git a/src/tests/stub_libsecurity.cc b/src/tests/stub_libsecurity.cc
+index aa9cf0b36..176cf337e 100644
+--- a/src/tests/stub_libsecurity.cc
++++ b/src/tests/stub_libsecurity.cc
+@@ -9,6 +9,7 @@
- #include "squid.h"
- #include "AccessLogEntry.h"
- #include "comm/Connection.h"
++#include "Downloader.h"
- #include "HttpRequest.h"
- #define STUB_API "security/libsecurity.la"
+@@ -88,7 +89,9 @@ void PeerConnector::bail(ErrorState *) STUB
- void PeerConnector::sendSuccess() STUB
- void PeerConnector::callBack() STUB
- void PeerConnector::disconnect() STUB
++void PeerConnector::countFailingConnection() STUB
- void PeerConnector::recordNegotiationDetails() STUB
++EncryptorAnswer &PeerConnector::answer() STUB_RETREF(EncryptorAnswer)
- }
- #include "security/PeerOptions.h"
+diff --git a/src/tunnel.cc b/src/tunnel.cc
+index 386a0ecd6..4fc5abdef 100644
+--- a/src/tunnel.cc
++++ b/src/tunnel.cc
+@@ -11,6 +11,7 @@
- #include "squid.h"
- #include "acl/FilledChecklist.h"
- #include "base/CbcPointer.h"
++#include "base/JobWait.h"
- #include "CachePeer.h"
- #include "cbdata.h"
- #include "client_side.h"
+@@ -180,9 +181,6 @@ public:
SBuf preReadClientData;
SBuf preReadServerData;
time_t startTime; ///< object creation time, before any peer selection/connection attempts
+- /// Whether we are waiting for the CONNECT request/response exchange with the peer.
+- bool waitingForConnectExchange;
+- HappyConnOpenerPointer connOpener; ///< current connection opening job
ResolvedPeersPointer destinations; ///< paths for forwarding the request
bool destinationsFound; ///< At least one candidate path found
/// whether another destination may be still attempted if the TCP connection
+@@ -191,10 +189,15 @@ public:
// TODO: remove after fixing deferred reads in TunnelStateData::copyRead()
CodeContext::Pointer codeContext; ///< our creator context
+- // AsyncCalls which we set and may need cancelling.
+- struct {
+- AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn.
+- } calls;
++ /// waits for a transport connection to the peer to be established/opened
++ JobWait<HappyConnOpener> transportWait;
++
++ /// waits for the established transport connection to be secured/encrypted
++ JobWait<Security::PeerConnector> encryptionWait;
++
++ /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated
++ /// over the (encrypted, if needed) transport connection to that cache_peer
++ JobWait<Http::Tunneler> peerWait;
void copyRead(Connection &from, IOCB *completion);
+@@ -212,12 +215,6 @@ public:
/// when all candidate destinations have been tried and all have failed
void noteConnection(HappyConnOpenerAnswer &);
+- /// whether we are waiting for HappyConnOpener
+- /// same as calls.connector but may differ from connOpener.valid()
+- bool opening() const { return connOpener.set(); }
+-
+- void cancelOpening(const char *reason);
+-
/// Start using an established connection
void connectDone(const Comm::ConnectionPointer &conn, const char *origin, const bool reused);
+@@ -266,6 +263,9 @@ private:
/// \returns whether the request should be retried (nil) or the description why it should not
const char *checkRetry();
++ /// whether the successfully selected path destination or the established
++ /// server connection is still in use
++ bool usingDestination() const;
/// details of the "last tunneling attempt" failure (if it failed)
ErrorState *savedError = nullptr;
+@@ -275,6 +275,8 @@ private:
void deleteThis();
++ void cancelStep(const char *reason);
++
- public:
bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to);
void copy(size_t len, Connection &from, Connection &to, IOCB *);
+@@ -357,7 +359,6 @@ TunnelStateData::deleteThis()
- TunnelStateData::TunnelStateData(ClientHttpRequest *clientRequest) :
startTime(squid_curtime),
+- waitingForConnectExchange(false),
destinations(new ResolvedPeers()),
destinationsFound(false),
retriable(true),
+@@ -390,8 +391,7 @@ TunnelStateData::~TunnelStateData()
debugs(26, 3, "TunnelStateData destructed this=" << this);
assert(noConnections());
xfree(url);
+- if (opening()) +- cancelOpening("~TunnelStateData"); ++ cancelStep("~TunnelStateData");
delete savedError;
- }
+@@ -904,7 +904,10 @@ TunnelStateData::copyServerBytes()
- static void
- tunnelStartShoveling(TunnelStateData *tunnelState)
- {
+- assert(!tunnelState->waitingForConnectExchange);
++ assert(!tunnelState->transportWait);
++ assert(!tunnelState->encryptionWait);
++ assert(!tunnelState->peerWait);
++
assert(tunnelState->server.conn);
AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));
+@@ -962,6 +965,7 @@ tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *, size_t len
- void
- TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
- {
++ peerWait.finish();
server.len = 0;
if (logTag_ptr)
+@@ -970,13 +974,11 @@ TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
if (answer.peerResponseStatus != Http::scNone)
*status_ptr = answer.peerResponseStatus;
+- waitingForConnectExchange = false;
+-
auto sawProblem = false;
if (!answer.positive()) {
sawProblem = true;
+- Must(!Comm::IsConnOpen(answer.conn));
++ assert(!answer.conn);
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
sawProblem = true;
closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
+@@ -1042,8 +1044,7 @@ tunnelErrorComplete(int fd/*const Comm::ConnectionPointer &*/, void *data, size_
- void
- TunnelStateData::noteConnection(HappyConnOpener::Answer &answer)
- {
+- calls.connector = nullptr;
+- connOpener.clear();
++ transportWait.finish();
ErrorState *error = nullptr;
if ((error = answer.error.get())) {
+@@ -1167,7 +1168,7 @@ TunnelStateData::secureConnectionToPeer(const Comm::ConnectionPointer &conn)
AsyncCall::Pointer callback = asyncCall(5,4, "TunnelStateData::noteSecurityPeerConnectorAnswer",
MyAnswerDialer(&TunnelStateData::noteSecurityPeerConnectorAnswer, this));
const auto connector = new Security::BlindPeerConnector(request, conn, callback, al);
+- AsyncJob::Start(connector); // will call our callback
++ encryptionWait.start(connector, callback);
- }
- /// starts a preparation step for an established connection; retries on failures
+@@ -1194,9 +1195,12 @@ TunnelStateData::advanceDestination(const char *stepDescription, const Comm::Con
- void
- TunnelStateData::noteSecurityPeerConnectorAnswer(Security::EncryptorAnswer &answer)
- {
++ encryptionWait.finish();
++
ErrorState *error = nullptr;
++ assert(!answer.tunneled);
if ((error = answer.error.get())) {
+- Must(!Comm::IsConnOpen(answer.conn));
++ assert(!answer.conn);
answer.error.clear();
} else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw(), al);
+@@ -1223,8 +1227,6 @@ TunnelStateData::connectedToPeer(const Comm::ConnectionPointer &conn)
- void
- TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
- {
+- assert(!waitingForConnectExchange);
+-
AsyncCall::Pointer callback = asyncCall(5,4,
"TunnelStateData::tunnelEstablishmentDone",
Http::Tunneler::CbDialer<TunnelStateData>(&TunnelStateData::tunnelEstablishmentDone, this));
+@@ -1232,9 +1234,7 @@ TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
- #if USE_DELAY_POOLS
tunneler->setDelayId(server.delayId);
- #endif
+- AsyncJob::Start(tunneler);
+- waitingForConnectExchange = true;
+- // and wait for the tunnelEstablishmentDone() call
++ peerWait.start(tunneler, callback);
- }
- void
+@@ -1252,14 +1252,14 @@ TunnelStateData::noteDestination(Comm::ConnectionPointer path)
destinations->addPath(path);
+- if (Comm::IsConnOpen(server.conn)) {
++ if (usingDestination()) {
// We are already using a previously opened connection but also
// receiving destinations in case we need to re-forward.
+- Must(!opening());
++ Must(!transportWait);
return;
}
+- if (opening()) {
++ if (transportWait) {
notifyConnOpener();
return; // and continue to wait for tunnelConnectDone() callback
}
+@@ -1289,17 +1289,23 @@ TunnelStateData::noteDestinationsEnd(ErrorState *selectionError)
// if all of them fail, tunneling as whole will fail
Must(!selectionError); // finding at least one path means selection succeeded
+- if (Comm::IsConnOpen(server.conn)) {
++ if (usingDestination()) {
// We are already using a previously opened connection but also
// receiving destinations in case we need to re-forward.
+- Must(!opening());
++ Must(!transportWait);
return;
}
+- Must(opening()); // or we would be stuck with nothing to do or wait for
++ Must(transportWait); // or we would be stuck with nothing to do or wait for
notifyConnOpener();
- }
++bool
++TunnelStateData::usingDestination() const
++{
++ return encryptionWait || peerWait || Comm::IsConnOpen(server.conn);
++}
++
- /// remembers an error to be used if there will be no more connection attempts
- void
- TunnelStateData::saveError(ErrorState *error)
+@@ -1320,8 +1326,7 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason)
if (request)
request->hier.stopPeerClock(false);
+- if (opening())
+- cancelOpening(reason);
++ cancelStep(reason);
assert(finalError);
+@@ -1339,18 +1344,15 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason)
errorSend(client.conn, finalError);
- }
+-/// Notify connOpener that we no longer need connections. We do not have to do
+-/// this -- connOpener would eventually notice on its own, but notifying reduces
+-/// waste and speeds up spare connection opening for other transactions (that
+-/// could otherwise wait for this transaction to use its spare allowance).
++/// Notify a pending subtask, if any, that we no longer need its help. We do not
++/// have to do this -- the subtask job will eventually end -- but ending it
++/// earlier reduces waste and may reduce DoS attack surface.
- void
+-TunnelStateData::cancelOpening(const char *reason)
++TunnelStateData::cancelStep(const char *reason)
- {
+- assert(calls.connector);
+- calls.connector->cancel(reason);
+- calls.connector = nullptr;
+- notifyConnOpener();
+- connOpener.clear();
++ transportWait.cancel(reason);
++ encryptionWait.cancel(reason);
++ peerWait.cancel(reason);
- }
- void
+@@ -1360,15 +1362,14 @@ TunnelStateData::startConnecting()
request->hier.startPeerClock();
assert(!destinations->empty());
+- assert(!opening());
+- calls.connector = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
+- const auto cs = new HappyConnOpener(destinations, calls.connector, request, startTime, 0, al);
++ assert(!usingDestination());
++ AsyncCall::Pointer callback = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
++ const auto cs = new HappyConnOpener(destinations, callback, request, startTime, 0, al);
cs->setHost(request->url.host());
cs->setRetriable(false);
cs->allowPersistent(false);
destinations->notificationPending = true; // start() is async
+- connOpener = cs;
+- AsyncJob::Start(cs);
++ transportWait.start(cs, callback);
- }
- /// send request on an existing connection dedicated to the requesting client
+@@ -1417,7 +1418,7 @@ TunnelStateData::Connection::setDelayId(DelayId const &newDelay)
- #endif
+-/// makes sure connOpener knows that destinations have changed
++/// makes sure connection opener knows that the destinations have changed
- void
- TunnelStateData::notifyConnOpener()
- {
+@@ -1425,7 +1426,7 @@ TunnelStateData::notifyConnOpener()
debugs(17, 7, "reusing pending notification");
} else {
destinations->notificationPending = true;
+- CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange);
++ CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
}
- }
+diff --git a/src/whois.cc b/src/whois.cc
+index 38dfbacde..306ea5c62 100644
+--- a/src/whois.cc
++++ b/src/whois.cc
+@@ -182,6 +182,7 @@ whoisClose(const CommCloseCbParams &params)
- {
WhoisState *p = (WhoisState *)params.data;
debugs(75, 3, "whoisClose: FD " << params.fd);
++ // We do not own a Connection. Assume that FwdState is also monitoring.
p->entry->unlock("whoisClose");
delete p;
- }