Samples¶
Impatient developers often love samples to learn quickly. In this part, we will show you how to use CoWorks to :
Understand the CoWorks layer service.
Create a directory service to call technical microservice by there name.
Create a website with content defined in the CosmisJS headless tool.
CoWorks layers¶
Very simple microservice defining a simple HTML page and a call thru javascript.
import os
import re
from datetime import datetime
import boto3
from aws_xray_sdk.core import xray_recorder
from flask import render_template
from flask import url_for
from werkzeug.exceptions import BadRequest
from coworks import TechMicroService
from coworks import entry
from coworks import request
from coworks.blueprint.admin_blueprint import Admin
from coworks.extension.xray import XRay
class CoworksLayersMicroService(TechMicroService):
DOC_MD = """
## Layers service
Microservice to get all available CoWorks layers.
"""
def __init__(self, **kwargs):
super().__init__(name="cws_layers", **kwargs)
self.register_blueprint(Admin(), url_prefix='/admin')
access_key = os.getenv("KEY_ID")
secret_key = os.getenv("SECRET_KEY")
if not access_key or not secret_key:
raise BadRequest("Something wrong in your environment : no AWS credentials defined!")
session = boto3.Session(access_key, secret_key, region_name='eu-west-1')
self.lambda_client = session.client('lambda')
@entry(no_auth=True)
def get_home(self):
"""HTML page to get the layers."""
headers = {'Content-Type': 'text/html; charset=utf-8'}
return render_template('home.j2', url=url_for('get')), 200, headers
@entry(no_auth=True)
def get(self, full: bool = False):
"""Layers in json or text format."""
version_pattern = re.compile(r"coworks-[_\d]*")
res = self.lambda_clienlist_layers()
layers = {x['LayerName']: x for x in filter(lambda x: version_pattern.fullmatch(x['LayerName']), res['Layers'])}
if full:
return layers
versions = [*filter(lambda x: 'dev' not in x, map(lambda x: x[8:].replace('_', '.'), layers))]
versions.sort(key=lambda s: list(map(int, s.split('.'))))
last_version = layers[f"coworks-{versions[-1].replace('.', '_')}"]
layers = map(lambda x: x['LayerArn'], layers.values())
if request.accept_mimetypes['text/html']:
return to_html(layers, last_version)
return to_json(layers, last_version)
def to_json(layers, last_version):
return {
'last': last_version['LayerArn'],
'layers': [*layers],
}
def to_html(layers, last_version):
data = {
'layers': layers,
'now': datetime.now(),
}
return render_template('layers.j2', **data), 200, {'Content-Type': 'text/html; charset=utf-8'}
app = CoworksLayersMicroService()
XRay(app, xray_recorder)
Website¶
Very simple microservice defining a simple HTML website.
import os
import requests
from flask import send_from_directory
from flask_login import LoginManager
from account import AccountBlueprint
from coworks import TechMicroService
from coworks import __version__ as coworks_version
from coworks import entry
from coworks.blueprint.admin_blueprint import Admin
from util import render_html_template
class WebsiteMicroService(TechMicroService):
DOC_MD = """
## Simple Flask website
Microservice to implement a small website with session.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.config['SECRET_KEY'] = os.getenv('SECRET_KEY')
self.register_blueprint(Admin(), url_prefix='/admin')
self.register_blueprint(AccountBlueprint(LoginManager(self)), url_prefix='/account')
@self.context_processor
def inject_context():
context = {
"version": coworks_version
}
headers = {'Authorization': os.getenv('GITHUB_TOKEN')}
resp = requests.get(os.getenv('COWORKS_GITHUB_URL'), headers=headers)
if resp.ok:
context["stargazers_count"] = resp.json()["stargazers_count"]
return context
@entry(no_auth=True)
def get(self):
"""Entry for the home page."""
data = {}
headers = {'Accept': "application/json"}
resp = requests.get(os.getenv('COWORKS_LAYERS_URL'), headers=headers)
if resp.ok:
layers_resp = resp.json()
data['last_layer'] = layers_resp["last"]
zip_name = layers_resp["last"].split(':')[-1].replace('_', '.')
data['last_layer_url'] = f"https://coworks-layer.s3.eu-west-1.amazonaws.com/{zip_name}.zip"
return render_html_template("home.j2", **data)
@entry(no_auth=True)
def get_assets_css(self, filename):
"""Access for all css files."""
return send_from_directory('assets', f"css/{filename}", as_attachment=False, conditional=False)
@entry(no_auth=True, binary_headers={'Content-Type': 'image/webp', 'Content-Disposition': 'inline'})
def get_assets_img(self, filename):
"""Access for all images."""
return send_from_directory('assets', f"img/{filename}", conditional=False)
@entry(no_auth=True, binary_headers={'Content-Type': 'application/zip',
'Content-Disposition': 'attachment; filename=plugins.zip'})
def get_zip(self):
"""Access to the AirFlow plugins zip."""
return send_from_directory('assets', "plugins.zip", conditional=False)
Directory¶
This microservice is usefull for the BizMicroservice
.
from coworks.blueprint.admin_blueprint import Admin
from coworks.tech.directory import DirectoryMicroService
app = DirectoryMicroService()
app.register_blueprint(Admin(), url_prefix='/admin')
To create your directory service, you just have to define a file env_vars/vars.secret.json
like
{
"AWS_USER_ACCESS_KEY_ID": XXXX,
"AWS_USER_SECRET_ACCESS_KEY": YYY
}