// MediaPipeline.cpp
1
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2
3
4
5
6
7
8
9
/* This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this file,
 * You can obtain one at http://mozilla.org/MPL/2.0/. */

// Original author: ekr@rtfm.com

#include "MediaPipeline.h"

10
#include <inttypes.h>
11
12
#include <math.h>

13
#include "AudioSegment.h"
14
#include "AudioConverter.h"
15
16
17
#include "DOMMediaStream.h"
#include "ImageContainer.h"
#include "ImageTypes.h"
18
#include "Layers.h"
19
#include "LayersLogging.h"
20
#include "MediaEngine.h"
21
#include "MediaSegment.h"
22
23
#include "MediaTrackGraphImpl.h"
#include "MediaTrackListener.h"
24
#include "MediaStreamTrack.h"
25
#include "RemoteTrackSource.h"
26
#include "RtpLogger.h"
27
#include "VideoFrameConverter.h"
28
29
30
#include "VideoSegment.h"
#include "VideoStreamTrack.h"
#include "VideoUtils.h"
31
#include "mozilla/Logging.h"
32
#include "mozilla/NullPrincipal.h"
33
#include "mozilla/PeerIdentity.h"
34
#include "mozilla/Preferences.h"
35
36
#include "mozilla/SharedThreadPool.h"
#include "mozilla/Sprintf.h"
37
#include "mozilla/StaticPrefs_media.h"
38
#include "mozilla/TaskQueue.h"
39
#include "mozilla/UniquePtr.h"
40
#include "mozilla/UniquePtrExtensions.h"
41
42
43
44
45
46
#include "mozilla/dom/RTCStatsReportBinding.h"
#include "mozilla/gfx/Point.h"
#include "mozilla/gfx/Types.h"
#include "nsError.h"
#include "nsThreadUtils.h"
#include "runnable_utils.h"
47
#include "signaling/src/peerconnection/MediaTransportHandler.h"
48
#include "Tracing.h"
49
#include "WebrtcImageBuffer.h"
50
#include "webrtc/common_video/include/video_frame_buffer.h"
51
#include "webrtc/modules/rtp_rtcp/include/rtp_rtcp.h"
52

53
54
// Max size given stereo is 480*2*2 = 1920 (10ms of 16-bits stereo audio at
// 48KHz)
#define AUDIO_SAMPLE_BUFFER_MAX_BYTES (480 * 2 * 2)

// One 10ms packet at the maximum sample rate, stereo, 16-bit samples
// (sizeof(uint16_t) == sizeof(int16_t)) must fit in the buffer above.
static_assert((WEBRTC_MAX_SAMPLE_RATE / 100) * sizeof(uint16_t) * 2 <=
                  AUDIO_SAMPLE_BUFFER_MAX_BYTES,
              "AUDIO_SAMPLE_BUFFER_MAX_BYTES is not large enough");

using namespace mozilla;
using namespace mozilla::dom;
using namespace mozilla::gfx;
using namespace mozilla::layers;

// Log module used by all pipeline classes in this file; enable with
// MOZ_LOG=MediaPipeline:5.
mozilla::LazyLogModule gMediaPipelineLog("MediaPipeline");
66
67
68

namespace mozilla {

69
// An async inserter for audio data, to avoid running audio codec encoders
70
// on the MTG/input audio thread.  Basically just bounces all the audio
71
72
// data to a single audio processing/input queue.  We could if we wanted to
// use multiple threads and a TaskQueue.
73
74
class AudioProxyThread {
 public:
75
76
  NS_INLINE_DECL_THREADSAFE_REFCOUNTING(AudioProxyThread)

77
78
  explicit AudioProxyThread(RefPtr<AudioSessionConduit> aConduit)
      : mConduit(std::move(aConduit)),
79
80
81
        mTaskQueue(new TaskQueue(
            GetMediaThreadPool(MediaThreadType::WEBRTC_DECODER), "AudioProxy")),
        mAudioConverter(nullptr) {
82
83
84
85
    MOZ_ASSERT(mConduit);
    MOZ_COUNT_CTOR(AudioProxyThread);
  }

86
87
88
89
  // This function is the identity if aInputRate is supported.
  // Else, it returns a rate that is supported, that ensure no loss in audio
  // quality: the sampling rate returned is always greater to the inputed
  // sampling-rate, if they differ..
90
  uint32_t AppropriateSendingRateForInputRate(uint32_t aInputRate) {
91
    AudioSessionConduit* conduit =
92
        static_cast<AudioSessionConduit*>(mConduit.get());
93
94
95
96
97
    if (conduit->IsSamplingFreqSupported(aInputRate)) {
      return aInputRate;
    }
    if (aInputRate < 16000) {
      return 16000;
98
99
    }
    if (aInputRate < 32000) {
100
      return 32000;
101
102
    }
    if (aInputRate < 44100) {
103
104
      return 44100;
    }
105
    return 48000;
106
107
108
109
110
  }

  // From an arbitrary AudioChunk at sampling-rate aRate, process the audio into
  // something the conduit can work with (or send silence if the track is not
  // enabled), and send the audio in 10ms chunks to the conduit.
111
112
  void InternalProcessAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
                                 bool aEnabled) {
113
    MOZ_ASSERT(mTaskQueue->IsCurrentThreadIn());
114

115
    // Convert to interleaved 16-bits integer audio, with a maximum of two
116
    // channels (since the WebRTC.org code below makes the assumption that the
117
118
119
120
    // input audio is either mono or stereo), with a sample-rate rate that is
    // 16, 32, 44.1, or 48kHz.
    uint32_t outputChannels = aChunk.ChannelCount() == 1 ? 1 : 2;
    int32_t transmissionRate = AppropriateSendingRateForInputRate(aRate);
121

122
123
124
125
    // We take advantage of the fact that the common case (microphone directly
    // to PeerConnection, that is, a normal call), the samples are already
    // 16-bits mono, so the representation in interleaved and planar is the
    // same, and we can just use that.
126
127
    if (aEnabled && outputChannels == 1 &&
        aChunk.mBufferFormat == AUDIO_FORMAT_S16 && transmissionRate == aRate) {
128
      const int16_t* samples = aChunk.ChannelData<int16_t>().Elements()[0];
129
      PacketizeAndSend(samples, transmissionRate, outputChannels,
130
131
132
133
134
135
136
137
138
139
140
141
                       aChunk.mDuration);
      return;
    }

    uint32_t sampleCount = aChunk.mDuration * outputChannels;
    if (mInterleavedAudio.Length() < sampleCount) {
      mInterleavedAudio.SetLength(sampleCount);
    }

    if (!aEnabled || aChunk.mBufferFormat == AUDIO_FORMAT_SILENCE) {
      PodZero(mInterleavedAudio.Elements(), sampleCount);
    } else if (aChunk.mBufferFormat == AUDIO_FORMAT_FLOAT32) {
142
143
      DownmixAndInterleave(aChunk.ChannelData<float>(), aChunk.mDuration,
                           aChunk.mVolume, outputChannels,
144
145
                           mInterleavedAudio.Elements());
    } else if (aChunk.mBufferFormat == AUDIO_FORMAT_S16) {
146
147
      DownmixAndInterleave(aChunk.ChannelData<int16_t>(), aChunk.mDuration,
                           aChunk.mVolume, outputChannels,
148
149
150
151
152
                           mInterleavedAudio.Elements());
    }
    int16_t* inputAudio = mInterleavedAudio.Elements();
    size_t inputAudioFrameCount = aChunk.mDuration;

153
    AudioConfig inputConfig(AudioConfig::ChannelLayout(outputChannels), aRate,
154
155
                            AudioConfig::FORMAT_S16);
    AudioConfig outputConfig(AudioConfig::ChannelLayout(outputChannels),
156
                             transmissionRate, AudioConfig::FORMAT_S16);
157
    // Resample to an acceptable sample-rate for the sending side
158
    if (!mAudioConverter || mAudioConverter->InputConfig() != inputConfig ||
159
160
161
162
163
164
        mAudioConverter->OutputConfig() != outputConfig) {
      mAudioConverter = MakeUnique<AudioConverter>(inputConfig, outputConfig);
    }

    int16_t* processedAudio = nullptr;
    size_t framesProcessed =
165
        mAudioConverter->Process(inputAudio, inputAudioFrameCount);
166
167
168

    if (framesProcessed == 0) {
      // In place conversion not possible, use a buffer.
169
170
      framesProcessed = mAudioConverter->Process(mOutputAudio, inputAudio,
                                                 inputAudioFrameCount);
171
      processedAudio = mOutputAudio.Data();
172
    } else {
173
      processedAudio = inputAudio;
174
175
    }

176
    PacketizeAndSend(processedAudio, transmissionRate, outputChannels,
177
178
                     framesProcessed);
  }
179

180
181
182
  // This packetizes aAudioData in 10ms chunks and sends it.
  // aAudioData is interleaved audio data at a rate and with a channel count
  // that is appropriate to send with the conduit.
183
184
  void PacketizeAndSend(const int16_t* aAudioData, uint32_t aRate,
                        uint32_t aChannels, uint32_t aFrameCount) {
185
186
187
    MOZ_ASSERT(AppropriateSendingRateForInputRate(aRate) == aRate);
    MOZ_ASSERT(aChannels == 1 || aChannels == 2);
    MOZ_ASSERT(aAudioData);
188

189
    uint32_t audio_10ms = aRate / 100;
190

191
    if (!mPacketizer || mPacketizer->PacketSize() != audio_10ms ||
192
193
194
195
        mPacketizer->Channels() != aChannels) {
      // It's the right thing to drop the bit of audio still in the packetizer:
      // we don't want to send to the conduit audio that has two different
      // rates while telling it that it has a constante rate.
196
197
      mPacketizer =
          MakeUnique<AudioPacketizer<int16_t, int16_t>>(audio_10ms, aChannels);
198
      mPacket = MakeUnique<int16_t[]>(audio_10ms * aChannels);
199
200
    }

201
    mPacketizer->Input(aAudioData, aFrameCount);
202

203
    while (mPacketizer->PacketsAvailable()) {
Alex Chronopoulos's avatar
Alex Chronopoulos committed
204
      mPacketizer->Output(mPacket.get());
205
206
      mConduit->SendAudioFrame(mPacket.get(), mPacketizer->PacketSize(), aRate,
                               mPacketizer->Channels(), 0);
207
208
209
    }
  }

210
211
  void QueueAudioChunk(TrackRate aRate, const AudioChunk& aChunk,
                       bool aEnabled) {
212
213
    RefPtr<AudioProxyThread> self = this;
    nsresult rv = mTaskQueue->Dispatch(NS_NewRunnableFunction(
214
215
216
        "AudioProxyThread::QueueAudioChunk", [self, aRate, aChunk, aEnabled]() {
          self->InternalProcessAudioChunk(aRate, aChunk, aEnabled);
        }));
217
    MOZ_DIAGNOSTIC_ASSERT(NS_SUCCEEDED(rv));
218
    Unused << rv;
219
220
  }

221
222
 protected:
  virtual ~AudioProxyThread() {
223
224
225
    // Conduits must be released on MainThread, and we might have the last
    // reference We don't need to worry about runnables still trying to access
    // the conduit, since the runnables hold a ref to AudioProxyThread.
226
    NS_ReleaseOnMainThread("AudioProxyThread::mConduit", mConduit.forget());
227
228
229
230
    MOZ_COUNT_DTOR(AudioProxyThread);
  }

