Please note: One day after squid 5.4 was released, this extensive patch followed, which I am publishing separately. It seems to be important - perhaps we should consider taking it into account.
Building and testing were ok - so far no problems or crashes have occurred. Running normally.
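For anyone skimming the patch below: its Part 2 introduces a JobWait class that tracks in-progress helper jobs. As a rough standalone illustration of that pattern (my simplified sketch with invented MiniJob/MiniJobWait names - the real Squid classes deliver callbacks asynchronously, not inline):

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// My simplified illustration of the patch's Part 2 rules (invented names;
// the real Squid AsyncJob delivers callbacks asynchronously, not inline).
struct MiniJob {
    std::function<void(bool ok)> callback; // the creator's completion callback
    bool cancelled = false;                // creator no longer wants an answer

    // Rule: swanSong() calls any remaining (set, uncalled, uncancelled)
    // callback; destructors must not call callbacks at all.
    void swanSong() {
        if (callback && !cancelled)
            callback(false); // negative answer: job ended without succeeding
        callback = nullptr;
    }
};

// Tracks one in-progress helper job, like the patch's JobWait class.
class MiniJobWait {
public:
    void start(std::shared_ptr<MiniJob> j) { job_ = std::move(j); }
    explicit operator bool() const { return job_ != nullptr; } // still waiting?
    void finish() { job_ = nullptr; } // the callback arrived; stop waiting
    void cancel(const std::string &) { // kill a no-longer-needed helper
        if (job_) {
            job_->cancelled = true; // a cancelled callback is never called
            job_->swanSong();       // the real code ends the job asynchronously
            job_ = nullptr;
        }
    }
private:
    std::shared_ptr<MiniJob> job_;
};
```

The point of the pattern: a started job that dies prematurely still delivers exactly one (negative) callback via swanSong(), while a creator that cancels the wait gets no late surprise callback.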
Signed-off-by: Matthias Fischer <matthias.fischer@ipfire.org> --- lfs/squid | 1 + ...estinationsEnd_exception_opening_967.patch | 3825 +++++++++++++++++ 2 files changed, 3826 insertions(+) create mode 100644 src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch
diff --git a/lfs/squid b/lfs/squid index 034f4a3e9..6762a0218 100644 --- a/lfs/squid +++ b/lfs/squid @@ -78,6 +78,7 @@ $(TARGET) : $(patsubst %,$(DIR_DL)/%,$(objects)) @rm -rf $(DIR_APP) && cd $(DIR_SRC) && tar xaf $(DIR_DL)/$(DL_FILE)
cd $(DIR_APP) && patch -Np1 < $(DIR_SRC)/src/patches/squid/01_squid-gcc11.patch + cd $(DIR_APP) && patch -Np1 < $(DIR_SRC)/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch
cd $(DIR_APP) && autoreconf -vfi cd $(DIR_APP)/libltdl && autoreconf -vfi diff --git a/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch b/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch new file mode 100644 index 000000000..9f592c7e8 --- /dev/null +++ b/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch @@ -0,0 +1,3825 @@ +commit 1332f8d606485b5a2f57277634c2f6f7855bd9a6 (refs/remotes/origin/v5, refs/remotes/github/v5, refs/heads/v5) +Author: Alex Rousskov <rousskov@measurement-factory.com> +Date: 2022-02-08 03:56:43 -0500 + + Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening (#967) + + * Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening + + The bug was caused by commit 25b0ce4. Other known symptoms are: + + assertion failed: store.cc:1793: "isEmpty()" + assertion failed: FwdState.cc:501: "serverConnection() == conn" + assertion failed: FwdState.cc:1037: "!opening()" + + This change has several overlapping parts. Unfortunately, merging + individual parts is both difficult and likely to cause crashes. + + ## Part 1: Bug 5055. + + FwdState used to check serverConn to decide whether to open a connection + to forward the request. Since commit 25b0ce4, a nil serverConn pointer + no longer implies that a new connection should be opened: FwdState + helper jobs may already be working on preparing an existing open + connection (e.g., sending a CONNECT request or negotiating encryption). + + Bad serverConn checks in both FwdState::noteDestination() and + FwdState::noteDestinationsEnd() methods led to extra connectStart() + calls creating two conflicting concurrent helper jobs. + + To fix this, we replaced direct serverConn inspection with a + usingDestination() call which also checks whether we are waiting for a + helper job.
Testing that fix exposed another set of bugs: The helper job + pointers or in-job connections were left stale/set after forwarding failures. + The changes described below addressed (most of) those problems. + + ## Part 2: Connection establishing helper jobs and their callbacks + + A proper fix for Bug 5055 required answering a difficult question: When + should a dying job call its callbacks? We only found one answer which + required cooperation from the job creator and led to the following + rules: + + * AsyncJob destructors must not call any callbacks. + + * AsyncJob::swanSong() is responsible for async-calling any remaining + (i.e. set, uncalled, and uncancelled) callbacks. + + * AsyncJob::swanSong() is called (only) for started jobs. + + * AsyncJob destructing sequence should validate that no callbacks remain + uncalled for started jobs. + + ... where an AsyncJob x is considered "started" if AsyncJob::Start(x) + has returned without throwing. + + A new JobWait class helps job creators follow these rules while keeping + track of in-progress helper jobs and killing no-longer-needed helpers. + + Also fixed very similar bugs in tunnel.cc code. + + ## Part 3: ConnOpener fixes + + 1. Many ConnOpener users are written to keep a ConnectionPointer to the + destination given to ConnOpener. This means that their connection + magically opens when ConnOpener successfully connects, before + ConnOpener has a chance to notify the user about the changes. Having + multiple concurrent connection owners is always dangerous, and the + user cannot even have a close handler registered for its now-open + connection. When something happens to ConnOpener or its answer, the + user job may be in trouble. Now, ConnOpener callers no longer pass + Connection objects they own, cloning them as needed. That adjustment + required adjustment 2: + + 2. Refactored ConnOpener users to stop assuming that the answer contains + a pointer to their connection object. After adjustment 1 above, it + does not.
HappyConnOpener relied on that assumption quite a bit so we + had to refactor to use two custom callback methods instead of one + with a complicated if-statement distinguishing prime from spare + attempts. This refactoring is an overall improvement because it + simplifies the code. Other ConnOpener users just needed to remove a + few no longer valid paranoid assertions/Musts. + + 3. ConnOpener users were forced to remember to close params.conn when + processing negative answers. Some, naturally, forgot, triggering + warnings about orphaned connections (e.g., Ident and TcpLogger). + ConnOpener now closes its open connection before sending a negative + answer. + + 4. ConnOpener would trigger orphan connection warnings if the job ended + after opening the connection but without supplying the connection to + the requestor (e.g., because the requestor has gone away). Now, + ConnOpener explicitly closes its open connection if it has not been + sent to the requestor. + + Also fixed Comm::ConnOpener::cleanFd() debugging that was incorrectly + saying that the method closes the temporary descriptor. + + Also fixed ConnOpener callback's syncWithComm(): The stale + CommConnectCbParams override was testing unused (i.e. always negative) + CommConnectCbParams::fd and was trying to cancel the callback that most + (possibly all) recipients rely on: ConnOpener users expect a negative + answer rather than no answer at all. + + Also, after comparing the needs of two old/existing and a temporary + added ("clone everything") Connection cloning method callers, we decided + there is no need to maintain three different methods. All existing + callers should be fine with a single method because none of them suffers + from "extra" copying of members that others need. Right now, the new + cloneProfile() method copies everything except FD and a few + special-purpose members (with documented reasons for not copying). 
+ + Also added Comm::Connection::cloneDestinationDetails() debugging to + simplify tracking dependencies between half-baked Connection objects + carrying destination/flags/other metadata and open Connection objects + created by ConnOpener using that metadata (which are later delivered to + ConnOpener users and, in some cases, replace those half-baked + connections mentioned earlier). Long-term, we need to find a better way + to express these and other Connection states/stages than comments and + debugging messages. + + ## Part 4: Comm::Connection closure callbacks + + We improved many closure callbacks to make sure (to the extent possible) + that Connection and other objects are in sync with Comm. There are lots + of small bugs, inconsistencies, and other problems in Connection closure + handlers. It is not clear whether any of those problems could result in + serious runtime errors or leaks. In theory, the rest of the code could + neutralize their negative side effects. However, even in that case, it + was just a matter of time before the next bug would bite us due to stale + Connection::fd and such. These changes themselves carry elevated risk, + but they get us closer to reliable code as far as Connection maintenance + is concerned; otherwise, we will keep chasing their deadly side effects. + + Long-term, all these manual efforts to keep things in sync should become + unnecessary with the introduction of appropriate Connection ownership + APIs that automatically maintain the corresponding environments (TODO). + + ## Part 5: Other notable improvements in the adjusted code + + Improved Security::PeerConnector::serverConn and + Http::Tunneler::connection management, especially when sending negative + answers. When sending a negative answer, we would set answer().conn to + an open connection, async-send that answer, and then hurry to close the + connection using our pointer to the shared Connection object.
If + everything went according to the plan, the recipient would get a non-nil + but closed Connection object. Now, a negative answer simply means no + connection at all. Same for a tunneled answer. + + Refactored ICAP connection-establishing code to delay Connection + ownership until the ICAP connection is fully ready. This change + addresses primary Connection ownership concerns (as they apply to this + ICAP code) except orphaning of the temporary Connection object by helper + job start exceptions (now an explicit XXX). For example, the transaction + no longer shares a Connection object with ConnOpener and + IcapPeerConnector jobs. + + Probably fixed a bug where PeerConnector::negotiate() assumed that + sslFinalized() does not return true after callBack(). It may (e.g., when + CertValidationHelper::Submit() throws). Same for + PeekingPeerConnector::checkForPeekAndSpliceMatched(). + + Fixed FwdState::advanceDestination() bug that did not save + ERR_GATEWAY_FAILURE details and "lost" the address of that failed + destination, making it unavailable to future retries (if any). + + Polished PeerPoolMgr, Ident, and Gopher code to be able to fix similar + job callback and connection management issues. + + Polished AsyncJob::Start() API. Start() used to return a job pointer, + but that was a bad idea: + + * It implies that a failed Start() will return a nil pointer, and that + the caller should check the result. Neither is true. + + * It encourages callers to dereference the returned pointer to further + adjust the job. That technically works (today) but violates the rules + of communicating with an async job. The Start() method is the boundary + after which the job is deemed asynchronous. + + Also removed old "and wait for..." post-Start() comments because the + code itself became clear enough, and those comments were becoming + increasingly stale (because they duplicated the callback names above + them).
+ + Fix Tunneler and PeerConnector handling of last-resort callback + requirements. Events like handleStopRequest() and callException() stop + the job but should not be reported as a BUG (e.g., it would be up to the + callException() to decide how to report the caught exception). There + might (or there will) be other, similar cases where the job is stopped + prematurely for some non-BUG reason beyond swanSong() knowledge. The + existence of non-bug cases does not mean there could be no bugs worth + reporting here, but until they can be identified more reliably than all + these benign/irrelevant cases, reporting no BUGs is a (much) lesser + evil. + + TODO: Revise AsyncJob::doneAll(). Many of its overrides are written to + check for both positive (i.e. mission accomplished) and negative (i.e. + mission cancelled or cannot be accomplished) conditions, but the latter + is usually unnecessary, especially after we added handleStopRequest() + API to properly support external job cancellation events. Many doneAll() + overrides can probably be greatly simplified. + + ---- + + Cherry picked SQUID-568-premature-serverconn-use-v5 commit 22b5f78. + + * fixup: Cherry-picked SQUID-568-premature-serverconn-use PR-time fixes + + In git log order: + * e64a6c1: Undone an out-of-scope change and added a missing 'auto' + * aeaf83d: Fixed an 'unused parameter' error + * f49d009: fixup: No explicit destructors with implicit copying methods + * c30c37f: Removed an unnecessary explicit copy constructor + * 012f5ec: Excluded Connection::rfc931 from cloning + * 366c78a: ICAP: do not set connect_timeout on the established conn... + + This branch is now in sync with SQUID-568-premature-serverconn-use (S) + commit e64a6c1 (except for official changes merged from master/v6 into S + closer to the end of PR 877 work (i.e. S' merge commit 0a7432a). 
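A reviewer's note on Part 3, adjustment 1 above ("ConnOpener callers no longer pass Connection objects they own, cloning them as needed"): the ownership idea can be sketched in a few lines. This is my illustration only - MiniConn, openClone(), and the fd value are invented names, standing in for the patch's real Comm::Connection::cloneProfile() and ConnOpener:

```cpp
#include <cassert>
#include <memory>
#include <string>

// Invented stand-in for Comm::Connection (illustration, not Squid code):
// the opener gets a clone of the caller's destination, so the caller's
// own Connection object cannot "magically" open behind its back.
struct MiniConn {
    std::string host;   // destination metadata
    int fd = -1;        // -1 while closed
    bool isOpen() const { return fd >= 0; }
    // Copy destination details but never the FD (cf. cloneProfile()).
    std::shared_ptr<MiniConn> cloneProfile() const {
        auto c = std::make_shared<MiniConn>();
        c->host = host; // metadata copied; fd intentionally not copied
        return c;
    }
};

// The "opener" connects its own clone and hands it back via the answer,
// so there is exactly one owner of the open descriptor at any time.
std::shared_ptr<MiniConn> openClone(const MiniConn &dest) {
    auto conn = dest.cloneProfile();
    conn->fd = 42; // pretend connect() succeeded (invented value)
    return conn;
}
```

With this split, the caller's original object stays closed until the answer arrives, which is exactly the single-owner property the patch is after.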
+ + * Fix FATAL ServiceRep::putConnection exception: theBusyConns > 0 + + FATAL: check failed: theBusyConns > 0 + exception location: ServiceRep.cc(163) putConnection + + Since master/v6 commit 2b6b1bc, a timeout on a ready-to-shovel + Squid-service ICAP connection was decrementing theBusyConns level one + extra time because Adaptation::Icap::Xaction::noteCommTimedout() started + calling both noteConnectionFailed() and closeConnection(). Depending on + the actual theBusyConns level, the extra decrement could result in FATAL + errors later, when putConnection() was called (for a different ICAP + transaction) with zero theBusyConns in an exception-unprotected context. + + Throughout these changes, Xaction still counts the above timeouts as a + service failure. That is done by calling ServiceRep::noteFailure() from + Xaction::callException(), including in timeout cases described above. + + ---- + + Cherry-picked master/v6 commit a8ac892. + +diff --git a/src/BodyPipe.cc b/src/BodyPipe.cc +index 9abe7d48b..37a3b32cd 100644 +--- a/src/BodyPipe.cc ++++ b/src/BodyPipe.cc +@@ -335,6 +335,7 @@ BodyPipe::startAutoConsumptionIfNeeded() + return; + + theConsumer = new BodySink(this); ++ AsyncJob::Start(theConsumer); + debugs(91,7, HERE << "starting auto consumption" << status()); + scheduleBodyDataNotification(); + } +diff --git a/src/CommCalls.cc b/src/CommCalls.cc +index 4cd503dcb..cca23f6e3 100644 +--- a/src/CommCalls.cc ++++ b/src/CommCalls.cc +@@ -53,6 +53,9 @@ CommAcceptCbParams::CommAcceptCbParams(void *aData): + { + } + ++// XXX: Add CommAcceptCbParams::syncWithComm(). Adjust syncWithComm() API if all ++// implementations always return true. 
++ + void + CommAcceptCbParams::print(std::ostream &os) const + { +@@ -72,13 +75,24 @@ CommConnectCbParams::CommConnectCbParams(void *aData): + bool + CommConnectCbParams::syncWithComm() + { +- // drop the call if the call was scheduled before comm_close but +- // is being fired after comm_close +- if (fd >= 0 && fd_table[fd].closing()) { +- debugs(5, 3, HERE << "dropping late connect call: FD " << fd); +- return false; ++ assert(conn); ++ ++ // change parameters if this is a successful callback that was scheduled ++ // after its Comm-registered connection started to close ++ ++ if (flag != Comm::OK) { ++ assert(!conn->isOpen()); ++ return true; // not a successful callback; cannot go out of sync + } +- return true; // now we are in sync and can handle the call ++ ++ assert(conn->isOpen()); ++ if (!fd_table[conn->fd].closing()) ++ return true; // no closing in progress; in sync (for now) ++ ++ debugs(5, 3, "converting to Comm::ERR_CLOSING: " << conn); ++ conn->noteClosure(); ++ flag = Comm::ERR_CLOSING; ++ return true; // now the callback is in sync with Comm again + } + + /* CommIoCbParams */ +diff --git a/src/Downloader.cc b/src/Downloader.cc +index 298c3a836..012387fb8 100644 +--- a/src/Downloader.cc ++++ b/src/Downloader.cc +@@ -81,6 +81,10 @@ void + Downloader::swanSong() + { + debugs(33, 6, this); ++ ++ if (callback_) // job-ending emergencies like handleStopRequest() or callException() ++ callBack(Http::scInternalServerError); ++ + if (context_) { + context_->finished(); + context_ = nullptr; +@@ -251,6 +255,7 @@ Downloader::downloadFinished() + void + Downloader::callBack(Http::StatusCode const statusCode) + { ++ assert(callback_); + CbDialer *dialer = dynamic_cast<CbDialer*>(callback_->getDialer()); + Must(dialer); + dialer->status = statusCode; +diff --git a/src/FwdState.cc b/src/FwdState.cc +index b721017c3..b69e60c7c 100644 +--- a/src/FwdState.cc ++++ b/src/FwdState.cc +@@ -207,26 +207,22 @@ FwdState::stopAndDestroy(const char *reason) + { + debugs(17, 
3, "for " << reason); + +- if (opening()) +- cancelOpening(reason); ++ cancelStep(reason); + + PeerSelectionInitiator::subscribed = false; // may already be false + self = nullptr; // we hope refcounting destroys us soon; may already be nil + /* do not place any code here as this object may be gone by now */ + } + +-/// Notify connOpener that we no longer need connections. We do not have to do +-/// this -- connOpener would eventually notice on its own, but notifying reduces +-/// waste and speeds up spare connection opening for other transactions (that +-/// could otherwise wait for this transaction to use its spare allowance). ++/// Notify a pending subtask, if any, that we no longer need its help. We do not ++/// have to do this -- the subtask job will eventually end -- but ending it ++/// earlier reduces waste and may reduce DoS attack surface. + void +-FwdState::cancelOpening(const char *reason) ++FwdState::cancelStep(const char *reason) + { +- assert(calls.connector); +- calls.connector->cancel(reason); +- calls.connector = nullptr; +- notifyConnOpener(); +- connOpener.clear(); ++ transportWait.cancel(reason); ++ encryptionWait.cancel(reason); ++ peerWait.cancel(reason); + } + + #if STRICT_ORIGINAL_DST +@@ -348,8 +344,7 @@ FwdState::~FwdState() + + entry = NULL; + +- if (opening()) +- cancelOpening("~FwdState"); ++ cancelStep("~FwdState"); + + if (Comm::IsConnOpen(serverConn)) + closeServerConnection("~FwdState"); +@@ -501,8 +496,17 @@ FwdState::fail(ErrorState * errorState) + if (!errorState->request) + errorState->request = request; + +- if (err->type != ERR_ZERO_SIZE_OBJECT) +- return; ++ if (err->type == ERR_ZERO_SIZE_OBJECT) ++ reactToZeroSizeObject(); ++ ++ destinationReceipt = nullptr; // may already be nil ++} ++ ++/// ERR_ZERO_SIZE_OBJECT requires special adjustments ++void ++FwdState::reactToZeroSizeObject() ++{ ++ assert(err->type == ERR_ZERO_SIZE_OBJECT); + + if (pconnRace == racePossible) { + debugs(17, 5, HERE << "pconn race happened"); +@@ 
-566,6 +570,8 @@ FwdState::complete() + + if (Comm::IsConnOpen(serverConn)) + unregister(serverConn); ++ serverConn = nullptr; ++ destinationReceipt = nullptr; + + storedWholeReply_ = nullptr; + entry->reset(); +@@ -584,6 +590,12 @@ FwdState::complete() + } + } + ++bool ++FwdState::usingDestination() const ++{ ++ return encryptionWait || peerWait || Comm::IsConnOpen(serverConn); ++} ++ + void + FwdState::markStoredReplyAsWhole(const char * const whyWeAreSure) + { +@@ -613,19 +625,19 @@ FwdState::noteDestination(Comm::ConnectionPointer path) + + destinations->addPath(path); + +- if (Comm::IsConnOpen(serverConn)) { ++ if (usingDestination()) { + // We are already using a previously opened connection, so we cannot be +- // waiting for connOpener. We still receive destinations for backup. +- Must(!opening()); ++ // waiting for it. We still receive destinations for backup. ++ Must(!transportWait); + return; + } + +- if (opening()) { ++ if (transportWait) { + notifyConnOpener(); + return; // and continue to wait for FwdState::noteConnection() callback + } + +- // This is the first path candidate we have seen. Create connOpener. ++ // This is the first path candidate we have seen. Use it. + useDestinations(); + } + +@@ -653,19 +665,19 @@ FwdState::noteDestinationsEnd(ErrorState *selectionError) + // if all of them fail, forwarding as whole will fail + Must(!selectionError); // finding at least one path means selection succeeded + +- if (Comm::IsConnOpen(serverConn)) { ++ if (usingDestination()) { + // We are already using a previously opened connection, so we cannot be +- // waiting for connOpener. We were receiving destinations for backup. +- Must(!opening()); ++ // waiting for it. We were receiving destinations for backup. 
++ Must(!transportWait); + return; + } + +- Must(opening()); // or we would be stuck with nothing to do or wait for ++ Must(transportWait); // or we would be stuck with nothing to do or wait for + notifyConnOpener(); + // and continue to wait for FwdState::noteConnection() callback + } + +-/// makes sure connOpener knows that destinations have changed ++/// makes sure connection opener knows that the destinations have changed + void + FwdState::notifyConnOpener() + { +@@ -674,7 +686,7 @@ FwdState::notifyConnOpener() + } else { + debugs(17, 7, "notifying about " << *destinations); + destinations->notificationPending = true; +- CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange); ++ CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange); + } + } + +@@ -684,7 +696,7 @@ static void + fwdServerClosedWrapper(const CommCloseCbParams &params) + { + FwdState *fwd = (FwdState *)params.data; +- fwd->serverClosed(params.fd); ++ fwd->serverClosed(); + } + + /**** PRIVATE *****************************************************************/ +@@ -754,13 +766,23 @@ FwdState::checkRetriable() + } + + void +-FwdState::serverClosed(int fd) ++FwdState::serverClosed() + { +- // XXX: fd is often -1 here +- debugs(17, 2, "FD " << fd << " " << entry->url() << " after " << +- (fd >= 0 ? fd_table[fd].pconn.uses : -1) << " requests"); +- if (fd >= 0 && serverConnection()->fd == fd) +- fwdPconnPool->noteUses(fd_table[fd].pconn.uses); ++ // XXX: This method logic attempts to tolerate Connection::close() called ++ // for serverConn earlier, by one of our dispatch()ed jobs. If that happens, ++ // serverConn will already be closed here or, worse, it will already be open ++ // for the next forwarding attempt. The current code prevents us getting ++ // stuck, but the long term solution is to stop sharing serverConn.
++ debugs(17, 2, serverConn); ++ if (Comm::IsConnOpen(serverConn)) { ++ const auto uses = fd_table[serverConn->fd].pconn.uses; ++ debugs(17, 3, "prior uses: " << uses); ++ fwdPconnPool->noteUses(uses); // XXX: May not have come from fwdPconnPool ++ serverConn->noteClosure(); ++ } ++ serverConn = nullptr; ++ closeHandler = nullptr; ++ destinationReceipt = nullptr; + retryOrBail(); + } + +@@ -802,6 +824,8 @@ FwdState::handleUnregisteredServerEnd() + { + debugs(17, 2, HERE << "self=" << self << " err=" << err << ' ' << entry->url()); + assert(!Comm::IsConnOpen(serverConn)); ++ serverConn = nullptr; ++ destinationReceipt = nullptr; + retryOrBail(); + } + +@@ -819,6 +843,8 @@ FwdState::advanceDestination(const char *stepDescription, const Comm::Connection + } catch (...) { + debugs (17, 2, "exception while trying to " << stepDescription << ": " << CurrentException); + closePendingConnection(conn, "connection preparation exception"); ++ if (!err) ++ fail(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request, al)); + retryOrBail(); + } + } +@@ -830,8 +856,7 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer) + { + assert(!destinationReceipt); + +- calls.connector = nullptr; +- connOpener.clear(); ++ transportWait.finish(); + + Must(n_tries <= answer.n_tries); // n_tries cannot decrease + n_tries = answer.n_tries; +@@ -843,6 +868,8 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer) + Must(!Comm::IsConnOpen(answer.conn)); + answer.error.clear(); // preserve error for errorSendComplete() + } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { ++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here. 
+ // We do not know exactly why the connection got closed, so we play it + // safe, allowing retries only for persistent (reused) connections + if (answer.reused) { +@@ -912,23 +939,24 @@ FwdState::establishTunnelThruProxy(const Comm::ConnectionPointer &conn) + if (!conn->getPeer()->options.no_delay) + tunneler->setDelayId(entry->mem_obj->mostBytesAllowed()); + #endif +- AsyncJob::Start(tunneler); +- // and wait for the tunnelEstablishmentDone() call ++ peerWait.start(tunneler, callback); + } + + /// resumes operations after the (possibly failed) HTTP CONNECT exchange + void + FwdState::tunnelEstablishmentDone(Http::TunnelerAnswer &answer) + { ++ peerWait.finish(); ++ + ErrorState *error = nullptr; + if (!answer.positive()) { +- Must(!Comm::IsConnOpen(answer.conn)); ++ Must(!answer.conn); + error = answer.squidError.get(); + Must(error); + answer.squidError.clear(); // preserve error for fail() + } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { +- // The socket could get closed while our callback was queued. +- // We close Connection here to sync Connection::fd. ++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here. 
+ closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone"); + error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al); + } else if (!answer.leftovers.isEmpty()) { +@@ -998,18 +1026,21 @@ FwdState::secureConnectionToPeer(const Comm::ConnectionPointer &conn) + #endif + connector = new Security::BlindPeerConnector(requestPointer, conn, callback, al, sslNegotiationTimeout); + connector->noteFwdPconnUse = true; +- AsyncJob::Start(connector); // will call our callback ++ encryptionWait.start(connector, callback); + } + + /// called when all negotiations with the TLS-speaking peer have been completed + void + FwdState::connectedToPeer(Security::EncryptorAnswer &answer) + { ++ encryptionWait.finish(); ++ + ErrorState *error = nullptr; + if ((error = answer.error.get())) { +- Must(!Comm::IsConnOpen(answer.conn)); ++ assert(!answer.conn); + answer.error.clear(); // preserve error for errorSendComplete() + } else if (answer.tunneled) { ++ assert(!answer.conn); + // TODO: When ConnStateData establishes tunnels, its state changes + // [in ways that may affect logging?]. Consider informing + // ConnStateData about our tunnel or otherwise unifying tunnel +@@ -1019,6 +1050,8 @@ FwdState::connectedToPeer(Security::EncryptorAnswer &answer) + complete(); // destroys us + return; + } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { ++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here. + closePendingConnection(answer.conn, "conn was closed while waiting for connectedToPeer"); + error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al); + } +@@ -1091,22 +1124,20 @@ FwdState::connectStart() + Must(!request->pinnedConnection()); + + assert(!destinations->empty()); +- assert(!opening()); ++ assert(!usingDestination()); + + // Ditch error page if it was created before. 
+ // A new one will be created if there's another problem + delete err; + err = nullptr; + request->clearError(); +- serverConn = nullptr; +- destinationReceipt = nullptr; + + request->hier.startPeerClock(); + +- calls.connector = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this)); ++ AsyncCall::Pointer callback = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this)); + + HttpRequest::Pointer cause = request; +- const auto cs = new HappyConnOpener(destinations, calls.connector, cause, start_t, n_tries, al); ++ const auto cs = new HappyConnOpener(destinations, callback, cause, start_t, n_tries, al); + cs->setHost(request->url.host()); + bool retriable = checkRetriable(); + if (!retriable && Config.accessList.serverPconnForNonretriable) { +@@ -1118,8 +1149,7 @@ FwdState::connectStart() + cs->setRetriable(retriable); + cs->allowPersistent(pconnRace != raceHappened); + destinations->notificationPending = true; // start() is async +- connOpener = cs; +- AsyncJob::Start(cs); ++ transportWait.start(cs, callback); + } + + /// send request on an existing connection dedicated to the requesting client +diff --git a/src/FwdState.h b/src/FwdState.h +index de75f33cc..ebf5f82b1 100644 +--- a/src/FwdState.h ++++ b/src/FwdState.h +@@ -9,13 +9,12 @@ + #ifndef SQUID_FORWARD_H + #define SQUID_FORWARD_H + +-#include "base/CbcPointer.h" + #include "base/forward.h" ++#include "base/JobWait.h" + #include "base/RefCount.h" + #include "clients/forward.h" + #include "comm.h" + #include "comm/Connection.h" +-#include "comm/ConnOpener.h" + #include "error/forward.h" + #include "fde.h" + #include "http/StatusCode.h" +@@ -38,7 +37,6 @@ class ResolvedPeers; + typedef RefCount<ResolvedPeers> ResolvedPeersPointer; + + class HappyConnOpener; +-typedef CbcPointer<HappyConnOpener> HappyConnOpenerPointer; + class HappyConnOpenerAnswer; + + /// Sets initial TOS value and Netfilter 
mark for the future outgoing connection. +@@ -87,7 +85,7 @@ public: + void handleUnregisteredServerEnd(); + int reforward(); + bool reforwardableStatus(const Http::StatusCode s) const; +- void serverClosed(int fd); ++ void serverClosed(); + void connectStart(); + void connectDone(const Comm::ConnectionPointer & conn, Comm::Flag status, int xerrno); + bool checkRetry(); +@@ -115,6 +113,9 @@ private: + /* PeerSelectionInitiator API */ + virtual void noteDestination(Comm::ConnectionPointer conn) override; + virtual void noteDestinationsEnd(ErrorState *selectionError) override; ++ /// whether the successfully selected path destination or the established ++ /// server connection is still in use ++ bool usingDestination() const; + + void noteConnection(HappyConnOpenerAnswer &); + +@@ -157,13 +158,10 @@ private: + /// \returns the time left for this connection to become connected or 1 second if it is less than one second left + time_t connectingTimeout(const Comm::ConnectionPointer &conn) const; + +- /// whether we are waiting for HappyConnOpener +- /// same as calls.connector but may differ from connOpener.valid() +- bool opening() const { return connOpener.set(); } +- +- void cancelOpening(const char *reason); ++ void cancelStep(const char *reason); + + void notifyConnOpener(); ++ void reactToZeroSizeObject(); + + void updateAleWithFinalError(); + +@@ -182,11 +180,6 @@ private: + time_t start_t; + int n_tries; ///< the number of forwarding attempts so far + +- // AsyncCalls which we set and may need cancelling. +- struct { +- AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn. +- } calls; +- + struct { + bool connected_okay; ///< TCP link ever opened properly.
This affects retry of POST,PUT,CONNECT,etc + bool dont_retry; +@@ -194,7 +187,16 @@ private: + bool destinationsFound; ///< at least one candidate path found + } flags; + +- HappyConnOpenerPointer connOpener; ///< current connection opening job ++ /// waits for a transport connection to the peer to be established/opened ++ JobWait<HappyConnOpener> transportWait; ++ ++ /// waits for the established transport connection to be secured/encrypted ++ JobWait<Security::PeerConnector> encryptionWait; ++ ++ /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated ++ /// over the (encrypted, if needed) transport connection to that cache_peer ++ JobWait<Http::Tunneler> peerWait; ++ + ResolvedPeersPointer destinations; ///< paths for forwarding the request + Comm::ConnectionPointer serverConn; ///< a successfully opened connection to a server. + PeerConnectionPointer destinationReceipt; ///< peer selection result (or nil) +diff --git a/src/HappyConnOpener.cc b/src/HappyConnOpener.cc +index 6e0872506..6d83ff14f 100644 +--- a/src/HappyConnOpener.cc ++++ b/src/HappyConnOpener.cc +@@ -18,6 +18,7 @@ + #include "neighbors.h" + #include "pconn.h" + #include "PeerPoolMgr.h" ++#include "sbuf/Stream.h" + #include "SquidConfig.h" + + CBDATA_CLASS_INIT(HappyConnOpener); +@@ -330,6 +331,8 @@ HappyConnOpener::HappyConnOpener(const ResolvedPeers::Pointer &dests, const Asyn + fwdStart(aFwdStart), + callback_(aCall), + destinations(dests), ++ prime(&HappyConnOpener::notePrimeConnectDone, "HappyConnOpener::notePrimeConnectDone"), ++ spare(&HappyConnOpener::noteSpareConnectDone, "HappyConnOpener::noteSpareConnectDone"), + ale(anAle), + cause(request), + n_tries(tries) +@@ -410,33 +413,43 @@ HappyConnOpener::swanSong() + AsyncJob::swanSong(); + } + ++/// HappyConnOpener::Attempt printer for debugging ++std::ostream & ++operator <<(std::ostream &os, const HappyConnOpener::Attempt &attempt) ++{ ++ if (!attempt.path) ++ os << '-'; ++ else if (attempt.path->isOpen()) ++ os << "FD " <<
attempt.path->fd; ++ else if (attempt.connWait) ++ os << attempt.connWait; ++ else // destination is known; connection closed (and we are not opening any) ++ os << attempt.path->id; ++ return os; ++} ++ + const char * + HappyConnOpener::status() const + { +- static MemBuf buf; +- buf.reset(); ++ // TODO: In a redesigned status() API, the caller may mimic this approach. ++ static SBuf buf; ++ buf.clear(); + +- buf.append(" [", 2); ++ SBufStream os(buf); ++ ++ os.write(" [", 2); + if (stopReason) +- buf.appendf("Stopped, reason:%s", stopReason); +- if (prime) { +- if (prime.path && prime.path->isOpen()) +- buf.appendf(" prime FD %d", prime.path->fd); +- else if (prime.connector) +- buf.appendf(" prime call%ud", prime.connector->id.value); +- } +- if (spare) { +- if (spare.path && spare.path->isOpen()) +- buf.appendf(" spare FD %d", spare.path->fd); +- else if (spare.connector) +- buf.appendf(" spare call%ud", spare.connector->id.value); +- } ++ os << "Stopped:" << stopReason; ++ if (prime) ++ os << "prime:" << prime; ++ if (spare) ++ os << "spare:" << spare; + if (n_tries) +- buf.appendf(" tries %d", n_tries); +- buf.appendf(" %s%u]", id.prefix(), id.value); +- buf.terminate(); ++ os << " tries:" << n_tries; ++ os << ' ' << id << ']'; + +- return buf.content(); ++ buf = os.buf(); ++ return buf.c_str(); + } + + /// Create "503 Service Unavailable" or "504 Gateway Timeout" error depending +@@ -516,7 +529,7 @@ void + HappyConnOpener::startConnecting(Attempt &attempt, PeerConnectionPointer &dest) + { + Must(!attempt.path); +- Must(!attempt.connector); ++ Must(!attempt.connWait); + Must(dest); + + const auto bumpThroughPeer = cause->flags.sslBumped && dest->getPeer(); +@@ -552,51 +565,52 @@ HappyConnOpener::openFreshConnection(Attempt &attempt, PeerConnectionPointer &de + entry->mem_obj->checkUrlChecksum(); + #endif + +- GetMarkingsToServer(cause.getRaw(), *dest); ++ const auto conn = dest->cloneProfile(); ++ GetMarkingsToServer(cause.getRaw(), *conn); + +- // ConnOpener 
modifies its destination argument so we reset the source port +- // in case we are reusing the destination already used by our predecessor. +- dest->local.port(0); + ++n_tries; + + typedef CommCbMemFunT<HappyConnOpener, CommConnectCbParams> Dialer; +- AsyncCall::Pointer callConnect = JobCallback(48, 5, Dialer, this, HappyConnOpener::connectDone); ++ AsyncCall::Pointer callConnect = asyncCall(48, 5, attempt.callbackMethodName, ++ Dialer(this, attempt.callbackMethod)); + const time_t connTimeout = dest->connectTimeout(fwdStart); +- Comm::ConnOpener *cs = new Comm::ConnOpener(dest, callConnect, connTimeout); +- if (!dest->getPeer()) ++ auto cs = new Comm::ConnOpener(conn, callConnect, connTimeout); ++ if (!conn->getPeer()) + cs->setHost(host_); + +- attempt.path = dest; +- attempt.connector = callConnect; +- attempt.opener = cs; ++ attempt.path = dest; // but not the being-opened conn! ++ attempt.connWait.start(cs, callConnect); ++} + +- AsyncJob::Start(cs); ++/// Comm::ConnOpener callback for the prime connection attempt ++void ++HappyConnOpener::notePrimeConnectDone(const CommConnectCbParams &params) ++{ ++ handleConnOpenerAnswer(prime, params, "new prime connection"); + } + +-/// called by Comm::ConnOpener objects after a prime or spare connection attempt +-/// completes (successfully or not) ++/// Comm::ConnOpener callback for the spare connection attempt + void +-HappyConnOpener::connectDone(const CommConnectCbParams &params) ++HappyConnOpener::noteSpareConnectDone(const CommConnectCbParams &params) + { +- Must(params.conn); +- const bool itWasPrime = (params.conn == prime.path); +- const bool itWasSpare = (params.conn == spare.path); +- Must(itWasPrime != itWasSpare); +- +- PeerConnectionPointer handledPath; +- if (itWasPrime) { +- handledPath = prime.path; +- prime.finish(); +- } else { +- handledPath = spare.path; +- spare.finish(); +- if (gotSpareAllowance) { +- TheSpareAllowanceGiver.jobUsedAllowance(); +- gotSpareAllowance = false; +- } ++ if (gotSpareAllowance) { ++
TheSpareAllowanceGiver.jobUsedAllowance(); ++ gotSpareAllowance = false; + } ++ handleConnOpenerAnswer(spare, params, "new spare connection"); ++} ++ ++/// prime/spare-agnostic processing of a Comm::ConnOpener result ++void ++HappyConnOpener::handleConnOpenerAnswer(Attempt &attempt, const CommConnectCbParams &params, const char *what) ++{ ++ Must(params.conn); ++ ++ // finalize the previously selected path before attempt.finish() forgets it ++ auto handledPath = attempt.path; ++ handledPath.finalize(params.conn); // closed on errors ++ attempt.finish(); + +- const char *what = itWasPrime ? "new prime connection" : "new spare connection"; + if (params.flag == Comm::OK) { + sendSuccess(handledPath, false, what); + return; +@@ -605,7 +619,6 @@ HappyConnOpener::connectDone(const CommConnectCbParams &params) + debugs(17, 8, what << " failed: " << params.conn); + if (const auto peer = params.conn->getPeer()) + peerConnectFailed(peer); +- params.conn->close(); // TODO: Comm::ConnOpener should do this instead.
+ + // remember the last failure (we forward it if we cannot connect anywhere) + lastFailedConnection = handledPath; +@@ -881,13 +894,23 @@ HappyConnOpener::ranOutOfTimeOrAttempts() const + return false; + } + ++HappyConnOpener::Attempt::Attempt(const CallbackMethod method, const char *methodName): ++ callbackMethod(method), ++ callbackMethodName(methodName) ++{ ++} ++ ++void ++HappyConnOpener::Attempt::finish() ++{ ++ connWait.finish(); ++ path = nullptr; ++} ++ + void + HappyConnOpener::Attempt::cancel(const char *reason) + { +- if (connector) { +- connector->cancel(reason); +- CallJobHere(17, 3, opener, Comm::ConnOpener, noteAbort); +- } +- clear(); ++ connWait.cancel(reason); ++ path = nullptr; + } + +diff --git a/src/HappyConnOpener.h b/src/HappyConnOpener.h +index 4f3135c60..c57c431ad 100644 +--- a/src/HappyConnOpener.h ++++ b/src/HappyConnOpener.h +@@ -156,22 +156,28 @@ private: + /// a connection opening attempt in progress (or falsy) + class Attempt { + public: ++ /// HappyConnOpener method implementing a ConnOpener callback ++ using CallbackMethod = void (HappyConnOpener::*)(const CommConnectCbParams &); ++ ++ Attempt(const CallbackMethod method, const char *methodName); ++ + explicit operator bool() const { return static_cast<bool>(path); } + + /// reacts to a natural attempt completion (successful or otherwise) +- void finish() { clear(); } ++ void finish(); + + /// aborts an in-progress attempt + void cancel(const char *reason); + + PeerConnectionPointer path; ///< the destination we are connecting to +- AsyncCall::Pointer connector; ///< our opener callback +- Comm::ConnOpener::Pointer opener; ///< connects to path and calls us + +- private: +- /// cleans up after the attempt ends (successfully or otherwise) +- void clear() { path = nullptr; connector = nullptr; opener = nullptr; } ++ /// waits for a connection to the peer to be established/opened ++ JobWait<Comm::ConnOpener> connWait; ++ ++ const CallbackMethod callbackMethod; ///< ConnOpener calls
this method ++ const char * const callbackMethodName; ///< for callbackMethod debugging + }; ++ friend std::ostream &operator <<(std::ostream &, const Attempt &); + + /* AsyncJob API */ + virtual void start() override; +@@ -190,7 +196,9 @@ private: + void openFreshConnection(Attempt &, PeerConnectionPointer &); + bool reuseOldConnection(PeerConnectionPointer &); + +- void connectDone(const CommConnectCbParams &); ++ void notePrimeConnectDone(const CommConnectCbParams &); ++ void noteSpareConnectDone(const CommConnectCbParams &); ++ void handleConnOpenerAnswer(Attempt &, const CommConnectCbParams &, const char *connDescription); + + void checkForNewConnection(); + +diff --git a/src/PeerPoolMgr.cc b/src/PeerPoolMgr.cc +index 2caa09f44..7423dd669 100644 +--- a/src/PeerPoolMgr.cc ++++ b/src/PeerPoolMgr.cc +@@ -43,9 +43,8 @@ public: + PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"), + peer(cbdataReference(aPeer)), + request(), +- opener(), +- securer(), +- closer(), ++ transportWait(), ++ encryptionWait(), + addrUsed(0) + { + } +@@ -90,7 +89,7 @@ PeerPoolMgr::doneAll() const + void + PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params) + { +- opener = NULL; ++ transportWait.finish(); + + if (!validPeer()) { + debugs(48, 3, "peer gone"); +@@ -100,9 +99,6 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params) + } + + if (params.flag != Comm::OK) { +- /* it might have been a timeout with a partially open link */ +- if (params.conn != NULL) +- params.conn->close(); + peerConnectFailed(peer); + checkpoint("conn opening failure"); // may retry + return; + } +@@ -112,20 +108,16 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params) + + // Handle TLS peers.
+ if (peer->secure.encryptTransport) { +- typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer; +- closer = JobCallback(48, 3, CloserDialer, this, +- PeerPoolMgr::handleSecureClosure); +- comm_add_close_handler(params.conn->fd, closer); +- +- securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer", +- MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer)); ++ // XXX: Exceptions orphan params.conn ++ AsyncCall::Pointer callback = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer", ++ MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer)); + + const int peerTimeout = peerConnectTimeout(peer); + const int timeUsed = squid_curtime - params.conn->startTime(); + // Use positive timeout when less than one second is left for conn. + const int timeLeft = positiveTimeout(peerTimeout - timeUsed); +- auto *connector = new Security::BlindPeerConnector(request, params.conn, securer, nullptr, timeLeft); +- AsyncJob::Start(connector); // will call our callback ++ const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft); ++ encryptionWait.start(connector, callback); + return; + } + +@@ -144,16 +136,7 @@ PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn) + void + PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer) + { +- Must(securer != NULL); +- securer = NULL; +- +- if (closer != NULL) { +- if (answer.conn != NULL) +- comm_remove_close_handler(answer.conn->fd, closer); +- else +- closer->cancel("securing completed"); +- closer = NULL; +- } ++ encryptionWait.finish(); + + if (!validPeer()) { + debugs(48, 3, "peer gone"); +@@ -162,35 +145,33 @@ PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer) + return; + } + ++ assert(!answer.tunneled); + if (answer.error.get()) { +- if (answer.conn != NULL) +- answer.conn->close(); ++ assert(!answer.conn); + // PeerConnector calls peerConnectFailed() for us; + checkpoint("conn securing failure"); // may retry + return; + } + +- 
pushNewConnection(answer.conn); +-} ++ assert(answer.conn); + +-void +-PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &params) +-{ +- Must(closer != NULL); +- Must(securer != NULL); +- securer->cancel("conn closed by a 3rd party"); +- securer = NULL; +- closer = NULL; +- // allow the closing connection to fully close before we check again +- Checkpoint(this, "conn closure while securing"); ++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here. ++ if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) { ++ answer.conn->noteClosure(); ++ checkpoint("external connection closure"); // may retry ++ return; ++ } ++ ++ pushNewConnection(answer.conn); + } + + void + PeerPoolMgr::openNewConnection() + { + // KISS: Do nothing else when we are already doing something. +- if (opener != NULL || securer != NULL || shutting_down) { +- debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down); ++ if (transportWait || encryptionWait || shutting_down) { ++ debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down); + return; // there will be another checkpoint when we are done opening/securing + } + +@@ -227,9 +208,9 @@ PeerPoolMgr::openNewConnection() + + const int ctimeout = peerConnectTimeout(peer); + typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer; +- opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection); +- Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout); +- AsyncJob::Start(cs); ++ AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection); ++ const auto cs = new Comm::ConnOpener(conn, callback, ctimeout); ++ transportWait.start(cs, callback); + } + + void +diff --git a/src/PeerPoolMgr.h b/src/PeerPoolMgr.h +index 217994393..6da61b10b 100644 +--- a/src/PeerPoolMgr.h ++++ b/src/PeerPoolMgr.h +@@ -10,6 +10,7 @@ + #define
SQUID_PEERPOOLMGR_H + + #include "base/AsyncJob.h" ++#include "base/JobWait.h" + #include "comm/forward.h" + #include "security/forward.h" + +@@ -54,18 +55,19 @@ protected: + /// Security::PeerConnector callback + void handleSecuredPeer(Security::EncryptorAnswer &answer); + +- /// called when the connection we are trying to secure is closed by a 3rd party +- void handleSecureClosure(const CommCloseCbParams &params); +- + /// the final step in connection opening (and, optionally, securing) sequence + void pushNewConnection(const Comm::ConnectionPointer &conn); + + private: + CachePeer *peer; ///< the owner of the pool we manage + RefCount<HttpRequest> request; ///< fake HTTP request for conn opening code +- AsyncCall::Pointer opener; ///< whether we are opening a connection +- AsyncCall::Pointer securer; ///< whether we are securing a connection +- AsyncCall::Pointer closer; ///< monitors conn while we are securing it ++ ++ /// waits for a transport connection to the peer to be established/opened ++ JobWait<Comm::ConnOpener> transportWait; ++ ++ /// waits for the established transport connection to be secured/encrypted ++ JobWait<Security::BlindPeerConnector> encryptionWait; ++ + unsigned int addrUsed; ///< counter for cycling through peer addresses + }; + +diff --git a/src/ResolvedPeers.cc b/src/ResolvedPeers.cc +index 5b2c6740d..06ab02d23 100644 +--- a/src/ResolvedPeers.cc ++++ b/src/ResolvedPeers.cc +@@ -151,7 +151,7 @@ ResolvedPeers::extractFound(const char *description, const Paths::iterator &foun + while (++pathsToSkip < paths_.size() && !paths_[pathsToSkip].available) {} + } + +- const auto cleanPath = path.connection->cloneDestinationDetails(); ++ const auto cleanPath = path.connection->cloneProfile(); + return PeerConnectionPointer(cleanPath, found - paths_.begin()); + } + +diff --git a/src/adaptation/icap/ModXact.cc b/src/adaptation/icap/ModXact.cc +index bbeebdbb5..982f38f8d 100644 +--- a/src/adaptation/icap/ModXact.cc ++++ b/src/adaptation/icap/ModXact.cc +@@
-187,8 +187,7 @@ void Adaptation::Icap::ModXact::startWriting() + openConnection(); + } + +-// connection with the ICAP service established +-void Adaptation::Icap::ModXact::handleCommConnected() ++void Adaptation::Icap::ModXact::startShoveling() + { + Must(state.writing == State::writingConnect); + +diff --git a/src/adaptation/icap/ModXact.h b/src/adaptation/icap/ModXact.h +index f8988ac29..2fda1318b 100644 +--- a/src/adaptation/icap/ModXact.h ++++ b/src/adaptation/icap/ModXact.h +@@ -155,10 +155,11 @@ public: + virtual void noteBodyProductionEnded(BodyPipe::Pointer); + virtual void noteBodyProducerAborted(BodyPipe::Pointer); + +- // comm handlers +- virtual void handleCommConnected(); ++ /* Xaction API */ ++ virtual void startShoveling(); + virtual void handleCommWrote(size_t size); + virtual void handleCommRead(size_t size); ++ + void handleCommWroteHeaders(); + void handleCommWroteBody(); + +diff --git a/src/adaptation/icap/OptXact.cc b/src/adaptation/icap/OptXact.cc +index 5ed4fbfbb..2464ea533 100644 +--- a/src/adaptation/icap/OptXact.cc ++++ b/src/adaptation/icap/OptXact.cc +@@ -37,7 +37,7 @@ void Adaptation::Icap::OptXact::start() + openConnection(); + } + +-void Adaptation::Icap::OptXact::handleCommConnected() ++void Adaptation::Icap::OptXact::startShoveling() + { + scheduleRead(); + +diff --git a/src/adaptation/icap/OptXact.h b/src/adaptation/icap/OptXact.h +index 3811ab48f..725cd6225 100644 +--- a/src/adaptation/icap/OptXact.h ++++ b/src/adaptation/icap/OptXact.h +@@ -30,8 +30,9 @@ public: + OptXact(ServiceRep::Pointer &aService); + + protected: ++ /* Xaction API */ + virtual void start(); +- virtual void handleCommConnected(); ++ virtual void startShoveling(); + virtual void handleCommWrote(size_t size); + virtual void handleCommRead(size_t size); + +diff --git a/src/adaptation/icap/ServiceRep.cc b/src/adaptation/icap/ServiceRep.cc +index 6df475712..fbd896888 100644 +--- a/src/adaptation/icap/ServiceRep.cc ++++ b/src/adaptation/icap/ServiceRep.cc +@@ 
-112,9 +112,10 @@ void Adaptation::Icap::ServiceRep::noteFailure() + // should be configurable. + } + +-// returns a persistent or brand new connection; negative int on failures ++// TODO: getIdleConnection() and putConnection()/noteConnectionFailed() manage a ++// "used connection slot" resource. Automate that resource tracking (RAII/etc.). + Comm::ConnectionPointer +-Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused) ++Adaptation::Icap::ServiceRep::getIdleConnection(const bool retriableXact) + { + Comm::ConnectionPointer connection; + +@@ -137,7 +138,6 @@ Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused) + else + theIdleConns->closeN(1); + +- reused = Comm::IsConnOpen(connection); + ++theBusyConns; + debugs(93,3, HERE << "got connection: " << connection); + return connection; +@@ -150,7 +150,6 @@ void Adaptation::Icap::ServiceRep::putConnection(const Comm::ConnectionPointer & + // do not pool an idle connection if we owe connections + if (isReusable && excessConnections() == 0) { + debugs(93, 3, HERE << "pushing pconn" << comment); +- commUnsetConnTimeout(conn); + theIdleConns->push(conn); + } else { + debugs(93, 3, HERE << (sendReset ? 
"RST" : "FIN") << "-closing " << +diff --git a/src/adaptation/icap/ServiceRep.h b/src/adaptation/icap/ServiceRep.h +index f2e893244..cca2df4ab 100644 +--- a/src/adaptation/icap/ServiceRep.h ++++ b/src/adaptation/icap/ServiceRep.h +@@ -85,7 +85,8 @@ public: + bool wantsPreview(const SBuf &urlPath, size_t &wantedSize) const; + bool allows204() const; + bool allows206() const; +- Comm::ConnectionPointer getConnection(bool isRetriable, bool &isReused); ++ /// \returns an idle persistent ICAP connection or nil ++ Comm::ConnectionPointer getIdleConnection(bool isRetriable); + void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, bool sendReset, const char *comment); + void noteConnectionUse(const Comm::ConnectionPointer &conn); + void noteConnectionFailed(const char *comment); +diff --git a/src/adaptation/icap/Xaction.cc b/src/adaptation/icap/Xaction.cc +index 9aacf2e1b..962ed1703 100644 +--- a/src/adaptation/icap/Xaction.cc ++++ b/src/adaptation/icap/Xaction.cc +@@ -13,6 +13,7 @@ + #include "adaptation/icap/Config.h" + #include "adaptation/icap/Launcher.h" + #include "adaptation/icap/Xaction.h" ++#include "base/JobWait.h" + #include "base/TextException.h" + #include "comm.h" + #include "comm/Connection.h" +@@ -79,7 +80,6 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv + icapRequest(NULL), + icapReply(NULL), + attempts(0), +- connection(NULL), + theService(aService), + commEof(false), + reuseConnection(true), +@@ -87,14 +87,8 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv + isRepeatable(true), + ignoreLastWrite(false), + waitingForDns(false), +- stopReason(NULL), +- connector(NULL), +- reader(NULL), +- writer(NULL), +- closer(NULL), + alep(new AccessLogEntry), +- al(*alep), +- cs(NULL) ++ al(*alep) + { + debugs(93,3, typeName << " constructed, this=" << this << + " [icapx" << id << ']'); // we should not call virtual status() here +@@ -150,6 +144,8 @@ static void + 
icapLookupDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &, void *data) + { + Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data); ++ /// TODO: refactor with CallJobHere1, passing either std::optional (after upgrading to C++17) ++ /// or OptionalIp::Address (when it can take non-trivial types) + xa->dnsLookupDone(ia); + } + +@@ -164,21 +160,8 @@ Adaptation::Icap::Xaction::openConnection() + if (!TheConfig.reuse_connections) + disableRetries(); // this will also safely drain pconn pool + +- bool wasReused = false; +- connection = s.getConnection(isRetriable, wasReused); +- +- if (wasReused && Comm::IsConnOpen(connection)) { +- // Set comm Close handler +- // fake the connect callback +- // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead? +- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer; +- CbcPointer<Xaction> self(this); +- Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected); +- dialer.params.conn = connection; +- dialer.params.flag = Comm::OK; +- // fake other parameters by copying from the existing connection +- connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer); +- ScheduleCallHere(connector); ++ if (const auto pconn = s.getIdleConnection(isRetriable)) { ++ useTransportConnection(pconn); + return; + } + +@@ -207,30 +190,22 @@ Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia) + #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS + dieOnConnectionFailure(); // throws + #else // take a step back into protected Async call dialing. 
+- // fake the connect callback +- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer; +- CbcPointer<Xaction> self(this); +- Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected); +- dialer.params.conn = connection; +- dialer.params.flag = Comm::COMM_ERROR; +- // fake other parameters by copying from the existing connection +- connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer); +- ScheduleCallHere(connector); ++ CallJobHere(93, 3, this, Xaction, Xaction::dieOnConnectionFailure); + #endif + return; + } + +- connection = new Comm::Connection; +- connection->remote = ia->current(); +- connection->remote.port(s.cfg().port); +- getOutgoingAddress(NULL, connection); ++ const Comm::ConnectionPointer conn = new Comm::Connection(); ++ conn->remote = ia->current(); ++ conn->remote.port(s.cfg().port); ++ getOutgoingAddress(nullptr, conn); + + // TODO: service bypass status may differ from that of a transaction + typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer; +- connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected); +- cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass)); ++ AsyncCall::Pointer callback = JobCallback(93, 3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected); ++ const auto cs = new Comm::ConnOpener(conn, callback, TheConfig.connect_timeout(service().cfg().bypass)); + cs->setHost(s.cfg().host.termedBuf()); +- AsyncJob::Start(cs.get()); ++ transportWait.start(cs, callback); + } + + /* +@@ -256,6 +231,8 @@ void Adaptation::Icap::Xaction::closeConnection() + closer = NULL; + } + ++ commUnsetConnTimeout(connection); ++ + cancelRead(); // may not work + + if (reuseConnection && !doneWithIo()) { +@@ -275,54 +252,65 @@ void Adaptation::Icap::Xaction::closeConnection() + + writer = NULL; + reader = NULL; +- connector = NULL; + connection = NULL; + } + } + 
+-// connection with the ICAP service established ++/// called when the connection attempt to an ICAP service completes (successfully or not) + void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io) + { +- cs = NULL; ++ transportWait.finish(); + +- if (io.flag == Comm::TIMEOUT) { +- handleCommTimedout(); ++ if (io.flag != Comm::OK) { ++ dieOnConnectionFailure(); // throws + return; + } + +- Must(connector != NULL); +- connector = NULL; +- +- if (io.flag != Comm::OK) +- dieOnConnectionFailure(); // throws +- +- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer; +- AsyncCall::Pointer timeoutCall = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout", +- TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout)); +- commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall); ++ useTransportConnection(io.conn); ++} + +- typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer; +- closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", +- CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); +- comm_add_close_handler(io.conn->fd, closer); ++/// React to the availability of a transport connection to the ICAP service. ++/// The given connection may (or may not) be secured already. ++void ++Adaptation::Icap::Xaction::useTransportConnection(const Comm::ConnectionPointer &conn) ++{ ++ assert(Comm::IsConnOpen(conn)); ++ assert(!connection); + + // If it is a reused connection and the TLS object is built + // we should not negotiate new TLS session +- const auto &ssl = fd_table[io.conn->fd].ssl; ++ const auto &ssl = fd_table[conn->fd].ssl; + if (!ssl && service().cfg().secure.encryptTransport) { ++ // XXX: Exceptions orphan conn. 
+ CbcPointer<Adaptation::Icap::Xaction> me(this); +- securer = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer", +- MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer)); ++ AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer", ++ MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer)); + +- auto *sslConnector = new Ssl::IcapPeerConnector(theService, io.conn, securer, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass)); +- AsyncJob::Start(sslConnector); // will call our callback ++ const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass)); ++ ++ encryptionWait.start(sslConnector, callback); + return; + } + +-// ?? fd_table[io.conn->fd].noteUse(icapPconnPool); ++ useIcapConnection(conn); ++} ++ ++/// react to the availability of a fully-ready ICAP connection ++void ++Adaptation::Icap::Xaction::useIcapConnection(const Comm::ConnectionPointer &conn) ++{ ++ assert(!connection); ++ assert(conn); ++ assert(Comm::IsConnOpen(conn)); ++ connection = conn; + service().noteConnectionUse(connection); + +- handleCommConnected(); ++ typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer; ++ closer = asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed", ++ CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed)); ++ comm_add_close_handler(connection->fd, closer); ++ ++ startShoveling(); + } + + void Adaptation::Icap::Xaction::dieOnConnectionFailure() +@@ -367,40 +355,25 @@ void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io) + + // communication timeout with the ICAP service + void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &) +-{ +- handleCommTimedout(); +-} +- +-void Adaptation::Icap::Xaction::handleCommTimedout() + { + debugs(93, 2, HERE << typeName << " failed: timeout with " +
theService->cfg().methodStr() << " " << + theService->cfg().uri << status()); + reuseConnection = false; +- const bool whileConnecting = connector != NULL; +- if (whileConnecting) { +- assert(!haveConnection()); +- theService->noteConnectionFailed("timedout"); +- } else +- closeConnection(); // so that late Comm callbacks do not disturb bypass +- throw TexcHere(whileConnecting ? +- "timed out while connecting to the ICAP service" : +- "timed out while talking to the ICAP service"); ++ assert(haveConnection()); ++ closeConnection(); ++ throw TextException("timed out while talking to the ICAP service", Here()); + } + + // unexpected connection close while talking to the ICAP service + void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &) + { +- if (securer != NULL) { +- securer->cancel("Connection closed before SSL negotiation finished"); +- securer = NULL; ++ if (connection) { ++ connection->noteClosure(); ++ connection = nullptr; + } + closer = NULL; +- handleCommClosed(); +-} + +-void Adaptation::Icap::Xaction::handleCommClosed() +-{ + static const auto d = MakeNamedErrorDetail("ICAP_XACT_CLOSE"); + detailError(d); + mustStop("ICAP service connection externally closed"); +@@ -424,7 +397,8 @@ void Adaptation::Icap::Xaction::callEnd() + + bool Adaptation::Icap::Xaction::doneAll() const + { +- return !waitingForDns && !connector && !securer && !reader && !writer && ++ return !waitingForDns && !transportWait && !encryptionWait && ++ !reader && !writer && + Adaptation::Initiate::doneAll(); + } + +@@ -568,7 +542,7 @@ bool Adaptation::Icap::Xaction::doneWriting() const + bool Adaptation::Icap::Xaction::doneWithIo() const + { + return haveConnection() && +- !connector && !reader && !writer && // fast checks, some redundant ++ !transportWait && !reader && !writer && // fast checks, some redundant + doneReading() && doneWriting(); + } + +@@ -608,10 +582,7 @@ void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome & + void 
Adaptation::Icap::Xaction::swanSong() + { + // kids should sing first and then call the parent method. +- if (cs.valid()) { +- debugs(93,6, HERE << id << " about to notify ConnOpener!"); +- CallJobHere(93, 3, cs, Comm::ConnOpener, noteAbort); +- cs = NULL; ++ if (transportWait || encryptionWait) { + service().noteConnectionFailed("abort"); + } + +@@ -750,20 +721,12 @@ Ssl::IcapPeerConnector::noteNegotiationDone(ErrorState *error) + void + Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer) + { +- Must(securer != NULL); +- securer = NULL; +- +- if (closer != NULL) { +- if (Comm::IsConnOpen(answer.conn)) +- comm_remove_close_handler(answer.conn->fd, closer); +- else +- closer->cancel("securing completed"); +- closer = NULL; +- } ++ encryptionWait.finish(); + ++ assert(!answer.tunneled); + if (answer.error.get()) { +- if (answer.conn != NULL) +- answer.conn->close(); ++ assert(!answer.conn); ++ // TODO: Refactor dieOnConnectionFailure() to be usable here as well. + debugs(93, 2, typeName << + " TLS negotiation to " << service().cfg().uri << " failed"); + service().noteConnectionFailed("failure"); +@@ -774,8 +737,18 @@ Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer) + + debugs(93, 5, "TLS negotiation to " << service().cfg().uri << " complete"); + +- service().noteConnectionUse(answer.conn); ++ assert(answer.conn); ++ ++ // The socket could get closed while our callback was queued. Sync ++ // Connection. XXX: Connection::fd may already be stale/invalid here. 
++ if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) { ++ answer.conn->noteClosure(); ++ service().noteConnectionFailed("external TLS connection closure"); ++ static const auto d = MakeNamedErrorDetail("ICAP_XACT_SSL_CLOSE"); ++ detailError(d); ++ throw TexcHere("external closure of the TLS ICAP service connection"); ++ } + +- handleCommConnected(); ++ useIcapConnection(answer.conn); + } + +diff --git a/src/adaptation/icap/Xaction.h b/src/adaptation/icap/Xaction.h +index b66044594..31a6e22fc 100644 +--- a/src/adaptation/icap/Xaction.h ++++ b/src/adaptation/icap/Xaction.h +@@ -12,6 +12,7 @@ + #include "AccessLogEntry.h" + #include "adaptation/icap/ServiceRep.h" + #include "adaptation/Initiate.h" ++#include "base/JobWait.h" + #include "comm/ConnOpener.h" + #include "error/forward.h" + #include "HttpReply.h" +@@ -20,6 +21,10 @@ + + class MemBuf; + ++namespace Ssl { ++class IcapPeerConnector; ++} ++ + namespace Adaptation + { + namespace Icap +@@ -65,12 +70,12 @@ protected: + virtual void start(); + virtual void noteInitiatorAborted(); // TODO: move to Adaptation::Initiate + ++ /// starts sending/receiving ICAP messages ++ virtual void startShoveling() = 0; ++ + // comm hanndlers; called by comm handler wrappers +- virtual void handleCommConnected() = 0; + virtual void handleCommWrote(size_t sz) = 0; + virtual void handleCommRead(size_t sz) = 0; +- virtual void handleCommTimedout(); +- virtual void handleCommClosed(); + + void handleSecuredPeer(Security::EncryptorAnswer &answer); + /// record error detail if possible +@@ -78,7 +83,6 @@ protected: + + void openConnection(); + void closeConnection(); +- void dieOnConnectionFailure(); + bool haveConnection() const; + + void scheduleRead(); +@@ -124,11 +128,13 @@ public: + ServiceRep &service(); + + private: ++ void useTransportConnection(const Comm::ConnectionPointer &); ++ void useIcapConnection(const Comm::ConnectionPointer &); ++ void dieOnConnectionFailure(); + void tellQueryAborted(); + void 
maybeLog(); + + protected: +- Comm::ConnectionPointer connection; ///< ICAP server connection + Adaptation::Icap::ServiceRep::Pointer theService; + + SBuf readBuf; +@@ -139,13 +145,8 @@ protected: + bool ignoreLastWrite; + bool waitingForDns; ///< expecting a ipcache_nbgethostbyname() callback + +- const char *stopReason; +- +- // active (pending) comm callbacks for the ICAP server connection +- AsyncCall::Pointer connector; + AsyncCall::Pointer reader; + AsyncCall::Pointer writer; +- AsyncCall::Pointer closer; + + AccessLogEntry::Pointer alep; ///< icap.log entry + AccessLogEntry &al; ///< short for *alep +@@ -155,8 +156,16 @@ protected: + timeval icap_tio_finish; /*time when the last byte of the ICAP responsewas received*/ + + private: +- Comm::ConnOpener::Pointer cs; +- AsyncCall::Pointer securer; ///< whether we are securing a connection ++ /// waits for a transport connection to the ICAP server to be established/opened ++ JobWait<Comm::ConnOpener> transportWait; ++ ++ /// waits for the established transport connection to be secured/encrypted ++ JobWait<Ssl::IcapPeerConnector> encryptionWait; ++ ++ /// open and, if necessary, secured connection to the ICAP server (or nil) ++ Comm::ConnectionPointer connection; ++ ++ AsyncCall::Pointer closer; + }; + + } // namespace Icap +diff --git a/src/base/AsyncJob.cc b/src/base/AsyncJob.cc +index 3b9161e4a..111667d5d 100644 +--- a/src/base/AsyncJob.cc ++++ b/src/base/AsyncJob.cc +@@ -20,11 +20,11 @@ + + InstanceIdDefinitions(AsyncJob, "job"); + +-AsyncJob::Pointer AsyncJob::Start(AsyncJob *j) ++void ++AsyncJob::Start(const Pointer &job) + { +- AsyncJob::Pointer job(j); + CallJobHere(93, 5, job, AsyncJob, start); +- return job; ++ job->started_ = true; // it is the attempt that counts + } + + AsyncJob::AsyncJob(const char *aTypeName) : +@@ -38,6 +38,7 @@ AsyncJob::~AsyncJob() + { + debugs(93,5, "AsyncJob destructed, this=" << this << + " type=" << typeName << " [" << id << ']'); ++ assert(!started_ || swanSang_); + } + + void 
AsyncJob::start() +@@ -141,9 +142,16 @@ void AsyncJob::callEnd() + AsyncCall::Pointer inCallSaved = inCall; + void *thisSaved = this; + ++ // TODO: Swallow swanSong() exceptions to reduce memory leaks. ++ ++ // Job callback invariant: swanSong() is (only) called for started jobs. ++ // Here to detect violations in kids that forgot to call our swanSong(). ++ assert(started_); ++ ++ swanSang_ = true; // it is the attempt that counts + swanSong(); + +- delete this; // this is the only place where the object is deleted ++ delete this; // this is the only place where a started job is deleted + + // careful: this object does not exist any more + debugs(93, 6, HERE << *inCallSaved << " ended " << thisSaved); +diff --git a/src/base/AsyncJob.h b/src/base/AsyncJob.h +index 4d685dff5..a46e30071 100644 +--- a/src/base/AsyncJob.h ++++ b/src/base/AsyncJob.h +@@ -36,8 +36,13 @@ public: + public: + AsyncJob(const char *aTypeName); + +- /// starts a freshly created job (i.e., makes the job asynchronous) +- static Pointer Start(AsyncJob *job); ++ /// Promises to start the configured job (eventually). The job is deemed to ++ /// be running asynchronously beyond this point, so the caller should only ++ /// access the job object via AsyncCalls rather than directly. ++ /// ++ /// swanSong() is only called for jobs for which this method has returned ++ /// successfully (i.e. without throwing). ++ static void Start(const Pointer &job); + + protected: + // XXX: temporary method to replace "delete this" in jobs-in-transition. +@@ -62,6 +67,11 @@ public: + /// called when the job throws during an async call + virtual void callException(const std::exception &e); + ++ /// process external request to terminate now (i.e. 
during this async call) ++ void handleStopRequest() { mustStop("externally aborted"); } ++ ++ const InstanceId<AsyncJob> id; ///< job identifier ++ + protected: + // external destruction prohibited to ensure swanSong() is called + virtual ~AsyncJob(); +@@ -69,7 +79,9 @@ protected: + const char *stopReason; ///< reason for forcing done() to be true + const char *typeName; ///< kid (leaf) class name, for debugging + AsyncCall::Pointer inCall; ///< the asynchronous call being handled, if any +- const InstanceId<AsyncJob> id; ///< job identifier ++ ++ bool started_ = false; ///< Start() has finished successfully ++ bool swanSang_ = false; ///< swanSong() was called + }; + + #endif /* SQUID_ASYNC_JOB_H */ +diff --git a/src/base/JobWait.cc b/src/base/JobWait.cc +new file mode 100644 +index 000000000..ba6832415 +--- /dev/null ++++ b/src/base/JobWait.cc +@@ -0,0 +1,81 @@ ++/* ++ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors ++ * ++ * Squid software is distributed under GPLv2+ license and includes ++ * contributions from numerous individuals and organizations. ++ * Please see the COPYING and CONTRIBUTORS files for details. ++ */ ++ ++#include "squid.h" ++#include "base/AsyncJobCalls.h" ++#include "base/JobWait.h" ++ ++#include <cassert> ++#include <iostream> ++ ++JobWaitBase::JobWaitBase() = default; ++ ++JobWaitBase::~JobWaitBase() ++{ ++ cancel("owner gone"); ++} ++ ++void ++JobWaitBase::start_(const AsyncJob::Pointer aJob, const AsyncCall::Pointer aCall) ++{ ++ // Invariant: The wait will be over. We cannot guarantee that the job will ++ // call the callback, of course, but we can check these prerequisites. ++ assert(aCall); ++ assert(aJob.valid()); ++ ++ // "Double" waiting state leads to conflicting/mismatching callbacks ++ // detailed in finish(). Detect that bug ASAP. 
++ assert(!waiting()); ++ ++ assert(!callback_); ++ assert(!job_); ++ callback_ = aCall; ++ job_ = aJob; ++ ++ AsyncJob::Start(job_.get()); ++} ++ ++void ++JobWaitBase::finish() ++{ ++ // Unexpected callbacks might result in disasters like secrets exposure, ++ // data corruption, or expensive message routing mistakes when the callback ++ // info is applied to the wrong message part or acted upon prematurely. ++ assert(waiting()); ++ clear(); ++} ++ ++void ++JobWaitBase::cancel(const char *reason) ++{ ++ if (callback_) { ++ callback_->cancel(reason); ++ ++ // Instead of AsyncJob, the class parameter could be Job. That would ++ // avoid runtime child-to-parent CbcPointer conversion overheads, but ++ // complicate support for Jobs with virtual AsyncJob bases (GCC error: ++ // "pointer to member conversion via virtual base AsyncJob") and also ++ // cache-log "Job::handleStopRequest()" with a non-existent class name. ++ CallJobHere(callback_->debugSection, callback_->debugLevel, job_, AsyncJob, handleStopRequest); ++ ++ clear(); ++ } ++} ++ ++void ++JobWaitBase::print(std::ostream &os) const ++{ ++ // use a backarrow to emphasize that this is a callback: call24<-job6 ++ if (callback_) ++ os << callback_->id << "<-"; ++ if (const auto rawJob = job_.get()) ++ os << rawJob->id; ++ else ++ os << job_; // raw pointer of a gone job may still be useful for triage ++} ++ +diff --git a/src/base/JobWait.h b/src/base/JobWait.h +new file mode 100644 +index 000000000..6b1131331 +--- /dev/null ++++ b/src/base/JobWait.h +@@ -0,0 +1,91 @@ ++/* ++ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors ++ * ++ * Squid software is distributed under GPLv2+ license and includes ++ * contributions from numerous individuals and organizations. ++ * Please see the COPYING and CONTRIBUTORS files for details. 
++ */ ++ ++#ifndef SQUID_BASE_JOBWAIT_H ++#define SQUID_BASE_JOBWAIT_H ++ ++#include "base/AsyncJob.h" ++#include "base/CbcPointer.h" ++ ++#include <iosfwd> ++ ++/// Manages waiting for an AsyncJob callback. Use type-safe JobWait instead. ++/// This base class does not contain code specific to the actual Job type. ++class JobWaitBase ++{ ++public: ++ JobWaitBase(); ++ ~JobWaitBase(); ++ ++ /// no copying of any kind: each waiting context needs a dedicated AsyncCall ++ JobWaitBase(JobWaitBase &&) = delete; ++ ++ explicit operator bool() const { return waiting(); } ++ ++ /// whether we are currently waiting for the job to call us back ++ /// the job itself may be gone even if this returns true ++ bool waiting() const { return bool(callback_); } ++ ++ /// ends wait (after receiving the call back) ++ /// forgets the job which is likely to be gone by now ++ void finish(); ++ ++ /// aborts wait (if any) before receiving the call back ++ /// does nothing if we are not waiting ++ void cancel(const char *reason); ++ ++ /// summarizes what we are waiting for (for debugging) ++ void print(std::ostream &) const; ++ ++protected: ++ /// starts waiting for the given job to call the given callback ++ void start_(AsyncJob::Pointer, AsyncCall::Pointer); ++ ++private: ++ /// the common part of finish() and cancel() ++ void clear() { job_.clear(); callback_ = nullptr; } ++ ++ /// the job that we are waiting to call us back (or nil) ++ AsyncJob::Pointer job_; ++ ++ /// the call we are waiting for the job_ to make (or nil) ++ AsyncCall::Pointer callback_; ++}; ++ ++/// Manages waiting for an AsyncJob callback. ++/// Completes JobWaitBase by providing Job type-specific members. 
++template <class Job> ++class JobWait: public JobWaitBase ++{ ++public: ++ typedef CbcPointer<Job> JobPointer; ++ ++ /// starts waiting for the given job to call the given callback ++ void start(const JobPointer &aJob, const AsyncCall::Pointer &aCallback) { ++ start_(aJob, aCallback); ++ typedJob_ = aJob; ++ } ++ ++ /// \returns a cbdata pointer to the job we are waiting for (or nil) ++ /// the returned pointer may be falsy, even if we are still waiting() ++ JobPointer job() const { return waiting() ? typedJob_ : nullptr; } ++ ++private: ++ /// nearly duplicates JobWaitBase::job_ but exposes the actual job type ++ JobPointer typedJob_; ++}; ++ ++inline ++std::ostream &operator <<(std::ostream &os, const JobWaitBase &wait) ++{ ++ wait.print(os); ++ return os; ++} ++ ++#endif /* SQUID_BASE_JOBWAIT_H */ ++ +diff --git a/src/base/Makefile.am b/src/base/Makefile.am +index c9564d755..374ea8798 100644 +--- a/src/base/Makefile.am ++++ b/src/base/Makefile.am +@@ -34,6 +34,8 @@ libbase_la_SOURCES = \ + Here.h \ + InstanceId.cc \ + InstanceId.h \ ++ JobWait.cc \ ++ JobWait.h \ + Lock.h \ + LookupTable.h \ + LruMap.h \ +diff --git a/src/base/forward.h b/src/base/forward.h +index 3803e1ea0..46de97c8d 100644 +--- a/src/base/forward.h ++++ b/src/base/forward.h +@@ -17,6 +17,7 @@ class ScopedId; + + template<class Cbc> class CbcPointer; + template<class RefCountableKid> class RefCount; ++template<class Job> class JobWait; + + typedef CbcPointer<AsyncJob> AsyncJobPointer; + typedef RefCount<CodeContext> CodeContextPointer; +diff --git a/src/client_side.cc b/src/client_side.cc +index b8d786423..4eb697696 100644 +--- a/src/client_side.cc ++++ b/src/client_side.cc +@@ -503,6 +503,10 @@ httpRequestFree(void *data) + /* This is a handler normally called by comm_close() */ + void ConnStateData::connStateClosed(const CommCloseCbParams &) + { ++ if (clientConnection) { ++ clientConnection->noteClosure(); ++ // keep closed clientConnection for logging, clientdb cleanup, etc. 
++ } + deleteThis("ConnStateData::connStateClosed"); + } + +diff --git a/src/clients/FtpClient.cc b/src/clients/FtpClient.cc +index 7874db121..afe7827dc 100644 +--- a/src/clients/FtpClient.cc ++++ b/src/clients/FtpClient.cc +@@ -201,10 +201,6 @@ Ftp::Client::Client(FwdState *fwdState): + + Ftp::Client::~Client() + { +- if (data.opener != NULL) { +- data.opener->cancel("Ftp::Client destructed"); +- data.opener = NULL; +- } + data.close(); + + safe_free(old_request); +@@ -786,10 +782,10 @@ Ftp::Client::connectDataChannel() + debugs(9, 3, "connecting to " << conn->remote); + + typedef CommCbMemFunT<Client, CommConnectCbParams> Dialer; +- data.opener = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected); +- Comm::ConnOpener *cs = new Comm::ConnOpener(conn, data.opener, Config.Timeout.connect); ++ AsyncCall::Pointer callback = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected); ++ const auto cs = new Comm::ConnOpener(conn, callback, Config.Timeout.connect); + cs->setHost(data.host); +- AsyncJob::Start(cs); ++ dataConnWait.start(cs, callback); + } + + bool +@@ -811,10 +807,11 @@ void + Ftp::Client::dataClosed(const CommCloseCbParams &) + { + debugs(9, 4, status()); ++ if (data.conn) ++ data.conn->noteClosure(); + if (data.listenConn != NULL) { + data.listenConn->close(); + data.listenConn = NULL; +- // NP clear() does the: data.fd = -1; + } + data.clear(); + } +@@ -879,6 +876,8 @@ void + Ftp::Client::ctrlClosed(const CommCloseCbParams &) + { + debugs(9, 4, status()); ++ if (ctrl.conn) ++ ctrl.conn->noteClosure(); + ctrl.clear(); + doneWithFwd = "ctrlClosed()"; // assume FwdState is monitoring too + mustStop("Ftp::Client::ctrlClosed"); +diff --git a/src/clients/FtpClient.h b/src/clients/FtpClient.h +index ac8d22e18..4b2dd61d5 100644 +--- a/src/clients/FtpClient.h ++++ b/src/clients/FtpClient.h +@@ -64,7 +64,6 @@ public: + */ + Comm::ConnectionPointer listenConn; + +- AsyncCall::Pointer opener; ///< Comm opener handler callback. 
+ private: + AsyncCall::Pointer closer; ///< Comm close handler callback + }; +@@ -205,6 +204,10 @@ protected: + virtual void sentRequestBody(const CommIoCbParams &io); + virtual void doneSendingRequestBody(); + ++ /// Waits for an FTP data connection to the server to be established/opened. ++ /// This wait only happens in FTP passive mode (via PASV or EPSV). ++ JobWait<Comm::ConnOpener> dataConnWait; ++ + private: + bool parseControlReply(size_t &bytesUsed); + +diff --git a/src/clients/FtpGateway.cc b/src/clients/FtpGateway.cc +index 6a7d139ca..aeeaedb48 100644 +--- a/src/clients/FtpGateway.cc ++++ b/src/clients/FtpGateway.cc +@@ -486,11 +486,9 @@ Ftp::Gateway::timeout(const CommTimeoutCbParams &io) + flags.pasv_supported = false; + debugs(9, DBG_IMPORTANT, "FTP Gateway timeout in SENT_PASV state"); + +- // cancel the data connection setup. +- if (data.opener != NULL) { +- data.opener->cancel("timeout"); +- data.opener = NULL; +- } ++ // cancel the data connection setup, if any ++ dataConnWait.cancel("timeout"); ++ + data.close(); + } + +@@ -1723,7 +1721,7 @@ void + Ftp::Gateway::dataChannelConnected(const CommConnectCbParams &io) + { + debugs(9, 3, HERE); +- data.opener = NULL; ++ dataConnWait.finish(); + + if (io.flag != Comm::OK) { + debugs(9, 2, HERE << "Failed to connect. 
Retrying via another method."); +@@ -2727,9 +2725,9 @@ Ftp::Gateway::mayReadVirginReplyBody() const + return !doneWithServer(); + } + +-AsyncJob::Pointer ++void + Ftp::StartGateway(FwdState *const fwdState) + { +- return AsyncJob::Start(new Ftp::Gateway(fwdState)); ++ AsyncJob::Start(new Ftp::Gateway(fwdState)); + } + +diff --git a/src/clients/FtpRelay.cc b/src/clients/FtpRelay.cc +index 8796a74c8..ce6ba3ba6 100644 +--- a/src/clients/FtpRelay.cc ++++ b/src/clients/FtpRelay.cc +@@ -739,7 +739,7 @@ void + Ftp::Relay::dataChannelConnected(const CommConnectCbParams &io) + { + debugs(9, 3, status()); +- data.opener = NULL; ++ dataConnWait.finish(); + + if (io.flag != Comm::OK) { + debugs(9, 2, "failed to connect FTP server data channel"); +@@ -804,9 +804,9 @@ Ftp::Relay::HandleStoreAbort(Relay *ftpClient) + ftpClient->dataComplete(); + } + +-AsyncJob::Pointer ++void + Ftp::StartRelay(FwdState *const fwdState) + { +- return AsyncJob::Start(new Ftp::Relay(fwdState)); ++ AsyncJob::Start(new Ftp::Relay(fwdState)); + } + +diff --git a/src/clients/HttpTunneler.cc b/src/clients/HttpTunneler.cc +index 023a8586c..67a52e662 100644 +--- a/src/clients/HttpTunneler.cc ++++ b/src/clients/HttpTunneler.cc +@@ -101,6 +101,11 @@ void + Http::Tunneler::handleConnectionClosure(const CommCloseCbParams ¶ms) + { + closer = nullptr; ++ if (connection) { ++ countFailingConnection(); ++ connection->noteClosure(); ++ connection = nullptr; ++ } + bailWith(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw(), al)); + } + +@@ -360,50 +365,64 @@ Http::Tunneler::bailWith(ErrorState *error) + Must(error); + answer().squidError = error; + +- if (const auto p = connection->getPeer()) +- peerConnectFailed(p); ++ if (const auto failingConnection = connection) { ++ // TODO: Reuse to-peer connections after a CONNECT error response. 
++ countFailingConnection(); ++ disconnect(); ++ failingConnection->close(); ++ } + + callBack(); +- disconnect(); +- +- if (noteFwdPconnUse) +- fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses); +- // TODO: Reuse to-peer connections after a CONNECT error response. +- connection->close(); +- connection = nullptr; + } + + void + Http::Tunneler::sendSuccess() + { + assert(answer().positive()); +- callBack(); ++ assert(Comm::IsConnOpen(connection)); ++ answer().conn = connection; + disconnect(); ++ callBack(); ++} ++ ++void ++Http::Tunneler::countFailingConnection() ++{ ++ assert(connection); ++ if (const auto p = connection->getPeer()) ++ peerConnectFailed(p); ++ if (noteFwdPconnUse && connection->isOpen()) ++ fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses); + } + + void + Http::Tunneler::disconnect() + { ++ const auto stillOpen = Comm::IsConnOpen(connection); ++ + if (closer) { +- comm_remove_close_handler(connection->fd, closer); ++ if (stillOpen) ++ comm_remove_close_handler(connection->fd, closer); + closer = nullptr; + } + + if (reader) { +- Comm::ReadCancel(connection->fd, reader); ++ if (stillOpen) ++ Comm::ReadCancel(connection->fd, reader); + reader = nullptr; + } + +- // remove connection timeout handler +- commUnsetConnTimeout(connection); ++ if (stillOpen) ++ commUnsetConnTimeout(connection); ++ ++ connection = nullptr; // may still be open + } + + void + Http::Tunneler::callBack() + { +- debugs(83, 5, connection << status()); +- if (answer().positive()) +- answer().conn = connection; ++ debugs(83, 5, answer().conn << status()); ++ assert(!connection); // returned inside answer() or gone + auto cb = callback; + callback = nullptr; + ScheduleCallHere(cb); +@@ -415,11 +434,10 @@ Http::Tunneler::swanSong() + AsyncJob::swanSong(); + + if (callback) { +- if (requestWritten && tunnelEstablished) { ++ if (requestWritten && tunnelEstablished && Comm::IsConnOpen(connection)) { + sendSuccess(); + } else { +- // we should have bailed when we 
discovered the job-killing problem +- debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while establishing a CONNECT tunnel " << connection << status()); ++ // job-ending emergencies like handleStopRequest() or callException() + bailWith(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al)); + } + assert(!callback); +diff --git a/src/clients/HttpTunneler.h b/src/clients/HttpTunneler.h +index c4c096fe7..e88d17610 100644 +--- a/src/clients/HttpTunneler.h ++++ b/src/clients/HttpTunneler.h +@@ -87,6 +87,7 @@ protected: + void handleResponse(const bool eof); + void bailOnResponseError(const char *error, HttpReply *); + ++private: + /// sends the given error to the initiator + void bailWith(ErrorState*); + +@@ -96,12 +97,14 @@ protected: + /// a bailWith(), sendSuccess() helper: sends results to the initiator + void callBack(); + +- /// a bailWith(), sendSuccess() helper: stops monitoring the connection ++ /// stops monitoring the connection + void disconnect(); + ++ /// updates connection usage history before the connection is closed ++ void countFailingConnection(); ++ + TunnelerAnswer &answer(); + +-private: + AsyncCall::Pointer writer; ///< called when the request has been written + AsyncCall::Pointer reader; ///< called when the response should be read + AsyncCall::Pointer closer; ///< called when the connection is being closed +diff --git a/src/clients/forward.h b/src/clients/forward.h +index 0351557da..82e5eb9bd 100644 +--- a/src/clients/forward.h ++++ b/src/clients/forward.h +@@ -28,10 +28,10 @@ namespace Ftp + { + + /// A new FTP Gateway job +-AsyncJobPointer StartGateway(FwdState *const fwdState); ++void StartGateway(FwdState *const fwdState); + + /// A new FTP Relay job +-AsyncJobPointer StartRelay(FwdState *const fwdState); ++void StartRelay(FwdState *const fwdState); + + /** Construct an URI with leading / in PATH portion for use by CWD command + * possibly others. 
FTP encodes absolute paths as beginning with '/' +diff --git a/src/comm.cc b/src/comm.cc +index 54ba16271..a0913f324 100644 +--- a/src/comm.cc ++++ b/src/comm.cc +@@ -743,6 +743,10 @@ commCallCloseHandlers(int fd) + // If call is not canceled schedule it for execution else ignore it + if (!call->canceled()) { + debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call); ++ // XXX: Without the following code, callback fd may be -1. ++ // typedef CommCloseCbParams Params; ++ // auto ¶ms = GetCommParams<Params>(call); ++ // params.fd = fd; + ScheduleCallHere(call); + } + } +@@ -1787,6 +1791,10 @@ DeferredReadManager::CloseHandler(const CommCloseCbParams ¶ms) + CbDataList<DeferredRead> *temp = (CbDataList<DeferredRead> *)params.data; + + temp->element.closer = NULL; ++ if (temp->element.theRead.conn) { ++ temp->element.theRead.conn->noteClosure(); ++ temp->element.theRead.conn = nullptr; ++ } + temp->element.markCancelled(); + } + +@@ -1860,6 +1868,11 @@ DeferredReadManager::kickARead(DeferredRead const &aRead) + if (aRead.cancelled) + return; + ++ // TODO: This check still allows theReader call with a closed theRead.conn. ++ // If a delayRead() caller has a close connection handler, then such a call ++ // would be useless and dangerous. If a delayRead() caller does not have it, ++ // then the caller will get stuck when an external connection closure makes ++ // aRead.cancelled (checked above) true. 
+ if (Comm::IsConnOpen(aRead.theRead.conn) && fd_table[aRead.theRead.conn->fd].closing()) + return; + +diff --git a/src/comm/ConnOpener.cc b/src/comm/ConnOpener.cc +index dc376b778..19c1237ae 100644 +--- a/src/comm/ConnOpener.cc ++++ b/src/comm/ConnOpener.cc +@@ -41,6 +41,14 @@ Comm::ConnOpener::ConnOpener(const Comm::ConnectionPointer &c, const AsyncCall:: + deadline_(squid_curtime + static_cast<time_t>(ctimeout)) + { + debugs(5, 3, "will connect to " << c << " with " << ctimeout << " timeout"); ++ assert(conn_); // we know where to go ++ ++ // Sharing a being-modified Connection object with the caller is dangerous, ++ // but we cannot ban (or even check for) that using existing APIs. We do not ++ // want to clone "just in case" because cloning is a bit expensive, and most ++ // callers already have a non-owned Connection object to give us. Until the ++ // APIs improve, we can only check that the connection is not open. ++ assert(!conn_->isOpen()); + } + + Comm::ConnOpener::~ConnOpener() +@@ -78,6 +86,10 @@ Comm::ConnOpener::swanSong() + if (temporaryFd_ >= 0) + closeFd(); + ++ // did we abort while owning an open connection? ++ if (conn_ && conn_->isOpen()) ++ conn_->close(); ++ + // did we abort while waiting between retries? + if (calls_.sleep_) + cancelSleep(); +@@ -131,9 +143,18 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why) + " [" << callback_->id << ']' ); + // TODO save the pconn to the pconnPool ? 
+ } else { ++ assert(conn_); ++ ++ // free resources earlier and simplify recipients ++ if (errFlag != Comm::OK) ++ conn_->close(); // may not be opened ++ else ++ assert(conn_->isOpen()); ++ + typedef CommConnectCbParams Params; + Params ¶ms = GetCommParams<Params>(callback_); + params.conn = conn_; ++ conn_ = nullptr; // release ownership; prevent closure by us + params.flag = errFlag; + params.xerrno = xerrno; + ScheduleCallHere(callback_); +@@ -152,7 +173,7 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why) + void + Comm::ConnOpener::cleanFd() + { +- debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_); ++ debugs(5, 4, conn_ << "; temp FD " << temporaryFd_); + + Must(temporaryFd_ >= 0); + fde &f = fd_table[temporaryFd_]; +@@ -258,6 +279,7 @@ bool + Comm::ConnOpener::createFd() + { + Must(temporaryFd_ < 0); ++ assert(conn_); + + // our initators signal abort by cancelling their callbacks + if (callback_ == NULL || callback_->canceled()) +diff --git a/src/comm/ConnOpener.h b/src/comm/ConnOpener.h +index 329d8a3c8..e1e60649d 100644 +--- a/src/comm/ConnOpener.h ++++ b/src/comm/ConnOpener.h +@@ -19,16 +19,13 @@ + namespace Comm + { + +-/** +- * Async-opener of a Comm connection. +- */ ++/// Asynchronously opens a TCP connection. Returns CommConnectCbParams: either ++/// Comm::OK with an open connection or another Comm::Flag with a closed one. 
+ class ConnOpener : public AsyncJob + { + CBDATA_CLASS(ConnOpener); + + public: +- void noteAbort() { mustStop("externally aborted"); } +- + typedef CbcPointer<ConnOpener> Pointer; + + virtual bool doneAll() const; +diff --git a/src/comm/Connection.cc b/src/comm/Connection.cc +index 4e9719a01..9bc08da05 100644 +--- a/src/comm/Connection.cc ++++ b/src/comm/Connection.cc +@@ -7,6 +7,7 @@ + */ + + #include "squid.h" ++#include "base/JobWait.h" + #include "CachePeer.h" + #include "cbdata.h" + #include "comm.h" +@@ -60,26 +61,44 @@ Comm::Connection::~Connection() + } + + Comm::ConnectionPointer +-Comm::Connection::cloneDestinationDetails() const ++Comm::Connection::cloneProfile() const + { +- const ConnectionPointer c = new Comm::Connection; +- c->setAddrs(local, remote); +- c->peerType = peerType; +- c->flags = flags; +- c->peer_ = cbdataReference(getPeer()); +- assert(!c->isOpen()); +- return c; +-} ++ const ConnectionPointer clone = new Comm::Connection; ++ auto &c = *clone; // optimization ++ ++ /* ++ * Copy or excuse each data member. Excused members do not belong to a ++ * Connection configuration profile because their values cannot be reused ++ * across (co-existing) Connection objects and/or are tied to their own ++ * object lifetime. ++ */ ++ ++ c.setAddrs(local, remote); ++ c.peerType = peerType; ++ // fd excused ++ c.tos = tos; ++ c.nfmark = nfmark; ++ c.nfConnmark = nfConnmark; ++ // COMM_ORPHANED is not a part of connection opening instructions ++ c.flags = flags & ~COMM_ORPHANED; ++ // rfc931 is excused ++ ++#if USE_SQUID_EUI ++ // These are currently only set when accepting connections and never used ++ // for establishing new ones, so this copying is currently in vain, but, ++ // technically, they can be a part of connection opening instructions. 
++ c.remoteEui48 = remoteEui48; ++ c.remoteEui64 = remoteEui64; ++#endif + +-Comm::ConnectionPointer +-Comm::Connection::cloneIdentDetails() const +-{ +- auto c = cloneDestinationDetails(); +- c->tos = tos; +- c->nfmark = nfmark; +- c->nfConnmark = nfConnmark; +- c->startTime_ = startTime_; +- return c; ++ // id excused ++ c.peer_ = cbdataReference(getPeer()); ++ // startTime_ excused ++ // tlsHistory excused ++ ++ debugs(5, 5, this << " made " << c); ++ assert(!c.isOpen()); ++ return clone; + } + + void +diff --git a/src/comm/Connection.h b/src/comm/Connection.h +index 5b66943ba..40c22491d 100644 +--- a/src/comm/Connection.h ++++ b/src/comm/Connection.h +@@ -77,13 +77,12 @@ public: + /** Clear the connection properties and close any open socket. */ + virtual ~Connection(); + +- /// Create a new (closed) IDENT Connection object based on our from-Squid +- /// connection properties. +- ConnectionPointer cloneIdentDetails() const; ++ /// To prevent accidental copying of Connection objects that we started to ++ /// open or that are open, use cloneProfile() instead. ++ Connection(const Connection &&) = delete; + +- /// Create a new (closed) Connection object pointing to the same destination +- /// as this from-Squid connection. +- ConnectionPointer cloneDestinationDetails() const; ++ /// Create a new closed Connection with the same configuration as this one. ++ ConnectionPointer cloneProfile() const; + + /// close the still-open connection when its last reference is gone + void enterOrphanage() { flags |= COMM_ORPHANED; } +@@ -140,17 +139,6 @@ public: + virtual ScopedId codeContextGist() const override; + virtual std::ostream &detailCodeContext(std::ostream &os) const override; + +-private: +- /** These objects may not be exactly duplicated. Use cloneIdentDetails() or +- * cloneDestinationDetails() instead. +- */ +- Connection(const Connection &c); +- +- /** These objects may not be exactly duplicated. Use cloneIdentDetails() or +- * cloneDestinationDetails() instead. 
+- */ +- Connection & operator =(const Connection &c); +- + public: + /** Address/Port for the Squid end of a TCP link. */ + Ip::Address local; +diff --git a/src/comm/TcpAcceptor.cc b/src/comm/TcpAcceptor.cc +index 341622e0f..510efa94f 100644 +--- a/src/comm/TcpAcceptor.cc ++++ b/src/comm/TcpAcceptor.cc +@@ -206,7 +206,10 @@ void + Comm::TcpAcceptor::handleClosure(const CommCloseCbParams &) + { + closer_ = NULL; +- conn = NULL; ++ if (conn) { ++ conn->noteClosure(); ++ conn = nullptr; ++ } + Must(done()); + } + +diff --git a/src/dns_internal.cc b/src/dns_internal.cc +index e1f4d3ee7..72078c64e 100644 +--- a/src/dns_internal.cc ++++ b/src/dns_internal.cc +@@ -872,6 +872,10 @@ static void + idnsVCClosed(const CommCloseCbParams ¶ms) + { + nsvc * vc = (nsvc *)params.data; ++ if (vc->conn) { ++ vc->conn->noteClosure(); ++ vc->conn = nullptr; ++ } + delete vc; + } + +diff --git a/src/eui/Eui48.h b/src/eui/Eui48.h +index 11f4e51b1..9aab5bbb7 100644 +--- a/src/eui/Eui48.h ++++ b/src/eui/Eui48.h +@@ -29,10 +29,8 @@ class Eui48 + + public: + Eui48() { clear(); } +- Eui48(const Eui48 &t) { memcpy(this, &t, sizeof(Eui48)); } + bool operator== (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) == 0; } + bool operator< (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) < 0; } +- ~Eui48() {} + + const unsigned char *get(void); + +diff --git a/src/gopher.cc b/src/gopher.cc +index 455220c2e..576a3f7b1 100644 +--- a/src/gopher.cc ++++ b/src/gopher.cc +@@ -98,11 +98,8 @@ public: + entry->lock("gopherState"); + *replybuf = 0; + } +- ~GopherStateData() {if(buf) swanSong();} + +- /* AsyncJob API emulated */ +- void deleteThis(const char *aReason); +- void swanSong(); ++ ~GopherStateData(); + + public: + StoreEntry *entry; +@@ -156,30 +153,18 @@ static void + gopherStateFree(const CommCloseCbParams ¶ms) + { + GopherStateData *gopherState = (GopherStateData *)params.data; +- +- if (gopherState == NULL) +- return; +- +- 
gopherState->deleteThis("gopherStateFree"); ++ // Assume that FwdState is monitoring and calls noteClosure(). See XXX about ++ // Connection sharing with FwdState in gopherStart(). + delete gopherState; + } + +-void +-GopherStateData::deleteThis(const char *) +-{ +- swanSong(); +- delete this; +-} +- +-void +-GopherStateData::swanSong() ++GopherStateData::~GopherStateData() + { + if (entry) + entry->unlock("gopherState"); + +- if (buf) { ++ if (buf) + memFree(buf, MEM_4K_BUF); +- buf = nullptr; +- } + } + + /** +@@ -986,6 +971,7 @@ gopherStart(FwdState * fwd) + return; + } + ++ // XXX: Sharing open Connection with FwdState that has its own handlers/etc. + gopherState->serverConn = fwd->serverConnection(); + gopherSendRequest(fwd->serverConnection()->fd, gopherState); + AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "gopherTimeout", +diff --git a/src/ident/Ident.cc b/src/ident/Ident.cc +index 50dc6baeb..3c3ae5e2f 100644 +--- a/src/ident/Ident.cc ++++ b/src/ident/Ident.cc +@@ -11,6 +11,7 @@ + #include "squid.h" + + #if USE_IDENT ++#include "base/JobWait.h" + #include "comm.h" + #include "comm/Connection.h" + #include "comm/ConnOpener.h" +@@ -53,8 +54,15 @@ public: + + Comm::ConnectionPointer conn; + MemBuf queryMsg; ///< the lookup message sent to IDENT server +- IdentClient *clients; ++ IdentClient *clients = nullptr; + char buf[IDENT_BUFSIZE]; ++ ++ /// waits for a connection to the IDENT server to be established/opened ++ JobWait<Comm::ConnOpener> connWait; ++ ++private: ++ // use deleteThis() to destroy ++ ~IdentStateData(); + }; + + CBDATA_CLASS_INIT(IdentStateData); +@@ -73,8 +81,9 @@ static void ClientAdd(IdentStateData * state, IDCB * callback, void *callback_da + Ident::IdentConfig Ident::TheConfig; + + void +-Ident::IdentStateData::deleteThis(const char *) ++Ident::IdentStateData::deleteThis(const char *reason) + { ++ debugs(30, 3, reason); + swanSong(); + delete this; + } +@@ -84,6 +93,10 @@ Ident::IdentStateData::swanSong() + { + if (clients != NULL) + 
notify(NULL); ++} ++ ++Ident::IdentStateData::~IdentStateData() { ++ assert(!clients); + + if (Comm::IsConnOpen(conn)) { + comm_remove_close_handler(conn->fd, Ident::Close, this); +@@ -112,6 +125,10 @@ void + Ident::Close(const CommCloseCbParams &params) + { + IdentStateData *state = (IdentStateData *)params.data; ++ if (state->conn) { ++ state->conn->noteClosure(); ++ state->conn = nullptr; ++ } + state->deleteThis("connection closed"); + } + +@@ -127,6 +144,16 @@ void + Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, void *data) + { + IdentStateData *state = (IdentStateData *)data; ++ state->connWait.finish(); ++ ++ // Start owning the supplied connection (so that it is not orphaned if this ++ // function bails early). As a (tiny) optimization or perhaps just diff ++ // minimization, the close handler is added later, when we know we are not ++ // bailing. This delay is safe because comm_remove_close_handler() forgives ++ // missing handlers. ++ assert(conn); // but may be closed ++ assert(!state->conn); ++ state->conn = conn; + + if (status != Comm::OK) { + if (status == Comm::TIMEOUT) +@@ -149,8 +176,8 @@ Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, + return; + } + +- assert(conn != NULL && conn == state->conn); +- comm_add_close_handler(conn->fd, Ident::Close, state); ++ assert(state->conn->isOpen()); ++ comm_add_close_handler(state->conn->fd, Ident::Close, state); + + AsyncCall::Pointer writeCall = commCbCall(5,4, "Ident::WriteFeedback", + CommIoCbPtrFun(Ident::WriteFeedback, state)); +@@ -259,10 +286,10 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data) + state->hash.key = xstrdup(key); + + // copy the conn details. We do not want the original FD to be re-used by IDENT. 
+- state->conn = conn->cloneIdentDetails(); ++ const auto identConn = conn->cloneProfile(); + // NP: use random port for secure outbound to IDENT_PORT +- state->conn->local.port(0); +- state->conn->remote.port(IDENT_PORT); ++ identConn->local.port(0); ++ identConn->remote.port(IDENT_PORT); + + // build our query from the original connection details + state->queryMsg.init(); +@@ -272,7 +299,8 @@ + hash_join(ident_hash, &state->hash); + + AsyncCall::Pointer call = commCbCall(30,3, "Ident::ConnectDone", CommConnectCbPtrFun(Ident::ConnectDone, state)); +- AsyncJob::Start(new Comm::ConnOpener(state->conn, call, Ident::TheConfig.timeout)); ++ const auto connOpener = new Comm::ConnOpener(identConn, call, Ident::TheConfig.timeout); ++ state->connWait.start(connOpener, call); + } + + void +diff --git a/src/log/TcpLogger.cc b/src/log/TcpLogger.cc +index 2be2f6aea..3be1cc5db 100644 +--- a/src/log/TcpLogger.cc ++++ b/src/log/TcpLogger.cc +@@ -256,13 +256,16 @@ Log::TcpLogger::doConnect() + + typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer; + AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone); +- AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2)); ++ const auto cs = new Comm::ConnOpener(futureConn, call, 2); ++ connWait.start(cs, call); + } + + /// Comm::ConnOpener callback + void + Log::TcpLogger::connectDone(const CommConnectCbParams &params) + { ++ connWait.finish(); ++ + if (params.flag != Comm::OK) { + const double delay = 0.5; // seconds + if (connectFailures++ % 100 == 0) { +@@ -367,7 +370,10 @@ Log::TcpLogger::handleClosure(const CommCloseCbParams &) + { + assert(inCall != NULL); + closer = NULL; +- conn = NULL; ++ if (conn) { ++ conn->noteClosure(); ++ conn = nullptr; ++ } + // in all current use cases, we should not try to reconnect + mustStop("Log::TcpLogger::handleClosure"); + } +diff --git a/src/log/TcpLogger.h 
b/src/log/TcpLogger.h +index 1be2113fe..ec7ca5f7b 100644 +--- a/src/log/TcpLogger.h ++++ b/src/log/TcpLogger.h +@@ -10,6 +10,8 @@ + #define _SQUID_SRC_LOG_TCPLOGGER_H + + #include "base/AsyncJob.h" ++#include "base/JobWait.h" ++#include "comm/forward.h" + #include "ip/Address.h" + + #include <list> +@@ -103,6 +105,9 @@ private: + Ip::Address remote; ///< where the remote logger expects our records + AsyncCall::Pointer closer; ///< handles unexpected/external conn closures + ++ /// waits for a connection to the remote logger to be established/opened ++ JobWait<Comm::ConnOpener> connWait; ++ + uint64_t connectFailures; ///< number of sequential connection failures + uint64_t drops; ///< number of records dropped during the current outage + }; +diff --git a/src/mgr/Forwarder.cc b/src/mgr/Forwarder.cc +index 8edba1d78..e4da4ca08 100644 +--- a/src/mgr/Forwarder.cc ++++ b/src/mgr/Forwarder.cc +@@ -100,7 +100,11 @@ void + Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &) + { + debugs(16, 5, HERE); +- conn = NULL; // needed? 
++ closer = nullptr; ++ if (conn) { ++ conn->noteClosure(); ++ conn = nullptr; ++ } + mustStop("commClosed"); + } + +diff --git a/src/mgr/Inquirer.cc b/src/mgr/Inquirer.cc +index ba41d7aa6..dea0452de 100644 +--- a/src/mgr/Inquirer.cc ++++ b/src/mgr/Inquirer.cc +@@ -107,11 +107,14 @@ Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params) + + /// called when the HTTP client or some external force closed our socket + void +-Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params) ++Mgr::Inquirer::noteCommClosed(const CommCloseCbParams &) + { + debugs(16, 5, HERE); +- Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw()); +- conn = NULL; ++ closer = nullptr; ++ if (conn) { ++ conn->noteClosure(); ++ conn = nullptr; ++ } + mustStop("commClosed"); + } + +diff --git a/src/mgr/StoreToCommWriter.cc b/src/mgr/StoreToCommWriter.cc +index e2cccec2f..d3cf73888 100644 +--- a/src/mgr/StoreToCommWriter.cc ++++ b/src/mgr/StoreToCommWriter.cc +@@ -131,7 +131,11 @@ void + Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams &) + { + debugs(16, 6, HERE); +- Must(!Comm::IsConnOpen(clientConnection)); ++ if (clientConnection) { ++ clientConnection->noteClosure(); ++ clientConnection = nullptr; ++ } ++ closer = nullptr; + mustStop("commClosed"); + } + +diff --git a/src/security/PeerConnector.cc b/src/security/PeerConnector.cc +index 4c1401f6e..494edabb8 100644 +--- a/src/security/PeerConnector.cc ++++ b/src/security/PeerConnector.cc +@@ -89,9 +89,15 @@ Security::PeerConnector::start() + void + Security::PeerConnector::commCloseHandler(const CommCloseCbParams &params) + { ++ debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data); ++ + closeHandler = nullptr; ++ if (serverConn) { ++ countFailingConnection(); ++ serverConn->noteClosure(); ++ serverConn = nullptr; ++ } + +- debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data); + const auto err = new ErrorState(ERR_SECURE_CONNECT_FAIL, 
Http::scServiceUnavailable, request.getRaw(), al); + static const auto d = MakeNamedErrorDetail("TLS_CONNECT_CLOSE"); + err->detailError(d); +@@ -111,6 +117,8 @@ Security::PeerConnector::commTimeoutHandler(const CommTimeoutCbParams &) + bool + Security::PeerConnector::initialize(Security::SessionPointer &serverSession) + { ++ Must(Comm::IsConnOpen(serverConnection())); ++ + Security::ContextPointer ctx(getTlsContext()); + debugs(83, 5, serverConnection() << ", ctx=" << (void*)ctx.get()); + +@@ -162,6 +170,8 @@ Security::PeerConnector::initialize(Security::SessionPointer &serverSession) + void + Security::PeerConnector::recordNegotiationDetails() + { ++ Must(Comm::IsConnOpen(serverConnection())); ++ + const int fd = serverConnection()->fd; + Security::SessionPointer session(fd_table[fd].ssl); + +@@ -180,6 +190,8 @@ Security::PeerConnector::recordNegotiationDetails() + void + Security::PeerConnector::negotiate() + { ++ Must(Comm::IsConnOpen(serverConnection())); ++ + const int fd = serverConnection()->fd; + if (fd_table[fd].closing()) + return; +@@ -224,7 +236,7 @@ Security::PeerConnector::handleNegotiationResult(const Security::IoResult &resul + switch (result.category) { + case Security::IoResult::ioSuccess: + recordNegotiationDetails(); +- if (sslFinalized()) ++ if (sslFinalized() && callback) + sendSuccess(); + return; // we may be gone by now + +@@ -252,6 +264,7 @@ Security::PeerConnector::sslFinalized() + { + #if USE_OPENSSL + if (Ssl::TheConfig.ssl_crt_validator && useCertValidator_) { ++ Must(Comm::IsConnOpen(serverConnection())); + const int fd = serverConnection()->fd; + Security::SessionPointer session(fd_table[fd].ssl); + +@@ -295,6 +308,7 @@ void + Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointer validationResponse) + { + Must(validationResponse != NULL); ++ Must(Comm::IsConnOpen(serverConnection())); + + ErrorDetail::Pointer errDetails; + bool validatorFailed = false; +@@ -317,7 +331,8 @@ 
Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe + + if (!errDetails && !validatorFailed) { + noteNegotiationDone(NULL); +- sendSuccess(); ++ if (callback) ++ sendSuccess(); + return; + } + +@@ -343,6 +358,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe + Security::CertErrors * + Security::PeerConnector::sslCrtvdCheckForErrors(Ssl::CertValidationResponse const &resp, ErrorDetail::Pointer &errDetails) + { ++ Must(Comm::IsConnOpen(serverConnection())); ++ + ACLFilledChecklist *check = NULL; + Security::SessionPointer session(fd_table[serverConnection()->fd].ssl); + +@@ -418,9 +435,11 @@ Security::PeerConnector::negotiateSsl() + void + Security::PeerConnector::noteWantRead() + { +- const int fd = serverConnection()->fd; + debugs(83, 5, serverConnection()); + ++ Must(Comm::IsConnOpen(serverConnection())); ++ const int fd = serverConnection()->fd; ++ + // read timeout to avoid getting stuck while reading from a silent server + typedef CommCbMemFunT<Security::PeerConnector, CommTimeoutCbParams> TimeoutDialer; + AsyncCall::Pointer timeoutCall = JobCallback(83, 5, +@@ -434,8 +453,10 @@ Security::PeerConnector::noteWantRead() + void + Security::PeerConnector::noteWantWrite() + { +- const int fd = serverConnection()->fd; + debugs(83, 5, serverConnection()); ++ Must(Comm::IsConnOpen(serverConnection())); ++ ++ const int fd = serverConnection()->fd; + Comm::SetSelect(fd, COMM_SELECT_WRITE, &NegotiateSsl, new Pointer(this), 0); + return; + } +@@ -452,57 +473,76 @@ Security::PeerConnector::noteNegotiationError(const Security::ErrorDetailPointer + bail(anErr); + } + ++Security::EncryptorAnswer & ++Security::PeerConnector::answer() ++{ ++ assert(callback); ++ const auto dialer = dynamic_cast<CbDialer*>(callback->getDialer()); ++ assert(dialer); ++ return dialer->answer(); ++} ++ + void + Security::PeerConnector::bail(ErrorState *error) + { + Must(error); // or the recepient will not know there was a problem +- 
Must(callback != NULL); +- CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer()); +- Must(dialer); +- dialer->answer().error = error; ++ answer().error = error; + +- if (const auto p = serverConnection()->getPeer()) +- peerConnectFailed(p); ++ if (const auto failingConnection = serverConn) { ++ countFailingConnection(); ++ disconnect(); ++ failingConnection->close(); ++ } + + callBack(); +- disconnect(); +- +- if (noteFwdPconnUse) +- fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses); +- serverConn->close(); +- serverConn = nullptr; + } + + void + Security::PeerConnector::sendSuccess() + { +- callBack(); ++ assert(Comm::IsConnOpen(serverConn)); ++ answer().conn = serverConn; + disconnect(); ++ callBack(); ++} ++ ++void ++Security::PeerConnector::countFailingConnection() ++{ ++ assert(serverConn); ++ if (const auto p = serverConn->getPeer()) ++ peerConnectFailed(p); ++ // TODO: Calling PconnPool::noteUses() should not be our responsibility. ++ if (noteFwdPconnUse && serverConn->isOpen()) ++ fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses); + } + + void + Security::PeerConnector::disconnect() + { ++ const auto stillOpen = Comm::IsConnOpen(serverConn); ++ + if (closeHandler) { +- comm_remove_close_handler(serverConnection()->fd, closeHandler); ++ if (stillOpen) ++ comm_remove_close_handler(serverConn->fd, closeHandler); + closeHandler = nullptr; + } + +- commUnsetConnTimeout(serverConnection()); ++ if (stillOpen) ++ commUnsetConnTimeout(serverConn); ++ ++ serverConn = nullptr; + } + + void + Security::PeerConnector::callBack() + { +- debugs(83, 5, "TLS setup ended for " << serverConnection()); ++ debugs(83, 5, "TLS setup ended for " << answer().conn); + + AsyncCall::Pointer cb = callback; + // Do this now so that if we throw below, swanSong() assert that we _tried_ + // to call back holds. 
+ callback = NULL; // this should make done() true +- CbDialer *dialer = dynamic_cast<CbDialer*>(cb->getDialer()); +- Must(dialer); +- dialer->answer().conn = serverConnection(); + ScheduleCallHere(cb); + } + +@@ -511,8 +551,9 @@ Security::PeerConnector::swanSong() + { + // XXX: unregister fd-closure monitoring and CommSetSelect interest, if any + AsyncJob::swanSong(); +- if (callback != NULL) { // paranoid: we have left the caller waiting +- debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while connecting to a cache_peer or origin server"); ++ ++ if (callback) { ++ // job-ending emergencies like handleStopRequest() or callException() + const auto anErr = new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al); + bail(anErr); + assert(!callback); +@@ -533,7 +574,7 @@ Security::PeerConnector::status() const + buf.append("Stopped, reason:", 16); + buf.appendf("%s",stopReason); + } +- if (serverConn != NULL) ++ if (Comm::IsConnOpen(serverConn)) + buf.appendf(" FD %d", serverConn->fd); + buf.appendf(" %s%u]", id.prefix(), id.value); + buf.terminate(); +@@ -581,15 +622,18 @@ Security::PeerConnector::startCertDownloading(SBuf &url) + PeerConnectorCertDownloaderDialer(&Security::PeerConnector::certDownloadingDone, this)); + + const auto dl = new Downloader(url, certCallback, XactionInitiator::initCertFetcher, certDownloadNestingLevel() + 1); +- AsyncJob::Start(dl); ++ certDownloadWait.start(dl, certCallback); + } + + void + Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus) + { ++ certDownloadWait.finish(); ++ + ++certsDownloads; + debugs(81, 5, "Certificate downloading status: " << downloadStatus << " certificate size: " << obj.length()); + ++ Must(Comm::IsConnOpen(serverConnection())); + const auto &sconn = *fd_table[serverConnection()->fd].ssl; + + // Parse Certificate. Assume that it is in DER format. 
+@@ -642,6 +686,7 @@ Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus) + void + Security::PeerConnector::handleMissingCertificates(const Security::IoResult &ioResult) + { ++ Must(Comm::IsConnOpen(serverConnection())); + auto &sconn = *fd_table[serverConnection()->fd].ssl; + + // We download the missing certificate(s) once. We would prefer to clear +diff --git a/src/security/PeerConnector.h b/src/security/PeerConnector.h +index 942f8627a..10700d796 100644 +--- a/src/security/PeerConnector.h ++++ b/src/security/PeerConnector.h +@@ -12,6 +12,7 @@ + #include "acl/Acl.h" + #include "base/AsyncCbdataCalls.h" + #include "base/AsyncJob.h" ++#include "base/JobWait.h" + #include "CommCalls.h" + #include "http/forward.h" + #include "security/EncryptorAnswer.h" +@@ -24,6 +25,7 @@ + #include <queue> + + class ErrorState; ++class Downloader; + class AccessLogEntry; + typedef RefCount<AccessLogEntry> AccessLogEntryPointer; + +@@ -152,6 +154,9 @@ protected: + /// a bail(), sendSuccess() helper: stops monitoring the connection + void disconnect(); + ++ /// updates connection usage history before the connection is closed ++ void countFailingConnection(); ++ + /// If called the certificates validator will not used + void bypassCertValidator() {useCertValidator_ = false;} + +@@ -159,6 +164,9 @@ protected: + /// logging + void recordNegotiationDetails(); + ++ /// convenience method to get to the answer fields ++ EncryptorAnswer &answer(); ++ + HttpRequestPointer request; ///< peer connection trigger or cause + Comm::ConnectionPointer serverConn; ///< TCP connection to the peer + AccessLogEntryPointer al; ///< info for the future access.log entry +@@ -203,6 +211,8 @@ private: + + /// outcome of the last (failed and) suspended negotiation attempt (or nil) + Security::IoResultPointer suspendedError_; ++ ++ JobWait<Downloader> certDownloadWait; ///< waits for the missing certificate to be downloaded + }; + + } // namespace Security +diff --git 
a/src/security/forward.h b/src/security/forward.h +index dce66e397..26225aae0 100644 +--- a/src/security/forward.h ++++ b/src/security/forward.h +@@ -172,6 +172,7 @@ class ParsedOptions {}; // we never parse/use TLS options in this case + typedef long ParsedPortFlags; + + class PeerConnector; ++class BlindPeerConnector; + class PeerOptions; + + #if USE_OPENSSL +diff --git a/src/servers/FtpServer.cc b/src/servers/FtpServer.cc +index 53f822d2f..e0a10b6b5 100644 +--- a/src/servers/FtpServer.cc ++++ b/src/servers/FtpServer.cc +@@ -61,7 +61,7 @@ Ftp::Server::Server(const MasterXaction::Pointer &xact): + dataConn(), + uploadAvailSize(0), + listener(), +- connector(), ++ dataConnWait(), + reader(), + waitingForOrigin(false), + originDataDownloadAbortedOnError(false) +@@ -1676,11 +1676,11 @@ Ftp::Server::checkDataConnPre() + + // active transfer: open a data connection from Squid to client + typedef CommCbMemFunT<Server, CommConnectCbParams> Dialer; +- connector = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData); +- Comm::ConnOpener *cs = new Comm::ConnOpener(dataConn, connector, +- Config.Timeout.connect); +- AsyncJob::Start(cs); +- return false; // ConnStateData::processFtpRequest waits handleConnectDone ++ AsyncCall::Pointer callback = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData); ++ const auto cs = new Comm::ConnOpener(dataConn->cloneProfile(), callback, ++ Config.Timeout.connect); ++ dataConnWait.start(cs, callback); ++ return false; + } + + /// Check that client data connection is ready for immediate I/O. 
+@@ -1698,18 +1698,22 @@ Ftp::Server::checkDataConnPost() const + void + Ftp::Server::connectedForData(const CommConnectCbParams &params) + { +- connector = NULL; ++ dataConnWait.finish(); + + if (params.flag != Comm::OK) { +- /* it might have been a timeout with a partially open link */ +- if (params.conn != NULL) +- params.conn->close(); + setReply(425, "Cannot open data connection."); + Http::StreamPointer context = pipeline.front(); + Must(context->http); + Must(context->http->storeEntry() != NULL); ++ // TODO: call closeDataConnection() to reset data conn processing? + } else { +- Must(dataConn == params.conn); ++ // Finalize the details and start owning the supplied connection. ++ assert(params.conn); ++ assert(dataConn); ++ assert(!dataConn->isOpen()); ++ dataConn = params.conn; ++ // XXX: Missing comm_add_close_handler() to track external closures. ++ + Must(Comm::IsConnOpen(params.conn)); + fd_note(params.conn->fd, "active client ftp data"); + } +diff --git a/src/servers/FtpServer.h b/src/servers/FtpServer.h +index fb9ef9081..d67799827 100644 +--- a/src/servers/FtpServer.h ++++ b/src/servers/FtpServer.h +@@ -11,8 +11,10 @@ + #ifndef SQUID_SERVERS_FTP_SERVER_H + #define SQUID_SERVERS_FTP_SERVER_H + ++#include "base/JobWait.h" + #include "base/Lock.h" + #include "client_side.h" ++#include "comm/forward.h" + + namespace Ftp + { +@@ -188,7 +190,11 @@ private: + size_t uploadAvailSize; ///< number of yet unused uploadBuf bytes + + AsyncCall::Pointer listener; ///< set when we are passively listening +- AsyncCall::Pointer connector; ///< set when we are actively connecting ++ ++ /// Waits for an FTP data connection to the client to be established/opened. ++ /// This wait only happens in FTP active mode (via PORT or EPRT). 
++ JobWait<Comm::ConnOpener> dataConnWait; ++ + AsyncCall::Pointer reader; ///< set when we are reading FTP data + + /// whether we wait for the origin data transfer to end +diff --git a/src/snmp/Forwarder.cc b/src/snmp/Forwarder.cc +index 836eb5772..259bb0441 100644 +--- a/src/snmp/Forwarder.cc ++++ b/src/snmp/Forwarder.cc +@@ -53,6 +53,7 @@ Snmp::Forwarder::noteCommClosed(const CommCloseCbParams& params) + { + debugs(49, 5, HERE); + Must(fd == params.fd); ++ closer = nullptr; + fd = -1; + mustStop("commClosed"); + } +@@ -68,8 +69,7 @@ void + Snmp::Forwarder::handleException(const std::exception& e) + { + debugs(49, 3, HERE << e.what()); +- if (fd >= 0) +- sendError(SNMP_ERR_GENERR); ++ sendError(SNMP_ERR_GENERR); + Ipc::Forwarder::handleException(e); + } + +@@ -78,6 +78,10 @@ void + Snmp::Forwarder::sendError(int error) + { + debugs(49, 3, HERE); ++ ++ if (fd < 0) ++ return; // client gone ++ + Snmp::Request& req = static_cast<Snmp::Request&>(*request); + req.pdu.command = SNMP_PDU_RESPONSE; + req.pdu.errstat = error; +diff --git a/src/snmp/Inquirer.cc b/src/snmp/Inquirer.cc +index 9b6e34405..82f8c4475 100644 +--- a/src/snmp/Inquirer.cc ++++ b/src/snmp/Inquirer.cc +@@ -88,7 +88,11 @@ Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params) + { + debugs(49, 5, HERE); + Must(!Comm::IsConnOpen(conn) || conn->fd == params.conn->fd); +- conn = NULL; ++ closer = nullptr; ++ if (conn) { ++ conn->noteClosure(); ++ conn = nullptr; ++ } + mustStop("commClosed"); + } + +@@ -102,6 +106,10 @@ void + Snmp::Inquirer::sendResponse() + { + debugs(49, 5, HERE); ++ ++ if (!Comm::IsConnOpen(conn)) ++ return; // client gone ++ + aggrPdu.fixAggregate(); + aggrPdu.command = SNMP_PDU_RESPONSE; + u_char buffer[SNMP_REQUEST_SIZE]; +diff --git a/src/ssl/PeekingPeerConnector.cc b/src/ssl/PeekingPeerConnector.cc +index 9a4055355..89e45435b 100644 +--- a/src/ssl/PeekingPeerConnector.cc ++++ b/src/ssl/PeekingPeerConnector.cc +@@ -27,18 +27,18 @@ CBDATA_NAMESPACED_CLASS_INIT(Ssl, 
PeekingPeerConnector); + void switchToTunnel(HttpRequest *request, const Comm::ConnectionPointer &clientConn, const Comm::ConnectionPointer &srvConn, const SBuf &preReadServerData); + + void +-Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data) ++Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(const Acl::Answer aclAnswer, void *data) + { + Ssl::PeekingPeerConnector *peerConnect = (Ssl::PeekingPeerConnector *) data; + // Use job calls to add done() checks and other job logic/protections. +- CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, answer); ++ CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, aclAnswer); + } + + void +-Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(Acl::Answer answer) ++Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(const Acl::Answer aclAnswer) + { +- const Ssl::BumpMode finalAction = answer.allowed() ? +- static_cast<Ssl::BumpMode>(answer.kind): ++ const Ssl::BumpMode finalAction = aclAnswer.allowed() ? ++ static_cast<Ssl::BumpMode>(aclAnswer.kind): + checkForPeekAndSpliceGuess(); + checkForPeekAndSpliceMatched(finalAction); + } +@@ -106,10 +106,8 @@ Ssl::PeekingPeerConnector::checkForPeekAndSpliceMatched(const Ssl::BumpMode acti + splice = true; + // Ssl Negotiation stops here. 
Last SSL checks for valid certificates + // and if done, switch to tunnel mode +- if (sslFinalized()) { +- debugs(83,5, "Abort NegotiateSSL on FD " << serverConn->fd << " and splice the connection"); ++ if (sslFinalized() && callback) + callBack(); +- } + } + } + +@@ -272,8 +270,11 @@ Ssl::PeekingPeerConnector::startTunneling() + auto b = SSL_get_rbio(session.get()); + auto srvBio = static_cast<Ssl::ServerBio*>(BIO_get_data(b)); + ++ debugs(83, 5, "will tunnel instead of negotiating TLS"); + switchToTunnel(request.getRaw(), clientConn, serverConn, srvBio->rBufData()); +- tunnelInsteadOfNegotiating(); ++ answer().tunneled = true; ++ disconnect(); ++ callBack(); + } + + void +@@ -397,13 +398,3 @@ Ssl::PeekingPeerConnector::serverCertificateVerified() + } + } + +-void +-Ssl::PeekingPeerConnector::tunnelInsteadOfNegotiating() +-{ +- Must(callback != NULL); +- CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer()); +- Must(dialer); +- dialer->answer().tunneled = true; +- debugs(83, 5, "The SSL negotiation with server aborted"); +-} +- +diff --git a/src/ssl/PeekingPeerConnector.h b/src/ssl/PeekingPeerConnector.h +index 3c86b887d..0145e77f9 100644 +--- a/src/ssl/PeekingPeerConnector.h ++++ b/src/ssl/PeekingPeerConnector.h +@@ -51,7 +51,7 @@ public: + void checkForPeekAndSplice(); + + /// Callback function for ssl_bump acl check in step3 SSL bump step. +- void checkForPeekAndSpliceDone(Acl::Answer answer); ++ void checkForPeekAndSpliceDone(Acl::Answer); + + /// Handles the final bumping decision. 
+ void checkForPeekAndSpliceMatched(const Ssl::BumpMode finalMode); +@@ -67,7 +67,7 @@ public: + void startTunneling(); + + /// A wrapper function for checkForPeekAndSpliceDone for use with acl +- static void cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data); ++ static void cbCheckForPeekAndSpliceDone(Acl::Answer, void *data); + + private: + +diff --git a/src/tests/stub_libcomm.cc b/src/tests/stub_libcomm.cc +index 45aa6deb2..8fdb40bbd 100644 +--- a/src/tests/stub_libcomm.cc ++++ b/src/tests/stub_libcomm.cc +@@ -22,9 +22,9 @@ void Comm::AcceptLimiter::kick() STUB + #include "comm/Connection.h" + Comm::Connection::Connection() STUB + Comm::Connection::~Connection() STUB +-Comm::ConnectionPointer Comm::Connection::cloneIdentDetails() const STUB_RETVAL(nullptr) +-Comm::ConnectionPointer Comm::Connection::cloneDestinationDetails() const STUB_RETVAL(nullptr) ++Comm::ConnectionPointer Comm::Connection::cloneProfile() const STUB_RETVAL(nullptr) + void Comm::Connection::close() STUB ++void Comm::Connection::noteClosure() STUB + CachePeer * Comm::Connection::getPeer() const STUB_RETVAL(NULL) + void Comm::Connection::setPeer(CachePeer * p) STUB + ScopedId Comm::Connection::codeContextGist() const STUB_RETVAL(id.detach()) +diff --git a/src/tests/stub_libsecurity.cc b/src/tests/stub_libsecurity.cc +index aa9cf0b36..176cf337e 100644 +--- a/src/tests/stub_libsecurity.cc ++++ b/src/tests/stub_libsecurity.cc +@@ -9,6 +9,7 @@ + #include "squid.h" + #include "AccessLogEntry.h" + #include "comm/Connection.h" ++#include "Downloader.h" + #include "HttpRequest.h" + + #define STUB_API "security/libsecurity.la" +@@ -88,7 +89,9 @@ void PeerConnector::bail(ErrorState *) STUB + void PeerConnector::sendSuccess() STUB + void PeerConnector::callBack() STUB + void PeerConnector::disconnect() STUB ++void PeerConnector::countFailingConnection() STUB + void PeerConnector::recordNegotiationDetails() STUB ++EncryptorAnswer &PeerConnector::answer() STUB_RETREF(EncryptorAnswer) + } + + 
#include "security/PeerOptions.h" +diff --git a/src/tunnel.cc b/src/tunnel.cc +index 386a0ecd6..4fc5abdef 100644 +--- a/src/tunnel.cc ++++ b/src/tunnel.cc +@@ -11,6 +11,7 @@ + #include "squid.h" + #include "acl/FilledChecklist.h" + #include "base/CbcPointer.h" ++#include "base/JobWait.h" + #include "CachePeer.h" + #include "cbdata.h" + #include "client_side.h" +@@ -180,9 +181,6 @@ public: + SBuf preReadClientData; + SBuf preReadServerData; + time_t startTime; ///< object creation time, before any peer selection/connection attempts +- /// Whether we are waiting for the CONNECT request/response exchange with the peer. +- bool waitingForConnectExchange; +- HappyConnOpenerPointer connOpener; ///< current connection opening job + ResolvedPeersPointer destinations; ///< paths for forwarding the request + bool destinationsFound; ///< At least one candidate path found + /// whether another destination may be still attempted if the TCP connection +@@ -191,10 +189,15 @@ public: + // TODO: remove after fixing deferred reads in TunnelStateData::copyRead() + CodeContext::Pointer codeContext; ///< our creator context + +- // AsyncCalls which we set and may need cancelling. +- struct { +- AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn. 
+- } calls; ++ /// waits for a transport connection to the peer to be established/opened ++ JobWait<HappyConnOpener> transportWait; ++ ++ /// waits for the established transport connection to be secured/encrypted ++ JobWait<Security::PeerConnector> encryptionWait; ++ ++ /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated ++ /// over the (encrypted, if needed) transport connection to that cache_peer ++ JobWait<Http::Tunneler> peerWait; + + void copyRead(Connection &from, IOCB *completion); + +@@ -212,12 +215,6 @@ public: + /// when all candidate destinations have been tried and all have failed + void noteConnection(HappyConnOpenerAnswer &); + +- /// whether we are waiting for HappyConnOpener +- /// same as calls.connector but may differ from connOpener.valid() +- bool opening() const { return connOpener.set(); } +- +- void cancelOpening(const char *reason); +- + /// Start using an established connection + void connectDone(const Comm::ConnectionPointer &conn, const char *origin, const bool reused); + +@@ -266,6 +263,9 @@ private: + + /// \returns whether the request should be retried (nil) or the description why it should not + const char *checkRetry(); ++ /// whether the successfully selected path destination or the established ++ /// server connection is still in use ++ bool usingDestination() const; + + /// details of the "last tunneling attempt" failure (if it failed) + ErrorState *savedError = nullptr; +@@ -275,6 +275,8 @@ private: + + void deleteThis(); + ++ void cancelStep(const char *reason); ++ + public: + bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to); + void copy(size_t len, Connection &from, Connection &to, IOCB *); +@@ -357,7 +359,6 @@ TunnelStateData::deleteThis() + + TunnelStateData::TunnelStateData(ClientHttpRequest *clientRequest) : + startTime(squid_curtime), +- waitingForConnectExchange(false), + destinations(new ResolvedPeers()), + destinationsFound(false), + 
retriable(true), +@@ -390,8 +391,7 @@ TunnelStateData::~TunnelStateData() + debugs(26, 3, "TunnelStateData destructed this=" << this); + assert(noConnections()); + xfree(url); +- if (opening()) +- cancelOpening("~TunnelStateData"); ++ cancelStep("~TunnelStateData"); + delete savedError; + } + +@@ -904,7 +904,10 @@ TunnelStateData::copyServerBytes() + static void + tunnelStartShoveling(TunnelStateData *tunnelState) + { +- assert(!tunnelState->waitingForConnectExchange); ++ assert(!tunnelState->transportWait); ++ assert(!tunnelState->encryptionWait); ++ assert(!tunnelState->peerWait); ++ + assert(tunnelState->server.conn); + AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", + CommTimeoutCbPtrFun(tunnelTimeout, tunnelState)); +@@ -962,6 +965,7 @@ tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *, size_t len + void + TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer) + { ++ peerWait.finish(); + server.len = 0; + + if (logTag_ptr) +@@ -970,13 +974,11 @@ TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer) + if (answer.peerResponseStatus != Http::scNone) + *status_ptr = answer.peerResponseStatus; + +- waitingForConnectExchange = false; +- + auto sawProblem = false; + + if (!answer.positive()) { + sawProblem = true; +- Must(!Comm::IsConnOpen(answer.conn)); ++ assert(!answer.conn); + } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { + sawProblem = true; + closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone"); +@@ -1042,8 +1044,7 @@ tunnelErrorComplete(int fd/*const Comm::ConnectionPointer &*/, void *data, size_ + void + TunnelStateData::noteConnection(HappyConnOpener::Answer &answer) + { +- calls.connector = nullptr; +- connOpener.clear(); ++ transportWait.finish(); + + ErrorState *error = nullptr; + if ((error = answer.error.get())) { +@@ -1167,7 +1168,7 @@ TunnelStateData::secureConnectionToPeer(const 
Comm::ConnectionPointer &conn) + AsyncCall::Pointer callback = asyncCall(5,4, "TunnelStateData::noteSecurityPeerConnectorAnswer", + MyAnswerDialer(&TunnelStateData::noteSecurityPeerConnectorAnswer, this)); + const auto connector = new Security::BlindPeerConnector(request, conn, callback, al); +- AsyncJob::Start(connector); // will call our callback ++ encryptionWait.start(connector, callback); + } + + /// starts a preparation step for an established connection; retries on failures +@@ -1194,9 +1195,12 @@ TunnelStateData::advanceDestination(const char *stepDescription, const Comm::Con + void + TunnelStateData::noteSecurityPeerConnectorAnswer(Security::EncryptorAnswer &answer) + { ++ encryptionWait.finish(); ++ + ErrorState *error = nullptr; ++ assert(!answer.tunneled); + if ((error = answer.error.get())) { +- Must(!Comm::IsConnOpen(answer.conn)); ++ assert(!answer.conn); + answer.error.clear(); + } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) { + error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw(), al); +@@ -1223,8 +1227,6 @@ TunnelStateData::connectedToPeer(const Comm::ConnectionPointer &conn) + void + TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn) + { +- assert(!waitingForConnectExchange); +- + AsyncCall::Pointer callback = asyncCall(5,4, + "TunnelStateData::tunnelEstablishmentDone", + Http::Tunneler::CbDialer<TunnelStateData>(&TunnelStateData::tunnelEstablishmentDone, this)); +@@ -1232,9 +1234,7 @@ TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn) + #if USE_DELAY_POOLS + tunneler->setDelayId(server.delayId); + #endif +- AsyncJob::Start(tunneler); +- waitingForConnectExchange = true; +- // and wait for the tunnelEstablishmentDone() call ++ peerWait.start(tunneler, callback); + } + + void +@@ -1252,14 +1252,14 @@ TunnelStateData::noteDestination(Comm::ConnectionPointer path) + + destinations->addPath(path); + +- if 
(Comm::IsConnOpen(server.conn)) { ++ if (usingDestination()) { + // We are already using a previously opened connection but also + // receiving destinations in case we need to re-forward. +- Must(!opening()); ++ Must(!transportWait); + return; + } + +- if (opening()) { ++ if (transportWait) { + notifyConnOpener(); + return; // and continue to wait for tunnelConnectDone() callback + } +@@ -1289,17 +1289,23 @@ TunnelStateData::noteDestinationsEnd(ErrorState *selectionError) + // if all of them fail, tunneling as whole will fail + Must(!selectionError); // finding at least one path means selection succeeded + +- if (Comm::IsConnOpen(server.conn)) { ++ if (usingDestination()) { + // We are already using a previously opened connection but also + // receiving destinations in case we need to re-forward. +- Must(!opening()); ++ Must(!transportWait); + return; + } + +- Must(opening()); // or we would be stuck with nothing to do or wait for ++ Must(transportWait); // or we would be stuck with nothing to do or wait for + notifyConnOpener(); + } + ++bool ++TunnelStateData::usingDestination() const ++{ ++ return encryptionWait || peerWait || Comm::IsConnOpen(server.conn); ++} ++ + /// remembers an error to be used if there will be no more connection attempts + void + TunnelStateData::saveError(ErrorState *error) +@@ -1320,8 +1326,7 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason) + if (request) + request->hier.stopPeerClock(false); + +- if (opening()) +- cancelOpening(reason); ++ cancelStep(reason); + + assert(finalError); + +@@ -1339,18 +1344,15 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason) + errorSend(client.conn, finalError); + } + +-/// Notify connOpener that we no longer need connections. 
We do not have to do +-/// this -- connOpener would eventually notice on its own, but notifying reduces +-/// waste and speeds up spare connection opening for other transactions (that +-/// could otherwise wait for this transaction to use its spare allowance). ++/// Notify a pending subtask, if any, that we no longer need its help. We do not ++/// have to do this -- the subtask job will eventually end -- but ending it ++/// earlier reduces waste and may reduce DoS attack surface. + void +-TunnelStateData::cancelOpening(const char *reason) ++TunnelStateData::cancelStep(const char *reason) + { +- assert(calls.connector); +- calls.connector->cancel(reason); +- calls.connector = nullptr; +- notifyConnOpener(); +- connOpener.clear(); ++ transportWait.cancel(reason); ++ encryptionWait.cancel(reason); ++ peerWait.cancel(reason); + } + + void +@@ -1360,15 +1362,14 @@ TunnelStateData::startConnecting() + request->hier.startPeerClock(); + + assert(!destinations->empty()); +- assert(!opening()); +- calls.connector = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this)); +- const auto cs = new HappyConnOpener(destinations, calls.connector, request, startTime, 0, al); ++ assert(!usingDestination()); ++ AsyncCall::Pointer callback = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this)); ++ const auto cs = new HappyConnOpener(destinations, callback, request, startTime, 0, al); + cs->setHost(request->url.host()); + cs->setRetriable(false); + cs->allowPersistent(false); + destinations->notificationPending = true; // start() is async +- connOpener = cs; +- AsyncJob::Start(cs); ++ transportWait.start(cs, callback); + } + + /// send request on an existing connection dedicated to the requesting client +@@ -1417,7 +1418,7 @@ TunnelStateData::Connection::setDelayId(DelayId const &newDelay) + + #endif + +-/// makes sure 
connOpener knows that the destinations have changed + void + TunnelStateData::notifyConnOpener() + { +@@ -1425,7 +1426,7 @@ TunnelStateData::notifyConnOpener() + debugs(17, 7, "reusing pending notification"); + } else { + destinations->notificationPending = true; +- CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange); ++ CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange); + } + } + +diff --git a/src/whois.cc b/src/whois.cc +index 38dfbacde..306ea5c62 100644 +--- a/src/whois.cc ++++ b/src/whois.cc +@@ -182,6 +182,7 @@ whoisClose(const CommCloseCbParams &params) + { + WhoisState *p = (WhoisState *)params.data; + debugs(75, 3, "whoisClose: FD " << params.fd); ++ // We do not own a Connection. Assume that FwdState is also monitoring. + p->entry->unlock("whoisClose"); + delete p; + }