public inbox for development@lists.ipfire.org
 help / color / mirror / Atom feed
* [PATCH] squid 5.4: Latest patch - Bug #5055 - from upstream
@ 2022-02-11  8:34 Matthias Fischer
  2022-02-11 10:09 ` Peter Müller
  0 siblings, 1 reply; 2+ messages in thread
From: Matthias Fischer @ 2022-02-11  8:34 UTC (permalink / raw)
  To: development

[-- Attachment #1: Type: text/plain, Size: 150816 bytes --]

Please note:
One day after squid 5.4 was released, this extensive patch followed which
I publish separately. It seems to be important - perhaps we should
consider taking this one in account.

Building and testing was ok - so far no problems or crashes occurred.
Running normal.

Signed-off-by: Matthias Fischer <matthias.fischer(a)ipfire.org>
---
 lfs/squid                                     |    1 +
 ...estinationsEnd_exception_opening_967.patch | 3825 +++++++++++++++++
 2 files changed, 3826 insertions(+)
 create mode 100644 src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch

diff --git a/lfs/squid b/lfs/squid
index 034f4a3e9..6762a0218 100644
--- a/lfs/squid
+++ b/lfs/squid
@@ -78,6 +78,7 @@ $(TARGET) : $(patsubst %,$(DIR_DL)/%,$(objects))
 	@rm -rf $(DIR_APP) && cd $(DIR_SRC) && tar xaf $(DIR_DL)/$(DL_FILE)
 
 	cd $(DIR_APP) && patch -Np1 < $(DIR_SRC)/src/patches/squid/01_squid-gcc11.patch
+	cd $(DIR_APP) && patch -Np1 < $(DIR_SRC)/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch
 
 	cd $(DIR_APP) && autoreconf -vfi
 	cd $(DIR_APP)/libltdl && autoreconf -vfi
diff --git a/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch b/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch
new file mode 100644
index 000000000..9f592c7e8
--- /dev/null
+++ b/src/patches/squid/02_Bug_5055_FATAL_FwdState_noteDestinationsEnd_exception_opening_967.patch
@@ -0,0 +1,3825 @@
+commit 1332f8d606485b5a2f57277634c2f6f7855bd9a6 (refs/remotes/origin/v5, refs/remotes/github/v5, refs/heads/v5)
+Author: Alex Rousskov <rousskov(a)measurement-factory.com>
+Date:   2022-02-08 03:56:43 -0500
+
+    Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening (#967)
+    
+    * Bug 5055: FATAL FwdState::noteDestinationsEnd exception: opening
+    
+    The bug was caused by commit 25b0ce4. Other known symptoms are:
+    
+        assertion failed: store.cc:1793: "isEmpty()"
+        assertion failed: FwdState.cc:501: "serverConnection() == conn"
+        assertion failed: FwdState.cc:1037: "!opening()"
+    
+    This change has several overlapping parts. Unfortunately, merging
+    individual parts is both difficult and likely to cause crashes.
+    
+    ## Part 1: Bug 5055.
+    
+    FwdState used to check serverConn to decide whether to open a connection
+    to forward the request. Since commit 25b0ce4, a nil serverConn pointer
+    no longer implies that a new connection should be opened: FwdState
+    helper jobs may already be working on preparing an existing open
+    connection (e.g., sending a CONNECT request or negotiating encryption).
+    
+    Bad serverConn checks in both FwdState::noteDestination() and
+    FwdState::noteDestinationsEnd() methods led to extra connectStart()
+    calls creating two conflicting concurrent helper jobs.
+    
+    To fix this, we replaced direct serverConn inspection with a
+    usingDestination() call which also checks whether we are waiting for a
+    helper job. Testing that fix exposed another set of bugs: The helper job
+    pointers or in-job connections left stale/set after forwarding failures.
+    The changes described below addressed (most of) those problems.
+    
+    ## Part 2: Connection establishing helper jobs and their callbacks
+    
+    A proper fix for Bug 5055 required answering a difficult question: When
+    should a dying job call its callbacks? We only found one answer which
+    required cooperation from the job creator and led to the following
+    rules:
+    
+    * AsyncJob destructors must not call any callbacks.
+    
+    * AsyncJob::swanSong() is responsible for async-calling any remaining
+      (i.e. set, uncalled, and uncancelled) callbacks.
+    
+    * AsyncJob::swanSong() is called (only) for started jobs.
+    
+    * AsyncJob destructing sequence should validate that no callbacks remain
+      uncalled for started jobs.
+    
+    ... where an AsyncJob x is considered "started" if AsyncJob::Start(x)
+    has returned without throwing.
+    
+    A new JobWait class helps job creators follow these rules while keeping
+    track on in-progress helper jobs and killing no-longer-needed helpers.
+    
+    Also fixed very similar bugs in tunnel.cc code.
+    
+    ## Part 3: ConnOpener fixes
+    
+    1. Many ConnOpener users are written to keep a ConnectionPointer to the
+       destination given to ConnOpener. This means that their connection
+       magically opens when ConnOpener successfully connects, before
+       ConnOpener has a chance to notify the user about the changes. Having
+       multiple concurrent connection owners is always dangerous, and the
+       user cannot even have a close handler registered for its now-open
+       connection. When something happens to ConnOpener or its answer, the
+       user job may be in trouble. Now, ConnOpener callers no longer pass
+       Connection objects they own, cloning them as needed. That adjustment
+       required adjustment 2:
+    
+    2. Refactored ConnOpener users to stop assuming that the answer contains
+       a pointer to their connection object. After adjustment 1 above, it
+       does not. HappyConnOpener relied on that assumption quite a bit so we
+       had to refactor to use two custom callback methods instead of one
+       with a complicated if-statement distinguishing prime from spare
+       attempts. This refactoring is an overall improvement because it
+       simplifies the code. Other ConnOpener users just needed to remove a
+       few no longer valid paranoid assertions/Musts.
+    
+    3. ConnOpener users were forced to remember to close params.conn when
+       processing negative answers. Some, naturally, forgot, triggering
+       warnings about orphaned connections (e.g., Ident and TcpLogger).
+       ConnOpener now closes its open connection before sending a negative
+       answer.
+    
+    4. ConnOpener would trigger orphan connection warnings if the job ended
+       after opening the connection but without supplying the connection to
+       the requestor (e.g., because the requestor has gone away). Now,
+       ConnOpener explicitly closes its open connection if it has not been
+       sent to the requestor.
+    
+    Also fixed Comm::ConnOpener::cleanFd() debugging that was incorrectly
+    saying that the method closes the temporary descriptor.
+    
+    Also fixed ConnOpener callback's syncWithComm(): The stale
+    CommConnectCbParams override was testing unused (i.e. always negative)
+    CommConnectCbParams::fd and was trying to cancel the callback that most
+    (possibly all) recipients rely on: ConnOpener users expect a negative
+    answer rather than no answer at all.
+    
+    Also, after comparing the needs of two old/existing and a temporary
+    added ("clone everything") Connection cloning method callers, we decided
+    there is no need to maintain three different methods. All existing
+    callers should be fine with a single method because none of them suffers
+    from "extra" copying of members that others need. Right now, the new
+    cloneProfile() method copies everything except FD and a few
+    special-purpose members (with documented reasons for not copying).
+    
+    Also added Comm::Connection::cloneDestinationDetails() debugging to
+    simplify tracking dependencies between half-baked Connection objects
+    carrying destination/flags/other metadata and open Connection objects
+    created by ConnOpener using that metadata (which are later delivered to
+    ConnOpener users and, in some cases, replace those half-baked
+    connections mentioned earlier. Long-term, we need to find a better way
+    to express these and other Connection states/stages than comments and
+    debugging messages.
+    
+    ## Part 4: Comm::Connection closure callbacks
+    
+    We improved many closure callbacks to make sure (to the extent possible)
+    that Connection and other objects are in sync with Comm. There are lots
+    of small bugs, inconsistencies, and other problems in Connection closure
+    handlers. It is not clear whether any of those problems could result in
+    serious runtime errors or leaks. In theory, the rest of the code could
+    neutralize their negative side effects. However, even in that case, it
+    was just a matter of time before the next bug will bite us due to stale
+    Connection::fd and such. These changes themselves carry elevated risk,
+    but they get us closer to reliable code as far as Connection maintenance
+    is concerned; otherwise, we will keep chasing their deadly side effects.
+    
+    Long-term, all these manual efforts to keep things in sync should become
+    unnecessary with the introduction of appropriate Connection ownership
+    APIs that automatically maintain the corresponding environments (TODO).
+    
+    ## Part 5: Other notable improvements in the adjusted code
+    
+    Improved Security::PeerConnector::serverConn and
+    Http::Tunneler::connection management, especially when sending negative
+    answers. When sending a negative answer, we would set answer().conn to
+    an open connection, async-send that answer, and then hurry to close the
+    connection using our pointer to the shared Connection object. If
+    everything went according to the plan, the recipient would get a non-nil
+    but closed Connection object. Now, a negative answer simply means no
+    connection at all. Same for a tunneled answer.
+    
+    Refactored ICAP connection-establishing code to to delay Connection
+    ownership until the ICAP connection is fully ready. This change
+    addresses primary Connection ownership concerns (as they apply to this
+    ICAP code) except orphaning of the temporary Connection object by helper
+    job start exceptions (now an explicit XXX). For example, the transaction
+    no longer shares a Connection object with ConnOpener and
+    IcapPeerConnector jobs.
+    
+    Probably fixed a bug where PeerConnector::negotiate() assumed that a
+    sslFinalized() does not return true after callBack(). It may (e.g., when
+    CertValidationHelper::Submit() throws). Same for
+    PeekingPeerConnector::checkForPeekAndSpliceMatched().
+    
+    Fixed FwdState::advanceDestination() bug that did not save
+    ERR_GATEWAY_FAILURE details and "lost" the address of that failed
+    destination, making it unavailable to future retries (if any).
+    
+    Polished PeerPoolMgr, Ident, and Gopher code to be able to fix similar
+    job callback and connection management issues.
+    
+    Polished AsyncJob::Start() API. Start() used to return a job pointer,
+    but that was a bad idea:
+    
+    * It implies that a failed Start() will return a nil pointer, and that
+      the caller should check the result. Neither is true.
+    
+    * It encourages callers to dereference the returned pointer to further
+      adjust the job. That technically works (today) but violates the rules
+      of communicating with an async job. The Start() method is the boundary
+      after which the job is deemed asynchronous.
+    
+    Also removed old "and wait for..." post-Start() comments because the
+    code itself became clear enough, and those comments were becoming
+    increasingly stale (because they duplicated the callback names above
+    them).
+    
+    Fix Tunneler and PeerConnector handling of last-resort callback
+    requirements. Events like handleStopRequest() and callException() stop
+    the job but should not be reported as a BUG (e.g., it would be up to the
+    callException() to decide how to report the caught exception). There
+    might (or there will) be other, similar cases where the job is stopped
+    prematurely for some non-BUG reason beyond swanSong() knowledge. The
+    existence of non-bug cases does not mean there could be no bugs worth
+    reporting here, but until they can be identified more reliably than all
+    these benign/irrelevant cases, reporting no BUGs is a (much) lesser
+    evil.
+    
+    TODO: Revise AsyncJob::doneAll(). Many of its overrides are written to
+    check for both positive (i.e. mission accomplished) and negative (i.e.
+    mission cancelled or cannot be accomplished) conditions, but the latter
+    is usually unnecessary, especially after we added handleStopRequest()
+    API to properly support external job cancellation events. Many doneAll()
+    overrides can probably be greatly simplified.
+    
+    ----
+    
+    Cherry picked SQUID-568-premature-serverconn-use-v5 commit 22b5f78.
+    
+    * fixup: Cherry-picked SQUID-568-premature-serverconn-use PR-time fixes
+    
+    In git log order:
+    * e64a6c1: Undone an out-of-scope change and added a missing 'auto'
+    * aeaf83d: Fixed an 'unused parameter' error
+    * f49d009: fixup: No explicit destructors with implicit copying methods
+    * c30c37f: Removed an unnecessary explicit copy constructor
+    * 012f5ec: Excluded Connection::rfc931 from cloning
+    * 366c78a: ICAP: do not set connect_timeout on the established conn...
+    
+    This branch is now in sync with SQUID-568-premature-serverconn-use (S)
+    commit e64a6c1 (except for official changes merged from master/v6 into S
+    closer to the end of PR 877 work (i.e. S' merge commit 0a7432a).
+    
+    * Fix FATAL ServiceRep::putConnection exception: theBusyConns > 0
+    
+        FATAL: check failed: theBusyConns > 0
+            exception location: ServiceRep.cc(163) putConnection
+    
+    Since master/v6 commit 2b6b1bc, a timeout on a ready-to-shovel
+    Squid-service ICAP connection was decrementing theBusyConns level one
+    extra time because Adaptation::Icap::Xaction::noteCommTimedout() started
+    calling both noteConnectionFailed() and closeConnection(). Depending on
+    the actual theBusyConns level, the extra decrement could result in FATAL
+    errors later, when putConnection() was called (for a different ICAP
+    transaction) with zero theBusyConns in an exception-unprotected context.
+    
+    Throughout these changes, Xaction still counts the above timeouts as a
+    service failure. That is done by calling ServiceRep::noteFailure() from
+    Xaction::callException(), including in timeout cases described above.
+    
+    ----
+    
+    Cherry-picked master/v6 commit a8ac892.
+
+diff --git a/src/BodyPipe.cc b/src/BodyPipe.cc
+index 9abe7d48b..37a3b32cd 100644
+--- a/src/BodyPipe.cc
++++ b/src/BodyPipe.cc
+@@ -335,6 +335,7 @@ BodyPipe::startAutoConsumptionIfNeeded()
+         return;
+ 
+     theConsumer = new BodySink(this);
++    AsyncJob::Start(theConsumer);
+     debugs(91,7, HERE << "starting auto consumption" << status());
+     scheduleBodyDataNotification();
+ }
+diff --git a/src/CommCalls.cc b/src/CommCalls.cc
+index 4cd503dcb..cca23f6e3 100644
+--- a/src/CommCalls.cc
++++ b/src/CommCalls.cc
+@@ -53,6 +53,9 @@ CommAcceptCbParams::CommAcceptCbParams(void *aData):
+ {
+ }
+ 
++// XXX: Add CommAcceptCbParams::syncWithComm(). Adjust syncWithComm() API if all
++// implementations always return true.
++
+ void
+ CommAcceptCbParams::print(std::ostream &os) const
+ {
+@@ -72,13 +75,24 @@ CommConnectCbParams::CommConnectCbParams(void *aData):
+ bool
+ CommConnectCbParams::syncWithComm()
+ {
+-    // drop the call if the call was scheduled before comm_close but
+-    // is being fired after comm_close
+-    if (fd >= 0 && fd_table[fd].closing()) {
+-        debugs(5, 3, HERE << "dropping late connect call: FD " << fd);
+-        return false;
++    assert(conn);
++
++    // change parameters if this is a successful callback that was scheduled
++    // after its Comm-registered connection started to close
++
++    if (flag != Comm::OK) {
++        assert(!conn->isOpen());
++        return true; // not a successful callback; cannot go out of sync
+     }
+-    return true; // now we are in sync and can handle the call
++
++    assert(conn->isOpen());
++    if (!fd_table[conn->fd].closing())
++        return true; // no closing in progress; in sync (for now)
++
++    debugs(5, 3, "converting to Comm::ERR_CLOSING: " << conn);
++    conn->noteClosure();
++    flag = Comm::ERR_CLOSING;
++    return true; // now the callback is in sync with Comm again
+ }
+ 
+ /* CommIoCbParams */
+diff --git a/src/Downloader.cc b/src/Downloader.cc
+index 298c3a836..012387fb8 100644
+--- a/src/Downloader.cc
++++ b/src/Downloader.cc
+@@ -81,6 +81,10 @@ void
+ Downloader::swanSong()
+ {
+     debugs(33, 6, this);
++
++    if (callback_) // job-ending emergencies like handleStopRequest() or callException()
++        callBack(Http::scInternalServerError);
++
+     if (context_) {
+         context_->finished();
+         context_ = nullptr;
+@@ -251,6 +255,7 @@ Downloader::downloadFinished()
+ void
+ Downloader::callBack(Http::StatusCode const statusCode)
+ {
++    assert(callback_);
+     CbDialer *dialer = dynamic_cast<CbDialer*>(callback_->getDialer());
+     Must(dialer);
+     dialer->status = statusCode;
+diff --git a/src/FwdState.cc b/src/FwdState.cc
+index b721017c3..b69e60c7c 100644
+--- a/src/FwdState.cc
++++ b/src/FwdState.cc
+@@ -207,26 +207,22 @@ FwdState::stopAndDestroy(const char *reason)
+ {
+     debugs(17, 3, "for " << reason);
+ 
+-    if (opening())
+-        cancelOpening(reason);
++    cancelStep(reason);
+ 
+     PeerSelectionInitiator::subscribed = false; // may already be false
+     self = nullptr; // we hope refcounting destroys us soon; may already be nil
+     /* do not place any code here as this object may be gone by now */
+ }
+ 
+-/// Notify connOpener that we no longer need connections. We do not have to do
+-/// this -- connOpener would eventually notice on its own, but notifying reduces
+-/// waste and speeds up spare connection opening for other transactions (that
+-/// could otherwise wait for this transaction to use its spare allowance).
++/// Notify a pending subtask, if any, that we no longer need its help. We do not
++/// have to do this -- the subtask job will eventually end -- but ending it
++/// earlier reduces waste and may reduce DoS attack surface.
+ void
+-FwdState::cancelOpening(const char *reason)
++FwdState::cancelStep(const char *reason)
+ {
+-    assert(calls.connector);
+-    calls.connector->cancel(reason);
+-    calls.connector = nullptr;
+-    notifyConnOpener();
+-    connOpener.clear();
++    transportWait.cancel(reason);
++    encryptionWait.cancel(reason);
++    peerWait.cancel(reason);
+ }
+ 
+ #if STRICT_ORIGINAL_DST
+@@ -348,8 +344,7 @@ FwdState::~FwdState()
+ 
+     entry = NULL;
+ 
+-    if (opening())
+-        cancelOpening("~FwdState");
++    cancelStep("~FwdState");
+ 
+     if (Comm::IsConnOpen(serverConn))
+         closeServerConnection("~FwdState");
+@@ -501,8 +496,17 @@ FwdState::fail(ErrorState * errorState)
+     if (!errorState->request)
+         errorState->request = request;
+ 
+-    if (err->type != ERR_ZERO_SIZE_OBJECT)
+-        return;
++    if (err->type == ERR_ZERO_SIZE_OBJECT)
++        reactToZeroSizeObject();
++
++    destinationReceipt = nullptr; // may already be nil
++}
++
++/// ERR_ZERO_SIZE_OBJECT requires special adjustments
++void
++FwdState::reactToZeroSizeObject()
++{
++    assert(err->type == ERR_ZERO_SIZE_OBJECT);
+ 
+     if (pconnRace == racePossible) {
+         debugs(17, 5, HERE << "pconn race happened");
+@@ -566,6 +570,8 @@ FwdState::complete()
+ 
+         if (Comm::IsConnOpen(serverConn))
+             unregister(serverConn);
++        serverConn = nullptr;
++        destinationReceipt = nullptr;
+ 
+         storedWholeReply_ = nullptr;
+         entry->reset();
+@@ -584,6 +590,12 @@ FwdState::complete()
+     }
+ }
+ 
++bool
++FwdState::usingDestination() const
++{
++    return encryptionWait || peerWait || Comm::IsConnOpen(serverConn);
++}
++
+ void
+ FwdState::markStoredReplyAsWhole(const char * const whyWeAreSure)
+ {
+@@ -613,19 +625,19 @@ FwdState::noteDestination(Comm::ConnectionPointer path)
+ 
+     destinations->addPath(path);
+ 
+-    if (Comm::IsConnOpen(serverConn)) {
++    if (usingDestination()) {
+         // We are already using a previously opened connection, so we cannot be
+-        // waiting for connOpener. We still receive destinations for backup.
+-        Must(!opening());
++        // waiting for it. We still receive destinations for backup.
++        Must(!transportWait);
+         return;
+     }
+ 
+-    if (opening()) {
++    if (transportWait) {
+         notifyConnOpener();
+         return; // and continue to wait for FwdState::noteConnection() callback
+     }
+ 
+-    // This is the first path candidate we have seen. Create connOpener.
++    // This is the first path candidate we have seen. Use it.
+     useDestinations();
+ }
+ 
+@@ -653,19 +665,19 @@ FwdState::noteDestinationsEnd(ErrorState *selectionError)
+     // if all of them fail, forwarding as whole will fail
+     Must(!selectionError); // finding at least one path means selection succeeded
+ 
+-    if (Comm::IsConnOpen(serverConn)) {
++    if (usingDestination()) {
+         // We are already using a previously opened connection, so we cannot be
+-        // waiting for connOpener. We were receiving destinations for backup.
+-        Must(!opening());
++        // waiting for it. We were receiving destinations for backup.
++        Must(!transportWait);
+         return;
+     }
+ 
+-    Must(opening()); // or we would be stuck with nothing to do or wait for
++    Must(transportWait); // or we would be stuck with nothing to do or wait for
+     notifyConnOpener();
+     // and continue to wait for FwdState::noteConnection() callback
+ }
+ 
+-/// makes sure connOpener knows that destinations have changed
++/// makes sure connection opener knows that the destinations have changed
+ void
+ FwdState::notifyConnOpener()
+ {
+@@ -674,7 +686,7 @@ FwdState::notifyConnOpener()
+     } else {
+         debugs(17, 7, "notifying about " << *destinations);
+         destinations->notificationPending = true;
+-        CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange);
++        CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
+     }
+ }
+ 
+@@ -684,7 +696,7 @@ static void
+ fwdServerClosedWrapper(const CommCloseCbParams &params)
+ {
+     FwdState *fwd = (FwdState *)params.data;
+-    fwd->serverClosed(params.fd);
++    fwd->serverClosed();
+ }
+ 
+ /**** PRIVATE *****************************************************************/
+@@ -754,13 +766,23 @@ FwdState::checkRetriable()
+ }
+ 
+ void
+-FwdState::serverClosed(int fd)
++FwdState::serverClosed()
+ {
+-    // XXX: fd is often -1 here
+-    debugs(17, 2, "FD " << fd << " " << entry->url() << " after " <<
+-           (fd >= 0 ? fd_table[fd].pconn.uses : -1) << " requests");
+-    if (fd >= 0 && serverConnection()->fd == fd)
+-        fwdPconnPool->noteUses(fd_table[fd].pconn.uses);
++    // XXX: This method logic attempts to tolerate Connection::close() called
++    // for serverConn earlier, by one of our dispatch()ed jobs. If that happens,
++    // serverConn will already be closed here or, worse, it will already be open
++    // for the next forwarding attempt. The current code prevents us getting
++    // stuck, but the long term solution is to stop sharing serverConn.
++    debugs(17, 2, serverConn);
++    if (Comm::IsConnOpen(serverConn)) {
++        const auto uses = fd_table[serverConn->fd].pconn.uses;
++        debugs(17, 3, "prior uses: " << uses);
++        fwdPconnPool->noteUses(uses); // XXX: May not have come from fwdPconnPool
++        serverConn->noteClosure();
++    }
++    serverConn = nullptr;
++    closeHandler = nullptr;
++    destinationReceipt = nullptr;
+     retryOrBail();
+ }
+ 
+@@ -802,6 +824,8 @@ FwdState::handleUnregisteredServerEnd()
+ {
+     debugs(17, 2, HERE << "self=" << self << " err=" << err << ' ' << entry->url());
+     assert(!Comm::IsConnOpen(serverConn));
++    serverConn = nullptr;
++    destinationReceipt = nullptr;
+     retryOrBail();
+ }
+ 
+@@ -819,6 +843,8 @@ FwdState::advanceDestination(const char *stepDescription, const Comm::Connection
+     } catch (...) {
+         debugs (17, 2, "exception while trying to " << stepDescription << ": " << CurrentException);
+         closePendingConnection(conn, "connection preparation exception");
++        if (!err)
++            fail(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request, al));
+         retryOrBail();
+     }
+ }
+@@ -830,8 +856,7 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer)
+ {
+     assert(!destinationReceipt);
+ 
+-    calls.connector = nullptr;
+-    connOpener.clear();
++    transportWait.finish();
+ 
+     Must(n_tries <= answer.n_tries); // n_tries cannot decrease
+     n_tries = answer.n_tries;
+@@ -843,6 +868,8 @@ FwdState::noteConnection(HappyConnOpener::Answer &answer)
+         Must(!Comm::IsConnOpen(answer.conn));
+         answer.error.clear(); // preserve error for errorSendComplete()
+     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
++        // The socket could get closed while our callback was queued. Sync
++        // Connection. XXX: Connection::fd may already be stale/invalid here.
+         // We do not know exactly why the connection got closed, so we play it
+         // safe, allowing retries only for persistent (reused) connections
+         if (answer.reused) {
+@@ -912,23 +939,24 @@ FwdState::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
+     if (!conn->getPeer()->options.no_delay)
+         tunneler->setDelayId(entry->mem_obj->mostBytesAllowed());
+ #endif
+-    AsyncJob::Start(tunneler);
+-    // and wait for the tunnelEstablishmentDone() call
++    peerWait.start(tunneler, callback);
+ }
+ 
+ /// resumes operations after the (possibly failed) HTTP CONNECT exchange
+ void
+ FwdState::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
+ {
++    peerWait.finish();
++
+     ErrorState *error = nullptr;
+     if (!answer.positive()) {
+-        Must(!Comm::IsConnOpen(answer.conn));
++        Must(!answer.conn);
+         error = answer.squidError.get();
+         Must(error);
+         answer.squidError.clear(); // preserve error for fail()
+     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+-        // The socket could get closed while our callback was queued.
+-        // We close Connection here to sync Connection::fd.
++        // The socket could get closed while our callback was queued. Sync
++        // Connection. XXX: Connection::fd may already be stale/invalid here.
+         closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
+         error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
+     } else if (!answer.leftovers.isEmpty()) {
+@@ -998,18 +1026,21 @@ FwdState::secureConnectionToPeer(const Comm::ConnectionPointer &conn)
+ #endif
+         connector = new Security::BlindPeerConnector(requestPointer, conn, callback, al, sslNegotiationTimeout);
+     connector->noteFwdPconnUse = true;
+-    AsyncJob::Start(connector); // will call our callback
++    encryptionWait.start(connector, callback);
+ }
+ 
+ /// called when all negotiations with the TLS-speaking peer have been completed
+ void
+ FwdState::connectedToPeer(Security::EncryptorAnswer &answer)
+ {
++    encryptionWait.finish();
++
+     ErrorState *error = nullptr;
+     if ((error = answer.error.get())) {
+-        Must(!Comm::IsConnOpen(answer.conn));
++        assert(!answer.conn);
+         answer.error.clear(); // preserve error for errorSendComplete()
+     } else if (answer.tunneled) {
++        assert(!answer.conn);
+         // TODO: When ConnStateData establishes tunnels, its state changes
+         // [in ways that may affect logging?]. Consider informing
+         // ConnStateData about our tunnel or otherwise unifying tunnel
+@@ -1019,6 +1050,8 @@ FwdState::connectedToPeer(Security::EncryptorAnswer &answer)
+         complete(); // destroys us
+         return;
+     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
++        // The socket could get closed while our callback was queued. Sync
++        // Connection. XXX: Connection::fd may already be stale/invalid here.
+         closePendingConnection(answer.conn, "conn was closed while waiting for connectedToPeer");
+         error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request, al);
+     }
+@@ -1091,22 +1124,20 @@ FwdState::connectStart()
+     Must(!request->pinnedConnection());
+ 
+     assert(!destinations->empty());
+-    assert(!opening());
++    assert(!usingDestination());
+ 
+     // Ditch error page if it was created before.
+     // A new one will be created if there's another problem
+     delete err;
+     err = nullptr;
+     request->clearError();
+-    serverConn = nullptr;
+-    destinationReceipt = nullptr;
+ 
+     request->hier.startPeerClock();
+ 
+-    calls.connector = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
++    AsyncCall::Pointer callback = asyncCall(17, 5, "FwdState::noteConnection", HappyConnOpener::CbDialer<FwdState>(&FwdState::noteConnection, this));
+ 
+     HttpRequest::Pointer cause = request;
+-    const auto cs = new HappyConnOpener(destinations, calls.connector, cause, start_t, n_tries, al);
++    const auto cs = new HappyConnOpener(destinations, callback, cause, start_t, n_tries, al);
+     cs->setHost(request->url.host());
+     bool retriable = checkRetriable();
+     if (!retriable && Config.accessList.serverPconnForNonretriable) {
+@@ -1118,8 +1149,7 @@ FwdState::connectStart()
+     cs->setRetriable(retriable);
+     cs->allowPersistent(pconnRace != raceHappened);
+     destinations->notificationPending = true; // start() is async
+-    connOpener = cs;
+-    AsyncJob::Start(cs);
++    transportWait.start(cs, callback);
+ }
+ 
+ /// send request on an existing connection dedicated to the requesting client
+diff --git a/src/FwdState.h b/src/FwdState.h
+index de75f33cc..ebf5f82b1 100644
+--- a/src/FwdState.h
++++ b/src/FwdState.h
+@@ -9,13 +9,12 @@
+ #ifndef SQUID_FORWARD_H
+ #define SQUID_FORWARD_H
+ 
+-#include "base/CbcPointer.h"
+ #include "base/forward.h"
++#include "base/JobWait.h"
+ #include "base/RefCount.h"
+ #include "clients/forward.h"
+ #include "comm.h"
+ #include "comm/Connection.h"
+-#include "comm/ConnOpener.h"
+ #include "error/forward.h"
+ #include "fde.h"
+ #include "http/StatusCode.h"
+@@ -38,7 +37,6 @@ class ResolvedPeers;
+ typedef RefCount<ResolvedPeers> ResolvedPeersPointer;
+ 
+ class HappyConnOpener;
+-typedef CbcPointer<HappyConnOpener> HappyConnOpenerPointer;
+ class HappyConnOpenerAnswer;
+ 
+ /// Sets initial TOS value and Netfilter for the future outgoing connection.
+@@ -87,7 +85,7 @@ public:
+     void handleUnregisteredServerEnd();
+     int reforward();
+     bool reforwardableStatus(const Http::StatusCode s) const;
+-    void serverClosed(int fd);
++    void serverClosed();
+     void connectStart();
+     void connectDone(const Comm::ConnectionPointer & conn, Comm::Flag status, int xerrno);
+     bool checkRetry();
+@@ -115,6 +113,9 @@ private:
+     /* PeerSelectionInitiator API */
+     virtual void noteDestination(Comm::ConnectionPointer conn) override;
+     virtual void noteDestinationsEnd(ErrorState *selectionError) override;
++    /// whether the successfully selected path destination or the established
++    /// server connection is still in use
++    bool usingDestination() const;
+ 
+     void noteConnection(HappyConnOpenerAnswer &);
+ 
+@@ -157,13 +158,10 @@ private:
+     /// \returns the time left for this connection to become connected or 1 second if it is less than one second left
+     time_t connectingTimeout(const Comm::ConnectionPointer &conn) const;
+ 
+-    /// whether we are waiting for HappyConnOpener
+-    /// same as calls.connector but may differ from connOpener.valid()
+-    bool opening() const { return connOpener.set(); }
+-
+-    void cancelOpening(const char *reason);
++    void cancelStep(const char *reason);
+ 
+     void notifyConnOpener();
++    void reactToZeroSizeObject();
+ 
+     void updateAleWithFinalError();
+ 
+@@ -182,11 +180,6 @@ private:
+     time_t start_t;
+     int n_tries; ///< the number of forwarding attempts so far
+ 
+-    // AsyncCalls which we set and may need cancelling.
+-    struct {
+-        AsyncCall::Pointer connector;  ///< a call linking us to the ConnOpener producing serverConn.
+-    } calls;
+-
+     struct {
+         bool connected_okay; ///< TCP link ever opened properly. This affects retry of POST,PUT,CONNECT,etc
+         bool dont_retry;
+@@ -194,7 +187,16 @@ private:
+         bool destinationsFound; ///< at least one candidate path found
+     } flags;
+ 
+-    HappyConnOpenerPointer connOpener; ///< current connection opening job
++    /// waits for a transport connection to the peer to be established/opened
++    JobWait<HappyConnOpener> transportWait;
++
++    /// waits for the established transport connection to be secured/encrypted
++    JobWait<Security::PeerConnector> encryptionWait;
++
++    /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated
++    /// over the (encrypted, if needed) transport connection to that cache_peer
++    JobWait<Http::Tunneler> peerWait;
++
+     ResolvedPeersPointer destinations; ///< paths for forwarding the request
+     Comm::ConnectionPointer serverConn; ///< a successfully opened connection to a server.
+     PeerConnectionPointer destinationReceipt; ///< peer selection result (or nil)
+diff --git a/src/HappyConnOpener.cc b/src/HappyConnOpener.cc
+index 6e0872506..6d83ff14f 100644
+--- a/src/HappyConnOpener.cc
++++ b/src/HappyConnOpener.cc
+@@ -18,6 +18,7 @@
+ #include "neighbors.h"
+ #include "pconn.h"
+ #include "PeerPoolMgr.h"
++#include "sbuf/Stream.h"
+ #include "SquidConfig.h"
+ 
+ CBDATA_CLASS_INIT(HappyConnOpener);
+@@ -330,6 +331,8 @@ HappyConnOpener::HappyConnOpener(const ResolvedPeers::Pointer &dests, const Asyn
+     fwdStart(aFwdStart),
+     callback_(aCall),
+     destinations(dests),
++    prime(&HappyConnOpener::notePrimeConnectDone, "HappyConnOpener::notePrimeConnectDone"),
++    spare(&HappyConnOpener::noteSpareConnectDone, "HappyConnOpener::noteSpareConnectDone"),
+     ale(anAle),
+     cause(request),
+     n_tries(tries)
+@@ -410,33 +413,43 @@ HappyConnOpener::swanSong()
+     AsyncJob::swanSong();
+ }
+ 
++/// HappyConnOpener::Attempt printer for debugging
++std::ostream &
++operator <<(std::ostream &os, const HappyConnOpener::Attempt &attempt)
++{
++    if (!attempt.path)
++        os << '-';
++    else if (attempt.path->isOpen())
++        os << "FD " << attempt.path->fd;
++    else if (attempt.connWait)
++        os << attempt.connWait;
++    else // destination is known; connection closed (and we are not opening any)
++        os << attempt.path->id;
++    return os;
++}
++
+ const char *
+ HappyConnOpener::status() const
+ {
+-    static MemBuf buf;
+-    buf.reset();
++    // TODO: In a redesigned status() API, the caller may mimic this approach.
++    static SBuf buf;
++    buf.clear();
+ 
+-    buf.append(" [", 2);
++    SBufStream os(buf);
++
++    os.write(" [", 2);
+     if (stopReason)
+-        buf.appendf("Stopped, reason:%s", stopReason);
+-    if (prime) {
+-        if (prime.path && prime.path->isOpen())
+-            buf.appendf(" prime FD %d", prime.path->fd);
+-        else if (prime.connector)
+-            buf.appendf(" prime call%ud", prime.connector->id.value);
+-    }
+-    if (spare) {
+-        if (spare.path && spare.path->isOpen())
+-            buf.appendf(" spare FD %d", spare.path->fd);
+-        else if (spare.connector)
+-            buf.appendf(" spare call%ud", spare.connector->id.value);
+-    }
++        os << "Stopped:" << stopReason;
++    if (prime)
++        os << "prime:" << prime;
++    if (spare)
++        os << "spare:" << spare;
+     if (n_tries)
+-        buf.appendf(" tries %d", n_tries);
+-    buf.appendf(" %s%u]", id.prefix(), id.value);
+-    buf.terminate();
++        os << " tries:" << n_tries;
++    os << ' ' << id << ']';
+ 
+-    return buf.content();
++    buf = os.buf();
++    return buf.c_str();
+ }
+ 
+ /// Create "503 Service Unavailable" or "504 Gateway Timeout" error depending
+@@ -516,7 +529,7 @@ void
+ HappyConnOpener::startConnecting(Attempt &attempt, PeerConnectionPointer &dest)
+ {
+     Must(!attempt.path);
+-    Must(!attempt.connector);
++    Must(!attempt.connWait);
+     Must(dest);
+ 
+     const auto bumpThroughPeer = cause->flags.sslBumped && dest->getPeer();
+@@ -552,51 +565,52 @@ HappyConnOpener::openFreshConnection(Attempt &attempt, PeerConnectionPointer &de
+     entry->mem_obj->checkUrlChecksum();
+ #endif
+ 
+-    GetMarkingsToServer(cause.getRaw(), *dest);
++    const auto conn = dest->cloneProfile();
++    GetMarkingsToServer(cause.getRaw(), *conn);
+ 
+-    // ConnOpener modifies its destination argument so we reset the source port
+-    // in case we are reusing the destination already used by our predecessor.
+-    dest->local.port(0);
+     ++n_tries;
+ 
+     typedef CommCbMemFunT<HappyConnOpener, CommConnectCbParams> Dialer;
+-    AsyncCall::Pointer callConnect = JobCallback(48, 5, Dialer, this, HappyConnOpener::connectDone);
++    AsyncCall::Pointer callConnect = asyncCall(48, 5, attempt.callbackMethodName,
++                                     Dialer(this, attempt.callbackMethod));
+     const time_t connTimeout = dest->connectTimeout(fwdStart);
+-    Comm::ConnOpener *cs = new Comm::ConnOpener(dest, callConnect, connTimeout);
+-    if (!dest->getPeer())
++    auto cs = new Comm::ConnOpener(conn, callConnect, connTimeout);
++    if (!conn->getPeer())
+         cs->setHost(host_);
+ 
+-    attempt.path = dest;
+-    attempt.connector = callConnect;
+-    attempt.opener = cs;
++    attempt.path = dest; // but not the being-opened conn!
++    attempt.connWait.start(cs, callConnect);
++}
+ 
+-    AsyncJob::Start(cs);
++/// Comm::ConnOpener callback for the prime connection attempt
++void
++HappyConnOpener::notePrimeConnectDone(const CommConnectCbParams &params)
++{
++    handleConnOpenerAnswer(prime, params, "new prime connection");
+ }
+ 
+-/// called by Comm::ConnOpener objects after a prime or spare connection attempt
+-/// completes (successfully or not)
++/// Comm::ConnOpener callback for the spare connection attempt
+ void
+-HappyConnOpener::connectDone(const CommConnectCbParams &params)
++HappyConnOpener::noteSpareConnectDone(const CommConnectCbParams &params)
+ {
+-    Must(params.conn);
+-    const bool itWasPrime = (params.conn == prime.path);
+-    const bool itWasSpare = (params.conn == spare.path);
+-    Must(itWasPrime != itWasSpare);
+-
+-    PeerConnectionPointer handledPath;
+-    if (itWasPrime) {
+-        handledPath = prime.path;
+-        prime.finish();
+-    } else {
+-        handledPath = spare.path;
+-        spare.finish();
+-        if (gotSpareAllowance) {
+-            TheSpareAllowanceGiver.jobUsedAllowance();
+-            gotSpareAllowance = false;
+-        }
++    if (gotSpareAllowance) {
++        TheSpareAllowanceGiver.jobUsedAllowance();
++        gotSpareAllowance = false;
+     }
++    handleConnOpenerAnswer(spare, params, "new spare connection");
++}
++
++/// prime/spare-agnostic processing of a Comm::ConnOpener result
++void
++HappyConnOpener::handleConnOpenerAnswer(Attempt &attempt, const CommConnectCbParams &params, const char *what)
++{
++    Must(params.conn);
++
++    // finalize the previously selected path before attempt.finish() forgets it
++    auto handledPath = attempt.path;
++    handledPath.finalize(params.conn); // closed on errors
++    attempt.finish();
+ 
+-    const char *what = itWasPrime ? "new prime connection" : "new spare connection";
+     if (params.flag == Comm::OK) {
+         sendSuccess(handledPath, false, what);
+         return;
+@@ -605,7 +619,6 @@ HappyConnOpener::connectDone(const CommConnectCbParams &params)
+     debugs(17, 8, what << " failed: " << params.conn);
+     if (const auto peer = params.conn->getPeer())
+         peerConnectFailed(peer);
+-    params.conn->close(); // TODO: Comm::ConnOpener should do this instead.
+ 
+     // remember the last failure (we forward it if we cannot connect anywhere)
+     lastFailedConnection = handledPath;
+@@ -881,13 +894,23 @@ HappyConnOpener::ranOutOfTimeOrAttempts() const
+     return false;
+ }
+ 
++HappyConnOpener::Attempt::Attempt(const CallbackMethod method, const char *methodName):
++    callbackMethod(method),
++    callbackMethodName(methodName)
++{
++}
++
++void
++HappyConnOpener::Attempt::finish()
++{
++    connWait.finish();
++    path = nullptr;
++}
++
+ void
+ HappyConnOpener::Attempt::cancel(const char *reason)
+ {
+-    if (connector) {
+-        connector->cancel(reason);
+-        CallJobHere(17, 3, opener, Comm::ConnOpener, noteAbort);
+-    }
+-    clear();
++    connWait.cancel(reason);
++    path = nullptr;
+ }
+ 
+diff --git a/src/HappyConnOpener.h b/src/HappyConnOpener.h
+index 4f3135c60..c57c431ad 100644
+--- a/src/HappyConnOpener.h
++++ b/src/HappyConnOpener.h
+@@ -156,22 +156,28 @@ private:
+     /// a connection opening attempt in progress (or falsy)
+     class Attempt {
+     public:
++        /// HappyConnOpener method implementing a ConnOpener callback
++        using CallbackMethod = void (HappyConnOpener::*)(const CommConnectCbParams &);
++
++        Attempt(const CallbackMethod method, const char *methodName);
++
+         explicit operator bool() const { return static_cast<bool>(path); }
+ 
+         /// reacts to a natural attempt completion (successful or otherwise)
+-        void finish() { clear(); }
++        void finish();
+ 
+         /// aborts an in-progress attempt
+         void cancel(const char *reason);
+ 
+         PeerConnectionPointer path; ///< the destination we are connecting to
+-        AsyncCall::Pointer connector; ///< our opener callback
+-        Comm::ConnOpener::Pointer opener; ///< connects to path and calls us
+ 
+-    private:
+-        /// cleans up after the attempt ends (successfully or otherwise)
+-        void clear() { path = nullptr; connector = nullptr; opener = nullptr; }
++        /// waits for a connection to the peer to be established/opened
++        JobWait<Comm::ConnOpener> connWait;
++
++        const CallbackMethod callbackMethod; ///< ConnOpener calls this method
++        const char * const callbackMethodName; ///< for callbackMethod debugging
+     };
++    friend std::ostream &operator <<(std::ostream &, const Attempt &);
+ 
+     /* AsyncJob API */
+     virtual void start() override;
+@@ -190,7 +196,9 @@ private:
+     void openFreshConnection(Attempt &, PeerConnectionPointer &);
+     bool reuseOldConnection(PeerConnectionPointer &);
+ 
+-    void connectDone(const CommConnectCbParams &);
++    void notePrimeConnectDone(const CommConnectCbParams &);
++    void noteSpareConnectDone(const CommConnectCbParams &);
++    void handleConnOpenerAnswer(Attempt &, const CommConnectCbParams &, const char *connDescription);
+ 
+     void checkForNewConnection();
+ 
+diff --git a/src/PeerPoolMgr.cc b/src/PeerPoolMgr.cc
+index 2caa09f44..7423dd669 100644
+--- a/src/PeerPoolMgr.cc
++++ b/src/PeerPoolMgr.cc
+@@ -43,9 +43,8 @@ public:
+ PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"),
+     peer(cbdataReference(aPeer)),
+     request(),
+-    opener(),
+-    securer(),
+-    closer(),
++    transportWait(),
++    encryptionWait(),
+     addrUsed(0)
+ {
+ }
+@@ -90,7 +89,7 @@ PeerPoolMgr::doneAll() const
+ void
+ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
+ {
+-    opener = NULL;
++    transportWait.finish();
+ 
+     if (!validPeer()) {
+         debugs(48, 3, "peer gone");
+@@ -100,9 +99,6 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
+     }
+ 
+     if (params.flag != Comm::OK) {
+-        /* it might have been a timeout with a partially open link */
+-        if (params.conn != NULL)
+-            params.conn->close();
+         peerConnectFailed(peer);
+         checkpoint("conn opening failure"); // may retry
+         return;
+@@ -112,20 +108,16 @@ PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams &params)
+ 
+     // Handle TLS peers.
+     if (peer->secure.encryptTransport) {
+-        typedef CommCbMemFunT<PeerPoolMgr, CommCloseCbParams> CloserDialer;
+-        closer = JobCallback(48, 3, CloserDialer, this,
+-                             PeerPoolMgr::handleSecureClosure);
+-        comm_add_close_handler(params.conn->fd, closer);
+-
+-        securer = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
+-                            MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
++        // XXX: Exceptions orphan params.conn
++        AsyncCall::Pointer callback = asyncCall(48, 4, "PeerPoolMgr::handleSecuredPeer",
++                                                MyAnswerDialer(this, &PeerPoolMgr::handleSecuredPeer));
+ 
+         const int peerTimeout = peerConnectTimeout(peer);
+         const int timeUsed = squid_curtime - params.conn->startTime();
+         // Use positive timeout when less than one second is left for conn.
+         const int timeLeft = positiveTimeout(peerTimeout - timeUsed);
+-        auto *connector = new Security::BlindPeerConnector(request, params.conn, securer, nullptr, timeLeft);
+-        AsyncJob::Start(connector); // will call our callback
++        const auto connector = new Security::BlindPeerConnector(request, params.conn, callback, nullptr, timeLeft);
++        encryptionWait.start(connector, callback);
+         return;
+     }
+ 
+@@ -144,16 +136,7 @@ PeerPoolMgr::pushNewConnection(const Comm::ConnectionPointer &conn)
+ void
+ PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
+ {
+-    Must(securer != NULL);
+-    securer = NULL;
+-
+-    if (closer != NULL) {
+-        if (answer.conn != NULL)
+-            comm_remove_close_handler(answer.conn->fd, closer);
+-        else
+-            closer->cancel("securing completed");
+-        closer = NULL;
+-    }
++    encryptionWait.finish();
+ 
+     if (!validPeer()) {
+         debugs(48, 3, "peer gone");
+@@ -162,35 +145,33 @@ PeerPoolMgr::handleSecuredPeer(Security::EncryptorAnswer &answer)
+         return;
+     }
+ 
++    assert(!answer.tunneled);
+     if (answer.error.get()) {
+-        if (answer.conn != NULL)
+-            answer.conn->close();
++        assert(!answer.conn);
+         // PeerConnector calls peerConnectFailed() for us;
+         checkpoint("conn securing failure"); // may retry
+         return;
+     }
+ 
+-    pushNewConnection(answer.conn);
+-}
++    assert(answer.conn);
+ 
+-void
+-PeerPoolMgr::handleSecureClosure(const CommCloseCbParams &params)
+-{
+-    Must(closer != NULL);
+-    Must(securer != NULL);
+-    securer->cancel("conn closed by a 3rd party");
+-    securer = NULL;
+-    closer = NULL;
+-    // allow the closing connection to fully close before we check again
+-    Checkpoint(this, "conn closure while securing");
++    // The socket could get closed while our callback was queued. Sync
++    // Connection. XXX: Connection::fd may already be stale/invalid here.
++    if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
++        answer.conn->noteClosure();
++        checkpoint("external connection closure"); // may retry
++        return;
++    }
++
++    pushNewConnection(answer.conn);
+ }
+ 
+ void
+ PeerPoolMgr::openNewConnection()
+ {
+     // KISS: Do nothing else when we are already doing something.
+-    if (opener != NULL || securer != NULL || shutting_down) {
+-        debugs(48, 7, "busy: " << opener << '|' << securer << '|' << shutting_down);
++    if (transportWait || encryptionWait || shutting_down) {
++        debugs(48, 7, "busy: " << transportWait << '|' << encryptionWait << '|' << shutting_down);
+         return; // there will be another checkpoint when we are done opening/securing
+     }
+ 
+@@ -227,9 +208,9 @@ PeerPoolMgr::openNewConnection()
+ 
+     const int ctimeout = peerConnectTimeout(peer);
+     typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer;
+-    opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
+-    Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout);
+-    AsyncJob::Start(cs);
++    AsyncCall::Pointer callback = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection);
++    const auto cs = new Comm::ConnOpener(conn, callback, ctimeout);
++    transportWait.start(cs, callback);
+ }
+ 
+ void
+diff --git a/src/PeerPoolMgr.h b/src/PeerPoolMgr.h
+index 217994393..6da61b10b 100644
+--- a/src/PeerPoolMgr.h
++++ b/src/PeerPoolMgr.h
+@@ -10,6 +10,7 @@
+ #define SQUID_PEERPOOLMGR_H
+ 
+ #include "base/AsyncJob.h"
++#include "base/JobWait.h"
+ #include "comm/forward.h"
+ #include "security/forward.h"
+ 
+@@ -54,18 +55,19 @@ protected:
+     /// Security::PeerConnector callback
+     void handleSecuredPeer(Security::EncryptorAnswer &answer);
+ 
+-    /// called when the connection we are trying to secure is closed by a 3rd party
+-    void handleSecureClosure(const CommCloseCbParams &params);
+-
+     /// the final step in connection opening (and, optionally, securing) sequence
+     void pushNewConnection(const Comm::ConnectionPointer &conn);
+ 
+ private:
+     CachePeer *peer; ///< the owner of the pool we manage
+     RefCount<HttpRequest> request; ///< fake HTTP request for conn opening code
+-    AsyncCall::Pointer opener; ///< whether we are opening a connection
+-    AsyncCall::Pointer securer; ///< whether we are securing a connection
+-    AsyncCall::Pointer closer; ///< monitors conn while we are securing it
++
++    /// waits for a transport connection to the peer to be established/opened
++    JobWait<Comm::ConnOpener> transportWait;
++
++    /// waits for the established transport connection to be secured/encrypted
++    JobWait<Security::BlindPeerConnector> encryptionWait;
++
+     unsigned int addrUsed; ///< counter for cycling through peer addresses
+ };
+ 
+diff --git a/src/ResolvedPeers.cc b/src/ResolvedPeers.cc
+index 5b2c6740d..06ab02d23 100644
+--- a/src/ResolvedPeers.cc
++++ b/src/ResolvedPeers.cc
+@@ -151,7 +151,7 @@ ResolvedPeers::extractFound(const char *description, const Paths::iterator &foun
+         while (++pathsToSkip < paths_.size() && !paths_[pathsToSkip].available) {}
+     }
+ 
+-    const auto cleanPath = path.connection->cloneDestinationDetails();
++    const auto cleanPath = path.connection->cloneProfile();
+     return PeerConnectionPointer(cleanPath, found - paths_.begin());
+ }
+ 
+diff --git a/src/adaptation/icap/ModXact.cc b/src/adaptation/icap/ModXact.cc
+index bbeebdbb5..982f38f8d 100644
+--- a/src/adaptation/icap/ModXact.cc
++++ b/src/adaptation/icap/ModXact.cc
+@@ -187,8 +187,7 @@ void Adaptation::Icap::ModXact::startWriting()
+     openConnection();
+ }
+ 
+-// connection with the ICAP service established
+-void Adaptation::Icap::ModXact::handleCommConnected()
++void Adaptation::Icap::ModXact::startShoveling()
+ {
+     Must(state.writing == State::writingConnect);
+ 
+diff --git a/src/adaptation/icap/ModXact.h b/src/adaptation/icap/ModXact.h
+index f8988ac29..2fda1318b 100644
+--- a/src/adaptation/icap/ModXact.h
++++ b/src/adaptation/icap/ModXact.h
+@@ -155,10 +155,11 @@ public:
+     virtual void noteBodyProductionEnded(BodyPipe::Pointer);
+     virtual void noteBodyProducerAborted(BodyPipe::Pointer);
+ 
+-    // comm handlers
+-    virtual void handleCommConnected();
++    /* Xaction API */
++    virtual void startShoveling();
+     virtual void handleCommWrote(size_t size);
+     virtual void handleCommRead(size_t size);
++
+     void handleCommWroteHeaders();
+     void handleCommWroteBody();
+ 
+diff --git a/src/adaptation/icap/OptXact.cc b/src/adaptation/icap/OptXact.cc
+index 5ed4fbfbb..2464ea533 100644
+--- a/src/adaptation/icap/OptXact.cc
++++ b/src/adaptation/icap/OptXact.cc
+@@ -37,7 +37,7 @@ void Adaptation::Icap::OptXact::start()
+     openConnection();
+ }
+ 
+-void Adaptation::Icap::OptXact::handleCommConnected()
++void Adaptation::Icap::OptXact::startShoveling()
+ {
+     scheduleRead();
+ 
+diff --git a/src/adaptation/icap/OptXact.h b/src/adaptation/icap/OptXact.h
+index 3811ab48f..725cd6225 100644
+--- a/src/adaptation/icap/OptXact.h
++++ b/src/adaptation/icap/OptXact.h
+@@ -30,8 +30,9 @@ public:
+     OptXact(ServiceRep::Pointer &aService);
+ 
+ protected:
++    /* Xaction API */
+     virtual void start();
+-    virtual void handleCommConnected();
++    virtual void startShoveling();
+     virtual void handleCommWrote(size_t size);
+     virtual void handleCommRead(size_t size);
+ 
+diff --git a/src/adaptation/icap/ServiceRep.cc b/src/adaptation/icap/ServiceRep.cc
+index 6df475712..fbd896888 100644
+--- a/src/adaptation/icap/ServiceRep.cc
++++ b/src/adaptation/icap/ServiceRep.cc
+@@ -112,9 +112,10 @@ void Adaptation::Icap::ServiceRep::noteFailure()
+     // should be configurable.
+ }
+ 
+-// returns a persistent or brand new connection; negative int on failures
++// TODO: getIdleConnection() and putConnection()/noteConnectionFailed() manage a
++// "used connection slot" resource. Automate that resource tracking (RAII/etc.).
+ Comm::ConnectionPointer
+-Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
++Adaptation::Icap::ServiceRep::getIdleConnection(const bool retriableXact)
+ {
+     Comm::ConnectionPointer connection;
+ 
+@@ -137,7 +138,6 @@ Adaptation::Icap::ServiceRep::getConnection(bool retriableXact, bool &reused)
+     else
+         theIdleConns->closeN(1);
+ 
+-    reused = Comm::IsConnOpen(connection);
+     ++theBusyConns;
+     debugs(93,3, HERE << "got connection: " << connection);
+     return connection;
+@@ -150,7 +150,6 @@ void Adaptation::Icap::ServiceRep::putConnection(const Comm::ConnectionPointer &
+     // do not pool an idle connection if we owe connections
+     if (isReusable && excessConnections() == 0) {
+         debugs(93, 3, HERE << "pushing pconn" << comment);
+-        commUnsetConnTimeout(conn);
+         theIdleConns->push(conn);
+     } else {
+         debugs(93, 3, HERE << (sendReset ? "RST" : "FIN") << "-closing " <<
+diff --git a/src/adaptation/icap/ServiceRep.h b/src/adaptation/icap/ServiceRep.h
+index f2e893244..cca2df4ab 100644
+--- a/src/adaptation/icap/ServiceRep.h
++++ b/src/adaptation/icap/ServiceRep.h
+@@ -85,7 +85,8 @@ public:
+     bool wantsPreview(const SBuf &urlPath, size_t &wantedSize) const;
+     bool allows204() const;
+     bool allows206() const;
+-    Comm::ConnectionPointer getConnection(bool isRetriable, bool &isReused);
++    /// \returns an idle persistent ICAP connection or nil
++    Comm::ConnectionPointer getIdleConnection(bool isRetriable);
+     void putConnection(const Comm::ConnectionPointer &conn, bool isReusable, bool sendReset, const char *comment);
+     void noteConnectionUse(const Comm::ConnectionPointer &conn);
+     void noteConnectionFailed(const char *comment);
+diff --git a/src/adaptation/icap/Xaction.cc b/src/adaptation/icap/Xaction.cc
+index 9aacf2e1b..962ed1703 100644
+--- a/src/adaptation/icap/Xaction.cc
++++ b/src/adaptation/icap/Xaction.cc
+@@ -13,6 +13,7 @@
+ #include "adaptation/icap/Config.h"
+ #include "adaptation/icap/Launcher.h"
+ #include "adaptation/icap/Xaction.h"
++#include "base/JobWait.h"
+ #include "base/TextException.h"
+ #include "comm.h"
+ #include "comm/Connection.h"
+@@ -79,7 +80,6 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv
+     icapRequest(NULL),
+     icapReply(NULL),
+     attempts(0),
+-    connection(NULL),
+     theService(aService),
+     commEof(false),
+     reuseConnection(true),
+@@ -87,14 +87,8 @@ Adaptation::Icap::Xaction::Xaction(const char *aTypeName, Adaptation::Icap::Serv
+     isRepeatable(true),
+     ignoreLastWrite(false),
+     waitingForDns(false),
+-    stopReason(NULL),
+-    connector(NULL),
+-    reader(NULL),
+-    writer(NULL),
+-    closer(NULL),
+     alep(new AccessLogEntry),
+-    al(*alep),
+-    cs(NULL)
++    al(*alep)
+ {
+     debugs(93,3, typeName << " constructed, this=" << this <<
+            " [icapx" << id << ']'); // we should not call virtual status() here
+@@ -150,6 +144,8 @@ static void
+ icapLookupDnsResults(const ipcache_addrs *ia, const Dns::LookupDetails &, void *data)
+ {
+     Adaptation::Icap::Xaction *xa = static_cast<Adaptation::Icap::Xaction *>(data);
++    /// TODO: refactor with CallJobHere1, passing either std::optional (after upgrading to C++17)
++    /// or Optional<Ip::Address> (when it can take non-trivial types)
+     xa->dnsLookupDone(ia);
+ }
+ 
+@@ -164,21 +160,8 @@ Adaptation::Icap::Xaction::openConnection()
+     if (!TheConfig.reuse_connections)
+         disableRetries(); // this will also safely drain pconn pool
+ 
+-    bool wasReused = false;
+-    connection = s.getConnection(isRetriable, wasReused);
+-
+-    if (wasReused && Comm::IsConnOpen(connection)) {
+-        // Set comm Close handler
+-        // fake the connect callback
+-        // TODO: can we sync call Adaptation::Icap::Xaction::noteCommConnected here instead?
+-        typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
+-        CbcPointer<Xaction> self(this);
+-        Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
+-        dialer.params.conn = connection;
+-        dialer.params.flag = Comm::OK;
+-        // fake other parameters by copying from the existing connection
+-        connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
+-        ScheduleCallHere(connector);
++    if (const auto pconn = s.getIdleConnection(isRetriable)) {
++        useTransportConnection(pconn);
+         return;
+     }
+ 
+@@ -207,30 +190,22 @@ Adaptation::Icap::Xaction::dnsLookupDone(const ipcache_addrs *ia)
+ #if WHEN_IPCACHE_NBGETHOSTBYNAME_USES_ASYNC_CALLS
+         dieOnConnectionFailure(); // throws
+ #else // take a step back into protected Async call dialing.
+-        // fake the connect callback
+-        typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> Dialer;
+-        CbcPointer<Xaction> self(this);
+-        Dialer dialer(self, &Adaptation::Icap::Xaction::noteCommConnected);
+-        dialer.params.conn = connection;
+-        dialer.params.flag = Comm::COMM_ERROR;
+-        // fake other parameters by copying from the existing connection
+-        connector = asyncCall(93,3, "Adaptation::Icap::Xaction::noteCommConnected", dialer);
+-        ScheduleCallHere(connector);
++        CallJobHere(93, 3, this, Xaction, Xaction::dieOnConnectionFailure);
+ #endif
+         return;
+     }
+ 
+-    connection = new Comm::Connection;
+-    connection->remote = ia->current();
+-    connection->remote.port(s.cfg().port);
+-    getOutgoingAddress(NULL, connection);
++    const Comm::ConnectionPointer conn = new Comm::Connection();
++    conn->remote = ia->current();
++    conn->remote.port(s.cfg().port);
++    getOutgoingAddress(nullptr, conn);
+ 
+     // TODO: service bypass status may differ from that of a transaction
+     typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommConnectCbParams> ConnectDialer;
+-    connector = JobCallback(93,3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
+-    cs = new Comm::ConnOpener(connection, connector, TheConfig.connect_timeout(service().cfg().bypass));
++    AsyncCall::Pointer callback = JobCallback(93, 3, ConnectDialer, this, Adaptation::Icap::Xaction::noteCommConnected);
++    const auto cs = new Comm::ConnOpener(conn, callback, TheConfig.connect_timeout(service().cfg().bypass));
+     cs->setHost(s.cfg().host.termedBuf());
+-    AsyncJob::Start(cs.get());
++    transportWait.start(cs, callback);
+ }
+ 
+ /*
+@@ -256,6 +231,8 @@ void Adaptation::Icap::Xaction::closeConnection()
+             closer = NULL;
+         }
+ 
++        commUnsetConnTimeout(connection);
++
+         cancelRead(); // may not work
+ 
+         if (reuseConnection && !doneWithIo()) {
+@@ -275,54 +252,65 @@ void Adaptation::Icap::Xaction::closeConnection()
+ 
+         writer = NULL;
+         reader = NULL;
+-        connector = NULL;
+         connection = NULL;
+     }
+ }
+ 
+-// connection with the ICAP service established
++/// called when the connection attempt to an ICAP service completes (successfully or not)
+ void Adaptation::Icap::Xaction::noteCommConnected(const CommConnectCbParams &io)
+ {
+-    cs = NULL;
++    transportWait.finish();
+ 
+-    if (io.flag == Comm::TIMEOUT) {
+-        handleCommTimedout();
++    if (io.flag != Comm::OK) {
++        dieOnConnectionFailure(); // throws
+         return;
+     }
+ 
+-    Must(connector != NULL);
+-    connector = NULL;
+-
+-    if (io.flag != Comm::OK)
+-        dieOnConnectionFailure(); // throws
+-
+-    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommTimeoutCbParams> TimeoutDialer;
+-    AsyncCall::Pointer timeoutCall =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommTimedout",
+-                                      TimeoutDialer(this,&Adaptation::Icap::Xaction::noteCommTimedout));
+-    commSetConnTimeout(io.conn, TheConfig.connect_timeout(service().cfg().bypass), timeoutCall);
++    useTransportConnection(io.conn);
++}
+ 
+-    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
+-    closer =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
+-                        CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
+-    comm_add_close_handler(io.conn->fd, closer);
++/// React to the availability of a transport connection to the ICAP service.
++/// The given connection may (or may not) be secured already.
++void
++Adaptation::Icap::Xaction::useTransportConnection(const Comm::ConnectionPointer &conn)
++{
++    assert(Comm::IsConnOpen(conn));
++    assert(!connection);
+ 
+     // If it is a reused connection and the TLS object is built
+     // we should not negotiate new TLS session
+-    const auto &ssl = fd_table[io.conn->fd].ssl;
++    const auto &ssl = fd_table[conn->fd].ssl;
+     if (!ssl && service().cfg().secure.encryptTransport) {
++        // XXX: Exceptions orphan conn.
+         CbcPointer<Adaptation::Icap::Xaction> me(this);
+-        securer = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
+-                            MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
++        AsyncCall::Pointer callback = asyncCall(93, 4, "Adaptation::Icap::Xaction::handleSecuredPeer",
++                                                MyIcapAnswerDialer(me, &Adaptation::Icap::Xaction::handleSecuredPeer));
+ 
+-        auto *sslConnector = new Ssl::IcapPeerConnector(theService, io.conn, securer, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
+-        AsyncJob::Start(sslConnector); // will call our callback
++        const auto sslConnector = new Ssl::IcapPeerConnector(theService, conn, callback, masterLogEntry(), TheConfig.connect_timeout(service().cfg().bypass));
++
++        encryptionWait.start(sslConnector, callback);
+         return;
+     }
+ 
+-// ??    fd_table[io.conn->fd].noteUse(icapPconnPool);
++    useIcapConnection(conn);
++}
++
++/// react to the availability of a fully-ready ICAP connection
++void
++Adaptation::Icap::Xaction::useIcapConnection(const Comm::ConnectionPointer &conn)
++{
++    assert(!connection);
++    assert(conn);
++    assert(Comm::IsConnOpen(conn));
++    connection = conn;
+     service().noteConnectionUse(connection);
+ 
+-    handleCommConnected();
++    typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommCloseCbParams> CloseDialer;
++    closer =  asyncCall(93, 5, "Adaptation::Icap::Xaction::noteCommClosed",
++                        CloseDialer(this,&Adaptation::Icap::Xaction::noteCommClosed));
++    comm_add_close_handler(connection->fd, closer);
++
++    startShoveling();
+ }
+ 
+ void Adaptation::Icap::Xaction::dieOnConnectionFailure()
+@@ -367,40 +355,25 @@ void Adaptation::Icap::Xaction::noteCommWrote(const CommIoCbParams &io)
+ 
+ // communication timeout with the ICAP service
+ void Adaptation::Icap::Xaction::noteCommTimedout(const CommTimeoutCbParams &)
+-{
+-    handleCommTimedout();
+-}
+-
+-void Adaptation::Icap::Xaction::handleCommTimedout()
+ {
+     debugs(93, 2, HERE << typeName << " failed: timeout with " <<
+            theService->cfg().methodStr() << " " <<
+            theService->cfg().uri << status());
+     reuseConnection = false;
+-    const bool whileConnecting = connector != NULL;
+-    if (whileConnecting) {
+-        assert(!haveConnection());
+-        theService->noteConnectionFailed("timedout");
+-    } else
+-        closeConnection(); // so that late Comm callbacks do not disturb bypass
+-    throw TexcHere(whileConnecting ?
+-                   "timed out while connecting to the ICAP service" :
+-                   "timed out while talking to the ICAP service");
++    assert(haveConnection());
++    closeConnection();
++    throw TextException("timed out while talking to the ICAP service", Here());
+ }
+ 
+ // unexpected connection close while talking to the ICAP service
+ void Adaptation::Icap::Xaction::noteCommClosed(const CommCloseCbParams &)
+ {
+-    if (securer != NULL) {
+-        securer->cancel("Connection closed before SSL negotiation finished");
+-        securer = NULL;
++    if (connection) {
++        connection->noteClosure();
++        connection = nullptr;
+     }
+     closer = NULL;
+-    handleCommClosed();
+-}
+ 
+-void Adaptation::Icap::Xaction::handleCommClosed()
+-{
+     static const auto d = MakeNamedErrorDetail("ICAP_XACT_CLOSE");
+     detailError(d);
+     mustStop("ICAP service connection externally closed");
+@@ -424,7 +397,8 @@ void Adaptation::Icap::Xaction::callEnd()
+ 
+ bool Adaptation::Icap::Xaction::doneAll() const
+ {
+-    return !waitingForDns && !connector && !securer && !reader && !writer &&
++    return !waitingForDns && !transportWait && !encryptionWait &&
++           !reader && !writer &&
+            Adaptation::Initiate::doneAll();
+ }
+ 
+@@ -568,7 +542,7 @@ bool Adaptation::Icap::Xaction::doneWriting() const
+ bool Adaptation::Icap::Xaction::doneWithIo() const
+ {
+     return haveConnection() &&
+-           !connector && !reader && !writer && // fast checks, some redundant
++           !transportWait && !reader && !writer && // fast checks, some redundant
+            doneReading() && doneWriting();
+ }
+ 
+@@ -608,10 +582,7 @@ void Adaptation::Icap::Xaction::setOutcome(const Adaptation::Icap::XactOutcome &
+ void Adaptation::Icap::Xaction::swanSong()
+ {
+     // kids should sing first and then call the parent method.
+-    if (cs.valid()) {
+-        debugs(93,6, HERE << id << " about to notify ConnOpener!");
+-        CallJobHere(93, 3, cs, Comm::ConnOpener, noteAbort);
+-        cs = NULL;
++    if (transportWait || encryptionWait) {
+         service().noteConnectionFailed("abort");
+     }
+ 
+@@ -750,20 +721,12 @@ Ssl::IcapPeerConnector::noteNegotiationDone(ErrorState *error)
+ void
+ Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer)
+ {
+-    Must(securer != NULL);
+-    securer = NULL;
+-
+-    if (closer != NULL) {
+-        if (Comm::IsConnOpen(answer.conn))
+-            comm_remove_close_handler(answer.conn->fd, closer);
+-        else
+-            closer->cancel("securing completed");
+-        closer = NULL;
+-    }
++    encryptionWait.finish();
+ 
++    assert(!answer.tunneled);
+     if (answer.error.get()) {
+-        if (answer.conn != NULL)
+-            answer.conn->close();
++        assert(!answer.conn);
++        // TODO: Refactor dieOnConnectionFailure() to be usable here as well.
+         debugs(93, 2, typeName <<
+                " TLS negotiation to " << service().cfg().uri << " failed");
+         service().noteConnectionFailed("failure");
+@@ -774,8 +737,18 @@ Adaptation::Icap::Xaction::handleSecuredPeer(Security::EncryptorAnswer &answer)
+ 
+     debugs(93, 5, "TLS negotiation to " << service().cfg().uri << " complete");
+ 
+-    service().noteConnectionUse(answer.conn);
++    assert(answer.conn);
++
++    // The socket could get closed while our callback was queued. Sync
++    // Connection. XXX: Connection::fd may already be stale/invalid here.
++    if (answer.conn->isOpen() && fd_table[answer.conn->fd].closing()) {
++        answer.conn->noteClosure();
++        service().noteConnectionFailed("external TLS connection closure");
++        static const auto d = MakeNamedErrorDetail("ICAP_XACT_SSL_CLOSE");
++        detailError(d);
++        throw TexcHere("external closure of the TLS ICAP service connection");
++    }
+ 
+-    handleCommConnected();
++    useIcapConnection(answer.conn);
+ }
+ 
+diff --git a/src/adaptation/icap/Xaction.h b/src/adaptation/icap/Xaction.h
+index b66044594..31a6e22fc 100644
+--- a/src/adaptation/icap/Xaction.h
++++ b/src/adaptation/icap/Xaction.h
+@@ -12,6 +12,7 @@
+ #include "AccessLogEntry.h"
+ #include "adaptation/icap/ServiceRep.h"
+ #include "adaptation/Initiate.h"
++#include "base/JobWait.h"
+ #include "comm/ConnOpener.h"
+ #include "error/forward.h"
+ #include "HttpReply.h"
+@@ -20,6 +21,10 @@
+ 
+ class MemBuf;
+ 
++namespace Ssl {
++class IcapPeerConnector;
++}
++
+ namespace Adaptation
+ {
+ namespace Icap
+@@ -65,12 +70,12 @@ protected:
+     virtual void start();
+     virtual void noteInitiatorAborted(); // TODO: move to Adaptation::Initiate
+ 
++    /// starts sending/receiving ICAP messages
++    virtual void startShoveling() = 0;
++
+     // comm hanndlers; called by comm handler wrappers
+-    virtual void handleCommConnected() = 0;
+     virtual void handleCommWrote(size_t sz) = 0;
+     virtual void handleCommRead(size_t sz) = 0;
+-    virtual void handleCommTimedout();
+-    virtual void handleCommClosed();
+ 
+     void handleSecuredPeer(Security::EncryptorAnswer &answer);
+     /// record error detail if possible
+@@ -78,7 +83,6 @@ protected:
+ 
+     void openConnection();
+     void closeConnection();
+-    void dieOnConnectionFailure();
+     bool haveConnection() const;
+ 
+     void scheduleRead();
+@@ -124,11 +128,13 @@ public:
+     ServiceRep &service();
+ 
+ private:
++    void useTransportConnection(const Comm::ConnectionPointer &);
++    void useIcapConnection(const Comm::ConnectionPointer &);
++    void dieOnConnectionFailure();
+     void tellQueryAborted();
+     void maybeLog();
+ 
+ protected:
+-    Comm::ConnectionPointer connection;     ///< ICAP server connection
+     Adaptation::Icap::ServiceRep::Pointer theService;
+ 
+     SBuf readBuf;
+@@ -139,13 +145,8 @@ protected:
+     bool ignoreLastWrite;
+     bool waitingForDns; ///< expecting a ipcache_nbgethostbyname() callback
+ 
+-    const char *stopReason;
+-
+-    // active (pending) comm callbacks for the ICAP server connection
+-    AsyncCall::Pointer connector;
+     AsyncCall::Pointer reader;
+     AsyncCall::Pointer writer;
+-    AsyncCall::Pointer closer;
+ 
+     AccessLogEntry::Pointer alep; ///< icap.log entry
+     AccessLogEntry &al; ///< short for *alep
+@@ -155,8 +156,16 @@ protected:
+     timeval icap_tio_finish;   /*time when the last byte of the ICAP responsewas received*/
+ 
+ private:
+-    Comm::ConnOpener::Pointer cs;
+-    AsyncCall::Pointer securer; ///< whether we are securing a connection
++    /// waits for a transport connection to the ICAP server to be established/opened
++    JobWait<Comm::ConnOpener> transportWait;
++
++    /// waits for the established transport connection to be secured/encrypted
++    JobWait<Ssl::IcapPeerConnector> encryptionWait;
++
++    /// open and, if necessary, secured connection to the ICAP server (or nil)
++    Comm::ConnectionPointer connection;
++
++    AsyncCall::Pointer closer;
+ };
+ 
+ } // namespace Icap
+diff --git a/src/base/AsyncJob.cc b/src/base/AsyncJob.cc
+index 3b9161e4a..111667d5d 100644
+--- a/src/base/AsyncJob.cc
++++ b/src/base/AsyncJob.cc
+@@ -20,11 +20,11 @@
+ 
+ InstanceIdDefinitions(AsyncJob, "job");
+ 
+-AsyncJob::Pointer AsyncJob::Start(AsyncJob *j)
++void
++AsyncJob::Start(const Pointer &job)
+ {
+-    AsyncJob::Pointer job(j);
+     CallJobHere(93, 5, job, AsyncJob, start);
+-    return job;
++    job->started_ = true; // it is the attempt that counts
+ }
+ 
+ AsyncJob::AsyncJob(const char *aTypeName) :
+@@ -38,6 +38,7 @@ AsyncJob::~AsyncJob()
+ {
+     debugs(93,5, "AsyncJob destructed, this=" << this <<
+            " type=" << typeName << " [" << id << ']');
++    assert(!started_ || swanSang_);
+ }
+ 
+ void AsyncJob::start()
+@@ -141,9 +142,16 @@ void AsyncJob::callEnd()
+         AsyncCall::Pointer inCallSaved = inCall;
+         void *thisSaved = this;
+ 
++        // TODO: Swallow swanSong() exceptions to reduce memory leaks.
++
++        // Job callback invariant: swanSong() is (only) called for started jobs.
++        // Here to detect violations in kids that forgot to call our swanSong().
++        assert(started_);
++
++        swanSang_ = true; // it is the attempt that counts
+         swanSong();
+ 
+-        delete this; // this is the only place where the object is deleted
++        delete this; // this is the only place where a started job is deleted
+ 
+         // careful: this object does not exist any more
+         debugs(93, 6, HERE << *inCallSaved << " ended " << thisSaved);
+diff --git a/src/base/AsyncJob.h b/src/base/AsyncJob.h
+index 4d685dff5..a46e30071 100644
+--- a/src/base/AsyncJob.h
++++ b/src/base/AsyncJob.h
+@@ -36,8 +36,13 @@ public:
+ public:
+     AsyncJob(const char *aTypeName);
+ 
+-    /// starts a freshly created job (i.e., makes the job asynchronous)
+-    static Pointer Start(AsyncJob *job);
++    /// Promises to start the configured job (eventually). The job is deemed to
++    /// be running asynchronously beyond this point, so the caller should only
++    /// access the job object via AsyncCalls rather than directly.
++    ///
++    /// swanSong() is only called for jobs for which this method has returned
++    /// successfully (i.e. without throwing).
++    static void Start(const Pointer &job);
+ 
+ protected:
+     // XXX: temporary method to replace "delete this" in jobs-in-transition.
+@@ -62,6 +67,11 @@ public:
+     /// called when the job throws during an async call
+     virtual void callException(const std::exception &e);
+ 
++    /// process external request to terminate now (i.e. during this async call)
++    void handleStopRequest() { mustStop("externally aborted"); }
++
++    const InstanceId<AsyncJob> id; ///< job identifier
++
+ protected:
+     // external destruction prohibited to ensure swanSong() is called
+     virtual ~AsyncJob();
+@@ -69,7 +79,9 @@ protected:
+     const char *stopReason; ///< reason for forcing done() to be true
+     const char *typeName; ///< kid (leaf) class name, for debugging
+     AsyncCall::Pointer inCall; ///< the asynchronous call being handled, if any
+-    const InstanceId<AsyncJob> id; ///< job identifier
++
++    bool started_ = false; ///< Start() has finished successfully
++    bool swanSang_ = false; ///< swanSong() was called
+ };
+ 
+ #endif /* SQUID_ASYNC_JOB_H */
+diff --git a/src/base/JobWait.cc b/src/base/JobWait.cc
+new file mode 100644
+index 000000000..ba6832415
+--- /dev/null
++++ b/src/base/JobWait.cc
+@@ -0,0 +1,81 @@
++/*
++ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
++ *
++ * Squid software is distributed under GPLv2+ license and includes
++ * contributions from numerous individuals and organizations.
++ * Please see the COPYING and CONTRIBUTORS files for details.
++ */
++
++#include "squid.h"
++#include "base/AsyncJobCalls.h"
++#include "base/JobWait.h"
++
++#include <cassert>
++#include <iostream>
++
++JobWaitBase::JobWaitBase() = default;
++
++JobWaitBase::~JobWaitBase()
++{
++    cancel("owner gone");
++}
++
++void
++JobWaitBase::start_(const AsyncJob::Pointer aJob, const AsyncCall::Pointer aCall)
++{
++    // Invariant: The wait will be over. We cannot guarantee that the job will
++    // call the callback, of course, but we can check these prerequisites.
++    assert(aCall);
++    assert(aJob.valid());
++
++    // "Double" waiting state leads to conflicting/mismatching callbacks
++    // detailed in finish(). Detect that bug ASAP.
++    assert(!waiting());
++
++    assert(!callback_);
++    assert(!job_);
++    callback_ = aCall;
++    job_ = aJob;
++
++    AsyncJob::Start(job_.get());
++}
++
++void
++JobWaitBase::finish()
++{
++    // Unexpected callbacks might result in disasters like secrets exposure,
++    // data corruption, or expensive message routing mistakes when the callback
++    // info is applied to the wrong message part or acted upon prematurely.
++    assert(waiting());
++    clear();
++}
++
++void
++JobWaitBase::cancel(const char *reason)
++{
++    if (callback_) {
++        callback_->cancel(reason);
++
++        // Instead of AsyncJob, the class parameter could be Job. That would
++        // avoid runtime child-to-parent CbcPointer conversion overheads, but
++        // complicate support for Jobs with virtual AsyncJob bases (GCC error:
++        // "pointer to member conversion via virtual base AsyncJob") and also
++        // cache-log "Job::handleStopRequest()" with a non-existent class name.
++        CallJobHere(callback_->debugSection, callback_->debugLevel, job_, AsyncJob, handleStopRequest);
++
++        clear();
++    }
++}
++
++void
++JobWaitBase::print(std::ostream &os) const
++{
++    // use a backarrow to emphasize that this is a callback: call24<-job6
++    if (callback_)
++        os << callback_->id << "<-";
++    if (const auto rawJob = job_.get())
++        os << rawJob->id;
++    else
++        os << job_; // raw pointer of a gone job may still be useful for triage
++}
++
+diff --git a/src/base/JobWait.h b/src/base/JobWait.h
+new file mode 100644
+index 000000000..6b1131331
+--- /dev/null
++++ b/src/base/JobWait.h
+@@ -0,0 +1,91 @@
++/*
++ * Copyright (C) 1996-2021 The Squid Software Foundation and contributors
++ *
++ * Squid software is distributed under GPLv2+ license and includes
++ * contributions from numerous individuals and organizations.
++ * Please see the COPYING and CONTRIBUTORS files for details.
++ */
++
++#ifndef SQUID_BASE_JOBWAIT_H
++#define SQUID_BASE_JOBWAIT_H
++
++#include "base/AsyncJob.h"
++#include "base/CbcPointer.h"
++
++#include <iosfwd>
++
++/// Manages waiting for an AsyncJob callback. Use type-safe JobWait instead.
++/// This base class does not contain code specific to the actual Job type.
++class JobWaitBase
++{
++public:
++    JobWaitBase();
++    ~JobWaitBase();
++
++    /// no copying of any kind: each waiting context needs a dedicated AsyncCall
++    JobWaitBase(JobWaitBase &&) = delete;
++
++    explicit operator bool() const { return waiting(); }
++
++    /// whether we are currently waiting for the job to call us back
++    /// the job itself may be gone even if this returns true
++    bool waiting() const { return bool(callback_); }
++
++    /// ends wait (after receiving the call back)
++    /// forgets the job which is likely to be gone by now
++    void finish();
++
++    /// aborts wait (if any) before receiving the call back
++    /// does nothing if we are not waiting
++    void cancel(const char *reason);
++
++    /// summarizes what we are waiting for (for debugging)
++    void print(std::ostream &) const;
++
++protected:
++    /// starts waiting for the given job to call the given callback
++    void start_(AsyncJob::Pointer, AsyncCall::Pointer);
++
++private:
++    /// the common part of finish() and cancel()
++    void clear() { job_.clear(); callback_ = nullptr; }
++
++    /// the job that we are waiting to call us back (or nil)
++    AsyncJob::Pointer job_;
++
++    /// the call we are waiting for the job_ to make (or nil)
++    AsyncCall::Pointer callback_;
++};
++
++/// Manages waiting for an AsyncJob callback.
++/// Completes JobWaitBase by providing Job type-specific members.
++template <class Job>
++class JobWait: public JobWaitBase
++{
++public:
++    typedef CbcPointer<Job> JobPointer;
++
++    /// starts waiting for the given job to call the given callback
++    void start(const JobPointer &aJob, const AsyncCall::Pointer &aCallback) {
++        start_(aJob, aCallback);
++        typedJob_ = aJob;
++    }
++
++    /// \returns a cbdata pointer to the job we are waiting for (or nil)
++    /// the returned pointer may be falsy, even if we are still waiting()
++    JobPointer job() const { return waiting() ? typedJob_ : nullptr; }
++
++private:
++    /// nearly duplicates JobWaitBase::job_ but exposes the actual job type
++    JobPointer typedJob_;
++};
++
++inline
++std::ostream &operator <<(std::ostream &os, const JobWaitBase &wait)
++{
++    wait.print(os);
++    return os;
++}
++
++#endif /* SQUID_BASE_JOBWAIT_H */
++
+diff --git a/src/base/Makefile.am b/src/base/Makefile.am
+index c9564d755..374ea8798 100644
+--- a/src/base/Makefile.am
++++ b/src/base/Makefile.am
+@@ -34,6 +34,8 @@ libbase_la_SOURCES = \
+ 	Here.h \
+ 	InstanceId.cc \
+ 	InstanceId.h \
++	JobWait.cc \
++	JobWait.h \
+ 	Lock.h \
+ 	LookupTable.h \
+ 	LruMap.h \
+diff --git a/src/base/forward.h b/src/base/forward.h
+index 3803e1ea0..46de97c8d 100644
+--- a/src/base/forward.h
++++ b/src/base/forward.h
+@@ -17,6 +17,7 @@ class ScopedId;
+ 
+ template<class Cbc> class CbcPointer;
+ template<class RefCountableKid> class RefCount;
++template<class Job> class JobWait;
+ 
+ typedef CbcPointer<AsyncJob> AsyncJobPointer;
+ typedef RefCount<CodeContext> CodeContextPointer;
+diff --git a/src/client_side.cc b/src/client_side.cc
+index b8d786423..4eb697696 100644
+--- a/src/client_side.cc
++++ b/src/client_side.cc
+@@ -503,6 +503,10 @@ httpRequestFree(void *data)
+ /* This is a handler normally called by comm_close() */
+ void ConnStateData::connStateClosed(const CommCloseCbParams &)
+ {
++    if (clientConnection) {
++        clientConnection->noteClosure();
++        // keep closed clientConnection for logging, clientdb cleanup, etc.
++    }
+     deleteThis("ConnStateData::connStateClosed");
+ }
+ 
+diff --git a/src/clients/FtpClient.cc b/src/clients/FtpClient.cc
+index 7874db121..afe7827dc 100644
+--- a/src/clients/FtpClient.cc
++++ b/src/clients/FtpClient.cc
+@@ -201,10 +201,6 @@ Ftp::Client::Client(FwdState *fwdState):
+ 
+ Ftp::Client::~Client()
+ {
+-    if (data.opener != NULL) {
+-        data.opener->cancel("Ftp::Client destructed");
+-        data.opener = NULL;
+-    }
+     data.close();
+ 
+     safe_free(old_request);
+@@ -786,10 +782,10 @@ Ftp::Client::connectDataChannel()
+     debugs(9, 3, "connecting to " << conn->remote);
+ 
+     typedef CommCbMemFunT<Client, CommConnectCbParams> Dialer;
+-    data.opener = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected);
+-    Comm::ConnOpener *cs = new Comm::ConnOpener(conn, data.opener, Config.Timeout.connect);
++    AsyncCall::Pointer callback = JobCallback(9, 3, Dialer, this, Ftp::Client::dataChannelConnected);
++    const auto cs = new Comm::ConnOpener(conn, callback, Config.Timeout.connect);
+     cs->setHost(data.host);
+-    AsyncJob::Start(cs);
++    dataConnWait.start(cs, callback);
+ }
+ 
+ bool
+@@ -811,10 +807,11 @@ void
+ Ftp::Client::dataClosed(const CommCloseCbParams &)
+ {
+     debugs(9, 4, status());
++    if (data.conn)
++        data.conn->noteClosure();
+     if (data.listenConn != NULL) {
+         data.listenConn->close();
+         data.listenConn = NULL;
+-        // NP clear() does the: data.fd = -1;
+     }
+     data.clear();
+ }
+@@ -879,6 +876,8 @@ void
+ Ftp::Client::ctrlClosed(const CommCloseCbParams &)
+ {
+     debugs(9, 4, status());
++    if (ctrl.conn)
++        ctrl.conn->noteClosure();
+     ctrl.clear();
+     doneWithFwd = "ctrlClosed()"; // assume FwdState is monitoring too
+     mustStop("Ftp::Client::ctrlClosed");
+diff --git a/src/clients/FtpClient.h b/src/clients/FtpClient.h
+index ac8d22e18..4b2dd61d5 100644
+--- a/src/clients/FtpClient.h
++++ b/src/clients/FtpClient.h
+@@ -64,7 +64,6 @@ public:
+      */
+     Comm::ConnectionPointer listenConn;
+ 
+-    AsyncCall::Pointer opener; ///< Comm opener handler callback.
+ private:
+     AsyncCall::Pointer closer; ///< Comm close handler callback
+ };
+@@ -205,6 +204,10 @@ protected:
+     virtual void sentRequestBody(const CommIoCbParams &io);
+     virtual void doneSendingRequestBody();
+ 
++    /// Waits for an FTP data connection to the server to be established/opened.
++    /// This wait only happens in FTP passive mode (via PASV or EPSV).
++    JobWait<Comm::ConnOpener> dataConnWait;
++
+ private:
+     bool parseControlReply(size_t &bytesUsed);
+ 
+diff --git a/src/clients/FtpGateway.cc b/src/clients/FtpGateway.cc
+index 6a7d139ca..aeeaedb48 100644
+--- a/src/clients/FtpGateway.cc
++++ b/src/clients/FtpGateway.cc
+@@ -486,11 +486,9 @@ Ftp::Gateway::timeout(const CommTimeoutCbParams &io)
+         flags.pasv_supported = false;
+         debugs(9, DBG_IMPORTANT, "FTP Gateway timeout in SENT_PASV state");
+ 
+-        // cancel the data connection setup.
+-        if (data.opener != NULL) {
+-            data.opener->cancel("timeout");
+-            data.opener = NULL;
+-        }
++        // cancel the data connection setup, if any
++        dataConnWait.cancel("timeout");
++
+         data.close();
+     }
+ 
+@@ -1723,7 +1721,7 @@ void
+ Ftp::Gateway::dataChannelConnected(const CommConnectCbParams &io)
+ {
+     debugs(9, 3, HERE);
+-    data.opener = NULL;
++    dataConnWait.finish();
+ 
+     if (io.flag != Comm::OK) {
+         debugs(9, 2, HERE << "Failed to connect. Retrying via another method.");
+@@ -2727,9 +2725,9 @@ Ftp::Gateway::mayReadVirginReplyBody() const
+     return !doneWithServer();
+ }
+ 
+-AsyncJob::Pointer
++void
+ Ftp::StartGateway(FwdState *const fwdState)
+ {
+-    return AsyncJob::Start(new Ftp::Gateway(fwdState));
++    AsyncJob::Start(new Ftp::Gateway(fwdState));
+ }
+ 
+diff --git a/src/clients/FtpRelay.cc b/src/clients/FtpRelay.cc
+index 8796a74c8..ce6ba3ba6 100644
+--- a/src/clients/FtpRelay.cc
++++ b/src/clients/FtpRelay.cc
+@@ -739,7 +739,7 @@ void
+ Ftp::Relay::dataChannelConnected(const CommConnectCbParams &io)
+ {
+     debugs(9, 3, status());
+-    data.opener = NULL;
++    dataConnWait.finish();
+ 
+     if (io.flag != Comm::OK) {
+         debugs(9, 2, "failed to connect FTP server data channel");
+@@ -804,9 +804,9 @@ Ftp::Relay::HandleStoreAbort(Relay *ftpClient)
+         ftpClient->dataComplete();
+ }
+ 
+-AsyncJob::Pointer
++void
+ Ftp::StartRelay(FwdState *const fwdState)
+ {
+-    return AsyncJob::Start(new Ftp::Relay(fwdState));
++    AsyncJob::Start(new Ftp::Relay(fwdState));
+ }
+ 
+diff --git a/src/clients/HttpTunneler.cc b/src/clients/HttpTunneler.cc
+index 023a8586c..67a52e662 100644
+--- a/src/clients/HttpTunneler.cc
++++ b/src/clients/HttpTunneler.cc
+@@ -101,6 +101,11 @@ void
+ Http::Tunneler::handleConnectionClosure(const CommCloseCbParams &params)
+ {
+     closer = nullptr;
++    if (connection) {
++        countFailingConnection();
++        connection->noteClosure();
++        connection = nullptr;
++    }
+     bailWith(new ErrorState(ERR_CONNECT_FAIL, Http::scBadGateway, request.getRaw(), al));
+ }
+ 
+@@ -360,50 +365,64 @@ Http::Tunneler::bailWith(ErrorState *error)
+     Must(error);
+     answer().squidError = error;
+ 
+-    if (const auto p = connection->getPeer())
+-        peerConnectFailed(p);
++    if (const auto failingConnection = connection) {
++        // TODO: Reuse to-peer connections after a CONNECT error response.
++        countFailingConnection();
++        disconnect();
++        failingConnection->close();
++    }
+ 
+     callBack();
+-    disconnect();
+-
+-    if (noteFwdPconnUse)
+-        fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses);
+-    // TODO: Reuse to-peer connections after a CONNECT error response.
+-    connection->close();
+-    connection = nullptr;
+ }
+ 
+ void
+ Http::Tunneler::sendSuccess()
+ {
+     assert(answer().positive());
+-    callBack();
++    assert(Comm::IsConnOpen(connection));
++    answer().conn = connection;
+     disconnect();
++    callBack();
++}
++
++void
++Http::Tunneler::countFailingConnection()
++{
++    assert(connection);
++    if (const auto p = connection->getPeer())
++        peerConnectFailed(p);
++    if (noteFwdPconnUse && connection->isOpen())
++        fwdPconnPool->noteUses(fd_table[connection->fd].pconn.uses);
+ }
+ 
+ void
+ Http::Tunneler::disconnect()
+ {
++    const auto stillOpen = Comm::IsConnOpen(connection);
++
+     if (closer) {
+-        comm_remove_close_handler(connection->fd, closer);
++        if (stillOpen)
++            comm_remove_close_handler(connection->fd, closer);
+         closer = nullptr;
+     }
+ 
+     if (reader) {
+-        Comm::ReadCancel(connection->fd, reader);
++        if (stillOpen)
++            Comm::ReadCancel(connection->fd, reader);
+         reader = nullptr;
+     }
+ 
+-    // remove connection timeout handler
+-    commUnsetConnTimeout(connection);
++    if (stillOpen)
++        commUnsetConnTimeout(connection);
++
++    connection = nullptr; // may still be open
+ }
+ 
+ void
+ Http::Tunneler::callBack()
+ {
+-    debugs(83, 5, connection << status());
+-    if (answer().positive())
+-        answer().conn = connection;
++    debugs(83, 5, answer().conn << status());
++    assert(!connection); // returned inside answer() or gone
+     auto cb = callback;
+     callback = nullptr;
+     ScheduleCallHere(cb);
+@@ -415,11 +434,10 @@ Http::Tunneler::swanSong()
+     AsyncJob::swanSong();
+ 
+     if (callback) {
+-        if (requestWritten && tunnelEstablished) {
++        if (requestWritten && tunnelEstablished && Comm::IsConnOpen(connection)) {
+             sendSuccess();
+         } else {
+-            // we should have bailed when we discovered the job-killing problem
+-            debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while establishing a CONNECT tunnel " << connection << status());
++            // job-ending emergencies like handleStopRequest() or callException()
+             bailWith(new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al));
+         }
+         assert(!callback);
+diff --git a/src/clients/HttpTunneler.h b/src/clients/HttpTunneler.h
+index c4c096fe7..e88d17610 100644
+--- a/src/clients/HttpTunneler.h
++++ b/src/clients/HttpTunneler.h
+@@ -87,6 +87,7 @@ protected:
+     void handleResponse(const bool eof);
+     void bailOnResponseError(const char *error, HttpReply *);
+ 
++private:
+     /// sends the given error to the initiator
+     void bailWith(ErrorState*);
+ 
+@@ -96,12 +97,14 @@ protected:
+     /// a bailWith(), sendSuccess() helper: sends results to the initiator
+     void callBack();
+ 
+-    /// a bailWith(), sendSuccess() helper: stops monitoring the connection
++    /// stops monitoring the connection
+     void disconnect();
+ 
++    /// updates connection usage history before the connection is closed
++    void countFailingConnection();
++
+     TunnelerAnswer &answer();
+ 
+-private:
+     AsyncCall::Pointer writer; ///< called when the request has been written
+     AsyncCall::Pointer reader; ///< called when the response should be read
+     AsyncCall::Pointer closer; ///< called when the connection is being closed
+diff --git a/src/clients/forward.h b/src/clients/forward.h
+index 0351557da..82e5eb9bd 100644
+--- a/src/clients/forward.h
++++ b/src/clients/forward.h
+@@ -28,10 +28,10 @@ namespace Ftp
+ {
+ 
+ /// A new FTP Gateway job
+-AsyncJobPointer StartGateway(FwdState *const fwdState);
++void StartGateway(FwdState *const fwdState);
+ 
+ /// A new FTP Relay job
+-AsyncJobPointer StartRelay(FwdState *const fwdState);
++void StartRelay(FwdState *const fwdState);
+ 
+ /** Construct an URI with leading / in PATH portion for use by CWD command
+  *  possibly others. FTP encodes absolute paths as beginning with '/'
+diff --git a/src/comm.cc b/src/comm.cc
+index 54ba16271..a0913f324 100644
+--- a/src/comm.cc
++++ b/src/comm.cc
+@@ -743,6 +743,10 @@ commCallCloseHandlers(int fd)
+         // If call is not canceled schedule it for execution else ignore it
+         if (!call->canceled()) {
+             debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call);
++            // XXX: Without the following code, callback fd may be -1.
++            // typedef CommCloseCbParams Params;
++            // auto &params = GetCommParams<Params>(call);
++            // params.fd = fd;
+             ScheduleCallHere(call);
+         }
+     }
+@@ -1787,6 +1791,10 @@ DeferredReadManager::CloseHandler(const CommCloseCbParams &params)
+     CbDataList<DeferredRead> *temp = (CbDataList<DeferredRead> *)params.data;
+ 
+     temp->element.closer = NULL;
++    if (temp->element.theRead.conn) {
++        temp->element.theRead.conn->noteClosure();
++        temp->element.theRead.conn = nullptr;
++    }
+     temp->element.markCancelled();
+ }
+ 
+@@ -1860,6 +1868,11 @@ DeferredReadManager::kickARead(DeferredRead const &aRead)
+     if (aRead.cancelled)
+         return;
+ 
++    // TODO: This check still allows theReader call with a closed theRead.conn.
++    // If a delayRead() caller has a close connection handler, then such a call
++    // would be useless and dangerous. If a delayRead() caller does not have it,
++    // then the caller will get stuck when an external connection closure makes
++    // aRead.cancelled (checked above) true.
+     if (Comm::IsConnOpen(aRead.theRead.conn) && fd_table[aRead.theRead.conn->fd].closing())
+         return;
+ 
+diff --git a/src/comm/ConnOpener.cc b/src/comm/ConnOpener.cc
+index dc376b778..19c1237ae 100644
+--- a/src/comm/ConnOpener.cc
++++ b/src/comm/ConnOpener.cc
+@@ -41,6 +41,14 @@ Comm::ConnOpener::ConnOpener(const Comm::ConnectionPointer &c, const AsyncCall::
+     deadline_(squid_curtime + static_cast<time_t>(ctimeout))
+ {
+     debugs(5, 3, "will connect to " << c << " with " << ctimeout << " timeout");
++    assert(conn_); // we know where to go
++
++    // Sharing a being-modified Connection object with the caller is dangerous,
++    // but we cannot ban (or even check for) that using existing APIs. We do not
++    // want to clone "just in case" because cloning is a bit expensive, and most
++    // callers already have a non-owned Connection object to give us. Until the
++    // APIs improve, we can only check that the connection is not open.
++    assert(!conn_->isOpen());
+ }
+ 
+ Comm::ConnOpener::~ConnOpener()
+@@ -78,6 +86,10 @@ Comm::ConnOpener::swanSong()
+     if (temporaryFd_ >= 0)
+         closeFd();
+ 
++    // did we abort while owning an open connection?
++    if (conn_ && conn_->isOpen())
++        conn_->close();
++
+     // did we abort while waiting between retries?
+     if (calls_.sleep_)
+         cancelSleep();
+@@ -131,9 +143,18 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why)
+                    " [" << callback_->id << ']' );
+             // TODO save the pconn to the pconnPool ?
+         } else {
++            assert(conn_);
++
++            // free resources earlier and simplify recipients
++            if (errFlag != Comm::OK)
++                conn_->close(); // may not be opened
++            else
++                assert(conn_->isOpen());
++
+             typedef CommConnectCbParams Params;
+             Params &params = GetCommParams<Params>(callback_);
+             params.conn = conn_;
++            conn_ = nullptr; // release ownership; prevent closure by us
+             params.flag = errFlag;
+             params.xerrno = xerrno;
+             ScheduleCallHere(callback_);
+@@ -152,7 +173,7 @@ Comm::ConnOpener::sendAnswer(Comm::Flag errFlag, int xerrno, const char *why)
+ void
+ Comm::ConnOpener::cleanFd()
+ {
+-    debugs(5, 4, HERE << conn_ << " closing temp FD " << temporaryFd_);
++    debugs(5, 4, conn_ << "; temp FD " << temporaryFd_);
+ 
+     Must(temporaryFd_ >= 0);
+     fde &f = fd_table[temporaryFd_];
+@@ -258,6 +279,7 @@ bool
+ Comm::ConnOpener::createFd()
+ {
+     Must(temporaryFd_ < 0);
++    assert(conn_);
+ 
+     // our initators signal abort by cancelling their callbacks
+     if (callback_ == NULL || callback_->canceled())
+diff --git a/src/comm/ConnOpener.h b/src/comm/ConnOpener.h
+index 329d8a3c8..e1e60649d 100644
+--- a/src/comm/ConnOpener.h
++++ b/src/comm/ConnOpener.h
+@@ -19,16 +19,13 @@
+ namespace Comm
+ {
+ 
+-/**
+- * Async-opener of a Comm connection.
+- */
++/// Asynchronously opens a TCP connection. Returns CommConnectCbParams: either
++/// Comm::OK with an open connection or another Comm::Flag with a closed one.
+ class ConnOpener : public AsyncJob
+ {
+     CBDATA_CLASS(ConnOpener);
+ 
+ public:
+-    void noteAbort() { mustStop("externally aborted"); }
+-
+     typedef CbcPointer<ConnOpener> Pointer;
+ 
+     virtual bool doneAll() const;
+diff --git a/src/comm/Connection.cc b/src/comm/Connection.cc
+index 4e9719a01..9bc08da05 100644
+--- a/src/comm/Connection.cc
++++ b/src/comm/Connection.cc
+@@ -7,6 +7,7 @@
+  */
+ 
+ #include "squid.h"
++#include "base/JobWait.h"
+ #include "CachePeer.h"
+ #include "cbdata.h"
+ #include "comm.h"
+@@ -60,26 +61,44 @@ Comm::Connection::~Connection()
+ }
+ 
+ Comm::ConnectionPointer
+-Comm::Connection::cloneDestinationDetails() const
++Comm::Connection::cloneProfile() const
+ {
+-    const ConnectionPointer c = new Comm::Connection;
+-    c->setAddrs(local, remote);
+-    c->peerType = peerType;
+-    c->flags = flags;
+-    c->peer_ = cbdataReference(getPeer());
+-    assert(!c->isOpen());
+-    return c;
+-}
++    const ConnectionPointer clone = new Comm::Connection;
++    auto &c = *clone; // optimization
++
++    /*
++     * Copy or excuse each data member. Excused members do not belong to a
++     * Connection configuration profile because their values cannot be reused
++     * across (co-existing) Connection objects and/or are tied to their own
++     * object lifetime.
++     */
++
++    c.setAddrs(local, remote);
++    c.peerType = peerType;
++    // fd excused
++    c.tos = tos;
++    c.nfmark = nfmark;
++    c.nfConnmark = nfConnmark;
++    // COMM_ORPHANED is not a part of connection opening instructions
++    c.flags = flags & ~COMM_ORPHANED;
++    // rfc931 is excused
++
++#if USE_SQUID_EUI
++    // These are currently only set when accepting connections and never used
++    // for establishing new ones, so this copying is currently in vain, but,
++    // technically, they can be a part of connection opening instructions.
++    c.remoteEui48 = remoteEui48;
++    c.remoteEui64 = remoteEui64;
++#endif
+ 
+-Comm::ConnectionPointer
+-Comm::Connection::cloneIdentDetails() const
+-{
+-    auto c = cloneDestinationDetails();
+-    c->tos = tos;
+-    c->nfmark = nfmark;
+-    c->nfConnmark = nfConnmark;
+-    c->startTime_ = startTime_;
+-    return c;
++    // id excused
++    c.peer_ = cbdataReference(getPeer());
++    // startTime_ excused
++    // tlsHistory excused
++
++    debugs(5, 5, this << " made " << c);
++    assert(!c.isOpen());
++    return clone;
+ }
+ 
+ void
+diff --git a/src/comm/Connection.h b/src/comm/Connection.h
+index 5b66943ba..40c22491d 100644
+--- a/src/comm/Connection.h
++++ b/src/comm/Connection.h
+@@ -77,13 +77,12 @@ public:
+     /** Clear the connection properties and close any open socket. */
+     virtual ~Connection();
+ 
+-    /// Create a new (closed) IDENT Connection object based on our from-Squid
+-    /// connection properties.
+-    ConnectionPointer cloneIdentDetails() const;
++    /// To prevent accidental copying of Connection objects that we started to
++    /// open or that are open, use cloneProfile() instead.
++    Connection(const Connection &&) = delete;
+ 
+-    /// Create a new (closed) Connection object pointing to the same destination
+-    /// as this from-Squid connection.
+-    ConnectionPointer cloneDestinationDetails() const;
++    /// Create a new closed Connection with the same configuration as this one.
++    ConnectionPointer cloneProfile() const;
+ 
+     /// close the still-open connection when its last reference is gone
+     void enterOrphanage() { flags |= COMM_ORPHANED; }
+@@ -140,17 +139,6 @@ public:
+     virtual ScopedId codeContextGist() const override;
+     virtual std::ostream &detailCodeContext(std::ostream &os) const override;
+ 
+-private:
+-    /** These objects may not be exactly duplicated. Use cloneIdentDetails() or
+-     * cloneDestinationDetails() instead.
+-     */
+-    Connection(const Connection &c);
+-
+-    /** These objects may not be exactly duplicated. Use cloneIdentDetails() or
+-     * cloneDestinationDetails() instead.
+-     */
+-    Connection & operator =(const Connection &c);
+-
+ public:
+     /** Address/Port for the Squid end of a TCP link. */
+     Ip::Address local;
+diff --git a/src/comm/TcpAcceptor.cc b/src/comm/TcpAcceptor.cc
+index 341622e0f..510efa94f 100644
+--- a/src/comm/TcpAcceptor.cc
++++ b/src/comm/TcpAcceptor.cc
+@@ -206,7 +206,10 @@ void
+ Comm::TcpAcceptor::handleClosure(const CommCloseCbParams &)
+ {
+     closer_ = NULL;
+-    conn = NULL;
++    if (conn) {
++        conn->noteClosure();
++        conn = nullptr;
++    }
+     Must(done());
+ }
+ 
+diff --git a/src/dns_internal.cc b/src/dns_internal.cc
+index e1f4d3ee7..72078c64e 100644
+--- a/src/dns_internal.cc
++++ b/src/dns_internal.cc
+@@ -872,6 +872,10 @@ static void
+ idnsVCClosed(const CommCloseCbParams &params)
+ {
+     nsvc * vc = (nsvc *)params.data;
++    if (vc->conn) {
++        vc->conn->noteClosure();
++        vc->conn = nullptr;
++    }
+     delete vc;
+ }
+ 
+diff --git a/src/eui/Eui48.h b/src/eui/Eui48.h
+index 11f4e51b1..9aab5bbb7 100644
+--- a/src/eui/Eui48.h
++++ b/src/eui/Eui48.h
+@@ -29,10 +29,8 @@ class Eui48
+ 
+ public:
+     Eui48() { clear(); }
+-    Eui48(const Eui48 &t) { memcpy(this, &t, sizeof(Eui48)); }
+     bool operator== (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) == 0; }
+     bool operator< (const Eui48 &t) const { return memcmp(eui, t.eui, SZ_EUI48_BUF) < 0; }
+-    ~Eui48() {}
+ 
+     const unsigned char *get(void);
+ 
+diff --git a/src/gopher.cc b/src/gopher.cc
+index 455220c2e..576a3f7b1 100644
+--- a/src/gopher.cc
++++ b/src/gopher.cc
+@@ -98,11 +98,8 @@ public:
+         entry->lock("gopherState");
+         *replybuf = 0;
+     }
+-    ~GopherStateData() {if(buf) swanSong();}
+ 
+-    /* AsyncJob API emulated */
+-    void deleteThis(const char *aReason);
+-    void swanSong();
++    ~GopherStateData();
+ 
+ public:
+     StoreEntry *entry;
+@@ -156,30 +153,18 @@ static void
+ gopherStateFree(const CommCloseCbParams &params)
+ {
+     GopherStateData *gopherState = (GopherStateData *)params.data;
+-
+-    if (gopherState == NULL)
+-        return;
+-
+-    gopherState->deleteThis("gopherStateFree");
++    // Assume that FwdState is monitoring and calls noteClosure(). See XXX about
++    // Connection sharing with FwdState in gopherStart().
++    delete gopherState;
+ }
+ 
+-void
+-GopherStateData::deleteThis(const char *)
+-{
+-    swanSong();
+-    delete this;
+-}
+-
+-void
+-GopherStateData::swanSong()
++GopherStateData::~GopherStateData()
+ {
+     if (entry)
+         entry->unlock("gopherState");
+ 
+-    if (buf) {
++    if (buf)
+         memFree(buf, MEM_4K_BUF);
+-        buf = nullptr;
+-    }
+ }
+ 
+ /**
+@@ -986,6 +971,7 @@ gopherStart(FwdState * fwd)
+         return;
+     }
+ 
++    // XXX: Sharing open Connection with FwdState that has its own handlers/etc.
+     gopherState->serverConn = fwd->serverConnection();
+     gopherSendRequest(fwd->serverConnection()->fd, gopherState);
+     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "gopherTimeout",
+diff --git a/src/ident/Ident.cc b/src/ident/Ident.cc
+index 50dc6baeb..3c3ae5e2f 100644
+--- a/src/ident/Ident.cc
++++ b/src/ident/Ident.cc
+@@ -11,6 +11,7 @@
+ #include "squid.h"
+ 
+ #if USE_IDENT
++#include "base/JobWait.h"
+ #include "comm.h"
+ #include "comm/Connection.h"
+ #include "comm/ConnOpener.h"
+@@ -53,8 +54,15 @@ public:
+ 
+     Comm::ConnectionPointer conn;
+     MemBuf queryMsg;  ///< the lookup message sent to IDENT server
+-    IdentClient *clients;
++    IdentClient *clients = nullptr;
+     char buf[IDENT_BUFSIZE];
++
++    /// waits for a connection to the IDENT server to be established/opened
++    JobWait<Comm::ConnOpener> connWait;
++
++private:
++    // use deleteThis() to destroy
++    ~IdentStateData();
+ };
+ 
+ CBDATA_CLASS_INIT(IdentStateData);
+@@ -73,8 +81,9 @@ static void ClientAdd(IdentStateData * state, IDCB * callback, void *callback_da
+ Ident::IdentConfig Ident::TheConfig;
+ 
+ void
+-Ident::IdentStateData::deleteThis(const char *)
++Ident::IdentStateData::deleteThis(const char *reason)
+ {
++    debugs(30, 3, reason);
+     swanSong();
+     delete this;
+ }
+@@ -84,6 +93,10 @@ Ident::IdentStateData::swanSong()
+ {
+     if (clients != NULL)
+         notify(NULL);
++}
++
++Ident::IdentStateData::~IdentStateData() {
++    assert(!clients);
+ 
+     if (Comm::IsConnOpen(conn)) {
+         comm_remove_close_handler(conn->fd, Ident::Close, this);
+@@ -112,6 +125,10 @@ void
+ Ident::Close(const CommCloseCbParams &params)
+ {
+     IdentStateData *state = (IdentStateData *)params.data;
++    if (state->conn) {
++        state->conn->noteClosure();
++        state->conn = nullptr;
++    }
+     state->deleteThis("connection closed");
+ }
+ 
+@@ -127,6 +144,16 @@ void
+ Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int, void *data)
+ {
+     IdentStateData *state = (IdentStateData *)data;
++    state->connWait.finish();
++
++    // Start owning the supplied connection (so that it is not orphaned if this
++    // function bails early). As a (tiny) optimization or perhaps just diff
++    // minimization, the close handler is added later, when we know we are not
++    // bailing. This delay is safe because comm_remove_close_handler() forgives
++    // missing handlers.
++    assert(conn); // but may be closed
++    assert(!state->conn);
++    state->conn = conn;
+ 
+     if (status != Comm::OK) {
+         if (status == Comm::TIMEOUT)
+@@ -149,8 +176,8 @@ Ident::ConnectDone(const Comm::ConnectionPointer &conn, Comm::Flag status, int,
+         return;
+     }
+ 
+-    assert(conn != NULL && conn == state->conn);
+-    comm_add_close_handler(conn->fd, Ident::Close, state);
++    assert(state->conn->isOpen());
++    comm_add_close_handler(state->conn->fd, Ident::Close, state);
+ 
+     AsyncCall::Pointer writeCall = commCbCall(5,4, "Ident::WriteFeedback",
+                                    CommIoCbPtrFun(Ident::WriteFeedback, state));
+@@ -259,10 +286,10 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data)
+     state->hash.key = xstrdup(key);
+ 
+     // copy the conn details. We do not want the original FD to be re-used by IDENT.
+-    state->conn = conn->cloneIdentDetails();
++    const auto identConn = conn->cloneProfile();
+     // NP: use random port for secure outbound to IDENT_PORT
+-    state->conn->local.port(0);
+-    state->conn->remote.port(IDENT_PORT);
++    identConn->local.port(0);
++    identConn->remote.port(IDENT_PORT);
+ 
+     // build our query from the original connection details
+     state->queryMsg.init();
+@@ -272,7 +299,8 @@ Ident::Start(const Comm::ConnectionPointer &conn, IDCB * callback, void *data)
+     hash_join(ident_hash, &state->hash);
+ 
+     AsyncCall::Pointer call = commCbCall(30,3, "Ident::ConnectDone", CommConnectCbPtrFun(Ident::ConnectDone, state));
+-    AsyncJob::Start(new Comm::ConnOpener(state->conn, call, Ident::TheConfig.timeout));
++    const auto connOpener = new Comm::ConnOpener(identConn, call, Ident::TheConfig.timeout);
++    state->connWait.start(connOpener, call);
+ }
+ 
+ void
+diff --git a/src/log/TcpLogger.cc b/src/log/TcpLogger.cc
+index 2be2f6aea..3be1cc5db 100644
+--- a/src/log/TcpLogger.cc
++++ b/src/log/TcpLogger.cc
+@@ -256,13 +256,16 @@ Log::TcpLogger::doConnect()
+ 
+     typedef CommCbMemFunT<TcpLogger, CommConnectCbParams> Dialer;
+     AsyncCall::Pointer call = JobCallback(MY_DEBUG_SECTION, 5, Dialer, this, Log::TcpLogger::connectDone);
+-    AsyncJob::Start(new Comm::ConnOpener(futureConn, call, 2));
++    const auto cs = new Comm::ConnOpener(futureConn, call, 2);
++    connWait.start(cs, call);
+ }
+ 
+ /// Comm::ConnOpener callback
+ void
+ Log::TcpLogger::connectDone(const CommConnectCbParams &params)
+ {
++    connWait.finish();
++
+     if (params.flag != Comm::OK) {
+         const double delay = 0.5; // seconds
+         if (connectFailures++ % 100 == 0) {
+@@ -367,7 +370,10 @@ Log::TcpLogger::handleClosure(const CommCloseCbParams &)
+ {
+     assert(inCall != NULL);
+     closer = NULL;
+-    conn = NULL;
++    if (conn) {
++        conn->noteClosure();
++        conn = nullptr;
++    }
+     // in all current use cases, we should not try to reconnect
+     mustStop("Log::TcpLogger::handleClosure");
+ }
+diff --git a/src/log/TcpLogger.h b/src/log/TcpLogger.h
+index 1be2113fe..ec7ca5f7b 100644
+--- a/src/log/TcpLogger.h
++++ b/src/log/TcpLogger.h
+@@ -10,6 +10,8 @@
+ #define _SQUID_SRC_LOG_TCPLOGGER_H
+ 
+ #include "base/AsyncJob.h"
++#include "base/JobWait.h"
++#include "comm/forward.h"
+ #include "ip/Address.h"
+ 
+ #include <list>
+@@ -103,6 +105,9 @@ private:
+     Ip::Address remote; ///< where the remote logger expects our records
+     AsyncCall::Pointer closer; ///< handles unexpected/external conn closures
+ 
++    /// waits for a connection to the remote logger to be established/opened
++    JobWait<Comm::ConnOpener> connWait;
++
+     uint64_t connectFailures; ///< number of sequential connection failures
+     uint64_t drops; ///< number of records dropped during the current outage
+ };
+diff --git a/src/mgr/Forwarder.cc b/src/mgr/Forwarder.cc
+index 8edba1d78..e4da4ca08 100644
+--- a/src/mgr/Forwarder.cc
++++ b/src/mgr/Forwarder.cc
+@@ -100,7 +100,11 @@ void
+ Mgr::Forwarder::noteCommClosed(const CommCloseCbParams &)
+ {
+     debugs(16, 5, HERE);
+-    conn = NULL; // needed?
++    closer = nullptr;
++    if (conn) {
++        conn->noteClosure();
++        conn = nullptr;
++    }
+     mustStop("commClosed");
+ }
+ 
+diff --git a/src/mgr/Inquirer.cc b/src/mgr/Inquirer.cc
+index ba41d7aa6..dea0452de 100644
+--- a/src/mgr/Inquirer.cc
++++ b/src/mgr/Inquirer.cc
+@@ -107,11 +107,14 @@ Mgr::Inquirer::noteWroteHeader(const CommIoCbParams& params)
+ 
+ /// called when the HTTP client or some external force closed our socket
+ void
+-Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
++Mgr::Inquirer::noteCommClosed(const CommCloseCbParams &)
+ {
+     debugs(16, 5, HERE);
+-    Must(!Comm::IsConnOpen(conn) && params.conn.getRaw() == conn.getRaw());
+-    conn = NULL;
++    closer = nullptr;
++    if (conn) {
++        conn->noteClosure();
++        conn = nullptr;
++    }
+     mustStop("commClosed");
+ }
+ 
+diff --git a/src/mgr/StoreToCommWriter.cc b/src/mgr/StoreToCommWriter.cc
+index e2cccec2f..d3cf73888 100644
+--- a/src/mgr/StoreToCommWriter.cc
++++ b/src/mgr/StoreToCommWriter.cc
+@@ -131,7 +131,11 @@ void
+ Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams &)
+ {
+     debugs(16, 6, HERE);
+-    Must(!Comm::IsConnOpen(clientConnection));
++    if (clientConnection) {
++        clientConnection->noteClosure();
++        clientConnection = nullptr;
++    }
++    closer = nullptr;
+     mustStop("commClosed");
+ }
+ 
+diff --git a/src/security/PeerConnector.cc b/src/security/PeerConnector.cc
+index 4c1401f6e..494edabb8 100644
+--- a/src/security/PeerConnector.cc
++++ b/src/security/PeerConnector.cc
+@@ -89,9 +89,15 @@ Security::PeerConnector::start()
+ void
+ Security::PeerConnector::commCloseHandler(const CommCloseCbParams &params)
+ {
++    debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data);
++
+     closeHandler = nullptr;
++    if (serverConn) {
++        countFailingConnection();
++        serverConn->noteClosure();
++        serverConn = nullptr;
++    }
+ 
+-    debugs(83, 5, "FD " << params.fd << ", Security::PeerConnector=" << params.data);
+     const auto err = new ErrorState(ERR_SECURE_CONNECT_FAIL, Http::scServiceUnavailable, request.getRaw(), al);
+     static const auto d = MakeNamedErrorDetail("TLS_CONNECT_CLOSE");
+     err->detailError(d);
+@@ -111,6 +117,8 @@ Security::PeerConnector::commTimeoutHandler(const CommTimeoutCbParams &)
+ bool
+ Security::PeerConnector::initialize(Security::SessionPointer &serverSession)
+ {
++    Must(Comm::IsConnOpen(serverConnection()));
++
+     Security::ContextPointer ctx(getTlsContext());
+     debugs(83, 5, serverConnection() << ", ctx=" << (void*)ctx.get());
+ 
+@@ -162,6 +170,8 @@ Security::PeerConnector::initialize(Security::SessionPointer &serverSession)
+ void
+ Security::PeerConnector::recordNegotiationDetails()
+ {
++    Must(Comm::IsConnOpen(serverConnection()));
++
+     const int fd = serverConnection()->fd;
+     Security::SessionPointer session(fd_table[fd].ssl);
+ 
+@@ -180,6 +190,8 @@ Security::PeerConnector::recordNegotiationDetails()
+ void
+ Security::PeerConnector::negotiate()
+ {
++    Must(Comm::IsConnOpen(serverConnection()));
++
+     const int fd = serverConnection()->fd;
+     if (fd_table[fd].closing())
+         return;
+@@ -224,7 +236,7 @@ Security::PeerConnector::handleNegotiationResult(const Security::IoResult &resul
+     switch (result.category) {
+     case Security::IoResult::ioSuccess:
+         recordNegotiationDetails();
+-        if (sslFinalized())
++        if (sslFinalized() && callback)
+             sendSuccess();
+         return; // we may be gone by now
+ 
+@@ -252,6 +264,7 @@ Security::PeerConnector::sslFinalized()
+ {
+ #if USE_OPENSSL
+     if (Ssl::TheConfig.ssl_crt_validator && useCertValidator_) {
++        Must(Comm::IsConnOpen(serverConnection()));
+         const int fd = serverConnection()->fd;
+         Security::SessionPointer session(fd_table[fd].ssl);
+ 
+@@ -295,6 +308,7 @@ void
+ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointer validationResponse)
+ {
+     Must(validationResponse != NULL);
++    Must(Comm::IsConnOpen(serverConnection()));
+ 
+     ErrorDetail::Pointer errDetails;
+     bool validatorFailed = false;
+@@ -317,7 +331,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe
+ 
+     if (!errDetails && !validatorFailed) {
+         noteNegotiationDone(NULL);
+-        sendSuccess();
++        if (callback)
++            sendSuccess();
+         return;
+     }
+ 
+@@ -343,6 +358,8 @@ Security::PeerConnector::sslCrtvdHandleReply(Ssl::CertValidationResponse::Pointe
+ Security::CertErrors *
+ Security::PeerConnector::sslCrtvdCheckForErrors(Ssl::CertValidationResponse const &resp, ErrorDetail::Pointer &errDetails)
+ {
++    Must(Comm::IsConnOpen(serverConnection()));
++
+     ACLFilledChecklist *check = NULL;
+     Security::SessionPointer session(fd_table[serverConnection()->fd].ssl);
+ 
+@@ -418,9 +435,11 @@ Security::PeerConnector::negotiateSsl()
+ void
+ Security::PeerConnector::noteWantRead()
+ {
+-    const int fd = serverConnection()->fd;
+     debugs(83, 5, serverConnection());
+ 
++    Must(Comm::IsConnOpen(serverConnection()));
++    const int fd = serverConnection()->fd;
++
+     // read timeout to avoid getting stuck while reading from a silent server
+     typedef CommCbMemFunT<Security::PeerConnector, CommTimeoutCbParams> TimeoutDialer;
+     AsyncCall::Pointer timeoutCall = JobCallback(83, 5,
+@@ -434,8 +453,10 @@ Security::PeerConnector::noteWantRead()
+ void
+ Security::PeerConnector::noteWantWrite()
+ {
+-    const int fd = serverConnection()->fd;
+     debugs(83, 5, serverConnection());
++    Must(Comm::IsConnOpen(serverConnection()));
++
++    const int fd = serverConnection()->fd;
+     Comm::SetSelect(fd, COMM_SELECT_WRITE, &NegotiateSsl, new Pointer(this), 0);
+     return;
+ }
+@@ -452,57 +473,76 @@ Security::PeerConnector::noteNegotiationError(const Security::ErrorDetailPointer
+     bail(anErr);
+ }
+ 
++Security::EncryptorAnswer &
++Security::PeerConnector::answer()
++{
++    assert(callback);
++    const auto dialer = dynamic_cast<CbDialer*>(callback->getDialer());
++    assert(dialer);
++    return dialer->answer();
++}
++
+ void
+ Security::PeerConnector::bail(ErrorState *error)
+ {
+     Must(error); // or the recepient will not know there was a problem
+-    Must(callback != NULL);
+-    CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
+-    Must(dialer);
+-    dialer->answer().error = error;
++    answer().error = error;
+ 
+-    if (const auto p = serverConnection()->getPeer())
+-        peerConnectFailed(p);
++    if (const auto failingConnection = serverConn) {
++        countFailingConnection();
++        disconnect();
++        failingConnection->close();
++    }
+ 
+     callBack();
+-    disconnect();
+-
+-    if (noteFwdPconnUse)
+-        fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
+-    serverConn->close();
+-    serverConn = nullptr;
+ }
+ 
+ void
+ Security::PeerConnector::sendSuccess()
+ {
+-    callBack();
++    assert(Comm::IsConnOpen(serverConn));
++    answer().conn = serverConn;
+     disconnect();
++    callBack();
++}
++
++void
++Security::PeerConnector::countFailingConnection()
++{
++    assert(serverConn);
++    if (const auto p = serverConn->getPeer())
++        peerConnectFailed(p);
++    // TODO: Calling PconnPool::noteUses() should not be our responsibility.
++    if (noteFwdPconnUse && serverConn->isOpen())
++        fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses);
+ }
+ 
+ void
+ Security::PeerConnector::disconnect()
+ {
++    const auto stillOpen = Comm::IsConnOpen(serverConn);
++
+     if (closeHandler) {
+-        comm_remove_close_handler(serverConnection()->fd, closeHandler);
++        if (stillOpen)
++            comm_remove_close_handler(serverConn->fd, closeHandler);
+         closeHandler = nullptr;
+     }
+ 
+-    commUnsetConnTimeout(serverConnection());
++    if (stillOpen)
++        commUnsetConnTimeout(serverConn);
++
++    serverConn = nullptr;
+ }
+ 
+ void
+ Security::PeerConnector::callBack()
+ {
+-    debugs(83, 5, "TLS setup ended for " << serverConnection());
++    debugs(83, 5, "TLS setup ended for " << answer().conn);
+ 
+     AsyncCall::Pointer cb = callback;
+     // Do this now so that if we throw below, swanSong() assert that we _tried_
+     // to call back holds.
+     callback = NULL; // this should make done() true
+-    CbDialer *dialer = dynamic_cast<CbDialer*>(cb->getDialer());
+-    Must(dialer);
+-    dialer->answer().conn = serverConnection();
+     ScheduleCallHere(cb);
+ }
+ 
+@@ -511,8 +551,9 @@ Security::PeerConnector::swanSong()
+ {
+     // XXX: unregister fd-closure monitoring and CommSetSelect interest, if any
+     AsyncJob::swanSong();
+-    if (callback != NULL) { // paranoid: we have left the caller waiting
+-        debugs(83, DBG_IMPORTANT, "BUG: Unexpected state while connecting to a cache_peer or origin server");
++
++    if (callback) {
++        // job-ending emergencies like handleStopRequest() or callException()
+         const auto anErr = new ErrorState(ERR_GATEWAY_FAILURE, Http::scInternalServerError, request.getRaw(), al);
+         bail(anErr);
+         assert(!callback);
+@@ -533,7 +574,7 @@ Security::PeerConnector::status() const
+         buf.append("Stopped, reason:", 16);
+         buf.appendf("%s",stopReason);
+     }
+-    if (serverConn != NULL)
++    if (Comm::IsConnOpen(serverConn))
+         buf.appendf(" FD %d", serverConn->fd);
+     buf.appendf(" %s%u]", id.prefix(), id.value);
+     buf.terminate();
+@@ -581,15 +622,18 @@ Security::PeerConnector::startCertDownloading(SBuf &url)
+                                       PeerConnectorCertDownloaderDialer(&Security::PeerConnector::certDownloadingDone, this));
+ 
+     const auto dl = new Downloader(url, certCallback, XactionInitiator::initCertFetcher, certDownloadNestingLevel() + 1);
+-    AsyncJob::Start(dl);
++    certDownloadWait.start(dl, certCallback);
+ }
+ 
+ void
+ Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus)
+ {
++    certDownloadWait.finish();
++
+     ++certsDownloads;
+     debugs(81, 5, "Certificate downloading status: " << downloadStatus << " certificate size: " << obj.length());
+ 
++    Must(Comm::IsConnOpen(serverConnection()));
+     const auto &sconn = *fd_table[serverConnection()->fd].ssl;
+ 
+     // Parse Certificate. Assume that it is in DER format.
+@@ -642,6 +686,7 @@ Security::PeerConnector::certDownloadingDone(SBuf &obj, int downloadStatus)
+ void
+ Security::PeerConnector::handleMissingCertificates(const Security::IoResult &ioResult)
+ {
++    Must(Comm::IsConnOpen(serverConnection()));
+     auto &sconn = *fd_table[serverConnection()->fd].ssl;
+ 
+     // We download the missing certificate(s) once. We would prefer to clear
+diff --git a/src/security/PeerConnector.h b/src/security/PeerConnector.h
+index 942f8627a..10700d796 100644
+--- a/src/security/PeerConnector.h
++++ b/src/security/PeerConnector.h
+@@ -12,6 +12,7 @@
+ #include "acl/Acl.h"
+ #include "base/AsyncCbdataCalls.h"
+ #include "base/AsyncJob.h"
++#include "base/JobWait.h"
+ #include "CommCalls.h"
+ #include "http/forward.h"
+ #include "security/EncryptorAnswer.h"
+@@ -24,6 +25,7 @@
+ #include <queue>
+ 
+ class ErrorState;
++class Downloader;
+ class AccessLogEntry;
+ typedef RefCount<AccessLogEntry> AccessLogEntryPointer;
+ 
+@@ -152,6 +154,9 @@ protected:
+     /// a bail(), sendSuccess() helper: stops monitoring the connection
+     void disconnect();
+ 
++    /// updates connection usage history before the connection is closed
++    void countFailingConnection();
++
+     /// If called the certificates validator will not used
+     void bypassCertValidator() {useCertValidator_ = false;}
+ 
+@@ -159,6 +164,9 @@ protected:
+     /// logging
+     void recordNegotiationDetails();
+ 
++    /// convenience method to get to the answer fields
++    EncryptorAnswer &answer();
++
+     HttpRequestPointer request; ///< peer connection trigger or cause
+     Comm::ConnectionPointer serverConn; ///< TCP connection to the peer
+     AccessLogEntryPointer al; ///< info for the future access.log entry
+@@ -203,6 +211,8 @@ private:
+ 
+     /// outcome of the last (failed and) suspended negotiation attempt (or nil)
+     Security::IoResultPointer suspendedError_;
++
++    JobWait<Downloader> certDownloadWait; ///< waits for the missing certificate to be downloaded
+ };
+ 
+ } // namespace Security
+diff --git a/src/security/forward.h b/src/security/forward.h
+index dce66e397..26225aae0 100644
+--- a/src/security/forward.h
++++ b/src/security/forward.h
+@@ -172,6 +172,7 @@ class ParsedOptions {}; // we never parse/use TLS options in this case
+ typedef long ParsedPortFlags;
+ 
+ class PeerConnector;
++class BlindPeerConnector;
+ class PeerOptions;
+ 
+ #if USE_OPENSSL
+diff --git a/src/servers/FtpServer.cc b/src/servers/FtpServer.cc
+index 53f822d2f..e0a10b6b5 100644
+--- a/src/servers/FtpServer.cc
++++ b/src/servers/FtpServer.cc
+@@ -61,7 +61,7 @@ Ftp::Server::Server(const MasterXaction::Pointer &xact):
+     dataConn(),
+     uploadAvailSize(0),
+     listener(),
+-    connector(),
++    dataConnWait(),
+     reader(),
+     waitingForOrigin(false),
+     originDataDownloadAbortedOnError(false)
+@@ -1676,11 +1676,11 @@ Ftp::Server::checkDataConnPre()
+ 
+     // active transfer: open a data connection from Squid to client
+     typedef CommCbMemFunT<Server, CommConnectCbParams> Dialer;
+-    connector = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
+-    Comm::ConnOpener *cs = new Comm::ConnOpener(dataConn, connector,
+-            Config.Timeout.connect);
+-    AsyncJob::Start(cs);
+-    return false; // ConnStateData::processFtpRequest waits handleConnectDone
++    AsyncCall::Pointer callback = JobCallback(17, 3, Dialer, this, Ftp::Server::connectedForData);
++    const auto cs = new Comm::ConnOpener(dataConn->cloneProfile(), callback,
++                                         Config.Timeout.connect);
++    dataConnWait.start(cs, callback);
++    return false;
+ }
+ 
+ /// Check that client data connection is ready for immediate I/O.
+@@ -1698,18 +1698,22 @@ Ftp::Server::checkDataConnPost() const
+ void
+ Ftp::Server::connectedForData(const CommConnectCbParams &params)
+ {
+-    connector = NULL;
++    dataConnWait.finish();
+ 
+     if (params.flag != Comm::OK) {
+-        /* it might have been a timeout with a partially open link */
+-        if (params.conn != NULL)
+-            params.conn->close();
+         setReply(425, "Cannot open data connection.");
+         Http::StreamPointer context = pipeline.front();
+         Must(context->http);
+         Must(context->http->storeEntry() != NULL);
++        // TODO: call closeDataConnection() to reset data conn processing?
+     } else {
+-        Must(dataConn == params.conn);
++        // Finalize the details and start owning the supplied connection.
++        assert(params.conn);
++        assert(dataConn);
++        assert(!dataConn->isOpen());
++        dataConn = params.conn;
++        // XXX: Missing comm_add_close_handler() to track external closures.
++
+         Must(Comm::IsConnOpen(params.conn));
+         fd_note(params.conn->fd, "active client ftp data");
+     }
+diff --git a/src/servers/FtpServer.h b/src/servers/FtpServer.h
+index fb9ef9081..d67799827 100644
+--- a/src/servers/FtpServer.h
++++ b/src/servers/FtpServer.h
+@@ -11,8 +11,10 @@
+ #ifndef SQUID_SERVERS_FTP_SERVER_H
+ #define SQUID_SERVERS_FTP_SERVER_H
+ 
++#include "base/JobWait.h"
+ #include "base/Lock.h"
+ #include "client_side.h"
++#include "comm/forward.h"
+ 
+ namespace Ftp
+ {
+@@ -188,7 +190,11 @@ private:
+     size_t uploadAvailSize; ///< number of yet unused uploadBuf bytes
+ 
+     AsyncCall::Pointer listener; ///< set when we are passively listening
+-    AsyncCall::Pointer connector; ///< set when we are actively connecting
++
++    /// Waits for an FTP data connection to the client to be established/opened.
++    /// This wait only happens in FTP active mode (via PORT or EPRT).
++    JobWait<Comm::ConnOpener> dataConnWait;
++
+     AsyncCall::Pointer reader; ///< set when we are reading FTP data
+ 
+     /// whether we wait for the origin data transfer to end
+diff --git a/src/snmp/Forwarder.cc b/src/snmp/Forwarder.cc
+index 836eb5772..259bb0441 100644
+--- a/src/snmp/Forwarder.cc
++++ b/src/snmp/Forwarder.cc
+@@ -53,6 +53,7 @@ Snmp::Forwarder::noteCommClosed(const CommCloseCbParams& params)
+ {
+     debugs(49, 5, HERE);
+     Must(fd == params.fd);
++    closer = nullptr;
+     fd = -1;
+     mustStop("commClosed");
+ }
+@@ -68,8 +69,7 @@ void
+ Snmp::Forwarder::handleException(const std::exception& e)
+ {
+     debugs(49, 3, HERE << e.what());
+-    if (fd >= 0)
+-        sendError(SNMP_ERR_GENERR);
++    sendError(SNMP_ERR_GENERR);
+     Ipc::Forwarder::handleException(e);
+ }
+ 
+@@ -78,6 +78,10 @@ void
+ Snmp::Forwarder::sendError(int error)
+ {
+     debugs(49, 3, HERE);
++
++    if (fd < 0)
++        return; // client gone
++
+     Snmp::Request& req = static_cast<Snmp::Request&>(*request);
+     req.pdu.command = SNMP_PDU_RESPONSE;
+     req.pdu.errstat = error;
+diff --git a/src/snmp/Inquirer.cc b/src/snmp/Inquirer.cc
+index 9b6e34405..82f8c4475 100644
+--- a/src/snmp/Inquirer.cc
++++ b/src/snmp/Inquirer.cc
+@@ -88,7 +88,11 @@ Snmp::Inquirer::noteCommClosed(const CommCloseCbParams& params)
+ {
+     debugs(49, 5, HERE);
+     Must(!Comm::IsConnOpen(conn) || conn->fd == params.conn->fd);
+-    conn = NULL;
++    closer = nullptr;
++    if (conn) {
++        conn->noteClosure();
++        conn = nullptr;
++    }
+     mustStop("commClosed");
+ }
+ 
+@@ -102,6 +106,10 @@ void
+ Snmp::Inquirer::sendResponse()
+ {
+     debugs(49, 5, HERE);
++
++    if (!Comm::IsConnOpen(conn))
++        return; // client gone
++
+     aggrPdu.fixAggregate();
+     aggrPdu.command = SNMP_PDU_RESPONSE;
+     u_char buffer[SNMP_REQUEST_SIZE];
+diff --git a/src/ssl/PeekingPeerConnector.cc b/src/ssl/PeekingPeerConnector.cc
+index 9a4055355..89e45435b 100644
+--- a/src/ssl/PeekingPeerConnector.cc
++++ b/src/ssl/PeekingPeerConnector.cc
+@@ -27,18 +27,18 @@ CBDATA_NAMESPACED_CLASS_INIT(Ssl, PeekingPeerConnector);
+ void switchToTunnel(HttpRequest *request, const Comm::ConnectionPointer &clientConn, const Comm::ConnectionPointer &srvConn, const SBuf &preReadServerData);
+ 
+ void
+-Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data)
++Ssl::PeekingPeerConnector::cbCheckForPeekAndSpliceDone(const Acl::Answer aclAnswer, void *data)
+ {
+     Ssl::PeekingPeerConnector *peerConnect = (Ssl::PeekingPeerConnector *) data;
+     // Use job calls to add done() checks and other job logic/protections.
+-    CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, answer);
++    CallJobHere1(83, 7, CbcPointer<PeekingPeerConnector>(peerConnect), Ssl::PeekingPeerConnector, checkForPeekAndSpliceDone, aclAnswer);
+ }
+ 
+ void
+-Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(Acl::Answer answer)
++Ssl::PeekingPeerConnector::checkForPeekAndSpliceDone(const Acl::Answer aclAnswer)
+ {
+-    const Ssl::BumpMode finalAction = answer.allowed() ?
+-                                      static_cast<Ssl::BumpMode>(answer.kind):
++    const Ssl::BumpMode finalAction = aclAnswer.allowed() ?
++                                      static_cast<Ssl::BumpMode>(aclAnswer.kind):
+                                       checkForPeekAndSpliceGuess();
+     checkForPeekAndSpliceMatched(finalAction);
+ }
+@@ -106,10 +106,8 @@ Ssl::PeekingPeerConnector::checkForPeekAndSpliceMatched(const Ssl::BumpMode acti
+         splice = true;
+         // Ssl Negotiation stops here. Last SSL checks for valid certificates
+         // and if done, switch to tunnel mode
+-        if (sslFinalized()) {
+-            debugs(83,5, "Abort NegotiateSSL on FD " << serverConn->fd << " and splice the connection");
++        if (sslFinalized() && callback)
+             callBack();
+-        }
+     }
+ }
+ 
+@@ -272,8 +270,11 @@ Ssl::PeekingPeerConnector::startTunneling()
+     auto b = SSL_get_rbio(session.get());
+     auto srvBio = static_cast<Ssl::ServerBio*>(BIO_get_data(b));
+ 
++    debugs(83, 5, "will tunnel instead of negotiating TLS");
+     switchToTunnel(request.getRaw(), clientConn, serverConn, srvBio->rBufData());
+-    tunnelInsteadOfNegotiating();
++    answer().tunneled = true;
++    disconnect();
++    callBack();
+ }
+ 
+ void
+@@ -397,13 +398,3 @@ Ssl::PeekingPeerConnector::serverCertificateVerified()
+     }
+ }
+ 
+-void
+-Ssl::PeekingPeerConnector::tunnelInsteadOfNegotiating()
+-{
+-    Must(callback != NULL);
+-    CbDialer *dialer = dynamic_cast<CbDialer*>(callback->getDialer());
+-    Must(dialer);
+-    dialer->answer().tunneled = true;
+-    debugs(83, 5, "The SSL negotiation with server aborted");
+-}
+-
+diff --git a/src/ssl/PeekingPeerConnector.h b/src/ssl/PeekingPeerConnector.h
+index 3c86b887d..0145e77f9 100644
+--- a/src/ssl/PeekingPeerConnector.h
++++ b/src/ssl/PeekingPeerConnector.h
+@@ -51,7 +51,7 @@ public:
+     void checkForPeekAndSplice();
+ 
+     /// Callback function for ssl_bump acl check in step3  SSL bump step.
+-    void checkForPeekAndSpliceDone(Acl::Answer answer);
++    void checkForPeekAndSpliceDone(Acl::Answer);
+ 
+     /// Handles the final bumping decision.
+     void checkForPeekAndSpliceMatched(const Ssl::BumpMode finalMode);
+@@ -67,7 +67,7 @@ public:
+     void startTunneling();
+ 
+     /// A wrapper function for checkForPeekAndSpliceDone for use with acl
+-    static void cbCheckForPeekAndSpliceDone(Acl::Answer answer, void *data);
++    static void cbCheckForPeekAndSpliceDone(Acl::Answer, void *data);
+ 
+ private:
+ 
+diff --git a/src/tests/stub_libcomm.cc b/src/tests/stub_libcomm.cc
+index 45aa6deb2..8fdb40bbd 100644
+--- a/src/tests/stub_libcomm.cc
++++ b/src/tests/stub_libcomm.cc
+@@ -22,9 +22,9 @@ void Comm::AcceptLimiter::kick() STUB
+ #include "comm/Connection.h"
+ Comm::Connection::Connection() STUB
+ Comm::Connection::~Connection() STUB
+-Comm::ConnectionPointer Comm::Connection::cloneIdentDetails() const STUB_RETVAL(nullptr)
+-Comm::ConnectionPointer Comm::Connection::cloneDestinationDetails() const STUB_RETVAL(nullptr)
++Comm::ConnectionPointer Comm::Connection::cloneProfile() const STUB_RETVAL(nullptr)
+ void Comm::Connection::close() STUB
++void Comm::Connection::noteClosure() STUB
+ CachePeer * Comm::Connection::getPeer() const STUB_RETVAL(NULL)
+ void Comm::Connection::setPeer(CachePeer * p) STUB
+ ScopedId Comm::Connection::codeContextGist() const STUB_RETVAL(id.detach())
+diff --git a/src/tests/stub_libsecurity.cc b/src/tests/stub_libsecurity.cc
+index aa9cf0b36..176cf337e 100644
+--- a/src/tests/stub_libsecurity.cc
++++ b/src/tests/stub_libsecurity.cc
+@@ -9,6 +9,7 @@
+ #include "squid.h"
+ #include "AccessLogEntry.h"
+ #include "comm/Connection.h"
++#include "Downloader.h"
+ #include "HttpRequest.h"
+ 
+ #define STUB_API "security/libsecurity.la"
+@@ -88,7 +89,9 @@ void PeerConnector::bail(ErrorState *) STUB
+ void PeerConnector::sendSuccess() STUB
+ void PeerConnector::callBack() STUB
+ void PeerConnector::disconnect() STUB
++void PeerConnector::countFailingConnection() STUB
+ void PeerConnector::recordNegotiationDetails() STUB
++EncryptorAnswer &PeerConnector::answer() STUB_RETREF(EncryptorAnswer)
+ }
+ 
+ #include "security/PeerOptions.h"
+diff --git a/src/tunnel.cc b/src/tunnel.cc
+index 386a0ecd6..4fc5abdef 100644
+--- a/src/tunnel.cc
++++ b/src/tunnel.cc
+@@ -11,6 +11,7 @@
+ #include "squid.h"
+ #include "acl/FilledChecklist.h"
+ #include "base/CbcPointer.h"
++#include "base/JobWait.h"
+ #include "CachePeer.h"
+ #include "cbdata.h"
+ #include "client_side.h"
+@@ -180,9 +181,6 @@ public:
+     SBuf preReadClientData;
+     SBuf preReadServerData;
+     time_t startTime; ///< object creation time, before any peer selection/connection attempts
+-    /// Whether we are waiting for the CONNECT request/response exchange with the peer.
+-    bool waitingForConnectExchange;
+-    HappyConnOpenerPointer connOpener; ///< current connection opening job
+     ResolvedPeersPointer destinations; ///< paths for forwarding the request
+     bool destinationsFound; ///< At least one candidate path found
+     /// whether another destination may be still attempted if the TCP connection
+@@ -191,10 +189,15 @@ public:
+     // TODO: remove after fixing deferred reads in TunnelStateData::copyRead()
+     CodeContext::Pointer codeContext; ///< our creator context
+ 
+-    // AsyncCalls which we set and may need cancelling.
+-    struct {
+-        AsyncCall::Pointer connector;  ///< a call linking us to the ConnOpener producing serverConn.
+-    } calls;
++    /// waits for a transport connection to the peer to be established/opened
++    JobWait<HappyConnOpener> transportWait;
++
++    /// waits for the established transport connection to be secured/encrypted
++    JobWait<Security::PeerConnector> encryptionWait;
++
++    /// waits for an HTTP CONNECT tunnel through a cache_peer to be negotiated
++    /// over the (encrypted, if needed) transport connection to that cache_peer
++    JobWait<Http::Tunneler> peerWait;
+ 
+     void copyRead(Connection &from, IOCB *completion);
+ 
+@@ -212,12 +215,6 @@ public:
+     /// when all candidate destinations have been tried and all have failed
+     void noteConnection(HappyConnOpenerAnswer &);
+ 
+-    /// whether we are waiting for HappyConnOpener
+-    /// same as calls.connector but may differ from connOpener.valid()
+-    bool opening() const { return connOpener.set(); }
+-
+-    void cancelOpening(const char *reason);
+-
+     /// Start using an established connection
+     void connectDone(const Comm::ConnectionPointer &conn, const char *origin, const bool reused);
+ 
+@@ -266,6 +263,9 @@ private:
+ 
+     /// \returns whether the request should be retried (nil) or the description why it should not
+     const char *checkRetry();
++    /// whether the successfully selected path destination or the established
++    /// server connection is still in use
++    bool usingDestination() const;
+ 
+     /// details of the "last tunneling attempt" failure (if it failed)
+     ErrorState *savedError = nullptr;
+@@ -275,6 +275,8 @@ private:
+ 
+     void deleteThis();
+ 
++    void cancelStep(const char *reason);
++
+ public:
+     bool keepGoingAfterRead(size_t len, Comm::Flag errcode, int xerrno, Connection &from, Connection &to);
+     void copy(size_t len, Connection &from, Connection &to, IOCB *);
+@@ -357,7 +359,6 @@ TunnelStateData::deleteThis()
+ 
+ TunnelStateData::TunnelStateData(ClientHttpRequest *clientRequest) :
+     startTime(squid_curtime),
+-    waitingForConnectExchange(false),
+     destinations(new ResolvedPeers()),
+     destinationsFound(false),
+     retriable(true),
+@@ -390,8 +391,7 @@ TunnelStateData::~TunnelStateData()
+     debugs(26, 3, "TunnelStateData destructed this=" << this);
+     assert(noConnections());
+     xfree(url);
+-    if (opening())
+-        cancelOpening("~TunnelStateData");
++    cancelStep("~TunnelStateData");
+     delete savedError;
+ }
+ 
+@@ -904,7 +904,10 @@ TunnelStateData::copyServerBytes()
+ static void
+ tunnelStartShoveling(TunnelStateData *tunnelState)
+ {
+-    assert(!tunnelState->waitingForConnectExchange);
++    assert(!tunnelState->transportWait);
++    assert(!tunnelState->encryptionWait);
++    assert(!tunnelState->peerWait);
++
+     assert(tunnelState->server.conn);
+     AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout",
+                                      CommTimeoutCbPtrFun(tunnelTimeout, tunnelState));
+@@ -962,6 +965,7 @@ tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *, size_t len
+ void
+ TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
+ {
++    peerWait.finish();
+     server.len = 0;
+ 
+     if (logTag_ptr)
+@@ -970,13 +974,11 @@ TunnelStateData::tunnelEstablishmentDone(Http::TunnelerAnswer &answer)
+     if (answer.peerResponseStatus != Http::scNone)
+         *status_ptr = answer.peerResponseStatus;
+ 
+-    waitingForConnectExchange = false;
+-
+     auto sawProblem = false;
+ 
+     if (!answer.positive()) {
+         sawProblem = true;
+-        Must(!Comm::IsConnOpen(answer.conn));
++        assert(!answer.conn);
+     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+         sawProblem = true;
+         closePendingConnection(answer.conn, "conn was closed while waiting for tunnelEstablishmentDone");
+@@ -1042,8 +1044,7 @@ tunnelErrorComplete(int fd/*const Comm::ConnectionPointer &*/, void *data, size_
+ void
+ TunnelStateData::noteConnection(HappyConnOpener::Answer &answer)
+ {
+-    calls.connector = nullptr;
+-    connOpener.clear();
++    transportWait.finish();
+ 
+     ErrorState *error = nullptr;
+     if ((error = answer.error.get())) {
+@@ -1167,7 +1168,7 @@ TunnelStateData::secureConnectionToPeer(const Comm::ConnectionPointer &conn)
+     AsyncCall::Pointer callback = asyncCall(5,4, "TunnelStateData::noteSecurityPeerConnectorAnswer",
+                                             MyAnswerDialer(&TunnelStateData::noteSecurityPeerConnectorAnswer, this));
+     const auto connector = new Security::BlindPeerConnector(request, conn, callback, al);
+-    AsyncJob::Start(connector); // will call our callback
++    encryptionWait.start(connector, callback);
+ }
+ 
+ /// starts a preparation step for an established connection; retries on failures
+@@ -1194,9 +1195,12 @@ TunnelStateData::advanceDestination(const char *stepDescription, const Comm::Con
+ void
+ TunnelStateData::noteSecurityPeerConnectorAnswer(Security::EncryptorAnswer &answer)
+ {
++    encryptionWait.finish();
++
+     ErrorState *error = nullptr;
++    assert(!answer.tunneled);
+     if ((error = answer.error.get())) {
+-        Must(!Comm::IsConnOpen(answer.conn));
++        assert(!answer.conn);
+         answer.error.clear();
+     } else if (!Comm::IsConnOpen(answer.conn) || fd_table[answer.conn->fd].closing()) {
+         error = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, request.getRaw(), al);
+@@ -1223,8 +1227,6 @@ TunnelStateData::connectedToPeer(const Comm::ConnectionPointer &conn)
+ void
+ TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
+ {
+-    assert(!waitingForConnectExchange);
+-
+     AsyncCall::Pointer callback = asyncCall(5,4,
+                                             "TunnelStateData::tunnelEstablishmentDone",
+                                             Http::Tunneler::CbDialer<TunnelStateData>(&TunnelStateData::tunnelEstablishmentDone, this));
+@@ -1232,9 +1234,7 @@ TunnelStateData::establishTunnelThruProxy(const Comm::ConnectionPointer &conn)
+ #if USE_DELAY_POOLS
+     tunneler->setDelayId(server.delayId);
+ #endif
+-    AsyncJob::Start(tunneler);
+-    waitingForConnectExchange = true;
+-    // and wait for the tunnelEstablishmentDone() call
++    peerWait.start(tunneler, callback);
+ }
+ 
+ void
+@@ -1252,14 +1252,14 @@ TunnelStateData::noteDestination(Comm::ConnectionPointer path)
+ 
+     destinations->addPath(path);
+ 
+-    if (Comm::IsConnOpen(server.conn)) {
++    if (usingDestination()) {
+         // We are already using a previously opened connection but also
+         // receiving destinations in case we need to re-forward.
+-        Must(!opening());
++        Must(!transportWait);
+         return;
+     }
+ 
+-    if (opening()) {
++    if (transportWait) {
+         notifyConnOpener();
+         return; // and continue to wait for tunnelConnectDone() callback
+     }
+@@ -1289,17 +1289,23 @@ TunnelStateData::noteDestinationsEnd(ErrorState *selectionError)
+     // if all of them fail, tunneling as whole will fail
+     Must(!selectionError); // finding at least one path means selection succeeded
+ 
+-    if (Comm::IsConnOpen(server.conn)) {
++    if (usingDestination()) {
+         // We are already using a previously opened connection but also
+         // receiving destinations in case we need to re-forward.
+-        Must(!opening());
++        Must(!transportWait);
+         return;
+     }
+ 
+-    Must(opening()); // or we would be stuck with nothing to do or wait for
++    Must(transportWait); // or we would be stuck with nothing to do or wait for
+     notifyConnOpener();
+ }
+ 
++bool
++TunnelStateData::usingDestination() const
++{
++    return encryptionWait || peerWait || Comm::IsConnOpen(server.conn);
++}
++
+ /// remembers an error to be used if there will be no more connection attempts
+ void
+ TunnelStateData::saveError(ErrorState *error)
+@@ -1320,8 +1326,7 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason)
+     if (request)
+         request->hier.stopPeerClock(false);
+ 
+-    if (opening())
+-        cancelOpening(reason);
++    cancelStep(reason);
+ 
+     assert(finalError);
+ 
+@@ -1339,18 +1344,15 @@ TunnelStateData::sendError(ErrorState *finalError, const char *reason)
+     errorSend(client.conn, finalError);
+ }
+ 
+-/// Notify connOpener that we no longer need connections. We do not have to do
+-/// this -- connOpener would eventually notice on its own, but notifying reduces
+-/// waste and speeds up spare connection opening for other transactions (that
+-/// could otherwise wait for this transaction to use its spare allowance).
++/// Notify a pending subtask, if any, that we no longer need its help. We do not
++/// have to do this -- the subtask job will eventually end -- but ending it
++/// earlier reduces waste and may reduce DoS attack surface.
+ void
+-TunnelStateData::cancelOpening(const char *reason)
++TunnelStateData::cancelStep(const char *reason)
+ {
+-    assert(calls.connector);
+-    calls.connector->cancel(reason);
+-    calls.connector = nullptr;
+-    notifyConnOpener();
+-    connOpener.clear();
++    transportWait.cancel(reason);
++    encryptionWait.cancel(reason);
++    peerWait.cancel(reason);
+ }
+ 
+ void
+@@ -1360,15 +1362,14 @@ TunnelStateData::startConnecting()
+         request->hier.startPeerClock();
+ 
+     assert(!destinations->empty());
+-    assert(!opening());
+-    calls.connector = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
+-    const auto cs = new HappyConnOpener(destinations, calls.connector, request, startTime, 0, al);
++    assert(!usingDestination());
++    AsyncCall::Pointer callback = asyncCall(17, 5, "TunnelStateData::noteConnection", HappyConnOpener::CbDialer<TunnelStateData>(&TunnelStateData::noteConnection, this));
++    const auto cs = new HappyConnOpener(destinations, callback, request, startTime, 0, al);
+     cs->setHost(request->url.host());
+     cs->setRetriable(false);
+     cs->allowPersistent(false);
+     destinations->notificationPending = true; // start() is async
+-    connOpener = cs;
+-    AsyncJob::Start(cs);
++    transportWait.start(cs, callback);
+ }
+ 
+ /// send request on an existing connection dedicated to the requesting client
+@@ -1417,7 +1418,7 @@ TunnelStateData::Connection::setDelayId(DelayId const &newDelay)
+ 
+ #endif
+ 
+-/// makes sure connOpener knows that destinations have changed
++/// makes sure connection opener knows that the destinations have changed
+ void
+ TunnelStateData::notifyConnOpener()
+ {
+@@ -1425,7 +1426,7 @@ TunnelStateData::notifyConnOpener()
+         debugs(17, 7, "reusing pending notification");
+     } else {
+         destinations->notificationPending = true;
+-        CallJobHere(17, 5, connOpener, HappyConnOpener, noteCandidatesChange);
++        CallJobHere(17, 5, transportWait.job(), HappyConnOpener, noteCandidatesChange);
+     }
+ }
+ 
+diff --git a/src/whois.cc b/src/whois.cc
+index 38dfbacde..306ea5c62 100644
+--- a/src/whois.cc
++++ b/src/whois.cc
+@@ -182,6 +182,7 @@ whoisClose(const CommCloseCbParams &params)
+ {
+     WhoisState *p = (WhoisState *)params.data;
+     debugs(75, 3, "whoisClose: FD " << params.fd);
++    // We do not own a Connection. Assume that FwdState is also monitoring.
+     p->entry->unlock("whoisClose");
+     delete p;
+ }
-- 
2.25.1


^ permalink raw reply	[flat|nested] 2+ messages in thread

end of thread, other threads:[~2022-02-11 10:09 UTC | newest]

Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2022-02-11  8:34 [PATCH] squid 5.4: Latest patch - Bug #5055 - from upstream Matthias Fischer
2022-02-11 10:09 ` Peter Müller

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox