Newer
Older

Dragana Damjanovic
committed
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "WebTransportLog.h"
#include "Http3WebTransportSession.h"

Kershaw Chang
committed
#include "Http3WebTransportStream.h"

Dragana Damjanovic
committed
#include "WebTransportSessionProxy.h"

Kershaw Chang
committed
#include "WebTransportStreamProxy.h"

Dragana Damjanovic
committed
#include "nsIAsyncVerifyRedirectCallback.h"
#include "nsIHttpChannel.h"

Kershaw Chang
committed
#include "nsIHttpChannelInternal.h"

Dragana Damjanovic
committed
#include "nsIRequest.h"
#include "nsNetUtil.h"

Kershaw Chang
committed
#include "nsProxyRelease.h"

Dragana Damjanovic
committed
#include "nsSocketTransportService2.h"
#include "mozilla/Logging.h"

Randell Jesup
committed
#include "mozilla/ScopeExit.h"
#include "mozilla/StaticPrefs_network.h"

Dragana Damjanovic
committed
namespace mozilla::net {
LazyLogModule webTransportLog("nsWebTransport");
NS_IMPL_ISUPPORTS(WebTransportSessionProxy, WebTransportSessionEventListener,
nsIWebTransport, nsIRedirectResultListener, nsIStreamListener,
nsIChannelEventSink, nsIInterfaceRequestor);
WebTransportSessionProxy::WebTransportSessionProxy()

Kershaw Chang
committed
: mMutex("WebTransportSessionProxy::mMutex"),
mTarget(GetMainThreadSerialEventTarget()) {

Dragana Damjanovic
committed
LOG(("WebTransportSessionProxy constructor"));
}
WebTransportSessionProxy::~WebTransportSessionProxy() {
if (OnSocketThread()) {
return;
}
MutexAutoLock lock(mMutex);
if ((mState != WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED) &&
(mState != WebTransportSessionProxyState::ACTIVE) &&
(mState != WebTransportSessionProxyState::SESSION_CLOSE_PENDING)) {
return;
}
MOZ_ASSERT(mState != WebTransportSessionProxyState::SESSION_CLOSE_PENDING,
"We can not be in the SESSION_CLOSE_PENDING state in destructor, "
"because should e an runnable that holds reference to this"
"object.");
Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::ProxyHttp3WebTransportSessionRelease",
[self{std::move(mWebTransportSession)}]() {}));
}
//-----------------------------------------------------------------------------
// WebTransportSessionProxy::nsIWebTransport
//-----------------------------------------------------------------------------
nsresult WebTransportSessionProxy::AsyncConnect(
nsIURI* aURI, nsIPrincipal* aPrincipal, uint32_t aSecurityFlags,
WebTransportSessionEventListener* aListener) {

sunil mayya
committed
return AsyncConnectWithClient(aURI, aPrincipal, aSecurityFlags, aListener,
Maybe<dom::ClientInfo>());
}
nsresult WebTransportSessionProxy::AsyncConnectWithClient(
nsIURI* aURI, nsIPrincipal* aPrincipal, uint32_t aSecurityFlags,
WebTransportSessionEventListener* aListener,
const Maybe<dom::ClientInfo>& aClientInfo) {

Dragana Damjanovic
committed
MOZ_ASSERT(NS_IsMainThread());
LOG(("WebTransportSessionProxy::AsyncConnect"));
{
MutexAutoLock lock(mMutex);
mListener = aListener;
}

Randell Jesup
committed
auto cleanup = MakeScopeExit([self = RefPtr<WebTransportSessionProxy>(this)] {
MutexAutoLock lock(self->mMutex);
self->mListener->OnSessionClosed(0, ""_ns); // TODO: find a better error.
self->mChannel = nullptr;
self->mListener = nullptr;
self->ChangeState(WebTransportSessionProxyState::DONE);
});

Dragana Damjanovic
committed
nsSecurityFlags flags = nsILoadInfo::SEC_COOKIES_OMIT | aSecurityFlags;
nsLoadFlags loadFlags = nsIRequest::LOAD_NORMAL |
nsIRequest::LOAD_BYPASS_CACHE |
nsIRequest::INHIBIT_CACHING;

sunil mayya
committed
nsresult rv = NS_ERROR_FAILURE;
if (aClientInfo.isSome()) {
rv = NS_NewChannel(getter_AddRefs(mChannel), aURI, aPrincipal,
aClientInfo.ref(), Maybe<dom::ServiceWorkerDescriptor>(),
flags, nsContentPolicyType::TYPE_WEB_TRANSPORT,
/* aCookieJarSettings */ nullptr,
/* aPerformanceStorage */ nullptr,
/* aLoadGroup */ nullptr,
/* aCallbacks */ this, loadFlags);
} else {
rv = NS_NewChannel(getter_AddRefs(mChannel), aURI, aPrincipal, flags,
nsContentPolicyType::TYPE_WEB_TRANSPORT,
/* aCookieJarSettings */ nullptr,
/* aPerformanceStorage */ nullptr,
/* aLoadGroup */ nullptr,
/* aCallbacks */ this, loadFlags);
}

Dragana Damjanovic
committed
NS_ENSURE_SUCCESS(rv, rv);
// configure HTTP specific stuff
nsCOMPtr<nsIHttpChannel> httpChannel = do_QueryInterface(mChannel);
if (!httpChannel) {
mChannel = nullptr;
return NS_ERROR_ABORT;
}
{
MutexAutoLock lock(mMutex);
ChangeState(WebTransportSessionProxyState::NEGOTIATING);
}

Kershaw Chang
committed
// https://www.ietf.org/archive/id/draft-ietf-webtrans-http3-04.html#section-6
rv = httpChannel->SetRequestHeader("Sec-Webtransport-Http3-Draft02"_ns,
"1"_ns, false);
if (NS_FAILED(rv)) {
return rv;
}

sunil mayya
committed
// To establish a WebTransport session with an origin origin, follow
// [WEB-TRANSPORT-HTTP3] section 3.3, with using origin, serialized and
// isomorphic encoded, as the `Origin` header of the request.
// https://www.w3.org/TR/webtransport/#protocol-concepts
nsAutoCString serializedOrigin;
if (NS_FAILED(aPrincipal->GetAsciiOrigin(serializedOrigin))) {
// origin/URI will be missing for system principals
// assign null origin
serializedOrigin = "null"_ns;
}
rv = httpChannel->SetRequestHeader("Origin"_ns, serializedOrigin, false);
if (NS_FAILED(rv)) {
return rv;
}

Kershaw Chang
committed
nsCOMPtr<nsIHttpChannelInternal> internalChannel =
do_QueryInterface(mChannel);
if (!internalChannel) {
mChannel = nullptr;
return NS_ERROR_ABORT;
}
Unused << internalChannel->SetWebTransportSessionEventListener(this);

Dragana Damjanovic
committed
rv = mChannel->AsyncOpen(this);

Randell Jesup
committed
if (NS_SUCCEEDED(rv)) {
cleanup.release();

Dragana Damjanovic
committed
}
return rv;
}

Kershaw Chang
committed
NS_IMETHODIMP
WebTransportSessionProxy::RetargetTo(nsIEventTarget* aTarget) {

Kershaw Chang
committed
if (!aTarget) {
return NS_ERROR_INVALID_ARG;
}

Kershaw Chang
committed
{
MutexAutoLock lock(mMutex);
LOG(("WebTransportSessionProxy::RetargetTo mState=%d", mState));
// RetargetTo should be only called after the session is ready.
if (mState != WebTransportSessionProxyState::ACTIVE) {
return NS_ERROR_UNEXPECTED;
}

Kershaw Chang
committed
mTarget = aTarget;

Kershaw Chang
committed
}
return NS_OK;
}

Dragana Damjanovic
committed
NS_IMETHODIMP
WebTransportSessionProxy::GetStats() { return NS_ERROR_NOT_IMPLEMENTED; }
NS_IMETHODIMP
WebTransportSessionProxy::CloseSession(uint32_t status,
const nsACString& reason) {
MutexAutoLock lock(mMutex);

Kershaw Chang
committed
MOZ_ASSERT(mTarget->IsOnCurrentThread());

Dragana Damjanovic
committed
mCloseStatus = status;
mReason = reason;
mListener = nullptr;
mPendingEvents.Clear();

Dragana Damjanovic
committed
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
switch (mState) {
case WebTransportSessionProxyState::INIT:
case WebTransportSessionProxyState::DONE:
return NS_ERROR_NOT_INITIALIZED;
case WebTransportSessionProxyState::NEGOTIATING:
mChannel->Cancel(NS_ERROR_ABORT);
mChannel = nullptr;
ChangeState(WebTransportSessionProxyState::DONE);
break;
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED:
mChannel->Cancel(NS_ERROR_ABORT);
mChannel = nullptr;
ChangeState(WebTransportSessionProxyState::SESSION_CLOSE_PENDING);
CloseSessionInternal();
break;
case WebTransportSessionProxyState::ACTIVE:
ChangeState(WebTransportSessionProxyState::SESSION_CLOSE_PENDING);
CloseSessionInternal();
break;
case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING:
ChangeState(WebTransportSessionProxyState::DONE);
break;
case SESSION_CLOSE_PENDING:
break;
}
return NS_OK;
}

Kershaw Chang
committed
void WebTransportSessionProxy::CloseSessionInternalLocked() {
MutexAutoLock lock(mMutex);
CloseSessionInternal();
}

Dragana Damjanovic
committed
void WebTransportSessionProxy::CloseSessionInternal() {
if (!OnSocketThread()) {
mMutex.AssertCurrentThreadOwns();
RefPtr<WebTransportSessionProxy> self(this);
Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::CallCloseWebTransportSession",

Kershaw Chang
committed
[self{std::move(self)}]() { self->CloseSessionInternalLocked(); }));

