Loading dom/cache/ReadStream.cpp +6 −123 Original line number Diff line number Diff line Loading @@ -13,7 +13,6 @@ #include "mozilla/ipc/IPCStreamUtils.h" #include "mozilla/SnappyUncompressInputStream.h" #include "nsIAsyncInputStream.h" #include "nsStringStream.h" #include "nsTArray.h" namespace mozilla { Loading Loading @@ -93,18 +92,6 @@ private: void ForgetOnOwningThread(); nsIInputStream* EnsureStream(); void AsyncOpenStreamOnOwningThread(); void MaybeAbortAsyncOpenStream(); void OpenStreamFailed(); // Weak ref to the stream control actor. The actor will always call either // CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The // weak ref is cleared in the resulting NoteClosedOnOwningThread() or Loading @@ -122,14 +109,13 @@ private: }; Atomic<State> mState; Atomic<bool> mHasEverBeenRead; bool mAsyncOpenStarted; // The wrapped stream objects may not be threadsafe. We need to be able // to close a stream on our owning thread while an IO thread is simultaneously // reading the same stream. Therefore, protect all access to these stream // objects with a mutex. Mutex mMutex; CondVar mCondVar; nsCOMPtr<nsIInputStream> mStream; nsCOMPtr<nsIInputStream> mSnappyStream; Loading Loading @@ -216,9 +202,7 @@ ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId, , mOwningEventTarget(GetCurrentThreadSerialEventTarget()) , mState(Open) , mHasEverBeenRead(false) , mAsyncOpenStarted(false) , mMutex("dom::cache::ReadStream") , mCondVar(mMutex, "dom::cache::ReadStream") , mStream(aStream) , mSnappyStream(new SnappyUncompressInputStream(aStream)) { Loading Loading @@ -304,10 +288,8 @@ ReadStream::Inner::Close() nsresult rv = NS_OK; { MutexAutoLock lock(mMutex); if (mSnappyStream) { rv = mSnappyStream->Close(); } } NoteClosed(); return rv; } Loading @@ -319,7 +301,7 @@ ReadStream::Inner::Available(uint64_t* aNumAvailableOut) nsresult rv = NS_OK; { MutexAutoLock lock(mMutex); rv = EnsureStream()->Available(aNumAvailableOut); rv = mSnappyStream->Available(aNumAvailableOut); } if (NS_FAILED(rv)) { Loading @@ -338,7 +320,7 @@ ReadStream::Inner::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut) nsresult rv = NS_OK; { MutexAutoLock lock(mMutex); rv = EnsureStream()->Read(aBuf, aCount, aNumReadOut); rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut); } if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) || Loading Loading @@ -366,7 +348,7 @@ ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, nsresult rv = NS_OK; { MutexAutoLock lock(mMutex); rv = EnsureStream()->ReadSegments(aWriter, aClosure, aCount, aNumReadOut); rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount, aNumReadOut); } if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK && Loading @@ -390,12 +372,8 @@ ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut) { // stream ops can happen on any thread MutexAutoLock lock(mMutex); if (mSnappyStream) { return mSnappyStream->IsNonBlocking(aNonBlockingOut); } *aNonBlockingOut = false; return NS_OK; } ReadStream::Inner::~Inner() { Loading Loading @@ -450,8 +428,6 @@ ReadStream::Inner::NoteClosedOnOwningThread() return; } MaybeAbortAsyncOpenStream(); MOZ_DIAGNOSTIC_ASSERT(mControl); mControl->NoteClosed(this, mId); mControl = nullptr; Loading @@ -467,104 +443,11 @@ ReadStream::Inner::ForgetOnOwningThread() return; } MaybeAbortAsyncOpenStream(); MOZ_DIAGNOSTIC_ASSERT(mControl); mControl->ForgetReadStream(this); mControl = nullptr; } nsIInputStream* ReadStream::Inner::EnsureStream() { mMutex.AssertCurrentThreadOwns(); // We need to block the current thread while we open the stream. We // cannot do this safely from the main owning thread since it would // trigger deadlock. This should be ok, though, since a blocking // stream like this should never be read on the owning thread anyway. if (mOwningEventTarget->IsOnCurrentThread()) { MOZ_CRASH("Blocking read on the js/ipc owning thread!"); } if (mSnappyStream) { return mSnappyStream; } nsCOMPtr<nsIRunnable> r = NewCancelableRunnableMethod("ReadStream::Inner::AsyncOpenStreamOnOwningThread", this, &ReadStream::Inner::AsyncOpenStreamOnOwningThread); nsresult rv = mOwningEventTarget->Dispatch(r.forget(), nsIThread::DISPATCH_NORMAL); if (NS_WARN_IF(NS_FAILED(rv))) { OpenStreamFailed(); return mSnappyStream; } mCondVar.Wait(); MOZ_DIAGNOSTIC_ASSERT(mSnappyStream); return mSnappyStream; } void ReadStream::Inner::AsyncOpenStreamOnOwningThread() { MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread()); if (!mControl || mState == Closed) { MutexAutoLock lock(mMutex); OpenStreamFailed(); mCondVar.NotifyAll(); return; } if (mAsyncOpenStarted) { return; } mAsyncOpenStarted = true; RefPtr<ReadStream::Inner> self = this; mControl->OpenStream(mId, [self](nsCOMPtr<nsIInputStream>&& aStream) { MutexAutoLock lock(self->mMutex); self->mAsyncOpenStarted = false; if (!self->mStream) { if (!aStream) { self->OpenStreamFailed(); } else { self->mStream = Move(aStream); self->mSnappyStream = new SnappyUncompressInputStream(self->mStream); } } self->mCondVar.NotifyAll(); }); } void ReadStream::Inner::MaybeAbortAsyncOpenStream() { if (!mAsyncOpenStarted) { return; } MutexAutoLock lock(mMutex); OpenStreamFailed(); mCondVar.NotifyAll(); } void ReadStream::Inner::OpenStreamFailed() { MOZ_DIAGNOSTIC_ASSERT(!mStream); MOZ_DIAGNOSTIC_ASSERT(!mSnappyStream); mMutex.AssertCurrentThreadOwns(); Unused << NS_NewCStringInputStream(getter_AddRefs(mStream), EmptyCString()); mSnappyStream = mStream; mStream->Close(); NoteClosed(); } // ---------------------------------------------------------------------------- NS_IMPL_ISUPPORTS(cache::ReadStream, nsIInputStream, ReadStream); Loading Loading
dom/cache/ReadStream.cpp +6 −123 Original line number Diff line number Diff line Loading @@ -13,7 +13,6 @@ #include "mozilla/ipc/IPCStreamUtils.h" #include "mozilla/SnappyUncompressInputStream.h" #include "nsIAsyncInputStream.h" #include "nsStringStream.h" #include "nsTArray.h" namespace mozilla { Loading Loading @@ -93,18 +92,6 @@ private: void ForgetOnOwningThread(); nsIInputStream* EnsureStream(); void AsyncOpenStreamOnOwningThread(); void MaybeAbortAsyncOpenStream(); void OpenStreamFailed(); // Weak ref to the stream control actor. The actor will always call either // CloseStream() or CloseStreamWithoutReporting() before it's destroyed. The // weak ref is cleared in the resulting NoteClosedOnOwningThread() or Loading @@ -122,14 +109,13 @@ private: }; Atomic<State> mState; Atomic<bool> mHasEverBeenRead; bool mAsyncOpenStarted; // The wrapped stream objects may not be threadsafe. We need to be able // to close a stream on our owning thread while an IO thread is simultaneously // reading the same stream. Therefore, protect all access to these stream // objects with a mutex. Mutex mMutex; CondVar mCondVar; nsCOMPtr<nsIInputStream> mStream; nsCOMPtr<nsIInputStream> mSnappyStream; Loading Loading @@ -216,9 +202,7 @@ ReadStream::Inner::Inner(StreamControl* aControl, const nsID& aId, , mOwningEventTarget(GetCurrentThreadSerialEventTarget()) , mState(Open) , mHasEverBeenRead(false) , mAsyncOpenStarted(false) , mMutex("dom::cache::ReadStream") , mCondVar(mMutex, "dom::cache::ReadStream") , mStream(aStream) , mSnappyStream(new SnappyUncompressInputStream(aStream)) { Loading Loading @@ -304,10 +288,8 @@ ReadStream::Inner::Close() nsresult rv = NS_OK; { MutexAutoLock lock(mMutex); if (mSnappyStream) { rv = mSnappyStream->Close(); } } NoteClosed(); return rv; } Loading @@ -319,7 +301,7 @@ ReadStream::Inner::Available(uint64_t* aNumAvailableOut) nsresult rv = NS_OK; { MutexAutoLock lock(mMutex); rv = EnsureStream()->Available(aNumAvailableOut); rv = mSnappyStream->Available(aNumAvailableOut); } if (NS_FAILED(rv)) { Loading @@ -338,7 +320,7 @@ ReadStream::Inner::Read(char* aBuf, uint32_t aCount, uint32_t* aNumReadOut) nsresult rv = NS_OK; { MutexAutoLock lock(mMutex); rv = EnsureStream()->Read(aBuf, aCount, aNumReadOut); rv = mSnappyStream->Read(aBuf, aCount, aNumReadOut); } if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK) || Loading Loading @@ -366,7 +348,7 @@ ReadStream::Inner::ReadSegments(nsWriteSegmentFun aWriter, void* aClosure, nsresult rv = NS_OK; { MutexAutoLock lock(mMutex); rv = EnsureStream()->ReadSegments(aWriter, aClosure, aCount, aNumReadOut); rv = mSnappyStream->ReadSegments(aWriter, aClosure, aCount, aNumReadOut); } if ((NS_FAILED(rv) && rv != NS_BASE_STREAM_WOULD_BLOCK && Loading @@ -390,12 +372,8 @@ ReadStream::Inner::IsNonBlocking(bool* aNonBlockingOut) { // stream ops can happen on any thread MutexAutoLock lock(mMutex); if (mSnappyStream) { return mSnappyStream->IsNonBlocking(aNonBlockingOut); } *aNonBlockingOut = false; return NS_OK; } ReadStream::Inner::~Inner() { Loading Loading @@ -450,8 +428,6 @@ ReadStream::Inner::NoteClosedOnOwningThread() return; } MaybeAbortAsyncOpenStream(); MOZ_DIAGNOSTIC_ASSERT(mControl); mControl->NoteClosed(this, mId); mControl = nullptr; Loading @@ -467,104 +443,11 @@ ReadStream::Inner::ForgetOnOwningThread() return; } MaybeAbortAsyncOpenStream(); MOZ_DIAGNOSTIC_ASSERT(mControl); mControl->ForgetReadStream(this); mControl = nullptr; } nsIInputStream* ReadStream::Inner::EnsureStream() { mMutex.AssertCurrentThreadOwns(); // We need to block the current thread while we open the stream. We // cannot do this safely from the main owning thread since it would // trigger deadlock. This should be ok, though, since a blocking // stream like this should never be read on the owning thread anyway. if (mOwningEventTarget->IsOnCurrentThread()) { MOZ_CRASH("Blocking read on the js/ipc owning thread!"); } if (mSnappyStream) { return mSnappyStream; } nsCOMPtr<nsIRunnable> r = NewCancelableRunnableMethod("ReadStream::Inner::AsyncOpenStreamOnOwningThread", this, &ReadStream::Inner::AsyncOpenStreamOnOwningThread); nsresult rv = mOwningEventTarget->Dispatch(r.forget(), nsIThread::DISPATCH_NORMAL); if (NS_WARN_IF(NS_FAILED(rv))) { OpenStreamFailed(); return mSnappyStream; } mCondVar.Wait(); MOZ_DIAGNOSTIC_ASSERT(mSnappyStream); return mSnappyStream; } void ReadStream::Inner::AsyncOpenStreamOnOwningThread() { MOZ_ASSERT(mOwningEventTarget->IsOnCurrentThread()); if (!mControl || mState == Closed) { MutexAutoLock lock(mMutex); OpenStreamFailed(); mCondVar.NotifyAll(); return; } if (mAsyncOpenStarted) { return; } mAsyncOpenStarted = true; RefPtr<ReadStream::Inner> self = this; mControl->OpenStream(mId, [self](nsCOMPtr<nsIInputStream>&& aStream) { MutexAutoLock lock(self->mMutex); self->mAsyncOpenStarted = false; if (!self->mStream) { if (!aStream) { self->OpenStreamFailed(); } else { self->mStream = Move(aStream); self->mSnappyStream = new SnappyUncompressInputStream(self->mStream); } } self->mCondVar.NotifyAll(); }); } void ReadStream::Inner::MaybeAbortAsyncOpenStream() { if (!mAsyncOpenStarted) { return; } MutexAutoLock lock(mMutex); OpenStreamFailed(); mCondVar.NotifyAll(); } void ReadStream::Inner::OpenStreamFailed() { MOZ_DIAGNOSTIC_ASSERT(!mStream); MOZ_DIAGNOSTIC_ASSERT(!mSnappyStream); mMutex.AssertCurrentThreadOwns(); Unused << NS_NewCStringInputStream(getter_AddRefs(mStream), EmptyCString()); mSnappyStream = mStream; mStream->Close(); NoteClosed(); } // ---------------------------------------------------------------------------- NS_IMPL_ISUPPORTS(cache::ReadStream, nsIInputStream, ReadStream); Loading