  RefPtr<AudioSessionConduit> mConduit;
231
  const RefPtr<TaskQueue> mTaskQueue;
232
233
  // Only accessed on mTaskQueue
  UniquePtr<AudioPacketizer<int16_t, int16_t>> mPacketizer;
234
  // A buffer to hold a single packet of audio.
Alex Chronopoulos's avatar
Alex Chronopoulos committed
235
  UniquePtr<int16_t[]> mPacket;
236
237
238
  nsTArray<int16_t> mInterleavedAudio;
  AlignedShortBuffer mOutputAudio;
  UniquePtr<AudioConverter> mAudioConverter;
239
240
};

241
// Constructs a pipeline bound to one direction (send or receive) and wires
// the conduit to the PipelineTransport accordingly.
MediaPipeline::MediaPipeline(const std::string& aPc,
                             RefPtr<MediaTransportHandler> aTransportHandler,
                             DirectionType aDirection,
                             RefPtr<nsISerialEventTarget> aMainThread,
                             RefPtr<nsISerialEventTarget> aStsThread,
                             RefPtr<MediaSessionConduit> aConduit)
    : mDirection(aDirection),
      mLevel(0),
      mTransportHandler(std::move(aTransportHandler)),
      mConduit(std::move(aConduit)),
      mMainThread(std::move(aMainThread)),
      // NOTE(review): mStsThread copies aStsThread and mTransport then moves
      // from it; this relies on mStsThread being declared before mTransport
      // (member init runs in declaration order) — confirm in the header.
      mStsThread(aStsThread),
      mTransport(new PipelineTransport(std::move(aStsThread))),
      mRtpPacketsSent(0),
      mRtcpPacketsSent(0),
      mRtpPacketsReceived(0),
      mRtcpPacketsReceived(0),
      mRtpBytesSent(0),
      mRtpBytesReceived(0),
      mPc(aPc),
      mFilter(),
      mRtpParser(webrtc::RtpHeaderParser::Create()),
      mPacketDumper(new PacketDumper(mPc)) {
  // Point the conduit at our transport for the direction we serve.
  if (mDirection == DirectionType::RECEIVE) {
    mConduit->SetReceiverTransport(mTransport);
  } else {
    mConduit->SetTransmitterTransport(mTransport);
  }
}

271
MediaPipeline::~MediaPipeline() {
  MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
          ("Destroying MediaPipeline: %s", mDescription.c_str()));
  // Conduits must be released on main thread, and this dtor may run elsewhere.
  NS_ReleaseOnMainThread("MediaPipeline::mConduit", mConduit.forget());
}