Dragana Damjanovic
committed
return;
}

Kershaw Chang
committed
mMutex.AssertCurrentThreadOwns();

Dragana Damjanovic
committed
RefPtr<Http3WebTransportSession> wt;
uint32_t closeStatus = 0;
nsCString reason;

Kershaw Chang
committed
if (mState == WebTransportSessionProxyState::SESSION_CLOSE_PENDING) {
MOZ_ASSERT(mWebTransportSession);
wt = mWebTransportSession;
mWebTransportSession = nullptr;
closeStatus = mCloseStatus;
reason = mReason;
ChangeState(WebTransportSessionProxyState::DONE);
} else {
MOZ_ASSERT(mState == WebTransportSessionProxyState::DONE);

Dragana Damjanovic
committed
}

Kershaw Chang
committed

Dragana Damjanovic
committed
if (wt) {

Kershaw Chang
committed
MutexAutoUnlock unlock(mMutex);

Dragana Damjanovic
committed
wt->CloseSession(closeStatus, reason);
}
}

Kershaw Chang
committed
class WebTransportStreamCallbackWrapper final {
public:
NS_INLINE_DECL_THREADSAFE_REFCOUNTING(WebTransportStreamCallbackWrapper)
explicit WebTransportStreamCallbackWrapper(
nsIWebTransportStreamCallback* aCallback, bool aBidi)
: mCallback(aCallback),
mTarget(GetCurrentSerialEventTarget()),
mBidi(aBidi) {}

Kershaw Chang
committed
void CallOnError(nsresult aError) {
if (!mTarget->IsOnCurrentThread()) {
RefPtr<WebTransportStreamCallbackWrapper> self(this);
Unused << mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportStreamCallbackWrapper::CallOnError",
[self{std::move(self)}, error{aError}]() {
self->CallOnError(error);
}));
return;
}
LOG(("WebTransportStreamCallbackWrapper::OnError aError=0x%" PRIx32,

Paul Adenot
committed
static_cast<uint32_t>(aError)));

Kershaw Chang
committed
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
Unused << mCallback->OnError(nsIWebTransport::INVALID_STATE_ERROR);
}
void CallOnStreamReady(WebTransportStreamProxy* aStream) {
if (!mTarget->IsOnCurrentThread()) {
RefPtr<WebTransportStreamCallbackWrapper> self(this);
RefPtr<WebTransportStreamProxy> stream = aStream;
Unused << mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportStreamCallbackWrapper::CallOnStreamReady",
[self{std::move(self)}, stream{std::move(stream)}]() {
self->CallOnStreamReady(stream);
}));
return;
}
if (mBidi) {
Unused << mCallback->OnBidirectionalStreamReady(aStream);
return;
}
Unused << mCallback->OnUnidirectionalStreamReady(aStream);
}
private:
virtual ~WebTransportStreamCallbackWrapper() {
NS_ProxyRelease(
"WebTransportStreamCallbackWrapper::~WebTransportStreamCallbackWrapper",
mTarget, mCallback.forget());
}
nsCOMPtr<nsIWebTransportStreamCallback> mCallback;
nsCOMPtr<nsIEventTarget> mTarget;
bool mBidi = false;
};
void WebTransportSessionProxy::CreateStreamInternal(
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
nsIWebTransportStreamCallback* callback, bool aBidi) {
mMutex.AssertCurrentThreadOwns();
LOG(
("WebTransportSessionProxy::CreateStreamInternal %p "
"mState=%d, bidi=%d",
this, mState, aBidi));
switch (mState) {
case WebTransportSessionProxyState::INIT:
case WebTransportSessionProxyState::NEGOTIATING:
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED:
case WebTransportSessionProxyState::ACTIVE: {
RefPtr<WebTransportStreamCallbackWrapper> wrapper =
new WebTransportStreamCallbackWrapper(callback, aBidi);
if (mState == WebTransportSessionProxyState::ACTIVE &&
mWebTransportSession) {
DoCreateStream(wrapper, mWebTransportSession, aBidi);
} else {
LOG(
("WebTransportSessionProxy::CreateStreamInternal %p "
" queue create stream event",
this));
auto task = [self = RefPtr{this}, wrapper{std::move(wrapper)},
bidi(aBidi)](nsresult aStatus) {
if (NS_FAILED(aStatus)) {
wrapper->CallOnError(aStatus);
return;
}
self->DoCreateStream(wrapper, nullptr, bidi);
};
// TODO: we should do this properly in bug 1830362.
mPendingCreateStreamEvents.AppendElement(std::move(task));
}
} break;
case WebTransportSessionProxyState::SESSION_CLOSE_PENDING:
case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING:
case WebTransportSessionProxyState::DONE: {
nsCOMPtr<nsIWebTransportStreamCallback> cb(callback);
NS_DispatchToCurrentThread(NS_NewRunnableFunction(

Randell Jesup
committed
"WebTransportSessionProxy::CreateStreamInternal",
[cb{std::move(cb)}]() {
cb->OnError(nsIWebTransport::INVALID_STATE_ERROR);
}));
} break;
}
}
void WebTransportSessionProxy::DoCreateStream(
WebTransportStreamCallbackWrapper* aCallback,
Http3WebTransportSession* aSession, bool aBidi) {

Kershaw Chang
committed
if (!OnSocketThread()) {
RefPtr<WebTransportSessionProxy> self(this);
RefPtr<WebTransportStreamCallbackWrapper> wrapper(aCallback);
Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::DoCreateStream",

Kershaw Chang
committed
[self{std::move(self)}, wrapper{std::move(wrapper)}, bidi(aBidi)]() {
self->DoCreateStream(wrapper, nullptr, bidi);

Kershaw Chang
committed
}));
return;
}
LOG(("WebTransportSessionProxy::DoCreateStream %p bidi=%d", this, aBidi));

