Skip to content
Snippets Groups Projects
Commit aedabb7a authored by Kagami Sascha Rosylight's avatar Kagami Sascha Rosylight
Browse files

Bug 1816718 - Part 1: Assert that BodyStream is only touched by the owning thread r=smaug

This removes mutex and locks from BodyStream.

Differential Revision: https://phabricator.services.mozilla.com/D170089
parent f6bc7267
No related branches found
No related tags found
No related merge requests found
......@@ -61,6 +61,11 @@ void BodyStreamHolder::StoreBodyStream(BodyStream* aBodyStream) {
}
// BodyStream
//
// This class has thread ownership assertions everywhere because historically it
// could be touched by random threads. See also:
// https://searchfox.org/mozilla-esr60/rev/02b4ae79b24aae2346b1338e2bf095a571192061/dom/fetch/FetchStream.cpp#302,306,312
//
// ---------------------------------------------------------------------------
class BodyStream::WorkerShutdown final : public WorkerControlRunnable {
......@@ -160,11 +165,12 @@ void BodyStream::Create(JSContext* aCx, BodyStreamHolder* aStreamHolder,
if (NS_WARN_IF(aRv.Failed())) {
return;
}
} else {
WorkerPrivate* workerPrivate = GetWorkerPrivateFromContext(aCx);
MOZ_ASSERT(workerPrivate);
workerPrivate->AssertIsOnWorkerThread();
RefPtr<StrongWorkerRef> workerRef = StrongWorkerRef::Create(
workerPrivate, "BodyStream", [stream]() { stream->Close(); });
......@@ -206,9 +212,7 @@ already_AddRefed<Promise> BodyStream::PullCallback(
MOZ_DIAGNOSTIC_ASSERT(stream->Disturbed());
AssertIsOnOwningThread();
MutexSingleWriterAutoLock lock(mMutex);
MOZ_DIAGNOSTIC_ASSERT(mOwningEventTarget->IsOnCurrentThread());
MOZ_DIAGNOSTIC_ASSERT(mState == eInitializing || mState == eWaiting ||
mState == eChecking || mState == eReading);
......@@ -249,7 +253,7 @@ already_AddRefed<Promise> BodyStream::PullCallback(
nsresult rv = NS_MakeAsyncNonBlockingInputStream(
mOriginalInputStream.forget(), getter_AddRefs(asyncStream));
if (NS_WARN_IF(NS_FAILED(rv))) {
ErrorPropagation(aCx, lock, stream, rv);
ErrorPropagation(aCx, stream, rv);
return nullptr;
}
......@@ -262,7 +266,7 @@ already_AddRefed<Promise> BodyStream::PullCallback(
nsresult rv = mInputStream->AsyncWait(this, 0, 0, mOwningEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
ErrorPropagation(aCx, lock, stream, rv);
ErrorPropagation(aCx, stream, rv);
return nullptr;
}
mAsyncWaitWorkerRef = mWorkerRef;
......@@ -279,9 +283,7 @@ void BodyStream::WriteIntoReadRequestBuffer(JSContext* aCx,
MOZ_DIAGNOSTIC_ASSERT(aBuffer);
MOZ_DIAGNOSTIC_ASSERT(aByteWritten);
AssertIsOnOwningThread();
MutexSingleWriterAutoLock lock(mMutex);
MOZ_DIAGNOSTIC_ASSERT(mOwningEventTarget->IsOnCurrentThread());
MOZ_DIAGNOSTIC_ASSERT(mInputStream);
MOZ_DIAGNOSTIC_ASSERT(mState == eWriting);
......@@ -306,7 +308,7 @@ void BodyStream::WriteIntoReadRequestBuffer(JSContext* aCx,
rv = mInputStream->Read(static_cast<char*>(buffer), aLength, &written);
if (NS_WARN_IF(NS_FAILED(rv))) {
ErrorPropagation(aCx, lock, aStream, rv);
ErrorPropagation(aCx, aStream, rv);
return;
}
}
......@@ -314,13 +316,13 @@ void BodyStream::WriteIntoReadRequestBuffer(JSContext* aCx,
*aByteWritten = written;
if (written == 0) {
CloseAndReleaseObjects(aCx, lock, aStream);
CloseAndReleaseObjects(aCx, aStream);
return;
}
rv = mInputStream->AsyncWait(this, 0, 0, mOwningEventTarget);
if (NS_WARN_IF(NS_FAILED(rv))) {
ErrorPropagation(aCx, lock, aStream, rv);
ErrorPropagation(aCx, aStream, rv);
return;
}
mAsyncWaitWorkerRef = mWorkerRef;
......@@ -329,7 +331,7 @@ void BodyStream::WriteIntoReadRequestBuffer(JSContext* aCx,
}
void BodyStream::CloseInputAndReleaseObjects() {
mMutex.AssertOnWritingThread();
MOZ_DIAGNOSTIC_ASSERT(mOwningEventTarget->IsOnCurrentThread());
if (mState == eInitializing) {
// The stream has been used for the first time.
......@@ -353,8 +355,7 @@ void BodyStream::CloseInputAndReleaseObjects() {
BodyStream::BodyStream(nsIGlobalObject* aGlobal,
BodyStreamHolder* aStreamHolder,
nsIInputStream* aInputStream)
: mMutex("BodyStream::mMutex", this),
mState(eInitializing),
: mState(eInitializing),
mGlobal(aGlobal),
mStreamHolder(aStreamHolder),
mOwningEventTarget(aGlobal->EventTargetFor(TaskCategory::Other)),
......@@ -363,11 +364,13 @@ BodyStream::BodyStream(nsIGlobalObject* aGlobal,
MOZ_DIAGNOSTIC_ASSERT(aStreamHolder);
}
void BodyStream::ErrorPropagation(JSContext* aCx,
const MutexSingleWriterAutoLock& aProofOfLock,
ReadableStream* aStream, nsresult aError) {
mMutex.AssertOnWritingThread();
mMutex.AssertCurrentThreadOwns();
BodyStream::~BodyStream() {
MOZ_DIAGNOSTIC_ASSERT(mOwningEventTarget->IsOnCurrentThread());
}
void BodyStream::ErrorPropagation(JSContext* aCx, ReadableStream* aStream,
nsresult aError) {
MOZ_DIAGNOSTIC_ASSERT(mOwningEventTarget->IsOnCurrentThread());
// Nothing to do.
if (mState == eClosed) {
......@@ -376,7 +379,7 @@ void BodyStream::ErrorPropagation(JSContext* aCx,
// Let's close the stream.
if (aError == NS_BASE_STREAM_CLOSED) {
CloseAndReleaseObjects(aCx, aProofOfLock, aStream);
CloseAndReleaseObjects(aCx, aStream);
return;
}
......@@ -391,7 +394,6 @@ void BodyStream::ErrorPropagation(JSContext* aCx,
MOZ_RELEASE_ASSERT(ok, "ToJSValue never fails for ErrorResult");
{
MutexSingleWriterAutoUnlock unlock(mMutex);
// This will be ignored if it's already errored.
IgnoredErrorResult rv;
aStream->ErrorNative(aCx, errorValue, rv);
......@@ -407,7 +409,7 @@ void BodyStream::ErrorPropagation(JSContext* aCx,
mInputStream->CloseWithStatus(NS_BASE_STREAM_CLOSED);
}
ReleaseObjects(aProofOfLock);
ReleaseObjects();
}
// https://fetch.spec.whatwg.org/#concept-bodyinit-extract
......@@ -455,18 +457,12 @@ void BodyStream::EnqueueChunkWithSizeIntoStream(JSContext* aCx,
aStream->EnqueueNative(aCx, chunkValue, aRv);
}
// thread-safety doesn't handle emplace well
NS_IMETHODIMP
BodyStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
MOZ_NO_THREAD_SAFETY_ANALYSIS {
AssertIsOnOwningThread();
BodyStream::OnInputStreamReady(nsIAsyncInputStream* aStream) {
MOZ_DIAGNOSTIC_ASSERT(mOwningEventTarget->IsOnCurrentThread());
MOZ_DIAGNOSTIC_ASSERT(aStream);
mAsyncWaitWorkerRef = nullptr;
// Acquire |mMutex| in order to safely inspect |mState| and use |mGlobal|.
Maybe<MutexSingleWriterAutoLock> lock;
lock.emplace(mMutex);
// Already closed. We have nothing else to do here.
if (mState == eClosed) {
return NS_OK;
......@@ -499,7 +495,7 @@ BodyStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
// No warning for stream closed.
if (rv == NS_BASE_STREAM_CLOSED || NS_WARN_IF(NS_FAILED(rv))) {
ErrorPropagation(cx, *lock, stream, rv);
ErrorPropagation(cx, stream, rv);
return NS_OK;
}
......@@ -511,15 +507,11 @@ BodyStream::OnInputStreamReady(nsIAsyncInputStream* aStream)
mState = eWriting;
// Release the mutex before the call below (which could execute JS), as well
// as before the microtask checkpoint queued up above occurs.
lock.reset();
ErrorResult errorResult;
EnqueueChunkWithSizeIntoStream(cx, stream, size, errorResult);
errorResult.WouldReportJSException();
if (errorResult.Failed()) {
lock.emplace(mMutex);
ErrorPropagation(cx, *lock, stream, errorResult.StealNSResult());
ErrorPropagation(cx, stream, errorResult.StealNSResult());
return NS_OK;
}
......@@ -540,7 +532,7 @@ nsresult BodyStream::RetrieveInputStream(BodyStreamHolder* aStream,
return NS_ERROR_DOM_INVALID_STATE_ERR;
}
stream->AssertIsOnOwningThread();
MOZ_DIAGNOSTIC_ASSERT(stream->mOwningEventTarget->IsOnCurrentThread());
// if mOriginalInputStream is null, the reading already started. We don't want
// to expose the internal inputStream.
......@@ -554,9 +546,7 @@ nsresult BodyStream::RetrieveInputStream(BodyStreamHolder* aStream,
}
void BodyStream::Close() {
AssertIsOnOwningThread();
MutexSingleWriterAutoLock lock(mMutex);
MOZ_DIAGNOSTIC_ASSERT(mOwningEventTarget->IsOnCurrentThread());
if (mState == eClosed) {
return;
......@@ -564,28 +554,24 @@ void BodyStream::Close() {
AutoJSAPI jsapi;
if (NS_WARN_IF(!jsapi.Init(mGlobal))) {
ReleaseObjects(lock);
ReleaseObjects();
return;
}
ReadableStream* stream = mStreamHolder->GetReadableStreamBody();
if (stream) {
JSContext* cx = jsapi.cx();
CloseAndReleaseObjects(cx, lock, stream);
CloseAndReleaseObjects(cx, stream);
} else {
ReleaseObjects(lock);
ReleaseObjects();
}
}
void BodyStream::CloseAndReleaseObjects(
JSContext* aCx, const MutexSingleWriterAutoLock& aProofOfLock,
ReadableStream* aStream) {
AssertIsOnOwningThread();
mMutex.AssertCurrentThreadOwns();
void BodyStream::CloseAndReleaseObjects(JSContext* aCx,
ReadableStream* aStream) {
MOZ_DIAGNOSTIC_ASSERT(mOwningEventTarget->IsOnCurrentThread());
MOZ_DIAGNOSTIC_ASSERT(mState != eClosed);
ReleaseObjects(aProofOfLock);
MutexSingleWriterAutoUnlock unlock(mMutex);
ReleaseObjects();
if (aStream->State() == ReadableStream::ReaderState::Readable) {
IgnoredErrorResult rv;
......@@ -595,16 +581,13 @@ void BodyStream::CloseAndReleaseObjects(
}
void BodyStream::ReleaseObjects() {
MutexSingleWriterAutoLock lock(mMutex);
ReleaseObjects(lock);
}
void BodyStream::ReleaseObjects(const MutexSingleWriterAutoLock& aProofOfLock) {
// This method can be called on 2 possible threads: the owning one and a JS
// thread used to release resources. If we are on the JS thread, we need to
// dispatch a runnable to go back to the owning thread in order to release
// resources correctly.
MOZ_DIAGNOSTIC_ASSERT(mOwningEventTarget->IsOnCurrentThread());
if (mState == eClosed) {
// Already gone. Nothing to do.
return;
......@@ -627,8 +610,6 @@ void BodyStream::ReleaseObjects(const MutexSingleWriterAutoLock& aProofOfLock) {
return;
}
AssertIsOnOwningThread();
mState = eClosed;
if (NS_IsMainThread()) {
......@@ -658,12 +639,6 @@ void BodyStream::ReleaseObjects(const MutexSingleWriterAutoLock& aProofOfLock) {
mStreamHolder = nullptr;
}
#ifdef DEBUG
void BodyStream::AssertIsOnOwningThread() const {
NS_ASSERT_OWNINGTHREAD(BodyStream);
}
#endif
// nsIObserver
// -----------
......@@ -671,7 +646,7 @@ NS_IMETHODIMP
BodyStream::Observe(nsISupports* aSubject, const char* aTopic,
const char16_t* aData) {
AssertIsOnMainThread();
AssertIsOnOwningThread();
MOZ_DIAGNOSTIC_ASSERT(mOwningEventTarget->IsOnCurrentThread());
MOZ_ASSERT(strcmp(aTopic, DOM_WINDOW_DESTROYED_TOPIC) == 0);
......
......@@ -17,7 +17,6 @@
#include "nsISupportsImpl.h"
#include "nsNetCID.h"
#include "nsWeakReference.h"
#include "mozilla/Mutex.h"
class nsIGlobalObject;
......@@ -79,12 +78,11 @@ class BodyStreamHolder : public nsISupports {
class BodyStream final : public nsIInputStreamCallback,
public nsIObserver,
public nsSupportsWeakReference,
public SingleWriterLockOwner {
public nsSupportsWeakReference {
friend class BodyStreamHolder;
public:
NS_DECL_THREADSAFE_ISUPPORTS
NS_DECL_ISUPPORTS
NS_DECL_NSIINPUTSTREAMCALLBACK
NS_DECL_NSIOBSERVER
......@@ -97,27 +95,13 @@ class BodyStream final : public nsIInputStreamCallback,
void Close();
bool OnWritingThread() const override {
#ifdef MOZ_THREAD_SAFETY_OWNERSHIP_CHECKS_SUPPORTED
return _mOwningThread.IsCurrentThread();
#else
return true;
#endif
}
static nsresult RetrieveInputStream(BodyStreamHolder* aStream,
nsIInputStream** aInputStream);
private:
BodyStream(nsIGlobalObject* aGlobal, BodyStreamHolder* aStreamHolder,
nsIInputStream* aInputStream);
~BodyStream() = default;
#ifdef DEBUG
void AssertIsOnOwningThread() const;
#else
void AssertIsOnOwningThread() const {}
#endif
~BodyStream();
public:
// Pull Callback
......@@ -139,21 +123,15 @@ class BodyStream final : public nsIInputStreamCallback,
JSContext* aCx, ReadableStream* aStream, uint64_t aAvailableData,
ErrorResult& aRv);
void ErrorPropagation(JSContext* aCx,
const MutexSingleWriterAutoLock& aProofOfLock,
ReadableStream* aStream, nsresult aError)
MOZ_REQUIRES(mMutex);
void ErrorPropagation(JSContext* aCx, ReadableStream* aStream,
nsresult aError);
// TODO: convert this to MOZ_CAN_RUN_SCRIPT (bug 1750605)
MOZ_CAN_RUN_SCRIPT_BOUNDARY void CloseAndReleaseObjects(
JSContext* aCx, const MutexSingleWriterAutoLock& aProofOfLock,
ReadableStream* aStream) MOZ_REQUIRES(mMutex);
JSContext* aCx, ReadableStream* aStream);
class WorkerShutdown;
void ReleaseObjects(const MutexSingleWriterAutoLock& aProofOfLock)
MOZ_REQUIRES(mMutex);
void ReleaseObjects();
// Common methods
......@@ -181,18 +159,12 @@ class BodyStream final : public nsIInputStreamCallback,
eClosed,
};
// We need a mutex because JS engine can release BodyStream on a non-owning
// thread. We must be sure that the releasing of resources doesn't trigger
// race conditions.
MutexSingleWriter mMutex;
// Protected by mutex.
State mState MOZ_GUARDED_BY(mMutex); // all writes are from the owning thread
State mState;
// mGlobal is set on creation, and isn't modified off the owning thread.
// It isn't set to nullptr until ReleaseObjects() runs.
nsCOMPtr<nsIGlobalObject> mGlobal;
RefPtr<BodyStreamHolder> mStreamHolder MOZ_GUARDED_BY(mMutex);
RefPtr<BodyStreamHolder> mStreamHolder;
nsCOMPtr<nsIEventTarget> mOwningEventTarget;
// This is the original inputStream received during the CTOR. It will be
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment