Commit 0dd4d4b9 authored by Kagami Sascha Rosylight's avatar Kagami Sascha Rosylight
Browse files

Bug 1763142 - Implement transform sink/source algorithms r=mgaudet,emilio

parent e97ca7ef
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -1883,7 +1883,7 @@ void SetUpReadableByteStreamController(
  aController->PendingPullIntos().clear();

  // Step 13. Set stream.[[controller]] to controller.
  aStream->SetController(aController);
  aStream->SetController(*aController);

  // Step 14. Let startResult be the result of performing startAlgorithm.
  JS::RootedValue startResult(aCx, JS::UndefinedValue());
+5 −4
Original line number Diff line number Diff line
@@ -60,13 +60,14 @@ class ReadableStream final : public nsISupports, public nsWrapperCache {

  // Slot Getter/Setters:
 public:
  ReadableStreamController* Controller() { return mController; }
  MOZ_KNOWN_LIVE ReadableStreamController* Controller() { return mController; }
  ReadableStreamDefaultController* DefaultController() {
    MOZ_ASSERT(mController && mController->IsDefault());
    return mController->AsDefault();
  }
  void SetController(ReadableStreamController* aController) {
    mController = aController;
  void SetController(ReadableStreamController& aController) {
    MOZ_ASSERT(!mController);
    mController = &aController;
  }

  bool Disturbed() const { return mDisturbed; }
@@ -137,7 +138,7 @@ class ReadableStream final : public nsISupports, public nsWrapperCache {

  // Internal Slots:
 private:
  RefPtr<ReadableStreamController> mController;
  MOZ_KNOWN_LIVE RefPtr<ReadableStreamController> mController;
  bool mDisturbed = false;
  RefPtr<ReadableStreamGenericReader> mReader;
  ReaderState mState = ReaderState::Readable;
+1 −1
Original line number Diff line number Diff line
@@ -491,7 +491,7 @@ void SetUpReadableStreamDefaultController(
  aController->SetAlgorithms(aAlgorithms);

  // Step 8.
  aStream->SetController(aController);
  aStream->SetController(*aController);

  // Step 9. Default algorithm returns undefined. See Step 2 of
  // https://streams.spec.whatwg.org/#set-up-readable-stream-default-controller
+253 −35
Original line number Diff line number Diff line
@@ -6,9 +6,13 @@

#include "mozilla/dom/TransformStream.h"

#include "TransformerCallbackHelpers.h"
#include "UnderlyingSourceCallbackHelpers.h"
#include "js/TypeDecls.h"
#include "mozilla/Attributes.h"
#include "mozilla/dom/Promise.h"
#include "mozilla/dom/Promise-inl.h"
#include "mozilla/dom/PromiseNativeHandler.h"
#include "mozilla/dom/WritableStream.h"
#include "mozilla/dom/ReadableStream.h"
#include "mozilla/dom/RootedDictionary.h"
@@ -17,6 +21,14 @@
#include "mozilla/dom/StreamUtils.h"
#include "nsWrapperCache.h"

// XXX: GCC somehow does not allow attributes before lambda return types, while
// clang requires so. See also bug 1627007.
#ifdef __clang__
#  define MOZ_CAN_RUN_SCRIPT_BOUNDARY_LAMBDA MOZ_CAN_RUN_SCRIPT_BOUNDARY
#else
#  define MOZ_CAN_RUN_SCRIPT_BOUNDARY_LAMBDA
#endif

namespace mozilla::dom {

NS_IMPL_CYCLE_COLLECTION_WRAPPERCACHE(TransformStream, mGlobal,
@@ -82,6 +94,49 @@ void TransformStreamError(JSContext* aCx, TransformStream* aStream,
  TransformStreamErrorWritableAndUnblockWrite(aCx, aStream, aError, aRv);
}

// https://streams.spec.whatwg.org/#transform-stream-default-controller-perform-transform
MOZ_CAN_RUN_SCRIPT static already_AddRefed<Promise>
TransformStreamDefaultControllerPerformTransform(
    JSContext* aCx, TransformStreamDefaultController* aController,
    JS::HandleValue aChunk, ErrorResult& aRv) {
  // Step 1: Let transformPromise be the result of performing
  // controller.[[transformAlgorithm]], passing chunk.
  RefPtr<TransformerAlgorithms> algorithms = aController->Algorithms();
  RefPtr<Promise> transformPromise =
      algorithms->TransformCallback(aCx, aChunk, *aController, aRv);
  if (aRv.Failed()) {
    return nullptr;
  }

  // Step 2: Return the result of reacting to transformPromise with the
  // following rejection steps given the argument r:
  auto result = transformPromise->CatchWithCycleCollectedArgs(
      [](JSContext* aCx, JS::HandleValue aError, ErrorResult& aRv,
         const RefPtr<TransformStreamDefaultController>& aController)
          MOZ_CAN_RUN_SCRIPT_BOUNDARY_LAMBDA -> already_AddRefed<Promise> {
            // Step 2.1: Perform ! TransformStreamError(controller.[[stream]],
            // r).
            // TODO: Remove MOZ_KnownLive (bug 1761577)
            TransformStreamError(aCx, MOZ_KnownLive(aController->Stream()),
                                 aError, aRv);
            if (aRv.Failed()) {
              return nullptr;
            }

            // Step 2.2: Throw r.
            JS::RootedValue r(aCx, aError);
            aRv.MightThrowJSException();
            aRv.ThrowJSException(aCx, r);
            return nullptr;
          },
      RefPtr(aController));
  if (result.isErr()) {
    aRv.Throw(result.unwrapErr());
    return nullptr;
  }
  return result.unwrap().forget();
}

// https://streams.spec.whatwg.org/#initialize-transform-stream
class TransformStreamUnderlyingSinkAlgorithms final
    : public UnderlyingSinkAlgorithmsBase {
@@ -103,38 +158,185 @@ class TransformStreamUnderlyingSinkAlgorithms final
    aRetVal.setObject(*mStartPromise->PromiseObj());
  }

  already_AddRefed<Promise> WriteCallback(
  MOZ_CAN_RUN_SCRIPT already_AddRefed<Promise> WriteCallback(
      JSContext* aCx, JS::Handle<JS::Value> aChunk,
      WritableStreamDefaultController& aController, ErrorResult& aRv) override {
    // Step 2. Let writeAlgorithm be the following steps, taking a chunk
    // argument:
    //  Step 1. Return ! TransformStreamDefaultSinkWriteAlgorithm(stream,
    // Step 2.1. Return ! TransformStreamDefaultSinkWriteAlgorithm(stream,
    // chunk).
    // TODO
    return Promise::CreateResolvedWithUndefined(mStream->GetParentObject(),
                                                aRv);

    // Inlining TransformStreamDefaultSinkWriteAlgorithm here:
    // https://streams.spec.whatwg.org/#transform-stream-default-sink-write-algorithm

    // Step 1: Assert: stream.[[writable]].[[state]] is "writable".
    MOZ_ASSERT(mStream->Writable()->State() ==
               WritableStream::WriterState::Writable);

    // Step 2: Let controller be stream.[[controller]].
    RefPtr<TransformStreamDefaultController> controller = mStream->Controller();

    // Step 3: If stream.[[backpressure]] is true,
    if (mStream->Backpressure()) {
      // Step 3.1: Let backpressureChangePromise be
      // stream.[[backpressureChangePromise]].
      RefPtr<Promise> backpressureChangePromise =
          mStream->BackpressureChangePromise();

      // Step 3.2: Assert: backpressureChangePromise is not undefined.
      MOZ_ASSERT(backpressureChangePromise);

      // Step 3.3: Return the result of reacting to backpressureChangePromise
      // with the following fulfillment steps:
      auto result = backpressureChangePromise->ThenWithCycleCollectedArgsJS(
          [](JSContext* aCx, JS::HandleValue, ErrorResult& aRv,
             const RefPtr<TransformStream>& aStream,
             const RefPtr<TransformStreamDefaultController>& aController,
             JS::HandleValue aChunk)
              MOZ_CAN_RUN_SCRIPT_BOUNDARY_LAMBDA -> already_AddRefed<Promise> {
                // Step 1: Let writable be stream.[[writable]].
                RefPtr<WritableStream> writable = aStream->Writable();

                // Step 2: Let state be writable.[[state]].
                WritableStream::WriterState state = writable->State();

                // Step 3: If state is "erroring", throw
                // writable.[[storedError]].
                if (state == WritableStream::WriterState::Erroring) {
                  JS::RootedValue storedError(aCx, writable->StoredError());
                  aRv.MightThrowJSException();
                  aRv.ThrowJSException(aCx, storedError);
                  return nullptr;
                }

                // Step 4: Assert: state is "writable".
                MOZ_ASSERT(state == WritableStream::WriterState::Writable);

                // Step 5: Return !
                // TransformStreamDefaultControllerPerformTransform(controller,
                // chunk).
                return TransformStreamDefaultControllerPerformTransform(
                    aCx, aController, aChunk, aRv);
              },
          std::make_tuple(mStream, controller), std::make_tuple(aChunk));

      if (result.isErr()) {
        aRv.Throw(result.unwrapErr());
        return nullptr;
      }
      return result.unwrap().forget();
    }

  already_AddRefed<Promise> AbortCallback(
    // Step 4: Return !
    // TransformStreamDefaultControllerPerformTransform(controller, chunk).
    return TransformStreamDefaultControllerPerformTransform(aCx, controller,
                                                            aChunk, aRv);
  }

  MOZ_CAN_RUN_SCRIPT already_AddRefed<Promise> AbortCallback(
      JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
      ErrorResult& aRv) override {
    // Step 3. Let abortAlgorithm be the following steps, taking a reason
    // argument:
    //  Step 1. Return ! TransformStreamDefaultSinkAbortAlgorithm(stream,
    // Step 3.1. Return ! TransformStreamDefaultSinkAbortAlgorithm(stream,
    // reason).
    // TODO

    // Inlining TransformStreamDefaultSinkAbortAlgorithm here:
    // https://streams.spec.whatwg.org/#transform-stream-default-sink-abort-algorithm

    // Step 1:Perform ! TransformStreamError(stream, reason).
    TransformStreamError(
        aCx, mStream,
        aReason.WasPassed() ? aReason.Value() : JS::UndefinedHandleValue, aRv);
    if (aRv.Failed()) {
      return nullptr;
    }

    // Step 2: Return a promise resolved with undefined.
    return Promise::CreateResolvedWithUndefined(mStream->GetParentObject(),
                                                aRv);
  }

  already_AddRefed<Promise> CloseCallback(JSContext* aCx,
                                          ErrorResult& aRv) override {
  MOZ_CAN_RUN_SCRIPT already_AddRefed<Promise> CloseCallback(
      JSContext* aCx, ErrorResult& aRv) override {
    // Step 4. Let closeAlgorithm be the following steps:
    // Step 4.1. Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).

    //   Step 1. Return ! TransformStreamDefaultSinkCloseAlgorithm(stream).
    // TODO
    return Promise::CreateResolvedWithUndefined(mStream->GetParentObject(),
    // Inlining TransformStreamDefaultSinkCloseAlgorithm here:
    // https://streams.spec.whatwg.org/#transform-stream-default-sink-close-algorithm

    // Step 1: Let readable be stream.[[readable]].
    RefPtr<ReadableStream> readable = mStream->Readable();

    // Step 2: Let controller be stream.[[controller]].
    RefPtr<TransformStreamDefaultController> controller = mStream->Controller();

    // Step 3: Let flushPromise be the result of performing
    // controller.[[flushAlgorithm]].
    RefPtr<TransformerAlgorithms> algorithms = controller->Algorithms();
    RefPtr<Promise> flushPromise =
        algorithms->FlushCallback(aCx, *controller, aRv);
    if (aRv.Failed()) {
      return nullptr;
    }

    // Step 4: Perform !
    // TransformStreamDefaultControllerClearAlgorithms(controller).
    controller->SetAlgorithms(nullptr);

    // Step 5: Return the result of reacting to flushPromise:
    Result<RefPtr<Promise>, nsresult> result =
        flushPromise->ThenCatchWithCycleCollectedArgs(
            [](JSContext* aCx, JS::HandleValue aValue, ErrorResult& aRv,
               const RefPtr<ReadableStream>& aReadable,
               const RefPtr<TransformStream>& aStream)
                MOZ_CAN_RUN_SCRIPT_BOUNDARY_LAMBDA
            -> already_AddRefed<Promise> {
                  // Step 5.1: If flushPromise was fulfilled, then:

                  // Step 5.1.1: If readable.[[state]] is "errored", throw
                  // readable.[[storedError]].
                  if (aReadable->State() ==
                      ReadableStream::ReaderState::Errored) {
                    JS::RootedValue storedError(aCx, aReadable->StoredError());
                    aRv.MightThrowJSException();
                    aRv.ThrowJSException(aCx, storedError);
                    return nullptr;
                  }

                  // Step 5.1.2: Perform !
                  // ReadableStreamDefaultControllerClose(readable.[[controller]]).
                  ReadableStreamDefaultControllerClose(
                      aCx, MOZ_KnownLive(aReadable->Controller()->AsDefault()),
                      aRv);
                  return nullptr;
                },
            [](JSContext* aCx, JS::HandleValue aValue, ErrorResult& aRv,
               const RefPtr<ReadableStream>& aReadable,
               const RefPtr<TransformStream>& aStream)
                MOZ_CAN_RUN_SCRIPT_BOUNDARY_LAMBDA
            -> already_AddRefed<Promise> {
                  // Step 5.2: If flushPromise was rejected with reason r, then:

                  // Step 5.2.1: Perform ! TransformStreamError(stream, r).
                  TransformStreamError(aCx, aStream, aValue, aRv);
                  if (aRv.Failed()) {
                    return nullptr;
                  }

                  // Step 5.2.2: Throw readable.[[storedError]].
                  JS::RootedValue storedError(aCx, aReadable->StoredError());
                  aRv.MightThrowJSException();
                  aRv.ThrowJSException(aCx, storedError);
                  return nullptr;
                },
            readable, mStream);

    if (result.isErr()) {
      aRv.Throw(result.unwrapErr());
      return nullptr;
    }
    return result.unwrap().forget();
  }

 protected:
@@ -142,7 +344,8 @@ class TransformStreamUnderlyingSinkAlgorithms final

 private:
  RefPtr<Promise> mStartPromise;
  RefPtr<TransformStream> mStream;
  // MOZ_KNOWN_LIVE because it won't be reassigned
  MOZ_KNOWN_LIVE RefPtr<TransformStream> mStream;
};

NS_IMPL_CYCLE_COLLECTION_INHERITED(TransformStreamUnderlyingSinkAlgorithms,
@@ -179,21 +382,39 @@ class TransformStreamUnderlyingSourceAlgorithms final
                                         ReadableStreamController& aController,
                                         ErrorResult& aRv) override {
    // Step 6. Let pullAlgorithm be the following steps:
    //   Step 1. Return ! TransformStreamDefaultSourcePullAlgorithm(stream).
    // TODO
    return Promise::CreateResolvedWithUndefined(mStream->GetParentObject(),
                                                aRv);
    // Step 6.1. Return ! TransformStreamDefaultSourcePullAlgorithm(stream).

    // Inlining TransformStreamDefaultSourcePullAlgorithm here:
    // https://streams.spec.whatwg.org/#transform-stream-default-source-pull-algorithm

    // Step 1: Assert: stream.[[backpressure]] is true.
    MOZ_ASSERT(mStream->Backpressure());

    // Step 2: Assert: stream.[[backpressureChangePromise]] is not undefined.
    MOZ_ASSERT(mStream->BackpressureChangePromise());

    // Step 3: Perform ! TransformStreamSetBackpressure(stream, false).
    TransformStreamSetBackpressure(mStream, false, aRv);

    // Step 4: Return stream.[[backpressureChangePromise]].
    return do_AddRef(mStream->BackpressureChangePromise());
  }

  already_AddRefed<Promise> CancelCallback(
  MOZ_CAN_RUN_SCRIPT already_AddRefed<Promise> CancelCallback(
      JSContext* aCx, const Optional<JS::Handle<JS::Value>>& aReason,
      ErrorResult& aRv) override {
    // Step 7. Let cancelAlgorithm be the following steps, taking a reason
    // argument:
    //  Step 1. Perform ! TransformStreamErrorWritableAndUnblockWrite(stream,
    // Step 7.1. Perform ! TransformStreamErrorWritableAndUnblockWrite(stream,
    // reason).
    //  Step 2. Return a promise resolved with undefined.
    // TODO
    TransformStreamErrorWritableAndUnblockWrite(
        aCx, mStream,
        aReason.WasPassed() ? aReason.Value() : JS::UndefinedHandleValue, aRv);
    if (aRv.Failed()) {
      return nullptr;
    }

    // Step 7.2. Return a promise resolved with undefined.
    return Promise::CreateResolvedWithUndefined(mStream->GetParentObject(),
                                                aRv);
  }
@@ -205,7 +426,8 @@ class TransformStreamUnderlyingSourceAlgorithms final

 private:
  RefPtr<Promise> mStartPromise;
  RefPtr<TransformStream> mStream;
  // MOZ_KNOWNLIVE because it will never be reassigned
  MOZ_KNOWN_LIVE RefPtr<TransformStream> mStream;
};

NS_IMPL_CYCLE_COLLECTION_INHERITED(TransformStreamUnderlyingSourceAlgorithms,
@@ -417,16 +639,12 @@ already_AddRefed<TransformStream> TransformStream::Constructor(
  return transformStream.forget();
}

already_AddRefed<ReadableStream> TransformStream::GetReadable(
    ErrorResult& aRv) {
  aRv.Throw(NS_ERROR_NOT_IMPLEMENTED);
  return nullptr;
already_AddRefed<ReadableStream> TransformStream::GetReadable() {
  return mReadable.forget();
}

already_AddRefed<WritableStream> TransformStream::GetWritable(
    ErrorResult& aRv) {
  aRv.Throw(NS_ERROR_NOT_IMPLEMENTED);
  return nullptr;
already_AddRefed<WritableStream> TransformStream::GetWritable() {
  return mWritable.forget();
}

}  // namespace mozilla::dom
+2 −2
Original line number Diff line number Diff line
@@ -69,8 +69,8 @@ class TransformStream final : public nsISupports, public nsWrapperCache {
              const QueuingStrategy& aWritableStrategy,
              const QueuingStrategy& aReadableStrategy, ErrorResult& aRv);

  already_AddRefed<ReadableStream> GetReadable(ErrorResult& aRv);
  already_AddRefed<WritableStream> GetWritable(ErrorResult& aRv);
  already_AddRefed<ReadableStream> GetReadable();
  already_AddRefed<WritableStream> GetWritable();

 private:
  nsCOMPtr<nsIGlobalObject> mGlobal;
Loading