Kershaw Chang
committed
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
RefPtr<Http3WebTransportSession> session = aSession;
// Having no session here means that this is called by dispatching tasks.
// The mState may be already changed, so we need to check it again.
if (!aSession) {
MutexAutoLock lock(mMutex);
switch (mState) {
case WebTransportSessionProxyState::INIT:
case WebTransportSessionProxyState::NEGOTIATING:
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED:
MOZ_ASSERT(false, "DoCreateStream called with invalid state");
aCallback->CallOnError(NS_ERROR_UNEXPECTED);
return;
case WebTransportSessionProxyState::ACTIVE: {
session = mWebTransportSession;
} break;
case WebTransportSessionProxyState::SESSION_CLOSE_PENDING:
case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING:
case WebTransportSessionProxyState::DONE:
// Session is going to be closed.
aCallback->CallOnError(NS_ERROR_NOT_AVAILABLE);
return;
}
}
if (!session) {
MOZ_ASSERT_UNREACHABLE("This should not happen");
aCallback->CallOnError(NS_ERROR_UNEXPECTED);
return;
}

Kershaw Chang
committed
RefPtr<WebTransportStreamCallbackWrapper> wrapper(aCallback);
auto callback =
[wrapper{std::move(wrapper)}](
Result<RefPtr<Http3WebTransportStream>, nsresult>&& aResult) {
if (aResult.isErr()) {
wrapper->CallOnError(aResult.unwrapErr());
return;
}
RefPtr<Http3WebTransportStream> stream = aResult.unwrap();
RefPtr<WebTransportStreamProxy> streamProxy =
new WebTransportStreamProxy(stream);
wrapper->CallOnStreamReady(streamProxy);
};
if (aBidi) {
session->CreateOutgoingBidirectionalStream(std::move(callback));

Kershaw Chang
committed
} else {
session->CreateOutgoingUnidirectionalStream(std::move(callback));

Kershaw Chang
committed
}
}

Dragana Damjanovic
committed
NS_IMETHODIMP
WebTransportSessionProxy::CreateOutgoingUnidirectionalStream(
nsIWebTransportStreamCallback* callback) {

Kershaw Chang
committed
if (!callback) {
return NS_ERROR_INVALID_ARG;
}
MutexAutoLock lock(mMutex);
CreateStreamInternal(callback, false);

Kershaw Chang
committed
return NS_OK;

Dragana Damjanovic
committed
}
NS_IMETHODIMP
WebTransportSessionProxy::CreateOutgoingBidirectionalStream(
nsIWebTransportStreamCallback* callback) {

Kershaw Chang
committed
if (!callback) {
return NS_ERROR_INVALID_ARG;
}
MutexAutoLock lock(mMutex);
CreateStreamInternal(callback, true);

Kershaw Chang
committed
return NS_OK;

Dragana Damjanovic
committed
}
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
void WebTransportSessionProxy::SendDatagramInternal(
const RefPtr<Http3WebTransportSession>& aSession, nsTArray<uint8_t>&& aData,
uint64_t aTrackingId) {
MOZ_ASSERT(OnSocketThread());
aSession->SendDatagram(std::move(aData), aTrackingId);
}
NS_IMETHODIMP
WebTransportSessionProxy::SendDatagram(const nsTArray<uint8_t>& aData,
uint64_t aTrackingId) {
RefPtr<Http3WebTransportSession> session;
{
MutexAutoLock lock(mMutex);
if (mState != WebTransportSessionProxyState::ACTIVE ||
!mWebTransportSession) {
return NS_ERROR_NOT_AVAILABLE;
}
session = mWebTransportSession;
}
nsTArray<uint8_t> copied;
copied.Assign(aData);
if (!OnSocketThread()) {
return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::SendDatagramInternal",
[self = RefPtr{this}, session{std::move(session)},
data{std::move(copied)}, trackingId(aTrackingId)]() mutable {
self->SendDatagramInternal(session, std::move(data), trackingId);
}));
}
SendDatagramInternal(session, std::move(copied), aTrackingId);
return NS_OK;
}
void WebTransportSessionProxy::GetMaxDatagramSizeInternal(
const RefPtr<Http3WebTransportSession>& aSession) {
MOZ_ASSERT(OnSocketThread());
aSession->GetMaxDatagramSize();
}
NS_IMETHODIMP
WebTransportSessionProxy::GetMaxDatagramSize() {
RefPtr<Http3WebTransportSession> session;
{
MutexAutoLock lock(mMutex);
if (mState != WebTransportSessionProxyState::ACTIVE ||
!mWebTransportSession) {
return NS_ERROR_NOT_AVAILABLE;
}
session = mWebTransportSession;
}
if (!OnSocketThread()) {
return gSocketTransportService->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::GetMaxDatagramSizeInternal",
[self = RefPtr{this}, session{std::move(session)}]() {
self->GetMaxDatagramSizeInternal(session);
}));
}
GetMaxDatagramSizeInternal(session);
return NS_OK;
}

Dragana Damjanovic
committed
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
//-----------------------------------------------------------------------------
// WebTransportSessionProxy::nsIStreamListener
//-----------------------------------------------------------------------------
NS_IMETHODIMP
WebTransportSessionProxy::OnStartRequest(nsIRequest* aRequest) {
MOZ_ASSERT(NS_IsMainThread());
LOG(("WebTransportSessionProxy::OnStartRequest\n"));
nsCOMPtr<WebTransportSessionEventListener> listener;
nsAutoCString reason;
uint32_t closeStatus = 0;
{
MutexAutoLock lock(mMutex);
switch (mState) {
case WebTransportSessionProxyState::INIT:
case WebTransportSessionProxyState::DONE:
case WebTransportSessionProxyState::ACTIVE:
case WebTransportSessionProxyState::SESSION_CLOSE_PENDING:
MOZ_ASSERT(false, "OnStartRequest cannot be called in this state.");
break;
case WebTransportSessionProxyState::NEGOTIATING:
case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING:
listener = mListener;
mListener = nullptr;
mChannel = nullptr;
reason = mReason;
closeStatus = mCloseStatus;
ChangeState(WebTransportSessionProxyState::DONE);
break;
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED: {
uint32_t status;
nsCOMPtr<nsIHttpChannel> httpChannel = do_QueryInterface(mChannel);
if (!httpChannel ||
NS_FAILED(httpChannel->GetResponseStatus(&status)) ||

Kershaw Chang
committed
!(status >= 200 && status < 300)) {

Dragana Damjanovic
committed
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
listener = mListener;
mListener = nullptr;
mChannel = nullptr;
mReason = ""_ns;
reason = ""_ns;
mCloseStatus =
0; // TODO: find a better error. Currently error code 0 is used
ChangeState(WebTransportSessionProxyState::SESSION_CLOSE_PENDING);
CloseSessionInternal(); // TODO: find a better error. Currently error
// code 0 is used.
}
// The success cases will be handled in OnStopRequest.
} break;
}
}
if (listener) {
listener->OnSessionClosed(closeStatus, reason);
}
return NS_OK;
}
NS_IMETHODIMP
WebTransportSessionProxy::OnDataAvailable(nsIRequest* aRequest,
nsIInputStream* aStream,
uint64_t aOffset, uint32_t aCount) {
MOZ_ASSERT(NS_IsMainThread());
MOZ_RELEASE_ASSERT(
false, "WebTransportSessionProxy::OnDataAvailable should not be called");
return NS_OK;
}
NS_IMETHODIMP
WebTransportSessionProxy::OnStopRequest(nsIRequest* aRequest,
nsresult aStatus) {
MOZ_ASSERT(NS_IsMainThread());
mChannel = nullptr;
nsCOMPtr<WebTransportSessionEventListener> listener;
nsAutoCString reason;
uint32_t closeStatus = 0;
uint64_t sessionId;
bool succeeded = false;
nsTArray<std::function<void()>> pendingEvents;
nsTArray<std::function<void(nsresult)>> pendingCreateStreamEvents;

Dragana Damjanovic
committed
{
MutexAutoLock lock(mMutex);
switch (mState) {
case WebTransportSessionProxyState::INIT:
case WebTransportSessionProxyState::ACTIVE:
case WebTransportSessionProxyState::NEGOTIATING:

Randell Jesup
committed
MOZ_ASSERT(false, "OnStopRequest cannot be called in this state.");

Dragana Damjanovic
committed
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
break;
case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING:
reason = mReason;
closeStatus = mCloseStatus;
listener = mListener;
mListener = nullptr;
ChangeState(WebTransportSessionProxyState::DONE);
break;
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED:
if (NS_FAILED(aStatus)) {
listener = mListener;
mListener = nullptr;
mReason = ""_ns;
reason = ""_ns;
mCloseStatus = 0;
ChangeState(WebTransportSessionProxyState::SESSION_CLOSE_PENDING);
CloseSessionInternal(); // TODO: find a better error. Currently error
// code 0 is used.
} else {
succeeded = true;
sessionId = mSessionId;
listener = mListener;
ChangeState(WebTransportSessionProxyState::ACTIVE);
}
break;
case WebTransportSessionProxyState::SESSION_CLOSE_PENDING:
case WebTransportSessionProxyState::DONE:
break;
}
pendingEvents = std::move(mPendingEvents);
pendingCreateStreamEvents = std::move(mPendingCreateStreamEvents);

Randell Jesup
committed
if (!pendingCreateStreamEvents.IsEmpty()) {
if (NS_SUCCEEDED(aStatus) &&
(mState == WebTransportSessionProxyState::DONE ||

Kershaw Chang
committed
mState == WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING ||

Randell Jesup
committed
mState == WebTransportSessionProxyState::SESSION_CLOSE_PENDING)) {
aStatus = NS_ERROR_FAILURE;
}
}
mStopRequestCalled = true;

Dragana Damjanovic
committed
}
if (!pendingCreateStreamEvents.IsEmpty()) {
Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::DispatchPendingCreateStreamEvents",
[pendingCreateStreamEvents = std::move(pendingCreateStreamEvents),
status(aStatus)]() {
for (const auto& event : pendingCreateStreamEvents) {
event(status);
}
}));