277
// Main-thread shutdown: stop the pipeline and detach media here, then hop to
// the STS thread to detach from the transport.
void MediaPipeline::Shutdown_m() {
  Stop();
  DetachMedia();

  RUN_ON_THREAD(mStsThread,
                WrapRunnable(RefPtr<MediaPipeline>(this),
                             &MediaPipeline::DetachTransport_s),
                NS_DISPATCH_NORMAL);
}

287
// STS-thread half of shutdown: disconnect signals, reset transport state,
// and drop references that could form cycles.
void MediaPipeline::DetachTransport_s() {
  ASSERT_ON_THREAD(mStsThread);

  MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
          ("%s in %s", mDescription.c_str(), __FUNCTION__));

  // Disconnect all sigslot connections (from MediaTransportHandler signals).
  disconnect_all();
  mRtpState = TransportLayer::TS_NONE;
  mRtcpState = TransportLayer::TS_NONE;
  mTransportId.clear();
  mTransport->Detach();

  // Make sure any cycles are broken
  mPacketDumper = nullptr;
}

303
304
305
306
307
308
309
// Main-thread entry: forwards the new transport id and filter to the STS
// thread, where the real update happens (see UpdateTransport_s).
void MediaPipeline::UpdateTransport_m(
    const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
  mStsThread->Dispatch(NS_NewRunnableFunction(
      __func__, [aTransportId, filter = std::move(aFilter),
                 self = RefPtr<MediaPipeline>(this)]() mutable {
        self->UpdateTransport_s(aTransportId, std::move(filter));
      }));
}

312
313
// STS-thread update of the transport binding and RTP filter. Connects the
// transport-handler signals once (lazily, on first call), refreshes cached
// RTP/RTCP states when the transport id changes, and swaps in the new filter
// while keeping the RTP header-extension parser in sync with it.
void MediaPipeline::UpdateTransport_s(
    const std::string& aTransportId, UniquePtr<MediaPipelineFilter>&& aFilter) {
  ASSERT_ON_THREAD(mStsThread);
  if (!mSignalsConnected) {
    mTransportHandler->SignalStateChange.connect(
        this, &MediaPipeline::RtpStateChange);
    mTransportHandler->SignalRtcpStateChange.connect(
        this, &MediaPipeline::RtcpStateChange);
    mTransportHandler->SignalEncryptedSending.connect(
        this, &MediaPipeline::EncryptedPacketSending);
    mTransportHandler->SignalPacketReceived.connect(
        this, &MediaPipeline::PacketReceived);
    mTransportHandler->SignalAlpnNegotiated.connect(
        this, &MediaPipeline::AlpnNegotiated);
    mSignalsConnected = true;
  }

  if (aTransportId != mTransportId) {
    mTransportId = aTransportId;
    mRtpState = mTransportHandler->GetState(mTransportId, false);
    mRtcpState = mTransportHandler->GetState(mTransportId, true);
    CheckTransportStates();
  }

  // Unregister the old filter's header extensions before the filter changes;
  // the new set is registered again below.
  if (mFilter) {
    for (const auto& extension : mFilter->GetExtmap()) {
      mRtpParser->DeregisterRtpHeaderExtension(
          webrtc::StringToRtpExtensionType(extension.uri));
    }
  }
  if (mFilter && aFilter) {
    // Use the new filter, but don't forget any remote SSRCs that we've learned
    // by receiving traffic.
    mFilter->Update(*aFilter);
  } else {
    mFilter = std::move(aFilter);
  }
  if (mFilter) {
    for (const auto& extension : mFilter->GetExtmap()) {
      mRtpParser->RegisterRtpHeaderExtension(
          webrtc::StringToRtpExtensionType(extension.uri), extension.id);
    }
  }
}

357
// Main-thread entry: registers the RID (RtpStreamId) header extension on the
// STS thread (see AddRIDExtension_s).
void MediaPipeline::AddRIDExtension_m(size_t aExtensionId) {
  RUN_ON_THREAD(mStsThread,
                WrapRunnable(RefPtr<MediaPipeline>(this),
                             &MediaPipeline::AddRIDExtension_s, aExtensionId),
                NS_DISPATCH_NORMAL);
}

364
// STS thread: make the RTP parser aware of the RID header extension id.
void MediaPipeline::AddRIDExtension_s(size_t aExtensionId) {
  mRtpParser->RegisterRtpHeaderExtension(webrtc::kRtpExtensionRtpStreamId,
                                         aExtensionId);
}

369
// Main-thread entry: installs a RID-only filter on the STS thread
// (see AddRIDFilter_s).
void MediaPipeline::AddRIDFilter_m(const std::string& aRid) {
  RUN_ON_THREAD(mStsThread,
                WrapRunnable(RefPtr<MediaPipeline>(this),
                             &MediaPipeline::AddRIDFilter_s, aRid),
                NS_DISPATCH_NORMAL);
}

