2024-05-03 02:35 AM
Hi,
I want to connect the microphone feature to my raspberry pi so that I can extract the data and put it in databases. Only when I connect my SensorTile.box (STEVAL-MKSBOX1V1) with my Raspberry pi 3 Model B+ I can see the standard features but not the microphone feature. I flashed the FP-SNS-ALLMEMS1 firmware to my sensor and followed the Getting started with the BlueST protocol and SDK documentation to write my code.
Here's the output I get:
guest@raspberrypi:~ $ sudo python3 mic_test.py
Creating Bluetooth Manager
done
Scanning Bluetooth devices...
Discovery started.
New device discovered: AM1V410.
Discovery stopped.
[<blue_st_sdk.node.Node object at 0x7639d370>]
Connecting to device: AM1V410 (ea:eb:d5:a3:cf:fd)
Device AM1V410 connected.
Available features:
Gesture
Activity Recognition
Temperature
Temperature
Humidity
Pressure
Magnetometer
Gyroscope
Accelerometer
name 'SCANNING_TIME_s' is not defined
Available features:
Gesture
Activity Recognition
Temperature
Temperature
Humidity
Pressure
Magnetometer
Gyroscope
Accelerometer
Microphone not in features
Listening to Temperature feature...
Temperature(1504): 25.2 C
Temperature(1540): 25.2 C
Temperature(1576): 25.2 C
Temperature(1612): 25.1 C
Temperature(1648): 25.1 C
Temperature(1684): 25.1 C
Temperature(1720): 25.1 C
Temperature(1756): 25.1 C
Temperature(1792): 25.1 C
Temperature(1828): 25.1 C
Temperature(1864): 25.1 C
Temperature(1900): 25.1 C
Temperature(1936): 25.1 C
Temperature(1972): 25.1 C
Temperature(2008): 25.2 C
Shutting down...
As you can see it can't find the microphone feature but it can connect with other features like temperature.
Here's the programs to get the microphone feature:
Here's the first program:
from blue_st_sdk.feature import Feature
from blue_st_sdk.feature import Sample
from blue_st_sdk.feature import ExtractedData
from blue_st_sdk.features.field import Field
from blue_st_sdk.features.field im port FieldType
from blue_st_sdk.utils.number_conversion import LittleEndian
from blue_st_sdk.utils.blue_st_exceptions import BlueSTInvalidOperationException
from blue_st_sdk.utils.blue_st_exceptions import BlueSTInvalidDataException
import sys, traceback
class FeatureMicrophone(Feature):
"""The feature handles the data coming from a microphone.
Data is two bytes long and has one decimal value.
"""
FEATURE_NAME = "Microphone"
FEATURE_UNIT = "dB"
FEATURE_DATA_NAME = "Microphone"
DATA_MAX = 130 # Acoustic overload point of the microphone
DATA_MIN = 0
FEATURE_FIELDS = Field(
FEATURE_DATA_NAME,
FEATURE_UNIT,
FieldType.UInt8,
DATA_MAX,
DATA_MIN)
DATA_LENGTH_BYTES = 1
SCALE_FACTOR = 1.0
def __init__(self, node):
"""Constructor.
Args:
node (:class:`blue_st_sdk.node.Node`): Node that will send data to
this feature.
"""
super(FeatureMicrophone, self).__init__(self.FEATURE_NAME, node, [self.FEATURE_FIELDS])
def extract_data(self, timestamp, data, offset):
"""Extract the data from the feature's raw data.
Args:
timestamp (int): Data's timestamp.
data (str): The data read from the feature.
offset (int): Offset where to start reading data.
Returns:
:class:`blue_st_sdk.feature.ExtractedData`: Container of the number
of bytes read and the extracted data.
Raises:
:exc:`blue_st_sdk.utils.blue_st_exceptions.BlueSTInvalidDataException`
if the data array has not enough data to read.
"""
if len(data) - offset < self.DATA_LENGTH_BYTES:
raise BlueSTInvalidDataException(
'There are no %d bytes available to read.' \
% (self.DATA_LENGTH_BYTES))
sample = Sample(
[data[offset] / self.SCALE_FACTOR],
self.get_fields_description(),
timestamp)
return ExtractedData(sample, self.DATA_LENGTH_BYTES)
@classmethod
def get_mic_level(self, sample):
"""Get the mic level value from a sample.
Args:
sample (:class:`blue_st_sdk.feature.Sample`): Sample data.
Returns:
float: The mic level value if the data array is valid, <nan>
otherwise.
"""
if sample is not None:
if sample._data:
if sample._data[0] is not None:
return float(sample._data[0])
return float('nan')
def read_mic_level(self):
"""Read the mic level value.
Returns:
float: The mic level value if the read operation is successful,
<nan> otherwise.
Raises:
:exc:`blue_st_sdk.utils.blue_st_exceptions.BlueSTInvalidOperationException`
is raised if the feature is not enabled or the operation
required is not supported.
:exc:`blue_st_sdk.utils.blue_st_exceptions.BlueSTInvalidDataException`
if the data array has not enough data to read.
"""
try:
self._read_data()
return FeatureMicrophone.get_mic_level(self._get_sample())
except (BlueSTInvalidOperationException, BlueSTInvalidDataException) as e:
raise e
Here's the second program:
import os
import sys
import time
from blue_st_sdk.manager import Manager, ManagerListener
from blue_st_sdk.node import NodeListener
from blue_st_sdk.feature import FeatureListener
from blue_st_sdk.features.audio.adpcm.feature_audio_adpcm import FeatureAudioADPCM
from blue_st_sdk.features.audio.adpcm.feature_audio_adpcm_sync import FeatureAudioADPCMSync
from blue_st_sdk.utils.uuid_to_feature_map import UUIDToFeatureMap
from blue_st_sdk.utils.ble_node_definitions import FeatureCharacteristic
from feature_microphone import FeatureMicrophone
SCANNING_TIME_s = 5
MAX_NOTIFICATIONS = 15
DEVICE_NAME = "AM1V410"
class MyManagerListener(ManagerListener):
#
# This method is called whenever a discovery process starts or stops.
#
# @PAram manager Manager instance that starts/stops the process.
# @PAram enabled True if a new discovery starts, False otherwise.
#
def on_discovery_change(self, manager, enabled):
print('Discovery %s.' % ('started' if enabled else 'stopped'))
if not enabled:
print()
#
# This method is called whenever a new node is discovered.
#
# @PAram manager Manager instance that discovers the node.
# @PAram node New node discovered.
#
def on_node_discovered(self, manager, node):
print('New device discovered: %s.' % (node.get_name()))
class MyNodeListener(NodeListener):
#
# To be called whenever a node connects to a host.
#
# @PAram node Node that has connected to a host.
#
def on_connect(self, node):
print('Device %s connected.' % (node.get_name()))
#
# To be called whenever a node disconnects from a host.
#
# @PAram node Node that has disconnected from a host.
# @PAram unexpected True if the disconnection is unexpected, False otherwise
# (called by the user).
#
def on_disconnect(self, node, unexpected=False):
print('Device %s disconnected%s.' % \
(node.get_name(), ' unexpectedly' if unexpected else ''))
if unexpected:
# Exiting.
print('\nExiting...\n')
sys.exit(0)
class MyFeatureListener(FeatureListener):
_notifications = 0
"""Counting notifications to print only the desired ones."""
#
# To be called whenever the feature updates its data.
#
# @PAram feature Feature that has updated.
# @PAram sample Data extracted from the feature.
#
def on_update(self, feature, sample):
if self._notifications < MAX_NOTIFICATIONS:
self._notifications += 1
print(feature)
def main():
try:
# Creating Bluetooth Manager.
print('Creating Bluetooth Manager')
manager = Manager.instance()
manager_listener = MyManagerListener()
manager.add_listener(manager_listener)
print('done')
# Synchronous discovery of Bluetooth devices.
print('Scanning Bluetooth devices...\n')
manager.discover(SCANNING_TIME_s)
# Getting discovered devices.
discovered_devices = manager.get_nodes()
print(discovered_devices)
if not discovered_devices:
raise Exception("No devices found.")
# Find the correct device name
filtered_devices = list(filter(lambda x: x.get_name() == DEVICE_NAME, discovered_devices))
# Only have the one device right now
device = filtered_devices[0]
print("Connecting to device: {} ({})\n".format(device.get_name(), device.get_tag()))
node_listener = MyNodeListener()
device.add_listener(node_listener)
if not device.connect():
print('Connection failed.\n')
raise Exception("Failed to connect to device {} ({}).".format(device.get_name(), device.get_tag()))
# Getting features.
features = device.get_features()
print("Available features:")
for each in features:
print(each.get_name())
# Append custom mic level feature into the BlueST library before discovery
mask_to_features_dic = FeatureCharacteristic.SENSOR_TILE_BOX_MASK_TO_FEATURE_DIC
mask_to_features_dic [0x04000000] = FeatureMicrophone
#try:
Manager.add_features_to_node(0x06, mask_to_features_dic) #0x06
#except Exception as e:
# print('error adding feature')
# print(e)
# Getting features.
features = device.get_features()
print("Available features:")
for each in features:
print(each.get_name())
# Get only the mic feature
filtered_features = list(filter(lambda x: x.get_name() == "Microphone", features))
if len(filtered_features)==0:
print('Microphone not in features')
filtered_features = list(filter(lambda x: x.get_name() == "Temperature", features))
feature = filtered_features[0]
print("Listening to {} feature...".format(feature.get_name()))
# Enabling notifications.
feature_listener = MyFeatureListener()
feature.add_listener(feature_listener)
device.enable_notifications(feature)
# Handling audio case (both audio features have to be enabled).
if isinstance(feature, FeatureAudioADPCM):
audio_sync_feature_listener = MyFeatureListener()
audio_sync_feature.add_listener(audio_sync_feature_listener)
device.enable_notifications(audio_sync_feature)
elif isinstance(feature, FeatureAudioADPCMSync):
audio_feature_listener = MyFeatureListener()
audio_feature.add_listener(audio_feature_listener)
device.enable_notifications(audio_feature)
# Getting notifications.
notifications = 0
start_time = time.time()
while notifications < MAX_NOTIFICATIONS:
if device.wait_for_notifications(0.05):
start_time = time.time()
notifications += 1
if time.time() > start_time + 10:
print("Timed out waiting for notifications.")
break
print("Shutting down...")
# Disabling notifications.
device.disable_notifications(feature)
feature.remove_listener(feature_listener)
# Handling audio case (both audio features have to be disabled).
if isinstance(feature, FeatureAudioADPCM):
device.disable_notifications(audio_sync_feature)
audio_sync_feature.remove_listener(audio_sync_feature_listener)
elif isinstance(feature, FeatureAudioADPCMSync):
device.disable_notifications(audio_feature)
audio_feature.remove_listener(audio_feature_listener)
# Shut everything down
device.remove_listener(node_listener)
device.disconnect()
manager.remove_listener(manager_listener)
except KeyboardInterrupt:
print("Program killed. Exiting...")
device.disable_notifications(feature)
feature.remove_listener(feature_listener)
device.remove_listener(node_listener)
device.disconnect()
manager.remove_listener(manager_listener)
sys.exit(0)
except SystemExit:
os._exit(0)
if __name__ == "__main__":
main()
2024-05-13 05:45 AM
Hi @Milan127 ,
Try updating to version FP-SNS-ALLMEMS1 4.3.0, I have tried it and have no problems exporting via BLE.
You can try using a BLE Sniffer to rule out Python-related problems.