From 4ba886a78188fd8060f9cff18b32204bf0f05021 Mon Sep 17 00:00:00 2001 From: Bastien Nocera Date: May 14 2009 09:57:20 +0000 Subject: - Update pulsesink to latest upstream version, fixes assertion when Rhythmbox tries to change the sound from the wrong thread --- diff --git a/gstreamer-plugins-good.spec b/gstreamer-plugins-good.spec index 43e6264..054571b 100644 --- a/gstreamer-plugins-good.spec +++ b/gstreamer-plugins-good.spec @@ -6,7 +6,7 @@ Name: %{gstreamer}-plugins-good Version: 0.10.14 -Release: 2%{?dist} +Release: 3%{?dist} Summary: GStreamer plug-ins with good code and licensing Group: Applications/Multimedia @@ -16,6 +16,8 @@ Source: http://gstreamer.freedesktop.org/src/gst-plugins-good/gst-plugin BuildRoot: %{_tmppath}/%{name}-%{version}-%{release}-root-%(%{__id_u} -n) # http://bugzilla.gnome.org/show_bug.cgi?id=570995 Patch0: h264pay-compiler-warning.diff +# Upstream changes +Patch1: pulsesink-0.10.14-update.patch Requires: %{gstreamer} >= %{_gst} Requires(pre): GConf2 @@ -100,6 +102,7 @@ This is a dummy package to make gstreamer-plugins-good multilib. %prep %setup -q -n gst-plugins-good-%{version} %patch0 -p1 -b .h264-compile +%patch1 -p1 -b .pulse-fixes %build @@ -253,6 +256,10 @@ export GCONF_CONFIG_SOURCE=`gconftool-2 --get-default-source` gconftool-2 --makefile-install-rule %{_sysconfdir}/gconf/schemas/gstreamer-%{majorminor}.schemas > /dev/null || : %changelog +* Thu May 14 2009 Bastien Nocera 0.10.14-3 +- Update pulsesink to latest upstream version, fixes assertion when + Rhythmbox tries to change the sound from the wrong thread + * Tue Feb 24 2009 Fedora Release Engineering - 0.10.14-2 - Rebuilt for https://fedoraproject.org/wiki/Fedora_11_Mass_Rebuild diff --git a/pulsesink-0.10.14-update.patch b/pulsesink-0.10.14-update.patch new file mode 100644 index 0000000..c06dd87 --- /dev/null +++ b/pulsesink-0.10.14-update.patch @@ -0,0 +1,2679 @@ +diff -upr gst-plugins-good-0.10.14.old/ext/pulse/pulsemixerctrl.c gst-plugins-good-0.10.14/ext/pulse/pulsemixerctrl.c +--- gst-plugins-good-0.10.14.old/ext/pulse/pulsemixerctrl.c 2009-02-15 13:40:25.000000000 +0000 ++++ gst-plugins-good-0.10.14/ext/pulse/pulsemixerctrl.c 2009-03-25 12:29:09.000000000 +0000 +@@ -93,10 +93,11 @@ gst_pulsemixer_ctrl_sink_info_cb (pa_con + c->type = GST_PULSEMIXER_SINK; + + if (c->track) { +- int i = g_atomic_int_get ((gint *) & c->track->flags); ++ GstMixerTrackFlags flags = c->track->flags; + +- i = (i & ~GST_MIXER_TRACK_MUTE) | (c->muted ? GST_MIXER_TRACK_MUTE : 0); +- g_atomic_int_set ((gint *) & c->track->flags, i); ++ flags = ++ (flags & ~GST_MIXER_TRACK_MUTE) | (c->muted ? GST_MIXER_TRACK_MUTE : 0); ++ c->track->flags = flags; + } + + c->operation_success = 1; +@@ -142,10 +143,11 @@ gst_pulsemixer_ctrl_source_info_cb (pa_c + c->type = GST_PULSEMIXER_SOURCE; + + if (c->track) { +- int i = g_atomic_int_get ((gint *) & c->track->flags); ++ GstMixerTrackFlags flags = c->track->flags; + +- i = (i & ~GST_MIXER_TRACK_MUTE) | (c->muted ? GST_MIXER_TRACK_MUTE : 0); +- g_atomic_int_set ((gint *) & c->track->flags, i); ++ flags = ++ (flags & ~GST_MIXER_TRACK_MUTE) | (c->muted ? GST_MIXER_TRACK_MUTE : 0); ++ c->track->flags = flags; + } + + c->operation_success = 1; +@@ -572,10 +574,11 @@ gst_pulsemixer_ctrl_set_mute (GstPulseMi + c->update_mute = TRUE; + + if (c->track) { +- int i = g_atomic_int_get ((gint *) & c->track->flags); ++ GstMixerTrackFlags flags = c->track->flags; + +- i = (i & ~GST_MIXER_TRACK_MUTE) | (c->muted ? GST_MIXER_TRACK_MUTE : 0); +- g_atomic_int_set ((gint *) & c->track->flags, i); ++ flags = ++ (flags & ~GST_MIXER_TRACK_MUTE) | (c->muted ? GST_MIXER_TRACK_MUTE : 0); ++ c->track->flags = flags; + } + + restart_time_event (c); +diff -upr gst-plugins-good-0.10.14.old/ext/pulse/pulsesink.c gst-plugins-good-0.10.14/ext/pulse/pulsesink.c +--- gst-plugins-good-0.10.14.old/ext/pulse/pulsesink.c 2009-02-15 13:40:25.000000000 +0000 ++++ gst-plugins-good-0.10.14/ext/pulse/pulsesink.c 2009-05-14 10:49:35.000000000 +0100 +@@ -1,7 +1,7 @@ +-/* +- * GStreamer pulseaudio plugin ++/* GStreamer pulseaudio plugin + * + * Copyright (c) 2004-2008 Lennart Poettering ++ * (c) 2009 Wim Taymans + * + * gst-pulse is free software; you can redistribute it and/or modify + * it under the terms of the GNU Lesser General Public License as +@@ -66,1009 +66,1659 @@ enum + PROP_VOLUME + }; + +-static void gst_pulsesink_destroy_stream (GstPulseSink * pulsesink); +- +-static void gst_pulsesink_destroy_context (GstPulseSink * pulsesink); +- +-static void gst_pulsesink_set_property (GObject * object, guint prop_id, +- const GValue * value, GParamSpec * pspec); +-static void gst_pulsesink_get_property (GObject * object, guint prop_id, +- GValue * value, GParamSpec * pspec); +-static void gst_pulsesink_finalize (GObject * object); +- +-static gboolean gst_pulsesink_open (GstAudioSink * asink); ++#define GST_TYPE_PULSERING_BUFFER \ ++ (gst_pulseringbuffer_get_type()) ++#define GST_PULSERING_BUFFER(obj) \ ++ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_PULSERING_BUFFER,GstPulseRingBuffer)) ++#define GST_PULSERING_BUFFER_CLASS(klass) \ ++ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_PULSERING_BUFFER,GstPulseRingBufferClass)) ++#define GST_PULSERING_BUFFER_GET_CLASS(obj) \ ++ (G_TYPE_INSTANCE_GET_CLASS ((obj), GST_TYPE_PULSERING_BUFFER, GstPulseRingBufferClass)) ++#define GST_PULSERING_BUFFER_CAST(obj) \ ++ ((GstPulseRingBuffer *)obj) ++#define GST_IS_PULSERING_BUFFER(obj) \ ++ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSERING_BUFFER)) ++#define GST_IS_PULSERING_BUFFER_CLASS(klass)\ ++ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSERING_BUFFER)) ++ ++typedef struct _GstPulseRingBuffer GstPulseRingBuffer; ++typedef struct _GstPulseRingBufferClass GstPulseRingBufferClass; ++ ++/* We keep a custom ringbuffer that is backed up by data allocated by ++ * pulseaudio. We must also overide the commit function to write into ++ * pulseaudio memory instead. */ ++struct _GstPulseRingBuffer ++{ ++ GstRingBuffer object; ++ ++ gchar *stream_name; ++ ++ pa_context *context; ++ pa_stream *stream; ++ ++ pa_sample_spec sample_spec; ++ gint64 offset; ++ ++ gboolean corked; ++ gboolean in_commit; ++ gboolean paused; ++ guint required; ++}; + +-static gboolean gst_pulsesink_close (GstAudioSink * asink); ++struct _GstPulseRingBufferClass ++{ ++ GstRingBufferClass parent_class; ++}; + +-static gboolean gst_pulsesink_prepare (GstAudioSink * asink, ++static void gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass); ++static void gst_pulseringbuffer_init (GstPulseRingBuffer * ringbuffer, ++ GstPulseRingBufferClass * klass); ++static void gst_pulseringbuffer_finalize (GObject * object); ++ ++static GstRingBufferClass *ring_parent_class = NULL; ++ ++static gboolean gst_pulseringbuffer_open_device (GstRingBuffer * buf); ++static gboolean gst_pulseringbuffer_close_device (GstRingBuffer * buf); ++static gboolean gst_pulseringbuffer_acquire (GstRingBuffer * buf, + GstRingBufferSpec * spec); +-static gboolean gst_pulsesink_unprepare (GstAudioSink * asink); +- +-static guint gst_pulsesink_write (GstAudioSink * asink, gpointer data, +- guint length); +-static guint gst_pulsesink_delay (GstAudioSink * asink); +- +-static void gst_pulsesink_reset (GstAudioSink * asink); +- +-static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event); +- +-static GstStateChangeReturn gst_pulsesink_change_state (GstElement * +- element, GstStateChange transition); ++static gboolean gst_pulseringbuffer_release (GstRingBuffer * buf); ++static gboolean gst_pulseringbuffer_start (GstRingBuffer * buf); ++static gboolean gst_pulseringbuffer_pause (GstRingBuffer * buf); ++static gboolean gst_pulseringbuffer_stop (GstRingBuffer * buf); ++static guint gst_pulseringbuffer_commit (GstRingBuffer * buf, ++ guint64 * sample, guchar * data, gint in_samples, gint out_samples, ++ gint * accum); + +-static void gst_pulsesink_init_interfaces (GType type); +- +-static gboolean gst_pulsesink_is_dead (GstPulseSink * pulsesink); +- +-#if (G_BYTE_ORDER == G_LITTLE_ENDIAN) +-# define ENDIANNESS "LITTLE_ENDIAN, BIG_ENDIAN" +-#else +-# define ENDIANNESS "BIG_ENDIAN, LITTLE_ENDIAN" +-#endif +- +-GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink); +-GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstAudioSink, +- GST_TYPE_AUDIO_SINK, gst_pulsesink_init_interfaces); +- +-static gboolean +-gst_pulsesink_interface_supported (GstImplementsInterface * +- iface, GType interface_type) ++/* ringbuffer abstract base class */ ++static GType ++gst_pulseringbuffer_get_type (void) + { +- GstPulseSink *this = GST_PULSESINK (iface); +- +- if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe) +- return TRUE; ++ static GType ringbuffer_type = 0; + +- return FALSE; +-} ++ if (!ringbuffer_type) { ++ static const GTypeInfo ringbuffer_info = { ++ sizeof (GstPulseRingBufferClass), ++ NULL, ++ NULL, ++ (GClassInitFunc) gst_pulseringbuffer_class_init, ++ NULL, ++ NULL, ++ sizeof (GstPulseRingBuffer), ++ 0, ++ (GInstanceInitFunc) gst_pulseringbuffer_init, ++ NULL ++ }; + +-static void +-gst_pulsesink_implements_interface_init (GstImplementsInterfaceClass * klass) +-{ +- klass->supported = gst_pulsesink_interface_supported; ++ ringbuffer_type = ++ g_type_register_static (GST_TYPE_RING_BUFFER, "GstPulseSinkRingBuffer", ++ &ringbuffer_info, 0); ++ } ++ return ringbuffer_type; + } + + static void +-gst_pulsesink_init_interfaces (GType type) ++gst_pulseringbuffer_class_init (GstPulseRingBufferClass * klass) + { +- static const GInterfaceInfo implements_iface_info = { +- (GInterfaceInitFunc) gst_pulsesink_implements_interface_init, +- NULL, +- NULL, +- }; +- static const GInterfaceInfo probe_iface_info = { +- (GInterfaceInitFunc) gst_pulsesink_property_probe_interface_init, +- NULL, +- NULL, +- }; ++ GObjectClass *gobject_class; ++ GstRingBufferClass *gstringbuffer_class; + +- g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE, +- &implements_iface_info); +- g_type_add_interface_static (type, GST_TYPE_PROPERTY_PROBE, +- &probe_iface_info); +-} ++ gobject_class = (GObjectClass *) klass; ++ gstringbuffer_class = (GstRingBufferClass *) klass; + +-static void +-gst_pulsesink_base_init (gpointer g_class) +-{ ++ ring_parent_class = g_type_class_peek_parent (klass); + +- static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink", +- GST_PAD_SINK, +- GST_PAD_ALWAYS, +- GST_STATIC_CAPS ("audio/x-raw-int, " +- "endianness = (int) { " ENDIANNESS " }, " +- "signed = (boolean) TRUE, " +- "width = (int) 16, " +- "depth = (int) 16, " +- "rate = (int) [ 1, MAX ], " +- "channels = (int) [ 1, 32 ];" +- "audio/x-raw-float, " +- "endianness = (int) { " ENDIANNESS " }, " +- "width = (int) 32, " +- "rate = (int) [ 1, MAX ], " +- "channels = (int) [ 1, 32 ];" +- "audio/x-raw-int, " +- "endianness = (int) { " ENDIANNESS " }, " +- "signed = (boolean) TRUE, " +- "width = (int) 32, " +- "depth = (int) 32, " +- "rate = (int) [ 1, MAX ], " +- "channels = (int) [ 1, 32 ];" +- "audio/x-raw-int, " +- "signed = (boolean) FALSE, " +- "width = (int) 8, " +- "depth = (int) 8, " +- "rate = (int) [ 1, MAX ], " +- "channels = (int) [ 1, 32 ];" +- "audio/x-alaw, " +- "rate = (int) [ 1, MAX], " +- "channels = (int) [ 1, 32 ];" +- "audio/x-mulaw, " +- "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]") +- ); ++ gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_finalize); + +- GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); ++ gstringbuffer_class->open_device = ++ GST_DEBUG_FUNCPTR (gst_pulseringbuffer_open_device); ++ gstringbuffer_class->close_device = ++ GST_DEBUG_FUNCPTR (gst_pulseringbuffer_close_device); ++ gstringbuffer_class->acquire = ++ GST_DEBUG_FUNCPTR (gst_pulseringbuffer_acquire); ++ gstringbuffer_class->release = ++ GST_DEBUG_FUNCPTR (gst_pulseringbuffer_release); ++ gstringbuffer_class->start = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start); ++ gstringbuffer_class->pause = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_pause); ++ gstringbuffer_class->resume = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_start); ++ gstringbuffer_class->stop = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_stop); + +- gst_element_class_set_details_simple (element_class, +- "PulseAudio Audio Sink", +- "Sink/Audio", "Plays audio to a PulseAudio server", "Lennart Poettering"); +- gst_element_class_add_pad_template (element_class, +- gst_static_pad_template_get (&pad_template)); ++ gstringbuffer_class->commit = GST_DEBUG_FUNCPTR (gst_pulseringbuffer_commit); + } + + static void +-gst_pulsesink_class_init (GstPulseSinkClass * klass) ++gst_pulseringbuffer_init (GstPulseRingBuffer * pbuf, ++ GstPulseRingBufferClass * g_class) + { +- GObjectClass *gobject_class = G_OBJECT_CLASS (klass); +- GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass); +- GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass); +- GstAudioSinkClass *gstaudiosink_class = GST_AUDIO_SINK_CLASS (klass); +- +- gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulsesink_finalize); +- gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_pulsesink_set_property); +- gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_pulsesink_get_property); +- +- gstelement_class->change_state = +- GST_DEBUG_FUNCPTR (gst_pulsesink_change_state); +- +- gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event); +- +- gstaudiosink_class->open = GST_DEBUG_FUNCPTR (gst_pulsesink_open); +- gstaudiosink_class->close = GST_DEBUG_FUNCPTR (gst_pulsesink_close); +- gstaudiosink_class->prepare = GST_DEBUG_FUNCPTR (gst_pulsesink_prepare); +- gstaudiosink_class->unprepare = GST_DEBUG_FUNCPTR (gst_pulsesink_unprepare); +- gstaudiosink_class->write = GST_DEBUG_FUNCPTR (gst_pulsesink_write); +- gstaudiosink_class->delay = GST_DEBUG_FUNCPTR (gst_pulsesink_delay); +- gstaudiosink_class->reset = GST_DEBUG_FUNCPTR (gst_pulsesink_reset); ++ pbuf->stream_name = NULL; ++ pbuf->context = NULL; ++ pbuf->stream = NULL; + +- /* Overwrite GObject fields */ +- g_object_class_install_property (gobject_class, +- PROP_SERVER, +- g_param_spec_string ("server", "Server", +- "The PulseAudio server to connect to", NULL, +- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +- g_object_class_install_property (gobject_class, PROP_DEVICE, +- g_param_spec_string ("device", "Sink", +- "The PulseAudio sink device to connect to", NULL, +- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); +- g_object_class_install_property (gobject_class, +- PROP_DEVICE_NAME, +- g_param_spec_string ("device-name", "Device name", +- "Human-readable name of the sound device", NULL, +- G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); +-#if HAVE_PULSE_0_9_12 +- g_object_class_install_property (gobject_class, +- PROP_VOLUME, +- g_param_spec_double ("volume", "Volume", +- "Volume of this stream", 0.0, 1000.0, 1.0, +- G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); ++#if HAVE_PULSE_0_9_13 ++ pa_sample_spec_init (&pbuf->sample_spec); ++#else ++ pbuf->sample_spec.format = PA_SAMPLE_INVALID; ++ pbuf->sample_spec.rate = 0; ++ pbuf->sample_spec.channels = 0; + #endif ++ ++ pbuf->paused = FALSE; ++ pbuf->corked = TRUE; + } + + static void +-gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass) ++gst_pulsering_destroy_stream (GstPulseRingBuffer * pbuf) + { +- int e; +- +- pulsesink->server = pulsesink->device = pulsesink->stream_name = +- pulsesink->device_description = NULL; +- +- pulsesink->context = NULL; +- pulsesink->stream = NULL; +- +- pulsesink->volume = 1.0; +- pulsesink->volume_set = FALSE; +- +-#if HAVE_PULSE_0_9_13 +- pa_sample_spec_init (&pulsesink->sample_spec); +-#else +- pulsesink->sample_spec.format = PA_SAMPLE_INVALID; +- pulsesink->sample_spec.rate = 0; +- pulsesink->sample_spec.channels = 0; +-#endif ++ if (pbuf->stream) { ++ pa_stream_disconnect (pbuf->stream); + +- pulsesink->operation_success = FALSE; +- pulsesink->did_reset = FALSE; +- pulsesink->in_write = FALSE; +- pulsesink->notify = 0; +- +- pulsesink->mainloop = pa_threaded_mainloop_new (); +- g_assert (pulsesink->mainloop); ++ /* Make sure we don't get any further callbacks */ ++ pa_stream_set_state_callback (pbuf->stream, NULL, NULL); ++ pa_stream_set_write_callback (pbuf->stream, NULL, NULL); + +- e = pa_threaded_mainloop_start (pulsesink->mainloop); +- g_assert (e == 0); ++ pa_stream_unref (pbuf->stream); ++ pbuf->stream = NULL; ++ } + +- pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink), G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device, TRUE, FALSE); /* TRUE for sinks, FALSE for sources */ ++ g_free (pbuf->stream_name); ++ pbuf->stream_name = NULL; + } + + static void +-gst_pulsesink_destroy_stream (GstPulseSink * pulsesink) ++gst_pulsering_destroy_context (GstPulseRingBuffer * pbuf) + { +- if (pulsesink->stream) { +- pa_stream_disconnect (pulsesink->stream); ++ gst_pulsering_destroy_stream (pbuf); ++ ++ if (pbuf->context) { ++ pa_context_disconnect (pbuf->context); + + /* Make sure we don't get any further callbacks */ +- pa_stream_set_state_callback (pulsesink->stream, NULL, NULL); +- pa_stream_set_write_callback (pulsesink->stream, NULL, NULL); +- pa_stream_set_latency_update_callback (pulsesink->stream, NULL, NULL); ++ pa_context_set_state_callback (pbuf->context, NULL, NULL); ++ pa_context_set_subscribe_callback (pbuf->context, NULL, NULL); + +- pa_stream_unref (pulsesink->stream); +- pulsesink->stream = NULL; ++ pa_context_unref (pbuf->context); ++ pbuf->context = NULL; + } +- +- g_free (pulsesink->stream_name); +- pulsesink->stream_name = NULL; +- +- g_free (pulsesink->device_description); +- pulsesink->device_description = NULL; + } + + static void +-gst_pulsesink_destroy_context (GstPulseSink * pulsesink) ++gst_pulseringbuffer_finalize (GObject * object) + { ++ GstPulseRingBuffer *ringbuffer; + +- gst_pulsesink_destroy_stream (pulsesink); ++ ringbuffer = GST_PULSERING_BUFFER_CAST (object); + +- if (pulsesink->context) { +- pa_context_disconnect (pulsesink->context); ++ gst_pulsering_destroy_context (ringbuffer); + +- /* Make sure we don't get any further callbacks */ +- pa_context_set_state_callback (pulsesink->context, NULL, NULL); +- pa_context_set_subscribe_callback (pulsesink->context, NULL, NULL); ++ G_OBJECT_CLASS (ring_parent_class)->finalize (object); ++} ++ ++static gboolean ++gst_pulsering_is_dead (GstPulseSink * psink, GstPulseRingBuffer * pbuf) ++{ ++ if (!pbuf->context ++ || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pbuf->context)) ++ || !pbuf->stream ++ || !PA_STREAM_IS_GOOD (pa_stream_get_state (pbuf->stream))) { ++ const gchar *err_str = pbuf->context ? ++ pa_strerror (pa_context_errno (pbuf->context)) : NULL; + +- pa_context_unref (pulsesink->context); +- pulsesink->context = NULL; ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Disconnected: %s", ++ err_str), (NULL)); ++ return TRUE; + } ++ return FALSE; + } + + static void +-gst_pulsesink_finalize (GObject * object) ++gst_pulsering_context_state_cb (pa_context * c, void *userdata) + { +- GstPulseSink *pulsesink = GST_PULSESINK (object); +- +- pa_threaded_mainloop_stop (pulsesink->mainloop); ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; ++ pa_context_state_t state; + +- gst_pulsesink_destroy_context (pulsesink); ++ pbuf = GST_PULSERING_BUFFER_CAST (userdata); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + +- g_free (pulsesink->server); +- g_free (pulsesink->device); ++ state = pa_context_get_state (c); ++ GST_LOG_OBJECT (psink, "got new context state %d", state); + +- pa_threaded_mainloop_free (pulsesink->mainloop); ++ switch (state) { ++ case PA_CONTEXT_READY: ++ case PA_CONTEXT_TERMINATED: ++ case PA_CONTEXT_FAILED: ++ GST_LOG_OBJECT (psink, "signaling"); ++ pa_threaded_mainloop_signal (psink->mainloop, 0); ++ break; + +- if (pulsesink->probe) { +- gst_pulseprobe_free (pulsesink->probe); +- pulsesink->probe = NULL; ++ case PA_CONTEXT_UNCONNECTED: ++ case PA_CONTEXT_CONNECTING: ++ case PA_CONTEXT_AUTHORIZING: ++ case PA_CONTEXT_SETTING_NAME: ++ break; + } +- +- G_OBJECT_CLASS (parent_class)->finalize (object); + } + + #if HAVE_PULSE_0_9_12 + static void +-gst_pulsesink_set_volume (GstPulseSink * pulsesink, gdouble volume) ++gst_pulsering_context_subscribe_cb (pa_context * c, ++ pa_subscription_event_type_t t, uint32_t idx, void *userdata) + { +- pa_cvolume v; +- pa_operation *o = NULL; +- +- pa_threaded_mainloop_lock (pulsesink->mainloop); ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; + +- pulsesink->volume = volume; +- pulsesink->volume_set = TRUE; ++ pbuf = GST_PULSERING_BUFFER_CAST (userdata); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + +- if (!pulsesink->stream) +- goto unlock; +- +- gst_pulse_cvolume_from_linear (&v, pulsesink->sample_spec.channels, volume); ++ GST_LOG_OBJECT (psink, "type %d, idx %u", t, idx); + +- if (!(o = pa_context_set_sink_input_volume (pulsesink->context, +- pa_stream_get_index (pulsesink->stream), &v, NULL, NULL))) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("pa_stream_set_sink_input_volume() failed: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock; +- } ++ if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) && ++ t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW)) ++ goto done; + +- /* We don't really care about the result of this call */ ++ if (!pbuf->stream) ++ goto done; + +-unlock: ++ if (idx != pa_stream_get_index (pbuf->stream)) ++ goto done; + +- if (o) +- pa_operation_unref (o); ++ /* Actually this event is also triggered when other properties of ++ * the stream change that are unrelated to the volume. However it is ++ * probably cheaper to signal the change here and check for the ++ * volume when the GObject property is read instead of querying it always. */ + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); ++done: ++ /* inform streaming thread to notify */ ++ g_atomic_int_compare_and_exchange (&psink->notify, 0, 1); + } ++#endif + +-static void +-gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i, +- int eol, void *userdata) ++/* will be called when the device should be opened. In this case we will connect ++ * to the server. We should not try to open any streams in this state. */ ++static gboolean ++gst_pulseringbuffer_open_device (GstRingBuffer * buf) + { +- GstPulseSink *pulsesink = GST_PULSESINK (userdata); ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; ++ gchar *name; ++ pa_mainloop_api *api; + +- if (!i) +- return; ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); ++ pbuf = GST_PULSERING_BUFFER_CAST (buf); + +- if (!pulsesink->stream) +- return; ++ g_assert (!pbuf->context); ++ g_assert (!pbuf->stream); + +- g_assert (i->index == pa_stream_get_index (pulsesink->stream)); ++ name = gst_pulse_client_name (); + +- pulsesink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume)); +-} ++ pa_threaded_mainloop_lock (psink->mainloop); + +-static gdouble +-gst_pulsesink_get_volume (GstPulseSink * pulsesink) +-{ +- pa_operation *o = NULL; +- gdouble v; ++ /* get the mainloop api and create a context */ ++ GST_LOG_OBJECT (psink, "new context with name %s", GST_STR_NULL (name)); ++ api = pa_threaded_mainloop_get_api (psink->mainloop); ++ if (!(pbuf->context = pa_context_new (api, name))) ++ goto create_failed; ++ ++ /* register some essential callbacks */ ++ pa_context_set_state_callback (pbuf->context, ++ gst_pulsering_context_state_cb, pbuf); ++#if HAVE_PULSE_0_9_12 ++ pa_context_set_subscribe_callback (pbuf->context, ++ gst_pulsering_context_subscribe_cb, pbuf); ++#endif + +- pa_threaded_mainloop_lock (pulsesink->mainloop); ++ /* try to connect to the server and wait for completioni, we don't want to ++ * autospawn a deamon */ ++ GST_LOG_OBJECT (psink, "connect to server %s", GST_STR_NULL (psink->server)); ++ if (pa_context_connect (pbuf->context, psink->server, PA_CONTEXT_NOAUTOSPAWN, ++ NULL) < 0) ++ goto connect_failed; + +- if (!pulsesink->stream) +- goto unlock; ++ for (;;) { ++ pa_context_state_t state; + +- if (!(o = pa_context_get_sink_input_info (pulsesink->context, +- pa_stream_get_index (pulsesink->stream), +- gst_pulsesink_sink_input_info_cb, pulsesink))) { ++ state = pa_context_get_state (pbuf->context); + +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("pa_stream_get_sink_input_info() failed: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock; +- } ++ GST_LOG_OBJECT (psink, "context state is now %d", state); + +- while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { ++ if (!PA_CONTEXT_IS_GOOD (state)) ++ goto connect_failed; + +- if (gst_pulsesink_is_dead (pulsesink)) +- goto unlock; ++ if (state == PA_CONTEXT_READY) ++ break; + +- pa_threaded_mainloop_wait (pulsesink->mainloop); ++ /* Wait until the context is ready */ ++ GST_LOG_OBJECT (psink, "waiting.."); ++ pa_threaded_mainloop_wait (psink->mainloop); + } + +-unlock: ++ GST_LOG_OBJECT (psink, "opened the device"); + +- if (o) +- pa_operation_unref (o); ++ pa_threaded_mainloop_unlock (psink->mainloop); ++ g_free (name); + +- v = pulsesink->volume; ++ return TRUE; + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); ++ /* ERRORS */ ++unlock_and_fail: ++ { ++ gst_pulsering_destroy_context (pbuf); + +- return v; ++ pa_threaded_mainloop_unlock (psink->mainloop); ++ g_free (name); ++ return FALSE; ++ } ++create_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("Failed to create context"), (NULL)); ++ goto unlock_and_fail; ++ } ++connect_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ("Failed to connect: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto unlock_and_fail; ++ } + } +-#endif + ++/* close the device */ + static gboolean +-gst_pulsesink_is_dead (GstPulseSink * pulsesink) ++gst_pulseringbuffer_close_device (GstRingBuffer * buf) + { ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; + +- if (!pulsesink->context +- || !PA_CONTEXT_IS_GOOD (pa_context_get_state (pulsesink->context)) +- || !pulsesink->stream +- || !PA_STREAM_IS_GOOD (pa_stream_get_state (pulsesink->stream))) { +- const gchar *err_str = pulsesink->context ? +- pa_strerror (pa_context_errno (pulsesink->context)) : NULL; ++ pbuf = GST_PULSERING_BUFFER_CAST (buf); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); + +- GST_ELEMENT_ERROR ((pulsesink), RESOURCE, FAILED, ("Disconnected: %s", +- err_str), (NULL)); +- return TRUE; +- } ++ GST_LOG_OBJECT (psink, "closing device"); + +- return FALSE; ++ pa_threaded_mainloop_lock (psink->mainloop); ++ gst_pulsering_destroy_context (pbuf); ++ pa_threaded_mainloop_unlock (psink->mainloop); ++ ++ GST_LOG_OBJECT (psink, "closed device"); ++ ++ return TRUE; + } + + static void +-gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol, +- void *userdata) ++gst_pulsering_stream_state_cb (pa_stream * s, void *userdata) + { +- GstPulseSink *pulsesink = GST_PULSESINK (userdata); +- +- if (!i) +- return; +- +- if (!pulsesink->stream) +- return; ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; ++ pa_stream_state_t state; + +- g_assert (i->index == pa_stream_get_device_index (pulsesink->stream)); ++ pbuf = GST_PULSERING_BUFFER_CAST (userdata); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + +- g_free (pulsesink->device_description); +- pulsesink->device_description = g_strdup (i->description); +-} ++ state = pa_stream_get_state (s); ++ GST_LOG_OBJECT (psink, "got new stream state %d", state); + +-static gchar * +-gst_pulsesink_device_description (GstPulseSink * pulsesink) +-{ +- pa_operation *o = NULL; +- gchar *t; ++ switch (state) { ++ case PA_STREAM_READY: ++ case PA_STREAM_FAILED: ++ case PA_STREAM_TERMINATED: ++ GST_LOG_OBJECT (psink, "signaling"); ++ pa_threaded_mainloop_signal (psink->mainloop, 0); ++ break; ++ case PA_STREAM_UNCONNECTED: ++ case PA_STREAM_CREATING: ++ break; ++ } ++} + +- pa_threaded_mainloop_lock (pulsesink->mainloop); ++static void ++gst_pulsering_stream_request_cb (pa_stream * s, size_t length, void *userdata) ++{ ++ GstPulseSink *psink; ++ GstRingBuffer *rbuf; ++ GstPulseRingBuffer *pbuf; + +- if (!pulsesink->stream) +- goto unlock; ++ rbuf = GST_RING_BUFFER_CAST (userdata); ++ pbuf = GST_PULSERING_BUFFER_CAST (userdata); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + +- if (!(o = pa_context_get_sink_info_by_index (pulsesink->context, +- pa_stream_get_device_index (pulsesink->stream), +- gst_pulsesink_sink_info_cb, pulsesink))) { ++ GST_LOG_OBJECT (psink, "got request for length %" G_GSIZE_FORMAT, length); + +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("pa_stream_get_sink_info() failed: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock; ++ if (pbuf->in_commit && (length >= rbuf->spec.segsize)) { ++ /* only signal when we are waiting in the commit thread ++ * and got request for atleast a segment */ ++ pa_threaded_mainloop_signal (psink->mainloop, 0); + } ++} + +- while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { ++/* This method should create a new stream of the given @spec. No playback should ++ * start yet so we start in the corked state. */ ++static gboolean ++gst_pulseringbuffer_acquire (GstRingBuffer * buf, GstRingBufferSpec * spec) ++{ ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; ++ pa_buffer_attr buf_attr; ++ const pa_buffer_attr *buf_attr_ptr; ++ pa_channel_map channel_map; ++ pa_operation *o = NULL; ++ pa_cvolume v, *pv; ++ pa_stream_flags_t flags; ++ const gchar *name; ++ GstAudioClock *clock; ++ gint64 time_offset; ++ ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); ++ pbuf = GST_PULSERING_BUFFER_CAST (buf); ++ ++ GST_LOG_OBJECT (psink, "creating sample spec"); ++ /* convert the gstreamer sample spec to the pulseaudio format */ ++ if (!gst_pulse_fill_sample_spec (spec, &pbuf->sample_spec)) ++ goto invalid_spec; ++ ++ pa_threaded_mainloop_lock (psink->mainloop); ++ ++ /* we need a context and a no stream */ ++ g_assert (pbuf->context); ++ g_assert (!pbuf->stream); ++ ++ /* enable event notifications */ ++ GST_LOG_OBJECT (psink, "subscribing to context events"); ++ if (!(o = pa_context_subscribe (pbuf->context, ++ PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL))) ++ goto subscribe_failed; + +- if (gst_pulsesink_is_dead (pulsesink)) +- goto unlock; ++ pa_operation_unref (o); + +- pa_threaded_mainloop_wait (pulsesink->mainloop); +- } ++ /* initialize the channel map */ ++ gst_pulse_gst_to_channel_map (&channel_map, spec); + +-unlock: ++ /* find a good name for the stream */ ++ if (psink->stream_name) ++ name = psink->stream_name; ++ else ++ name = "Playback Stream"; ++ ++ /* create a stream */ ++ GST_LOG_OBJECT (psink, "creating stream with name %s", name); ++ if (!(pbuf->stream = pa_stream_new (pbuf->context, ++ name, &pbuf->sample_spec, &channel_map))) ++ goto stream_failed; ++ ++ /* install essential callbacks */ ++ pa_stream_set_state_callback (pbuf->stream, ++ gst_pulsering_stream_state_cb, pbuf); ++ pa_stream_set_write_callback (pbuf->stream, ++ gst_pulsering_stream_request_cb, pbuf); + +- if (o) +- pa_operation_unref (o); ++ /* buffering requirements. When setting prebuf to 0, the stream will not pause ++ * when we cause an underrun, which causes time to continue. */ ++ memset (&buf_attr, 0, sizeof (buf_attr)); ++ buf_attr.tlength = spec->segtotal * spec->segsize; ++ buf_attr.maxlength = buf_attr.tlength * 2; ++ buf_attr.prebuf = 0; ++ buf_attr.minreq = spec->segsize; + +- t = g_strdup (pulsesink->device_description); ++ GST_INFO_OBJECT (psink, "tlength: %d", buf_attr.tlength); ++ GST_INFO_OBJECT (psink, "maxlength: %d", buf_attr.maxlength); ++ GST_INFO_OBJECT (psink, "prebuf: %d", buf_attr.prebuf); ++ GST_INFO_OBJECT (psink, "minreq: %d", buf_attr.minreq); ++ ++ /* configure volume when we changed it, else we leave the default */ ++ if (psink->volume_set) { ++ GST_LOG_OBJECT (psink, "have volume of %f", psink->volume); ++ pv = &v; ++ gst_pulse_cvolume_from_linear (pv, pbuf->sample_spec.channels, ++ psink->volume); ++ } else { ++ pv = NULL; ++ } + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); ++ /* construct the flags */ ++ flags = PA_STREAM_INTERPOLATE_TIMING | PA_STREAM_AUTO_TIMING_UPDATE | ++#if HAVE_PULSE_0_9_11 ++ PA_STREAM_ADJUST_LATENCY | ++#endif ++ PA_STREAM_START_CORKED; + +- return t; +-} ++ /* we always start corked (see flags above) */ ++ pbuf->corked = TRUE; + +-static void +-gst_pulsesink_set_property (GObject * object, +- guint prop_id, const GValue * value, GParamSpec * pspec) +-{ +- GstPulseSink *pulsesink = GST_PULSESINK (object); ++ /* try to connect now */ ++ GST_LOG_OBJECT (psink, "connect for playback to device %s", ++ GST_STR_NULL (psink->device)); ++ if (pa_stream_connect_playback (pbuf->stream, psink->device, ++ &buf_attr, flags, pv, NULL) < 0) ++ goto connect_failed; ++ ++ /* our clock will now start from 0 again */ ++ clock = GST_AUDIO_CLOCK (GST_BASE_AUDIO_SINK (psink)->provided_clock); ++ gst_audio_clock_reset (clock, 0); ++ time_offset = clock->abidata.ABI.time_offset; ++ ++ GST_LOG_OBJECT (psink, "got time offset %" GST_TIME_FORMAT, ++ GST_TIME_ARGS (time_offset)); ++ ++ /* calculate the sample offset for 0 */ ++ if (time_offset > 0) ++ pbuf->offset = gst_util_uint64_scale_int (time_offset, ++ pbuf->sample_spec.rate, GST_SECOND); ++ else ++ pbuf->offset = -gst_util_uint64_scale_int (-time_offset, ++ pbuf->sample_spec.rate, GST_SECOND); ++ GST_LOG_OBJECT (psink, "sample offset %" G_GINT64_FORMAT, pbuf->offset); + +- switch (prop_id) { +- case PROP_SERVER: +- g_free (pulsesink->server); +- pulsesink->server = g_value_dup_string (value); ++ for (;;) { ++ pa_stream_state_t state; + +- if (pulsesink->probe) +- gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server); ++ state = pa_stream_get_state (pbuf->stream); + +- break; ++ GST_LOG_OBJECT (psink, "stream state is now %d", state); + +- case PROP_DEVICE: +- g_free (pulsesink->device); +- pulsesink->device = g_value_dup_string (value); +- break; ++ if (!PA_STREAM_IS_GOOD (state)) ++ goto connect_failed; + +-#if HAVE_PULSE_0_9_12 +- case PROP_VOLUME: +- gst_pulsesink_set_volume (pulsesink, g_value_get_double (value)); ++ if (state == PA_STREAM_READY) + break; +-#endif + +- default: +- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); +- break; ++ /* Wait until the stream is ready */ ++ pa_threaded_mainloop_wait (psink->mainloop); + } +-} + +-static void +-gst_pulsesink_get_property (GObject * object, +- guint prop_id, GValue * value, GParamSpec * pspec) +-{ ++ GST_LOG_OBJECT (psink, "stream is acquired now"); + +- GstPulseSink *pulsesink = GST_PULSESINK (object); ++ /* get the actual buffering properties now */ ++ buf_attr_ptr = pa_stream_get_buffer_attr (pbuf->stream); + +- switch (prop_id) { +- case PROP_SERVER: +- g_value_set_string (value, pulsesink->server); +- break; ++ GST_INFO_OBJECT (psink, "tlength: %d", buf_attr_ptr->tlength); ++ GST_INFO_OBJECT (psink, "maxlength: %d", buf_attr_ptr->maxlength); ++ GST_INFO_OBJECT (psink, "prebuf: %d", buf_attr_ptr->prebuf); ++ GST_INFO_OBJECT (psink, "minreq: %d", buf_attr_ptr->minreq); + +- case PROP_DEVICE: +- g_value_set_string (value, pulsesink->device); +- break; ++ spec->segsize = buf_attr.minreq; ++ spec->segtotal = buf_attr.tlength / spec->segsize; + +- case PROP_DEVICE_NAME:{ +- char *t = gst_pulsesink_device_description (pulsesink); +- g_value_set_string (value, t); +- g_free (t); +- break; +- } ++ pa_threaded_mainloop_unlock (psink->mainloop); + +-#if HAVE_PULSE_0_9_12 +- case PROP_VOLUME: +- g_value_set_double (value, gst_pulsesink_get_volume (pulsesink)); +- break; +-#endif ++ return TRUE; + +- default: +- G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); +- break; ++ /* ERRORS */ ++unlock_and_fail: ++ { ++ gst_pulsering_destroy_stream (pbuf); ++ pa_threaded_mainloop_unlock (psink->mainloop); ++ ++ return FALSE; ++ } ++invalid_spec: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, SETTINGS, ++ ("Invalid sample specification."), (NULL)); ++ return FALSE; ++ } ++subscribe_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("pa_context_subscribe() failed: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto unlock_and_fail; ++ } ++stream_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("Failed to create stream: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto unlock_and_fail; ++ } ++connect_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("Failed to connect stream: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto unlock_and_fail; + } + } + +-static void +-gst_pulsesink_context_state_cb (pa_context * c, void *userdata) ++/* free the stream that we acquired before */ ++static gboolean ++gst_pulseringbuffer_release (GstRingBuffer * buf) + { +- GstPulseSink *pulsesink = GST_PULSESINK (userdata); ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; + +- switch (pa_context_get_state (c)) { +- case PA_CONTEXT_READY: +- case PA_CONTEXT_TERMINATED: +- case PA_CONTEXT_FAILED: +- pa_threaded_mainloop_signal (pulsesink->mainloop, 0); +- break; ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (buf)); ++ pbuf = GST_PULSERING_BUFFER_CAST (buf); + +- case PA_CONTEXT_UNCONNECTED: +- case PA_CONTEXT_CONNECTING: +- case PA_CONTEXT_AUTHORIZING: +- case PA_CONTEXT_SETTING_NAME: +- break; +- } ++ pa_threaded_mainloop_lock (psink->mainloop); ++ gst_pulsering_destroy_stream (pbuf); ++ pa_threaded_mainloop_unlock (psink->mainloop); ++ ++ return TRUE; + } + +-#if HAVE_PULSE_0_9_12 + static void +-gst_pulsesink_context_subscribe_cb (pa_context * c, +- pa_subscription_event_type_t t, uint32_t idx, void *userdata) ++gst_pulsering_success_cb (pa_stream * s, int success, void *userdata) + { +- GstPulseSink *pulsesink = GST_PULSESINK (userdata); +- +- if (t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_CHANGE) && +- t != (PA_SUBSCRIPTION_EVENT_SINK_INPUT | PA_SUBSCRIPTION_EVENT_NEW)) +- return; +- +- if (!pulsesink->stream) +- return; ++ GstPulseRingBuffer *pbuf; ++ GstPulseSink *psink; + +- if (idx != pa_stream_get_index (pulsesink->stream)) +- return; ++ pbuf = GST_PULSERING_BUFFER_CAST (userdata); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + +- /* Actually this event is also triggered when other properties of +- * the stream change that are unrelated to the volume. However it is +- * probably cheaper to signal the change here and check for the +- * volume when the GObject property is read instead of querying it always. */ +- +- /* inform streaming thread to notify */ +- g_atomic_int_compare_and_exchange (&pulsesink->notify, 0, 1); ++ pa_threaded_mainloop_signal (psink->mainloop, 0); + } +-#endif + +-static void +-gst_pulsesink_stream_state_cb (pa_stream * s, void *userdata) ++/* update the corked state of a stream, must be called with the mainloop ++ * lock */ ++static gboolean ++gst_pulsering_set_corked (GstPulseRingBuffer * pbuf, gboolean corked, ++ gboolean wait) + { +- GstPulseSink *pulsesink = GST_PULSESINK (userdata); ++ pa_operation *o = NULL; ++ GstPulseSink *psink; ++ gboolean res = FALSE; + +- switch (pa_stream_get_state (s)) { ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + +- case PA_STREAM_READY: +- case PA_STREAM_FAILED: +- case PA_STREAM_TERMINATED: +- pa_threaded_mainloop_signal (pulsesink->mainloop, 0); +- break; ++ GST_DEBUG_OBJECT (psink, "setting corked state to %d", corked); ++ if (pbuf->corked != corked) { ++ if (!(o = pa_stream_cork (pbuf->stream, corked, ++ gst_pulsering_success_cb, pbuf))) ++ goto cork_failed; ++ ++ while (wait && pa_operation_get_state (o) == PA_OPERATION_RUNNING) { ++ pa_threaded_mainloop_wait (psink->mainloop); ++ if (gst_pulsering_is_dead (psink, pbuf)) ++ goto server_dead; ++ } ++ pbuf->corked = corked; ++ } else { ++ GST_DEBUG_OBJECT (psink, "skipping, already in requested state"); ++ } ++ res = TRUE; + +- case PA_STREAM_UNCONNECTED: +- case PA_STREAM_CREATING: +- break; ++cleanup: ++ if (o) ++ pa_operation_unref (o); ++ ++ return res; ++ ++ /* ERRORS */ ++server_dead: ++ { ++ GST_DEBUG_OBJECT (psink, "the server is dead"); ++ goto cleanup; ++ } ++cork_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("pa_stream_cork() failed: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto cleanup; + } + } + +-static void +-gst_pulsesink_stream_request_cb (pa_stream * s, size_t length, void *userdata) ++/* start/resume playback ASAP, we don't uncork here but in the commit method */ ++static gboolean ++gst_pulseringbuffer_start (GstRingBuffer * buf) + { +- GstPulseSink *pulsesink = GST_PULSESINK (userdata); ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; ++ ++ pbuf = GST_PULSERING_BUFFER_CAST (buf); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); ++ ++ pa_threaded_mainloop_lock (psink->mainloop); ++ GST_DEBUG_OBJECT (psink, "starting"); ++ pbuf->paused = FALSE; ++ pa_threaded_mainloop_unlock (psink->mainloop); + +- pa_threaded_mainloop_signal (pulsesink->mainloop, 0); ++ return TRUE; + } + +-static void +-gst_pulsesink_stream_latency_update_cb (pa_stream * s, void *userdata) ++/* pause/stop playback ASAP */ ++static gboolean ++gst_pulseringbuffer_pause (GstRingBuffer * buf) + { +- GstPulseSink *pulsesink = GST_PULSESINK (userdata); ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; ++ gboolean res; ++ ++ pbuf = GST_PULSERING_BUFFER_CAST (buf); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); ++ ++ pa_threaded_mainloop_lock (psink->mainloop); ++ GST_DEBUG_OBJECT (psink, "pausing and corking"); ++ /* make sure the commit method stops writing */ ++ pbuf->paused = TRUE; ++ res = gst_pulsering_set_corked (pbuf, TRUE, FALSE); ++ if (pbuf->in_commit) { ++ /* we are waiting in a commit, signal */ ++ GST_DEBUG_OBJECT (psink, "signal commit"); ++ pa_threaded_mainloop_signal (psink->mainloop, 0); ++ } ++ pa_threaded_mainloop_unlock (psink->mainloop); + +- pa_threaded_mainloop_signal (pulsesink->mainloop, 0); ++ return res; + } + ++/* stop playback, we flush everything. */ + static gboolean +-gst_pulsesink_open (GstAudioSink * asink) ++gst_pulseringbuffer_stop (GstRingBuffer * buf) + { +- GstPulseSink *pulsesink = GST_PULSESINK (asink); +- gchar *name = gst_pulse_client_name (); ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; ++ gboolean res = FALSE; ++ pa_operation *o = NULL; + +- pa_threaded_mainloop_lock (pulsesink->mainloop); ++ pbuf = GST_PULSERING_BUFFER_CAST (buf); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + +- g_assert (!pulsesink->context); +- g_assert (!pulsesink->stream); ++ pa_threaded_mainloop_lock (psink->mainloop); ++ pbuf->paused = TRUE; ++ res = gst_pulsering_set_corked (pbuf, TRUE, TRUE); ++ /* Inform anyone waiting in _commit() call that it shall wakeup */ ++ if (pbuf->in_commit) { ++ GST_DEBUG_OBJECT (psink, "signal commit thread"); ++ pa_threaded_mainloop_signal (psink->mainloop, 0); ++ } ++ ++ if (strcmp (psink->pa_version, "0.9.12")) { ++ /* then try to flush, it's not fatal when this fails */ ++ GST_DEBUG_OBJECT (psink, "flushing"); ++ if ((o = pa_stream_flush (pbuf->stream, gst_pulsering_success_cb, pbuf))) { ++ while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { ++ GST_DEBUG_OBJECT (psink, "wait for completion"); ++ pa_threaded_mainloop_wait (psink->mainloop); ++ if (gst_pulsering_is_dead (psink, pbuf)) ++ goto server_dead; ++ } ++ GST_DEBUG_OBJECT (psink, "flush completed"); ++ } ++ } ++ res = TRUE; + +- if (!(pulsesink->context = +- pa_context_new (pa_threaded_mainloop_get_api (pulsesink->mainloop), +- name))) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("Failed to create context"), (NULL)); +- goto unlock_and_fail; ++cleanup: ++ if (o) { ++ pa_operation_cancel (o); ++ pa_operation_unref (o); + } ++ pa_threaded_mainloop_unlock (psink->mainloop); + +- pa_context_set_state_callback (pulsesink->context, +- gst_pulsesink_context_state_cb, pulsesink); +-#if HAVE_PULSE_0_9_12 +- pa_context_set_subscribe_callback (pulsesink->context, +- gst_pulsesink_context_subscribe_cb, pulsesink); +-#endif ++ return res; + +- if (pa_context_connect (pulsesink->context, pulsesink->server, 0, NULL) < 0) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Failed to connect: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock_and_fail; +- } ++ /* ERRORS */ ++server_dead: ++ { ++ GST_DEBUG_OBJECT (psink, "the server is dead"); ++ goto cleanup; ++ } ++} ++ ++/* in_samples >= out_samples, rate > 1.0 */ ++#define FWD_UP_SAMPLES(s,se,d,de) \ ++G_STMT_START { \ ++ guint8 *sb = s, *db = d; \ ++ while (s <= se && d < de) { \ ++ memcpy (d, s, bps); \ ++ s += bps; \ ++ *accum += outr; \ ++ if ((*accum << 1) >= inr) { \ ++ *accum -= inr; \ ++ d += bps; \ ++ } \ ++ } \ ++ in_samples -= (s - sb)/bps; \ ++ out_samples -= (d - db)/bps; \ ++ GST_DEBUG ("fwd_up end %d/%d",*accum,*toprocess); \ ++} G_STMT_END ++ ++/* out_samples > in_samples, for rates smaller than 1.0 */ ++#define FWD_DOWN_SAMPLES(s,se,d,de) \ ++G_STMT_START { \ ++ guint8 *sb = s, *db = d; \ ++ while (s <= se && d < de) { \ ++ memcpy (d, s, bps); \ ++ d += bps; \ ++ *accum += inr; \ ++ if ((*accum << 1) >= outr) { \ ++ *accum -= outr; \ ++ s += bps; \ ++ } \ ++ } \ ++ in_samples -= (s - sb)/bps; \ ++ out_samples -= (d - db)/bps; \ ++ GST_DEBUG ("fwd_down end %d/%d",*accum,*toprocess); \ ++} G_STMT_END ++ ++#define REV_UP_SAMPLES(s,se,d,de) \ ++G_STMT_START { \ ++ guint8 *sb = se, *db = d; \ ++ while (s <= se && d < de) { \ ++ memcpy (d, se, bps); \ ++ se -= bps; \ ++ *accum += outr; \ ++ while ((*accum << 1) >= inr) { \ ++ *accum -= inr; \ ++ d += bps; \ ++ } \ ++ } \ ++ in_samples -= (sb - se)/bps; \ ++ out_samples -= (d - db)/bps; \ ++ GST_DEBUG ("rev_up end %d/%d",*accum,*toprocess); \ ++} G_STMT_END ++ ++#define REV_DOWN_SAMPLES(s,se,d,de) \ ++G_STMT_START { \ ++ guint8 *sb = se, *db = d; \ ++ while (s <= se && d < de) { \ ++ memcpy (d, se, bps); \ ++ d += bps; \ ++ *accum += inr; \ ++ while ((*accum << 1) >= outr) { \ ++ *accum -= outr; \ ++ se -= bps; \ ++ } \ ++ } \ ++ in_samples -= (sb - se)/bps; \ ++ out_samples -= (d - db)/bps; \ ++ GST_DEBUG ("rev_down end %d/%d",*accum,*toprocess); \ ++} G_STMT_END + +- for (;;) { +- pa_context_state_t state; + +- state = pa_context_get_state (pulsesink->context); ++/* our custom commit function because we write into the buffer of pulseaudio ++ * instead of keeping our own buffer */ ++static guint ++gst_pulseringbuffer_commit (GstRingBuffer * buf, guint64 * sample, ++ guchar * data, gint in_samples, gint out_samples, gint * accum) ++{ ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; ++ guint result; ++ guint8 *data_end; ++ gboolean reverse; ++ gint *toprocess; ++ gint inr, outr, bps; ++ gint64 offset; ++ guint bufsize; ++ ++ pbuf = GST_PULSERING_BUFFER_CAST (buf); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); ++ ++ /* FIXME post message rather than using a signal (as mixer interface) */ ++ if (g_atomic_int_compare_and_exchange (&psink->notify, 1, 0)) ++ g_object_notify (G_OBJECT (psink), "volume"); ++ ++ /* make sure the ringbuffer is started */ ++ if (G_UNLIKELY (g_atomic_int_get (&buf->state) != ++ GST_RING_BUFFER_STATE_STARTED)) { ++ /* see if we are allowed to start it */ ++ if (G_UNLIKELY (g_atomic_int_get (&buf->abidata.ABI.may_start) == FALSE)) ++ goto no_start; ++ ++ GST_DEBUG_OBJECT (buf, "start!"); ++ if (!gst_ring_buffer_start (buf)) ++ goto start_failed; ++ } ++ ++ pa_threaded_mainloop_lock (psink->mainloop); ++ GST_DEBUG_OBJECT (psink, "entering commit"); ++ pbuf->in_commit = TRUE; ++ ++ bps = buf->spec.bytes_per_sample; ++ bufsize = buf->spec.segsize * buf->spec.segtotal; ++ ++ /* our toy resampler for trick modes */ ++ reverse = out_samples < 0; ++ out_samples = ABS (out_samples); ++ ++ if (in_samples >= out_samples) ++ toprocess = &in_samples; ++ else ++ toprocess = &out_samples; ++ ++ inr = in_samples - 1; ++ outr = out_samples - 1; ++ ++ /* data_end points to the last sample we have to write, not past it. This is ++ * needed to properly handle reverse playback: it points to the last sample. */ ++ data_end = data + (bps * inr); ++ ++ if (pbuf->paused) ++ goto was_paused; ++ ++ /* correct for sample offset against the internal clock */ ++ offset = *sample; ++ if (pbuf->offset >= 0) { ++ if (offset > pbuf->offset) ++ offset -= pbuf->offset; ++ else ++ offset = 0; ++ } else { ++ if (offset > -pbuf->offset) ++ offset += pbuf->offset; ++ else ++ offset = 0; ++ } ++ /* offset is in bytes */ ++ offset *= bps; ++ ++ while (*toprocess > 0) { ++ size_t avail; ++ guint towrite; ++ ++ GST_LOG_OBJECT (psink, ++ "need to write %d samples at offset %" G_GINT64_FORMAT, *toprocess, ++ offset); ++ ++ for (;;) { ++ /* FIXME, this is not quite right */ ++ if ((avail = pa_stream_writable_size (pbuf->stream)) == (size_t) - 1) ++ goto writable_size_failed; ++ ++ /* We always try to satisfy a request for data */ ++ GST_LOG_OBJECT (psink, "writable bytes %" G_GSIZE_FORMAT, avail); ++ ++ /* convert to samples, we can only deal with multiples of the ++ * sample size */ ++ avail /= bps; ++ ++ if (avail > 0) ++ break; ++ ++ /* see if we need to uncork because we have no free space */ ++ if (pbuf->corked) { ++ if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE)) ++ goto uncork_failed; ++ } ++ ++ /* we can't write a single byte, wait a bit */ ++ GST_LOG_OBJECT (psink, "waiting for free space"); ++ pa_threaded_mainloop_wait (psink->mainloop); + +- if (!PA_CONTEXT_IS_GOOD (state)) { +- GST_DEBUG_OBJECT (pulsesink, "Context state was not READY. Got: %d", +- state); +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Failed to connect: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock_and_fail; ++ if (pbuf->paused) ++ goto was_paused; + } + +- if (state == PA_CONTEXT_READY) +- break; ++ if (avail > out_samples) ++ avail = out_samples; + +- /* Wait until the context is ready */ +- pa_threaded_mainloop_wait (pulsesink->mainloop); ++ towrite = avail * bps; ++ ++ GST_LOG_OBJECT (psink, "writing %d samples at offset %" G_GUINT64_FORMAT, ++ avail, offset); ++ ++ if (G_LIKELY (inr == outr && !reverse)) { ++ /* no rate conversion, simply write out the samples */ ++ if (pa_stream_write (pbuf->stream, data, towrite, NULL, offset, ++ PA_SEEK_ABSOLUTE) < 0) ++ goto write_failed; ++ ++ data += towrite; ++ in_samples -= avail; ++ out_samples -= avail; ++ } else { ++ guint8 *dest, *d, *d_end; ++ ++ /* we need to allocate a temporary buffer to resample the data into, ++ * FIXME, we should have a pulseaudio API to allocate this buffer for us ++ * from the shared memory. */ ++ dest = d = g_malloc (towrite); ++ d_end = d + towrite; ++ ++ if (!reverse) { ++ if (inr >= outr) ++ /* forward speed up */ ++ FWD_UP_SAMPLES (data, data_end, d, d_end); ++ else ++ /* forward slow down */ ++ FWD_DOWN_SAMPLES (data, data_end, d, d_end); ++ } else { ++ if (inr >= outr) ++ /* reverse speed up */ ++ REV_UP_SAMPLES (data, data_end, d, d_end); ++ else ++ /* reverse slow down */ ++ REV_DOWN_SAMPLES (data, data_end, d, d_end); ++ } ++ /* see what we have left to write */ ++ towrite = (d - dest); ++ if (pa_stream_write (pbuf->stream, dest, towrite, ++ g_free, offset, PA_SEEK_ABSOLUTE) < 0) ++ goto write_failed; ++ ++ avail = towrite / bps; ++ } ++ *sample += avail; ++ offset += avail * bps; ++ ++ /* check if we need to uncork after writing the samples */ ++ if (pbuf->corked) { ++ const pa_timing_info *info; ++ ++ if ((info = pa_stream_get_timing_info (pbuf->stream))) { ++ GST_LOG_OBJECT (psink, ++ "read_index at %" G_GUINT64_FORMAT ", offset %" G_GINT64_FORMAT, ++ info->read_index, offset); ++ ++ /* we uncork when the read_index is too far behind the offset we need ++ * to write to. */ ++ if (info->read_index + bufsize <= offset) { ++ if (!gst_pulsering_set_corked (pbuf, FALSE, FALSE)) ++ goto uncork_failed; ++ } ++ } else { ++ GST_LOG_OBJECT (psink, "no timing info available yet"); ++ } ++ } + } ++ /* we consumed all samples here */ ++ data = data_end + bps; + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); +- g_free (name); +- return TRUE; ++ pbuf->in_commit = FALSE; ++ pa_threaded_mainloop_unlock (psink->mainloop); ++ ++done: ++ result = inr - ((data_end - data) / bps); ++ GST_LOG_OBJECT (psink, "wrote %d samples", result); + ++ return result; ++ ++ /* ERRORS */ + unlock_and_fail: ++ { ++ pbuf->in_commit = FALSE; ++ GST_LOG_OBJECT (psink, "we are reset"); ++ pa_threaded_mainloop_unlock (psink->mainloop); ++ goto done; ++ } ++no_start: ++ { ++ GST_LOG_OBJECT (psink, "we can not start"); ++ return 0; ++ } ++start_failed: ++ { ++ GST_LOG_OBJECT (psink, "failed to start the ringbuffer"); ++ return 0; ++ } ++uncork_failed: ++ { ++ pbuf->in_commit = FALSE; ++ GST_ERROR_OBJECT (psink, "uncork failed"); ++ pa_threaded_mainloop_unlock (psink->mainloop); ++ goto done; ++ } ++was_paused: ++ { ++ pbuf->in_commit = FALSE; ++ GST_LOG_OBJECT (psink, "we are paused"); ++ pa_threaded_mainloop_unlock (psink->mainloop); ++ goto done; ++ } ++writable_size_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("pa_stream_writable_size() failed: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto unlock_and_fail; ++ } ++write_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("pa_stream_write() failed: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto unlock_and_fail; ++ } ++} ++ ++static void gst_pulsesink_set_property (GObject * object, guint prop_id, ++ const GValue * value, GParamSpec * pspec); ++static void gst_pulsesink_get_property (GObject * object, guint prop_id, ++ GValue * value, GParamSpec * pspec); ++static void gst_pulsesink_finalize (GObject * object); + +- gst_pulsesink_destroy_context (pulsesink); ++static gboolean gst_pulsesink_event (GstBaseSink * sink, GstEvent * event); ++ ++static void gst_pulsesink_init_interfaces (GType type); ++ ++#if (G_BYTE_ORDER == G_LITTLE_ENDIAN) ++# define ENDIANNESS "LITTLE_ENDIAN, BIG_ENDIAN" ++#else ++# define ENDIANNESS "BIG_ENDIAN, LITTLE_ENDIAN" ++#endif ++ ++GST_IMPLEMENT_PULSEPROBE_METHODS (GstPulseSink, gst_pulsesink); ++GST_BOILERPLATE_FULL (GstPulseSink, gst_pulsesink, GstBaseAudioSink, ++ GST_TYPE_BASE_AUDIO_SINK, gst_pulsesink_init_interfaces); ++ ++static gboolean ++gst_pulsesink_interface_supported (GstImplementsInterface * ++ iface, GType interface_type) ++{ ++ GstPulseSink *this = GST_PULSESINK_CAST (iface); ++ ++ if (interface_type == GST_TYPE_PROPERTY_PROBE && this->probe) ++ return TRUE; + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); +- g_free (name); + return FALSE; + } + +-static gboolean +-gst_pulsesink_close (GstAudioSink * asink) ++static void ++gst_pulsesink_implements_interface_init (GstImplementsInterfaceClass * klass) + { +- GstPulseSink *pulsesink = GST_PULSESINK (asink); ++ klass->supported = gst_pulsesink_interface_supported; ++} + +- pa_threaded_mainloop_lock (pulsesink->mainloop); +- gst_pulsesink_destroy_context (pulsesink); +- pa_threaded_mainloop_unlock (pulsesink->mainloop); ++static void ++gst_pulsesink_init_interfaces (GType type) ++{ ++ static const GInterfaceInfo implements_iface_info = { ++ (GInterfaceInitFunc) gst_pulsesink_implements_interface_init, ++ NULL, ++ NULL, ++ }; ++ static const GInterfaceInfo probe_iface_info = { ++ (GInterfaceInitFunc) gst_pulsesink_property_probe_interface_init, ++ NULL, ++ NULL, ++ }; + +- return TRUE; ++ g_type_add_interface_static (type, GST_TYPE_IMPLEMENTS_INTERFACE, ++ &implements_iface_info); ++ g_type_add_interface_static (type, GST_TYPE_PROPERTY_PROBE, ++ &probe_iface_info); + } + +-static gboolean +-gst_pulsesink_prepare (GstAudioSink * asink, GstRingBufferSpec * spec) ++static void ++gst_pulsesink_base_init (gpointer g_class) + { +- pa_buffer_attr buf_attr; +- pa_channel_map channel_map; +- GstPulseSink *pulsesink = GST_PULSESINK (asink); +- pa_operation *o = NULL; +- pa_cvolume v; ++ static GstStaticPadTemplate pad_template = GST_STATIC_PAD_TEMPLATE ("sink", ++ GST_PAD_SINK, ++ GST_PAD_ALWAYS, ++ GST_STATIC_CAPS ("audio/x-raw-int, " ++ "endianness = (int) { " ENDIANNESS " }, " ++ "signed = (boolean) TRUE, " ++ "width = (int) 16, " ++ "depth = (int) 16, " ++ "rate = (int) [ 1, MAX ], " ++ "channels = (int) [ 1, 32 ];" ++ "audio/x-raw-float, " ++ "endianness = (int) { " ENDIANNESS " }, " ++ "width = (int) 32, " ++ "rate = (int) [ 1, MAX ], " ++ "channels = (int) [ 1, 32 ];" ++ "audio/x-raw-int, " ++ "endianness = (int) { " ENDIANNESS " }, " ++ "signed = (boolean) TRUE, " ++ "width = (int) 32, " ++ "depth = (int) 32, " ++ "rate = (int) [ 1, MAX ], " ++ "channels = (int) [ 1, 32 ];" ++ "audio/x-raw-int, " ++ "signed = (boolean) FALSE, " ++ "width = (int) 8, " ++ "depth = (int) 8, " ++ "rate = (int) [ 1, MAX ], " ++ "channels = (int) [ 1, 32 ];" ++ "audio/x-alaw, " ++ "rate = (int) [ 1, MAX], " ++ "channels = (int) [ 1, 32 ];" ++ "audio/x-mulaw, " ++ "rate = (int) [ 1, MAX], " "channels = (int) [ 1, 32 ]") ++ ); + +- if (!gst_pulse_fill_sample_spec (spec, &pulsesink->sample_spec)) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, SETTINGS, +- ("Invalid sample specification."), (NULL)); +- return FALSE; +- } ++ GstElementClass *element_class = GST_ELEMENT_CLASS (g_class); + +- pa_threaded_mainloop_lock (pulsesink->mainloop); ++ gst_element_class_set_details_simple (element_class, ++ "PulseAudio Audio Sink", ++ "Sink/Audio", "Plays audio to a PulseAudio server", "Lennart Poettering"); ++ gst_element_class_add_pad_template (element_class, ++ gst_static_pad_template_get (&pad_template)); ++} + +- if (!pulsesink->context) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Bad context"), (NULL)); +- goto unlock_and_fail; +- } ++static GstRingBuffer * ++gst_pulsesink_create_ringbuffer (GstBaseAudioSink * sink) ++{ ++ GstRingBuffer *buffer; ++ ++ GST_DEBUG_OBJECT (sink, "creating ringbuffer"); ++ buffer = g_object_new (GST_TYPE_PULSERING_BUFFER, NULL); ++ GST_DEBUG_OBJECT (sink, "created ringbuffer @%p", buffer); ++ ++ return buffer; ++} ++ ++static void ++gst_pulsesink_class_init (GstPulseSinkClass * klass) ++{ ++ GObjectClass *gobject_class = G_OBJECT_CLASS (klass); ++ GstBaseSinkClass *gstbasesink_class = GST_BASE_SINK_CLASS (klass); ++ GstBaseSinkClass *bc; ++ GstBaseAudioSinkClass *gstaudiosink_class = GST_BASE_AUDIO_SINK_CLASS (klass); ++ ++ gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_pulsesink_finalize); ++ gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_pulsesink_set_property); ++ gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_pulsesink_get_property); + +- g_assert (!pulsesink->stream); ++ gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_pulsesink_event); ++ ++ /* restore the original basesink pull methods */ ++ bc = g_type_class_peek (GST_TYPE_BASE_SINK); ++ gstbasesink_class->activate_pull = GST_DEBUG_FUNCPTR (bc->activate_pull); ++ ++ gstaudiosink_class->create_ringbuffer = ++ GST_DEBUG_FUNCPTR (gst_pulsesink_create_ringbuffer); ++ ++ /* Overwrite GObject fields */ ++ g_object_class_install_property (gobject_class, ++ PROP_SERVER, ++ g_param_spec_string ("server", "Server", ++ "The PulseAudio server to connect to", NULL, ++ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); ++ g_object_class_install_property (gobject_class, PROP_DEVICE, ++ g_param_spec_string ("device", "Sink", ++ "The PulseAudio sink device to connect to", NULL, ++ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); ++ g_object_class_install_property (gobject_class, ++ PROP_DEVICE_NAME, ++ g_param_spec_string ("device-name", "Device name", ++ "Human-readable name of the sound device", NULL, ++ G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); ++#if HAVE_PULSE_0_9_12 ++ g_object_class_install_property (gobject_class, ++ PROP_VOLUME, ++ g_param_spec_double ("volume", "Volume", ++ "Volume of this stream", 0.0, 1000.0, 1.0, ++ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS)); ++#endif ++} + +- if (!(o = +- pa_context_subscribe (pulsesink->context, +- PA_SUBSCRIPTION_MASK_SINK_INPUT, NULL, NULL))) { +- const gchar *err_str = pulsesink->context ? +- pa_strerror (pa_context_errno (pulsesink->context)) : NULL; ++/* returns the current time of the sink ringbuffer */ ++static GstClockTime ++gst_pulse_sink_get_time (GstClock * clock, GstBaseAudioSink * sink) ++{ ++ GstPulseSink *psink; ++ GstPulseRingBuffer *pbuf; ++ pa_usec_t time; ++ ++ if (sink->ringbuffer == NULL || sink->ringbuffer->spec.rate == 0) ++ return GST_CLOCK_TIME_NONE; ++ ++ pbuf = GST_PULSERING_BUFFER_CAST (sink->ringbuffer); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); ++ ++ pa_threaded_mainloop_lock (psink->mainloop); ++ if (gst_pulsering_is_dead (psink, pbuf)) ++ goto server_dead; ++ ++ /* if we don't have enough data to get a timestamp, just return NONE, which ++ * will return the last reported time */ ++ if (pa_stream_get_time (pbuf->stream, &time) < 0) { ++ GST_DEBUG_OBJECT (psink, "could not get time"); ++ time = GST_CLOCK_TIME_NONE; ++ } else ++ time *= 1000; ++ pa_threaded_mainloop_unlock (psink->mainloop); ++ ++ GST_LOG_OBJECT (psink, "current time is %" GST_TIME_FORMAT, ++ GST_TIME_ARGS (time)); ++ ++ return time; ++ ++ /* ERRORS */ ++server_dead: ++ { ++ GST_DEBUG_OBJECT (psink, "the server is dead"); ++ pa_threaded_mainloop_unlock (psink->mainloop); + +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("pa_context_subscribe() failed: %s", err_str), (NULL)); +- goto unlock_and_fail; ++ return GST_CLOCK_TIME_NONE; + } ++} + +- pa_operation_unref (o); +- +- if (!(pulsesink->stream = pa_stream_new (pulsesink->context, +- pulsesink->stream_name ? +- pulsesink->stream_name : "Playback Stream", +- &pulsesink->sample_spec, +- gst_pulse_gst_to_channel_map (&channel_map, spec)))) { +- const gchar *err_str = pulsesink->context ? +- pa_strerror (pa_context_errno (pulsesink->context)) : NULL; ++static void ++gst_pulsesink_init (GstPulseSink * pulsesink, GstPulseSinkClass * klass) ++{ ++ pulsesink->server = NULL; ++ pulsesink->device = NULL; ++ pulsesink->device_description = NULL; + +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("Failed to create stream: %s", err_str), (NULL)); +- goto unlock_and_fail; +- } ++ pulsesink->volume = 1.0; ++ pulsesink->volume_set = FALSE; + +- pa_stream_set_state_callback (pulsesink->stream, +- gst_pulsesink_stream_state_cb, pulsesink); +- pa_stream_set_write_callback (pulsesink->stream, +- gst_pulsesink_stream_request_cb, pulsesink); +- pa_stream_set_latency_update_callback (pulsesink->stream, +- gst_pulsesink_stream_latency_update_cb, pulsesink); ++ pulsesink->notify = 0; + +- memset (&buf_attr, 0, sizeof (buf_attr)); +- buf_attr.tlength = spec->segtotal * spec->segsize; +- buf_attr.maxlength = buf_attr.tlength * 2; +- buf_attr.prebuf = buf_attr.tlength; +- buf_attr.minreq = spec->segsize; ++ /* needed for conditional execution */ ++ pulsesink->pa_version = pa_get_library_version (); + +- if (pulsesink->volume_set) +- gst_pulse_cvolume_from_linear (&v, pulsesink->sample_spec.channels, +- pulsesink->volume); +- +- if (pa_stream_connect_playback (pulsesink->stream, pulsesink->device, +- &buf_attr, +- PA_STREAM_INTERPOLATE_TIMING | +- PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_NOT_MONOTONOUS | +-#if HAVE_PULSE_0_9_11 +- PA_STREAM_ADJUST_LATENCY | +-#endif +- PA_STREAM_START_CORKED, pulsesink->volume_set ? &v : NULL, NULL) < 0) { ++ g_assert ((pulsesink->mainloop = pa_threaded_mainloop_new ())); ++ g_assert (pa_threaded_mainloop_start (pulsesink->mainloop) == 0); + +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("Failed to connect stream: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock_and_fail; +- } ++ /* TRUE for sinks, FALSE for sources */ ++ pulsesink->probe = gst_pulseprobe_new (G_OBJECT (pulsesink), ++ G_OBJECT_GET_CLASS (pulsesink), PROP_DEVICE, pulsesink->device, ++ TRUE, FALSE); ++ ++ /* override with a custom clock */ ++ if (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock) ++ gst_object_unref (GST_BASE_AUDIO_SINK (pulsesink)->provided_clock); ++ GST_BASE_AUDIO_SINK (pulsesink)->provided_clock = ++ gst_audio_clock_new ("GstPulseSinkClock", ++ (GstAudioClockGetTimeFunc) gst_pulse_sink_get_time, pulsesink); ++} + +- for (;;) { +- pa_stream_state_t state; ++static void ++gst_pulsesink_finalize (GObject * object) ++{ ++ GstPulseSink *pulsesink = GST_PULSESINK_CAST (object); + +- state = pa_stream_get_state (pulsesink->stream); ++ pa_threaded_mainloop_stop (pulsesink->mainloop); + +- if (!PA_STREAM_IS_GOOD (state)) { +- GST_DEBUG_OBJECT (pulsesink, "Stream state was not READY. Got: %d", +- state); +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("Failed to connect stream: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock_and_fail; +- } ++ g_free (pulsesink->server); ++ g_free (pulsesink->device); + +- if (state == PA_STREAM_READY) +- break; ++ pa_threaded_mainloop_free (pulsesink->mainloop); + +- /* Wait until the stream is ready */ +- pa_threaded_mainloop_wait (pulsesink->mainloop); ++ if (pulsesink->probe) { ++ gst_pulseprobe_free (pulsesink->probe); ++ pulsesink->probe = NULL; + } + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); +- return TRUE; +- +-unlock_and_fail: +- +- gst_pulsesink_destroy_stream (pulsesink); +- +- pa_threaded_mainloop_unlock (pulsesink->mainloop); +- +- return FALSE; +-} +- +-static gboolean +-gst_pulsesink_unprepare (GstAudioSink * asink) +-{ +- GstPulseSink *pulsesink = GST_PULSESINK (asink); +- +- pa_threaded_mainloop_lock (pulsesink->mainloop); +- gst_pulsesink_destroy_stream (pulsesink); +- pa_threaded_mainloop_unlock (pulsesink->mainloop); +- +- return TRUE; ++ G_OBJECT_CLASS (parent_class)->finalize (object); + } + +-static guint +-gst_pulsesink_write (GstAudioSink * asink, gpointer data, guint length) ++#if HAVE_PULSE_0_9_12 ++static void ++gst_pulsesink_set_volume (GstPulseSink * psink, gdouble volume) + { +- GstPulseSink *pulsesink = GST_PULSESINK (asink); +- size_t sum = 0; ++ pa_cvolume v; ++ pa_operation *o = NULL; ++ GstPulseRingBuffer *pbuf; + +- /* FIXME post message rather than using a signal (as mixer interface) */ +- if (g_atomic_int_compare_and_exchange (&pulsesink->notify, 1, 0)) +- g_object_notify (G_OBJECT (pulsesink), "volume"); ++ pa_threaded_mainloop_lock (psink->mainloop); + +- pa_threaded_mainloop_lock (pulsesink->mainloop); ++ GST_DEBUG_OBJECT (psink, "setting volume to %f", volume); + +- pulsesink->in_write = TRUE; ++ psink->volume = volume; ++ psink->volume_set = TRUE; + +- while (length > 0) { +- size_t l; ++ pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); ++ if (pbuf == NULL || pbuf->stream == NULL) ++ goto unlock; + +- for (;;) { +- if (gst_pulsesink_is_dead (pulsesink)) +- goto unlock_and_fail; ++ gst_pulse_cvolume_from_linear (&v, pbuf->sample_spec.channels, volume); + +- if ((l = pa_stream_writable_size (pulsesink->stream)) == (size_t) - 1) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("pa_stream_writable_size() failed: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock_and_fail; +- } ++ if (!(o = pa_context_set_sink_input_volume (pbuf->context, ++ pa_stream_get_index (pbuf->stream), &v, NULL, NULL))) ++ goto volume_failed; + +- if (l > 0) +- break; ++ /* We don't really care about the result of this call */ ++unlock: + +- if (pulsesink->did_reset) +- goto unlock_and_fail; ++ if (o) ++ pa_operation_unref (o); + +- pa_threaded_mainloop_wait (pulsesink->mainloop); +- } ++ pa_threaded_mainloop_unlock (psink->mainloop); + +- if (l > length) +- l = length; ++ return; + +- if (pa_stream_write (pulsesink->stream, data, l, NULL, 0, +- PA_SEEK_RELATIVE) < 0) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("pa_stream_write() failed: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock_and_fail; +- } ++ /* ERRORS */ ++volume_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("pa_stream_set_sink_input_volume() failed: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto unlock; ++ } ++} + +- data = (guint8 *) data + l; +- length -= l; ++static void ++gst_pulsesink_sink_input_info_cb (pa_context * c, const pa_sink_input_info * i, ++ int eol, void *userdata) ++{ ++ GstPulseRingBuffer *pbuf; ++ GstPulseSink *psink; + +- sum += l; +- } ++ pbuf = GST_PULSERING_BUFFER_CAST (userdata); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); + +- pulsesink->did_reset = FALSE; +- pulsesink->in_write = FALSE; ++ if (!i) ++ goto done; + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); +- return sum; ++ if (!pbuf->stream) ++ goto done; + +-unlock_and_fail: ++ g_assert (i->index == pa_stream_get_index (pbuf->stream)); + +- pulsesink->did_reset = FALSE; +- pulsesink->in_write = FALSE; ++ psink->volume = pa_sw_volume_to_linear (pa_cvolume_max (&i->volume)); + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); +- return (guint) - 1; ++done: ++ pa_threaded_mainloop_signal (psink->mainloop, 0); + } + +-static guint +-gst_pulsesink_delay (GstAudioSink * asink) ++static gdouble ++gst_pulsesink_get_volume (GstPulseSink * psink) + { +- GstPulseSink *pulsesink = GST_PULSESINK (asink); +- pa_usec_t t; +- +- pa_threaded_mainloop_lock (pulsesink->mainloop); +- +- for (;;) { +- if (gst_pulsesink_is_dead (pulsesink)) +- goto unlock_and_fail; ++ GstPulseRingBuffer *pbuf; ++ pa_operation *o = NULL; ++ gdouble v; + +- if (pa_stream_get_latency (pulsesink->stream, &t, NULL) >= 0) +- break; ++ pa_threaded_mainloop_lock (psink->mainloop); + +- if (pa_context_errno (pulsesink->context) != PA_ERR_NODATA) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("pa_stream_get_latency() failed: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock_and_fail; +- } ++ pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); ++ if (pbuf == NULL || pbuf->stream == NULL) ++ goto no_buffer; ++ ++ if (!(o = pa_context_get_sink_input_info (pbuf->context, ++ pa_stream_get_index (pbuf->stream), ++ gst_pulsesink_sink_input_info_cb, pbuf))) ++ goto info_failed; + +- pa_threaded_mainloop_wait (pulsesink->mainloop); ++ while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { ++ pa_threaded_mainloop_wait (psink->mainloop); ++ if (gst_pulsering_is_dead (psink, pbuf)) ++ goto unlock; + } + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); ++unlock: ++ if (o) ++ pa_operation_unref (o); + +- return gst_util_uint64_scale_int (t, pulsesink->sample_spec.rate, 1000000LL); ++ v = psink->volume; ++ pa_threaded_mainloop_unlock (psink->mainloop); + +-unlock_and_fail: ++ return v; + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); +- return 0; ++ /* ERRORS */ ++no_buffer: ++ { ++ GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); ++ goto unlock; ++ } ++info_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("pa_stream_get_sink_input_info() failed: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto unlock; ++ } + } ++#endif + + static void +-gst_pulsesink_success_cb (pa_stream * s, int success, void *userdata) ++gst_pulsesink_sink_info_cb (pa_context * c, const pa_sink_info * i, int eol, ++ void *userdata) + { +- GstPulseSink *pulsesink = GST_PULSESINK (userdata); ++ GstPulseRingBuffer *pbuf; ++ GstPulseSink *psink; ++ ++ pbuf = GST_PULSERING_BUFFER_CAST (userdata); ++ psink = GST_PULSESINK_CAST (GST_OBJECT_PARENT (pbuf)); ++ ++ if (!i) ++ goto done; + +- pulsesink->operation_success = !!success; +- pa_threaded_mainloop_signal (pulsesink->mainloop, 0); ++ if (!pbuf->stream) ++ goto done; ++ ++ g_assert (i->index == pa_stream_get_device_index (pbuf->stream)); ++ ++ g_free (psink->device_description); ++ psink->device_description = g_strdup (i->description); ++ ++done: ++ pa_threaded_mainloop_signal (psink->mainloop, 0); + } + +-static void +-gst_pulsesink_reset (GstAudioSink * asink) ++static gchar * ++gst_pulsesink_device_description (GstPulseSink * psink) + { +- GstPulseSink *pulsesink = GST_PULSESINK (asink); ++ GstPulseRingBuffer *pbuf; + pa_operation *o = NULL; ++ gchar *t; + +- pa_threaded_mainloop_lock (pulsesink->mainloop); +- +- if (gst_pulsesink_is_dead (pulsesink)) +- goto unlock_and_fail; ++ pa_threaded_mainloop_lock (psink->mainloop); ++ pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); ++ if (pbuf == NULL || pbuf->stream == NULL) ++ goto no_buffer; ++ ++ if (!(o = pa_context_get_sink_info_by_index (pbuf->context, ++ pa_stream_get_device_index (pbuf->stream), ++ gst_pulsesink_sink_info_cb, pbuf))) ++ goto info_failed; + +- if (!(o = +- pa_stream_flush (pulsesink->stream, gst_pulsesink_success_cb, +- pulsesink))) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("pa_stream_flush() failed: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock_and_fail; ++ while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { ++ pa_threaded_mainloop_wait (psink->mainloop); ++ if (gst_pulsering_is_dead (psink, pbuf)) ++ goto unlock; + } + +- /* Inform anyone waiting in _write() call that it shall wakeup */ +- if (pulsesink->in_write) { +- pulsesink->did_reset = TRUE; +- pa_threaded_mainloop_signal (pulsesink->mainloop, 0); +- } ++unlock: ++ if (o) ++ pa_operation_unref (o); + +- pulsesink->operation_success = FALSE; +- while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { ++ t = g_strdup (psink->device_description); ++ pa_threaded_mainloop_unlock (psink->mainloop); + +- if (gst_pulsesink_is_dead (pulsesink)) +- goto unlock_and_fail; ++ return t; + +- pa_threaded_mainloop_wait (pulsesink->mainloop); ++ /* ERRORS */ ++no_buffer: ++ { ++ GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); ++ goto unlock; + } +- +- if (!pulsesink->operation_success) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, ("Flush failed: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock_and_fail; ++info_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("pa_stream_get_sink_info() failed: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto unlock; + } ++} + +-unlock_and_fail: ++static void ++gst_pulsesink_set_property (GObject * object, ++ guint prop_id, const GValue * value, GParamSpec * pspec) ++{ ++ GstPulseSink *pulsesink = GST_PULSESINK_CAST (object); + +- if (o) { +- pa_operation_cancel (o); +- pa_operation_unref (o); ++ switch (prop_id) { ++ case PROP_SERVER: ++ g_free (pulsesink->server); ++ pulsesink->server = g_value_dup_string (value); ++ if (pulsesink->probe) ++ gst_pulseprobe_set_server (pulsesink->probe, pulsesink->server); ++ break; ++ case PROP_DEVICE: ++ g_free (pulsesink->device); ++ pulsesink->device = g_value_dup_string (value); ++ break; ++#if HAVE_PULSE_0_9_12 ++ case PROP_VOLUME: ++ gst_pulsesink_set_volume (pulsesink, g_value_get_double (value)); ++ break; ++#endif ++ default: ++ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); ++ break; + } ++} ++ ++static void ++gst_pulsesink_get_property (GObject * object, ++ guint prop_id, GValue * value, GParamSpec * pspec) ++{ + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); ++ GstPulseSink *pulsesink = GST_PULSESINK_CAST (object); ++ ++ switch (prop_id) { ++ case PROP_SERVER: ++ g_value_set_string (value, pulsesink->server); ++ break; ++ case PROP_DEVICE: ++ g_value_set_string (value, pulsesink->device); ++ break; ++ case PROP_DEVICE_NAME:{ ++ char *t = gst_pulsesink_device_description (pulsesink); ++ g_value_set_string (value, t); ++ g_free (t); ++ break; ++ } ++#if HAVE_PULSE_0_9_12 ++ case PROP_VOLUME: ++ g_value_set_double (value, gst_pulsesink_get_volume (pulsesink)); ++ break; ++#endif ++ default: ++ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); ++ break; ++ } + } + + static void +-gst_pulsesink_change_title (GstPulseSink * pulsesink, const gchar * t) ++gst_pulsesink_change_title (GstPulseSink * psink, const gchar * t) + { + pa_operation *o = NULL; ++ GstPulseRingBuffer *pbuf; + +- pa_threaded_mainloop_lock (pulsesink->mainloop); ++ pa_threaded_mainloop_lock (psink->mainloop); + +- g_free (pulsesink->stream_name); +- pulsesink->stream_name = g_strdup (t); ++ pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); + +- if (gst_pulsesink_is_dead (pulsesink)) +- goto unlock; ++ g_free (pbuf->stream_name); ++ pbuf->stream_name = g_strdup (t); + +- if (!(o = +- pa_stream_set_name (pulsesink->stream, pulsesink->stream_name, NULL, +- NULL))) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("pa_stream_set_name() failed: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock; +- } ++ if (pbuf == NULL || pbuf->stream == NULL) ++ goto no_buffer; + +- /* We're not interested if this operation failed or not */ ++ if (!(o = pa_stream_set_name (pbuf->stream, pbuf->stream_name, NULL, NULL))) ++ goto name_failed; + ++ /* We're not interested if this operation failed or not */ + unlock: +- + if (o) + pa_operation_unref (o); ++ pa_threaded_mainloop_unlock (psink->mainloop); + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); ++ return; ++ ++ /* ERRORS */ ++no_buffer: ++ { ++ GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); ++ goto unlock; ++ } ++name_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("pa_stream_set_name() failed: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto unlock; ++ } + } + + #if HAVE_PULSE_0_9_11 + static void +-gst_pulsesink_change_props (GstPulseSink * pulsesink, GstTagList * l) ++gst_pulsesink_change_props (GstPulseSink * psink, GstTagList * l) + { +- + static const gchar *const map[] = { + GST_TAG_TITLE, PA_PROP_MEDIA_TITLE, + GST_TAG_ARTIST, PA_PROP_MEDIA_ARTIST, +@@ -1077,11 +1727,11 @@ gst_pulsesink_change_props (GstPulseSink + /* We might add more here later on ... */ + NULL + }; +- + pa_proplist *pl = NULL; + const gchar *const *t; + gboolean empty = TRUE; + pa_operation *o = NULL; ++ GstPulseRingBuffer *pbuf; + + pl = pa_proplist_new (); + +@@ -1098,44 +1748,53 @@ gst_pulsesink_change_props (GstPulseSink + g_free (n); + } + } +- + if (empty) + goto finish; + +- pa_threaded_mainloop_lock (pulsesink->mainloop); +- +- if (gst_pulsesink_is_dead (pulsesink)) +- goto unlock; +- +- if (!(o = +- pa_stream_proplist_update (pulsesink->stream, PA_UPDATE_REPLACE, pl, +- NULL, NULL))) { +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("pa_stream_proplist_update() failed: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock; +- } ++ pa_threaded_mainloop_lock (psink->mainloop); ++ pbuf = GST_PULSERING_BUFFER_CAST (GST_BASE_AUDIO_SINK (psink)->ringbuffer); ++ if (pbuf == NULL || pbuf->stream == NULL) ++ goto no_buffer; ++ ++ if (!(o = pa_stream_proplist_update (pbuf->stream, PA_UPDATE_REPLACE, ++ pl, NULL, NULL))) ++ goto update_failed; + + /* We're not interested if this operation failed or not */ +- + unlock: + + if (o) + pa_operation_unref (o); + +- pa_threaded_mainloop_unlock (pulsesink->mainloop); ++ pa_threaded_mainloop_unlock (psink->mainloop); + + finish: + + if (pl) + pa_proplist_free (pl); ++ ++ return; ++ ++ /* ERRORS */ ++no_buffer: ++ { ++ GST_DEBUG_OBJECT (psink, "we have no ringbuffer"); ++ goto unlock; ++ } ++update_failed: ++ { ++ GST_ELEMENT_ERROR (psink, RESOURCE, FAILED, ++ ("pa_stream_proplist_update() failed: %s", ++ pa_strerror (pa_context_errno (pbuf->context))), (NULL)); ++ goto unlock; ++ } + } + #endif + + static gboolean + gst_pulsesink_event (GstBaseSink * sink, GstEvent * event) + { +- GstPulseSink *pulsesink = GST_PULSESINK (sink); ++ GstPulseSink *pulsesink = GST_PULSESINK_CAST (sink); + + switch (GST_EVENT_TYPE (event)) { + case GST_EVENT_TAG:{ +@@ -1182,62 +1841,3 @@ gst_pulsesink_event (GstBaseSink * sink, + + return GST_BASE_SINK_CLASS (parent_class)->event (sink, event); + } +- +-static void +-gst_pulsesink_pause (GstPulseSink * pulsesink, gboolean b) +-{ +- pa_operation *o = NULL; +- +- pa_threaded_mainloop_lock (pulsesink->mainloop); +- +- if (gst_pulsesink_is_dead (pulsesink)) +- goto unlock; +- +- if (!(o = pa_stream_cork (pulsesink->stream, b, NULL, NULL))) { +- +- GST_ELEMENT_ERROR (pulsesink, RESOURCE, FAILED, +- ("pa_stream_cork() failed: %s", +- pa_strerror (pa_context_errno (pulsesink->context))), (NULL)); +- goto unlock; +- } +- +- while (pa_operation_get_state (o) == PA_OPERATION_RUNNING) { +- +- if (gst_pulsesink_is_dead (pulsesink)) +- goto unlock; +- +- pa_threaded_mainloop_wait (pulsesink->mainloop); +- } +- +-unlock: +- +- if (o) +- pa_operation_unref (o); +- +- pa_threaded_mainloop_unlock (pulsesink->mainloop); +-} +- +- +-static GstStateChangeReturn +-gst_pulsesink_change_state (GstElement * element, GstStateChange transition) +-{ +- GstPulseSink *this = GST_PULSESINK (element); +- +- switch (transition) { +- +- case GST_STATE_CHANGE_PAUSED_TO_PLAYING: +- case GST_STATE_CHANGE_PLAYING_TO_PAUSED: +- +- gst_pulsesink_pause (this, +- GST_STATE_TRANSITION_NEXT (transition) == GST_STATE_PAUSED); +- break; +- +- default: +- ; +- } +- +- if (GST_ELEMENT_CLASS (parent_class)->change_state) +- return GST_ELEMENT_CLASS (parent_class)->change_state (element, transition); +- +- return GST_STATE_CHANGE_SUCCESS; +-} +diff -upr gst-plugins-good-0.10.14.old/ext/pulse/pulsesink.h gst-plugins-good-0.10.14/ext/pulse/pulsesink.h +--- gst-plugins-good-0.10.14.old/ext/pulse/pulsesink.h 2009-02-15 13:40:25.000000000 +0000 ++++ gst-plugins-good-0.10.14/ext/pulse/pulsesink.h 2009-05-14 10:49:35.000000000 +0100 +@@ -42,38 +42,33 @@ G_BEGIN_DECLS + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_PULSESINK)) + #define GST_IS_PULSESINK_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_PULSESINK)) ++#define GST_PULSESINK_CAST(obj) \ ++ ((GstPulseSink *)(obj)) + + typedef struct _GstPulseSink GstPulseSink; + typedef struct _GstPulseSinkClass GstPulseSinkClass; + + struct _GstPulseSink + { +- GstAudioSink sink; ++ GstBaseAudioSink sink; + + gchar *server, *device, *stream_name; ++ gchar *device_description; + + pa_threaded_mainloop *mainloop; + +- pa_context *context; +- pa_stream *stream; +- +- pa_sample_spec sample_spec; +- + GstPulseProbe *probe; + + gdouble volume; + gboolean volume_set; +- +- gchar *device_description; +- +- gboolean operation_success; +- gboolean did_reset, in_write; + gint notify; ++ ++ const gchar *pa_version; + }; + + struct _GstPulseSinkClass + { +- GstAudioSinkClass parent_class; ++ GstBaseAudioSinkClass parent_class; + }; + + GType gst_pulsesink_get_type (void); +diff -upr gst-plugins-good-0.10.14.old/ext/pulse/pulseutil.c gst-plugins-good-0.10.14/ext/pulse/pulseutil.c +--- gst-plugins-good-0.10.14.old/ext/pulse/pulseutil.c 2009-02-06 11:34:56.000000000 +0000 ++++ gst-plugins-good-0.10.14/ext/pulse/pulseutil.c 2009-05-14 10:49:35.000000000 +0100 +@@ -123,7 +123,6 @@ gst_pulse_gst_to_channel_map (pa_channel + const GstRingBufferSpec * spec) + { + int i; +- + GstAudioChannelPosition *pos; + + pa_channel_map_init (map); +@@ -131,14 +130,10 @@ gst_pulse_gst_to_channel_map (pa_channel + if (!(pos = + gst_audio_get_channel_positions (gst_caps_get_structure (spec->caps, + 0)))) { +-/* g_debug("%s: No channel positions!\n", G_STRFUNC); */ + return NULL; + } + +-/* g_debug("%s: Got channel positions:\n", G_STRFUNC); */ +- + for (i = 0; i < spec->channels; i++) { +- + if (pos[i] == GST_AUDIO_CHANNEL_POSITION_NONE) { + /* no valid mappings for these channels */ + g_free (pos); +@@ -147,15 +142,12 @@ gst_pulse_gst_to_channel_map (pa_channel + map->map[i] = gst_pos_to_pa[pos[i]]; + else + map->map[i] = PA_CHANNEL_POSITION_INVALID; +- +- /*g_debug(" channel %d: gst: %d pulse: %d\n", i, pos[i], map->map[i]); */ + } + + g_free (pos); + map->channels = spec->channels; + + if (!pa_channel_map_valid (map)) { +-/* g_debug("generated invalid map!\n"); */ + return NULL; + } +