376
// STS thread: replace any existing filter with one that matches only aRid.
void MediaPipeline::AddRIDFilter_s(const std::string& aRid) {
  // Running a simulcast test, ignore other filtering
  mFilter = MakeUnique<MediaPipelineFilter>();
  mFilter->AddRemoteRtpStreamId(aRid);
}

382
383
384
void MediaPipeline::GetContributingSourceStats(
    const nsString& aInboundRtpStreamId,
    FallibleTArray<dom::RTCRTPContributingSourceStats>& aArr) const {
385
386
  // Get the expiry from now
  DOMHighResTimeStamp expiry = RtpCSRCStats::GetExpiryFromTime(GetNow());
387
  for (auto info : mCsrcStats) {
388
389
390
    if (!info.second.Expired(expiry)) {
      RTCRTPContributingSourceStats stats;
      info.second.GetWebidlInstance(stats, aInboundRtpStreamId);
391
392
393
      if (!aArr.AppendElement(stats, fallible)) {
        mozalloc_handle_oom(0);
      }
394
395
396
397
    }
  }
}

398
399
void MediaPipeline::RtpStateChange(const std::string& aTransportId,
                                   TransportLayer::State aState) {
400
401
  if (mTransportId != aTransportId) {
    return;
402
  }
403
404
  mRtpState = aState;
  CheckTransportStates();
405
406
}

407
408
void MediaPipeline::RtcpStateChange(const std::string& aTransportId,
                                    TransportLayer::State aState) {
409
410
411
412
413
  if (mTransportId != aTransportId) {
    return;
  }
  mRtcpState = aState;
  CheckTransportStates();
414
415
}

416
void MediaPipeline::CheckTransportStates() {
417
  ASSERT_ON_THREAD(mStsThread);
418

419
420
421
422
423
  if (mRtpState == TransportLayer::TS_CLOSED ||
      mRtpState == TransportLayer::TS_ERROR ||
      mRtcpState == TransportLayer::TS_CLOSED ||
      mRtcpState == TransportLayer::TS_ERROR) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Warning,
424
            ("RTP Transport failed for pipeline %p flow %s", this,
425
426
427
             mDescription.c_str()));

    NS_WARNING(
428
        "MediaPipeline Transport failed. This is not properly cleaned up yet");
429
430
431
432
433
434
    // TODO(ekr@rtfm.com): SECURITY: Figure out how to clean up if the
    // connection was good and now it is bad.
    // TODO(ekr@rtfm.com): Report up so that the PC knows we
    // have experienced an error.
    mTransport->Detach();
    return;
435
436
  }

437
  if (mRtpState == TransportLayer::TS_OPEN) {
438
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
439
            ("RTP Transport ready for pipeline %p flow %s", this,
440
             mDescription.c_str()));
441
442
  }

443
444
  if (mRtcpState == TransportLayer::TS_OPEN) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
445
            ("RTCP Transport ready for pipeline %p flow %s", this,
446
447
             mDescription.c_str()));
  }
448

449
450
451
  if (mRtpState == TransportLayer::TS_OPEN && mRtcpState == mRtpState) {
    mTransport->Attach(this);
    TransportReady_s();
452
453
  }
}
454

455
// Hands |packet| to the transport handler for sending on our transport.
// STS thread only; the RTP transport must already be open and bound.
void MediaPipeline::SendPacket(MediaPacket&& packet) {
  ASSERT_ON_THREAD(mStsThread);
  MOZ_ASSERT(mRtpState == TransportLayer::TS_OPEN);
  MOZ_ASSERT(!mTransportId.empty());
  mTransportHandler->SendPacket(mTransportId, std::move(packet));
}

462
void MediaPipeline::IncrementRtpPacketsSent(int32_t aBytes) {
463
464
  ++mRtpPacketsSent;
  mRtpBytesSent += aBytes;
465

466
  if (!(mRtpPacketsSent % 100)) {
467
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
468
469
            ("RTP sent packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
             mDescription.c_str(), this, mRtpPacketsSent, mRtpBytesSent));
470
471
472
  }
}

473
void MediaPipeline::IncrementRtcpPacketsSent() {
474
475
  ++mRtcpPacketsSent;
  if (!(mRtcpPacketsSent % 100)) {
476
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
477
            ("RTCP sent packet count for %s Pipeline %p: %u",
478
             mDescription.c_str(), this, mRtcpPacketsSent));
479
480
481
  }
}

