Spark Connector: Connect to SQL Server

This post shows how to use the Spark connector to connect to SQL Server.

Install Spark Connector

    spark-mssql-connector_2.12-1.2.0.jar

Install msal

    pip install msal

Connect using Azure SPN

    import msal

    global_token_cache = msal.TokenCache()

    secret = "<GET SECRET SECURELY>"

    global_spn_app = msal.ConfidentialClientApplication(
        "<CLIENT_ID>",
        authority="https://login.microsoftonline.com/<TENANT_ID>",
        client_credential=secret,
        token_cache=global_token_cache,
    )

    result = global_spn_app.acquire_token_for_client(scopes=["https://database.windows.net//.default"])

    jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", "jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;") \
        .option("query", "SELECT * FROM SOMETHING") \
        .option("accessToken", result["access_token"]) \
        .option("encrypt", "true") \
        .option("hostNameInCertificate", "*.database.windows.net") \
        .load()

Connect using Domain Auth

    secret = "<GET SECRET SECURELY>"

    jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", "jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;") \
        .option("query", "SELECT * FROM SOMETHING") \
        .option("authentication", "ActiveDirectoryPassword") \
        .option("user", "<USER>@<DOMAIN>") \
        .option("password", secret) \
        .load()

Connect using SQL Auth

I do not recommend SQL Auth.

    secret = "<GET SECRET SECURELY>"

    jdbc_df = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", "jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;") \
        .option("query", "SELECT * FROM SOMETHING") \
        .option("user", "<USER>") \
        .option("password", secret) \
        .load()


Python: pyodbc with SQL Server

This post covers connecting to SQL Server using pyodbc.

Install package

    pip install pyodbc

If you are running in Databricks then the current driver will be “{ODBC Driver 17 for SQL Server}”.

If you are running in Synapse then the current driver will be “{ODBC Driver 18 for SQL Server}”.

Check Available ODBC Drivers

    import pyodbc
    pyodbc.drivers()
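Since the installed driver differs between environments, a small helper can pick the newest SQL Server driver at runtime instead of hard-coding one. This is a sketch; in practice you would pass it the list returned by `pyodbc.drivers()`.

```python
def pick_sql_server_driver(drivers):
    """Return the newest 'ODBC Driver NN for SQL Server' from a list of driver names."""
    candidates = sorted(
        d for d in drivers
        if d.startswith("ODBC Driver") and d.endswith("for SQL Server")
    )
    return candidates[-1] if candidates else None

# In Databricks or Synapse you would pass pyodbc.drivers() instead of this sample list
print(pick_sql_server_driver(["SQL Server", "ODBC Driver 17 for SQL Server"]))
```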

Check Which ODBC Driver Is Installed in Databricks

    %sh
    cat /etc/odbcinst.ini

Install ODBC Driver 17 on Databricks

    curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
    curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
    apt-get update
    ACCEPT_EULA=Y apt-get install msodbcsql17
    apt-get -y install unixodbc-dev

Connect using SQL Auth

I do not recommend SQL Auth.

    import pyodbc

    secret = "<GET SECRET SECURELY>"

    connection = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>;PORT=<PORT>;'
        'Database=<DATABASE>;Uid=<USER>;Pwd=' + secret + ';Encrypt=yes;'
        'TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;'
    )

Connect Using Domain Auth

    import pyodbc

    secret = "<GET SECRET SECURELY>"

    connection = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>;PORT=<PORT>;'
        'Database=<DATABASE>;Uid=<USER>;Pwd=' + secret + ';Encrypt=yes;'
        'TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;Authentication=ActiveDirectoryPassword'
    )

Connect using Azure SPN

    pip install msal

    import struct
    import pyodbc
    import msal

    global_token_cache = msal.TokenCache()
    secret = "<GET SECRET SECURELY>"

    global_spn_app = msal.ConfidentialClientApplication(
        "<CLIENT_ID>",
        authority="https://login.microsoftonline.com/<TENANT_ID>",
        client_credential=secret,
        token_cache=global_token_cache,
    )

    result = global_spn_app.acquire_token_for_client(scopes=["https://database.windows.net//.default"])
    SQL_COPT_SS_ACCESS_TOKEN = 1256

    token = bytes(result["access_token"], "utf-8")
    exptoken = b""

    # expand each token byte with a zero byte, as the driver expects
    for i in token:
        exptoken += bytes([i])
        exptoken += bytes(1)

    token_struct = struct.pack("=i", len(exptoken)) + exptoken

    # note: no Uid/Pwd in the connection string when authenticating with an access token
    connection = pyodbc.connect(
        'DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>;PORT=<PORT>;'
        'Database=<DATABASE>;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;',
        attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct}
    )

Once you have the connection you can set up the cursor.

    cursor = connection.cursor()

Then execute a command

    command = "<COMMAND>"
    params = ()
    cursor.execute(command, params)
    connection.commit()

When you are finished, close the cursor and connection.

    cursor.close()
    connection.close()


Python: Arguments

This post shows how to use the argparse package.

First you must import the package.

    import argparse

Next you set up the argument parser.

    parser = argparse.ArgumentParser()

Then you create a list of arguments. See the argparse documentation for more options than the set below.

    argument_list = [
        {"name": "<NAME>", "help": "<HELP_TEXT>", "type": <TYPE>, "required": True}
    ]

Then we take your argument_list, create the arguments, and register them with the parser.

    for arg in argument_list:
        parser.add_argument("--{}".format(arg["name"]), help=arg["help"], type=arg["type"], required=arg["required"])

Then we parse the args from "sys.argv". Parsing args this way means that anything unknown to your program won't cause a failure; the unknown arguments are returned separately and your application continues.

    args, unknown = parser.parse_known_args()

You could also parse the args from "sys.argv" this way. However, that means all the args passed to sys.argv must be known, otherwise it will fail.

    args = parser.parse_args()

As a final step, we convert the parsed args into a dict to use as our config.

    config = vars(args)
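Putting the steps above together, here is a minimal runnable sketch. The `--environment` argument and the sample argv list are just illustrations, not part of the original post.

```python
import argparse

parser = argparse.ArgumentParser()

argument_list = [
    {"name": "environment", "help": "target environment", "type": str, "required": True},
]

for arg in argument_list:
    parser.add_argument("--{}".format(arg["name"]), help=arg["help"],
                        type=arg["type"], required=arg["required"])

# parse_known_args ignores anything the parser does not recognize
args, unknown = parser.parse_known_args(["--environment", "dev", "--not-defined", "x"])
config = vars(args)
print(config)   # {'environment': 'dev'}
print(unknown)  # ['--not-defined', 'x']
```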


Python: lxml

This post focuses on the lxml package.

First you need to install the package (pip install lxml), then import it.

    from lxml import etree

Create an XML object from a string

    xml_str = "<root><subitem attr='test'>rec</subitem></root>"
    root = etree.fromstring(xml_str)

Get text in node

    text_str = root.xpath('//root/subitem/text()')[0]

Get Attribute

    attr = root.xpath('//root/subitem')[0].attrib['attr']


Python: Create a Logger

This post shows how to create a logger.

First we need the imports. Note that pathlib's Path is needed later to create the log directory.

    import sys
    import logging
    from datetime import datetime
    from pathlib import Path
    from pytz import timezone

Then we create a class for the Formatter.

    class CustomFormatter(logging.Formatter):
        grey = "\x1b[38;20m"
        reset = "\x1b[0m"
        fmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:)"
        FORMATS = {
            logging.DEBUG: '\x1b[38;5;23m' + fmt + reset,
            logging.INFO: grey + fmt + reset,
            logging.WARNING: '\x1b[38;5;56m' + fmt + reset,
            logging.ERROR: '\x1b[38;5;197m' + fmt + reset,
            logging.CRITICAL: '\x1b[38;5;1m' + fmt + reset,
        }

        def format(self, record):
            log_fmt = self.FORMATS.get(record.levelno)
            formatter = logging.Formatter(log_fmt)
            return formatter.format(record)

Then we create a function to set our logger up. The timezone name parameter is called tz_name so it does not shadow pytz's timezone function.

    def set_logger(logging_level, name, log_dir, tz_name):
        LOGGING_LEVELS = ['WARNING', 'INFO', 'DEBUG', 'ERROR']
        if logging_level not in LOGGING_LEVELS:
            logging_level = 'INFO'

        level_lookup = {
            'WARNING': logging.WARNING,
            'INFO': logging.INFO,
            'DEBUG': logging.DEBUG,
            'ERROR': logging.ERROR,
        }
        logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(tz_name)).timetuple()
        logging.basicConfig(level=level_lookup[logging_level], format="[%(levelname)s] %(asctime)s - %(message)s:%(lineno)d")
        stream_handler = logging.StreamHandler(sys.stdout)
        stream_handler.setFormatter(CustomFormatter())
        logger = logging.getLogger(name)
        logger.addHandler(stream_handler)
        logger.setLevel(logging_level)

        Path(log_dir).mkdir(parents=True, exist_ok=True)

        now = datetime.now(tz=timezone(tz_name))
        now = now.strftime("%H-%M-%S")

        log_file = '%slog_%s.log' % (log_dir, now)
        file_handler = logging.FileHandler(log_file, mode='a')
        file_handler.setFormatter(logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s:%(lineno)d"))
        logger.addHandler(file_handler)

        return logger

References

https://alexandra-zaharia.github.io/posts/make-your-own-custom-color-formatter-with-python-logging/

Python: Unit Testing

This post focuses on common hurdles when trying to do unit testing.

Testing Values During Run

Add the following lines anywhere you want to pause the unit test to check values.

    import pdb
    pdb.set_trace()

How to Patch a Function

    from unittest.mock import patch

    @patch('src.path.to.file.my_function')
    @patch('src.path.to.file.my_function_add')
    def test_some_function(mock_my_function_add, mock_my_function):
        mock_my_function_add.return_value = <something>
        ...

How to Patch a Function With No Return Value

    from unittest.mock import patch

    def test_some_function():
        with patch('src.path.to.file.my_function'):
            ...

How to Patch a Function With 1 Return Value

    from unittest.mock import patch, MagicMock

    def test_some_function():
        with patch('src.path.to.file.my_function', MagicMock(return_value=[<MY_VALUES>])):
            ...

How to Patch a Function With Multiple Return Values

    from unittest.mock import patch, MagicMock

    def test_some_function():
        with patch('src.path.to.file.my_function', MagicMock(side_effect=[[<MY_VALUES>], [<OTHER_VALUES>]])):
            ...

How to Create a Test Module

    from unittest import TestCase

    class MyModule(TestCase):
        def setUp(self):
            some_class.my_variable = <something>
            ... DO OTHER STUFF

        def test_my_function(self):
            ... DO Function Test Stuff

How to Patch a Method

    patch_methods = [
        "pyodbc.connect"
    ]

    for method in patch_methods:
        patch(method).start()

How to create a PySpark Session

Once you define this fixture, any test that takes spark as an argument gets the session injected.

    import pytest
    from pyspark.sql import SparkSession

    @pytest.fixture(scope='module')
    def spark():
        return SparkSession.builder.appName('pyspark_test').getOrCreate()

How to Create a Spark SQL Example

    import pytest
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.types import StructType, StructField, StringType

    @pytest.fixture(scope='module')
    def spark():
        return SparkSession.builder.appName('pyspark_test').getOrCreate()

    def test_function(spark):
        query = 'SELECT * FROM SOMETHING'
        schema = StructType([
            StructField('column_a', StringType()),
            StructField('column_b', StringType()),
            StructField('column_c', StringType()),
        ])

        data = [Row(column_a='a', column_b='b', column_c='c')]
        table = spark.createDataFrame(data, schema=schema)
        table.createOrReplaceTempView('SOMETHING')
        df = spark.sql(query).toPandas()

        assert not df.empty
        assert df.shape[0] == 1
        assert df.shape[1] == 3

        spark.catalog.dropTempView('SOMETHING')

How to Mock a Database Call

First let's assume you have an execute SQL function.

    def execute_sql(connection, cursor, sql, params):
        result = cursor.execute(sql, params).fetchone()
        connection.commit()
        return result

Next, in your unit tests, you want to test that function.

    def test_execute_sql():
        val = <YOUR_RETURN_VALUE>
        with patch('path.to.code.execute_sql', MagicMock(return_value=val)) as mock_execute:
            return_val = some_other_function_that_calls_execute_sql(....)
            assert return_val == val

If you need to close a cursor or DB connection

    def test_execute_sql():
        val = <YOUR_RETURN_VALUE>
        mock_cursor = MagicMock()
        mock_cursor.configure_mock(
            **{
                "close": MagicMock()
            }
        )
        mock_connection = MagicMock()
        mock_connection.configure_mock(
            **{
                "close": MagicMock()
            }
        )

        with patch('path.to.code.cursor', MagicMock(return_value=mock_cursor)) as mock_cursor_close:
            with patch('path.to.code.connection', MagicMock(return_value=mock_connection)) as mock_connection_close:
                return_val = some_other_function_that_calls_execute_sql(....)
                assert return_val == val

How to Mock Open a File Example 1

    @patch('builtins.open', new_callable=mock_open, read_data='my_data')
    def test_file_open(mock_file):
        assert open("my/file/path/filename.extension").read() == 'my_data'
        mock_file.assert_called_with("my/file/path/filename.extension")

        val = function_to_test(....)
        assert 'my_data' == val

How to Mock Open a File Example 2

    def test_file_open():
        fake_file_path = 'file/path/to/mock'
        file_content_mock = 'test'
        with patch('{}.open'.format(__name__), new=mock_open(read_data=file_content_mock)) as mock_file:
            with patch('os.utime') as mock_utime:
                actual = function_to_test(fake_file_path)
                mock_file.assert_called_once_with(fake_file_path)
                assert actual is not None

Compare DataFrames

    def as_dicts(df):
        df = [row.asDict() for row in df.collect()]
        return sorted(df, key=lambda row: str(row))

    assert as_dicts(df1) == as_dicts(df2)

Python: Create a WHL File

This post will just be a how-to on creating a WHL (wheel) file.

You need the following files:

MANIFEST.in:

    recursive-include <directory> *
    recursive-exclude tests *.py

requirements.txt:

This file just holds your packages and their versions.
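For example (the package names and versions here are placeholders, not requirements of this tutorial):

```
requests==2.26.0
pyodbc==4.0.32
```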

setup.py:

We filter pytest and coverage out of the requirements because you don't want those packages being required when you deploy your code.

    from setuptools import find_packages, setup
    import os

    req = []
    if os.path.exists('requirements.txt'):
        req = [line.strip('\n') for line in open('requirements.txt') if 'pytest' not in line and 'coverage' not in line]

    setup(
        include_package_data=True,
        name='<app_name>',
        version='<app_version>',
        description='<app_desc>',
        install_requires=req,
        packages=find_packages(exclude=["*tests.*", "*tests"]),
        classifiers=[
            "Programming Language :: Python :: <python_version>",
            "License :: OSI Approved :: MIT License",
            "Operating System :: OS Independent",
        ],
        python_requires='>=<python_version>',
        package_dir={'<directory>': '<directory>'},
    )

To Check Your WHL File

Install package

    pip install check-wheel-contents

Check WHL

    check-wheel-contents <PATH_TO_WHL>\<filename>.whl

Install WHL

This will deploy to <PATH_TO_PYTHON>\Lib\site-packages\<directory>

    <PATH_TO_PYTHON>\Scripts\pip3.7.exe install <PATH_TO_WHL>\<filename>.whl


Python: xlrd (Read Excel File)

In this tutorial I will show you how to read an Excel file in Python.

Installation

    pip install xlrd

Open The Workbook

    import xlrd

    my_excel = r'C:\path\to\file'
    wb = xlrd.open_workbook(my_excel)

Select Sheet

    # Select the first sheet (index 0). To select the third sheet, use sheet_by_index(2).
    sheet = wb.sheet_by_index(0)

Get Data In Column

    # This loops through all the rows in that sheet
    for i in range(sheet.nrows):
        # if the value isn't empty then print it out
        if sheet.cell_value(i, 0) != '':
            print(sheet.cell_value(i, 0))

Get All the Column Headers

    # This loops through all the columns of the first row
    for i in range(sheet.ncols):
        # if the value isn't empty then print it out
        if sheet.cell_value(0, i) != '':
            print(sheet.cell_value(0, i))


Django: React Website

In this tutorial I will demonstrate how to create a Django + React website using Django 2.0. You must have Eclipse installed and configured before you continue. We will also require Postgres 9.4 and Node.js before you continue.

Pip Django Install:

    pip install django
    pip install django-webpack-loader
Django Version:

If you are not sure what version you are running do the following

    python -c "import django; print(django.get_version())"
Eclipse Create Project:

(screenshots omitted)

Eclipse Setup Project:

(screenshots omitted)

Eclipse Django DB Settings:

(screenshots omitted)

Eclipse Django Setup Successful:

Once you click “Finish” your project will look like the following.

(screenshot omitted)

Folder Structure:
  • Under djangoApp project.
  • folder: static
  • folder: djangoApp
    • folder: templates
      • file: index.html
      • folder: base
        • file: base.html
  • folder: assets
    • folder: bundles
    • folder: js
      • file: index.jsx
Node:

Inside the djangoApp application do the following

    npm init
    npm install --save-dev jquery react react-dom webpack webpack-bundle-tracker babel-loader babel-core babel-preset-es2015 babel-preset-react
    npm install create-react-class --save
webpack.config.js:
    var path = require('path')
    var webpack = require('webpack')
    var BundleTracker = require('webpack-bundle-tracker')

    module.exports = {
      //the base directory (absolute path) for resolving the entry option
      context: __dirname,
      //the entry point we created earlier. Note that './' means
      //your current directory.
      entry: {
        "index": [path.resolve(__dirname, "./assets/js/index.jsx")],
      },
      output: {
        path: path.resolve('./assets/bundles/'),
        filename: "[name]-[hash].js",
      },
      plugins: [
        //tells webpack where to store data about your bundles.
        new BundleTracker({filename: './webpack-stats.json'}),
        //makes jQuery available in every module
        new webpack.ProvidePlugin({
          $: 'jquery',
          jQuery: 'jquery',
          'window.jQuery': 'jquery'
        })
      ],
      module: {
        loaders: [
          {
            test: /\.jsx?$/,
            exclude: /(node_modules)/,
            loader: 'babel-loader',
            query: {
              presets: ['react', 'es2015']
            }
          }
        ]
      }
    }
djangoApp\settings.py:

Installed Apps

    INSTALLED_APPS = [
        'django.contrib.admin',
        'django.contrib.auth',
        'django.contrib.contenttypes',
        'django.contrib.sessions',
        'django.contrib.messages',
        'django.contrib.staticfiles',
        'webpack_loader',
    ]

Add/Edit the following template directive

    TEMPLATES = [
        {
            'BACKEND': 'django.template.backends.django.DjangoTemplates',
            'DIRS': [os.path.join(BASE_DIR, 'djangoApp', 'templates'),],
            'APP_DIRS': True,
            'OPTIONS': {
                'context_processors': [
                    'django.template.context_processors.debug',
                    'django.template.context_processors.request',
                    'django.contrib.auth.context_processors.auth',
                    'django.contrib.messages.context_processors.messages',
                ],
            },
        },
    ]

Add the following static directive

    STATIC_URL = '/static/'

    STATICFILES_DIRS = [
        os.path.join(BASE_DIR, 'assets'),
    ]

Modify DATABASES

    DATABASES = {
        'default': {
            'ENGINE': 'django.db.backends.postgresql_psycopg2',
            'NAME': 'YOUR_DB_NAME',
            'USER': 'YOUR_USER',
            'PASSWORD': 'YOUR_PASSWORD',
            'HOST': 'localhost',
            'PORT': 5432
        }
    }

Webpack Loader

    WEBPACK_LOADER = {
        'DEFAULT': {
            'BUNDLE_DIR_NAME': 'bundles/',
            'STATS_FILE': os.path.join(BASE_DIR, 'webpack-stats.json'),
        }
    }
djangoApp\views.py:

We will create our index page view. Notice the third argument, a dict: those are variables passed to the template to make our site dynamic.

    from django.shortcuts import render

    def index(request):
        return render(request, 'index.html', {'title': 'Index Page', 'script_name': 'index'})
djangoApp\urls.py:

Add the following imports

    from django.conf.urls import url
    from django.contrib import admin
    from django.urls import path
    #This is the index view we created above
    from djangoApp.views import index

    urlpatterns = [
        url(r'^$', index, name='index'),
        path('admin/', admin.site.urls),
    ]
djangoApp\templates\base\base.html:

Let's set up our base template and the blocks that the other templates will inherit from.

    <html>
      <head>
        <title>{% block title %}{% endblock %}</title>
      </head>
      <body>
        {% block content %}
        {% endblock %}
      </body>
    </html>
djangoApp\templates\index.html:

The important part here is the extends tag; without it your base.html template won't be inherited. The {% with %} block and the title variable make our template dynamic and allow us to incorporate React in our site.

    {% extends "base/base.html" %}
    {% load render_bundle from webpack_loader %}
    {% load staticfiles %}
    {% block title %}
    {{ title }}
    {% endblock %}
    {% block content %}
    <div id="container"></div>
    {% with script=script_name %}
    {% render_bundle script 'js' %}
    {% endwith %}
    {% endblock %}
assets\js\index.jsx:

This is our React class.

    var React = require('react');
    var ReactDOM = require('react-dom');
    var createReactClass = require('create-react-class');

    var App = createReactClass({
      render: function() {
        return (
          <h1>
            React App Page
          </h1>
        )
      }
    });

    ReactDOM.render(<App />, document.getElementById('container'));
Database Setup/Migration:

For this tutorial we used Postgres. At this time, please make sure you create the djangoApp database and the user you specified in the settings.py file. Then run the following commands in order.

    #Migrates the auth
    python manage.py migrate auth
    #migrates the rest
    python manage.py migrate
    #Create the user for accessing the django admin ui
    #This will ask you for user names and passwords. Don't make it the same as in your settings.py file.
    python manage.py createsuperuser
Start Server:
    webpack -p
    python manage.py runserver

Your site is now running at http://localhost:8000.

Your admin site is now running at http://localhost:8000/admin/.


References:

I used a video as a guideline to get the project started. However, some steps didn't work right, so I made adjustments, such as requiring just one template.

Avro & Python: How to Schema, Write, Read

I have been experimenting with Apache Avro and Python. Below is what I have learned thus far.

Pip Install

At the time of this writing I am using 1.8.2.

    pip install avro-python3

Schema

There are many different ways to work with the schema definition. There are primitive and complex types. You can find much more documentation on the schema definition in the Avro specification.

    import json
    import avro.schema

    my_schema = avro.schema.Parse(json.dumps(
    {
        'namespace': 'test.avro',
        'type': 'record',
        'name': 'MY_NAME',
        'fields': [
            {'name': 'name_1', 'type': 'int'},
            {'name': 'name_2', 'type': {'type': 'array', 'items': 'float'}},
            {'name': 'name_3', 'type': 'float'},
        ]
    }))

Method 1

Write

    from avro.datafile import DataFileWriter
    from avro.io import DatumWriter

    #write binary
    file = open(filename, 'wb')

    datum_writer = DatumWriter()
    fwriter = DataFileWriter(file, datum_writer, my_schema)
    fwriter.append({'name_1': 645645, 'name_2': [5.6, 34.7], 'name_3': 644.5645})
    fwriter.close()

Write Deflate

    from avro.datafile import DataFileWriter
    from avro.io import DatumWriter

    #write binary
    file = open(filename, 'wb')

    datum_writer = DatumWriter()
    fwriter = DataFileWriter(file, datum_writer, my_schema, codec='deflate')
    fwriter.append({'name_1': 645645, 'name_2': [5.6, 34.7], 'name_3': 644.5645})
    fwriter.close()

Append

    from avro.datafile import DataFileWriter
    from avro.io import DatumWriter

    #append binary
    file = open(filename, 'a+b')

    datum_writer = DatumWriter()
    #Notice that the schema is not passed to the DataFileWriter. This is because you are appending to an existing avro file
    fwriter = DataFileWriter(file, datum_writer)
    fwriter.append({'name_1': 645675, 'name_2': [5.6, 34.9], 'name_3': 649.5645})
    fwriter.close()

Read Schema

    from avro.datafile import DataFileReader
    from avro.io import DatumReader

    file = open(filename, 'rb')
    datum_reader = DatumReader()
    file_reader = DataFileReader(file, datum_reader)

    print(file_reader.meta)

Read

    from avro.datafile import DataFileReader
    from avro.io import DatumReader

    #read binary
    fd = open(filename, 'rb')
    datum_reader = DatumReader()
    file_reader = DataFileReader(fd, datum_reader)

    for datum in file_reader:
        print(datum['name_1'])
        print(datum['name_2'])
        print(datum['name_3'])
    file_reader.close()

Method 2

Write/Append BinaryEncoder

    import io
    from avro.io import DatumWriter, BinaryEncoder

    #write binary (use one of the following two opens, not both)
    file = open(filename, 'wb')
    #append binary
    file = open(filename, 'a+b')

    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer_binary = DatumWriter(my_schema)
    writer_binary.write({'name_1': 645645, 'name_2': [5.6, 34.7], 'name_3': 644.5645}, encoder)
    file.write(bytes_writer.getvalue())

Read BinaryDecoder

    import io
    from avro.io import DatumReader, BinaryDecoder

    file = open(filename, 'rb')
    bytes_reader = io.BytesIO(file.read())
    decoder = BinaryDecoder(bytes_reader)
    reader = DatumReader(my_schema)

    #read records until the buffer is exhausted
    while True:
        try:
            rec = reader.read(decoder)
            print(rec['name_1'])
            print(rec['name_2'])
            print(rec['name_3'])
        except Exception:
            break


Python: Run Process

If you want to run a jar from Python, or really any process, you can do so by leveraging the subprocess package.

    from subprocess import Popen, PIPE

Then you need to call Popen. If you want to set Java memory you can do so using -Xms and -Xmx between java and -jar.

    #bufsize of 1 is line buffered
    #stdout and stderr are piped so you can read the process output
    #with shell=False the command must be split into a list
    result = Popen(['java', '-jar', 'myapp.jar'], stdout=PIPE, stderr=PIPE, shell=False, bufsize=1)

If you want your process to wait until finished you will need to call wait.

    result.wait()

If you piped stdout and stderr, you can check the output.

    if result.stdout is not None:
        for line in result.stdout:
            print(line)

    if result.stderr is not None:
        for line in result.stderr:
            print(line)

Python: Logging

If you want to do some basic logging to a file, you can use the logging package that comes with Python. Here are some of the basic ways to log.

You first have to import the package.

    import logging

You can setup your own logging configuration but for this we will just use the basic setup and log to a file.

    #If you are going to have multiple handlers you should reset the root handlers first
    logging.root.handlers = []

    #The file to log to
    log_file = '<LOG_FILE_PATH>'

    #Setup the config with the level to log up to
    logging.basicConfig(filename=log_file, level=logging.INFO)

Then you setup your logger

    logger = logging.getLogger('my_awesome_log')

If you want your log to rotate after a certain size then you must add a RotatingFileHandler. If you do not use the RotatingFileHandler then the log will grow until your drive runs out of space.

    from logging.handlers import RotatingFileHandler

    handler = RotatingFileHandler(log_file, maxBytes=1024, backupCount=1)
    logger.addHandler(handler)

If you also want to log to console you will need to add an additional handler for the console setting the level to log.

    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    logger.addHandler(console)

That's it: a basic example of how to use the logging package.

 

Python: Multiprocessing Pool

Sometimes we want to run a costly function across multiple processes. Below is one way to do it using apply_async; there are other APIs you could use, like map.

    from multiprocessing import Pool

    # Sets the pool to utilize 4 processes
    pool = Pool(processes=4)
    result = pool.apply_async(func=my_method, args=("some_info",))
    # Blocks until the async call finishes and returns its result
    data = result.get()
    pool.close()
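For comparison, here is a minimal sketch of the map API mentioned above, which applies a function to every item of an iterable across the pool (square is just an example function):

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    # map blocks until every item is processed and preserves input order
    with Pool(processes=4) as pool:
        results = pool.map(square, [1, 2, 3, 4])
    print(results)  # [1, 4, 9, 16]
```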

Python: Selenium Tests

Selenium is a great way to test your UI. It is compatible with different browsers. I will show you two.

Gecko Driver Installation:

Make sure you are using the latest version. At the time of this writing it is 0.19.0.

    wget https://github.com/mozilla/geckodriver/releases/download/v0.19.0/geckodriver-v0.19.0-linux64.tar.gz
    sudo tar -xvzf geckodriver-v0.19.0-linux64.tar.gz
    sudo chmod +x geckodriver
    sudo cp geckodriver /usr/local/bin/

You can use phantomjs, firefox, chrome, etc.

PhantomJS Installation:

    # assumes phantomjs-2.1.1-linux-x86_64.tar.bz2 has already been downloaded
    sudo mv phantomjs-2.1.1-linux-x86_64.tar.bz2 /usr/local/share/.
    cd /usr/local/share/
    sudo tar xjf phantomjs-2.1.1-linux-x86_64.tar.bz2
    sudo ln -s /usr/local/share/phantomjs-2.1.1-linux-x86_64 /usr/local/share/phantomjs
    sudo ln -s /usr/local/share/phantomjs/bin/phantomjs /usr/local/bin/phantomjs

Firefox Installation:

    sudo apt-get update
    wget https://ftp.mozilla.org/pub/firefox/releases/50.0/linux-x86_64/en-US/firefox-50.0.tar.bz2
    sudo tar -xjf firefox-50.0.tar.bz2
    sudo rm -rf /opt/firefox
    sudo mv firefox /opt/firefox
    sudo mv /usr/bin/firefox /usr/bin/firefoxold
    sudo ln -s /opt/firefox/firefox /usr/bin/firefox

Firefox Headless Installation:

    sudo apt-get install xvfb
    pip3 install pyvirtualdisplay==0.2.1

Selenium Installation:

    pip3 install selenium==3.6.0

PyUnit Selenium Test Examples:

Setup:

    #If you are using headless firefox
    from pyvirtualdisplay import Display
    #The selenium imports
    from selenium import webdriver
    from selenium.webdriver.common.by import By
    from selenium.webdriver.support import expected_conditions as EC
    from selenium.webdriver.support.ui import WebDriverWait
    import unittest, os, time

    class MySeleniumTests(unittest.TestCase):
        @classmethod
        def setUpClass(cls):
            cls.server_url = "http://" + os.getenv("WEBSITE_URL", 'localhost:5000')

        def setUp(self):
            #if you are using the firefox headless browser
            display = Display(visible=0, size=(1080, 720))
            display.start()
            #Firefox selenium driver (choose one driver or the other)
            self.driver = webdriver.Firefox()
            #PhantomJS selenium driver
            #self.driver = webdriver.PhantomJS()
            self.driver.implicitly_wait(60)
            self.driver.set_page_load_timeout(60)
            self.driver.set_window_size(1080, 720)
            self.base_url = self.server_url

            self.driver.get(self.base_url + "/")
            #If your site has a login then you need to set the username and password first.
            self.driver.find_element_by_id("user").clear()
            self.driver.find_element_by_id("user").send_keys(USERNAME)
            self.driver.find_element_by_id("password").clear()
            self.driver.find_element_by_id("password").send_keys(PWD)
            self.driver.find_element_by_id("submit").click()
            time.sleep(1)

        def tearDown(self):
            self.driver.quit()

    if __name__ == "__main__":
        unittest.main()

Test Title:

  1. self.driver.get(self.server_url)
  2. self.assertIn("MySite", self.driver.title)

Find Class:

  1. self.assertTrue(WebDriverWait(self.driver, 10).until(EC.visibility_of_element_located((By.CLASS_NAME, "my-awesome-class"))))

Find ID:

  1. self.assertTrue(WebDriverWait(self.driver, 10).until(EC.visibility_of_element_located((By.ID, "myId"))))

Find Partial Text:

  1. self.driver.find_element_by_partial_link_text("My Text On Page")

Find Element Contains Text:

  1. self.assertTrue('MyText' in self.driver.find_element_by_id('container').text)

Click Element:

  1. self.driver.find_element_by_id('myId').click()

Wait Element To Show:

  1. self.assertTrue(WebDriverWait(self.driver, 10).until(EC.text_to_be_present_in_element((By.ID, 'MyID'), "Text To See")))

xPath Click Second Element (note that XPath positions are 1-indexed):

  1. self.driver.find_element_by_xpath("(//div[@class='my-awesome-class'])[2]").click()

Clear Input:

  1. self.driver.find_element_by_id("myId").clear()

Send Data To Input:

  1. self.driver.find_element_by_id("myId").send_keys('My New Data')

 

 

Python: MRJob

If you use Hadoop and want to run a MapReduce-style job using Python, you can use MRJob.

Installation:

  1. pip install mrjob

Here is an example that runs just a mapper step and loads a JSON file; yield writes the data out.

  1. from mrjob.job import MRJob
  2. from mrjob.step import MRStep
  3. import json
  4.  
  5. class MRTest(MRJob):
  6.     def steps(self):
  7.         return [
  8.             MRStep(mapper=self.mapper_test)
  9.         ]
  10.  
  11.     def mapper_test(self, _, line):
  12.         doc = json.loads(line)
  13.         # Yield a key/value pair; here the whole record keyed by one of
  14.         # its fields (replace 'id' with a field your JSON actually has).
  15.         yield doc.get('id'), doc
  16.  
  17. if __name__ == '__main__':
  18.     MRTest.run()

Python: Working with DateTimes

In this tutorial I will show you the different ways of working with dates and times in Python, including working with milliseconds. Note that these are not all the available options, just some that I have encountered over the years.

Install Python Packages:

Open cmd/terminal and, if required, navigate to your site's working folder. (Note: if you are working in a virtual env, ensure you source it first.)

  1. pip install python-dateutil

There are many different packages we can use to work with dates and times. You need to decide what is right for you.

dateutil:

The following converts the date string you give it quickly and easily, returning a datetime object. Notice that we don't need to pass it a datetime format, which I find very convenient.

  1. from dateutil import parser
  2.  
  3. date_str = '2017-06-06'
  4. date_time_str = '2017-06-07 12:34'
  5. date_time_str_2 = '2017-06-07 12:34:46'
  6. date_time_str_3 = '2017-06-07 12:34:42.234'
  7.  
  8. result = parser.parse(date_str)
  9. print(result) #2017-06-06 00:00:00
  10. result = parser.parse(date_time_str)
  11. print(result) #2017-06-07 12:34:00
  12. result = parser.parse(date_time_str_2)
  13. print(result) #2017-06-07 12:34:46
  14. result = parser.parse(date_time_str_3)
  15. print(result) #2017-06-07 12:34:42.234000

datetime:

The following also converts the date string quickly and easily, returning a datetime object. Notice that here we need to pass the format of the datetime; if we don't, we get an exception. This is convenient if you know the format beforehand, but that might not always be the case.

  1. import datetime
  2.  
  3. date_str = '2017-06-06'
  4. date_time_str = '2017-06-07 12:34'
  5. date_time_str_2 = '2017-06-07 12:34:46'
  6. date_time_str_3 = '2017-06-07 12:34:42.234'
  7.  
  8. result = datetime.datetime.strptime(date_str, "%Y-%m-%d")
  9. print(result) #2017-06-06 00:00:00
  10. result = datetime.datetime.strptime(date_time_str, "%Y-%m-%d %H:%M")
  11. print(result) #2017-06-07 12:34:00
  12. result = datetime.datetime.strptime(date_time_str_2, "%Y-%m-%d %H:%M:%S")
  13. print(result) #2017-06-07 12:34:46
  14. result = datetime.datetime.strptime(date_time_str_3, "%Y-%m-%d %H:%M:%S.%f")
  15. print(result) #2017-06-07 12:34:42.234000

The above all works; however, the following example will not. Why do you think this is?

  1. import datetime
  2.  
  3. date_time_str = '2017-06-07 12:34:42.234'
  4.  
  5. try:
  6.     datetime.datetime.strptime(date_time_str, "%Y-%m-%d %H:%M:%S")
  7. except ValueError:
  8.     pass  #just for this example; don't silently swallow exceptions in real code

The reason is that datetime expects the format to match the string exactly. We gave it hour, minute, and second, but the string also contains milliseconds, so you will get the following exception: (ValueError: unconverted data remains: .234)
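One defensive pattern (my addition, a sketch using only the standard library) is to try each format you expect, from most to least specific, instead of a bare except:

```python
import datetime

# Formats ordered from most to least specific.
FORMATS = ["%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M", "%Y-%m-%d"]

def parse_flexible(value):
    """Try each known format until one matches."""
    for fmt in FORMATS:
        try:
            return datetime.datetime.strptime(value, fmt)
        except ValueError:
            continue
    raise ValueError("no known format matches: " + value)

print(parse_flexible('2017-06-07 12:34:42.234'))  # 2017-06-07 12:34:42.234000
print(parse_flexible('2017-06-06'))               # 2017-06-06 00:00:00
```

This only catches ValueError, so genuine bugs still surface instead of being swallowed.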

Timestamps:

Sometimes we want to convert the date to unix (epoch) time or vice versa.

From Date:
  1. from dateutil import parser
  2. from datetime import timezone
  3.  
  4. date_time_str = '2017-06-07 17:34:42.234'
  5. result = parser.parse(date_time_str)
  6.  
  7. timestamp = result.replace(tzinfo=timezone.utc).timestamp()
  8. print(timestamp) #1496856882.234

This gives us the timestamp as a float as 1496856882.234.
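As a quick sanity check (my addition, standard library only), converting that float back with an explicit UTC timezone recovers the original wall-clock time:

```python
import datetime
from datetime import timezone

timestamp = 1496856882.234

# Passing tz=timezone.utc gives a timezone-aware datetime in UTC,
# independent of the local machine's timezone.
result = datetime.datetime.fromtimestamp(timestamp, tz=timezone.utc)
print(result)  # 2017-06-07 17:34:42.234000+00:00
```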

From Timestamp:
  1. from dateutil import parser
  2. import datetime
  3.  
  4. timestamp = 1496856882.234
  5.  
  6. result = datetime.datetime.fromtimestamp(timestamp)
  7. print(result) #2017-06-07 13:34:42.234000
  8.  
  9. result = datetime.datetime.utcfromtimestamp(timestamp)
  10. print(result) #2017-06-07 17:34:42.234000

Get Date Parts:

If you want to get specific date parts such as the year, month, day, hour, etc.

  1. from dateutil import parser
  2.  
  3. date_time_str_3 = '2017-06-07 12:34:42.234'
  4. result = parser.parse(date_time_str_3)
  5. print(result) #2017-06-07 12:34:42.234000
  6.  
  7. year = result.year #2017
  8. month = result.month #6
  9. day = result.day #7
  10. hour = result.hour #12
  11. minute = result.minute #34
  12. second = result.second #42
  13. millisecond = result.microsecond #234000
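A few related accessors (my addition, standard library only) that often come in handy alongside the parts above:

```python
import datetime

result = datetime.datetime(2017, 6, 7, 12, 34, 42, 234000)

print(result.date())       # 2017-06-07
print(result.time())       # 12:34:42.234000
print(result.weekday())    # 2 (Monday is 0, so this is a Wednesday)
print(result.isoformat())  # 2017-06-07T12:34:42.234000
```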

Add To Date:

If you want to add time to a date.

  1. import datetime
  2. from dateutil import parser
  3. from datetime import timezone, timedelta
  4.  
  5. date_time_str = '2017-06-07 17:34:42.234'
  6. result = parser.parse(date_time_str)
  7. print(result) #2017-06-07 17:34:42.234000
  8.  
  9. timestamp = result.replace(tzinfo=timezone.utc).timestamp()
  10. print(timestamp) #1496856882.234
  11.  
  12. #Add 10 seconds to datetime
  13. new_time = int((datetime.datetime.fromtimestamp(timestamp) + timedelta(milliseconds=10000)).timestamp() * 1000)
  14. print(new_time) #1496856892234

As you can see, 10 seconds have been added to the datetime.

datetime strftime

  1. from datetime import datetime
  2.  
  3. now = datetime.now()
  4. datetime_str = now.strftime("%Y-%m-%d %H:%M:%S")
  5. print(datetime_str)

datetime fromisoformat

  1. from datetime import datetime
  2.  
  3. print(datetime.fromisoformat("2024-04-09 13:48:20"))
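A couple of extra notes on fromisoformat (my addition): it also accepts the 'T' separator and fractional seconds, and isoformat() is its inverse. Be aware that before Python 3.11 it does not accept a trailing 'Z' for UTC.

```python
from datetime import datetime

dt = datetime.fromisoformat("2024-04-09T13:48:20.123456")
print(dt)              # 2024-04-09 13:48:20.123456

# isoformat() round-trips back to the same string.
print(dt.isoformat())  # 2024-04-09T13:48:20.123456
```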

 

Python: CSV from Array

In this tutorial I will explain how to turn an array into a CSV file. I will show you two ways: one is in memory and the other is to a file.

For both ways you need to import the csv and io packages.

  1. import csv, io
Way 1 Write (In Memory):
  1. #Create the string buffer
  2. output = io.StringIO()
  3.  
  4. #Setup the csv writer to write the results to a string buffer
  5. wr = csv.writer(output, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
Way 2 Write (File):
  1. #Create the file itself in write mode
  2. f = open('filename.csv', 'w')
  3.  
  4. #Setup the csv writer to write the results to a file.
  5. wr = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)

Technically both ways have the same setup for the csv writer. To write results to the csv writer, you pass it an array of values like below.

  1. wr.writerow(['123',5,4,'value'])

Reading the contents back depends on which way you chose. Both ways are shown below.

Way 1 Read (In Memory):
  1. b = bytes(output.getvalue(), 'utf-8')
Way 2 Read (File):
  1. f.close()
  2. file_data = open('filename.csv', 'r').read()
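Putting Way 1 together, a minimal in-memory round trip (my sketch) looks like this:

```python
import csv, io

# Write a header row and a data row into a string buffer.
output = io.StringIO()
wr = csv.writer(output, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
wr.writerow(['id', 'a', 'b', 'label'])
wr.writerow(['123', 5, 4, 'value'])

# Rewind and read the rows back out of the same buffer.
# Note the csv module reads everything back as strings.
output.seek(0)
rows = list(csv.reader(output))
print(rows)  # [['id', 'a', 'b', 'label'], ['123', '5', '4', 'value']]
```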

If you want to send the file down using something like flask send_file then you need to convert it to BytesIO.

  1. from io import BytesIO
  2.  
  3. buffer = BytesIO()
  4. buffer.write(b)
  5. #You must seek to the beginning otherwise it won't send anything back.
  6. buffer.seek(0)

Now if you are sending it as a file back to the user and are using something like flask, this is how you do that. Pretty straightforward. (Note: in Flask 2.0+ the attachment_filename parameter was renamed to download_name.)

  1. from flask import send_file
  2.  
  3. return send_file(buffer, mimetype='application/octet-stream', as_attachment=True, attachment_filename='myFile.csv')