Randell Jesup
committed
} // otherwise let the CreateStreams just go away

Dragana Damjanovic
committed
if (listener) {
if (succeeded) {
listener->OnSessionReady(sessionId);
if (!pendingEvents.IsEmpty()) {

Kershaw Chang
committed
Unused << gSocketTransportService->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::DispatchPendingEvents",
[pendingEvents = std::move(pendingEvents)]() {
for (const auto& event : pendingEvents) {
event();
}
}));

Dragana Damjanovic
committed
} else {
listener->OnSessionClosed(closeStatus,
reason); // TODO: find a better error.
// Currently error code 0 is used.
}
}
return NS_OK;
}
//-----------------------------------------------------------------------------
// WebTransportSessionProxy::nsIChannelEventSink
//-----------------------------------------------------------------------------
NS_IMETHODIMP
WebTransportSessionProxy::AsyncOnChannelRedirect(
nsIChannel* aOldChannel, nsIChannel* aNewChannel, uint32_t aFlags,
nsIAsyncVerifyRedirectCallback* callback) {
// Currently implementation we do not reach this part of the code
// as location headers are not forwarded by the http3 stack to the applicaion.
// Hence, the channel is aborted due to the location header check in
// nsHttpChannel::AsyncProcessRedirection This comment must be removed after
// the following neqo bug is resolved
// https://github.com/mozilla/neqo/issues/1364
if (!StaticPrefs::network_webtransport_redirect_enabled()) {
LOG(("Channel Redirects are disabled for WebTransport sessions"));
return NS_ERROR_ABORT;
}

Dragana Damjanovic
committed
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
nsCOMPtr<nsIURI> newURI;
nsresult rv = NS_GetFinalChannelURI(aNewChannel, getter_AddRefs(newURI));
NS_ENSURE_SUCCESS(rv, rv);
rv = aNewChannel->GetURI(getter_AddRefs(newURI));
if (NS_FAILED(rv)) {
callback->OnRedirectVerifyCallback(rv);
return NS_OK;
}
// abort the request if redirecting to insecure context
if (!newURI->SchemeIs("https")) {
callback->OnRedirectVerifyCallback(NS_ERROR_ABORT);
return NS_OK;
}
// Assign to mChannel after we get notification about success of the
// redirect in OnRedirectResult.
mRedirectChannel = aNewChannel;
callback->OnRedirectVerifyCallback(NS_OK);
return NS_OK;
}
//-----------------------------------------------------------------------------
// WebTransportSessionProxy::nsIRedirectResultListener
//-----------------------------------------------------------------------------
NS_IMETHODIMP

valenting
committed
WebTransportSessionProxy::OnRedirectResult(nsresult aStatus) {
if (NS_SUCCEEDED(aStatus) && mRedirectChannel) {

Dragana Damjanovic
committed
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
mChannel = mRedirectChannel;
}
mRedirectChannel = nullptr;
return NS_OK;
}
//-----------------------------------------------------------------------------
// WebTransportSessionProxy::nsIInterfaceRequestor
//-----------------------------------------------------------------------------
NS_IMETHODIMP
WebTransportSessionProxy::GetInterface(const nsIID& aIID, void** aResult) {
if (aIID.Equals(NS_GET_IID(nsIChannelEventSink))) {
NS_ADDREF_THIS();
*aResult = static_cast<nsIChannelEventSink*>(this);
return NS_OK;
}
if (aIID.Equals(NS_GET_IID(nsIRedirectResultListener))) {
NS_ADDREF_THIS();
*aResult = static_cast<nsIRedirectResultListener*>(this);
return NS_OK;
}
return NS_ERROR_NO_INTERFACE;
}
//-----------------------------------------------------------------------------
// WebTransportSessionProxy::WebTransportSessionEventListener
//-----------------------------------------------------------------------------
// This function is called when the Http3WebTransportSession is ready. After
// this call WebTransportSessionProxy is responsible for the
// Http3WebTransportSession, i.e. it is responsible for closing it.
// The listener of the WebTransportSessionProxy will be informed during
// OnStopRequest call.
NS_IMETHODIMP
WebTransportSessionProxy::OnSessionReadyInternal(
Http3WebTransportSession* aSession) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
LOG(("WebTransportSessionProxy::OnSessionReadyInternal"));
MutexAutoLock lock(mMutex);
switch (mState) {
case WebTransportSessionProxyState::INIT:
case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING:
case WebTransportSessionProxyState::ACTIVE:
case WebTransportSessionProxyState::SESSION_CLOSE_PENDING:
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED:
MOZ_ASSERT(false,
"OnSessionReadyInternal cannot be called in this state.");
return NS_ERROR_ABORT;
case WebTransportSessionProxyState::NEGOTIATING:
mWebTransportSession = aSession;
mSessionId = aSession->StreamId();
ChangeState(WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED);
break;
case WebTransportSessionProxyState::DONE:
// The session has been canceled. We do not need to set
// mWebTransportSession.
break;
}
return NS_OK;
}

Kershaw Chang
committed
NS_IMETHODIMP
WebTransportSessionProxy::OnIncomingStreamAvailableInternal(
Http3WebTransportStream* aStream) {
nsCOMPtr<WebTransportSessionEventListener> listener;
{
MutexAutoLock lock(mMutex);

Kershaw Chang
committed
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
LOG(
("WebTransportSessionProxy::OnIncomingStreamAvailableInternal %p "
"mState=%d "
"mStopRequestCalled=%d",
this, mState, mStopRequestCalled));
// Since OnSessionReady on the listener is called on the main thread,
// OnIncomingStreamAvailableInternal and OnSessionReady can be racy. If
// OnStopRequest is not called yet, OnIncomingStreamAvailableInternal needs
// to wait.
if (!mStopRequestCalled) {
mPendingEvents.AppendElement(
[self = RefPtr{this}, stream = RefPtr{aStream}]() {
self->OnIncomingStreamAvailableInternal(stream);
});
return NS_OK;
}
if (!mTarget->IsOnCurrentThread()) {
RefPtr<WebTransportSessionProxy> self(this);
RefPtr<Http3WebTransportStream> stream = aStream;
Unused << mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::OnIncomingStreamAvailableInternal",
[self{std::move(self)}, stream{std::move(stream)}]() {
self->OnIncomingStreamAvailableInternal(stream);
}));
return NS_OK;
}
LOG(
("WebTransportSessionProxy::OnIncomingStreamAvailableInternal %p "
"mState=%d mListener=%p",
this, mState, mListener.get()));

Kershaw Chang
committed
if (mState == WebTransportSessionProxyState::ACTIVE) {
listener = mListener;
}
}
if (!listener) {
// Session can be already closed.

Kershaw Chang
committed
return NS_OK;
}
RefPtr<WebTransportStreamProxy> streamProxy =
new WebTransportStreamProxy(aStream);
if (aStream->StreamType() == WebTransportStreamType::BiDi) {
Unused << listener->OnIncomingBidirectionalStreamAvailable(streamProxy);

Kershaw Chang
committed
} else {
Unused << listener->OnIncomingUnidirectionalStreamAvailable(streamProxy);

Kershaw Chang
committed
}
return NS_OK;
}
NS_IMETHODIMP
WebTransportSessionProxy::OnIncomingBidirectionalStreamAvailable(
nsIWebTransportBidirectionalStream* aStream) {
return NS_OK;
}
NS_IMETHODIMP
WebTransportSessionProxy::OnIncomingUnidirectionalStreamAvailable(
nsIWebTransportReceiveStream* aStream) {
return NS_OK;
}