482
void MediaPipeline::IncrementRtpPacketsReceived(int32_t aBytes) {
483
484
485
  ++mRtpPacketsReceived;
  mRtpBytesReceived += aBytes;
  if (!(mRtpPacketsReceived % 100)) {
486
487
488
489
    MOZ_LOG(
        gMediaPipelineLog, LogLevel::Info,
        ("RTP received packet count for %s Pipeline %p: %u (%" PRId64 " bytes)",
         mDescription.c_str(), this, mRtpPacketsReceived, mRtpBytesReceived));
490
491
492
  }
}

493
void MediaPipeline::IncrementRtcpPacketsReceived() {
494
495
  ++mRtcpPacketsReceived;
  if (!(mRtcpPacketsReceived % 100)) {
496
    MOZ_LOG(gMediaPipelineLog, LogLevel::Info,
497
            ("RTCP received packet count for %s Pipeline %p: %u",
498
             mDescription.c_str(), this, mRtcpPacketsReceived));
499
500
501
  }
}

502
// Handles one incoming RTP packet: drops it unless this is a receive
// pipeline with a live transport, conduit, and (if set) a matching filter;
// maintains per-CSRC statistics; logs/dumps the packet; and finally hands
// it to the conduit.
void MediaPipeline::RtpPacketReceived(const MediaPacket& packet) {
  // Transmit pipelines never consume incoming RTP.
  if (mDirection == DirectionType::TRANSMIT) {
    return;
  }

  if (!mTransport->Pipeline()) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Error,
            ("Discarding incoming packet; transport disconnected"));
    return;
  }

  if (!mConduit) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("Discarding incoming packet; media disconnected"));
    return;
  }

  if (!packet.len()) {
    return;
  }

  webrtc::RTPHeader header;
  if (!mRtpParser->Parse(packet.data(), packet.len(), &header, true)) {
    // Unparseable RTP is silently dropped.
    return;
  }

  if (mFilter && !mFilter->Filter(header)) {
    return;
  }

  // Make sure to only get the time once, and only if we need it by
  // using getTimestamp() for access
  DOMHighResTimeStamp now = 0.0;
  bool hasTime = false;

  // Remove expired RtpCSRCStats
  if (!mCsrcStats.empty()) {
    if (!hasTime) {
      now = GetNow();
      hasTime = true;
    }
    auto expiry = RtpCSRCStats::GetExpiryFromTime(now);
    for (auto p = mCsrcStats.begin(); p != mCsrcStats.end();) {
      if (p->second.Expired(expiry)) {
        // erase() returns the next valid iterator.
        p = mCsrcStats.erase(p);
        continue;
      }
      p++;
    }
  }

  // Add new RtpCSRCStats
  if (header.numCSRCs) {
    for (auto i = 0; i < header.numCSRCs; i++) {
      if (!hasTime) {
        now = GetNow();
        hasTime = true;
      }
      auto csrcInfo = mCsrcStats.find(header.arrOfCSRCs[i]);
      if (csrcInfo == mCsrcStats.end()) {
        mCsrcStats.insert(std::make_pair(
            header.arrOfCSRCs[i], RtpCSRCStats(header.arrOfCSRCs[i], now)));
      } else {
        csrcInfo->second.SetTimestamp(now);
      }
    }
  }

  MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
          ("%s received RTP packet.", mDescription.c_str()));
  IncrementRtpPacketsReceived(packet.len());
  OnRtpPacketReceived();

  RtpLogger::LogPacket(packet, true, mDescription);

  // Might be nice to pass ownership of the buffer in this case, but it is a
  // small optimization in a rare case.
  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtp, false,
                      packet.encrypted_data(), packet.encrypted_len());

  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtp, false, packet.data(),
                      packet.len());

  (void)mConduit->ReceivedRTPPacket(packet.data(), packet.len(),
                                    header);  // Ignore error codes
}

589
// Handles one incoming RTCP packet: drops it if the transport or conduit is
// gone, logs/dumps it, honors the force-disable pref, and otherwise passes
// it to the conduit. Note: no filtering is applied (see comment below).
void MediaPipeline::RtcpPacketReceived(const MediaPacket& packet) {
  if (!mTransport->Pipeline()) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("Discarding incoming packet; transport disconnected"));
    return;
  }

  if (!mConduit) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("Discarding incoming packet; media disconnected"));
    return;
  }

  if (!packet.len()) {
    return;
  }

  // We do not filter RTCP. This is because a compound RTCP packet can contain
  // any collection of RTCP packets, and webrtc.org already knows how to filter
  // out what it is interested in, and what it is not. Maybe someday we should
  // have a TransportLayer that breaks up compound RTCP so we can filter them
  // individually, but I doubt that will matter much.

  MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
          ("%s received RTCP packet.", mDescription.c_str()));
  IncrementRtcpPacketsReceived();

  RtpLogger::LogPacket(packet, true, mDescription);

  // Might be nice to pass ownership of the buffer in this case, but it is a
  // small optimization in a rare case.
  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Srtcp, false,
                      packet.encrypted_data(), packet.encrypted_len());

  mPacketDumper->Dump(mLevel, dom::mozPacketDumpType::Rtcp, false,
                      packet.data(), packet.len());

  // Test/debug pref: drop all RTCP after counting and dumping it.
  if (StaticPrefs::media_webrtc_net_force_disable_rtcp_reception()) {
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("%s RTCP packet forced to be dropped", mDescription.c_str()));
    return;
  }

  (void)mConduit->ReceivedRTCPPacket(packet.data(),
                                     packet.len());  // Ignore error codes
}

636
void MediaPipeline::PacketReceived(const std::string& aTransportId,
637
                                   const MediaPacket& packet) {
638
639
640
641
  if (mTransportId != aTransportId) {
    return;
  }

642
  if (!mTransport->Pipeline()) {
643
644
    MOZ_LOG(gMediaPipelineLog, LogLevel::Debug,
            ("Discarding incoming packet; transport disconnected"));
645
646
647
    return;
  }

648
649
  switch (packet.type()) {
    case MediaPacket::RTP:
650
      RtpPacketReceived(packet);
651
652
      break;
    case MediaPacket::RTCP:
653
      RtcpPacketReceived(packet);
654
      break;
655
    default:;
656
657
658
  }
}

659
660
661
662
663
664
665
// Signal handler for ALPN negotiation completing: if the negotiation
// requested privacy, switch this pipeline's principal to a private one.
// The negotiated ALPN string itself is unused here.
void MediaPipeline::AlpnNegotiated(const std::string& aAlpn,
                                   bool aPrivacyRequested) {
  if (aPrivacyRequested) {
    MakePrincipalPrivate_s();
  }
}

666
void MediaPipeline::EncryptedPacketSending(const std::string& aTransportId,
667
                                           const MediaPacket& aPacket) {
668
669
670
671
672
673
674
675
676
677
678
679
680
681
  if (mTransportId == aTransportId) {
    dom::mozPacketDumpType type;
    if (aPacket.type() == MediaPacket::SRTP) {
      type = dom::mozPacketDumpType::Srtp;
    } else if (aPacket.type() == MediaPacket::SRTCP) {
      type = dom::mozPacketDumpType::Srtcp;
    } else if (aPacket.type() == MediaPacket::DTLS) {
      // TODO(bug 1497936): Implement packet dump for DTLS
      return;
    } else {
      MOZ_ASSERT(false);
      return;
    }
    mPacketDumper->Dump(Level(), type, true, aPacket.data(), aPacket.len());
682
683
684
  }
}

685
// Listens to a sending MediaStreamTrack and forwards its media to the
// conduit: audio via an AudioProxyThread, video via a VideoFrameConverter.
class MediaPipelineTransmit::PipelineListener
    : public DirectMediaTrackListener {
  friend class MediaPipelineTransmit;

 public:
  explicit PipelineListener(RefPtr<MediaSessionConduit> aConduit)
      : mConduit(std::move(aConduit)),
        mActive(false),
        mEnabled(false),
        mDirectConnect(false) {}

  ~PipelineListener() {
    // Conduits must be released on main thread; this dtor may run elsewhere.
    NS_ReleaseOnMainThread("MediaPipeline::mConduit", mConduit.forget());
    if (mConverter) {
      mConverter->Shutdown();
    }
  }

  // Toggles whether media is actually sent; also propagated to the video
  // converter when present.
  void SetActive(bool aActive) {
    mActive = aActive;
    if (mConverter) {
      mConverter->SetActive(aActive);
    }
  }
  void SetEnabled(bool aEnabled) { mEnabled = aEnabled; }

  // These are needed since nested classes don't have access to any particular
  // instance of the parent
  void SetAudioProxy(RefPtr<AudioProxyThread> aProxy) {
    mAudioProcessing = std::move(aProxy);
  }

  void SetVideoFrameConverter(RefPtr<VideoFrameConverter> aConverter) {
    mConverter = std::move(aConverter);
  }

  // Called (via VideoFrameFeeder) with converted I420 frames; only valid for
  // video conduits.
  void OnVideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) {
    MOZ_RELEASE_ASSERT(mConduit->type() == MediaSessionConduit::VIDEO);
    static_cast<VideoSessionConduit*>(mConduit.get())
        ->SendVideoFrame(aVideoFrame);
  }

  void SetTrackEnabled(MediaStreamTrack* aTrack, bool aEnabled);

  // Implement MediaTrackListener
  void NotifyQueuedChanges(MediaTrackGraph* aGraph, TrackTime aOffset,
                           const MediaSegment& aQueuedMedia) override;

  // Implement DirectMediaTrackListener
  void NotifyRealtimeTrackData(MediaTrackGraph* aGraph, TrackTime aOffset,
                               const MediaSegment& aMedia) override;
  void NotifyDirectListenerInstalled(InstallationResult aResult) override;
  void NotifyDirectListenerUninstalled() override;

 private:
  void NewData(const MediaSegment& aMedia, TrackRate aRate = 0);

  RefPtr<MediaSessionConduit> mConduit;
  RefPtr<AudioProxyThread> mAudioProcessing;
  RefPtr<VideoFrameConverter> mConverter;

  // active is true if there is a transport to send on
  mozilla::Atomic<bool> mActive;
  // enabled is true if the media access control permits sending
  // actual content; when false you get black/silence
  mozilla::Atomic<bool> mEnabled;

  // Written and read on the MediaTrackGraph thread
  bool mDirectConnect;
};