Dragana Damjanovic
committed
NS_IMETHODIMP
WebTransportSessionProxy::OnSessionReady(uint64_t ready) {
MOZ_ASSERT(false, "Should not b called");
return NS_OK;
}
NS_IMETHODIMP
WebTransportSessionProxy::OnSessionClosed(uint32_t status,
const nsACString& reason) {
MOZ_ASSERT(OnSocketThread(), "not on socket thread");
MutexAutoLock lock(mMutex);

Kershaw Chang
committed
LOG(
("WebTransportSessionProxy::OnSessionClosed %p mState=%d "
"mStopRequestCalled=%d",
this, mState, mStopRequestCalled));
// Since OnSessionReady on the listener is called on the main thread,
// OnSessionClosed and OnSessionReady can be racy. If OnStopRequest is not
// called yet, OnSessionClosed needs to wait.
if (!mStopRequestCalled) {
nsCString closeReason(reason);
mPendingEvents.AppendElement(
[self = RefPtr{this}, status(status), closeReason(closeReason)]() {
Unused << self->OnSessionClosed(status, closeReason);
});
return NS_OK;
}

Dragana Damjanovic
committed
switch (mState) {
case WebTransportSessionProxyState::INIT:
case WebTransportSessionProxyState::NEGOTIATING:
case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING:
MOZ_ASSERT(false, "OnSessionClosed cannot be called in this state.");
return NS_ERROR_ABORT;
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED:
case WebTransportSessionProxyState::ACTIVE: {
mCloseStatus = status;
mReason = reason;
mWebTransportSession = nullptr;
ChangeState(WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING);

Kershaw Chang
committed
CallOnSessionClosed();

Dragana Damjanovic
committed
} break;
case WebTransportSessionProxyState::SESSION_CLOSE_PENDING:
ChangeState(WebTransportSessionProxyState::DONE);
break;
case WebTransportSessionProxyState::DONE:
// The session has been canceled. We do not need to set
// mWebTransportSession.
break;
}
return NS_OK;
}

Kershaw Chang
committed
void WebTransportSessionProxy::CallOnSessionClosedLocked() {
MutexAutoLock lock(mMutex);
CallOnSessionClosed();
}

Dragana Damjanovic
committed
void WebTransportSessionProxy::CallOnSessionClosed() {

Kershaw Chang
committed
mMutex.AssertCurrentThreadOwns();

Kershaw Chang
committed
if (!mTarget->IsOnCurrentThread()) {
RefPtr<WebTransportSessionProxy> self(this);
Unused << mTarget->Dispatch(NS_NewRunnableFunction(
"WebTransportSessionProxy::CallOnSessionClosed",
[self{std::move(self)}]() { self->CallOnSessionClosedLocked(); }));
return;
}
MOZ_ASSERT(mTarget->IsOnCurrentThread());

Dragana Damjanovic
committed
nsCOMPtr<WebTransportSessionEventListener> listener;
nsAutoCString reason;
uint32_t closeStatus = 0;

Kershaw Chang
committed
switch (mState) {
case WebTransportSessionProxyState::INIT:
case WebTransportSessionProxyState::NEGOTIATING:
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED:
case WebTransportSessionProxyState::ACTIVE:
case WebTransportSessionProxyState::SESSION_CLOSE_PENDING:
MOZ_ASSERT(false, "CallOnSessionClosed cannot be called in this state.");
break;
case WebTransportSessionProxyState::CLOSE_CALLBACK_PENDING:
listener = mListener;
mListener = nullptr;
reason = mReason;
closeStatus = mCloseStatus;
ChangeState(WebTransportSessionProxyState::DONE);
break;
case WebTransportSessionProxyState::DONE:
break;

Dragana Damjanovic
committed
}

Kershaw Chang
committed

Dragana Damjanovic
committed
if (listener) {

Kershaw Chang
committed
// Don't invoke the callback under the lock.
MutexAutoUnlock unlock(mMutex);

Dragana Damjanovic
committed
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
listener->OnSessionClosed(closeStatus, reason);
}
}
void WebTransportSessionProxy::ChangeState(
WebTransportSessionProxyState newState) {
mMutex.AssertCurrentThreadOwns();
LOG(("WebTransportSessionProxy::ChangeState %d -> %d [this=%p]", mState,
newState, this));
switch (newState) {
case WebTransportSessionProxyState::INIT:
MOZ_ASSERT(false, "Cannot change into INIT sate.");
break;
case WebTransportSessionProxyState::NEGOTIATING:
MOZ_ASSERT(mState == WebTransportSessionProxyState::INIT,
"Only from INIT can be change into NEGOTIATING");
MOZ_ASSERT(mChannel);
MOZ_ASSERT(mListener);
break;
case WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED:
MOZ_ASSERT(
mState == WebTransportSessionProxyState::NEGOTIATING,
"Only from NEGOTIATING can be change into NEGOTIATING_SUCCEEDED");
MOZ_ASSERT(mChannel);
MOZ_ASSERT(mWebTransportSession);
MOZ_ASSERT(mListener);
break;
case WebTransportSessionProxyState::ACTIVE:
MOZ_ASSERT(mState == WebTransportSessionProxyState::NEGOTIATING_SUCCEEDED,
"Only from NEGOTIATING_SUCCEEDED can be change into ACTIVE");
MOZ_ASSERT(!mChannel);
MOZ_ASSERT(mWebTransportSession);