756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
// MediaStreamTrackConsumer inherits from SupportsWeakPtr, which is
// main-thread-only.
class MediaPipelineTransmit::PipelineListenerTrackConsumer
    : public MediaStreamTrackConsumer {
  virtual ~PipelineListenerTrackConsumer() { MOZ_ASSERT(NS_IsMainThread()); }

  // Target for forwarded enabled-state changes.
  const RefPtr<PipelineListener> mListener;

 public:
  NS_INLINE_DECL_REFCOUNTING(PipelineListenerTrackConsumer)

  explicit PipelineListenerTrackConsumer(RefPtr<PipelineListener> aListener)
      : mListener(std::move(aListener)) {
    MOZ_ASSERT(NS_IsMainThread());
  }

  // Implement MediaStreamTrackConsumer
  void NotifyEnabledChanged(MediaStreamTrack* aTrack, bool aEnabled) override {
    MOZ_ASSERT(NS_IsMainThread());
    mListener->SetTrackEnabled(aTrack, aEnabled);
  }
};

779
780
781
782
783
784
// Implements VideoConverterListener for MediaPipeline.
//
// We pass converted frames on to MediaPipelineTransmit::PipelineListener
// where they are further forwarded to VideoConduit.
// MediaPipelineTransmit calls Detach() during shutdown to ensure there is
// no cyclic dependencies between us and PipelineListener.
class MediaPipelineTransmit::VideoFrameFeeder : public VideoConverterListener {
 public:
  explicit VideoFrameFeeder(RefPtr<PipelineListener> aListener)
      : mMutex("VideoFrameFeeder"), mListener(std::move(aListener)) {
    MOZ_COUNT_CTOR(VideoFrameFeeder);
  }

  // Break the link to the listener; OnVideoFrameConverted becomes a no-op
  // afterwards.
  void Detach() {
    MutexAutoLock lock(mMutex);

    mListener = nullptr;
  }

  void OnVideoFrameConverted(const webrtc::VideoFrame& aVideoFrame) override {
    MutexAutoLock lock(mMutex);

    if (!mListener) {
      return;
    }

    mListener->OnVideoFrameConverted(aVideoFrame);
  }

 protected:
  MOZ_COUNTED_DTOR_OVERRIDE(VideoFrameFeeder)

  Mutex mMutex;  // Protects the member below.
  RefPtr<PipelineListener> mListener;
};

MediaPipelineTransmit::MediaPipelineTransmit(
816
817
818
    const std::string& aPc, RefPtr<MediaTransportHandler> aTransportHandler,
    RefPtr<nsISerialEventTarget> aMainThread,
    RefPtr<nsISerialEventTarget> aStsThread, bool aIsVideo,
819
    RefPtr<MediaSessionConduit> aConduit)
820
821
822
    : MediaPipeline(aPc, std::move(aTransportHandler), DirectionType::TRANSMIT,
                    std::move(aMainThread), std::move(aStsThread),
                    std::move(aConduit)),
823
      mIsVideo(aIsVideo),
824
      mListener(new PipelineListener(mConduit)),
825
826
827
828
      mTrackConsumer(
          MakeAndAddRef<nsMainThreadPtrHolder<PipelineListenerTrackConsumer>>(
              "MediaPipelineTransmit::mTrackConsumer",
              MakeAndAddRef<PipelineListenerTrackConsumer>(mListener))),
829
      mFeeder(aIsVideo ? MakeAndAddRef<VideoFrameFeeder>(mListener)
830
831
832
833
834
                       : nullptr),  // For video we send frames to an
                                    // async VideoFrameConverter that
                                    // calls back to a VideoFrameFeeder
                                    // that feeds I420 frames to
                                    // VideoConduit.
835
      mTransmitting(false) {
836
  if (!IsVideo()) {
837
    mAudioProcessing = MakeAndAddRef<AudioProxyThread>(
838
        static_cast<AudioSessionConduit*>(Conduit()));
839
    mListener->SetAudioProxy(mAudioProcessing);